espresso_types/v0/impls/
stake_table.rs

1use std::{
2    cmp::{max, min},
3    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4    future::Future,
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use alloy::{
10    eips::{BlockId, BlockNumberOrTag},
11    primitives::{utils::format_ether, Address, U256},
12    providers::Provider,
13    rpc::types::Log,
14};
15use anyhow::{bail, ensure, Context};
16use async_lock::{Mutex, RwLock};
17use committable::Committable;
18use futures::{
19    future::BoxFuture,
20    stream::{self, StreamExt},
21};
22use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
23use hotshot_contract_adapter::sol_types::{
24    EspToken::{self, EspTokenInstance},
25    StakeTableV2::{
26        self, ConsensusKeysUpdated, ConsensusKeysUpdatedV2, Delegated, Undelegated, ValidatorExit,
27        ValidatorRegistered, ValidatorRegisteredV2,
28    },
29};
30use hotshot_types::{
31    data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
32    drb::{
33        election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
34        DrbResult,
35    },
36    stake_table::{HSStakeTable, StakeTableEntry},
37    traits::{
38        election::Membership,
39        node_implementation::{ConsensusTime, NodeType},
40        signature_key::StakeTableEntryType,
41    },
42    PeerConfig,
43};
44use humantime::format_duration;
45use indexmap::IndexMap;
46use thiserror::Error;
47use tokio::{spawn, time::sleep};
48use tracing::Instrument;
49
50#[cfg(any(test, feature = "testing"))]
51use super::v0_3::DAMembers;
52use super::{
53    traits::{MembershipPersistence, StateCatchup},
54    v0_3::{ChainConfig, EventKey, Fetcher, StakeTableEvent, StakeTableUpdateTask, Validator},
55    Header, L1Client, Leaf2, PubKey, SeqTypes,
56};
57use crate::{
58    traits::EventsPersistenceRead,
59    v0_1::{L1Provider, RewardAmount, BLOCKS_PER_YEAR, COMMISSION_BASIS_POINTS, INFLATION_RATE},
60    v0_3::{EventSortingError, ExpectedStakeTableError, FetchRewardError, StakeTableError},
61};
62
/// Epoch type used by the sequencer's consensus configuration (`SeqTypes`).
type Epoch = <SeqTypes as NodeType>::Epoch;
/// Map from validator (Ethereum) address to its validator record, preserving
/// insertion order.
pub type ValidatorMap = IndexMap<Address, Validator<BLSPubKey>>;
/// The result of applying a stake table event:
/// - `Ok(Ok(()))`: success
/// - `Ok(Err(...))`: expected error
/// - `Err(...)`: serious error
type ApplyEventResult<T> = Result<Result<T, ExpectedStakeTableError>, StakeTableError>;
70
71/// Format the alloy Log RPC type in a way to make it easy to find the event in an explorer.
trait DisplayLog {
    /// Render a short one-line description of the log (block number, log
    /// index, transaction hash) suitable for locating it in an explorer.
    fn display(&self) -> String;
}
75
76impl DisplayLog for Log {
77    fn display(&self) -> String {
78        // These values are all unlikely to be missing because we only create Log variables by
79        // fetching them from the RPC, so for simplicity we use defaults if the any of the values
80        // are missing.
81        let block = self.block_number.unwrap_or_default();
82        let index = self.log_index.unwrap_or_default();
83        let hash = self.transaction_hash.unwrap_or_default();
84        format!("Log(block={block},index={index},transaction_hash={hash})")
85    }
86}
87
/// Raw stake table events fetched from the L1 contract, grouped by kind, each
/// paired with the log it was decoded from.
#[derive(Clone, PartialEq)]
pub struct StakeTableEvents {
    /// `ValidatorRegistered` (v1) events.
    registrations: Vec<(ValidatorRegistered, Log)>,
    /// `ValidatorRegisteredV2` events (carry signatures; authenticated on construction).
    registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
    /// `ValidatorExit` events.
    deregistrations: Vec<(ValidatorExit, Log)>,
    /// `Delegated` events.
    delegated: Vec<(Delegated, Log)>,
    /// `Undelegated` events.
    undelegated: Vec<(Undelegated, Log)>,
    /// `ConsensusKeysUpdated` (v1) events.
    keys: Vec<(ConsensusKeysUpdated, Log)>,
    /// `ConsensusKeysUpdatedV2` events (carry signatures; authenticated on construction).
    keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
}
98
99impl StakeTableEvents {
100    /// Creates a new instance of `StakeTableEvents` with the provided events.
101    ///
102    /// Remove unauthenticated registration and key update events
103    fn from_l1_logs(
104        registrations: Vec<(ValidatorRegistered, Log)>,
105        registrations_v2: Vec<(ValidatorRegisteredV2, Log)>,
106        deregistrations: Vec<(ValidatorExit, Log)>,
107        delegated: Vec<(Delegated, Log)>,
108        undelegated: Vec<(Undelegated, Log)>,
109        keys: Vec<(ConsensusKeysUpdated, Log)>,
110        keys_v2: Vec<(ConsensusKeysUpdatedV2, Log)>,
111    ) -> Self {
112        let registrations_v2 = registrations_v2
113            .into_iter()
114            .filter(|(event, log)| {
115                event
116                    .authenticate()
117                    .map_err(|_| {
118                        tracing::warn!(
119                            "Failed to authenticate ValidatorRegisteredV2 event {}",
120                            log.display()
121                        );
122                    })
123                    .is_ok()
124            })
125            .collect();
126        let keys_v2 = keys_v2
127            .into_iter()
128            .filter(|(event, log)| {
129                event
130                    .authenticate()
131                    .map_err(|_| {
132                        tracing::warn!(
133                            "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
134                            log.display()
135                        );
136                    })
137                    .is_ok()
138            })
139            .collect();
140        Self {
141            registrations,
142            registrations_v2,
143            deregistrations,
144            delegated,
145            undelegated,
146            keys,
147            keys_v2,
148        }
149    }
150
151    pub fn sort_events(self) -> Result<Vec<(EventKey, StakeTableEvent)>, EventSortingError> {
152        let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
153        let Self {
154            registrations,
155            registrations_v2,
156            deregistrations,
157            delegated,
158            undelegated,
159            keys,
160            keys_v2,
161        } = self;
162
163        let key = |log: &Log| -> Result<EventKey, EventSortingError> {
164            let block_number = log
165                .block_number
166                .ok_or(EventSortingError::MissingBlockNumber)?;
167            let log_index = log.log_index.ok_or(EventSortingError::MissingLogIndex)?;
168            Ok((block_number, log_index))
169        };
170
171        for (registration, log) in registrations {
172            events.push((key(&log)?, registration.into()));
173        }
174        for (registration, log) in registrations_v2 {
175            events.push((key(&log)?, registration.into()));
176        }
177        for (dereg, log) in deregistrations {
178            events.push((key(&log)?, dereg.into()));
179        }
180        for (delegation, log) in delegated {
181            events.push((key(&log)?, delegation.into()));
182        }
183        for (undelegated, log) in undelegated {
184            events.push((key(&log)?, undelegated.into()));
185        }
186        for (update, log) in keys {
187            events.push((key(&log)?, update.into()));
188        }
189        for (update, log) in keys_v2 {
190            events.push((key(&log)?, update.into()));
191        }
192
193        events.sort_by_key(|(key, _)| *key);
194        Ok(events)
195    }
196}
197
/// In-memory state machine that folds stake table events into a validator map
/// while tracking which consensus keys have already been used.
#[derive(Debug)]
pub struct StakeTableState {
    // Current validators, keyed by account address (insertion-ordered).
    validators: ValidatorMap,
    // Every BLS key seen so far; the contract enforces each is used only once.
    used_bls_keys: HashSet<BLSPubKey>,
    // Every Schnorr key seen so far; the contract does NOT enforce uniqueness,
    // so a duplicate is treated as an expected (recoverable) error.
    used_schnorr_keys: HashSet<SchnorrPubKey>,
}
204
impl StakeTableState {
    /// Create an empty state: no validators, no BLS or Schnorr keys in use.
    pub fn new() -> Self {
        Self {
            validators: IndexMap::new(),
            used_bls_keys: HashSet::new(),
            used_schnorr_keys: HashSet::new(),
        }
    }

    /// Consume the state, yielding the accumulated validator map.
    pub fn get_validators(self) -> ValidatorMap {
        self.validators
    }

    /// Apply a single stake table event to the state.
    ///
    /// Returns `Ok(Ok(()))` on success, `Ok(Err(..))` for expected violations
    /// (currently only Schnorr key reuse, which the contract does not
    /// prevent), and `Err(..)` for fatal violations that should never occur
    /// in a valid event history (duplicate registration, BLS key reuse,
    /// unknown validator/delegator, insufficient stake, failed
    /// authentication).
    pub fn apply_event(&mut self, event: StakeTableEvent) -> ApplyEventResult<()> {
        match event {
            StakeTableEvent::Register(ValidatorRegistered {
                account,
                blsVk,
                schnorrVk,
                commission,
            }) => {
                let stake_table_key: BLSPubKey = blsVk.into();
                let state_ver_key: SchnorrPubKey = schnorrVk.into();

                // Take the entry up front so the occupancy check and the
                // final insert refer to the same slot.
                let entry = self.validators.entry(account);
                if let indexmap::map::Entry::Occupied(_) = entry {
                    return Err(StakeTableError::AlreadyRegistered(account));
                }

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once.
                // NOTE: at this point the BLS key has already been recorded as
                // used even though the registration is rejected.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                // New validator starts with zero stake and no delegators.
                entry.or_insert(Validator {
                    account,
                    stake_table_key,
                    state_ver_key,
                    stake: U256::ZERO,
                    commission,
                    delegators: HashMap::new(),
                });
            },

            StakeTableEvent::RegisterV2(reg) => {
                // Signature authentication is performed right after fetching, if we get an
                // unauthenticated event here, something went wrong, we abort early.
                reg.authenticate()
                    .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

                let ValidatorRegisteredV2 {
                    account,
                    blsVK,
                    schnorrVK,
                    commission,
                    ..
                } = reg;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                // Same registration flow as the v1 event above.
                let entry = self.validators.entry(account);
                if let indexmap::map::Entry::Occupied(_) = entry {
                    return Err(StakeTableError::AlreadyRegistered(account));
                }

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                entry.or_insert(Validator {
                    account,
                    stake_table_key,
                    state_ver_key,
                    stake: U256::ZERO,
                    commission,
                    delegators: HashMap::new(),
                });
            },

            StakeTableEvent::Deregister(exit) => {
                // Remove the validator entirely; its keys stay in the
                // used-key sets (they are never freed in this state machine).
                self.validators
                    .shift_remove(&exit.validator)
                    .ok_or(StakeTableError::ValidatorNotFound(exit.validator))?;
            },

            StakeTableEvent::Delegate(delegated) => {
                let Delegated {
                    delegator,
                    validator,
                    amount,
                } = delegated;

                let val = self
                    .validators
                    .get_mut(&validator)
                    .ok_or(StakeTableError::ValidatorNotFound(validator))?;

                // Zero-amount delegations are rejected as fatal.
                if amount.is_zero() {
                    return Err(StakeTableError::ZeroDelegatorStake(delegator));
                }

                val.stake += amount;
                // Insert the delegator with the given stake
                // or increase the stake if already present
                val.delegators
                    .entry(delegator)
                    .and_modify(|stake| *stake += amount)
                    .or_insert(amount);
            },

            StakeTableEvent::Undelegate(undelegated) => {
                let Undelegated {
                    delegator,
                    validator,
                    amount,
                } = undelegated;

                let val = self
                    .validators
                    .get_mut(&validator)
                    .ok_or(StakeTableError::ValidatorNotFound(validator))?;

                // Checked subtraction: undelegating more than the tracked
                // stake indicates a corrupt event history.
                val.stake = val
                    .stake
                    .checked_sub(amount)
                    .ok_or(StakeTableError::InsufficientStake)?;

                let delegator_stake = val
                    .delegators
                    .get_mut(&delegator)
                    .ok_or(StakeTableError::DelegatorNotFound(delegator))?;

                *delegator_stake = delegator_stake
                    .checked_sub(amount)
                    .ok_or(StakeTableError::InsufficientStake)?;

                // Drop the delegator record once its stake reaches zero.
                if delegator_stake.is_zero() {
                    val.delegators.remove(&delegator);
                }
            },

            StakeTableEvent::KeyUpdate(update) => {
                let ConsensusKeysUpdated {
                    account,
                    blsVK,
                    schnorrVK,
                } = update;

                let validator = self
                    .validators
                    .get_mut(&account)
                    .ok_or(StakeTableError::ValidatorNotFound(account))?;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                // The replaced keys are NOT removed from the used-key sets:
                // once used, a key can never be used again.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once,
                // therefore it's possible to have multiple validators with the same schnorr key.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                validator.stake_table_key = stake_table_key;
                validator.state_ver_key = state_ver_key;
            },

            StakeTableEvent::KeyUpdateV2(update) => {
                // Signature authentication is performed right after fetching, if we get an
                // unauthenticated event here, something went wrong, we abort early.
                update
                    .authenticate()
                    .map_err(|e| StakeTableError::AuthenticationFailed(e.to_string()))?;

                let ConsensusKeysUpdatedV2 {
                    account,
                    blsVK,
                    schnorrVK,
                    ..
                } = update;

                let validator = self
                    .validators
                    .get_mut(&account)
                    .ok_or(StakeTableError::ValidatorNotFound(account))?;

                let stake_table_key: BLSPubKey = blsVK.into();
                let state_ver_key: SchnorrPubKey = schnorrVK.into();

                // The stake table contract enforces that each bls key is only used once.
                if !self.used_bls_keys.insert(stake_table_key) {
                    return Err(StakeTableError::BlsKeyAlreadyUsed(
                        stake_table_key.to_string(),
                    ));
                }

                // The contract does *not* enforce that each schnorr key is only used once,
                // therefore it's possible to have multiple validators with the same schnorr key.
                if !self.used_schnorr_keys.insert(state_ver_key.clone()) {
                    return Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(
                        state_ver_key.to_string(),
                    )));
                }

                validator.stake_table_key = stake_table_key;
                validator.state_ver_key = state_ver_key;
            },
        }

        Ok(Ok(()))
    }
}
444
445pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
446    events: I,
447) -> Result<ValidatorMap, StakeTableError> {
448    let mut state = StakeTableState::new();
449    for event in events {
450        match state.apply_event(event.clone()) {
451            Ok(Ok(())) => (), // Event successfully applied
452            Ok(Err(expected_err)) => {
453                // expected error, continue
454                tracing::warn!("Expected error while applying event {event:?}: {expected_err}");
455            },
456            Err(err) => {
457                // stop processing due to fatal error
458                tracing::error!("Fatal error in applying event {event:?}: {err}");
459                return Err(err);
460            },
461        }
462    }
463    Ok(state.get_validators())
464}
465
466/// Select active validators
467///
468/// Removes the validators without stake and selects the top 100 staked validators.
469pub(crate) fn select_active_validator_set(
470    validators: &mut ValidatorMap,
471) -> Result<(), StakeTableError> {
472    let total_validators = validators.len();
473
474    // Remove invalid validators first
475    validators.retain(|address, validator| {
476        if validator.delegators.is_empty() {
477            tracing::info!("Validator {address:?} does not have any delegator");
478            return false;
479        }
480
481        if validator.stake.is_zero() {
482            tracing::info!("Validator {address:?} does not have any stake");
483            return false;
484        }
485
486        true
487    });
488
489    tracing::debug!(
490        total_validators,
491        filtered = validators.len(),
492        "Filtered out invalid validators"
493    );
494
495    if validators.is_empty() {
496        tracing::warn!("Validator selection failed: no validators passed minimum criteria");
497        return Err(StakeTableError::NoValidValidators);
498    }
499
500    let maximum_stake = validators.values().map(|v| v.stake).max().ok_or_else(|| {
501        tracing::error!("Could not compute maximum stake from filtered validators");
502        StakeTableError::MissingMaximumStake
503    })?;
504
505    let minimum_stake = maximum_stake
506        .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
507        .ok_or_else(|| {
508            tracing::error!("Overflow while calculating minimum stake threshold");
509            StakeTableError::MinimumStakeOverflow
510        })?;
511
512    let mut valid_stakers: Vec<_> = validators
513        .iter()
514        .filter(|(_, v)| v.stake >= minimum_stake)
515        .map(|(addr, v)| (*addr, v.stake))
516        .collect();
517
518    tracing::info!(
519        count = valid_stakers.len(),
520        "Number of validators above minimum stake threshold"
521    );
522
523    // Sort by stake (descending order)
524    valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
525
526    if valid_stakers.len() > 100 {
527        valid_stakers.truncate(100);
528    }
529
530    // Retain only the selected validators
531    let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
532    validators.retain(|address, _| selected_addresses.contains(address));
533
534    tracing::info!(
535        final_count = validators.len(),
536        "Selected active validator set"
537    );
538
539    Ok(())
540}
541
542/// Extract the active validator set from the L1 stake table events.
543pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
544    events: I,
545) -> Result<ValidatorMap, StakeTableError> {
546    let mut validators = validators_from_l1_events(events)?;
547    select_active_validator_set(&mut validators)?;
548    Ok(validators)
549}
550
551impl std::fmt::Debug for StakeTableEvent {
552    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553        match self {
554            StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
555            StakeTableEvent::RegisterV2(event) => write!(f, "RegisterV2({:?})", event.account),
556            StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
557            StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
558            StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
559            StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
560            StakeTableEvent::KeyUpdateV2(event) => write!(f, "KeyUpdateV2({:?})", event.account),
561        }
562    }
563}
564
#[derive(Clone, derive_more::derive::Debug)]
/// Type to describe DA and Stake memberships
pub struct EpochCommittees {
    /// Committee used when we're in pre-epoch state
    non_epoch_committee: NonEpochCommittee,
    /// Holds Stake table and da stake
    state: HashMap<Epoch, EpochCommittee>,
    /// Randomized committees, filled when we receive the DrbResult
    randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
    // NOTE(review): presumably the first epoch at which epoch-based
    // membership takes effect; `None` until known — confirm against setters.
    first_epoch: Option<Epoch>,
    // Reward amount — presumably paid per block (see `RewardAmount`,
    // `BLOCKS_PER_YEAR`); confirm against the reward distribution logic.
    block_reward: RewardAmount,
    // Shared fetcher used to retrieve stake table events from L1/persistence.
    fetcher: Arc<Fetcher>,
}
578
579impl Fetcher {
    /// Construct a new fetcher from its collaborators.
    ///
    /// `chain_config` is wrapped in a mutex so it can be replaced later (e.g.
    /// after a chain config upgrade); the background update task starts out
    /// absent until `spawn_update_loop` is called.
    pub fn new(
        peers: Arc<dyn StateCatchup>,
        persistence: Arc<Mutex<dyn MembershipPersistence>>,
        l1_client: L1Client,
        chain_config: ChainConfig,
    ) -> Self {
        Self {
            peers,
            persistence,
            l1_client,
            chain_config: Arc::new(Mutex::new(chain_config)),
            update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
        }
    }
594
595    pub async fn spawn_update_loop(&self) {
596        let mut update_task = self.update_task.0.lock().await;
597        if update_task.is_none() {
598            *update_task = Some(spawn(self.update_loop()));
599        }
600    }
601
    /// Periodically updates the stake table from the L1 contract.
    /// This function polls the finalized block number from the L1 client at an interval
    /// and fetches stake table from contract
    /// and updates the persistence
    fn update_loop(&self) -> impl Future<Output = ()> {
        let span = tracing::warn_span!("Stake table update loop");
        // Clone everything the future needs up front so the returned future
        // owns its data (it is spawned as a detached task).
        let self_clone = self.clone();
        let state = self.l1_client.state.clone();
        let l1_retry = self.l1_client.options().l1_retry_delay;
        let update_delay = self.l1_client.options().stake_table_update_interval;
        let chain_config = self.chain_config.clone();

        async move {
            // Get the stake table contract address from the chain config.
            // This may not contain a stake table address if we are on a pre-epoch version.
            // It keeps retrying until the chain config is upgraded
            // after a successful upgrade to an epoch version.
            let stake_contract_address = loop {
                match chain_config.lock().await.stake_table_contract {
                    Some(addr) => break addr,
                    None => {
                        tracing::debug!(
                            "Stake table contract address not found. Retrying in {l1_retry:?}...",
                        );
                    },
                }
                sleep(l1_retry).await;
            };

            // Begin the main polling loop
            loop {
                // Wait until the L1 client has observed a finalized block.
                let finalized_block = loop {
                    if let Some(block) = state.lock().await.last_finalized {
                        break block;
                    }
                    tracing::debug!("Finalized block not yet available. Retrying in {l1_retry:?}",);
                    sleep(l1_retry).await;
                };

                tracing::debug!("Attempting to fetch stake table at L1 block {finalized_block:?}",);

                // Retry the fetch-and-store step until it succeeds for this
                // finalized block.
                loop {
                    match self_clone
                        .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
                        .await
                    {
                        Ok(events) => {
                            tracing::info!(
                                "Successfully fetched and stored stake table events at \
                                 block={finalized_block:?}"
                            );
                            tracing::debug!("events={events:?}");
                            break;
                        },
                        Err(e) => {
                            tracing::error!(
                                "Error fetching stake table at block {finalized_block:?}. err= \
                                 {e:#}",
                            );
                            sleep(l1_retry).await;
                        },
                    }
                }

                tracing::debug!("Waiting {update_delay:?} before next stake table update...",);
                sleep(update_delay).await;
            }
        }
        .instrument(span)
    }
672
    /// Load all stake table events up to (and including) `to_block`,
    /// combining events already persisted with any newer events fetched from
    /// the contract. Returns the merged, sorted event list.
    pub async fn fetch_events(
        &self,
        contract: Address,
        to_block: u64,
    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
        // Hold the persistence lock only for the read, then release it before
        // the (potentially slow) contract fetch below.
        let persistence_lock = self.persistence.lock().await;
        let (read_l1_offset, persistence_events) = persistence_lock.load_events(to_block).await?;
        drop(persistence_lock);

        tracing::info!("loaded events from storage to_block={to_block:?}");

        // No need to fetch from contract
        // if persistence returns all the events that we need
        if let Some(EventsPersistenceRead::Complete) = read_l1_offset {
            return Ok(persistence_events);
        }

        // Resume fetching from the block after the last persisted one;
        // `None` means nothing was persisted and the contract fetch decides
        // its own starting block.
        let from_block = read_l1_offset
            .map(|read| match read {
                EventsPersistenceRead::UntilL1Block(block) => Ok(block + 1),
                EventsPersistenceRead::Complete => Err(anyhow::anyhow!(
                    "Unexpected state. offset is complete after returning early"
                )),
            })
            .transpose()?;

        // Note: `Some(x) >= None` is always true for Options, so this only
        // constrains the case where a persisted offset exists.
        ensure!(
            Some(to_block) >= from_block,
            "to_block {to_block:?} is less than from_block {from_block:?}"
        );

        tracing::info!(%to_block, from_block = ?from_block, "Fetching events from contract");

        let contract_events = Self::fetch_events_from_contract(
            self.l1_client.clone(),
            contract,
            from_block,
            to_block,
        )
        .await;

        // Persisted events (older blocks) come first, then the newly fetched
        // (sorted) contract events.
        let contract_events = contract_events.sort_events()?;
        let mut events = match from_block {
            Some(_) => persistence_events
                .into_iter()
                .chain(contract_events)
                .collect(),
            None => contract_events,
        };

        // There are no duplicates because the RPC returns all events,
        // which are stored directly in persistence as is.
        // However, this step is taken as a precaution.
        // The vector is already sorted above, so this should be fast.
        // (`dedup` only removes *adjacent* duplicates, which suffices for a
        // sorted list.)
        let len_before_dedup = events.len();
        events.dedup();
        let len_after_dedup = events.len();
        if len_before_dedup != len_after_dedup {
            tracing::warn!("Duplicate events found and removed. This should not normally happen.")
        }

        Ok(events)
    }
736
737    /// Fetch all stake table events from L1
738    pub async fn fetch_events_from_contract(
739        l1_client: L1Client,
740        contract: Address,
741        from_block: Option<u64>,
742        to_block: u64,
743    ) -> StakeTableEvents {
744        let stake_table_contract = StakeTableV2::new(contract, l1_client.provider.clone());
745        let max_retry_duration = l1_client.options().l1_events_max_retry_duration;
746        // get the block number when the contract was initialized
747        // to avoid fetching events from block number 0
748        let from_block = match from_block {
749            Some(block) => block,
750            None => {
751                let start = Instant::now();
752
753                loop {
754                    match stake_table_contract.initializedAtBlock().call().await {
755                        Ok(init_block) => break init_block._0.to::<u64>(),
756                        Err(err) => {
757                            if start.elapsed() >= max_retry_duration {
758                                panic!(
759                                    "Failed to retrieve initial block after `{}`: {err}",
760                                    format_duration(max_retry_duration)
761                                );
762                            }
763                            tracing::warn!(%err, "Failed to retrieve initial block, retrying...");
764                            sleep(l1_client.options().l1_retry_delay).await;
765                        },
766                    }
767                }
768            },
769        };
770
771        // To avoid making large RPC calls, divide the range into smaller chunks.
772        // chunk size is from env "ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE
773        // default value  is `10000` if env variable is not set
774        let mut start = from_block;
775        let end = to_block;
776        let chunk_size = l1_client.options().l1_events_max_block_range;
777        let chunks = std::iter::from_fn(move || {
778            let chunk_end = min(start + chunk_size - 1, end);
779            if chunk_end < start {
780                return None;
781            }
782
783            let chunk = (start, chunk_end);
784            start = chunk_end + 1;
785            Some(chunk)
786        });
787
788        // fetch registered events
789        // retry if the call to the provider to fetch the events fails
790        let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
791            let retry_delay = l1_client.options().l1_retry_delay;
792            let stake_table_contract = stake_table_contract.clone();
793            async move {
794                tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
795                let events = retry(
796                    retry_delay,
797                    max_retry_duration,
798                    "ValidatorRegistered event fetch",
799                    move || {
800                        let stake_table_contract = stake_table_contract.clone();
801                        Box::pin(async move {
802                            stake_table_contract
803                                .ValidatorRegistered_filter()
804                                .from_block(from)
805                                .to_block(to)
806                                .query()
807                                .await
808                        })
809                    },
810                )
811                .await;
812                stream::iter(events)
813            }
814        });
815
816        // fetch registered events v2
817        // retry if the call to the provider to fetch the events fails
818        let registered_events_v2 = stream::iter(chunks.clone()).then(|(from, to)| {
819            let retry_delay = l1_client.options().l1_retry_delay;
820            let max_retry_duration = l1_client.options().l1_events_max_retry_duration;
821            let stake_table_contract = stake_table_contract.clone();
822            async move {
823                tracing::debug!(from, to, "fetch ValidatorRegisteredV2 events in range");
824                let events = retry(
825                    retry_delay,
826                    max_retry_duration,
827                    "ValidatorRegisteredV2 event fetch",
828                    move || {
829                        let stake_table_contract = stake_table_contract.clone();
830                        Box::pin(async move {
831                            stake_table_contract
832                                .ValidatorRegisteredV2_filter()
833                                .from_block(from)
834                                .to_block(to)
835                                .query()
836                                .await
837                        })
838                    },
839                )
840                .await;
841
842                stream::iter(events.into_iter().filter(|(event, log)| {
843                    if let Err(e) = event.authenticate() {
844                        tracing::warn!(
845                            %e,
846                            "Failed to authenticate ValidatorRegisteredV2 event: {}",
847                            log.display()
848                        );
849                        return false;
850                    }
851                    true
852                }))
853            }
854        });
855
856        // fetch validator de registration events
857        let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
858            let retry_delay = l1_client.options().l1_retry_delay;
859            let stake_table_contract = stake_table_contract.clone();
860            async move {
861                tracing::debug!(from, to, "fetch ValidatorExit events in range");
862                let events = retry(
863                    retry_delay,
864                    max_retry_duration,
865                    "ValidatorExit event fetch",
866                    move || {
867                        let stake_table_contract = stake_table_contract.clone();
868                        Box::pin(async move {
869                            stake_table_contract
870                                .ValidatorExit_filter()
871                                .from_block(from)
872                                .to_block(to)
873                                .query()
874                                .await
875                        })
876                    },
877                )
878                .await;
879                stream::iter(events)
880            }
881        });
882
883        // fetch delegated events
884        let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
885            let retry_delay = l1_client.options().l1_retry_delay;
886            let stake_table_contract = stake_table_contract.clone();
887            async move {
888                tracing::debug!(from, to, "fetch Delegated events in range");
889                let events = retry(
890                    retry_delay,
891                    max_retry_duration,
892                    "Delegated event fetch",
893                    move || {
894                        let stake_table_contract = stake_table_contract.clone();
895                        Box::pin(async move {
896                            stake_table_contract
897                                .Delegated_filter()
898                                .from_block(from)
899                                .to_block(to)
900                                .query()
901                                .await
902                        })
903                    },
904                )
905                .await;
906                stream::iter(events)
907            }
908        });
909
910        // fetch undelegated events
911        let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
912            let retry_delay = l1_client.options().l1_retry_delay;
913            let stake_table_contract = stake_table_contract.clone();
914            async move {
915                tracing::debug!(from, to, "fetch Undelegated events in range");
916                let events = retry(
917                    retry_delay,
918                    max_retry_duration,
919                    "Undelegated event fetch",
920                    move || {
921                        let stake_table_contract = stake_table_contract.clone();
922                        Box::pin(async move {
923                            stake_table_contract
924                                .Undelegated_filter()
925                                .from_block(from)
926                                .to_block(to)
927                                .query()
928                                .await
929                        })
930                    },
931                )
932                .await;
933                stream::iter(events)
934            }
935        });
936        // fetch consensus keys updated events
937        let keys_update_events = stream::iter(chunks.clone()).then(|(from, to)| {
938            let retry_delay = l1_client.options().l1_retry_delay;
939            let stake_table_contract = stake_table_contract.clone();
940            async move {
941                tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
942                let events = retry(
943                    retry_delay,
944                    max_retry_duration,
945                    "ConsensusKeysUpdated event fetch",
946                    move || {
947                        let stake_table_contract = stake_table_contract.clone();
948                        Box::pin(async move {
949                            stake_table_contract
950                                .ConsensusKeysUpdated_filter()
951                                .from_block(from)
952                                .to_block(to)
953                                .query()
954                                .await
955                        })
956                    },
957                )
958                .await;
959                stream::iter(events)
960            }
961        });
962        // fetch consensus keys updated v2 events
963        let keys_update_events_v2 = stream::iter(chunks).then(|(from, to)| {
964            let retry_delay = l1_client.options().l1_retry_delay;
965            let stake_table_contract = stake_table_contract.clone();
966            async move {
967                tracing::debug!(from, to, "fetch ConsensusKeysUpdatedV2 events in range");
968                let events = retry(
969                    retry_delay,
970                    max_retry_duration,
971                    "ConsensusKeysUpdatedV2 event fetch",
972                    move || {
973                        let stake_table_contract = stake_table_contract.clone();
974                        Box::pin(async move {
975                            stake_table_contract
976                                .ConsensusKeysUpdatedV2_filter()
977                                .from_block(from)
978                                .to_block(to)
979                                .query()
980                                .await
981                        })
982                    },
983                )
984                .await;
985
986                stream::iter(events.into_iter().filter(|(event, log)| {
987                    if let Err(e) = event.authenticate() {
988                        tracing::warn!(
989                            %e,
990                            "Failed to authenticate ConsensusKeysUpdatedV2 event {}",
991                            log.display()
992                        );
993                        return false;
994                    }
995                    true
996                }))
997            }
998        });
999        let registrations = registered_events.flatten().collect().await;
1000        let registrations_v2 = registered_events_v2.flatten().collect().await;
1001        let deregistrations = deregistered_events.flatten().collect().await;
1002        let delegated = delegated_events.flatten().collect().await;
1003        let undelegated = undelegated_events.flatten().collect().await;
1004        let keys = keys_update_events.flatten().collect().await;
1005        let keys_v2 = keys_update_events_v2.flatten().collect().await;
1006
1007        StakeTableEvents::from_l1_logs(
1008            registrations,
1009            registrations_v2,
1010            deregistrations,
1011            delegated,
1012            undelegated,
1013            keys,
1014            keys_v2,
1015        )
1016    }
1017
1018    /// Get `StakeTable` at specific l1 block height.
1019    /// This function fetches and processes various events (ValidatorRegistered, ValidatorExit,
1020    /// Delegated, Undelegated, and ConsensusKeysUpdated) within the block range from the
1021    /// contract's initialization block to the provided `to_block` value.
1022    /// Events are fetched in chunks to and retries are implemented for failed requests.
1023    pub async fn fetch_and_store_stake_table_events(
1024        &self,
1025        contract: Address,
1026        to_block: u64,
1027    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
1028        let events = self.fetch_events(contract, to_block).await?;
1029
1030        tracing::info!("storing events in storage to_block={to_block:?}");
1031
1032        {
1033            let persistence_lock = self.persistence.lock().await;
1034            persistence_lock
1035                .store_events(to_block, events.clone())
1036                .await
1037                .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
1038        }
1039
1040        Ok(events)
1041    }
1042
1043    // Only used by staking CLI which doesn't have persistence
1044    pub async fn fetch_all_validators_from_contract(
1045        l1_client: L1Client,
1046        contract: Address,
1047        to_block: u64,
1048    ) -> anyhow::Result<ValidatorMap> {
1049        let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await;
1050        let sorted = events.sort_events()?;
1051        // Process the sorted events and return the resulting stake table.
1052        validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
1053            .context("failed to construct validators set from l1 events")
1054    }
    /// This function is used to calculate the reward for a block.
    /// It fetches the initial supply from the token contract.
    ///
    /// - We now rely on the `Initialized` event of the token contract (which should only occur once).
    /// - After locating this event, we fetch its transaction receipt and look for a decoded `Transfer` log.
    /// - If either step fails, the function aborts to prevent incorrect reward calculations.
    ///
    /// Relying on mint events directly e.g., searching for mints from the zero address is prone to errors
    /// because in future when reward withdrawals are supported, there might be more than one mint transfer logs from
    /// zero address.
    ///
    /// The ESP token contract itself does not expose the initialization block
    /// but the stake table contract does.
    /// The stake table contract is deployed after the token contract as it holds the token
    /// contract address. We use the stake table contract initialization block as a safe upper bound when scanning
    /// backwards for the token contract initialization event.
    pub async fn fetch_block_reward(&self) -> Result<RewardAmount, FetchRewardError> {
        let chain_config = *self.chain_config.lock().await;

        // The stake table address is required; without it there is nothing to query.
        let stake_table_contract = chain_config
            .stake_table_contract
            .ok_or(FetchRewardError::MissingStakeTableContract)?;

        let provider = self.l1_client.provider.clone();
        let stake_table = StakeTableV2::new(stake_table_contract, provider.clone());

        // Get the block number where the stake table was initialized
        // Stake table contract has the token contract address
        // so the token contract is deployed before the stake table contract
        let stake_table_init_block = stake_table
            .initializedAtBlock()
            .block(BlockId::finalized())
            .call()
            .await
            .map_err(FetchRewardError::ContractCall)?
            ._0
            .to::<u64>();

        tracing::info!("stake table init block ={stake_table_init_block}");

        // The token address is read from the stake table contract at a finalized block.
        let token_address = stake_table
            .token()
            .block(BlockId::finalized())
            .call()
            .await
            .map_err(FetchRewardError::TokenAddressFetch)?
            ._0;

        let token = EspToken::new(token_address, provider.clone());

        // Try to fetch the `Initialized` event directly. This event is emitted only once,
        // during the token contract initialization. The initialization transaction also transfers initial supply minted
        // from the zero address. Since the result set is small (a single event),
        // most RPC providers like Infura and Alchemy allow querying across the full block range
        // If this fails because provider does not allow the query due to rate limiting (or some other error), we fall back to scanning over
        // a fixed block range.
        let init_logs = token
            .Initialized_filter()
            .from_block(0u64)
            .to_block(BlockNumberOrTag::Finalized)
            .query()
            .await;

        let init_log = match init_logs {
            Ok(init_logs) => {
                if init_logs.is_empty() {
                    // The token must have been initialized for the stake table to exist.
                    tracing::error!(
                        "Token Initialized event logs are empty. This should never happen"
                    );
                    return Err(FetchRewardError::MissingInitializedEvent);
                }

                // Only one `Initialized` event can exist; take the first log.
                let (_, init_log) = init_logs[0].clone();

                tracing::debug!(tx_hash = ?init_log.transaction_hash, "Found token `Initialized` event");
                init_log
            },
            Err(err) => {
                // Full-range query rejected (e.g. rate limited): fall back to a
                // bounded backward scan from the stake table's init block.
                tracing::warn!(
                    "RPC returned error {err:?}. will fallback to scanning over fixed block range"
                );
                self.scan_token_contract_initialized_event_log(stake_table_init_block, token)
                    .await?
            },
        };

        // Get the hash of the transaction that emitted the Initialized event
        let tx_hash =
            init_log
                .transaction_hash
                .ok_or_else(|| FetchRewardError::MissingTransactionHash {
                    init_log: init_log.clone().into(),
                })?;

        // Get the receipt of the transaction that emitted the Initialized event
        let init_tx = provider
            .get_transaction_receipt(tx_hash)
            .await
            .map_err(FetchRewardError::Rpc)?
            .ok_or_else(|| FetchRewardError::MissingTransactionReceipt {
                tx_hash: tx_hash.to_string(),
            })?;

        // The same transaction must contain the initial-supply mint (a `Transfer`).
        let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().ok_or(
            FetchRewardError::DecodeTransferLog {
                tx_hash: tx_hash.to_string(),
            },
        )?;

        tracing::debug!("mint transfer event ={mint_transfer:?}");
        // A mint is a transfer from the zero address; anything else is unexpected.
        if mint_transfer.from != Address::ZERO {
            return Err(FetchRewardError::InvalidMintFromAddress);
        }

        let initial_supply = mint_transfer.value;

        tracing::info!("Initial token amount: {} ESP", format_ether(initial_supply));

        // reward = initial_supply * INFLATION_RATE / BLOCKS_PER_YEAR / COMMISSION_BASIS_POINTS
        // NOTE(review): checked_div guards the constant divisor; DivisionByZero is
        // unreachable unless COMMISSION_BASIS_POINTS is zero.
        let reward = ((initial_supply * U256::from(INFLATION_RATE)) / U256::from(BLOCKS_PER_YEAR))
            .checked_div(U256::from(COMMISSION_BASIS_POINTS))
            .ok_or(FetchRewardError::DivisionByZero)?;

        Ok(RewardAmount(reward))
    }
