1use std::{
2 cmp::{max, min},
3 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4 future::Future,
5 sync::Arc,
6};
7
8use alloy::{
9 eips::{BlockId, BlockNumberOrTag},
10 primitives::{utils::format_ether, Address, U256},
11 providers::Provider,
12 rpc::types::Log,
13};
14use anyhow::{bail, ensure, Context};
15use async_lock::{Mutex, RwLock};
16use committable::Committable;
17use futures::stream::{self, StreamExt};
18use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
19use hotshot_contract_adapter::sol_types::{
20 EspToken::{self, EspTokenInstance},
21 StakeTableV2::{
22 self, ConsensusKeysUpdated, ConsensusKeysUpdatedV2, Delegated, Undelegated, ValidatorExit,
23 ValidatorRegistered, ValidatorRegisteredV2,
24 },
25};
26use hotshot_types::{
27 data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
28 drb::{
29 election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
30 DrbResult,
31 },
32 stake_table::{HSStakeTable, StakeTableEntry},
33 traits::{
34 election::Membership,
35 node_implementation::{ConsensusTime, NodeType},
36 signature_key::StakeTableEntryType,
37 },
38 PeerConfig,
39};
40use indexmap::IndexMap;
41use thiserror::Error;
42use tokio::{spawn, time::sleep};
43use tracing::Instrument;
44
45#[cfg(any(test, feature = "testing"))]
46use super::v0_3::DAMembers;
47use super::{
48 traits::{MembershipPersistence, StateCatchup},
49 v0_3::{ChainConfig, EventKey, Fetcher, StakeTableEvent, StakeTableUpdateTask, Validator},
50 Header, L1Client, Leaf2, PubKey, SeqTypes,
51};
52use crate::{
53 traits::EventsPersistenceRead,
54 v0_1::{L1Provider, RewardAmount, BLOCKS_PER_YEAR, COMMISSION_BASIS_POINTS, INFLATION_RATE},
55 v0_3::{EventSortingError, ExpectedStakeTableError, FetchRewardError, StakeTableError},
56};
57
/// Epoch type of the sequencer node implementation.
type Epoch = <SeqTypes as NodeType>::Epoch;
/// Validators keyed by their account address, kept in an insertion-ordered map.
pub type ValidatorMap = IndexMap<Address, Validator<BLSPubKey>>;
/// Two-level result of applying a stake table event.
///
/// The outer `Err` is a [`StakeTableError`] that callers in this file treat as fatal; the
/// inner `Err` is an [`ExpectedStakeTableError`] that callers log and then skip.
type ApplyEventResult<T> = Result<Result<T, ExpectedStakeTableError>, StakeTableError>;
65
/// Compact, human-readable rendering of a value for log messages.
trait DisplayLog {
    /// Returns a short textual description of `self`.
    fn display(&self) -> String;
}
70
71impl DisplayLog for Log {
72 fn display(&self) -> String {
73 let block = self.block_number.unwrap_or_default();
77 let index = self.log_index.unwrap_or_default();
78 let hash = self.transaction_hash.unwrap_or_default();
79 format!("Log(block={block},index={index},transaction_hash={hash})")
80 }
81}
82
/// Stake table contract events grouped by event type, each paired with the L1 log it was
/// decoded from.
#[derive(Clone, PartialEq)]
pub struct StakeTableEvents {
    /// `ValidatorRegistered` events.
    registrations: Vec<(ValidatorRegistered, Log)>,
    /// `ValidatorRegisteredV2` events; `from_l1_logs` keeps only those that authenticate.
    registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
    /// `ValidatorExit` events.
    deregistrations: Vec<(ValidatorExit, Log)>,
    /// `Delegated` events.
    delegated: Vec<(Delegated, Log)>,
    /// `Undelegated` events.
    undelegated: Vec<(Undelegated, Log)>,
    /// `ConsensusKeysUpdated` events.
    keys: Vec<(ConsensusKeysUpdated, Log)>,
    /// `ConsensusKeysUpdatedV2` events; `from_l1_logs` keeps only those that authenticate.
    keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
}
93
94impl StakeTableEvents {
95 fn from_l1_logs(
99 registrations: Vec<(ValidatorRegistered, Log)>,
100 registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
101 deregistrations: Vec<(ValidatorExit, Log)>,
102 delegated: Vec<(Delegated, Log)>,
103 undelegated: Vec<(Undelegated, Log)>,
104 keys: Vec<(ConsensusKeysUpdated, Log)>,
105 keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
106 ) -> Self {
107 let registrations_v2 = registrations_v2
108 .into_iter()
109 .filter(|(event, log)| {
110 event
111 .authenticate()
112 .map_err(|_| {
113 tracing::warn!(
114 "Failed to authenticate ValidatorRegisteredV2 event {}",
115 log.display()
116 );
117 })
118 .is_ok()
119 })
120 .collect();
121 let keys_v2 = keys_v2
122 .into_iter()
123 .filter(|(event, log)| {
124 event
125 .authenticate()
126 .map_err(|_| {
127 tracing::warn!(
128 "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
129 log.display()
130 );
131 })
132 .is_ok()
133 })
134 .collect();
135 Self {
136 registrations,
137 registrations_v2,
138 deregistrations,
139 delegated,
140 undelegated,
141 keys,
142 keys_v2,
143 }
144 }
145
146 pub fn sort_events(self) -> Result<Vec<(EventKey, StakeTableEvent)>, EventSortingError> {
147 let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
148 let Self {
149 registrations,
150 registrations_v2,
151 deregistrations,
152 delegated,
153 undelegated,
154 keys,
155 keys_v2,
156 } = self;
157
158 let key = |log: &Log| -> Result<EventKey, EventSortingError> {
159 let block_number = log
160 .block_number
161 .ok_or(EventSortingError::MissingBlockNumber)?;
162 let log_index = log.log_index.ok_or(EventSortingError::MissingLogIndex)?;
163 Ok((block_number, log_index))
164 };
165
166 for (registration, log) in registrations {
167 events.push((key(&log)?, registration.into()));
168 }
169 for (registration, log) in registrations_v2 {
170 events.push((key(&log)?, registration.into()));
171 }
172 for (dereg, log) in deregistrations {
173 events.push((key(&log)?, dereg.into()));
174 }
175 for (delegation, log) in delegated {
176 events.push((key(&log)?, delegation.into()));
177 }
178 for (undelegated, log) in undelegated {
179 events.push((key(&log)?, undelegated.into()));
180 }
181 for (update, log) in keys {
182 events.push((key(&log)?, update.into()));
183 }
184 for (update, log) in keys_v2 {
185 events.push((key(&log)?, update.into()));
186 }
187
188 events.sort_by_key(|(key, _)| *key);
189 Ok(events)
190 }
191}
192
/// Validator set reconstructed by replaying stake table events, together with the
/// consensus keys seen so far.
#[derive(Debug)]
pub struct StakeTableState {
    /// Currently registered validators, keyed by account address.
    validators: ValidatorMap,
    /// Every BLS key accepted by a registration or key update. Entries are never removed
    /// by `apply_event`, including when a validator deregisters.
    used_bls_keys: HashSet<BLSPubKey>,
    /// Every Schnorr key accepted by a registration or key update. Entries are never
    /// removed by `apply_event`.
    used_schnorr_keys: HashSet<SchnorrPubKey>,
}
199
200impl StakeTableState {
201 pub fn new() -> Self {
202 Self {
203 validators: IndexMap::new(),
204 used_bls_keys: HashSet::new(),
205 used_schnorr_keys: HashSet::new(),
206 }
207 }
208
209 pub fn get_validators(self) -> ValidatorMap {
210 self.validators
211 }
212
/// Applies a single stake table event to the state.
///
/// # Returns
///
/// * `Ok(Ok(()))` when the event was applied.
/// * `Ok(Err(_))` for a duplicate Schnorr key; the validator entry is left unchanged.
/// * `Err(_)` for every other failure (unknown validator or delegator, duplicate
///   registration, duplicate BLS key, zero-amount delegation, insufficient stake,
///   failed authentication of a V2 event).
///
/// State mutated before an error is returned is not rolled back: a BLS key stays in
/// `used_bls_keys` when the Schnorr check fails afterwards, and an undelegation that
/// fails on the delegator has already reduced the validator's stake.
pub fn apply_event(&mut self, event: StakeTableEvent) -> ApplyEventResult<()> {
    match event {
        StakeTableEvent::Register(ValidatorRegistered {
            account,
            blsVk,
            schnorrVk,
            commission,
        }) => {
            let stake_table_key: BLSPubKey = blsVk.into();
            let state_ver_key: SchnorrPubKey = schnorrVk.into();

            // An account can only be registered once.
            let entry = self.validators.entry(account);
            if let indexmap::map::Entry::Occupied(_) = entry {
                return Err(StakeTableError::AlreadyRegistered(account));
            }

            // `insert` returns false when the key is already present.
            if !self.used_bls_keys.insert(stake_table_key) {
                return Err(StakeTableError::BlsKeyAlreadyUsed(
                    stake_table_key.to_string(),
                ));
            }

            // A duplicate Schnorr key is reported as an expected (inner) error.
            if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                    state_ver_key.to_string(),
                )));
            }

            // New validators start with zero stake and no delegators.
            entry.or_insert(Validator {
                account,
                stake_table_key,
                state_ver_key,
                stake: U256::ZERO,
                commission,
                delegators: HashMap::new(),
            });
        },

        StakeTableEvent::RegisterV2(reg) => {
            // V2 events must authenticate before anything else is checked.
            reg.authenticate()
                .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

            let ValidatorRegisteredV2 {
                account,
                blsVK,
                schnorrVK,
                commission,
                ..
            } = reg;

            let stake_table_key: BLSPubKey = blsVK.into();
            let state_ver_key: SchnorrPubKey = schnorrVK.into();

            // Same checks, in the same order, as the V1 registration above.
            let entry = self.validators.entry(account);
            if let indexmap::map::Entry::Occupied(_) = entry {
                return Err(StakeTableError::AlreadyRegistered(account));
            }

            if !self.used_bls_keys.insert(stake_table_key) {
                return Err(StakeTableError::BlsKeyAlreadyUsed(
                    stake_table_key.to_string(),
                ));
            }

            if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                    state_ver_key.to_string(),
                )));
            }

            entry.or_insert(Validator {
                account,
                stake_table_key,
                state_ver_key,
                stake: U256::ZERO,
                commission,
                delegators: HashMap::new(),
            });
        },

        StakeTableEvent::Deregister(exit) => {
            // `shift_remove` keeps the relative order of the remaining validators.
            // The validator's keys stay in the used-key sets.
            self.validators
                .shift_remove(&exit.validator)
                .ok_or(StakeTableError::ValidatorNotFound(exit.validator))?;
        },

        StakeTableEvent::Delegate(delegated) => {
            let Delegated {
                delegator,
                validator,
                amount,
            } = delegated;

            let val = self
                .validators
                .get_mut(&validator)
                .ok_or(StakeTableError::ValidatorNotFound(validator))?;

            if amount.is_zero() {
                return Err(StakeTableError::ZeroDelegatorStake(delegator));
            }

            // Increase both the validator's total and the delegator's own entry.
            val.stake += amount;
            val.delegators
                .entry(delegator)
                .and_modify(|stake| *stake += amount)
                .or_insert(amount);
        },

        StakeTableEvent::Undelegate(undelegated) => {
            let Undelegated {
                delegator,
                validator,
                amount,
            } = undelegated;

            let val = self
                .validators
                .get_mut(&validator)
                .ok_or(StakeTableError::ValidatorNotFound(validator))?;

            // The validator's total is reduced before the delegator is looked up.
            val.stake = val
                .stake
                .checked_sub(amount)
                .ok_or(StakeTableError::InsufficientStake)?;

            let delegator_stake = val
                .delegators
                .get_mut(&delegator)
                .ok_or(StakeTableError::DelegatorNotFound(delegator))?;

            *delegator_stake = delegator_stake
                .checked_sub(amount)
                .ok_or(StakeTableError::InsufficientStake)?;

            // Drop delegators whose stake reached zero.
            if delegator_stake.is_zero() {
                val.delegators.remove(&delegator);
            }
        },

        StakeTableEvent::KeyUpdate(update) => {
            let ConsensusKeysUpdated {
                account,
                blsVK,
                schnorrVK,
            } = update;

            let validator = self
                .validators
                .get_mut(&account)
                .ok_or(StakeTableError::ValidatorNotFound(account))?;

            let stake_table_key: BLSPubKey = blsVK.into();
            let state_ver_key: SchnorrPubKey = schnorrVK.into();

            // New keys must not have been used before; the replaced keys remain in the
            // used-key sets.
            if !self.used_bls_keys.insert(stake_table_key) {
                return Err(StakeTableError::BlsKeyAlreadyUsed(
                    stake_table_key.to_string(),
                ));
            }

            if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                    state_ver_key.to_string(),
                )));
            }

            validator.stake_table_key = stake_table_key;
            validator.state_ver_key = state_ver_key;
        },

        StakeTableEvent::KeyUpdateV2(update) => {
            // V2 events must authenticate before anything else is checked.
            update
                .authenticate()
                .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

            let ConsensusKeysUpdatedV2 {
                account,
                blsVK,
                schnorrVK,
                ..
            } = update;

            let validator = self
                .validators
                .get_mut(&account)
                .ok_or(StakeTableError::ValidatorNotFound(account))?;

            let stake_table_key: BLSPubKey = blsVK.into();
            let state_ver_key: SchnorrPubKey = schnorrVK.into();

            // Same key checks as the V1 key update above.
            if !self.used_bls_keys.insert(stake_table_key) {
                return Err(StakeTableError::BlsKeyAlreadyUsed(
                    stake_table_key.to_string(),
                ));
            }

            if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                    state_ver_key.to_string(),
                )));
            }

            validator.stake_table_key = stake_table_key;
            validator.state_ver_key = state_ver_key;
        },
    }

    Ok(Ok(()))
}
438}
439
440pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
441 events: I,
442) -> Result<ValidatorMap, StakeTableError> {
443 let mut state = StakeTableState::new();
444 for event in events {
445 match state.apply_event(event.clone()) {
446 Ok(Ok(())) => (), Ok(Err(expected_err)) => {
448 tracing::warn!("Expected error while applying event {event:?}: {expected_err}");
450 },
451 Err(err) => {
452 tracing::error!("Fatal error in applying event {event:?}: {err}");
454 return Err(err);
455 },
456 }
457 }
458 Ok(state.get_validators())
459}
460
461pub(crate) fn select_active_validator_set(
465 validators: &mut ValidatorMap,
466) -> Result<(), StakeTableError> {
467 let total_validators = validators.len();
468
469 validators.retain(|address, validator| {
471 if validator.delegators.is_empty() {
472 tracing::info!("Validator {address:?} does not have any delegator");
473 return false;
474 }
475
476 if validator.stake.is_zero() {
477 tracing::info!("Validator {address:?} does not have any stake");
478 return false;
479 }
480
481 true
482 });
483
484 tracing::debug!(
485 total_validators,
486 filtered = validators.len(),
487 "Filtered out invalid validators"
488 );
489
490 if validators.is_empty() {
491 tracing::warn!("Validator selection failed: no validators passed minimum criteria");
492 return Err(StakeTableError::NoValidValidators);
493 }
494
495 let maximum_stake = validators.values().map(|v| v.stake).max().ok_or_else(|| {
496 tracing::error!("Could not compute maximum stake from filtered validators");
497 StakeTableError::MissingMaximumStake
498 })?;
499
500 let minimum_stake = maximum_stake
501 .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
502 .ok_or_else(|| {
503 tracing::error!("Overflow while calculating minimum stake threshold");
504 StakeTableError::MinimumStakeOverflow
505 })?;
506
507 let mut valid_stakers: Vec<_> = validators
508 .iter()
509 .filter(|(_, v)| v.stake >= minimum_stake)
510 .map(|(addr, v)| (*addr, v.stake))
511 .collect();
512
513 tracing::info!(
514 count = valid_stakers.len(),
515 "Number of validators above minimum stake threshold"
516 );
517
518 valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
520
521 if valid_stakers.len() > 100 {
522 valid_stakers.truncate(100);
523 }
524
525 let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
527 validators.retain(|address, _| selected_addresses.contains(address));
528
529 tracing::info!(
530 final_count = validators.len(),
531 "Selected active validator set"
532 );
533
534 Ok(())
535}
536
537pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
539 events: I,
540) -> Result<ValidatorMap, StakeTableError> {
541 let mut validators = validators_from_l1_events(events)?;
542 select_active_validator_set(&mut validators)?;
543 Ok(validators)
544}
545
546impl std::fmt::Debug for StakeTableEvent {
547 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
548 match self {
549 StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
550 StakeTableEvent::RegisterV2(event) => write!(f, "RegisterV2({:?})", event.account),
551 StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
552 StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
553 StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
554 StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
555 StakeTableEvent::KeyUpdateV2(event) => write!(f, "KeyUpdateV2({:?})", event.account),
556 }
557 }
558}
559
/// Committee membership state across epochs, together with the fetcher used to load
/// stake tables.
#[derive(Clone, derive_more::derive::Debug)]
pub struct EpochCommittees {
    /// Committee data that is not tied to an epoch.
    /// NOTE(review): presumably used before epochs are enabled; confirm against callers.
    non_epoch_committee: NonEpochCommittee,
    /// Per-epoch committee data.
    state: HashMap<Epoch, EpochCommittee>,
    /// Per-epoch randomized committees over the stake table entries.
    randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
    /// First epoch, if one has been set; exposed through `first_epoch()`.
    first_epoch: Option<Epoch>,
    /// Block reward amount.
    block_reward: RewardAmount,
    /// Shared fetcher; exposed through `fetcher()`.
    fetcher: Arc<Fetcher>,
}
573
574impl Fetcher {
575 pub fn new(
576 peers: Arc<dyn StateCatchup>,
577 persistence: Arc<Mutex<dyn MembershipPersistence>>,
578 l1_client: L1Client,
579 chain_config: ChainConfig,
580 ) -> Self {
581 Self {
582 peers,
583 persistence,
584 l1_client,
585 chain_config: Arc::new(Mutex::new(chain_config)),
586 update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
587 }
588 }
589
590 pub async fn spawn_update_loop(&self) {
591 let mut update_task = self.update_task.0.lock().await;
592 if update_task.is_none() {
593 *update_task = Some(spawn(self.update_loop()));
594 }
595 }
596
597 fn update_loop(&self) -> impl Future<Output = ()> {
602 let span = tracing::warn_span!("Stake table update loop");
603 let self_clone = self.clone();
604 let state = self.l1_client.state.clone();
605 let l1_retry = self.l1_client.options().l1_retry_delay;
606 let update_delay = self.l1_client.options().stake_table_update_interval;
607 let chain_config = self.chain_config.clone();
608
609 async move {
610 let stake_contract_address = loop {
615 match chain_config.lock().await.stake_table_contract {
616 Some(addr) => break addr,
617 None => {
618 tracing::debug!(
619 "Stake table contract address not found. Retrying in {l1_retry:?}...",
620 );
621 },
622 }
623 sleep(l1_retry).await;
624 };
625
626 loop {
628 let finalized_block = loop {
629 if let Some(block) = state.lock().await.last_finalized {
630 break block;
631 }
632 tracing::debug!(
633 "Finalized block not yet available. Retrying in {l1_retry:?}",
634 );
635 sleep(l1_retry).await;
636 };
637
638 tracing::debug!(
639 "Attempting to fetch stake table at L1 block {finalized_block:?}",
640 );
641
642 loop {
643 match self_clone
644 .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
645 .await
646 {
647 Ok(events) => {
648 tracing::info!("Successfully fetched and stored stake table events at block={finalized_block:?}");
649 tracing::debug!("events={events:?}");
650 break;
651 },
652 Err(e) => {
653 tracing::error!(
654 "Error fetching stake table at block {finalized_block:?}. err= {e:#}",
655 );
656 sleep(l1_retry).await;
657 },
658 }
659 }
660
661 tracing::debug!(
662 "Waiting {update_delay:?} before next stake table update...",
663 );
664 sleep(update_delay).await;
665 }
666 }
667 .instrument(span)
668 }
669
670 pub async fn fetch_events(
671 &self,
672 contract: Address,
673 to_block: u64,
674 ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
675 let persistence_lock = self.persistence.lock().await;
676 let (read_l1_offset, persistence_events) = persistence_lock.load_events(to_block).await?;
677 drop(persistence_lock);
678
679 tracing::info!("loaded events from storage to_block={to_block:?}");
680
681 if let Some(EventsPersistenceRead::Complete) = read_l1_offset {
684 return Ok(persistence_events);
685 }
686
687 let from_block = read_l1_offset
688 .map(|read| match read {
689 EventsPersistenceRead::UntilL1Block(block) => Ok(block + 1),
690 EventsPersistenceRead::Complete => Err(anyhow::anyhow!(
691 "Unexpected state. offset is complete after returning early"
692 )),
693 })
694 .transpose()?;
695
696 ensure!(
697 Some(to_block) >= from_block,
698 "to_block {to_block:?} is less than from_block {from_block:?}"
699 );
700
701 tracing::info!(%to_block, from_block = ?from_block, "Fetching events from contract");
702
703 let contract_events = Self::fetch_events_from_contract(
704 self.l1_client.clone(),
705 contract,
706 from_block,
707 to_block,
708 )
709 .await;
710
711 let contract_events = contract_events.sort_events()?;
712 let mut events = match from_block {
713 Some(_) => persistence_events
714 .into_iter()
715 .chain(contract_events)
716 .collect(),
717 None => contract_events,
718 };
719
720 let len_before_dedup = events.len();
725 events.dedup();
726 let len_after_dedup = events.len();
727 if len_before_dedup != len_after_dedup {
728 tracing::warn!("Duplicate events found and removed. This should not normally happen.")
729 }
730
731 Ok(events)
732 }
733
/// Fetches every stake table event emitted by `contract` up to `to_block` from L1.
///
/// When `from_block` is `None`, the range starts at the block returned by the contract's
/// `initializedAtBlock()` call. The range is split into chunks based on
/// `l1_events_max_block_range` and each event type is queried chunk by chunk.
///
/// Every failing RPC call is retried after `l1_retry_delay` with no limit on the number
/// of attempts, so this function does not return an error; it only returns once all
/// queries have succeeded.
///
/// V2 registration and V2 key-update events that fail `authenticate()` are logged and
/// dropped.
pub async fn fetch_events_from_contract(
    l1_client: L1Client,
    contract: Address,
    from_block: Option<u64>,
    to_block: u64,
) -> StakeTableEvents {
    let stake_table_contract = StakeTableV2::new(contract, l1_client.provider.clone());

    // Without an explicit start, begin at the block the contract was initialized at.
    let from_block = match from_block {
        Some(block) => block,
        None => {
            loop {
                match stake_table_contract.initializedAtBlock().call().await {
                    Ok(init_block) => {
                        break init_block._0.to::<u64>();
                    },
                    Err(err) => {
                        tracing::warn!(%err, "Failed to retrieve initial block, retrying..");
                        sleep(l1_client.options().l1_retry_delay).await;
                    },
                }
            }
        },
    };

    // Iterator over inclusive `(start, end)` block ranges covering `from_block..=to_block`.
    // It is cloned once per event type below.
    let mut start = from_block;
    let end = to_block;
    let chunk_size = l1_client.options().l1_events_max_block_range;
    let chunks = std::iter::from_fn(move || {
        let chunk_end = min(start + chunk_size - 1, end);
        if chunk_end < start {
            return None;
        }

        let chunk = (start, chunk_end);
        start = chunk_end + 1;
        Some(chunk)
    });

    // Each stream below yields, per chunk, the events of one type, retrying the query
    // until it succeeds. Streams are lazy: nothing is queried until they are collected.
    let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
            loop {
                match stake_table_contract
                    .clone()
                    .ValidatorRegistered_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => break stream::iter(events),
                    Err(err) => {
                        tracing::warn!(from, to, %err, "ValidatorRegistered Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    let registered_events_v2 = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch ValidatorRegisteredV2 events in range");
            loop {
                match stake_table_contract
                    .clone()
                    .ValidatorRegisteredV2_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => {
                        // Drop events that do not authenticate.
                        // NOTE(review): `StakeTableEvents::from_l1_logs` authenticates these
                        // events a second time.
                        break stream::iter(events.into_iter().filter(|(event, log)| {
                            if let Err(e) = event.authenticate() {
                                tracing::warn!(%e, "Failed to authenticate ValidatorRegisteredV2 event: {}", log.display());
                                return false;
                            }
                            true
                        }));
                    },
                    Err(err) => {
                        tracing::warn!(from, to, %err, "ValidatorRegisteredV2 Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch ValidatorExit events in range");
            loop {
                match stake_table_contract
                    .ValidatorExit_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => break stream::iter(events),
                    Err(err) => {
                        tracing::warn!(from, to, %err, "ValidatorExit Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch Delegated events in range");
            loop {
                match stake_table_contract
                    .Delegated_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => break stream::iter(events),
                    Err(err) => {
                        tracing::warn!(from, to, %err, "Delegated Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });
    let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch Undelegated events in range");
            loop {
                match stake_table_contract
                    .Undelegated_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => break stream::iter(events),
                    Err(err) => {
                        tracing::warn!(from, to, %err, "Undelegated Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    let keys_update_events = stream::iter(chunks.clone()).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
            loop {
                match stake_table_contract
                    .ConsensusKeysUpdated_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => break stream::iter(events),
                    Err(err) => {
                        tracing::warn!(from, to, %err, "ConsensusKeysUpdated Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    let keys_update_events_v2 = stream::iter(chunks).then(|(from, to)| {
        let retry_delay = l1_client.options().l1_retry_delay;
        let stake_table_contract = stake_table_contract.clone();
        async move {
            tracing::debug!(from, to, "fetch ConsensusKeysUpdatedV2 events in range");
            loop {
                match stake_table_contract
                    .ConsensusKeysUpdatedV2_filter()
                    .from_block(from)
                    .to_block(to)
                    .query()
                    .await
                {
                    Ok(events) => {
                        // Drop events that do not authenticate.
                        // NOTE(review): `StakeTableEvents::from_l1_logs` authenticates these
                        // events a second time.
                        break stream::iter(events.into_iter().filter(|(event, log)| {
                            if let Err(e) = event.authenticate() {
                                tracing::warn!(%e, "Failed to authenticate ConsensusKeysUpdatedV2 event {}", log.display());
                                return false;
                            }
                            true
                        }));
                    },
                    Err(err) => {
                        tracing::warn!(from, to, %err, "ConsensusKeysUpdatedV2 Error");
                        sleep(retry_delay).await;
                    },
                }
            }
        }
    });

    // Drive the streams one event type after another; within a type, chunks are
    // queried sequentially in ascending block order.
    let registrations = registered_events.flatten().collect().await;
    let registrations_v2 = registered_events_v2.flatten().collect().await;
    let deregistrations = deregistered_events.flatten().collect().await;
    let delegated = delegated_events.flatten().collect().await;
    let undelegated = undelegated_events.flatten().collect().await;
    let keys = keys_update_events.flatten().collect().await;
    let keys_v2 = keys_update_events_v2.flatten().collect().await;

    StakeTableEvents::from_l1_logs(
        registrations,
        registrations_v2,
        deregistrations,
        delegated,
        undelegated,
        keys,
        keys_v2,
    )
}
985
986 pub async fn fetch_and_store_stake_table_events(
992 &self,
993 contract: Address,
994 to_block: u64,
995 ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
996 let events = self.fetch_events(contract, to_block).await?;
997
998 tracing::info!("storing events in storage to_block={to_block:?}");
999
1000 {
1001 let persistence_lock = self.persistence.lock().await;
1002 persistence_lock
1003 .store_events(to_block, events.clone())
1004 .await
1005 .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
1006 }
1007
1008 Ok(events)
1009 }
1010
1011 pub async fn fetch_all_validators_from_contract(
1013 l1_client: L1Client,
1014 contract: Address,
1015 to_block: u64,
1016 ) -> anyhow::Result<ValidatorMap> {
1017 let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await;
1018 let sorted = events.sort_events()?;
1019 validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
1021 .context("failed to construct validators set from l1 events")
1022 }
1023 pub async fn fetch_block_reward(&self) -> Result<RewardAmount, FetchRewardError> {
1040 let chain_config = *self.chain_config.lock().await;
1041
1042 let stake_table_contract = chain_config
1043 .stake_table_contract
1044 .ok_or(FetchRewardError::MissingStakeTableContract)?;
1045
1046 let provider = self.l1_client.provider.clone();
1047 let stake_table = StakeTableV2::new(stake_table_contract, provider.clone());
1048
1049 let stake_table_init_block = stake_table
1053 .initializedAtBlock()
1054 .block(BlockId::finalized())
1055 .call()
1056 .await
1057 .map_err(FetchRewardError::ContractCall)?
1058 ._0
1059 .to::<u64>();
1060
1061 tracing::info!("stake table init block ={stake_table_init_block}");
1062
1063 let token_address = stake_table
1064 .token()
1065 .block(BlockId::finalized())
1066 .call()
1067 .await
1068 .map_err(FetchRewardError::TokenAddressFetch)?
1069 ._0;
1070
1071 let token = EspToken::new(token_address, provider.clone());
1072
1073 let init_logs = token
1080 .Initialized_filter()
1081 .from_block(0u64)
1082 .to_block(BlockNumberOrTag::Finalized)
1083 .query()
1084 .await;
1085
1086 let init_log = match init_logs {
1087 Ok(init_logs) => {
1088 if init_logs.is_empty() {
1089 tracing::error!(
1090 "Token Initialized event logs are empty. This should never happen"
1091 );
1092 return Err(FetchRewardError::MissingInitializedEvent);
1093 }
1094
1095 let (_, init_log) = init_logs[0].clone();
1096
1097 tracing::debug!(tx_hash = ?init_log.transaction_hash, "Found token `Initialized` event");
1098 init_log
1099 },
1100 Err(err) => {
1101 tracing::warn!(
1102 "RPC returned error {err:?}. will fallback to scanning over fixed block range"
1103 );
1104 self.scan_token_contract_initialized_event_log(stake_table_init_block, token)
1105 .await?
1106 },
1107 };
1108
1109 let tx_hash =
1111 init_log
1112 .transaction_hash
1113 .ok_or_else(|| FetchRewardError::MissingTransactionHash {
1114 init_log: init_log.clone().into(),
1115 })?;
1116
1117 let init_tx = provider
1119 .get_transaction_receipt(tx_hash)
1120 .await
1121 .map_err(FetchRewardError::Rpc)?
1122 .ok_or_else(|| FetchRewardError::MissingTransactionReceipt {
1123 tx_hash: tx_hash.to_string(),
1124 })?;
1125
1126 let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().ok_or(
1127 FetchRewardError::DecodeTransferLog {
1128 tx_hash: tx_hash.to_string(),
1129 },
1130 )?;
1131
1132 tracing::debug!("mint transfer event ={mint_transfer:?}");
1133 if mint_transfer.from != Address::ZERO {
1134 return Err(FetchRewardError::InvalidMintFromAddress);
1135 }
1136
1137 let initial_supply = mint_transfer.value;
1138
1139 tracing::info!("Initial token amount: {} ESP", format_ether(initial_supply));
1140
1141 let reward = ((initial_supply * U256::from(INFLATION_RATE)) / U256::from(BLOCKS_PER_YEAR))
1142 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
1143 .ok_or(FetchRewardError::DivisionByZero)?;
1144
1145 Ok(RewardAmount(reward))
1146 }
1147
1148 pub async fn scan_token_contract_initialized_event_log(
1149 &self,
1150 stake_table_init_block: u64,
1151 token: EspTokenInstance<(), L1Provider>,
1152 ) -> Result<Log, FetchRewardError> {
1153 let max_events_range = self.l1_client.options().l1_events_max_block_range;
1154 const MAX_BLOCKS_SCANNED: u64 = 200_000;
1155 let mut total_scanned = 0;
1156
1157 let mut from_block = stake_table_init_block.saturating_sub(max_events_range);
1158 let mut to_block = stake_table_init_block;
1159
1160 loop {
1161 if total_scanned >= MAX_BLOCKS_SCANNED {
1162 tracing::error!(
1163 total_scanned,
1164 "Exceeded maximum scan range while searching for token Initialized event"
1165 );
1166 return Err(FetchRewardError::ExceededMaxScanRange(MAX_BLOCKS_SCANNED));
1167 }
1168
1169 let init_logs = token
1170 .Initialized_filter()
1171 .from_block(from_block)
1172 .to_block(to_block)
1173 .query()
1174 .await
1175 .map_err(FetchRewardError::ScanQueryFailed)?;
1176
1177 if !init_logs.is_empty() {
1178 let (_, init_log) = init_logs[0].clone();
1179 tracing::info!(
1180 from_block,
1181 tx_hash = ?init_log.transaction_hash,
1182 "Found token Initialized event during scan"
1183 );
1184 return Ok(init_log);
1185 }
1186
1187 total_scanned += max_events_range;
1188 from_block = from_block.saturating_sub(max_events_range);
1189 to_block = to_block.saturating_sub(max_events_range);
1190 }
1191 }
1192
1193 pub async fn fetch(&self, epoch: Epoch, header: Header) -> anyhow::Result<ValidatorMap> {
1194 let chain_config = self.get_chain_config(&header).await?;
1195 *self.chain_config.lock().await = chain_config;
1197
1198 let Some(address) = chain_config.stake_table_contract else {
1199 bail!("No stake table contract address found in Chain config");
1200 };
1201
1202 let Some(l1_finalized_block_info) = header.l1_finalized() else {
1203 bail!("The epoch root for epoch {epoch} is missing the L1 finalized block info. This is a fatal error. Consensus is blocked and will not recover.");
1204 };
1205
1206 let events = match self
1207 .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
1208 .await
1209 .map_err(GetStakeTablesError::L1ClientFetchError)
1210 {
1211 Ok(events) => events,
1212 Err(e) => {
1213 bail!("failed to fetch stake table events {e:?}");
1214 },
1215 };
1216
1217 match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
1218 Ok(validators) => Ok(validators),
1219 Err(e) => {
1220 bail!("failed to construct stake table {e:?}");
1221 },
1222 }
1223 }
1224
1225 pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
1228 let chain_config = self.chain_config.lock().await;
1229 let peers = self.peers.clone();
1230 let header_cf = header.chain_config();
1231 if chain_config.commit() == header_cf.commit() {
1232 return Ok(*chain_config);
1233 }
1234
1235 let cf = match header_cf.resolve() {
1236 Some(cf) => cf,
1237 None => peers
1238 .fetch_chain_config(header_cf.commit())
1239 .await
1240 .map_err(|err| {
1241 tracing::error!("failed to get chain_config from peers. err: {err:?}");
1242 err
1243 })?,
1244 };
1245
1246 Ok(cf)
1247 }
1248
/// Creates a fetcher for tests: mock peers, no persistent storage, the
/// default chain config and an L1 client pointed at `http://localhost:3331`.
///
/// # Panics
///
/// Panics if the hard-coded URL fails to parse or the L1 client cannot be
/// created.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
    use crate::{mock::MockStateCatchup, v0_1::NoStorage};

    let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
        .expect("Failed to create L1 client");

    Self::new(
        Arc::new(MockStateCatchup::default()),
        Arc::new(Mutex::new(NoStorage)),
        l1,
        ChainConfig::default(),
    )
}
1261}
1262
1263#[derive(Clone, Debug)]
1265struct NonEpochCommittee {
1266 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
1270
1271 stake_table: Vec<PeerConfig<SeqTypes>>,
1273
1274 da_members: Vec<PeerConfig<SeqTypes>>,
1276
1277 indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
1279
1280 indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
1282}
1283
1284#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
1286pub struct EpochCommittee {
1287 eligible_leaders: Vec<PeerConfig<SeqTypes>>,
1291 stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
1293 validators: ValidatorMap,
1294 address_mapping: HashMap<BLSPubKey, Address>,
1295}
1296
1297impl EpochCommittees {
1298 pub fn first_epoch(&self) -> Option<Epoch> {
1299 self.first_epoch
1300 }
1301
1302 pub fn fetcher(&self) -> &Fetcher {
1303 &self.fetcher
1304 }
1305
1306 fn update(
1312 &mut self,
1313 epoch: EpochNumber,
1314 validators: ValidatorMap,
1315 block_reward: Option<RewardAmount>,
1316 ) {
1317 let mut address_mapping = HashMap::new();
1318 let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
1319 .values()
1320 .map(|v| {
1321 address_mapping.insert(v.stake_table_key, v.account);
1322 (
1323 v.stake_table_key,
1324 PeerConfig {
1325 stake_table_entry: BLSPubKey::stake_table_entry(
1326 &v.stake_table_key,
1327 v.stake,
1328 ),
1329 state_ver_key: v.state_ver_key.clone(),
1330 },
1331 )
1332 })
1333 .collect();
1334
1335 let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
1336 stake_table.iter().map(|(_, l)| l.clone()).collect();
1337
1338 self.state.insert(
1339 epoch,
1340 EpochCommittee {
1341 eligible_leaders,
1342 stake_table,
1343 validators,
1344 address_mapping,
1345 },
1346 );
1347
1348 if let Some(block_reward) = block_reward {
1349 self.block_reward = block_reward;
1350 }
1351 }
1352
1353 pub fn validators(&self, epoch: &Epoch) -> anyhow::Result<ValidatorMap> {
1354 Ok(self
1355 .state
1356 .get(epoch)
1357 .context("state for found")?
1358 .validators
1359 .clone())
1360 }
1361
1362 pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
1363 let mapping = self
1364 .state
1365 .get(epoch)
1366 .context("state for found")?
1367 .address_mapping
1368 .clone();
1369
1370 Ok(*mapping.get(&bls_key).context(format!(
1371 "failed to get ethereum address for bls key {bls_key}. epoch={epoch}"
1372 ))?)
1373 }
1374
1375 pub fn get_validator_config(
1376 &self,
1377 epoch: &Epoch,
1378 key: BLSPubKey,
1379 ) -> anyhow::Result<Validator<BLSPubKey>> {
1380 let address = self.address(epoch, key)?;
1381 let validators = self.validators(epoch)?;
1382 validators
1383 .get(&address)
1384 .context("validator not found")
1385 .cloned()
1386 }
1387
1388 pub fn block_reward(&self) -> RewardAmount {
1389 self.block_reward
1390 }
1391
1392 pub fn new_stake(
1394 committee_members: Vec<PeerConfig<SeqTypes>>,
1397 da_members: Vec<PeerConfig<SeqTypes>>,
1398 block_reward: RewardAmount,
1399 fetcher: Fetcher,
1400 ) -> Self {
1401 let stake_table: Vec<_> = committee_members
1403 .iter()
1404 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1405 .cloned()
1406 .collect();
1407
1408 let eligible_leaders = stake_table.clone();
1409 let da_members: Vec<_> = da_members
1411 .iter()
1412 .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1413 .cloned()
1414 .collect();
1415
1416 let indexed_stake_table: HashMap<PubKey, _> = stake_table
1418 .iter()
1419 .map(|peer_config| {
1420 (
1421 PubKey::public_key(&peer_config.stake_table_entry),
1422 peer_config.clone(),
1423 )
1424 })
1425 .collect();
1426
1427 let indexed_da_members: HashMap<PubKey, _> = da_members
1429 .iter()
1430 .map(|peer_config| {
1431 (
1432 PubKey::public_key(&peer_config.stake_table_entry),
1433 peer_config.clone(),
1434 )
1435 })
1436 .collect();
1437
1438 let members = NonEpochCommittee {
1439 eligible_leaders,
1440 stake_table,
1441 da_members,
1442 indexed_stake_table,
1443 indexed_da_members,
1444 };
1445
1446 let mut map = HashMap::new();
1447 let epoch_committee = EpochCommittee {
1448 eligible_leaders: members.eligible_leaders.clone(),
1449 stake_table: members
1450 .stake_table
1451 .iter()
1452 .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
1453 .collect(),
1454 validators: Default::default(),
1455 address_mapping: HashMap::new(),
1456 };
1457 map.insert(Epoch::genesis(), epoch_committee.clone());
1458 map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());
1460
1461 Self {
1462 non_epoch_committee: members,
1463 state: map,
1464 randomized_committees: BTreeMap::new(),
1465 first_epoch: None,
1466 block_reward,
1467 fetcher: Arc::new(fetcher),
1468 }
1469 }
1470
1471 pub async fn reload_stake(&mut self, limit: u64) {
1472 let loaded_stake = match self
1475 .fetcher
1476 .persistence
1477 .lock()
1478 .await
1479 .load_latest_stake(limit)
1480 .await
1481 {
1482 Ok(Some(loaded)) => loaded,
1483 Ok(None) => {
1484 tracing::warn!("No stake table history found in persistence!");
1485 return;
1486 },
1487 Err(e) => {
1488 tracing::error!("Failed to load stake table history from persistence: {e}");
1489 return;
1490 },
1491 };
1492
1493 for (epoch, stake_table) in loaded_stake {
1494 self.update(epoch, stake_table, None);
1495 }
1496 }
1497
1498 fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
1499 if let Some(epoch) = epoch {
1500 self.state
1501 .get(epoch)
1502 .map(|committee| committee.stake_table.clone().into_values().collect())
1503 } else {
1504 Some(self.non_epoch_committee.stake_table.clone())
1505 }
1506 }
1507}
1508
1509#[derive(Error, Debug)]
1510enum GetStakeTablesError {
1512 #[error("Error fetching from L1: {0}")]
1513 L1ClientFetchError(anyhow::Error),
1514}
1515
1516#[derive(Error, Debug)]
1517#[error("Could not lookup leader")] pub struct LeaderLookupError;
1519
1520impl Membership<SeqTypes> for EpochCommittees {
1522 type Error = LeaderLookupError;
1523 fn new(
1525 _committee_members: Vec<PeerConfig<SeqTypes>>,
1528 _da_members: Vec<PeerConfig<SeqTypes>>,
1529 ) -> Self {
1530 panic!("This function has been replaced with new_stake()");
1531 }
1532
1533 fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1535 self.get_stake_table(&epoch).unwrap_or_default().into()
1536 }
1537 fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1539 self.non_epoch_committee.da_members.clone().into()
1540 }
1541
1542 fn committee_members(
1544 &self,
1545 _view_number: <SeqTypes as NodeType>::View,
1546 epoch: Option<Epoch>,
1547 ) -> BTreeSet<PubKey> {
1548 let stake_table = self.stake_table(epoch);
1549 stake_table
1550 .iter()
1551 .map(|x| PubKey::public_key(&x.stake_table_entry))
1552 .collect()
1553 }
1554
1555 fn da_committee_members(
1557 &self,
1558 _view_number: <SeqTypes as NodeType>::View,
1559 _epoch: Option<Epoch>,
1560 ) -> BTreeSet<PubKey> {
1561 self.non_epoch_committee
1562 .indexed_da_members
1563 .clone()
1564 .into_keys()
1565 .collect()
1566 }
1567
1568 fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1570 if let Some(epoch) = epoch {
1572 self.state
1573 .get(&epoch)
1574 .and_then(|h| h.stake_table.get(pub_key))
1575 .cloned()
1576 } else {
1577 self.non_epoch_committee
1578 .indexed_stake_table
1579 .get(pub_key)
1580 .cloned()
1581 }
1582 }
1583
1584 fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1586 self.non_epoch_committee
1588 .indexed_da_members
1589 .get(pub_key)
1590 .cloned()
1591 }
1592
1593 fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1595 self.stake(pub_key, epoch)
1596 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1597 .unwrap_or_default()
1598 }
1599
1600 fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1602 self.da_stake(pub_key, epoch)
1603 .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1604 .unwrap_or_default()
1605 }
1606
1607 fn lookup_leader(
1620 &self,
1621 view_number: <SeqTypes as NodeType>::View,
1622 epoch: Option<Epoch>,
1623 ) -> Result<PubKey, Self::Error> {
1624 match (self.first_epoch(), epoch) {
1625 (Some(first_epoch), Some(epoch)) => {
1626 if epoch < first_epoch {
1627 tracing::error!(
1628 "lookup_leader called with epoch {} before first epoch {}",
1629 epoch,
1630 first_epoch,
1631 );
1632 return Err(LeaderLookupError);
1633 }
1634 let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
1635 tracing::error!(
1636 "We are missing the randomized committee for epoch {}",
1637 epoch
1638 );
1639 return Err(LeaderLookupError);
1640 };
1641
1642 Ok(PubKey::public_key(&select_randomized_leader(
1643 randomized_committee,
1644 *view_number,
1645 )))
1646 },
1647 (_, None) => {
1648 let leaders = &self.non_epoch_committee.eligible_leaders;
1649
1650 let index = *view_number as usize % leaders.len();
1651 let res = leaders[index].clone();
1652 Ok(PubKey::public_key(&res.stake_table_entry))
1653 },
1654 (None, Some(epoch)) => {
1655 tracing::error!(
1656 "lookup_leader called with epoch {} but we don't have a first epoch",
1657 epoch,
1658 );
1659 Err(LeaderLookupError)
1660 },
1661 }
1662 }
1663
1664 fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
1666 self.stake_table(epoch).len()
1667 }
1668
1669 fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
1671 self.da_stake_table(epoch).len()
1672 }
1673
1674 fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1676 let total_stake = self.total_stake(epoch);
1677 let one = U256::ONE;
1678 let two = U256::from(2);
1679 let three = U256::from(3);
1680 if total_stake < U256::MAX / two {
1681 ((total_stake * two) / three) + one
1682 } else {
1683 ((total_stake / three) * two) + two
1684 }
1685 }
1686
1687 fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1689 let total_stake = self.total_da_stake(epoch);
1690 let one = U256::ONE;
1691 let two = U256::from(2);
1692 let three = U256::from(3);
1693
1694 if total_stake < U256::MAX / two {
1695 ((total_stake * two) / three) + one
1696 } else {
1697 ((total_stake / three) * two) + two
1698 }
1699 }
1700
1701 fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
1703 let total_stake = self.total_stake(epoch);
1704 let one = U256::ONE;
1705 let three = U256::from(3);
1706
1707 (total_stake / three) + one
1708 }
1709
1710 fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
1712 let total_stake = self.total_stake(epoch);
1713 let nine = U256::from(9);
1714 let ten = U256::from(10);
1715
1716 let normal_threshold = self.success_threshold(epoch);
1717 let higher_threshold = if total_stake < U256::MAX / nine {
1718 (total_stake * nine) / ten
1719 } else {
1720 (total_stake / ten) * nine
1721 };
1722
1723 max(higher_threshold, normal_threshold)
1724 }
1725
1726 async fn add_epoch_root(
1727 membership: Arc<RwLock<Self>>,
1728 epoch: Epoch,
1729 block_header: Header,
1730 ) -> anyhow::Result<()> {
1731 let membership_reader = membership.read().await;
1732 if membership_reader.state.contains_key(&epoch) {
1733 tracing::info!(
1734 "We already have the stake table for epoch {}. Skipping L1 fetching.",
1735 epoch
1736 );
1737 return Ok(());
1738 }
1739 let fetcher = Arc::clone(&membership_reader.fetcher);
1740 drop(membership_reader);
1741
1742 let stake_tables = fetcher.fetch(epoch, block_header).await?;
1743
1744 let mut block_reward = None;
1745
1746 {
1747 let membership_reader = membership.read().await;
1748 if membership_reader.block_reward == RewardAmount(U256::ZERO) {
1755 block_reward = Some(fetcher.fetch_block_reward().await?);
1756 }
1757 }
1758
1759 {
1761 let persistence_lock = fetcher.persistence.lock().await;
1762 if let Err(e) = persistence_lock
1763 .store_stake(epoch, stake_tables.clone())
1764 .await
1765 {
1766 tracing::error!(?e, "`add_epoch_root`, error storing stake table");
1767 }
1768 }
1769
1770 let mut membership_writer = membership.write().await;
1771 membership_writer.update(epoch, stake_tables, block_reward);
1772 Ok(())
1773 }
1774
1775 fn has_stake_table(&self, epoch: Epoch) -> bool {
1776 self.state.contains_key(&epoch)
1777 }
1778
1779 fn has_randomized_stake_table(&self, epoch: Epoch) -> anyhow::Result<bool> {
1791 let Some(first_epoch) = self.first_epoch else {
1792 bail!(
1793 "Called has_randomized_stake_table with epoch {} but first_epoch is None",
1794 epoch
1795 );
1796 };
1797 ensure!(
1798 epoch >= first_epoch,
1799 "Called has_randomized_stake_table with epoch {} but first_epoch is {}",
1800 epoch,
1801 first_epoch
1802 );
1803 Ok(self.randomized_committees.contains_key(&epoch))
1804 }
1805
1806 async fn get_epoch_root(
1807 membership: Arc<RwLock<Self>>,
1808 block_height: u64,
1809 epoch: Epoch,
1810 ) -> anyhow::Result<Leaf2> {
1811 let membership_reader = membership.read().await;
1812 let peers = membership_reader.fetcher.peers.clone();
1813 let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1814 let success_threshold = membership_reader.success_threshold(Some(epoch));
1815 drop(membership_reader);
1816
1817 let leaf: Leaf2 = peers
1819 .fetch_leaf(block_height, stake_table.clone(), success_threshold)
1820 .await?;
1821
1822 Ok(leaf)
1823 }
1824
1825 async fn get_epoch_drb(
1826 membership: Arc<RwLock<Self>>,
1827 block_height: u64,
1828 epoch: Epoch,
1829 ) -> anyhow::Result<DrbResult> {
1830 let membership_reader = membership.read().await;
1831 let peers = membership_reader.fetcher.peers.clone();
1832 let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1833 let success_threshold = membership_reader.success_threshold(Some(epoch));
1834 drop(membership_reader);
1835
1836 tracing::debug!(
1837 "Getting DRB for epoch {}, block height {}",
1838 epoch,
1839 block_height
1840 );
1841 let drb_leaf = peers
1842 .try_fetch_leaf(1, block_height, stake_table, success_threshold)
1843 .await?;
1844
1845 let Some(drb) = drb_leaf.next_drb_result else {
1846 tracing::error!(
1847 "We received a leaf that should contain a DRB result, but the DRB result is missing: {:?}",
1848 drb_leaf
1849 );
1850
1851 bail!("DRB leaf is missing the DRB result.");
1852 };
1853
1854 Ok(drb)
1855 }
1856
1857 fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
1858 let Some(raw_stake_table) = self.state.get(&epoch) else {
1859 tracing::error!("add_drb_result({epoch}, {drb:?}) was called, but we do not yet have the stake table for epoch {epoch}");
1860 return;
1861 };
1862
1863 let leaders = raw_stake_table
1864 .eligible_leaders
1865 .clone()
1866 .into_iter()
1867 .map(|peer_config| peer_config.stake_table_entry)
1868 .collect::<Vec<_>>();
1869 let randomized_committee = generate_stake_cdf(leaders, drb);
1870
1871 self.randomized_committees
1872 .insert(epoch, randomized_committee);
1873 }
1874
1875 fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
1876 self.first_epoch = Some(epoch);
1877
1878 let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
1879 self.state.insert(epoch, epoch_committee.clone());
1880 self.state.insert(epoch + 1, epoch_committee);
1881 self.add_drb_result(epoch, initial_drb_result);
1882 self.add_drb_result(epoch + 1, initial_drb_result);
1883 }
1884
1885 fn first_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
1886 self.first_epoch
1887 }
1888}
1889
#[cfg(any(test, feature = "testing"))]
impl super::v0_3::StakeTable {
    /// Creates a stake table holding `n` default peer configs.
    pub fn mock(n: u64) -> Self {
        // `(0..n)` yields `n` items. The former `[..n]` was a one-element
        // array holding a range, so it always produced exactly one entry.
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1901
#[cfg(any(test, feature = "testing"))]
impl DAMembers {
    /// Creates a DA member list holding `n` default peer configs.
    pub fn mock(n: u64) -> Self {
        // `(0..n)` yields `n` items. The former `[..n]` was a one-element
        // array holding a range, so it always produced exactly one entry.
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1913
1914#[cfg(any(test, feature = "testing"))]
1915pub mod testing {
1916 use alloy::primitives::Bytes;
1917 use hotshot_contract_adapter::{
1918 sol_types::{EdOnBN254PointSol, G1PointSol, G2PointSol},
1919 stake_table::{sign_address_bls, sign_address_schnorr},
1920 };
1921 use hotshot_types::{light_client::StateKeyPair, signature_key::BLSKeyPair};
1922 use rand::{Rng as _, RngCore as _};
1923
1924 use super::*;
1925
1926 #[derive(Debug, Clone)]
1929 pub struct TestValidator {
1930 pub account: Address,
1931 pub bls_vk: G2PointSol,
1932 pub schnorr_vk: EdOnBN254PointSol,
1933 pub commission: u16,
1934 pub bls_sig: G1PointSol,
1935 pub schnorr_sig: Bytes,
1936 }
1937
1938 impl TestValidator {
1939 pub fn random() -> Self {
1940 let account = Address::random();
1941 let commission = rand::thread_rng().gen_range(0..10000);
1942 Self::random_update_keys(account, commission)
1943 }
1944
1945 pub fn randomize_keys(&self) -> Self {
1946 Self::random_update_keys(self.account, self.commission)
1947 }
1948
1949 fn random_update_keys(account: Address, commission: u16) -> Self {
1950 let mut rng = &mut rand::thread_rng();
1951 let mut seed = [0u8; 32];
1952 rng.fill_bytes(&mut seed);
1953 let bls_key_pair = BLSKeyPair::generate(&mut rng);
1954 let bls_sig = sign_address_bls(&bls_key_pair, account);
1955 let schnorr_key_pair = StateKeyPair::generate_from_seed_indexed(seed, 0);
1956 let schnorr_sig = sign_address_schnorr(&schnorr_key_pair, account);
1957 Self {
1958 account,
1959 bls_vk: bls_key_pair.ver_key().to_affine().into(),
1960 schnorr_vk: schnorr_key_pair.ver_key().to_affine().into(),
1961 commission,
1962 bls_sig,
1963 schnorr_sig,
1964 }
1965 }
1966 }
1967
1968 impl From<&TestValidator> for ValidatorRegistered {
1969 fn from(value: &TestValidator) -> Self {
1970 Self {
1971 account: value.account,
1972 blsVk: value.bls_vk,
1973 schnorrVk: value.schnorr_vk,
1974 commission: value.commission,
1975 }
1976 }
1977 }
1978
1979 impl From<&TestValidator> for ValidatorRegisteredV2 {
1980 fn from(value: &TestValidator) -> Self {
1981 Self {
1982 account: value.account,
1983 blsVK: value.bls_vk,
1984 schnorrVK: value.schnorr_vk,
1985 commission: value.commission,
1986 blsSig: value.bls_sig.into(),
1987 schnorrSig: value.schnorr_sig.clone(),
1988 }
1989 }
1990 }
1991
1992 impl From<&TestValidator> for ConsensusKeysUpdated {
1993 fn from(value: &TestValidator) -> Self {
1994 Self {
1995 account: value.account,
1996 blsVK: value.bls_vk,
1997 schnorrVK: value.schnorr_vk,
1998 }
1999 }
2000 }
2001
2002 impl From<&TestValidator> for ConsensusKeysUpdatedV2 {
2003 fn from(value: &TestValidator) -> Self {
2004 Self {
2005 account: value.account,
2006 blsVK: value.bls_vk,
2007 schnorrVK: value.schnorr_vk,
2008 blsSig: value.bls_sig.into(),
2009 schnorrSig: value.schnorr_sig.clone(),
2010 }
2011 }
2012 }
2013
2014 impl From<&TestValidator> for ValidatorExit {
2015 fn from(value: &TestValidator) -> Self {
2016 Self {
2017 validator: value.account,
2018 }
2019 }
2020 }
2021
2022 impl Validator<BLSPubKey> {
2023 pub fn mock() -> Validator<BLSPubKey> {
2024 let val = TestValidator::random();
2025 let rng = &mut rand::thread_rng();
2026 let mut seed = [1u8; 32];
2027 rng.fill_bytes(&mut seed);
2028 let mut validator_stake = alloy::primitives::U256::from(0);
2029 let mut delegators = HashMap::new();
2030 for _i in 0..=5000 {
2031 let stake: u64 = rng.gen_range(0..10000);
2032 delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
2033 validator_stake += alloy::primitives::U256::from(stake);
2034 }
2035
2036 let stake_table_key = val.bls_vk.into();
2037 let state_ver_key = val.schnorr_vk.into();
2038
2039 Validator {
2040 account: val.account,
2041 stake_table_key,
2042 state_ver_key,
2043 stake: validator_stake,
2044 commission: val.commission,
2045 delegators,
2046 }
2047 }
2048 }
2049}
2050
2051#[cfg(test)]
2052mod tests {
2053 use alloy::{primitives::Address, rpc::types::Log};
2054 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
2055 use pretty_assertions::assert_matches;
2056 use rstest::rstest;
2057 use sequencer_utils::test_utils::setup_test;
2058
2059 use super::*;
2060 use crate::v0::impls::testing::*;
2061
/// Replays a valid event sequence for two validators and checks the
/// resulting validator set, then checks the effect of each validator
/// exiting in turn.
#[test]
fn test_from_l1_events() -> anyhow::Result<()> {
    setup_test();

    let val_1 = TestValidator::random();
    let val_1_new_keys = val_1.randomize_keys();
    let val_2 = TestValidator::random();
    let val_2_new_keys = val_2.randomize_keys();
    let delegator = Address::random();

    let delegate = |delegator: Address, validator: Address, amount: u64| -> StakeTableEvent {
        Delegated {
            delegator,
            validator,
            amount: U256::from(amount),
        }
        .into()
    };

    let mut events: Vec<StakeTableEvent> = vec![
        ValidatorRegistered::from(&val_1).into(),
        ValidatorRegisteredV2::from(&val_2).into(),
        delegate(delegator, val_1.account, 10),
        ConsensusKeysUpdated::from(&val_1_new_keys).into(),
        ConsensusKeysUpdatedV2::from(&val_2_new_keys).into(),
        Undelegated {
            delegator,
            validator: val_1.account,
            amount: U256::from(7),
        }
        .into(),
        // The same delegator delegates to validator 1 again.
        delegate(delegator, val_1.account, 5),
        // A different delegator delegates to validator 2.
        delegate(Address::random(), val_2.account, 3),
    ];

    // Validator 2 is expected to look the same before and after validator 1 exits.
    let check_val_2 = |set: &ValidatorMap| {
        let st_val_2 = set.get(&val_2.account).unwrap();
        assert_eq!(st_val_2.stake, U256::from(3));
        assert_eq!(st_val_2.commission, val_2.commission);
        assert_eq!(st_val_2.delegators.len(), 1);
    };

    let st = active_validator_set_from_l1_events(events.iter().cloned())?;
    let st_val_1 = st.get(&val_1.account).unwrap();
    // 10 delegated, 7 undelegated, 5 delegated again.
    assert_eq!(st_val_1.stake, U256::from(8));
    assert_eq!(st_val_1.commission, val_1.commission);
    assert_eq!(st_val_1.delegators.len(), 1);
    assert_eq!(*st_val_1.delegators.get(&delegator).unwrap(), U256::from(8));
    check_val_2(&st);

    events.push(ValidatorExit::from(&val_1).into());

    let st = active_validator_set_from_l1_events(events.iter().cloned())?;
    assert!(st.get(&val_1.account).is_none());
    check_val_2(&st);

    events.push(ValidatorExit::from(&val_2).into());

    // With both validators gone no active set can be built.
    assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());

    Ok(())
}
2139
/// Checks that a number of event sequences are rejected by
/// `validators_from_l1_events`.
#[test]
fn test_from_l1_events_failures() -> anyhow::Result<()> {
    let val = TestValidator::random();
    let delegator = Address::random();

    let register: StakeTableEvent = ValidatorRegistered::from(&val).into();
    let register_v2: StakeTableEvent = ValidatorRegisteredV2::from(&val).into();
    let key_update: StakeTableEvent = ConsensusKeysUpdated::from(&val).into();
    let key_update_v2: StakeTableEvent = ConsensusKeysUpdatedV2::from(&val).into();
    let exit: StakeTableEvent = ValidatorExit::from(&val).into();
    let delegate: StakeTableEvent = Delegated {
        delegator,
        validator: val.account,
        amount: U256::from(10),
    }
    .into();
    let undelegate: StakeTableEvent = Undelegated {
        delegator,
        validator: val.account,
        amount: U256::from(7),
    }
    .into();

    let cases: Vec<Vec<StakeTableEvent>> = vec![
        // Events for a validator that was never registered.
        vec![exit],
        vec![undelegate.clone()],
        vec![delegate.clone()],
        vec![key_update],
        vec![key_update_v2],
        // Registering the same validator twice, in every V1/V2 combination.
        vec![register.clone(), register.clone()],
        vec![register_v2.clone(), register_v2.clone()],
        vec![register.clone(), register_v2.clone()],
        vec![register_v2.clone(), register.clone()],
        // Undelegating 7 twice after delegating only 10.
        vec![
            register,
            delegate.clone(),
            undelegate.clone(),
            undelegate.clone(),
        ],
        vec![register_v2, delegate, undelegate.clone(), undelegate],
    ];

    for events in &cases {
        let res = validators_from_l1_events(events.iter().cloned());
        assert!(
            res.is_err(),
            "events {res:?}, not a valid sequence of events"
        );
    }
    Ok(())
}
2195
/// Selects the active set out of 3000 mock validators and checks that at
/// most 100 remain, that each meets the minimum stake (highest stake divided
/// by `VID_TARGET_TOTAL_STAKE`), and that the highest stake among the
/// selected validators equals the highest stake before selection.
#[test]
fn test_validators_selection() {
    let mut validators = IndexMap::new();
    let mut highest_stake = alloy::primitives::U256::ZERO;

    for _i in 0..3000 {
        let validator = Validator::mock();
        validators.insert(validator.account, validator.clone());

        if validator.stake > highest_stake {
            highest_stake = validator.stake;
        }
    }

    let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);

    select_active_validator_set(&mut validators).expect("Failed to select validators");
    assert!(
        validators.len() <= 100,
        "validators len is {}, expected at most 100",
        validators.len()
    );

    let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
    for (address, validator) in &validators {
        assert!(
            validator.stake >= minimum_stake,
            "Validator {:?} has stake below minimum: {}",
            address,
            validator.stake
        );

        if validator.stake > selected_validators_highest_stake {
            selected_validators_highest_stake = validator.stake;
        }
    }

    // NOTE(review): assumes the selection always keeps the highest-staked
    // validator — confirm against `select_active_validator_set`.
    assert_eq!(
        selected_validators_highest_stake, highest_stake,
        "the highest-staked validator must be part of the selected set"
    );
}
2234
2235 #[rstest::rstest]
2238 fn test_regression_non_unique_bls_keys_not_discarded(
2239 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
2240 version: StakeTableContractVersion,
2241 ) {
2242 let val = TestValidator::random();
2243 let register: StakeTableEvent = match version {
2244 StakeTableContractVersion::V1 => ValidatorRegistered::from(&val).into(),
2245 StakeTableContractVersion::V2 => ValidatorRegisteredV2::from(&val).into(),
2246 };
2247 let delegate: StakeTableEvent = Delegated {
2248 delegator: Address::random(),
2249 validator: val.account,
2250 amount: U256::from(10),
2251 }
2252 .into();
2253
2254 assert!(active_validator_set_from_l1_events(
2256 vec![register.clone(), delegate.clone()].into_iter()
2257 )
2258 .is_ok());
2259
2260 let key_update = ConsensusKeysUpdated::from(&val).into();
2262 let err =
2263 active_validator_set_from_l1_events(vec![register, delegate, key_update].into_iter())
2264 .unwrap_err();
2265
2266 let bls: BLSPubKey = val.bls_vk.into();
2267 assert!(matches!(err, StakeTableError::BlsKeyAlreadyUsed(addr) if addr == bls.to_string()));
2268 }
2269
/// Deserializes a log from JSON and checks its `display()` rendering.
#[test]
fn test_display_log() {
    let serialized = r#"{"address":"0x0000000000000000000000000000000000000069","topics":["0x0000000000000000000000000000000000000000000000000000000000000069"],"data":"0x69","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000069","blockNumber":"0x69","blockTimestamp":"0x69","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000069","transactionIndex":"0x69","logIndex":"0x70","removed":false}"#;
    let expected = "Log(block=105,index=112,transaction_hash=0x0000000000000000000000000000000000000000000000000000000000000069)";

    let log: Log = serde_json::from_str(serialized).unwrap();

    assert_eq!(log.display(), expected)
}
2279
2280 #[rstest]
2281 #[case::v1(StakeTableContractVersion::V1)]
2282 #[case::v2(StakeTableContractVersion::V2)]
2283 fn test_register_validator(#[case] version: StakeTableContractVersion) {
2284 let mut state = StakeTableState::new();
2285 let validator = TestValidator::random();
2286
2287 let event = match version {
2288 StakeTableContractVersion::V1 => StakeTableEvent::Register((&validator).into()),
2289 StakeTableContractVersion::V2 => StakeTableEvent::RegisterV2((&validator).into()),
2290 };
2291
2292 assert!(state.apply_event(event).unwrap().is_ok());
2293
2294 let stored = state.validators.get(&validator.account).unwrap();
2295 assert_eq!(stored.account, validator.account);
2296 }
2297
2298 #[rstest]
2299 #[case::v1(StakeTableContractVersion::V1)]
2300 #[case::v2(StakeTableContractVersion::V2)]
2301 fn test_validator_already_registered(#[case] version: StakeTableContractVersion) {
2302 let mut stake_table_state = StakeTableState::new();
2303
2304 let test_validator = TestValidator::random();
2305
2306 let first_registration_result =
2308 match version {
2309 StakeTableContractVersion::V1 => stake_table_state
2310 .apply_event(StakeTableEvent::Register((&test_validator).into())),
2311 StakeTableContractVersion::V2 => stake_table_state
2312 .apply_event(StakeTableEvent::RegisterV2((&test_validator).into())),
2313 };
2314
2315 assert!(first_registration_result.unwrap().is_ok());
2317
2318 let v1_already_registered_result =
2320 stake_table_state.apply_event(StakeTableEvent::Register((&test_validator).into()));
2321
2322 pretty_assertions::assert_matches!(
2323 v1_already_registered_result, Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2324 "Expected AlreadyRegistered error. version ={version:?} result={v1_already_registered_result:?}",
2325 );
2326
2327 let v2_already_registered_result =
2329 stake_table_state.apply_event(StakeTableEvent::RegisterV2((&test_validator).into()));
2330
2331 pretty_assertions::assert_matches!(
2332 v2_already_registered_result,
2333 Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2334 "Expected AlreadyRegistered error. version ={version:?} result={v2_already_registered_result:?}",
2335
2336 );
2337 }
2338
/// A V2 registration whose BLS signature was replaced by the default value
/// must be rejected with `AuthenticationFailed`.
#[test]
fn test_register_validator_v2_auth_fails() {
    let mut val = TestValidator::random();
    val.bls_sig = Default::default();

    let mut state = StakeTableState::new();
    let result = state.apply_event(StakeTableEvent::RegisterV2((&val).into()));

    assert!(matches!(
        result,
        Err(StakeTableError::AuthenticationFailed(_))
    ));
}
2352
/// Deregistering a registered validator removes it from the state.
#[test]
fn test_deregister_validator() {
    let val = TestValidator::random();
    let mut state = StakeTableState::new();

    state
        .apply_event(StakeTableEvent::Register((&val).into()))
        .unwrap()
        .unwrap();

    let outcome = state.apply_event(StakeTableEvent::Deregister((&val).into()));
    assert!(outcome.unwrap().is_ok());
    assert!(!state.validators.contains_key(&val.account));
}
2365
/// Delegating records the delegator's amount on the validator; undelegating
/// the full amount removes the delegator again.
#[test]
fn test_delegate_and_undelegate() {
    let val = TestValidator::random();
    let delegator = Address::random();
    let amount = U256::from(1000);

    let mut state = StakeTableState::new();
    state
        .apply_event(StakeTableEvent::Register((&val).into()))
        .unwrap()
        .unwrap();

    let delegated = Delegated {
        delegator,
        validator: val.account,
        amount,
    };
    assert!(state
        .apply_event(StakeTableEvent::Delegate(delegated))
        .unwrap()
        .is_ok());

    let after_delegate = state.validators.get(&val.account).unwrap();
    assert_eq!(
        after_delegate.delegators.get(&delegator).cloned(),
        Some(amount)
    );

    let undelegated = Undelegated {
        delegator,
        validator: val.account,
        amount,
    };
    assert!(state
        .apply_event(StakeTableEvent::Undelegate(undelegated))
        .unwrap()
        .is_ok());

    let after_undelegate = state.validators.get(&val.account).unwrap();
    assert!(!after_undelegate.delegators.contains_key(&delegator));
}
2396
2397 #[rstest]
2398 #[case::v1(StakeTableContractVersion::V1)]
2399 #[case::v2(StakeTableContractVersion::V2)]
2400 fn test_key_update_event(#[case] version: StakeTableContractVersion) {
2401 let mut state = StakeTableState::new();
2402 let val = TestValidator::random();
2403
2404 state
2406 .apply_event(StakeTableEvent::Register((&val).into()))
2407 .unwrap()
2408 .unwrap();
2409
2410 let new_keys = val.randomize_keys();
2411
2412 let event = match version {
2413 StakeTableContractVersion::V1 => StakeTableEvent::KeyUpdate((&new_keys).into()),
2414 StakeTableContractVersion::V2 => StakeTableEvent::KeyUpdateV2((&new_keys).into()),
2415 };
2416
2417 assert!(state.apply_event(event).unwrap().is_ok());
2418
2419 let updated = state.validators.get(&val.account).unwrap();
2420 assert_eq!(updated.stake_table_key, new_keys.bls_vk.into());
2421 assert_eq!(updated.state_ver_key, new_keys.schnorr_vk.into());
2422 }
2423
/// Registering a second account with a BLS key that is already registered is
/// expected to fail with `StakeTableError::BlsKeyAlreadyUsed` carrying that key.
#[test]
fn test_duplicate_bls_key() {
    let mut state = StakeTableState::new();

    let first = TestValidator::random();
    let first_registration = StakeTableEvent::Register((&first).into());

    // The second validator uses a different account but reuses the first BLS key.
    let mut second = TestValidator::random();
    second.bls_vk = first.bls_vk;
    second.account = Address::random();
    let second_registration = StakeTableEvent::Register((&second).into());

    let first_outcome = state.apply_event(first_registration).unwrap();
    assert!(first_outcome.is_ok());

    let result = state.apply_event(second_registration);
    let expected_bls_key = BLSPubKey::from(first.bls_vk).to_string();

    assert_matches!(
        result,
        Err(StakeTableError::BlsKeyAlreadyUsed(key))
            if key == expected_bls_key,
        "Expected BlsKeyAlreadyUsed({expected_bls_key}), but got: {result:?}",
    );
}
2446
/// Registering a second account with a Schnorr key that is already registered
/// is expected to yield the inner `ExpectedStakeTableError::SchnorrKeyAlreadyUsed`
/// error while the outer result stays `Ok`.
#[test]
fn test_duplicate_schnorr_key() {
    let mut state = StakeTableState::new();

    let first = TestValidator::random();
    let first_registration = StakeTableEvent::Register((&first).into());

    // The second validator reuses the Schnorr key; its account and BLS key are
    // replaced so that only the Schnorr key collides.
    let mut second = TestValidator::random();
    second.schnorr_vk = first.schnorr_vk;
    second.account = Address::random();
    second.bls_vk = second.randomize_keys().bls_vk;
    let second_registration = StakeTableEvent::Register((&second).into());

    let first_outcome = state.apply_event(first_registration).unwrap();
    assert!(first_outcome.is_ok());

    let result = state.apply_event(second_registration);
    let schnorr: SchnorrPubKey = first.schnorr_vk.into();

    assert_matches!(
        result,
        Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(key)))
            if key == schnorr.to_string(),
        "Expected SchnorrKeyAlreadyUsed({schnorr}), but got: {result:?}",
    );
}
2470
/// Registers a validator and then deregisters it, checking the validator map
/// after each step rather than only the returned results.
#[test]
fn test_register_and_deregister_validator() {
    let mut state = StakeTableState::new();
    let validator = TestValidator::random();

    let event = StakeTableEvent::Register((&validator).into());
    assert!(state.apply_event(event).unwrap().is_ok());
    // An `Ok` result alone would also be produced by a no-op; make sure the
    // validator was actually inserted.
    assert!(
        state.validators.contains_key(&validator.account),
        "validator should be present after registration"
    );

    let deregister_event = StakeTableEvent::Deregister((&validator).into());
    assert!(state.apply_event(deregister_event).unwrap().is_ok());
    // Likewise confirm that the exit event really removed the entry.
    assert!(
        !state.validators.contains_key(&validator.account),
        "validator should be removed after deregistration"
    );
}
2481
/// A delegation of zero tokens is expected to be rejected with
/// `StakeTableError::ZeroDelegatorStake` carrying the delegator's address.
#[test]
fn test_delegate_zero_amount_is_rejected() {
    let mut state = StakeTableState::new();
    let validator = TestValidator::random();
    let account = validator.account;
    state
        .apply_event(StakeTableEvent::Register((&validator).into()))
        .unwrap()
        .unwrap();

    let delegator = Address::random();
    let amount = U256::ZERO;
    let event = StakeTableEvent::Delegate(Delegated {
        delegator,
        validator: account,
        amount,
    });
    let result = state.apply_event(event);

    // On mismatch, report what was expected and what was actually returned,
    // matching the diagnostics of the neighbouring tests.
    assert_matches!(
        result,
        Err(StakeTableError::ZeroDelegatorStake(addr))
            if addr == delegator,
        "Expected ZeroDelegatorStake({delegator}), but got: {result:?}",
    );
}
2509
/// Undelegating more than the delegator has delegated is expected to fail with
/// `StakeTableError::InsufficientStake`.
#[test]
fn test_undelegate_more_than_stake_fails() {
    let validator = TestValidator::random();
    let account = validator.account;
    let mut state = StakeTableState::new();
    let register = StakeTableEvent::Register((&validator).into());
    state.apply_event(register).unwrap().unwrap();

    // Delegate 10 units to the validator.
    let delegator = Address::random();
    let delegated = Delegated {
        delegator,
        validator: account,
        amount: U256::from(10u64),
    };
    state
        .apply_event(StakeTableEvent::Delegate(delegated))
        .unwrap()
        .unwrap();

    // Then try to withdraw 20 units, twice what was delegated.
    let undelegated = Undelegated {
        delegator,
        validator: account,
        amount: U256::from(20u64),
    };
    let result = state.apply_event(StakeTableEvent::Undelegate(undelegated));

    assert_matches!(
        result,
        Err(StakeTableError::InsufficientStake),
        "Expected InsufficientStake error, got: {result:?}",
    );
}
2539
2540 #[tokio::test(flavor = "multi_thread")]
2541 async fn test_decaf_stake_table() {
2542 setup_test();
2543
2544 let events_json =
2582 std::fs::read_to_string("../data/v3/decaf_stake_table_events.json").unwrap();
2583 let events: Vec<(EventKey, StakeTableEvent)> = serde_json::from_str(&events_json).unwrap();
2584
2585 let reconstructed_stake_table =
2587 active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)).unwrap();
2588
2589 let stake_table_json =
2590 std::fs::read_to_string("../data/v3/decaf_stake_table.json").unwrap();
2591 let expected: IndexMap<Address, Validator<BLSPubKey>> =
2592 serde_json::from_str(&stake_table_json).unwrap();
2593
2594 assert_eq!(
2595 reconstructed_stake_table, expected,
2596 "Stake table reconstructed from events does not match the expected stake table "
2597 );
2598 }
2599}