espresso_types/v0/impls/stake_table.rs

1use std::{
2    cmp::{max, min},
3    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4    future::Future,
5    sync::Arc,
6};
7
8use alloy::{
9    eips::{BlockId, BlockNumberOrTag},
10    primitives::{utils::format_ether, Address, U256},
11    providers::Provider,
12    rpc::types::Log,
13};
14use anyhow::{bail, ensure, Context};
15use async_lock::{Mutex, RwLock};
16use committable::Committable;
17use futures::stream::{self, StreamExt};
18use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
19use hotshot_contract_adapter::sol_types::{
20    EspToken::{self, EspTokenInstance},
21    StakeTableV2::{
22        self, ConsensusKeysUpdated, ConsensusKeysUpdatedV2, Delegated, Undelegated, ValidatorExit,
23        ValidatorRegistered, ValidatorRegisteredV2,
24    },
25};
26use hotshot_types::{
27    data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
28    drb::{
29        election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
30        DrbResult,
31    },
32    stake_table::{HSStakeTable, StakeTableEntry},
33    traits::{
34        election::Membership,
35        node_implementation::{ConsensusTime, NodeType},
36        signature_key::StakeTableEntryType,
37    },
38    PeerConfig,
39};
40use indexmap::IndexMap;
41use thiserror::Error;
42use tokio::{spawn, time::sleep};
43use tracing::Instrument;
44
45#[cfg(any(test, feature = "testing"))]
46use super::v0_3::DAMembers;
47use super::{
48    traits::{MembershipPersistence, StateCatchup},
49    v0_3::{ChainConfig, EventKey, Fetcher, StakeTableEvent, StakeTableUpdateTask, Validator},
50    Header, L1Client, Leaf2, PubKey, SeqTypes,
51};
52use crate::{
53    traits::EventsPersistenceRead,
54    v0_1::{L1Provider, RewardAmount, BLOCKS_PER_YEAR, COMMISSION_BASIS_POINTS, INFLATION_RATE},
55    v0_3::{EventSortingError, ExpectedStakeTableError, FetchRewardError, StakeTableError},
56};
57
/// Epoch type used by the sequencer's `NodeType` implementation.
type Epoch = <SeqTypes as NodeType>::Epoch;
/// Map from validator account address to the validator's state.
pub type ValidatorMap = IndexMap<Address, Validator<BLSPubKey>>;
/// The result of applying a stake table event:
/// - `Ok(Ok(()))`: success
/// - `Ok(Err(...))`: expected error
/// - `Err(...)`: serious error
type ApplyEventResult<T> = Result<Result<T, ExpectedStakeTableError>, StakeTableError>;
65
/// Format the alloy Log RPC type in a way to make it easy to find the event in an explorer.
trait DisplayLog {
    /// Render a short, explorer-friendly description of this log.
    fn display(&self) -> String;
}
70
71impl DisplayLog for Log {
72    fn display(&self) -> String {
73        // These values are all unlikely to be missing because we only create Log variables by
74        // fetching them from the RPC, so for simplicity we use defaults if the any of the values
75        // are missing.
76        let block = self.block_number.unwrap_or_default();
77        let index = self.log_index.unwrap_or_default();
78        let hash = self.transaction_hash.unwrap_or_default();
79        format!("Log(block={block},index={index},transaction_hash={hash})")
80    }
81}
82
/// Raw stake table contract events grouped by kind, each paired with the L1
/// log it was decoded from (the log carries block number / log index used for
/// ordering).
#[derive(Clone, PartialEq)]
pub struct StakeTableEvents {
    /// V1 validator registrations.
    registrations: Vec<(ValidatorRegistered, Log)>,
    /// V2 validator registrations (carry authenticating signatures).
    registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
    /// Validator exits.
    deregistrations: Vec<(ValidatorExit, Log)>,
    /// Delegations of stake to a validator.
    delegated: Vec<(Delegated, Log)>,
    /// Withdrawals of previously delegated stake.
    undelegated: Vec<(Undelegated, Log)>,
    /// V1 consensus key rotations.
    keys: Vec<(ConsensusKeysUpdated, Log)>,
    /// V2 consensus key rotations (carry authenticating signatures).
    keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
}
93
94impl StakeTableEvents {
95    /// Creates a new instance of `StakeTableEvents` with the provided events.
96    ///
97    /// Remove unauthenticated registration and key update events
98    fn from_l1_logs(
99        registrations: Vec<(ValidatorRegistered, Log)>,
100        registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
101        deregistrations: Vec<(ValidatorExit, Log)>,
102        delegated: Vec<(Delegated, Log)>,
103        undelegated: Vec<(Undelegated, Log)>,
104        keys: Vec<(ConsensusKeysUpdated, Log)>,
105        keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
106    ) -> Self {
107        let registrations_v2 = registrations_v2
108            .into_iter()
109            .filter(|(event, log)| {
110                event
111                    .authenticate()
112                    .map_err(|_| {
113                        tracing::warn!(
114                            "Failed to authenticate ValidatorRegisteredV2 event {}",
115                            log.display()
116                        );
117                    })
118                    .is_ok()
119            })
120            .collect();
121        let keys_v2 = keys_v2
122            .into_iter()
123            .filter(|(event, log)| {
124                event
125                    .authenticate()
126                    .map_err(|_| {
127                        tracing::warn!(
128                            "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
129                            log.display()
130                        );
131                    })
132                    .is_ok()
133            })
134            .collect();
135        Self {
136            registrations,
137            registrations_v2,
138            deregistrations,
139            delegated,
140            undelegated,
141            keys,
142            keys_v2,
143        }
144    }
145
146    pub fn sort_events(self) -> Result<Vec<(EventKey, StakeTableEvent)>, EventSortingError> {
147        let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
148        let Self {
149            registrations,
150            registrations_v2,
151            deregistrations,
152            delegated,
153            undelegated,
154            keys,
155            keys_v2,
156        } = self;
157
158        let key = |log: &Log| -> Result<EventKey, EventSortingError> {
159            let block_number = log
160                .block_number
161                .ok_or(EventSortingError::MissingBlockNumber)?;
162            let log_index = log.log_index.ok_or(EventSortingError::MissingLogIndex)?;
163            Ok((block_number, log_index))
164        };
165
166        for (registration, log) in registrations {
167            events.push((key(&log)?, registration.into()));
168        }
169        for (registration, log) in registrations_v2 {
170            events.push((key(&log)?, registration.into()));
171        }
172        for (dereg, log) in deregistrations {
173            events.push((key(&log)?, dereg.into()));
174        }
175        for (delegation, log) in delegated {
176            events.push((key(&log)?, delegation.into()));
177        }
178        for (undelegated, log) in undelegated {
179            events.push((key(&log)?, undelegated.into()));
180        }
181        for (update, log) in keys {
182            events.push((key(&log)?, update.into()));
183        }
184        for (update, log) in keys_v2 {
185            events.push((key(&log)?, update.into()));
186        }
187
188        events.sort_by_key(|(key, _)| *key);
189        Ok(events)
190    }
191}
192
/// Incremental state built by replaying stake table events in order.
#[derive(Debug)]
pub struct StakeTableState {
    /// Currently registered validators, keyed by account address.
    validators: ValidatorMap,
    /// Every BLS key ever registered; the contract enforces one-time use
    /// (keys are not freed when a validator exits — see `apply_event`).
    used_bls_keys: HashSet<BLSPubKey>,
    /// Every schnorr key ever registered; reuse is an expected (non-fatal)
    /// error because the contract does not enforce uniqueness.
    used_schnorr_keys: HashSet<SchnorrPubKey>,
}
199
impl StakeTableState {
    /// Create an empty state: no validators, no used keys.
    pub fn new() -> Self {
        Self {
            validators: IndexMap::new(),
            used_bls_keys: HashSet::new(),
            used_schnorr_keys: HashSet::new(),
        }
    }

    /// Consume the state and return the accumulated validator map.
    pub fn get_validators(self) -> ValidatorMap {
        self.validators
    }