1179
    /// Scan backwards from `stake_table_init_block` for the token contract's
    /// one-time `Initialized` event, in windows of
    /// `l1_events_max_block_range` blocks, giving up after scanning
    /// `MAX_BLOCKS_SCANNED` blocks in total.
    ///
    /// Fallback path for `fetch_block_reward` when the provider rejects a
    /// full-range query.
    pub async fn scan_token_contract_initialized_event_log(
        &self,
        stake_table_init_block: u64,
        token: EspTokenInstance<(), L1Provider>,
    ) -> Result<Log, FetchRewardError> {
        let max_events_range = self.l1_client.options().l1_events_max_block_range;
        // Upper bound on the total number of blocks scanned before giving up.
        const MAX_BLOCKS_SCANNED: u64 = 200_000;
        let mut total_scanned = 0;

        // Initial window ends at the stake table's init block; the token
        // contract must have been initialized at or before that height.
        let mut from_block = stake_table_init_block.saturating_sub(max_events_range);
        let mut to_block = stake_table_init_block;

        loop {
            if total_scanned >= MAX_BLOCKS_SCANNED {
                tracing::error!(
                    total_scanned,
                    "Exceeded maximum scan range while searching for token Initialized event"
                );
                return Err(FetchRewardError::ExceededMaxScanRange(MAX_BLOCKS_SCANNED));
            }

            let init_logs = token
                .Initialized_filter()
                .from_block(from_block)
                .to_block(to_block)
                .query()
                .await
                .map_err(FetchRewardError::ScanQueryFailed)?;

            if !init_logs.is_empty() {
                // The event is emitted exactly once, so the first hit is the one.
                let (_, init_log) = init_logs[0].clone();
                tracing::info!(
                    from_block,
                    tx_hash = ?init_log.transaction_hash,
                    "Found token Initialized event during scan"
                );
                return Ok(init_log);
            }

            // Slide the window down by one range. NOTE(review): consecutive
            // windows overlap by one block (new to_block == old from_block),
            // and once both bounds saturate at 0 the [0, 0] range is re-queried
            // until total_scanned hits the cap — harmless but slightly wasteful.
            total_scanned += max_events_range;
            from_block = from_block.saturating_sub(max_events_range);
            to_block = to_block.saturating_sub(max_events_range);
        }
    }
