1use std::collections::{HashSet, VecDeque};
2
3use anyhow::{bail, ensure, Context};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7 get_l1_deposits,
8 v0_1::IterableFeeInfo,
9 v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1, REWARD_MERKLE_TREE_V1_HEIGHT},
10 v0_4::{RewardAccountV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT},
11 BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
12 NodeState, ValidatedState,
13};
14use hotshot::traits::ValidatedState as _;
15use hotshot_query_service::{
16 availability::LeafId,
17 data_source::{
18 sql::{Config, SqlDataSource, Transaction},
19 storage::{
20 sql::{query_as, Db, TransactionMode, Write},
21 AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
22 },
23 VersionedDataSource,
24 },
25 merklized_state::Snapshot,
26 Resolvable,
27};
28use hotshot_types::{
29 data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
30 message::Proposal,
31 traits::node_implementation::ConsensusTime,
32 utils::epoch_from_block_number,
33 vote::HasViewNumber,
34};
35use jf_merkle_tree::{
36 prelude::MerkleNode, ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme,
37 LookupResult, MerkleTreeScheme,
38};
39use sqlx::{Encode, Type};
40use vbs::version::StaticVersionType;
41
42use super::{
43 data_source::{Provider, SequencerDataSource},
44 BlocksFrontier,
45};
46use crate::{
47 catchup::{CatchupStorage, NullStateCatchup},
48 persistence::{sql::Options, ChainConfigPersistence},
49 state::compute_state_update,
50 SeqTypes,
51};
52
/// The sequencer's SQL-backed data source: [`SqlDataSource`] instantiated with [`SeqTypes`] and
/// the sequencer's [`Provider`].
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
54
55#[async_trait]
56impl SequencerDataSource for DataSource {
57 type Options = Options;
58
59 async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
60 let fetch_limit = opt.fetch_rate_limit;
61 let active_fetch_delay = opt.active_fetch_delay;
62 let chunk_fetch_delay = opt.chunk_fetch_delay;
63 let mut cfg = Config::try_from(&opt)?;
64
65 if reset {
66 cfg = cfg.reset_schema();
67 }
68
69 let mut builder = cfg.builder(provider).await?;
70
71 if let Some(limit) = fetch_limit {
72 builder = builder.with_rate_limit(limit);
73 }
74
75 if opt.lightweight {
76 tracing::warn!("enabling light weight mode..");
77 builder = builder.leaf_only();
78 }
79
80 if let Some(delay) = active_fetch_delay {
81 builder = builder.with_active_fetch_delay(delay);
82 }
83 if let Some(delay) = chunk_fetch_delay {
84 builder = builder.with_chunk_fetch_delay(delay);
85 }
86
87 if let Some(batch_size) = opt.types_migration_batch_size {
88 builder = builder.with_types_migration_batch_size(batch_size);
89 }
90
91 builder.build().await
92 }
93}
94
95impl CatchupStorage for SqlStorage {
96 async fn get_reward_accounts_v1(
97 &self,
98 instance: &NodeState,
99 height: u64,
100 view: ViewNumber,
101 accounts: &[RewardAccountV1],
102 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
103 let mut tx = self.read().await.context(format!(
104 "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
105 ))?;
106
107 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
108 .await
109 .context("getting block height")? as u64;
110 ensure!(
111 block_height > 0,
112 "cannot get accounts for height {height}: no blocks available"
113 );
114
115 if height < block_height {
118 load_v1_reward_accounts(&mut tx, height, accounts).await
119 } else {
120 let accounts: Vec<_> = accounts
121 .iter()
122 .map(|acct| RewardAccountV2::from(*acct))
123 .collect();
124 let (state, leaf) =
127 reconstruct_state(instance, &mut tx, block_height - 1, view, &[], &accounts)
128 .await?;
129 Ok((state.reward_merkle_tree_v1, leaf))
130 }
131 }
132
133 async fn get_reward_accounts_v2(
134 &self,
135 instance: &NodeState,
136 height: u64,
137 view: ViewNumber,
138 accounts: &[RewardAccountV2],
139 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
140 let mut tx = self.read().await.context(format!(
141 "opening transaction to fetch reward account {accounts:?}; height {height}"
142 ))?;
143
144 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
145 .await
146 .context("getting block height")? as u64;
147 ensure!(
148 block_height > 0,
149 "cannot get accounts for height {height}: no blocks available"
150 );
151
152 if height < block_height {
155 load_v2_reward_accounts(&mut tx, height, accounts).await
156 } else {
157 let (state, leaf) =
160 reconstruct_state(instance, &mut tx, block_height - 1, view, &[], accounts).await?;
161 Ok((state.reward_merkle_tree_v2, leaf))
162 }
163 }
164
165 async fn get_accounts(
166 &self,
167 instance: &NodeState,
168 height: u64,
169 view: ViewNumber,
170 accounts: &[FeeAccount],
171 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
172 let mut tx = self.read().await.context(format!(
173 "opening transaction to fetch account {accounts:?}; height {height}"
174 ))?;
175
176 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
177 .await
178 .context("getting block height")? as u64;
179 ensure!(
180 block_height > 0,
181 "cannot get accounts for height {height}: no blocks available"
182 );
183
184 if height < block_height {
187 load_accounts(&mut tx, height, accounts).await
188 } else {
189 let (state, leaf) =
192 reconstruct_state(instance, &mut tx, block_height - 1, view, accounts, &[]).await?;
193 Ok((state.fee_merkle_tree, leaf))
194 }
195 }
196
197 async fn get_frontier(
198 &self,
199 instance: &NodeState,
200 height: u64,
201 view: ViewNumber,
202 ) -> anyhow::Result<BlocksFrontier> {
203 let mut tx = self.read().await.context(format!(
204 "opening transaction to fetch frontier at height {height}"
205 ))?;
206
207 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
208 .await
209 .context("getting block height")? as u64;
210 ensure!(
211 block_height > 0,
212 "cannot get frontier for height {height}: no blocks available"
213 );
214
215 if height < block_height {
218 load_frontier(&mut tx, height).await
219 } else {
220 let (state, _) =
223 reconstruct_state(instance, &mut tx, block_height - 1, view, &[], &[]).await?;
224 match state.block_merkle_tree.lookup(height - 1) {
225 LookupResult::Ok(_, proof) => Ok(proof),
226 _ => {
227 bail!(
228 "state snapshot {view:?},{height} was found but does not contain frontier \
229 at height {}; this should not be possible",
230 height - 1
231 );
232 },
233 }
234 }
235 }
236
237 async fn get_chain_config(
238 &self,
239 commitment: Commitment<ChainConfig>,
240 ) -> anyhow::Result<ChainConfig> {
241 let mut tx = self.read().await.context(format!(
242 "opening transaction to fetch chain config {commitment}"
243 ))?;
244 load_chain_config(&mut tx, commitment).await
245 }
246
247 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
248 let mut tx = self
249 .read()
250 .await
251 .context(format!("opening transaction to fetch leaf at {height}"))?;
252 let leaf = tx
253 .get_leaf((height as usize).into())
254 .await
255 .context(format!("leaf {height} not available"))?;
256 let mut last_leaf: Leaf2 = leaf.leaf().clone();
257 let mut chain = vec![last_leaf.clone()];
258 let mut h = height + 1;
259
260 loop {
261 let lqd = tx.get_leaf((h as usize).into()).await?;
262 let leaf = lqd.leaf();
263
264 if leaf.justify_qc().view_number() == last_leaf.view_number() {
265 chain.push(leaf.clone());
266 } else {
267 h += 1;
268 continue;
269 }
270
271 if leaf.view_number() == last_leaf.view_number() + 1 {
273 last_leaf = leaf.clone();
274 h += 1;
275 break;
276 }
277 h += 1;
278 last_leaf = leaf.clone();
279 }
280
281 loop {
282 let lqd = tx.get_leaf((h as usize).into()).await?;
283 let leaf = lqd.leaf();
284 if leaf.justify_qc().view_number() == last_leaf.view_number() {
285 chain.push(leaf.clone());
286 break;
287 }
288 h += 1;
289 }
290
291 Ok(chain)
292 }
293}
294
295impl CatchupStorage for DataSource {
296 async fn get_accounts(
297 &self,
298 instance: &NodeState,
299 height: u64,
300 view: ViewNumber,
301 accounts: &[FeeAccount],
302 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
303 self.as_ref()
304 .get_accounts(instance, height, view, accounts)
305 .await
306 }
307
308 async fn get_reward_accounts_v2(
309 &self,
310 instance: &NodeState,
311 height: u64,
312 view: ViewNumber,
313 accounts: &[RewardAccountV2],
314 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
315 self.as_ref()
316 .get_reward_accounts_v2(instance, height, view, accounts)
317 .await
318 }
319
320 async fn get_reward_accounts_v1(
321 &self,
322 instance: &NodeState,
323 height: u64,
324 view: ViewNumber,
325 accounts: &[RewardAccountV1],
326 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
327 self.as_ref()
328 .get_reward_accounts_v1(instance, height, view, accounts)
329 .await
330 }
331
332 async fn get_frontier(
333 &self,
334 instance: &NodeState,
335 height: u64,
336 view: ViewNumber,
337 ) -> anyhow::Result<BlocksFrontier> {
338 self.as_ref().get_frontier(instance, height, view).await
339 }
340
341 async fn get_chain_config(
342 &self,
343 commitment: Commitment<ChainConfig>,
344 ) -> anyhow::Result<ChainConfig> {
345 self.as_ref().get_chain_config(commitment).await
346 }
347 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
348 self.as_ref().get_leaf_chain(height).await
349 }
350}
351
352#[async_trait]
353impl ChainConfigPersistence for Transaction<Write> {
354 async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
355 let commitment = chain_config.commitment();
356 let data = bincode::serialize(&chain_config)?;
357 self.upsert(
358 "chain_config",
359 ["commitment", "data"],
360 ["commitment"],
361 [(commitment.to_string(), data)],
362 )
363 .await
364 }
365}
366
367async fn load_frontier<Mode: TransactionMode>(
368 tx: &mut Transaction<Mode>,
369 height: u64,
370) -> anyhow::Result<BlocksFrontier> {
371 tx.get_path(
372 Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
373 height
374 .checked_sub(1)
375 .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
376 )
377 .await
378 .context(format!("fetching frontier at height {height}"))
379}
380
381async fn load_v1_reward_accounts<Mode: TransactionMode>(
382 tx: &mut Transaction<Mode>,
383 height: u64,
384 accounts: &[RewardAccountV1],
385) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
386 let leaf = tx
387 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
388 .await
389 .context(format!("leaf {height} not available"))?;
390 let header = leaf.header();
391
392 if header.version() < EpochVersion::version()
393 || header.version() >= DrbAndHeaderUpgradeVersion::version()
394 {
395 return Ok((
396 RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
397 leaf.leaf().clone(),
398 ));
399 }
400
401 let merkle_root = header.reward_merkle_tree_root().unwrap_left();
402 let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
403 for account in accounts {
404 let proof = tx
405 .get_path(
406 Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
407 header.height(),
408 ),
409 *account,
410 )
411 .await
412 .context(format!(
413 "fetching v1 reward account {account}; height {}",
414 header.height()
415 ))?;
416 match proof.proof.first().context(format!(
417 "empty proof for v1 reward account {account}; height {}",
418 header.height()
419 ))? {
420 MerkleNode::Leaf { pos, elem, .. } => {
421 snapshot.remember(*pos, *elem, proof)?;
422 },
423 MerkleNode::Empty => {
424 snapshot.non_membership_remember(*account, proof)?;
425 },
426 _ => {
427 bail!("Invalid proof");
428 },
429 }
430 }
431
432 Ok((snapshot, leaf.leaf().clone()))
433}
434
435async fn load_v2_reward_accounts<Mode: TransactionMode>(
437 tx: &mut Transaction<Mode>,
438 height: u64,
439 accounts: &[RewardAccountV2],
440) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
441 let leaf = tx
442 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
443 .await
444 .context(format!("leaf {height} not available"))?;
445 let header = leaf.header();
446
447 if header.version() <= EpochVersion::version() {
448 return Ok((
449 RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT),
450 leaf.leaf().clone(),
451 ));
452 }
453
454 let merkle_root = header.reward_merkle_tree_root().unwrap_right();
455 let mut snapshot = RewardMerkleTreeV2::from_commitment(merkle_root);
456 for account in accounts {
457 let proof = tx
458 .get_path(
459 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(
460 header.height(),
461 ),
462 *account,
463 )
464 .await
465 .context(format!(
466 "fetching reward account {account}; height {}",
467 header.height()
468 ))?;
469 match proof.proof.first().context(format!(
470 "empty proof for reward account {account}; height {}",
471 header.height()
472 ))? {
473 MerkleNode::Leaf { pos, elem, .. } => {
474 snapshot.remember(*pos, *elem, proof)?;
475 },
476 MerkleNode::Empty => {
477 snapshot.non_membership_remember(*account, proof)?;
478 },
479 _ => {
480 bail!("Invalid proof");
481 },
482 }
483 }
484
485 Ok((snapshot, leaf.leaf().clone()))
486}
487
488async fn load_accounts<Mode: TransactionMode>(
489 tx: &mut Transaction<Mode>,
490 height: u64,
491 accounts: &[FeeAccount],
492) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
493 let leaf = tx
494 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
495 .await
496 .context(format!("leaf {height} not available"))?;
497 let header = leaf.header();
498
499 let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
500 for account in accounts {
501 let proof = tx
502 .get_path(
503 Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
504 header.height(),
505 ),
506 *account,
507 )
508 .await
509 .context(format!(
510 "fetching account {account}; height {}",
511 header.height()
512 ))?;
513 match proof.proof.first().context(format!(
514 "empty proof for account {account}; height {}",
515 header.height()
516 ))? {
517 MerkleNode::Leaf { pos, elem, .. } => {
518 snapshot.remember(*pos, *elem, proof)?;
519 },
520 MerkleNode::Empty => {
521 snapshot.non_membership_remember(*account, proof)?;
522 },
523 _ => {
524 bail!("Invalid proof");
525 },
526 }
527 }
528
529 Ok((snapshot, leaf.leaf().clone()))
530}
531
532async fn load_chain_config<Mode: TransactionMode>(
533 tx: &mut Transaction<Mode>,
534 commitment: Commitment<ChainConfig>,
535) -> anyhow::Result<ChainConfig> {
536 let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
537 .bind(commitment.to_string())
538 .fetch_one(tx.as_mut())
539 .await
540 .unwrap();
541
542 bincode::deserialize(&data[..]).context("failed to deserialize")
543}
544
545#[tracing::instrument(skip(instance, tx))]
556pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
557 instance: &NodeState,
558 tx: &mut Transaction<Mode>,
559 from_height: u64,
560 to_view: ViewNumber,
561 fee_accounts: &[FeeAccount],
562 reward_accounts: &[RewardAccountV2],
563) -> anyhow::Result<(ValidatedState, Leaf2)> {
564 tracing::info!("attempting to reconstruct fee state");
565 let from_leaf = tx
566 .get_leaf((from_height as usize).into())
567 .await
568 .context(format!("leaf {from_height} not available"))?;
569 let from_leaf: Leaf2 = from_leaf.leaf().clone();
570 ensure!(
571 from_leaf.view_number() < to_view,
572 "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
573 from_leaf.view_number(),
574 );
575
576 let mut leaves = VecDeque::new();
578 let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
579 .await
580 .context(format!(
581 "unable to reconstruct state because leaf {to_view:?} is not available"
582 ))?;
583 let mut parent = to_leaf.parent_commitment();
584 tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
585 leaves.push_front(to_leaf.clone());
586 while parent != Committable::commit(&from_leaf) {
587 let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
588 .await
589 .context(format!(
590 "unable to reconstruct state because leaf {parent} is not available"
591 ))?;
592 parent = leaf.parent_commitment();
593 tracing::debug!(?leaf, ?parent, "have required leaf");
594 leaves.push_front(leaf);
595 }
596
597 let mut parent = from_leaf;
599 let mut state = ValidatedState::from_header(parent.block_header());
600
601 let mut catchup = NullStateCatchup::default();
604
605 let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
606 tracing::info!(
609 "reconstructing fee accounts state for from height {from_height} to view {to_view}"
610 );
611
612 let dependencies =
613 fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
614 fee_accounts.extend(dependencies);
615 let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
616 state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
617 .await
618 .context("unable to reconstruct state because accounts are not available at origin")?
619 .0;
620 ensure!(
621 state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
622 "loaded fee state does not match parent header"
623 );
624
625 tracing::info!(
626 "reconstructing reward accounts for from height {from_height} to view {to_view}"
627 );
628
629 let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
630
631 let dependencies = reward_header_dependencies(instance, &leaves).await?;
634 reward_accounts.extend(dependencies);
635 let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
636
637 match parent.block_header().reward_merkle_tree_root() {
639 either::Either::Left(expected_root) => {
640 let accts = reward_accounts
641 .into_iter()
642 .map(RewardAccountV1::from)
643 .collect::<Vec<_>>();
644 state.reward_merkle_tree_v1 = load_v1_reward_accounts(tx, from_height, &accts)
645 .await
646 .context(
647 "unable to reconstruct state because v1 reward accounts are not available at \
648 origin",
649 )?
650 .0;
651 ensure!(
652 state.reward_merkle_tree_v1.commitment() == expected_root,
653 "loaded v1 reward state does not match parent header"
654 );
655 },
656 either::Either::Right(expected_root) => {
657 state.reward_merkle_tree_v2 =
658 load_v2_reward_accounts(tx, from_height, &reward_accounts)
659 .await
660 .context(
661 "unable to reconstruct state because v2 reward accounts are not available \
662 at origin",
663 )?
664 .0;
665 ensure!(
666 state.reward_merkle_tree_v2.commitment() == expected_root,
667 "loaded reward state does not match parent header"
668 );
669 },
670 }
671
672 let frontier = load_frontier(tx, from_height)
674 .await
675 .context("unable to reconstruct state because frontier is not available at origin")?;
676 match frontier
677 .proof
678 .first()
679 .context("empty proof for frontier at origin")?
680 {
681 MerkleNode::Leaf { pos, elem, .. } => state
682 .block_merkle_tree
683 .remember(*pos, *elem, frontier)
684 .context("failed to remember frontier")?,
685 _ => bail!("invalid frontier proof"),
686 }
687
688 for proposal in leaves {
690 state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
691 .await
692 .context(format!(
693 "unable to reconstruct state because state update {} failed",
694 proposal.height(),
695 ))?
696 .0;
697 parent = proposal;
698 }
699
700 tracing::info!(from_height, ?to_view, "successfully reconstructed state");
701 Ok((state, to_leaf))
702}
703
704async fn fee_header_dependencies<Mode: TransactionMode>(
711 catchup: &mut NullStateCatchup,
712 tx: &mut Transaction<Mode>,
713 instance: &NodeState,
714 mut parent: &Leaf2,
715 leaves: impl IntoIterator<Item = &Leaf2>,
716) -> anyhow::Result<HashSet<FeeAccount>> {
717 let mut accounts = HashSet::default();
718
719 for proposal in leaves {
720 let header = proposal.block_header();
721 let height = header.height();
722 let view = proposal.view_number();
723 tracing::debug!(height, ?view, "fetching dependencies for proposal");
724
725 let header_cf = header.chain_config();
726 let chain_config = if header_cf.commit() == instance.chain_config.commit() {
727 instance.chain_config
728 } else {
729 match header_cf.resolve() {
730 Some(cf) => cf,
731 None => {
732 tracing::info!(
733 height,
734 ?view,
735 commit = %header_cf.commit(),
736 "chain config not available, attempting to load from storage",
737 );
738 let cf = load_chain_config(tx, header_cf.commit())
739 .await
740 .context(format!(
741 "loading chain config {} for header {},{:?}",
742 header_cf.commit(),
743 header.height(),
744 proposal.view_number()
745 ))?;
746
747 catchup.add_chain_config(cf);
750 cf
751 },
752 }
753 };
754
755 accounts.insert(chain_config.fee_recipient);
756 accounts.extend(
757 get_l1_deposits(instance, header, parent, chain_config.fee_contract)
758 .await
759 .into_iter()
760 .map(|fee| fee.account()),
761 );
762 accounts.extend(header.fee_info().accounts());
763 parent = proposal;
764 }
765 Ok(accounts)
766}
767
768async fn reward_header_dependencies(
772 instance: &NodeState,
773 leaves: impl IntoIterator<Item = &Leaf2>,
774) -> anyhow::Result<HashSet<RewardAccountV2>> {
775 let mut reward_accounts = HashSet::default();
776 let epoch_height = instance.epoch_height;
777
778 let Some(epoch_height) = epoch_height else {
779 tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
780 return Ok(HashSet::new());
781 };
782
783 let coordinator = instance.coordinator.clone();
784 let membership_lock = coordinator.membership().read().await;
785 let first_epoch = membership_lock.first_epoch();
786 drop(membership_lock);
787 for proposal in leaves {
789 let header = proposal.block_header();
790
791 let height = header.height();
792 let view = proposal.view_number();
793 tracing::debug!(height, ?view, "fetching dependencies for proposal");
794
795 let version = header.version();
796 if version < EpochVersion::version() {
798 continue;
799 }
800
801 let first_epoch = first_epoch.context("first epoch not found")?;
802
803 let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
804
805 if proposal_epoch <= first_epoch + 1 {
807 continue;
808 }
809
810 let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)).await {
811 Ok(e) => e,
812 Err(err) => {
813 tracing::info!(
814 "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
815 );
816
817 coordinator
818 .wait_for_catchup(proposal_epoch)
819 .await
820 .context(format!("failed to catchup for epoch={proposal_epoch}"))?
821 },
822 };
823
824 let leader = epoch_membership.leader(proposal.view_number()).await?;
825 let membership_lock = coordinator.membership().read().await;
826 let validator = membership_lock.get_validator_config(&proposal_epoch, leader)?;
827 drop(membership_lock);
828
829 reward_accounts.insert(RewardAccountV2(validator.account));
830
831 let delegators: Vec<RewardAccountV2> = validator
832 .delegators
833 .keys()
834 .map(|d| RewardAccountV2(*d))
835 .collect();
836
837 reward_accounts.extend(delegators);
838 }
839 Ok(reward_accounts)
840}
841
842async fn get_leaf_from_proposal<Mode, P>(
843 tx: &mut Transaction<Mode>,
844 where_clause: &str,
845 param: P,
846) -> anyhow::Result<Leaf2>
847where
848 P: Type<Db> + for<'q> Encode<'q, Db>,
849{
850 let (data,) = query_as::<(Vec<u8>,)>(&format!(
851 "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
852 ))
853 .bind(param)
854 .fetch_one(tx.as_mut())
855 .await?;
856 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
857 bincode::deserialize(&data)?;
858 Ok(Leaf2::from_quorum_proposal(&proposal.data))
859}
860
/// Test-only support: builds persistence options for a temporary database and implements
/// `TestableSequencerDataSource` for the SQL [`DataSource`].
#[cfg(any(test, feature = "testing"))]
pub(crate) mod impl_testable_data_source {

    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::api::{self, data_source::testing::TestableSequencerDataSource};

