1use std::{
2 collections::BTreeMap,
3 path::PathBuf,
4 str::FromStr,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use anyhow::{bail, Context};
10use async_trait::async_trait;
11use clap::Parser;
12use committable::Committable;
13use derivative::Derivative;
14use derive_more::derive::{From, Into};
15use espresso_types::{
16 parse_duration, parse_size,
17 traits::{EventsPersistenceRead, MembershipPersistence},
18 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
19 v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent, Validator},
20 BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload, PubKey,
21 StakeTableHash, ValidatorMap,
22};
23use futures::stream::StreamExt;
24use hotshot::InitializerEpochInfo;
25use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
26 DhtPersistentStorage, SerializableRecord,
27};
28use hotshot_query_service::{
29 availability::LeafQueryData,
30 data_source::{
31 storage::{
32 pruning::PrunerCfg,
33 sql::{
34 include_migrations, query_as, syntax_helpers::MAX_FN, Config, Db, Read, SqlStorage,
35 StorageConnectionType, Transaction, TransactionMode, Write,
36 },
37 },
38 Transaction as _, VersionedDataSource,
39 },
40 fetching::{
41 request::{LeafRequest, PayloadRequest, VidCommonRequest},
42 Provider,
43 },
44 merklized_state::MerklizedState,
45};
46use hotshot_types::{
47 data::{
48 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
49 QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
50 },
51 drb::{DrbInput, DrbResult},
52 event::{Event, EventType, HotShotAction, LeafInfo},
53 message::{convert_proposal, Proposal},
54 simple_certificate::{
55 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
56 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
57 },
58 traits::{
59 block_contents::{BlockHeader, BlockPayload},
60 metrics::Metrics,
61 node_implementation::ConsensusTime,
62 },
63 vote::HasViewNumber,
64};
65use itertools::Itertools;
66use sqlx::{query, Executor, QueryBuilder, Row};
67
68use crate::{
69 catchup::SqlStateCatchup,
70 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
71 NodeType, SeqTypes, ViewNumber, RECENT_STAKE_TABLES_LIMIT,
72};
73
// Connection parameters for a Postgres database.
//
// Every field can be supplied as a command-line flag or through the matching
// `ESPRESSO_SEQUENCER_POSTGRES_*` environment variable. Values left unset are simply not
// applied when these options are converted into a storage `Config`.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macro turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    // Host name of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    // Port of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    // Name of the database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    // User to connect as.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    // Password for `user`. Excluded from the derived `Debug` output.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    // When set, the resulting `Config` has `tls()` applied.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}
104
105impl Default for PostgresOptions {
106 fn default() -> Self {
107 Self::parse_from(std::iter::empty::<String>())
108 }
109}
110
// Options for the embedded SQLite database.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macro turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    // Location of the SQLite database file. The raw argument is passed through
    // `build_sqlite_path`, so the stored value is `<argument>/sqlite/database`.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}
123
124pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
125 let sub_dir = PathBuf::from_str(path)?.join("sqlite");
126
127 if !sub_dir.exists() {
129 std::fs::create_dir_all(&sub_dir)
130 .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
131 }
132
133 Ok(sub_dir.join("database"))
134}
135
// Options for SQL-backed persistence.
//
// Combines the backend-specific connection options (Postgres or SQLite, selected by the
// `embedded-db` feature) with connection-pool, timeout and pruning settings. The
// `TryFrom<&Options> for Config` impl turns these into a storage `Config`.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macro turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    // Postgres connection parameters (only when the `embedded-db` feature is disabled).
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    // SQLite parameters (only when the `embedded-db` feature is enabled).
    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    // Optional connection string. When present it is parsed into the base `Config`; otherwise
    // `Config::default()` is the starting point. Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    // When set, a pruner configuration built from `pruning` is installed on the `Config`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    // Pruner parameters; only applied when `prune` is set.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    // Retention settings for consensus storage; copied into `Persistence` by `create`.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    // Not consumed by the code visible in this part of the file.
    // NOTE(review): presumably limits concurrent fetches — confirm against the consumer.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    // Not consumed by the code visible in this part of the file.
    // NOTE(review): presumably a delay applied to active fetches — confirm.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    // Not consumed by the code visible in this part of the file.
    // NOTE(review): presumably a delay between fetched chunks — confirm.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    // When set, `archive()` is applied to the `Config`. Mutually exclusive with `prune`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    // Mutually exclusive with `archive`; off by default. Not consumed by the code visible in
    // this part of the file.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    // Forwarded to `Config::idle_connection_timeout`. Defaults to "10m".
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    // Forwarded to `Config::connection_timeout`. Defaults to "30m".
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    // Forwarded to `Config::slow_statement_threshold`. Defaults to "1s".
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    // Forwarded to `Config::statement_timeout`. Defaults to "10m".
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) statement_timeout: Duration,

    // Forwarded to `Config::min_connections`. Defaults to 0.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    // Forwarded to `Config::query_min_connections`; falls back to `min_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
    pub(crate) query_min_connections: Option<u32>,

    // Forwarded to `Config::max_connections`. Defaults to 25.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    // Forwarded to `Config::query_max_connections`; falls back to `max_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
    pub(crate) query_max_connections: Option<u32>,

    // Not consumed by the code visible in this part of the file.
    // NOTE(review): presumably the batch size used by the types migration — confirm.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_TYPES_MIGRATION_BATCH_SIZE")]
    pub(crate) types_migration_batch_size: Option<u64>,

    // Existing connection pool, never parsed from the command line. `create` stores the pool of
    // the persistence it opened here, and the `Config` conversion hands a clone of it to
    // `Config::pool` when it is set.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
290
291impl Default for Options {
292 fn default() -> Self {
293 Self::parse_from(std::iter::empty::<String>())
294 }
295}
296
297#[cfg(not(feature = "embedded-db"))]
298impl From<PostgresOptions> for Config {
299 fn from(opt: PostgresOptions) -> Self {
300 let mut cfg = Config::default();
301
302 if let Some(host) = opt.host {
303 cfg = cfg.host(host);
304 }
305
306 if let Some(port) = opt.port {
307 cfg = cfg.port(port);
308 }
309
310 if let Some(database) = &opt.database {
311 cfg = cfg.database(database);
312 }
313
314 if let Some(user) = &opt.user {
315 cfg = cfg.user(user);
316 }
317
318 if let Some(password) = &opt.password {
319 cfg = cfg.password(password);
320 }
321
322 if opt.use_tls {
323 cfg = cfg.tls();
324 }
325
326 cfg = cfg.max_connections(20);
327 cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
328 cfg = cfg.connection_timeout(Duration::from_secs(10240));
329 cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
330 cfg = cfg.statement_timeout(Duration::from_secs(600)); cfg
333 }
334}
335
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Builds a storage [`Config`] pointing at the SQLite file in `opt.path`, with a fixed set
    /// of pool and timeout settings.
    fn from(opt: SqliteOptions) -> Self {
        Config::default()
            .db_path(opt.path)
            .max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
            .statement_timeout(Duration::from_secs(600))
    }
}
351
352#[cfg(not(feature = "embedded-db"))]
353impl From<PostgresOptions> for Options {
354 fn from(opt: PostgresOptions) -> Self {
355 Options {
356 postgres_options: opt,
357 max_connections: 20,
358 idle_connection_timeout: Duration::from_secs(120),
359 connection_timeout: Duration::from_secs(10240),
360 slow_statement_threshold: Duration::from_secs(1),
361 statement_timeout: Duration::from_secs(600),
362 ..Default::default()
363 }
364 }
365}
366
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wraps SQLite options in a full [`Options`] value.
    ///
    /// Every field is spelled out explicitly: a small connection pool with fixed timeouts,
    /// pruning and archiving switched off, and no pre-existing pool.
    fn from(opt: SqliteOptions) -> Self {
        Self {
            // Backend and connection source.
            sqlite_options: opt,
            uri: None,
            pool: None,

            // Connection pool sizing and timeouts.
            min_connections: 0,
            max_connections: 5,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            statement_timeout: Duration::from_secs(600),

            // Pruning and archival.
            prune: false,
            pruning: PruningOptions::default(),
            consensus_pruning: ConsensusPruningOptions::default(),
            archive: false,
            lightweight: false,

            // Fetching and migration tuning left unset.
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            types_migration_batch_size: None,
        }
    }
}
392impl TryFrom<&Options> for Config {
393 type Error = anyhow::Error;
394
395 fn try_from(opt: &Options) -> Result<Self, Self::Error> {
396 let mut cfg = match &opt.uri {
397 Some(uri) => uri.parse()?,
398 None => Self::default(),
399 };
400
401 if let Some(pool) = &opt.pool {
402 cfg = cfg.pool(pool.clone());
403 }
404
405 cfg = cfg.max_connections(opt.max_connections);
406 cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
407 cfg = cfg.min_connections(opt.min_connections);
408
409 #[cfg(not(feature = "embedded-db"))]
410 {
411 cfg =
412 cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
413 cfg =
414 cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
415 }
416
417 cfg = cfg.connection_timeout(opt.connection_timeout);
418 cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
419 cfg = cfg.statement_timeout(opt.statement_timeout);
420
421 #[cfg(not(feature = "embedded-db"))]
422 {
423 cfg = cfg.migrations(include_migrations!(
424 "$CARGO_MANIFEST_DIR/api/migrations/postgres"
425 ));
426
427 let pg_options = &opt.postgres_options;
428
429 if let Some(host) = &pg_options.host {
430 cfg = cfg.host(host.clone());
431 }
432
433 if let Some(port) = pg_options.port {
434 cfg = cfg.port(port);
435 }
436
437 if let Some(database) = &pg_options.database {
438 cfg = cfg.database(database);
439 }
440
441 if let Some(user) = &pg_options.user {
442 cfg = cfg.user(user);
443 }
444
445 if let Some(password) = &pg_options.password {
446 cfg = cfg.password(password);
447 }
448
449 if pg_options.use_tls {
450 cfg = cfg.tls();
451 }
452 }
453
454 #[cfg(feature = "embedded-db")]
455 {
456 cfg = cfg.migrations(include_migrations!(
457 "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
458 ));
459
460 cfg = cfg.db_path(opt.sqlite_options.path.clone());
461 }
462
463 if opt.prune {
464 cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
465 }
466 if opt.archive {
467 cfg = cfg.archive();
468 }
469
470 Ok(cfg)
471 }
472}
473
// Parameters for the storage pruner.
//
// Each field is optional; when converted into a `PrunerCfg`, only the fields that are set
// override the value from `PrunerCfg::new()`.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macro turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    // Passed to `PrunerCfg::with_pruning_threshold`. Parsed with `parse_size`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    // Passed to `PrunerCfg::with_minimum_retention`.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    // Passed to `PrunerCfg::with_target_retention`.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    // Passed to `PrunerCfg::with_batch_size`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    // Passed to `PrunerCfg::with_max_usage`.
    // NOTE(review): units are not visible here — confirm against `PrunerCfg`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    // Passed to `PrunerCfg::with_interval`.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    // Passed to `PrunerCfg::with_incremental_vacuum_pages`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}