1224
1225    pub async fn fetch(&self, epoch: Epoch, header: Header) -> anyhow::Result<ValidatorMap> {
1226        let chain_config = self.get_chain_config(&header).await?;
1227        // update chain config
1228        *self.chain_config.lock().await = chain_config;
1229
1230        let Some(address) = chain_config.stake_table_contract else {
1231            bail!("No stake table contract address found in Chain config");
1232        };
1233
1234        let Some(l1_finalized_block_info) = header.l1_finalized() else {
1235            bail!(
1236                "The epoch root for epoch {epoch} is missing the L1 finalized block info. This is \
1237                 a fatal error. Consensus is blocked and will not recover."
1238            );
1239        };
1240
1241        let events = match self
1242            .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
1243            .await
1244            .map_err(GetStakeTablesError::L1ClientFetchError)
1245        {
1246            Ok(events) => events,
1247            Err(e) => {
1248                bail!("failed to fetch stake table events {e:?}");
1249            },
1250        };
1251
1252        match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
1253            Ok(validators) => Ok(validators),
1254            Err(e) => {
1255                bail!("failed to construct stake table {e:?}");
1256            },
1257        }
1258    }
1259
1260    /// Retrieve and verify `ChainConfig`
1261    // TODO move to appropriate object (Header?)
1262    pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
1263        let chain_config = self.chain_config.lock().await;
1264        let peers = self.peers.clone();
1265        let header_cf = header.chain_config();
1266        if chain_config.commit() == header_cf.commit() {
1267            return Ok(*chain_config);
1268        }
1269
1270        let cf = match header_cf.resolve() {
1271            Some(cf) => cf,
1272            None => peers
1273                .fetch_chain_config(header_cf.commit())
1274                .await
1275                .map_err(|err| {
1276                    tracing::error!("failed to get chain_config from peers. err: {err:?}");
1277                    err
1278                })?,
1279        };
1280
1281        Ok(cf)
1282    }
1283
1284    #[cfg(any(test, feature = "testing"))]
1285    pub fn mock() -> Self {
1286        use crate::{mock, v0_1::NoStorage};
1287        let chain_config = ChainConfig::default();
1288        let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
1289            .expect("Failed to create L1 client");
1290
1291        let peers = Arc::new(mock::MockStateCatchup::default());
1292        let persistence = NoStorage;
1293
1294        Self::new(peers, Arc::new(Mutex::new(persistence)), l1, chain_config)
1295    }
1296}
1297
1298async fn retry<F, T, E>(
1299    retry_delay: Duration,
1300    max_duration: Duration,
1301    operation_name: &str,
1302    mut operation: F,
1303) -> T
1304where
1305    F: FnMut() -> BoxFuture<'static, Result<T, E>>,
1306    E: std::fmt::Display,
1307{
1308    let start = Instant::now();
1309    loop {
1310        match operation().await {
1311            Ok(result) => return result,
1312            Err(err) => {
1313                if start.elapsed() >= max_duration {
1314                    panic!(
1315                        r#"
1316                    Failed to complete operation `{operation_name}` after `{}`.
1317                    error: {err}
1318                    
1319
1320                    This might be caused by:
1321                    - The current block range being too large for your RPC provider.
1322                    - The event query returning more data than your RPC allows as some RPC providers limit the number of events returned.
1323                    - RPC provider outage
1324
1325                    Suggested solution:
1326                    - Reduce the value of the environment variable `ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE` to query smaller ranges.
1327                    - Add multiple RPC providers
1328                    - Use a different RPC provider with higher rate limits."#,
1329                        format_duration(max_duration)
1330                    );
1331                }
1332                tracing::warn!(%err, "Retrying `{operation_name}` after error");
1333                sleep(retry_delay).await;
1334            },
1335        }
1336    }
1337}
1338
/// Holds Stake table and da stake
///
/// Static committee used when no epoch-specific state applies: membership is
/// fixed at construction rather than derived from L1 contract events.
#[derive(Clone, Debug)]
struct NonEpochCommittee {
    /// The nodes eligible for leadership.
    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
    /// leader but without voting rights.
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,