    /// Apply a single stake table event to the state.
    ///
    /// Returns:
    /// - `Ok(Ok(()))` — event applied successfully;
    /// - `Ok(Err(..))` — expected, recoverable inconsistency (currently only
    ///   schnorr key reuse, which the contract does not prevent);
    /// - `Err(..)` — fatal error; callers should stop processing.
    pub fn apply_event(&mut self, event: StakeTableEvent) -> ApplyEventResult<()> {
        match event {
            StakeTableEvent::Register(ValidatorRegistered {
                account,
                blsVk,
                schnorrVk,
                commission,
            }) => {
                let stake_table_key: BLSPubKey = blsVk.into();
                let state_ver_key: SchnorrPubKey = schnorrVk.into();

                // Take the entry first so insertion below reuses the lookup.
                let entry = self.validators.entry(account);
                if let indexmap::map::Entry::Occupied(_) = entry {
                    return Err(StakeTableError::AlreadyRegistered(account));
                }

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once.
                // NOTE(review): the bls key inserted above is NOT removed when we
                // bail out here, so it stays marked as used even though the
                // registration was rejected — presumably mirrors contract
                // semantics; confirm.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                // New validator starts with zero stake and no delegators.
                entry.or_insert(Validator {
                    account,
                    stake_table_key,
                    state_ver_key,
                    stake: U256::ZERO,
                    commission,
                    delegators: HashMap::new(),
                });
            },

            StakeTableEvent::RegisterV2(reg) => {
                // Signature authentication is performed right after fetching, if we get an
                // unauthenticated event here, something went wrong, we abort early.
                reg.authenticate()
                    .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

                let ValidatorRegisteredV2 {
                    account,
                    blsVK,
                    schnorrVK,
                    commission,
                    ..
                } = reg;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                let entry = self.validators.entry(account);
                if let indexmap::map::Entry::Occupied(_) = entry {
                    return Err(StakeTableError::AlreadyRegistered(account));
                }

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                entry.or_insert(Validator {
                    account,
                    stake_table_key,
                    state_ver_key,
                    stake: U256::ZERO,
                    commission,
                    delegators: HashMap::new(),
                });
            },

            StakeTableEvent::Deregister(exit) => {
                // Removing a validator does not free its keys: they stay in
                // the used-key sets above.
                self.validators
                    .shift_remove(&exit.validator)
                    .ok_or(StakeTableError::ValidatorNotFound(exit.validator))?;
            },

            StakeTableEvent::Delegate(delegated) => {
                let Delegated {
                    delegator,
                    validator,
                    amount,
                } = delegated;

                let val = self
                    .validators
                    .get_mut(&validator)
                    .ok_or(StakeTableError::ValidatorNotFound(validator))?;

                if amount.is_zero() {
                    return Err(StakeTableError::ZeroDelegatorStake(delegator));
                }

                val.stake += amount;
                // Insert the delegator with the given stake
                // or increase the stake if already present
                val.delegators
                    .entry(delegator)
                    .and_modify(|stake| *stake += amount)
                    .or_insert(amount);
            },

            StakeTableEvent::Undelegate(undelegated) => {
                let Undelegated {
                    delegator,
                    validator,
                    amount,
                } = undelegated;

                let val = self
                    .validators
                    .get_mut(&validator)
                    .ok_or(StakeTableError::ValidatorNotFound(validator))?;

                // Both the validator's total stake and the individual
                // delegator's stake must cover the undelegated amount.
                val.stake = val
                    .stake
                    .checked_sub(amount)
                    .ok_or(StakeTableError::InsufficientStake)?;

                let delegator_stake = val
                    .delegators
                    .get_mut(&delegator)
                    .ok_or(StakeTableError::DelegatorNotFound(delegator))?;

                *delegator_stake = delegator_stake
                    .checked_sub(amount)
                    .ok_or(StakeTableError::InsufficientStake)?;

                // Drop delegators whose remaining stake is zero.
                if delegator_stake.is_zero() {
                    val.delegators.remove(&delegator);
                }
            },

            StakeTableEvent::KeyUpdate(update) => {
                let ConsensusKeysUpdated {
                    account,
                    blsVK,
                    schnorrVK,
                } = update;

                let validator = self
                    .validators
                    .get_mut(&account)
                    .ok_or(StakeTableError::ValidatorNotFound(account))?;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                // The replaced keys are not removed from the used-key sets:
                // once used, always used.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once,
                // therefore it's possible to have multiple validators with the same schnorr key.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                validator.stake_table_key = stake_table_key;
                validator.state_ver_key = state_ver_key;
            },

            StakeTableEvent::KeyUpdateV2(update) => {
                // Signature authentication is performed right after fetching, if we get an
                // unauthenticated event here, something went wrong, we abort early.
                update
                    .authenticate()
                    .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

                let ConsensusKeysUpdatedV2 {
                    account,
                    blsVK,
                    schnorrVK,
                    ..
                } = update;

                let validator = self
                    .validators
                    .get_mut(&account)
                    .ok_or(StakeTableError::ValidatorNotFound(account))?;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once,
                // therefore it's possible to have multiple validators with the same schnorr key.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                validator.stake_table_key = stake_table_key;
                validator.state_ver_key = state_ver_key;
            },
        }

        Ok(Ok(()))
    }
}
439
440pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
441    events: I,
442) -> Result<ValidatorMap, StakeTableError> {
443    let mut state = StakeTableState::new();
444    for event in events {
445        match state.apply_event(event.clone()) {
446            Ok(Ok(())) => (), // Event successfully applied
447            Ok(Err(expected_err)) => {
448                // expected error, continue
449                tracing::warn!("Expected error while applying event {event:?}: {expected_err}");
450            },
451            Err(err) => {
452                // stop processing due to fatal error
453                tracing::error!("Fatal error in applying event {event:?}: {err}");
454                return Err(err);
455            },
456        }
457    }
458    Ok(state.get_validators())
459}
460
461/// Select active validators
462///
463/// Removes the validators without stake and selects the top 100 staked validators.
464pub(crate) fn select_active_validator_set(
465    validators: &mut ValidatorMap,
466) -> Result<(), StakeTableError> {
467    let total_validators = validators.len();
468
469    // Remove invalid validators first
470    validators.retain(|address, validator| {
471        if validator.delegators.is_empty() {
472            tracing::info!("Validator {address:?} does not have any delegator");
473            return false;
474        }
475
476        if validator.stake.is_zero() {
477            tracing::info!("Validator {address:?} does not have any stake");
478            return false;
479        }
480
481        true
482    });
483
484    tracing::debug!(
485        total_validators,
486        filtered = validators.len(),
487        "Filtered out invalid validators"
488    );
489
490    if validators.is_empty() {
491        tracing::warn!("Validator selection failed: no validators passed minimum criteria");
492        return Err(StakeTableError::NoValidValidators);
493    }
494
495    let maximum_stake = validators.values().map(|v| v.stake).max().ok_or_else(|| {
496        tracing::error!("Could not compute maximum stake from filtered validators");
497        StakeTableError::MissingMaximumStake
498    })?;
499
500    let minimum_stake = maximum_stake
501        .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
502        .ok_or_else(|| {
503            tracing::error!("Overflow while calculating minimum stake threshold");
504            StakeTableError::MinimumStakeOverflow
505        })?;
506
507    let mut valid_stakers: Vec<_> = validators
508        .iter()
509        .filter(|(_, v)| v.stake >= minimum_stake)
510        .map(|(addr, v)| (*addr, v.stake))
511        .collect();
512
513    tracing::info!(
514        count = valid_stakers.len(),
515        "Number of validators above minimum stake threshold"
516    );
517
518    // Sort by stake (descending order)
519    valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
520
521    if valid_stakers.len() > 100 {
522        valid_stakers.truncate(100);
523    }
524
525    // Retain only the selected validators
526    let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
527    validators.retain(|address, _| selected_addresses.contains(address));
528
529    tracing::info!(
530        final_count = validators.len(),
531        "Selected active validator set"
532    );
533
534    Ok(())
535}
536
537/// Extract the active validator set from the L1 stake table events.
538pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
539    events: I,
540) -> Result<ValidatorMap, StakeTableError> {
541    let mut validators = validators_from_l1_events(events)?;
542    select_active_validator_set(&mut validators)?;
543    Ok(validators)
544}
545
546impl std::fmt::Debug for StakeTableEvent {
547    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
548        match self {
549            StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
550            StakeTableEvent::RegisterV2(event) => write!(f, "RegisterV2({:?})", event.account),
551            StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
552            StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
553            StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
554            StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
555            StakeTableEvent::KeyUpdateV2(event) => write!(f, "KeyUpdateV2({:?})", event.account),
556        }
557    }
558}
559
#[derive(Clone, derive_more::derive::Debug)]
/// Type to describe DA and Stake memberships
pub struct EpochCommittees {
    /// Committee used when we're in pre-epoch state
    non_epoch_committee: NonEpochCommittee,
    /// Holds Stake table and da stake
    state: HashMap<Epoch, EpochCommittee>,
    /// Randomized committees, filled when we receive the DrbResult
    randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
    /// First epoch in which epoch-based committees apply; `None` before the
    /// epoch upgrade is known.
    first_epoch: Option<Epoch>,
    /// Per-block reward amount used for validator rewards.
    block_reward: RewardAmount,
    /// Shared fetcher used to pull stake table data from L1 / persistence.
    fetcher: Arc<Fetcher>,
}
573
574impl Fetcher {
575    pub fn new(
576        peers: Arc<dyn StateCatchup>,
577        persistence: Arc<Mutex<dyn MembershipPersistence>>,
578        l1_client: L1Client,
579        chain_config: ChainConfig,
580    ) -> Self {
581        Self {
582            peers,
583            persistence,
584            l1_client,
585            chain_config: Arc::new(Mutex::new(chain_config)),
586            update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
587        }
588    }
589
590    pub async fn spawn_update_loop(&self) {
591        let mut update_task = self.update_task.0.lock().await;
592        if update_task.is_none() {
593            *update_task = Some(spawn(self.update_loop()));
594        }
595    }
596
    /// Periodically updates the stake table from the L1 contract.
    /// This function polls the finalized block number from the L1 client at an interval
    /// and fetches stake table from contract
    /// and updates the persistence
    fn update_loop(&self) -> impl Future<Output = ()> {
        let span = tracing::warn_span!("Stake table update loop");
        // Clone everything the async block needs so the returned future is
        // independent of `&self`.
        let self_clone = self.clone();
        let state = self.l1_client.state.clone();
        let l1_retry = self.l1_client.options().l1_retry_delay;
        let update_delay = self.l1_client.options().stake_table_update_interval;
        let chain_config = self.chain_config.clone();

        async move {
            // Get the stake table contract address from the chain config.
            // This may not contain a stake table address if we are on a pre-epoch version.
            // It keeps retrying until the chain config is upgraded
            // after a successful upgrade to an epoch version.
            let stake_contract_address = loop {
                match chain_config.lock().await.stake_table_contract {
                    Some(addr) => break addr,
                    None => {
                        tracing::debug!(
                            "Stake table contract address not found. Retrying in {l1_retry:?}...",
                        );
                    },
                }
                sleep(l1_retry).await;
            };

            // Begin the main polling loop
            loop {
                // Wait until the L1 client has seen at least one finalized block.
                let finalized_block = loop {
                    if let Some(block) = state.lock().await.last_finalized {
                        break block;
                    }
                    tracing::debug!(
                        "Finalized block not yet available. Retrying in {l1_retry:?}",
                    );
                    sleep(l1_retry).await;
                };

                tracing::debug!(
                    "Attempting to fetch stake table at L1 block {finalized_block:?}",
                );

                // Retry fetching and persisting events for this block until it
                // succeeds; transient L1 errors only delay the next attempt.
                loop {
                    match self_clone
                        .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
                        .await
                        {
                            Ok(events) => {
                                tracing::info!("Successfully fetched and stored stake table events at block={finalized_block:?}");
                                tracing::debug!("events={events:?}");
                                break;
                            },
                            Err(e) => {
                                tracing::error!(
                                    "Error fetching stake table at block {finalized_block:?}. err= {e:#}",
                                );
                                sleep(l1_retry).await;
                            },
                        }
                    }

                tracing::debug!(
                    "Waiting {update_delay:?} before next stake table update...",
                );
                sleep(update_delay).await;
            }
        }
        .instrument(span)
    }
669
    /// Load all stake table events up to (and including) `to_block`.
    ///
    /// Events already stored in persistence are used first; only the missing
    /// suffix of the block range is fetched from the contract. The returned
    /// vector is sorted by `(block_number, log_index)` and deduplicated.
    pub async fn fetch_events(
        &self,
        contract: Address,
        to_block: u64,
    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
        // Hold the persistence lock only for the read.
        let persistence_lock = self.persistence.lock().await;
        let (read_l1_offset, persistence_events) = persistence_lock.load_events(to_block).await?;
        drop(persistence_lock);

        tracing::info!("loaded events from storage to_block={to_block:?}");

        // No need to fetch from contract
        // if persistence returns all the events that we need
        if let Some(EventsPersistenceRead::Complete) = read_l1_offset {
            return Ok(persistence_events);
        }

        // Resume fetching one block past what persistence already covers;
        // `None` means persistence had nothing and we fetch from the start.
        let from_block = read_l1_offset
            .map(|read| match read {
                EventsPersistenceRead::UntilL1Block(block) => Ok(block + 1),
                EventsPersistenceRead::Complete => Err(anyhow::anyhow!(
                    "Unexpected state. offset is complete after returning early"
                )),
            })
            .transpose()?;

        // `Some(to_block) >= None` is always true, so this only rejects a
        // persisted offset beyond the requested block.
        ensure!(
            Some(to_block) >= from_block,
            "to_block {to_block:?} is less than from_block {from_block:?}"
        );

        tracing::info!(%to_block, from_block = ?from_block, "Fetching events from contract");

        let contract_events = Self::fetch_events_from_contract(
            self.l1_client.clone(),
            contract,
            from_block,
            to_block,
        )
        .await;

        // Persistence events come first (earlier blocks), contract events
        // after; each part is individually sorted.
        let contract_events = contract_events.sort_events()?;
        let mut events = match from_block {
            Some(_) => persistence_events
                .into_iter()
                .chain(contract_events)
                .collect(),
            None => contract_events,
        };

        // There are no duplicates because the RPC returns all events,
        // which are stored directly in persistence as is.
        // However, this step is taken as a precaution.
        // The vector is already sorted above, so this should be fast.
        let len_before_dedup = events.len();
        events.dedup();
        let len_after_dedup = events.len();
        if len_before_dedup != len_after_dedup {
            tracing::warn!("Duplicate events found and removed. This should not normally happen.")
        }

        Ok(events)
    }
733
734    /// Fetch all stake table events from L1
735    pub async fn fetch_events_from_contract(
736        l1_client: L1Client,
737        contract: Address,
738        from_block: Option<u64>,
739        to_block: u64,
740    ) -> StakeTableEvents {
741        let stake_table_contract = StakeTableV2::new(contract, l1_client.provider.clone());
742
743        // get the block number when the contract was initialized
744        // to avoid fetching events from block number 0
745        let from_block = match from_block {
746            Some(block) => block,
747            None => {
748                loop {
749                    match stake_table_contract.initializedAtBlock().call().await {
750                        Ok(init_block) => {
751                            break init_block._0.to::<u64>();
752                        },
753                        Err(err) => {
754                            // Retry fetching incase of an error
755                            tracing::warn!(%err, "Failed to retrieve initial block, retrying..");
756                            sleep(l1_client.options().l1_retry_delay).await;
757                        },
758                    }
759                }
760            },
761        };
762
763        // To avoid making large RPC calls, divide the range into smaller chunks.
764        // chunk size is from env "ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE
765        // default value  is `10000` if env variable is not set
766        let mut start = from_block;
767        let end = to_block;
768        let chunk_size = l1_client.options().l1_events_max_block_range;
769        let chunks = std::iter::from_fn(move || {
770            let chunk_end = min(start + chunk_size - 1, end);
771            if chunk_end < start {
772                return None;
773            }
774
775            let chunk = (start, chunk_end);
776            start = chunk_end + 1;
777            Some(chunk)
778        });
779
780        // fetch registered events
781        // retry if the call to the provider to fetch the events fails
782        let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
783            let retry_delay = l1_client.options().l1_retry_delay;
784            let stake_table_contract = stake_table_contract.clone();
785            async move {
786                tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
787                loop {
788                    match stake_table_contract
789                        .clone()
790                        .ValidatorRegistered_filter()
791                        .from_block(from)
792                        .to_block(to)
793                        .query()
794                        .await
795                    {
796                        Ok(events) => break stream::iter(events),
797                        Err(err) => {
798                            tracing::warn!(from, to, %err, "ValidatorRegistered Error");
799                            sleep(retry_delay).await;
800                        },
801                    }
802                }
803            }
804        });
805
806        // fetch registered events v2
807        // retry if the call to the provider to fetch the events fails
808        let registered_events_v2 = stream::iter(chunks.clone()).then(|(from, to)| {
809            let retry_delay = l1_client.options().l1_retry_delay;
810            let stake_table_contract = stake_table_contract.clone();
811            async move {
812                tracing::debug!(from, to, "fetch ValidatorRegisteredV2 events in range");
813                loop {
814                    match stake_table_contract
815                        .clone()
816                        .ValidatorRegisteredV2_filter()
817                        .from_block(from)
818                        .to_block(to)
819                        .query()
820                        .await
821                    {
822                        Ok(events) => {
823                            break stream::iter(events.into_iter().filter(|(event, log)| {
824                                if let Err(e) = event.authenticate() {
825                                    tracing::warn!(%e, "Failed to authenticate ValidatorRegisteredV2 event: {}", log.display());
826                                    return false;
827                                }
828                                true
829                            }));
830                        },
831                        Err(err) => {
832                            tracing::warn!(from, to, %err, "ValidatorRegisteredV2 Error");
833                            sleep(retry_delay).await;
834                        },
835                    }
836                }
837            }
838        });
839
840        // fetch validator de registration events
841        let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
842            let retry_delay = l1_client.options().l1_retry_delay;
843            let stake_table_contract = stake_table_contract.clone();
844            async move {
845                tracing::debug!(from, to, "fetch ValidatorExit events in range");
846                loop {
847                    match stake_table_contract
848                        .ValidatorExit_filter()
849                        .from_block(from)
850                        .to_block(to)
851                        .query()
852                        .await
853                    {
854                        Ok(events) => break stream::iter(events),
855                        Err(err) => {
856                            tracing::warn!(from, to, %err, "ValidatorExit Error");
857                            sleep(retry_delay).await;
858                        },
859                    }
860                }
861            }
862        });
863
864        // fetch delegated events
865        let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
866            let retry_delay = l1_client.options().l1_retry_delay;
867            let stake_table_contract = stake_table_contract.clone();
868            async move {
869                tracing::debug!(from, to, "fetch Delegated events in range");
870                loop {
871                    match stake_table_contract
872                        .Delegated_filter()
873                        .from_block(from)
874                        .to_block(to)
875                        .query()
876                        .await
877                    {
878                        Ok(events) => break stream::iter(events),
879                        Err(err) => {
880                            tracing::warn!(from, to, %err, "Delegated Error");
881                            sleep(retry_delay).await;
882                        },
883                    }
884                }
885            }
886        });
887        // fetch undelegated events
888        let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
889            let retry_delay = l1_client.options().l1_retry_delay;
890            let stake_table_contract = stake_table_contract.clone();
891            async move {
892                tracing::debug!(from, to, "fetch Undelegated events in range");
893                loop {
894                    match stake_table_contract
895                        .Undelegated_filter()
896                        .from_block(from)
897                        .to_block(to)
898                        .query()
899                        .await
900                    {
901                        Ok(events) => break stream::iter(events),
902                        Err(err) => {
903                            tracing::warn!(from, to, %err, "Undelegated Error");
904                            sleep(retry_delay).await;
905                        },
906                    }
907                }
908            }
909        });
910
911        // fetch consensus keys updated events
912        let keys_update_events = stream::iter(chunks.clone()).then(|(from, to)| {
913            let retry_delay = l1_client.options().l1_retry_delay;
914            let stake_table_contract = stake_table_contract.clone();
915            async move {
916                tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
917                loop {
918                    match stake_table_contract
919                        .ConsensusKeysUpdated_filter()
920                        .from_block(from)
921                        .to_block(to)
922                        .query()
923                        .await
924                    {
925                        Ok(events) => break stream::iter(events),
926                        Err(err) => {
927                            tracing::warn!(from, to, %err, "ConsensusKeysUpdated Error");
928                            sleep(retry_delay).await;
929                        },
930                    }
931                }
932            }
933        });
934
935        // fetch consensus keys updated v2 events
936        let keys_update_events_v2 = stream::iter(chunks).then(|(from, to)| {
937            let retry_delay = l1_client.options().l1_retry_delay;
938            let stake_table_contract = stake_table_contract.clone();
939            async move {
940                tracing::debug!(from, to, "fetch ConsensusKeysUpdatedV2 events in range");
941                loop {
942                    match stake_table_contract
943                        .ConsensusKeysUpdatedV2_filter()
944                        .from_block(from)
945                        .to_block(to)
946                        .query()
947                        .await
948                    {
949                        Ok(events) => {
950                            break stream::iter(events.into_iter().filter(|(event, log)| {
951                                if let Err(e) = event.authenticate() {
952                                    tracing::warn!(%e, "Failed to authenticate ConsensusKeysUpdatedV2 event {}", log.display());
953                                    return false;
954                                }
955                                true
956                            }));
957                        },
958                        Err(err) => {
959                            tracing::warn!(from, to, %err, "ConsensusKeysUpdatedV2 Error");
960                            sleep(retry_delay).await;
961                        },
962                    }
963                }
964            }
965        });
966
967        let registrations = registered_events.flatten().collect().await;
968        let registrations_v2 = registered_events_v2.flatten().collect().await;
969        let deregistrations = deregistered_events.flatten().collect().await;
970        let delegated = delegated_events.flatten().collect().await;
971        let undelegated = undelegated_events.flatten().collect().await;
972        let keys = keys_update_events.flatten().collect().await;
973        let keys_v2 = keys_update_events_v2.flatten().collect().await;
974
975        StakeTableEvents::from_l1_logs(
976            registrations,
977            registrations_v2,
978            deregistrations,
979            delegated,
980            undelegated,
981            keys,
982            keys_v2,
983        )
984    }
985
986    /// Get `StakeTable` at specific l1 block height.
987    /// This function fetches and processes various events (ValidatorRegistered, ValidatorExit,
988    /// Delegated, Undelegated, and ConsensusKeysUpdated) within the block range from the
989    /// contract's initialization block to the provided `to_block` value.
990    /// Events are fetched in chunks to and retries are implemented for failed requests.
991    pub async fn fetch_and_store_stake_table_events(
992        &self,
993        contract: Address,
994        to_block: u64,
995    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
996        let events = self.fetch_events(contract, to_block).await?;
997
998        tracing::info!("storing events in storage to_block={to_block:?}");
999
1000        {
1001            let persistence_lock = self.persistence.lock().await;
1002            persistence_lock
1003                .store_events(to_block, events.clone())
1004                .await
1005                .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
1006        }
1007
1008        Ok(events)
1009    }
1010
1011    // Only used by staking CLI which doesn't have persistence
1012    pub async fn fetch_all_validators_from_contract(
1013        l1_client: L1Client,
1014        contract: Address,
1015        to_block: u64,
1016    ) -> anyhow::Result<ValidatorMap> {
1017        let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await;
1018        let sorted = events.sort_events()?;
1019        // Process the sorted events and return the resulting stake table.
1020        validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
1021            .context("failed to construct validators set from l1 events")
1022    }
    /// Calculate the per-block reward from the ESP token's initial supply.
    ///
    /// - We rely on the `Initialized` event of the token contract (which should only occur once).
    /// - After locating this event, we fetch its transaction receipt and look for a decoded `Transfer` log.
    /// - If either step fails, the function aborts to prevent incorrect reward calculations.
    ///
    /// Relying on mint events directly (e.g., searching for mints from the zero address) is prone
    /// to errors because, once reward withdrawals are supported, there may be more than one mint
    /// transfer log from the zero address.
    ///
    /// The ESP token contract itself does not expose its initialization block,
    /// but the stake table contract does. The stake table contract is deployed
    /// after the token contract (it holds the token contract address), so the
    /// stake table's initialization block is a safe upper bound when scanning
    /// backwards for the token contract's initialization event.
    pub async fn fetch_block_reward(&self) -> Result<RewardAmount, FetchRewardError> {
        let chain_config = *self.chain_config.lock().await;

        // The stake table address must be configured; without it we cannot
        // locate the token contract.
        let stake_table_contract = chain_config
            .stake_table_contract
            .ok_or(FetchRewardError::MissingStakeTableContract)?;

        let provider = self.l1_client.provider.clone();
        let stake_table = StakeTableV2::new(stake_table_contract, provider.clone());

        // Get the block number where the stake table was initialized.
        // The stake table contract holds the token contract address,
        // so the token contract is deployed before the stake table contract.
        // Queried at the finalized block to avoid reorg issues.
        let stake_table_init_block = stake_table
            .initializedAtBlock()
            .block(BlockId::finalized())
            .call()
            .await
            .map_err(FetchRewardError::ContractCall)?
            ._0
            .to::<u64>();

        tracing::info!("stake table init block ={stake_table_init_block}");

        // Resolve the ESP token contract address from the stake table.
        let token_address = stake_table
            .token()
            .block(BlockId::finalized())
            .call()
            .await
            .map_err(FetchRewardError::TokenAddressFetch)?
            ._0;

        let token = EspToken::new(token_address, provider.clone());

        // Try to fetch the `Initialized` event directly. This event is emitted only once,
        // during the token contract initialization. The initialization transaction also transfers
        // the initial supply minted from the zero address. Since the result set is small (a single
        // event), most RPC providers like Infura and Alchemy allow querying across the full block
        // range. If this fails because the provider does not allow the query due to rate limiting
        // (or some other error), we fall back to scanning over a fixed block range.
        let init_logs = token
            .Initialized_filter()
            .from_block(0u64)
            .to_block(BlockNumberOrTag::Finalized)
            .query()
            .await;

        let init_log = match init_logs {
            Ok(init_logs) => {
                if init_logs.is_empty() {
                    // The token is known to be initialized (the stake table
                    // depends on it), so an empty result indicates a broken
                    // provider rather than a real absence of the event.
                    tracing::error!(
                        "Token Initialized event logs are empty. This should never happen"
                    );
                    return Err(FetchRewardError::MissingInitializedEvent);
                }

                let (_, init_log) = init_logs[0].clone();

                tracing::debug!(tx_hash = ?init_log.transaction_hash, "Found token `Initialized` event");
                init_log
            },
            Err(err) => {
                tracing::warn!(
                    "RPC returned error {err:?}. will fallback to scanning over fixed block range"
                );
                self.scan_token_contract_initialized_event_log(stake_table_init_block, token)
                    .await?
            },
        };

        // Get the hash of the transaction that emitted the Initialized event.
        let tx_hash =
            init_log
                .transaction_hash
                .ok_or_else(|| FetchRewardError::MissingTransactionHash {
                    init_log: init_log.clone().into(),
                })?;

        // Fetch the receipt of that transaction so we can inspect its logs.
        let init_tx = provider
            .get_transaction_receipt(tx_hash)
            .await
            .map_err(FetchRewardError::Rpc)?
            .ok_or_else(|| FetchRewardError::MissingTransactionReceipt {
                tx_hash: tx_hash.to_string(),
            })?;

        // The initialization transaction is expected to contain the mint
        // `Transfer` log carrying the initial supply.
        let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().ok_or(
            FetchRewardError::DecodeTransferLog {
                tx_hash: tx_hash.to_string(),
            },
        )?;

        tracing::debug!("mint transfer event ={mint_transfer:?}");
        // A mint must originate from the zero address; anything else means we
        // decoded the wrong Transfer log.
        if mint_transfer.from != Address::ZERO {
            return Err(FetchRewardError::InvalidMintFromAddress);
        }

        let initial_supply = mint_transfer.value;

        tracing::info!("Initial token amount: {} ESP", format_ether(initial_supply));

        // reward = initial_supply * inflation_rate / blocks_per_year,
        // with the inflation rate expressed in basis points.
        let reward = ((initial_supply * U256::from(INFLATION_RATE)) / U256::from(BLOCKS_PER_YEAR))
            .checked_div(U256::from(COMMISSION_BASIS_POINTS))
            .ok_or(FetchRewardError::DivisionByZero)?;

        Ok(RewardAmount(reward))
    }
