use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::{Mutex, RwLock};
use hotshot_utils::{
    anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
    ensure, error, line_info, log, warn,
};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    stake_table::HSStakeTable,
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    utils::{root_block_in_epoch, transition_block_for_epoch},
    PeerConfig,
};

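/// Map from each epoch with an in-flight catchup to a broadcast receiver that yields the
/// resulting `EpochMembership` (or an error) once that catchup finishes.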
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

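/// An epoch paired with the broadcast sender used to notify waiters when its catchup completes.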
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

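/// Coordinates membership across epochs: hands out per-epoch [`EpochMembership`] handles and
/// spawns catchup tasks that fetch missing stake tables and DRB results for future epochs.
///
/// A minimal usage sketch (the node-type `MyTypes` and the `membership`, `storage`,
/// `epoch_height`, and `epoch` values are hypothetical placeholders):
///
/// ```ignore
/// let coordinator = EpochMembershipCoordinator::<MyTypes>::new(membership, epoch_height, &storage);
/// let epoch_membership = match coordinator.membership_for_epoch(Some(epoch)).await {
///     Ok(m) => m,
///     // An error here means the randomized stake table is not yet available; the call above
///     // has started (or joined) catchup, so wait for it to finish.
///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
/// };
/// ```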
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    membership: Arc<RwLock<TYPES::Membership>>,

    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    pub epoch_height: u64,

    store_drb_progress_fn: StoreDrbProgressFn,

    load_drb_progress_fn: LoadDrbProgressFn,

    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
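    /// Create a new `EpochMembershipCoordinator`, wiring the DRB progress and result
    /// persistence callbacks to the provided storage.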
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

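    /// Get a reference to the underlying membership.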
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

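    /// Set the DRB difficulty selector; it is consulted when catchup needs to compute a DRB
    /// result from scratch.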
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

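    /// Get an `EpochMembership` for the given epoch. Succeeds only if the randomized stake
    /// table for that epoch is already available; otherwise a catchup task is spawned (or an
    /// in-progress one is reported) and an error is returned.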
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

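    /// Get an `EpochMembership` for the given epoch, requiring only that the (non-randomized)
    /// stake table for that epoch is available. Otherwise a catchup task is spawned (or an
    /// in-progress one is reported) and an error is returned.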
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

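    /// Catch up to `epoch`: walk backwards until reaching the most recent epoch whose stake
    /// table is already available (or wait on another in-flight catchup for it), then fetch
    /// stake tables and DRB results forward, epoch by epoch, up to `epoch`. Every waiter
    /// registered along the way is notified with its result; on failure, all pending waiters
    /// are cancelled via `catchup_cleanup`.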
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        let mut fetch_epochs = vec![];
        fetch_epochs.push((epoch, epoch_tx));

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, fetch_epochs, err).await;
            return;
        };

        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catch up to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                }
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    drop(map_lock);
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                } else {
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            };
        }

        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            let root_leaf = match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(root_leaf) => root_leaf,
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, fetch_epochs, err).await;
                    return;
                },
            };

            if let Err(err) = self
                .fetch_or_calc_drb_results(current_fetch_epoch, root_leaf)
                .await
            {
                fetch_epochs.push((current_fetch_epoch, tx));
                self.catchup_cleanup(epoch, fetch_epochs, err).await;
                return;
            }

            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }
    }

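    /// Wait for an in-progress catchup of `epoch` to complete and return its membership.
    /// If no catchup is in progress, return immediately: `Ok` if the stake table is already
    /// available, and an error otherwise.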
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

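    /// Clean up after a failed catchup for `req_epoch`: remove every still-pending epoch from
    /// the catchup map and broadcast the error to all of their waiters.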
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            map_lock.remove(epoch);
        }
        drop(map_lock);
        for (cancel_epoch, tx) in cancel_epochs {
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
        tracing::error!("Catchup for epoch {req_epoch:?} failed: {err:?}");
    }

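    /// Fetch the stake table for `epoch` during catchup: resolve the epoch's root epoch (two
    /// epochs back), fetch its epoch root leaf, and add that root to the membership. Returns
    /// the root leaf on success.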
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            epoch,
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

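    /// Fetch or compute the DRB result for `epoch` during catchup. First try to read the DRB
    /// from the transition block of the root epoch's successor; otherwise derive the seed from
    /// `root_leaf`'s QC signatures and run the DRB computation, persisting progress through the
    /// stored callbacks. The result is written to storage and added to the membership.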
    async fn fetch_or_calc_drb_results(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<()> {
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch drb result for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(drb_membership) = root_membership.next_epoch_stake_table().await else {
            return Err(anytrace::error!(
                "get drb stake table failed for epoch {root_epoch:?}"
            ));
        };

        let drb = if let Ok(drb) = drb_membership
            .get_epoch_drb(transition_block_for_epoch(
                *(root_epoch + 1),
                self.epoch_height,
            ))
            .await
        {
            drb
        } else {
            let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures)
            else {
                return Err(anytrace::error!(
                    "Failed to serialize the QC signature for leaf {root_leaf:?}"
                ));
            };

            let Some(ref drb_difficulty_selector) = *self.drb_difficulty_selector.read().await
            else {
                return Err(anytrace::error!(
                    "The DRB difficulty selector is missing from the epoch membership \
                     coordinator. This node will not be able to spawn any DRB calculation tasks \
                     from catchup."
                ));
            };

            let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;

            let mut drb_seed_input = [0u8; 32];
            let len = drb_seed_input_vec.len().min(32);
            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
            let drb_input = DrbInput {
                epoch: *epoch,
                iteration: 0,
                value: drb_seed_input,
                difficulty_level: drb_difficulty,
            };

            let store_drb_progress_fn = self.store_drb_progress_fn.clone();
            let load_drb_progress_fn = self.load_drb_progress_fn.clone();

            compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await
        };

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(())
    }
}

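/// Spawn a background task that runs catchup for `epoch` on the given coordinator, reporting
/// the outcome on `epoch_tx`.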
fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.clone().catchup(epoch, epoch_tx).await;
    });
}

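/// A lightweight handle pairing an (optional) epoch with the coordinator that produced it;
/// all membership queries made through it are scoped to that epoch.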
pub struct EpochMembership<TYPES: NodeType> {
    pub epoch: Option<TYPES::Epoch>,
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
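    /// Get the epoch this membership is scoped to.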
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

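    /// Get the `EpochMembership` for the next epoch, requiring its randomized stake table.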
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

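    /// Get the `EpochMembership` for the next epoch, requiring only its stake table.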
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

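    /// Get an `EpochMembership` for an arbitrary epoch through the same coordinator.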
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

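    /// Wrapper around the membership's `get_epoch_root`; fails if this membership's epoch is
    /// `None`.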
    async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
    }

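    /// Wrapper around the membership's `get_epoch_drb`; fails if this membership's epoch is
    /// `None`.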
    async fn get_epoch_drb(&self, block_height: u64) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            block_height,
            epoch,
        )
        .await
        .wrap()
    }

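    /// Get the stake table for this epoch.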
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

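    /// Get the DA stake table for this epoch.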
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

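    /// Get all participants in the committee for a given view in this epoch.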
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

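    /// Get all participants in the DA committee for a given view in this epoch.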
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

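    /// Get the stake entry for `pub_key` in this epoch, if any.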
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

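    /// Get the DA stake entry for `pub_key` in this epoch, if any.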
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

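    /// Check whether `pub_key` has stake in this epoch.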
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

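    /// Check whether `pub_key` has DA stake in this epoch.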
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

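    /// Get the leader for a given view in this epoch.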
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

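    /// Look up the leader for a given view in this epoch, surfacing the membership's own
    /// error type on failure.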
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

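    /// Get the total number of nodes in the committee for this epoch.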
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

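    /// Get the total number of nodes in the DA committee for this epoch.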
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

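    /// Get the voting success threshold for the committee in this epoch.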
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

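    /// Get the voting success threshold for the DA committee in this epoch.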
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

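    /// Get the voting failure threshold for the committee in this epoch.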
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

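    /// Get the voting upgrade threshold for the committee in this epoch.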
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

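    /// Add a DRB result for this membership's epoch; a no-op when the epoch is `None`.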
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }
}