1use std::{
2 collections::BTreeMap,
3 path::PathBuf,
4 str::FromStr,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use anyhow::{bail, Context};
10use async_trait::async_trait;
11use clap::Parser;
12use committable::Committable;
13use derivative::Derivative;
14use derive_more::derive::{From, Into};
15use espresso_types::{
16 parse_duration, parse_size,
17 traits::{EventsPersistenceRead, MembershipPersistence},
18 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
19 v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent},
20 BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload,
21 StakeTableHash, ValidatorMap,
22};
23use futures::stream::StreamExt;
24use hotshot::InitializerEpochInfo;
25use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
26 DhtPersistentStorage, SerializableRecord,
27};
28use hotshot_query_service::{
29 availability::LeafQueryData,
30 data_source::{
31 storage::{
32 pruning::PrunerCfg,
33 sql::{
34 include_migrations, query_as, syntax_helpers::MAX_FN, Config, Db, Read, SqlStorage,
35 Transaction, TransactionMode, Write,
36 },
37 },
38 Transaction as _, VersionedDataSource,
39 },
40 fetching::{
41 request::{LeafRequest, PayloadRequest, VidCommonRequest},
42 Provider,
43 },
44 merklized_state::MerklizedState,
45 VidCommon,
46};
47use hotshot_types::{
48 data::{
49 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
50 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
51 QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare,
52 },
53 drb::{DrbInput, DrbResult},
54 event::{Event, EventType, HotShotAction, LeafInfo},
55 message::{convert_proposal, Proposal},
56 simple_certificate::{
57 LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
58 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
59 },
60 traits::{
61 block_contents::{BlockHeader, BlockPayload},
62 metrics::Metrics,
63 node_implementation::ConsensusTime,
64 },
65 vote::HasViewNumber,
66};
67use itertools::Itertools;
68use sqlx::{query, Executor, Row};
69
70use crate::{
71 catchup::SqlStateCatchup, persistence::persistence_metrics::PersistenceMetricsValue, NodeType,
72 SeqTypes, ViewNumber, RECENT_STAKE_TABLES_LIMIT,
73};
74
// Postgres connection options, populated from CLI flags or environment
// variables. NB: plain `//` comments are used (not `///`) so clap's derived
// --help output is unchanged.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    // Hostname of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    // TCP port of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    // Name of the database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    // Database user name.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    // Database password; excluded from Debug output so it never leaks into logs.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    // Whether to connect over TLS.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}
105
106impl Default for PostgresOptions {
107 fn default() -> Self {
108 Self::parse_from(std::iter::empty::<String>())
109 }
110}
111
// Options for the embedded SQLite database (only used with the `embedded-db`
// feature). `//` comments are used so clap's derived help text is unchanged.
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    // Base storage directory; `build_sqlite_path` appends `sqlite/database`
    // and creates the subdirectory as a side effect of parsing.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: Option<PathBuf>,
}
124
125pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
126 let sub_dir = PathBuf::from_str(path)?.join("sqlite");
127
128 if !sub_dir.exists() {
130 std::fs::create_dir_all(&sub_dir)
131 .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
132 }
133
134 Ok(sub_dir.join("database"))
135}
136
// Top-level database options for the sequencer. Exactly one backend is
// compiled in: Postgres by default, SQLite with the `embedded-db` feature.
// `//` comments are used so clap's derived help text is unchanged.
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    // Backend-specific connection options (Postgres build).
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    // Backend-specific connection options (embedded SQLite build).
    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    // Full connection URI; takes precedence as the base config in
    // `TryFrom<&Options> for Config`. Hidden from Debug since it may embed
    // credentials.
    // NOTE(review): this field has no #[clap] attribute in this view, so clap
    // derive would treat it as a positional argument — confirm the attribute
    // was not lost when doc comments were stripped.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    // Enable the query-service pruner (mutually exclusive with `archive`).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    // Pruner tuning for query-service (archival) storage.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    // Garbage-collection tuning for consensus storage tables.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    // Max concurrent fetch requests served to peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    // Delay between active fetches, throttling catchup load.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    // Delay between chunked fetches during scans.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    // Keep full history (conflicts with pruning).
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    // Lightweight mode: skip storing data that archive mode would retain
    // (conflicts with `archive`).
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    // How long an idle pooled connection is kept before being closed.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    // Max lifetime of a pooled connection.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    // Statements slower than this are logged as slow.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    // Minimum number of connections the pool keeps open.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    // Maximum number of connections in the pool.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    // Batch size for the leaf/types migration job.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_TYPES_MIGRATION_BATCH_SIZE")]
    pub(crate) types_migration_batch_size: Option<u64>,

    // Reusable connection pool, set by `PersistenceOptions::create` so later
    // consumers share one pool. Not a CLI argument.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
272
273impl Default for Options {
274 fn default() -> Self {
275 Self::parse_from(std::iter::empty::<String>())
276 }
277}
278
279#[cfg(not(feature = "embedded-db"))]
280impl From<PostgresOptions> for Config {
281 fn from(opt: PostgresOptions) -> Self {
282 let mut cfg = Config::default();
283
284 if let Some(host) = opt.host {
285 cfg = cfg.host(host);
286 }
287
288 if let Some(port) = opt.port {
289 cfg = cfg.port(port);
290 }
291
292 if let Some(database) = &opt.database {
293 cfg = cfg.database(database);
294 }
295
296 if let Some(user) = &opt.user {
297 cfg = cfg.user(user);
298 }
299
300 if let Some(password) = &opt.password {
301 cfg = cfg.password(password);
302 }
303
304 if opt.use_tls {
305 cfg = cfg.tls();
306 }
307
308 cfg = cfg.max_connections(20);
309 cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
310 cfg = cfg.connection_timeout(Duration::from_secs(10240));
311 cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
312
313 cfg
314 }
315}
316
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Build an embedded-SQLite storage [`Config`] from CLI/env options, with
    /// fixed pool-tuning defaults.
    fn from(opt: SqliteOptions) -> Self {
        let mut cfg = Config::default();

        if let Some(db_path) = opt.path {
            cfg = cfg.db_path(db_path);
        }

        // Fixed pool tuning used when converting directly from SQLite options.
        cfg.max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
    }
}
333
334#[cfg(not(feature = "embedded-db"))]
335impl From<PostgresOptions> for Options {
336 fn from(opt: PostgresOptions) -> Self {
337 Options {
338 postgres_options: opt,
339 max_connections: 20,
340 idle_connection_timeout: Duration::from_secs(120),
341 connection_timeout: Duration::from_secs(10240),
342 slow_statement_threshold: Duration::from_secs(1),
343 ..Default::default()
344 }
345 }
346}
347
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wrap SQLite options in the top-level [`Options`] with fixed pool tuning.
    fn from(opt: SqliteOptions) -> Self {
        let mut options = Options::default();
        options.sqlite_options = opt;
        options.max_connections = 10;
        options.idle_connection_timeout = Duration::from_secs(120);
        options.connection_timeout = Duration::from_secs(10240);
        options.slow_statement_threshold = Duration::from_secs(1);
        options
    }
}
impl TryFrom<&Options> for Config {
    type Error = anyhow::Error;

    /// Assemble the storage [`Config`] from the full option set.
    ///
    /// Precedence: an explicit `uri` (if any) seeds the config, then the
    /// individual backend fields override it, so specific flags win over the
    /// URI. A cached pool (set by a prior `create()`) is reused if present.
    ///
    /// # Errors
    /// Fails if `uri` does not parse, or if the pruning config is invalid.
    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
        let mut cfg = match &opt.uri {
            Some(uri) => uri.parse()?,
            None => Self::default(),
        };

        // Reuse the pool created by `PersistenceOptions::create`, if any, so
        // multiple consumers share connections.
        if let Some(pool) = &opt.pool {
            cfg = cfg.pool(pool.clone());
        }

