sequencer/
state.rs

1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{bail, ensure, Context};
5use either::Either;
6use espresso_types::{
7    traits::StateCatchup,
8    v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
9    v0_4::{Delta, RewardAccountV2, RewardMerkleTreeV2},
10    BlockMerkleTree, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
11};
12use futures::{future::Future, StreamExt};
13use hotshot::traits::ValidatedState as HotShotState;
14use hotshot_query_service::{
15    availability::{AvailabilityDataSource, LeafQueryData},
16    data_source::{storage::pruning::PrunedHeightDataSource, Transaction, VersionedDataSource},
17    merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
18    status::StatusDataSource,
19    types::HeightIndexed,
20};
21use jf_merkle_tree::{LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme};
22use tokio::time::sleep;
23use vbs::version::StaticVersionType;
24
25use crate::{
26    catchup::{CatchupStorage, SqlStateCatchup},
27    persistence::ChainConfigPersistence,
28    NodeState, SeqTypes,
29};
30
31pub(crate) async fn compute_state_update(
32    state: &ValidatedState,
33    instance: &NodeState,
34    peers: &impl StateCatchup,
35    parent_leaf: &Leaf2,
36    proposed_leaf: &Leaf2,
37) -> anyhow::Result<(ValidatedState, Delta)> {
38    let header = proposed_leaf.block_header();
39
40    // Check internal consistency.
41    let parent_header = parent_leaf.block_header();
42    ensure!(
43        state.chain_config.commit() == parent_header.chain_config().commit(),
44        "internal error! in-memory chain config {:?} does not match parent header {:?}",
45        state.chain_config,
46        parent_header.chain_config(),
47    );
48    ensure!(
49        state.block_merkle_tree.commitment() == parent_header.block_merkle_tree_root(),
50        "internal error! in-memory block tree {:?} does not match parent header {:?}",
51        state.block_merkle_tree.commitment(),
52        parent_header.block_merkle_tree_root()
53    );
54    ensure!(
55        state.fee_merkle_tree.commitment() == parent_header.fee_merkle_tree_root(),
56        "internal error! in-memory fee tree {:?} does not match parent header {:?}",
57        state.fee_merkle_tree.commitment(),
58        parent_header.fee_merkle_tree_root()
59    );
60
61    match parent_header.reward_merkle_tree_root() {
62        Either::Left(v1_root) => {
63            ensure!(
64                state.reward_merkle_tree_v1.commitment() == v1_root,
65                "internal error! in-memory v1 reward tree {:?} does not match parent header {:?}",
66                state.reward_merkle_tree_v1.commitment(),
67                v1_root
68            )
69        },
70        Either::Right(v2_root) => {
71            ensure!(
72                state.reward_merkle_tree_v2.commitment() == v2_root,
73                "internal error! in-memory v2 reward tree {:?} does not match parent header {:?}",
74                state.reward_merkle_tree_v2.commitment(),
75                v2_root
76            )
77        },
78    }
79
80    state
81        .apply_header(
82            instance,
83            peers,
84            parent_leaf,
85            header,
86            header.version(),
87            proposed_leaf.view_number(),
88        )
89        .await
90}
91
92async fn store_state_update(
93    tx: &mut impl SequencerStateUpdate,
94    block_number: u64,
95    version: vbs::version::Version,
96    state: &ValidatedState,
97    delta: Delta,
98) -> anyhow::Result<()> {
99    let ValidatedState {
100        fee_merkle_tree,
101        block_merkle_tree,
102        reward_merkle_tree_v2,
103        reward_merkle_tree_v1,
104        ..
105    } = state;
106    let Delta {
107        fees_delta,
108        rewards_delta,
109    } = delta;
110
111    // Insert fee merkle tree nodes
112    for delta in fees_delta {
113        let proof = match fee_merkle_tree.universal_lookup(delta) {
114            LookupResult::Ok(_, proof) => proof,
115            LookupResult::NotFound(proof) => proof,
116            LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
117        };
118        let path: Vec<usize> =
119            <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
120                &delta,
121                fee_merkle_tree.height(),
122            );
123
124        tracing::debug!(%delta, "inserting fee account");
125        UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
126            tx,
127            proof,
128            path,
129            block_number,
130        )
131        .await
132        .context("failed to store fee merkle nodes")?;
133    }
134
135    // Insert block merkle tree nodes
136    let (_, proof) = block_merkle_tree
137        .lookup(block_number - 1)
138        .expect_ok()
139        .context("getting blocks frontier")?;
140    let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
141        &(block_number - 1),
142        block_merkle_tree.height(),
143    );
144
145    {
146        tracing::debug!("inserting blocks frontier");
147        UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
148            tx,
149            proof,
150            path,
151            block_number,
152        )
153        .await
154        .context("failed to store block merkle nodes")?;
155    }
156
157    if version <= EpochVersion::version() {
158        for delta in rewards_delta {
159            let proof = match reward_merkle_tree_v1.universal_lookup(RewardAccountV1::from(delta)) {
160                LookupResult::Ok(_, proof) => proof,
161                LookupResult::NotFound(proof) => proof,
162                LookupResult::NotInMemory => {
163                    bail!("missing merkle path for reward account {delta}")
164                },
165            };
166            let path: Vec<usize> = <RewardAccountV1 as ToTraversalPath<
167                { RewardMerkleTreeV1::ARITY },
168            >>::to_traversal_path(
169                &delta.into(), reward_merkle_tree_v1.height()
170            );
171
172            tracing::debug!(%delta, "inserting reward account");
173            UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes(
174            tx,
175            proof,
176            path,
177            block_number,
178        )
179        .await
180        .context("failed to store reward merkle nodes")?;
181        }
182    } else {
183        for delta in rewards_delta {
184            let proof = match reward_merkle_tree_v2.universal_lookup(delta) {
185                LookupResult::Ok(_, proof) => proof,
186                LookupResult::NotFound(proof) => proof,
187                LookupResult::NotInMemory => {
188                    bail!("missing merkle path for reward account {delta}")
189                },
190            };
191            let path: Vec<usize> = <RewardAccountV2 as ToTraversalPath<
192                { RewardMerkleTreeV2::ARITY },
193            >>::to_traversal_path(
194                &delta, reward_merkle_tree_v2.height()
195            );
196
197            tracing::debug!(%delta, "inserting reward account");
198            UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes(
199            tx,
200            proof,
201            path,
202            block_number,
203        )
204        .await
205        .context("failed to store reward merkle nodes")?;
206        }
207    }
208
209    tracing::debug!(block_number, "updating state height");
210    UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
211        tx,
212        block_number as usize,
213    )
214    .await
215    .context("setting state height")?;
216
217    Ok(())
218}
219
220#[tracing::instrument(
221    skip_all,
222    fields(
223        node_id = instance.node_id,
224        view = ?parent_leaf.leaf().view_number(),
225        height = parent_leaf.height(),
226    ),
227)]
228async fn update_state_storage<T>(
229    parent_state: &ValidatedState,
230    storage: &Arc<T>,
231    instance: &NodeState,
232    peers: &impl StateCatchup,
233    parent_leaf: &LeafQueryData<SeqTypes>,
234    proposed_leaf: &LeafQueryData<SeqTypes>,
235) -> anyhow::Result<ValidatedState>
236where
237    T: SequencerStateDataSource,
238    for<'a> T::Transaction<'a>: SequencerStateUpdate,
239{
240    let parent_chain_config = parent_state.chain_config;
241
242    let (state, delta) = compute_state_update(
243        parent_state,
244        instance,
245        peers,
246        &parent_leaf.leaf().clone(),
247        &proposed_leaf.leaf().clone(),
248    )
249    .await
250    .context("computing state update")?;
251
252    tracing::debug!("storing state update");
253    let mut tx = storage
254        .write()
255        .await
256        .context("opening transaction for state update")?;
257
258    store_state_update(
259        &mut tx,
260        proposed_leaf.height(),
261        proposed_leaf.header().version(),
262        &state,
263        delta,
264    )
265    .await?;
266
267    if parent_chain_config != state.chain_config {
268        let cf = state
269            .chain_config
270            .resolve()
271            .context("failed to resolve to chain config")?;
272
273        tx.insert_chain_config(cf).await?;
274    }
275
276    tx.commit().await?;
277    Ok(state)
278}
279
280async fn store_genesis_state<T>(
281    mut tx: T,
282    chain_config: ChainConfig,
283    state: &ValidatedState,
284) -> anyhow::Result<()>
285where
286    T: SequencerStateUpdate,
287{
288    ensure!(
289        state.block_merkle_tree.num_leaves() == 0,
290        "genesis state with non-empty block tree is unsupported"
291    );
292
293    // Insert fee merkle tree nodes
294    for (account, _) in state.fee_merkle_tree.iter() {
295        let proof = match state.fee_merkle_tree.universal_lookup(account) {
296            LookupResult::Ok(_, proof) => proof,
297            LookupResult::NotFound(proof) => proof,
298            LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
299        };
300        let path: Vec<usize> =
301            <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
302                account,
303                state.fee_merkle_tree.height(),
304            );
305
306        UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
307            &mut tx, proof, path, 0,
308        )
309        .await
310        .context("failed to store fee merkle nodes")?;
311    }
312
313    tx.insert_chain_config(chain_config).await?;
314
315    tx.commit().await?;
316    Ok(())
317}
318
319#[tracing::instrument(skip_all)]
320pub(crate) async fn update_state_storage_loop<T>(
321    storage: Arc<T>,
322    instance: impl Future<Output = NodeState>,
323) -> anyhow::Result<()>
324where
325    T: SequencerStateDataSource,
326    for<'a> T::Transaction<'a>: SequencerStateUpdate,
327{
328    let instance = instance.await;
329    let peers = SqlStateCatchup::new(storage.clone(), Default::default());
330
331    // get last saved merklized state
332    let (last_height, parent_leaf, mut leaves) = {
333        let last_height = storage.get_last_state_height().await?;
334        let pruned_height = storage.load_pruned_height().await?;
335
336        let height = match pruned_height {
337            // If `last_height > pruned_height`, start from `last_height`
338            // as it represents the latest state in storage.
339            // If `pruned_height > last_height`, start from `pruned_height`
340            // as data below this height is no longer needed and will be pruned again during the next pruner run.
341            Some(pruned_height) => max(last_height, pruned_height as usize + 1),
342            // if we have not pruned any data then just start from last_height
343            None => last_height,
344        };
345
346        let current_height = storage.block_height().await?;
347        tracing::info!(
348            node_id = instance.node_id,
349            last_height,
350            current_height,
351            "updating state storage"
352        );
353
354        let parent_leaf = storage.get_leaf(height).await;
355        let leaves = storage.subscribe_leaves(height + 1).await;
356        (last_height, parent_leaf, leaves)
357    };
358    // resolve the parent leaf future _after_ dropping our lock on the state, in case it is not
359    // ready yet and another task needs a mutable lock on the state to produce the parent leaf.
360    let mut parent_leaf = parent_leaf.await;
361    let mut parent_state = ValidatedState::from_header(parent_leaf.header());
362
363    if last_height == 0 {
364        // If the last height is 0, we need to insert the genesis state, since this state is
365        // never the result of a state update and thus is not inserted in the loop below.
366        tracing::info!("storing genesis merklized state");
367        let tx = storage
368            .write()
369            .await
370            .context("starting transaction for genesis state")?;
371        store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
372            .await
373            .context("storing genesis state")?;
374    }
375
376    while let Some(leaf) = leaves.next().await {
377        loop {
378            tracing::debug!(
379                height = leaf.height(),
380                node_id = instance.node_id,
381                ?leaf,
382                "updating persistent merklized state"
383            );
384            match update_state_storage(
385                &parent_state,
386                &storage,
387                &instance,
388                &peers,
389                &parent_leaf,
390                &leaf,
391            )
392            .await
393            {
394                Ok(state) => {
395                    parent_leaf = leaf;
396                    parent_state = state;
397                    break;
398                },
399                Err(err) => {
400                    tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
401                    // If we fail, delay for a second and retry.
402                    sleep(Duration::from_secs(1)).await;
403                },
404            }
405        }
406    }
407
408    Ok(())
409}
410
411pub(crate) trait SequencerStateDataSource:
412    'static
413    + Debug
414    + AvailabilityDataSource<SeqTypes>
415    + StatusDataSource
416    + VersionedDataSource
417    + CatchupStorage
418    + PrunedHeightDataSource
419    + MerklizedStateHeightPersistence
420{
421}
422
423impl<T> SequencerStateDataSource for T where
424    T: 'static
425        + Debug
426        + AvailabilityDataSource<SeqTypes>
427        + StatusDataSource
428        + VersionedDataSource
429        + CatchupStorage
430        + PrunedHeightDataSource
431        + MerklizedStateHeightPersistence
432{
433}
434
435pub(crate) trait SequencerStateUpdate:
436    Transaction
437    + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
438    + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
439    + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
440    + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
441    + ChainConfigPersistence
442{
443}
444
445impl<T> SequencerStateUpdate for T where
446    T: Transaction
447        + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
448        + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
449        + UpdateStateData<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>
450        + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
451        + ChainConfigPersistence
452{
453}