sequencer/
state.rs

1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{bail, ensure, Context};
5use either::Either;
6use espresso_types::{
7    traits::StateCatchup,
8    v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
9    v0_4::{Delta, RewardAccountV2, RewardMerkleTreeV2},
10    BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
11    ValidatedState,
12};
13use futures::{future::Future, StreamExt};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16    availability::{AvailabilityDataSource, LeafQueryData},
17    data_source::{storage::pruning::PrunedHeightDataSource, Transaction, VersionedDataSource},
18    merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19    status::StatusDataSource,
20    types::HeightIndexed,
21};
22use jf_merkle_tree_compat::{
23    LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
24};
25use tokio::time::sleep;
26use vbs::version::StaticVersionType;
27
28use crate::{
29    api::RewardAccountProofDataSource,
30    catchup::{CatchupStorage, SqlStateCatchup},
31    persistence::ChainConfigPersistence,
32    NodeState, SeqTypes,
33};
34
35pub(crate) async fn compute_state_update(
36    parent_state: &ValidatedState,
37    instance: &NodeState,
38    peers: &impl StateCatchup,
39    parent_leaf: &Leaf2,
40    proposed_leaf: &Leaf2,
41) -> anyhow::Result<(ValidatedState, Delta)> {
42    let header = proposed_leaf.block_header();
43
44    let (state, delta, total_rewards_distributed) = parent_state
45        .apply_header(
46            instance,
47            peers,
48            parent_leaf,
49            header,
50            header.version(),
51            proposed_leaf.view_number(),
52        )
53        .await?;
54
55    // Check internal consistency.
56    ensure!(
57        state.chain_config.commit() == header.chain_config().commit(),
58        "internal error! in-memory chain config {:?} does not match header {:?}",
59        state.chain_config,
60        header.chain_config(),
61    );
62    ensure!(
63        state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
64        "internal error! in-memory block tree {} does not match header {}",
65        state.block_merkle_tree.commitment(),
66        header.block_merkle_tree_root()
67    );
68    ensure!(
69        state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
70        "internal error! in-memory fee tree {} does not match header {}",
71        state.fee_merkle_tree.commitment(),
72        header.fee_merkle_tree_root()
73    );
74
75    match header.reward_merkle_tree_root() {
76        Either::Left(v1_root) => {
77            ensure!(
78                state.reward_merkle_tree_v1.commitment() == v1_root,
79                "internal error! in-memory v1 reward tree {} does not match header {}",
80                state.reward_merkle_tree_v1.commitment(),
81                v1_root
82            )
83        },
84        Either::Right(v2_root) => {
85            ensure!(
86                state.reward_merkle_tree_v2.commitment() == v2_root,
87                "internal error! in-memory v2 reward tree {} does not match header {}",
88                state.reward_merkle_tree_v2.commitment(),
89                v2_root
90            )
91        },
92    }
93
94    if header.version() >= DrbAndHeaderUpgradeVersion::version() {
95        let Some(actual_total) = total_rewards_distributed else {
96            bail!(
97                "internal error! total_rewards_distributed is None for version {:?}",
98                header.version()
99            );
100        };
101
102        let Some(proposed_total) = header.total_reward_distributed() else {
103            bail!(
104                "internal error! proposed header.total_reward_distributed() is None for version \
105                 {:?}",
106                header.version()
107            );
108        };
109
110        ensure!(
111            proposed_total == actual_total,
112            "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
113             {actual_total}",
114        );
115    }
116
117    Ok((state, delta))
118}
119
120async fn store_state_update(
121    tx: &mut impl SequencerStateUpdate,
122    block_number: u64,
123    version: vbs::version::Version,
124    state: &ValidatedState,
125    delta: Delta,
126) -> anyhow::Result<()> {
127    let ValidatedState {
128        fee_merkle_tree,
129        block_merkle_tree,
130        reward_merkle_tree_v2,
131        reward_merkle_tree_v1,
132        ..
133    } = state;
134    let Delta {
135        fees_delta,
136        rewards_delta,
137    } = delta;
138
139    // Collect fee merkle tree proofs for batch insertion
140    let fee_proofs: Vec<_> = fees_delta
141        .iter()
142        .map(|delta| {
143            let proof = match fee_merkle_tree.universal_lookup(*delta) {
144                LookupResult::Ok(_, proof) => proof,
145                LookupResult::NotFound(proof) => proof,
146                LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
147            };
148            let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
149            Ok((proof, path))
150        })
151        .collect::<anyhow::Result<Vec<_>>>()?;
152
153    tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
154    UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
155        tx,
156        fee_proofs,
157        block_number,
158    )
159    .await
160    .context("failed to store fee merkle nodes")?;
161
162    // Insert block merkle tree nodes
163    let (_, proof) = block_merkle_tree
164        .lookup(block_number - 1)
165        .expect_ok()
166        .context("getting blocks frontier")?;
167    let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
168        &(block_number - 1),
169        block_merkle_tree.height(),
170    );
171
172    {
173        tracing::debug!("inserting blocks frontier");
174        UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
175            tx,
176            proof,
177            path,
178            block_number,
179        )
180        .await
181        .context("failed to store block merkle nodes")?;
182    }
183
184    if version <= EpochVersion::version() {
185        // Collect reward merkle tree v1 proofs for batch insertion
186        let reward_proofs: Vec<_> = rewards_delta
187            .iter()
188            .map(|delta| {
189                let key = RewardAccountV1::from(*delta);
190                let proof = match reward_merkle_tree_v1.universal_lookup(key) {
191                    LookupResult::Ok(_, proof) => proof,
192                    LookupResult::NotFound(proof) => proof,
193                    LookupResult::NotInMemory => {
194                        bail!("missing merkle path for reward account {delta}")
195                    },
196                };
197                let path = <RewardAccountV1 as ToTraversalPath<
198                        { RewardMerkleTreeV1::ARITY },
199                    >>::to_traversal_path(
200                        &key, reward_merkle_tree_v1.height()
201                    );
202                Ok((proof, path))
203            })
204            .collect::<anyhow::Result<Vec<_>>>()?;
205
206        tracing::debug!(
207            count = reward_proofs.len(),
208            "inserting v1 reward accounts in batch"
209        );
210        UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
211            tx,
212            reward_proofs,
213            block_number,
214        )
215        .await
216        .context("failed to store reward merkle nodes")?;
217    } else {
218        // Collect reward merkle tree v2 proofs for batch insertion
219        let reward_proofs: Vec<_> = rewards_delta
220            .iter()
221            .map(|delta| {
222                let proof = match reward_merkle_tree_v2.universal_lookup(*delta) {
223                    LookupResult::Ok(_, proof) => proof,
224                    LookupResult::NotFound(proof) => proof,
225                    LookupResult::NotInMemory => {
226                        bail!("missing merkle path for reward account {delta}")
227                    },
228                };
229                let path = <RewardAccountV2 as ToTraversalPath<
230                        { RewardMerkleTreeV2::ARITY },
231                    >>::to_traversal_path(
232                        delta, reward_merkle_tree_v2.height()
233                    );
234                Ok((proof, path))
235            })
236            .collect::<anyhow::Result<Vec<_>>>()?;
237
238        tracing::debug!(
239            count = reward_proofs.len(),
240            "inserting v2 reward accounts in batch"
241        );
242        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
243            tx,
244            reward_proofs,
245            block_number,
246        )
247        .await
248        .context("failed to store reward merkle nodes")?;
249    }
250
251    tracing::debug!(block_number, "updating state height");
252    UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
253        tx,
254        block_number as usize,
255    )
256    .await
257    .context("setting state height")?;
258
259    Ok(())
260}
261
262#[tracing::instrument(
263    skip_all,
264    fields(
265        node_id = instance.node_id,
266        view = ?parent_leaf.leaf().view_number(),
267        height = parent_leaf.height(),
268    ),
269)]
270async fn update_state_storage<T>(
271    parent_state: &ValidatedState,
272    storage: &Arc<T>,
273    instance: &NodeState,
274    peers: &impl StateCatchup,
275    parent_leaf: &LeafQueryData<SeqTypes>,
276    proposed_leaf: &LeafQueryData<SeqTypes>,
277) -> anyhow::Result<ValidatedState>
278where
279    T: SequencerStateDataSource,
280    for<'a> T::Transaction<'a>: SequencerStateUpdate,
281{
282    let parent_chain_config = parent_state.chain_config;
283
284    let (state, delta) = compute_state_update(
285        parent_state,
286        instance,
287        peers,
288        &parent_leaf.leaf().clone(),
289        &proposed_leaf.leaf().clone(),
290    )
291    .await
292    .context("computing state update")?;
293
294    tracing::debug!("storing state update");
295    let mut tx = storage
296        .write()
297        .await
298        .context("opening transaction for state update")?;
299
300    store_state_update(
301        &mut tx,
302        proposed_leaf.height(),
303        proposed_leaf.header().version(),
304        &state,
305        delta,
306    )
307    .await?;
308
309    if parent_chain_config != state.chain_config {
310        let cf = state
311            .chain_config
312            .resolve()
313            .context("failed to resolve to chain config")?;
314
315        tx.insert_chain_config(cf).await?;
316    }
317
318    tx.commit().await?;
319    Ok(state)
320}
321
322async fn store_genesis_state<T>(
323    mut tx: T,
324    chain_config: ChainConfig,
325    state: &ValidatedState,
326) -> anyhow::Result<()>
327where
328    T: SequencerStateUpdate,
329{
330    ensure!(
331        state.block_merkle_tree.num_leaves() == 0,
332        "genesis state with non-empty block tree is unsupported"
333    );
334
335    // Insert fee merkle tree nodes
336    for (account, _) in state.fee_merkle_tree.iter() {
337        let proof = match state.fee_merkle_tree.universal_lookup(account) {
338            LookupResult::Ok(_, proof) => proof,
339            LookupResult::NotFound(proof) => proof,
340            LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
341        };
342        let path: Vec<usize> =
343            <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
344                account,
345                state.fee_merkle_tree.height(),
346            );
347
348        UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
349            &mut tx, proof, path, 0,
350        )
351        .await
352        .context("failed to store fee merkle nodes")?;
353    }
354
355    tx.insert_chain_config(chain_config).await?;
356
357    tx.commit().await?;
358    Ok(())
359}
360
361#[tracing::instrument(skip_all)]
362pub(crate) async fn update_state_storage_loop<T>(
363    storage: Arc<T>,
364    instance: impl Future<Output = NodeState>,
365) -> anyhow::Result<()>
366where
367    T: SequencerStateDataSource,
368    for<'a> T::Transaction<'a>: SequencerStateUpdate,
369{
370    let instance = instance.await;
371    let peers = SqlStateCatchup::new(storage.clone(), Default::default());
372
373    // get last saved merklized state
374    let (last_height, parent_leaf, mut leaves) = {
375        let last_height = storage.get_last_state_height().await?;
376        let pruned_height = storage.load_pruned_height().await?;
377
378        let height = match pruned_height {
379            // If `last_height > pruned_height`, start from `last_height`
380            // as it represents the latest state in storage.
381            // If `pruned_height > last_height`, start from `pruned_height`
382            // as data below this height is no longer needed and will be pruned again during the next pruner run.
383            Some(pruned_height) => max(last_height, pruned_height as usize + 1),
384            // if we have not pruned any data then just start from last_height
385            None => last_height,
386        };
387
388        let current_height = storage.block_height().await?;
389        tracing::info!(
390            node_id = instance.node_id,
391            last_height,
392            current_height,
393            "updating state storage"
394        );
395
396        let parent_leaf = storage.get_leaf(height).await;
397        let leaves = storage.subscribe_leaves(height + 1).await;
398        (last_height, parent_leaf, leaves)
399    };
400    // resolve the parent leaf future _after_ dropping our lock on the state, in case it is not
401    // ready yet and another task needs a mutable lock on the state to produce the parent leaf.
402    let mut parent_leaf = parent_leaf.await;
403    let mut parent_state = ValidatedState::from_header(parent_leaf.header());
404
405    if last_height == 0 {
406        // If the last height is 0, we need to insert the genesis state, since this state is
407        // never the result of a state update and thus is not inserted in the loop below.
408        tracing::info!("storing genesis merklized state");
409        let tx = storage
410            .write()
411            .await
412            .context("starting transaction for genesis state")?;
413        store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
414            .await
415            .context("storing genesis state")?;
416    }
417
418    while let Some(leaf) = leaves.next().await {
419        loop {
420            tracing::debug!(
421                height = leaf.height(),
422                node_id = instance.node_id,
423                ?leaf,
424                "updating persistent merklized state"
425            );
426            match update_state_storage(
427                &parent_state,
428                &storage,
429                &instance,
430                &peers,
431                &parent_leaf,
432                &leaf,
433            )
434            .await
435            {
436                Ok(state) => {
437                    parent_leaf = leaf;
438                    parent_state = state;
439                    break;
440                },
441                Err(err) => {
442                    tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
443                    // If we fail, delay for a second and retry.
444                    sleep(Duration::from_secs(1)).await;
445                },
446            }
447        }
448    }
449
450    Ok(())
451}
452
453pub(crate) trait SequencerStateDataSource:
454    'static
455    + Debug
456    + AvailabilityDataSource<SeqTypes>
457    + StatusDataSource
458    + VersionedDataSource
459    + CatchupStorage
460    + RewardAccountProofDataSource
461    + PrunedHeightDataSource
462    + MerklizedStateHeightPersistence
463{
464}
465
466impl<T> SequencerStateDataSource for T where
467    T: 'static
468        + Debug
469        + AvailabilityDataSource<SeqTypes>
470        + StatusDataSource
471        + VersionedDataSource
472        + CatchupStorage
473        + RewardAccountProofDataSource
474        + PrunedHeightDataSource
475        + MerklizedStateHeightPersistence
476{
477}
478
479pub(crate) trait SequencerStateUpdate:
480    Transaction
481    + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
482    + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
483    + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
484    + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
485    + ChainConfigPersistence
486{
487}
488
489impl<T> SequencerStateUpdate for T where
490    T: Transaction
491        + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
492        + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
493        + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
494        + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
495        + ChainConfigPersistence
496{
497}