        cfg = cfg.max_connections(opt.max_connections);
        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
        cfg = cfg.min_connections(opt.min_connections);
        cfg = cfg.connection_timeout(opt.connection_timeout);
        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);

        // Postgres build: embed the Postgres migrations and apply the
        // individual connection fields (overriding anything from the URI).
        #[cfg(not(feature = "embedded-db"))]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
            ));

            let pg_options = &opt.postgres_options;

            if let Some(host) = &pg_options.host {
                cfg = cfg.host(host.clone());
            }

            if let Some(port) = pg_options.port {
                cfg = cfg.port(port);
            }

            if let Some(database) = &pg_options.database {
                cfg = cfg.database(database);
            }

            if let Some(user) = &pg_options.user {
                cfg = cfg.user(user);
            }

            if let Some(password) = &pg_options.password {
                cfg = cfg.password(password);
            }

            if pg_options.use_tls {
                cfg = cfg.tls();
            }
        }

        // SQLite build: embed the SQLite migrations and set the DB file path.
        #[cfg(feature = "embedded-db")]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
            ));

            if let Some(path) = &opt.sqlite_options.path {
                cfg = cfg.db_path(path.clone());
            }
        }

        // `prune` and `archive` are mutually exclusive (enforced by clap).
        if opt.prune {
            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
        }
        if opt.archive {
            cfg = cfg.archive();
        }

        Ok(cfg)
    }
}
434
// Tuning knobs for the query-service pruner. Every field is optional; unset
// fields keep the `PrunerCfg` defaults. `//` comments keep clap help unchanged.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    // Storage size above which pruning becomes eligible.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    // Data younger than this is never pruned.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    // Age the pruner normally prunes down to.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    // Rows deleted per pruner batch.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    // Max storage usage (percentage-style bound; semantics defined by PrunerCfg).
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    // How often the pruner runs.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    // SQLite incremental-vacuum page count per pruner pass.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}
489
490impl From<PruningOptions> for PrunerCfg {
491 fn from(opt: PruningOptions) -> Self {
492 let mut cfg = PrunerCfg::new();
493 if let Some(threshold) = opt.pruning_threshold {
494 cfg = cfg.with_pruning_threshold(threshold);
495 }
496 if let Some(min) = opt.minimum_retention {
497 cfg = cfg.with_minimum_retention(min);
498 }
499 if let Some(target) = opt.target_retention {
500 cfg = cfg.with_target_retention(target);
501 }
502 if let Some(batch) = opt.batch_size {
503 cfg = cfg.with_batch_size(batch);
504 }
505 if let Some(max) = opt.max_usage {
506 cfg = cfg.with_max_usage(max);
507 }
508 if let Some(interval) = opt.interval {
509 cfg = cfg.with_interval(interval);
510 }
511
512 if let Some(pages) = opt.pages {
513 cfg = cfg.with_incremental_vacuum_pages(pages)
514 }
515
516 cfg = cfg.with_state_tables(vec![
517 BlockMerkleTree::state_type().to_string(),
518 FeeMerkleTree::state_type().to_string(),
519 ]);
520
521 cfg
522 }
523}
524
// Garbage-collection settings for consensus storage tables (see PRUNE_TABLES).
// Retention values are counted in views. `//` comments keep clap help unchanged.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    // Views normally retained before GC (default 302000 views).
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    // Minimum views retained when storage exceeds `target_usage`
    // (default 130000 views).
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    // Storage size (bytes) above which GC falls back to `minimum_retention`
    // (default 1 GB).
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}
581
582#[async_trait]
583impl PersistenceOptions for Options {
584 type Persistence = Persistence;
585
586 fn set_view_retention(&mut self, view_retention: u64) {
587 self.consensus_pruning.target_retention = view_retention;
588 self.consensus_pruning.minimum_retention = view_retention;
589 }
590
591 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
592 let config = (&*self).try_into()?;
593 let persistence = Persistence {
594 db: SqlStorage::connect(config).await?,
595 gc_opt: self.consensus_pruning,
596 internal_metrics: PersistenceMetricsValue::default(),
597 };
598 persistence.migrate_quorum_proposal_leaf_hashes().await?;
599 self.pool = Some(persistence.db.pool());
600 Ok(persistence)
601 }
602
603 async fn reset(self) -> anyhow::Result<()> {
604 SqlStorage::connect(Config::try_from(&self)?.reset_schema()).await?;
605 Ok(())
606 }
607}
608
// SQL-backed consensus persistence (Postgres or embedded SQLite).
#[derive(Clone, Debug)]
pub struct Persistence {
    // Underlying SQL storage handle.
    db: SqlStorage,
    // View-retention settings for consensus-table garbage collection.
    gc_opt: ConsensusPruningOptions,
    // Timing metrics for append operations.
    internal_metrics: PersistenceMetricsValue,
}
617
impl Persistence {
    /// Backfill the `leaf_hash` column of the legacy `quorum_proposals` table.
    ///
    /// Older versions stored proposals without their leaf commitment. This
    /// scans every row, recomputes the commitment for rows where it is
    /// missing, and upserts the results in the same write transaction.
    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");

        let mut updates = vec![];
        while let Some(row) = proposals.next().await {
            let row = row?;

            let hash: Option<String> = row.try_get("leaf_hash")?;
            if hash.is_none() {
                // Missing hash: recompute it from the serialized proposal.
                let view: i64 = row.try_get("view")?;
                let data: Vec<u8> = row.try_get("data")?;
                let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let leaf = Leaf::from_quorum_proposal(&proposal.data);
                let leaf_hash = Committable::commit(&leaf);
                tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
                updates.push((view, leaf_hash.to_string()));
            }
        }
        // Release the fetch stream's borrow of `tx` before issuing the upsert.
        drop(proposals);

        tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
            .await?;

        tx.commit().await
    }

    /// Emit `Decide` events to `consumer` for all decided leaves not yet
    /// processed, then garbage-collect the per-view data for those views.
    ///
    /// Progress is tracked in `event_stream.last_processed_view`, which is
    /// only advanced after the consumer succeeds — so a failure here is
    /// retried on the next decide. Each loop iteration handles one run of
    /// consecutive-height leaves.
    async fn generate_decide_events(&self, consumer: &impl EventConsumer) -> anyhow::Result<()> {
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            let mut tx = self.db.read().await?;

            // Resume just after the last view we already emitted an event for.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };

            let mut parent = None;
            let mut rows =
                query("SELECT leaf, qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view")
                    .bind(from_view)
                    .fetch(tx.as_mut());
            let mut leaves = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // Best-effort: stop at the first bad row and emit what
                        // we have so far; the rest is retried next time.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let height = leaf.block_header().block_number();

                // A decide event must cover a gapless chain of block heights;
                // stop at the first gap and handle the rest in a later pass.
                if let Some(parent) = parent {
                    if height != parent + 1 {
                        tracing::debug!(
                            height,
                            parent,
                            "ending decide event at non-consecutive leaf"
                        );
                        break;
                    }
                }
                parent = Some(height);
                leaves.push(leaf);
                final_qc = Some(qc);
            }
            drop(rows);

            // No QC collected means no leaves at all: nothing left to process.
            let Some(final_qc) = final_qc else {
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Safe: `final_qc` is `Some`, so `leaves` is non-empty.
            let from_view = leaves[0].view_number();
            let to_view = leaves[leaves.len() - 1].view_number();

            // Collect the VID shares for the decided view range, keyed by view.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect the DA proposals (payloads) for the same range.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;
            drop(tx);

            // Assemble LeafInfo for each decided leaf, re-attaching payloads and
            // VID shares where available. NOTE(review): `.rev()` emits the chain
            // newest-first — presumably the order the consumer expects; confirm
            // against the HotShot `Decide` event contract.
            let leaf_chain = leaves
                .into_iter()
                .rev()
                .map(|mut leaf| {
                    let view = leaf.view_number();

                    let vid_share = vid_shares.remove(&view);
                    if vid_share.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }

                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // The genesis leaf has no DA proposal; its payload is empty.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    let state_cert = state_certs
                        .get(&view)
                        .cloned();

                    LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        state: Default::default(),
                        delta: Default::default(),
                    }
                })
                .collect();

            // Hand the decide event to the consumer BEFORE advancing
            // `last_processed_view`, so a failure here is retried.
            tracing::debug!(?to_view, ?final_qc, ?leaf_chain, "generating decide event");
            consumer
                .handle_event(&Event {
                    view_number: to_view,
                    event: EventType::Decide {
                        leaf_chain: Arc::new(leaf_chain),
                        qc: Arc::new(final_qc),
                        block_size: None,
                    },
                })
                .await?;

            let mut tx = self.db.write().await?;

            // Mark the range as processed.
            tx.upsert(
                "event_stream",
                ["id", "last_processed_view"],
                ["id"],
                [(1i32, to_view.u64() as i64)],
            )
            .await?;

            // Persist the state certs of decided epochs in their final table.
            for (epoch, state_cert) in state_certs {
                let state_cert_bytes = bincode::serialize(&state_cert)?;
                tx.upsert(
                    "finalized_state_cert",
                    ["epoch", "state_cert"],
                    ["epoch"],
                    [(epoch as i64, state_cert_bytes)],
                )
                .await?;
            }

            // Garbage-collect per-view data for the fully processed range.
            tx.execute(
                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            // Strict `< $2` keeps the newest decided leaf as the anchor.
            tx.execute(
                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            tx.commit().await?;
            last_processed_view = Some(to_view.u64() as i64);
        }
    }

    /// Load the light-client state-update certificates in `[from_view, to_view]`,
    /// keyed by epoch. Accepts both the current (V2) and legacy (V1) encodings.
    async fn load_state_certs(
        tx: &mut Transaction<Read>,
        from_view: ViewNumber,
        to_view: ViewNumber,
    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let rows = tx
            .fetch_all(
                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

        let mut result = BTreeMap::new();

        for row in rows {
            let data: Vec<u8> = row.get("state_cert");

            // Try the V2 encoding first, then fall back to V1 and upgrade.
            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
                .or_else(|err_v2| {
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
                        .map(Into::into)
                        .context(format!(
                            "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
                             error: {err_v2}"
                        ))
                })?;

            result.insert(cert.epoch.u64(), cert);
        }

        Ok(result)
    }

    /// Garbage-collect consensus tables relative to `cur_view`.
    ///
    /// First prunes to the target retention; if measured storage usage still
    /// exceeds `target_usage`, prunes further down to the minimum retention.
    #[tracing::instrument(skip(self))]
    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        prune_to_view(
            &mut tx,
            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
        )
        .await?;

        // SQLite: sum the page sizes of the consensus tables via `dbstat`.
        #[cfg(feature = "embedded-db")]
        let usage_query = format!(
            "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
            PRUNE_TABLES
                .iter()
                .map(|table| format!("'{table}'"))
                .join(",")
        );

        // Postgres: sum `pg_table_size` over the consensus tables.
        #[cfg(not(feature = "embedded-db"))]
        let usage_query = {
            let table_sizes = PRUNE_TABLES
                .iter()
                .map(|table| format!("pg_table_size('{table}')"))
                .join(" + ");
            format!("SELECT {table_sizes}")
        };

        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
        tracing::debug!(usage, "consensus storage usage after pruning");

        if (usage as u64) > self.gc_opt.target_usage {
            tracing::warn!(
                usage,
                gc_opt = ?self.gc_opt,
                "consensus storage is running out of space, pruning to minimum retention"
            );
            prune_to_view(
                &mut tx,
                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
            )
            .await?;
        }

        tx.commit().await
    }
}
997
// Consensus tables subject to view-based garbage collection; also the set of
// tables whose combined size is measured against `target_usage` in `prune`.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1005
1006async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1007 if view == 0 {
1008 return Ok(());
1010 }
1011 tracing::debug!(view, "pruning consensus storage");
1012
1013 for table in PRUNE_TABLES {
1014 let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1015 .bind(view as i64)
1016 .execute(tx.as_mut())
1017 .await
1018 .context(format!("pruning {table}"))?;
1019 if res.rows_affected() > 0 {
1020 tracing::info!(
1021 "garbage collected {} rows from {table}",
1022 res.rows_affected()
1023 );
1024 }
1025 }
1026
1027 Ok(())
1028}
1029
1030#[async_trait]
1031impl SequencerPersistence for Persistence {
1032 fn into_catchup_provider(
1033 self,
1034 backoff: BackoffParams,
1035 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1036 Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1037 }
1038
1039 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1040 tracing::info!("loading config from Postgres");
1041
1042 let Some(row) = self
1044 .db
1045 .read()
1046 .await?
1047 .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1048 .await?
1049 else {
1050 tracing::info!("config not found");
1051 return Ok(None);
1052 };
1053 let config = row.try_get("config")?;
1054 Ok(serde_json::from_value(config)?)
1055 }
1056
1057 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1058 tracing::info!("saving config to database");
1059 let json = serde_json::to_value(cfg)?;
1060
1061 let mut tx = self.db.write().await?;
1062 tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1063 .await?;
1064 tx.commit().await
1065 }
1066
    /// Persist newly decided leaves (with their QCs) and trigger decide-event
    /// generation and garbage collection.
    ///
    /// Payloads are stripped before storage (`unfill_block_payload`) since they
    /// are persisted separately via the DA tables and re-attached when decide
    /// events are generated. Event processing and pruning are best-effort:
    /// their failures are logged but never fail this call, so consensus can
    /// keep making progress; unprocessed views are retried on the next decide.
    async fn append_decided_leaves(
        &self,
        view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()> {
        let values = leaf_chain
            .into_iter()
            .map(|(info, qc2)| {
                // Strip the payload; it lives in the DA proposal tables.
                let mut leaf = info.leaf.clone();
                leaf.unfill_block_payload();

                let view = qc2.view_number.u64() as i64;
                let leaf_bytes = bincode::serialize(&leaf)?;
                let qc_bytes = bincode::serialize(&qc2)?;
                Ok((view, leaf_bytes, qc_bytes))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        let mut tx = self.db.write().await?;

        tx.upsert("anchor_leaf2", ["view", "leaf", "qc"], ["view"], values)
            .await?;
        tx.commit().await?;

        // Best-effort: on failure, `event_stream.last_processed_view` was not
        // advanced, so these views are re-processed next time.
        if let Err(err) = self.generate_decide_events(consumer).await {
            tracing::warn!(?view, "event processing failed: {err:#}");
            return Ok(());
        }

        // GC is likewise best-effort.
        if let Err(err) = self.prune(view).await {
            tracing::warn!(?view, "pruning failed: {err:#}");
        }

        Ok(())
    }
1116
1117 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1118 Ok(self
1119 .db
1120 .read()
1121 .await?
1122 .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1123 .await?
1124 .map(|row| {
1125 let view: i64 = row.get("view");
1126 ViewNumber::new(view as u64)
1127 }))
1128 }
1129
1130 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1131 Ok(self
1132 .db
1133 .read()
1134 .await?
1135 .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1136 .await?
1137 .map(|row| {
1138 let view: i64 = row.get("view");
1139 ViewNumber::new(view as u64)
1140 }))
1141 }
1142
1143 async fn load_anchor_leaf(
1144 &self,
1145 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1146 let Some(row) = self
1147 .db
1148 .read()
1149 .await?
1150 .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1151 .await?
1152 else {
1153 return Ok(None);
1154 };
1155
1156 let leaf_bytes: Vec<u8> = row.get("leaf");
1157 let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1158
1159 let qc_bytes: Vec<u8> = row.get("qc");
1160 let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1161
1162 Ok(Some((leaf2, qc2)))
1163 }
1164
1165 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1166 let mut tx = self.db.read().await?;
1167 let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1168 .fetch_one(tx.as_mut())
1169 .await?;
1170 Ok(ViewNumber::new(view as u64))
1171 }
1172
1173 async fn load_da_proposal(
1174 &self,
1175 view: ViewNumber,
1176 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1177 let result = self
1178 .db
1179 .read()
1180 .await?
1181 .fetch_optional(
1182 query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1183 )
1184 .await?;
1185
1186 result
1187 .map(|row| {
1188 let bytes: Vec<u8> = row.get("data");
1189 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1190 })
1191 .transpose()
1192 }
1193
1194 async fn load_vid_share(
1195 &self,
1196 view: ViewNumber,
1197 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1198 let result = self
1199 .db
1200 .read()
1201 .await?
1202 .fetch_optional(
1203 query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1204 )
1205 .await?;
1206
1207 result
1208 .map(|row| {
1209 let bytes: Vec<u8> = row.get("data");
1210 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1211 })
1212 .transpose()
1213 }
1214
    /// Load every stored quorum proposal, keyed by view. Accepts both the
    /// current wrapper encoding and the legacy one (converted on the fly).
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
    {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all("SELECT * FROM quorum_proposals2")
            .await?;

        Ok(BTreeMap::from_iter(
            rows.into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
                    let bytes: Vec<u8> = row.get("data");
                    // Try the current encoding first, then fall back to the
                    // legacy wrapper and upgrade it via `convert_proposal`.
                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                        bincode::deserialize(&bytes).or_else(|error| {
                            bincode::deserialize::<
                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
                            >(&bytes)
                            .map(convert_proposal)
                            .inspect_err(|err_v3| {
                                // NOTE(review): despite this "ignoring" log, a row
                                // that fails BOTH decodings propagates its error via
                                // the `?` below and aborts the entire load — confirm
                                // whether skipping just the bad row was intended.
                                tracing::warn!(
                                    ?view_number,
                                    %error,
                                    %err_v3,
                                    "ignoring malformed quorum proposal DB row"
                                );
                            })
                        })?;
                    Ok((view_number, proposal))
                })
                .collect::<anyhow::Result<Vec<_>>>()?,
        ))
    }
