use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::{Mutex, RwLock};
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, error, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::{root_block_in_epoch, transition_block_for_epoch},
    PeerConfig,
};

/// Map from an epoch to an inactive receiver for that epoch's catchup result
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// An epoch paired with the sender used to broadcast that epoch's catchup result
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

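/// Coordinates catchup of [`Membership`] state across epochs.
///
/// Wraps the node's membership in an `Arc<RwLock<..>>` and tracks any
/// in-progress catchup per epoch, so that concurrent callers share a single
/// catchup task instead of spawning duplicates.
///
/// A minimal usage sketch (`MyTypes`, `membership`, `storage`, `epoch_height`,
/// and `epoch` are hypothetical stand-ins for a concrete [`NodeType`], its
/// membership, a [`Storage`] implementation, and runtime values):
///
/// ```rust,ignore
/// let coordinator = EpochMembershipCoordinator::<MyTypes>::new(
///     Arc::new(RwLock::new(membership)),
///     epoch_height,
///     &storage,
/// );
/// // Ok(..) when the randomized stake table is ready; Err(..) kicks off catchup
/// // in the background, which can then be awaited via `wait_for_catchup`.
/// let epoch_membership = coordinator.membership_for_epoch(Some(epoch)).await?;
/// ```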
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress attempts at catching up are stored in this map.
    /// New callers wanting an `EpochMembership` await on the receiver until
    /// the catchup task signals that the membership is ready.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Number of blocks in an epoch
    pub epoch_height: u64,

    /// Callback to store in-progress DRB computation state to storage
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback to load in-progress DRB computation state from storage
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback to store a completed DRB result to storage
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Selects the DRB difficulty based on the view number of the seed
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

// Manual `Clone` impl: deriving it would add a `TYPES: Clone` bound, which the
// shared fields don't actually require.
impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create a new [`EpochMembershipCoordinator`], wiring the DRB
    /// progress/result storage callbacks to the provided `storage`
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    /// Get a reference to the underlying membership
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Set the DRB difficulty selector, which chooses the difficulty used for
    /// DRB computations spawned during catchup
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

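    /// Get an [`EpochMembership`] for the given epoch, requiring that the
    /// randomized stake table for that epoch is available.
    ///
    /// Returns `Ok` immediately if `maybe_epoch` is `None` or if the
    /// randomized stake table is already known. Otherwise returns `Err` and,
    /// unless one is already in flight, spawns a background catchup task for
    /// the epoch; callers can then await [`Self::wait_for_catchup`].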
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

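    /// Get an [`EpochMembership`] for the given epoch, requiring only that the
    /// (non-randomized) stake table for that epoch is available.
    ///
    /// Same flow as [`Self::membership_for_epoch`], but checks
    /// `has_stake_table` rather than `has_randomized_stake_table`, so it can
    /// succeed before the epoch's DRB result has been applied.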
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

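    /// Catch up the membership for `epoch` and broadcast the result on
    /// `epoch_tx`.
    ///
    /// First walks backwards from `epoch`, collecting every epoch whose stake
    /// table is missing (or joining an in-progress catchup for it), until it
    /// reaches an epoch we already know. It then replays forward, lowest epoch
    /// first: fetch the epoch's stake table, fetch or compute its DRB result,
    /// and signal any waiters. On any failure, all still-pending epochs are
    /// cancelled via [`Self::catchup_cleanup`].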
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        let mut fetch_epochs = vec![];
        fetch_epochs.push((epoch, epoch_tx));

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, fetch_epochs, err).await;
            return;
        };

        // Walk backwards, collecting the epochs we need to fetch, until we hit
        // an epoch whose stake table we already have.
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We can stop once we have the stake table at or below the
                // requested epoch's root epoch (`epoch - 2`).
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! \
                         This means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                }
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    // Somebody else is already fetching this epoch; wait for them.
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    }
                } else {
                    // Nobody else is fetching this epoch, so we will.
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            }
        }

        // Process the collected epochs in order, lowest first.
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            let root_leaf = match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(root_leaf) => root_leaf,
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                },
            };

            if let Err(err) = self
                .fetch_or_calc_drb_results(current_fetch_epoch, root_leaf)
                .await
            {
                fetch_epochs.push((current_fetch_epoch, tx));
                self.catchup_cleanup(epoch, fetch_epochs, err).await;
                return;
            }

            // Signal any waiting tasks that this epoch is ready.
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }
    }

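    /// Wait for an in-progress catchup of `epoch` to finish and return the
    /// resulting [`EpochMembership`].
    ///
    /// If no catchup is in flight, this returns `Ok` only when the stake table
    /// for the epoch is already available. A typical pairing with
    /// [`Self::membership_for_epoch`] (a sketch; `coordinator` and `epoch` are
    /// assumed to be in scope):
    ///
    /// ```rust,ignore
    /// let membership = match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(m) => m,
    ///     // `membership_for_epoch` found or spawned a catchup task; wait on it.
    ///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
    /// };
    /// ```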
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // No catchup is in progress for this epoch; succeed only if we
            // already have its stake table.
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

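    /// Clean up after a failed catchup of `req_epoch`: remove every epoch in
    /// `cancel_epochs` from the catchup map, then broadcast the error to any
    /// tasks waiting on those epochs.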
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // Remove the failed epochs from the catchup map first, then notify waiters.
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
        tracing::error!("catchup for epoch {req_epoch:?} failed: {err:?}");
    }

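    /// Add the stake table for `epoch` to the membership and return the root
    /// leaf it was derived from.
    ///
    /// An epoch's stake table is determined by the root block of the epoch two
    /// before it, so this requires the stake table for `epoch - 2` to already
    /// be present.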
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root epoch {root_epoch:?}. This should not happen"
            ));
        };

        // Get the epoch root leaf from the root epoch's membership.
        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

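    /// Ensure we have the DRB result for `epoch`: try to fetch the result
    /// recorded at the transition block of `root_epoch + 1` (where
    /// `root_epoch` is `epoch - 2`) and, failing that, compute it from
    /// scratch. The result is persisted via the storage callback and added to
    /// the membership.
    ///
    /// When computing, the seed is derived from the serialized QC signatures
    /// on `root_leaf`, and the difficulty comes from the configured DRB
    /// difficulty selector.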
    async fn fetch_or_calc_drb_results(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<()> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch drb result for epoch {epoch:?} but we don't have its root epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(drb_membership) = root_membership.next_epoch_stake_table().await else {
            return Err(anytrace::error!(
                "get drb stake table failed for epoch {root_epoch:?}"
            ));
        };

        // Try to fetch the stored DRB result; if it isn't available, compute
        // it from scratch.
        let drb = if let Ok(drb) = drb_membership
            .get_epoch_drb(transition_block_for_epoch(
                *(root_epoch + 1),
                self.epoch_height,
            ))
            .await
        {
            drb
        } else {
            let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures)
            else {
                return Err(anytrace::error!(
                    "Failed to serialize the QC signature for leaf {root_leaf:?}"
                ));
            };

            let Some(ref drb_difficulty_selector) = *self.drb_difficulty_selector.read().await
            else {
                return Err(anytrace::error!(
                    "The DRB difficulty selector is missing from the epoch membership coordinator. This node will not be able to spawn any DRB calculation tasks from catchup."
                ));
            };

            let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

            // Use the first 32 bytes of the serialized QC signatures as the DRB seed.
            let mut drb_seed_input = [0u8; 32];
            let len = drb_seed_input_vec.len().min(32);
            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
            let drb_input = DrbInput {
                epoch: *epoch,
                iteration: 0,
                value: drb_seed_input,
                difficulty_level: drb_difficulty,
            };

            let store_drb_progress_fn = self.store_drb_progress_fn.clone();
            let load_drb_progress_fn = self.load_drb_progress_fn.clone();

            compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await
        };

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(())
    }
}

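/// Spawn a background task that runs [`EpochMembershipCoordinator::catchup`]
/// for `epoch`, reporting the result over `epoch_tx`.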
fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}
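
/// A handle to the membership of one specific (possibly `None`) epoch, handed
/// out by an [`EpochMembershipCoordinator`]. Most methods forward to the
/// underlying [`Membership`] with `self.epoch` filled in.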
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this handle is for, if any
    pub epoch: Option<TYPES::Epoch>,
    /// The coordinator that produced this handle, used to reach the
    /// underlying membership
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is for
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a stake table handle for the next epoch
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the given epoch from the same coordinator
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps the same-named [`Membership`] method for this epoch
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

    /// Wraps the same-named [`Membership`] method for this epoch
    async fn get_epoch_drb(&self, block_height: u64) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
        .wrap()
    }

    /// Get the stake table for this epoch
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get the DA stake table for this epoch
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get the committee members for the given view in this epoch
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get the DA committee members for the given view in this epoch
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake of `pub_key` in this epoch, if any
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake of `pub_key` in this epoch, if any
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// Check whether `pub_key` has stake in this epoch
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// Check whether `pub_key` has DA stake in this epoch
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// Get the leader for the given view in this epoch
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// Look up the leader for the given view in this epoch, returning the
    /// membership's own error type
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Get the total number of nodes in this epoch
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Get the total number of DA nodes in this epoch
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Get the success threshold for this epoch
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Get the DA success threshold for this epoch
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Get the failure threshold for this epoch
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Get the upgrade threshold for this epoch
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add a DRB result to the underlying membership for this epoch
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }
}