1147
1148    pub async fn scan_token_contract_initialized_event_log(
1149        &self,
1150        stake_table_init_block: u64,
1151        token: EspTokenInstance<(), L1Provider>,
1152    ) -> Result<Log, FetchRewardError> {
1153        let max_events_range = self.l1_client.options().l1_events_max_block_range;
1154        const MAX_BLOCKS_SCANNED: u64 = 200_000;
1155        let mut total_scanned = 0;
1156
1157        let mut from_block = stake_table_init_block.saturating_sub(max_events_range);
1158        let mut to_block = stake_table_init_block;
1159
1160        loop {
1161            if total_scanned >= MAX_BLOCKS_SCANNED {
1162                tracing::error!(
1163                    total_scanned,
1164                    "Exceeded maximum scan range while searching for token Initialized event"
1165                );
1166                return Err(FetchRewardError::ExceededMaxScanRange(MAX_BLOCKS_SCANNED));
1167            }
1168
1169            let init_logs = token
1170                .Initialized_filter()
1171                .from_block(from_block)
1172                .to_block(to_block)
1173                .query()
1174                .await
1175                .map_err(FetchRewardError::ScanQueryFailed)?;
1176
1177            if !init_logs.is_empty() {
1178                let (_, init_log) = init_logs[0].clone();
1179                tracing::info!(
1180                    from_block,
1181                    tx_hash = ?init_log.transaction_hash,
1182                    "Found token Initialized event during scan"
1183                );
1184                return Ok(init_log);
1185            }
1186
1187            total_scanned += max_events_range;
1188            from_block = from_block.saturating_sub(max_events_range);
1189            to_block = to_block.saturating_sub(max_events_range);
1190        }
1191    }
1192
1193    pub async fn fetch(&self, epoch: Epoch, header: Header) -> anyhow::Result<ValidatorMap> {
1194        let chain_config = self.get_chain_config(&header).await?;
1195        // update chain config
1196        *self.chain_config.lock().await = chain_config;
1197
1198        let Some(address) = chain_config.stake_table_contract else {
1199            bail!("No stake table contract address found in Chain config");
1200        };
1201
1202        let Some(l1_finalized_block_info) = header.l1_finalized() else {
1203            bail!("The epoch root for epoch {epoch} is missing the L1 finalized block info. This is a fatal error. Consensus is blocked and will not recover.");
1204        };
1205
1206        let events = match self
1207            .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
1208            .await
1209            .map_err(GetStakeTablesError::L1ClientFetchError)
1210        {
1211            Ok(events) => events,
1212            Err(e) => {
1213                bail!("failed to fetch stake table events {e:?}");
1214            },
1215        };
1216
1217        match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
1218            Ok(validators) => Ok(validators),
1219            Err(e) => {
1220                bail!("failed to construct stake table {e:?}");
1221            },
1222        }
1223    }
1224
1225    /// Retrieve and verify `ChainConfig`
1226    // TODO move to appropriate object (Header?)
1227    pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
1228        let chain_config = self.chain_config.lock().await;
1229        let peers = self.peers.clone();
1230        let header_cf = header.chain_config();
1231        if chain_config.commit() == header_cf.commit() {
1232            return Ok(*chain_config);
1233        }
1234
1235        let cf = match header_cf.resolve() {
1236            Some(cf) => cf,
1237            None => peers
1238                .fetch_chain_config(header_cf.commit())
1239                .await
1240                .map_err(|err| {
1241                    tracing::error!("failed to get chain_config from peers. err: {err:?}");
1242                    err
1243                })?,
1244        };
1245
1246        Ok(cf)
1247    }
1248
1249    #[cfg(any(test, feature = "testing"))]
1250    pub fn mock() -> Self {
1251        use crate::{mock, v0_1::NoStorage};
1252        let chain_config = ChainConfig::default();
1253        let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
1254            .expect("Failed to create L1 client");
1255
1256        let peers = Arc::new(mock::MockStateCatchup::default());
1257        let persistence = NoStorage;
1258
1259        Self::new(peers, Arc::new(Mutex::new(persistence)), l1, chain_config)
1260    }
1261}
1262
/// Committee used outside of epoch-based operation: quorum and DA stake
/// tables plus per-key indexes for efficient lookup.
#[derive(Clone, Debug)]
struct NonEpochCommittee {
    /// The nodes eligible for leadership.
    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
    /// leader but without voting rights.
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,