    /// Keys for nodes participating in the network
    stake_table: Vec<PeerConfig<SeqTypes>>,

    /// Keys for DA members
    da_members: Vec<PeerConfig<SeqTypes>>,

    /// Stake entries indexed by public key, for efficient lookup.
    indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,

    /// DA entries indexed by public key, for efficient lookup.
    indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
}
1359
/// Holds Stake table and da stake
///
/// Per-epoch committee state, serialized for persistence.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct EpochCommittee {
    /// The nodes eligible for leadership.
    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
    /// leader but without voting rights.
    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
    /// Keys for nodes participating in the network
    stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
    /// Full validator records for this epoch, keyed by account address.
    validators: ValidatorMap,
    /// Maps each validator's BLS key to its Ethereum account address.
    address_mapping: HashMap<BLSPubKey, Address>,
}
1372
1373impl EpochCommittees {
1374    pub fn first_epoch(&self) -> Option<Epoch> {
1375        self.first_epoch
1376    }
1377
1378    pub fn fetcher(&self) -> &Fetcher {
1379        &self.fetcher
1380    }
1381
1382    /// Updates `Self.stake_table` with stake_table for
1383    /// `Self.contract_address` at `l1_block_height`. This is intended
1384    /// to be called before calling `self.stake()` so that
1385    /// `Self.stake_table` only needs to be updated once in a given
1386    /// life-cycle but may be read from many times.
1387    fn update(
1388        &mut self,
1389        epoch: EpochNumber,
1390        validators: ValidatorMap,
1391        block_reward: Option<RewardAmount>,
1392    ) {
1393        let mut address_mapping = HashMap::new();
1394        let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
1395            .values()
1396            .map(|v| {
1397                address_mapping.insert(v.stake_table_key, v.account);
1398                (
1399                    v.stake_table_key,
1400                    PeerConfig {
1401                        stake_table_entry: BLSPubKey::stake_table_entry(
1402                            &v.stake_table_key,
1403                            v.stake,
1404                        ),
1405                        state_ver_key: v.state_ver_key.clone(),
1406                    },
1407                )
1408            })
1409            .collect();
1410
1411        let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
1412            stake_table.iter().map(|(_, l)| l.clone()).collect();
1413
1414        self.state.insert(
1415            epoch,
1416            EpochCommittee {
1417                eligible_leaders,
1418                stake_table,
1419                validators,
1420                address_mapping,
1421            },
1422        );
1423
1424        if let Some(block_reward) = block_reward {
1425            self.block_reward = block_reward;
1426        }
1427    }
1428
1429    pub fn validators(&self, epoch: &Epoch) -> anyhow::Result<ValidatorMap> {
1430        Ok(self
1431            .state
1432            .get(epoch)
1433            .context("state for found")?
1434            .validators
1435            .clone())
1436    }
1437
1438    pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
1439        let mapping = self
1440            .state
1441            .get(epoch)
1442            .context("state for found")?
1443            .address_mapping
1444            .clone();
1445
1446        Ok(*mapping.get(&bls_key).context(format!(
1447            "failed to get ethereum address for bls key {bls_key}. epoch={epoch}"
1448        ))?)
1449    }
1450
1451    pub fn get_validator_config(
1452        &self,
1453        epoch: &Epoch,
1454        key: BLSPubKey,
1455    ) -> anyhow::Result<Validator<BLSPubKey>> {
1456        let address = self.address(epoch, key)?;
1457        let validators = self.validators(epoch)?;
1458        validators
1459            .get(&address)
1460            .context("validator not found")
1461            .cloned()
1462    }
1463
1464    pub fn block_reward(&self) -> RewardAmount {
1465        self.block_reward
1466    }
1467
1468    // We need a constructor to match our concrete type.
1469    pub fn new_stake(
1470        // TODO remove `new` from trait and rename this to `new`.
1471        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
1472        committee_members: Vec<PeerConfig<SeqTypes>>,
1473        da_members: Vec<PeerConfig<SeqTypes>>,
1474        block_reward: RewardAmount,
1475        fetcher: Fetcher,
1476    ) -> Self {
1477        // For each member, get the stake table entry
1478        let stake_table: Vec<_> = committee_members
1479            .iter()
1480            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1481            .cloned()
1482            .collect();
1483
1484        let eligible_leaders = stake_table.clone();
1485        // For each member, get the stake table entry
1486        let da_members: Vec<_> = da_members
1487            .iter()
1488            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
1489            .cloned()
1490            .collect();
1491
1492        // Index the stake table by public key
1493        let indexed_stake_table: HashMap<PubKey, _> = stake_table
1494            .iter()
1495            .map(|peer_config| {
1496                (
1497                    PubKey::public_key(&peer_config.stake_table_entry),
1498                    peer_config.clone(),
1499                )
1500            })
1501            .collect();
1502
1503        // Index the stake table by public key
1504        let indexed_da_members: HashMap<PubKey, _> = da_members
1505            .iter()
1506            .map(|peer_config| {
1507                (
1508                    PubKey::public_key(&peer_config.stake_table_entry),
1509                    peer_config.clone(),
1510                )
1511            })
1512            .collect();
1513
1514        let members = NonEpochCommittee {
1515            eligible_leaders,
1516            stake_table,
1517            da_members,
1518            indexed_stake_table,
1519            indexed_da_members,
1520        };
1521
1522        let mut map = HashMap::new();
1523        let epoch_committee = EpochCommittee {
1524            eligible_leaders: members.eligible_leaders.clone(),
1525            stake_table: members
1526                .stake_table
1527                .iter()
1528                .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
1529                .collect(),
1530            validators: Default::default(),
1531            address_mapping: HashMap::new(),
1532        };
1533        map.insert(Epoch::genesis(), epoch_committee.clone());
1534        // TODO: remove this, workaround for hotshot asking for stake tables from epoch 1
1535        map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());
1536
1537        Self {
1538            non_epoch_committee: members,
1539            state: map,
1540            randomized_committees: BTreeMap::new(),
1541            first_epoch: None,
1542            block_reward,
1543            fetcher: Arc::new(fetcher),
1544        }
1545    }
1546
1547    pub async fn reload_stake(&mut self, limit: u64) {
1548        // Load the 50 latest stored stake tables
1549
1550        let loaded_stake = match self
1551            .fetcher
1552            .persistence
1553            .lock()
1554            .await
1555            .load_latest_stake(limit)
1556            .await
1557        {
1558            Ok(Some(loaded)) => loaded,
1559            Ok(None) => {
1560                tracing::warn!("No stake table history found in persistence!");
1561                return;
1562            },
1563            Err(e) => {
1564                tracing::error!("Failed to load stake table history from persistence: {e}");
1565                return;
1566            },
1567        };
1568
1569        for (epoch, stake_table) in loaded_stake {
1570            self.update(epoch, stake_table, None);
1571        }
1572    }
1573
1574    fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
1575        if let Some(epoch) = epoch {
1576            self.state
1577                .get(epoch)
1578                .map(|committee| committee.stake_table.clone().into_values().collect())
1579        } else {
1580            Some(self.non_epoch_committee.stake_table.clone())
1581        }
1582    }
1583}
1584
#[derive(Error, Debug)]
/// Error representing fail cases for retrieving the stake table.
enum GetStakeTablesError {
    /// Wraps any failure encountered while fetching stake data from L1.
    #[error("Error fetching from L1: {0}")]
    L1ClientFetchError(anyhow::Error),
}
1591
#[derive(Error, Debug)]
// Unit error returned by `Membership::lookup_leader`; the specific failure
// cause is logged at the call site rather than carried in the error.
#[error("Could not lookup leader")] // TODO error variants? message?
pub struct LeaderLookupError;
1595
1596// #[async_trait]
impl Membership<SeqTypes> for EpochCommittees {
    type Error = LeaderLookupError;
    // DO NOT USE. Dummy constructor to comply w/ trait.
    fn new(
        // TODO remove `new` from trait and remove this fn as well.
        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
        _committee_members: Vec<PeerConfig<SeqTypes>>,
        _da_members: Vec<PeerConfig<SeqTypes>>,
    ) -> Self {
        panic!("This function has been replaced with new_stake()");
    }