1252
1253 async fn load_quorum_proposal(
1254 &self,
1255 view: ViewNumber,
1256 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1257 let mut tx = self.db.read().await?;
1258 let (data,) =
1259 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1260 .bind(view.u64() as i64)
1261 .fetch_one(tx.as_mut())
1262 .await?;
1263 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1264 bincode::deserialize(&data).or_else(|error| {
1265 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1266 &data,
1267 )
1268 .map(convert_proposal)
1269 .context(format!(
1270 "Failed to deserialize quorum proposal for view {view}. error={error}"
1271 ))
1272 })?;
1273
1274 Ok(proposal)
1275 }
1276
1277 async fn append_vid(
1278 &self,
1279 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
1280 ) -> anyhow::Result<()> {
1281 let view = proposal.data.view_number.u64();
1282 let payload_hash = proposal.data.payload_commitment;
1283 let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1284 convert_proposal(proposal.clone());
1285 let data_bytes = bincode::serialize(&proposal).unwrap();
1286
1287 let now = Instant::now();
1288 let mut tx = self.db.write().await?;
1289 tx.upsert(
1290 "vid_share2",
1291 ["view", "data", "payload_hash"],
1292 ["view"],
1293 [(view as i64, data_bytes, payload_hash.to_string())],
1294 )
1295 .await?;
1296 let res = tx.commit().await;
1297 self.internal_metrics
1298 .internal_append_vid_duration
1299 .add_point(now.elapsed().as_secs_f64());
1300 res
1301 }
1302 async fn append_vid2(
1303 &self,
1304 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
1305 ) -> anyhow::Result<()> {
1306 let view = proposal.data.view_number.u64();
1307 let payload_hash = proposal.data.payload_commitment;
1308 let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1309 convert_proposal(proposal.clone());
1310 let data_bytes = bincode::serialize(&proposal).unwrap();
1311
1312 let now = Instant::now();
1313 let mut tx = self.db.write().await?;
1314 tx.upsert(
1315 "vid_share2",
1316 ["view", "data", "payload_hash"],
1317 ["view"],
1318 [(view as i64, data_bytes, payload_hash.to_string())],
1319 )
1320 .await?;
1321 let res = tx.commit().await;
1322 self.internal_metrics
1323 .internal_append_vid2_duration
1324 .add_point(now.elapsed().as_secs_f64());
1325 res
1326 }
1327
    /// Persist a legacy DA proposal, keyed by view number, associating it
    /// with the VID commitment of its payload.
    async fn append_da(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()> {
        let data = &proposal.data;
        let view = data.view_number().u64();
        let data_bytes = bincode::serialize(proposal).unwrap();

        let now = Instant::now();
        let mut tx = self.db.write().await?;
        // Upsert on view: a later proposal for the same view replaces the row.
        tx.upsert(
            "da_proposal",
            ["view", "data", "payload_hash"],
            ["view"],
            [(view as i64, data_bytes, vid_commit.to_string())],
        )
        .await?;
        let res = tx.commit().await;
        // Record wall-clock latency of the full write, including commit.
        self.internal_metrics
            .internal_append_da_duration
            .add_point(now.elapsed().as_secs_f64());
        res
    }
1352
    /// Record that this node took a consensus action (propose or vote) at
    /// `view`, so that after a restart the node does not equivocate.
    ///
    /// Only `Propose` and `Vote` are persisted; all other actions are no-ops.
    /// Both single-row tables are monotonic: `MAX_FN` (GREATEST/MAX depending
    /// on the SQL backend) ensures a stale write can never lower the view.
    async fn record_action(
        &self,
        view: ViewNumber,
        _epoch: Option<EpochNumber>,
        action: HotShotAction,
    ) -> anyhow::Result<()> {
        // Only proposals and votes constrain safe restart behavior.
        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
            return Ok(());
        }

        let stmt = format!(
            "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
            ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
        );

        let mut tx = self.db.write().await?;
        tx.execute(query(&stmt).bind(view.u64() as i64)).await?;

        // After voting in `view`, restarting is only safe from `view + 1`.
        if matches!(action, HotShotAction::Vote) {
            let restart_view = view + 1;
            let stmt = format!(
                "INSERT INTO restart_view (id, view) VALUES (0, $1)
                ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
            );
            tx.execute(query(&stmt).bind(restart_view.u64() as i64))
                .await?;
        }

        tx.commit().await
    }
1384
    /// Persist a quorum proposal for its view and, in the same transaction,
    /// store the justify QC the proposal carries, so the certificate table
    /// never references a proposal that failed to persist.
    async fn append_quorum_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let view_number = proposal.data.view_number().u64();

        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
        // Commitment of the leaf this proposal extends the chain with; stored
        // as a lookup key alongside the proposal.
        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));

        let now = Instant::now();
        let mut tx = self.db.write().await?;
        tx.upsert(
            "quorum_proposals2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
        )
        .await?;

        // The proposal embeds the QC justifying its parent; store it as well
        // (keyed by the QC's own view, not this proposal's view).
        let justify_qc = proposal.data.justify_qc();
        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
        tx.upsert(
            "quorum_certificate2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(
                justify_qc.view_number.u64() as i64,
                justify_qc.data.leaf_commit.to_string(),
                &justify_qc_bytes,
            )],
        )
        .await?;
        let res = tx.commit().await;
        // Record wall-clock latency of the full write, including commit.
        self.internal_metrics
            .internal_append_quorum2_duration
            .add_point(now.elapsed().as_secs_f64());
        res
    }
