hotshot_types/epoch_membership.rs

use std::{
    collections::{BTreeSet, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::{Mutex, RwLock};
use committable::Commitment;
use hotshot_utils::{anytrace::*, *};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    event::Event,
    stake_table::HSStakeTable,
    traits::{
        block_contents::BlockHeader,
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    PeerConfig,
};

/// Map from an epoch to a receiver that resolves once catchup for that epoch completes
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Set of epochs for which a DRB calculation is currently in progress
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

/// An epoch paired with the sender used to signal completion of its catchup
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

/// Struct to coordinate membership catchup
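///
/// # Example
///
/// An illustrative sketch of typical usage; `membership`, `storage`, `epoch_height`,
/// and `epoch` are assumed to exist in the caller's context:
///
/// ```ignore
/// let coordinator = EpochMembershipCoordinator::new(membership, epoch_height, &storage);
/// match coordinator.membership_for_epoch(Some(epoch)).await {
///     // The randomized stake table for `epoch` is already available.
///     Ok(epoch_membership) => { /* use `epoch_membership` */ },
///     // Catchup was started (or is already running); wait for it to finish.
///     Err(_) => {
///         let epoch_membership = coordinator.wait_for_catchup(epoch).await?;
///     },
/// }
/// ```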
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready. The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done.
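    ///
    /// (Illustrative sketch of the waiting pattern, assuming a caller already holds
    /// a receiver `rx` activated from this map:)
    /// ```ignore
    /// if let Ok(Ok(epoch_membership)) = rx.recv_direct().await {
    ///     // catchup finished; the stake table for the epoch is now available
    /// }
    /// ```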
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Set of epochs for which a DRB calculation is currently in progress
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback function to store DRB calculation progress in storage
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback function to load DRB calculation progress from storage
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback function to store a DRB result in storage when one is calculated during catchup
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Callback function to select a DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create an EpochMembershipCoordinator
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    /// Set the external event receiver on the underlying membership
    pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
        self.membership
            .write()
            .await
            .set_external_channel(external_channel)
            .await;
    }

    /// Get a reference to the membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Set the DRB difficulty selector
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
    /// table for the given Epoch
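    ///
    /// If the randomized stake table is not yet available, this returns an error and
    /// (unless one is already running) spawns a catchup task in the background.
    ///
    /// (Illustrative sketch; `coordinator` and `epoch` are assumed to be in scope:)
    /// ```ignore
    /// if coordinator.membership_for_epoch(Some(epoch)).await.is_err() {
    ///     // Catchup was kicked off (or was already running); wait for it instead.
    ///     let epoch_membership = coordinator.wait_for_catchup(epoch).await?;
    /// }
    /// ```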
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
    /// table for the given Epoch
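    ///
    /// Unlike [`Self::membership_for_epoch`], this only requires the (non-randomized)
    /// stake table for the epoch; it does not require the randomized stake table.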
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root and
    /// the stake table for the epoch containing this epoch's DRB result.
    /// If they do not exist, then walk back one epoch at a time until we find a stake table.
    ///
    /// If there is another catchup in progress this will not duplicate effort:
    /// e.g. if we start with only the first epoch's stake table and call catchup for epoch 10,
    /// then call catchup for epoch 20, the first caller will do the work to catch up to epoch 10
    /// and the second caller will continue catching up to epoch 20.
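    ///
    /// (Illustrative walk-through, not taken from a real run: if we only have the stake
    /// table for epoch 3 and `catchup(10)` is called, the loop below queues epochs 9
    /// down to 4 for fetching, fetches them oldest-first (4 through 9), then fetches
    /// epoch 10 itself and resolves its DRB result.)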
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // We need to fetch the requested epoch, that's for sure
        let mut fetch_epochs = vec![];

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                .await;
            return;
        };

        // First figure out which epochs we need to fetch
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table but we need to make sure we have the epoch root of the requested epoch
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catch up to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
                // Lock the catchup map
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Somebody else is already fetching this epoch, drop the lock and wait for them to finish
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                    // If we didn't receive the epoch then we need to try again
                } else {
                    // Nobody else is fetching this epoch. We need to do it. Put it in the map and move on to the next epoch
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }
        let epochs = fetch_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>();
        tracing::warn!("Fetching stake tables for epochs: {epochs:?}");

        // Iterate through the epochs we need to fetch in reverse, i.e. from the oldest to the newest
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(_) => {},
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
                        .await;
                    return;
                },
            };

            // Signal the other tasks about the success
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed; dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // Remove the epoch from the catchup map to indicate that the catchup is complete
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }

        let root_leaf = match self.fetch_stake_table(epoch).await {
            Ok(root_leaf) => root_leaf,
            Err(err) => {
                tracing::error!("Failed to fetch stake table for epoch {epoch:?}: {err:?}");
                self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                    .await;
                return;
            },
        };

        match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.membership.clone(),
            epoch,
        )
        .await
        {
            Ok(drb_result) => {
                tracing::warn!(
                    ?drb_result,
                    "DRB result for epoch {epoch:?} retrieved from peers. Updating membership."
                );
                self.membership
                    .write()
                    .await
                    .add_drb_result(epoch, drb_result);
            },
            Err(err) => {
                tracing::warn!(
                    "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
                    epoch,
                    err
                );

                let result = self.compute_drb_result(epoch, root_leaf).await;

                log!(result);

                if let Err(err) = result {
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                }
            },
        };

        // Signal the other tasks about the success
        if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })) {
            tracing::warn!(
                "The catchup channel for epoch {} overflowed; dropped message {:?}",
                epoch,
                res.map(|em| em.epoch)
            );
        }

        // Remove the epoch from the catchup map to indicate that the catchup is complete
        self.catchup_map.lock().await.remove(&epoch);
    }

    /// Call this method if you think catchup is in progress for a given epoch
    /// and you want to wait for it to finish and get the stake table.
    /// If no catchup is in progress, this will return the stake table if it is already available.
    /// Returns an error if the catchup failed, or if no catchup is in progress
    /// and the stake table is not available.
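    ///
    /// (Illustrative sketch; `coordinator` and `epoch` are assumed to be in scope:)
    /// ```ignore
    /// let epoch_membership = coordinator.wait_for_catchup(epoch).await?;
    /// let stake_table = epoch_membership.stake_table().await;
    /// ```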
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // There is no catchup in progress, maybe the epoch is already finalized
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

    /// Clean up after a failed catchup attempt.
    ///
    /// This method is called when a catchup attempt fails. It cleans up the state of the
    /// `EpochMembershipCoordinator` by removing the failed epochs from the
    /// `catchup_map` and broadcasting the error to any tasks that are waiting for the
    /// catchup to complete.
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
        mut cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // Cleanup in case of error
        cancel_epochs.push((req_epoch, epoch_tx));

        tracing::error!(
            "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
            cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
        );
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            // Remove the failed epochs from the catchup map
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            // Signal the other tasks about the failures
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup; dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
    }

    /// A helper method for the `catchup` method.
    ///
    /// It tries to fetch the requested stake table from the root epoch,
    /// and updates the membership accordingly.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch for which to fetch the stake table.
    ///
    /// # Returns
    ///
    /// * `Ok(Leaf2<TYPES>)` containing the epoch root leaf if successful.
    /// * `Err(Error)` if the root membership or root leaf cannot be found, or if updating the membership fails.
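    ///
    /// (Illustrative example: fetching the stake table for epoch 10 reads the epoch root
    /// leaf from epoch 8, i.e. `10 - 2`, and passes its block header to
    /// `Membership::add_epoch_root`.)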
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        // Get the epoch root headers and update our membership with them, finally sync them
        // Verification of the root is handled in get_epoch_root_and_drb
        let Ok(root_leaf) = root_membership.get_epoch_root().await else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

    /// Compute the DRB result for `epoch` from the given epoch root leaf, store it,
    /// and add it to the membership.
    ///
    /// Returns an error if a DRB calculation for this epoch is already in progress,
    /// or if the seed or difficulty for the calculation cannot be determined.
    pub async fn compute_drb_result(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<DrbResult> {
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;

        if drb_calculation_map_lock.contains(&epoch) {
            return Err(anytrace::debug!(
                "DRB calculation for epoch {} already in progress",
                epoch
            ));
        } else {
            drb_calculation_map_lock.insert(epoch);
        }

        drop(drb_calculation_map_lock);

        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
            return Err(anytrace::error!(
                "Failed to serialize the QC signature for leaf {root_leaf:?}"
            ));
        };

        let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
        else {
            return Err(anytrace::error!(
                "The DRB difficulty selector is missing from the epoch membership coordinator. \
                 This node will not be able to spawn any DRB calculation tasks from catchup."
            ));
        };

        let drb_difficulty = drb_difficulty_selector(root_leaf.block_header().version()).await;

        // Derive the 32-byte DRB seed from the serialized QC signatures, truncating or
        // zero-padding as needed.
        let mut drb_seed_input = [0u8; 32];
        let len = drb_seed_input_vec.len().min(32);
        drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
        let drb_input = DrbInput {
            epoch: *epoch,
            iteration: 0,
            value: drb_seed_input,
            difficulty_level: drb_difficulty,
        };

        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
        let load_drb_progress_fn = self.load_drb_progress_fn.clone();

        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;

        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        drb_calculation_map_lock.remove(&epoch);
        drop(drb_calculation_map_lock);

        tracing::info!("Writing DRB result from catchup to storage for epoch {epoch}: {drb:?}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add DRB result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(drb)
    }
}

/// Spawn a background task that catches the membership up to `epoch`,
/// signaling the result on `epoch_tx` when it finishes.
fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}

/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
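///
/// (Illustrative sketch of how one is typically obtained and used; `coordinator`,
/// `epoch`, and `view` are assumed to be in scope:)
/// ```ignore
/// let epoch_membership = coordinator.membership_for_epoch(Some(epoch)).await?;
/// let leader = epoch_membership.leader(view).await?;
/// let threshold = epoch_membership.success_threshold().await;
/// ```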
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    pub epoch: Option<TYPES::Epoch>,
    /// Underlying membership coordinator
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a stake-table-only membership for the next epoch
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the given epoch
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same-named `Membership` trait fn
    async fn get_epoch_root(&self) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
    }

    /// Wraps the same-named `Membership` trait fn
    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
        .wrap()
    }

    /// Get all participants in the committee (including their stake) for a specific epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get all participants in the DA committee (including their stake) for a specific epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a specific view for a specific epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all participants in the DA committee for a specific view for a specific epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the committee in a specific epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the DA committee in a specific epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// The leader of the committee for view `view` in this membership's epoch.
    ///
    /// Note: this function uses a HotShot-internal error type.
    /// You should implement `lookup_leader`, rather than implementing this function directly.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// The leader of the committee for view `view` in this membership's epoch.
    ///
    /// Note: There is no such thing as a DA leader, so any consumer
    /// requiring a leader should call this.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Returns the total number of nodes in the committee for this membership's epoch
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Returns the total number of DA nodes in the committee for this membership's epoch
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Returns the success threshold for a specific `Membership` implementation
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Returns the DA success threshold for a specific `Membership` implementation
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Returns the failure threshold for a specific `Membership` implementation
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Returns the threshold required to upgrade the network protocol
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the DRB result for this epoch to the membership
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }

    /// Returns the stake table hash for this membership's epoch, if available
    pub async fn stake_table_hash(
        &self,
    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table_hash(self.epoch?)
    }
807}