1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{bail, ensure, Context};
5use either::Either;
6use espresso_types::{
7 traits::StateCatchup,
8 v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
9 v0_4::{Delta, RewardAccountV2, RewardMerkleTreeV2},
10 BlockMerkleTree, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
11};
12use futures::{future::Future, StreamExt};
13use hotshot::traits::ValidatedState as HotShotState;
14use hotshot_query_service::{
15 availability::{AvailabilityDataSource, LeafQueryData},
16 data_source::{storage::pruning::PrunedHeightDataSource, Transaction, VersionedDataSource},
17 merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
18 status::StatusDataSource,
19 types::HeightIndexed,
20};
21use jf_merkle_tree::{LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme};
22use tokio::time::sleep;
23use vbs::version::StaticVersionType;
24
25use crate::{
26 catchup::{CatchupStorage, SqlStateCatchup},
27 persistence::ChainConfigPersistence,
28 NodeState, SeqTypes,
29};
30
31pub(crate) async fn compute_state_update(
32 state: &ValidatedState,
33 instance: &NodeState,
34 peers: &impl StateCatchup,
35 parent_leaf: &Leaf2,
36 proposed_leaf: &Leaf2,
37) -> anyhow::Result<(ValidatedState, Delta)> {
38 let header = proposed_leaf.block_header();
39
40 let parent_header = parent_leaf.block_header();
42 ensure!(
43 state.chain_config.commit() == parent_header.chain_config().commit(),
44 "internal error! in-memory chain config {:?} does not match parent header {:?}",
45 state.chain_config,
46 parent_header.chain_config(),
47 );
48 ensure!(
49 state.block_merkle_tree.commitment() == parent_header.block_merkle_tree_root(),
50 "internal error! in-memory block tree {:?} does not match parent header {:?}",
51 state.block_merkle_tree.commitment(),
52 parent_header.block_merkle_tree_root()
53 );
54 ensure!(
55 state.fee_merkle_tree.commitment() == parent_header.fee_merkle_tree_root(),
56 "internal error! in-memory fee tree {:?} does not match parent header {:?}",
57 state.fee_merkle_tree.commitment(),
58 parent_header.fee_merkle_tree_root()
59 );
60
61 match parent_header.reward_merkle_tree_root() {
62 Either::Left(v1_root) => {
63 ensure!(
64 state.reward_merkle_tree_v1.commitment() == v1_root,
65 "internal error! in-memory v1 reward tree {:?} does not match parent header {:?}",
66 state.reward_merkle_tree_v1.commitment(),
67 v1_root
68 )
69 },
70 Either::Right(v2_root) => {
71 ensure!(
72 state.reward_merkle_tree_v2.commitment() == v2_root,
73 "internal error! in-memory v2 reward tree {:?} does not match parent header {:?}",
74 state.reward_merkle_tree_v2.commitment(),
75 v2_root
76 )
77 },
78 }
79
80 state
81 .apply_header(
82 instance,
83 peers,
84 parent_leaf,
85 header,
86 header.version(),
87 proposed_leaf.view_number(),
88 )
89 .await
90}
91
92async fn store_state_update(
93 tx: &mut impl SequencerStateUpdate,
94 block_number: u64,
95 version: vbs::version::Version,
96 state: &ValidatedState,
97 delta: Delta,
98) -> anyhow::Result<()> {
99 let ValidatedState {
100 fee_merkle_tree,
101 block_merkle_tree,
102 reward_merkle_tree_v2,
103 reward_merkle_tree_v1,
104 ..
105 } = state;
106 let Delta {
107 fees_delta,
108 rewards_delta,
109 } = delta;
110
111 for delta in fees_delta {
113 let proof = match fee_merkle_tree.universal_lookup(delta) {
114 LookupResult::Ok(_, proof) => proof,
115 LookupResult::NotFound(proof) => proof,
116 LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
117 };
118 let path: Vec<usize> =
119 <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
120 &delta,
121 fee_merkle_tree.height(),
122 );
123
124 tracing::debug!(%delta, "inserting fee account");
125 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
126 tx,
127 proof,
128 path,
129 block_number,
130 )
131 .await
132 .context("failed to store fee merkle nodes")?;
133 }
134
135 let (_, proof) = block_merkle_tree
137 .lookup(block_number - 1)
138 .expect_ok()
139 .context("getting blocks frontier")?;
140 let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
141 &(block_number - 1),
142 block_merkle_tree.height(),
143 );
144
145 {
146 tracing::debug!("inserting blocks frontier");
147 UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
148 tx,
149 proof,
150 path,
151 block_number,
152 )
153 .await
154 .context("failed to store block merkle nodes")?;
155 }
156
157 if version <= EpochVersion::version() {
158 for delta in rewards_delta {
159 let proof = match reward_merkle_tree_v1.universal_lookup(RewardAccountV1::from(delta)) {
160 LookupResult::Ok(_, proof) => proof,
161 LookupResult::NotFound(proof) => proof,
162 LookupResult::NotInMemory => {
163 bail!("missing merkle path for reward account {delta}")
164 },
165 };
166 let path: Vec<usize> = <RewardAccountV1 as ToTraversalPath<
167 { RewardMerkleTreeV1::ARITY },
168 >>::to_traversal_path(
169 &delta.into(), reward_merkle_tree_v1.height()
170 );
171
172 tracing::debug!(%delta, "inserting reward account");
173 UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes(
174 tx,
175 proof,
176 path,
177 block_number,
178 )
179 .await
180 .context("failed to store reward merkle nodes")?;
181 }
182 } else {
183 for delta in rewards_delta {
184 let proof = match reward_merkle_tree_v2.universal_lookup(delta) {
185 LookupResult::Ok(_, proof) => proof,
186 LookupResult::NotFound(proof) => proof,
187 LookupResult::NotInMemory => {
188 bail!("missing merkle path for reward account {delta}")
189 },
190 };
191 let path: Vec<usize> = <RewardAccountV2 as ToTraversalPath<
192 { RewardMerkleTreeV2::ARITY },
193 >>::to_traversal_path(
194 &delta, reward_merkle_tree_v2.height()
195 );
196
197 tracing::debug!(%delta, "inserting reward account");
198 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes(
199 tx,
200 proof,
201 path,
202 block_number,
203 )
204 .await
205 .context("failed to store reward merkle nodes")?;
206 }
207 }
208
209 tracing::debug!(block_number, "updating state height");
210 UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
211 tx,
212 block_number as usize,
213 )
214 .await
215 .context("setting state height")?;
216
217 Ok(())
218}
219
220#[tracing::instrument(
221 skip_all,
222 fields(
223 node_id = instance.node_id,
224 view = ?parent_leaf.leaf().view_number(),
225 height = parent_leaf.height(),
226 ),
227)]
228async fn update_state_storage<T>(
229 parent_state: &ValidatedState,
230 storage: &Arc<T>,
231 instance: &NodeState,
232 peers: &impl StateCatchup,
233 parent_leaf: &LeafQueryData<SeqTypes>,
234 proposed_leaf: &LeafQueryData<SeqTypes>,
235) -> anyhow::Result<ValidatedState>
236where
237 T: SequencerStateDataSource,
238 for<'a> T::Transaction<'a>: SequencerStateUpdate,
239{
240 let parent_chain_config = parent_state.chain_config;
241
242 let (state, delta) = compute_state_update(
243 parent_state,
244 instance,
245 peers,
246 &parent_leaf.leaf().clone(),
247 &proposed_leaf.leaf().clone(),
248 )
249 .await
250 .context("computing state update")?;
251
252 tracing::debug!("storing state update");
253 let mut tx = storage
254 .write()
255 .await
256 .context("opening transaction for state update")?;
257
258 store_state_update(
259 &mut tx,
260 proposed_leaf.height(),
261 proposed_leaf.header().version(),
262 &state,
263 delta,
264 )
265 .await?;
266
267 if parent_chain_config != state.chain_config {
268 let cf = state
269 .chain_config
270 .resolve()
271 .context("failed to resolve to chain config")?;
272
273 tx.insert_chain_config(cf).await?;
274 }
275
276 tx.commit().await?;
277 Ok(state)
278}
279
280async fn store_genesis_state<T>(
281 mut tx: T,
282 chain_config: ChainConfig,
283 state: &ValidatedState,
284) -> anyhow::Result<()>
285where
286 T: SequencerStateUpdate,
287{
288 ensure!(
289 state.block_merkle_tree.num_leaves() == 0,
290 "genesis state with non-empty block tree is unsupported"
291 );
292
293 for (account, _) in state.fee_merkle_tree.iter() {
295 let proof = match state.fee_merkle_tree.universal_lookup(account) {
296 LookupResult::Ok(_, proof) => proof,
297 LookupResult::NotFound(proof) => proof,
298 LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
299 };
300 let path: Vec<usize> =
301 <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
302 account,
303 state.fee_merkle_tree.height(),
304 );
305
306 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
307 &mut tx, proof, path, 0,
308 )
309 .await
310 .context("failed to store fee merkle nodes")?;
311 }
312
313 tx.insert_chain_config(chain_config).await?;
314
315 tx.commit().await?;
316 Ok(())
317}
318
319#[tracing::instrument(skip_all)]
320pub(crate) async fn update_state_storage_loop<T>(
321 storage: Arc<T>,
322 instance: impl Future<Output = NodeState>,
323) -> anyhow::Result<()>
324where
325 T: SequencerStateDataSource,
326 for<'a> T::Transaction<'a>: SequencerStateUpdate,
327{
328 let instance = instance.await;
329 let peers = SqlStateCatchup::new(storage.clone(), Default::default());
330
331 let (last_height, parent_leaf, mut leaves) = {
333 let last_height = storage.get_last_state_height().await?;
334 let pruned_height = storage.load_pruned_height().await?;
335
336 let height = match pruned_height {
337 Some(pruned_height) => max(last_height, pruned_height as usize + 1),
342 None => last_height,
344 };
345
346 let current_height = storage.block_height().await?;
347 tracing::info!(
348 node_id = instance.node_id,
349 last_height,
350 current_height,
351 "updating state storage"
352 );
353
354 let parent_leaf = storage.get_leaf(height).await;
355 let leaves = storage.subscribe_leaves(height + 1).await;
356 (last_height, parent_leaf, leaves)
357 };
358 let mut parent_leaf = parent_leaf.await;
361 let mut parent_state = ValidatedState::from_header(parent_leaf.header());
362
363 if last_height == 0 {
364 tracing::info!("storing genesis merklized state");
367 let tx = storage
368 .write()
369 .await
370 .context("starting transaction for genesis state")?;
371 store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
372 .await
373 .context("storing genesis state")?;
374 }
375
376 while let Some(leaf) = leaves.next().await {
377 loop {
378 tracing::debug!(
379 height = leaf.height(),
380 node_id = instance.node_id,
381 ?leaf,
382 "updating persistent merklized state"
383 );
384 match update_state_storage(
385 &parent_state,
386 &storage,
387 &instance,
388 &peers,
389 &parent_leaf,
390 &leaf,
391 )
392 .await
393 {
394 Ok(state) => {
395 parent_leaf = leaf;
396 parent_state = state;
397 break;
398 },
399 Err(err) => {
400 tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
401 sleep(Duration::from_secs(1)).await;
403 },
404 }
405 }
406 }
407
408 Ok(())
409}
410
411pub(crate) trait SequencerStateDataSource:
412 'static
413 + Debug
414 + AvailabilityDataSource<SeqTypes>
415 + StatusDataSource
416 + VersionedDataSource
417 + CatchupStorage
418 + PrunedHeightDataSource
419 + MerklizedStateHeightPersistence
420{
421}
422
423impl<T> SequencerStateDataSource for T where
424 T: 'static
425 + Debug
426 + AvailabilityDataSource<SeqTypes>
427 + StatusDataSource
428 + VersionedDataSource
429 + CatchupStorage
430 + PrunedHeightDataSource
431 + MerklizedStateHeightPersistence
432{
433}
434
435pub(crate) trait SequencerStateUpdate:
436 Transaction
437 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
438 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
439 + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
440 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
441 + ChainConfigPersistence
442{
443}
444
445impl<T> SequencerStateUpdate for T where
446 T: Transaction
447 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
448 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
449 + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
450 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
451 + ChainConfigPersistence
452{
453}