1424
1425 async fn load_upgrade_certificate(
1426 &self,
1427 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1428 let result = self
1429 .db
1430 .read()
1431 .await?
1432 .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1433 .await?;
1434
1435 result
1436 .map(|row| {
1437 let bytes: Vec<u8> = row.get("data");
1438 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1439 })
1440 .transpose()
1441 }
1442
    /// Persist the decided upgrade certificate, if one was provided.
    ///
    /// `None` is a no-op: a previously stored certificate is never cleared.
    /// The table is a singleton keyed by `id = true`.
    async fn store_upgrade_certificate(
        &self,
        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let certificate = match decided_upgrade_certificate {
            Some(cert) => cert,
            None => return Ok(()),
        };
        let upgrade_certificate_bytes =
            bincode::serialize(&certificate).context("serializing upgrade certificate")?;
        let mut tx = self.db.write().await?;
        tx.upsert(
            "upgrade_certificate",
            ["id", "data"],
            ["id"],
            [(true, upgrade_certificate_bytes)],
        )
        .await?;
        tx.commit().await
    }
1463
1464 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1465 let batch_size: i64 = 10000;
1466 let mut tx = self.db.read().await?;
1467
1468 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1473 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
1474 )
1475 .fetch_one(tx.as_mut())
1476 .await?;
1477
1478 if is_completed {
1479 tracing::info!("decided leaves already migrated");
1480 return Ok(());
1481 }
1482
1483 tracing::warn!("migrating decided leaves..");
1484 loop {
1485 let mut tx = self.db.read().await?;
1486 let rows = query(
1487 "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
1488 )
1489 .bind(offset)
1490 .bind(batch_size)
1491 .fetch_all(tx.as_mut())
1492 .await?;
1493
1494 drop(tx);
1495 if rows.is_empty() {
1496 break;
1497 }
1498 let mut values = Vec::new();
1499
1500 for row in rows.iter() {
1501 let leaf: Vec<u8> = row.try_get("leaf")?;
1502 let qc: Vec<u8> = row.try_get("qc")?;
1503 let leaf1: Leaf = bincode::deserialize(&leaf)?;
1504 let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
1505 let view: i64 = row.try_get("view")?;
1506
1507 let leaf2: Leaf2 = leaf1.into();
1508 let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();
1509
1510 let leaf2_bytes = bincode::serialize(&leaf2)?;
1511 let qc2_bytes = bincode::serialize(&qc2)?;
1512
1513 values.push((view, leaf2_bytes, qc2_bytes));
1514 }
1515
1516 let mut query_builder: sqlx::QueryBuilder<Db> =
1517 sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");
1518
1519 offset = values.last().context("last row")?.0;
1520
1521 query_builder.push_values(values.into_iter(), |mut b, (view, leaf, qc)| {
1522 b.push_bind(view).push_bind(leaf).push_bind(qc);
1523 });
1524
1525 query_builder.push(" ON CONFLICT DO NOTHING");
1528
1529 let query = query_builder.build();
1530
1531 let mut tx = self.db.write().await?;
1532 query.execute(tx.as_mut()).await?;
1533
1534 tx.upsert(
1535 "epoch_migration",
1536 ["table_name", "completed", "migrated_rows"],
1537 ["table_name"],
1538 [("anchor_leaf".to_string(), false, offset)],
1539 )
1540 .await?;
1541 tx.commit().await?;
1542
1543 tracing::info!(
1544 "anchor leaf migration progress: rows={} offset={}",
1545 rows.len(),
1546 offset
1547 );
1548
1549 if rows.len() < batch_size as usize {
1550 break;
1551 }
1552 }
1553
1554 tracing::warn!("migrated decided leaves");
1555
1556 let mut tx = self.db.write().await?;
1557 tx.upsert(
1558 "epoch_migration",
1559 ["table_name", "completed", "migrated_rows"],
1560 ["table_name"],
1561 [("anchor_leaf".to_string(), true, offset)],
1562 )
1563 .await?;
1564 tx.commit().await?;
1565
1566 tracing::info!("updated epoch_migration table for anchor_leaf");
1567
1568 Ok(())
1569 }
1570
1571 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1572 let batch_size: i64 = 10000;
1573 let mut tx = self.db.read().await?;
1574
1575 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1576 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
1577 )
1578 .fetch_one(tx.as_mut())
1579 .await?;
1580
1581 if is_completed {
1582 tracing::info!("da proposals migration already done");
1583 return Ok(());
1584 }
1585
1586 tracing::warn!("migrating da proposals..");
1587
1588 loop {
1589 let mut tx = self.db.read().await?;
1590 let rows = query(
1591 "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
1592 $2",
1593 )
1594 .bind(offset)
1595 .bind(batch_size)
1596 .fetch_all(tx.as_mut())
1597 .await?;
1598
1599 drop(tx);
1600 if rows.is_empty() {
1601 break;
1602 }
1603 let mut values = Vec::new();
1604
1605 for row in rows.iter() {
1606 let data: Vec<u8> = row.try_get("data")?;
1607 let payload_hash: String = row.try_get("payload_hash")?;
1608
1609 let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
1610 bincode::deserialize(&data)?;
1611 let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
1612 convert_proposal(da_proposal);
1613
1614 let view = da_proposal2.data.view_number.u64() as i64;
1615 let data = bincode::serialize(&da_proposal2)?;
1616
1617 values.push((view, payload_hash, data));
1618 }
1619
1620 let mut query_builder: sqlx::QueryBuilder<Db> =
1621 sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");
1622
1623 offset = values.last().context("last row")?.0;
1624 query_builder.push_values(values.into_iter(), |mut b, (view, payload_hash, data)| {
1625 b.push_bind(view).push_bind(payload_hash).push_bind(data);
1626 });
1627 query_builder.push(" ON CONFLICT DO NOTHING");
1628 let query = query_builder.build();
1629
1630 let mut tx = self.db.write().await?;
1631 query.execute(tx.as_mut()).await?;
1632
1633 tx.upsert(
1634 "epoch_migration",
1635 ["table_name", "completed", "migrated_rows"],
1636 ["table_name"],
1637 [("da_proposal".to_string(), false, offset)],
1638 )
1639 .await?;
1640 tx.commit().await?;
1641
1642 tracing::info!(
1643 "DA proposals migration progress: rows={} offset={}",
1644 rows.len(),
1645 offset
1646 );
1647 if rows.len() < batch_size as usize {
1648 break;
1649 }
1650 }
1651
1652 tracing::warn!("migrated da proposals");
1653
1654 let mut tx = self.db.write().await?;
1655 tx.upsert(
1656 "epoch_migration",
1657 ["table_name", "completed", "migrated_rows"],
1658 ["table_name"],
1659 [("da_proposal".to_string(), true, offset)],
1660 )
1661 .await?;
1662 tx.commit().await?;
1663
1664 tracing::info!("updated epoch_migration table for da_proposal");
1665
1666 Ok(())
1667 }
1668
1669 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1670 let batch_size: i64 = 10000;
1671
1672 let mut tx = self.db.read().await?;
1673
1674 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1675 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
1676 )
1677 .fetch_one(tx.as_mut())
1678 .await?;
1679
1680 if is_completed {
1681 tracing::info!("vid_share migration already done");
1682 return Ok(());
1683 }
1684
1685 tracing::warn!("migrating vid shares..");
1686 loop {
1687 let mut tx = self.db.read().await?;
1688 let rows = query(
1689 "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
1690 )
1691 .bind(offset)
1692 .bind(batch_size)
1693 .fetch_all(tx.as_mut())
1694 .await?;
1695
1696 drop(tx);
1697 if rows.is_empty() {
1698 break;
1699 }
1700 let mut values = Vec::new();
1701
1702 for row in rows.iter() {
1703 let data: Vec<u8> = row.try_get("data")?;
1704 let payload_hash: String = row.try_get("payload_hash")?;
1705
1706 let vid_share: Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>> =
1707 bincode::deserialize(&data)?;
1708 let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1709 convert_proposal(vid_share);
1710
1711 let view = vid_share2.data.view_number().u64() as i64;
1712 let data = bincode::serialize(&vid_share2)?;
1713
1714 values.push((view, payload_hash, data));
1715 }
1716
1717 let mut query_builder: sqlx::QueryBuilder<Db> =
1718 sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
1719
1720 offset = values.last().context("last row")?.0;
1721
1722 query_builder.push_values(values.into_iter(), |mut b, (view, payload_hash, data)| {
1723 b.push_bind(view).push_bind(payload_hash).push_bind(data);
1724 });
1725
1726 let query = query_builder.build();
1727
1728 let mut tx = self.db.write().await?;
1729 query.execute(tx.as_mut()).await?;
1730
1731 tx.upsert(
1732 "epoch_migration",
1733 ["table_name", "completed", "migrated_rows"],
1734 ["table_name"],
1735 [("vid_share".to_string(), false, offset)],
1736 )
1737 .await?;
1738 tx.commit().await?;
1739
1740 tracing::info!(
1741 "VID shares migration progress: rows={} offset={}",
1742 rows.len(),
1743 offset
1744 );
1745 if rows.len() < batch_size as usize {
1746 break;
1747 }
1748 }
1749
1750 tracing::warn!("migrated vid shares");
1751
1752 let mut tx = self.db.write().await?;
1753 tx.upsert(
1754 "epoch_migration",
1755 ["table_name", "completed", "migrated_rows"],
1756 ["table_name"],
1757 [("vid_share".to_string(), true, offset)],
1758 )
1759 .await?;
1760 tx.commit().await?;
1761
1762 tracing::info!("updated epoch_migration table for vid_share");
1763
1764 Ok(())
1765 }
1766
1767 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1768 let batch_size: i64 = 10000;
1769 let mut tx = self.db.read().await?;
1770
1771 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1772 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1773 'quorum_proposals'",
1774 )
1775 .fetch_one(tx.as_mut())
1776 .await?;
1777
1778 if is_completed {
1779 tracing::info!("quorum proposals migration already done");
1780 return Ok(());
1781 }
1782
1783 tracing::warn!("migrating quorum proposals..");
1784
1785 loop {
1786 let mut tx = self.db.read().await?;
1787 let rows = query(
1788 "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
1789 view LIMIT $2",
1790 )
1791 .bind(offset)
1792 .bind(batch_size)
1793 .fetch_all(tx.as_mut())
1794 .await?;
1795
1796 drop(tx);
1797
1798 if rows.is_empty() {
1799 break;
1800 }
1801
1802 let mut values = Vec::new();
1803
1804 for row in rows.iter() {
1805 let leaf_hash: String = row.try_get("leaf_hash")?;
1806 let data: Vec<u8> = row.try_get("data")?;
1807
1808 let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
1809 bincode::deserialize(&data)?;
1810 let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1811 convert_proposal(quorum_proposal);
1812
1813 let view = quorum_proposal2.data.view_number().u64() as i64;
1814 let data = bincode::serialize(&quorum_proposal2)?;
1815
1816 values.push((view, leaf_hash, data));
1817 }
1818
1819 let mut query_builder: sqlx::QueryBuilder<Db> =
1820 sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");
1821
1822 offset = values.last().context("last row")?.0;
1823 query_builder.push_values(values.into_iter(), |mut b, (view, leaf_hash, data)| {
1824 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1825 });
1826
1827 query_builder.push(" ON CONFLICT DO NOTHING");
1828
1829 let query = query_builder.build();
1830
1831 let mut tx = self.db.write().await?;
1832 query.execute(tx.as_mut()).await?;
1833
1834 tx.upsert(
1835 "epoch_migration",
1836 ["table_name", "completed", "migrated_rows"],
1837 ["table_name"],
1838 [("quorum_proposals".to_string(), false, offset)],
1839 )
1840 .await?;
1841 tx.commit().await?;
1842
1843 tracing::info!(
1844 "quorum proposals migration progress: rows={} offset={}",
1845 rows.len(),
1846 offset
1847 );
1848
1849 if rows.len() < batch_size as usize {
1850 break;
1851 }
1852 }
1853
1854 tracing::warn!("migrated quorum proposals");
1855
1856 let mut tx = self.db.write().await?;
1857 tx.upsert(
1858 "epoch_migration",
1859 ["table_name", "completed", "migrated_rows"],
1860 ["table_name"],
1861 [("quorum_proposals".to_string(), true, offset)],
1862 )
1863 .await?;
1864 tx.commit().await?;
1865
1866 tracing::info!("updated epoch_migration table for quorum_proposals");
1867
1868 Ok(())
1869 }
1870
1871 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1872 let batch_size: i64 = 10000;
1873 let mut tx = self.db.read().await?;
1874
1875 let (is_completed, mut offset) = query_as::<(bool, i64)>(
1876 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1877 'quorum_certificate'",
1878 )
1879 .fetch_one(tx.as_mut())
1880 .await?;
1881
1882 if is_completed {
1883 tracing::info!("quorum certificates migration already done");
1884 return Ok(());
1885 }
1886
1887 tracing::warn!("migrating quorum certificates..");
1888 loop {
1889 let mut tx = self.db.read().await?;
1890 let rows = query(
1891 "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
1892 view LIMIT $2",
1893 )
1894 .bind(offset)
1895 .bind(batch_size)
1896 .fetch_all(tx.as_mut())
1897 .await?;
1898
1899 drop(tx);
1900 if rows.is_empty() {
1901 break;
1902 }
1903 let mut values = Vec::new();
1904
1905 for row in rows.iter() {
1906 let leaf_hash: String = row.try_get("leaf_hash")?;
1907 let data: Vec<u8> = row.try_get("data")?;
1908
1909 let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
1910 let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();
1911
1912 let view = qc2.view_number().u64() as i64;
1913 let data = bincode::serialize(&qc2)?;
1914
1915 values.push((view, leaf_hash, data));
1916 }
1917
1918 let mut query_builder: sqlx::QueryBuilder<Db> =
1919 sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");
1920
1921 offset = values.last().context("last row")?.0;
1922
1923 query_builder.push_values(values.into_iter(), |mut b, (view, leaf_hash, data)| {
1924 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1925 });
1926
1927 query_builder.push(" ON CONFLICT DO NOTHING");
1928 let query = query_builder.build();
1929
1930 let mut tx = self.db.write().await?;
1931 query.execute(tx.as_mut()).await?;
1932
1933 tx.upsert(
1934 "epoch_migration",
1935 ["table_name", "completed", "migrated_rows"],
1936 ["table_name"],
1937 [("quorum_certificate".to_string(), false, offset)],
1938 )
1939 .await?;
1940 tx.commit().await?;
1941
1942 tracing::info!(
1943 "Quorum certificates migration progress: rows={} offset={}",
1944 rows.len(),
1945 offset
1946 );
1947
1948 if rows.len() < batch_size as usize {
1949 break;
1950 }
1951 }
1952
1953 tracing::warn!("migrated quorum certificates");
1954
1955 let mut tx = self.db.write().await?;
1956 tx.upsert(
1957 "epoch_migration",
1958 ["table_name", "completed", "migrated_rows"],
1959 ["table_name"],
1960 [("quorum_certificate".to_string(), true, offset)],
1961 )
1962 .await?;
1963 tx.commit().await?;
1964 tracing::info!("updated epoch_migration table for quorum_certificate");
1965
1966 Ok(())
1967 }
1968
    /// Persist the high QC for the next epoch in a singleton row
    /// (`id = true`), replacing any previously stored certificate.
    async fn store_next_epoch_quorum_certificate(
        &self,
        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
    ) -> anyhow::Result<()> {
        let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
        let mut tx = self.db.write().await?;
        tx.upsert(
            "next_epoch_quorum_certificate",
            ["id", "data"],
            ["id"],
            [(true, qc2_bytes)],
        )
        .await?;
        tx.commit().await
    }
