1use std::{
2 collections::{BTreeSet, HashMap, HashSet},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_broadcast::{broadcast, InactiveReceiver, Sender};
8use async_lock::{Mutex, RwLock};
9use committable::Commitment;
10use hotshot_utils::{
11 anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
12 ensure, error, line_info, log, warn,
13};
14
15use crate::{
16 data::Leaf2,
17 drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
18 stake_table::HSStakeTable,
19 traits::{
20 election::Membership,
21 node_implementation::{ConsensusTime, NodeType},
22 storage::{
23 load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
24 Storage, StoreDrbProgressFn, StoreDrbResultFn,
25 },
26 },
27 utils::root_block_in_epoch,
28 PeerConfig,
29};
30
/// Maps an epoch to the (inactive) receiving end of the channel on which the result of that
/// epoch's catchup is broadcast. An entry is present while a catchup for the epoch is registered.
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Set of epochs for which a DRB calculation is currently marked as in progress.
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

/// An epoch paired with the sender used to broadcast that epoch's catchup result.
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);
40
/// Coordinates access to the shared [`Membership`] state and drives catchup for epochs whose
/// stake table (or DRB result) is not yet available locally.
///
/// All shared state is behind `Arc`s, so clones of the coordinator operate on the same data.
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership, shared between all clones of the coordinator.
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Epochs with a registered catchup, each mapped to a receiver for the catchup result.
    /// Entries are inserted when a catchup is registered and removed when it finishes or fails.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs for which `compute_drb_result` is currently running; used to reject duplicate
    /// calculations for the same epoch.
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Value passed to `root_block_in_epoch` when locating an epoch's root block.
    /// NOTE(review): presumably the number of blocks per epoch; confirm against the caller.
    pub epoch_height: u64,

    /// Callback handed to the DRB computation, built from storage by `store_drb_progress_fn`.
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback handed to the DRB computation, built from storage by `load_drb_progress_fn`.
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback used to persist a finished DRB result for an epoch.
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Optional selector that yields the DRB difficulty for a given view. It starts out as
    /// `None` and must be set before a DRB calculation can be started from catchup.
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}
67
68impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
69 fn clone(&self) -> Self {
70 Self {
71 membership: Arc::clone(&self.membership),
72 catchup_map: Arc::clone(&self.catchup_map),
73 drb_calculation_map: Arc::clone(&self.drb_calculation_map),
74 epoch_height: self.epoch_height,
75 store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
76 load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
77 store_drb_result_fn: self.store_drb_result_fn.clone(),
78 drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
79 }
80 }
81}
82
83impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
84where
85 Self: Send,
86{
87 pub fn new<S: Storage<TYPES>>(
89 membership: Arc<RwLock<TYPES::Membership>>,
90 epoch_height: u64,
91 storage: &S,
92 ) -> Self {
93 Self {
94 membership,
95 catchup_map: Arc::default(),
96 drb_calculation_map: Arc::default(),
97 epoch_height,
98 store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
99 load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
100 store_drb_result_fn: store_drb_result_fn(storage.clone()),
101 drb_difficulty_selector: Arc::new(RwLock::new(None)),
102 }
103 }
104
    /// Returns a reference to the shared, lock-protected membership.
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }
110
111 pub async fn set_drb_difficulty_selector(
113 &self,
114 drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
115 ) {
116 let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;
117
118 *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
119 }
120
121 pub async fn membership_for_epoch(
124 &self,
125 maybe_epoch: Option<TYPES::Epoch>,
126 ) -> Result<EpochMembership<TYPES>> {
127 let ret_val = EpochMembership {
128 epoch: maybe_epoch,
129 coordinator: self.clone(),
130 };
131 let Some(epoch) = maybe_epoch else {
132 return Ok(ret_val);
133 };
134 if self
135 .membership
136 .read()
137 .await
138 .has_randomized_stake_table(epoch)
139 .map_err(|e| {
140 error!(
141 "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
142 {e}"
143 )
144 })?
145 {
146 return Ok(ret_val);
147 }
148 if self.catchup_map.lock().await.contains_key(&epoch) {
149 return Err(warn!(
150 "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
151 progress"
152 ));
153 }
154 let coordinator = self.clone();
155 let (tx, rx) = broadcast(1);
156 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
157 spawn_catchup(coordinator, epoch, tx);
158
159 Err(warn!(
160 "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
161 ))
162 }
163
164 pub async fn stake_table_for_epoch(
167 &self,
168 maybe_epoch: Option<TYPES::Epoch>,
169 ) -> Result<EpochMembership<TYPES>> {
170 let ret_val = EpochMembership {
171 epoch: maybe_epoch,
172 coordinator: self.clone(),
173 };
174 let Some(epoch) = maybe_epoch else {
175 return Ok(ret_val);
176 };
177 if self.membership.read().await.has_stake_table(epoch) {
178 return Ok(ret_val);
179 }
180 if self.catchup_map.lock().await.contains_key(&epoch) {
181 return Err(warn!(
182 "Stake table for Epoch {epoch:?} Unavailable. Catch up already in Progress"
183 ));
184 }
185 let coordinator = self.clone();
186 let (tx, rx) = broadcast(1);
187 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
188 spawn_catchup(coordinator, epoch, tx);
189
190 Err(warn!(
191 "Stake table for Epoch {epoch:?} Unavailable. Starting catchup"
192 ))
193 }
194
195 async fn catchup(
205 mut self,
206 epoch: TYPES::Epoch,
207 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
208 ) {
209 let mut fetch_epochs = vec![];
211
212 let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
213 let maybe_first_epoch = self.membership.read().await.first_epoch();
214 let Some(first_epoch) = maybe_first_epoch else {
215 let err = anytrace::error!(
216 "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
217 );
218 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
219 .await;
220 return;
221 };
222
223 loop {
225 let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
226 if has_stake_table {
227 if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
229 break;
230 }
231 try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
232 } else {
233 if try_epoch <= first_epoch + 1 {
234 let err = anytrace::error!(
235 "We are trying to catchup to an epoch lower than the second epoch! This \
236 means the initial stake table is missing!"
237 );
238 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
239 .await;
240 return;
241 }
242 let mut map_lock = self.catchup_map.lock().await;
244 if let Some(mut rx) = map_lock
245 .get(&try_epoch)
246 .map(InactiveReceiver::activate_cloned)
247 {
248 drop(map_lock);
250 if let Ok(Ok(_)) = rx.recv_direct().await {
251 break;
252 };
253 } else {
255 let (mut tx, rx) = broadcast(1);
257 tx.set_overflow(true);
258 map_lock.insert(try_epoch, rx.deactivate());
259 drop(map_lock);
260 fetch_epochs.push((try_epoch, tx));
261 try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
262 }
263 };
264 }
265
266 while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
268 match self.fetch_stake_table(current_fetch_epoch).await {
269 Ok(_) => {},
270 Err(err) => {
271 fetch_epochs.push((current_fetch_epoch, tx));
272 self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
273 .await;
274 return;
275 },
276 };
277
278 if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
280 epoch: Some(current_fetch_epoch),
281 coordinator: self.clone(),
282 })) {
283 tracing::warn!(
284 "The catchup channel for epoch {} was overflown, dropped message {:?}",
285 current_fetch_epoch,
286 res.map(|em| em.epoch)
287 );
288 }
289
290 self.catchup_map.lock().await.remove(¤t_fetch_epoch);
292 }
293
294 let root_leaf = match self.fetch_stake_table(epoch).await {
295 Ok(root_leaf) => root_leaf,
296 Err(err) => {
297 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
298 .await;
299 return;
300 },
301 };
302
303 match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
304 self.membership.clone(),
305 epoch,
306 )
307 .await
308 {
309 Ok(drb_result) => {
310 self.membership
311 .write()
312 .await
313 .add_drb_result(epoch, drb_result);
314 },
315 Err(err) => {
316 tracing::warn!(
317 "DRB result for epoch {} missing from membership. Beginning catchup to \
318 recalculate it. Error: {}",
319 epoch,
320 err
321 );
322
323 if let Err(err) = self.compute_drb_result(epoch, root_leaf).await {
324 tracing::error!(
325 "DRB calculation for epoch {} failed . Error: {}",
326 epoch,
327 err
328 );
329 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
330 .await;
331 }
332 },
333 };
334
335 if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
337 epoch: Some(epoch),
338 coordinator: self.clone(),
339 })) {
340 tracing::warn!(
341 "The catchup channel for epoch {} was overflown, dropped message {:?}",
342 epoch,
343 res.map(|em| em.epoch)
344 );
345 }
346
347 self.catchup_map.lock().await.remove(&epoch);
349 }
350
351 pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
357 let maybe_receiver = self
358 .catchup_map
359 .lock()
360 .await
361 .get(&epoch)
362 .map(InactiveReceiver::activate_cloned);
363 let Some(mut rx) = maybe_receiver else {
364 if self.membership.read().await.has_stake_table(epoch) {
366 return Ok(EpochMembership {
367 epoch: Some(epoch),
368 coordinator: self.clone(),
369 });
370 }
371 return Err(anytrace::error!(
372 "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
373 ));
374 };
375 let Ok(Ok(mem)) = rx.recv_direct().await else {
376 return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
377 };
378 Ok(mem)
379 }
380
381 async fn catchup_cleanup(
388 &mut self,
389 req_epoch: TYPES::Epoch,
390 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
391 mut cancel_epochs: Vec<EpochSender<TYPES>>,
392 err: Error,
393 ) {
394 cancel_epochs.push((req_epoch, epoch_tx));
396
397 tracing::error!(
398 "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
399 cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
400 );
401 let mut map_lock = self.catchup_map.lock().await;
402 for (epoch, _) in cancel_epochs.iter() {
403 map_lock.remove(epoch);
405 }
406 drop(map_lock);
407 for (cancel_epoch, tx) in cancel_epochs {
408 if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
410 tracing::warn!(
411 "The catchup channel for epoch {} was overflown during cleanup, dropped \
412 message {:?}",
413 cancel_epoch,
414 res.map(|em| em.epoch)
415 );
416 }
417 }
418 }
419
420 async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
434 let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
435 let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
436 return Err(anytrace::error!(
437 "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
438 epoch {root_epoch:?}. This should not happen"
439 ));
440 };
441
442 let Ok(root_leaf) = root_membership
445 .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
446 .await
447 else {
448 return Err(anytrace::error!(
449 "get epoch root leaf failed for epoch {root_epoch:?}"
450 ));
451 };
452
453 Membership::add_epoch_root(
454 Arc::clone(&self.membership),
455 epoch,
456 root_leaf.block_header().clone(),
457 )
458 .await
459 .map_err(|e| {
460 anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
461 })?;
462
463 Ok(root_leaf)
464 }
465
466 pub async fn compute_drb_result(
467 &self,
468 epoch: TYPES::Epoch,
469 root_leaf: Leaf2<TYPES>,
470 ) -> Result<DrbResult> {
471 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
472
473 if drb_calculation_map_lock.contains(&epoch) {
474 return Err(anytrace::warn!(
475 "DRB calculation for epoch {} already in progress",
476 epoch
477 ));
478 } else {
479 drb_calculation_map_lock.insert(epoch);
480 }
481
482 drop(drb_calculation_map_lock);
483
484 let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
485 return Err(anytrace::error!(
486 "Failed to serialize the QC signature for leaf {root_leaf:?}"
487 ));
488 };
489
490 let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
491 else {
492 return Err(anytrace::error!(
493 "The DRB difficulty selector is missing from the epoch membership coordinator. \
494 This node will not be able to spawn any DRB calculation tasks from catchup."
495 ));
496 };
497
498 let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;
499
500 let mut drb_seed_input = [0u8; 32];
501 let len = drb_seed_input_vec.len().min(32);
502 drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
503 let drb_input = DrbInput {
504 epoch: *epoch,
505 iteration: 0,
506 value: drb_seed_input,
507 difficulty_level: drb_difficulty,
508 };
509
510 let store_drb_progress_fn = self.store_drb_progress_fn.clone();
511 let load_drb_progress_fn = self.load_drb_progress_fn.clone();
512
513 let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;
514
515 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
516 drb_calculation_map_lock.remove(&epoch);
517 drop(drb_calculation_map_lock);
518
519 tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
520 if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
521 tracing::warn!("Failed to add drb result to storage: {e}");
522 }
523 self.membership.write().await.add_drb_result(epoch, drb);
524
525 Ok(drb)
526 }
527}
528
529fn spawn_catchup<T: NodeType>(
530 coordinator: EpochMembershipCoordinator<T>,
531 epoch: T::Epoch,
532 epoch_tx: Sender<Result<EpochMembership<T>>>,
533) {
534 tokio::spawn(async move {
535 coordinator.clone().catchup(epoch, epoch_tx).await;
536 });
537}
/// A view of the membership bound to one (optional) epoch.
///
/// Its methods forward to the coordinator's shared [`Membership`], supplying `epoch`.
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this view is bound to, or `None` when no epoch is specified.
    pub epoch: Option<TYPES::Epoch>,
    /// The coordinator whose shared membership backs this view.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}