528
529impl Default for PruningOptions {
530 fn default() -> Self {
531 Self::parse_from(std::iter::empty::<String>())
532 }
533}
534
535impl From<PruningOptions> for PrunerCfg {
536 fn from(opt: PruningOptions) -> Self {
537 let mut cfg = PrunerCfg::new();
538 if let Some(threshold) = opt.pruning_threshold {
539 cfg = cfg.with_pruning_threshold(threshold);
540 }
541 if let Some(min) = opt.minimum_retention {
542 cfg = cfg.with_minimum_retention(min);
543 }
544 if let Some(target) = opt.target_retention {
545 cfg = cfg.with_target_retention(target);
546 }
547 if let Some(batch) = opt.batch_size {
548 cfg = cfg.with_batch_size(batch);
549 }
550 if let Some(max) = opt.max_usage {
551 cfg = cfg.with_max_usage(max);
552 }
553 if let Some(interval) = opt.interval {
554 cfg = cfg.with_interval(interval);
555 }
556
557 if let Some(pages) = opt.pages {
558 cfg = cfg.with_incremental_vacuum_pages(pages)
559 }
560
561 cfg = cfg.with_state_tables(vec![
562 BlockMerkleTree::state_type().to_string(),
563 FeeMerkleTree::state_type().to_string(),
564 ]);
565
566 cfg
567 }
568}
569
// Retention settings for consensus storage, used by `Persistence::prune`.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macro turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    // Number of views to retain under normal conditions: pruning deletes rows whose view is
    // below `current view - target_retention`. Defaults to 302000.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    // Number of views to retain when storage usage exceeds `target_usage`: pruning then
    // deletes rows whose view is below `current view - minimum_retention`. Defaults to 130000.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    // Usage threshold compared against the summed size of the prunable tables.
    // Defaults to 1000000000.
    // NOTE(review): presumably bytes, as reported by the database — confirm.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}
626
627impl Default for ConsensusPruningOptions {
628 fn default() -> Self {
629 Self::parse_from(std::iter::empty::<String>())
630 }
631}
632
633#[async_trait]
634impl PersistenceOptions for Options {
635 type Persistence = Persistence;
636
637 fn set_view_retention(&mut self, view_retention: u64) {
638 self.consensus_pruning.target_retention = view_retention;
639 self.consensus_pruning.minimum_retention = view_retention;
640 }
641
642 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
643 let config = (&*self).try_into()?;
644 let persistence = Persistence {
645 db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
646 gc_opt: self.consensus_pruning,
647 internal_metrics: PersistenceMetricsValue::default(),
648 };
649 persistence.migrate_quorum_proposal_leaf_hashes().await?;
650 self.pool = Some(persistence.db.pool());
651 Ok(persistence)
652 }
653
654 async fn reset(self) -> anyhow::Result<()> {
655 SqlStorage::connect(
656 Config::try_from(&self)?.reset_schema(),
657 StorageConnectionType::Sequencer,
658 )
659 .await?;
660 Ok(())
661 }
662}
663
/// SQL-backed persistence handle for the sequencer.
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Underlying SQL storage used for all reads and writes.
    db: SqlStorage,
    /// Retention settings consulted when pruning consensus storage.
    gc_opt: ConsensusPruningOptions,
    /// Metrics recorded by persistence operations (e.g. the duration of `append_vid`).
    internal_metrics: PersistenceMetricsValue,
}
672
673impl Persistence {
674 async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
681 let mut tx = self.db.write().await?;
682
683 let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");
684
685 let mut updates = vec![];
686 while let Some(row) = proposals.next().await {
687 let row = row?;
688
689 let hash: Option<String> = row.try_get("leaf_hash")?;
690 if hash.is_none() {
691 let view: i64 = row.try_get("view")?;
692 let data: Vec<u8> = row.try_get("data")?;
693 let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
694 bincode::deserialize(&data)?;
695 let leaf = Leaf::from_quorum_proposal(&proposal.data);
696 let leaf_hash = Committable::commit(&leaf);
697 tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
698 updates.push((view, leaf_hash.to_string()));
699 }
700 }
701 drop(proposals);
702
703 tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
704 .await?;
705
706 tx.commit().await
707 }
708
    /// Emits `Decide` events for persisted leaves that have not been processed yet, then
    /// deletes the consensus data covered by each event.
    ///
    /// Starting after the `last_processed_view` recorded in `event_stream`, this repeatedly:
    /// 1. loads a run of leaves with consecutive block heights from `anchor_leaf2`;
    /// 2. loads the VID shares, DA proposals and state certificates for that view range;
    /// 3. passes a `Decide` event to `consumer`;
    /// 4. in one write transaction, records the new `last_processed_view`, stores the state
    ///    certificates, and deletes the processed rows.
    ///
    /// Returns `Ok(())` once no further leaves are found. `deciding_qc` is attached to an event
    /// only if its view is exactly one past the view of that event's final QC.
    ///
    /// # Errors
    ///
    /// Any database, deserialization or consumer error aborts processing; the write transaction
    /// for the failing iteration is never committed.
    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        // Resume point: the last view for which an event was already generated, if any.
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            // All reads for this iteration go through this read transaction, which is dropped
            // before the consumer is invoked and before the write transaction is opened.
            let mut tx = self.db.read().await?;

            // First view to consider: the one after the last processed view, or 0 if nothing
            // has been processed yet.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            // Block height of the previously accepted leaf, used to detect gaps.
            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // A row that fails to load ends collection; the event is generated
                        // from the rows gathered so far.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Only accept a chain of leaves with consecutive block heights; stop at the
                // first gap and leave the remaining leaves for a later iteration.
                if let Some(parent) = parent {
                    if height != parent + 1 {
                        tracing::debug!(
                            height,
                            parent,
                            "ending decide event at non-consecutive leaf"
                        );
                        break;
                    }
                }
                parent = Some(height);
                leaves.push(leaf);
                // The QC of the last accepted leaf becomes the committing QC of the event.
                final_qc = Some(CertificatePair::new(qc, next_epoch_qc));
            }
            drop(rows);

            let Some(final_qc) = final_qc else {
                // No leaf was collected, so there is nothing left to process.
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // View range covered by the collected leaves. `final_qc` being set guarantees that
            // `leaves` is non-empty, so the indexing below cannot panic.
            let from_view = leaves[0].view_number();
            let to_view = leaves[leaves.len() - 1].view_number();

            // VID shares persisted for the view range, keyed by view.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // DA proposals persisted for the view range, keyed by view.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // State certificates persisted for the view range.
            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;
            drop(tx);

            // Combine the collected data into the leaf chain for the event.
            let leaf_chain = leaves
                .into_iter()
                // The chain is emitted newest leaf first.
                .rev()
                .map(|mut leaf| {
                    let view = leaf.view_number();

                    // Attach the VID share when one was persisted for this view.
                    let vid_share = vid_shares.remove(&view);
                    if vid_share.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }

                    // Restore the block payload from the persisted DA proposal when available.
                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // The genesis view is filled with the empty payload instead.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    // NOTE(review): `load_state_certs` keys its map by the certificate's epoch,
                    // whereas this lookup uses the leaf's view — confirm this is intended.
                    let state_cert = state_certs
                        .get(&view)
                        .cloned();

                    LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        // The remaining fields are left at their default values.
                        state: Default::default(),
                        delta: Default::default(),
                    }
                })
                .collect();

            {
                // Hand the decide event to the consumer.
                tracing::debug!(
                    ?from_view,
                    ?to_view,
                    ?final_qc,
                    ?leaf_chain,
                    "generating decide event"
                );
                // Only pass the deciding QC along if it is for the view directly after the
                // final QC of this event.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == final_qc.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };
                consumer
                    .handle_event(&Event {
                        view_number: to_view,
                        event: EventType::Decide {
                            leaf_chain: Arc::new(leaf_chain),
                            committing_qc: Arc::new(final_qc),
                            deciding_qc,
                            block_size: None,
                        },
                    })
                    .await?;
            }

            // The consumer accepted the event; record progress and clean up in one transaction.
            let mut tx = self.db.write().await?;

            // Remember that everything up to `to_view` has been processed.
            tx.upsert(
                "event_stream",
                ["id", "last_processed_view"],
                ["id"],
                [(1i32, to_view.u64() as i64)],
            )
            .await?;

            // Persist the state certificates as finalized, keyed by epoch.
            for (epoch, state_cert) in state_certs {
                let state_cert_bytes = bincode::serialize(&state_cert)?;
                tx.upsert(
                    "finalized_state_cert",
                    ["epoch", "state_cert"],
                    ["epoch"],
                    [(epoch as i64, state_cert_bytes)],
                )
                .await?;
            }

            // Delete the data for the processed view range (both bounds inclusive).
            tx.execute(
                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            // Leaves are deleted with an exclusive upper bound, so the leaf at `to_view` (the
            // most recent one processed) stays in `anchor_leaf2`.
            tx.execute(
                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            tx.commit().await?;
            last_processed_view = Some(to_view.u64() as i64);
        }
    }
