1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{bail, ensure, Context};
5use either::Either;
6use espresso_types::{
7 traits::StateCatchup,
8 v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
9 v0_4::{Delta, RewardAccountV2, RewardMerkleTreeV2},
10 BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
11 ValidatedState,
12};
13use futures::{future::Future, StreamExt};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16 availability::{AvailabilityDataSource, LeafQueryData},
17 data_source::{storage::pruning::PrunedHeightDataSource, Transaction, VersionedDataSource},
18 merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19 status::StatusDataSource,
20 types::HeightIndexed,
21};
22use jf_merkle_tree_compat::{
23 LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
24};
25use tokio::time::sleep;
26use vbs::version::StaticVersionType;
27
28use crate::{
29 api::RewardAccountProofDataSource,
30 catchup::{CatchupStorage, SqlStateCatchup},
31 persistence::ChainConfigPersistence,
32 NodeState, SeqTypes,
33};
34
35pub(crate) async fn compute_state_update(
36 parent_state: &ValidatedState,
37 instance: &NodeState,
38 peers: &impl StateCatchup,
39 parent_leaf: &Leaf2,
40 proposed_leaf: &Leaf2,
41) -> anyhow::Result<(ValidatedState, Delta)> {
42 let header = proposed_leaf.block_header();
43
44 let (state, delta, total_rewards_distributed) = parent_state
45 .apply_header(
46 instance,
47 peers,
48 parent_leaf,
49 header,
50 header.version(),
51 proposed_leaf.view_number(),
52 )
53 .await?;
54
55 ensure!(
57 state.chain_config.commit() == header.chain_config().commit(),
58 "internal error! in-memory chain config {:?} does not match header {:?}",
59 state.chain_config,
60 header.chain_config(),
61 );
62 ensure!(
63 state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
64 "internal error! in-memory block tree {} does not match header {}",
65 state.block_merkle_tree.commitment(),
66 header.block_merkle_tree_root()
67 );
68 ensure!(
69 state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
70 "internal error! in-memory fee tree {} does not match header {}",
71 state.fee_merkle_tree.commitment(),
72 header.fee_merkle_tree_root()
73 );
74
75 match header.reward_merkle_tree_root() {
76 Either::Left(v1_root) => {
77 ensure!(
78 state.reward_merkle_tree_v1.commitment() == v1_root,
79 "internal error! in-memory v1 reward tree {} does not match header {}",
80 state.reward_merkle_tree_v1.commitment(),
81 v1_root
82 )
83 },
84 Either::Right(v2_root) => {
85 ensure!(
86 state.reward_merkle_tree_v2.commitment() == v2_root,
87 "internal error! in-memory v2 reward tree {} does not match header {}",
88 state.reward_merkle_tree_v2.commitment(),
89 v2_root
90 )
91 },
92 }
93
94 if header.version() >= DrbAndHeaderUpgradeVersion::version() {
95 let Some(actual_total) = total_rewards_distributed else {
96 bail!(
97 "internal error! total_rewards_distributed is None for version {:?}",
98 header.version()
99 );
100 };
101
102 let Some(proposed_total) = header.total_reward_distributed() else {
103 bail!(
104 "internal error! proposed header.total_reward_distributed() is None for version \
105 {:?}",
106 header.version()
107 );
108 };
109
110 ensure!(
111 proposed_total == actual_total,
112 "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
113 {actual_total}",
114 );
115 }
116
117 Ok((state, delta))
118}
119
120async fn store_state_update(
121 tx: &mut impl SequencerStateUpdate,
122 block_number: u64,
123 version: vbs::version::Version,
124 state: &ValidatedState,
125 delta: Delta,
126) -> anyhow::Result<()> {
127 let ValidatedState {
128 fee_merkle_tree,
129 block_merkle_tree,
130 reward_merkle_tree_v2,
131 reward_merkle_tree_v1,
132 ..
133 } = state;
134 let Delta {
135 fees_delta,
136 rewards_delta,
137 } = delta;
138
139 let fee_proofs: Vec<_> = fees_delta
141 .iter()
142 .map(|delta| {
143 let proof = match fee_merkle_tree.universal_lookup(*delta) {
144 LookupResult::Ok(_, proof) => proof,
145 LookupResult::NotFound(proof) => proof,
146 LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
147 };
148 let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
149 Ok((proof, path))
150 })
151 .collect::<anyhow::Result<Vec<_>>>()?;
152
153 tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
154 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
155 tx,
156 fee_proofs,
157 block_number,
158 )
159 .await
160 .context("failed to store fee merkle nodes")?;
161
162 let (_, proof) = block_merkle_tree
164 .lookup(block_number - 1)
165 .expect_ok()
166 .context("getting blocks frontier")?;
167 let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
168 &(block_number - 1),
169 block_merkle_tree.height(),
170 );
171
172 {
173 tracing::debug!("inserting blocks frontier");
174 UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
175 tx,
176 proof,
177 path,
178 block_number,
179 )
180 .await
181 .context("failed to store block merkle nodes")?;
182 }
183
184 if version <= EpochVersion::version() {
185 let reward_proofs: Vec<_> = rewards_delta
187 .iter()
188 .map(|delta| {
189 let key = RewardAccountV1::from(*delta);
190 let proof = match reward_merkle_tree_v1.universal_lookup(key) {
191 LookupResult::Ok(_, proof) => proof,
192 LookupResult::NotFound(proof) => proof,
193 LookupResult::NotInMemory => {
194 bail!("missing merkle path for reward account {delta}")
195 },
196 };
197 let path = <RewardAccountV1 as ToTraversalPath<
198 { RewardMerkleTreeV1::ARITY },
199 >>::to_traversal_path(
200 &key, reward_merkle_tree_v1.height()
201 );
202 Ok((proof, path))
203 })
204 .collect::<anyhow::Result<Vec<_>>>()?;
205
206 tracing::debug!(
207 count = reward_proofs.len(),
208 "inserting v1 reward accounts in batch"
209 );
210 UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
211 tx,
212 reward_proofs,
213 block_number,
214 )
215 .await
216 .context("failed to store reward merkle nodes")?;
217 } else {
218 let reward_proofs: Vec<_> = rewards_delta
220 .iter()
221 .map(|delta| {
222 let proof = match reward_merkle_tree_v2.universal_lookup(*delta) {
223 LookupResult::Ok(_, proof) => proof,
224 LookupResult::NotFound(proof) => proof,
225 LookupResult::NotInMemory => {
226 bail!("missing merkle path for reward account {delta}")
227 },
228 };
229 let path = <RewardAccountV2 as ToTraversalPath<
230 { RewardMerkleTreeV2::ARITY },
231 >>::to_traversal_path(
232 delta, reward_merkle_tree_v2.height()
233 );
234 Ok((proof, path))
235 })
236 .collect::<anyhow::Result<Vec<_>>>()?;
237
238 tracing::debug!(
239 count = reward_proofs.len(),
240 "inserting v2 reward accounts in batch"
241 );
242 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
243 tx,
244 reward_proofs,
245 block_number,
246 )
247 .await
248 .context("failed to store reward merkle nodes")?;
249 }
250
251 tracing::debug!(block_number, "updating state height");
252 UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
253 tx,
254 block_number as usize,
255 )
256 .await
257 .context("setting state height")?;
258
259 Ok(())
260}
261
262#[tracing::instrument(
263 skip_all,
264 fields(
265 node_id = instance.node_id,
266 view = ?parent_leaf.leaf().view_number(),
267 height = parent_leaf.height(),
268 ),
269)]
270async fn update_state_storage<T>(
271 parent_state: &ValidatedState,
272 storage: &Arc<T>,
273 instance: &NodeState,
274 peers: &impl StateCatchup,
275 parent_leaf: &LeafQueryData<SeqTypes>,
276 proposed_leaf: &LeafQueryData<SeqTypes>,
277) -> anyhow::Result<ValidatedState>
278where
279 T: SequencerStateDataSource,
280 for<'a> T::Transaction<'a>: SequencerStateUpdate,
281{
282 let parent_chain_config = parent_state.chain_config;
283
284 let (state, delta) = compute_state_update(
285 parent_state,
286 instance,
287 peers,
288 &parent_leaf.leaf().clone(),
289 &proposed_leaf.leaf().clone(),
290 )
291 .await
292 .context("computing state update")?;
293
294 tracing::debug!("storing state update");
295 let mut tx = storage
296 .write()
297 .await
298 .context("opening transaction for state update")?;
299
300 store_state_update(
301 &mut tx,
302 proposed_leaf.height(),
303 proposed_leaf.header().version(),
304 &state,
305 delta,
306 )
307 .await?;
308
309 if parent_chain_config != state.chain_config {
310 let cf = state
311 .chain_config
312 .resolve()
313 .context("failed to resolve to chain config")?;
314
315 tx.insert_chain_config(cf).await?;
316 }
317
318 tx.commit().await?;
319 Ok(state)
320}
321
322async fn store_genesis_state<T>(
323 mut tx: T,
324 chain_config: ChainConfig,
325 state: &ValidatedState,
326) -> anyhow::Result<()>
327where
328 T: SequencerStateUpdate,
329{
330 ensure!(
331 state.block_merkle_tree.num_leaves() == 0,
332 "genesis state with non-empty block tree is unsupported"
333 );
334
335 for (account, _) in state.fee_merkle_tree.iter() {
337 let proof = match state.fee_merkle_tree.universal_lookup(account) {
338 LookupResult::Ok(_, proof) => proof,
339 LookupResult::NotFound(proof) => proof,
340 LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
341 };
342 let path: Vec<usize> =
343 <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
344 account,
345 state.fee_merkle_tree.height(),
346 );
347
348 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
349 &mut tx, proof, path, 0,
350 )
351 .await
352 .context("failed to store fee merkle nodes")?;
353 }
354
355 tx.insert_chain_config(chain_config).await?;
356
357 tx.commit().await?;
358 Ok(())
359}
360
361#[tracing::instrument(skip_all)]
362pub(crate) async fn update_state_storage_loop<T>(
363 storage: Arc<T>,
364 instance: impl Future<Output = NodeState>,
365) -> anyhow::Result<()>
366where
367 T: SequencerStateDataSource,
368 for<'a> T::Transaction<'a>: SequencerStateUpdate,
369{
370 let instance = instance.await;
371 let peers = SqlStateCatchup::new(storage.clone(), Default::default());
372
373 let (last_height, parent_leaf, mut leaves) = {
375 let last_height = storage.get_last_state_height().await?;
376 let pruned_height = storage.load_pruned_height().await?;
377
378 let height = match pruned_height {
379 Some(pruned_height) => max(last_height, pruned_height as usize + 1),
384 None => last_height,
386 };
387
388 let current_height = storage.block_height().await?;
389 tracing::info!(
390 node_id = instance.node_id,
391 last_height,
392 current_height,
393 "updating state storage"
394 );
395
396 let parent_leaf = storage.get_leaf(height).await;
397 let leaves = storage.subscribe_leaves(height + 1).await;
398 (last_height, parent_leaf, leaves)
399 };
400 let mut parent_leaf = parent_leaf.await;
403 let mut parent_state = ValidatedState::from_header(parent_leaf.header());
404
405 if last_height == 0 {
406 tracing::info!("storing genesis merklized state");
409 let tx = storage
410 .write()
411 .await
412 .context("starting transaction for genesis state")?;
413 store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
414 .await
415 .context("storing genesis state")?;
416 }
417
418 while let Some(leaf) = leaves.next().await {
419 loop {
420 tracing::debug!(
421 height = leaf.height(),
422 node_id = instance.node_id,
423 ?leaf,
424 "updating persistent merklized state"
425 );
426 match update_state_storage(
427 &parent_state,
428 &storage,
429 &instance,
430 &peers,
431 &parent_leaf,
432 &leaf,
433 )
434 .await
435 {
436 Ok(state) => {
437 parent_leaf = leaf;
438 parent_state = state;
439 break;
440 },
441 Err(err) => {
442 tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
443 sleep(Duration::from_secs(1)).await;
445 },
446 }
447 }
448 }
449
450 Ok(())
451}
452
453pub(crate) trait SequencerStateDataSource:
454 'static
455 + Debug
456 + AvailabilityDataSource<SeqTypes>
457 + StatusDataSource
458 + VersionedDataSource
459 + CatchupStorage
460 + RewardAccountProofDataSource
461 + PrunedHeightDataSource
462 + MerklizedStateHeightPersistence
463{
464}
465
466impl<T> SequencerStateDataSource for T where
467 T: 'static
468 + Debug
469 + AvailabilityDataSource<SeqTypes>
470 + StatusDataSource
471 + VersionedDataSource
472 + CatchupStorage
473 + RewardAccountProofDataSource
474 + PrunedHeightDataSource
475 + MerklizedStateHeightPersistence
476{
477}
478
479pub(crate) trait SequencerStateUpdate:
480 Transaction
481 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
482 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
483 + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
484 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
485 + ChainConfigPersistence
486{
487}
488
489impl<T> SequencerStateUpdate for T where
490 T: Transaction
491 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
492 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
493 + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
494 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
495 + ChainConfigPersistence
496{
497}