use std::{
    collections::{BTreeSet, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::{Mutex, RwLock};
use committable::Commitment;
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, error, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::root_block_in_epoch,
    PeerConfig,
};

/// Receivers for in-flight stake table catchup tasks, keyed by epoch.
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Epochs for which a DRB calculation is currently in progress.
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

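/// Coordinates stake table and DRB catchup across epochs.
///
/// Wraps the shared [`Membership`] behind an `Arc<RwLock<_>>` and tracks
/// in-flight catchup and DRB-calculation tasks so that each epoch is only
/// fetched or computed once.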
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership, shared across all clones of the coordinator.
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress stake table catchup tasks, keyed by the epoch being fetched.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs with an in-progress DRB calculation.
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Number of blocks in an epoch.
    pub epoch_height: u64,

    /// Callback to persist in-progress DRB computation state.
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback to load previously persisted DRB computation state.
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback to persist a completed DRB result.
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Selects the DRB difficulty for a given view; must be set before catchup
    /// can spawn DRB calculations.
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create a new [`EpochMembershipCoordinator`] from a shared membership, the
    /// epoch height, and a storage backend used to persist DRB progress and results.
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

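    /// Set the DRB difficulty selector.
    ///
    /// This must be set before the coordinator can spawn DRB calculations
    /// during catchup (see [`Self::compute_drb_result`]).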
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

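    /// Get an [`EpochMembership`] handle for `maybe_epoch`.
    ///
    /// Returns `Ok` immediately if `maybe_epoch` is `None` or if the randomized
    /// stake table for that epoch is already available. Otherwise it spawns a
    /// catchup task (unless one is already running) and returns an error, so
    /// callers are expected to retry or wait for catchup.
    ///
    /// A minimal usage sketch (hypothetical `coordinator`, `epoch`, and `view`
    /// values; not part of this crate's tests):
    ///
    /// ```rust,ignore
    /// match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(membership) => {
    ///         // The randomized stake table is available; use it directly.
    ///         let _leader = membership.leader(view).await;
    ///     },
    ///     Err(e) => {
    ///         // Catchup was started or is already in progress; retry later or
    ///         // block on `wait_for_catchup(epoch)`.
    ///         tracing::debug!("epoch membership unavailable: {e:?}");
    ///     },
    /// }
    /// ```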
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

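    /// Get an [`EpochMembership`] handle for `maybe_epoch`, requiring only the
    /// (non-randomized) stake table for that epoch.
    ///
    /// Like [`Self::membership_for_epoch`], this spawns a catchup task and
    /// returns an error if the stake table is not yet available.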
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

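    /// Catch up the stake table (and DRB result) for `epoch`.
    ///
    /// First walks backwards from `epoch` until it reaches an epoch whose stake
    /// table is already known (or an in-progress catchup it can wait on),
    /// registering a catchup channel for every missing epoch along the way;
    /// fetching the stake table for an epoch `e` requires the stake table of its
    /// root epoch `e - 2`. It then fetches the missing stake tables forward,
    /// oldest first, computing any missing DRB results, and broadcasts each
    /// completed [`EpochMembership`] to its waiters. On failure, all registered
    /// channels are cancelled via `catchup_cleanup`.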
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // Every missing epoch we need to fetch, paired with the channel used to
        // notify its waiters once that epoch is available.
        let mut fetch_epochs = vec![];
        fetch_epochs.push((epoch, epoch_tx));

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, fetch_epochs, err).await;
            return;
        };

        // Walk backwards until we find an epoch whose stake table we already have
        // (or an in-progress catchup we can wait on), registering a catchup
        // channel for every missing epoch along the way.
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We only need stake tables down to `epoch - 2`, the root epoch
                // of the requested epoch.
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                }
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Another task is already catching up this epoch; wait for it.
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                } else {
                    // Register ourselves as the catchup task for this epoch and
                    // keep walking backwards.
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }

        // Fetch the missing stake tables forward, oldest epoch first, notifying
        // the waiters for each epoch as it becomes available.
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            let root_leaf = match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(root_leaf) => root_leaf,
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                },
            };
            match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
                self.membership.clone(),
                epoch,
            )
            .await
            {
                Ok(drb_result) => {
                    self.membership
                        .write()
                        .await
                        .add_drb_result(current_fetch_epoch, drb_result);
                },
                Err(err) => {
                    tracing::warn!(
                        "DRB result for epoch {} missing from membership. Beginning catchup to \
                         recalculate it. Error: {}",
                        current_fetch_epoch,
                        err
                    );

                    if let Err(err) = self
                        .compute_drb_result(current_fetch_epoch, root_leaf)
                        .await
                    {
                        tracing::info!(
                            "DRB calculation for epoch {} failed. Error: {}",
                            current_fetch_epoch,
                            err
                        );
                        fetch_epochs.push((current_fetch_epoch, tx));
                        self.catchup_cleanup(epoch, fetch_epochs, err).await;
                        return;
                    }
                },
            };

            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }
    }

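    /// Wait for an in-progress catchup of `epoch` to finish and return the
    /// resulting [`EpochMembership`].
    ///
    /// If no catchup is in progress, returns `Ok` immediately when the stake
    /// table is already available, and an error otherwise.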
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

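    /// Abort a failed catchup: deregister every epoch in `cancel_epochs` from
    /// the catchup map and broadcast the error to their waiters.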
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
        tracing::error!("catchup for epoch {req_epoch:?} failed: {err:?}");
    }

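    /// Fetch the stake table for `epoch` and add it to the membership.
    ///
    /// Requires the stake table of the root epoch (`epoch - 2`) to already be
    /// available: the epoch root leaf is looked up there and its block header is
    /// passed to `Membership::add_epoch_root`. Returns the root leaf on success.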
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

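    /// Compute the DRB result for `epoch`, seeded from the QC signatures of
    /// `root_leaf`, then persist it and add it to the membership.
    ///
    /// Returns an error if a DRB calculation for this epoch is already in
    /// progress or if the DRB difficulty selector has not been set.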
    pub async fn compute_drb_result(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<DrbResult> {
        // Make sure only one DRB calculation per epoch runs at a time.
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;

        if drb_calculation_map_lock.contains(&epoch) {
            return Err(anytrace::warn!(
                "DRB calculation for epoch {} already in progress",
                epoch
            ));
        }
        drb_calculation_map_lock.insert(epoch);

        drop(drb_calculation_map_lock);

        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
            return Err(anytrace::error!(
                "Failed to serialize the QC signature for leaf {root_leaf:?}"
            ));
        };

        let Some(ref drb_difficulty_selector) = *self.drb_difficulty_selector.read().await else {
            return Err(anytrace::error!(
                "The DRB difficulty selector is missing from the epoch membership coordinator. \
                 This node will not be able to spawn any DRB calculation tasks from catchup."
            ));
        };

        let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

        // Seed the DRB input with (up to) the first 32 bytes of the serialized
        // QC signatures.
        let mut drb_seed_input = [0u8; 32];
        let len = drb_seed_input_vec.len().min(32);
        drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
        let drb_input = DrbInput {
            epoch: *epoch,
            iteration: 0,
            value: drb_seed_input,
            difficulty_level: drb_difficulty,
        };

        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
        let load_drb_progress_fn = self.load_drb_progress_fn.clone();

        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;

        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        drb_calculation_map_lock.remove(&epoch);
        drop(drb_calculation_map_lock);

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(drb)
    }
}

fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}
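
/// A handle to the membership for a single (optional) epoch, backed by an
/// [`EpochMembershipCoordinator`].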
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this membership handle refers to, if any.
    pub epoch: Option<TYPES::Epoch>,
    /// The coordinator used to resolve other epochs and perform catchup.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership handle refers to, if any.
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get the membership for the next epoch; fails if `epoch` is `None`.
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get the stake table membership for the next epoch; fails if `epoch` is `None`.
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership handle for an arbitrary epoch via the coordinator.
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Get the epoch root leaf at `block_height` for this epoch.
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Get the DRB result for this epoch from the membership.
    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
        .wrap()
    }

    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Record a DRB result for this epoch in the membership, if the epoch is set.
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }

    /// Get the stake table hash for this epoch, if the epoch is set and a hash
    /// is available.
    pub async fn stake_table_hash(
        &self,
    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table_hash(self.epoch?)
    }
}