    /// Keys for nodes participating in the network
    stake_table: Vec<PeerConfig<SeqTypes>>,

    /// Keys for DA members
    da_members: Vec<PeerConfig<SeqTypes>>,

    /// Stake entries indexed by public key, for efficient lookup.
    indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,

    /// DA entries indexed by public key, for efficient lookup.
    indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
}
1283
/// Per-epoch committee: leaders, the quorum stake table, the validator set
/// the table was derived from, and a BLS-key-to-address mapping.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct EpochCommittee {
    /// The nodes eligible for leadership.
    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
    /// leader but without voting rights.
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
    /// Keys for nodes participating in the network
    stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
    /// Validator records, keyed by L1 account, that this committee was built from.
    validators: ValidatorMap,
    /// Maps each validator's BLS stake-table key back to its L1 account address.
    address_mapping: HashMap<BLSPubKey, Address>,
}
1296
1297impl EpochCommittees {
    /// The first epoch this committee state applies to, if one has been set.
    pub fn first_epoch(&self) -> Option<Epoch> {
        self.first_epoch
    }
1301
    /// Borrow the underlying stake-table [`Fetcher`].
    pub fn fetcher(&self) -> &Fetcher {
        &self.fetcher
    }
1305
1306    /// Updates `Self.stake_table` with stake_table for
1307    /// `Self.contract_address` at `l1_block_height`. This is intended
1308    /// to be called before calling `self.stake()` so that
1309    /// `Self.stake_table` only needs to be updated once in a given
1310    /// life-cycle but may be read from many times.
1311    fn update(
1312        &mut self,
1313        epoch: EpochNumber,
1314        validators: ValidatorMap,
1315        block_reward: Option<RewardAmount>,
1316    ) {
1317        let mut address_mapping = HashMap::new();
1318        let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
1319            .values()
1320            .map(|v| {
1321                address_mapping.insert(v.stake_table_key, v.account);
1322                (
1323                    v.stake_table_key,
1324                    PeerConfig {
1325                        stake_table_entry: BLSPubKey::stake_table_entry(
1326                            &v.stake_table_key,
1327                            v.stake,
1328                        ),
1329                        state_ver_key: v.state_ver_key.clone(),
1330                    },
1331                )
1332            })
1333            .collect();
1334
1335        let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
1336            stake_table.iter().map(|(_, l)| l.clone()).collect();
1337
1338        self.state.insert(
1339            epoch,
1340            EpochCommittee {
1341                eligible_leaders,
1342                stake_table,
1343                validators,
1344                address_mapping,
1345            },
1346        );
1347
1348        if let Some(block_reward) = block_reward {
1349            self.block_reward = block_reward;
1350        }
1351    }
1352
1353    pub fn validators(&self, epoch: &Epoch) -> anyhow::Result<ValidatorMap> {
1354        Ok(self
1355            .state
1356            .get(epoch)
1357            .context("state for found")?
1358            .validators
1359            .clone())
1360    }
1361
1362    pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
1363        let mapping = self
1364            .state
1365            .get(epoch)
1366            .context("state for found")?
1367            .address_mapping
1368            .clone();
1369
1370        Ok(*mapping.get(&bls_key).context(format!(
1371            "failed to get ethereum address for bls key {bls_key}. epoch={epoch}"
1372        ))?)
1373    }
1374
1375    pub fn get_validator_config(
1376        &self,
1377        epoch: &Epoch,
1378        key: BLSPubKey,
1379    ) -> anyhow::Result<Validator<BLSPubKey>> {
1380        let address = self.address(epoch, key)?;
1381        let validators = self.validators(epoch)?;
1382        validators
1383            .get(&address)
1384            .context("validator not found")
1385            .cloned()
1386    }
1387
    /// The currently configured per-block reward amount.
    pub fn block_reward(&self) -> RewardAmount {
        self.block_reward
    }
