hotshot_types/
epoch_membership.rs

use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver};
use async_lock::{Mutex, RwLock};
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            storage_add_drb_result, store_drb_progress_fn, Storage, StorageAddDrbResultFn,
            StoreDrbProgressFn,
        },
    },
    utils::{root_block_in_epoch, transition_block_for_epoch},
    PeerConfig,
};

type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Struct to coordinate membership catchup
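///
/// A minimal usage sketch (marked `ignore` since `TestTypes`, `membership`,
/// and `storage` stand in for the consuming crate's own `NodeType` and
/// `Storage` implementations):
///
/// ```ignore
/// let coordinator = EpochMembershipCoordinator::<TestTypes>::new(
///     Arc::new(RwLock::new(membership)),
///     epoch_height,
///     &storage,
/// );
/// match coordinator.membership_for_epoch(Some(epoch)).await {
///     Ok(mem) => { /* randomized stake table for `epoch` is ready */ },
///     Err(_) => { /* catchup was spawned in the background; retry later */ },
/// }
/// ```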
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// Any new callers wanting an `EpochMembership` will await on the signal
    /// alerting them the membership is ready.  The first caller for an epoch will
    /// wait for the actual catchup and alert future callers when it's done.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Callback function to store a DRB result when one is calculated during catchup
    storage_add_drb_result_fn: Option<StorageAddDrbResultFn<TYPES>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback function to store the progress of an in-flight DRB calculation
    store_drb_progress_fn: StoreDrbProgressFn,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            storage_add_drb_result_fn: self.storage_add_drb_result_fn.clone(),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create an `EpochMembershipCoordinator`
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            storage_add_drb_result_fn: Some(storage_add_drb_result(storage.clone())),
        }
    }

    /// Get a reference to the membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a randomized stake
    /// table for the given Epoch
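    ///
    /// If the randomized stake table is not yet available, this spawns at most
    /// one background catchup task per epoch and returns an error; callers are
    /// expected to retry once catchup completes.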
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {:?} unavailable. Catchup already in progress",
                epoch
            ));
        }
        let coordinator = self.clone();
        spawn_catchup(coordinator, epoch);

        Err(warn!(
            "Randomized stake table for epoch {:?} unavailable. Starting catchup",
            epoch
        ))
    }

    /// Get a Membership for a given Epoch, which is guaranteed to have a stake
    /// table for the given Epoch
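    ///
    /// Unlike [`Self::membership_for_epoch`], this only requires the plain
    /// (non-randomized) stake table to be present; it likewise spawns
    /// background catchup and returns an error when the table is missing.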
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {:?} unavailable. Catchup already in progress",
                epoch
            ));
        }
        let coordinator = self.clone();
        spawn_catchup(coordinator, epoch);

        Err(warn!(
            "Stake table for epoch {:?} unavailable. Starting catchup",
            epoch
        ))
    }

    /// Catches the membership up to the epoch passed as an argument.
    /// To do this, try to get the stake table for the epoch containing this epoch's root;
    /// if that root does not exist, recursively catch up until it is found.
    ///
    /// If there is another catchup in progress this will not duplicate efforts,
    /// e.g. if we start with only the epoch 0 stake table and call catchup for epoch 10,
    /// then call catchup for epoch 20, the first caller will do the work to catch up
    /// to epoch 10, then the second caller will continue catching up to epoch 20.
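    ///
    /// Concretely, the root for epoch `N` lives in epoch `N - 2`, so catching
    /// up to epoch 10 starting from only the epoch 0 stake table recursively
    /// fills in the stake tables for epochs 2, 4, 6, and 8 before epoch 10 itself.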
    async fn catchup(self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        // recursively catch up until we have a stake table for the epoch containing our root
        ensure!(
            *epoch != 0 && *epoch != 1,
            "We are trying to catch up to epoch 0 or 1! This means the initial stake table is missing!"
        );
        let root_epoch = TYPES::Epoch::new(*epoch - 2);

        let root_membership = if self.membership.read().await.has_stake_table(root_epoch) {
            EpochMembership {
                epoch: Some(root_epoch),
                coordinator: self.clone(),
            }
        } else {
            Box::pin(self.wait_for_catchup(root_epoch)).await?
        };

        // Get the epoch root header, update our membership with it, then apply the update.
        // Verification of the root is handled in get_epoch_root_and_drb
        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            anytrace::bail!("get epoch root failed for epoch {:?}", root_epoch);
        };

        let add_epoch_root_updater = {
            let membership_read = self.membership.read().await;
            membership_read
                .add_epoch_root(epoch, root_leaf.block_header().clone())
                .await
        };

        if let Some(updater) = add_epoch_root_updater {
            let mut membership_write = self.membership.write().await;
            updater(&mut *(membership_write));
        };

        let drb_membership = match root_membership.next_epoch_stake_table().await {
            Ok(drb_membership) => drb_membership,
            Err(_) => Box::pin(self.wait_for_catchup(root_epoch + 1)).await?,
        };

        // get the DRB from the last block of the epoch right before the one we're catching up to
        // or compute it if it's not available
        let drb = if let Ok(drb) = drb_membership
            .get_epoch_drb(transition_block_for_epoch(
                *(root_epoch + 1),
                self.epoch_height,
            ))
            .await
        {
            drb
        } else {
            let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures)
            else {
                return Err(anytrace::error!("Failed to serialize the QC signature."));
            };

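            // Derive the 32-byte DRB seed from the serialized QC signatures,
            // zero-padding when the serialization is shorter than 32 bytes.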
            let mut drb_seed_input = [0u8; 32];
            let len = drb_seed_input_vec.len().min(32);
            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
            let drb_input = DrbInput {
                epoch: *epoch,
                iteration: 0,
                value: drb_seed_input,
            };
            let store_drb_progress_fn = self.store_drb_progress_fn.clone();
            // DRB computation is CPU-heavy, so run it on a blocking thread
            tokio::task::spawn_blocking(move || {
                compute_drb_result(drb_input, store_drb_progress_fn)
            })
            .await
            .unwrap()
        };

        if let Some(cb) = &self.storage_add_drb_result_fn {
            tracing::info!("Writing drb result from catchup to storage for epoch {epoch}");
            if let Err(e) = cb(epoch, drb).await {
                tracing::warn!("Failed to add drb result to storage: {e}");
            }
        }

        self.membership.write().await.add_drb_result(epoch, drb);
        Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })
    }

    /// Wait for a catchup in progress for `epoch` to complete, falling back to
    /// performing the catchup ourselves if none is registered or the
    /// registered attempt fails.
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let Some(mut rx) = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned)
        else {
            return self.clone().catchup(epoch).await;
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return self.clone().catchup(epoch).await;
        };
        Ok(mem)
    }
}

fn spawn_catchup<T: NodeType>(coordinator: EpochMembershipCoordinator<T>, epoch: T::Epoch) {
    tokio::spawn(async move {
        let tx = {
            let mut map = coordinator.catchup_map.lock().await;
            if map.contains_key(&epoch) {
                // Another task is already catching up to this epoch
                return;
            }
            let (tx, rx) = broadcast(1);
            map.insert(epoch, rx.deactivate());
            tx
        };

        // Do the actual catchup and broadcast the result to any waiters
        let result = coordinator.clone().catchup(epoch).await;
        let _ = tx.broadcast_direct(result.clone()).await;

        // On failure, remove the map entry so a later caller can retry the catchup
        if let Err(err) = result {
            tracing::warn!("failed to catchup for epoch={epoch:?}. err={err:#}");
            coordinator.catchup_map.lock().await.remove(&epoch);
        }
    });
}

/// Wrapper around a membership that guarantees that the epoch
/// has a stake table
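///
/// Most methods simply forward to the underlying [`Membership`] using this
/// wrapper's epoch. A brief usage sketch (`mem` is assumed to come from
/// [`EpochMembershipCoordinator::membership_for_epoch`], `view` from the caller):
///
/// ```ignore
/// let stake_table = mem.stake_table().await;
/// let leader = mem.leader(view).await?;
/// let next = mem.next_epoch().await?; // may report that catchup was started
/// ```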
pub struct EpochMembership<TYPES: NodeType> {
    /// Epoch the `membership` is guaranteed to have a stake table for
    pub epoch: Option<TYPES::Epoch>,
    /// Underlying membership
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is good for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a stake-table-backed membership for the next epoch
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for a different epoch via the coordinator
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same-named Membership trait fn
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Wraps the same-named Membership trait fn
    async fn get_epoch_drb(&self, block_height: u64) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
        .wrap()
    }

    /// Get all participants in the committee (including their stake) for this membership's epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get all DA participants in the committee (including their stake) for this membership's epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a specific view in this membership's epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all DA participants in the committee for a specific view in this membership's epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake table entry for a public key; returns `None` if the
    /// key is not in the table for this membership's epoch
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake table entry for a public key; returns `None` if the
    /// key is not in the table for this membership's epoch
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// See if a node has stake in the committee in this membership's epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// See if a node has DA stake in the committee in this membership's epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// The leader of the committee for view `view` in this membership's epoch.
    ///
    /// Note: this function uses a HotShot-internal error type.
    /// You should implement `lookup_leader`, rather than implementing this function directly.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// The leader of the committee for view `view` in this membership's epoch,
    /// returned with the `Membership` implementation's own error type.
    ///
    /// Note: There is no such thing as a DA leader, so any consumer
    /// requiring a leader should call this.
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Returns the total number of nodes in the committee for this membership's epoch
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Returns the total number of DA nodes in the committee for this membership's epoch
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Returns the success threshold for this membership's epoch
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Returns the DA success threshold for this membership's epoch
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Returns the failure threshold for this membership's epoch
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Returns the threshold required to upgrade the network protocol
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the DRB result for this membership's epoch to the underlying membership
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }
}