hotshot_types/epoch_membership.rs

use std::{
    collections::{BTreeSet, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::{Mutex, RwLock};
use committable::Commitment;
use hotshot_utils::{anytrace::*, *};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    event::Event,
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::root_block_in_epoch,
    PeerConfig,
};

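/// Map from an epoch to a broadcast receiver that yields the result of that epoch's
/// catchup (the resulting `EpochMembership` or an error) once it completes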
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

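/// Set of epochs for which a DRB calculation is currently in progress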
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

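/// An epoch paired with the broadcast sender used to signal the completion of its catchup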
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

/// Struct to coordinate membership catchup
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready. The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

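    /// Epochs for which a DRB result is currently being computed, used to avoid
    /// starting duplicate DRB calculations during catchup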
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

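    /// Callback function to store the progress of an in-flight DRB calculation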
    store_drb_progress_fn: StoreDrbProgressFn,

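    /// Callback function to load previously stored DRB calculation progress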
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback function to store a drb result in storage when one is calculated during catchup
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Callback function to select a DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create an EpochMembershipCoordinator
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

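    /// Set the external event channel on the underlying membership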
    pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
        self.membership
            .write()
            .await
            .set_external_channel(external_channel)
            .await;
    }

    /// Get a reference to the membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Set the DRB difficulty selector
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
    /// table for the given Epoch
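    ///
    /// A minimal usage sketch (illustrative, not a doctest; assumes `coordinator` and `epoch`
    /// are in scope):
    /// ```ignore
    /// match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(membership) => { /* randomized stake table is available for `epoch` */ },
    ///     Err(_) => { /* catchup was started or is already in progress; retry later */ },
    /// }
    /// ```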
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
    /// table for the given Epoch
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root and
    /// the stake table for the epoch containing this epoch's DRB result.
    /// If they do not exist, then go back one epoch at a time until we find a stake table.
    ///
    /// If there is another catchup in progress this will not duplicate efforts,
    /// e.g. if we start with only the first epoch stake table and call catchup for epoch 10 and
    /// then for epoch 20, the first caller will do the work to catch up to epoch 10 and the
    /// second caller will continue catching up to epoch 20.
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // We need to fetch the requested epoch, that's for sure
        let mut fetch_epochs = vec![];

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                .await;
            return;
        };

        // First figure out which epochs we need to fetch
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table but we need to make sure we have the epoch root of the requested epoch
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
                // Lock the catchup map
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Somebody else is already fetching this epoch, drop the lock and wait for them to finish
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                    // If we didn't receive the epoch then we need to try again
                } else {
                    // Nobody else is fetching this epoch. We need to do it. Put it in the map and move on to the next epoch
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }

        // Iterate through the epochs we need to fetch in reverse, i.e. from the oldest to the newest
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(_) => {},
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
                        .await;
                    return;
                },
            };

            // Signal the other tasks about the success
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // Remove the epoch from the catchup map to indicate that the catchup is complete
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }

        let root_leaf = match self.fetch_stake_table(epoch).await {
            Ok(root_leaf) => root_leaf,
            Err(err) => {
                self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                    .await;
                return;
            },
        };

        match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.membership.clone(),
            epoch,
        )
        .await
        {
            Ok(drb_result) => {
                self.membership
                    .write()
                    .await
                    .add_drb_result(epoch, drb_result);
            },
            Err(err) => {
                tracing::warn!(
                    "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
                    epoch,
                    err
                );

                let result = self.compute_drb_result(epoch, root_leaf).await;

                log!(result);

                if let Err(err) = result {
                    // The cleanup already signals the failure to waiting tasks,
                    // so don't fall through and signal success below.
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
            },
        };

        // Signal the other tasks about the success
        if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })) {
            tracing::warn!(
                "The catchup channel for epoch {} overflowed, dropped message {:?}",
                epoch,
                res.map(|em| em.epoch)
            );
        }

        // Remove the epoch from the catchup map to indicate that the catchup is complete
        self.catchup_map.lock().await.remove(&epoch);
    }

    /// Call this method if you think catchup is in progress for a given epoch
    /// and you want to wait for it to finish and get the stake table.
    /// If no catchup is in progress, it will return the stake table if it is already available.
    /// Returns an error if the catchup failed, or if no catchup is in progress
    /// and the stake table is not available.
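    ///
    /// A minimal usage sketch (illustrative, not a doctest; assumes `coordinator` and `epoch`
    /// are in scope):
    /// ```ignore
    /// let membership = match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(membership) => membership,
    ///     // Catchup was started or is already running; wait for it to finish.
    ///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
    /// };
    /// ```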
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // There is no catchup in progress, maybe the epoch is already finalized
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

    /// Clean up after a failed catchup attempt.
    ///
    /// This method is called when a catchup attempt fails. It cleans up the state of the
    /// `EpochMembershipCoordinator` by removing the failed epochs from the
    /// `catchup_map` and broadcasting the error to any tasks that are waiting for the
    /// catchup to complete.
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
        mut cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // Cleanup in case of error
        cancel_epochs.push((req_epoch, epoch_tx));

        tracing::error!(
            "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
            cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
        );
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            // Remove the failed epochs from the catchup map
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            // Signal the other tasks about the failures
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
    }

    /// A helper method for the `catchup` method.
    ///
    /// It tries to fetch the requested stake table from the root epoch,
    /// and updates the membership accordingly.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch for which to fetch the stake table.
    ///
    /// # Returns
    ///
    /// * `Ok(Leaf2<TYPES>)` containing the epoch root leaf if successful.
    /// * `Err(Error)` if the root membership or root leaf cannot be found, or if updating the membership fails.
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        // Get the epoch root headers and update our membership with them, finally sync them
        // Verification of the root is handled in get_epoch_root_and_drb
        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

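    /// Compute the DRB result for `epoch`, seeded by the justify QC signatures of the given
    /// epoch root leaf, then persist the result and add it to the membership.
    ///
    /// Returns an error if a DRB calculation for this epoch is already in progress.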
    pub async fn compute_drb_result(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<DrbResult> {
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;

        if drb_calculation_map_lock.contains(&epoch) {
            return Err(anytrace::debug!(
                "DRB calculation for epoch {} already in progress",
                epoch
            ));
        } else {
            drb_calculation_map_lock.insert(epoch);
        }

        drop(drb_calculation_map_lock);

        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
            return Err(anytrace::error!(
                "Failed to serialize the QC signature for leaf {root_leaf:?}"
            ));
        };

        let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
        else {
            return Err(anytrace::error!(
                "The DRB difficulty selector is missing from the epoch membership coordinator. \
                 This node will not be able to spawn any DRB calculation tasks from catchup."
            ));
        };

        let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

        let mut drb_seed_input = [0u8; 32];
        let len = drb_seed_input_vec.len().min(32);
        drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
        let drb_input = DrbInput {
            epoch: *epoch,
            iteration: 0,
            value: drb_seed_input,
            difficulty_level: drb_difficulty,
        };

        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
        let load_drb_progress_fn = self.load_drb_progress_fn.clone();

        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;

        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        drb_calculation_map_lock.remove(&epoch);
        drop(drb_calculation_map_lock);

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(drb)
    }
}

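/// Spawn a task that catches the coordinator's membership up to `epoch`, signaling the
/// result of the catchup over `epoch_tx` when it completes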
fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}

/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
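///
/// A minimal usage sketch (illustrative, not a doctest; assumes `coordinator`, `epoch`, and
/// `view` are in scope):
/// ```ignore
/// let membership = coordinator.membership_for_epoch(Some(epoch)).await?;
/// let leader = membership.leader(view).await?;
/// let threshold = membership.success_threshold().await;
/// ```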
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    pub epoch: Option<TYPES::Epoch>,
    /// Underlying membership coordinator
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a stake table membership for the next epoch
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }
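
    /// Get a membership for the given epoch (or the None epoch) via the coordinator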
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same-named `Membership` trait fn
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Wraps the same-named `Membership` trait fn
    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
        .wrap()
    }

    /// Get all participants in the committee (including their stake) for a specific epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get all participants in the DA committee (including their stake) for a specific epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a specific view for a specific epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all participants in the DA committee for a specific view for a specific epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the committee in a specific epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the DA committee in a specific epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: this function uses a HotShot-internal error type.
    /// You should implement `lookup_leader`, rather than implementing this function directly.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: There is no such thing as a DA leader, so any consumer
    /// requiring a leader should call this.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Returns the number of total nodes in the committee in an epoch `epoch`
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Returns the number of total DA nodes in the committee in an epoch `epoch`
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Returns the success threshold for a specific `Membership` implementation
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Returns the DA success threshold for a specific `Membership` implementation
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Returns the failure threshold for a specific `Membership` implementation
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Returns the threshold required to upgrade the network protocol
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the DRB result for this membership's epoch to the underlying membership
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }
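
    /// Get the stake table hash for this membership's epoch, if available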
    pub async fn stake_table_hash(
        &self,
    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table_hash(self.epoch?)
    }
}