hotshot_types/epoch_membership.rs

use std::{
    collections::{BTreeSet, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::{Mutex, RwLock};
use committable::Commitment;
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, error, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::root_block_in_epoch,
    PeerConfig,
};

type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

/// Struct to coordinate membership catchup
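///
/// # Example
///
/// A minimal sketch of the intended call pattern, assuming a concrete
/// `NodeType` implementation `MyTypes`, an initialized `membership`, a
/// `Storage` implementation `storage`, and a `drb_difficulty_selector`
/// (all hypothetical names):
///
/// ```ignore
/// let coordinator = EpochMembershipCoordinator::<MyTypes>::new(
///     Arc::new(RwLock::new(membership)),
///     epoch_height,
///     &storage,
/// );
/// coordinator.set_drb_difficulty_selector(drb_difficulty_selector).await;
///
/// match coordinator.membership_for_epoch(Some(epoch)).await {
///     Ok(epoch_membership) => { /* randomized stake table is available */ },
///     Err(_) => { /* catchup was spawned; see `wait_for_catchup` */ },
/// }
/// ```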
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready. The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs for which a DRB calculation is currently in progress
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback function to store in-progress DRB computation state in storage
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback function to load previously stored DRB computation progress from storage
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback function to store a drb result in storage when one is calculated during catchup
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Callback function to select a DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create an EpochMembershipCoordinator
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    /// Get a reference to the membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Set the DRB difficulty selector
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
    /// table for the given Epoch
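    ///
    /// If the randomized stake table is not yet available, this returns an error and
    /// (unless one is already running) spawns a catchup task. A hedged sketch of how a
    /// caller might handle that, assuming `coordinator` and `epoch` are already in scope:
    ///
    /// ```ignore
    /// let epoch_membership = match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(m) => m,
    ///     // Catchup was started (or was already in progress); block until it completes.
    ///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
    /// };
    /// ```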
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
    /// table for the given Epoch
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root and
    /// the stake table for the epoch containing this epoch's DRB result.
    /// If they do not exist, walk back one epoch at a time until we find a stake table.
    ///
    /// If there is another catchup in progress this will not duplicate effort:
    /// e.g. if we start with only the first epoch's stake table and call catchup for epoch 10,
    /// then call catchup for epoch 20, the first caller will actually do the work to catch up
    /// to epoch 10 and the second caller will continue catching up to epoch 20.
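    ///
    /// For example (assuming the first epoch is 1 and stake tables only exist through
    /// epoch 3), a call to catch up to epoch 10 fills `fetch_epochs` with epochs 9 down
    /// to 4 and then drains it from the back, so stake tables are fetched in order
    /// 4, 5, ..., 9 before epoch 10 itself is fetched and its DRB result is obtained
    /// (or recomputed).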
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // We need to fetch the requested epoch, that's for sure
        let mut fetch_epochs = vec![];

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                .await;
            return;
        };

        // First figure out which epochs we need to fetch
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table but we need to make sure we have the epoch root of the requested epoch
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
                // Lock the catchup map
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Somebody else is already fetching this epoch, drop the lock and wait for them to finish
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                    // If we didn't receive the epoch then we need to try again
                } else {
                    // Nobody else is fetching this epoch. We need to do it. Put it in the map and move on to the next epoch
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }

        // Iterate through the epochs we need to fetch in reverse, i.e. from the oldest to the newest
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(_) => {},
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
                        .await;
                    return;
                },
            };

            // Signal the other tasks about the success
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // Remove the epoch from the catchup map to indicate that the catchup is complete
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }

        let root_leaf = match self.fetch_stake_table(epoch).await {
            Ok(root_leaf) => root_leaf,
            Err(err) => {
                self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                    .await;
                return;
            },
        };

        match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.membership.clone(),
            epoch,
        )
        .await
        {
            Ok(drb_result) => {
                self.membership
                    .write()
                    .await
                    .add_drb_result(epoch, drb_result);
            },
            Err(err) => {
                tracing::warn!(
                    "DRB result for epoch {} missing from membership. Beginning catchup to \
                     recalculate it. Error: {}",
                    epoch,
                    err
                );

                if let Err(err) = self.compute_drb_result(epoch, root_leaf).await {
                    tracing::error!(
                        "DRB calculation for epoch {} failed. Error: {}",
                        epoch,
                        err
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                }
            },
        };

        // Signal the other tasks about the success
        if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })) {
            tracing::warn!(
                "The catchup channel for epoch {} overflowed, dropped message {:?}",
                epoch,
                res.map(|em| em.epoch)
            );
        }

        // Remove the epoch from the catchup map to indicate that the catchup is complete
        self.catchup_map.lock().await.remove(&epoch);
    }

    /// Call this method if you think catchup is in progress for a given epoch
    /// and you want to wait for it to finish and get the stake table.
    /// If no catchup is in progress, this returns the stake table if it is already available.
    /// Returns an error if the catchup failed, or if no catchup is in progress
    /// and the stake table is not available.
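    ///
    /// A short sketch of the intended pairing with `membership_for_epoch` (assuming
    /// `coordinator` and `epoch` are in scope), where the earlier call kicked off catchup:
    ///
    /// ```ignore
    /// if coordinator.membership_for_epoch(Some(epoch)).await.is_err() {
    ///     let mem = coordinator.wait_for_catchup(epoch).await?;
    ///     let stake_table = mem.stake_table().await;
    /// }
    /// ```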
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // There is no catchup in progress, maybe the epoch is already finalized
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

    /// Clean up after a failed catchup attempt.
    ///
    /// This method is called when a catchup attempt fails. It cleans up the state of the
    /// `EpochMembershipCoordinator` by removing the failed epochs from the
    /// `catchup_map` and broadcasting the error to any tasks that are waiting for the
    /// catchup to complete.
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
        mut cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // Cleanup in case of error
        cancel_epochs.push((req_epoch, epoch_tx));

        tracing::error!(
            "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
            cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
        );
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            // Remove the failed epochs from the catchup map
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            // Signal the other tasks about the failures
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
    }

    /// A helper method to the `catchup` method.
    ///
    /// It tries to fetch the requested stake table from the root epoch,
    /// and updates the membership accordingly.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch for which to fetch the stake table.
    ///
    /// # Returns
    ///
    /// * `Ok(Leaf2<TYPES>)` containing the epoch root leaf if successful.
    /// * `Err(Error)` if the root membership or root leaf cannot be found, or if updating the membership fails.
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        // Get the epoch root headers and update our membership with them, finally sync them
        // Verification of the root is handled in get_epoch_root_and_drb
        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

    /// Compute the DRB result for `epoch` from the given root leaf, returning an error if a
    /// calculation for that epoch is already in progress. Progress and the final result are
    /// persisted via the configured storage callbacks, and the result is added to the membership.
    pub async fn compute_drb_result(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<DrbResult> {
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;

        if drb_calculation_map_lock.contains(&epoch) {
            return Err(anytrace::warn!(
                "DRB calculation for epoch {} already in progress",
                epoch
            ));
        } else {
            drb_calculation_map_lock.insert(epoch);
        }

        drop(drb_calculation_map_lock);

        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
            return Err(anytrace::error!(
                "Failed to serialize the QC signature for leaf {root_leaf:?}"
            ));
        };

        let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
        else {
            return Err(anytrace::error!(
                "The DRB difficulty selector is missing from the epoch membership coordinator. \
                 This node will not be able to spawn any DRB calculation tasks from catchup."
            ));
        };

        let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

        let mut drb_seed_input = [0u8; 32];
        let len = drb_seed_input_vec.len().min(32);
        drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
        let drb_input = DrbInput {
            epoch: *epoch,
            iteration: 0,
            value: drb_seed_input,
            difficulty_level: drb_difficulty,
        };

        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
        let load_drb_progress_fn = self.load_drb_progress_fn.clone();

        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;

        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        drb_calculation_map_lock.remove(&epoch);
        drop(drb_calculation_map_lock);

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(drb)
    }
}

fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}

/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
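///
/// A hedged sketch of typical use, assuming an `EpochMembership` obtained from the
/// coordinator and a `view` already in scope:
///
/// ```ignore
/// let mem = coordinator.membership_for_epoch(Some(epoch)).await?;
/// let threshold = mem.success_threshold().await;
/// let leader = mem.leader(view).await?;
/// let members = mem.committee_members(view).await;
/// ```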
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    pub epoch: Option<TYPES::Epoch>,
    /// Underlying membership coordinator
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the next epoch, requiring only a stake table (not a randomized one)
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the given epoch via the coordinator
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same named Membership trait fn
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Wraps the same named Membership trait fn
    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
        .wrap()
    }

    /// Get all participants in the committee (including their stake) for a specific epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get all participants in the DA committee (including their stake) for a specific epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a specific view for a specific epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all participants in the DA committee for a specific view for a specific epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the committee in a specific epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the DA committee in a specific epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: this function uses a HotShot-internal error type.
    /// Use `lookup_leader` instead if you need the `Membership`-specific error type.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: There is no such thing as a DA leader, so any consumer
    /// requiring a leader should call this.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Returns the total number of nodes in the committee for this epoch
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Returns the total number of DA nodes in the committee for this epoch
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Returns the success threshold for a specific `Membership` implementation
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Returns the DA success threshold for a specific `Membership` implementation
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Returns the failure threshold for a specific `Membership` implementation
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Returns the threshold required to upgrade the network protocol
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the DRB result for this epoch to the membership
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }

    /// Returns the stake table commitment for this epoch, if available
    pub async fn stake_table_hash(
        &self,
    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table_hash(self.epoch?)
    }
}
801}