546
547impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
548 fn clone(&self) -> Self {
549 Self {
550 coordinator: self.coordinator.clone(),
551 epoch: self.epoch,
552 }
553 }
554}
555
556impl<TYPES: NodeType> EpochMembership<TYPES> {
557 pub fn epoch(&self) -> Option<TYPES::Epoch> {
559 self.epoch
560 }
561
562 pub async fn next_epoch(&self) -> Result<Self> {
564 ensure!(
565 self.epoch().is_some(),
566 "No next epoch because epoch is None"
567 );
568 self.coordinator
569 .membership_for_epoch(self.epoch.map(|e| e + 1))
570 .await
571 }
572 pub async fn next_epoch_stake_table(&self) -> Result<Self> {
574 ensure!(
575 self.epoch().is_some(),
576 "No next epoch because epoch is None"
577 );
578 self.coordinator
579 .stake_table_for_epoch(self.epoch.map(|e| e + 1))
580 .await
581 }
582 pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
583 self.coordinator.membership_for_epoch(epoch).await
584 }
585
586 async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
588 let Some(epoch) = self.epoch else {
589 anyhow::bail!("Cannot get root for None epoch");
590 };
591 <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
592 self.coordinator.membership.clone(),
593 block_height,
594 epoch,
595 )
596 .await
597 }
598
599 pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
601 let Some(epoch) = self.epoch else {
602 return Err(anytrace::warn!("Cannot get drb for None epoch"));
603 };
604 <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
605 self.coordinator.membership.clone(),
606 epoch,
607 )
608 .await
609 .wrap()
610 }
611
612 pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
614 self.coordinator
615 .membership
616 .read()
617 .await
618 .stake_table(self.epoch)
619 }
620
621 pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
623 self.coordinator
624 .membership
625 .read()
626 .await
627 .da_stake_table(self.epoch)
628 }
629
630 pub async fn committee_members(
632 &self,
633 view_number: TYPES::View,
634 ) -> BTreeSet<TYPES::SignatureKey> {
635 self.coordinator
636 .membership
637 .read()
638 .await
639 .committee_members(view_number, self.epoch)
640 }
641
642 pub async fn da_committee_members(
644 &self,
645 view_number: TYPES::View,
646 ) -> BTreeSet<TYPES::SignatureKey> {
647 self.coordinator
648 .membership
649 .read()
650 .await
651 .da_committee_members(view_number, self.epoch)
652 }
653
654 pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
657 self.coordinator
658 .membership
659 .read()
660 .await
661 .stake(pub_key, self.epoch)
662 }
663
664 pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
667 self.coordinator
668 .membership
669 .read()
670 .await
671 .da_stake(pub_key, self.epoch)
672 }
673
674 pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
676 self.coordinator
677 .membership
678 .read()
679 .await
680 .has_stake(pub_key, self.epoch)
681 }
682
683 pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
685 self.coordinator
686 .membership
687 .read()
688 .await
689 .has_da_stake(pub_key, self.epoch)
690 }
691
692 pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
700 self.coordinator
701 .membership
702 .read()
703 .await
704 .leader(view, self.epoch)
705 }
706
707 pub async fn lookup_leader(
715 &self,
716 view: TYPES::View,
717 ) -> std::result::Result<
718 TYPES::SignatureKey,
719 <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
720 > {
721 self.coordinator
722 .membership
723 .read()
724 .await
725 .lookup_leader(view, self.epoch)
726 }
727
728 pub async fn total_nodes(&self) -> usize {
730 self.coordinator
731 .membership
732 .read()
733 .await
734 .total_nodes(self.epoch)
735 }
736
737 pub async fn da_total_nodes(&self) -> usize {
739 self.coordinator
740 .membership
741 .read()
742 .await
743 .da_total_nodes(self.epoch)
744 }
745
746 pub async fn success_threshold(&self) -> U256 {
748 self.coordinator
749 .membership
750 .read()
751 .await
752 .success_threshold(self.epoch)
753 }
754
755 pub async fn da_success_threshold(&self) -> U256 {
757 self.coordinator
758 .membership
759 .read()
760 .await
761 .da_success_threshold(self.epoch)
762 }
763
764 pub async fn failure_threshold(&self) -> U256 {
766 self.coordinator
767 .membership
768 .read()
769 .await
770 .failure_threshold(self.epoch)
771 }
772
773 pub async fn upgrade_threshold(&self) -> U256 {
775 self.coordinator
776 .membership
777 .read()
778 .await
779 .upgrade_threshold(self.epoch)
780 }
781
782 pub async fn add_drb_result(&self, drb_result: DrbResult) {
784 if let Some(epoch) = self.epoch() {
785 self.coordinator
786 .membership
787 .write()
788 .await
789 .add_drb_result(epoch, drb_result);
790 }
791 }
792 pub async fn stake_table_hash(
793 &self,
794 ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
795 self.coordinator
796 .membership
797 .read()
798 .await
799 .stake_table_hash(self.epoch?)
800 }
801}