sequencer/api/
sql.rs

1use std::collections::{HashSet, VecDeque};
2
3use anyhow::{bail, ensure, Context};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7    get_l1_deposits,
8    v0_1::IterableFeeInfo,
9    v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1, REWARD_MERKLE_TREE_V1_HEIGHT},
10    v0_4::{RewardAccountV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT},
11    BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
12    NodeState, ValidatedState,
13};
14use hotshot::traits::ValidatedState as _;
15use hotshot_query_service::{
16    availability::LeafId,
17    data_source::{
18        sql::{Config, SqlDataSource, Transaction},
19        storage::{
20            sql::{query_as, Db, TransactionMode, Write},
21            AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
22        },
23        VersionedDataSource,
24    },
25    merklized_state::Snapshot,
26    Resolvable,
27};
28use hotshot_types::{
29    data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
30    message::Proposal,
31    traits::node_implementation::ConsensusTime,
32    utils::epoch_from_block_number,
33    vote::HasViewNumber,
34};
35use jf_merkle_tree::{
36    prelude::MerkleNode, ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme,
37    LookupResult, MerkleTreeScheme,
38};
39use sqlx::{Encode, Type};
40use vbs::version::StaticVersionType;
41
42use super::{
43    data_source::{Provider, SequencerDataSource},
44    BlocksFrontier,
45};
46use crate::{
47    catchup::{CatchupStorage, NullStateCatchup},
48    persistence::{sql::Options, ChainConfigPersistence},
49    state::compute_state_update,
50    SeqTypes,
51};
52
/// SQL-backed data source used by the sequencer API: the generic [`SqlDataSource`] specialized
/// to [`SeqTypes`] with [`Provider`] as its second type parameter.
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
54
55#[async_trait]
56impl SequencerDataSource for DataSource {
57    type Options = Options;
58
59    async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
60        let fetch_limit = opt.fetch_rate_limit;
61        let active_fetch_delay = opt.active_fetch_delay;
62        let chunk_fetch_delay = opt.chunk_fetch_delay;
63        let mut cfg = Config::try_from(&opt)?;
64
65        if reset {
66            cfg = cfg.reset_schema();
67        }
68
69        let mut builder = cfg.builder(provider).await?;
70
71        if let Some(limit) = fetch_limit {
72            builder = builder.with_rate_limit(limit);
73        }
74
75        if opt.lightweight {
76            tracing::warn!("enabling light weight mode..");
77            builder = builder.leaf_only();
78        }
79
80        if let Some(delay) = active_fetch_delay {
81            builder = builder.with_active_fetch_delay(delay);
82        }
83        if let Some(delay) = chunk_fetch_delay {
84            builder = builder.with_chunk_fetch_delay(delay);
85        }
86
87        if let Some(batch_size) = opt.types_migration_batch_size {
88            builder = builder.with_types_migration_batch_size(batch_size);
89        }
90
91        builder.build().await
92    }
93}
94
95impl CatchupStorage for SqlStorage {
96    async fn get_reward_accounts_v1(
97        &self,
98        instance: &NodeState,
99        height: u64,
100        view: ViewNumber,
101        accounts: &[RewardAccountV1],
102    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
103        let mut tx = self.read().await.context(format!(
104            "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
105        ))?;
106
107        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
108            .await
109            .context("getting block height")? as u64;
110        ensure!(
111            block_height > 0,
112            "cannot get accounts for height {height}: no blocks available"
113        );
114
115        // Check if we have the desired state snapshot. If so, we can load the desired accounts
116        // directly.
117        if height < block_height {
118            load_v1_reward_accounts(&mut tx, height, accounts).await
119        } else {
120            let accounts: Vec<_> = accounts
121                .iter()
122                .map(|acct| RewardAccountV2::from(*acct))
123                .collect();
124            // If we do not have the exact snapshot we need, we can try going back to the last
125            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
126            let (state, leaf) =
127                reconstruct_state(instance, &mut tx, block_height - 1, view, &[], &accounts)
128                    .await?;
129            Ok((state.reward_merkle_tree_v1, leaf))
130        }
131    }
132
133    async fn get_reward_accounts_v2(
134        &self,
135        instance: &NodeState,
136        height: u64,
137        view: ViewNumber,
138        accounts: &[RewardAccountV2],
139    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
140        let mut tx = self.read().await.context(format!(
141            "opening transaction to fetch reward account {accounts:?}; height {height}"
142        ))?;
143
144        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
145            .await
146            .context("getting block height")? as u64;
147        ensure!(
148            block_height > 0,
149            "cannot get accounts for height {height}: no blocks available"
150        );
151
152        // Check if we have the desired state snapshot. If so, we can load the desired accounts
153        // directly.
154        if height < block_height {
155            load_v2_reward_accounts(&mut tx, height, accounts).await
156        } else {
157            // If we do not have the exact snapshot we need, we can try going back to the last
158            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
159            let (state, leaf) =
160                reconstruct_state(instance, &mut tx, block_height - 1, view, &[], accounts).await?;
161            Ok((state.reward_merkle_tree_v2, leaf))
162        }
163    }
164
165    async fn get_accounts(
166        &self,
167        instance: &NodeState,
168        height: u64,
169        view: ViewNumber,
170        accounts: &[FeeAccount],
171    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
172        let mut tx = self.read().await.context(format!(
173            "opening transaction to fetch account {accounts:?}; height {height}"
174        ))?;
175
176        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
177            .await
178            .context("getting block height")? as u64;
179        ensure!(
180            block_height > 0,
181            "cannot get accounts for height {height}: no blocks available"
182        );
183
184        // Check if we have the desired state snapshot. If so, we can load the desired accounts
185        // directly.
186        if height < block_height {
187            load_accounts(&mut tx, height, accounts).await
188        } else {
189            // If we do not have the exact snapshot we need, we can try going back to the last
190            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
191            let (state, leaf) =
192                reconstruct_state(instance, &mut tx, block_height - 1, view, accounts, &[]).await?;
193            Ok((state.fee_merkle_tree, leaf))
194        }
195    }
196
197    async fn get_frontier(
198        &self,
199        instance: &NodeState,
200        height: u64,
201        view: ViewNumber,
202    ) -> anyhow::Result<BlocksFrontier> {
203        let mut tx = self.read().await.context(format!(
204            "opening transaction to fetch frontier at height {height}"
205        ))?;
206
207        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
208            .await
209            .context("getting block height")? as u64;
210        ensure!(
211            block_height > 0,
212            "cannot get frontier for height {height}: no blocks available"
213        );
214
215        // Check if we have the desired state snapshot. If so, we can load the desired frontier
216        // directly.
217        if height < block_height {
218            load_frontier(&mut tx, height).await
219        } else {
220            // If we do not have the exact snapshot we need, we can try going back to the last
221            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
222            let (state, _) =
223                reconstruct_state(instance, &mut tx, block_height - 1, view, &[], &[]).await?;
224            match state.block_merkle_tree.lookup(height - 1) {
225                LookupResult::Ok(_, proof) => Ok(proof),
226                _ => {
227                    bail!(
228                        "state snapshot {view:?},{height} was found but does not contain frontier \
229                         at height {}; this should not be possible",
230                        height - 1
231                    );
232                },
233            }
234        }
235    }
236
237    async fn get_chain_config(
238        &self,
239        commitment: Commitment<ChainConfig>,
240    ) -> anyhow::Result<ChainConfig> {
241        let mut tx = self.read().await.context(format!(
242            "opening transaction to fetch chain config {commitment}"
243        ))?;
244        load_chain_config(&mut tx, commitment).await
245    }
246
247    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
248        let mut tx = self
249            .read()
250            .await
251            .context(format!("opening transaction to fetch leaf at {height}"))?;
252        let leaf = tx
253            .get_leaf((height as usize).into())
254            .await
255            .context(format!("leaf {height} not available"))?;
256        let mut last_leaf: Leaf2 = leaf.leaf().clone();
257        let mut chain = vec![last_leaf.clone()];
258        let mut h = height + 1;
259
260        loop {
261            let lqd = tx.get_leaf((h as usize).into()).await?;
262            let leaf = lqd.leaf();
263
264            if leaf.justify_qc().view_number() == last_leaf.view_number() {
265                chain.push(leaf.clone());
266            } else {
267                h += 1;
268                continue;
269            }
270
271            // just one away from deciding
272            if leaf.view_number() == last_leaf.view_number() + 1 {
273                last_leaf = leaf.clone();
274                h += 1;
275                break;
276            }
277            h += 1;
278            last_leaf = leaf.clone();
279        }
280
281        loop {
282            let lqd = tx.get_leaf((h as usize).into()).await?;
283            let leaf = lqd.leaf();
284            if leaf.justify_qc().view_number() == last_leaf.view_number() {
285                chain.push(leaf.clone());
286                break;
287            }
288            h += 1;
289        }
290
291        Ok(chain)
292    }
293}
294
295impl CatchupStorage for DataSource {
296    async fn get_accounts(
297        &self,
298        instance: &NodeState,
299        height: u64,
300        view: ViewNumber,
301        accounts: &[FeeAccount],
302    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
303        self.as_ref()
304            .get_accounts(instance, height, view, accounts)
305            .await
306    }
307
308    async fn get_reward_accounts_v2(
309        &self,
310        instance: &NodeState,
311        height: u64,
312        view: ViewNumber,
313        accounts: &[RewardAccountV2],
314    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
315        self.as_ref()
316            .get_reward_accounts_v2(instance, height, view, accounts)
317            .await
318    }
319
320    async fn get_reward_accounts_v1(
321        &self,
322        instance: &NodeState,
323        height: u64,
324        view: ViewNumber,
325        accounts: &[RewardAccountV1],
326    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
327        self.as_ref()
328            .get_reward_accounts_v1(instance, height, view, accounts)
329            .await
330    }
331
332    async fn get_frontier(
333        &self,
334        instance: &NodeState,
335        height: u64,
336        view: ViewNumber,
337    ) -> anyhow::Result<BlocksFrontier> {
338        self.as_ref().get_frontier(instance, height, view).await
339    }
340
341    async fn get_chain_config(
342        &self,
343        commitment: Commitment<ChainConfig>,
344    ) -> anyhow::Result<ChainConfig> {
345        self.as_ref().get_chain_config(commitment).await
346    }
347    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
348        self.as_ref().get_leaf_chain(height).await
349    }
350}
351
352#[async_trait]
353impl ChainConfigPersistence for Transaction<Write> {
354    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
355        let commitment = chain_config.commitment();
356        let data = bincode::serialize(&chain_config)?;
357        self.upsert(
358            "chain_config",
359            ["commitment", "data"],
360            ["commitment"],
361            [(commitment.to_string(), data)],
362        )
363        .await
364    }
365}
366
367async fn load_frontier<Mode: TransactionMode>(
368    tx: &mut Transaction<Mode>,
369    height: u64,
370) -> anyhow::Result<BlocksFrontier> {
371    tx.get_path(
372        Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
373        height
374            .checked_sub(1)
375            .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
376    )
377    .await
378    .context(format!("fetching frontier at height {height}"))
379}
380
381async fn load_v1_reward_accounts<Mode: TransactionMode>(
382    tx: &mut Transaction<Mode>,
383    height: u64,
384    accounts: &[RewardAccountV1],
385) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
386    let leaf = tx
387        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
388        .await
389        .context(format!("leaf {height} not available"))?;
390    let header = leaf.header();
391
392    if header.version() < EpochVersion::version()
393        || header.version() >= DrbAndHeaderUpgradeVersion::version()
394    {
395        return Ok((
396            RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
397            leaf.leaf().clone(),
398        ));
399    }
400
401    let merkle_root = header.reward_merkle_tree_root().unwrap_left();
402    let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
403    for account in accounts {
404        let proof = tx
405            .get_path(
406                Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
407                    header.height(),
408                ),
409                *account,
410            )
411            .await
412            .context(format!(
413                "fetching v1 reward account {account}; height {}",
414                header.height()
415            ))?;
416        match proof.proof.first().context(format!(
417            "empty proof for v1 reward account {account}; height {}",
418            header.height()
419        ))? {
420            MerkleNode::Leaf { pos, elem, .. } => {
421                snapshot.remember(*pos, *elem, proof)?;
422            },
423            MerkleNode::Empty => {
424                snapshot.non_membership_remember(*account, proof)?;
425            },
426            _ => {
427                bail!("Invalid proof");
428            },
429        }
430    }
431
432    Ok((snapshot, leaf.leaf().clone()))
433}
434
435/// Loads reward accounts for new reward merkle tree (V4).
436async fn load_v2_reward_accounts<Mode: TransactionMode>(
437    tx: &mut Transaction<Mode>,
438    height: u64,
439    accounts: &[RewardAccountV2],
440) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
441    let leaf = tx
442        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
443        .await
444        .context(format!("leaf {height} not available"))?;
445    let header = leaf.header();
446
447    if header.version() <= EpochVersion::version() {
448        return Ok((
449            RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT),
450            leaf.leaf().clone(),
451        ));
452    }
453
454    let merkle_root = header.reward_merkle_tree_root().unwrap_right();
455    let mut snapshot = RewardMerkleTreeV2::from_commitment(merkle_root);
456    for account in accounts {
457        let proof = tx
458            .get_path(
459                Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(
460                    header.height(),
461                ),
462                *account,
463            )
464            .await
465            .context(format!(
466                "fetching reward account {account}; height {}",
467                header.height()
468            ))?;
469        match proof.proof.first().context(format!(
470            "empty proof for reward account {account}; height {}",
471            header.height()
472        ))? {
473            MerkleNode::Leaf { pos, elem, .. } => {
474                snapshot.remember(*pos, *elem, proof)?;
475            },
476            MerkleNode::Empty => {
477                snapshot.non_membership_remember(*account, proof)?;
478            },
479            _ => {
480                bail!("Invalid proof");
481            },
482        }
483    }
484
485    Ok((snapshot, leaf.leaf().clone()))
486}
487
488async fn load_accounts<Mode: TransactionMode>(
489    tx: &mut Transaction<Mode>,
490    height: u64,
491    accounts: &[FeeAccount],
492) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
493    let leaf = tx
494        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
495        .await
496        .context(format!("leaf {height} not available"))?;
497    let header = leaf.header();
498
499    let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
500    for account in accounts {
501        let proof = tx
502            .get_path(
503                Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
504                    header.height(),
505                ),
506                *account,
507            )
508            .await
509            .context(format!(
510                "fetching account {account}; height {}",
511                header.height()
512            ))?;
513        match proof.proof.first().context(format!(
514            "empty proof for account {account}; height {}",
515            header.height()
516        ))? {
517            MerkleNode::Leaf { pos, elem, .. } => {
518                snapshot.remember(*pos, *elem, proof)?;
519            },
520            MerkleNode::Empty => {
521                snapshot.non_membership_remember(*account, proof)?;
522            },
523            _ => {
524                bail!("Invalid proof");
525            },
526        }
527    }
528
529    Ok((snapshot, leaf.leaf().clone()))
530}
531
532async fn load_chain_config<Mode: TransactionMode>(
533    tx: &mut Transaction<Mode>,
534    commitment: Commitment<ChainConfig>,
535) -> anyhow::Result<ChainConfig> {
536    let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
537        .bind(commitment.to_string())
538        .fetch_one(tx.as_mut())
539        .await
540        .unwrap();
541
542    bincode::deserialize(&data[..]).context("failed to deserialize")
543}
544
545/// Reconstructs the `ValidatedState` from a specific block height up to a given view.
546///
547/// This loads all required fee and reward accounts into the Merkle tree before applying the
548/// State Transition Function (STF), preventing recursive catchup during STF replay.
549///
550/// Note: Even if the primary goal is to catch up the block Merkle tree,
551/// fee and reward header dependencies must still be present beforehand
552/// This is because reconstructing the `ValidatedState` involves replaying the STF over a
553/// range of leaves, and the STF requires all associated data to be present in the `ValidatedState`;
554/// otherwise, it will attempt to trigger catchup itself.
555#[tracing::instrument(skip(instance, tx))]
556pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
557    instance: &NodeState,
558    tx: &mut Transaction<Mode>,
559    from_height: u64,
560    to_view: ViewNumber,
561    fee_accounts: &[FeeAccount],
562    reward_accounts: &[RewardAccountV2],
563) -> anyhow::Result<(ValidatedState, Leaf2)> {
564    tracing::info!("attempting to reconstruct fee state");
565    let from_leaf = tx
566        .get_leaf((from_height as usize).into())
567        .await
568        .context(format!("leaf {from_height} not available"))?;
569    let from_leaf: Leaf2 = from_leaf.leaf().clone();
570    ensure!(
571        from_leaf.view_number() < to_view,
572        "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
573        from_leaf.view_number(),
574    );
575
576    // Get the sequence of headers we will be applying to compute the latest state.
577    let mut leaves = VecDeque::new();
578    let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
579        .await
580        .context(format!(
581            "unable to reconstruct state because leaf {to_view:?} is not available"
582        ))?;
583    let mut parent = to_leaf.parent_commitment();
584    tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
585    leaves.push_front(to_leaf.clone());
586    while parent != Committable::commit(&from_leaf) {
587        let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
588            .await
589            .context(format!(
590                "unable to reconstruct state because leaf {parent} is not available"
591            ))?;
592        parent = leaf.parent_commitment();
593        tracing::debug!(?leaf, ?parent, "have required leaf");
594        leaves.push_front(leaf);
595    }
596
597    // Get the initial state.
598    let mut parent = from_leaf;
599    let mut state = ValidatedState::from_header(parent.block_header());
600
601    // Pre-load the state with the accounts we care about to ensure they will be present in the
602    // final state.
603    let mut catchup = NullStateCatchup::default();
604
605    let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
606    // Add in all the accounts we will need to replay any of the headers, to ensure that we don't
607    // need to do catchup recursively.
608    tracing::info!(
609        "reconstructing fee accounts state for from height {from_height} to view {to_view}"
610    );
611
612    let dependencies =
613        fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
614    fee_accounts.extend(dependencies);
615    let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
616    state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
617        .await
618        .context("unable to reconstruct state because accounts are not available at origin")?
619        .0;
620    ensure!(
621        state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
622        "loaded fee state does not match parent header"
623    );
624
625    tracing::info!(
626        "reconstructing reward accounts for from height {from_height} to view {to_view}"
627    );
628
629    let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
630
631    // Collect all reward account dependencies needed for replaying the STF.
632    // These accounts must be preloaded into the reward Merkle tree to prevent recursive catchups.
633    let dependencies = reward_header_dependencies(instance, &leaves).await?;
634    reward_accounts.extend(dependencies);
635    let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
636
637    // Load all required reward accounts and update the reward Merkle tree.
638    match parent.block_header().reward_merkle_tree_root() {
639        either::Either::Left(expected_root) => {
640            let accts = reward_accounts
641                .into_iter()
642                .map(RewardAccountV1::from)
643                .collect::<Vec<_>>();
644            state.reward_merkle_tree_v1 = load_v1_reward_accounts(tx, from_height, &accts)
645                .await
646                .context(
647                    "unable to reconstruct state because v1 reward accounts are not available at \
648                     origin",
649                )?
650                .0;
651            ensure!(
652                state.reward_merkle_tree_v1.commitment() == expected_root,
653                "loaded v1 reward state does not match parent header"
654            );
655        },
656        either::Either::Right(expected_root) => {
657            state.reward_merkle_tree_v2 =
658                load_v2_reward_accounts(tx, from_height, &reward_accounts)
659                    .await
660                    .context(
661                        "unable to reconstruct state because v2 reward accounts are not available \
662                         at origin",
663                    )?
664                    .0;
665            ensure!(
666                state.reward_merkle_tree_v2.commitment() == expected_root,
667                "loaded reward state does not match parent header"
668            );
669        },
670    }
671
672    // We need the blocks frontier as well, to apply the STF.
673    let frontier = load_frontier(tx, from_height)
674        .await
675        .context("unable to reconstruct state because frontier is not available at origin")?;
676    match frontier
677        .proof
678        .first()
679        .context("empty proof for frontier at origin")?
680    {
681        MerkleNode::Leaf { pos, elem, .. } => state
682            .block_merkle_tree
683            .remember(*pos, *elem, frontier)
684            .context("failed to remember frontier")?,
685        _ => bail!("invalid frontier proof"),
686    }
687
688    // Apply subsequent headers to compute the later state.
689    for proposal in leaves {
690        state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
691            .await
692            .context(format!(
693                "unable to reconstruct state because state update {} failed",
694                proposal.height(),
695            ))?
696            .0;
697        parent = proposal;
698    }
699
700    tracing::info!(from_height, ?to_view, "successfully reconstructed state");
701    Ok((state, to_leaf))
702}
703
704/// Get the dependencies needed to apply the STF to the given list of headers.
705///
706/// Returns
707/// * A state catchup implementation seeded with all the chain configs required to apply the headers
708///   in `leaves`
709/// * The set of accounts that must be preloaded to apply these headers
710async fn fee_header_dependencies<Mode: TransactionMode>(
711    catchup: &mut NullStateCatchup,
712    tx: &mut Transaction<Mode>,
713    instance: &NodeState,
714    mut parent: &Leaf2,
715    leaves: impl IntoIterator<Item = &Leaf2>,
716) -> anyhow::Result<HashSet<FeeAccount>> {
717    let mut accounts = HashSet::default();
718
719    for proposal in leaves {
720        let header = proposal.block_header();
721        let height = header.height();
722        let view = proposal.view_number();
723        tracing::debug!(height, ?view, "fetching dependencies for proposal");
724
725        let header_cf = header.chain_config();
726        let chain_config = if header_cf.commit() == instance.chain_config.commit() {
727            instance.chain_config
728        } else {
729            match header_cf.resolve() {
730                Some(cf) => cf,
731                None => {
732                    tracing::info!(
733                        height,
734                        ?view,
735                        commit = %header_cf.commit(),
736                        "chain config not available, attempting to load from storage",
737                    );
738                    let cf = load_chain_config(tx, header_cf.commit())
739                        .await
740                        .context(format!(
741                            "loading chain config {} for header {},{:?}",
742                            header_cf.commit(),
743                            header.height(),
744                            proposal.view_number()
745                        ))?;
746
747                    // If we had to fetch a chain config now, store it in the catchup implementation
748                    // so the STF will be able to look it up later.
749                    catchup.add_chain_config(cf);
750                    cf
751                },
752            }
753        };
754
755        accounts.insert(chain_config.fee_recipient);
756        accounts.extend(
757            get_l1_deposits(instance, header, parent, chain_config.fee_contract)
758                .await
759                .into_iter()
760                .map(|fee| fee.account()),
761        );
762        accounts.extend(header.fee_info().accounts());
763        parent = proposal;
764    }
765    Ok(accounts)
766}
767
768/// Identifies all reward accounts required to replay the State Transition Function
769/// for the given leaf proposals. These accounts should be present in the Merkle tree
770/// *before* applying the STF to avoid recursive catchup (i.e., STF triggering another catchup).
771async fn reward_header_dependencies(
772    instance: &NodeState,
773    leaves: impl IntoIterator<Item = &Leaf2>,
774) -> anyhow::Result<HashSet<RewardAccountV2>> {
775    let mut reward_accounts = HashSet::default();
776    let epoch_height = instance.epoch_height;
777
778    let Some(epoch_height) = epoch_height else {
779        tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
780        return Ok(HashSet::new());
781    };
782
783    let coordinator = instance.coordinator.clone();
784    let membership_lock = coordinator.membership().read().await;
785    let first_epoch = membership_lock.first_epoch();
786    drop(membership_lock);
787    // add all the chain configs needed to apply STF to headers to the catchup
788    for proposal in leaves {
789        let header = proposal.block_header();
790
791        let height = header.height();
792        let view = proposal.view_number();
793        tracing::debug!(height, ?view, "fetching dependencies for proposal");
794
795        let version = header.version();
796        // Skip if version is less than epoch version
797        if version < EpochVersion::version() {
798            continue;
799        }
800
801        let first_epoch = first_epoch.context("first epoch not found")?;
802
803        let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
804
805        // reward distribution starts third epoch onwards
806        if proposal_epoch <= first_epoch + 1 {
807            continue;
808        }
809
810        let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)).await {
811            Ok(e) => e,
812            Err(err) => {
813                tracing::info!(
814                    "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
815                );
816
817                coordinator
818                    .wait_for_catchup(proposal_epoch)
819                    .await
820                    .context(format!("failed to catchup for epoch={proposal_epoch}"))?
821            },
822        };
823
824        let leader = epoch_membership.leader(proposal.view_number()).await?;
825        let membership_lock = coordinator.membership().read().await;
826        let validator = membership_lock.get_validator_config(&proposal_epoch, leader)?;
827        drop(membership_lock);
828
829        reward_accounts.insert(RewardAccountV2(validator.account));
830
831        let delegators: Vec<RewardAccountV2> = validator
832            .delegators
833            .keys()
834            .map(|d| RewardAccountV2(*d))
835            .collect();
836
837        reward_accounts.extend(delegators);
838    }
839    Ok(reward_accounts)
840}
841
842async fn get_leaf_from_proposal<Mode, P>(
843    tx: &mut Transaction<Mode>,
844    where_clause: &str,
845    param: P,
846) -> anyhow::Result<Leaf2>
847where
848    P: Type<Db> + for<'q> Encode<'q, Db>,
849{
850    let (data,) = query_as::<(Vec<u8>,)>(&format!(
851        "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
852    ))
853    .bind(param)
854    .fetch_one(tx.as_mut())
855    .await?;
856    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
857        bincode::deserialize(&data)?;
858    Ok(Leaf2::from_quorum_proposal(&proposal.data))
859}
860
/// Test support: builds SQL persistence options against a temporary database and implements
/// `TestableSequencerDataSource` for [`DataSource`].
#[cfg(any(test, feature = "testing"))]
pub(crate) mod impl_testable_data_source {

    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::api::{self, data_source::testing::TestableSequencerDataSource};

