sequencer/state_signature/relay_server/
stake_table_tracker.rs1use std::{
2 collections::{BTreeSet, HashMap},
3 sync::Arc,
4};
5
6use alloy::primitives::U256;
7use async_lock::RwLock;
8use espresso_contract_deployer::network_config::{
9 fetch_epoch_config_from_sequencer, fetch_stake_table_from_sequencer,
10};
11use hotshot_types::{
12 data::EpochNumber,
13 light_client::StateVerKey,
14 stake_table::one_honest_threshold,
15 traits::{node_implementation::ConsensusTime, signature_key::StakeTableEntryType},
16 utils::epoch_from_block_number,
17};
18use url::Url;
19
/// Snapshot of a stake table in the form this module caches.
#[derive(Clone, Debug, Default)]
pub struct StakeTableInfo {
    /// Stake threshold, computed by this module as `one_honest_threshold(total_stake)`.
    pub threshold: U256,
    /// Stake of each known node, keyed by the node's state verification key.
    pub known_nodes: HashMap<StateVerKey, U256>,
}
28
/// Lock-protected state behind a [`StakeTableTracker`].
pub struct StakeTableTrackerInner {
    /// Sequencer endpoint that the epoch config and stake tables are fetched from.
    sequencer_url: Url,

    /// Number of blocks per epoch; `None` until fetched from the sequencer.
    blocks_per_epoch: Option<u64>,

    /// Epoch start block as reported by the sequencer; `None` until fetched.
    /// Blocks at or below this height are served the genesis stake table.
    epoch_start_block: Option<u64>,

    /// Cached stake table info, keyed by epoch number.
    stake_table_infos: HashMap<u64, Arc<StakeTableInfo>>,

    /// Cached genesis stake table info; `None` until first requested.
    genesis_stake_table_info: Option<Arc<StakeTableInfo>>,