1391
1392    // We need a constructor to match our concrete type.
1393    pub fn new_stake(
1394        // TODO remove `new` from trait and rename this to `new`.
1395        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
1396        committee_members: Vec<PeerConfig<SeqTypes>>,
1397        da_members: Vec<PeerConfig<SeqTypes>>,
1398        block_reward: RewardAmount,
1399        fetcher: Fetcher,
1400    ) -> Self {
1401        // For each member, get the stake table entry
1402        let stake_table: Vec<_> = committee_members
1403            .iter()
1404            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1405            .cloned()
1406            .collect();
1407
1408        let eligible_leaders = stake_table.clone();
1409        // For each member, get the stake table entry
1410        let da_members: Vec<_> = da_members
1411            .iter()
1412            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1413            .cloned()
1414            .collect();
1415
1416        // Index the stake table by public key
1417        let indexed_stake_table: HashMap<PubKey, _> = stake_table
1418            .iter()
1419            .map(|peer_config| {
1420                (
1421                    PubKey::public_key(&peer_config.stake_table_entry),
1422                    peer_config.clone(),
1423                )
1424            })
1425            .collect();
1426
1427        // Index the stake table by public key
1428        let indexed_da_members: HashMap<PubKey, _> = da_members
1429            .iter()
1430            .map(|peer_config| {
1431                (
1432                    PubKey::public_key(&peer_config.stake_table_entry),
1433                    peer_config.clone(),
1434                )
1435            })
1436            .collect();
1437
1438        let members = NonEpochCommittee {
1439            eligible_leaders,
1440            stake_table,
1441            da_members,
1442            indexed_stake_table,
1443            indexed_da_members,
1444        };
1445
1446        let mut map = HashMap::new();
1447        let epoch_committee = EpochCommittee {
1448            eligible_leaders: members.eligible_leaders.clone(),
1449            stake_table: members
1450                .stake_table
1451                .iter()
1452                .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
1453                .collect(),
1454            validators: Default::default(),
1455            address_mapping: HashMap::new(),
1456        };
1457        map.insert(Epoch::genesis(), epoch_committee.clone());
1458        // TODO: remove this, workaround for hotshot asking for stake tables from epoch 1
1459        map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());
1460
1461        Self {
1462            non_epoch_committee: members,
1463            state: map,
1464            randomized_committees: BTreeMap::new(),
1465            first_epoch: None,
1466            block_reward,
1467            fetcher: Arc::new(fetcher),
1468        }
1469    }
1470
1471    pub async fn reload_stake(&mut self, limit: u64) {
1472        // Load the 50 latest stored stake tables
1473
1474        let loaded_stake = match self
1475            .fetcher
1476            .persistence
1477            .lock()
1478            .await
1479            .load_latest_stake(limit)
1480            .await
1481        {
1482            Ok(Some(loaded)) => loaded,
1483            Ok(None) => {
1484                tracing::warn!("No stake table history found in persistence!");
1485                return;
1486            },
1487            Err(e) => {
1488                tracing::error!("Failed to load stake table history from persistence: {e}");
1489                return;
1490            },
1491        };
1492
1493        for (epoch, stake_table) in loaded_stake {
1494            self.update(epoch, stake_table, None);
1495        }
1496    }
1497
1498    fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
1499        if let Some(epoch) = epoch {
1500            self.state
1501                .get(epoch)
1502                .map(|committee| committee.stake_table.clone().into_values().collect())
1503        } else {
1504            Some(self.non_epoch_committee.stake_table.clone())
1505        }
1506    }
1507}
1508
#[derive(Error, Debug)]
/// Error representing fail cases for retrieving the stake table.
enum GetStakeTablesError {
    /// Fetching stake-table events from L1 failed; wraps the underlying error.
    #[error("Error fetching from L1: {0}")]
    L1ClientFetchError(anyhow::Error),
}
1515
#[derive(Error, Debug)]
#[error("Could not lookup leader")] // TODO error variants? message?
/// Returned when leader selection fails for a given view/epoch.
pub struct LeaderLookupError;
1519
1520// #[async_trait]
1521impl Membership<SeqTypes> for EpochCommittees {
1522    type Error = LeaderLookupError;
    // DO NOT USE. Dummy constructor to comply w/ trait.
    /// Required by the `Membership` trait but intentionally unimplemented.
    ///
    /// # Panics
    /// Always panics; construct with `new_stake()` instead.
    fn new(
        // TODO remove `new` from trait and remove this fn as well.
        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
        _committee_members: Vec<PeerConfig<SeqTypes>>,
        _da_members: Vec<PeerConfig<SeqTypes>>,
    ) -> Self {
        panic!("This function has been replaced with new_stake()");
    }
1532
    /// Get the stake table for `epoch` (or the non-epoch table when `None`).
    /// Returns an empty table when no state is stored for the epoch.
    fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.get_stake_table(&epoch).unwrap_or_default().into()
    }
    /// Get the DA stake table. The DA committee is epoch-independent, so the
    /// `_epoch` argument is ignored.
    fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.non_epoch_committee.da_members.clone().into()
    }