1984
1985 async fn load_next_epoch_quorum_certificate(
1986 &self,
1987 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1988 let result = self
1989 .db
1990 .read()
1991 .await?
1992 .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
1993 .await?;
1994
1995 result
1996 .map(|row| {
1997 let bytes: Vec<u8> = row.get("data");
1998 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1999 })
2000 .transpose()
2001 }
2002
    /// Persist a v2 DA proposal, keyed by view number, associating it with
    /// the VID commitment of its payload.
    async fn append_da2(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()> {
        let data = &proposal.data;
        let view = data.view_number().u64();
        let data_bytes = bincode::serialize(proposal).unwrap();

        let now = Instant::now();
        let mut tx = self.db.write().await?;
        // Upsert on view: a later proposal for the same view replaces the row.
        tx.upsert(
            "da_proposal2",
            ["view", "data", "payload_hash"],
            ["view"],
            [(view as i64, data_bytes, vid_commit.to_string())],
        )
        .await?;
        let res = tx.commit().await;
        // Record wall-clock latency of the full write, including commit.
        self.internal_metrics
            .internal_append_da2_duration
            .add_point(now.elapsed().as_secs_f64());
        res
    }
2027
    /// Persist the DRB result for `epoch` in `epoch_drb_and_root`, keyed by
    /// epoch. Other columns of an existing row are left untouched.
    async fn store_drb_result(
        &self,
        epoch: EpochNumber,
        drb_result: DrbResult,
    ) -> anyhow::Result<()> {
        // DrbResult is a fixed-size byte array; store it as a blob.
        let drb_result_vec = Vec::from(drb_result);
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_drb_and_root",
            ["epoch", "drb_result"],
            ["epoch"],
            [(epoch.u64() as i64, drb_result_vec)],
        )
        .await?;
        tx.commit().await
    }
2044
    /// Persist the epoch-root block header for `epoch` in
    /// `epoch_drb_and_root`, keyed by epoch. Other columns of an existing
    /// row are left untouched.
    async fn store_epoch_root(
        &self,
        epoch: EpochNumber,
        block_header: <SeqTypes as NodeType>::BlockHeader,
    ) -> anyhow::Result<()> {
        let block_header_bytes =
            bincode::serialize(&block_header).context("serializing block header")?;

        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_drb_and_root",
            ["epoch", "block_header"],
            ["epoch"],
            [(epoch.u64() as i64, block_header_bytes)],
        )
        .await?;
        tx.commit().await
    }
2063
    /// Persist a DRB computation checkpoint for `drb_input.epoch`.
    ///
    /// Refuses to overwrite a stored checkpoint with an equal or higher
    /// iteration count, so recorded progress only moves forward. A failed
    /// load (e.g. no row yet) is treated as "no prior checkpoint".
    /// NOTE(review): the load-check-upsert sequence is not atomic; a
    /// concurrent writer could race between check and write — confirm
    /// callers are effectively single-writer per epoch.
    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
            if loaded_drb_input.iteration >= drb_input.iteration {
                anyhow::bail!(
                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
                    loaded_drb_input,
                    drb_input
                )
            }
        }

        let drb_input_bytes = bincode::serialize(&drb_input)
            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;

        let mut tx = self.db.write().await?;

        tx.upsert(
            "drb",
            ["epoch", "drb_input"],
            ["epoch"],
            [(drb_input.epoch as i64, drb_input_bytes)],
        )
        .await?;
        tx.commit().await
    }
2089
    /// Load the DRB checkpoint for `epoch`, erroring if none is stored.
    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
        let row = self
            .db
            .read()
            .await?
            .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
            .await?;

        match row {
            // Absence is an error here (unlike load_* methods returning
            // Option): callers use the Err to mean "start from scratch".
            None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
            Some(row) => {
                let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
                let drb_input = bincode::deserialize(&drb_input_bytes)
                    .context("Failed to deserialize drb_input from storage")?;

                Ok(drb_input)
            },
        }
    }
