use std::{
    cmp::{max, min},
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    future::Future,
    sync::Arc,
};

use alloy::{
    primitives::{Address, U256},
    rpc::types::Log,
};
use anyhow::{bail, Context};
use async_lock::{Mutex, RwLock};
use committable::Committable;
use futures::stream::{self, StreamExt};
use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
use hotshot_contract_adapter::sol_types::StakeTable::{
    self, ConsensusKeysUpdated, Delegated, Undelegated, ValidatorExit, ValidatorRegistered,
};
use hotshot_types::{
    data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
    drb::{
        election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
        DrbResult,
    },
    stake_table::{HSStakeTable, StakeTableEntry},
    traits::{
        election::Membership,
        node_implementation::{ConsensusTime, NodeType},
        signature_key::StakeTableEntryType,
    },
    PeerConfig,
};
use indexmap::IndexMap;
use thiserror::Error;
use tokio::{spawn, time::sleep};
use tracing::Instrument;

#[cfg(any(test, feature = "testing"))]
use super::v0_3::DAMembers;
use super::{
    traits::{MembershipPersistence, StateCatchup},
    v0_3::{EventKey, StakeTableEvent, StakeTableFetcher, StakeTableUpdateTask, Validator},
    v0_99::ChainConfig,
    Header, L1Client, Leaf2, PubKey, SeqTypes,
};

type Epoch = <SeqTypes as NodeType>::Epoch;

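/// Stake table contract events, grouped by type, each paired with the L1 `Log`
/// it was emitted in (used for ordering by block number and log index).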
#[derive(Clone, PartialEq)]
pub struct StakeTableEvents {
    registrations: Vec<(ValidatorRegistered, Log)>,
    deregistrations: Vec<(ValidatorExit, Log)>,
    delegated: Vec<(Delegated, Log)>,
    undelegated: Vec<(Undelegated, Log)>,
    keys: Vec<(ConsensusKeysUpdated, Log)>,
}

impl StakeTableEvents {
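    /// Flattens the grouped events into a single list keyed by
    /// `(block_number, log_index)` and sorted in that order, so they can be
    /// replayed deterministically.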
    pub fn sort_events(self) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
        let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
        let Self {
            registrations,
            deregistrations,
            delegated,
            undelegated,
            keys,
        } = self;

        for (registration, log) in registrations {
            events.push((
                (
                    log.block_number.context("block number")?,
                    log.log_index.context("log index")?,
                ),
                registration.into(),
            ));
        }
        for (dereg, log) in deregistrations {
            events.push((
                (
                    log.block_number.context("block number")?,
                    log.log_index.context("log index")?,
                ),
                dereg.into(),
            ));
        }
        for (delegation, log) in delegated {
            events.push((
                (
                    log.block_number.context("block number")?,
                    log.log_index.context("log index")?,
                ),
                delegation.into(),
            ));
        }
        for (undelegated, log) in undelegated {
            events.push((
                (
                    log.block_number.context("block number")?,
                    log.log_index.context("log index")?,
                ),
                undelegated.into(),
            ));
        }

        for (update, log) in keys {
            events.push((
                (
                    log.block_number.context("block number")?,
                    log.log_index.context("log index")?,
                ),
                update.into(),
            ));
        }

        events.sort_by_key(|(key, _)| (key.0, key.1));
        Ok(events)
    }
}

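/// Replays stake table events in order and builds the resulting validator map.
///
/// Registering a duplicate BLS key or account is an error; a reused Schnorr
/// key only logs a warning. Delegations and undelegations adjust both the
/// validator's total stake and the per-delegator balances.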
pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
    events: I,
) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
    let mut validators = IndexMap::new();
    let mut bls_keys = HashSet::new();
    let mut schnorr_keys = HashSet::new();
    for event in events {
        tracing::debug!("Processing stake table event: {:?}", event);
        match event {
            StakeTableEvent::Register(ValidatorRegistered {
                account,
                blsVk,
                schnorrVk,
                commission,
            }) => {
                let stake_table_key: BLSPubKey = blsVk.clone().into();
                let state_ver_key: SchnorrPubKey = schnorrVk.clone().into();
                if bls_keys.contains(&stake_table_key) {
                    bail!("bls key {} already used", stake_table_key.to_string());
                };

                if schnorr_keys.contains(&state_ver_key) {
                    tracing::warn!("schnorr key {} already used", state_ver_key.to_string());
                };

                bls_keys.insert(stake_table_key);
                schnorr_keys.insert(state_ver_key.clone());

                match validators.entry(account) {
                    indexmap::map::Entry::Occupied(_occupied_entry) => {
                        bail!("validator {:#x} already registered", *account)
                    },
                    indexmap::map::Entry::Vacant(vacant_entry) => vacant_entry.insert(Validator {
                        account,
                        stake_table_key,
                        state_ver_key,
                        stake: U256::from(0_u64),
                        commission,
                        delegators: HashMap::default(),
                    }),
                };
            },
            StakeTableEvent::Deregister(exit) => {
                validators
                    .shift_remove(&exit.validator)
                    .with_context(|| format!("validator {:#x} not found", exit.validator))?;
            },
            StakeTableEvent::Delegate(delegated) => {
                let Delegated {
                    delegator,
                    validator,
                    amount,
                } = delegated;
                let validator_entry = validators
                    .get_mut(&validator)
                    .with_context(|| format!("validator {validator:#x} not found"))?;

                if amount.is_zero() {
                    tracing::warn!("delegator {delegator:?} has 0 stake");
                    continue;
                }
                validator_entry.stake += amount;
                validator_entry
                    .delegators
                    .entry(delegator)
                    .and_modify(|stake| *stake += amount)
                    .or_insert(amount);
            },
            StakeTableEvent::Undelegate(undelegated) => {
                let Undelegated {
                    delegator,
                    validator,
                    amount,
                } = undelegated;
                let validator_entry = validators
                    .get_mut(&validator)
                    .with_context(|| format!("validator {validator:#x} not found"))?;

                validator_entry.stake = validator_entry
                    .stake
                    .checked_sub(amount)
                    .with_context(|| "stake is less than undelegated amount")?;

                let delegator_stake = validator_entry
                    .delegators
                    .get_mut(&delegator)
                    .with_context(|| format!("delegator {delegator:#x} not found"))?;
                *delegator_stake = delegator_stake
                    .checked_sub(amount)
                    .with_context(|| "delegator_stake is less than undelegated amount")?;

                if delegator_stake.is_zero() {
                    validator_entry.delegators.remove(&delegator);
                }
            },
            StakeTableEvent::KeyUpdate(update) => {
                let ConsensusKeysUpdated {
                    account,
                    blsVK,
                    schnorrVK,
                } = update;
                let validator = validators
                    .get_mut(&account)
                    .with_context(|| format!("validator {account:#x} not found"))?;
                let bls = blsVK.into();
                let state_ver_key = schnorrVK.into();

                validator.stake_table_key = bls;
                validator.state_ver_key = state_ver_key;
            },
        }
    }

    Ok(validators)
}

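/// Prunes the validator map down to the active set: validators with no
/// delegators or zero stake are dropped, then only validators whose stake is
/// at least `max_stake / VID_TARGET_TOTAL_STAKE` are kept, capped at the 100
/// highest-staked entries.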
pub(crate) fn select_active_validator_set(
    validators: &mut IndexMap<Address, Validator<BLSPubKey>>,
) -> anyhow::Result<()> {
    validators.retain(|address, validator| {
        if validator.delegators.is_empty() {
            tracing::info!("Validator {address:?} does not have any delegator");
            return false;
        }

        if validator.stake.is_zero() {
            tracing::info!("Validator {address:?} does not have any stake");
            return false;
        }

        true
    });

    if validators.is_empty() {
        bail!("No valid validators found");
    }

    let maximum_stake = validators
        .values()
        .map(|v| v.stake)
        .max()
        .context("Failed to determine max stake")?;

    let minimum_stake = maximum_stake
        .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
        .context("div err")?;

    let mut valid_stakers: Vec<_> = validators
        .iter()
        .filter(|(_, v)| v.stake >= minimum_stake)
        .map(|(addr, v)| (*addr, v.stake))
        .collect();

    valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));

    if valid_stakers.len() > 100 {
        valid_stakers.truncate(100);
    }

    let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
    validators.retain(|address, _| selected_addresses.contains(address));

    Ok(())
}

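/// Convenience wrapper: builds the full validator map from events and then
/// reduces it to the active validator set.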
pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
    events: I,
) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
    let mut validators = validators_from_l1_events(events)?;
    select_active_validator_set(&mut validators)?;
    Ok(validators)
}

impl std::fmt::Debug for StakeTableEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
            StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
            StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
            StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
            StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
        }
    }
}

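/// Membership state backing the `Membership` implementation below: the fixed
/// non-epoch committee, per-epoch committees fetched from L1, the
/// DRB-randomized leader committees derived from them, and the fetcher used
/// to retrieve new stake tables.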
#[derive(Clone, derive_more::derive::Debug)]
pub struct EpochCommittees {
    non_epoch_committee: NonEpochCommittee,
    state: HashMap<Epoch, EpochCommittee>,
    randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
    first_epoch: Option<Epoch>,
    fetcher: Arc<StakeTableFetcher>,
}

impl StakeTableFetcher {
    pub fn new(
        peers: Arc<dyn StateCatchup>,
        persistence: Arc<Mutex<dyn MembershipPersistence>>,
        l1_client: L1Client,
        chain_config: ChainConfig,
    ) -> Self {
        Self {
            peers,
            persistence,
            l1_client,
            chain_config: Arc::new(Mutex::new(chain_config)),
            update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
        }
    }

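    /// Spawns the background stake table update loop if it is not already running.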
    pub async fn spawn_update_loop(&self) {
        let mut update_task = self.update_task.0.lock().await;
        if update_task.is_none() {
            *update_task = Some(spawn(self.update_loop()));
        }
    }

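    /// Background task body: waits for the stake table contract address and a
    /// finalized L1 block to become available, then periodically fetches and
    /// persists new stake table events, retrying on errors.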
    fn update_loop(&self) -> impl Future<Output = ()> {
        let span = tracing::warn_span!("Stake table update loop");
        let self_clone = self.clone();
        let state = self.l1_client.state.clone();
        let l1_retry = self.l1_client.options().l1_retry_delay;
        let update_delay = self.l1_client.options().stake_table_update_interval;
        let chain_config = self.chain_config.clone();

        async move {
            let stake_contract_address = loop {
                match chain_config.lock().await.stake_table_contract {
                    Some(addr) => break addr,
                    None => {
                        tracing::debug!(
                            "Stake table contract address not found. Retrying in {l1_retry:?}...",
                        );
                    },
                }
                sleep(l1_retry).await;
            };

            loop {
                let finalized_block = loop {
                    if let Some(block) = state.lock().await.last_finalized {
                        break block;
                    }
                    tracing::debug!(
                        "Finalized block not yet available. Retrying in {l1_retry:?}",
                    );
                    sleep(l1_retry).await;
                };

                tracing::debug!(
                    "Attempting to fetch stake table at L1 block {finalized_block:?}",
                );

                loop {
                    match self_clone
                        .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
                        .await
                    {
                        Ok(_) => {
                            tracing::info!("Successfully fetched and stored stake table events at block={finalized_block:?}");
                            break;
                        },
                        Err(e) => {
                            tracing::error!(
                                "Error fetching stake table at block {finalized_block:?}. err= {e:#}",
                            );
                            sleep(l1_retry).await;
                        },
                    }
                }

                tracing::debug!(
                    "Waiting {update_delay:?} before next stake table update...",
                );
                sleep(update_delay).await;
            }
        }
        .instrument(span)
    }

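    /// Loads previously persisted events and fetches any newer ones from the
    /// contract up to `to_block`, returning the combined, deduplicated list.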
    pub async fn fetch_events(
        &self,
        contract: Address,
        to_block: u64,
    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
        let persistence_lock = self.persistence.lock().await;
        let res = persistence_lock.load_events().await?;
        drop(persistence_lock);

        let from_block = res
            .as_ref()
            .map(|(block, _)| block + 1)
            .filter(|from| *from <= to_block);

        tracing::info!("loaded events from storage from_block={from_block:?}");

        let contract_events = Self::fetch_events_from_contract(
            self.l1_client.clone(),
            contract,
            from_block,
            to_block,
        )
        .await?;

        tracing::info!("loading events from contract to_block={to_block:?}");

        let contract_events = contract_events.sort_events()?;
        let mut events = match (from_block, res) {
            (Some(_), Some((_, persistence_events))) => persistence_events
                .into_iter()
                .chain(contract_events)
                .collect(),
            _ => contract_events,
        };

        let len_before_dedup = events.len();
        events.dedup();
        let len_after_dedup = events.len();
        if len_before_dedup != len_after_dedup {
            tracing::warn!("Duplicate events found and removed. This should not normally happen.")
        }

        Ok(events)
    }

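    /// Queries the stake table contract for all event types, chunking the
    /// block range by `l1_events_max_block_range` and retrying failed queries.
    /// If `from_block` is `None`, fetching starts at the contract's
    /// initialization block.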
    pub async fn fetch_events_from_contract(
        l1_client: L1Client,
        contract: Address,
        from_block: Option<u64>,
        to_block: u64,
    ) -> anyhow::Result<StakeTableEvents> {
        let stake_table_contract = StakeTable::new(contract, l1_client.provider.clone());

        let from_block = match from_block {
            Some(block) => block,
            None => {
                loop {
                    match stake_table_contract.initializedAtBlock().call().await {
                        Ok(init_block) => {
                            break init_block._0.to::<u64>();
                        },
                        Err(err) => {
                            tracing::warn!(%err, "Failed to retrieve initial block, retrying..");
                            sleep(l1_client.options().l1_retry_delay).await;
                        },
                    }
                }
            },
        };

        let mut start = from_block;
        let end = to_block;
        let chunk_size = l1_client.options().l1_events_max_block_range;
        let chunks = std::iter::from_fn(move || {
            let chunk_end = min(start + chunk_size - 1, end);
            if chunk_end < start {
                return None;
            }

            let chunk = (start, chunk_end);
            start = chunk_end + 1;
            Some(chunk)
        });

        let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
            let retry_delay = l1_client.options().l1_retry_delay;
            let stake_table_contract = stake_table_contract.clone();
            async move {
                tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
                loop {
                    match stake_table_contract
                        .clone()
                        .ValidatorRegistered_filter()
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "ValidatorRegistered Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });

        let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
            let retry_delay = l1_client.options().l1_retry_delay;
            let stake_table_contract = stake_table_contract.clone();
            async move {
                tracing::debug!(from, to, "fetch ValidatorExit events in range");
                loop {
                    match stake_table_contract
                        .ValidatorExit_filter()
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "ValidatorExit Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });

        let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
            let retry_delay = l1_client.options().l1_retry_delay;
            let stake_table_contract = stake_table_contract.clone();
            async move {
                tracing::debug!(from, to, "fetch Delegated events in range");
                loop {
                    match stake_table_contract
                        .Delegated_filter()
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "Delegated Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });
        let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
            let retry_delay = l1_client.options().l1_retry_delay;
            let stake_table_contract = stake_table_contract.clone();
            async move {
                tracing::debug!(from, to, "fetch Undelegated events in range");
                loop {
                    match stake_table_contract
                        .Undelegated_filter()
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "Undelegated Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });

        let keys_update_events = stream::iter(chunks).then(|(from, to)| {
            let retry_delay = l1_client.options().l1_retry_delay;
            let stake_table_contract = stake_table_contract.clone();
            async move {
                tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
                loop {
                    match stake_table_contract
                        .ConsensusKeysUpdated_filter()
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "ConsensusKeysUpdated Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });

        let registrations = registered_events.flatten().collect().await;
        let deregistrations = deregistered_events.flatten().collect().await;
        let delegated = delegated_events.flatten().collect().await;
        let undelegated = undelegated_events.flatten().collect().await;
        let keys = keys_update_events.flatten().collect().await;

        Ok(StakeTableEvents {
            registrations,
            deregistrations,
            delegated,
            undelegated,
            keys,
        })
    }

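    /// Fetches events up to `to_block` and persists them before returning.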
    pub async fn fetch_and_store_stake_table_events(
        &self,
        contract: Address,
        to_block: u64,
    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
        let events = self.fetch_events(contract, to_block).await?;

        tracing::info!("storing events in storage to_block={to_block:?}");

        {
            let persistence_lock = self.persistence.lock().await;
            persistence_lock
                .store_events(to_block, events.clone())
                .await
                .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
        }

        Ok(events)
    }

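    /// Fetches all events from the contract and reconstructs every registered
    /// validator, without applying the active-set selection.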
    pub async fn fetch_all_validators(
        l1_client: L1Client,
        contract: Address,
        to_block: u64,
    ) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
        let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await?;
        let sorted = events.sort_events()?;
        validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
    }

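    /// Fetches the active validator set for `epoch` at the L1 block finalized
    /// in the given epoch-root header. Returns `None` (and logs an error) if
    /// the chain config, contract address, or L1 block info is unavailable.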
    pub async fn fetch(
        &self,
        epoch: Epoch,
        header: Header,
    ) -> Option<IndexMap<Address, Validator<BLSPubKey>>> {
        let chain_config = self.get_chain_config(&header).await.ok()?;
        *self.chain_config.lock().await = chain_config;

        let Some(address) = chain_config.stake_table_contract else {
            tracing::error!("No stake table contract address found in Chain config");
            return None;
        };

        let Some(l1_finalized_block_info) = header.l1_finalized() else {
            tracing::error!("The epoch root for epoch {} is missing the L1 finalized block info. This is a fatal error. Consensus is blocked and will not recover.", epoch);
            return None;
        };

        let events = match self
            .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
            .await
            .map_err(GetStakeTablesError::L1ClientFetchError)
        {
            Ok(events) => events,
            Err(e) => {
                tracing::error!("failed to fetch stake table events {e:?}");
                return None;
            },
        };

        match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
            Ok(validators) => Some(validators),
            Err(e) => {
                tracing::error!("failed to construct stake table {e:?}");
                None
            },
        }
    }

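    /// Resolves the chain config referenced by the header, falling back to
    /// catchup peers if it is not known locally.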
    pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
        let chain_config = self.chain_config.lock().await;
        let peers = self.peers.clone();
        let header_cf = header.chain_config();
        if chain_config.commit() == header_cf.commit() {
            return Ok(*chain_config);
        }

        let cf = match header_cf.resolve() {
            Some(cf) => cf,
            None => peers
                .fetch_chain_config(header_cf.commit())
                .await
                .map_err(|err| {
                    tracing::error!("failed to get chain_config from peers. err: {err:?}");
                    err
                })?,
        };

        Ok(cf)
    }

    #[cfg(any(test, feature = "testing"))]
    pub fn mock() -> Self {
        use crate::{mock, v0_1::NoStorage};
        let chain_config = ChainConfig::default();
        let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
            .expect("Failed to create L1 client");

        let peers = Arc::new(mock::MockStateCatchup::default());
        let persistence = NoStorage;

        Self::new(peers, Arc::new(Mutex::new(persistence)), l1, chain_config)
    }
}
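
/// Committee used for views without an epoch (and for the DA committee):
/// the stake table and DA members provided at startup, plus per-key indexes
/// for lookups.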
#[derive(Clone, Debug)]
struct NonEpochCommittee {
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
    stake_table: Vec<PeerConfig<SeqTypes>>,
    da_members: Vec<PeerConfig<SeqTypes>>,
    indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
    indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
}

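/// Stake table for a single epoch, including the validator map fetched from
/// L1 and a mapping from BLS keys back to Ethereum addresses.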
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct EpochCommittee {
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
    stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
    validators: IndexMap<Address, Validator<BLSPubKey>>,
    address_mapping: HashMap<BLSPubKey, Address>,
}

impl EpochCommittees {
    pub fn first_epoch(&self) -> Option<Epoch> {
        self.first_epoch
    }

    pub fn fetcher(&self) -> &StakeTableFetcher {
        &self.fetcher
    }

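    /// Inserts the committee for `epoch`, building the per-key stake table and
    /// the BLS-key-to-address mapping from the given validators.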
    fn update_stake_table(
        &mut self,
        epoch: EpochNumber,
        validators: IndexMap<Address, Validator<BLSPubKey>>,
    ) {
        let mut address_mapping = HashMap::new();
        let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
            .values()
            .map(|v| {
                address_mapping.insert(v.stake_table_key, v.account);
                (
                    v.stake_table_key,
                    PeerConfig {
                        stake_table_entry: BLSPubKey::stake_table_entry(
                            &v.stake_table_key,
                            v.stake,
                        ),
                        state_ver_key: v.state_ver_key.clone(),
                    },
                )
            })
            .collect();

        let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
            stake_table.iter().map(|(_, l)| l.clone()).collect();

        self.state.insert(
            epoch,
            EpochCommittee {
                eligible_leaders,
                stake_table,
                validators,
                address_mapping,
            },
        );
    }

    pub fn validators(
        &self,
        epoch: &Epoch,
    ) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
        Ok(self
            .state
            .get(epoch)
            .context("state for epoch not found")?
            .validators
            .clone())
    }

    pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
        let mapping = self
            .state
            .get(epoch)
            .context("state for epoch not found")?
            .address_mapping
            .clone();

        Ok(*mapping.get(&bls_key).context(format!(
            "failed to get ethereum address for bls key {bls_key}. epoch={epoch:?}"
        ))?)
    }

    pub fn get_validator_config(
        &self,
        epoch: &Epoch,
        key: BLSPubKey,
    ) -> anyhow::Result<Validator<BLSPubKey>> {
        let address = self.address(epoch, key)?;
        let validators = self.validators(epoch)?;
        validators
            .get(&address)
            .context("validator not found")
            .cloned()
    }

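    /// Builds the initial committee state from the startup committee and DA
    /// members: zero-stake entries are filtered out and the genesis epoch
    /// entries are seeded from the same stake table.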
    pub fn new_stake(
        committee_members: Vec<PeerConfig<SeqTypes>>,
        da_members: Vec<PeerConfig<SeqTypes>>,
        fetcher: StakeTableFetcher,
    ) -> Self {
        let stake_table: Vec<_> = committee_members
            .iter()
            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
            .cloned()
            .collect();
        let eligible_leaders = stake_table.clone();
        let da_members: Vec<_> = da_members
            .iter()
            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
            .cloned()
            .collect();

        let indexed_stake_table: HashMap<PubKey, _> = stake_table
            .iter()
            .map(|peer_config| {
                (
                    PubKey::public_key(&peer_config.stake_table_entry),
                    peer_config.clone(),
                )
            })
            .collect();

        let indexed_da_members: HashMap<PubKey, _> = da_members
            .iter()
            .map(|peer_config| {
                (
                    PubKey::public_key(&peer_config.stake_table_entry),
                    peer_config.clone(),
                )
            })
            .collect();

        let members = NonEpochCommittee {
            eligible_leaders,
            stake_table,
            da_members,
            indexed_stake_table,
            indexed_da_members,
        };

        let mut map = HashMap::new();
        let epoch_committee = EpochCommittee {
            eligible_leaders: members.eligible_leaders.clone(),
            stake_table: members
                .stake_table
                .iter()
                .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
                .collect(),
            validators: Default::default(),
            address_mapping: HashMap::new(),
        };
        map.insert(Epoch::genesis(), epoch_committee.clone());
        map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());

        Self {
            non_epoch_committee: members,
            state: map,
            randomized_committees: BTreeMap::new(),
            first_epoch: None,
            fetcher: Arc::new(fetcher),
        }
    }

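    /// Reloads up to `limit` recent per-epoch stake tables from persistence
    /// into memory, e.g. after a restart.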
    pub async fn reload_stake(&mut self, limit: u64) {
        let loaded_stake = match self
            .fetcher
            .persistence
            .lock()
            .await
            .load_latest_stake(limit)
            .await
        {
            Ok(Some(loaded)) => loaded,
            Ok(None) => {
                tracing::warn!("No stake table history found in persistence!");
                return;
            },
            Err(e) => {
                tracing::error!("Failed to load stake table history from persistence: {}", e);
                return;
            },
        };

        for (epoch, stake_table) in loaded_stake {
            self.update_stake_table(epoch, stake_table);
        }
    }

    fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
        if let Some(epoch) = epoch {
            self.state
                .get(epoch)
                .map(|committee| committee.stake_table.clone().into_values().collect())
        } else {
            Some(self.non_epoch_committee.stake_table.clone())
        }
    }
}

#[derive(Error, Debug)]
enum GetStakeTablesError {
    #[error("Error fetching from L1: {0}")]
    L1ClientFetchError(anyhow::Error),
}

#[derive(Error, Debug)]
#[error("Could not lookup leader")]
pub struct LeaderLookupError;

impl Membership<SeqTypes> for EpochCommittees {
    type Error = LeaderLookupError;

    fn new(
        _committee_members: Vec<PeerConfig<SeqTypes>>,
        _da_members: Vec<PeerConfig<SeqTypes>>,
    ) -> Self {
        panic!("This function has been replaced with new_stake()");
    }

    fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.get_stake_table(&epoch).unwrap_or_default().into()
    }

    fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.non_epoch_committee.da_members.clone().into()
    }

    fn committee_members(
        &self,
        _view_number: <SeqTypes as NodeType>::View,
        epoch: Option<Epoch>,
    ) -> BTreeSet<PubKey> {
        let stake_table = self.stake_table(epoch);
        stake_table
            .iter()
            .map(|x| PubKey::public_key(&x.stake_table_entry))
            .collect()
    }

    fn da_committee_members(
        &self,
        _view_number: <SeqTypes as NodeType>::View,
        _epoch: Option<Epoch>,
    ) -> BTreeSet<PubKey> {
        self.non_epoch_committee
            .indexed_da_members
            .clone()
            .into_keys()
            .collect()
    }

    fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
        if let Some(epoch) = epoch {
            self.state
                .get(&epoch)
                .and_then(|h| h.stake_table.get(pub_key))
                .cloned()
        } else {
            self.non_epoch_committee
                .indexed_stake_table
                .get(pub_key)
                .cloned()
        }
    }

    fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
        self.non_epoch_committee
            .indexed_da_members
            .get(pub_key)
            .cloned()
    }

    fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
        self.stake(pub_key, epoch)
            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
            .unwrap_or_default()
    }

    fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
        self.da_stake(pub_key, epoch)
            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
            .unwrap_or_default()
    }

    fn lookup_leader(
        &self,
        view_number: <SeqTypes as NodeType>::View,
        epoch: Option<Epoch>,
    ) -> Result<PubKey, Self::Error> {
        if let Some(epoch) = epoch {
            let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
                tracing::error!(
                    "We are missing the randomized committee for epoch {}",
                    epoch
                );
                return Err(LeaderLookupError);
            };

            Ok(PubKey::public_key(&select_randomized_leader(
                randomized_committee,
                *view_number,
            )))
        } else {
            let leaders = &self.non_epoch_committee.eligible_leaders;

            let index = *view_number as usize % leaders.len();
            let res = leaders[index].clone();
            Ok(PubKey::public_key(&res.stake_table_entry))
        }
    }

    fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
        self.stake_table(epoch).len()
    }

    fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
        self.da_stake_table(epoch).len()
    }

    fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let one = U256::ONE;
        let two = U256::from(2);
        let three = U256::from(3);
        if total_stake < U256::MAX / two {
            ((total_stake * two) / three) + one
        } else {
            ((total_stake / three) * two) + two
        }
    }

    fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_da_stake(epoch);
        let one = U256::ONE;
        let two = U256::from(2);
        let three = U256::from(3);

        if total_stake < U256::MAX / two {
            ((total_stake * two) / three) + one
        } else {
            ((total_stake / three) * two) + two
        }
    }

    fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let one = U256::ONE;
        let three = U256::from(3);

        (total_stake / three) + one
    }

    fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let nine = U256::from(9);
        let ten = U256::from(10);

        let normal_threshold = self.success_threshold(epoch);
        let higher_threshold = if total_stake < U256::MAX / nine {
            (total_stake * nine) / ten
        } else {
            (total_stake / ten) * nine
        };

        max(higher_threshold, normal_threshold)
    }

    #[allow(refining_impl_trait)]
    async fn add_epoch_root(
        &self,
        epoch: Epoch,
        block_header: Header,
    ) -> Option<Box<dyn FnOnce(&mut Self) + Send>> {
        if self.state.contains_key(&epoch) {
            tracing::info!(
                "We already have the stake table for epoch {}. Skipping L1 fetching.",
                epoch
            );
            return None;
        }

        let stake_tables = self.fetcher.fetch(epoch, block_header).await?;
        {
            let persistence_lock = self.fetcher.persistence.lock().await;
            if let Err(e) = persistence_lock
                .store_stake(epoch, stake_tables.clone())
                .await
            {
                tracing::error!(?e, "`add_epoch_root`, error storing stake table");
            }
        }

        Some(Box::new(move |committee: &mut Self| {
            committee.update_stake_table(epoch, stake_tables);
        }))
    }

    fn has_stake_table(&self, epoch: Epoch) -> bool {
        self.state.contains_key(&epoch)
    }

    fn has_randomized_stake_table(&self, epoch: Epoch) -> bool {
        match self.first_epoch {
            None => true,
            Some(first_epoch) => {
                if epoch < first_epoch {
                    self.state.contains_key(&epoch)
                } else {
                    self.randomized_committees.contains_key(&epoch)
                }
            },
        }
    }

    async fn get_epoch_root(
        membership: Arc<RwLock<Self>>,
        block_height: u64,
        epoch: Epoch,
    ) -> anyhow::Result<Leaf2> {
        let membership_reader = membership.read().await;
        let peers = membership_reader.fetcher.peers.clone();
        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
        let success_threshold = membership_reader.success_threshold(Some(epoch));
        drop(membership_reader);

        let leaf: Leaf2 = peers
            .fetch_leaf(block_height, stake_table.clone(), success_threshold)
            .await?;

        Ok(leaf)
    }

    async fn get_epoch_drb(
        membership: Arc<RwLock<Self>>,
        block_height: u64,
        epoch: Epoch,
    ) -> anyhow::Result<DrbResult> {
        let membership_reader = membership.read().await;
        let peers = membership_reader.fetcher.peers.clone();
        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
        let success_threshold = membership_reader.success_threshold(Some(epoch));
        drop(membership_reader);

        tracing::debug!(
            "Getting DRB for epoch {:?}, block height {:?}",
            epoch,
            block_height
        );
        let drb_leaf = peers
            .try_fetch_leaf(1, block_height, stake_table, success_threshold)
            .await?;

        let Some(drb) = drb_leaf.next_drb_result else {
            tracing::error!(
                "We received a leaf that should contain a DRB result, but the DRB result is missing: {:?}",
                drb_leaf
            );

            bail!("DRB leaf is missing the DRB result.");
        };

        Ok(drb)
    }

    fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
        let Some(raw_stake_table) = self.state.get(&epoch) else {
            tracing::error!("add_drb_result({}, {:?}) was called, but we do not yet have the stake table for epoch {}", epoch, drb, epoch);
            return;
        };

        let leaders = raw_stake_table
            .eligible_leaders
            .clone()
            .into_iter()
            .map(|peer_config| peer_config.stake_table_entry)
            .collect::<Vec<_>>();
        let randomized_committee = generate_stake_cdf(leaders, drb);

        self.randomized_committees
            .insert(epoch, randomized_committee);
    }

    fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
        self.first_epoch = Some(epoch);

        let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
        self.state.insert(epoch, epoch_committee.clone());
        self.state.insert(epoch + 1, epoch_committee);
        self.add_drb_result(epoch, initial_drb_result);
        self.add_drb_result(epoch + 1, initial_drb_result);
    }
}

#[cfg(any(test, feature = "testing"))]
impl super::v0_3::StakeTable {
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}

#[cfg(any(test, feature = "testing"))]
impl DAMembers {
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}

#[cfg(any(test, feature = "testing"))]
pub mod testing {
    use hotshot_contract_adapter::sol_types::{EdOnBN254PointSol, G2PointSol};
    use hotshot_types::light_client::StateKeyPair;
    use rand::{Rng as _, RngCore as _};

    use super::*;

    pub struct TestValidator {
        pub account: Address,
        pub bls_vk: G2PointSol,
        pub schnorr_vk: EdOnBN254PointSol,
        pub commission: u16,
    }

    impl TestValidator {
        pub fn random() -> Self {
            let rng = &mut rand::thread_rng();
            let mut seed = [0u8; 32];
            rng.fill_bytes(&mut seed);

            let (bls_vk, _) = BLSPubKey::generated_from_seed_indexed(seed, 0);
            let schnorr_vk: EdOnBN254PointSol = StateKeyPair::generate_from_seed_indexed(seed, 0)
                .ver_key()
                .to_affine()
                .into();

            Self {
                account: Address::random(),
                bls_vk: bls_vk.to_affine().into(),
                schnorr_vk,
                commission: rng.gen_range(0..10000),
            }
        }
    }

    impl Validator<BLSPubKey> {
        pub fn mock() -> Validator<BLSPubKey> {
            let val = TestValidator::random();
            let rng = &mut rand::thread_rng();
            let mut seed = [1u8; 32];
            rng.fill_bytes(&mut seed);
            let mut validator_stake = alloy::primitives::U256::from(0);
            let mut delegators = HashMap::new();
            for _i in 0..=5000 {
                let stake: u64 = rng.gen_range(0..10000);
                delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
                validator_stake += alloy::primitives::U256::from(stake);
            }

            let stake_table_key = val.bls_vk.clone().into();
            let state_ver_key = val.schnorr_vk.clone().into();

            Validator {
                account: val.account,
                stake_table_key,
                state_ver_key,
                stake: validator_stake,
                commission: val.commission,
                delegators,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use alloy::primitives::Address;
    use sequencer_utils::test_utils::setup_test;

    use super::*;
    use crate::v0::impls::testing::*;

    #[test]
    fn test_from_l1_events() -> anyhow::Result<()> {
        setup_test();
        let val = TestValidator::random();
        let val_new_keys = TestValidator::random();
        let delegator = Address::random();
        let mut events: Vec<StakeTableEvent> = [
            ValidatorRegistered {
                account: val.account,
                blsVk: val.bls_vk.clone(),
                schnorrVk: val.schnorr_vk.clone(),
                commission: val.commission,
            }
            .into(),
            Delegated {
                delegator,
                validator: val.account,
                amount: U256::from(10),
            }
            .into(),
            ConsensusKeysUpdated {
                account: val.account,
                blsVK: val_new_keys.bls_vk.clone(),
                schnorrVK: val_new_keys.schnorr_vk.clone(),
            }
            .into(),
            Undelegated {
                delegator,
                validator: val.account,
                amount: U256::from(7),
            }
            .into(),
            Delegated {
                delegator,
                validator: val.account,
                amount: U256::from(5),
            }
            .into(),
        ]
        .to_vec();

        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
        let st_val = st.get(&val.account).unwrap();
        assert_eq!(st_val.stake, U256::from(8));
        assert_eq!(st_val.commission, val.commission);
        assert_eq!(st_val.delegators.len(), 1);
        assert_eq!(*st_val.delegators.get(&delegator).unwrap(), U256::from(8));

        events.push(
            ValidatorExit {
                validator: val.account,
            }
            .into(),
        );

        assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());

        Ok(())
    }

    #[test]
    fn test_from_l1_events_failures() -> anyhow::Result<()> {
        let val = TestValidator::random();
        let delegator = Address::random();

        let register: StakeTableEvent = ValidatorRegistered {
            account: val.account,
            blsVk: val.bls_vk.clone(),
            schnorrVk: val.schnorr_vk.clone(),
            commission: val.commission,
        }
        .into();
        let delegate: StakeTableEvent = Delegated {
            delegator,
            validator: val.account,
            amount: U256::from(10),
        }
        .into();
        let key_update: StakeTableEvent = ConsensusKeysUpdated {
            account: val.account,
            blsVK: val.bls_vk.clone(),
            schnorrVK: val.schnorr_vk.clone(),
        }
        .into();
        let undelegate: StakeTableEvent = Undelegated {
            delegator,
            validator: val.account,
            amount: U256::from(7),
        }
        .into();

        let exit: StakeTableEvent = ValidatorExit {
            validator: val.account,
        }
        .into();

        let cases = [
            vec![exit],
            vec![undelegate.clone()],
            vec![delegate.clone()],
            vec![key_update],
            vec![register.clone(), register.clone()],
            vec![register, delegate, undelegate.clone(), undelegate],
        ];

        for events in cases.iter() {
            let res = active_validator_set_from_l1_events(events.iter().cloned());
            assert!(
                res.is_err(),
                "events {:?}, not a valid sequence of events",
                res
            );
        }
        Ok(())
    }

    #[test]
    fn test_validators_selection() {
        let mut validators = IndexMap::new();
        let mut highest_stake = alloy::primitives::U256::ZERO;

        for _i in 0..3000 {
            let validator = Validator::mock();
            validators.insert(validator.account, validator.clone());

            if validator.stake > highest_stake {
                highest_stake = validator.stake;
            }
        }

        let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);

        select_active_validator_set(&mut validators).expect("Failed to select validators");
        assert!(
            validators.len() <= 100,
            "validators len is {}, expected at most 100",
            validators.len()
        );

        let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
        for (address, validator) in &validators {
            assert!(
                validator.stake >= minimum_stake,
                "Validator {:?} has stake below minimum: {}",
                address,
                validator.stake
            );

            if validator.stake > selected_validators_highest_stake {
                selected_validators_highest_stake = validator.stake;
            }
        }
    }
}