espresso_types/v0/impls/
stake_table.rs

1use std::{
2    cmp::{max, min},
3    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4    future::Future,
5    sync::Arc,
6};
7
8use alloy::{
9    primitives::{Address, U256},
10    rpc::types::Log,
11};
12use anyhow::{bail, Context};
13use async_lock::{Mutex, RwLock};
14use committable::Committable;
15use futures::stream::{self, StreamExt};
16use hotshot::types::{BLSPubKey, SchnorrPubKey, SignatureKey as _};
17use hotshot_contract_adapter::sol_types::StakeTable::{
18    self, ConsensusKeysUpdated, Delegated, Undelegated, ValidatorExit, ValidatorRegistered,
19};
20use hotshot_types::{
21    data::{vid_disperse::VID_TARGET_TOTAL_STAKE, EpochNumber},
22    drb::{
23        election::{generate_stake_cdf, select_randomized_leader, RandomizedCommittee},
24        DrbResult,
25    },
26    stake_table::{HSStakeTable, StakeTableEntry},
27    traits::{
28        election::Membership,
29        node_implementation::{ConsensusTime, NodeType},
30        signature_key::StakeTableEntryType,
31    },
32    PeerConfig,
33};
34use indexmap::IndexMap;
35use thiserror::Error;
36use tokio::{spawn, time::sleep};
37use tracing::Instrument;
38
39#[cfg(any(test, feature = "testing"))]
40use super::v0_3::DAMembers;
41use super::{
42    traits::{MembershipPersistence, StateCatchup},
43    v0_3::{EventKey, StakeTableEvent, StakeTableFetcher, StakeTableUpdateTask, Validator},
44    v0_99::ChainConfig,
45    Header, L1Client, Leaf2, PubKey, SeqTypes,
46};
47
48type Epoch = <SeqTypes as NodeType>::Epoch;
49
50#[derive(Clone, PartialEq)]
51pub struct StakeTableEvents {
52    registrations: Vec<(ValidatorRegistered, Log)>,
53    deregistrations: Vec<(ValidatorExit, Log)>,
54    delegated: Vec<(Delegated, Log)>,
55    undelegated: Vec<(Undelegated, Log)>,
56    keys: Vec<(ConsensusKeysUpdated, Log)>,
57}
58
59impl StakeTableEvents {
60    pub fn sort_events(self) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
61        let mut events: Vec<(EventKey, StakeTableEvent)> = Vec::new();
62        let Self {
63            registrations,
64            deregistrations,
65            delegated,
66            undelegated,
67            keys,
68        } = self;
69
70        for (registration, log) in registrations {
71            events.push((
72                (
73                    log.block_number.context("block number")?,
74                    log.log_index.context("log index")?,
75                ),
76                registration.into(),
77            ));
78        }
79        for (dereg, log) in deregistrations {
80            events.push((
81                (
82                    log.block_number.context("block number")?,
83                    log.log_index.context("log index")?,
84                ),
85                dereg.into(),
86            ));
87        }
88        for (delegation, log) in delegated {
89            events.push((
90                (
91                    log.block_number.context("block number")?,
92                    log.log_index.context("log index")?,
93                ),
94                delegation.into(),
95            ));
96        }
97        for (undelegated, log) in undelegated {
98            events.push((
99                (
100                    log.block_number.context("block number")?,
101                    log.log_index.context("log index")?,
102                ),
103                undelegated.into(),
104            ));
105        }
106
107        for (update, log) in keys {
108            events.push((
109                (
110                    log.block_number.context("block number")?,
111                    log.log_index.context("log index")?,
112                ),
113                update.into(),
114            ));
115        }
116
117        events.sort_by_key(|(key, _)| (key.0, key.1));
118        Ok(events)
119    }
120}
121
122/// Extract all validators from L1 stake table events.
123pub fn validators_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
124    events: I,
125) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
126    let mut validators = IndexMap::new();
127    let mut bls_keys = HashSet::new();
128    let mut schnorr_keys = HashSet::new();
129    for event in events {
130        tracing::debug!("Processing stake table event: {:?}", event);
131        match event {
132            StakeTableEvent::Register(ValidatorRegistered {
133                account,
134                blsVk,
135                schnorrVk,
136                commission,
137            }) => {
138                // TODO(abdul): BLS and Schnorr signature keys verification
139                let stake_table_key: BLSPubKey = blsVk.clone().into();
140                let state_ver_key: SchnorrPubKey = schnorrVk.clone().into();
141                // TODO(MA): The stake table contract currently enforces that each bls key is only used once. We will
142                // move this check to the confirmation layer and remove it from the contract. Once we have the signature
143                // check in this functions we can skip if a BLS key, or Schnorr key was previously used.
144                if bls_keys.contains(&stake_table_key) {
145                    bail!("bls key {} already used", stake_table_key.to_string());
146                };
147
148                // The contract does *not* enforce that each schnorr key is only used once.
149                if schnorr_keys.contains(&state_ver_key) {
150                    tracing::warn!("schnorr key {} already used", state_ver_key.to_string());
151                };
152
153                bls_keys.insert(stake_table_key);
154                schnorr_keys.insert(state_ver_key.clone());
155
156                match validators.entry(account) {
157                    indexmap::map::Entry::Occupied(_occupied_entry) => {
158                        bail!("validator {:#x} already registered", *account)
159                    },
160                    indexmap::map::Entry::Vacant(vacant_entry) => vacant_entry.insert(Validator {
161                        account,
162                        stake_table_key,
163                        state_ver_key,
164                        stake: U256::from(0_u64),
165                        commission,
166                        delegators: HashMap::default(),
167                    }),
168                };
169            },
170            StakeTableEvent::Deregister(exit) => {
171                validators
172                    .shift_remove(&exit.validator)
173                    .with_context(|| format!("validator {:#x} not found", exit.validator))?;
174            },
175            StakeTableEvent::Delegate(delegated) => {
176                let Delegated {
177                    delegator,
178                    validator,
179                    amount,
180                } = delegated;
181                let validator_entry = validators
182                    .get_mut(&validator)
183                    .with_context(|| format!("validator {validator:#x} not found"))?;
184
185                if amount.is_zero() {
186                    tracing::warn!("delegator {delegator:?} has 0 stake");
187                    continue;
188                }
189                // Increase stake
190                validator_entry.stake += amount;
191                // Insert the delegator with the given stake
192                // or increase the stake if already present
193                validator_entry
194                    .delegators
195                    .entry(delegator)
196                    .and_modify(|stake| *stake += amount)
197                    .or_insert(amount);
198            },
199            StakeTableEvent::Undelegate(undelegated) => {
200                let Undelegated {
201                    delegator,
202                    validator,
203                    amount,
204                } = undelegated;
205                let validator_entry = validators
206                    .get_mut(&validator)
207                    .with_context(|| format!("validator {validator:#x} not found"))?;
208
209                validator_entry.stake = validator_entry
210                    .stake
211                    .checked_sub(amount)
212                    .with_context(|| "stake is less than undelegated amount")?;
213
214                let delegator_stake = validator_entry
215                    .delegators
216                    .get_mut(&delegator)
217                    .with_context(|| format!("delegator {delegator:#x} not found"))?;
218                *delegator_stake = delegator_stake
219                    .checked_sub(amount)
220                    .with_context(|| "delegator_stake is less than undelegated amount")?;
221
222                if delegator_stake.is_zero() {
223                    // if delegator stake is 0, remove from set
224                    validator_entry.delegators.remove(&delegator);
225                }
226            },
227            StakeTableEvent::KeyUpdate(update) => {
228                let ConsensusKeysUpdated {
229                    account,
230                    blsVK,
231                    schnorrVK,
232                } = update;
233                let validator = validators
234                    .get_mut(&account)
235                    .with_context(|| "validator {account:#x} not found")?;
236                let bls = blsVK.into();
237                let state_ver_key = schnorrVK.into();
238
239                validator.stake_table_key = bls;
240                validator.state_ver_key = state_ver_key;
241            },
242        }
243    }
244
245    Ok(validators)
246}
247
248/// Select active validators
249///
250/// Removes the validators without stake and selects the top 100 staked validators.
251pub(crate) fn select_active_validator_set(
252    validators: &mut IndexMap<Address, Validator<BLSPubKey>>,
253) -> anyhow::Result<()> {
254    // Remove invalid validators first
255    validators.retain(|address, validator| {
256        if validator.delegators.is_empty() {
257            tracing::info!("Validator {address:?} does not have any delegator");
258            return false;
259        }
260
261        if validator.stake.is_zero() {
262            tracing::info!("Validator {address:?} does not have any stake");
263            return false;
264        }
265
266        true
267    });
268
269    if validators.is_empty() {
270        bail!("No valid validators found");
271    }
272
273    // Find the maximum stake
274    let maximum_stake = validators
275        .values()
276        .map(|v| v.stake)
277        .max()
278        .context("Failed to determine max stake")?;
279
280    let minimum_stake = maximum_stake
281        .checked_div(U256::from(VID_TARGET_TOTAL_STAKE))
282        .context("div err")?;
283
284    // Collect validators that meet the minimum stake criteria
285    let mut valid_stakers: Vec<_> = validators
286        .iter()
287        .filter(|(_, v)| v.stake >= minimum_stake)
288        .map(|(addr, v)| (*addr, v.stake))
289        .collect();
290
291    // Sort by stake (descending order)
292    valid_stakers.sort_by_key(|(_, stake)| std::cmp::Reverse(*stake));
293
294    // Keep only the top 100 stakers
295    if valid_stakers.len() > 100 {
296        valid_stakers.truncate(100);
297    }
298
299    // Retain only the selected validators
300    let selected_addresses: HashSet<_> = valid_stakers.iter().map(|(addr, _)| *addr).collect();
301    validators.retain(|address, _| selected_addresses.contains(address));
302
303    Ok(())
304}
305
306/// Extract the active validator set from the L1 stake table events.
307pub(crate) fn active_validator_set_from_l1_events<I: Iterator<Item = StakeTableEvent>>(
308    events: I,
309) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
310    let mut validators = validators_from_l1_events(events)?;
311    select_active_validator_set(&mut validators)?;
312    Ok(validators)
313}
314
315impl std::fmt::Debug for StakeTableEvent {
316    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317        match self {
318            StakeTableEvent::Register(event) => write!(f, "Register({:?})", event.account),
319            StakeTableEvent::Deregister(event) => write!(f, "Deregister({:?})", event.validator),
320            StakeTableEvent::Delegate(event) => write!(f, "Delegate({:?})", event.delegator),
321            StakeTableEvent::Undelegate(event) => write!(f, "Undelegate({:?})", event.delegator),
322            StakeTableEvent::KeyUpdate(event) => write!(f, "KeyUpdate({:?})", event.account),
323        }
324    }
325}
326
327#[derive(Clone, derive_more::derive::Debug)]
328/// Type to describe DA and Stake memberships
329pub struct EpochCommittees {
330    /// Committee used when we're in pre-epoch state
331    non_epoch_committee: NonEpochCommittee,
332    /// Holds Stake table and da stake
333    state: HashMap<Epoch, EpochCommittee>,
334    /// Randomized committees, filled when we receive the DrbResult
335    randomized_committees: BTreeMap<Epoch, RandomizedCommittee<StakeTableEntry<PubKey>>>,
336    first_epoch: Option<Epoch>,
337    fetcher: Arc<StakeTableFetcher>,
338}
339
340impl StakeTableFetcher {
341    pub fn new(
342        peers: Arc<dyn StateCatchup>,
343        persistence: Arc<Mutex<dyn MembershipPersistence>>,
344        l1_client: L1Client,
345        chain_config: ChainConfig,
346    ) -> Self {
347        Self {
348            peers,
349            persistence,
350            l1_client,
351            chain_config: Arc::new(Mutex::new(chain_config)),
352            update_task: StakeTableUpdateTask(Mutex::new(None)).into(),
353        }
354    }
355
356    pub async fn spawn_update_loop(&self) {
357        let mut update_task = self.update_task.0.lock().await;
358        if update_task.is_none() {
359            *update_task = Some(spawn(self.update_loop()));
360        }
361    }
362
363    /// Periodically updates the stake table from the L1 contract.
364    /// This function polls the finalized block number from the L1 client at an interval
365    /// and fetches stake table from contract
366    /// and updates the persistence
367    fn update_loop(&self) -> impl Future<Output = ()> {
368        let span = tracing::warn_span!("Stake table update loop");
369        let self_clone = self.clone();
370        let state = self.l1_client.state.clone();
371        let l1_retry = self.l1_client.options().l1_retry_delay;
372        let update_delay = self.l1_client.options().stake_table_update_interval;
373        let chain_config = self.chain_config.clone();
374
375        async move {
376            // Get the stake table contract address from the chain config.
377            // This may not contain a stake table address if we are on a pre-epoch version.
378            // It keeps retrying until the chain config is upgraded
379            // after a successful upgrade to an epoch version.
380            let stake_contract_address = loop {
381                match chain_config.lock().await.stake_table_contract {
382                    Some(addr) => break addr,
383                    None => {
384                        tracing::debug!(
385                            "Stake table contract address not found. Retrying in {l1_retry:?}...",
386                        );
387                    },
388                }
389                sleep(l1_retry).await;
390            };
391
392            // Begin the main polling loop
393            loop {
394                let finalized_block = loop {
395                    if let Some(block) = state.lock().await.last_finalized {
396                        break block;
397                    }
398                    tracing::debug!(
399                        "Finalized block not yet available. Retrying in {l1_retry:?}",
400                    );
401                    sleep(l1_retry).await;
402                };
403
404                tracing::debug!(
405                    "Attempting to fetch stake table at L1 block {finalized_block:?}",
406                );
407
408                // Retry stake table fetch until it succeeds
409                loop {
410                    match self_clone
411                        .fetch_and_store_stake_table_events(stake_contract_address, finalized_block)
412                        .await
413                    {
414                        Ok(_) => {
415                            tracing::info!("Successfully fetched and stored stake table events at block={finalized_block:?}");
416                            break;
417                        },
418                        Err(e) => {
419                            tracing::error!(
420                                "Error fetching stake table at block {finalized_block:?}. err= {e:#}",
421                            );
422                            sleep(l1_retry).await;
423                        },
424                    }
425                }
426
427                tracing::debug!(
428                    "Waiting {update_delay:?} before next stake table update...",
429                );
430                sleep(update_delay).await;
431            }
432        }
433        .instrument(span)
434    }
435
436    pub async fn fetch_events(
437        &self,
438        contract: Address,
439        to_block: u64,
440    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
441        let persistence_lock = self.persistence.lock().await;
442        let res = persistence_lock.load_events().await?;
443        drop(persistence_lock);
444
445        let from_block = res
446            .as_ref()
447            .map(|(block, _)| block + 1)
448            .filter(|from| *from <= to_block); // Only use from_block if it's less than to_block
449
450        tracing::info!("loaded events from storage from_block={from_block:?}");
451
452        let contract_events = Self::fetch_events_from_contract(
453            self.l1_client.clone(),
454            contract,
455            from_block,
456            to_block,
457        )
458        .await?;
459
460        tracing::info!("loading events from contract to_block={to_block:?}");
461
462        let contract_events = contract_events.sort_events()?;
463        let mut events = match (from_block, res) {
464            (Some(_), Some((_, persistence_events))) => persistence_events
465                .into_iter()
466                .chain(contract_events)
467                .collect(),
468            _ => contract_events,
469        };
470
471        // There are no duplicates because the RPC returns all events,
472        // which are stored directly in persistence as is.
473        // However, this step is taken as a precaution.
474        // The vector is already sorted above, so this should be fast.
475        let len_before_dedup = events.len();
476        events.dedup();
477        let len_after_dedup = events.len();
478        if len_before_dedup != len_after_dedup {
479            tracing::warn!("Duplicate events found and removed. This should not normally happen.")
480        }
481
482        Ok(events)
483    }
484
485    /// Fetch all stake table events from L1
486    pub async fn fetch_events_from_contract(
487        l1_client: L1Client,
488        contract: Address,
489        from_block: Option<u64>,
490        to_block: u64,
491    ) -> anyhow::Result<StakeTableEvents> {
492        let stake_table_contract = StakeTable::new(contract, l1_client.provider.clone());
493
494        // get the block number when the contract was initialized
495        // to avoid fetching events from block number 0
496        let from_block = match from_block {
497            Some(block) => block,
498            None => {
499                loop {
500                    match stake_table_contract.initializedAtBlock().call().await {
501                        Ok(init_block) => {
502                            break init_block._0.to::<u64>();
503                        },
504                        Err(err) => {
505                            // Retry fetching incase of an error
506                            tracing::warn!(%err, "Failed to retrieve initial block, retrying..");
507                            sleep(l1_client.options().l1_retry_delay).await;
508                        },
509                    }
510                }
511            },
512        };
513
514        // To avoid making large RPC calls, divide the range into smaller chunks.
515        // chunk size is from env "ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE
516        // default value  is `10000` if env variable is not set
517        let mut start = from_block;
518        let end = to_block;
519        let chunk_size = l1_client.options().l1_events_max_block_range;
520        let chunks = std::iter::from_fn(move || {
521            let chunk_end = min(start + chunk_size - 1, end);
522            if chunk_end < start {
523                return None;
524            }
525
526            let chunk = (start, chunk_end);
527            start = chunk_end + 1;
528            Some(chunk)
529        });
530
531        // fetch registered events
532        // retry if the call to the provider to fetch the events fails
533        let registered_events = stream::iter(chunks.clone()).then(|(from, to)| {
534            let retry_delay = l1_client.options().l1_retry_delay;
535            let stake_table_contract = stake_table_contract.clone();
536            async move {
537                tracing::debug!(from, to, "fetch ValidatorRegistered events in range");
538                loop {
539                    match stake_table_contract
540                        .clone()
541                        .ValidatorRegistered_filter()
542                        .from_block(from)
543                        .to_block(to)
544                        .query()
545                        .await
546                    {
547                        Ok(events) => break stream::iter(events),
548                        Err(err) => {
549                            tracing::warn!(from, to, %err, "ValidatorRegistered Error");
550                            sleep(retry_delay).await;
551                        },
552                    }
553                }
554            }
555        });
556
557        // fetch validator de registration events
558        let deregistered_events = stream::iter(chunks.clone()).then(|(from, to)| {
559            let retry_delay = l1_client.options().l1_retry_delay;
560            let stake_table_contract = stake_table_contract.clone();
561            async move {
562                tracing::debug!(from, to, "fetch ValidatorExit events in range");
563                loop {
564                    match stake_table_contract
565                        .ValidatorExit_filter()
566                        .from_block(from)
567                        .to_block(to)
568                        .query()
569                        .await
570                    {
571                        Ok(events) => break stream::iter(events),
572                        Err(err) => {
573                            tracing::warn!(from, to, %err, "ValidatorExit Error");
574                            sleep(retry_delay).await;
575                        },
576                    }
577                }
578            }
579        });
580
581        // fetch delegated events
582        let delegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
583            let retry_delay = l1_client.options().l1_retry_delay;
584            let stake_table_contract = stake_table_contract.clone();
585            async move {
586                tracing::debug!(from, to, "fetch Delegated events in range");
587                loop {
588                    match stake_table_contract
589                        .Delegated_filter()
590                        .from_block(from)
591                        .to_block(to)
592                        .query()
593                        .await
594                    {
595                        Ok(events) => break stream::iter(events),
596                        Err(err) => {
597                            tracing::warn!(from, to, %err, "Delegated Error");
598                            sleep(retry_delay).await;
599                        },
600                    }
601                }
602            }
603        });
604        // fetch undelegated events
605        let undelegated_events = stream::iter(chunks.clone()).then(|(from, to)| {
606            let retry_delay = l1_client.options().l1_retry_delay;
607            let stake_table_contract = stake_table_contract.clone();
608            async move {
609                tracing::debug!(from, to, "fetch Undelegated events in range");
610                loop {
611                    match stake_table_contract
612                        .Undelegated_filter()
613                        .from_block(from)
614                        .to_block(to)
615                        .query()
616                        .await
617                    {
618                        Ok(events) => break stream::iter(events),
619                        Err(err) => {
620                            tracing::warn!(from, to, %err, "Undelegated Error");
621                            sleep(retry_delay).await;
622                        },
623                    }
624                }
625            }
626        });
627
628        // fetch consensus keys updated events
629        let keys_update_events = stream::iter(chunks).then(|(from, to)| {
630            let retry_delay = l1_client.options().l1_retry_delay;
631            let stake_table_contract = stake_table_contract.clone();
632            async move {
633                tracing::debug!(from, to, "fetch ConsensusKeysUpdated events in range");
634                loop {
635                    match stake_table_contract
636                        .ConsensusKeysUpdated_filter()
637                        .from_block(from)
638                        .to_block(to)
639                        .query()
640                        .await
641                    {
642                        Ok(events) => break stream::iter(events),
643                        Err(err) => {
644                            tracing::warn!(from, to, %err, "ConsensusKeysUpdated Error");
645                            sleep(retry_delay).await;
646                        },
647                    }
648                }
649            }
650        });
651
652        let registrations = registered_events.flatten().collect().await;
653        let deregistrations = deregistered_events.flatten().collect().await;
654        let delegated = delegated_events.flatten().collect().await;
655        let undelegated = undelegated_events.flatten().collect().await;
656        let keys = keys_update_events.flatten().collect().await;
657
658        Ok(StakeTableEvents {
659            registrations,
660            deregistrations,
661            delegated,
662            undelegated,
663            keys,
664        })
665    }
666
667    /// Get `StakeTable` at specific l1 block height.
668    /// This function fetches and processes various events (ValidatorRegistered, ValidatorExit,
669    /// Delegated, Undelegated, and ConsensusKeysUpdated) within the block range from the
670    /// contract's initialization block to the provided `to_block` value.
671    /// Events are fetched in chunks to and retries are implemented for failed requests.
672    pub async fn fetch_and_store_stake_table_events(
673        &self,
674        contract: Address,
675        to_block: u64,
676    ) -> anyhow::Result<Vec<(EventKey, StakeTableEvent)>> {
677        let events = self.fetch_events(contract, to_block).await?;
678
679        tracing::info!("storing events in storage to_block={to_block:?}");
680
681        {
682            let persistence_lock = self.persistence.lock().await;
683            persistence_lock
684                .store_events(to_block, events.clone())
685                .await
686                .inspect_err(|e| tracing::error!("failed to store events. err={e}"))?;
687        }
688
689        Ok(events)
690    }
691
692    // Only used by staking CLI which doesn't have persistence
693    pub async fn fetch_all_validators(
694        l1_client: L1Client,
695        contract: Address,
696        to_block: u64,
697    ) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
698        let events = Self::fetch_events_from_contract(l1_client, contract, None, to_block).await?;
699        let sorted = events.sort_events()?;
700        // Process the sorted events and return the resulting stake table.
701        validators_from_l1_events(sorted.into_iter().map(|(_, e)| e))
702    }
703
704    pub async fn fetch(
705        &self,
706        epoch: Epoch,
707        header: Header,
708    ) -> Option<IndexMap<Address, Validator<BLSPubKey>>> {
709        let chain_config = self.get_chain_config(&header).await.ok()?;
710        // update chain config
711        *self.chain_config.lock().await = chain_config;
712
713        let Some(address) = chain_config.stake_table_contract else {
714            tracing::error!("No stake table contract address found in Chain config");
715            return None;
716        };
717
718        let Some(l1_finalized_block_info) = header.l1_finalized() else {
719            tracing::error!("The epoch root for epoch {} is missing the L1 finalized block info. This is a fatal error. Consensus is blocked and will not recover.", epoch);
720            return None;
721        };
722
723        let events = match self
724            .fetch_and_store_stake_table_events(address, l1_finalized_block_info.number())
725            .await
726            .map_err(GetStakeTablesError::L1ClientFetchError)
727        {
728            Ok(events) => events,
729            Err(e) => {
730                tracing::error!("failed to fetch stake table events {e:?}");
731                return None;
732            },
733        };
734
735        match active_validator_set_from_l1_events(events.into_iter().map(|(_, e)| e)) {
736            Ok(validators) => Some(validators),
737            Err(e) => {
738                tracing::error!("failed to construct stake table {e:?}");
739                None
740            },
741        }
742    }
743
744    /// Retrieve and verify `ChainConfig`
745    // TODO move to appropriate object (Header?)
746    pub(crate) async fn get_chain_config(&self, header: &Header) -> anyhow::Result<ChainConfig> {
747        let chain_config = self.chain_config.lock().await;
748        let peers = self.peers.clone();
749        let header_cf = header.chain_config();
750        if chain_config.commit() == header_cf.commit() {
751            return Ok(*chain_config);
752        }
753
754        let cf = match header_cf.resolve() {
755            Some(cf) => cf,
756            None => peers
757                .fetch_chain_config(header_cf.commit())
758                .await
759                .map_err(|err| {
760                    tracing::error!("failed to get chain_config from peers. err: {err:?}");
761                    err
762                })?,
763        };
764
765        Ok(cf)
766    }
767
768    #[cfg(any(test, feature = "testing"))]
769    pub fn mock() -> Self {
770        use crate::{mock, v0_1::NoStorage};
771        let chain_config = ChainConfig::default();
772        let l1 = L1Client::new(vec!["http://localhost:3331".parse().unwrap()])
773            .expect("Failed to create L1 client");
774
775        let peers = Arc::new(mock::MockStateCatchup::default());
776        let persistence = NoStorage;
777
778        Self::new(peers, Arc::new(Mutex::new(persistence)), l1, chain_config)
779    }
780}
781/// Holds Stake table and da stake
782#[derive(Clone, Debug)]
783struct NonEpochCommittee {
784    /// The nodes eligible for leadership.
785    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
786    /// leader but without voting rights.
787    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
788
789    /// Keys for nodes participating in the network
790    stake_table: Vec<PeerConfig<SeqTypes>>,
791
792    /// Keys for DA members
793    da_members: Vec<PeerConfig<SeqTypes>>,
794
795    /// Stake entries indexed by public key, for efficient lookup.
796    indexed_stake_table: HashMap<PubKey, PeerConfig<SeqTypes>>,
797
798    /// DA entries indexed by public key, for efficient lookup.
799    indexed_da_members: HashMap<PubKey, PeerConfig<SeqTypes>>,
800}
801
802/// Holds Stake table and da stake
803#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
804pub struct EpochCommittee {
805    /// The nodes eligible for leadership.
806    /// NOTE: This is currently a hack because the DA leader needs to be the quorum
807    /// leader but without voting rights.
808    eligible_leaders: Vec<PeerConfig<SeqTypes>>,
809    /// Keys for nodes participating in the network
810    stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>>,
811    validators: IndexMap<Address, Validator<BLSPubKey>>,
812    address_mapping: HashMap<BLSPubKey, Address>,
813}
814
815impl EpochCommittees {
816    pub fn first_epoch(&self) -> Option<Epoch> {
817        self.first_epoch
818    }
819
820    pub fn fetcher(&self) -> &StakeTableFetcher {
821        &self.fetcher
822    }
823
824    /// Updates `Self.stake_table` with stake_table for
825    /// `Self.contract_address` at `l1_block_height`. This is intended
826    /// to be called before calling `self.stake()` so that
827    /// `Self.stake_table` only needs to be updated once in a given
828    /// life-cycle but may be read from many times.
829    fn update_stake_table(
830        &mut self,
831        epoch: EpochNumber,
832        validators: IndexMap<Address, Validator<BLSPubKey>>,
833    ) {
834        let mut address_mapping = HashMap::new();
835        let stake_table: IndexMap<PubKey, PeerConfig<SeqTypes>> = validators
836            .values()
837            .map(|v| {
838                address_mapping.insert(v.stake_table_key, v.account);
839                (
840                    v.stake_table_key,
841                    PeerConfig {
842                        stake_table_entry: BLSPubKey::stake_table_entry(
843                            &v.stake_table_key,
844                            v.stake,
845                        ),
846                        state_ver_key: v.state_ver_key.clone(),
847                    },
848                )
849            })
850            .collect();
851
852        let eligible_leaders: Vec<PeerConfig<SeqTypes>> =
853            stake_table.iter().map(|(_, l)| l.clone()).collect();
854
855        self.state.insert(
856            epoch,
857            EpochCommittee {
858                eligible_leaders,
859                stake_table,
860                validators,
861                address_mapping,
862            },
863        );
864    }
865
866    pub fn validators(
867        &self,
868        epoch: &Epoch,
869    ) -> anyhow::Result<IndexMap<Address, Validator<BLSPubKey>>> {
870        Ok(self
871            .state
872            .get(epoch)
873            .context("state for found")?
874            .validators
875            .clone())
876    }
877
878    pub fn address(&self, epoch: &Epoch, bls_key: BLSPubKey) -> anyhow::Result<Address> {
879        let mapping = self
880            .state
881            .get(epoch)
882            .context("state for found")?
883            .address_mapping
884            .clone();
885
886        Ok(*mapping.get(&bls_key).context(format!(
887            "failed to get ethereum address for bls key {bls_key}. epoch={epoch:?}"
888        ))?)
889    }
890
891    pub fn get_validator_config(
892        &self,
893        epoch: &Epoch,
894        key: BLSPubKey,
895    ) -> anyhow::Result<Validator<BLSPubKey>> {
896        let address = self.address(epoch, key)?;
897        let validators = self.validators(epoch)?;
898        validators
899            .get(&address)
900            .context("validator not found")
901            .cloned()
902    }
903
904    // We need a constructor to match our concrete type.
905    pub fn new_stake(
906        // TODO remove `new` from trait and rename this to `new`.
907        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
908        committee_members: Vec<PeerConfig<SeqTypes>>,
909        da_members: Vec<PeerConfig<SeqTypes>>,
910        fetcher: StakeTableFetcher,
911    ) -> Self {
912        // For each member, get the stake table entry
913        let stake_table: Vec<_> = committee_members
914            .iter()
915            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
916            .cloned()
917            .collect();
918
919        let eligible_leaders = stake_table.clone();
920        // For each member, get the stake table entry
921        let da_members: Vec<_> = da_members
922            .iter()
923            .filter(|&peer_config| peer_config.stake_table_entry.stake() > U256::ZERO)
924            .cloned()
925            .collect();
926
927        // Index the stake table by public key
928        let indexed_stake_table: HashMap<PubKey, _> = stake_table
929            .iter()
930            .map(|peer_config| {
931                (
932                    PubKey::public_key(&peer_config.stake_table_entry),
933                    peer_config.clone(),
934                )
935            })
936            .collect();
937
938        // Index the stake table by public key
939        let indexed_da_members: HashMap<PubKey, _> = da_members
940            .iter()
941            .map(|peer_config| {
942                (
943                    PubKey::public_key(&peer_config.stake_table_entry),
944                    peer_config.clone(),
945                )
946            })
947            .collect();
948
949        let members = NonEpochCommittee {
950            eligible_leaders,
951            stake_table,
952            da_members,
953            indexed_stake_table,
954            indexed_da_members,
955        };
956
957        let mut map = HashMap::new();
958        let epoch_committee = EpochCommittee {
959            eligible_leaders: members.eligible_leaders.clone(),
960            stake_table: members
961                .stake_table
962                .iter()
963                .map(|x| (PubKey::public_key(&x.stake_table_entry), x.clone()))
964                .collect(),
965            validators: Default::default(),
966            address_mapping: HashMap::new(),
967        };
968        map.insert(Epoch::genesis(), epoch_committee.clone());
969        // TODO: remove this, workaround for hotshot asking for stake tables from epoch 1
970        map.insert(Epoch::genesis() + 1u64, epoch_committee.clone());
971
972        Self {
973            non_epoch_committee: members,
974            state: map,
975            randomized_committees: BTreeMap::new(),
976            first_epoch: None,
977            fetcher: Arc::new(fetcher),
978        }
979    }
980
981    pub async fn reload_stake(&mut self, limit: u64) {
982        // Load the 50 latest stored stake tables
983
984        let loaded_stake = match self
985            .fetcher
986            .persistence
987            .lock()
988            .await
989            .load_latest_stake(limit)
990            .await
991        {
992            Ok(Some(loaded)) => loaded,
993            Ok(None) => {
994                tracing::warn!("No stake table history found in persistence!");
995                return;
996            },
997            Err(e) => {
998                tracing::error!("Failed to load stake table history from persistence: {}", e);
999                return;
1000            },
1001        };
1002
1003        for (epoch, stake_table) in loaded_stake {
1004            self.update_stake_table(epoch, stake_table);
1005        }
1006    }
1007
1008    fn get_stake_table(&self, epoch: &Option<Epoch>) -> Option<Vec<PeerConfig<SeqTypes>>> {
1009        if let Some(epoch) = epoch {
1010            self.state
1011                .get(epoch)
1012                .map(|committee| committee.stake_table.clone().into_values().collect())
1013        } else {
1014            Some(self.non_epoch_committee.stake_table.clone())
1015        }
1016    }
1017}
1018
1019#[derive(Error, Debug)]
1020/// Error representing fail cases for retrieving the stake table.
1021enum GetStakeTablesError {
1022    #[error("Error fetching from L1: {0}")]
1023    L1ClientFetchError(anyhow::Error),
1024}
1025
1026#[derive(Error, Debug)]
1027#[error("Could not lookup leader")] // TODO error variants? message?
1028pub struct LeaderLookupError;
1029
1030// #[async_trait]
1031impl Membership<SeqTypes> for EpochCommittees {
1032    type Error = LeaderLookupError;
1033    // DO NOT USE. Dummy constructor to comply w/ trait.
1034    fn new(
1035        // TODO remove `new` from trait and remove this fn as well.
1036        // https://github.com/EspressoSystems/HotShot/commit/fcb7d54a4443e29d643b3bbc53761856aef4de8b
1037        _committee_members: Vec<PeerConfig<SeqTypes>>,
1038        _da_members: Vec<PeerConfig<SeqTypes>>,
1039    ) -> Self {
1040        panic!("This function has been replaced with new_stake()");
1041    }
1042
1043    /// Get the stake table for the current view
1044    fn stake_table(&self, epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1045        self.get_stake_table(&epoch).unwrap_or_default().into()
1046    }
1047    /// Get the stake table for the current view
1048    fn da_stake_table(&self, _epoch: Option<Epoch>) -> HSStakeTable<SeqTypes> {
1049        self.non_epoch_committee.da_members.clone().into()
1050    }
1051
1052    /// Get all members of the committee for the current view
1053    fn committee_members(
1054        &self,
1055        _view_number: <SeqTypes as NodeType>::View,
1056        epoch: Option<Epoch>,
1057    ) -> BTreeSet<PubKey> {
1058        let stake_table = self.stake_table(epoch);
1059        stake_table
1060            .iter()
1061            .map(|x| PubKey::public_key(&x.stake_table_entry))
1062            .collect()
1063    }
1064
1065    /// Get all members of the committee for the current view
1066    fn da_committee_members(
1067        &self,
1068        _view_number: <SeqTypes as NodeType>::View,
1069        _epoch: Option<Epoch>,
1070    ) -> BTreeSet<PubKey> {
1071        self.non_epoch_committee
1072            .indexed_da_members
1073            .clone()
1074            .into_keys()
1075            .collect()
1076    }
1077
1078    /// Get the stake table entry for a public key
1079    fn stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1080        // Only return the stake if it is above zero
1081        if let Some(epoch) = epoch {
1082            self.state
1083                .get(&epoch)
1084                .and_then(|h| h.stake_table.get(pub_key))
1085                .cloned()
1086        } else {
1087            self.non_epoch_committee
1088                .indexed_stake_table
1089                .get(pub_key)
1090                .cloned()
1091        }
1092    }
1093
1094    /// Get the DA stake table entry for a public key
1095    fn da_stake(&self, pub_key: &PubKey, _epoch: Option<Epoch>) -> Option<PeerConfig<SeqTypes>> {
1096        // Only return the stake if it is above zero
1097        self.non_epoch_committee
1098            .indexed_da_members
1099            .get(pub_key)
1100            .cloned()
1101    }
1102
1103    /// Check if a node has stake in the committee
1104    fn has_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1105        self.stake(pub_key, epoch)
1106            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1107            .unwrap_or_default()
1108    }
1109
1110    /// Check if a node has stake in the committee
1111    fn has_da_stake(&self, pub_key: &PubKey, epoch: Option<Epoch>) -> bool {
1112        self.da_stake(pub_key, epoch)
1113            .map(|x| x.stake_table_entry.stake() > U256::ZERO)
1114            .unwrap_or_default()
1115    }
1116
1117    /// Index the vector of public keys with the current view number
1118    fn lookup_leader(
1119        &self,
1120        view_number: <SeqTypes as NodeType>::View,
1121        epoch: Option<Epoch>,
1122    ) -> Result<PubKey, Self::Error> {
1123        if let Some(epoch) = epoch {
1124            let Some(randomized_committee) = self.randomized_committees.get(&epoch) else {
1125                tracing::error!(
1126                    "We are missing the randomized committee for epoch {}",
1127                    epoch
1128                );
1129                return Err(LeaderLookupError);
1130            };
1131
1132            Ok(PubKey::public_key(&select_randomized_leader(
1133                randomized_committee,
1134                *view_number,
1135            )))
1136        } else {
1137            let leaders = &self.non_epoch_committee.eligible_leaders;
1138
1139            let index = *view_number as usize % leaders.len();
1140            let res = leaders[index].clone();
1141            Ok(PubKey::public_key(&res.stake_table_entry))
1142        }
1143    }
1144
1145    /// Get the total number of nodes in the committee
1146    fn total_nodes(&self, epoch: Option<Epoch>) -> usize {
1147        self.stake_table(epoch).len()
1148    }
1149
1150    /// Get the total number of DA nodes in the committee
1151    fn da_total_nodes(&self, epoch: Option<Epoch>) -> usize {
1152        self.da_stake_table(epoch).len()
1153    }
1154
1155    /// Get the voting success threshold for the committee
1156    fn success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1157        let total_stake = self.total_stake(epoch);
1158        let one = U256::ONE;
1159        let two = U256::from(2);
1160        let three = U256::from(3);
1161        if total_stake < U256::MAX / two {
1162            ((total_stake * two) / three) + one
1163        } else {
1164            ((total_stake / three) * two) + two
1165        }
1166    }
1167
1168    /// Get the voting success threshold for the committee
1169    fn da_success_threshold(&self, epoch: Option<Epoch>) -> U256 {
1170        let total_stake = self.total_da_stake(epoch);
1171        let one = U256::ONE;
1172        let two = U256::from(2);
1173        let three = U256::from(3);
1174
1175        if total_stake < U256::MAX / two {
1176            ((total_stake * two) / three) + one
1177        } else {
1178            ((total_stake / three) * two) + two
1179        }
1180    }
1181
1182    /// Get the voting failure threshold for the committee
1183    fn failure_threshold(&self, epoch: Option<Epoch>) -> U256 {
1184        let total_stake = self.total_stake(epoch);
1185        let one = U256::ONE;
1186        let three = U256::from(3);
1187
1188        (total_stake / three) + one
1189    }
1190
1191    /// Get the voting upgrade threshold for the committee
1192    fn upgrade_threshold(&self, epoch: Option<Epoch>) -> U256 {
1193        let total_stake = self.total_stake(epoch);
1194        let nine = U256::from(9);
1195        let ten = U256::from(10);
1196
1197        let normal_threshold = self.success_threshold(epoch);
1198        let higher_threshold = if total_stake < U256::MAX / nine {
1199            (total_stake * nine) / ten
1200        } else {
1201            (total_stake / ten) * nine
1202        };
1203
1204        max(higher_threshold, normal_threshold)
1205    }
1206
1207    #[allow(refining_impl_trait)]
1208    async fn add_epoch_root(
1209        &self,
1210        epoch: Epoch,
1211        block_header: Header,
1212    ) -> Option<Box<dyn FnOnce(&mut Self) + Send>> {
1213        if self.state.contains_key(&epoch) {
1214            tracing::info!(
1215                "We already have a the stake table for epoch {}. Skipping L1 fetching.",
1216                epoch
1217            );
1218            return None;
1219        }
1220
1221        let stake_tables = self.fetcher.fetch(epoch, block_header).await?;
1222        // Store stake table in persistence
1223        {
1224            let persistence_lock = self.fetcher.persistence.lock().await;
1225            if let Err(e) = persistence_lock
1226                .store_stake(epoch, stake_tables.clone())
1227                .await
1228            {
1229                tracing::error!(?e, "`add_epoch_root`, error storing stake table");
1230            }
1231        }
1232
1233        Some(Box::new(move |committee: &mut Self| {
1234            committee.update_stake_table(epoch, stake_tables);
1235        }))
1236    }
1237
1238    fn has_stake_table(&self, epoch: Epoch) -> bool {
1239        self.state.contains_key(&epoch)
1240    }
1241
1242    fn has_randomized_stake_table(&self, epoch: Epoch) -> bool {
1243        match self.first_epoch {
1244            None => true,
1245            Some(first_epoch) => {
1246                if epoch < first_epoch {
1247                    self.state.contains_key(&epoch)
1248                } else {
1249                    self.randomized_committees.contains_key(&epoch)
1250                }
1251            },
1252        }
1253    }
1254
1255    async fn get_epoch_root(
1256        membership: Arc<RwLock<Self>>,
1257        block_height: u64,
1258        epoch: Epoch,
1259    ) -> anyhow::Result<Leaf2> {
1260        let membership_reader = membership.read().await;
1261        let peers = membership_reader.fetcher.peers.clone();
1262        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1263        let success_threshold = membership_reader.success_threshold(Some(epoch));
1264        drop(membership_reader);
1265
1266        // Fetch leaves from peers
1267        let leaf: Leaf2 = peers
1268            .fetch_leaf(block_height, stake_table.clone(), success_threshold)
1269            .await?;
1270
1271        Ok(leaf)
1272    }
1273
1274    async fn get_epoch_drb(
1275        membership: Arc<RwLock<Self>>,
1276        block_height: u64,
1277        epoch: Epoch,
1278    ) -> anyhow::Result<DrbResult> {
1279        let membership_reader = membership.read().await;
1280        let peers = membership_reader.fetcher.peers.clone();
1281        let stake_table = membership_reader.stake_table(Some(epoch)).clone();
1282        let success_threshold = membership_reader.success_threshold(Some(epoch));
1283        drop(membership_reader);
1284
1285        tracing::debug!(
1286            "Getting DRB for epoch {:?}, block height {:?}",
1287            epoch,
1288            block_height
1289        );
1290        let drb_leaf = peers
1291            .try_fetch_leaf(1, block_height, stake_table, success_threshold)
1292            .await?;
1293
1294        let Some(drb) = drb_leaf.next_drb_result else {
1295            tracing::error!(
1296          "We received a leaf that should contain a DRB result, but the DRB result is missing: {:?}",
1297          drb_leaf
1298        );
1299
1300            bail!("DRB leaf is missing the DRB result.");
1301        };
1302
1303        Ok(drb)
1304    }
1305
1306    fn add_drb_result(&mut self, epoch: Epoch, drb: DrbResult) {
1307        let Some(raw_stake_table) = self.state.get(&epoch) else {
1308            tracing::error!("add_drb_result({}, {:?}) was called, but we do not yet have the stake table for epoch {}", epoch, drb, epoch);
1309            return;
1310        };
1311
1312        let leaders = raw_stake_table
1313            .eligible_leaders
1314            .clone()
1315            .into_iter()
1316            .map(|peer_config| peer_config.stake_table_entry)
1317            .collect::<Vec<_>>();
1318        let randomized_committee = generate_stake_cdf(leaders, drb);
1319
1320        self.randomized_committees
1321            .insert(epoch, randomized_committee);
1322    }
1323
1324    fn set_first_epoch(&mut self, epoch: Epoch, initial_drb_result: DrbResult) {
1325        self.first_epoch = Some(epoch);
1326
1327        let epoch_committee = self.state.get(&Epoch::genesis()).unwrap().clone();
1328        self.state.insert(epoch, epoch_committee.clone());
1329        self.state.insert(epoch + 1, epoch_committee);
1330        self.add_drb_result(epoch, initial_drb_result);
1331        self.add_drb_result(epoch + 1, initial_drb_result);
1332    }
1333}
1334
#[cfg(any(test, feature = "testing"))]
impl super::v0_3::StakeTable {
    /// Generate a `StakeTable` with `n` members.
    ///
    /// Every member is `PeerConfig::default()`.
    pub fn mock(n: u64) -> Self {
        // Iterate the range itself. An array literal `[..n]` holds a single
        // `RangeTo` value, so iterating it yields exactly one member no
        // matter what `n` is.
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1346
#[cfg(any(test, feature = "testing"))]
impl DAMembers {
    /// Generate a `DaMembers` (alias committee) with `n` members.
    ///
    /// Every member is `PeerConfig::default()`.
    pub fn mock(n: u64) -> Self {
        // Iterate the range itself. An array literal `[..n]` holds a single
        // `RangeTo` value, so iterating it yields exactly one member no
        // matter what `n` is.
        (0..n)
            .map(|_| PeerConfig::default())
            .collect::<Vec<PeerConfig<SeqTypes>>>()
            .into()
    }
}
1358
1359#[cfg(any(test, feature = "testing"))]
1360pub mod testing {
1361    use hotshot_contract_adapter::sol_types::{EdOnBN254PointSol, G2PointSol};
1362    use hotshot_types::light_client::StateKeyPair;
1363    use rand::{Rng as _, RngCore as _};
1364
1365    use super::*;
1366
1367    // TODO: current tests are just sanity checks, we need more.
1368
1369    pub struct TestValidator {
1370        pub account: Address,
1371        pub bls_vk: G2PointSol,
1372        pub schnorr_vk: EdOnBN254PointSol,
1373        pub commission: u16,
1374    }
1375
1376    impl TestValidator {
1377        pub fn random() -> Self {
1378            let rng = &mut rand::thread_rng();
1379            let mut seed = [0u8; 32];
1380            rng.fill_bytes(&mut seed);
1381
1382            let (bls_vk, _) = BLSPubKey::generated_from_seed_indexed(seed, 0);
1383            let schnorr_vk: EdOnBN254PointSol = StateKeyPair::generate_from_seed_indexed(seed, 0)
1384                .ver_key()
1385                .to_affine()
1386                .into();
1387
1388            Self {
1389                account: Address::random(),
1390                bls_vk: bls_vk.to_affine().into(),
1391                schnorr_vk,
1392                commission: rng.gen_range(0..10000),
1393            }
1394        }
1395    }
1396
1397    impl Validator<BLSPubKey> {
1398        pub fn mock() -> Validator<BLSPubKey> {
1399            let val = TestValidator::random();
1400            let rng = &mut rand::thread_rng();
1401            let mut seed = [1u8; 32];
1402            rng.fill_bytes(&mut seed);
1403            let mut validator_stake = alloy::primitives::U256::from(0);
1404            let mut delegators = HashMap::new();
1405            for _i in 0..=5000 {
1406                let stake: u64 = rng.gen_range(0..10000);
1407                delegators.insert(Address::random(), alloy::primitives::U256::from(stake));
1408                validator_stake += alloy::primitives::U256::from(stake);
1409            }
1410
1411            let stake_table_key = val.bls_vk.clone().into();
1412            let state_ver_key = val.schnorr_vk.clone().into();
1413
1414            Validator {
1415                account: val.account,
1416                stake_table_key,
1417                state_ver_key,
1418                stake: validator_stake,
1419                commission: val.commission,
1420                delegators,
1421            }
1422        }
1423    }
1424}
1425
1426#[cfg(test)]
1427mod tests {
1428    use alloy::primitives::Address;
1429    use sequencer_utils::test_utils::setup_test;
1430
1431    use super::*;
1432    use crate::v0::impls::testing::*;
1433
1434    #[test]
1435    fn test_from_l1_events() -> anyhow::Result<()> {
1436        setup_test();
1437        // Build a stake table with one DA node and one consensus node.
1438        let val = TestValidator::random();
1439        let val_new_keys = TestValidator::random();
1440        let delegator = Address::random();
1441        let mut events: Vec<StakeTableEvent> = [
1442            ValidatorRegistered {
1443                account: val.account,
1444                blsVk: val.bls_vk.clone(),
1445                schnorrVk: val.schnorr_vk.clone(),
1446                commission: val.commission,
1447            }
1448            .into(),
1449            Delegated {
1450                delegator,
1451                validator: val.account,
1452                amount: U256::from(10),
1453            }
1454            .into(),
1455            ConsensusKeysUpdated {
1456                account: val.account,
1457                blsVK: val_new_keys.bls_vk.clone(),
1458                schnorrVK: val_new_keys.schnorr_vk.clone(),
1459            }
1460            .into(),
1461            Undelegated {
1462                delegator,
1463                validator: val.account,
1464                amount: U256::from(7),
1465            }
1466            .into(),
1467            // delegate to the same validator again
1468            Delegated {
1469                delegator,
1470                validator: val.account,
1471                amount: U256::from(5),
1472            }
1473            .into(),
1474        ]
1475        .to_vec();
1476
1477        let st = active_validator_set_from_l1_events(events.iter().cloned())?;
1478        let st_val = st.get(&val.account).unwrap();
1479        // final staked amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
1480        assert_eq!(st_val.stake, U256::from(8));
1481        assert_eq!(st_val.commission, val.commission);
1482        assert_eq!(st_val.delegators.len(), 1);
1483        // final delegated amount should be 10 (delegated) - 7 (undelegated) + 5 (Delegated)
1484        assert_eq!(*st_val.delegators.get(&delegator).unwrap(), U256::from(8));
1485
1486        events.push(
1487            ValidatorExit {
1488                validator: val.account,
1489            }
1490            .into(),
1491        );
1492
1493        // This should fail because the validator has exited and no longer exists in the stake table.
1494        assert!(active_validator_set_from_l1_events(events.iter().cloned()).is_err());
1495
1496        Ok(())
1497    }
1498
1499    #[test]
1500    fn test_from_l1_events_failures() -> anyhow::Result<()> {
1501        let val = TestValidator::random();
1502        let delegator = Address::random();
1503
1504        let register: StakeTableEvent = ValidatorRegistered {
1505            account: val.account,
1506            blsVk: val.bls_vk.clone(),
1507            schnorrVk: val.schnorr_vk.clone(),
1508            commission: val.commission,
1509        }
1510        .into();
1511        let delegate: StakeTableEvent = Delegated {
1512            delegator,
1513            validator: val.account,
1514            amount: U256::from(10),
1515        }
1516        .into();
1517        let key_update: StakeTableEvent = ConsensusKeysUpdated {
1518            account: val.account,
1519            blsVK: val.bls_vk.clone(),
1520            schnorrVK: val.schnorr_vk.clone(),
1521        }
1522        .into();
1523        let undelegate: StakeTableEvent = Undelegated {
1524            delegator,
1525            validator: val.account,
1526            amount: U256::from(7),
1527        }
1528        .into();
1529
1530        let exit: StakeTableEvent = ValidatorExit {
1531            validator: val.account,
1532        }
1533        .into();
1534
1535        let cases = [
1536            vec![exit],
1537            vec![undelegate.clone()],
1538            vec![delegate.clone()],
1539            vec![key_update],
1540            vec![register.clone(), register.clone()],
1541            vec![register, delegate, undelegate.clone(), undelegate],
1542        ];
1543
1544        for events in cases.iter() {
1545            let res = active_validator_set_from_l1_events(events.iter().cloned());
1546            assert!(
1547                res.is_err(),
1548                "events {:?}, not a valid sequencer of events",
1549                res
1550            );
1551        }
1552        Ok(())
1553    }
1554
1555    #[test]
1556    fn test_validators_selection() {
1557        let mut validators = IndexMap::new();
1558        let mut highest_stake = alloy::primitives::U256::ZERO;
1559
1560        for _i in 0..3000 {
1561            let validator = Validator::mock();
1562            validators.insert(validator.account, validator.clone());
1563
1564            if validator.stake > highest_stake {
1565                highest_stake = validator.stake;
1566            }
1567        }
1568
1569        let minimum_stake = highest_stake / U256::from(VID_TARGET_TOTAL_STAKE);
1570
1571        select_active_validator_set(&mut validators).expect("Failed to select validators");
1572        assert!(
1573            validators.len() <= 100,
1574            "validators len is {}, expected at most 100",
1575            validators.len()
1576        );
1577
1578        let mut selected_validators_highest_stake = alloy::primitives::U256::ZERO;
1579        // Ensure every validator in the final selection is above or equal to minimum stake
1580        for (address, validator) in &validators {
1581            assert!(
1582                validator.stake >= minimum_stake,
1583                "Validator {:?} has stake below minimum: {}",
1584                address,
1585                validator.stake
1586            );
1587
1588            if validator.stake > selected_validators_highest_stake {
1589                selected_validators_highest_stake = validator.stake;
1590            }
1591        }
1592    }
1593}