2109
    /// Persist a light client state update certificate, keyed by the view
    /// number embedded in its light client state.
    /// NOTE(review): this writes to `state_cert` while `load_state_cert`
    /// reads `finalized_state_cert` — presumably another component promotes
    /// rows between the two tables; confirm against the garbage-collection /
    /// event-consumer logic.
    async fn add_state_cert(
        &self,
        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()> {
        let state_cert_bytes = bincode::serialize(&state_cert)
            .context("serializing light client state update certificate")?;

        let mut tx = self.db.write().await?;
        tx.upsert(
            "state_cert",
            ["view", "state_cert"],
            ["view"],
            [(
                state_cert.light_client_state.view_number as i64,
                state_cert_bytes,
            )],
        )
        .await?;
        tx.commit().await
    }
2130
    /// Load the most recent finalized light client state update certificate
    /// (highest epoch), if any.
    ///
    /// Stored bytes are first decoded as the current v2 format; on failure
    /// the legacy v1 format is tried and upgraded via `Into`.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let Some(row) = self
            .db
            .read()
            .await?
            .fetch_optional(
                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
            )
            .await?
        else {
            return Ok(None);
        };
        let bytes: Vec<u8> = row.get("state_cert");

        let cert = match bincode::deserialize(&bytes) {
            Ok(cert) => cert,
            Err(err) => {
                tracing::info!(
                    error = %err,
                    "Failed to deserialize state certificate with v2. attempting with v1"
                );

                // Fall back to the v1 wire format and upgrade it to v2.
                let v1_cert =
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                        .with_context(|| {
                            format!("Failed to deserialize using both v1 and v2. error: {err}")
                        })?;

                v1_cert.into()
            },
        };

        Ok(Some(cert))
    }
2167
    /// Load recent epoch bootstrap info (DRB result plus optional epoch-root
    /// block header) for consensus initialization.
    ///
    /// Reads the newest `RECENT_STAKE_TABLES_LIMIT` epochs, then reverses so
    /// the result is in ascending epoch order. Rows without a DRB result are
    /// silently skipped; a malformed DRB blob is an error.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all(
                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
            )
            .await?;

        rows.into_iter()
            // DESC query + rev() => ascending epoch order.
            .rev()
            .map(|row| {
                let epoch: i64 = row.try_get("epoch")?;
                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
                if let Some(drb_result) = drb_result {
                    // DrbResult is fixed-size; reject blobs of the wrong length.
                    let drb_result_array = drb_result
                        .try_into()
                        .or_else(|_| bail!("invalid drb result"))?;
                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
                        .map(|data| bincode::deserialize(&data))
                        .transpose()?;
                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
                        epoch: <SeqTypes as NodeType>::Epoch::new(epoch as u64),
                        drb_result: drb_result_array,
                        block_header,
                    }))
                } else {
                    // No DRB result: epoch not usable for startup; skip it.
                    Ok(None)
                }
            })
            // Keep errors and present rows; drop the skipped (None) rows.
            .filter_map(|e| match e {
                Err(v) => Some(Err(v)),
                Ok(Some(v)) => Some(Ok(v)),
                Ok(None) => None,
            })
            .collect()
    }
2211
    /// Replace the internal persistence metrics with ones registered against
    /// the provided `metrics` sink.
    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
        self.internal_metrics = PersistenceMetricsValue::new(metrics);
    }
2215}
2216
#[async_trait]
impl MembershipPersistence for Persistence {
    /// Load the stake table for one epoch, plus its optional block reward
    /// and stake table hash, from `epoch_drb_and_root`.
    ///
    /// Returns `Ok(None)` if no row exists for `epoch`.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
        let result = self
            .db
            .read()
            .await?
            .fetch_optional(
                query(
                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
                     epoch = $1",
                )
                .bind(epoch.u64() as i64),
            )
            .await?;