996
997 async fn load_state_certs(
998 tx: &mut Transaction<Read>,
999 from_view: ViewNumber,
1000 to_view: ViewNumber,
1001 ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
1002 let rows = tx
1003 .fetch_all(
1004 query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
1005 .bind(from_view.u64() as i64)
1006 .bind(to_view.u64() as i64),
1007 )
1008 .await?;
1009
1010 let mut result = BTreeMap::new();
1011
1012 for row in rows {
1013 let data: Vec<u8> = row.get("state_cert");
1014
1015 let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
1016 .or_else(|err_v2| {
1017 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
1018 .map(Into::into)
1019 .context(format!(
1020 "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
1021 error: {err_v2}"
1022 ))
1023 })?;
1024
1025 result.insert(cert.epoch.u64(), cert);
1026 }
1027
1028 Ok(result)
1029 }
1030
1031 #[tracing::instrument(skip(self))]
1032 async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
1033 let mut tx = self.db.write().await?;
1034
1035 prune_to_view(
1037 &mut tx,
1038 cur_view.u64().saturating_sub(self.gc_opt.target_retention),
1039 )
1040 .await?;
1041
1042 #[cfg(feature = "embedded-db")]
1045 let usage_query = format!(
1046 "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
1047 PRUNE_TABLES
1048 .iter()
1049 .map(|table| format!("'{table}'"))
1050 .join(",")
1051 );
1052
1053 #[cfg(not(feature = "embedded-db"))]
1054 let usage_query = {
1055 let table_sizes = PRUNE_TABLES
1056 .iter()
1057 .map(|table| format!("pg_table_size('{table}')"))
1058 .join(" + ");
1059 format!("SELECT {table_sizes}")
1060 };
1061
1062 let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
1063 tracing::debug!(usage, "consensus storage usage after pruning");
1064
1065 if (usage as u64) > self.gc_opt.target_usage {
1066 tracing::warn!(
1067 usage,
1068 gc_opt = ?self.gc_opt,
1069 "consensus storage is running out of space, pruning to minimum retention"
1070 );
1071 prune_to_view(
1072 &mut tx,
1073 cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
1074 )
1075 .await?;
1076 }
1077
1078 tx.commit().await
1079 }
1080}
1081
/// Tables holding per-view consensus data. `prune_to_view` deletes old rows from each of them,
/// and `Persistence::prune` sums their sizes to measure consensus storage usage.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1089
1090async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1091 if view == 0 {
1092 return Ok(());
1094 }
1095 tracing::debug!(view, "pruning consensus storage");
1096
1097 for table in PRUNE_TABLES {
1098 let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1099 .bind(view as i64)
1100 .execute(tx.as_mut())
1101 .await
1102 .context(format!("pruning {table}"))?;
1103 if res.rows_affected() > 0 {
1104 tracing::info!(
1105 "garbage collected {} rows from {table}",
1106 res.rows_affected()
1107 );
1108 }
1109 }
1110
1111 Ok(())
1112}
1113
1114#[async_trait]
1115impl SequencerPersistence for Persistence {
1116 fn into_catchup_provider(
1117 self,
1118 backoff: BackoffParams,
1119 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1120 Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1121 }
1122
1123 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1124 tracing::info!("loading config from Postgres");
1125
1126 let Some(row) = self
1128 .db
1129 .read()
1130 .await?
1131 .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1132 .await?
1133 else {
1134 tracing::info!("config not found");
1135 return Ok(None);
1136 };
1137 let json = row.try_get("config")?;
1138
1139 let json = migrate_network_config(json).context("migration of network config failed")?;
1140 let config = serde_json::from_value(json).context("malformed config file")?;
1141
1142 Ok(Some(config))
1143 }
1144
1145 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1146 tracing::info!("saving config to database");
1147 let json = serde_json::to_value(cfg)?;
1148
1149 let mut tx = self.db.write().await?;
1150 tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1151 .await?;
1152 tx.commit().await
1153 }
1154
1155 async fn append_decided_leaves(
1156 &self,
1157 view: ViewNumber,
1158 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
1159 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
1160 consumer: &(impl EventConsumer + 'static),
1161 ) -> anyhow::Result<()> {
1162 let values = leaf_chain
1163 .into_iter()
1164 .map(|(info, cert)| {
1165 let mut leaf = info.leaf.clone();
1170 leaf.unfill_block_payload();
1171
1172 let view = cert.view_number().u64() as i64;
1173 let leaf_bytes = bincode::serialize(&leaf)?;
1174 let qc_bytes = bincode::serialize(cert.qc())?;
1175 let next_epoch_qc_bytes = match cert.next_epoch_qc() {
1176 Some(qc) => Some(bincode::serialize(qc)?),
1177 None => None,
1178 };
1179 Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
1180 })
1181 .collect::<anyhow::Result<Vec<_>>>()?;
1182
1183 let mut tx = self.db.write().await?;
1186
1187 tx.upsert(
1188 "anchor_leaf2",
1189 ["view", "leaf", "qc", "next_epoch_qc"],
1190 ["view"],
1191 values,
1192 )
1193 .await?;
1194 tx.commit().await?;
1195
1196 if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
1199 tracing::warn!(?view, "event processing failed: {err:#}");
1203 return Ok(());
1204 }
1205
1206 if let Err(err) = self.prune(view).await {
1209 tracing::warn!(?view, "pruning failed: {err:#}");
1210 }
1211
1212 Ok(())
1213 }
1214
1215 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1216 Ok(self
1217 .db
1218 .read()
1219 .await?
1220 .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1221 .await?
1222 .map(|row| {
1223 let view: i64 = row.get("view");
1224 ViewNumber::new(view as u64)
1225 }))
1226 }
1227
1228 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1229 Ok(self
1230 .db
1231 .read()
1232 .await?
1233 .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1234 .await?
1235 .map(|row| {
1236 let view: i64 = row.get("view");
1237 ViewNumber::new(view as u64)
1238 }))
1239 }
1240
1241 async fn load_anchor_leaf(
1242 &self,
1243 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1244 let Some(row) = self
1245 .db
1246 .read()
1247 .await?
1248 .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1249 .await?
1250 else {
1251 return Ok(None);
1252 };
1253
1254 let leaf_bytes: Vec<u8> = row.get("leaf");
1255 let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1256
1257 let qc_bytes: Vec<u8> = row.get("qc");
1258 let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1259
1260 Ok(Some((leaf2, qc2)))
1261 }
1262
1263 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1264 let mut tx = self.db.read().await?;
1265 let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1266 .fetch_one(tx.as_mut())
1267 .await?;
1268 Ok(ViewNumber::new(view as u64))
1269 }
1270
1271 async fn load_da_proposal(
1272 &self,
1273 view: ViewNumber,
1274 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1275 let result = self
1276 .db
1277 .read()
1278 .await?
1279 .fetch_optional(
1280 query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1281 )
1282 .await?;
1283
1284 result
1285 .map(|row| {
1286 let bytes: Vec<u8> = row.get("data");
1287 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1288 })
1289 .transpose()
1290 }
1291
1292 async fn load_vid_share(
1293 &self,
1294 view: ViewNumber,
1295 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1296 let result = self
1297 .db
1298 .read()
1299 .await?
1300 .fetch_optional(
1301 query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1302 )
1303 .await?;
1304
1305 result
1306 .map(|row| {
1307 let bytes: Vec<u8> = row.get("data");
1308 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1309 })
1310 .transpose()
1311 }
1312
1313 async fn load_quorum_proposals(
1314 &self,
1315 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1316 {
1317 let rows = self
1318 .db
1319 .read()
1320 .await?
1321 .fetch_all("SELECT * FROM quorum_proposals2")
1322 .await?;
1323
1324 Ok(BTreeMap::from_iter(
1325 rows.into_iter()
1326 .map(|row| {
1327 let view: i64 = row.get("view");
1328 let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
1329 let bytes: Vec<u8> = row.get("data");
1330 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1331 bincode::deserialize(&bytes).or_else(|error| {
1332 bincode::deserialize::<
1333 Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
1334 >(&bytes)
1335 .map(convert_proposal)
1336 .inspect_err(|err_v3| {
1337 tracing::warn!(
1338 ?view_number,
1339 %error,
1340 %err_v3,
1341 "ignoring malformed quorum proposal DB row"
1342 );
1343 })
1344 })?;
1345 Ok((view_number, proposal))
1346 })
1347 .collect::<anyhow::Result<Vec<_>>>()?,
1348 ))
1349 }
1350
1351 async fn load_quorum_proposal(
1352 &self,
1353 view: ViewNumber,
1354 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1355 let mut tx = self.db.read().await?;
1356 let (data,) =
1357 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1358 .bind(view.u64() as i64)
1359 .fetch_one(tx.as_mut())
1360 .await?;
1361 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1362 bincode::deserialize(&data).or_else(|error| {
1363 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1364 &data,
1365 )
1366 .map(convert_proposal)
1367 .context(format!(
1368 "Failed to deserialize quorum proposal for view {view}. error={error}"
1369 ))
1370 })?;
1371
1372 Ok(proposal)
1373 }
1374
1375 async fn append_vid(
1376 &self,
1377 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1378 ) -> anyhow::Result<()> {
1379 let view = proposal.data.view_number().u64();
1380 let payload_hash = proposal.data.payload_commitment();
1381 let data_bytes = bincode::serialize(proposal).unwrap();
1382
1383 let now = Instant::now();
1384 let mut tx = self.db.write().await?;
1385 tx.upsert(
1386 "vid_share2",
1387 ["view", "data", "payload_hash"],
1388 ["view"],
1389 [(view as i64, data_bytes, payload_hash.to_string())],
1390 )
1391 .await?;
1392 let res = tx.commit().await;
1393 self.internal_metrics
1394 .internal_append_vid_duration
1395 .add_point(now.elapsed().as_secs_f64());
1396 res
1397 }
1398
1399 async fn append_da(
1400 &self,
1401 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1402 vid_commit: VidCommitment,
1403 ) -> anyhow::Result<()> {
1404 let data = &proposal.data;
1405 let view = data.view_number().u64();
1406 let data_bytes = bincode::serialize(proposal).unwrap();
1407
1408 let now = Instant::now();
1409 let mut tx = self.db.write().await?;
1410 tx.upsert(
1411 "da_proposal",
1412 ["view", "data", "payload_hash"],
1413 ["view"],
1414 [(view as i64, data_bytes, vid_commit.to_string())],
1415 )
1416 .await?;
1417 let res = tx.commit().await;
1418 self.internal_metrics
1419 .internal_append_da_duration
1420 .add_point(now.elapsed().as_secs_f64());
1421 res
1422 }
1423
1424 async fn record_action(
1425 &self,
1426 view: ViewNumber,
1427 _epoch: Option<EpochNumber>,
1428 action: HotShotAction,
1429 ) -> anyhow::Result<()> {
1430 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1432 return Ok(());
1433 }
1434
1435 let stmt = format!(
1436 "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1437 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
1438 );
1439
1440 let mut tx = self.db.write().await?;
1441 tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1442
1443 if matches!(action, HotShotAction::Vote) {
1444 let restart_view = view + 1;
1445 let stmt = format!(
1446 "INSERT INTO restart_view (id, view) VALUES (0, $1)
1447 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
1448 );
1449 tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1450 .await?;
1451 }
1452
1453 tx.commit().await
1454 }
1455
1456 async fn append_quorum_proposal2(
1457 &self,
1458 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1459 ) -> anyhow::Result<()> {
1460 let view_number = proposal.data.view_number().u64();
1461
1462 let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
1463 let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));
1464
1465 let now = Instant::now();
1466 let mut tx = self.db.write().await?;
1467 tx.upsert(
1468 "quorum_proposals2",
1469 ["view", "leaf_hash", "data"],
1470 ["view"],
1471 [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
1472 )
1473 .await?;
1474
1475 let justify_qc = proposal.data.justify_qc();
1477 let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
1478 tx.upsert(
1479 "quorum_certificate2",
1480 ["view", "leaf_hash", "data"],
1481 ["view"],
1482 [(
1483 justify_qc.view_number.u64() as i64,
1484 justify_qc.data.leaf_commit.to_string(),
1485 &justify_qc_bytes,
1486 )],
1487 )
1488 .await?;
1489 let res = tx.commit().await;
1490 self.internal_metrics
1491 .internal_append_quorum2_duration
1492 .add_point(now.elapsed().as_secs_f64());
1493 res
1494 }
1495
1496 async fn load_upgrade_certificate(
1497 &self,
1498 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1499 let result = self
1500 .db
1501 .read()
1502 .await?
1503 .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1504 .await?;
1505
1506 result
1507 .map(|row| {
1508 let bytes: Vec<u8> = row.get("data");
1509 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1510 })
1511 .transpose()
1512 }
1513
1514 async fn store_upgrade_certificate(
1515 &self,
1516 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1517 ) -> anyhow::Result<()> {
1518 let certificate = match decided_upgrade_certificate {
1519 Some(cert) => cert,
1520 None => return Ok(()),
1521 };
1522 let upgrade_certificate_bytes =
1523 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1524 let mut tx = self.db.write().await?;
1525 tx.upsert(
1526 "upgrade_certificate",
1527 ["id", "data"],
1528 ["id"],
1529 [(true, upgrade_certificate_bytes)],
1530 )
1531 .await?;
1532 tx.commit().await
1533 }
1534
1535 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1536 let batch_size: i64 = 10000;
1537 let mut tx = self.db.read().await?;
1538
1539 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1544 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
1545 )
1546 .fetch_one(tx.as_mut())
1547 .await?;
1548
1549 if is_completed {
1550 tracing::info!("decided leaves already migrated");
1551 return Ok(());
1552 }
1553
1554 tracing::warn!("migrating decided leaves..");
1555 loop {
1556 let mut tx = self.db.read().await?;
1557 let rows = query(
1558 "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
1559 )
1560 .bind(offset)
1561 .bind(batch_size)
1562 .fetch_all(tx.as_mut())
1563 .await?;
1564
1565 drop(tx);
1566 if rows.is_empty() {
1567 break;
1568 }
1569 let mut values = Vec::new();
1570
1571 for row in rows.iter() {
1572 let leaf: Vec<u8> = row.try_get("leaf")?;
1573 let qc: Vec<u8> = row.try_get("qc")?;
1574 let leaf1: Leaf = bincode::deserialize(&leaf)?;
1575 let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
1576 let view: i64 = row.try_get("view")?;
1577
1578 let leaf2: Leaf2 = leaf1.into();
1579 let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();
1580
1581 let leaf2_bytes = bincode::serialize(&leaf2)?;
1582 let qc2_bytes = bincode::serialize(&qc2)?;
1583
1584 values.push((view, leaf2_bytes, qc2_bytes));
1585 }
1586
1587 let mut query_builder: sqlx::QueryBuilder<Db> =
1588 sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");
1589
1590 offset = values.last().context("last row")?.0;
1591
1592 query_builder.push_values(values, |mut b, (view, leaf, qc)| {
1593 b.push_bind(view).push_bind(leaf).push_bind(qc);
1594 });
1595
1596 query_builder.push(" ON CONFLICT DO NOTHING");
1599
1600 let query = query_builder.build();
1601
1602 let mut tx = self.db.write().await?;
1603 query.execute(tx.as_mut()).await?;
1604
1605 tx.upsert(
1606 "epoch_migration",
1607 ["table_name", "completed", "migrated_rows"],
1608 ["table_name"],
1609 [("anchor_leaf".to_string(), false, offset)],
1610 )
1611 .await?;
1612 tx.commit().await?;
1613
1614 tracing::info!(
1615 "anchor leaf migration progress: rows={} offset={}",
1616 rows.len(),
1617 offset
1618 );
1619
1620 if rows.len() < batch_size as usize {
1621 break;
1622 }
1623 }
1624
1625 tracing::warn!("migrated decided leaves");
1626
1627 let mut tx = self.db.write().await?;
1628 tx.upsert(
1629 "epoch_migration",
1630 ["table_name", "completed", "migrated_rows"],
1631 ["table_name"],
1632 [("anchor_leaf".to_string(), true, offset)],
1633 )
1634 .await?;
1635 tx.commit().await?;
1636
1637 tracing::info!("updated epoch_migration table for anchor_leaf");
1638
1639 Ok(())
1640 }
1641
1642 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1643 let batch_size: i64 = 10000;
1644 let mut tx = self.db.read().await?;
1645
1646 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1647 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
1648 )
1649 .fetch_one(tx.as_mut())
1650 .await?;
1651
1652 if is_completed {
1653 tracing::info!("da proposals migration already done");
1654 return Ok(());
1655 }
1656
1657 tracing::warn!("migrating da proposals..");
1658
1659 loop {
1660 let mut tx = self.db.read().await?;
1661 let rows = query(
1662 "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
1663 $2",
1664 )
1665 .bind(offset)
1666 .bind(batch_size)
1667 .fetch_all(tx.as_mut())
1668 .await?;
1669
1670 drop(tx);
1671 if rows.is_empty() {
1672 break;
1673 }
1674 let mut values = Vec::new();
1675
1676 for row in rows.iter() {
1677 let data: Vec<u8> = row.try_get("data")?;
1678 let payload_hash: String = row.try_get("payload_hash")?;
1679
1680 let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
1681 bincode::deserialize(&data)?;
1682 let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
1683 convert_proposal(da_proposal);
1684
1685 let view = da_proposal2.data.view_number.u64() as i64;
1686 let data = bincode::serialize(&da_proposal2)?;
1687
1688 values.push((view, payload_hash, data));
1689 }
1690
1691 let mut query_builder: sqlx::QueryBuilder<Db> =
1692 sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");
1693
1694 offset = values.last().context("last row")?.0;
1695 query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
1696 b.push_bind(view).push_bind(payload_hash).push_bind(data);
1697 });
1698 query_builder.push(" ON CONFLICT DO NOTHING");
1699 let query = query_builder.build();
1700
1701 let mut tx = self.db.write().await?;
1702 query.execute(tx.as_mut()).await?;
1703
1704 tx.upsert(
1705 "epoch_migration",
1706 ["table_name", "completed", "migrated_rows"],
1707 ["table_name"],
1708 [("da_proposal".to_string(), false, offset)],
1709 )
1710 .await?;
1711 tx.commit().await?;
1712
1713 tracing::info!(
1714 "DA proposals migration progress: rows={} offset={}",
1715 rows.len(),
1716 offset
1717 );
1718 if rows.len() < batch_size as usize {
1719 break;
1720 }
1721 }
1722
1723 tracing::warn!("migrated da proposals");
1724
1725 let mut tx = self.db.write().await?;
1726 tx.upsert(
1727 "epoch_migration",
1728 ["table_name", "completed", "migrated_rows"],
1729 ["table_name"],
1730 [("da_proposal".to_string(), true, offset)],
1731 )
1732 .await?;
1733 tx.commit().await?;
1734
1735 tracing::info!("updated epoch_migration table for da_proposal");
1736
1737 Ok(())
1738 }
1739
1740 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1741 let batch_size: i64 = 10000;
1742
1743 let mut tx = self.db.read().await?;
1744
1745 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1746 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
1747 )
1748 .fetch_one(tx.as_mut())
1749 .await?;
1750
1751 if is_completed {
1752 tracing::info!("vid_share migration already done");
1753 return Ok(());
1754 }
1755
1756 tracing::warn!("migrating vid shares..");
1757 loop {
1758 let mut tx = self.db.read().await?;
1759 let rows = query(
1760 "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
1761 )
1762 .bind(offset)
1763 .bind(batch_size)
1764 .fetch_all(tx.as_mut())
1765 .await?;
1766
1767 drop(tx);
1768 if rows.is_empty() {
1769 break;
1770 }
1771 let mut values = Vec::new();
1772
1773 for row in rows.iter() {
1774 let data: Vec<u8> = row.try_get("data")?;
1775 let payload_hash: String = row.try_get("payload_hash")?;
1776
1777 let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
1778 bincode::deserialize(&data)?;
1779 let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1780 convert_proposal(vid_share);
1781
1782 let view = vid_share2.data.view_number().u64() as i64;
1783 let data = bincode::serialize(&vid_share2)?;
1784
1785 values.push((view, payload_hash, data));
1786 }
1787
1788 let mut query_builder: sqlx::QueryBuilder<Db> =
1789 sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
1790
1791 offset = values.last().context("last row")?.0;
1792
1793 query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
1794 b.push_bind(view).push_bind(payload_hash).push_bind(data);
1795 });
1796
1797 let query = query_builder.build();
1798
1799 let mut tx = self.db.write().await?;
1800 query.execute(tx.as_mut()).await?;
1801
1802 tx.upsert(
1803 "epoch_migration",
1804 ["table_name", "completed", "migrated_rows"],
1805 ["table_name"],
1806 [("vid_share".to_string(), false, offset)],
1807 )
1808 .await?;
1809 tx.commit().await?;
1810
1811 tracing::info!(
1812 "VID shares migration progress: rows={} offset={}",
1813 rows.len(),
1814 offset
1815 );
1816 if rows.len() < batch_size as usize {
1817 break;
1818 }
1819 }
1820
1821 tracing::warn!("migrated vid shares");
1822
1823 let mut tx = self.db.write().await?;
1824 tx.upsert(
1825 "epoch_migration",
1826 ["table_name", "completed", "migrated_rows"],
1827 ["table_name"],
1828 [("vid_share".to_string(), true, offset)],
1829 )
1830 .await?;
1831 tx.commit().await?;
1832
1833 tracing::info!("updated epoch_migration table for vid_share");
1834
1835 Ok(())
1836 }
1837
1838 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1839 let batch_size: i64 = 10000;
1840 let mut tx = self.db.read().await?;
1841
1842 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1843 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1844 'quorum_proposals'",
1845 )
1846 .fetch_one(tx.as_mut())
1847 .await?;
1848
1849 if is_completed {
1850 tracing::info!("quorum proposals migration already done");
1851 return Ok(());
1852 }
1853
1854 tracing::warn!("migrating quorum proposals..");
1855
1856 loop {
1857 let mut tx = self.db.read().await?;
1858 let rows = query(
1859 "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
1860 view LIMIT $2",
1861 )
1862 .bind(offset)
1863 .bind(batch_size)
1864 .fetch_all(tx.as_mut())
1865 .await?;
1866
1867 drop(tx);
1868
1869 if rows.is_empty() {
1870 break;
1871 }
1872
1873 let mut values = Vec::new();
1874
1875 for row in rows.iter() {
1876 let leaf_hash: String = row.try_get("leaf_hash")?;
1877 let data: Vec<u8> = row.try_get("data")?;
1878
1879 let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
1880 bincode::deserialize(&data)?;
1881 let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1882 convert_proposal(quorum_proposal);
1883
1884 let view = quorum_proposal2.data.view_number().u64() as i64;
1885 let data = bincode::serialize(&quorum_proposal2)?;
1886
1887 values.push((view, leaf_hash, data));
1888 }
1889
1890 let mut query_builder: sqlx::QueryBuilder<Db> =
1891 sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");
1892
1893 offset = values.last().context("last row")?.0;
1894 query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
1895 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1896 });
1897
1898 query_builder.push(" ON CONFLICT DO NOTHING");
1899
1900 let query = query_builder.build();
1901
1902 let mut tx = self.db.write().await?;
1903 query.execute(tx.as_mut()).await?;
1904
1905 tx.upsert(
1906 "epoch_migration",
1907 ["table_name", "completed", "migrated_rows"],
1908 ["table_name"],
1909 [("quorum_proposals".to_string(), false, offset)],
1910 )
1911 .await?;
1912 tx.commit().await?;
1913
1914 tracing::info!(
1915 "quorum proposals migration progress: rows={} offset={}",
1916 rows.len(),
1917 offset
1918 );
1919
1920 if rows.len() < batch_size as usize {
1921 break;
1922 }
1923 }
1924
1925 tracing::warn!("migrated quorum proposals");
1926
1927 let mut tx = self.db.write().await?;
1928 tx.upsert(
1929 "epoch_migration",
1930 ["table_name", "completed", "migrated_rows"],
1931 ["table_name"],
1932 [("quorum_proposals".to_string(), true, offset)],
1933 )
1934 .await?;
1935 tx.commit().await?;
1936
1937 tracing::info!("updated epoch_migration table for quorum_proposals");
1938
1939 Ok(())
1940 }
1941
1942 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1943 let batch_size: i64 = 10000;
1944 let mut tx = self.db.read().await?;
1945
1946 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1947 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1948 'quorum_certificate'",
1949 )
1950 .fetch_one(tx.as_mut())
1951 .await?;
1952
1953 if is_completed {
1954 tracing::info!("quorum certificates migration already done");
1955 return Ok(());
1956 }
1957
1958 tracing::warn!("migrating quorum certificates..");
1959 loop {
1960 let mut tx = self.db.read().await?;
1961 let rows = query(
1962 "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
1963 view LIMIT $2",
1964 )
1965 .bind(offset)
1966 .bind(batch_size)
1967 .fetch_all(tx.as_mut())
1968 .await?;
1969
1970 drop(tx);
1971 if rows.is_empty() {
1972 break;
1973 }
1974 let mut values = Vec::new();
1975
1976 for row in rows.iter() {
1977 let leaf_hash: String = row.try_get("leaf_hash")?;
1978 let data: Vec<u8> = row.try_get("data")?;
1979
1980 let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
1981 let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();
1982
1983 let view = qc2.view_number().u64() as i64;
1984 let data = bincode::serialize(&qc2)?;
1985
1986 values.push((view, leaf_hash, data));
1987 }
1988
1989 let mut query_builder: sqlx::QueryBuilder<Db> =
1990 sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");
1991
1992 offset = values.last().context("last row")?.0;
1993
1994 query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
1995 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1996 });
1997
1998 query_builder.push(" ON CONFLICT DO NOTHING");
1999 let query = query_builder.build();
2000
2001 let mut tx = self.db.write().await?;
2002 query.execute(tx.as_mut()).await?;
2003
2004 tx.upsert(
2005 "epoch_migration",
2006 ["table_name", "completed", "migrated_rows"],
2007 ["table_name"],
2008 [("quorum_certificate".to_string(), false, offset)],
2009 )
2010 .await?;
2011 tx.commit().await?;
2012
2013 tracing::info!(
2014 "Quorum certificates migration progress: rows={} offset={}",
2015 rows.len(),
2016 offset
2017 );
2018
2019 if rows.len() < batch_size as usize {
2020 break;
2021 }
2022 }
2023
2024 tracing::warn!("migrated quorum certificates");
2025
2026 let mut tx = self.db.write().await?;
2027 tx.upsert(
2028 "epoch_migration",
2029 ["table_name", "completed", "migrated_rows"],
2030 ["table_name"],
2031 [("quorum_certificate".to_string(), true, offset)],
2032 )
2033 .await?;
2034 tx.commit().await?;
2035 tracing::info!("updated epoch_migration table for quorum_certificate");
2036
2037 Ok(())
2038 }
2039
2040 async fn store_next_epoch_quorum_certificate(
2041 &self,
2042 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2043 ) -> anyhow::Result<()> {
2044 let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2045 let mut tx = self.db.write().await?;
2046 tx.upsert(
2047 "next_epoch_quorum_certificate",
2048 ["id", "data"],
2049 ["id"],
2050 [(true, qc2_bytes)],
2051 )
2052 .await?;
2053 tx.commit().await
2054 }
2055
2056 async fn load_next_epoch_quorum_certificate(
2057 &self,
2058 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2059 let result = self
2060 .db
2061 .read()
2062 .await?
2063 .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2064 .await?;
2065
2066 result
2067 .map(|row| {
2068 let bytes: Vec<u8> = row.get("data");
2069 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2070 })
2071 .transpose()
2072 }
2073
2074 async fn store_eqc(
2075 &self,
2076 high_qc: QuorumCertificate2<SeqTypes>,
2077 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2078 ) -> anyhow::Result<()> {
2079 let eqc_bytes =
2080 bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2081 let mut tx = self.db.write().await?;
2082 tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes)])
2083 .await?;
2084 tx.commit().await
2085 }
2086
2087 async fn load_eqc(
2088 &self,
2089 ) -> Option<(
2090 QuorumCertificate2<SeqTypes>,
2091 NextEpochQuorumCertificate2<SeqTypes>,
2092 )> {
2093 let result = self
2094 .db
2095 .read()
2096 .await
2097 .ok()?
2098 .fetch_optional("SELECT * FROM eqc where id = true")
2099 .await
2100 .ok()?;
2101
2102 result
2103 .map(|row| {
2104 let bytes: Vec<u8> = row.get("data");
2105 bincode::deserialize(&bytes)
2106 })
2107 .transpose()
2108 .ok()?
2109 }
2110
2111 async fn append_da2(
2112 &self,
2113 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2114 vid_commit: VidCommitment,
2115 ) -> anyhow::Result<()> {
2116 let data = &proposal.data;
2117 let view = data.view_number().u64();
2118 let data_bytes = bincode::serialize(proposal).unwrap();
2119
2120 let now = Instant::now();
2121 let mut tx = self.db.write().await?;
2122 tx.upsert(
2123 "da_proposal2",
2124 ["view", "data", "payload_hash"],
2125 ["view"],
2126 [(view as i64, data_bytes, vid_commit.to_string())],
2127 )
2128 .await?;
2129 let res = tx.commit().await;
2130 self.internal_metrics
2131 .internal_append_da2_duration
2132 .add_point(now.elapsed().as_secs_f64());
2133 res
2134 }
2135
2136 async fn store_drb_result(
2137 &self,
2138 epoch: EpochNumber,
2139 drb_result: DrbResult,
2140 ) -> anyhow::Result<()> {
2141 let drb_result_vec = Vec::from(drb_result);
2142 let mut tx = self.db.write().await?;
2143 tx.upsert(
2144 "epoch_drb_and_root",
2145 ["epoch", "drb_result"],
2146 ["epoch"],
2147 [(epoch.u64() as i64, drb_result_vec)],
2148 )
2149 .await?;
2150 tx.commit().await
2151 }
2152
2153 async fn store_epoch_root(
2154 &self,
2155 epoch: EpochNumber,
2156 block_header: <SeqTypes as NodeType>::BlockHeader,
2157 ) -> anyhow::Result<()> {
2158 let block_header_bytes =
2159 bincode::serialize(&block_header).context("serializing block header")?;
2160
2161 let mut tx = self.db.write().await?;
2162 tx.upsert(
2163 "epoch_drb_and_root",
2164 ["epoch", "block_header"],
2165 ["epoch"],
2166 [(epoch.u64() as i64, block_header_bytes)],
2167 )
2168 .await?;
2169 tx.commit().await
2170 }
2171
2172 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
2173 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
2174 if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
2175 tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
2176 } else if loaded_drb_input.iteration >= drb_input.iteration {
2177 anyhow::bail!(
2178 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
2179 loaded_drb_input,
2180 drb_input
2181 )
2182 }
2183 }
2184
2185 let drb_input_bytes = bincode::serialize(&drb_input)
2186 .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;
2187
2188 let mut tx = self.db.write().await?;
2189
2190 tx.upsert(
2191 "drb",
2192 ["epoch", "drb_input"],
2193 ["epoch"],
2194 [(drb_input.epoch as i64, drb_input_bytes)],
2195 )
2196 .await?;
2197 tx.commit().await
2198 }
2199
2200 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2201 let row = self
2202 .db
2203 .read()
2204 .await?
2205 .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2206 .await?;
2207
2208 match row {
2209 None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2210 Some(row) => {
2211 let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2212 let drb_input = bincode::deserialize(&drb_input_bytes)
2213 .context("Failed to deserialize drb_input from storage")?;
2214
2215 Ok(drb_input)
2216 },
2217 }
2218 }
2219
2220 async fn add_state_cert(
2221 &self,
2222 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2223 ) -> anyhow::Result<()> {
2224 let state_cert_bytes = bincode::serialize(&state_cert)
2225 .context("serializing light client state update certificate")?;
2226
2227 let mut tx = self.db.write().await?;
2228 tx.upsert(
2229 "state_cert",
2230 ["view", "state_cert"],
2231 ["view"],
2232 [(
2233 state_cert.light_client_state.view_number as i64,
2234 state_cert_bytes,
2235 )],
2236 )
2237 .await?;
2238 tx.commit().await
2239 }
2240
2241 async fn load_state_cert(
2242 &self,
2243 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2244 let Some(row) = self
2245 .db
2246 .read()
2247 .await?
2248 .fetch_optional(
2249 "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2250 )
2251 .await?
2252 else {
2253 return Ok(None);
2254 };
2255 let bytes: Vec<u8> = row.get("state_cert");
2256
2257 let cert = match bincode::deserialize(&bytes) {
2258 Ok(cert) => cert,
2259 Err(err) => {
2260 tracing::info!(
2261 error = %err,
2262 "Failed to deserialize state certificate with v2. attempting with v1"
2263 );
2264
2265 let v1_cert =
2266 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2267 .with_context(|| {
2268 format!("Failed to deserialize using both v1 and v2. error: {err}")
2269 })?;
2270
2271 v1_cert.into()
2272 },
2273 };
2274
2275 Ok(Some(cert))
2276 }
2277
2278 async fn get_state_cert_by_epoch(
2279 &self,
2280 epoch: u64,
2281 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2282 let Some(row) = self
2283 .db
2284 .read()
2285 .await?
2286 .fetch_optional(
2287 query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
2288 .bind(epoch as i64),
2289 )
2290 .await?
2291 else {
2292 return Ok(None);
2293 };
2294 let bytes: Vec<u8> = row.get("state_cert");
2295
2296 let cert = match bincode::deserialize(&bytes) {
2297 Ok(cert) => cert,
2298 Err(err) => {
2299 tracing::info!(
2300 error = %err,
2301 "Failed to deserialize state certificate with v2. attempting with v1"
2302 );
2303
2304 let v1_cert =
2305 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2306 .with_context(|| {
2307 format!("Failed to deserialize using both v1 and v2. error: {err}")
2308 })?;
2309
2310 v1_cert.into()
2311 },
2312 };
2313
2314 Ok(Some(cert))
2315 }
2316
2317 async fn insert_state_cert(
2318 &self,
2319 epoch: u64,
2320 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2321 ) -> anyhow::Result<()> {
2322 let bytes = bincode::serialize(&cert)
2323 .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2324
2325 let mut tx = self.db.write().await?;
2326
2327 tx.upsert(
2328 "finalized_state_cert",
2329 ["epoch", "state_cert"],
2330 ["epoch"],
2331 [(epoch as i64, bytes)],
2332 )
2333 .await
2334 }
2335
2336 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
2337 let rows = self
2338 .db
2339 .read()
2340 .await?
2341 .fetch_all(
2342 query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
2343 .bind(RECENT_STAKE_TABLES_LIMIT as i64),
2344 )
2345 .await?;
2346
2347 rows.into_iter()
2349 .rev()
2350 .map(|row| {
2351 let epoch: i64 = row.try_get("epoch")?;
2352 let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
2353 let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
2354 if let Some(drb_result) = drb_result {
2355 let drb_result_array = drb_result
2356 .try_into()
2357 .or_else(|_| bail!("invalid drb result"))?;
2358 let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
2359 .map(|data| bincode::deserialize(&data))
2360 .transpose()?;
2361 Ok(Some(InitializerEpochInfo::<SeqTypes> {
2362 epoch: <SeqTypes as NodeType>::Epoch::new(epoch as u64),
2363 drb_result: drb_result_array,
2364 block_header,
2365 }))
2366 } else {
2367 Ok(None)
2370 }
2371 })
2372 .filter_map(|e| match e {
2373 Err(v) => Some(Err(v)),
2374 Ok(Some(v)) => Some(Ok(v)),
2375 Ok(None) => None,
2376 })
2377 .collect()
2378 }
2379
2380 fn enable_metrics(&mut self, metrics: &dyn Metrics) {
2381 self.internal_metrics = PersistenceMetricsValue::new(metrics);
2382 }
2383}
2384
2385#[async_trait]
2386impl MembershipPersistence for Persistence {
2387 async fn load_stake(
2388 &self,
2389 epoch: EpochNumber,
2390 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
2391 let result = self
2392 .db
2393 .read()
2394 .await?
2395 .fetch_optional(
2396 query(
2397 "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2398 epoch = $1",
2399 )
2400 .bind(epoch.u64() as i64),
2401 )
2402 .await?;
2403
2404 result
2405 .map(|row| {
2406 let stake_table_bytes: Vec<u8> = row.get("stake");
2407 let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
2408 let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
2409 let stake_table = bincode::deserialize(&stake_table_bytes)
2410 .context("deserializing stake table")?;
2411 let reward: Option<RewardAmount> = reward_bytes
2412 .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2413 .transpose()?;
2414 let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
2415 .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2416 .transpose()?;
2417
2418 Ok((stake_table, reward, stake_table_hash))
2419 })
2420 .transpose()
2421 }
2422
2423 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
2424 let mut tx = self.db.read().await?;
2425
2426 let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
2427 "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2428 stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
2429 )
2430 .bind(limit as i64)
2431 .fetch_all(tx.as_mut())
2432 .await
2433 {
2434 Ok(bytes) => bytes,
2435 Err(err) => {
2436 tracing::error!("error loading stake tables: {err:#}");
2437 bail!("{err:#}");
2438 },
2439 };
2440
2441 let stakes: anyhow::Result<Vec<IndexedStake>> = rows
2442 .into_iter()
2443 .map(
2444 |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
2445 let stake_table =
2446 bincode::deserialize(&stake_bytes).context("deserializing stake table")?;
2447
2448 let block_reward: Option<RewardAmount> = reward_bytes_opt
2449 .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2450 .transpose()?;
2451
2452 let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
2453 .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2454 .transpose()?;
2455
2456 Ok((
2457 EpochNumber::new(id as u64),
2458 (stake_table, block_reward),
2459 stake_table_hash,
2460 ))
2461 },
2462 )
2463 .collect();
2464
2465 Ok(Some(stakes?))
2466 }
2467
2468 async fn store_stake(
2469 &self,
2470 epoch: EpochNumber,
2471 stake: ValidatorMap,
2472 block_reward: Option<RewardAmount>,
2473 stake_table_hash: Option<StakeTableHash>,
2474 ) -> anyhow::Result<()> {
2475 let mut tx = self.db.write().await?;
2476
2477 let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
2478 let reward_bytes = block_reward
2479 .map(|r| bincode::serialize(&r).context("serializing block reward"))
2480 .transpose()?;
2481 let stake_table_hash_bytes = stake_table_hash
2482 .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
2483 .transpose()?;
2484 tx.upsert(
2485 "epoch_drb_and_root",
2486 ["epoch", "stake", "block_reward", "stake_table_hash"],
2487 ["epoch"],
2488 [(
2489 epoch.u64() as i64,
2490 stake_table_bytes,
2491 reward_bytes,
2492 stake_table_hash_bytes,
2493 )],
2494 )
2495 .await?;
2496 tx.commit().await
2497 }
2498
2499 async fn store_events(
2500 &self,
2501 l1_finalized: u64,
2502 events: Vec<(EventKey, StakeTableEvent)>,
2503 ) -> anyhow::Result<()> {
2504 if events.is_empty() {
2505 return Ok(());
2506 }
2507
2508 let mut tx = self.db.write().await?;
2509
2510 let last_processed_l1_block = query_as::<(i64,)>(
2512 "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2513 )
2514 .fetch_optional(tx.as_mut())
2515 .await?
2516 .map(|(l1,)| l1);
2517
2518 tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");
2519
2520 if last_processed_l1_block > Some(l1_finalized.try_into()?) {
2522 tracing::debug!(
2523 ?last_processed_l1_block,
2524 ?l1_finalized,
2525 ?events,
2526 "last l1 finalized stored is already higher"
2527 );
2528 return Ok(());
2529 }
2530
2531 let mut query_builder: sqlx::QueryBuilder<Db> =
2532 sqlx::QueryBuilder::new("INSERT INTO stake_table_events (l1_block, log_index, event) ");
2533
2534 let events = events
2535 .into_iter()
2536 .map(|((block_number, index), event)| {
2537 Ok((
2538 i64::try_from(block_number)?,
2539 i64::try_from(index)?,
2540 serde_json::to_value(event).context("l1 event to value")?,
2541 ))
2542 })
2543 .collect::<anyhow::Result<Vec<_>>>()?;
2544
2545 query_builder.push_values(events, |mut b, (l1_block, log_index, event)| {
2546 b.push_bind(l1_block).push_bind(log_index).push_bind(event);
2547 });
2548
2549 query_builder.push(" ON CONFLICT DO NOTHING");
2550 let query = query_builder.build();
2551
2552 query.execute(tx.as_mut()).await?;
2553
2554 tx.upsert(
2556 "stake_table_events_l1_block",
2557 ["id", "last_l1_block"],
2558 ["id"],
2559 [(0_i32, l1_finalized as i64)],
2560 )
2561 .await?;
2562
2563 tx.commit().await?;
2564
2565 Ok(())
2566 }
2567
2568 async fn load_events(
2579 &self,
2580 from_l1_block: u64,
2581 to_l1_block: u64,
2582 ) -> anyhow::Result<(
2583 Option<EventsPersistenceRead>,
2584 Vec<(EventKey, StakeTableEvent)>,
2585 )> {
2586 let mut tx = self.db.read().await?;
2587
2588 let res = query_as::<(i64,)>(
2590 "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2591 )
2592 .fetch_optional(tx.as_mut())
2593 .await?;
2594
2595 let Some((last_processed_l1_block,)) = res else {
2596 return Ok((None, Vec::new()));
2598 };
2599
2600 let to_l1_block = to_l1_block.try_into()?;
2604 let query_l1_block = if last_processed_l1_block > to_l1_block {
2605 to_l1_block
2606 } else {
2607 last_processed_l1_block
2608 };
2609
2610 let rows = query(
2611 "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
2612 l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
2613 )
2614 .bind(i64::try_from(from_l1_block)?)
2615 .bind(query_l1_block)
2616 .fetch_all(tx.as_mut())
2617 .await?;
2618
2619 let events = rows
2620 .into_iter()
2621 .map(|row| {
2622 let l1_block: i64 = row.try_get("l1_block")?;
2623 let log_index: i64 = row.try_get("log_index")?;
2624 let event = serde_json::from_value(row.try_get("event")?)?;
2625
2626 Ok(((l1_block.try_into()?, log_index.try_into()?), event))
2627 })
2628 .collect::<anyhow::Result<Vec<_>>>()?;
2629
2630 if query_l1_block == to_l1_block {
2634 Ok((Some(EventsPersistenceRead::Complete), events))
2635 } else {
2636 Ok((
2637 Some(EventsPersistenceRead::UntilL1Block(
2638 query_l1_block.try_into()?,
2639 )),
2640 events,
2641 ))
2642 }
2643 }
2644
2645 async fn store_all_validators(
2646 &self,
2647 epoch: EpochNumber,
2648 all_validators: ValidatorMap,
2649 ) -> anyhow::Result<()> {
2650 let mut tx = self.db.write().await?;
2651
2652 if all_validators.is_empty() {
2653 return Ok(());
2654 }
2655
2656 let mut query_builder =
2657 QueryBuilder::new("INSERT INTO stake_table_validators (epoch, address, validator) ");
2658
2659 query_builder.push_values(all_validators, |mut b, (address, validator)| {
2660 let validator_json =
2661 serde_json::to_value(&validator).expect("cannot serialize validator to json");
2662 b.push_bind(epoch.u64() as i64)
2663 .push_bind(address.to_string())
2664 .push_bind(validator_json);
2665 });
2666
2667 query_builder
2668 .push(" ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator");
2669
2670 let query = query_builder.build();
2671
2672 query.execute(tx.as_mut()).await?;
2673
2674 tx.commit().await?;
2675 Ok(())
2676 }
2677
2678 async fn load_all_validators(
2679 &self,
2680 epoch: EpochNumber,
2681 offset: u64,
2682 limit: u64,
2683 ) -> anyhow::Result<Vec<Validator<PubKey>>> {
2684 let mut tx = self.db.read().await?;
2685
2686 let rows = query(
2690 "SELECT address, validator
2691 FROM stake_table_validators
2692 WHERE epoch = $1
2693 ORDER BY LOWER(address) ASC
2694 LIMIT $2 OFFSET $3",
2695 )
2696 .bind(epoch.u64() as i64)
2697 .bind(limit as i64)
2698 .bind(offset as i64)
2699 .fetch_all(tx.as_mut())
2700 .await?;
2701 rows.into_iter()
2702 .map(|row| {
2703 let validator_json: serde_json::Value = row.try_get("validator")?;
2704 serde_json::from_value::<Validator<PubKey>>(validator_json).map_err(Into::into)
2705 })
2706 .collect()
2707 }
2708}
2709
2710#[async_trait]
2711impl DhtPersistentStorage for Persistence {
2712 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2718 let to_save =
2720 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2721
2722 let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
2724 (id) DO UPDATE SET serialized_records = $1";
2725
2726 let mut tx = self
2728 .db
2729 .write()
2730 .await
2731 .with_context(|| "failed to start an atomic DB transaction")?;
2732 tx.execute(query(stmt).bind(to_save))
2733 .await
2734 .with_context(|| "failed to execute DB query")?;
2735
2736 tx.commit().await.with_context(|| "failed to commit to DB")
2738 }
2739
2740 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2746 let result = self
2748 .db
2749 .read()
2750 .await
2751 .with_context(|| "failed to start a DB read transaction")?
2752 .fetch_one("SELECT * FROM libp2p_dht where id = 0")
2753 .await
2754 .with_context(|| "failed to fetch from DB")?;
2755
2756 let serialied_records: Vec<u8> = result.get("serialized_records");
2758
2759 let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
2761 .with_context(|| "Failed to deserialize records")?;
2762
2763 Ok(records)
2764 }
2765}
2766
2767#[async_trait]
2768impl Provider<SeqTypes, VidCommonRequest> for Persistence {
2769 #[tracing::instrument(skip(self))]
2770 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
2771 let mut tx = match self.db.read().await {
2772 Ok(tx) => tx,
2773 Err(err) => {
2774 tracing::warn!("could not open transaction: {err:#}");
2775 return None;
2776 },
2777 };
2778
2779 let bytes = match query_as::<(Vec<u8>,)>(
2780 "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
2781 )
2782 .bind(req.0.to_string())
2783 .fetch_optional(tx.as_mut())
2784 .await
2785 {
2786 Ok(Some((bytes,))) => bytes,
2787 Ok(None) => return None,
2788 Err(err) => {
2789 tracing::error!("error loading VID share: {err:#}");
2790 return None;
2791 },
2792 };
2793
2794 let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2795 match bincode::deserialize(&bytes) {
2796 Ok(share) => share,
2797 Err(err) => {
2798 tracing::warn!("error decoding VID share: {err:#}");
2799 return None;
2800 },
2801 };
2802
2803 match share.data {
2804 VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
2805 VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
2806 VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
2807 }
2808 }
2809}
2810
2811#[async_trait]
2812impl Provider<SeqTypes, PayloadRequest> for Persistence {
2813 #[tracing::instrument(skip(self))]
2814 async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
2815 let mut tx = match self.db.read().await {
2816 Ok(tx) => tx,
2817 Err(err) => {
2818 tracing::warn!("could not open transaction: {err:#}");
2819 return None;
2820 },
2821 };
2822
2823 let bytes = match query_as::<(Vec<u8>,)>(
2824 "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
2825 )
2826 .bind(req.0.to_string())
2827 .fetch_optional(tx.as_mut())
2828 .await
2829 {
2830 Ok(Some((bytes,))) => bytes,
2831 Ok(None) => return None,
2832 Err(err) => {
2833 tracing::warn!("error loading DA proposal: {err:#}");
2834 return None;
2835 },
2836 };
2837
2838 let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
2839 {
2840 Ok(proposal) => proposal,
2841 Err(err) => {
2842 tracing::error!("error decoding DA proposal: {err:#}");
2843 return None;
2844 },
2845 };
2846
2847 Some(Payload::from_bytes(
2848 &proposal.data.encoded_transactions,
2849 &proposal.data.metadata,
2850 ))
2851 }
2852}
2853
2854#[async_trait]
2855impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
2856 #[tracing::instrument(skip(self))]
2857 async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
2858 let mut tx = match self.db.read().await {
2859 Ok(tx) => tx,
2860 Err(err) => {
2861 tracing::warn!("could not open transaction: {err:#}");
2862 return None;
2863 },
2864 };
2865
2866 let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
2867 Ok(res) => res?,
2868 Err(err) => {
2869 tracing::info!("requested leaf not found in undecided proposals: {err:#}");
2870 return None;
2871 },
2872 };
2873
2874 match LeafQueryData::new(leaf, qc) {
2875 Ok(leaf) => Some(leaf),
2876 Err(err) => {
2877 tracing::warn!("fetched invalid leaf: {err:#}");
2878 None
2879 },
2880 }
2881 }
2882}
2883
2884async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
2885 tx: &mut Transaction<Mode>,
2886 req: LeafRequest<SeqTypes>,
2887) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
2888 let Some((proposal_bytes,)) =
2890 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
2891 .bind(req.expected_leaf.to_string())
2892 .fetch_optional(tx.as_mut())
2893 .await
2894 .context("fetching proposal")?
2895 else {
2896 return Ok(None);
2897 };
2898
2899 let Some((qc_bytes,)) =
2901 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
2902 .bind(req.expected_leaf.to_string())
2903 .fetch_optional(tx.as_mut())
2904 .await
2905 .context("fetching QC")?
2906 else {
2907 return Ok(None);
2908 };
2909
2910 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
2911 bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
2912 let qc: QuorumCertificate2<SeqTypes> =
2913 bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
2914
2915 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
2916 Ok(Some((leaf, qc)))
2917}
2918
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    /// Test harness glue: backs `Persistence` with a temporary database.
    #[async_trait]
    impl TestablePersistence for Persistence {
        type Storage = Arc<TmpDb>;

        /// Initializes a fresh temporary database and returns a shared handle to it.
        async fn tmp_storage() -> Self::Storage {
            let db = TmpDb::init().await;
            Arc::new(db)
        }

        /// Builds persistence options that point at the given temporary database.
        ///
        /// Without the `embedded-db` feature the options carry the database's host and port
        /// plus fixed credentials; with it they carry the database's file path.
        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            #[cfg(not(feature = "embedded-db"))]
            let backend = PostgresOptions {
                port: Some(db.port()),
                host: Some(db.host()),
                user: Some("postgres".into()),
                password: Some("password".into()),
                ..Default::default()
            };

            #[cfg(feature = "embedded-db")]
            let backend = SqliteOptions { path: db.path() };

            backend.into()
        }
    }
}
2955
2956#[cfg(test)]
2957mod test {
2958 use committable::{Commitment, CommitmentBoundsArkless};
2959 use espresso_types::{traits::NullEventConsumer, Header, Leaf, NodeState, ValidatedState};
2960 use futures::stream::TryStreamExt;
2961 use hotshot_example_types::node_types::TestVersions;
2962 use hotshot_types::{
2963 data::{
2964 ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare, EpochNumber,
2965 QuorumProposal2,
2966 },
2967 message::convert_proposal,
2968 simple_certificate::QuorumCertificate,
2969 simple_vote::QuorumData,
2970 traits::{
2971 block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
2972 node_implementation::Versions,
2973 signature_key::SignatureKey,
2974 EncodeBytes,
2975 },
2976 utils::EpochTransitionIndicator,
2977 vid::{
2978 advz::advz_scheme,
2979 avidm::{init_avidm_param, AvidMScheme},
2980 },
2981 };
2982 use jf_advz::VidScheme;
2983 use vbs::version::StaticVersionType;
2984
2985 use super::*;
2986 use crate::{persistence::tests::TestablePersistence as _, BLSPubKey, PubKey};
2987
2988 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2989 async fn test_quorum_proposals_leaf_hash_migration() {
2990 let leaf: Leaf2 =
2992 Leaf::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock())
2993 .await
2994 .into();
2995 let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
2996 let signature = PubKey::sign(&privkey, &[]).unwrap();
2997 let mut quorum_proposal = Proposal {
2998 data: QuorumProposal2::<SeqTypes> {
2999 epoch: None,
3000 block_header: leaf.block_header().clone(),
3001 view_number: ViewNumber::genesis(),
3002 justify_qc: QuorumCertificate::genesis::<TestVersions>(
3003 &ValidatedState::default(),
3004 &NodeState::mock(),
3005 )
3006 .await
3007 .to_qc2(),
3008 upgrade_certificate: None,
3009 view_change_evidence: None,
3010 next_drb_result: None,
3011 next_epoch_justify_qc: None,
3012 state_cert: None,
3013 },
3014 signature,
3015 _pd: Default::default(),
3016 };
3017
3018 let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3019 convert_proposal(quorum_proposal.clone());
3020
3021 quorum_proposal.data.view_number = ViewNumber::new(1);
3022
3023 let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3024 convert_proposal(quorum_proposal.clone());
3025 let qps = [qp1, qp2];
3026
3027 let db = Persistence::tmp_storage().await;
3029 let persistence = Persistence::connect(&db).await;
3030 let mut tx = persistence.db.write().await.unwrap();
3031 let params = qps
3032 .iter()
3033 .map(|qp| {
3034 (
3035 qp.data.view_number.u64() as i64,
3036 bincode::serialize(&qp).unwrap(),
3037 )
3038 })
3039 .collect::<Vec<_>>();
3040 tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
3041 .await
3042 .unwrap();
3043 tx.commit().await.unwrap();
3044
3045 let persistence = Persistence::connect(&db).await;
3047 let mut tx = persistence.db.read().await.unwrap();
3048 let rows = tx
3049 .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
3050 .try_collect::<Vec<_>>()
3051 .await
3052 .unwrap();
3053 assert_eq!(rows.len(), qps.len());
3054 for (row, qp) in rows.into_iter().zip(qps) {
3055 assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
3056 assert_eq!(
3057 row.get::<Vec<u8>, _>("data"),
3058 bincode::serialize(&qp).unwrap()
3059 );
3060 assert_eq!(
3061 row.get::<String, _>("leaf_hash"),
3062 Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
3063 );
3064 }
3065 }
3066
/// Exercises the three `Provider` implementations of `Persistence` (VID common, payload and
/// leaf) against data appended through the consensus persistence API.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetching_providers() {
    let tmp = Persistence::tmp_storage().await;
    let storage = Persistence::connect(&tmp).await;

    // Mock up some data: start from the genesis leaf and its payload.
    let leaf =
        Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
    let leaf_payload = leaf.block_payload().unwrap();
    let leaf_payload_bytes_arc = leaf_payload.encode();

    // Disperse the payload with AvidM to obtain a payload commitment and shares.
    let avidm_param = init_avidm_param(2).unwrap();
    let weights = vec![1u32; 2];

    let ns_table = parse_ns_table(
        leaf_payload.byte_len().as_usize(),
        &leaf_payload.ns_table().encode(),
    );
    let (payload_commitment, shares) =
        AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
            .unwrap();
    let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
    // A signed VID share proposal for view 0 built from the first share.
    let vid_share = convert_proposal(
        AvidMDisperseShare::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param.clone(),
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone(),
    );

    // A signed quorum proposal carrying the genesis leaf's header, view and justify QC.
    let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
        proposal: QuorumProposal2::<SeqTypes> {
            block_header: leaf.block_header().clone(),
            view_number: leaf.view_number(),
            justify_qc: leaf.justify_qc(),
            upgrade_certificate: None,
            view_change_evidence: None,
            next_drb_result: None,
            next_epoch_justify_qc: None,
            epoch: None,
            state_cert: None,
        },
    };
    let quorum_proposal_signature =
        BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
            .expect("Failed to sign quorum proposal");
    let quorum_proposal = Proposal {
        data: quorum_proposal,
        signature: quorum_proposal_signature,
        _pd: Default::default(),
    };

    // A signed DA proposal for view 0 carrying the encoded payload.
    let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
        .expect("Failed to sign block payload");
    let da_proposal = Proposal {
        data: DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc,
            metadata: leaf_payload.ns_table().clone(),
            view_number: ViewNumber::new(0),
            epoch: None,
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        },
        signature: block_payload_signature,
        _pd: Default::default(),
    };

    // A follow-up proposal one view later whose justify QC commits to `leaf`; that QC is
    // the one the leaf provider is expected to return alongside the leaf.
    let mut next_quorum_proposal = quorum_proposal.clone();
    next_quorum_proposal.data.proposal.view_number += 1;
    next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
    next_quorum_proposal
        .data
        .proposal
        .justify_qc
        .data
        .leaf_commit = Committable::commit(&leaf.clone());
    let qc = next_quorum_proposal.data.justify_qc();

    // Add to database.
    storage
        .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
        .await
        .unwrap();
    storage.append_vid(&vid_share).await.unwrap();
    storage
        .append_quorum_proposal2(&quorum_proposal)
        .await
        .unwrap();

    // Add the follow-up proposal as well.
    // NOTE(review): presumably this is what makes the QC for `leaf` available to the leaf
    // provider — confirm against `append_quorum_proposal2`.
    storage
        .append_quorum_proposal2(&next_quorum_proposal)
        .await
        .unwrap();

    // Fetch it as if we were rebuilding an archive.
    assert_eq!(
        Some(VidCommon::V1(avidm_param)),
        storage
            .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
            .await
    );
    assert_eq!(
        leaf_payload,
        storage
            .fetch(PayloadRequest(vid_share.data.payload_commitment()))
            .await
            .unwrap()
    );
    assert_eq!(
        LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
        storage
            .fetch(LeafRequest::new(
                leaf.block_header().block_number(),
                Committable::commit(&leaf),
                qc.clone().commit()
            ))
            .await
            .unwrap()
    );
}
3194
/// Shared body of the consensus pruning tests.
///
/// Creates storage with the given `pruning_opt`, appends a VID share, a DA proposal and a
/// quorum proposal at view 1, and then:
/// 1. records a decide at `data_view + 1` and asserts that all three items can still be
///    loaded;
/// 2. records a decide at `data_view + 2` and asserts that all three items are gone.
///
/// NOTE(review): the log messages say "decide view 1" / "decide view 2" while the views
/// actually passed are `data_view + 1` (2) and `data_view + 2` (3) — confirm which is
/// intended.
async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
    let tmp = Persistence::tmp_storage().await;
    let mut opt = Persistence::options(&tmp);
    opt.consensus_pruning = pruning_opt;
    let storage = opt.create().await.unwrap();

    // All test data lives at this view.
    let data_view = ViewNumber::new(1);

    // Populate some data.
    let leaf =
        Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
    let leaf_payload = leaf.block_payload().unwrap();
    let leaf_payload_bytes_arc = leaf_payload.encode();

    let avidm_param = init_avidm_param(2).unwrap();
    let weights = vec![1u32; 2];

    let ns_table = parse_ns_table(
        leaf_payload.byte_len().as_usize(),
        &leaf_payload.ns_table().encode(),
    );
    let (payload_commitment, shares) =
        AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
            .unwrap();

    let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
    let vid = convert_proposal(
        AvidMDisperseShare::<SeqTypes> {
            view_number: data_view,
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone(),
    );
    let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
        proposal: QuorumProposal2::<SeqTypes> {
            epoch: None,
            block_header: leaf.block_header().clone(),
            view_number: data_view,
            justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                &ValidatedState::default(),
                &NodeState::mock(),
            )
            .await,
            upgrade_certificate: None,
            view_change_evidence: None,
            next_drb_result: None,
            next_epoch_justify_qc: None,
            state_cert: None,
        },
    };
    let quorum_proposal_signature =
        BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
            .expect("Failed to sign quorum proposal");
    let quorum_proposal = Proposal {
        data: quorum_proposal,
        signature: quorum_proposal_signature,
        _pd: Default::default(),
    };

    let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
        .expect("Failed to sign block payload");
    let da_proposal = Proposal {
        data: DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc.clone(),
            metadata: leaf_payload.ns_table().clone(),
            view_number: data_view,
            epoch: Some(EpochNumber::new(0)),
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        },
        signature: block_payload_signature,
        _pd: Default::default(),
    };

    tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
    storage.append_vid(&vid).await.unwrap();
    storage
        .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
        .await
        .unwrap();
    storage
        .append_quorum_proposal2(&quorum_proposal)
        .await
        .unwrap();

    // First decide, one view past the data, with no leaves attached. The data must
    // survive this step.
    tracing::info!("decide view 1");
    storage
        .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
        .await
        .unwrap();
    assert_eq!(
        storage.load_vid_share(data_view).await.unwrap().unwrap(),
        vid
    );
    assert_eq!(
        storage.load_da_proposal(data_view).await.unwrap().unwrap(),
        da_proposal
    );
    assert_eq!(
        storage.load_quorum_proposal(data_view).await.unwrap(),
        quorum_proposal
    );

    // Second decide, two views past the data. Now the data must have been pruned:
    // the VID share and DA proposal load as `None` and loading the quorum proposal fails.
    tracing::info!("decide view 2");
    storage
        .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
        .await
        .unwrap();
    assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
    assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
    storage.load_quorum_proposal(data_view).await.unwrap_err();
}
3324
3325 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3326 async fn test_pruning_minimum_retention() {
3327 test_pruning_helper(ConsensusPruningOptions {
3328 target_usage: 0,
3331 minimum_retention: 1,
3332 target_retention: u64::MAX,
3335 })
3336 .await
3337 }
3338
3339 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3340 async fn test_pruning_target_retention() {
3341 test_pruning_helper(ConsensusPruningOptions {
3342 target_retention: 1,
3343 minimum_retention: 0,
3346 target_usage: u64::MAX,
3349 })
3350 .await
3351 }
3352
/// Populates the legacy consensus tables (`anchor_leaf`, `quorum_proposals`,
/// `quorum_certificate`, plus legacy VID shares and DA proposals via `append_vid` /
/// `append_da`) with 300 rows, runs `migrate_storage`, and checks that each of the
/// `*2` tables then holds 300 rows. It also checks that `finalized_state_cert` holds 300
/// rows and that `load_state_cert` returns the certificate of the highest epoch.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_consensus_migration() {
    let tmp = Persistence::tmp_storage().await;
    let mut opt = Persistence::options(&tmp);

    let storage = opt.create().await.unwrap();

    // Number of views/epochs of legacy data to generate.
    let rows = 300;

    // No state certificate has been stored yet.
    assert!(storage.load_state_cert().await.unwrap().is_none());

    for i in 0..rows {
        let view = ViewNumber::new(i);
        let validated_state = ValidatedState::default();
        let instance_state = NodeState::default();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
        // An empty payload and a genesis header built from it.
        let (payload, metadata) =
            Payload::from_transactions([], &validated_state, &instance_state)
                .await
                .unwrap();

        let payload_bytes = payload.encode();

        let block_header =
            Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);

        let null_quorum_data = QuorumData {
            leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
        };

        let justify_qc = QuorumCertificate::new(
            null_quorum_data.clone(),
            null_quorum_data.commit(),
            view,
            None,
            std::marker::PhantomData,
        );

        // Legacy quorum proposal for this view.
        let quorum_proposal = QuorumProposal {
            block_header,
            view_number: view,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };

        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");

        let proposal = Proposal {
            data: quorum_proposal.clone(),
            signature: quorum_proposal_signature,
            _pd: std::marker::PhantomData,
        };

        let proposal_bytes = bincode::serialize(&proposal)
            .context("serializing proposal")
            .unwrap();

        let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
        leaf.fill_block_payload::<TestVersions>(
            payload,
            GENESIS_VID_NUM_STORAGE_NODES,
            <TestVersions as Versions>::Base::VERSION,
        )
        .unwrap();

        let mut tx = storage.db.write().await.unwrap();

        let qc_bytes = bincode::serialize(&justify_qc).unwrap();
        let leaf_bytes = bincode::serialize(&leaf).unwrap();

        // Write the legacy anchor leaf row directly.
        tx.upsert(
            "anchor_leaf",
            ["view", "leaf", "qc"],
            ["view"],
            [(i as i64, leaf_bytes, qc_bytes)],
        )
        .await
        .unwrap();

        // Write a state certificate for epoch `i` directly.
        let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
            epoch: EpochNumber::new(i),
            light_client_state: Default::default(),
            next_stake_table_state: Default::default(),
            signatures: vec![],
            auth_root: Default::default(),
        };
        let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
        tx.upsert(
            "finalized_state_cert",
            ["epoch", "state_cert"],
            ["epoch"],
            [(i as i64, state_cert_bytes)],
        )
        .await
        .unwrap();

        tx.commit().await.unwrap();

        // Legacy (ADVZ) VID share for this view.
        let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
            .disperse(payload_bytes.clone())
            .unwrap();

        let vid = VidDisperseShare0::<SeqTypes> {
            view_number: ViewNumber::new(i),
            payload_commitment: Default::default(),
            share: disperse.shares[0].clone(),
            common: disperse.common,
            recipient_key: pubkey,
        };

        // The first `payload` was moved into the leaf above, so build an identical empty
        // payload again for the legacy DA proposal.
        let (payload, metadata) =
            Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
                .await
                .unwrap();

        let da = DaProposal::<SeqTypes> {
            encoded_transactions: payload.encode(),
            metadata,
            view_number: ViewNumber::new(i),
        };

        let block_payload_signature =
            BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");

        let da_proposal = Proposal {
            data: da,
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        storage
            .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
            .await
            .unwrap();
        storage
            .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
            .await
            .unwrap();

        // Write the legacy quorum proposal and quorum certificate rows directly.
        let leaf_hash = Committable::commit(&leaf);
        let mut tx = storage.db.write().await.expect("failed to start write tx");
        tx.upsert(
            "quorum_proposals",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(i as i64, leaf_hash.to_string(), proposal_bytes)],
        )
        .await
        .expect("failed to upsert quorum proposal");

        let justify_qc = &proposal.data.justify_qc;
        let justify_qc_bytes = bincode::serialize(&justify_qc)
            .context("serializing QC")
            .unwrap();
        tx.upsert(
            "quorum_certificate",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(
                justify_qc.view_number.u64() as i64,
                justify_qc.data.leaf_commit.to_string(),
                &justify_qc_bytes,
            )],
        )
        .await
        .expect("failed to upsert qc");

        tx.commit().await.expect("failed to commit");
    }

    storage.migrate_storage().await.unwrap();

    // Every migrated table must now contain exactly one row per generated view.
    let mut tx = storage.db.read().await.unwrap();
    let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
        .fetch_one(tx.as_mut())
        .await
        .unwrap();
    assert_eq!(
        anchor_leaf2_count, rows as i64,
        "anchor leaf count does not match rows",
    );

    let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
        .fetch_one(tx.as_mut())
        .await
        .unwrap();
    assert_eq!(
        da_proposal_count, rows as i64,
        "da proposal count does not match rows",
    );

    let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
        .fetch_one(tx.as_mut())
        .await
        .unwrap();
    assert_eq!(
        vid_share_count, rows as i64,
        "vid share count does not match rows"
    );

    let (quorum_proposals_count,) =
        query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
    assert_eq!(
        quorum_proposals_count, rows as i64,
        "quorum proposals count does not match rows",
    );

    let (quorum_certificates_count,) =
        query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
    assert_eq!(
        quorum_certificates_count, rows as i64,
        "quorum certificates count does not match rows",
    );

    // The state certificates were written directly to `finalized_state_cert`; the latest
    // one (highest epoch) must be what `load_state_cert` returns.
    let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
        .fetch_one(tx.as_mut())
        .await
        .unwrap();
    assert_eq!(
        state_cert_count, rows as i64,
        "Light client state update certificates count does not match rows",
    );
    assert_eq!(
        storage.load_state_cert().await.unwrap().unwrap(),
        LightClientStateUpdateCertificateV2::<SeqTypes> {
            epoch: EpochNumber::new(rows - 1),
            light_client_state: Default::default(),
            next_stake_table_state: Default::default(),
            signatures: vec![],
            auth_root: Default::default(),
        },
        "Wrong light client state update certificate in the storage",
    );

    // Running the migration a second time must also succeed.
    storage.migrate_storage().await.unwrap();
}
3600}
3601
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    //! Tests that are only compiled for test builds with the `embedded-db` feature disabled.
    //!
    //! They exercise the `get_ns_table` and `read_ns_id` SQL functions against headers and
    //! transactions written through the query-service storage layer.

    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_query_service::{
        availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
            EncodeBytes,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Shared body of the `test_postgres_read_ns_table_*` tests.
    ///
    /// Builds a single block holding three transactions (two in one namespace, one in
    /// another), stores its leaf and block in a temporary database, and then asserts that for
    /// every row joined from `header` and `transactions` both the `ns_id` column and
    /// `read_ns_id(get_ns_table(h.data), t.ns_index)` equal the namespace of the corresponding
    /// submitted transaction.
    ///
    /// `instance_state` supplies the node state (and thereby the protocol version) used to
    /// build the payload, the VID commitment and the header.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        // Register the genesis epoch as the first epoch on the shared membership.
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        // `tmp_db` is kept alive for the whole test; `storage` is opened on top of it.
        let tmp_db = Persistence::tmp_storage().await;
        let mut options = Persistence::options(&tmp_db);
        let storage = options.create().await.unwrap();

        // Two transactions share the first namespace; the third uses a different one.
        let first_ns = 10001u32;
        let second_ns = 10009u32;
        let submitted = [
            Tx::new(first_ns.into(), vec![1, 2, 3]),
            Tx::new(first_ns.into(), vec![4, 5, 6]),
            Tx::new(second_ns.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let genesis_qc =
            QuorumCertificate::genesis::<TestVersions>(&validated_state, &instance_state).await;
        let view: ViewNumber = genesis_qc.view_number + 1;
        let parent_leaf = Leaf::genesis::<TestVersions>(&validated_state, &instance_state)
            .await
            .into();

        let (payload, ns_table) =
            Payload::from_transactions(submitted.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_commitment = vid_commitment::<TestVersions>(
            &payload.encode(),
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );

        // Everything that borrows `ns_table` has to run before it is moved into the header.
        let builder_commitment = payload.builder_commitment(&ns_table);
        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();

        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            BuilderFee {
                fee_amount,
                fee_account,
                fee_signature,
            },
            instance_state.current_version,
            view.u64(),
        )
        .await
        .unwrap();

        // Derive the leaf from a proposal for the new header, then point a QC at that leaf.
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&QuorumProposal {
            block_header: block_header.clone(),
            view_number: view,
            justify_qc: genesis_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        })
        .into();
        let mut leaf_qc = genesis_qc.to_qc2();
        leaf_qc.data.leaf_commit = leaf.commit();
        leaf_qc.view_number = view;

        // Persist the leaf and the block in one committed write transaction.
        let mut write_tx = storage.db.write().await.unwrap();
        write_tx
            .insert_leaf(LeafQueryData::new(leaf, leaf_qc).unwrap())
            .await
            .unwrap();
        write_tx
            .insert_block(BlockQueryData::<SeqTypes>::new(block_header, payload))
            .await
            .unwrap();
        write_tx.commit().await.unwrap();

        // Read back one row per stored transaction, ordered by namespace index and position.
        let mut read_tx = storage.db.read().await.unwrap();
        let fetched = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
              FROM header AS h
              JOIN transactions AS t ON t.block_height = h.height
              ORDER BY t.ns_index, t.position
        ",
        )
        .fetch_all(read_tx.as_mut())
        .await
        .unwrap();

        // The lengths are asserted equal first, so pairing rows with transactions is exact.
        assert_eq!(fetched.len(), submitted.len());
        for (row, expected_tx) in fetched.into_iter().zip(submitted.iter()) {
            let expected_ns = u64::from(expected_tx.namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), expected_ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), expected_ns);
        }
    }

    /// Runs the namespace-table check with the node state from `NodeState::mock()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        let node_state = NodeState::mock();
        test_postgres_read_ns_table(node_state).await;
    }

    /// Runs the namespace-table check with the node state from `NodeState::mock_v2()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        let node_state = NodeState::mock_v2();
        test_postgres_read_ns_table(node_state).await;
    }

    /// Runs the namespace-table check with the node state from `NodeState::mock_v3()`,
    /// configured with an epoch height of 0.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        let node_state = NodeState::mock_v3().with_epoch_height(0);
        test_postgres_read_ns_table(node_state).await;
    }
}