1use std::{
2 cmp::{max, min},
3 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4 future::Future,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use alloy::{
10 eips::{BlockId, BlockNumberOrTag},
11 primitives::{utils::format_ether, Address, U256},
12 providers::Provider,
13 rpc::types::Log,
14};
15use anyhow::{bail, ensure, Context};
16use async_lock::{Mutex, RwLock};
17use committable::Committable;
18use futures::{
19 future::BoxFuture,
20 stream::{self, StreamExt},
21};
22use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
23use hotshot_contract_adapter::sol_types::{
24 EspToken::{self, EspTokenInstance},
25 StakeTableV2::{
26 self, ConsensusKeysUpdated, ConsensusKeysUpdatedV2, Delegated, Undelegated, ValidatorExit,
27 ValidatorRegistered, ValidatorRegisteredV2,
28 },
29};
30use hotshot_types::{
31 data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
32 drb::{
33 election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
34 DrbResult,
35 },
36 stake_table::{HSStakeTable, StakeTableEntry},
37 traits::{
38 election::Membership,
39 node_implementation::{ConsensusTime, NodeType},
40 signature_key::StakeTableEntryType,
41 },
42 PeerConfig,
43};
44use humantime::format_duration;
45use indexmap::IndexMap;
46use thiserror::Error;
47use tokio::{spawn, time::sleep};
48use tracing::Instrument;
49
50#[cfg(any(test, feature = "testing"))]
51use super::v0_3::DAMembers;
52use super::{
53 traits::{MembershipPersistence, StateCatchup},
54 v0_3::{ChainConfig, EventKey, Fetcher, StakeTableEvent, StakeTableUpdateTask, Validator},
55 Header, L1Client, Leaf2, PubKey, SeqTypes,
56};
57use crate::{
58 traits::EventsPersistenceRead,
59 v0_1::{L1Provider, RewardAmount, BLOCKS_PER_YEAR, COMMISSION_BASIS_POINTS, INFLATION_RATE},
60 v0_3::{EventSortingError, ExpectedStakeTableError, FetchRewardError, StakeTableError},
61};
62
63type Epoch = <SeqTypes as NodeType>::Epoch;
64pub type ValidatorMap = IndexMap<Address, Validator<BLSPubKey>>;
65type ApplyEventResult<T> = Result<Result<T, ExpectedStakeTableError>, StakeTableError>;
70
71trait DisplayLog {
73 fn display(&self) -> String;
74}
75
76impl DisplayLog for Log {
77 fn display(&self) -> String {
78 let block = self.block_number.unwrap_or_default();
82 let index = self.log_index.unwrap_or_default();
83 let hash = self.transaction_hash.unwrap_or_default();
84 format!("Log(block={block},index={index},transaction_hash={hash})")
85 }
86}
87
88#[derive(Clone, PartialEq)]
89pub struct StakeTableEvents {
90 registrations: Vec<(ValidatorRegistered, Log)>,
91 registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
92 deregistrations: Vec<(ValidatorExit, Log)>,
93 delegated: Vec<(Delegated, Log)>,
94 undelegated: Vec<(Undelegated, Log)>,
95 keys: Vec<(ConsensusKeysUpdated, Log)>,
96 keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
97}
98
99impl StakeTableEvents {
100 fn from_l1_logs(
104 registrations: Vec<(ValidatorRegistered, Log)>,
105 registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
106 deregistrations: Vec<(ValidatorExit, Log)>,
107 delegated: Vec<(Delegated, Log)>,
108 undelegated: Vec<(Undelegated, Log)>,
109 keys: Vec<(ConsensusKeysUpdated, Log)>,
110 keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
111 ) -> Self {
112 let registrations_v2 = registrations_v2
113 .into_iter()
114 .filter(|(event, log)| {
115 event
116 .authenticate()
117 .map_err(|_| {
118 tracing::warn!(
119 "Failed to authenticate ValidatorRegisteredV2 event {}",
120 log.display()
121 );
122 })
123 .is_ok()
124 })
125 .collect();
126 let keys_v2 = keys_v2
127 .into_iter()
128 .filter(|(event, log)| {
129 event
130 .authenticate()
131 .map_err(|_| {
132 tracing::warn!(
133 "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
134 log.display()
135 );
136 })
137 .is_ok()
138 })
139 .collect();
140 Self {
141 registrations,
142 registrations_v2,
143 deregistrations,
144 delegated,
145 undelegated,
146 keys,
147 keys_v2,
148 }
149 }
150
151 pub fn sort_events(self) -> Result<Vec<(EventKey, StakeTableEvent)>, EventSortingError> {
152 let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
153 let Self {
154 registrations,
155 registrations_v2,
156 deregistrations,
157 delegated,
158 undelegated,
159 keys,
160 keys_v2,
161 } = self;
162
163 let key = |log: &Log| -> Result<EventKey, EventSortingError> {
164 let block_number = log
165 .block_number
166 .ok_or(EventSortingError::MissingBlockNumber)?;
167 let log_index = log.log_index.ok_or(EventSortingError::MissingLogIndex)?;
168 Ok((block_number, log_index))
169 };
170
171 for (registration, log) in registrations {
172 events.push((key(&log)?, registration.into()));
173 }
174 for (registration, log) in registrations_v2 {
175 events.push((key(&log)?, registration.into()));
176 }
177 for (dereg, log) in deregistrations {
178 events.push((key(&log)?, dereg.into()));
179 }
180 for (delegation, log) in delegated {
181 events.push((key(&log)?, delegation.into()));
182 }
183 for (undelegated, log) in undelegated {
184 events.push((key(&log)?, undelegated.into()));
185 }
186 for (update, log) in keys {
187 events.push((key(&log)?, update.into()));
188 }
189 for (update, log) in keys_v2 {
190 events.push((key(&log)?, update.into()));
191 }
192
193 events.sort_by_key(|(key, _)| *key);
194 Ok(events)
195 }
196}
197
198#[derive(Debug)]
199pub struct StakeTableState {
200 validators: ValidatorMap,
201 used_bls_keys: HashSet<BLSPubKey>,
202 used_schnorr_keys: HashSet<SchnorrPubKey>,
203}
204
205impl StakeTableState {
206 pub fn new() -> Self {
207 Self {
208 validators: IndexMap::new(),
209 used_bls_keys: HashSet::new(),
210 used_schnorr_keys: HashSet::new(),
211 }
212 }
213
214 pub fn get_validators(self) -> ValidatorMap {
215 self.validators
216 }
217
218 pub fn apply_event(&mut self, event: StakeTableEvent) -> ApplyEventResult<()> {
219 match event {
220 StakeTableEvent::Register(ValidatorRegistered {
221 account,
222 blsVk,
223 schnorrVk,
224 commission,
225 }) => {
226 let stake_table_key: BLSPubKey = blsVk.into();
227 let state_ver_key: SchnorrPubKey = schnorrVk.into();
228
229 let entry = self.validators.entry(account);
230 if let indexmap::map::Entry::Occupied(_) = entry {
231 return Err(StakeTableError::AlreadyRegistered(account));
232 }
233
234 if !self.used_bls_keys.insert(stake_table_key) {
236 return Err(StakeTableError::BlsKeyAlreadyUsed(
237 stake_table_key.to_string(),
238 ));
239 }
240
241 if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
243 return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
244 state_ver_key.to_string(),
245 )));
246 }
247
248 entry.or_insert(Validator {
249 account,
250 stake_table_key,
251 state_ver_key,
252 stake: U256::ZERO,
253 commission,
254 delegators: HashMap::new(),
255 });
256 },
257
258 StakeTableEvent::RegisterV2(reg) => {
259 reg.authenticate()
262 .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;
263
264 let ValidatorRegisteredV2 {
265 account,
266 blsVK,
267 schnorrVK,
268 commission,
269 ..
270 } = reg;
271
272 let stake_table_key: BLSPubKey = blsVK.into();
273 let state_ver_key: SchnorrPubKey = schnorrVK.into();
274
275 let entry = self.validators.entry(account);
276 if let indexmap::map::Entry::Occupied(_) = entry {
277 return Err(StakeTableError::AlreadyRegistered(account));
278 }
279
280 if !self.used_bls_keys.insert(stake_table_key) {
282 return Err(StakeTableError::BlsKeyAlreadyUsed(
283 stake_table_key.to_string(),
284 ));
285 }
286
287 if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
289 return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
290 state_ver_key.to_string(),
291 )));
292 }
293
294 entry.or_insert(Validator {
295 account,
296 stake_table_key,
297 state_ver_key,
298 stake: U256::ZERO,
299 commission,
300 delegators: HashMap::new(),
301 });
302 },
303
304 StakeTableEvent::Deregister(exit) => {
305 self.validators
306 .shift_remove(&exit.validator)
307 .ok_or(StakeTableError::ValidatorNotFound(exit.validator))?;
308 },
309
310 StakeTableEvent::Delegate(delegated) => {
311 let Delegated {
312 delegator,
313 validator,
314 amount,
315 } = delegated;
316
317 let val = self
318 .validators
319 .get_mut(&validator)
320 .ok_or(StakeTableError::ValidatorNotFound(validator))?;
321
322 if amount.is_zero() {
323 return Err(StakeTableError::ZeroDelegatorStake(delegator));
324 }
325
326 val.stake += amount;
327 val.delegators
330 .entry(delegator)
331 .and_modify(|stake| *stake += amount)
332 .or_insert(amount);
333 },
334
335 StakeTableEvent::Undelegate(undelegated) => {
336 let Undelegated {
337 delegator,
338 validator,
339 amount,
340 } = undelegated;
341
342 let val = self
343 .validators
344 .get_mut(&validator)
345 .ok_or(StakeTableError::ValidatorNotFound(validator))?;
346
347 val.stake = val
348 .stake
349 .checked_sub(amount)
350 .ok_or(StakeTableError::InsufficientStake)?;
351
352 let delegator_stake = val
353 .delegators
354 .get_mut(&delegator)
355 .ok_or(StakeTableError::DelegatorNotFound(delegator))?;
356
357 *delegator_stake = delegator_stake
358 .checked_sub(amount)
359 .ok_or(StakeTableError::InsufficientStake)?;
360
361 if delegator_stake.is_zero() {
362 val.delegators.remove(&delegator);
363 }
364 },
365
366 StakeTableEvent::KeyUpdate(update) => {
367 let ConsensusKeysUpdated {
368 account,
369 blsVK,
370 schnorrVK,
371 } = update;
372
373 let validator = self
374 .validators
375 .get_mut(&account)
376 .ok_or(StakeTableError::ValidatorNotFound(account))?;
377
378 let stake_table_key: BLSPubKey = blsVK.into();
379 let state_ver_key: SchnorrPubKey = schnorrVK.into();
380
381 if !self.used_bls_keys.insert(stake_table_key) {
382 return Err(StakeTableError::BlsKeyAlreadyUsed(
383 stake_table_key.to_string(),
384 ));
385 }
386
387 if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
390 return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
391 state_ver_key.to_string(),
392 )));
393 }
394
395 validator.stake_table_key = stake_table_key;
396 validator.state_ver_key = state_ver_key;
397 },
398
399 StakeTableEvent::KeyUpdateV2(update) => {
400 update
403 .authenticate()
404 .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;
405
406 let ConsensusKeysUpdatedV2 {
407 account,
408 blsVK,
409 schnorrVK,
410 ..
411 } = update;
412
413 let validator = self
414 .validators
415 .get_mut(&account)
416 .ok_or(StakeTableError::ValidatorNotFound(account))?;
417
418 let stake_table_key: BLSPubKey = blsVK.into();
419 let state_ver_key: SchnorrPubKey = schnorrVK.into();
420
421 if !self.used_bls_keys.insert(stake_table_key) {
423 return Err(StakeTableError::BlsKeyAlreadyUsed(
424 stake_table_key.to_string(),
425 ));
426 }
427
428 if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
431 return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
432 state_ver_key.to_string(),
433 )));
434 }
435
436 validator.stake_table_key = stake_table_key;
437 validator.state_ver_key = state_ver_key;
438 },
439 }
440
441 Ok(Ok(()))
442 }
443}
444
445pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
446 events: I,
447) -> Result<ValidatorMap, StakeTableError> {
448 let mut state = StakeTableState::new();
449 for event in events {
450 match state.apply_event(event.clone()) {
451 Ok(Ok(())) => (), Ok(Err(expected_err)) => {
453 tracing::warn!("Expected error while applying event {event:?}: {expected_err}");
455 },
456 Err(err) => {
457 tracing::error!("Fatal error in applying event {event:?}: {err}");
459 return Err(err);
460 },
461 }
462 }
463 Ok(state.get_validators())
464}
465
466pub(crate) fn select_active_validator_set(
470 validators: &mut ValidatorMap,
471) -> Result<(), StakeTableError> {
472 let total_validators = validators.len();
473
474 validators.retain(|address, validator| {
476 if validator.delegators.is_empty() {
477 tracing::info!("Validator {address:?} does not have any delegator");
478 return false;
479 }
480
481 if validator.stake.is_zero() {
482 tracing::info!("Validator {address:?} does not have any stake");
483 return false;
484 }
485
486 true
487 });
488
489 tracing::debug!(
490 total_validators,
491 filtered = validators.len(),
492 "Filtered out invalid validators"
493 );
494
495 if validators.is_empty() {
496 tracing::warn!("Validator selection failed: no validators passed minimum criteria");
497 return Err(StakeTableError::NoValidValidators);
498 }
499
500 let maximum_stake = validators.values().map(|v| v.stake).max().ok_or_else(|| {
501 tracing::error!("Could not compute maximum stake from filtered validators");
502 StakeTableError::MissingMaximumStake
503 })?;
504
505 let minimum_stake = maximum_stake
506 .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
507 .ok_or_else(|| {
508 tracing::error!("Overflow while calculating minimum stake threshold");
509 StakeTableError::MinimumStakeOverflow
510 })?;
511
512 let mut valid_stakers: Vec<_> = validators
513 .iter()
514 .filter(|(_, v)| v.stake >= minimum_stake)
515 .map(|(addr, v)| (*addr, v.stake))
516 .collect();
517
518 tracing::info!(
519 count = valid_stakers.len(),
520 "Number of validators above minimum stake threshold"
521 );
522
523 valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
525
526 if valid_stakers.len() > 100 {
527 valid_stakers.truncate(100);
528 }
529
530 let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
532 validators.retain(|address, _| selected_addresses.contains(address));
533
534 tracing::info!(
535 final_count = validators.len(),
536 "Selected active validator set"
537 );
538
539 Ok(())
540}
541
542pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
544 events: I,
545) -> Result<ValidatorMap, StakeTableError> {
546 let mut validators = validators_from_l1_events(events)?;
547 select_active_validator_set(&mut validators)?;
548 Ok(validators)
549}
550
551impl std::fmt::Debug for StakeTableEvent {
552 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553 match self {
554 StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
555 StakeTableEvent::RegisterV2(event) => write!(f, "RegisterV2({:?})", event.account),
556 StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
557 StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
558 StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
559 StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
560 StakeTableEvent::KeyUpdateV2(event) => write!(f, "KeyUpdateV2({:?})", event.account),
561 }
562 }
563}
564
565#[derive(Clone, derive_more::derive::Debug)]
566pub struct EpochCommittees {
568 non_epoch_committee: NonEpochCommittee,
570 state: HashMap<Epoch, EpochCommittee>,
572 randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
574 first_epoch: Option<Epoch>,
575 block_reward: RewardAmount,
576 fetcher: Arc<Fetcher>,
577}
578
579impl Fetcher {
580 pub fn new(
581 peers: Arc<dyn StateCatchup>,
582 persistence: Arc<Mutex<dyn MembershipPersistence>>,
583 l1_client: L1Client,
584 chain_config: ChainConfig,
585 ) -> Self {
586 Self {
587 peers,
588 persistence,
589 l1_client,
590 chain_config: Arc::new(Mutex::new(chain_config)),
591 update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
592 }
593 }
594
595 pub async fn spawn_update_loop(&self) {
596 let mut update_task = self.update_task.0.lock().await;
597 if update_task.is_none() {
598 *update_task = Some(spawn(self.update_loop()));
599 }
600 }
601
602 fn update_loop(&self) -> impl Future<Output = ()> {
607 let span = tracing::warn_span!("Stake table update loop");
608 let self_clone = self.clone();
609 let state = self.l1_client.state.clone();
610 let l1_retry = self.l1_client.options().l1_retry_delay;
611 let update_delay = self.l1_client.options().stake_table_update_interval;
612 let chain_config = self.chain_config.clone();
613
614 async move {
615 let stake_contract_address = loop {
620 match chain_config.lock().await.stake_table_contract {
621 Some(addr) => break addr,
622 None => {
623 tracing::debug!(
624 "Stake table contract address not found. Retrying in {l1_retry:?}...",
625 );
626 },
627 }
628 sleep(l1_retry).await;
629 };
630
631 loop {
633 let finalized_block = loop {
634 if let Some(block) = state.lock().await.last_finalized {
635 break block;
636 }
637 tracing::debug!("Finalized block not yet available. Retrying in {l1_retry:?}",);
638 sleep(l1_retry).await;
639 };
640
641 tracing::debug!("Attempting to fetch stake table at L1 block {finalized_block:?}",);
642
643 loop {
644 match self_clone
645 .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
646 .await
647 {
648 Ok(events) => {
649 tracing::info!(
650 "Successfully fetched and stored stake table events at \
651 block={finalized_block:?}"
652 );
653 tracing::debug!("events={events:?}");
654 break;
655 },
656 Err(e) => {
657 tracing::error!(
658 "Error fetching stake table at block {finalized_block:?}. err= \
659 {e:#}",
660 );
661 sleep(l1_retry).await;
662 },
663 }
664 }
665
666 tracing::debug!("Waiting {update_delay:?} before next stake table update...",);
667 sleep(update_delay).await;
668 }
669 }
670 .instrument(span)
671 }
672
673 pub async fn fetch_events(
674 &self,
675 contract: Address,
676 to_block: u64,
677 ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
678 let persistence_lock = self.persistence.lock().await;
679 let (read_l1_offset, persistence_events) = persistence_lock.load_events(to_block).await?;
680 drop(persistence_lock);
681
682 tracing::info!("loaded events from storage to_block={to_block:?}");
683
684 if let Some(EventsPersistenceRead::Complete) = read_l1_offset {
687 return Ok(persistence_events);
688 }
689
690 let from_block = read_l1_offset
691 .map(|read| match read {
692 EventsPersistenceRead::UntilL1Block(block) => Ok(block + 1),
693 EventsPersistenceRead::Complete => Err(anyhow::anyhow!(
694 "Unexpected state. offset is complete after returning early"
695 )),
696 })
697 .transpose()?;
698
699 ensure!(
700 Some(to_block) >= from_block,
701 "to_block {to_block:?} is less than from_block {from_block:?}"
702 );
703
704 tracing::info!(%to_block, from_block = ?from_block, "Fetching events from contract");
705
706 let contract_events = Self::fetch_events_from_contract(
707 self.l1_client.clone(),
708 contract,
709 from_block,
710 to_block,
711 )
712 .await;
713
714 let contract_events = contract_events.sort_events()?;
715 let mut events = match from_block {
716 Some(_) => persistence_events
717 .into_iter()
718 .chain(contract_events)
719 .collect(),
720 None => contract_events,
721 };
722
723 let len_before_dedup = events.len();
728 events.dedup();
729 let len_after_dedup = events.len();
730 if len_before_dedup != len_after_dedup {
731 tracing::warn!("Duplicate events found and removed. This should not normally happen.")
732 }
733
734 Ok(events)
735 }
736
737 pub async fn fetch_events_from_contract(
739 l1_client: L1Client,
740 contract: Address,
741 from_block: Option<u64>,
742 to_block: u64,
743 ) -> StakeTableEvents {
744 let stake_table_contract = StakeTableV2::new(contract, l1_client.provider.clone());
745 let max_retry_duration = l1_client.options().l1_events_max_retry_duration;
746 let from_block = match from_block {
749 Some(block) => block,
750 None => {
751 let start = Instant::now();
752
753 loop {
754 match stake_table_contract.initializedAtBlock().call().await {
755 Ok(init_block) => break init_block._0.to::<u64>(),
756 Err(err) => {
757 if start.elapsed() >= max_retry_duration {
758 panic!(
759 "Failed to retrieve initial block after `{}`: {err}",
760 format_duration(max_retry_duration)
761 );
762 }
763 tracing::warn!(%err, "Failed to retrieve initial block, retrying...");
764 sleep(l1_client.options().l1_retry_delay).await;
765 },
766 }
767 }
768 },
769 };
770
771 let mut start = from_block;
775 let end = to_block;
776 let chunk_size = l1_client.options().l1_events_max_block_range;
777 let chunks = std::iter::from_fn(move || {
778 let chunk_end = min(start + chunk_size - 1, end);
779 if chunk_end < start {
780 return None;
781 }
782
783 let chunk = (start, chunk_end);
784 start = chunk_end + 1;
785 Some(chunk)
786 });
787
788 let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
791 let retry_delay = l1_client.options().l1_retry_delay;
792 let stake_table_contract = stake_table_contract.clone();
793 async move {
794 tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
795 let events = retry(
796 retry_delay,
797 max_retry_duration,
798 "ValidatorRegistered event fetch",
799 move || {
800 let stake_table_contract = stake_table_contract.clone();
801 Box::pin(async move {
802 stake_table_contract
803 .ValidatorRegistered_filter()
804 .from_block(from)
805 .to_block(to)
806 .query()
807 .await
808 })
809 },
810 )
811 .await;
812 stream::iter(events)
813 }
814 });
815
816 let registered_events_v2 = stream::iter(chunks.clone()).then(|(from, to)| {
819 let retry_delay = l1_client.options().l1_retry_delay;
820 let max_retry_duration = l1_client.options().l1_events_max_retry_duration;
821 let stake_table_contract = stake_table_contract.clone();
822 async move {
823 tracing::debug!(from, to, "fetch ValidatorRegisteredV2 events in range");
824 let events = retry(
825 retry_delay,
826 max_retry_duration,
827 "ValidatorRegisteredV2 event fetch",
828 move || {
829 let stake_table_contract = stake_table_contract.clone();
830 Box::pin(async move {
831 stake_table_contract
832 .ValidatorRegisteredV2_filter()
833 .from_block(from)
834 .to_block(to)
835 .query()
836 .await
837 })
838 },
839 )
840 .await;
841
842 stream::iter(events.into_iter().filter(|(event, log)| {
843 if let Err(e) = event.authenticate() {
844 tracing::warn!(
845 %e,
846 "Failed to authenticate ValidatorRegisteredV2 event: {}",
847 log.display()
848 );
849 return false;
850 }
851 true
852 }))
853 }
854 });
855
856 let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
858 let retry_delay = l1_client.options().l1_retry_delay;
859 let stake_table_contract = stake_table_contract.clone();
860 async move {
861 tracing::debug!(from, to, "fetch ValidatorExit events in range");
862 let events = retry(
863 retry_delay,
864 max_retry_duration,
865 "ValidatorExit event fetch",
866 move || {
867 let stake_table_contract = stake_table_contract.clone();
868 Box::pin(async move {
869 stake_table_contract
870 .ValidatorExit_filter()
871 .from_block(from)
872 .to_block(to)
873 .query()
874 .await
875 })
876 },
877 )
878 .await;
879 stream::iter(events)
880 }
881 });
882
883 let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
885 let retry_delay = l1_client.options().l1_retry_delay;
886 let stake_table_contract = stake_table_contract.clone();
887 async move {
888 tracing::debug!(from, to, "fetch Delegated events in range");
889 let events = retry(
890 retry_delay,
891 max_retry_duration,
892 "Delegated event fetch",
893 move || {
894 let stake_table_contract = stake_table_contract.clone();
895 Box::pin(async move {
896 stake_table_contract
897 .Delegated_filter()
898 .from_block(from)
899 .to_block(to)
900 .query()
901 .await
902 })
903 },
904 )
905 .await;
906 stream::iter(events)
907 }
908 });
909
910 let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
912 let retry_delay = l1_client.options().l1_retry_delay;
913 let stake_table_contract = stake_table_contract.clone();
914 async move {
915 tracing::debug!(from, to, "fetch Undelegated events in range");
916 let events = retry(
917 retry_delay,
918 max_retry_duration,
919 "Undelegated event fetch",
920 move || {
921 let stake_table_contract = stake_table_contract.clone();
922 Box::pin(async move {
923 stake_table_contract
924 .Undelegated_filter()
925 .from_block(from)
926 .to_block(to)
927 .query()
928 .await
929 })
930 },
931 )
932 .await;
933 stream::iter(events)
934 }
935 });
936 let keys_update_events = stream::iter(chunks.clone()).then(|(from, to)| {
938 let retry_delay = l1_client.options().l1_retry_delay;
939 let stake_table_contract = stake_table_contract.clone();
940 async move {
941 tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
942 let events = retry(
943 retry_delay,
944 max_retry_duration,
945 "ConsensusKeysUpdated event fetch",
946 move || {
947 let stake_table_contract = stake_table_contract.clone();
948 Box::pin(async move {
949 stake_table_contract
950 .ConsensusKeysUpdated_filter()
951 .from_block(from)
952 .to_block(to)
953 .query()
954 .await
955 })
956 },
957 )
958 .await;
959 stream::iter(events)
960 }
961 });
962 let keys_update_events_v2 = stream::iter(chunks).then(|(from, to)| {
964 let retry_delay = l1_client.options().l1_retry_delay;
965 let stake_table_contract = stake_table_contract.clone();
966 async move {
967 tracing::debug!(from, to, "fetch ConsensusKeysUpdatedV2 events in range");
968 let events = retry(
969 retry_delay,
970 max_retry_duration,
971 "ConsensusKeysUpdatedV2 event fetch",
972 move || {
973 let stake_table_contract = stake_table_contract.clone();
974 Box::pin(async move {
975 stake_table_contract
976 .ConsensusKeysUpdatedV2_filter()
977 .from_block(from)
978 .to_block(to)
979 .query()
980 .await
981 })
982 },
983 )
984 .await;
985
986 stream::iter(events.into_iter().filter(|(event, log)| {
987 if let Err(e) = event.authenticate() {
988 tracing::warn!(
989 %e,
990 "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
991 log.display()
992 );
993 return false;
994 }
995 true
996 }))
997 }
998 });
999 let registrations = registered_events.flatten().collect().await;
1000 let registrations_v2 = registered_events_v2.flatten().collect().await;
1001 let deregistrations = deregistered_events.flatten().collect().await;
1002 let delegated = delegated_events.flatten().collect().await;
1003 let undelegated = undelegated_events.flatten().collect().await;
1004 let keys = keys_update_events.flatten().collect().await;
1005 let keys_v2 = keys_update_events_v2.flatten().collect().await;
1006
1007 StakeTableEvents::from_l1_logs(
1008 registrations,
1009 registrations_v2,
1010 deregistrations,
1011 delegated,
1012 undelegated,
1013 keys,
1014 keys_v2,
1015 )
1016 }
1017
1018 pub async fn fetch_and_store_stake_table_events(
1024 &self,
1025 contract: Address,
1026 to_block: u64,
1027 ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
1028 let events = self.fetch_events(contract, to_block).await?;
1029
1030 tracing::info!("storing events in storage to_block={to_block:?}");
1031
1032 {
1033 let persistence_lock = self.persistence.lock().await;
1034 persistence_lock
1035 .store_events(to_block, events.clone())
1036 .await
1037 .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
1038 }
1039
1040 Ok(events)
1041 }
1042
1043 pub async fn fetch_all_validators_from_contract(
1045 l1_client: L1Client,
1046 contract: Address,
1047 to_block: u64,
1048 ) -> anyhow::Result<ValidatorMap> {
1049 let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await;
1050 let sorted = events.sort_events()?;
1051 validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
1053 .context("failed to construct validators set from l1 events")
1054 }
1055 pub async fn fetch_block_reward(&self) -> Result<RewardAmount, FetchRewardError> {
1072 let chain_config = *self.chain_config.lock().await;
1073
1074 let stake_table_contract = chain_config
1075 .stake_table_contract
1076 .ok_or(FetchRewardError::MissingStakeTableContract)?;
1077
1078 let provider = self.l1_client.provider.clone();
1079 let stake_table = StakeTableV2::new(stake_table_contract, provider.clone());
1080
1081 let stake_table_init_block = stake_table
1085 .initializedAtBlock()
1086 .block(BlockId::finalized())
1087 .call()
1088 .await
1089 .map_err(FetchRewardError::ContractCall)?
1090 ._0
1091 .to::<u64>();
1092
1093 tracing::info!("stake table init block ={stake_table_init_block}");
1094
1095 let token_address = stake_table
1096 .token()
1097 .block(BlockId::finalized())
1098 .call()
1099 .await
1100 .map_err(FetchRewardError::TokenAddressFetch)?
1101 ._0;
1102
1103 let token = EspToken::new(token_address, provider.clone());
1104
1105 let init_logs = token
1112 .Initialized_filter()
1113 .from_block(0u64)
1114 .to_block(BlockNumberOrTag::Finalized)
1115 .query()
1116 .await;
1117
1118 let init_log = match init_logs {
1119 Ok(init_logs) => {
1120 if init_logs.is_empty() {
1121 tracing::error!(
1122 "Token Initialized event logs are empty. This should never happen"
1123 );
1124 return Err(FetchRewardError::MissingInitializedEvent);
1125 }
1126
1127 let (_, init_log) = init_logs[0].clone();
1128
1129 tracing::debug!(tx_hash = ?init_log.transaction_hash, "Found token `Initialized` event");
1130 init_log
1131 },
1132 Err(err) => {
1133 tracing::warn!(
1134 "RPC returned error {err:?}. will fallback to scanning over fixed block range"
1135 );
1136 self.scan_token_contract_initialized_event_log(stake_table_init_block, token)
1137 .await?
1138 },
1139 };
1140
1141 let tx_hash =
1143 init_log
1144 .transaction_hash
1145 .ok_or_else(|| FetchRewardError::MissingTransactionHash {
1146 init_log: init_log.clone().into(),
1147 })?;
1148
1149 let init_tx = provider
1151 .get_transaction_receipt(tx_hash)
1152 .await
1153 .map_err(FetchRewardError::Rpc)?
1154 .ok_or_else(|| FetchRewardError::MissingTransactionReceipt {
1155 tx_hash: tx_hash.to_string(),
1156 })?;
1157
1158 let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().ok_or(
1159 FetchRewardError::DecodeTransferLog {
1160 tx_hash: tx_hash.to_string(),
1161 },
1162 )?;
1163
1164 tracing::debug!("mint transfer event ={mint_transfer:?}");
1165 if mint_transfer.from != Address::ZERO {
1166 return Err(FetchRewardError::InvalidMintFromAddress);
1167 }
1168
1169 let initial_supply = mint_transfer.value;
1170
1171 tracing::info!("Initial token amount: {} ESP", format_ether(initial_supply));
1172
1173 let reward = ((initial_supply * U256::from(INFLATION_RATE)) / U256::from(BLOCKS_PER_YEAR))
1174 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
1175 .ok_or(FetchRewardError::DivisionByZero)?;
1176
1177 Ok(RewardAmount(reward))
1178 }
1179
1180 pub async fn scan_token_contract_initialized_event_log(
1181 &self,
1182 stake_table_init_block: u64,
1183 token: EspTokenInstance<(), L1Provider>,
1184 ) -> Result<Log, FetchRewardError> {
1185 let max_events_range = self.l1_client.options().l1_events_max_block_range;
1186 const MAX_BLOCKS_SCANNED: u64 = 200_000;
1187 let mut total_scanned = 0;
1188
1189 let mut from_block = stake_table_init_block.saturating_sub(max_events_range);
1190 let mut to_block = stake_table_init_block;
1191
1192 loop {
1193 if total_scanned >= MAX_BLOCKS_SCANNED {
1194 tracing::error!(
1195 total_scanned,
1196 "Exceeded maximum scan range while searching for token Initialized event"
1197 );
1198 return Err(FetchRewardError::ExceededMaxScanRange(MAX_BLOCKS_SCANNED));
1199 }
1200
1201 let init_logs = token
1202 .Initialized_filter()
1203 .from_block(from_block)
1204 .to_block(to_block)
1205 .query()
1206 .await
1207 .map_err(FetchRewardError::ScanQueryFailed)?;
1208
1209 if !init_logs.is_empty() {
1210 let (_, init_log) = init_logs[0].clone();
1211 tracing::info!(
1212 from_block,
1213 tx_hash = ?init_log.transaction_hash,
1214 "Found token Initialized event during scan"
1215 );
1216 return Ok(init_log);
1217 }
1218
1219 total_scanned += max_events_range;
1220 from_block = from_block.saturating_sub(max_events_range);
1221 to_block = to_block.saturating_sub(max_events_range);
1222 }
1223 }
1224
1225 pub async fn fetch(&self, epoch: Epoch, header: Header) -> anyhow::Result<ValidatorMap> {
1226 let chain_config = self.get_chain_config(&header).await?;
1227 *self.chain_config.lock().await = chain_config;
1229
1230 let Some(address) = chain_config.stake_table_contract else {
1231 bail!("No stake table contract address found in Chain config");
1232 };
1233
1234 let Some(l1_finalized_block_info) = header.l1_finalized() else {
1235 bail!(
1236 "The epoch root for epoch {epoch} is missing the L1 finalized block info. This is \
1237 a fatal error. Consensus is blocked and will not recover."
1238 );
1239 };
1240
1241 let events = match self
1242 .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
1243 .await
1244 .map_err(GetStakeTablesError::L1ClientFetchError)
1245 {
1246 Ok(events) => events,
1247 Err(e) => {
1248 bail!("failed to fetch stake table events {e:?}");
1249 },
1250 };
1251
1252 match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
1253 Ok(validators) => Ok(validators),
1254 Err(e) => {
1255 bail!("failed to construct stake table {e:?}");
1256 },
1257 }
1258 }
1259
1260 pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
1263 let chain_config = self.chain_config.lock().await;
1264 let peers = self.peers.clone();
1265 let header_cf = header.chain_config();
1266 if chain_config.commit() == header_cf.commit() {
1267 return Ok(*chain_config);
1268 }
1269
1270 let cf = match header_cf.resolve() {
1271 Some(cf) => cf,
1272 None => peers
1273 .fetch_chain_config(header_cf.commit())
1274 .await
1275 .map_err(|err| {
1276 tracing::error!("failed to get chain_config from peers. err: {err:?}");
1277 err
1278 })?,
1279 };
1280
1281 Ok(cf)
1282 }
1283
1284 #[cfg(any(test, feature = "testing"))]
1285 pub fn mock() -> Self {
1286 use crate::{mock, v0_1::NoStorage};
1287 let chain_config = ChainConfig::default();
1288 let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
1289 .expect("Failed to create L1 client");
1290
1291 let peers = Arc::new(mock::MockStateCatchup::default());
1292 let persistence = NoStorage;
1293
1294 Self::new(peers, Arc::new(Mutex::new(persistence)), l1, chain_config)
1295 }
1296}
1297
1298async fn retry<F, T, E>(
1299 retry_delay: Duration,
1300 max_duration: Duration,
1301 operation_name: &str,
1302 mut operation: F,
1303) -> T
1304where
1305 F: FnMut() -> BoxFuture<'static, Result<T, E>>,
1306 E: std::fmt::Display,
1307{
1308 let start = Instant::now();
1309 loop {
1310 match operation().await {
1311 Ok(result) => return result,
1312 Err(err) => {
1313 if start.elapsed() >= max_duration {
1314 panic!(
1315 r#"
1316 Failed to complete operation `{operation_name}` after `{}`.
1317 error: {err}
1318
1319
1320 This might be caused by:
1321 - The current block range being too large for your RPC provider.
1322 - The event query returning more data than your RPC allows as some RPC providers limit the number of events returned.
1323 - RPC provider outage
1324
1325 Suggested solution:
1326 - Reduce the value of the environment variable `ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE` to query smaller ranges.
1327 - Add multiple RPC providers
1328 - Use a different RPC provider with higher rate limits."#,
1329 format_duration(max_duration)
1330 );
1331 }
1332 tracing::warn!(%err, "Retrying `{operation_name}` after error");
1333 sleep(retry_delay).await;
1334 },
1335 }
1336 }
1337}
1338
1339#[derive(Clone, Debug)]
1341struct NonEpochCommittee {
1342 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
1346
1347 stake_table: Vec<PeerConfig<SeqTypes>>,
1349
1350 da_members: Vec<PeerConfig<SeqTypes>>,
1352
1353 indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
1355
1356 indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
1358}
1359
1360#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
1362pub struct EpochCommittee {
1363 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
1367 stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
1369 validators: ValidatorMap,
1370 address_mapping: HashMap<BLSPubKey, Address>,
1371}
1372
1373impl EpochCommittees {
1374 pub fn first_epoch(&self) -> Option<Epoch> {
1375 self.first_epoch
1376 }
1377
1378 pub fn fetcher(&self) -> &Fetcher {
1379 &self.fetcher
1380 }
1381
1382 fn update(
1388 &mut self,
1389 epoch: EpochNumber,
1390 validators: ValidatorMap,
1391 block_reward: Option<RewardAmount>,
1392 ) {
1393 let mut address_mapping = HashMap::new();
1394 let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
1395 .values()
1396 .map(|v| {
1397 address_mapping.insert(v.stake_table_key, v.account);
1398 (
1399 v.stake_table_key,
1400 PeerConfig {
1401 stake_table_entry: BLSPubKey::stake_table_entry(
1402 &v.stake_table_key,
1403 v.stake,
1404 ),
1405 state_ver_key: v.state_ver_key.clone(),
1406 },
1407 )
1408 })
1409 .collect();
1410
1411 let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
1412 stake_table.iter().map(|(_, l)| l.clone()).collect();
1413
1414 self.state.insert(
1415 epoch,
1416 EpochCommittee {
1417 eligible_leaders,
1418 stake_table,
1419 validators,
1420 address_mapping,
1421 },
1422 );
1423
1424 if let Some(block_reward) = block_reward {
1425 self.block_reward = block_reward;
1426 }
1427 }
1428
1429 pub fn validators(&self, epoch: &Epoch) -> anyhow::Result<ValidatorMap> {
1430 Ok(self
1431 .state
1432 .get(epoch)
1433 .context("state for found")?
1434 .validators
1435 .clone())
1436 }
1437
1438 pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
1439 let mapping = self
1440 .state
1441 .get(epoch)
1442 .context("state for found")?
1443 .address_mapping
1444 .clone();
1445
1446 Ok(*mapping.get(&bls_key).context(format!(
1447 "failed to get ethereum address for bls key {bls_key}. epoch={epoch}"
1448 ))?)
1449 }
1450
1451 pub fn get_validator_config(
1452 &self,
1453 epoch: &Epoch,
1454 key: BLSPubKey,
1455 ) -> anyhow::Result<Validator<BLSPubKey>> {
1456 let address = self.address(epoch, key)?;
1457 let validators = self.validators(epoch)?;
1458 validators
1459 .get(&address)
1460 .context("validator not found")
1461 .cloned()
1462 }
1463
1464 pub fn block_reward(&self) -> RewardAmount {
1465 self.block_reward
1466 }
1467
1468 pub fn new_stake(
1470 committee_members: Vec<PeerConfig<SeqTypes>>,
1473 da_members: Vec<PeerConfig<SeqTypes>>,
1474 block_reward: RewardAmount,
1475 fetcher: Fetcher,
1476 ) -> Self {
1477 let stake_table: Vec<_> = committee_members
1479 .iter()
1480 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1481 .cloned()
1482 .collect();
1483
1484 let eligible_leaders = stake_table.clone();
1485 let da_members: Vec<_> = da_members
1487 .iter()
1488 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1489 .cloned()
1490 .collect();
1491
1492 let indexed_stake_table: HashMap<PubKey, _> = stake_table
1494 .iter()
1495 .map(|peer_config| {
1496 (
1497 PubKey::public_key(&peer_config.stake_table_entry),
1498 peer_config.clone(),
1499 )
1500 })
1501 .collect();
1502
1503 let indexed_da_members: HashMap<PubKey, _> = da_members
1505 .iter()
1506 .map(|peer_config| {
1507 (
1508 PubKey::public_key(&peer_config.stake_table_entry),
1509 peer_config.clone(),
1510 )
1511 })
1512 .collect();
1513
1514 let members = NonEpochCommittee {
1515 eligible_leaders,
1516 stake_table,
1517 da_members,
1518 indexed_stake_table,
1519 indexed_da_members,
1520 };
1521
1522 let mut map = HashMap::new();
1523 let epoch_committee = EpochCommittee {
1524 eligible_leaders: members.eligible_leaders.clone(),
1525 stake_table: members
1526 .stake_table
1527 .iter()
1528 .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
1529 .collect(),
1530 validators: Default::default(),
1531 address_mapping: HashMap::new(),
1532 };
1533 map.insert(Epoch::genesis(), epoch_committee.clone());
1534 map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());
1536
1537 Self {
1538 non_epoch_committee: members,
1539 state: map,
1540 randomized_committees: BTreeMap::new(),
1541 first_epoch: None,
1542 block_reward,
1543 fetcher: Arc::new(fetcher),
1544 }
1545 }
1546
1547 pub async fn reload_stake(&mut self, limit: u64) {
1548 let loaded_stake = match self
1551 .fetcher
1552 .persistence
1553 .lock()
1554 .await
1555 .load_latest_stake(limit)
1556 .await
1557 {
1558 Ok(Some(loaded)) => loaded,
1559 Ok(None) => {
1560 tracing::warn!("No stake table history found in persistence!");
1561 return;
1562 },
1563 Err(e) => {
1564 tracing::error!("Failed to load stake table history from persistence: {e}");
1565 return;
1566 },
1567 };
1568
1569 for (epoch, stake_table) in loaded_stake {
1570 self.update(epoch, stake_table, None);
1571 }
1572 }
1573
1574 fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
1575 if let Some(epoch) = epoch {
1576 self.state
1577 .get(epoch)
1578 .map(|committee| committee.stake_table.clone().into_values().collect())
1579 } else {
1580 Some(self.non_epoch_committee.stake_table.clone())
1581 }
1582 }
1583}
1584
1585#[derive(Error, Debug)]
1586enum GetStakeTablesError {
1588 #[error("Error fetching from L1: {0}")]
1589 L1ClientFetchError(anyhow::Error),
1590}
1591
1592#[derive(Error, Debug)]
1593#[error("Could not lookup leader")] pub struct LeaderLookupError;
1595
1596impl Membership<SeqTypes> for EpochCommittees {
1598 type Error = LeaderLookupError;
1599 fn new(
1601 _committee_members: Vec<PeerConfig<SeqTypes>>,
1604 _da_members: Vec<PeerConfig<SeqTypes>>,
1605 ) -> Self {
1606 panic!("This function has been replaced with new_stake()");
1607 }
1608
1609 fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1611 self.get_stake_table(&epoch).unwrap_or_default().into()
1612 }
1613 fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1615 self.non_epoch_committee.da_members.clone().into()
1616 }
1617
1618 fn committee_members(
1620 &self,
1621 _view_number: <SeqTypes as NodeType>::View,
1622 epoch: Option<Epoch>,
1623 ) -> BTreeSet<PubKey> {
1624 let stake_table = self.stake_table(epoch);
1625 stake_table
1626 .iter()
1627 .map(|x| PubKey::public_key(&x.stake_table_entry))
1628 .collect()
1629 }
1630
1631 fn da_committee_members(
1633 &self,
1634 _view_number: <SeqTypes as NodeType>::View,
1635 _epoch: Option<Epoch>,
1636 ) -> BTreeSet<PubKey> {
1637 self.non_epoch_committee
1638 .indexed_da_members
1639 .clone()
1640 .into_keys()
1641 .collect()
1642 }
1643
1644 fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1646 if let Some(epoch) = epoch {
1648 self.state
1649 .get(&epoch)
1650 .and_then(|h| h.stake_table.get(pub_key))
1651 .cloned()
1652 } else {
1653 self.non_epoch_committee
1654 .indexed_stake_table
1655 .get(pub_key)
1656 .cloned()
1657 }
1658 }
1659
1660 fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1662 self.non_epoch_committee
1664 .indexed_da_members
1665 .get(pub_key)
1666 .cloned()
1667 }
1668
1669 fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1671 self.stake(pub_key, epoch)
1672 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1673 .unwrap_or_default()
1674 }
1675
1676 fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1678 self.da_stake(pub_key, epoch)
1679 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1680 .unwrap_or_default()
1681 }
1682
1683 fn lookup_leader(
1696 &self,
1697 view_number: <SeqTypes as NodeType>::View,
1698 epoch: Option<Epoch>,
1699 ) -> Result<PubKey, Self::Error> {
1700 match (self.first_epoch(), epoch) {
1701 (Some(first_epoch), Some(epoch)) => {
1702 if epoch < first_epoch {
1703 tracing::error!(
1704 "lookup_leader called with epoch {} before first epoch {}",
1705 epoch,
1706 first_epoch,
1707 );
1708 return Err(LeaderLookupError);
1709 }
1710 let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
1711 tracing::error!(
1712 "We are missing the randomized committee for epoch {}",
1713 epoch
1714 );
1715 return Err(LeaderLookupError);
1716 };
1717
1718 Ok(PubKey::public_key(&select_randomized_leader(
1719 randomized_committee,
1720 *view_number,
1721 )))
1722 },
1723 (_, None) => {
1724 let leaders = &self.non_epoch_committee.eligible_leaders;
1725
1726 let index = *view_number as usize % leaders.len();
1727 let res = leaders[index].clone();
1728 Ok(PubKey::public_key(&res.stake_table_entry))
1729 },
1730 (None, Some(epoch)) => {
1731 tracing::error!(
1732 "lookup_leader called with epoch {} but we don't have a first epoch",
1733 epoch,
1734 );
1735 Err(LeaderLookupError)
1736 },
1737 }
1738 }
1739
1740 fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
1742 self.stake_table(epoch).len()
1743 }
1744
1745 fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
1747 self.da_stake_table(epoch).len()
1748 }
1749
1750 fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1752 let total_stake = self.total_stake(epoch);
1753 let one = U256::ONE;
1754 let two = U256::from(2);
1755 let three = U256::from(3);
1756 if total_stake < U256::MAX / two {
1757 ((total_stake * two) / three) + one
1758 } else {
1759 ((total_stake / three) * two) + two
1760 }
1761 }
1762
1763 fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1765 let total_stake = self.total_da_stake(epoch);
1766 let one = U256::ONE;
1767 let two = U256::from(2);
1768 let three = U256::from(3);
1769
1770 if total_stake < U256::MAX / two {
1771 ((total_stake * two) / three) + one
1772 } else {
1773 ((total_stake / three) * two) + two
1774 }
1775 }
1776
1777 fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
1779 let total_stake = self.total_stake(epoch);
1780 let one = U256::ONE;
1781 let three = U256::from(3);
1782
1783 (total_stake / three) + one
1784 }
1785
1786 fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
1788 let total_stake = self.total_stake(epoch);
1789 let nine = U256::from(9);
1790 let ten = U256::from(10);
1791
1792 let normal_threshold = self.success_threshold(epoch);
1793 let higher_threshold = if total_stake < U256::MAX / nine {
1794 (total_stake * nine) / ten
1795 } else {
1796 (total_stake / ten) * nine
1797 };
1798
1799 max(higher_threshold, normal_threshold)
1800 }
1801
1802 async fn add_epoch_root(
1803 membership: Arc<RwLock<Self>>,
1804 epoch: Epoch,
1805 block_header: Header,
1806 ) -> anyhow::Result<()> {
1807 let membership_reader = membership.read().await;
1808 if membership_reader.state.contains_key(&epoch) {
1809 tracing::info!(
1810 "We already have the stake table for epoch {}. Skipping L1 fetching.",
1811 epoch
1812 );
1813 return Ok(());
1814 }
1815 let fetcher = Arc::clone(&membership_reader.fetcher);
1816 drop(membership_reader);
1817
1818 let stake_tables = fetcher.fetch(epoch, block_header).await?;
1819
1820 let mut block_reward = None;
1821
1822 {
1823 let membership_reader = membership.read().await;
1824 if membership_reader.block_reward == RewardAmount(U256::ZERO) {
1831 block_reward = Some(fetcher.fetch_block_reward().await?);
1832 }
1833 }
1834
1835 {
1837 let persistence_lock = fetcher.persistence.lock().await;
1838 if let Err(e) = persistence_lock
1839 .store_stake(epoch, stake_tables.clone())
1840 .await
1841 {
1842 tracing::error!(?e, "`add_epoch_root`, error storing stake table");
1843 }
1844 }
1845
1846 let mut membership_writer = membership.write().await;
1847 membership_writer.update(epoch, stake_tables, block_reward);
1848 Ok(())
1849 }
1850
1851 fn has_stake_table(&self, epoch: Epoch) -> bool {
1852 self.state.contains_key(&epoch)
1853 }
1854
1855 fn has_randomized_stake_table(&self, epoch: Epoch) -> anyhow::Result<bool> {
1867 let Some(first_epoch) = self.first_epoch else {
1868 bail!(
1869 "Called has_randomized_stake_table with epoch {} but first_epoch is None",
1870 epoch
1871 );
1872 };
1873 ensure!(
1874 epoch >= first_epoch,
1875 "Called has_randomized_stake_table with epoch {} but first_epoch is {}",
1876 epoch,
1877 first_epoch
1878 );
1879 Ok(self.randomized_committees.contains_key(&epoch))
1880 }
1881
1882 async fn get_epoch_root(
1883 membership: Arc<RwLock<Self>>,
1884 block_height: u64,
1885 epoch: Epoch,
1886 ) -> anyhow::Result<Leaf2> {
1887 let membership_reader = membership.read().await;
1888 let peers = membership_reader.fetcher.peers.clone();
1889 let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1890 let success_threshold = membership_reader.success_threshold(Some(epoch));
1891 drop(membership_reader);
1892
1893 let leaf: Leaf2 = peers
1895 .fetch_leaf(block_height, stake_table.clone(), success_threshold)
1896 .await?;
1897
1898 Ok(leaf)
1899 }
1900
1901 async fn get_epoch_drb(
1902 membership: Arc<RwLock<Self>>,
1903 block_height: u64,
1904 epoch: Epoch,
1905 ) -> anyhow::Result<DrbResult> {
1906 let membership_reader = membership.read().await;
1907 let peers = membership_reader.fetcher.peers.clone();
1908 let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1909 let success_threshold = membership_reader.success_threshold(Some(epoch));
1910 drop(membership_reader);
1911
1912 tracing::debug!(
1913 "Getting DRB for epoch {}, block height {}",
1914 epoch,
1915 block_height
1916 );
1917 let drb_leaf = peers
1918 .try_fetch_leaf(1, block_height, stake_table, success_threshold)
1919 .await?;
1920
1921 let Some(drb) = drb_leaf.next_drb_result else {
1922 tracing::error!(
1923 "We received a leaf that should contain a DRB result, but the DRB result is \
1924 missing: {:?}",
1925 drb_leaf
1926 );
1927
1928 bail!("DRB leaf is missing the DRB result.");
1929 };
1930
1931 Ok(drb)
1932 }
1933
1934 fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
1935 let Some(raw_stake_table) = self.state.get(&epoch) else {
1936 tracing::error!(
1937 "add_drb_result({epoch}, {drb:?}) was called, but we do not yet have the stake \
1938 table for epoch {epoch}"
1939 );
1940 return;
1941 };
1942
1943 let leaders = raw_stake_table
1944 .eligible_leaders
1945 .clone()
1946 .into_iter()
1947 .map(|peer_config| peer_config.stake_table_entry)
1948 .collect::<Vec<_>>();
1949 let randomized_committee = generate_stake_cdf(leaders, drb);
1950
1951 self.randomized_committees
1952 .insert(epoch, randomized_committee);
1953 }
1954
1955 fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
1956 self.first_epoch = Some(epoch);
1957
1958 let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
1959 self.state.insert(epoch, epoch_committee.clone());
1960 self.state.insert(epoch + 1, epoch_committee);
1961 self.add_drb_result(epoch, initial_drb_result);
1962 self.add_drb_result(epoch + 1, initial_drb_result);
1963 }
1964
1965 fn first_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
1966 self.first_epoch
1967 }
1968}
1969
1970#[cfg(any(test, feature = "testing"))]
1971impl super::v0_3::StakeTable {
1972 pub fn mock(n: u64) -> Self {
1974 [..n]
1975 .iter()
1976 .map(|_| PeerConfig::default())
1977 .collect::<Vec<PeerConfig<SeqTypes>>>()
1978 .into()
1979 }
1980}
1981
1982#[cfg(any(test, feature = "testing"))]
1983impl DAMembers {
1984 pub fn mock(n: u64) -> Self {
1986 [..n]
1987 .iter()
1988 .map(|_| PeerConfig::default())
1989 .collect::<Vec<PeerConfig<SeqTypes>>>()
1990 .into()
1991 }
1992}
1993
1994#[cfg(any(test, feature = "testing"))]
1995pub mod testing {
1996 use alloy::primitives::Bytes;
1997 use hotshot_contract_adapter::{
1998 sol_types::{EdOnBN254PointSol, G1PointSol, G2PointSol},
1999 stake_table::{sign_address_bls, sign_address_schnorr},
2000 };
2001 use hotshot_types::{light_client::StateKeyPair, signature_key::BLSKeyPair};
2002 use rand::{Rng as _, RngCore as _};
2003
2004 use super::*;
2005
2006 #[derive(Debug, Clone)]
2009 pub struct TestValidator {
2010 pub account: Address,
2011 pub bls_vk: G2PointSol,
2012 pub schnorr_vk: EdOnBN254PointSol,
2013 pub commission: u16,
2014 pub bls_sig: G1PointSol,
2015 pub schnorr_sig: Bytes,
2016 }
2017
2018 impl TestValidator {
2019 pub fn random() -> Self {
2020 let account = Address::random();
2021 let commission = rand::thread_rng().gen_range(0..10000);
2022 Self::random_update_keys(account, commission)
2023 }
2024
2025 pub fn randomize_keys(&self) -> Self {
2026 Self::random_update_keys(self.account, self.commission)
2027 }
2028
2029 fn random_update_keys(account: Address, commission: u16) -> Self {
2030 let mut rng = &mut rand::thread_rng();
2031 let mut seed = [0u8; 32];
2032 rng.fill_bytes(&mut seed);
2033 let bls_key_pair = BLSKeyPair::generate(&mut rng);
2034 let bls_sig = sign_address_bls(&bls_key_pair, account);
2035 let schnorr_key_pair = StateKeyPair::generate_from_seed_indexed(seed, 0);
2036 let schnorr_sig = sign_address_schnorr(&schnorr_key_pair, account);
2037 Self {
2038 account,
2039 bls_vk: bls_key_pair.ver_key().to_affine().into(),
2040 schnorr_vk: schnorr_key_pair.ver_key().to_affine().into(),
2041 commission,
2042 bls_sig,
2043 schnorr_sig,
2044 }
2045 }
2046 }
2047
2048 impl From<&TestValidator> for ValidatorRegistered {
2049 fn from(value: &TestValidator) -> Self {
2050 Self {
2051 account: value.account,
2052 blsVk: value.bls_vk,
2053 schnorrVk: value.schnorr_vk,
2054 commission: value.commission,
2055 }
2056 }
2057 }
2058
2059 impl From<&TestValidator> for ValidatorRegisteredV2 {
2060 fn from(value: &TestValidator) -> Self {
2061 Self {
2062 account: value.account,
2063 blsVK: value.bls_vk,
2064 schnorrVK: value.schnorr_vk,
2065 commission: value.commission,
2066 blsSig: value.bls_sig.into(),
2067 schnorrSig: value.schnorr_sig.clone(),
2068 }
2069 }
2070 }
2071
2072 impl From<&TestValidator> for ConsensusKeysUpdated {
2073 fn from(value: &TestValidator) -> Self {
2074 Self {
2075 account: value.account,
2076 blsVK: value.bls_vk,
2077 schnorrVK: value.schnorr_vk,
2078 }
2079 }
2080 }
2081
2082 impl From<&TestValidator> for ConsensusKeysUpdatedV2 {
2083 fn from(value: &TestValidator) -> Self {
2084 Self {
2085 account: value.account,
2086 blsVK: value.bls_vk,
2087 schnorrVK: value.schnorr_vk,
2088 blsSig: value.bls_sig.into(),
2089 schnorrSig: value.schnorr_sig.clone(),
2090 }
2091 }
2092 }
2093
2094 impl From<&TestValidator> for ValidatorExit {
2095 fn from(value: &TestValidator) -> Self {
2096 Self {
2097 validator: value.account,
2098 }
2099 }
2100 }
2101
2102 impl Validator<BLSPubKey> {
2103 pub fn mock() -> Validator<BLSPubKey> {
2104 let val = TestValidator::random();
2105 let rng = &mut rand::thread_rng();
2106 let mut seed = [1u8; 32];
2107 rng.fill_bytes(&mut seed);
2108 let mut validator_stake = alloy::primitives::U256::from(0);
2109 let mut delegators = HashMap::new();
2110 for _i in 0..=5000 {
2111 let stake: u64 = rng.gen_range(0..10000);
2112 delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
2113 validator_stake += alloy::primitives::U256::from(stake);
2114 }
2115
2116 let stake_table_key = val.bls_vk.into();
2117 let state_ver_key = val.schnorr_vk.into();
2118
2119 Validator {
2120 account: val.account,
2121 stake_table_key,
2122 state_ver_key,
2123 stake: validator_stake,
2124 commission: val.commission,
2125 delegators,
2126 }
2127 }
2128 }
2129}
2130
2131#[cfg(test)]
2132mod tests {
2133
2134 use alloy::{primitives::Address, rpc::types::Log};
2135 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
2136 use pretty_assertions::assert_matches;
2137 use rstest::rstest;
2138 use sequencer_utils::test_utils::setup_test;
2139
2140 use super::*;
2141 use crate::{v0::impls::testing::*, L1ClientOptions};
2142
2143 #[test]
2144 fn test_from_l1_events() -> anyhow::Result<()> {
2145 setup_test();
2146 let val_1 = TestValidator::random();
2148 let val_1_new_keys = val_1.randomize_keys();
2149 let val_2 = TestValidator::random();
2150 let val_2_new_keys = val_2.randomize_keys();
2151 let delegator = Address::random();
2152 let mut events: Vec<StakeTableEvent> = [
2153 ValidatorRegistered::from(&val_1).into(),
2154 ValidatorRegisteredV2::from(&val_2).into(),
2155 Delegated {
2156 delegator,
2157 validator: val_1.account,
2158 amount: U256::from(10),
2159 }
2160 .into(),
2161 ConsensusKeysUpdated::from(&val_1_new_keys).into(),
2162 ConsensusKeysUpdatedV2::from(&val_2_new_keys).into(),
2163 Undelegated {
2164 delegator,
2165 validator: val_1.account,
2166 amount: U256::from(7),
2167 }
2168 .into(),
2169 Delegated {
2171 delegator,
2172 validator: val_1.account,
2173 amount: U256::from(5),
2174 }
2175 .into(),
2176 Delegated {
2178 delegator: Address::random(),
2179 validator: val_2.account,
2180 amount: U256::from(3),
2181 }
2182 .into(),
2183 ]
2184 .to_vec();
2185
2186 let st = active_validator_set_from_l1_events(events.iter().cloned())?;
2187 let st_val_1 = st.get(&val_1.account).unwrap();
2188 assert_eq!(st_val_1.stake, U256::from(8));
2190 assert_eq!(st_val_1.commission, val_1.commission);
2191 assert_eq!(st_val_1.delegators.len(), 1);
2192 assert_eq!(*st_val_1.delegators.get(&delegator).unwrap(), U256::from(8));
2194
2195 let st_val_2 = st.get(&val_2.account).unwrap();
2196 assert_eq!(st_val_2.stake, U256::from(3));
2197 assert_eq!(st_val_2.commission, val_2.commission);
2198 assert_eq!(st_val_2.delegators.len(), 1);
2199
2200 events.push(ValidatorExit::from(&val_1).into());
2201
2202 let st = active_validator_set_from_l1_events(events.iter().cloned())?;
2203 assert_eq!(st.get(&val_1.account), None);
2205
2206 let st_val_2 = st.get(&val_2.account).unwrap();
2208 assert_eq!(st_val_2.stake, U256::from(3));
2209 assert_eq!(st_val_2.commission, val_2.commission);
2210 assert_eq!(st_val_2.delegators.len(), 1);
2211
2212 events.push(ValidatorExit::from(&val_2).into());
2214
2215 assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());
2217
2218 Ok(())
2219 }
2220
2221 #[test]
2222 fn test_from_l1_events_failures() -> anyhow::Result<()> {
2223 let val = TestValidator::random();
2224 let delegator = Address::random();
2225
2226 let register: StakeTableEvent = ValidatorRegistered::from(&val).into();
2227 let register_v2: StakeTableEvent = ValidatorRegisteredV2::from(&val).into();
2228 let delegate: StakeTableEvent = Delegated {
2229 delegator,
2230 validator: val.account,
2231 amount: U256::from(10),
2232 }
2233 .into();
2234 let key_update: StakeTableEvent = ConsensusKeysUpdated::from(&val).into();
2235 let key_update_v2: StakeTableEvent = ConsensusKeysUpdatedV2::from(&val).into();
2236 let undelegate: StakeTableEvent = Undelegated {
2237 delegator,
2238 validator: val.account,
2239 amount: U256::from(7),
2240 }
2241 .into();
2242
2243 let exit: StakeTableEvent = ValidatorExit::from(&val).into();
2244
2245 let cases = [
2246 vec![exit],
2247 vec![undelegate.clone()],
2248 vec![delegate.clone()],
2249 vec![key_update],
2250 vec![key_update_v2],
2251 vec![register.clone(), register.clone()],
2252 vec![register_v2.clone(), register_v2.clone()],
2253 vec![register.clone(), register_v2.clone()],
2254 vec![register_v2.clone(), register.clone()],
2255 vec![
2256 register,
2257 delegate.clone(),
2258 undelegate.clone(),
2259 undelegate.clone(),
2260 ],
2261 vec![register_v2, delegate, undelegate.clone(), undelegate],
2262 ];
2263
2264 for events in cases.iter() {
2265 let res = validators_from_l1_events(events.iter().cloned());
2269 assert!(
2270 res.is_err(),
2271 "events {res:?}, not a valid sequence of events"
2272 );
2273 }
2274 Ok(())
2275 }
2276
2277 #[test]
2278 fn test_validators_selection() {
2279 let mut validators = IndexMap::new();
2280 let mut highest_stake = alloy::primitives::U256::ZERO;
2281
2282 for _i in 0..3000 {
2283 let validator = Validator::mock();
2284 validators.insert(validator.account, validator.clone());
2285
2286 if validator.stake > highest_stake {
2287 highest_stake = validator.stake;
2288 }
2289 }
2290
2291 let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);
2292
2293 select_active_validator_set(&mut validators).expect("Failed to select validators");
2294 assert!(
2295 validators.len() <= 100,
2296 "validators len is {}, expected at most 100",
2297 validators.len()
2298 );
2299
2300 let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
2301 for (address, validator) in &validators {
2303 assert!(
2304 validator.stake >= minimum_stake,
2305 "Validator {:?} has stake below minimum: {}",
2306 address,
2307 validator.stake
2308 );
2309
2310 if validator.stake > selected_validators_highest_stake {
2311 selected_validators_highest_stake = validator.stake;
2312 }
2313 }
2314 }
2315
2316 #[rstest::rstest]
2319 fn test_regression_non_unique_bls_keys_not_discarded(
2320 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
2321 version: StakeTableContractVersion,
2322 ) {
2323 let val = TestValidator::random();
2324 let register: StakeTableEvent = match version {
2325 StakeTableContractVersion::V1 => ValidatorRegistered::from(&val).into(),
2326 StakeTableContractVersion::V2 => ValidatorRegisteredV2::from(&val).into(),
2327 };
2328 let delegate: StakeTableEvent = Delegated {
2329 delegator: Address::random(),
2330 validator: val.account,
2331 amount: U256::from(10),
2332 }
2333 .into();
2334
2335 assert!(active_validator_set_from_l1_events(
2337 vec![register.clone(), delegate.clone()].into_iter()
2338 )
2339 .is_ok());
2340
2341 let key_update = ConsensusKeysUpdated::from(&val).into();
2343 let err =
2344 active_validator_set_from_l1_events(vec![register, delegate, key_update].into_iter())
2345 .unwrap_err();
2346
2347 let bls: BLSPubKey = val.bls_vk.into();
2348 assert!(matches!(err, StakeTableError::BlsKeyAlreadyUsed(addr) if addr == bls.to_string()));
2349 }
2350
2351 #[test]
2352 fn test_display_log() {
2353 let serialized = r#"{"address":"0x0000000000000000000000000000000000000069","topics":["0x0000000000000000000000000000000000000000000000000000000000000069"],"data":"0x69","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000069","blockNumber":"0x69","blockTimestamp":"0x69","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000069","transactionIndex":"0x69","logIndex":"0x70","removed":false}"#;
2354 let log: Log = serde_json::from_str(serialized).unwrap();
2355 assert_eq!(
2356 log.display(),
2357 "Log(block=105,index=112,\
2358 transaction_hash=0x0000000000000000000000000000000000000000000000000000000000000069)"
2359 )
2360 }
2361
2362 #[rstest]
2363 #[case::v1(StakeTableContractVersion::V1)]
2364 #[case::v2(StakeTableContractVersion::V2)]
2365 fn test_register_validator(#[case] version: StakeTableContractVersion) {
2366 let mut state = StakeTableState::new();
2367 let validator = TestValidator::random();
2368
2369 let event = match version {
2370 StakeTableContractVersion::V1 => StakeTableEvent::Register((&validator).into()),
2371 StakeTableContractVersion::V2 => StakeTableEvent::RegisterV2((&validator).into()),
2372 };
2373
2374 assert!(state.apply_event(event).unwrap().is_ok());
2375
2376 let stored = state.validators.get(&validator.account).unwrap();
2377 assert_eq!(stored.account, validator.account);
2378 }
2379
2380 #[rstest]
2381 #[case::v1(StakeTableContractVersion::V1)]
2382 #[case::v2(StakeTableContractVersion::V2)]
2383 fn test_validator_already_registered(#[case] version: StakeTableContractVersion) {
2384 let mut stake_table_state = StakeTableState::new();
2385
2386 let test_validator = TestValidator::random();
2387
2388 let first_registration_result =
2390 match version {
2391 StakeTableContractVersion::V1 => stake_table_state
2392 .apply_event(StakeTableEvent::Register((&test_validator).into())),
2393 StakeTableContractVersion::V2 => stake_table_state
2394 .apply_event(StakeTableEvent::RegisterV2((&test_validator).into())),
2395 };
2396
2397 assert!(first_registration_result.unwrap().is_ok());
2399
2400 let v1_already_registered_result =
2402 stake_table_state.apply_event(StakeTableEvent::Register((&test_validator).into()));
2403
2404 pretty_assertions::assert_matches!(
2405 v1_already_registered_result, Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2406 "Expected AlreadyRegistered error. version ={version:?} result={v1_already_registered_result:?}",
2407 );
2408
2409 let v2_already_registered_result =
2411 stake_table_state.apply_event(StakeTableEvent::RegisterV2((&test_validator).into()));
2412
2413 pretty_assertions::assert_matches!(
2414 v2_already_registered_result,
2415 Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2416 "Expected AlreadyRegistered error. version ={version:?} result={v2_already_registered_result:?}",
2417
2418 );
2419 }
2420
2421 #[test]
2422 fn test_register_validator_v2_auth_fails() {
2423 let mut state = StakeTableState::new();
2424 let mut val = TestValidator::random();
2425 val.bls_sig = Default::default();
2426 let event = StakeTableEvent::RegisterV2((&val).into());
2427
2428 let result = state.apply_event(event);
2429 assert!(matches!(
2430 result,
2431 Err(StakeTableError::AuthenticationFailed(_))
2432 ));
2433 }
2434
2435 #[test]
2436 fn test_deregister_validator() {
2437 let mut state = StakeTableState::new();
2438 let val = TestValidator::random();
2439
2440 let reg = StakeTableEvent::Register((&val).into());
2441 state.apply_event(reg).unwrap().unwrap();
2442
2443 let dereg = StakeTableEvent::Deregister((&val).into());
2444 assert!(state.apply_event(dereg).unwrap().is_ok());
2445 assert!(!state.validators.contains_key(&val.account));
2446 }
2447
2448 #[test]
2449 fn test_delegate_and_undelegate() {
2450 let mut state = StakeTableState::new();
2451 let val = TestValidator::random();
2452 state
2453 .apply_event(StakeTableEvent::Register((&val).into()))
2454 .unwrap()
2455 .unwrap();
2456
2457 let delegator = Address::random();
2458 let amount = U256::from(1000);
2459 let delegate_event = StakeTableEvent::Delegate(Delegated {
2460 delegator,
2461 validator: val.account,
2462 amount,
2463 });
2464 assert!(state.apply_event(delegate_event).unwrap().is_ok());
2465
2466 let validator = state.validators.get(&val.account).unwrap();
2467 assert_eq!(validator.delegators.get(&delegator).cloned(), Some(amount));
2468
2469 let undelegate_event = StakeTableEvent::Undelegate(Undelegated {
2470 delegator,
2471 validator: val.account,
2472 amount,
2473 });
2474 assert!(state.apply_event(undelegate_event).unwrap().is_ok());
2475 let validator = state.validators.get(&val.account).unwrap();
2476 assert!(!validator.delegators.contains_key(&delegator));
2477 }
2478
2479 #[rstest]
2480 #[case::v1(StakeTableContractVersion::V1)]
2481 #[case::v2(StakeTableContractVersion::V2)]
2482 fn test_key_update_event(#[case] version: StakeTableContractVersion) {
2483 let mut state = StakeTableState::new();
2484 let val = TestValidator::random();
2485
2486 state
2488 .apply_event(StakeTableEvent::Register((&val).into()))
2489 .unwrap()
2490 .unwrap();
2491
2492 let new_keys = val.randomize_keys();
2493
2494 let event = match version {
2495 StakeTableContractVersion::V1 => StakeTableEvent::KeyUpdate((&new_keys).into()),
2496 StakeTableContractVersion::V2 => StakeTableEvent::KeyUpdateV2((&new_keys).into()),
2497 };
2498
2499 assert!(state.apply_event(event).unwrap().is_ok());
2500
2501 let updated = state.validators.get(&val.account).unwrap();
2502 assert_eq!(updated.stake_table_key, new_keys.bls_vk.into());
2503 assert_eq!(updated.state_ver_key, new_keys.schnorr_vk.into());
2504 }
2505
2506 #[test]
2507 fn test_duplicate_bls_key() {
2508 let mut state = StakeTableState::new();
2509 let val = TestValidator::random();
2510 let event1 = StakeTableEvent::Register((&val).into());
2511 let mut val2 = TestValidator::random();
2512 val2.bls_vk = val.bls_vk;
2513 val2.account = Address::random();
2514
2515 let event2 = StakeTableEvent::Register((&val2).into());
2516 assert!(state.apply_event(event1).unwrap().is_ok());
2517 let result = state.apply_event(event2);
2518
2519 let expected_bls_key = BLSPubKey::from(val.bls_vk).to_string();
2520
2521 assert_matches!(
2522 result,
2523 Err(StakeTableError::BlsKeyAlreadyUsed(key))
2524 if key == expected_bls_key,
2525 "Expected BlsKeyAlreadyUsed({expected_bls_key}), but got: {result:?}",
2526 );
2527 }
2528
2529 #[test]
2530 fn test_duplicate_schnorr_key() {
2531 let mut state = StakeTableState::new();
2532 let val = TestValidator::random();
2533 let event1 = StakeTableEvent::Register((&val).into());
2534 let mut val2 = TestValidator::random();
2535 val2.schnorr_vk = val.schnorr_vk;
2536 val2.account = Address::random();
2537 val2.bls_vk = val2.randomize_keys().bls_vk;
2538
2539 let event2 = StakeTableEvent::Register((&val2).into());
2540 assert!(state.apply_event(event1).unwrap().is_ok());
2541 let result = state.apply_event(event2);
2542
2543 let schnorr: SchnorrPubKey = val.schnorr_vk.into();
2544 assert_matches!(
2545 result,
2546 Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(key)))
2547 if key == schnorr.to_string(),
2548 "Expected SchnorrKeyAlreadyUsed({schnorr}), but got: {result:?}",
2549
2550 );
2551 }
2552
2553 #[test]
2554 fn test_register_and_deregister_validator() {
2555 let mut state = StakeTableState::new();
2556 let validator = TestValidator::random();
2557 let event = StakeTableEvent::Register((&validator).into());
2558 assert!(state.apply_event(event).unwrap().is_ok());
2559
2560 let deregister_event = StakeTableEvent::Deregister((&validator).into());
2561 assert!(state.apply_event(deregister_event).unwrap().is_ok());
2562 }
2563
2564 #[test]
2565 fn test_delegate_zero_amount_is_rejected() {
2566 let mut state = StakeTableState::new();
2567 let validator = TestValidator::random();
2568 let account = validator.account;
2569 state
2570 .apply_event(StakeTableEvent::Register((&validator).into()))
2571 .unwrap()
2572 .unwrap();
2573
2574 let delegator = Address::random();
2575 let amount = U256::ZERO;
2576 let event = StakeTableEvent::Delegate(Delegated {
2577 delegator,
2578 validator: account,
2579 amount,
2580 });
2581 let result = state.apply_event(event);
2582
2583 assert_matches!(
2584 result,
2585 Err(StakeTableError::ZeroDelegatorStake(addr))
2586 if addr == delegator,
2587 "delegator stake is zero"
2588
2589 );
2590 }
2591
2592 #[test]
2593 fn test_undelegate_more_than_stake_fails() {
2594 let mut state = StakeTableState::new();
2595 let validator = TestValidator::random();
2596 let account = validator.account;
2597 state
2598 .apply_event(StakeTableEvent::Register((&validator).into()))
2599 .unwrap()
2600 .unwrap();
2601
2602 let delegator = Address::random();
2603 let event = StakeTableEvent::Delegate(Delegated {
2604 delegator,
2605 validator: account,
2606 amount: U256::from(10u64),
2607 });
2608 state.apply_event(event).unwrap().unwrap();
2609
2610 let result = state.apply_event(StakeTableEvent::Undelegate(Undelegated {
2611 delegator,
2612 validator: account,
2613 amount: U256::from(20u64),
2614 }));
2615 assert_matches!(
2616 result,
2617 Err(StakeTableError::InsufficientStake),
2618 "Expected InsufficientStake error, got: {result:?}",
2619 );
2620 }
2621
2622 #[tokio::test(flavor = "multi_thread")]
2623 async fn test_decaf_stake_table() {
2624 setup_test();
2625
2626 let events_json =
2664 std::fs::read_to_string("../data/v3/decaf_stake_table_events.json").unwrap();
2665 let events: Vec<(EventKey, StakeTableEvent)> = serde_json::from_str(&events_json).unwrap();
2666
2667 let reconstructed_stake_table =
2669 active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)).unwrap();
2670
2671 let stake_table_json =
2672 std::fs::read_to_string("../data/v3/decaf_stake_table.json").unwrap();
2673 let expected: IndexMap<Address, Validator<BLSPubKey>> =
2674 serde_json::from_str(&stake_table_json).unwrap();
2675
2676 assert_eq!(
2677 reconstructed_stake_table, expected,
2678 "Stake table reconstructed from events does not match the expected stake table "
2679 );
2680 }
2681
2682 #[tokio::test(flavor = "multi_thread")]
2683 #[should_panic]
2684 async fn test_large_max_events_range_panic() {
2685 setup_test();
2686
2687 let contract_address = "0x40304fbe94d5e7d1492dd90c53a2d63e8506a037";
2689
2690 let l1 = L1ClientOptions {
2691 l1_events_max_retry_duration: Duration::from_secs(30),
2692 l1_events_max_block_range: 10_u64.pow(9),
2694 l1_retry_delay: Duration::from_secs(1),
2695 ..Default::default()
2696 }
2697 .connect(vec!["https://ethereum-sepolia.publicnode.com"
2698 .parse()
2699 .unwrap()])
2700 .expect("unable to construct l1 client");
2701
2702 let latest_block = l1.provider.get_block_number().await.unwrap();
2703 let _events = Fetcher::fetch_events_from_contract(
2704 l1,
2705 contract_address.parse().unwrap(),
2706 None,
2707 latest_block,
2708 )
2709 .await;
2710 }
2711}