1541
1542    /// Get all members of the committee for the current view
1543    fn committee_members(
1544        &self,
1545        _view_number: <SeqTypes as NodeType>::View,
1546        epoch: Option<Epoch>,
1547    ) -> BTreeSet<PubKey> {
1548        let stake_table = self.stake_table(epoch);
1549        stake_table
1550            .iter()
1551            .map(|x| PubKey::public_key(&x.stake_table_entry))
1552            .collect()
1553    }
1554
1555    /// Get all members of the committee for the current view
1556    fn da_committee_members(
1557        &self,
1558        _view_number: <SeqTypes as NodeType>::View,
1559        _epoch: Option<Epoch>,
1560    ) -> BTreeSet<PubKey> {
1561        self.non_epoch_committee
1562            .indexed_da_members
1563            .clone()
1564            .into_keys()
1565            .collect()
1566    }
1567
1568    /// Get the stake table entry for a public key
1569    fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1570        // Only return the stake if it is above zero
1571        if let Some(epoch) = epoch {
1572            self.state
1573                .get(&epoch)
1574                .and_then(|h| h.stake_table.get(pub_key))
1575                .cloned()
1576        } else {
1577            self.non_epoch_committee
1578                .indexed_stake_table
1579                .get(pub_key)
1580                .cloned()
1581        }
1582    }
1583
1584    /// Get the DA stake table entry for a public key
1585    fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1586        // Only return the stake if it is above zero
1587        self.non_epoch_committee
1588            .indexed_da_members
1589            .get(pub_key)
1590            .cloned()
1591    }
1592
1593    /// Check if a node has stake in the committee
1594    fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1595        self.stake(pub_key, epoch)
1596            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1597            .unwrap_or_default()
1598    }
1599
1600    /// Check if a node has stake in the committee
1601    fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1602        self.da_stake(pub_key, epoch)
1603            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1604            .unwrap_or_default()
1605    }
1606
1607    /// Returns the leader's public key for a given view number and epoch.
1608    ///
1609    /// If an epoch is provided and a randomized committee exists for that epoch,
1610    /// the leader is selected from the randomized committee. Otherwise, the leader
1611    /// is selected from the non-epoch committee.
1612    ///
1613    /// # Arguments
1614    /// * `view_number` - The view number to index into the committee.
1615    /// * `epoch` - The epoch for which to determine the leader. If `None`, uses the non-epoch committee.
1616    ///
1617    /// # Errors
1618    /// Returns `LeaderLookupError` if the epoch is before the first epoch or if the committee is missing.
1619    fn lookup_leader(
1620        &self,
1621        view_number: <SeqTypes as NodeType>::View,
1622        epoch: Option<Epoch>,
1623    ) -> Result<PubKey, Self::Error> {
1624        match (self.first_epoch(), epoch) {
1625            (Some(first_epoch), Some(epoch)) => {
1626                if epoch < first_epoch {
1627                    tracing::error!(
1628                        "lookup_leader called with epoch {} before first epoch {}",
1629                        epoch,
1630                        first_epoch,
1631                    );
1632                    return Err(LeaderLookupError);
1633                }
1634                let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
1635                    tracing::error!(
1636                        "We are missing the randomized committee for epoch {}",
1637                        epoch
1638                    );
1639                    return Err(LeaderLookupError);
1640                };
1641
1642                Ok(PubKey::public_key(&select_randomized_leader(
1643                    randomized_committee,
1644                    *view_number,
1645                )))
1646            },
1647            (_, None) => {
1648                let leaders = &self.non_epoch_committee.eligible_leaders;
1649
1650                let index = *view_number as usize % leaders.len();
1651                let res = leaders[index].clone();
1652                Ok(PubKey::public_key(&res.stake_table_entry))
1653            },
1654            (None, Some(epoch)) => {
1655                tracing::error!(
1656                    "lookup_leader called with epoch {} but we don't have a first epoch",
1657                    epoch,
1658                );
1659                Err(LeaderLookupError)
1660            },
1661        }
1662    }
1663
1664    /// Get the total number of nodes in the committee
1665    fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
1666        self.stake_table(epoch).len()
1667    }
1668
1669    /// Get the total number of DA nodes in the committee
1670    fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
1671        self.da_stake_table(epoch).len()
1672    }
1673
1674    /// Get the voting success threshold for the committee
1675    fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1676        let total_stake = self.total_stake(epoch);
1677        let one = U256::ONE;
1678        let two = U256::from(2);
1679        let three = U256::from(3);
1680        if total_stake < U256::MAX / two {
1681            ((total_stake * two) / three) + one
1682        } else {
1683            ((total_stake / three) * two) + two
1684        }
1685    }
1686
1687    /// Get the voting success threshold for the committee
1688    fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1689        let total_stake = self.total_da_stake(epoch);
1690        let one = U256::ONE;
1691        let two = U256::from(2);
1692        let three = U256::from(3);
1693
1694        if total_stake < U256::MAX / two {
1695            ((total_stake * two) / three) + one
1696        } else {
1697            ((total_stake / three) * two) + two
1698        }
1699    }
1700
1701    /// Get the voting failure threshold for the committee
1702    fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
1703        let total_stake = self.total_stake(epoch);
1704        let one = U256::ONE;
1705        let three = U256::from(3);
1706
1707        (total_stake / three) + one
1708    }
1709
1710    /// Get the voting upgrade threshold for the committee
1711    fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
1712        let total_stake = self.total_stake(epoch);
1713        let nine = U256::from(9);
1714        let ten = U256::from(10);
1715
1716        let normal_threshold = self.success_threshold(epoch);
1717        let higher_threshold = if total_stake < U256::MAX / nine {
1718            (total_stake * nine) / ten
1719        } else {
1720            (total_stake / ten) * nine
1721        };
1722
1723        max(higher_threshold, normal_threshold)
1724    }
1725
1726    async fn add_epoch_root(
1727        membership: Arc<RwLock<Self>>,
1728        epoch: Epoch,
1729        block_header: Header,
1730    ) -> anyhow::Result<()> {
1731        let membership_reader = membership.read().await;
1732        if membership_reader.state.contains_key(&epoch) {
1733            tracing::info!(
1734                "We already have the stake table for epoch {}. Skipping L1 fetching.",
1735                epoch
1736            );
1737            return Ok(());
1738        }
1739        let fetcher = Arc::clone(&membership_reader.fetcher);
1740        drop(membership_reader);
1741
1742        let stake_tables = fetcher.fetch(epoch, block_header).await?;
1743
1744        let mut block_reward = None;
1745
1746        {
1747            let membership_reader = membership.read().await;
1748            // Assumes the stake table contract proxy address does not change
1749            // In the future, if we want to support updates to the stake table contract address via chain config,
1750            // or allow the contract to handle additional block reward calculation parameters (e.g., inflation, block time),
1751            // the `fetch_block_reward` logic can be updated to support per-epoch rewards.
1752            // Initially, the block reward is zero if the node starts on pre-epoch version
1753            // but it is updated on the first call to `add_epoch_root()`
1754            if membership_reader.block_reward == RewardAmount(U256::ZERO) {
1755                block_reward = Some(fetcher.fetch_block_reward().await?);
1756            }
1757        }
1758
1759        // Store stake table in persistence
1760        {
1761            let persistence_lock = fetcher.persistence.lock().await;
1762            if let Err(e) = persistence_lock
1763                .store_stake(epoch, stake_tables.clone())
1764                .await
1765            {
1766                tracing::error!(?e, "`add_epoch_root`, error storing stake table");
1767            }
1768        }
1769
1770        let mut membership_writer = membership.write().await;
1771        membership_writer.update(epoch, stake_tables, block_reward);
1772        Ok(())
1773    }
1774
1775    fn has_stake_table(&self, epoch: Epoch) -> bool {
1776        self.state.contains_key(&epoch)
1777    }
1778
1779    /// Checks if the randomized stake table is available for the given epoch.
1780    ///
1781    /// Returns `Ok(true)` if a randomized committee exists for the specified epoch and
1782    /// the epoch is not before the first epoch. Returns an error if `first_epoch` is `None`
1783    /// or if the provided epoch is before the first epoch.
1784    ///
1785    /// # Arguments
1786    /// * `epoch` - The epoch for which to check the presence of a randomized stake table.
1787    ///
1788    /// # Errors
1789    /// Returns an error if `first_epoch` is `None` or if `epoch` is before `first_epoch`.
1790    fn has_randomized_stake_table(&self, epoch: Epoch) -> anyhow::Result<bool> {
1791        let Some(first_epoch) = self.first_epoch else {
1792            bail!(
1793                "Called has_randomized_stake_table with epoch {} but first_epoch is None",
1794                epoch
1795            );
1796        };
1797        ensure!(
1798            epoch >= first_epoch,
1799            "Called has_randomized_stake_table with epoch {} but first_epoch is {}",
1800            epoch,
1801            first_epoch
1802        );
1803        Ok(self.randomized_committees.contains_key(&epoch))
1804    }
1805
1806    async fn get_epoch_root(
1807        membership: Arc<RwLock<Self>>,
1808        block_height: u64,
1809        epoch: Epoch,
1810    ) -> anyhow::Result<Leaf2> {
1811        let membership_reader = membership.read().await;
1812        let peers = membership_reader.fetcher.peers.clone();
1813        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1814        let success_threshold = membership_reader.success_threshold(Some(epoch));
1815        drop(membership_reader);
1816
1817        // Fetch leaves from peers
1818        let leaf: Leaf2 = peers
1819            .fetch_leaf(block_height, stake_table.clone(), success_threshold)
1820            .await?;
1821
1822        Ok(leaf)
1823    }
1824
1825    async fn get_epoch_drb(
1826        membership: Arc<RwLock<Self>>,
1827        block_height: u64,
1828        epoch: Epoch,
1829    ) -> anyhow::Result<DrbResult> {
1830        let membership_reader = membership.read().await;
1831        let peers = membership_reader.fetcher.peers.clone();
1832        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1833        let success_threshold = membership_reader.success_threshold(Some(epoch));
1834        drop(membership_reader);
1835
1836        tracing::debug!(
1837            "Getting DRB for epoch {}, block height {}",
1838            epoch,
1839            block_height
1840        );
1841        let drb_leaf = peers
1842            .try_fetch_leaf(1, block_height, stake_table, success_threshold)
1843            .await?;
1844
1845        let Some(drb) = drb_leaf.next_drb_result else {
1846            tracing::error!(
1847          "We received a leaf that should contain a DRB result, but the DRB result is missing: {:?}",
1848          drb_leaf
1849        );
1850
1851            bail!("DRB leaf is missing the DRB result.");
1852        };
1853
1854        Ok(drb)
1855    }
1856
1857    fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
1858        let Some(raw_stake_table) = self.state.get(&epoch) else {
1859            tracing::error!("add_drb_result({epoch}, {drb:?}) was called, but we do not yet have the stake table for epoch {epoch}");
1860            return;
1861        };
1862
1863        let leaders = raw_stake_table
1864            .eligible_leaders
1865            .clone()
1866            .into_iter()
1867            .map(|peer_config| peer_config.stake_table_entry)
1868            .collect::<Vec<_>>();
1869        let randomized_committee = generate_stake_cdf(leaders, drb);
1870
1871        self.randomized_committees
1872            .insert(epoch, randomized_committee);
1873    }
1874
1875    fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
1876        self.first_epoch = Some(epoch);
1877
1878        let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
1879        self.state.insert(epoch, epoch_committee.clone());
1880        self.state.insert(epoch + 1, epoch_committee);
1881        self.add_drb_result(epoch, initial_drb_result);
1882        self.add_drb_result(epoch + 1, initial_drb_result);
1883    }
1884
1885    fn first_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
1886        self.first_epoch
1887    }
1888}
1889
#[cfg(any(test, feature = "testing"))]
impl super::v0_3::StakeTable {
    /// Generate a `StakeTable` with `n` (default-initialized) members.
    ///
    /// Fix: the previous implementation iterated over `[..n]`, which is a
    /// one-element array containing a `RangeTo` value, so it always produced
    /// exactly one member regardless of `n`. Iterating the range `0..n`
    /// yields the intended `n` members.
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1901
#[cfg(any(test, feature = "testing"))]
impl DAMembers {
    /// Generate a `DaMembers` (alias committee) with `n` (default-initialized) members.
    ///
    /// Fix: the previous implementation iterated over `[..n]`, which is a
    /// one-element array containing a `RangeTo` value, so it always produced
    /// exactly one member regardless of `n`. Iterating the range `0..n`
    /// yields the intended `n` members.
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1913
#[cfg(any(test, feature = "testing"))]
pub mod testing {
    use alloy::primitives::Bytes;
    use hotshot_contract_adapter::{
        sol_types::{EdOnBN254PointSol, G1PointSol, G2PointSol},
        stake_table::{sign_address_bls, sign_address_schnorr},
    };
    use hotshot_types::{light_client::StateKeyPair, signature_key::BLSKeyPair};
    use rand::{Rng as _, RngCore as _};

    use super::*;

    // TODO: current tests are just sanity checks, we need more.

    /// Test fixture: a validator with freshly generated consensus keys and
    /// signatures over its own account address, used to fabricate the
    /// contract events consumed by the stake-table logic.
    #[derive(Debug, Clone)]
    pub struct TestValidator {
        /// The validator's L1 account address.
        pub account: Address,
        /// BLS (consensus) verification key.
        pub bls_vk: G2PointSol,
        /// Schnorr (state) verification key.
        pub schnorr_vk: EdOnBN254PointSol,
        /// Commission in basis points.
        pub commission: u16,
        /// BLS signature over `account`, proving key ownership (V2 events).
        pub bls_sig: G1PointSol,
        /// Schnorr signature over `account`, proving key ownership (V2 events).
        pub schnorr_sig: Bytes,
    }

    impl TestValidator {
        /// Create a validator with a random account, a random commission in
        /// `0..10000` basis points, and fresh random key pairs.
        pub fn random() -> Self {
            let account = Address::random();
            let commission = rand::thread_rng().gen_range(0..10000);
            Self::random_update_keys(account, commission)
        }

        /// Keep this validator's account and commission but regenerate both
        /// key pairs (and the matching address signatures).
        pub fn randomize_keys(&self) -> Self {
            Self::random_update_keys(self.account, self.commission)
        }

        /// Generate fresh BLS and Schnorr key pairs for `account` and sign
        /// the address with each, mirroring what a real registration submits.
        fn random_update_keys(account: Address, commission: u16) -> Self {
            let mut rng = &mut rand::thread_rng();
            // The Schnorr key pair is derived from a random 32-byte seed; the
            // BLS key pair is drawn from the RNG directly.
            let mut seed = [0u8; 32];
            rng.fill_bytes(&mut seed);
            let bls_key_pair = BLSKeyPair::generate(&mut rng);
            let bls_sig = sign_address_bls(&bls_key_pair, account);
            let schnorr_key_pair = StateKeyPair::generate_from_seed_indexed(seed, 0);
            let schnorr_sig = sign_address_schnorr(&schnorr_key_pair, account);
            Self {
                account,
                bls_vk: bls_key_pair.ver_key().to_affine().into(),
                schnorr_vk: schnorr_key_pair.ver_key().to_affine().into(),
                commission,
                bls_sig,
                schnorr_sig,
            }
        }
    }

    impl From<&TestValidator> for ValidatorRegistered {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVk: value.bls_vk,
                schnorrVk: value.schnorr_vk,
                commission: value.commission,
            }
        }
    }

