1use std::{
2 collections::{BTreeSet, HashMap, HashSet},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
8use async_lock::{Mutex, RwLock};
9use committable::Commitment;
10use hotshot_utils::{anytrace::*, *};
11
12use crate::{
13 data::Leaf2,
14 drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
15 event::Event,
16 stake_table::HSStakeTable,
17 traits::{
18 election::Membership,
19 node_implementation::{ConsensusTime, NodeType},
20 storage::{
21 load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
22 Storage, StoreDrbProgressFn, StoreDrbResultFn,
23 },
24 },
25 utils::root_block_in_epoch,
26 PeerConfig,
27};
28
/// Per-epoch broadcast channels used to notify waiters when an in-flight
/// catchup for that epoch finishes. Receivers are kept inactive so pending
/// messages don't pile up while nobody is listening.
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Epochs whose DRB result is currently being computed.
/// NOTE(review): despite the name, this is a `HashSet`, not a map.
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

/// An epoch paired with the sender half of its catchup notification channel.
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);
38
/// Coordinates on-demand catchup of per-epoch stake tables and DRB results,
/// deduplicating concurrent catchup requests for the same epoch.
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership (stake tables, DRB results, leaders).
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Epochs with a catchup currently in flight, each mapped to a receiver
    /// that resolves when that catchup completes.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs whose DRB result is currently being computed.
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Blocks per epoch; used to locate each epoch's root block.
    pub epoch_height: u64,

    /// Persists intermediate DRB computation state.
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Loads previously persisted intermediate DRB computation state.
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Persists a completed DRB result.
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Selects the DRB difficulty for a given view; `None` until configured
    /// via `set_drb_difficulty_selector`.
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn<TYPES>>>>,
}
65
// Manual `Clone` impl: a derive would add an unnecessary `TYPES: Clone`
// bound. Every field is an `Arc` (or `Copy`), so cloning is cheap.
impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}
80
81impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
82where
83 Self: Send,
84{
85 pub fn new<S: Storage<TYPES>>(
87 membership: Arc<RwLock<TYPES::Membership>>,
88 epoch_height: u64,
89 storage: &S,
90 ) -> Self {
91 Self {
92 membership,
93 catchup_map: Arc::default(),
94 drb_calculation_map: Arc::default(),
95 epoch_height,
96 store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
97 load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
98 store_drb_result_fn: store_drb_result_fn(storage.clone()),
99 drb_difficulty_selector: Arc::new(RwLock::new(None)),
100 }
101 }
102
103 pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
104 self.membership
105 .write()
106 .await
107 .set_external_channel(external_channel)
108 .await;
109 }
110
    /// Borrow the shared membership handle.
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }
116
117 pub async fn set_drb_difficulty_selector(
119 &self,
120 drb_difficulty_selector: DrbDifficultySelectorFn<TYPES>,
121 ) {
122 let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;
123
124 *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
125 }
126
127 pub async fn membership_for_epoch(
130 &self,
131 maybe_epoch: Option<TYPES::Epoch>,
132 ) -> Result<EpochMembership<TYPES>> {
133 let ret_val = EpochMembership {
134 epoch: maybe_epoch,
135 coordinator: self.clone(),
136 };
137 let Some(epoch) = maybe_epoch else {
138 return Ok(ret_val);
139 };
140 if self
141 .membership
142 .read()
143 .await
144 .has_randomized_stake_table(epoch)
145 .map_err(|e| {
146 error!(
147 "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
148 {e}"
149 )
150 })?
151 {
152 return Ok(ret_val);
153 }
154 if self.catchup_map.lock().await.contains_key(&epoch) {
155 return Err(warn!(
156 "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
157 progress"
158 ));
159 }
160 let coordinator = self.clone();
161 let (tx, rx) = broadcast(1);
162 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
163 spawn_catchup(coordinator, epoch, tx);
164
165 Err(warn!(
166 "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
167 ))
168 }
169
170 pub async fn stake_table_for_epoch(
173 &self,
174 maybe_epoch: Option<TYPES::Epoch>,
175 ) -> Result<EpochMembership<TYPES>> {
176 let ret_val = EpochMembership {
177 epoch: maybe_epoch,
178 coordinator: self.clone(),
179 };
180 let Some(epoch) = maybe_epoch else {
181 return Ok(ret_val);
182 };
183 if self.membership.read().await.has_stake_table(epoch) {
184 return Ok(ret_val);
185 }
186 if self.catchup_map.lock().await.contains_key(&epoch) {
187 return Err(warn!(
188 "Stake table for Epoch {epoch:?} Unavailable. Catch up already in Progress"
189 ));
190 }
191 let coordinator = self.clone();
192 let (tx, rx) = broadcast(1);
193 self.catchup_map.lock().await.insert(epoch, rx.deactivate());
194 spawn_catchup(coordinator, epoch, tx);
195
196 Err(warn!(
197 "Stake table for Epoch {epoch:?} Unavailable. Starting catchup"
198 ))
199 }
200
201 async fn catchup(
211 mut self,
212 epoch: TYPES::Epoch,
213 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
214 ) {
215 let mut fetch_epochs = vec![];
217
218 let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
219 let maybe_first_epoch = self.membership.read().await.first_epoch();
220 let Some(first_epoch) = maybe_first_epoch else {
221 let err = anytrace::error!(
222 "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
223 );
224 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
225 .await;
226 return;
227 };
228
229 loop {
231 let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
232 if has_stake_table {
233 if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
235 break;
236 }
237 try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
238 } else {
239 if try_epoch <= first_epoch + 1 {
240 let err = anytrace::error!(
241 "We are trying to catchup to an epoch lower than the second epoch! This \
242 means the initial stake table is missing!"
243 );
244 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
245 .await;
246 return;
247 }
248 let mut map_lock = self.catchup_map.lock().await;
250 if let Some(mut rx) = map_lock
251 .get(&try_epoch)
252 .map(InactiveReceiver::activate_cloned)
253 {
254 drop(map_lock);
256 if let Ok(Ok(_)) = rx.recv_direct().await {
257 break;
258 };
259 } else {
261 let (mut tx, rx) = broadcast(1);
263 tx.set_overflow(true);
264 map_lock.insert(try_epoch, rx.deactivate());
265 drop(map_lock);
266 fetch_epochs.push((try_epoch, tx));
267 try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
268 }
269 };
270 }
271
272 while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
274 match self.fetch_stake_table(current_fetch_epoch).await {
275 Ok(_) => {},
276 Err(err) => {
277 fetch_epochs.push((current_fetch_epoch, tx));
278 self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
279 .await;
280 return;
281 },
282 };
283
284 if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
286 epoch: Some(current_fetch_epoch),
287 coordinator: self.clone(),
288 })) {
289 tracing::warn!(
290 "The catchup channel for epoch {} was overflown, dropped message {:?}",
291 current_fetch_epoch,
292 res.map(|em| em.epoch)
293 );
294 }
295
296 self.catchup_map.lock().await.remove(¤t_fetch_epoch);
298 }
299
300 let root_leaf = match self.fetch_stake_table(epoch).await {
301 Ok(root_leaf) => root_leaf,
302 Err(err) => {
303 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
304 .await;
305 return;
306 },
307 };
308
309 match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
310 self.membership.clone(),
311 epoch,
312 )
313 .await
314 {
315 Ok(drb_result) => {
316 self.membership
317 .write()
318 .await
319 .add_drb_result(epoch, drb_result);
320 },
321 Err(err) => {
322 tracing::warn!(
323 "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
324 epoch,
325 err
326 );
327
328 let result = self.compute_drb_result(epoch, root_leaf).await;
329
330 log!(result);
331
332 if let Err(err) = result {
333 self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
334 .await;
335 }
336 },
337 };
338
339 if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
341 epoch: Some(epoch),
342 coordinator: self.clone(),
343 })) {
344 tracing::warn!(
345 "The catchup channel for epoch {} was overflown, dropped message {:?}",
346 epoch,
347 res.map(|em| em.epoch)
348 );
349 }
350
351 self.catchup_map.lock().await.remove(&epoch);
353 }
354
355 pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
361 let maybe_receiver = self
362 .catchup_map
363 .lock()
364 .await
365 .get(&epoch)
366 .map(InactiveReceiver::activate_cloned);
367 let Some(mut rx) = maybe_receiver else {
368 if self.membership.read().await.has_stake_table(epoch) {
370 return Ok(EpochMembership {
371 epoch: Some(epoch),
372 coordinator: self.clone(),
373 });
374 }
375 return Err(anytrace::error!(
376 "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
377 ));
378 };
379 let Ok(Ok(mem)) = rx.recv_direct().await else {
380 return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
381 };
382 Ok(mem)
383 }
384
385 async fn catchup_cleanup(
392 &mut self,
393 req_epoch: TYPES::Epoch,
394 epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
395 mut cancel_epochs: Vec<EpochSender<TYPES>>,
396 err: Error,
397 ) {
398 cancel_epochs.push((req_epoch, epoch_tx));
400
401 tracing::error!(
402 "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
403 cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
404 );
405 let mut map_lock = self.catchup_map.lock().await;
406 for (epoch, _) in cancel_epochs.iter() {
407 map_lock.remove(epoch);
409 }
410 drop(map_lock);
411 for (cancel_epoch, tx) in cancel_epochs {
412 if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
414 tracing::warn!(
415 "The catchup channel for epoch {} was overflown during cleanup, dropped \
416 message {:?}",
417 cancel_epoch,
418 res.map(|em| em.epoch)
419 );
420 }
421 }
422 }
423
424 async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
438 let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
439 let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
440 return Err(anytrace::error!(
441 "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
442 epoch {root_epoch:?}. This should not happen"
443 ));
444 };
445
446 let Ok(root_leaf) = root_membership
449 .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
450 .await
451 else {
452 return Err(anytrace::error!(
453 "get epoch root leaf failed for epoch {root_epoch:?}"
454 ));
455 };
456
457 Membership::add_epoch_root(
458 Arc::clone(&self.membership),
459 epoch,
460 root_leaf.block_header().clone(),
461 )
462 .await
463 .map_err(|e| {
464 anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
465 })?;
466
467 Ok(root_leaf)
468 }
469
470 pub async fn compute_drb_result(
471 &self,
472 epoch: TYPES::Epoch,
473 root_leaf: Leaf2<TYPES>,
474 ) -> Result<DrbResult> {
475 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
476
477 if drb_calculation_map_lock.contains(&epoch) {
478 return Err(anytrace::debug!(
479 "DRB calculation for epoch {} already in progress",
480 epoch
481 ));
482 } else {
483 drb_calculation_map_lock.insert(epoch);
484 }
485
486 drop(drb_calculation_map_lock);
487
488 let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
489 return Err(anytrace::error!(
490 "Failed to serialize the QC signature for leaf {root_leaf:?}"
491 ));
492 };
493
494 let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
495 else {
496 return Err(anytrace::error!(
497 "The DRB difficulty selector is missing from the epoch membership coordinator. \
498 This node will not be able to spawn any DRB calculation tasks from catchup."
499 ));
500 };
501
502 let drb_difficulty = drb_difficulty_selector(root_leaf.view_number()).await;
503
504 let mut drb_seed_input = [0u8; 32];
505 let len = drb_seed_input_vec.len().min(32);
506 drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
507 let drb_input = DrbInput {
508 epoch: *epoch,
509 iteration: 0,
510 value: drb_seed_input,
511 difficulty_level: drb_difficulty,
512 };
513
514 let store_drb_progress_fn = self.store_drb_progress_fn.clone();
515 let load_drb_progress_fn = self.load_drb_progress_fn.clone();
516
517 let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;
518
519 let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
520 drb_calculation_map_lock.remove(&epoch);
521 drop(drb_calculation_map_lock);
522
523 tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
524 if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
525 tracing::warn!("Failed to add drb result to storage: {e}");
526 }
527 self.membership.write().await.add_drb_result(epoch, drb);
528
529 Ok(drb)
530 }
531}
532
533fn spawn_catchup<T: NodeType>(
534 coordinator: EpochMembershipCoordinator<T>,
535 epoch: T::Epoch,
536 epoch_tx: Sender<Result<EpochMembership<T>>>,
537) {
538 tokio::spawn(async move {
539 coordinator.clone().catchup(epoch, epoch_tx).await;
540 });
541}
/// A membership view pinned to a particular epoch (or to no epoch),
/// delegating all queries to its coordinator's underlying membership.
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this view is pinned to; `None` performs epochless queries.
    pub epoch: Option<TYPES::Epoch>,
    /// Handle to the coordinator that owns the underlying membership.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}