    /// Get the stake table for `epoch`, falling back to the non-epoch
    /// committee when `epoch` is `None`. An unknown epoch yields an empty table.
    fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.get_stake_table(&epoch).unwrap_or_default().into()
    }
    /// Get the DA stake table. The DA committee is fixed at construction, so
    /// the `epoch` argument is ignored.
    fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
        self.non_epoch_committee.da_members.clone().into()
    }

    /// Get all members of the committee for the given epoch (the view number
    /// is unused: membership does not vary within an epoch).
    fn committee_members(
        &self,
        _view_number: <SeqTypes as NodeType>::View,
        epoch: Option<Epoch>,
    ) -> BTreeSet<PubKey> {
        let stake_table = self.stake_table(epoch);
        stake_table
            .iter()
            .map(|x| PubKey::public_key(&x.stake_table_entry))
            .collect()
    }

    /// Get all DA committee members. Both view and epoch are ignored because
    /// the DA committee is static.
    fn da_committee_members(
        &self,
        _view_number: <SeqTypes as NodeType>::View,
        _epoch: Option<Epoch>,
    ) -> BTreeSet<PubKey> {
        self.non_epoch_committee
            .indexed_da_members
            .clone()
            .into_keys()
            .collect()
    }

    /// Get the stake table entry for a public key.
    // NOTE(review): no zero-stake filtering happens here despite the old
    // comment; the `> 0` check lives in `has_stake`.
    fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
        if let Some(epoch) = epoch {
            self.state
                .get(&epoch)
                .and_then(|h| h.stake_table.get(pub_key))
                .cloned()
        } else {
            self.non_epoch_committee
                .indexed_stake_table
                .get(pub_key)
                .cloned()
        }
    }

    /// Get the DA stake table entry for a public key (epoch ignored; the DA
    /// committee is static). Zero-stake filtering is done by `has_da_stake`.
    fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
        self.non_epoch_committee
            .indexed_da_members
            .get(pub_key)
            .cloned()
    }

    /// Check if a node has positive stake in the committee for `epoch`.
    fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
        self.stake(pub_key, epoch)
            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
            .unwrap_or_default()
    }

    /// Check if a node has positive stake in the DA committee.
    fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
        self.da_stake(pub_key, epoch)
            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
            .unwrap_or_default()
    }

    /// Returns the leader's public key for a given view number and epoch.
    ///
    /// If an epoch is provided and a randomized committee exists for that epoch,
    /// the leader is selected from the randomized committee. Otherwise, the leader
    /// is selected from the non-epoch committee.
    ///
    /// # Arguments
    /// * `view_number` - The view number to index into the committee.
    /// * `epoch` - The epoch for which to determine the leader. If `None`, uses the non-epoch committee.
    ///
    /// # Errors
    /// Returns `LeaderLookupError` if the epoch is before the first epoch or if the committee is missing.
    fn lookup_leader(
        &self,
        view_number: <SeqTypes as NodeType>::View,
        epoch: Option<Epoch>,
    ) -> Result<PubKey, Self::Error> {
        match (self.first_epoch(), epoch) {
            (Some(first_epoch), Some(epoch)) => {
                if epoch < first_epoch {
                    tracing::error!(
                        "lookup_leader called with epoch {} before first epoch {}",
                        epoch,
                        first_epoch,
                    );
                    return Err(LeaderLookupError);
                }
                let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
                    tracing::error!(
                        "We are missing the randomized committee for epoch {}",
                        epoch
                    );
                    return Err(LeaderLookupError);
                };

                Ok(PubKey::public_key(&select_randomized_leader(
                    randomized_committee,
                    *view_number,
                )))
            },
            (_, None) => {
                // Pre-epoch mode: simple round-robin over the fixed leader list.
                // NOTE(review): panics (division by zero) if `eligible_leaders`
                // is empty — assumed non-empty at construction; confirm.
                let leaders = &self.non_epoch_committee.eligible_leaders;

                let index = *view_number as usize % leaders.len();
                let res = leaders[index].clone();
                Ok(PubKey::public_key(&res.stake_table_entry))
            },
            (None, Some(epoch)) => {
                tracing::error!(
                    "lookup_leader called with epoch {} but we don't have a first epoch",
                    epoch,
                );
                Err(LeaderLookupError)
            },
        }
    }

    /// Get the total number of nodes in the committee
    fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
        self.stake_table(epoch).len()
    }

    /// Get the total number of DA nodes in the committee
    fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
        self.da_stake_table(epoch).len()
    }

    /// Get the voting success threshold for the committee: strictly more than
    /// two thirds of the total stake.
    fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let one = U256::ONE;
        let two = U256::from(2);
        let three = U256::from(3);
        // Multiply before dividing when `total_stake * 2` cannot overflow
        // (more precise); otherwise divide first and compensate with `+ two`.
        if total_stake < U256::MAX / two {
            ((total_stake * two) / three) + one
        } else {
            ((total_stake / three) * two) + two
        }
    }

    /// Get the voting success threshold for the DA committee (same >2/3 rule,
    /// applied to total DA stake).
    fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_da_stake(epoch);
        let one = U256::ONE;
        let two = U256::from(2);
        let three = U256::from(3);

        // Same overflow-avoidance strategy as `success_threshold`.
        if total_stake < U256::MAX / two {
            ((total_stake * two) / three) + one
        } else {
            ((total_stake / three) * two) + two
        }
    }

    /// Get the voting failure threshold for the committee: strictly more than
    /// one third of the total stake.
    fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let one = U256::ONE;
        let three = U256::from(3);

        (total_stake / three) + one
    }

    /// Get the voting upgrade threshold for the committee: the larger of 90%
    /// of total stake and the normal success threshold.
    fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
        let total_stake = self.total_stake(epoch);
        let nine = U256::from(9);
        let ten = U256::from(10);

        let normal_threshold = self.success_threshold(epoch);
        // Overflow guard mirrors `success_threshold`: multiply first only when safe.
        let higher_threshold = if total_stake < U256::MAX / nine {
            (total_stake * nine) / ten
        } else {
            (total_stake / ten) * nine
        };

        max(higher_threshold, normal_threshold)
    }

    /// Fetch (from L1) and install the stake table for `epoch`, persisting it
    /// on the way. No-op if the epoch is already known. The read lock is
    /// dropped before the (slow) L1 fetch so other readers are not blocked.
    async fn add_epoch_root(
        membership: Arc<RwLock<Self>>,
        epoch: Epoch,
        block_header: Header,
    ) -> anyhow::Result<()> {
        let membership_reader = membership.read().await;
        if membership_reader.state.contains_key(&epoch) {
            tracing::info!(
                "We already have the stake table for epoch {}. Skipping L1 fetching.",
                epoch
            );
            return Ok(());
        }
        let fetcher = Arc::clone(&membership_reader.fetcher);
        drop(membership_reader);

        let stake_tables = fetcher.fetch(epoch, block_header).await?;

        let mut block_reward = None;

        {
            let membership_reader = membership.read().await;
            // Assumes the stake table contract proxy address does not change
            // In the future, if we want to support updates to the stake table contract address via chain config,
            // or allow the contract to handle additional block reward calculation parameters (e.g., inflation, block time),
            // the `fetch_block_reward` logic can be updated to support per-epoch rewards.
            // Initially, the block reward is zero if the node starts on pre-epoch version
            // but it is updated on the first call to `add_epoch_root()`
            if membership_reader.block_reward == RewardAmount(U256::ZERO) {
                block_reward = Some(fetcher.fetch_block_reward().await?);
            }
        }

        // Store stake table in persistence
        {
            let persistence_lock = fetcher.persistence.lock().await;
            if let Err(e) = persistence_lock
                .store_stake(epoch, stake_tables.clone())
                .await
            {
                // Persistence failure is logged but not fatal: the in-memory
                // update below still proceeds.
                tracing::error!(?e, "`add_epoch_root`, error storing stake table");
            }
        }

        let mut membership_writer = membership.write().await;
        membership_writer.update(epoch, stake_tables, block_reward);
        Ok(())
    }

    /// Whether a stake table has already been installed for `epoch`.
    fn has_stake_table(&self, epoch: Epoch) -> bool {
        self.state.contains_key(&epoch)
    }

    /// Checks if the randomized stake table is available for the given epoch.
    ///
    /// Returns `Ok(true)` if a randomized committee exists for the specified epoch and
    /// the epoch is not before the first epoch. Returns an error if `first_epoch` is `None`
    /// or if the provided epoch is before the first epoch.
    ///
    /// # Arguments
    /// * `epoch` - The epoch for which to check the presence of a randomized stake table.
    ///
    /// # Errors
    /// Returns an error if `first_epoch` is `None` or if `epoch` is before `first_epoch`.
    fn has_randomized_stake_table(&self, epoch: Epoch) -> anyhow::Result<bool> {
        let Some(first_epoch) = self.first_epoch else {
            bail!(
                "Called has_randomized_stake_table with epoch {} but first_epoch is None",
                epoch
            );
        };
        ensure!(
            epoch >= first_epoch,
            "Called has_randomized_stake_table with epoch {} but first_epoch is {}",
            epoch,
            first_epoch
        );
        Ok(self.randomized_committees.contains_key(&epoch))
    }

    /// Fetch the epoch-root leaf at `block_height` from peers, verified
    /// against the stake table and success threshold for `epoch`.
    async fn get_epoch_root(
        membership: Arc<RwLock<Self>>,
        block_height: u64,
        epoch: Epoch,
    ) -> anyhow::Result<Leaf2> {
        let membership_reader = membership.read().await;
        let peers = membership_reader.fetcher.peers.clone();
        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
        let success_threshold = membership_reader.success_threshold(Some(epoch));
        // Release the read lock before the network round-trip below.
        drop(membership_reader);

        // Fetch leaves from peers
        let leaf: Leaf2 = peers
            .fetch_leaf(block_height, stake_table.clone(), success_threshold)
            .await?;

        Ok(leaf)
    }

    /// Fetch the DRB result for `epoch` from the leaf at `block_height`.
    /// Errors if the fetched leaf does not carry a `next_drb_result`.
    async fn get_epoch_drb(
        membership: Arc<RwLock<Self>>,
        block_height: u64,
        epoch: Epoch,
    ) -> anyhow::Result<DrbResult> {
        let membership_reader = membership.read().await;
        let peers = membership_reader.fetcher.peers.clone();
        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
        let success_threshold = membership_reader.success_threshold(Some(epoch));
        drop(membership_reader);

        tracing::debug!(
            "Getting DRB for epoch {}, block height {}",
            epoch,
            block_height
        );
        let drb_leaf = peers
            .try_fetch_leaf(1, block_height, stake_table, success_threshold)
            .await?;

        let Some(drb) = drb_leaf.next_drb_result else {
            tracing::error!(
                "We received a leaf that should contain a DRB result, but the DRB result is \
                 missing: {:?}",
                drb_leaf
            );

            bail!("DRB leaf is missing the DRB result.");
        };

        Ok(drb)
    }

    /// Build and cache the randomized (leader-election) committee for `epoch`
    /// from the epoch's eligible leaders and the supplied DRB result.
    /// Logs and returns early if the epoch's stake table is not yet known.
    fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
        let Some(raw_stake_table) = self.state.get(&epoch) else {
            tracing::error!(
                "add_drb_result({epoch}, {drb:?}) was called, but we do not yet have the stake \
                 table for epoch {epoch}"
            );
            return;
        };

        let leaders = raw_stake_table
            .eligible_leaders
            .clone()
            .into_iter()
            .map(|peer_config| peer_config.stake_table_entry)
            .collect::<Vec<_>>();
        let randomized_committee = generate_stake_cdf(leaders, drb);

        self.randomized_committees
            .insert(epoch, randomized_committee);
    }

    /// Mark `epoch` as the first epoch, seed it (and its successor) with the
    /// genesis committee, and install the initial DRB result for both.
    // NOTE(review): `unwrap` assumes the genesis committee was inserted by the
    // constructor; it panics otherwise.
    fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
        self.first_epoch = Some(epoch);

        let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
        self.state.insert(epoch, epoch_committee.clone());
        self.state.insert(epoch + 1, epoch_committee);
        self.add_drb_result(epoch, initial_drb_result);
        self.add_drb_result(epoch + 1, initial_drb_result);
    }

    /// The first epoch set via `set_first_epoch`, if any.
    fn first_epoch(&self) -> Option<<SeqTypes as NodeType>::Epoch> {
        self.first_epoch
    }
}
1969
#[cfg(any(test, feature = "testing"))]
impl super::v0_3::StakeTable {
    /// Generate a `StakeTable` with `n` default members.
    ///
    /// Bug fix: the previous implementation iterated over `[..n]`, which is a
    /// one-element array containing a single `RangeTo<u64>` value, so it always
    /// produced exactly one member regardless of `n`. Iterating over `0..n`
    /// yields the requested count.
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1981
#[cfg(any(test, feature = "testing"))]
impl DAMembers {
    /// Generate a `DaMembers` (alias committee) with `n` default members.
    ///
    /// Bug fix: the previous implementation iterated over `[..n]`, which is a
    /// one-element array containing a single `RangeTo<u64>` value, so it always
    /// produced exactly one member regardless of `n`. Iterating over `0..n`
    /// yields the requested count.
    pub fn mock(n: u64) -> Self {
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1993
#[cfg(any(test, feature = "testing"))]
pub mod testing {
    use alloy::primitives::Bytes;
    use hotshot_contract_adapter::{
        sol_types::{EdOnBN254PointSol, G1PointSol, G2PointSol},
        stake_table::{sign_address_bls, sign_address_schnorr},
    };
    use hotshot_types::{light_client::StateKeyPair, signature_key::BLSKeyPair};
    use rand::{Rng as _, RngCore as _};

    use super::*;

    // TODO: current tests are just sanity checks, we need more.

    /// Key material and self-signatures for a synthetic validator, shaped like
    /// the data the stake table contract emits in registration events.
    #[derive(Debug, Clone)]
    pub struct TestValidator {
        pub account: Address,
        pub bls_vk: G2PointSol,
        pub schnorr_vk: EdOnBN254PointSol,
        pub commission: u16,
        pub bls_sig: G1PointSol,
        pub schnorr_sig: Bytes,
    }

    impl TestValidator {
        /// Create a validator with a random account, a commission drawn from
        /// [0, 10000) basis points, and freshly generated consensus keys.
        pub fn random() -> Self {
            let account = Address::random();
            let commission = rand::thread_rng().gen_range(0..10000);
            Self::random_update_keys(account, commission)
        }

        /// Re-key this validator: same account and commission, but new random
        /// BLS/Schnorr keys and signatures (simulates a consensus-key update).
        pub fn randomize_keys(&self) -> Self {
            Self::random_update_keys(self.account, self.commission)
        }

        // Generate fresh BLS and Schnorr key pairs and sign `account` with
        // both, producing the proofs-of-possession carried by V2 events.
        fn random_update_keys(account: Address, commission: u16) -> Self {
            let mut rng = &mut rand::thread_rng();
            let mut seed = [0u8; 32];
            rng.fill_bytes(&mut seed);
            let bls_key_pair = BLSKeyPair::generate(&mut rng);
            let bls_sig = sign_address_bls(&bls_key_pair, account);
            let schnorr_key_pair = StateKeyPair::generate_from_seed_indexed(seed, 0);
            let schnorr_sig = sign_address_schnorr(&schnorr_key_pair, account);
            Self {
                account,
                bls_vk: bls_key_pair.ver_key().to_affine().into(),
                schnorr_vk: schnorr_key_pair.ver_key().to_affine().into(),
                commission,
                bls_sig,
                schnorr_sig,
            }
        }
    }

    // V1 registration event: keys only, no self-signatures.
    impl From<&TestValidator> for ValidatorRegistered {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVk: value.bls_vk,
                schnorrVk: value.schnorr_vk,
                commission: value.commission,
            }
        }
    }