    /// Epochs currently present in `stake_table_infos`, ordered so the oldest
    /// can be evicted first.
    gc_queue: BTreeSet<u64>,
}
49
/// When the stake table for epoch `e` is inserted into the cache, cached
/// epochs older than `e - PRUNE_GAP` are evicted.
const PRUNE_GAP: u64 = 2;
52
/// Caches stake table information fetched from a sequencer, per epoch.
pub struct StakeTableTracker {
    /// Tracker state, held behind an `Arc<RwLock<_>>`.
    inner: Arc<RwLock<StakeTableTrackerInner>>,
}
57
58impl StakeTableTracker {
59 pub fn new(sequencer_url: Url) -> Self {
60 Self {
61 inner: Arc::new(RwLock::new(StakeTableTrackerInner {
62 sequencer_url,
63 blocks_per_epoch: None,
64 epoch_start_block: None,
65 stake_table_infos: HashMap::new(),
66 genesis_stake_table_info: None,
67 gc_queue: BTreeSet::new(),
68 })),
69 }
70 }
71
72 pub async fn genesis_stake_table_info(&self) -> anyhow::Result<Arc<StakeTableInfo>> {
74 tracing::trace!("Acquire read lock for genesis stake table info");
75 let read_guard = self.inner.read().await;
76 if let Some(stake_table_info) = &read_guard.genesis_stake_table_info {
77 return Ok(stake_table_info.clone());
78 }
79 tracing::trace!("Drop read lock for genesis stake table info");
80 drop(read_guard);
81 tracing::trace!("Acquire write lock for genesis stake table info");
82 let mut write_guard = self.inner.write().await;
83
84 if let Some(stake_table_info) = &write_guard.genesis_stake_table_info {
85 return Ok(stake_table_info.clone());
86 }
87
88 let genesis_stake_table =
89 fetch_stake_table_from_sequencer(&write_guard.sequencer_url, None).await?;
90 let genesis_total_stake = genesis_stake_table.total_stakes();
91
92 tracing::debug!("Fetching genesis stake table from sequencer");
93 let genesis_stake_table_info = Arc::new(StakeTableInfo {
94 threshold: one_honest_threshold(genesis_total_stake),
95 known_nodes: genesis_stake_table
96 .into_iter()
97 .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
98 .collect(),
99 });
100 tracing::debug!("Genesis stake table info updated");
101
102 write_guard.genesis_stake_table_info = Some(genesis_stake_table_info.clone());
103 tracing::trace!("Drop write lock for genesis stake table info");
104
105 Ok(genesis_stake_table_info)
106 }
107
108 pub async fn stake_table_info_for_block(
111 &self,
112 block_height: u64,
113 ) -> anyhow::Result<Arc<StakeTableInfo>> {
114 tracing::debug!("Fetch stake table for block {block_height}");
115
116 tracing::trace!("Acquire read lock for stake table info");
117 let read_guard = self.inner.read().await;
118 let (blocks_per_epoch, epoch_start_block) =
119 if let Some(blocks_per_epoch) = read_guard.blocks_per_epoch {
120 let epoch_start_block = read_guard.epoch_start_block.unwrap();
121 tracing::trace!("Drop read lock for stake table info");
122 drop(read_guard);
123 (blocks_per_epoch, epoch_start_block)
124 } else {
125 tracing::trace!("Drop read lock for stake table info");
126 drop(read_guard);
127 tracing::trace!("Acquire write lock for stake table info");
128 let mut write_guard = self.inner.write().await;
129 if let Some(blocks_per_epoch) = write_guard.blocks_per_epoch {
130 (blocks_per_epoch, write_guard.epoch_start_block.unwrap())
131 } else {
132 tracing::debug!("Fetching epoch config from sequencer");
133 let (blocks_per_epoch, epoch_start_block) =
134 fetch_epoch_config_from_sequencer(&write_guard.sequencer_url).await?;
135 write_guard.blocks_per_epoch.get_or_insert(blocks_per_epoch);
136 write_guard
137 .epoch_start_block
138 .get_or_insert(epoch_start_block);
139 tracing::debug!(
140 "Fetched epoch config from sequencer: blocks_per_epoch: {}, \
141 epoch_start_block: {}",
142 blocks_per_epoch,
143 epoch_start_block
144 );
145 tracing::trace!("Drop write lock for stake table info");
146 drop(write_guard);
147 (blocks_per_epoch, epoch_start_block)
148 }
149 };
150 if block_height <= epoch_start_block || blocks_per_epoch == 0 {
151 return self.genesis_stake_table_info().await;
152 }
153
154 let epoch = epoch_from_block_number(block_height, blocks_per_epoch);
155 tracing::trace!("Acquire read lock for stake table info");
156 let read_guard = self.inner.read().await;
157 if let Some(stake_table_info) = read_guard.stake_table_infos.get(&epoch) {
158 return Ok(stake_table_info.clone());
159 }
160 tracing::trace!("Drop read lock for stake table info");
161 drop(read_guard);
162 tracing::trace!("Acquire write lock for stake table info");
163 let mut write_guard = self.inner.write().await;
164 if let Some(stake_table_info) = write_guard.stake_table_infos.get(&epoch) {
165 return Ok(stake_table_info.clone());
166 }
167
168 tracing::debug!("Fetching stake table for epoch {} from sequencer", epoch);
169 let stake_table = fetch_stake_table_from_sequencer(
170 &write_guard.sequencer_url,
171 Some(EpochNumber::new(epoch)),
172 )
173 .await?;
174 let total_stake = stake_table.total_stakes();
175
176 let stake_table_info = Arc::new(StakeTableInfo {
177 threshold: one_honest_threshold(total_stake),
178 known_nodes: stake_table
179 .into_iter()
180 .map(|entry| (entry.state_ver_key, entry.stake_table_entry.stake()))
181 .collect(),
182 });
183
184 write_guard
185 .stake_table_infos
186 .insert(epoch, stake_table_info.clone());
187 write_guard.gc_queue.insert(epoch);
188 tracing::debug!("Stake table info for epoch {} updated", epoch);
189 while let Some(&old_epoch) = write_guard.gc_queue.first() {
191 if epoch < PRUNE_GAP || old_epoch >= epoch - PRUNE_GAP {
192 break;
193 }
194 write_guard.stake_table_infos.remove(&old_epoch);
195 write_guard.gc_queue.pop_first();
196 tracing::debug!(%old_epoch, "garbage collected for epoch");
197 }
198 tracing::trace!("Drop write lock for stake table info");
199
200 Ok(stake_table_info)
201 }
202}