        result
            .map(|row| {
                let stake_table_bytes: Vec<u8> = row.get("stake");
                // Reward and hash columns are nullable; decode only if present.
                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
                let stake_table = bincode::deserialize(&stake_table_bytes)
                    .context("deserializing stake table")?;
                let reward: Option<RewardAmount> = reward_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                    .transpose()?;
                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                    .transpose()?;

                Ok((stake_table, reward, stake_table_hash))
            })
            .transpose()
    }

    /// Load the stake tables of up to `limit` most recent epochs, in
    /// descending epoch order (as returned by the query).
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
        let mut tx = self.db.read().await?;

        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root ORDER BY \
             epoch DESC LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(tx.as_mut())
        .await
        {
            Ok(bytes) => bytes,
            Err(err) => {
                // Log before bailing so the failure is visible even if the
                // caller discards the error context.
                tracing::error!("error loading stake tables: {err:#}");
                bail!("{err:#}");
            },
        };

        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
            .into_iter()
            .map(
                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
                    let stake_table =
                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;

                    // Nullable columns: decode only when present.
                    let block_reward: Option<RewardAmount> = reward_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                        .transpose()?;

                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                        .transpose()?;

                    Ok((
                        EpochNumber::new(id as u64),
                        (stake_table, block_reward),
                        stake_table_hash,
                    ))
                },
            )
            .collect();

        Ok(Some(stakes?))
    }

    /// Persist the stake table (and optional reward / hash) for `epoch`,
    /// keyed by epoch in `epoch_drb_and_root`.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
        let reward_bytes = block_reward
            .map(|r| bincode::serialize(&r).context("serializing block reward"))
            .transpose()?;
        let stake_table_hash_bytes = stake_table_hash
            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
            .transpose()?;
        tx.upsert(
            "epoch_drb_and_root",
            ["epoch", "stake", "block_reward", "stake_table_hash"],
            ["epoch"],
            [(
                epoch.u64() as i64,
                stake_table_bytes,
                reward_bytes,
                stake_table_hash_bytes,
            )],
        )
        .await?;
        tx.commit().await
    }

    /// Persist L1 stake-table events observed up to the finalized block
    /// `l1_finalized`, and advance the stored high-water mark.
    ///
    /// Skipped entirely (no-op) if `events` is empty, or if a higher
    /// `last_l1_block` has already been recorded (stale write). Event rows
    /// are deduplicated with `ON CONFLICT DO NOTHING`, so replays are safe.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut tx = self.db.write().await?;

        // Singleton row (id = 0) tracking the last processed L1 block.
        let last_processed_l1_block = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?
        .map(|(l1,)| l1);

        tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");

        // Option ordering: None < Some(_), so the very first write (no row
        // yet) always proceeds. Equal blocks also proceed — harmless thanks
        // to the ON CONFLICT dedup below.
        if last_processed_l1_block > Some(l1_finalized.try_into()?) {
            tracing::debug!(
                ?last_processed_l1_block,
                ?l1_finalized,
                ?events,
                "last l1 finalized stored is already higher"
            );
            return Ok(());
        }

        let mut query_builder: sqlx::QueryBuilder<Db> =
            sqlx::QueryBuilder::new("INSERT INTO stake_table_events (l1_block, log_index, event) ");

        // Convert (block, log index) keys to i64 and the event to JSON,
        // failing fast on overflow or serialization errors.
        let events = events
            .into_iter()
            .map(|((block_number, index), event)| {
                Ok((
                    i64::try_from(block_number)?,
                    i64::try_from(index)?,
                    serde_json::to_value(event).context("l1 event to value")?,
                ))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        query_builder.push_values(events.into_iter(), |mut b, (l1_block, log_index, event)| {
            b.push_bind(l1_block).push_bind(log_index).push_bind(event);
        });

        // Idempotency: replayed events are skipped.
        query_builder.push(" ON CONFLICT DO NOTHING");
        let query = query_builder.build();

        query.execute(tx.as_mut()).await?;

        // Advance the high-water mark in the same transaction as the events.
        tx.upsert(
            "stake_table_events_l1_block",
            ["id", "last_l1_block"],
            ["id"],
            [(0_i32, l1_finalized as i64)],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Load stored stake-table events up to `to_l1_block`, in (l1_block,
    /// log_index) order.
    ///
    /// Returns `(None, [])` if no events have ever been stored. Otherwise the
    /// first element reports whether the read covered the requested range
    /// (`Complete`) or only up to the last processed L1 block
    /// (`UntilL1Block`), since events past that point are not yet persisted.
    async fn load_events(
        &self,
        to_l1_block: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )> {
        let mut tx = self.db.read().await?;

        let res = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?;

        // No high-water mark means nothing has been stored yet.
        let Some((last_processed_l1_block,)) = res else {
            return Ok((None, Vec::new()));
        };

        // Clamp the query bound to what has actually been persisted.
        let to_l1_block = to_l1_block.try_into()?;
        let query_l1_block = if last_processed_l1_block > to_l1_block {
            to_l1_block
        } else {
            last_processed_l1_block
        };

        let rows = query(
            "SELECT l1_block, log_index, event FROM stake_table_events WHERE l1_block <= $1 ORDER \
             BY l1_block ASC, log_index ASC",
        )
        .bind(query_l1_block)
        .fetch_all(tx.as_mut())
        .await?;

        let events = rows
            .into_iter()
            .map(|row| {
                let l1_block: i64 = row.try_get("l1_block")?;
                let log_index: i64 = row.try_get("log_index")?;
                let event = serde_json::from_value(row.try_get("event")?)?;

                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        // Report whether the requested range was fully covered.
        if query_l1_block == to_l1_block {
            Ok((Some(EventsPersistenceRead::Complete), events))
        } else {
            Ok((
                Some(EventsPersistenceRead::UntilL1Block(
                    query_l1_block.try_into()?,
                )),
                events,
            ))
        }
    }
}
2475
2476#[async_trait]
2477impl DhtPersistentStorage for Persistence {
2478 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2484 let to_save =
2486 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2487
2488 let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
2490 (id) DO UPDATE SET serialized_records = $1";
2491
2492 let mut tx = self
2494 .db
2495 .write()
2496 .await
2497 .with_context(|| "failed to start an atomic DB transaction")?;
2498 tx.execute(query(stmt).bind(to_save))
2499 .await
2500 .with_context(|| "failed to execute DB query")?;
2501
2502 tx.commit().await.with_context(|| "failed to commit to DB")
2504 }
2505
2506 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2512 let result = self
2514 .db
2515 .read()
2516 .await
2517 .with_context(|| "failed to start a DB read transaction")?
2518 .fetch_one("SELECT * FROM libp2p_dht where id = 0")
2519 .await
2520 .with_context(|| "failed to fetch from DB")?;
2521
2522 let serialied_records: Vec<u8> = result.get("serialized_records");
2524
2525 let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
2527 .with_context(|| "Failed to deserialize records")?;
2528
2529 Ok(records)
2530 }
2531}
2532
2533#[async_trait]
2534impl Provider<SeqTypes, VidCommonRequest> for Persistence {
2535 #[tracing::instrument(skip(self))]
2536 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
2537 let mut tx = match self.db.read().await {
2538 Ok(tx) => tx,
2539 Err(err) => {
2540 tracing::warn!("could not open transaction: {err:#}");
2541 return None;
2542 },
2543 };
2544
2545 let bytes = match query_as::<(Vec<u8>,)>(
2546 "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
2547 )
2548 .bind(req.0.to_string())
2549 .fetch_optional(tx.as_mut())
2550 .await
2551 {
2552 Ok(Some((bytes,))) => bytes,
2553 Ok(None) => return None,
2554 Err(err) => {
2555 tracing::error!("error loading VID share: {err:#}");
2556 return None;
2557 },
2558 };
2559
2560 let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2561 match bincode::deserialize(&bytes) {
2562 Ok(share) => share,
2563 Err(err) => {
2564 tracing::warn!("error decoding VID share: {err:#}");
2565 return None;
2566 },
2567 };
2568
2569 match share.data {
2570 VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
2571 VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
2572 }
2573 }
2574}
2575
2576#[async_trait]
2577impl Provider<SeqTypes, PayloadRequest> for Persistence {
2578 #[tracing::instrument(skip(self))]
2579 async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
2580 let mut tx = match self.db.read().await {
2581 Ok(tx) => tx,
2582 Err(err) => {
2583 tracing::warn!("could not open transaction: {err:#}");
2584 return None;
2585 },
2586 };
2587
2588 let bytes = match query_as::<(Vec<u8>,)>(
2589 "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
2590 )
2591 .bind(req.0.to_string())
2592 .fetch_optional(tx.as_mut())
2593 .await
2594 {
2595 Ok(Some((bytes,))) => bytes,
2596 Ok(None) => return None,
2597 Err(err) => {
2598 tracing::warn!("error loading DA proposal: {err:#}");
2599 return None;
2600 },
2601 };
2602
2603 let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
2604 {
2605 Ok(proposal) => proposal,
2606 Err(err) => {
2607 tracing::error!("error decoding DA proposal: {err:#}");
2608 return None;
2609 },
2610 };
2611
2612 Some(Payload::from_bytes(
2613 &proposal.data.encoded_transactions,
2614 &proposal.data.metadata,
2615 ))
2616 }
2617}
2618
2619#[async_trait]
2620impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
2621 #[tracing::instrument(skip(self))]
2622 async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
2623 let mut tx = match self.db.read().await {
2624 Ok(tx) => tx,
2625 Err(err) => {
2626 tracing::warn!("could not open transaction: {err:#}");
2627 return None;
2628 },
2629 };
2630
2631 let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
2632 Ok(res) => res?,
2633 Err(err) => {
2634 tracing::info!("requested leaf not found in undecided proposals: {err:#}");
2635 return None;
2636 },
2637 };
2638
2639 match LeafQueryData::new(leaf, qc) {
2640 Ok(leaf) => Some(leaf),
2641 Err(err) => {
2642 tracing::warn!("fetched invalid leaf: {err:#}");
2643 None
2644 },
2645 }
2646 }
2647}
2648
2649async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
2650 tx: &mut Transaction<Mode>,
2651 req: LeafRequest<SeqTypes>,
2652) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
2653 let Some((proposal_bytes,)) =
2655 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
2656 .bind(req.expected_leaf.to_string())
2657 .fetch_optional(tx.as_mut())
2658 .await
2659 .context("fetching proposal")?
2660 else {
2661 return Ok(None);
2662 };
2663
2664 let Some((qc_bytes,)) =
2666 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
2667 .bind(req.expected_leaf.to_string())
2668 .fetch_optional(tx.as_mut())
2669 .await
2670 .context("fetching QC")?
2671 else {
2672 return Ok(None);
2673 };
2674
2675 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
2676 bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
2677 let qc: QuorumCertificate2<SeqTypes> =
2678 bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
2679
2680 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
2681 Ok(Some((leaf, qc)))
2682}
2683
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    // Test-only wiring: lets the shared persistence test suite spin up this
    // SQL-backed `Persistence` against a temporary database.
    #[async_trait]
    impl TestablePersistence for Persistence {
        // `Arc` so one temporary database can be shared by the multiple
        // `Persistence` connections some tests open against it.
        type Storage = Arc<TmpDb>;

        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            // Backend-specific connection options: Postgres by default,
            // SQLite when built with the `embedded-db` feature.
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions {
                    path: Some(db.path()),
                }
                .into()
            }
        }
    }
}
2723
2724#[cfg(test)]
2725mod test {
2726
2727 use committable::{Commitment, CommitmentBoundsArkless};
2728 use espresso_types::{traits::NullEventConsumer, Header, Leaf, NodeState, ValidatedState};
2729 use futures::stream::TryStreamExt;
2730 use hotshot_example_types::node_types::TestVersions;
2731 use hotshot_types::{
2732 data::{
2733 ns_table::parse_ns_table, vid_disperse::VidDisperseShare2, EpochNumber, QuorumProposal2,
2734 },
2735 message::convert_proposal,
2736 simple_certificate::QuorumCertificate,
2737 simple_vote::QuorumData,
2738 traits::{
2739 block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
2740 node_implementation::Versions,
2741 signature_key::SignatureKey,
2742 EncodeBytes,
2743 },
2744 utils::EpochTransitionIndicator,
2745 vid::{
2746 advz::advz_scheme,
2747 avidm::{init_avidm_param, AvidMScheme},
2748 },
2749 };
2750 use jf_vid::VidScheme;
2751 use vbs::version::StaticVersionType;
2752
2753 use super::*;
2754 use crate::{persistence::tests::TestablePersistence as _, BLSPubKey, PubKey};
2755
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_quorum_proposals_leaf_hash_migration() {
        // Build a genesis leaf to base two legacy-format quorum proposals on.
        let leaf: Leaf2 =
            Leaf::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock())
                .await
                .into();
        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
        // Signature content is irrelevant here; the test exercises storage
        // and migration, not signature verification.
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut quorum_proposal = Proposal {
            data: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
            signature,
            _pd: Default::default(),
        };

        // Two proposals at distinct views, downgraded to the legacy
        // `QuorumProposal` type as they would have been stored by an older
        // version of the software.
        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());

        quorum_proposal.data.view_number = ViewNumber::new(1);

        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());
        let qps = [qp1, qp2];

        // Insert only (view, data) — i.e. without a leaf_hash — simulating
        // rows written before the leaf_hash column existed.
        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();
        let params = qps
            .iter()
            .map(|qp| {
                (
                    qp.data.view_number.u64() as i64,
                    bincode::serialize(&qp).unwrap(),
                )
            })
            .collect::<Vec<_>>();
        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Reconnect and read the rows back; leaf_hash should now be
        // populated (presumably by a migration run on connect, since the
        // rows were inserted without it — the assertions below rely on it).
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.read().await.unwrap();
        let rows = tx
            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(rows.len(), qps.len());
        for (row, qp) in rows.into_iter().zip(qps) {
            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
            assert_eq!(
                row.get::<Vec<u8>, _>("data"),
                bincode::serialize(&qp).unwrap()
            );
            // The filled-in leaf_hash must equal the commitment of the leaf
            // derived from the stored proposal.
            assert_eq!(
                row.get::<String, _>("leaf_hash"),
                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
            );
        }
    }