    /// Builds persistence [`Options`] pointing at the temporary database `db`.
    ///
    /// Without the `embedded-db` feature this yields Postgres options using the database's
    /// host and port; with the feature it yields SQLite options using the database's path.
    pub fn tmp_options(db: &TmpDb) -> Options {
        #[cfg(not(feature = "embedded-db"))]
        {
            let postgres = crate::persistence::sql::PostgresOptions {
                port: Some(db.port()),
                host: Some(db.host()),
                user: Some("postgres".into()),
                password: Some("password".into()),
                ..Default::default()
            };

            postgres.into()
        }

        #[cfg(feature = "embedded-db")]
        {
            let sqlite = crate::persistence::sql::SqliteOptions {
                path: Some(db.path()),
            };
            sqlite.into()
        }
    }

    #[async_trait]
    impl TestableSequencerDataSource for DataSource {
        type Storage = TmpDb;

        /// Starts a fresh temporary database.
        async fn create_storage() -> Self::Storage {
            TmpDb::init().await
        }

        /// Persistence options targeting the temporary database.
        fn persistence_options(storage: &Self::Storage) -> Self::Options {
            tmp_options(storage)
        }

        /// API options with the SQL query module configured in lightweight mode.
        fn leaf_only_ds_options(
            storage: &Self::Storage,
            opt: api::Options,
        ) -> anyhow::Result<api::Options> {
            let mut lightweight_opts = tmp_options(storage);
            lightweight_opts.lightweight = true;
            let configured = opt.query_sql(Default::default(), lightweight_opts);
            Ok(configured)
        }

        /// API options with the SQL query module configured against the temporary database.
        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
            let sql_opts = tmp_options(storage);
            opt.query_sql(Default::default(), sql_opts)
        }
    }
}