550
551impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
552 fn clone(&self) -> Self {
553 Self {
554 coordinator: self.coordinator.clone(),
555 epoch: self.epoch,
556 }
557 }
558}
559
560impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// The epoch this membership view is pinned to, if any.
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }
565
566 pub async fn next_epoch(&self) -> Result<Self> {
568 ensure!(
569 self.epoch().is_some(),
570 "No next epoch because epoch is None"
571 );
572 self.coordinator
573 .membership_for_epoch(self.epoch.map(|e| e + 1))
574 .await
575 }
576 pub async fn next_epoch_stake_table(&self) -> Result<Self> {
578 ensure!(
579 self.epoch().is_some(),
580 "No next epoch because epoch is None"
581 );
582 self.coordinator
583 .stake_table_for_epoch(self.epoch.map(|e| e + 1))
584 .await
585 }
    /// A membership view for an arbitrary (possibly `None`) epoch, via the
    /// same coordinator.
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }
589
590 async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
592 let Some(epoch) = self.epoch else {
593 anyhow::bail!("Cannot get root for None epoch");
594 };
595 <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
596 self.coordinator.membership.clone(),
597 block_height,
598 epoch,
599 )
600 .await
601 }
602
603 pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
605 let Some(epoch) = self.epoch else {
606 return Err(anytrace::warn!("Cannot get drb for None epoch"));
607 };
608 <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
609 self.coordinator.membership.clone(),
610 epoch,
611 )
612 .await
613 .wrap()
614 }
615
616 pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
618 self.coordinator
619 .membership
620 .read()
621 .await
622 .stake_table(self.epoch)
623 }
624
625 pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
627 self.coordinator
628 .membership
629 .read()
630 .await
631 .da_stake_table(self.epoch)
632 }
633
634 pub async fn committee_members(
636 &self,
637 view_number: TYPES::View,
638 ) -> BTreeSet<TYPES::SignatureKey> {
639 self.coordinator
640 .membership
641 .read()
642 .await
643 .committee_members(view_number, self.epoch)
644 }
645
646 pub async fn da_committee_members(
648 &self,
649 view_number: TYPES::View,
650 ) -> BTreeSet<TYPES::SignatureKey> {
651 self.coordinator
652 .membership
653 .read()
654 .await
655 .da_committee_members(view_number, self.epoch)
656 }
657
658 pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
661 self.coordinator
662 .membership
663 .read()
664 .await
665 .stake(pub_key, self.epoch)
666 }
667
668 pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
671 self.coordinator
672 .membership
673 .read()
674 .await
675 .da_stake(pub_key, self.epoch)
676 }
677
678 pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
680 self.coordinator
681 .membership
682 .read()
683 .await
684 .has_stake(pub_key, self.epoch)
685 }
686
687 pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
689 self.coordinator
690 .membership
691 .read()
692 .await
693 .has_da_stake(pub_key, self.epoch)
694 }
695
696 pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
704 self.coordinator
705 .membership
706 .read()
707 .await
708 .leader(view, self.epoch)
709 }
710
711 pub async fn lookup_leader(
719 &self,
720 view: TYPES::View,
721 ) -> std::result::Result<
722 TYPES::SignatureKey,
723 <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
724 > {
725 self.coordinator
726 .membership
727 .read()
728 .await
729 .lookup_leader(view, self.epoch)
730 }
731
732 pub async fn total_nodes(&self) -> usize {
734 self.coordinator
735 .membership
736 .read()
737 .await
738 .total_nodes(self.epoch)
739 }
740
741 pub async fn da_total_nodes(&self) -> usize {
743 self.coordinator
744 .membership
745 .read()
746 .await
747 .da_total_nodes(self.epoch)
748 }
749
750 pub async fn success_threshold(&self) -> U256 {
752 self.coordinator
753 .membership
754 .read()
755 .await
756 .success_threshold(self.epoch)
757 }
758
759 pub async fn da_success_threshold(&self) -> U256 {
761 self.coordinator
762 .membership
763 .read()
764 .await
765 .da_success_threshold(self.epoch)
766 }
767
768 pub async fn failure_threshold(&self) -> U256 {
770 self.coordinator
771 .membership
772 .read()
773 .await
774 .failure_threshold(self.epoch)
775 }
776
777 pub async fn upgrade_threshold(&self) -> U256 {
779 self.coordinator
780 .membership
781 .read()
782 .await
783 .upgrade_threshold(self.epoch)
784 }
785
786 pub async fn add_drb_result(&self, drb_result: DrbResult) {
788 if let Some(epoch) = self.epoch() {
789 self.coordinator
790 .membership
791 .write()
792 .await
793 .add_drb_result(epoch, drb_result);
794 }
795 }
796 pub async fn stake_table_hash(
797 &self,
798 ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
799 self.coordinator
800 .membership
801 .read()
802 .await
803 .stake_table_hash(self.epoch?)
804 }
805}