    // V2 registration event: keys plus proof-of-possession signatures.
    impl From<&TestValidator> for ValidatorRegisteredV2 {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
                commission: value.commission,
                blsSig: value.bls_sig.into(),
                schnorrSig: value.schnorr_sig.clone(),
            }
        }
    }

    // V1 key-update event (no signatures).
    impl From<&TestValidator> for ConsensusKeysUpdated {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
            }
        }
    }

    // V2 key-update event (with signatures).
    impl From<&TestValidator> for ConsensusKeysUpdatedV2 {
        fn from(value: &TestValidator) -> Self {
            Self {
                account: value.account,
                blsVK: value.bls_vk,
                schnorrVK: value.schnorr_vk,
                blsSig: value.bls_sig.into(),
                schnorrSig: value.schnorr_sig.clone(),
            }
        }
    }

    // Exit event for the validator's account.
    impl From<&TestValidator> for ValidatorExit {
        fn from(value: &TestValidator) -> Self {
            Self {
                validator: value.account,
            }
        }
    }

    impl Validator<BLSPubKey> {
        /// Build a mock validator with ~5001 random delegators (loop is
        /// inclusive, `0..=5000`); its stake is the sum of the delegations.
        pub fn mock() -> Validator<BLSPubKey> {
            let val = TestValidator::random();
            let rng = &mut rand::thread_rng();
            let mut seed = [1u8; 32];
            rng.fill_bytes(&mut seed);
            let mut validator_stake = alloy::primitives::U256::from(0);
            let mut delegators = HashMap::new();
            for _i in 0..=5000 {
                let stake: u64 = rng.gen_range(0..10000);
                delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
                validator_stake += alloy::primitives::U256::from(stake);
            }

            let stake_table_key = val.bls_vk.into();
            let state_ver_key = val.schnorr_vk.into();

            Validator {
                account: val.account,
                stake_table_key,
                state_ver_key,
                stake: validator_stake,
                commission: val.commission,
                delegators,
            }
        }
    }
}
2130
2131#[cfg(test)]
2132mod tests {
2133
2134    use alloy::{primitives::Address, rpc::types::Log};
2135    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
2136    use pretty_assertions::assert_matches;
2137    use rstest::rstest;
2138    use sequencer_utils::test_utils::setup_test;
2139
2140    use super::*;
2141    use crate::{v0::impls::testing::*, L1ClientOptions};
2142
    #[test]
    fn test_from_l1_events() -> anyhow::Result<()> {
        setup_test();
        // Replay a mixed sequence of V1/V2 registrations, key updates, and
        // (un)delegations, then check the resulting active validator set.
        let val_1 = TestValidator::random();
        let val_1_new_keys = val_1.randomize_keys();
        let val_2 = TestValidator::random();
        let val_2_new_keys = val_2.randomize_keys();
        let delegator = Address::random();
        let mut events: Vec<StakeTableEvent> = [
            ValidatorRegistered::from(&val_1).into(),
            ValidatorRegisteredV2::from(&val_2).into(),
            Delegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(10),
            }
            .into(),
            ConsensusKeysUpdated::from(&val_1_new_keys).into(),
            ConsensusKeysUpdatedV2::from(&val_2_new_keys).into(),
            Undelegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(7),
            }
            .into(),
            // delegate to the same validator again
            Delegated {
                delegator,
                validator: val_1.account,
                amount: U256::from(5),
            }
            .into(),
            // delegate to the second validator
            Delegated {
                delegator: Address::random(),
                validator: val_2.account,
                amount: U256::from(3),
            }
            .into(),
        ]
        .to_vec();

        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
        let st_val_1 = st.get(&val_1.account).unwrap();
        // final staked amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
        assert_eq!(st_val_1.stake, U256::from(8));
        assert_eq!(st_val_1.commission, val_1.commission);
        assert_eq!(st_val_1.delegators.len(), 1);
        // final delegated amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
        assert_eq!(*st_val_1.delegators.get(&delegator).unwrap(), U256::from(8));

        let st_val_2 = st.get(&val_2.account).unwrap();
        assert_eq!(st_val_2.stake, U256::from(3));
        assert_eq!(st_val_2.commission, val_2.commission);
        assert_eq!(st_val_2.delegators.len(), 1);

        events.push(ValidatorExit::from(&val_1).into());

        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
        // The first validator should have been removed
        assert_eq!(st.get(&val_1.account), None);

        // The second validator should be unchanged
        let st_val_2 = st.get(&val_2.account).unwrap();
        assert_eq!(st_val_2.stake, U256::from(3));
        assert_eq!(st_val_2.commission, val_2.commission);
        assert_eq!(st_val_2.delegators.len(), 1);

        // remove the 2nd validator
        events.push(ValidatorExit::from(&val_2).into());

        // This should fail because the validator has exited and no longer exists in the stake table.
        assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());

        Ok(())
    }
