1use std::{
2 collections::{BTreeSet, HashMap},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_broadcast::{broadcast, InactiveReceiver};
8use async_lock::{Mutex, RwLock};
9use hotshot_utils::{
10 anytrace::{self, Error, Level, Result, Wrap, DEFAULT_LOG_LEVEL},
11 ensure, line_info, log, warn,
12};
13
14use crate::{
15 data::Leaf2,
16 drb::{compute_drb_result, DrbInput, DrbResult},
17 stake_table::HSStakeTable,
18 traits::{
19 election::Membership,
20 node_implementation::{ConsensusTime, NodeType},
21 storage::{
22 storage_add_drb_result, store_drb_progress_fn, Storage, StorageAddDrbResultFn,
23 StoreDrbProgressFn,
24 },
25 },
26 utils::{root_block_in_epoch, transition_block_for_epoch},
27 PeerConfig,
28};
29
/// Per-epoch receivers for the result of an in-flight (or completed) catchup
/// task; the receiver is stored deactivated and activated by waiters.
type EpochMap<TYPES> =
    HashMap<<TYPES as NodeType>::Epoch, InactiveReceiver<Result<EpochMembership<TYPES>>>>;
32
/// Coordinates access to the epoch-scoped membership / stake tables and drives
/// asynchronous catchup (epoch-root fetch + DRB computation) for epochs whose
/// randomized stake table is not yet available.
pub struct EpochMembershipCoordinator<TYPES: NodeType> {
    /// The underlying membership, shared behind a read/write lock.
    membership: Arc<RwLock<TYPES::Membership>>,

    /// Tracks catchup tasks per epoch; presence of a key means a catchup for
    /// that epoch has been started (see `spawn_catchup`).
    catchup_map: Arc<Mutex<EpochMap<TYPES>>>,

    /// Optional callback used to persist a computed DRB result to storage.
    storage_add_drb_result_fn: Option<StorageAddDrbResultFn<TYPES>>,

    /// Epoch height, used to locate the root / transition blocks of an epoch
    /// (presumably the number of blocks per epoch — TODO confirm).
    pub epoch_height: u64,

    /// Callback used to persist intermediate DRB computation progress.
    store_drb_progress_fn: StoreDrbProgressFn,
}
52
53impl<TYPES: NodeType> Clone for EpochMembershipCoordinator<TYPES> {
54 fn clone(&self) -> Self {
55 Self {
56 membership: Arc::clone(&self.membership),
57 catchup_map: Arc::clone(&self.catchup_map),
58 storage_add_drb_result_fn: self.storage_add_drb_result_fn.clone(),
59 epoch_height: self.epoch_height,
60 store_drb_progress_fn: Arc::clone(&self.store_drb_progress_fn),
61 }
62 }
63}
64
impl<TYPES: NodeType> EpochMembershipCoordinator<TYPES>
where
    Self: Send,
{
    /// Create a new coordinator over `membership`, wiring the DRB persistence
    /// callbacks (progress + final result) to `storage`.
    pub fn new<S: Storage<TYPES>>(
        membership: Arc<RwLock<TYPES::Membership>>,
        epoch_height: u64,
        storage: &S,
    ) -> Self {
        Self {
            membership,
            catchup_map: Arc::default(),
            epoch_height,
            // Each callback captures its own clone of the storage handle.
            store_drb_progress_fn: store_drb_progress_fn(storage.clone()),
            storage_add_drb_result_fn: Some(storage_add_drb_result(storage.clone())),
        }
    }

    /// Direct access to the shared membership lock.
    #[must_use]
    pub fn membership(&self) -> &Arc<RwLock<TYPES::Membership>> {
        &self.membership
    }

    /// Get an [`EpochMembership`] for `maybe_epoch`, succeeding only if the
    /// *randomized* stake table for that epoch is already available.
    ///
    /// A `None` epoch always succeeds (epoch-less operation). Otherwise, if
    /// the randomized stake table is missing, this kicks off (or reports an
    /// already-running) background catchup and returns an error — callers are
    /// expected to retry later.
    pub async fn membership_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self
            .membership
            .read()
            .await
            .has_randomized_stake_table(epoch)
        {
            return Ok(ret_val);
        }
        // A key in the map means a catchup task was already spawned for this
        // epoch; don't start a second one.
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Randomized stake table for epoch {:?} unavailable. Catchup already in progress",
                epoch
            ));
        }
        let coordinator = self.clone();
        // Fire-and-forget: the spawned task registers itself in catchup_map
        // and broadcasts its result to any waiters.
        spawn_catchup(coordinator, epoch);

        Err(warn!(
            "Randomized stake table for epoch {:?} unavailable. Starting catchup",
            epoch
        ))
    }

    /// Like [`Self::membership_for_epoch`], but only requires the plain
    /// (non-randomized) stake table for the epoch to be present.
    pub async fn stake_table_for_epoch(
        &self,
        maybe_epoch: Option<TYPES::Epoch>,
    ) -> Result<EpochMembership<TYPES>> {
        let ret_val = EpochMembership {
            epoch: maybe_epoch,
            coordinator: self.clone(),
        };
        let Some(epoch) = maybe_epoch else {
            return Ok(ret_val);
        };
        if self.membership.read().await.has_stake_table(epoch) {
            return Ok(ret_val);
        }
        if self.catchup_map.lock().await.contains_key(&epoch) {
            return Err(warn!(
                "Stake table for Epoch {:?} Unavailable. Catch up already in Progress",
                epoch
            ));
        }
        let coordinator = self.clone();
        spawn_catchup(coordinator, epoch);

        Err(warn!(
            "Stake table for Epoch {:?} Unavailable. Starting catchup",
            epoch
        ))
    }

    /// Catch up to `epoch`: fetch the epoch root leaf from the epoch two back,
    /// install it via `add_epoch_root`, obtain (or compute) the DRB result,
    /// persist it, and add it to the membership.
    ///
    /// Recursive waits on earlier epochs go through `Box::pin` because the
    /// async recursion would otherwise produce an infinitely-sized future.
    async fn catchup(self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        // Epochs 0 and 1 have no epoch two back to catch up from; their stake
        // table must come from genesis/initial configuration.
        ensure!(
            *epoch != 0 && *epoch != 1,
            "We are trying to catchup to epoch 0! This means the initial stake table is missing!"
        );
        let root_epoch = TYPES::Epoch::new(*epoch - 2);

        // The root epoch's stake table must exist before we can fetch its
        // root; if it doesn't, wait for (or trigger) its own catchup first.
        let root_membership = if self.membership.read().await.has_stake_table(root_epoch) {
            EpochMembership {
                epoch: Some(root_epoch),
                coordinator: self.clone(),
            }
        } else {
            Box::pin(self.wait_for_catchup(root_epoch)).await?
        };

        let Ok(root_leaf) = root_membership
            .get_epoch_root(root_block_in_epoch(*root_epoch, self.epoch_height))
            .await
        else {
            anytrace::bail!("get epoch root failed for epoch {:?}", root_epoch);
        };

        // Compute the updater under a read lock, then drop it before taking
        // the write lock — holding read while acquiring write would deadlock.
        let add_epoch_root_updater = {
            let membership_read = self.membership.read().await;
            membership_read
                .add_epoch_root(epoch, root_leaf.block_header().clone())
                .await
        };

        if let Some(updater) = add_epoch_root_updater {
            let mut membership_write = self.membership.write().await;
            updater(&mut *(membership_write));
        };

        // The DRB lives in the epoch after the root epoch (i.e. `epoch - 1`).
        let drb_membership = match root_membership.next_epoch_stake_table().await {
            Ok(drb_membership) => drb_membership,
            Err(_) => Box::pin(self.wait_for_catchup(root_epoch + 1)).await?,
        };

        // Prefer an already-available DRB result; otherwise compute it from
        // the root leaf's QC signatures.
        let drb = if let Ok(drb) = drb_membership
            .get_epoch_drb(transition_block_for_epoch(
                *(root_epoch + 1),
                self.epoch_height,
            ))
            .await
        {
            drb
        } else {
            let Ok(drb_seed_input_vec) = bincode::serialize(&root_leaf.justify_qc().signatures)
            else {
                return Err(anytrace::error!("Failed to serialize the QC signature."));
            };

            // Seed is the first 32 bytes of the serialized signatures,
            // zero-padded if shorter.
            let mut drb_seed_input = [0u8; 32];
            let len = drb_seed_input_vec.len().min(32);
            drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
            let drb_input = DrbInput {
                epoch: *epoch,
                iteration: 0,
                value: drb_seed_input,
            };
            let store_drb_progress_fn = self.store_drb_progress_fn.clone();
            // DRB computation is CPU-heavy; run it off the async executor.
            // The unwrap propagates only a panic inside the blocking task.
            tokio::task::spawn_blocking(move || {
                compute_drb_result(drb_input, store_drb_progress_fn)
            })
            .await
            .unwrap()
        };

        // Best-effort persistence: a storage failure is logged, not fatal.
        if let Some(cb) = &self.storage_add_drb_result_fn {
            tracing::info!("Writing drb result from catchup to storage for epoch {epoch}");
            if let Err(e) = cb(epoch, drb).await {
                tracing::warn!("Failed to add drb result to storage: {e}");
            }
        }

        self.membership.write().await.add_drb_result(epoch, drb);
        Ok(EpochMembership {
            epoch: Some(epoch),
            coordinator: self.clone(),
        })
    }

    /// Wait for an in-flight catchup of `epoch` to finish; if none is
    /// registered (or the broadcast fails), run the catchup inline instead.
    pub async fn wait_for_catchup(&self, epoch: TYPES::Epoch) -> Result<EpochMembership<TYPES>> {
        let Some(mut rx) = self
            .catchup_map
            .lock()
            .await
            .get(&epoch)
            .map(InactiveReceiver::activate_cloned)
        else {
            return self.clone().catchup(epoch).await;
        };
        // Either the sender was dropped or the broadcast carried an Err;
        // in both cases fall back to doing the catchup ourselves.
        let Ok(Ok(mem)) = rx.recv_direct().await else {
            return self.clone().catchup(epoch).await;
        };
        Ok(mem)
    }
}
270
/// Spawn a background catchup task for `epoch`, deduplicated via the
/// coordinator's `catchup_map`.
///
/// The map entry is inserted *before* the catchup runs so concurrent callers
/// see it and wait instead of starting duplicates. On success the entry stays
/// in the map (the stored receiver replays the cached result to late waiters);
/// on failure it is removed so a future call can retry.
fn spawn_catchup<T: NodeType>(coordinator: EpochMembershipCoordinator<T>, epoch: T::Epoch) {
    tokio::spawn(async move {
        let tx = {
            let mut map = coordinator.catchup_map.lock().await;
            // Another task won the race to register this epoch — bail out.
            if map.contains_key(&epoch) {
                return;
            }
            let (tx, rx) = broadcast(1);
            map.insert(epoch, rx.deactivate());
            tx
        }; // lock released before the (long) catchup runs
        let result = coordinator.clone().catchup(epoch).await;
        // Waiters may all be gone; a send error here is fine to ignore.
        let _ = tx.broadcast_direct(result.clone()).await;

        if let Err(err) = result {
            tracing::warn!("failed to catchup for epoch={epoch:?}. err={err:#}");
            coordinator.catchup_map.lock().await.remove(&epoch);
        }
    });
}
/// A membership handle scoped to one (optional) epoch; all queries delegate to
/// the coordinator's shared membership using this epoch.
pub struct EpochMembership<TYPES: NodeType> {
    /// The epoch this handle refers to; `None` when operating without epochs.
    pub epoch: Option<TYPES::Epoch>,
    /// Coordinator used to resolve membership / stake-table queries.
    pub coordinator: EpochMembershipCoordinator<TYPES>,
}
301
302impl<TYPES: NodeType> Clone for EpochMembership<TYPES> {
303 fn clone(&self) -> Self {
304 Self {
305 coordinator: self.coordinator.clone(),
306 epoch: self.epoch,
307 }
308 }
309}
310
311impl<TYPES: NodeType> EpochMembership<TYPES> {
312 pub fn epoch(&self) -> Option<TYPES::Epoch> {
314 self.epoch
315 }
316
317 pub async fn next_epoch(&self) -> Result<Self> {
319 ensure!(
320 self.epoch().is_some(),
321 "No next epoch because epoch is None"
322 );
323 self.coordinator
324 .membership_for_epoch(self.epoch.map(|e| e + 1))
325 .await
326 }
327 pub async fn next_epoch_stake_table(&self) -> Result<Self> {
329 ensure!(
330 self.epoch().is_some(),
331 "No next epoch because epoch is None"
332 );
333 self.coordinator
334 .stake_table_for_epoch(self.epoch.map(|e| e + 1))
335 .await
336 }
337 pub async fn get_new_epoch(&self, epoch: Option<TYPES::Epoch>) -> Result<Self> {
338 self.coordinator.membership_for_epoch(epoch).await
339 }
340
341 async fn get_epoch_root(&self, block_height: u64) -> anyhow::Result<Leaf2<TYPES>> {
343 let Some(epoch) = self.epoch else {
344 anyhow::bail!("Cannot get root for None epoch");
345 };
346 <TYPES::Membership as Membership<TYPES>>::get_epoch_root(
347 self.coordinator.membership.clone(),
348 block_height,
349 epoch,
350 )
351 .await
352 }
353
354 async fn get_epoch_drb(&self, block_height: u64) -> Result<DrbResult> {
356 let Some(epoch) = self.epoch else {
357 return Err(anytrace::warn!("Cannot get drb for None epoch"));
358 };
359 <TYPES::Membership as Membership<TYPES>>::get_epoch_drb(
360 self.coordinator.membership.clone(),
361 block_height,
362 epoch,
363 )
364 .await
365 .wrap()
366 }
367
368 pub async fn stake_table(&self) -> HSStakeTable<TYPES> {
370 self.coordinator
371 .membership
372 .read()
373 .await
374 .stake_table(self.epoch)
375 }
376
377 pub async fn da_stake_table(&self) -> HSStakeTable<TYPES> {
379 self.coordinator
380 .membership
381 .read()
382 .await
383 .da_stake_table(self.epoch)
384 }
385
386 pub async fn committee_members(
388 &self,
389 view_number: TYPES::View,
390 ) -> BTreeSet<TYPES::SignatureKey> {
391 self.coordinator
392 .membership
393 .read()
394 .await
395 .committee_members(view_number, self.epoch)
396 }
397
398 pub async fn da_committee_members(
400 &self,
401 view_number: TYPES::View,
402 ) -> BTreeSet<TYPES::SignatureKey> {
403 self.coordinator
404 .membership
405 .read()
406 .await
407 .da_committee_members(view_number, self.epoch)
408 }
409
410 pub async fn stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
413 self.coordinator
414 .membership
415 .read()
416 .await
417 .stake(pub_key, self.epoch)
418 }
419
420 pub async fn da_stake(&self, pub_key: &TYPES::SignatureKey) -> Option<PeerConfig<TYPES>> {
423 self.coordinator
424 .membership
425 .read()
426 .await
427 .da_stake(pub_key, self.epoch)
428 }
429
430 pub async fn has_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
432 self.coordinator
433 .membership
434 .read()
435 .await
436 .has_stake(pub_key, self.epoch)
437 }
438
439 pub async fn has_da_stake(&self, pub_key: &TYPES::SignatureKey) -> bool {
441 self.coordinator
442 .membership
443 .read()
444 .await
445 .has_da_stake(pub_key, self.epoch)
446 }
447
448 pub async fn leader(&self, view: TYPES::View) -> Result<TYPES::SignatureKey> {
456 self.coordinator
457 .membership
458 .read()
459 .await
460 .leader(view, self.epoch)
461 }
462
463 pub async fn lookup_leader(
471 &self,
472 view: TYPES::View,
473 ) -> std::result::Result<
474 TYPES::SignatureKey,
475 <<TYPES as NodeType>::Membership as Membership<TYPES>>::Error,
476 > {
477 self.coordinator
478 .membership
479 .read()
480 .await
481 .lookup_leader(view, self.epoch)
482 }
483
484 pub async fn total_nodes(&self) -> usize {
486 self.coordinator
487 .membership
488 .read()
489 .await
490 .total_nodes(self.epoch)
491 }
492
493 pub async fn da_total_nodes(&self) -> usize {
495 self.coordinator
496 .membership
497 .read()
498 .await
499 .da_total_nodes(self.epoch)
500 }
501
502 pub async fn success_threshold(&self) -> U256 {
504 self.coordinator
505 .membership
506 .read()
507 .await
508 .success_threshold(self.epoch)
509 }
510
511 pub async fn da_success_threshold(&self) -> U256 {
513 self.coordinator
514 .membership
515 .read()
516 .await
517 .da_success_threshold(self.epoch)
518 }
519
520 pub async fn failure_threshold(&self) -> U256 {
522 self.coordinator
523 .membership
524 .read()
525 .await
526 .failure_threshold(self.epoch)
527 }
528
529 pub async fn upgrade_threshold(&self) -> U256 {
531 self.coordinator
532 .membership
533 .read()
534 .await
535 .upgrade_threshold(self.epoch)
536 }
537
538 pub async fn add_drb_result(&self, drb_result: DrbResult) {
540 if let Some(epoch) = self.epoch() {
541 self.coordinator
542 .membership
543 .write()
544 .await
545 .add_drb_result(epoch, drb_result);
546 }
547 }
548}