    impl From<&TestValidator> for ValidatorRegisteredV2 {
        fn from(value: &TestValidator) -> Self {
            // V2 registration additionally carries the key-ownership signatures.
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
                commission: value.commission,
                blsSig: value.bls_sig.into(),
                schnorrSig: value.schnorr_sig.clone(),
            }
        }
    }

    impl From<&TestValidator> for ConsensusKeysUpdated {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
            }
        }
    }

    impl From<&TestValidator> for ConsensusKeysUpdatedV2 {
        fn from(value: &TestValidator) -> Self {
            // V2 key updates additionally carry the key-ownership signatures.
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
                blsSig: value.bls_sig.into(),
                schnorrSig: value.schnorr_sig.clone(),
            }
        }
    }

    impl From<&TestValidator> for ValidatorExit {
        fn from(value: &TestValidator) -> Self {
            Self {
                validator: value.account,
            }
        }
    }

    impl Validator<BLSPubKey> {
        /// Build a mock validator with 5001 random delegators (the loop bound
        /// `0..=5000` is inclusive); its stake is the sum of all delegations.
        pub fn mock() -> Validator<BLSPubKey> {
            let val = TestValidator::random();
            let rng = &mut rand::thread_rng();
            let mut seed = [1u8; 32];
            rng.fill_bytes(&mut seed);
            let mut validator_stake = alloy::primitives::U256::from(0);
            let mut delegators = HashMap::new();
            for _i in 0..=5000 {
                let stake: u64 = rng.gen_range(0..10000);
                delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
                validator_stake += alloy::primitives::U256::from(stake);
            }

            let stake_table_key = val.bls_vk.into();
            let state_ver_key = val.schnorr_vk.into();

            Validator {
                account: val.account,
                stake_table_key,
                state_ver_key,
                stake: validator_stake,
                commission: val.commission,
                delegators,
            }
        }
    }
}
2050
2051#[cfg(test)]
2052mod tests {
2053    use alloy::{primitives::Address, rpc::types::Log};
2054    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
2055    use pretty_assertions::assert_matches;
2056    use rstest::rstest;
2057    use sequencer_utils::test_utils::setup_test;
2058
2059    use super::*;
2060    use crate::v0::impls::testing::*;
2061
    #[test]
    fn test_from_l1_events() -> anyhow::Result<()> {
        setup_test();
        // Register two validators (one via a V1 event, one via V2) and apply a
        // sequence of delegation, key-update, and undelegation events.
        let val_1 = TestValidator::random();
        let val_1_new_keys = val_1.randomize_keys();
        let val_2 = TestValidator::random();
        let val_2_new_keys = val_2.randomize_keys();
        let delegator = Address::random();
        let mut events: Vec<StakeTableEvent> = [
            ValidatorRegistered::from(&val_1).into(),
            ValidatorRegisteredV2::from(&val_2).into(),
            Delegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(10),
            }
            .into(),
            ConsensusKeysUpdated::from(&val_1_new_keys).into(),
            ConsensusKeysUpdatedV2::from(&val_2_new_keys).into(),
            Undelegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(7),
            }
            .into(),
            // delegate to the same validator again
            Delegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(5),
            }
            .into(),
            // delegate to the second validator
            Delegated {
                delegator: Address::random(),
                validator: val_2.account,
                amount: U256::from(3),
            }
            .into(),
        ]
        .to_vec();

        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
        let st_val_1 = st.get(&val_1.account).unwrap();
        // final staked amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
        assert_eq!(st_val_1.stake, U256::from(8));
        assert_eq!(st_val_1.commission, val_1.commission);
        assert_eq!(st_val_1.delegators.len(), 1);
        // final delegated amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
        assert_eq!(*st_val_1.delegators.get(&delegator).unwrap(), U256::from(8));

        let st_val_2 = st.get(&val_2.account).unwrap();
        assert_eq!(st_val_2.stake, U256::from(3));
        assert_eq!(st_val_2.commission, val_2.commission);
        assert_eq!(st_val_2.delegators.len(), 1);

        // Exit the first validator and rebuild the set from the full event log.
        events.push(ValidatorExit::from(&val_1).into());

        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
        // The first validator should have been removed
        assert_eq!(st.get(&val_1.account), None);

        // The second validator should be unchanged
        let st_val_2 = st.get(&val_2.account).unwrap();
        assert_eq!(st_val_2.stake, U256::from(3));
        assert_eq!(st_val_2.commission, val_2.commission);
        assert_eq!(st_val_2.delegators.len(), 1);

        // remove the 2nd validator
        events.push(ValidatorExit::from(&val_2).into());

        // This should fail because the validator has exited and no longer exists in the stake table.
        assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());

        Ok(())
    }
2139
2140    #[test]
2141    fn test_from_l1_events_failures() -> anyhow::Result<()> {
2142        let val = TestValidator::random();
2143        let delegator = Address::random();
2144
2145        let register: StakeTableEvent = ValidatorRegistered::from(&val).into();
2146        let register_v2: StakeTableEvent = ValidatorRegisteredV2::from(&val).into();
2147        let delegate: StakeTableEvent = Delegated {
2148            delegator,
2149            validator: val.account,
2150            amount: U256::from(10),
2151        }
2152        .into();
2153        let key_update: StakeTableEvent = ConsensusKeysUpdated::from(&val).into();
2154        let key_update_v2: StakeTableEvent = ConsensusKeysUpdatedV2::from(&val).into();
2155        let undelegate: StakeTableEvent = Undelegated {
2156            delegator,
2157            validator: val.account,
2158            amount: U256::from(7),
2159        }
2160        .into();
2161
2162        let exit: StakeTableEvent = ValidatorExit::from(&val).into();
2163
2164        let cases = [
2165            vec![exit],
2166            vec![undelegate.clone()],
2167            vec![delegate.clone()],
2168            vec![key_update],
2169            vec![key_update_v2],
2170            vec![register.clone(), register.clone()],
2171            vec![register_v2.clone(), register_v2.clone()],
2172            vec![register.clone(), register_v2.clone()],
2173            vec![register_v2.clone(), register.clone()],
2174            vec![
2175                register,
2176                delegate.clone(),
2177                undelegate.clone(),
2178                undelegate.clone(),
2179            ],
2180            vec![register_v2, delegate, undelegate.clone(), undelegate],
2181        ];
2182
2183        for events in cases.iter() {
2184            // NOTE: not selecting the active validator set because we care about wrong sequences of
2185            // events being detected. If we compute the active set we will also get an error if the
2186            // set is empty but that's not what we want to test here.
2187            let res = validators_from_l1_events(events.iter().cloned());
2188            assert!(
2189                res.is_err(),
2190                "events {res:?}, not a valid sequence of events"
2191            );
2192        }
2193        Ok(())
2194    }
2195
2196    #[test]
2197    fn test_validators_selection() {
2198        let mut validators = IndexMap::new();
2199        let mut highest_stake = alloy::primitives::U256::ZERO;
2200
2201        for _i in 0..3000 {
2202            let validator = Validator::mock();
2203            validators.insert(validator.account, validator.clone());
2204
2205            if validator.stake > highest_stake {
2206                highest_stake = validator.stake;
2207            }
2208        }
2209
2210        let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);
2211
2212        select_active_validator_set(&mut validators).expect("Failed to select validators");
2213        assert!(
2214            validators.len() <= 100,
2215            "validators len is {}, expected at most 100",
2216            validators.len()
2217        );
2218
2219        let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
2220        // Ensure every validator in the final selection is above or equal to minimum stake
2221        for (address, validator) in &validators {
2222            assert!(
2223                validator.stake >= minimum_stake,
2224                "Validator {:?} has stake below minimum: {}",
2225                address,
2226                validator.stake
2227            );
2228
2229            if validator.stake > selected_validators_highest_stake {
2230                selected_validators_highest_stake = validator.stake;
2231            }
2232        }
2233    }
2234
    // For a bug where the GCL did not match the stake table contract implementation and allowed
    // duplicated BLS keys via the update keys events.
    #[rstest::rstest]
    fn test_regression_non_unique_bls_keys_not_discarded(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        version: StakeTableContractVersion,
    ) {
        let val = TestValidator::random();
        let register: StakeTableEvent = match version {
            StakeTableContractVersion::V1 => ValidatorRegistered::from(&val).into(),
            StakeTableContractVersion::V2 => ValidatorRegisteredV2::from(&val).into(),
        };
        let delegate: StakeTableEvent = Delegated {
            delegator: Address::random(),
            validator: val.account,
            amount: U256::from(10),
        }
        .into();

        // first ensure that we can build a valid stake table
        assert!(active_validator_set_from_l1_events(
            vec![register.clone(), delegate.clone()].into_iter()
        )
        .is_ok());

        // add the invalid key update (re-using the same consensus keys)
        let key_update = ConsensusKeysUpdated::from(&val).into();
        let err =
            active_validator_set_from_l1_events(vec![register, delegate, key_update].into_iter())
                .unwrap_err();

        // The error must identify the duplicated BLS key.
        let bls: BLSPubKey = val.bls_vk.into();
        assert!(matches!(err, StakeTableError::BlsKeyAlreadyUsed(addr) if addr == bls.to_string()));
    }