2220
    #[test]
    fn test_from_l1_events_failures() -> anyhow::Result<()> {
        // Each case below is an event sequence that violates ordering rules
        // (e.g. acting on an unregistered validator, double registration,
        // undelegating more than was delegated) and must be rejected.
        let val = TestValidator::random();
        let delegator = Address::random();

        let register: StakeTableEvent = ValidatorRegistered::from(&val).into();
        let register_v2: StakeTableEvent = ValidatorRegisteredV2::from(&val).into();
        let delegate: StakeTableEvent = Delegated {
            delegator,
            validator: val.account,
            amount: U256::from(10),
        }
        .into();
        let key_update: StakeTableEvent = ConsensusKeysUpdated::from(&val).into();
        let key_update_v2: StakeTableEvent = ConsensusKeysUpdatedV2::from(&val).into();
        let undelegate: StakeTableEvent = Undelegated {
            delegator,
            validator: val.account,
            amount: U256::from(7),
        }
        .into();

        let exit: StakeTableEvent = ValidatorExit::from(&val).into();

        let cases = [
            vec![exit],
            vec![undelegate.clone()],
            vec![delegate.clone()],
            vec![key_update],
            vec![key_update_v2],
            vec![register.clone(), register.clone()],
            vec![register_v2.clone(), register_v2.clone()],
            vec![register.clone(), register_v2.clone()],
            vec![register_v2.clone(), register.clone()],
            vec![
                register,
                delegate.clone(),
                undelegate.clone(),
                undelegate.clone(),
            ],
            vec![register_v2, delegate, undelegate.clone(), undelegate],
        ];

        for events in cases.iter() {
            // NOTE: not selecting the active validator set because we care about wrong sequences of
            // events being detected. If we compute the active set we will also get an error if the
            // set is empty but that's not what we want to test here.
            let res = validators_from_l1_events(events.iter().cloned());
            assert!(
                res.is_err(),
                "events {res:?}, not a valid sequence of events"
            );
        }
        Ok(())
    }
2276
    #[test]
    fn test_validators_selection() {
        // Build a large candidate pool, run the active-set selection, and
        // verify the size cap and the minimum-stake invariant.
        let mut validators = IndexMap::new();
        let mut highest_stake = alloy::primitives::U256::ZERO;

        for _i in 0..3000 {
            let validator = Validator::mock();
            validators.insert(validator.account, validator.clone());

            if validator.stake > highest_stake {
                highest_stake = validator.stake;
            }
        }

        // Selection should drop everything below highest_stake / VID_TARGET_TOTAL_STAKE.
        let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);

        select_active_validator_set(&mut validators).expect("Failed to select validators");
        assert!(
            validators.len() <= 100,
            "validators len is {}, expected at most 100",
            validators.len()
        );

        let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
        // Ensure every validator in the final selection is above or equal to minimum stake
        for (address, validator) in &validators {
            assert!(
                validator.stake >= minimum_stake,
                "Validator {:?} has stake below minimum: {}",
                address,
                validator.stake
            );

            // NOTE(review): this running maximum is tracked but never asserted
            // against `highest_stake` — consider adding that check.
            if validator.stake > selected_validators_highest_stake {
                selected_validators_highest_stake = validator.stake;
            }
        }
    }
2315
    // For a bug where the GCL did not match the stake table contract implementation and allowed
    // duplicated BLS keys via the update keys events.
    #[rstest::rstest]
    fn test_regression_non_unique_bls_keys_not_discarded(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        version: StakeTableContractVersion,
    ) {
        let val = TestValidator::random();
        let register: StakeTableEvent = match version {
            StakeTableContractVersion::V1 => ValidatorRegistered::from(&val).into(),
            StakeTableContractVersion::V2 => ValidatorRegisteredV2::from(&val).into(),
        };
        let delegate: StakeTableEvent = Delegated {
            delegator: Address::random(),
            validator: val.account,
            amount: U256::from(10),
        }
        .into();

        // first ensure that we can build a valid stake table
        assert!(active_validator_set_from_l1_events(
            vec![register.clone(), delegate.clone()].into_iter()
        )
        .is_ok());

        // add the invalid key update (re-using the same consensus keys)
        let key_update = ConsensusKeysUpdated::from(&val).into();
        let err =
            active_validator_set_from_l1_events(vec![register, delegate, key_update].into_iter())
                .unwrap_err();

        // The duplicate BLS key must be rejected, not silently accepted.
        let bls: BLSPubKey = val.bls_vk.into();
        assert!(matches!(err, StakeTableError::BlsKeyAlreadyUsed(addr) if addr == bls.to_string()));
    }
