sequencer/state_signature/relay_server/
stake_table_tracker.rs

1use std::{
2    collections::{BTreeSet, HashMap},
3    sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_lock::RwLock;
8use espresso_contract_deployer::network_config::{
9    fetch_epoch_config_from_sequencer, fetch_stake_table_from_sequencer,
10};
11use hotshot_types::{
12    data::EpochNumber,
13    light_client::StateVerKey,
14    stake_table::one_honest_threshold,
15    traits::{node_implementation::ConsensusTime, signature_key::StakeTableEntryType},
16    utils::epoch_from_block_number,
17};
18use url::Url;
19
20/// Stake table info for a specific epoch
21#[derive(Clone, Debug, Default)]
22pub struct StakeTableInfo {
23    /// Minimum weight to form an available state signature bundle
24    pub threshold: U256,
25    /// Stake table: map(vk, weight)
26    pub known_nodes: HashMap<StateVerKey, U256>,
27}
28
29/// Tracks the stake table info for each epoch
30pub struct StakeTableTrackerInner {
31    /// Sequencer endpoint to query for stake table info
32    sequencer_url: Url,
33
34    /// Blocks per epoch, should be initialized from the sequencer
35    blocks_per_epoch: Option<u64>,
36
37    /// Epoch start block, should be initialized from the sequencer
38    epoch_start_block: Option<u64>,
39
40    /// Stake table info for each epoch
41    stake_table_infos: HashMap<u64, Arc<StakeTableInfo>>,
42
43    /// Genesis stake table info
44    genesis_stake_table_info: Option<Arc<StakeTableInfo>>,
45
46    /// Queue for garbage collection
47    gc_queue: BTreeSet<u64>,
48}
49
50/// Number of epochs to keep the stake table info
51const PRUNE_GAP: u64 = 2;
52
53/// Tracks the stake table info for each epoch
54pub struct StakeTableTracker {
55    inner: Arc<RwLock<StakeTableTrackerInner>>,
56}
57
58impl StakeTableTracker {
59    pub fn new(sequencer_url: Url) -> Self {
60        Self {
61            inner: Arc::new(RwLock::new(StakeTableTrackerInner {
62                sequencer_url,
63                blocks_per_epoch: None,
64                epoch_start_block: None,
65                stake_table_infos: HashMap::new(),
66                genesis_stake_table_info: None,
67                gc_queue: BTreeSet::new(),
68            })),
69        }
70    }
71
72    /// Return the genesis stake table info
73    pub async fn genesis_stake_table_info(&self) -> anyhow::Result<Arc<StakeTableInfo>> {
74        tracing::trace!("Acquire read lock for genesis stake table info");
75        let read_guard = self.inner.read().await;
76        if let Some(stake_table_info) = &read_guard.genesis_stake_table_info {
77            return Ok(stake_table_info.clone());
78        }
79        tracing::trace!("Drop read lock for genesis stake table info");
80        drop(read_guard);
81        tracing::trace!("Acquire write lock for genesis stake table info");
82        let mut write_guard = self.inner.write().await;
83
84        if let Some(stake_table_info) = &write_guard.genesis_stake_table_info {
85            return Ok(stake_table_info.clone());
86        }
87
88        let genesis_stake_table =
89            fetch_stake_table_from_sequencer(&write_guard.sequencer_url, None).await?;
90        let genesis_total_stake = genesis_stake_table.total_stakes();
91
92        tracing::debug!("Fetching genesis stake table from sequencer");
93        let genesis_stake_table_info = Arc::new(StakeTableInfo {
94            threshold: one_honest_threshold(genesis_total_stake),
95            known_nodes: genesis_stake_table
96                .into_iter()
97                .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
98                .collect(),
99        });
100        tracing::debug!("Genesis stake table info updated");
101
102        write_guard.genesis_stake_table_info = Some(genesis_stake_table_info.clone());
103        tracing::trace!("Drop write lock for genesis stake table info");
104
105        Ok(genesis_stake_table_info)
106    }
107
108    /// Return the stake table info for the given block height
109    /// If the block height is older than the epoch start block, return the genesis stake table info
110    pub async fn stake_table_info_for_block(
111        &self,
112        block_height: u64,
113    ) -> anyhow::Result<Arc<StakeTableInfo>> {
114        tracing::debug!("Fetch stake table for block {block_height}");
115
116        tracing::trace!("Acquire read lock for stake table info");
117        let read_guard = self.inner.read().await;
118        let (blocks_per_epoch, epoch_start_block) =
119            if let Some(blocks_per_epoch) = read_guard.blocks_per_epoch {
120                let epoch_start_block = read_guard.epoch_start_block.unwrap();
121                tracing::trace!("Drop read lock for stake table info");
122                drop(read_guard);
123                (blocks_per_epoch, epoch_start_block)
124            } else {
125                tracing::trace!("Drop read lock for stake table info");
126                drop(read_guard);
127                tracing::trace!("Acquire write lock for stake table info");
128                let mut write_guard = self.inner.write().await;
129                if let Some(blocks_per_epoch) = write_guard.blocks_per_epoch {
130                    (blocks_per_epoch, write_guard.epoch_start_block.unwrap())
131                } else {
132                    tracing::debug!("Fetching epoch config from sequencer");
133                    let (blocks_per_epoch, epoch_start_block) =
134                        fetch_epoch_config_from_sequencer(&write_guard.sequencer_url).await?;
135                    write_guard.blocks_per_epoch.get_or_insert(blocks_per_epoch);
136                    write_guard
137                        .epoch_start_block
138                        .get_or_insert(epoch_start_block);
139                    tracing::debug!(
140                        "Fetched epoch config from sequencer: blocks_per_epoch: {}, \
141                         epoch_start_block: {}",
142                        blocks_per_epoch,
143                        epoch_start_block
144                    );
145                    tracing::trace!("Drop write lock for stake table info");
146                    drop(write_guard);
147                    (blocks_per_epoch, epoch_start_block)
148                }
149            };
150        if block_height <= epoch_start_block || blocks_per_epoch == 0 {
151            return self.genesis_stake_table_info().await;
152        }
153
154        let epoch = epoch_from_block_number(block_height, blocks_per_epoch);
155        tracing::trace!("Acquire read lock for stake table info");
156        let read_guard = self.inner.read().await;
157        if let Some(stake_table_info) = read_guard.stake_table_infos.get(&epoch) {
158            return Ok(stake_table_info.clone());
159        }
160        tracing::trace!("Drop read lock for stake table info");
161        drop(read_guard);
162        tracing::trace!("Acquire write lock for stake table info");
163        let mut write_guard = self.inner.write().await;
164        if let Some(stake_table_info) = write_guard.stake_table_infos.get(&epoch) {
165            return Ok(stake_table_info.clone());
166        }
167
168        tracing::debug!("Fetching stake table for epoch {} from sequencer", epoch);
169        let stake_table = fetch_stake_table_from_sequencer(
170            &write_guard.sequencer_url,
171            Some(EpochNumber::new(epoch)),
172        )
173        .await?;
174        let total_stake = stake_table.total_stakes();
175
176        let stake_table_info = Arc::new(StakeTableInfo {
177            threshold: one_honest_threshold(total_stake),
178            known_nodes: stake_table
179                .into_iter()
180                .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
181                .collect(),
182        });
183
184        write_guard
185            .stake_table_infos
186            .insert(epoch, stake_table_info.clone());
187        write_guard.gc_queue.insert(epoch);
188        tracing::debug!("Stake table info for epoch {} updated", epoch);
189        // Remove the stake table info if it's older than 2 epochs
190        while let Some(&old_epoch) = write_guard.gc_queue.first() {
191            if epoch < PRUNE_GAP || old_epoch >= epoch - PRUNE_GAP {
192                break;
193            }
194            write_guard.stake_table_infos.remove(&old_epoch);
195            write_guard.gc_queue.pop_first();
196            tracing::debug!(%old_epoch, "garbage collected for epoch");
197        }
198        tracing::trace!("Drop write lock for stake table info");
199
200        Ok(stake_table_info)
201    }
202}