    /// Builds persistence [`Options`] pointing at the temporary database `db`.
    ///
    /// Without the `embedded-db` feature the options target Postgres at the database's host
    /// and port with fixed test credentials; with the feature they target SQLite at the
    /// database's path.
    pub fn tmp_options(db: &TmpDb) -> Options {
        #[cfg(not(feature = "embedded-db"))]
        let backend_options = crate::persistence::sql::PostgresOptions {
            port: Some(db.port()),
            host: Some(db.host()),
            user: Some("postgres".into()),
            password: Some("password".into()),
            ..Default::default()
        };

        #[cfg(feature = "embedded-db")]
        let backend_options = crate::persistence::sql::SqliteOptions {
            path: Some(db.path()),
        };

        backend_options.into()
    }

    #[async_trait]
    impl TestableSequencerDataSource for DataSource {
        type Storage = TmpDb;

        /// Starts a fresh temporary database.
        async fn create_storage() -> Self::Storage {
            TmpDb::init().await
        }

        /// Persistence options for the given temporary database.
        fn persistence_options(storage: &Self::Storage) -> Self::Options {
            tmp_options(storage)
        }

        /// API options with a SQL query module whose persistence options have `lightweight`
        /// enabled.
        fn leaf_only_ds_options(
            storage: &Self::Storage,
            opt: api::Options,
        ) -> anyhow::Result<api::Options> {
            let mut lightweight_opts = tmp_options(storage);
            lightweight_opts.lightweight = true;
            let api_options = opt.query_sql(Default::default(), lightweight_opts);
            Ok(api_options)
        }

        /// API options with a SQL query module backed by the given temporary database.
        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
            let sql_options = tmp_options(storage);
            opt.query_sql(Default::default(), sql_options)
        }
    }
}