2350
2351    #[test]
2352    fn test_display_log() {
2353        let serialized = r#"{"address":"0x0000000000000000000000000000000000000069","topics":["0x0000000000000000000000000000000000000000000000000000000000000069"],"data":"0x69","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000069","blockNumber":"0x69","blockTimestamp":"0x69","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000069","transactionIndex":"0x69","logIndex":"0x70","removed":false}"#;
2354        let log: Log = serde_json::from_str(serialized).unwrap();
2355        assert_eq!(
2356            log.display(),
2357            "Log(block=105,index=112,\
2358             transaction_hash=0x0000000000000000000000000000000000000000000000000000000000000069)"
2359        )
2360    }
2361
2362    #[rstest]
2363    #[case::v1(StakeTableContractVersion::V1)]
2364    #[case::v2(StakeTableContractVersion::V2)]
2365    fn test_register_validator(#[case] version: StakeTableContractVersion) {
2366        let mut state = StakeTableState::new();
2367        let validator = TestValidator::random();
2368
2369        let event = match version {
2370            StakeTableContractVersion::V1 => StakeTableEvent::Register((&validator).into()),
2371            StakeTableContractVersion::V2 => StakeTableEvent::RegisterV2((&validator).into()),
2372        };
2373
2374        assert!(state.apply_event(event).unwrap().is_ok());
2375
2376        let stored = state.validators.get(&validator.account).unwrap();
2377        assert_eq!(stored.account, validator.account);
2378    }
2379
2380    #[rstest]
2381    #[case::v1(StakeTableContractVersion::V1)]
2382    #[case::v2(StakeTableContractVersion::V2)]
2383    fn test_validator_already_registered(#[case] version: StakeTableContractVersion) {
2384        let mut stake_table_state = StakeTableState::new();
2385
2386        let test_validator = TestValidator::random();
2387
2388        // First registration attempt using the specified contract version
2389        let first_registration_result =
2390            match version {
2391                StakeTableContractVersion::V1 => stake_table_state
2392                    .apply_event(StakeTableEvent::Register((&test_validator).into())),
2393                StakeTableContractVersion::V2 => stake_table_state
2394                    .apply_event(StakeTableEvent::RegisterV2((&test_validator).into())),
2395            };
2396
2397        // Expect the first registration to succeed
2398        assert!(first_registration_result.unwrap().is_ok());
2399
2400        // attempt using V1 registration (should fail with AlreadyRegistered)
2401        let v1_already_registered_result =
2402            stake_table_state.apply_event(StakeTableEvent::Register((&test_validator).into()));
2403
2404        pretty_assertions::assert_matches!(
2405           v1_already_registered_result,  Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2406           "Expected AlreadyRegistered error. version ={version:?} result={v1_already_registered_result:?}",
2407        );
2408
2409        // attempt using V2 registration (should also fail with AlreadyRegistered)
2410        let v2_already_registered_result =
2411            stake_table_state.apply_event(StakeTableEvent::RegisterV2((&test_validator).into()));
2412
2413        pretty_assertions::assert_matches!(
2414            v2_already_registered_result,
2415            Err(StakeTableError::AlreadyRegistered(account)) if account == test_validator.account,
2416            "Expected AlreadyRegistered error. version ={version:?} result={v2_already_registered_result:?}",
2417
2418        );
2419    }
2420
2421    #[test]
2422    fn test_register_validator_v2_auth_fails() {
2423        let mut state = StakeTableState::new();
2424        let mut val = TestValidator::random();
2425        val.bls_sig = Default::default();
2426        let event = StakeTableEvent::RegisterV2((&val).into());
2427
2428        let result = state.apply_event(event);
2429        assert!(matches!(
2430            result,
2431            Err(StakeTableError::AuthenticationFailed(_))
2432        ));
2433    }
2434
2435    #[test]
2436    fn test_deregister_validator() {
2437        let mut state = StakeTableState::new();
2438        let val = TestValidator::random();
2439
2440        let reg = StakeTableEvent::Register((&val).into());
2441        state.apply_event(reg).unwrap().unwrap();
2442
2443        let dereg = StakeTableEvent::Deregister((&val).into());
2444        assert!(state.apply_event(dereg).unwrap().is_ok());
2445        assert!(!state.validators.contains_key(&val.account));
2446    }
2447
2448    #[test]
2449    fn test_delegate_and_undelegate() {
2450        let mut state = StakeTableState::new();
2451        let val = TestValidator::random();
2452        state
2453            .apply_event(StakeTableEvent::Register((&val).into()))
2454            .unwrap()
2455            .unwrap();
2456
2457        let delegator = Address::random();
2458        let amount = U256::from(1000);
2459        let delegate_event = StakeTableEvent::Delegate(Delegated {
2460            delegator,
2461            validator: val.account,
2462            amount,
2463        });
2464        assert!(state.apply_event(delegate_event).unwrap().is_ok());
2465
2466        let validator = state.validators.get(&val.account).unwrap();
2467        assert_eq!(validator.delegators.get(&delegator).cloned(), Some(amount));
2468
2469        let undelegate_event = StakeTableEvent::Undelegate(Undelegated {
2470            delegator,
2471            validator: val.account,
2472            amount,
2473        });
2474        assert!(state.apply_event(undelegate_event).unwrap().is_ok());
2475        let validator = state.validators.get(&val.account).unwrap();
2476        assert!(!validator.delegators.contains_key(&delegator));
2477    }
2478
2479    #[rstest]
2480    #[case::v1(StakeTableContractVersion::V1)]
2481    #[case::v2(StakeTableContractVersion::V2)]
2482    fn test_key_update_event(#[case] version: StakeTableContractVersion) {
2483        let mut state = StakeTableState::new();
2484        let val = TestValidator::random();
2485
2486        // Always register first using V1 to simulate upgrade scenarios
2487        state
2488            .apply_event(StakeTableEvent::Register((&val).into()))
2489            .unwrap()
2490            .unwrap();
2491
2492        let new_keys = val.randomize_keys();
2493
2494        let event = match version {
2495            StakeTableContractVersion::V1 => StakeTableEvent::KeyUpdate((&new_keys).into()),
2496            StakeTableContractVersion::V2 => StakeTableEvent::KeyUpdateV2((&new_keys).into()),
2497        };
2498
2499        assert!(state.apply_event(event).unwrap().is_ok());
2500
2501        let updated = state.validators.get(&val.account).unwrap();
2502        assert_eq!(updated.stake_table_key, new_keys.bls_vk.into());
2503        assert_eq!(updated.state_ver_key, new_keys.schnorr_vk.into());
2504    }
2505
2506    #[test]
2507    fn test_duplicate_bls_key() {
2508        let mut state = StakeTableState::new();
2509        let val = TestValidator::random();
2510        let event1 = StakeTableEvent::Register((&val).into());
2511        let mut val2 = TestValidator::random();
2512        val2.bls_vk = val.bls_vk;
2513        val2.account = Address::random();
2514
2515        let event2 = StakeTableEvent::Register((&val2).into());
2516        assert!(state.apply_event(event1).unwrap().is_ok());
2517        let result = state.apply_event(event2);
2518
2519        let expected_bls_key = BLSPubKey::from(val.bls_vk).to_string();
2520
2521        assert_matches!(
2522            result,
2523            Err(StakeTableError::BlsKeyAlreadyUsed(key))
2524                if key == expected_bls_key,
2525            "Expected BlsKeyAlreadyUsed({expected_bls_key}), but got: {result:?}",
2526        );
2527    }
2528
2529    #[test]
2530    fn test_duplicate_schnorr_key() {
2531        let mut state = StakeTableState::new();
2532        let val = TestValidator::random();
2533        let event1 = StakeTableEvent::Register((&val).into());
2534        let mut val2 = TestValidator::random();
2535        val2.schnorr_vk = val.schnorr_vk;
2536        val2.account = Address::random();
2537        val2.bls_vk = val2.randomize_keys().bls_vk;
2538
2539        let event2 = StakeTableEvent::Register((&val2).into());
2540        assert!(state.apply_event(event1).unwrap().is_ok());
2541        let result = state.apply_event(event2);
2542
2543        let schnorr: SchnorrPubKey = val.schnorr_vk.into();
2544        assert_matches!(
2545            result,
2546            Ok(Err(ExpectedStakeTableError::SchnorrKeyAlreadyUsed(key)))
2547                if key == schnorr.to_string(),
2548            "Expected SchnorrKeyAlreadyUsed({schnorr}), but got: {result:?}",
2549
2550        );
2551    }
2552
2553    #[test]
2554    fn test_register_and_deregister_validator() {
2555        let mut state = StakeTableState::new();
2556        let validator = TestValidator::random();
2557        let event = StakeTableEvent::Register((&validator).into());
2558        assert!(state.apply_event(event).unwrap().is_ok());
2559
2560        let deregister_event = StakeTableEvent::Deregister((&validator).into());
2561        assert!(state.apply_event(deregister_event).unwrap().is_ok());
2562    }
2563
2564    #[test]
2565    fn test_delegate_zero_amount_is_rejected() {
2566        let mut state = StakeTableState::new();
2567        let validator = TestValidator::random();
2568        let account = validator.account;
2569        state
2570            .apply_event(StakeTableEvent::Register((&validator).into()))
2571            .unwrap()
2572            .unwrap();
2573
2574        let delegator = Address::random();
2575        let amount = U256::ZERO;
2576        let event = StakeTableEvent::Delegate(Delegated {
2577            delegator,
2578            validator: account,
2579            amount,
2580        });
2581        let result = state.apply_event(event);
2582
2583        assert_matches!(
2584            result,
2585            Err(StakeTableError::ZeroDelegatorStake(addr))
2586                if addr == delegator,
2587            "delegator stake is zero"
2588
2589        );
2590    }
2591
2592    #[test]
2593    fn test_undelegate_more_than_stake_fails() {
2594        let mut state = StakeTableState::new();
2595        let validator = TestValidator::random();
2596        let account = validator.account;
2597        state
2598            .apply_event(StakeTableEvent::Register((&validator).into()))
2599            .unwrap()
2600            .unwrap();
2601
2602        let delegator = Address::random();
2603        let event = StakeTableEvent::Delegate(Delegated {
2604            delegator,
2605            validator: account,
2606            amount: U256::from(10u64),
2607        });
2608        state.apply_event(event).unwrap().unwrap();
2609
2610        let result = state.apply_event(StakeTableEvent::Undelegate(Undelegated {
2611            delegator,
2612            validator: account,
2613            amount: U256::from(20u64),
2614        }));
2615        assert_matches!(
2616            result,
2617            Err(StakeTableError::InsufficientStake),
2618            "Expected InsufficientStake error, got: {result:?}",
2619        );
2620    }
2621
2622    #[tokio::test(flavor = "multi_thread")]
2623    async fn test_decaf_stake_table() {
2624        setup_test();
2625
2626        // The following commented-out block demonstrates how the `decaf_stake_table_events.json`
2627        // and `decaf_stake_table.json` files were originally generated.
2628
2629        // It generates decaf stake table data by fetching events from the contract,
2630        // writes events and the constructed stake table to JSON files.
2631
2632        /*
2633        let l1 = L1Client::new(vec!["https://ethereum-sepolia.publicnode.com"
2634            .parse()
2635            .unwrap()])
2636        .unwrap();
2637        let contract_address = "0x40304fbe94d5e7d1492dd90c53a2d63e8506a037";
2638
2639        let events = Fetcher::fetch_events_from_contract(
2640            l1,
2641            contract_address.parse().unwrap(),
2642            None,
2643            8582328,
2644        )
2645        .await;
2646
2647        let sorted_events = events.sort_events().expect("failed to sort");
2648
2649        // Serialize and write sorted events
2650        let json_events = serde_json::to_string_pretty(&sorted_events)?;
2651        let mut events_file = File::create("decaf_stake_table_events.json").await?;
2652        events_file.write_all(json_events.as_bytes()).await?;
2653
2654        // Process into stake table
2655        let stake_table = validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e))?;
2656
2657        // Serialize and write stake table
2658        let json_stake_table = serde_json::to_string_pretty(&stake_table)?;
2659        let mut stake_file = File::create("decaf_stake_table.json").await?;
2660        stake_file.write_all(json_stake_table.as_bytes()).await?;
2661        */
2662
2663        let events_json =
2664            std::fs::read_to_string("../data/v3/decaf_stake_table_events.json").unwrap();
2665        let events: Vec<(EventKey, StakeTableEvent)> = serde_json::from_str(&events_json).unwrap();
2666
2667        // Reconstruct stake table from events
2668        let reconstructed_stake_table =
2669            active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)).unwrap();
2670
2671        let stake_table_json =
2672            std::fs::read_to_string("../data/v3/decaf_stake_table.json").unwrap();
2673        let expected: IndexMap<Address, Validator<BLSPubKey>> =
2674            serde_json::from_str(&stake_table_json).unwrap();
2675
2676        assert_eq!(
2677            reconstructed_stake_table, expected,
2678            "Stake table reconstructed from events does not match the expected stake table "
2679        );
2680    }
2681
2682    #[tokio::test(flavor = "multi_thread")]
2683    #[should_panic]
2684    async fn test_large_max_events_range_panic() {
2685        setup_test();
2686
2687        // decaf stake table contract address
2688        let contract_address = "0x40304fbe94d5e7d1492dd90c53a2d63e8506a037";
2689
2690        let l1 = L1ClientOptions {
2691            l1_events_max_retry_duration: Duration::from_secs(30),
2692            // max block range for public node rpc is 50000 so this should result in a panic
2693            l1_events_max_block_range: 10_u64.pow(9),
2694            l1_retry_delay: Duration::from_secs(1),
2695            ..Default::default()
2696        }
2697        .connect(vec!["https://ethereum-sepolia.publicnode.com"
2698            .parse()
2699            .unwrap()])
2700        .expect("unable to construct l1 client");
2701
2702        let latest_block = l1.provider.get_block_number().await.unwrap();
2703        let _events = Fetcher::fetch_events_from_contract(
2704            l1,
2705            contract_address.parse().unwrap(),
2706            None,
2707            latest_block,
2708        )
2709        .await;
2710    }
2711}