2834
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetching_providers() {
        // End-to-end check of the three `Provider` impls (VID common,
        // payload, leaf): store proposals locally, then fetch them back
        // through the provider interface.
        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload with the AvidM scheme (2 equally-weighted
        // nodes) to get a payload commitment and shares.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid_share = VidDisperseShare2::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param.clone(),
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();

        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: leaf.view_number(),
                justify_qc: leaf.justify_qc(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // A successor proposal whose justify QC commits to `leaf`; that QC
        // is what the leaf provider must return alongside the leaf.
        let mut next_quorum_proposal = quorum_proposal.clone();
        next_quorum_proposal.data.proposal.view_number += 1;
        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
        next_quorum_proposal
            .data
            .proposal
            .justify_qc
            .data
            .leaf_commit = Committable::commit(&leaf.clone());
        let qc = next_quorum_proposal.data.justify_qc();

        // Store the DA proposal, VID share, and both quorum proposals.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_vid2(&convert_proposal(vid_share.clone()))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        storage
            .append_quorum_proposal2(&next_quorum_proposal)
            .await
            .unwrap();

        // VID common data is served from the stored share.
        assert_eq!(
            Some(VidCommon::V1(avidm_param)),
            storage
                .fetch(VidCommonRequest(VidCommitment::V1(
                    vid_share.data.payload_commitment
                )))
                .await
        );
        // The payload is reconstructed from the stored DA proposal.
        assert_eq!(
            leaf_payload,
            storage
                .fetch(PayloadRequest(VidCommitment::V1(
                    vid_share.data.payload_commitment
                )))
                .await
                .unwrap()
        );
        // The leaf (plus its QC) is served from the undecided proposals.
        assert_eq!(
            LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
            storage
                .fetch(LeafRequest::new(
                    leaf.block_header().block_number(),
                    Committable::commit(&leaf),
                    qc.clone().commit()
                ))
                .await
                .unwrap()
        );
    }
2967
    // Shared driver for the pruning tests: stores consensus data at view 1,
    // then advances the decided view and checks that the data survives one
    // view past and is gone two views past, under the given retention
    // options.
    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        opt.consensus_pruning = pruning_opt;
        let storage = opt.create().await.unwrap();

        let data_view = ViewNumber::new(1);

        // Genesis leaf plus an AvidM dispersal give us a payload commitment
        // and shares to build the stored artifacts from.
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid = VidDisperseShare2::<SeqTypes> {
            view_number: data_view,
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: data_view,
                justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: data_view,
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
        storage.append_vid2(&vid).await.unwrap();
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        tracing::info!("decide view 1");
        // One view past the data: still within the retention window for both
        // callers of this helper, so everything must still be loadable.
        storage
            .append_decided_leaves(data_view + 1, [], &NullEventConsumer)
            .await
            .unwrap();
        assert_eq!(
            storage.load_vid_share(data_view).await.unwrap().unwrap(),
            convert_proposal(vid)
        );
        assert_eq!(
            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
            da_proposal
        );
        assert_eq!(
            storage.load_quorum_proposal(data_view).await.unwrap(),
            quorum_proposal
        );

        tracing::info!("decide view 2");
        // Two views past the data: now outside the retention window, so the
        // stored artifacts should have been pruned.
        storage
            .append_decided_leaves(data_view + 2, [], &NullEventConsumer)
            .await
            .unwrap();
        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
        storage.load_quorum_proposal(data_view).await.unwrap_err();
    }
3095
3096 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3097 async fn test_pruning_minimum_retention() {
3098 test_pruning_helper(ConsensusPruningOptions {
3099 target_usage: 0,
3102 minimum_retention: 1,
3103 target_retention: u64::MAX,
3106 })
3107 .await
3108 }
3109
3110 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3111 async fn test_pruning_target_retention() {
3112 test_pruning_helper(ConsensusPruningOptions {
3113 target_retention: 1,
3114 minimum_retention: 0,
3117 target_usage: u64::MAX,
3120 })
3121 .await
3122 }
3123
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_consensus_migration() {
        // Populate the legacy (v1) consensus tables with `rows` views of
        // data, run the migration, and verify every row landed in the v2
        // tables.
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);

        let storage = opt.create().await.unwrap();

        let rows = 300;

        // Nothing stored yet, so no state cert should load.
        assert!(storage.load_state_cert().await.unwrap().is_none());

        for i in 0..rows {
            let view = ViewNumber::new(i);
            let validated_state = ValidatedState::default();
            let instance_state = NodeState::default();

            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
            let (payload, metadata) =
                Payload::from_transactions([], &validated_state, &instance_state)
                    .await
                    .unwrap();

            let payload_bytes = payload.encode();

            let block_header =
                Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);

            // Placeholder QC data; signature validity is irrelevant to the
            // migration being tested.
            let null_quorum_data = QuorumData {
                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
            };

            let justify_qc = QuorumCertificate::new(
                null_quorum_data.clone(),
                null_quorum_data.commit(),
                view,
                None,
                std::marker::PhantomData,
            );

            // Legacy (v1) quorum proposal shape.
            let quorum_proposal = QuorumProposal {
                block_header,
                view_number: view,
                justify_qc: justify_qc.clone(),
                upgrade_certificate: None,
                proposal_certificate: None,
            };

            let quorum_proposal_signature =
                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum proposal");

            let proposal = Proposal {
                data: quorum_proposal.clone(),
                signature: quorum_proposal_signature,
                _pd: std::marker::PhantomData,
            };

            let proposal_bytes = bincode::serialize(&proposal)
                .context("serializing proposal")
                .unwrap();

            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
            leaf.fill_block_payload::<TestVersions>(
                payload,
                GENESIS_VID_NUM_STORAGE_NODES,
                <TestVersions as Versions>::Base::VERSION,
            )
            .unwrap();

            let mut tx = storage.db.write().await.unwrap();

            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
            let leaf_bytes = bincode::serialize(&leaf).unwrap();

            // Write directly into the legacy `anchor_leaf` table.
            tx.upsert(
                "anchor_leaf",
                ["view", "leaf", "qc"],
                ["view"],
                [(i as i64, leaf_bytes, qc_bytes)],
            )
            .await
            .unwrap();

            // One finalized light client state certificate per epoch.
            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(i),
                light_client_state: Default::default(), next_stake_table_state: Default::default(), signatures: vec![], auth_root: Default::default(),
            };
            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
            tx.upsert(
                "finalized_state_cert",
                ["epoch", "state_cert"],
                ["epoch"],
                [(i as i64, state_cert_bytes)],
            )
            .await
            .unwrap();

            tx.commit().await.unwrap();

            // Legacy ADVZ dispersal feeds the v1 VID share path.
            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
                .disperse(payload_bytes.clone())
                .unwrap();

            let vid = ADVZDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(i),
                payload_commitment: Default::default(),
                share: disperse.shares[0].clone(),
                common: disperse.common,
                recipient_key: pubkey,
            };

            let (payload, metadata) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
                    .await
                    .unwrap();

            // Legacy (v1) DA proposal shape.
            let da = DaProposal::<SeqTypes> {
                encoded_transactions: payload.encode(),
                metadata,
                view_number: ViewNumber::new(i),
            };

            let block_payload_signature =
                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");

            let da_proposal = Proposal {
                data: da,
                signature: block_payload_signature,
                _pd: Default::default(),
            };

            // Store via the v1 append paths.
            storage
                .append_vid(&vid.to_proposal(&privkey).unwrap())
                .await
                .unwrap();
            storage
                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
                .await
                .unwrap();

            // Legacy quorum proposal and certificate rows, written directly.
            let leaf_hash = Committable::commit(&leaf);
            let mut tx = storage.db.write().await.expect("failed to start write tx");
            tx.upsert(
                "quorum_proposals",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
            )
            .await
            .expect("failed to upsert quorum proposal");

            let justify_qc = &proposal.data.justify_qc;
            let justify_qc_bytes = bincode::serialize(&justify_qc)
                .context("serializing QC")
                .unwrap();
            tx.upsert(
                "quorum_certificate",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(
                    justify_qc.view_number.u64() as i64,
                    justify_qc.data.leaf_commit.to_string(),
                    &justify_qc_bytes,
                )],
            )
            .await
            .expect("failed to upsert qc");

            tx.commit().await.expect("failed to commit");
        }

        storage.migrate_consensus().await.unwrap();

        // After migration, every v2 table should hold exactly one row per
        // view inserted above.
        let mut tx = storage.db.read().await.unwrap();
        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            anchor_leaf2_count, rows as i64,
            "anchor leaf count does not match rows",
        );

        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            da_proposal_count, rows as i64,
            "da proposal count does not match rows",
        );

        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            vid_share_count, rows as i64,
            "vid share count does not match rows"
        );

        let (quorum_proposals_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_proposals_count, rows as i64,
            "quorum proposals count does not match rows",
        );

        let (quorum_certificates_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_certificates_count, rows as i64,
            "quorum certificates count does not match rows",
        );

        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            state_cert_count, rows as i64,
            "Light client state update certificates count does not match rows",
        );
        // Loading the state cert should return the one with the highest
        // epoch.
        assert_eq!(
            storage.load_state_cert().await.unwrap().unwrap(),
            LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(rows - 1),
                light_client_state: Default::default(),
                next_stake_table_state: Default::default(),
                signatures: vec![],
                auth_root: Default::default(),
            },
            "Wrong light client state update certificate in the storage",
        );

        // Re-running the migration must still succeed (exercises that it can
        // be invoked again on already-migrated data).
        storage.migrate_consensus().await.unwrap();
    }
3371}
3372
3373#[cfg(test)]
3374#[cfg(not(feature = "embedded-db"))]
3375mod postgres_tests {
3376 use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
3377 use hotshot_example_types::node_types::TestVersions;
3378 use hotshot_query_service::{
3379 availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
3380 };
3381 use hotshot_types::{
3382 data::vid_commitment,
3383 simple_certificate::QuorumCertificate,
3384 traits::{
3385 block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
3386 election::Membership,
3387 signature_key::BuilderSignatureKey,
3388 EncodeBytes,
3389 },
3390 };
3391
3392 use super::*;
3393 use crate::persistence::tests::TestablePersistence as _;
3394
    // Inserts a block containing three transactions (two sharing a
    // namespace) and checks that the SQL helpers `get_ns_table` /
    // `read_ns_id` decode the header's namespace table to the same namespace
    // ids the Rust side recorded in the `transactions` table.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        // Give the coordinator's membership a first epoch before building
        // any header (presumably required by `Header::new` — TODO confirm).
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        // Three transactions across two namespaces (10001 twice, 10009 once).
        let txs = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis::<TestVersions>(&validated_state, &instance_state).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis::<TestVersions>(&validated_state, &instance_state)
            .await
            .into();

        // Build a genuine header over the payload so the `header` table row
        // carries a real, encoded namespace table.
        let (payload, ns_table) =
            Payload::from_transactions(txs.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_bytes = payload.encode();
        let payload_commitment = vid_commitment::<TestVersions>(
            &payload_bytes,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);
        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            BuilderFee {
                fee_amount,
                fee_account,
                fee_signature,
            },
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();
        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        // Store the leaf and block so the `header` and `transactions` tables
        // get populated.
        let mut tx = storage.db.write().await.unwrap();
        tx.insert_leaf(LeafQueryData::new(leaf, qc).unwrap())
            .await
            .unwrap();
        tx.insert_block(BlockQueryData::<SeqTypes>::new(block_header, payload))
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // For each stored transaction, compare the namespace id recorded in
        // the `transactions` table against the one decoded in SQL from the
        // header's namespace table.
        let mut tx = storage.db.read().await.unwrap();
        let rows = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
            FROM header AS h
            JOIN transactions AS t ON t.block_height = h.height
            ORDER BY t.ns_index, t.position
            ",
        )
        .fetch_all(tx.as_mut())
        .await
        .unwrap();
        assert_eq!(rows.len(), txs.len());
        for (i, row) in rows.into_iter().enumerate() {
            let ns = u64::from(txs[i].namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), ns);
        }
    }
3493
3494 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3495 async fn test_postgres_read_ns_table_v0_1() {
3496 test_postgres_read_ns_table(NodeState::mock()).await;
3497 }
3498
3499 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3500 async fn test_postgres_read_ns_table_v0_2() {
3501 test_postgres_read_ns_table(NodeState::mock_v2()).await;
3502 }
3503
3504 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3505 async fn test_postgres_read_ns_table_v0_3() {
3506 test_postgres_read_ns_table(NodeState::mock_v3().with_epoch_height(0)).await;
3507 }
3508}