2269
2270    #[test]
2271    fn test_display_log() {
2272        let serialized = r#"{"address":"0x0000000000000000000000000000000000000069","topics":["0x0000000000000000000000000000000000000000000000000000000000000069"],"data":"0x69","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000069","blockNumber":"0x69","blockTimestamp":"0x69","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000069","transactionIndex":"0x69","logIndex":"0x70","removed":false}"#;
2273        let log: Log = serde_json::from_str(serialized).unwrap();
2274        assert_eq!(
2275            log.display(),
2276            "Log(block=105,index=112,transaction_hash=0x0000000000000000000000000000000000000000000000000000000000000069)"
2277        )
2278    }
2279
2280    #[rstest]
2281    #[case::v1(StakeTableContractVersion::V1)]
2282    #[case::v2(StakeTableContractVersion::V2)]
2283    fn test_register_validator(#[case] version: StakeTableContractVersion) {
2284        let mut state = StakeTableState::new();
2285        let validator = TestValidator::random();
2286
2287        let event = match version {
2288            StakeTableContractVersion::V1 => StakeTableEvent::Register((&validator).into()),
2289            StakeTableContractVersion::V2 => StakeTableEvent::RegisterV2((&validator).into()),
2290        };
2291
2292        assert!(state.apply_event(event).unwrap().is_ok());
2293
2294        let stored = state.validators.get(&validator.account).unwrap();
2295        assert_eq!(stored.account, validator.account);
2296    }
2297
2298    #[rstest]
2299    #[case::v1(StakeTableContractVersion::V1)]
2300    #[case::v2(StakeTableContractVersion::V2)]
2301    fn test_validator_already_registered(#[case] version: StakeTableContractVersion) {
2302        let mut stake_table_state = StakeTableState::new();
2303
2304        let test_validator = TestValidator::random();
2305
2306        // First registration attempt using the specified contract version
2307        let first_registration_result =
2308            match version {
2309                StakeTableContractVersion::V1 => stake_table_state
2310                    .apply_event(StakeTableEvent::Register((&test_validator).into())),
2311                StakeTableContractVersion::V2 => stake_table_state
2312                    .apply_event(StakeTableEvent::RegisterV2((&test_validator).into())),
2313            };
2314
2315        // Expect the first registration to succeed
2316        assert!(first_registration_result.unwrap().is_ok());
2317
2318        // attempt using V1 registration (should fail with AlreadyRegistered)
2319        let v1_already_registered_result =
2320            stake_table_state.apply_event(StakeTableEvent::Register((&test_validator).into()));
2321
2322        pretty_assertions::assert_matches!(
2323           v1_already_registered_result,  Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2324           "Expected AlreadyRegistered error. version ={version:?} result={v1_already_registered_result:?}",
2325        );
2326
2327        // attempt using V2 registration (should also fail with AlreadyRegistered)
2328        let v2_already_registered_result =
2329            stake_table_state.apply_event(StakeTableEvent::RegisterV2((&test_validator).into()));
2330
2331        pretty_assertions::assert_matches!(
2332            v2_already_registered_result,
2333            Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2334            "Expected AlreadyRegistered error. version ={version:?} result={v2_already_registered_result:?}",
2335
2336        );
2337    }
2338
2339    #[test]
2340    fn test_register_validator_v2_auth_fails() {
2341        let mut state = StakeTableState::new();
2342        let mut val = TestValidator::random();
2343        val.bls_sig = Default::default();
2344        let event = StakeTableEvent::RegisterV2((&val).into());
2345
2346        let result = state.apply_event(event);
2347        assert!(matches!(
2348            result,
2349            Err(StakeTableError::AuthenticationFailed(_))
2350        ));
2351    }
2352
2353    #[test]
2354    fn test_deregister_validator() {
2355        let mut state = StakeTableState::new();
2356        let val = TestValidator::random();
2357
2358        let reg = StakeTableEvent::Register((&val).into());
2359        state.apply_event(reg).unwrap().unwrap();
2360
2361        let dereg = StakeTableEvent::Deregister((&val).into());
2362        assert!(state.apply_event(dereg).unwrap().is_ok());
2363        assert!(!state.validators.contains_key(&val.account));
2364    }
2365
2366    #[test]
2367    fn test_delegate_and_undelegate() {
2368        let mut state = StakeTableState::new();
2369        let val = TestValidator::random();
2370        state
2371            .apply_event(StakeTableEvent::Register((&val).into()))
2372            .unwrap()
2373            .unwrap();
2374
2375        let delegator = Address::random();
2376        let amount = U256::from(1000);
2377        let delegate_event = StakeTableEvent::Delegate(Delegated {
2378            delegator,
2379            validator: val.account,
2380            amount,
2381        });
2382        assert!(state.apply_event(delegate_event).unwrap().is_ok());
2383
2384        let validator = state.validators.get(&val.account).unwrap();
2385        assert_eq!(validator.delegators.get(&delegator).cloned(), Some(amount));
2386
2387        let undelegate_event = StakeTableEvent::Undelegate(Undelegated {
2388            delegator,
2389            validator: val.account,
2390            amount,
2391        });
2392        assert!(state.apply_event(undelegate_event).unwrap().is_ok());
2393        let validator = state.validators.get(&val.account).unwrap();
2394        assert!(!validator.delegators.contains_key(&delegator));
2395    }
2396
2397    #[rstest]
2398    #[case::v1(StakeTableContractVersion::V1)]
2399    #[case::v2(StakeTableContractVersion::V2)]
2400    fn test_key_update_event(#[case] version: StakeTableContractVersion) {
2401        let mut state = StakeTableState::new();
2402        let val = TestValidator::random();
2403
2404        // Always register first using V1 to simulate upgrade scenarios
2405        state
2406            .apply_event(StakeTableEvent::Register((&val).into()))
2407            .unwrap()
2408            .unwrap();
2409
2410        let new_keys = val.randomize_keys();
2411
2412        let event = match version {
2413            StakeTableContractVersion::V1 => StakeTableEvent::KeyUpdate((&new_keys).into()),
2414            StakeTableContractVersion::V2 => StakeTableEvent::KeyUpdateV2((&new_keys).into()),
2415        };
2416
2417        assert!(state.apply_event(event).unwrap().is_ok());
2418
2419        let updated = state.validators.get(&val.account).unwrap();
2420        assert_eq!(updated.stake_table_key, new_keys.bls_vk.into());
2421        assert_eq!(updated.state_ver_key, new_keys.schnorr_vk.into());
2422    }
2423
2424    #[test]
2425    fn test_duplicate_bls_key() {
2426        let mut state = StakeTableState::new();
2427        let val = TestValidator::random();
2428        let event1 = StakeTableEvent::Register((&val).into());
2429        let mut val2 = TestValidator::random();
2430        val2.bls_vk = val.bls_vk;
2431        val2.account = Address::random();
2432
2433        let event2 = StakeTableEvent::Register((&val2).into());
2434        assert!(state.apply_event(event1).unwrap().is_ok());
2435        let result = state.apply_event(event2);
2436
2437        let expected_bls_key = BLSPubKey::from(val.bls_vk).to_string();
2438
2439        assert_matches!(
2440            result,
2441            Err(StakeTableError::BlsKeyAlreadyUsed(key))
2442                if key == expected_bls_key,
2443            "Expected BlsKeyAlreadyUsed({expected_bls_key}), but got: {result:?}",
2444        );
2445    }
2446
2447    #[test]
2448    fn test_duplicate_schnorr_key() {
2449        let mut state = StakeTableState::new();
2450        let val = TestValidator::random();
2451        let event1 = StakeTableEvent::Register((&val).into());
2452        let mut val2 = TestValidator::random();
2453        val2.schnorr_vk = val.schnorr_vk;
2454        val2.account = Address::random();
2455        val2.bls_vk = val2.randomize_keys().bls_vk;
2456
2457        let event2 = StakeTableEvent::Register((&val2).into());
2458        assert!(state.apply_event(event1).unwrap().is_ok());
2459        let result = state.apply_event(event2);
2460
2461        let schnorr: SchnorrPubKey = val.schnorr_vk.into();
2462        assert_matches!(
2463            result,
2464            Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(key)))
2465                if key == schnorr.to_string(),
2466            "Expected SchnorrKeyAlreadyUsed({schnorr}), but got: {result:?}",
2467
2468        );
2469    }
2470
2471    #[test]
2472    fn test_register_and_deregister_validator() {
2473        let mut state = StakeTableState::new();
2474        let validator = TestValidator::random();
2475        let event = StakeTableEvent::Register((&validator).into());
2476        assert!(state.apply_event(event).unwrap().is_ok());
2477
2478        let deregister_event = StakeTableEvent::Deregister((&validator).into());
2479        assert!(state.apply_event(deregister_event).unwrap().is_ok());
2480    }
2481
2482    #[test]
2483    fn test_delegate_zero_amount_is_rejected() {
2484        let mut state = StakeTableState::new();
2485        let validator = TestValidator::random();
2486        let account = validator.account;
2487        state
2488            .apply_event(StakeTableEvent::Register((&validator).into()))
2489            .unwrap()
2490            .unwrap();
2491
2492        let delegator = Address::random();
2493        let amount = U256::ZERO;
2494        let event = StakeTableEvent::Delegate(Delegated {
2495            delegator,
2496            validator: account,
2497            amount,
2498        });
2499        let result = state.apply_event(event);
2500
2501        assert_matches!(
2502            result,
2503            Err(StakeTableError::ZeroDelegatorStake(addr))
2504                if addr == delegator,
2505            "delegator stake is zero"
2506
2507        );
2508    }
2509
2510    #[test]
2511    fn test_undelegate_more_than_stake_fails() {
2512        let mut state = StakeTableState::new();
2513        let validator = TestValidator::random();
2514        let account = validator.account;
2515        state
2516            .apply_event(StakeTableEvent::Register((&validator).into()))
2517            .unwrap()
2518            .unwrap();
2519
2520        let delegator = Address::random();
2521        let event = StakeTableEvent::Delegate(Delegated {
2522            delegator,
2523            validator: account,
2524            amount: U256::from(10u64),
2525        });
2526        state.apply_event(event).unwrap().unwrap();
2527
2528        let result = state.apply_event(StakeTableEvent::Undelegate(Undelegated {
2529            delegator,
2530            validator: account,
2531            amount: U256::from(20u64),
2532        }));
2533        assert_matches!(
2534            result,
2535            Err(StakeTableError::InsufficientStake),
2536            "Expected InsufficientStake error, got: {result:?}",
2537        );
2538    }
2539
2540    #[tokio::test(flavor = "multi_thread")]
2541    async fn test_decaf_stake_table() {
2542        setup_test();
2543
2544        // The following commented-out block demonstrates how the `decaf_stake_table_events.json`
2545        // and `decaf_stake_table.json` files were originally generated.
2546
2547        // It generates decaf stake table data by fetching events from the contract,
2548        // writes events and the constructed stake table to JSON files.
2549
2550        /*
2551        let l1 = L1Client::new(vec!["https://ethereum-sepolia.publicnode.com"
2552            .parse()
2553            .unwrap()])
2554        .unwrap();
2555        let contract_address = "0x40304fbe94d5e7d1492dd90c53a2d63e8506a037";
2556
2557        let events = Fetcher::fetch_events_from_contract(
2558            l1,
2559            contract_address.parse().unwrap(),
2560            None,
2561            8582328,
2562        )
2563        .await;
2564
2565        let sorted_events = events.sort_events().expect("failed to sort");
2566
2567        // Serialize and write sorted events
2568        let json_events = serde_json::to_string_pretty(&sorted_events)?;
2569        let mut events_file = File::create("decaf_stake_table_events.json").await?;
2570        events_file.write_all(json_events.as_bytes()).await?;
2571
2572        // Process into stake table
2573        let stake_table = validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e))?;
2574
2575        // Serialize and write stake table
2576        let json_stake_table = serde_json::to_string_pretty(&stake_table)?;
2577        let mut stake_file = File::create("decaf_stake_table.json").await?;
2578        stake_file.write_all(json_stake_table.as_bytes()).await?;
2579        */
2580
2581        let events_json =
2582            std::fs::read_to_string("../data/v3/decaf_stake_table_events.json").unwrap();
2583        let events: Vec<(EventKey, StakeTableEvent)> = serde_json::from_str(&events_json).unwrap();
2584
2585        // Reconstruct stake table from events
2586        let reconstructed_stake_table =
2587            active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)).unwrap();
2588
2589        let stake_table_json =
2590            std::fs::read_to_string("../data/v3/decaf_stake_table.json").unwrap();
2591        let expected: IndexMap<Address, Validator<BLSPubKey>> =
2592            serde_json::from_str(&stake_table_json).unwrap();
2593
2594        assert_eq!(
2595            reconstructed_stake_table, expected,
2596            "Stake table reconstructed from events does not match the expected stake table "
2597        );
2598    }
2599}