hotshot_types/epoch_membership.rs

use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::{Mutex, RwLock};
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, error, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::{root_block_in_epoch, transition_block_for_epoch},
    PeerConfig,
};

type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

/// Struct to coordinate membership catchup
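///
/// A minimal usage sketch (`MyTypes`, `membership`, `storage`, `epoch` and
/// `epoch_height` are hypothetical placeholders provided by the application,
/// not items from this crate):
///
/// ```ignore
/// // Build a coordinator around an existing membership.
/// let coordinator = EpochMembershipCoordinator::<MyTypes>::new(
///     Arc::new(RwLock::new(membership)),
///     epoch_height,
///     &storage,
/// );
///
/// // Request a membership with a randomized stake table for `epoch`.
/// let mem = match coordinator.membership_for_epoch(Some(epoch)).await {
///     Ok(mem) => mem,
///     // An error means the stake table is missing and a catchup task was
///     // spawned (or was already running); wait for it to finish.
///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
/// };
/// ```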
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready. The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback function to store DRB calculation progress in storage
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback function to load DRB calculation progress from storage
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback function to store a DRB result in storage when one is calculated during catchup
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Callback function to select a DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create an EpochMembershipCoordinator
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    /// Get a reference to the membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Set the DRB difficulty selector
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
    /// table for the given Epoch
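    ///
    /// Passing `None` always succeeds and returns an epoch-less membership. If the
    /// randomized stake table for `Some(epoch)` is not yet available, this returns an
    /// `Err` and spawns a background catchup task (unless one is already running);
    /// pair it with [`Self::wait_for_catchup`], as in the usage sketch in the
    /// struct-level documentation.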
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
    /// table for the given Epoch
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root and
    /// the stake table for the epoch containing this epoch's drb result.
    /// If they do not exist, then go one by one back until we find a stake table.
    ///
    /// If there is another catchup in progress this will not duplicate efforts,
    /// e.g. if we start with only the first epoch stake table, call catchup for epoch 10, and then call catchup for epoch 20,
    /// the first caller will actually do the work to catch up to epoch 10 and the second caller will continue
    /// catching up to epoch 20.
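    ///
    /// For example, if only the stake tables for the first two epochs are available and
    /// catchup is requested for epoch 6, the loop below queues epochs 5, 4 and 3 on top of
    /// the requested epoch 6, then fetches them oldest first (3, 4, 5, 6), each one using
    /// the stake table from two epochs earlier as its root.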
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // We need to fetch the requested epoch, that's for sure
        let mut fetch_epochs = vec![];
        fetch_epochs.push((epoch, epoch_tx));

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, fetch_epochs, err).await;
            return;
        };

        // First figure out which epochs we need to fetch
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table but we need to make sure we have the epoch root of the requested epoch
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                }
                // Lock the catchup map
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Somebody else is already fetching this epoch, drop the lock and wait for them to finish
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                    // If we didn't receive the epoch then we need to try again
                } else {
                    // Nobody else is fetching this epoch. We need to do it. Put it in the map and move on to the next epoch
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }

        // Iterate through the epochs we need to fetch in reverse, i.e. from the oldest to the newest
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            let root_leaf = match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(root_leaf) => root_leaf,
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                },
            };

            if let Err(err) = self
                .fetch_or_calc_drb_results(current_fetch_epoch, root_leaf)
                .await
            {
                fetch_epochs.push((current_fetch_epoch, tx));
                self.catchup_cleanup(epoch, fetch_epochs, err).await;
                return;
            }

            // Signal the other tasks about the success
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // Remove the epoch from the catchup map to indicate that the catchup is complete
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }
    }

    /// Call this method if you think catchup is in progress for a given epoch
    /// and you want to wait for it to finish and get the stake table.
    /// If it's not, it will try to return the stake table if already available.
    /// Returns an error if the catchup failed, or if the catchup is not in progress
    /// and the stake table is not available.
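    ///
    /// A minimal sketch, assuming an earlier `membership_for_epoch(Some(epoch))` call
    /// on `coordinator` returned an error and therefore spawned a catchup task:
    ///
    /// ```ignore
    /// let mem = coordinator.wait_for_catchup(epoch).await?;
    /// assert_eq!(mem.epoch(), Some(epoch));
    /// ```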
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // There is no catchup in progress, maybe the epoch is already finalized
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

    /// Clean up after a failed catchup attempt.
    ///
    /// This method is called when a catchup attempt fails. It cleans up the state of the
    /// `EpochMembershipCoordinator` by removing the failed epochs from the
    /// `catchup_map` and broadcasting the error to any tasks that are waiting for the
    /// catchup to complete.
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // Cleanup in case of error
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            // Remove the failed epochs from the catchup map
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            // Signal the other tasks about the failures
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
        tracing::error!("catchup for epoch {req_epoch:?} failed: {err:?}");
    }

    /// A helper method to the `catchup` method.
    ///
    /// It tries to fetch the requested stake table from the root epoch,
    /// and updates the membership accordingly.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch for which to fetch the stake table.
    ///
    /// # Returns
    ///
    /// * `Ok(Leaf2<TYPES>)` containing the epoch root leaf if successful.
    /// * `Err(Error)` if the root membership or root leaf cannot be found, or if updating the membership fails.
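    ///
    /// For example, fetching the stake table for epoch 10 reads the epoch root leaf of
    /// epoch 8 (the root epoch, i.e. `epoch - 2`) at block height
    /// `root_block_in_epoch(8, epoch_height)` and passes its block header to
    /// `Membership::add_epoch_root`.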
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        // Get the epoch root headers and update our membership with them, finally sync them
        // Verification of the root is handled in get_epoch_root_and_drb
        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

    /// A helper method to the `catchup` method.
    ///
    /// Fetch or compute the DRB (Distributed Random Beacon) result for a given epoch.
    ///
    /// This method attempts to retrieve the DRB result for the specified epoch. If the DRB
    /// result is not available, it computes the DRB using the provided root leaf and stores
    /// the result in the membership state.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch for which to fetch or compute the DRB result.
    /// * `root_leaf` - The epoch root leaf used for DRB computation if the result is not available.
    ///
    /// # Returns
    ///
    /// * `Ok(())` if the DRB result was successfully fetched or computed and stored.
    /// * `Err(Error)` if the DRB result could not be fetched, computed, or stored.
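    ///
    /// For example, when catching up to epoch 10, the DRB result is read from the
    /// transition block of epoch 9 (the epoch right before the one being caught up to).
    /// If it is not available there, it is recomputed from the QC signatures of epoch
    /// 10's root leaf, persisting progress through the store/load DRB progress callbacks.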
    async fn fetch_or_calc_drb_results(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<()> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch drb result for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(drb_membership) = root_membership.next_epoch_stake_table().await else {
            return Err(anytrace::error!(
                "get drb stake table failed for epoch {root_epoch:?}"
            ));
        };

        // get the DRB from the last block of the epoch right before the one we're catching up to
        // or compute it if it's not available
        let drb = if let Ok(drb) = drb_membership
            .get_epoch_drb(transition_block_for_epoch(
                *(root_epoch + 1),
                self.epoch_height,
            ))
            .await
        {
            drb
        } else {
            let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures)
            else {
                return Err(anytrace::error!(
                    "Failed to serialize the QC signature for leaf {root_leaf:?}"
                ));
            };

            let Some(ref drb_difficulty_selector) = *self.drb_difficulty_selector.read().await
            else {
                return Err(anytrace::error!(
                    "The DRB difficulty selector is missing from the epoch membership \
                     coordinator. This node will not be able to spawn any DRB calculation tasks \
                     from catchup."
                ));
            };

            let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

            let mut drb_seed_input = [0u8; 32];
            let len = drb_seed_input_vec.len().min(32);
            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
            let drb_input = DrbInput {
                epoch: *epoch,
                iteration: 0,
                value: drb_seed_input,
                difficulty_level: drb_difficulty,
            };

            let store_drb_progress_fn = self.store_drb_progress_fn.clone();
            let load_drb_progress_fn = self.load_drb_progress_fn.clone();

            compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await
        };

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(())
    }
}

fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}

/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
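///
/// A minimal sketch of reading committee information through an `EpochMembership`
/// obtained from a coordinator (`coordinator`, `view` and `epoch` are hypothetical
/// values provided by the caller):
///
/// ```ignore
/// let mem = coordinator.membership_for_epoch(Some(epoch)).await?;
/// let leader = mem.leader(view).await?;          // leader for this view and epoch
/// let stake_table = mem.stake_table().await;     // full stake table for the epoch
/// let threshold = mem.success_threshold().await; // quorum success threshold
/// ```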
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    pub epoch: Option<TYPES::Epoch>,
    /// Underlying membership
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the next epoch, guaranteed only to have a stake table
    /// (not necessarily a randomized one)
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for an arbitrary (possibly `None`) epoch
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same-named `Membership` trait fn
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Wraps the same-named `Membership` trait fn
    async fn get_epoch_drb(&self, block_height: u64) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
        .wrap()
    }

    /// Get all participants in the committee (including their stake) for a specific epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get all participants in the DA committee (including their stake) for a specific epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a specific view for a specific epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all participants in the DA committee for a specific view for a specific epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake table entry for a public key, returns `None` if the
    /// key is not in the table for a specific epoch
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the committee in a specific epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the DA committee in a specific epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: this function uses a HotShot-internal error type.
    /// You should implement `lookup_leader`, rather than implementing this function directly.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// The leader of the committee for view `view_number` in `epoch`.
    ///
    /// Note: There is no such thing as a DA leader, so any consumer
    /// requiring a leader should call this.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Returns the total number of nodes in the committee in epoch `epoch`
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Returns the total number of DA nodes in the committee in epoch `epoch`
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Returns the success threshold for a specific `Membership` implementation
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Returns the DA success threshold for a specific `Membership` implementation
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Returns the failure threshold for a specific `Membership` implementation
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Returns the threshold required to upgrade the network protocol
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the DRB result for this membership's epoch to the underlying membership
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }
}