use std::{
    collections::{BTreeSet, HashMap, HashSet},
    sync::Arc,
};

use alloy::primitives::U256;
use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::{Mutex, RwLock};
use committable::Commitment;
use hotshot_utils::{anytrace::*, *};

use crate::{
    data::Leaf2,
    drb::{compute_drb_result, DrbDifficultySelectorFn, DrbInput, DrbResult},
    event::Event,
    stake_table::HSStakeTable,
    traits::{
        block_contents::BlockHeader,
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        storage::{
            load_drb_progress_fn, store_drb_progress_fn, store_drb_result_fn, LoadDrbProgressFn,
            Storage, StoreDrbProgressFn, StoreDrbResultFn,
        },
    },
    PeerConfig,
};

/// Map from an epoch to the receiver half of the broadcast channel for that
/// epoch's in-progress catchup task.
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;

/// Set of epochs for which a DRB calculation is currently in progress.
type DrbMap<TYPES> = HashSet<<TYPES as NodeType>::Epoch>;

/// An epoch paired with the sender half of its catchup channel.
type EpochSender<TYPES> = (
    <TYPES as NodeType>::Epoch,
    Sender<Result<EpochMembership<TYPES>>>,
);

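/// Struct to coordinate membership catchup across epochs. It stores the
/// underlying membership and handles spawning (and deduplicating) catchup
/// tasks for missing stake tables and DRB results.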
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership, shared across all epochs.
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Any in-progress stake table catchups, keyed by the epoch being fetched.
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Epochs for which a DRB calculation is currently running.
    drb_calculation_map: Arc<Mutex<DrbMap<TYPES>>>,

    /// Number of blocks in an epoch.
    pub epoch_height: u64,

    /// Callback to persist the progress of an in-flight DRB computation.
    store_drb_progress_fn: StoreDrbProgressFn,

    /// Callback to load the persisted progress of a DRB computation.
    load_drb_progress_fn: LoadDrbProgressFn,

    /// Callback to store a completed DRB result.
    store_drb_result_fn: StoreDrbResultFn<TYPES>,

    /// Selects the DRB difficulty for a given protocol version.
    pub drb_difficulty_selector: Arc<RwLock<Option<DrbDifficultySelectorFn>>>,
}

impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
    fn clone(&self) -> Self {
        Self {
            membership: Arc::clone(&self.membership),
            catchup_map: Arc::clone(&self.catchup_map),
            drb_calculation_map: Arc::clone(&self.drb_calculation_map),
            epoch_height: self.epoch_height,
            store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
            load_drb_progress_fn: Arc::clone(&self.load_drb_progress_fn),
            store_drb_result_fn: self.store_drb_result_fn.clone(),
            drb_difficulty_selector: Arc::clone(&self.drb_difficulty_selector),
        }
    }
}

impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
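    /// Create a new `EpochMembershipCoordinator`, deriving the DRB storage
    /// callbacks from the given `Storage` implementation.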
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            drb_calculation_map: Arc::default(),
            epoch_height,
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            load_drb_progress_fn: load_drb_progress_fn(storage.clone()),
            store_drb_result_fn: store_drb_result_fn(storage.clone()),
            drb_difficulty_selector: Arc::new(RwLock::new(None)),
        }
    }

    /// Set the external event channel on the underlying membership.
    pub async fn set_external_channel(&mut self, external_channel: Receiver<Event<TYPES>>) {
        self.membership
            .write()
            .await
            .set_external_channel(external_channel)
            .await;
    }

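    /// Get a reference to the wrapped membership.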
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

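    /// Set the DRB difficulty selector, which picks the DRB difficulty for a
    /// given protocol version.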
    pub async fn set_drb_difficulty_selector(
        &self,
        drb_difficulty_selector: DrbDifficultySelectorFn,
    ) {
        let mut drb_difficulty_selector_writer = self.drb_difficulty_selector.write().await;

        *drb_difficulty_selector_writer = Some(drb_difficulty_selector);
    }

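    /// Get an `EpochMembership` for the given epoch. Succeeds immediately if
    /// the randomized stake table for that epoch is already available;
    /// otherwise it spawns a catchup task (unless one is already running) and
    /// returns an error.
    ///
    /// A minimal caller sketch (hypothetical; assumes `coordinator` is an
    /// `EpochMembershipCoordinator` and `epoch` an epoch number in scope, in a
    /// function returning a compatible `Result`):
    ///
    /// ```ignore
    /// let membership = match coordinator.membership_for_epoch(Some(epoch)).await {
    ///     Ok(membership) => membership,
    ///     // A catchup task was spawned (or was already running); wait for it.
    ///     Err(_) => coordinator.wait_for_catchup(epoch).await?,
    /// };
    /// ```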
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
            .map_err(|e| {
                error!(
                    "membership_for_epoch failed while called with maybe_epoch {maybe_epoch:?}: \
                     {e}"
                )
            })?
        {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {epoch:?} unavailable. Catchup already in \
                 progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Randomized stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

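    /// Get an `EpochMembership` for the given epoch. Succeeds immediately if
    /// the (unrandomized) stake table for that epoch is already available;
    /// otherwise it spawns a catchup task (unless one is already running) and
    /// returns an error.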
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for epoch {epoch:?} unavailable. Catchup already in progress"
            ));
        }
        let coordinator = self.clone();
        let (tx, rx) = broadcast(1);
        self.catchup_map.lock().await.insert(epoch, rx.deactivate());
        spawn_catchup(coordinator, epoch, tx);

        Err(warn!(
            "Stake table for epoch {epoch:?} unavailable. Starting catchup"
        ))
    }

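    /// Catch up the stake table (and, if necessary, the DRB result) for the
    /// given epoch. We walk backwards from `epoch` until we find an epoch
    /// whose stake table we already have (or whose catchup is already in
    /// flight), then fetch the missing stake tables in order. Success or
    /// failure is broadcast to any waiters, and the `catchup_map` entries are
    /// removed as each epoch completes.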
    async fn catchup(
        mut self,
        epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
    ) {
        // Epochs we need to fetch, each paired with the sender used to notify
        // any tasks waiting on it.
        let mut fetch_epochs = vec![];

        let mut try_epoch = TYPES::Epoch::new(epoch.saturating_sub(1));
        let maybe_first_epoch = self.membership.read().await.first_epoch();
        let Some(first_epoch) = maybe_first_epoch else {
            let err = anytrace::error!(
                "We got a catchup request for epoch {epoch:?} but the first epoch is not set"
            );
            self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                .await;
            return;
        };

        // Walk backwards until we find an epoch whose stake table we already
        // have, or one whose catchup is already in flight.
        loop {
            let has_stake_table = self.membership.read().await.has_stake_table(try_epoch);
            if has_stake_table {
                // We have this stake table. If it can serve as the root for
                // `epoch` (i.e. it is at most `epoch - 2`), we are done
                // walking back; otherwise we also need the epoch before it.
                if try_epoch <= TYPES::Epoch::new(epoch.saturating_sub(2)) {
                    break;
                }
                try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
            } else {
                if try_epoch <= first_epoch + 1 {
                    let err = anytrace::error!(
                        "We are trying to catchup to an epoch lower than the second epoch! This \
                         means the initial stake table is missing!"
                    );
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    return;
                }
                // Check whether another task is already fetching this epoch.
                let mut map_lock = self.catchup_map.lock().await;
                if let Some(mut rx) = map_lock
                    .get(&try_epoch)
                    .map(InactiveReceiver::activate_cloned)
                {
                    drop(map_lock);
                    // Wait for the other task to finish catching up this epoch.
                    if let Ok(Ok(_)) = rx.recv_direct().await {
                        break;
                    };
                } else {
                    // No one else is fetching this epoch; register a channel
                    // for it and add it to our list of epochs to fetch.
                    let (mut tx, rx) = broadcast(1);
                    tx.set_overflow(true);
                    map_lock.insert(try_epoch, rx.deactivate());
                    drop(map_lock);
                    fetch_epochs.push((try_epoch, tx));
                    try_epoch = TYPES::Epoch::new(try_epoch.saturating_sub(1));
                }
            }
        }
        let epochs = fetch_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>();
        tracing::warn!("Fetching stake tables for epochs: {epochs:?}");

        // Fetch the missing stake tables, from the earliest epoch forward.
        while let Some((current_fetch_epoch, tx)) = fetch_epochs.pop() {
            match self.fetch_stake_table(current_fetch_epoch).await {
                Ok(_) => {},
                Err(err) => {
                    fetch_epochs.push((current_fetch_epoch, tx));
                    self.catchup_cleanup(epoch, epoch_tx, fetch_epochs, err)
                        .await;
                    return;
                },
            };

            // Notify any waiters on this epoch; `Ok(Some(_))` means the
            // channel overflowed and an older message was dropped.
            if let Ok(Some(res)) = tx.try_broadcast(Ok(EpochMembership {
                epoch: Some(current_fetch_epoch),
                coordinator: self.clone(),
            })) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed, dropped message {:?}",
                    current_fetch_epoch,
                    res.map(|em| em.epoch)
                );
            }

            // This epoch is done; drop its catchup entry.
            self.catchup_map.lock().await.remove(&current_fetch_epoch);
        }

        // Fetch the stake table for the requested epoch itself.
        let root_leaf = match self.fetch_stake_table(epoch).await {
            Ok(root_leaf) => root_leaf,
            Err(err) => {
                tracing::error!("Failed to fetch stake table for epoch {epoch:?}: {err:?}");
                self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                    .await;
                return;
            },
        };

        // Try to retrieve the DRB result from our peers; if that fails,
        // recompute it locally from the root leaf.
        match <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.membership.clone(),
            epoch,
        )
        .await
        {
            Ok(drb_result) => {
                tracing::warn!(
                    ?drb_result,
                    "DRB result for epoch {epoch:?} retrieved from peers. Updating membership."
                );
                self.membership
                    .write()
                    .await
                    .add_drb_result(epoch, drb_result);
            },
            Err(err) => {
                tracing::warn!(
                    "Recalculating missing DRB result for epoch {}. Catchup failed with error: {}",
                    epoch,
                    err
                );

                let result = self.compute_drb_result(epoch, root_leaf).await;

                log!(result);

                if let Err(err) = result {
                    self.catchup_cleanup(epoch, epoch_tx.clone(), fetch_epochs, err)
                        .await;
                    // The cleanup already reported the error to our waiters,
                    // so don't also broadcast a success below.
                    return;
                }
            },
        };

        // Notify any waiters on the requested epoch and drop its catchup entry.
        if let Ok(Some(res)) = epoch_tx.try_broadcast(Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })) {
            tracing::warn!(
                "The catchup channel for epoch {} overflowed, dropped message {:?}",
                epoch,
                res.map(|em| em.epoch)
            );
        }

        self.catchup_map.lock().await.remove(&epoch);
    }

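    /// Wait for an in-progress catchup of `epoch` to finish and return the
    /// resulting membership. Returns immediately if the stake table is
    /// already available; errors if no catchup is in progress and we don't
    /// have the stake table.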
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let maybe_receiver = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned);
        let Some(mut rx) = maybe_receiver else {
            // No catchup in progress; succeed only if we already have the
            // stake table for this epoch.
            if self.membership.read().await.has_stake_table(epoch) {
                return Ok(EpochMembership {
                    epoch: Some(epoch),
                    coordinator: self.clone(),
                });
            }
            return Err(anytrace::error!(
                "No catchup in progress for epoch {epoch} and we don't have a stake table for it"
            ));
        };
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return Err(anytrace::error!("Catchup for epoch {epoch} failed"));
        };
        Ok(mem)
    }

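    /// Clean up after a failed catchup: remove every pending epoch from the
    /// catchup map and broadcast the error to all of their waiters.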
    async fn catchup_cleanup(
        &mut self,
        req_epoch: TYPES::Epoch,
        epoch_tx: Sender<Result<EpochMembership<TYPES>>>,
        mut cancel_epochs: Vec<EpochSender<TYPES>>,
        err: Error,
    ) {
        // The requested epoch is cancelled along with every epoch we were
        // still planning to fetch.
        cancel_epochs.push((req_epoch, epoch_tx));

        tracing::error!(
            "catchup for epoch {req_epoch:?} failed: {err:?}. Canceling catchup for epochs: {:?}",
            cancel_epochs.iter().map(|(e, _)| e).collect::<Vec<_>>()
        );
        let mut map_lock = self.catchup_map.lock().await;
        for (epoch, _) in cancel_epochs.iter() {
            map_lock.remove(epoch);
        }
        drop(map_lock);
        // Notify any waiters of the error.
        for (cancel_epoch, tx) in cancel_epochs {
            if let Ok(Some(res)) = tx.try_broadcast(Err(err.clone())) {
                tracing::warn!(
                    "The catchup channel for epoch {} overflowed during cleanup, dropped \
                     message {:?}",
                    cancel_epoch,
                    res.map(|em| em.epoch)
                );
            }
        }
    }

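    /// Fetch the stake table for `epoch`: retrieve the epoch root leaf from
    /// the root epoch (two epochs earlier, which must already be available)
    /// and add it to the membership. Returns the root leaf on success.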
    async fn fetch_stake_table(&self, epoch: TYPES::Epoch) -> Result<Leaf2<TYPES>> {
        // The stake table for an epoch is derived from the root leaf two
        // epochs earlier, so that epoch's stake table must already exist.
        let root_epoch = TYPES::Epoch::new(epoch.saturating_sub(2));
        let Ok(root_membership) = self.stake_table_for_epoch(Some(root_epoch)).await else {
            return Err(anytrace::error!(
                "We tried to fetch stake table for epoch {epoch:?} but we don't have its root \
                 epoch {root_epoch:?}. This should not happen"
            ));
        };

        let Ok(root_leaf) = root_membership.get_epoch_root().await else {
            return Err(anytrace::error!(
                "get epoch root leaf failed for epoch {root_epoch:?}"
            ));
        };

        Membership::add_epoch_root(
            Arc::clone(&self.membership),
            root_leaf.block_header().clone(),
        )
        .await
        .map_err(|e| {
            anytrace::error!("Failed to add epoch root for epoch {epoch:?} to membership: {e}")
        })?;

        Ok(root_leaf)
    }

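    /// Compute the DRB result for `epoch`, seeded by the QC signatures of the
    /// given root leaf, persisting progress along the way and storing the
    /// final result before adding it to the membership.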
    pub async fn compute_drb_result(
        &self,
        epoch: TYPES::Epoch,
        root_leaf: Leaf2<TYPES>,
    ) -> Result<DrbResult> {
        // Deduplicate: only run one DRB calculation per epoch at a time.
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        if drb_calculation_map_lock.contains(&epoch) {
            return Err(anytrace::debug!(
                "DRB calculation for epoch {} already in progress",
                epoch
            ));
        }
        drb_calculation_map_lock.insert(epoch);
        drop(drb_calculation_map_lock);

        // Seed the DRB with the QC signatures from the root leaf.
        let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures) else {
            // Clear the in-progress marker so a later attempt can retry.
            self.drb_calculation_map.lock().await.remove(&epoch);
            return Err(anytrace::error!(
                "Failed to serialize the QC signature for leaf {root_leaf:?}"
            ));
        };

        let Some(drb_difficulty_selector) = self.drb_difficulty_selector.read().await.clone()
        else {
            self.drb_calculation_map.lock().await.remove(&epoch);
            return Err(anytrace::error!(
                "The DRB difficulty selector is missing from the epoch membership coordinator. \
                 This node will not be able to spawn any DRB calculation tasks from catchup."
            ));
        };

        let drb_difficulty = drb_difficulty_selector(root_leaf.block_header().version()).await;

        // Truncate or zero-pad the serialized signatures into a 32-byte seed.
        let mut drb_seed_input = [0u8; 32];
        let len = drb_seed_input_vec.len().min(32);
        drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
        let drb_input = DrbInput {
            epoch: *epoch,
            iteration: 0,
            value: drb_seed_input,
            difficulty_level: drb_difficulty,
        };

        let store_drb_progress_fn = self.store_drb_progress_fn.clone();
        let load_drb_progress_fn = self.load_drb_progress_fn.clone();

        let drb = compute_drb_result(drb_input, store_drb_progress_fn, load_drb_progress_fn).await;

        // The calculation is done; clear the in-progress marker.
        let mut drb_calculation_map_lock = self.drb_calculation_map.lock().await;
        drb_calculation_map_lock.remove(&epoch);
        drop(drb_calculation_map_lock);

        tracing::info!("Writing drb result from catchup to storage for epoch {epoch}: {drb:?}");
        if let Err(e) = (self.store_drb_result_fn)(epoch, drb).await {
            tracing::warn!("Failed to add drb result to storage: {e}");
        }
        self.membership.write().await.add_drb_result(epoch, drb);

        Ok(drb)
    }
}

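/// Spawn a task that catches up `epoch` on the given coordinator, reporting
/// the result over `epoch_tx`.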
fn spawn_catchup<T: NodeType>(
    coordinator: EpochMembershipCoordinator<T>,
    epoch: T::Epoch,
    epoch_tx: Sender<Result<EpochMembership<T>>>,
) {
    tokio::spawn(async move {
        coordinator.catchup(epoch, epoch_tx).await;
    });
}
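
/// A membership handle scoped to a particular epoch (if any), backed by the
/// coordinator that produced it.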
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this membership is for, if any.
    pub epoch: Option<TYPES::Epoch>,
    /// The coordinator that produced this membership, used to look up other
    /// epochs.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}

impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
    fn clone(&self) -> Self {
        Self {
            coordinator: self.coordinator.clone(),
            epoch: self.epoch,
        }
    }
}

impl<TYPES: NodeType> EpochMembership<TYPES> {
    /// Get the epoch this membership is for.
    pub fn epoch(&self) -> Option<TYPES::Epoch> {
        self.epoch
    }

    /// Get a membership for the next epoch.
    pub async fn next_epoch(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .membership_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for the next epoch's stake table.
    pub async fn next_epoch_stake_table(&self) -> Result<Self> {
        ensure!(
            self.epoch().is_some(),
            "No next epoch because epoch is None"
        );
        self.coordinator
            .stake_table_for_epoch(self.epoch.map(|e| e + 1))
            .await
    }

    /// Get a membership for an arbitrary epoch via the coordinator.
    pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
        self.coordinator.membership_for_epoch(epoch).await
    }

    /// Wraps `Membership::get_epoch_root` for this membership's epoch.
    async fn get_epoch_root(&self) -> anyhow::Result<Leaf2<TYPES>> {
        let Some(epoch) = self.epoch else {
            anyhow::bail!("Cannot get root for None epoch");
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
    }

    /// Wraps `Membership::get_epoch_drb` for this membership's epoch.
    pub async fn get_epoch_drb(&self) -> Result<DrbResult> {
        let Some(epoch) = self.epoch else {
            return Err(anytrace::warn!("Cannot get drb for None epoch"));
        };
        <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
            self.coordinator.membership.clone(),
            epoch,
        )
        .await
        .wrap()
    }

    /// Get the stake table for this membership's epoch.
    pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table(self.epoch)
    }

    /// Get the DA stake table for this membership's epoch.
    pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake_table(self.epoch)
    }

    /// Get all participants in the committee for a given view.
    pub async fn committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .committee_members(view_number, self.epoch)
    }

    /// Get all participants in the DA committee for a given view.
    pub async fn da_committee_members(
        &self,
        view_number: TYPES::View,
    ) -> BTreeSet<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .da_committee_members(view_number, self.epoch)
    }

    /// Get the stake entry for a node, if it has stake in the committee.
    pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake(pub_key, self.epoch)
    }

    /// Get the DA stake entry for a node, if it has stake in the DA committee.
    pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
        self.coordinator
            .membership
            .read()
            .await
            .da_stake(pub_key, self.epoch)
    }

    /// Check if a node has stake in the committee.
    pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_stake(pub_key, self.epoch)
    }

    /// Check if a node has stake in the DA committee.
    pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
        self.coordinator
            .membership
            .read()
            .await
            .has_da_stake(pub_key, self.epoch)
    }

    /// Get the leader for a given view.
    pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
        self.coordinator
            .membership
            .read()
            .await
            .leader(view, self.epoch)
    }

    /// Look up the leader for a given view, returning the membership's own
    /// error type on failure.
    pub async fn lookup_leader(
        &self,
        view: TYPES::View,
    ) -> std::result::Result<
        TYPES::SignatureKey,
        <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
    > {
        self.coordinator
            .membership
            .read()
            .await
            .lookup_leader(view, self.epoch)
    }

    /// Get the total number of nodes in the committee.
    pub async fn total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .total_nodes(self.epoch)
    }

    /// Get the total number of DA nodes in the committee.
    pub async fn da_total_nodes(&self) -> usize {
        self.coordinator
            .membership
            .read()
            .await
            .da_total_nodes(self.epoch)
    }

    /// Get the voting success threshold for the committee.
    pub async fn success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .success_threshold(self.epoch)
    }

    /// Get the voting success threshold for the DA committee.
    pub async fn da_success_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .da_success_threshold(self.epoch)
    }

    /// Get the voting failure threshold for the committee.
    pub async fn failure_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .failure_threshold(self.epoch)
    }

    /// Get the voting upgrade threshold for the committee.
    pub async fn upgrade_threshold(&self) -> U256 {
        self.coordinator
            .membership
            .read()
            .await
            .upgrade_threshold(self.epoch)
    }

    /// Add the given DRB result to the membership for this epoch, if any.
    pub async fn add_drb_result(&self, drb_result: DrbResult) {
        if let Some(epoch) = self.epoch() {
            self.coordinator
                .membership
                .write()
                .await
                .add_drb_result(epoch, drb_result);
        }
    }

    /// Get the stake table hash for this epoch, if available.
    pub async fn stake_table_hash(
        &self,
    ) -> Option<Commitment<<TYPES::Membership as Membership<TYPES>>::StakeTableHash>> {
        self.coordinator
            .membership
            .read()
            .await
            .stake_table_hash(self.epoch?)
    }
}