// sequencer/persistence/sql.rs

1use std::{
2    collections::BTreeMap,
3    path::PathBuf,
4    str::FromStr,
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use anyhow::{bail, Context};
10use async_trait::async_trait;
11use clap::Parser;
12use committable::Committable;
13use derivative::Derivative;
14use derive_more::derive::{From, Into};
15use espresso_types::{
16    parse_duration, parse_size,
17    traits::{EventsPersistenceRead, MembershipPersistence},
18    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
19    v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent},
20    BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload,
21    StakeTableHash, ValidatorMap,
22};
23use futures::stream::StreamExt;
24use hotshot::InitializerEpochInfo;
25use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
26    DhtPersistentStorage, SerializableRecord,
27};
28use hotshot_query_service::{
29    availability::LeafQueryData,
30    data_source::{
31        storage::{
32            pruning::PrunerCfg,
33            sql::{
34                include_migrations, query_as, syntax_helpers::MAX_FN, Config, Db, Read, SqlStorage,
35                Transaction, TransactionMode, Write,
36            },
37        },
38        Transaction as _, VersionedDataSource,
39    },
40    fetching::{
41        request::{LeafRequest, PayloadRequest, VidCommonRequest},
42        Provider,
43    },
44    merklized_state::MerklizedState,
45    VidCommon,
46};
47use hotshot_types::{
48    data::{
49        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
50        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
51        QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare,
52    },
53    drb::{DrbInput, DrbResult},
54    event::{Event, EventType, HotShotAction, LeafInfo},
55    message::{convert_proposal, Proposal},
56    simple_certificate::{
57        LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
58        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
59    },
60    traits::{
61        block_contents::{BlockHeader, BlockPayload},
62        metrics::Metrics,
63        node_implementation::ConsensusTime,
64    },
65    vote::HasViewNumber,
66};
67use itertools::Itertools;
68use sqlx::{query, Executor, Row};
69
70use crate::{
71    catchup::SqlStateCatchup, persistence::persistence_metrics::PersistenceMetricsValue, NodeType,
72    SeqTypes, ViewNumber, RECENT_STAKE_TABLES_LIMIT,
73};
74
/// Options for Postgres-backed persistence.
//
// Every connection parameter is optional: values set here override anything
// derived from the database URI when the final `Config` is assembled (see
// `TryFrom<&Options> for Config` below). Doc comments on these fields become
// clap help text, so additional notes are plain `//` comments.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    /// Hostname for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    /// Port for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    /// Name of database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    /// Postgres user to connect as.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    /// Password for Postgres user.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    // Hide from debug output since may contain sensitive data.
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    /// Use TLS for an encrypted connection to the database.
    // TLS cannot be expressed via the URI shorthand, only via this flag.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}
105
106impl Default for PostgresOptions {
107    fn default() -> Self {
108        Self::parse_from(std::iter::empty::<String>())
109    }
110}
111
// Options for SQLite-backed persistence (selected by the `embedded-db` feature).
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    /// Base directory for the SQLite database.
    /// The SQLite file will be created in the `sqlite` subdirectory with filename as `database`.
    // `build_sqlite_path` creates the `sqlite` subdirectory as a side effect
    // of argument parsing, so the directory exists by the time it is used.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: Option<PathBuf>,
}
124
125pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
126    let sub_dir = PathBuf::from_str(path)?.join("sqlite");
127
128    // if `sqlite` sub dir does not exist then create it
129    if !sub_dir.exists() {
130        std::fs::create_dir_all(&sub_dir)
131            .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
132    }
133
134    Ok(sub_dir.join("database"))
135}
136
/// Options for database-backed persistence, supporting both Postgres and SQLite.
//
// Exactly one backend's options are compiled in, selected by the
// `embedded-db` feature flag.
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    /// Database URI for Postgres or SQLite.
    ///
    /// This is a shorthand for setting a number of other options all at once. The URI has the
    /// following format ([brackets] indicate optional segments):
    ///
    /// - **Postgres:** `postgres[ql]://[username[:password]@][host[:port],]/database[?parameter_list]`
    /// - **SQLite:** `sqlite://path/to/db.sqlite`
    ///
    /// Options set explicitly via other env vars or flags will take precedence, so you can use this
    /// URI to set a baseline and then use other parameters to override or add configuration. In
    /// addition, there are some parameters which cannot be set via the URI, such as TLS.
    // Hide from debug output since may contain sensitive data.
    // NOTE(review): this field has no `#[clap(...)]` attribute, so the clap
    // derive applies its default argument rules to it — confirm it is exposed
    // on the CLI as intended.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    /// This will enable the pruner and set the default pruning parameters unless provided.
    /// Default parameters:
    /// - pruning_threshold: 3 TB
    /// - minimum_retention: 1 day
    /// - target_retention: 7 days
    /// - batch_size: 1000
    /// - max_usage: 80%
    /// - interval: 1 hour
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    /// Pruning parameters.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    /// Pruning parameters for ephemeral consensus storage.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    /// Specifies the maximum number of concurrent fetch requests allowed from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    /// The minimum delay between active fetches in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    /// The minimum delay between loading chunks in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    /// Disable pruning and reconstruct previously pruned data.
    ///
    /// While running without pruning is the default behavior, the default will not try to
    /// reconstruct data that was pruned in a previous run where pruning was enabled. This option
    /// instructs the service to run without pruning _and_ reconstruct all previously pruned data by
    /// fetching from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    /// Turns on leaf only data storage
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    /// The maximum idle time of a database connection.
    ///
    /// Any connection which has been open and unused longer than this duration will be
    /// automatically closed to reduce load on the server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    /// The maximum lifetime of a database connection.
    ///
    /// Any connection which has been open longer than this duration will be automatically closed
    /// (and, if needed, replaced), even if it is otherwise healthy. It is good practice to refresh
    /// even healthy connections once in a while (e.g. daily) in case of resource leaks in the
    /// server implementation.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    // Statements running longer than this threshold are treated as slow by the
    // storage layer (see `Config::slow_statement_threshold`).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    /// The minimum number of database connections to maintain at any time.
    ///
    /// The database client will, to the best of its ability, maintain at least `min` open
    /// connections at all times. This can be used to reduce the latency hit of opening new
    /// connections when at least this many simultaneous connections are frequently needed.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    /// The maximum number of database connections to maintain at any time.
    ///
    /// Once `max` connections are in use simultaneously, further attempts to acquire a connection
    /// (or begin a transaction) will block until one of the existing connections is released.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    /// Sets the batch size for the types migration.
    /// Determines how many `(leaf, vid)` rows are selected from the old types table
    /// and migrated at once.
    /// Default is `10000` if not set
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_TYPES_MIGRATION_BATCH_SIZE")]
    pub(crate) types_migration_batch_size: Option<u64>,

    // Keep the database connection pool when persistence is created,
    // allowing it to be reused across multiple instances instead of creating
    // a new pool each time such as for API, consensus storage etc
    // This also ensures all storage instances adhere to the MAX_CONNECTIONS limit if set
    //
    // Note: Cloning the `Pool` is lightweight and efficient because it simply
    // creates a new reference-counted handle to the underlying pool state.
    // (Populated by `PersistenceOptions::create`, never from the CLI.)
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
272
273impl Default for Options {
274    fn default() -> Self {
275        Self::parse_from(std::iter::empty::<String>())
276    }
277}
278
279#[cfg(not(feature = "embedded-db"))]
280impl From<PostgresOptions> for Config {
281    fn from(opt: PostgresOptions) -> Self {
282        let mut cfg = Config::default();
283
284        if let Some(host) = opt.host {
285            cfg = cfg.host(host);
286        }
287
288        if let Some(port) = opt.port {
289            cfg = cfg.port(port);
290        }
291
292        if let Some(database) = &opt.database {
293            cfg = cfg.database(database);
294        }
295
296        if let Some(user) = &opt.user {
297            cfg = cfg.user(user);
298        }
299
300        if let Some(password) = &opt.password {
301            cfg = cfg.password(password);
302        }
303
304        if opt.use_tls {
305            cfg = cfg.tls();
306        }
307
308        cfg = cfg.max_connections(20);
309        cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
310        cfg = cfg.connection_timeout(Duration::from_secs(10240));
311        cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
312
313        cfg
314    }
315}
316
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Build a storage `Config` from SQLite options, with fixed pool tuning.
    ///
    /// NOTE(review): the slow-statement threshold here is 2s while the
    /// Postgres conversion uses 1s — confirm the asymmetry is intentional.
    fn from(opt: SqliteOptions) -> Self {
        let base = match opt.path {
            Some(path) => Config::default().db_path(path),
            None => Config::default(),
        };

        base.max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
    }
}
333
334#[cfg(not(feature = "embedded-db"))]
335impl From<PostgresOptions> for Options {
336    fn from(opt: PostgresOptions) -> Self {
337        Options {
338            postgres_options: opt,
339            max_connections: 20,
340            idle_connection_timeout: Duration::from_secs(120),
341            connection_timeout: Duration::from_secs(10240),
342            slow_statement_threshold: Duration::from_secs(1),
343            ..Default::default()
344        }
345    }
346}
347
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wrap SQLite options in a full `Options`, overriding the pool tuning
    /// while leaving every other field at its `Default` value.
    ///
    /// NOTE(review): `max_connections` is 10 here but 20 in the
    /// `From<SqliteOptions> for Config` impl — confirm this is intentional.
    fn from(opt: SqliteOptions) -> Self {
        let mut options = Options::default();
        options.sqlite_options = opt;
        options.max_connections = 10;
        options.idle_connection_timeout = Duration::from_secs(120);
        options.connection_timeout = Duration::from_secs(10240);
        options.slow_statement_threshold = Duration::from_secs(1);
        options
    }
}
// Assemble the final storage `Config` by layering, in order: the baseline
// parsed from the URI (if any), an existing connection pool, the shared pool
// settings, backend-specific options (Postgres or SQLite, chosen at compile
// time by `embedded-db`), and finally the pruning/archive flags.
impl TryFrom<&Options> for Config {
    type Error = anyhow::Error;

    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
        // The URI is only a baseline; explicit options below take precedence.
        let mut cfg = match &opt.uri {
            Some(uri) => uri.parse()?,
            None => Self::default(),
        };

        // Reuse a pool stashed by a previous `create()` call instead of
        // opening a fresh one (pool cloning is a cheap refcount bump).
        if let Some(pool) = &opt.pool {
            cfg = cfg.pool(pool.clone());
        }

        cfg = cfg.max_connections(opt.max_connections);
        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
        cfg = cfg.min_connections(opt.min_connections);
        cfg = cfg.connection_timeout(opt.connection_timeout);
        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);

        #[cfg(not(feature = "embedded-db"))]
        {
            // Postgres build: register the Postgres migrations and forward
            // whichever connection parameters were explicitly supplied.
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
            ));

            let pg_options = &opt.postgres_options;

            if let Some(host) = &pg_options.host {
                cfg = cfg.host(host.clone());
            }

            if let Some(port) = pg_options.port {
                cfg = cfg.port(port);
            }

            if let Some(database) = &pg_options.database {
                cfg = cfg.database(database);
            }

            if let Some(user) = &pg_options.user {
                cfg = cfg.user(user);
            }

            if let Some(password) = &pg_options.password {
                cfg = cfg.password(password);
            }

            if pg_options.use_tls {
                cfg = cfg.tls();
            }
        }

        #[cfg(feature = "embedded-db")]
        {
            // SQLite build: register the SQLite migrations and the DB path.
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
            ));

            if let Some(path) = &opt.sqlite_options.path {
                cfg = cfg.db_path(path.clone());
            }
        }

        // `prune` and `archive` are mutually exclusive (clap `conflicts_with`),
        // so at most one of these branches applies.
        if opt.prune {
            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
        }
        if opt.archive {
            cfg = cfg.archive();
        }

        Ok(cfg)
    }
}
434
/// Pruning parameters.
//
// All fields are optional; unset fields fall back to the pruner's built-in
// defaults (see the `prune` flag's documentation on `Options`).
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    /// Threshold for pruning, specified in bytes.
    /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period.
    /// Pruning continues until the disk usage drops below the MAX USAGE.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    /// Minimum retention period.
    /// Data is retained for at least this duration, even if there's no free disk space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    /// Target retention period.
    /// Data older than this is pruned to free up space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    /// Batch size for pruning.
    /// This is the number of blocks data to delete in a single transaction.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    /// Maximum disk usage (in basis points).
    ///
    /// Pruning stops once the disk usage falls below this value, even if
    /// some data older than the `MINIMUM_RETENTION` remains. Values range
    /// from 0 (0%) to 10000 (100%).
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    /// Interval for running the pruner.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    /// Number of SQLite pages to vacuum from the freelist
    /// during each pruner cycle.
    /// This value corresponds to `N` in the SQLite PRAGMA `incremental_vacuum(N)`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}
489
490impl From<PruningOptions> for PrunerCfg {
491    fn from(opt: PruningOptions) -> Self {
492        let mut cfg = PrunerCfg::new();
493        if let Some(threshold) = opt.pruning_threshold {
494            cfg = cfg.with_pruning_threshold(threshold);
495        }
496        if let Some(min) = opt.minimum_retention {
497            cfg = cfg.with_minimum_retention(min);
498        }
499        if let Some(target) = opt.target_retention {
500            cfg = cfg.with_target_retention(target);
501        }
502        if let Some(batch) = opt.batch_size {
503            cfg = cfg.with_batch_size(batch);
504        }
505        if let Some(max) = opt.max_usage {
506            cfg = cfg.with_max_usage(max);
507        }
508        if let Some(interval) = opt.interval {
509            cfg = cfg.with_interval(interval);
510        }
511
512        if let Some(pages) = opt.pages {
513            cfg = cfg.with_incremental_vacuum_pages(pages)
514        }
515
516        cfg = cfg.with_state_tables(vec![
517            BlockMerkleTree::state_type().to_string(),
518            FeeMerkleTree::state_type().to_string(),
519        ]);
520
521        cfg
522    }
523}
524
/// Pruning parameters for ephemeral consensus storage.
//
// Interplay of the three knobs: unarchived data is normally kept for
// TARGET_RETENTION views; if storage exceeds TARGET_USAGE bytes, collection
// may start earlier, but never before MINIMUM_RETENTION views have passed.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    /// Number of views to try to retain in consensus storage before data that hasn't been archived
    /// is garbage collected.
    ///
    /// The longer this is, the more certain that all data will eventually be archived, even if
    /// there are temporary problems with archive storage or partially missing data. This can be set
    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
    /// setting only applies to views which never get decided (ie forks in consensus) and views for
    /// which this node is partially offline. These should be exceptionally rare.
    ///
    /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION
    /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long
    /// consensus data will be retained, see MINIMUM_RETENTION.
    ///
    /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    /// Minimum number of views to try to retain in consensus storage before data that hasn't been
    /// archived is garbage collected.
    ///
    /// This bound allows data to be retained even if consensus storage occupies more than
    /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival
    /// storage as necessary, even under extreme circumstances where otherwise garbage collection
    /// would kick in based on TARGET_RETENTION.
    ///
    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more
    /// aggressively.
    ///
    /// See also TARGET_RETENTION and MINIMUM_RETENTION.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}
581
582#[async_trait]
583impl PersistenceOptions for Options {
584    type Persistence = Persistence;
585
586    fn set_view_retention(&mut self, view_retention: u64) {
587        self.consensus_pruning.target_retention = view_retention;
588        self.consensus_pruning.minimum_retention = view_retention;
589    }
590
591    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
592        let config = (&*self).try_into()?;
593        let persistence = Persistence {
594            db: SqlStorage::connect(config).await?,
595            gc_opt: self.consensus_pruning,
596            internal_metrics: PersistenceMetricsValue::default(),
597        };
598        persistence.migrate_quorum_proposal_leaf_hashes().await?;
599        self.pool = Some(persistence.db.pool());
600        Ok(persistence)
601    }
602
603    async fn reset(self) -> anyhow::Result<()> {
604        SqlStorage::connect(Config::try_from(&self)?.reset_schema()).await?;
605        Ok(())
606    }
607}
608
/// Database-backed persistence (Postgres, or SQLite when the `embedded-db`
/// feature is enabled).
#[derive(Clone, Debug)]
pub struct Persistence {
    // Underlying SQL storage handle.
    db: SqlStorage,
    // Garbage-collection (view retention) settings for ephemeral consensus data.
    gc_opt: ConsensusPruningOptions,
    /// A reference to the internal metrics
    internal_metrics: PersistenceMetricsValue,
}
617
618impl Persistence {
619    /// Ensure the `leaf_hash` column is populated for all existing quorum proposals.
620    ///
621    /// This column was added in a migration, but because it requires computing a commitment of the
622    /// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we
623    /// check if there are any just-migrated quorum proposals with a `NULL` value for this column,
624    /// and if so we populate the column manually.
625    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
626        let mut tx = self.db.write().await?;
627
628        let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");
629
630        let mut updates = vec![];
631        while let Some(row) = proposals.next().await {
632            let row = row?;
633
634            let hash: Option<String> = row.try_get("leaf_hash")?;
635            if hash.is_none() {
636                let view: i64 = row.try_get("view")?;
637                let data: Vec<u8> = row.try_get("data")?;
638                let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
639                    bincode::deserialize(&data)?;
640                let leaf = Leaf::from_quorum_proposal(&proposal.data);
641                let leaf_hash = Committable::commit(&leaf);
642                tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
643                updates.push((view, leaf_hash.to_string()));
644            }
645        }
646        drop(proposals);
647
648        tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
649            .await?;
650
651        tx.commit().await
652    }
653
654    async fn generate_decide_events(&self, consumer: &impl EventConsumer) -> anyhow::Result<()> {
655        let mut last_processed_view: Option<i64> = self
656            .db
657            .read()
658            .await?
659            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
660            .await?
661            .map(|row| row.get("last_processed_view"));
662        loop {
663            // In SQLite, overlapping read and write transactions can lead to database errors. To
664            // avoid this:
665            // - start a read transaction to query and collect all the necessary data.
666            // - Commit (or implicitly drop) the read transaction once the data is fetched.
667            // - use the collected data to generate a "decide" event for the consumer.
668            // - begin a write transaction to delete the data and update the event stream.
669            let mut tx = self.db.read().await?;
670
671            // Collect a chain of consecutive leaves, starting from the first view after the last
672            // decide. This will correspond to a decide event, and defines a range of views which
673            // can be garbage collected. This may even include views for which there was no leaf,
674            // for which we might still have artifacts like proposals that never finalized.
675            let from_view = match last_processed_view {
676                Some(v) => v + 1,
677                None => 0,
678            };
679
680            let mut parent = None;
681            let mut rows =
682                query("SELECT leaf, qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view")
683                    .bind(from_view)
684                    .fetch(tx.as_mut());
685            let mut leaves = vec![];
686            let mut final_qc = None;
687            while let Some(row) = rows.next().await {
688                let row = match row {
689                    Ok(row) => row,
690                    Err(err) => {
691                        // If there's an error getting a row, try generating an event with the rows
692                        // we do have.
693                        tracing::warn!("error loading row: {err:#}");
694                        break;
695                    },
696                };
697
698                let leaf_data: Vec<u8> = row.get("leaf");
699                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
700                let qc_data: Vec<u8> = row.get("qc");
701                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
702                let height = leaf.block_header().block_number();
703
704                // Ensure we are only dealing with a consecutive chain of leaves. We don't want to
705                // garbage collect any views for which we missed a leaf or decide event; at least
706                // not right away, in case we need to recover that data later.
707                if let Some(parent) = parent {
708                    if height != parent + 1 {
709                        tracing::debug!(
710                            height,
711                            parent,
712                            "ending decide event at non-consecutive leaf"
713                        );
714                        break;
715                    }
716                }
717                parent = Some(height);
718                leaves.push(leaf);
719                final_qc = Some(qc);
720            }
721            drop(rows);
722
723            let Some(final_qc) = final_qc else {
724                // End event processing when there are no more decided views.
725                tracing::debug!(from_view, "no new leaves at decide");
726                return Ok(());
727            };
728
729            // Find the range of views encompassed by this leaf chain. All data in this range can be
730            // processed by the consumer and then deleted.
731            let from_view = leaves[0].view_number();
732            let to_view = leaves[leaves.len() - 1].view_number();
733
734            // Collect VID shares for the decide event.
735            let mut vid_shares = tx
736                .fetch_all(
737                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
738                        .bind(from_view.u64() as i64)
739                        .bind(to_view.u64() as i64),
740                )
741                .await?
742                .into_iter()
743                .map(|row| {
744                    let view: i64 = row.get("view");
745                    let data: Vec<u8> = row.get("data");
746                    let vid_proposal = bincode::deserialize::<
747                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
748                    >(&data)?;
749                    Ok((view as u64, vid_proposal.data))
750                })
751                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;
752
753            // Collect DA proposals for the decide event.
754            let mut da_proposals = tx
755                .fetch_all(
756                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
757                        .bind(from_view.u64() as i64)
758                        .bind(to_view.u64() as i64),
759                )
760                .await?
761                .into_iter()
762                .map(|row| {
763                    let view: i64 = row.get("view");
764                    let data: Vec<u8> = row.get("data");
765                    let da_proposal =
766                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
767                    Ok((view as u64, da_proposal.data))
768                })
769                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;
770
771            // Collect state certs for the decide event.
772            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
773                .await
774                .inspect_err(|err| {
775                    tracing::error!(
776                        ?from_view,
777                        ?to_view,
778                        "failed to load state certificates. error={err:#}"
779                    );
780                })?;
781            drop(tx);
782
783            // Collate all the information by view number and construct a chain of leaves.
784            let leaf_chain = leaves
785                .into_iter()
786                // Go in reverse chronological order, as expected by Decide events.
787                .rev()
788                .map(|mut leaf| {
789                    let view = leaf.view_number();
790
791                    // Include the VID share if available.
792                    let vid_share = vid_shares.remove(&view);
793                    if vid_share.is_none() {
794                        tracing::debug!(?view, "VID share not available at decide");
795                    }
796
797                    // Fill in the full block payload using the DA proposals we had persisted.
798                    if let Some(proposal) = da_proposals.remove(&view) {
799                        let payload =
800                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
801                        leaf.fill_block_payload_unchecked(payload);
802                    } else if view == ViewNumber::genesis() {
803                        // We don't get a DA proposal for the genesis view, but we know what the
804                        // payload always is.
805                        leaf.fill_block_payload_unchecked(Payload::empty().0);
806                    } else {
807                        tracing::debug!(?view, "DA proposal not available at decide");
808                    }
809
810                    let state_cert = state_certs
811                        .get(&view)
812                        .cloned();
813
814                    LeafInfo {
815                        leaf,
816                        vid_share,
817                        state_cert,
818                        // Note: the following fields are not used in Decide event processing, and
819                        // should be removed. For now, we just default them.
820                        state: Default::default(),
821                        delta: Default::default(),
822                    }
823                })
824                .collect();
825
826            // Generate decide event for the consumer.
827            tracing::debug!(?to_view, ?final_qc, ?leaf_chain, "generating decide event");
828            consumer
829                .handle_event(&Event {
830                    view_number: to_view,
831                    event: EventType::Decide {
832                        leaf_chain: Arc::new(leaf_chain),
833                        qc: Arc::new(final_qc),
834                        block_size: None,
835                    },
836                })
837                .await?;
838
839            let mut tx = self.db.write().await?;
840
841            // Now that we have definitely processed leaves up to `to_view`, we can update
842            // `last_processed_view` so we don't process these leaves again. We may still fail at
843            // this point, or shut down, and fail to complete this update. At worst this will lead
844            // to us sending a duplicate decide event the next time we are called; this is fine as
845            // the event consumer is required to be idempotent.
846            tx.upsert(
847                "event_stream",
848                ["id", "last_processed_view"],
849                ["id"],
850                [(1i32, to_view.u64() as i64)],
851            )
852            .await?;
853
854            // Store all the finalized state certs
855            for (epoch, state_cert) in state_certs {
856                let state_cert_bytes = bincode::serialize(&state_cert)?;
857                tx.upsert(
858                    "finalized_state_cert",
859                    ["epoch", "state_cert"],
860                    ["epoch"],
861                    [(epoch as i64, state_cert_bytes)],
862                )
863                .await?;
864            }
865
866            // Delete the data that has been fully processed.
867            tx.execute(
868                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
869                    .bind(from_view.u64() as i64)
870                    .bind(to_view.u64() as i64),
871            )
872            .await?;
873            tx.execute(
874                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
875                    .bind(from_view.u64() as i64)
876                    .bind(to_view.u64() as i64),
877            )
878            .await?;
879            tx.execute(
880                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
881                    .bind(from_view.u64() as i64)
882                    .bind(to_view.u64() as i64),
883            )
884            .await?;
885            tx.execute(
886                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
887                    .bind(from_view.u64() as i64)
888                    .bind(to_view.u64() as i64),
889            )
890            .await?;
891            tx.execute(
892                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
893                    .bind(from_view.u64() as i64)
894                    .bind(to_view.u64() as i64),
895            )
896            .await?;
897
898            // Clean up leaves, but do not delete the most recent one (all leaves with a view number
899            // less than the given value). This is necessary to ensure that, in case of a restart,
900            // we can resume from the last decided leaf.
901            tx.execute(
902                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
903                    .bind(from_view.u64() as i64)
904                    .bind(to_view.u64() as i64),
905            )
906            .await?;
907
908            tx.commit().await?;
909            last_processed_view = Some(to_view.u64() as i64);
910        }
911    }
912
913    async fn load_state_certs(
914        tx: &mut Transaction<Read>,
915        from_view: ViewNumber,
916        to_view: ViewNumber,
917    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
918        let rows = tx
919            .fetch_all(
920                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
921                    .bind(from_view.u64() as i64)
922                    .bind(to_view.u64() as i64),
923            )
924            .await?;
925
926        let mut result = BTreeMap::new();
927
928        for row in rows {
929            let data: Vec<u8> = row.get("state_cert");
930
931            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
932                .or_else(|err_v2| {
933                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
934                    .map(Into::into)
935                    .context(format!(
936                        "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
937                         error: {err_v2}"
938                    ))
939            })?;
940
941            result.insert(cert.epoch.u64(), cert);
942        }
943
944        Ok(result)
945    }
946
947    #[tracing::instrument(skip(self))]
948    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
949        let mut tx = self.db.write().await?;
950
951        // Prune everything older than the target retention period.
952        prune_to_view(
953            &mut tx,
954            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
955        )
956        .await?;
957
958        // Check our storage usage; if necessary we will prune more aggressively (up to the minimum
959        // retention) to get below the target usage.
960        #[cfg(feature = "embedded-db")]
961        let usage_query = format!(
962            "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
963            PRUNE_TABLES
964                .iter()
965                .map(|table| format!("'{table}'"))
966                .join(",")
967        );
968
969        #[cfg(not(feature = "embedded-db"))]
970        let usage_query = {
971            let table_sizes = PRUNE_TABLES
972                .iter()
973                .map(|table| format!("pg_table_size('{table}')"))
974                .join(" + ");
975            format!("SELECT {table_sizes}")
976        };
977
978        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
979        tracing::debug!(usage, "consensus storage usage after pruning");
980
981        if (usage as u64) > self.gc_opt.target_usage {
982            tracing::warn!(
983                usage,
984                gc_opt = ?self.gc_opt,
985                "consensus storage is running out of space, pruning to minimum retention"
986            );
987            prune_to_view(
988                &mut tx,
989                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
990            )
991            .await?;
992        }
993
994        tx.commit().await
995    }
996}
997
/// Consensus tables subject to view-based garbage collection (see
/// [`prune_to_view`]). Every table listed here is keyed by a `view` column.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1005
1006async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1007    if view == 0 {
1008        // Nothing to prune, the entire chain is younger than the retention period.
1009        return Ok(());
1010    }
1011    tracing::debug!(view, "pruning consensus storage");
1012
1013    for table in PRUNE_TABLES {
1014        let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1015            .bind(view as i64)
1016            .execute(tx.as_mut())
1017            .await
1018            .context(format!("pruning {table}"))?;
1019        if res.rows_affected() > 0 {
1020            tracing::info!(
1021                "garbage collected {} rows from {table}",
1022                res.rows_affected()
1023            );
1024        }
1025    }
1026
1027    Ok(())
1028}
1029
1030#[async_trait]
1031impl SequencerPersistence for Persistence {
1032    fn into_catchup_provider(
1033        self,
1034        backoff: BackoffParams,
1035    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1036        Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1037    }
1038
1039    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1040        tracing::info!("loading config from Postgres");
1041
1042        // Select the most recent config (although there should only be one).
1043        let Some(row) = self
1044            .db
1045            .read()
1046            .await?
1047            .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1048            .await?
1049        else {
1050            tracing::info!("config not found");
1051            return Ok(None);
1052        };
1053        let config = row.try_get("config")?;
1054        Ok(serde_json::from_value(config)?)
1055    }
1056
1057    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1058        tracing::info!("saving config to database");
1059        let json = serde_json::to_value(cfg)?;
1060
1061        let mut tx = self.db.write().await?;
1062        tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1063            .await?;
1064        tx.commit().await
1065    }
1066
1067    async fn append_decided_leaves(
1068        &self,
1069        view: ViewNumber,
1070        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
1071        consumer: &(impl EventConsumer + 'static),
1072    ) -> anyhow::Result<()> {
1073        let values = leaf_chain
1074            .into_iter()
1075            .map(|(info, qc2)| {
1076                // The leaf may come with a large payload attached. We don't care about this payload
1077                // because we already store it separately, as part of the DA proposal. Storing it
1078                // here contributes to load on the DB for no reason, so we remove it before
1079                // serializing the leaf.
1080                let mut leaf = info.leaf.clone();
1081                leaf.unfill_block_payload();
1082
1083                let view = qc2.view_number.u64() as i64;
1084                let leaf_bytes = bincode::serialize(&leaf)?;
1085                let qc_bytes = bincode::serialize(&qc2)?;
1086                Ok((view, leaf_bytes, qc_bytes))
1087            })
1088            .collect::<anyhow::Result<Vec<_>>>()?;
1089
1090        // First, append the new leaves. We do this in its own transaction because even if GC or the
1091        // event consumer later fails, there is no need to abort the storage of the leaves.
1092        let mut tx = self.db.write().await?;
1093
1094        tx.upsert("anchor_leaf2", ["view", "leaf", "qc"], ["view"], values)
1095            .await?;
1096        tx.commit().await?;
1097
1098        // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer
1099        // need.
1100        if let Err(err) = self.generate_decide_events(consumer).await {
1101            // GC/event processing failure is not an error, since by this point we have at least
1102            // managed to persist the decided leaves successfully, and GC will just run again at the
1103            // next decide. Log an error but do not return it.
1104            tracing::warn!(?view, "event processing failed: {err:#}");
1105            return Ok(());
1106        }
1107
1108        // Garbage collect data which was not included in any decide event, but which at this point
1109        // is old enough to just forget about.
1110        if let Err(err) = self.prune(view).await {
1111            tracing::warn!(?view, "pruning failed: {err:#}");
1112        }
1113
1114        Ok(())
1115    }
1116
1117    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1118        Ok(self
1119            .db
1120            .read()
1121            .await?
1122            .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1123            .await?
1124            .map(|row| {
1125                let view: i64 = row.get("view");
1126                ViewNumber::new(view as u64)
1127            }))
1128    }
1129
1130    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1131        Ok(self
1132            .db
1133            .read()
1134            .await?
1135            .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1136            .await?
1137            .map(|row| {
1138                let view: i64 = row.get("view");
1139                ViewNumber::new(view as u64)
1140            }))
1141    }
1142
1143    async fn load_anchor_leaf(
1144        &self,
1145    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1146        let Some(row) = self
1147            .db
1148            .read()
1149            .await?
1150            .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1151            .await?
1152        else {
1153            return Ok(None);
1154        };
1155
1156        let leaf_bytes: Vec<u8> = row.get("leaf");
1157        let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1158
1159        let qc_bytes: Vec<u8> = row.get("qc");
1160        let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1161
1162        Ok(Some((leaf2, qc2)))
1163    }
1164
1165    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1166        let mut tx = self.db.read().await?;
1167        let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1168            .fetch_one(tx.as_mut())
1169            .await?;
1170        Ok(ViewNumber::new(view as u64))
1171    }
1172
1173    async fn load_da_proposal(
1174        &self,
1175        view: ViewNumber,
1176    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1177        let result = self
1178            .db
1179            .read()
1180            .await?
1181            .fetch_optional(
1182                query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1183            )
1184            .await?;
1185
1186        result
1187            .map(|row| {
1188                let bytes: Vec<u8> = row.get("data");
1189                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1190            })
1191            .transpose()
1192    }
1193
1194    async fn load_vid_share(
1195        &self,
1196        view: ViewNumber,
1197    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1198        let result = self
1199            .db
1200            .read()
1201            .await?
1202            .fetch_optional(
1203                query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1204            )
1205            .await?;
1206
1207        result
1208            .map(|row| {
1209                let bytes: Vec<u8> = row.get("data");
1210                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1211            })
1212            .transpose()
1213    }
1214
1215    async fn load_quorum_proposals(
1216        &self,
1217    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1218    {
1219        let rows = self
1220            .db
1221            .read()
1222            .await?
1223            .fetch_all("SELECT * FROM quorum_proposals2")
1224            .await?;
1225
1226        Ok(BTreeMap::from_iter(
1227            rows.into_iter()
1228                .map(|row| {
1229                    let view: i64 = row.get("view");
1230                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
1231                    let bytes: Vec<u8> = row.get("data");
1232                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1233                        bincode::deserialize(&bytes).or_else(|error| {
1234                            bincode::deserialize::<
1235                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
1236                            >(&bytes)
1237                            .map(convert_proposal)
1238                            .inspect_err(|err_v3| {
1239                                tracing::warn!(
1240                                    ?view_number,
1241                                    %error,
1242                                    %err_v3,
1243                                    "ignoring malformed quorum proposal DB row"
1244                                );
1245                            })
1246                        })?;
1247                    Ok((view_number, proposal))
1248                })
1249                .collect::<anyhow::Result<Vec<_>>>()?,
1250        ))
1251    }
1252
1253    async fn load_quorum_proposal(
1254        &self,
1255        view: ViewNumber,
1256    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1257        let mut tx = self.db.read().await?;
1258        let (data,) =
1259            query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1260                .bind(view.u64() as i64)
1261                .fetch_one(tx.as_mut())
1262                .await?;
1263        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1264            bincode::deserialize(&data).or_else(|error| {
1265                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1266                    &data,
1267                )
1268                .map(convert_proposal)
1269                .context(format!(
1270                    "Failed to deserialize quorum proposal for view {view}. error={error}"
1271                ))
1272            })?;
1273
1274        Ok(proposal)
1275    }
1276
1277    async fn append_vid(
1278        &self,
1279        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
1280    ) -> anyhow::Result<()> {
1281        let view = proposal.data.view_number.u64();
1282        let payload_hash = proposal.data.payload_commitment;
1283        let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1284            convert_proposal(proposal.clone());
1285        let data_bytes = bincode::serialize(&proposal).unwrap();
1286
1287        let now = Instant::now();
1288        let mut tx = self.db.write().await?;
1289        tx.upsert(
1290            "vid_share2",
1291            ["view", "data", "payload_hash"],
1292            ["view"],
1293            [(view as i64, data_bytes, payload_hash.to_string())],
1294        )
1295        .await?;
1296        let res = tx.commit().await;
1297        self.internal_metrics
1298            .internal_append_vid_duration
1299            .add_point(now.elapsed().as_secs_f64());
1300        res
1301    }
1302    async fn append_vid2(
1303        &self,
1304        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
1305    ) -> anyhow::Result<()> {
1306        let view = proposal.data.view_number.u64();
1307        let payload_hash = proposal.data.payload_commitment;
1308        let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1309            convert_proposal(proposal.clone());
1310        let data_bytes = bincode::serialize(&proposal).unwrap();
1311
1312        let now = Instant::now();
1313        let mut tx = self.db.write().await?;
1314        tx.upsert(
1315            "vid_share2",
1316            ["view", "data", "payload_hash"],
1317            ["view"],
1318            [(view as i64, data_bytes, payload_hash.to_string())],
1319        )
1320        .await?;
1321        let res = tx.commit().await;
1322        self.internal_metrics
1323            .internal_append_vid2_duration
1324            .add_point(now.elapsed().as_secs_f64());
1325        res
1326    }
1327
1328    async fn append_da(
1329        &self,
1330        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1331        vid_commit: VidCommitment,
1332    ) -> anyhow::Result<()> {
1333        let data = &proposal.data;
1334        let view = data.view_number().u64();
1335        let data_bytes = bincode::serialize(proposal).unwrap();
1336
1337        let now = Instant::now();
1338        let mut tx = self.db.write().await?;
1339        tx.upsert(
1340            "da_proposal",
1341            ["view", "data", "payload_hash"],
1342            ["view"],
1343            [(view as i64, data_bytes, vid_commit.to_string())],
1344        )
1345        .await?;
1346        let res = tx.commit().await;
1347        self.internal_metrics
1348            .internal_append_da_duration
1349            .add_point(now.elapsed().as_secs_f64());
1350        res
1351    }
1352
1353    async fn record_action(
1354        &self,
1355        view: ViewNumber,
1356        _epoch: Option<EpochNumber>,
1357        action: HotShotAction,
1358    ) -> anyhow::Result<()> {
1359        // Todo Remove this after https://github.com/EspressoSystems/espresso-sequencer/issues/1931
1360        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1361            return Ok(());
1362        }
1363
1364        let stmt = format!(
1365            "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1366            ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
1367        );
1368
1369        let mut tx = self.db.write().await?;
1370        tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1371
1372        if matches!(action, HotShotAction::Vote) {
1373            let restart_view = view + 1;
1374            let stmt = format!(
1375                "INSERT INTO restart_view (id, view) VALUES (0, $1)
1376                ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
1377            );
1378            tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1379                .await?;
1380        }
1381
1382        tx.commit().await
1383    }
1384
1385    async fn append_quorum_proposal2(
1386        &self,
1387        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1388    ) -> anyhow::Result<()> {
1389        let view_number = proposal.data.view_number().u64();
1390
1391        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
1392        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));
1393
1394        let now = Instant::now();
1395        let mut tx = self.db.write().await?;
1396        tx.upsert(
1397            "quorum_proposals2",
1398            ["view", "leaf_hash", "data"],
1399            ["view"],
1400            [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
1401        )
1402        .await?;
1403
1404        // We also keep track of any QC we see in case we need it to recover our archival storage.
1405        let justify_qc = proposal.data.justify_qc();
1406        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
1407        tx.upsert(
1408            "quorum_certificate2",
1409            ["view", "leaf_hash", "data"],
1410            ["view"],
1411            [(
1412                justify_qc.view_number.u64() as i64,
1413                justify_qc.data.leaf_commit.to_string(),
1414                &justify_qc_bytes,
1415            )],
1416        )
1417        .await?;
1418        let res = tx.commit().await;
1419        self.internal_metrics
1420            .internal_append_quorum2_duration
1421            .add_point(now.elapsed().as_secs_f64());
1422        res
1423    }
1424
1425    async fn load_upgrade_certificate(
1426        &self,
1427    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1428        let result = self
1429            .db
1430            .read()
1431            .await?
1432            .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1433            .await?;
1434
1435        result
1436            .map(|row| {
1437                let bytes: Vec<u8> = row.get("data");
1438                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1439            })
1440            .transpose()
1441    }
1442
1443    async fn store_upgrade_certificate(
1444        &self,
1445        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1446    ) -> anyhow::Result<()> {
1447        let certificate = match decided_upgrade_certificate {
1448            Some(cert) => cert,
1449            None => return Ok(()),
1450        };
1451        let upgrade_certificate_bytes =
1452            bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1453        let mut tx = self.db.write().await?;
1454        tx.upsert(
1455            "upgrade_certificate",
1456            ["id", "data"],
1457            ["id"],
1458            [(true, upgrade_certificate_bytes)],
1459        )
1460        .await?;
1461        tx.commit().await
1462    }
1463
    /// One-time, resumable migration of decided leaves from the legacy
    /// `anchor_leaf` table (`Leaf`/`QuorumCertificate`) into `anchor_leaf2`
    /// (`Leaf2`/`QuorumCertificate2`), processed in view order in batches.
    /// Progress is checkpointed in `epoch_migration` after every batch.
    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // The SQL migration populates the table name and sets a default value of 0 for migrated rows.
        // so, fetch_one() would always return a row
        // The number of migrated rows is updated after each batch insert.
        // This allows the types migration to resume from where it left off.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("decided leaves already migrated");
            return Ok(());
        }

        tracing::warn!("migrating decided leaves..");
        loop {
            // Read the next batch of legacy rows, starting at the resume offset.
            // NOTE(review): the filter is `view >= $1` and `offset` is set to the
            // last migrated view below, so each batch re-reads the previous
            // batch's final row once; the ON CONFLICT DO NOTHING insert makes
            // this harmless — confirm this re-read is intended.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            // Decode each legacy (leaf, qc) pair and re-encode it in the new types.
            for row in rows.iter() {
                let leaf: Vec<u8> = row.try_get("leaf")?;
                let qc: Vec<u8> = row.try_get("qc")?;
                let leaf1: Leaf = bincode::deserialize(&leaf)?;
                let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
                let view: i64 = row.try_get("view")?;

                let leaf2: Leaf2 = leaf1.into();
                let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();

                let leaf2_bytes = bincode::serialize(&leaf2)?;
                let qc2_bytes = bincode::serialize(&qc2)?;

                values.push((view, leaf2_bytes, qc2_bytes));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");

            // Resume point for the next batch: the highest view in this batch.
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values.into_iter(), |mut b, (view, leaf, qc)| {
                b.push_bind(view).push_bind(leaf).push_bind(qc);
            });

            // Offset tracking prevents duplicate inserts
            // Added as a safeguard.
            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            // Write the batch and checkpoint the new offset in one transaction.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("anchor_leaf".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "anchor leaf migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means we reached the end of the legacy table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated decided leaves");

        // Mark the migration complete so future startups skip it entirely.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("anchor_leaf".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for anchor_leaf");

        Ok(())
    }
1570
    /// One-shot migration copying legacy `da_proposal` rows into `da_proposal2`,
    /// converting each stored `DaProposal` into the newer `DaProposal2` format.
    ///
    /// Progress (the last migrated view number) is checkpointed in the
    /// `epoch_migration` table so the migration can resume after a restart.
    /// Returns early if a previous run already completed.
    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
        // Number of rows fetched and inserted per transaction.
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // Resume point: `migrated_rows` holds the last view number processed.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("da proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating da proposals..");

        loop {
            // Fetch a batch with a short-lived read transaction. `view >= offset`
            // re-reads the checkpoint boundary row; that is harmless because the
            // insert below uses ON CONFLICT DO NOTHING.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
                 $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before deserializing and writing.
            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let data: Vec<u8> = row.try_get("data")?;
                let payload_hash: String = row.try_get("payload_hash")?;

                // Decode the legacy proposal and convert it to the new type.
                let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
                    convert_proposal(da_proposal);

                let view = da_proposal2.data.view_number.u64() as i64;
                let data = bincode::serialize(&da_proposal2)?;

                values.push((view, payload_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");

            // Advance the checkpoint to the highest view in this batch (rows are
            // ordered by view, so the last element holds the maximum).
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values.into_iter(), |mut b, (view, payload_hash, data)| {
                b.push_bind(view).push_bind(payload_hash).push_bind(data);
            });
            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            // Write the converted batch and the checkpoint in one transaction,
            // so progress is never recorded ahead of the inserted data.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("da_proposal".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "DA proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );
            // A short batch means the source table has been drained.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated da proposals");

        // Mark the migration complete so future startups skip it entirely.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("da_proposal".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for da_proposal");

        Ok(())
    }
1668
1669    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1670        let batch_size: i64 = 10000;
1671
1672        let mut tx = self.db.read().await?;
1673
1674        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1675            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
1676        )
1677        .fetch_one(tx.as_mut())
1678        .await?;
1679
1680        if is_completed {
1681            tracing::info!("vid_share migration already done");
1682            return Ok(());
1683        }
1684
1685        tracing::warn!("migrating vid shares..");
1686        loop {
1687            let mut tx = self.db.read().await?;
1688            let rows = query(
1689                "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
1690            )
1691            .bind(offset)
1692            .bind(batch_size)
1693            .fetch_all(tx.as_mut())
1694            .await?;
1695
1696            drop(tx);
1697            if rows.is_empty() {
1698                break;
1699            }
1700            let mut values = Vec::new();
1701
1702            for row in rows.iter() {
1703                let data: Vec<u8> = row.try_get("data")?;
1704                let payload_hash: String = row.try_get("payload_hash")?;
1705
1706                let vid_share: Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>> =
1707                    bincode::deserialize(&data)?;
1708                let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1709                    convert_proposal(vid_share);
1710
1711                let view = vid_share2.data.view_number().u64() as i64;
1712                let data = bincode::serialize(&vid_share2)?;
1713
1714                values.push((view, payload_hash, data));
1715            }
1716
1717            let mut query_builder: sqlx::QueryBuilder<Db> =
1718                sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
1719
1720            offset = values.last().context("last row")?.0;
1721
1722            query_builder.push_values(values.into_iter(), |mut b, (view, payload_hash, data)| {
1723                b.push_bind(view).push_bind(payload_hash).push_bind(data);
1724            });
1725
1726            let query = query_builder.build();
1727
1728            let mut tx = self.db.write().await?;
1729            query.execute(tx.as_mut()).await?;
1730
1731            tx.upsert(
1732                "epoch_migration",
1733                ["table_name", "completed", "migrated_rows"],
1734                ["table_name"],
1735                [("vid_share".to_string(), false, offset)],
1736            )
1737            .await?;
1738            tx.commit().await?;
1739
1740            tracing::info!(
1741                "VID shares migration progress: rows={} offset={}",
1742                rows.len(),
1743                offset
1744            );
1745            if rows.len() < batch_size as usize {
1746                break;
1747            }
1748        }
1749
1750        tracing::warn!("migrated vid shares");
1751
1752        let mut tx = self.db.write().await?;
1753        tx.upsert(
1754            "epoch_migration",
1755            ["table_name", "completed", "migrated_rows"],
1756            ["table_name"],
1757            [("vid_share".to_string(), true, offset)],
1758        )
1759        .await?;
1760        tx.commit().await?;
1761
1762        tracing::info!("updated epoch_migration table for vid_share");
1763
1764        Ok(())
1765    }
1766
    /// One-shot migration copying legacy `quorum_proposals` rows into
    /// `quorum_proposals2`, converting each stored `QuorumProposal` into the
    /// newer `QuorumProposalWrapper` format.
    ///
    /// Progress (the last migrated view number) is checkpointed in the
    /// `epoch_migration` table so the migration can resume after a restart.
    /// Returns early if a previous run already completed.
    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
        // Number of rows fetched and inserted per transaction.
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // Resume point: `migrated_rows` holds the last view number processed.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_proposals'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum proposals..");

        loop {
            // Fetch a batch with a short-lived read transaction. `view >= offset`
            // re-reads the checkpoint boundary row; that is harmless because the
            // insert below uses ON CONFLICT DO NOTHING.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before deserializing and writing.
            drop(tx);

            if rows.is_empty() {
                break;
            }

            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy proposal and convert it to the new type.
                let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                    convert_proposal(quorum_proposal);

                let view = quorum_proposal2.data.view_number().u64() as i64;
                let data = bincode::serialize(&quorum_proposal2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");

            // Advance the checkpoint to the highest view in this batch (rows are
            // ordered by view, so the last element holds the maximum).
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values.into_iter(), |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            // Write the converted batch and the checkpoint in one transaction.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_proposals".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "quorum proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means the source table has been drained.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum proposals");

        // Mark the migration complete so future startups skip it entirely.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_proposals".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for quorum_proposals");

        Ok(())
    }
1870
    /// One-shot migration copying legacy `quorum_certificate` rows into
    /// `quorum_certificate2`, upgrading each `QuorumCertificate` via `to_qc2()`.
    ///
    /// Progress (the last migrated view number) is checkpointed in the
    /// `epoch_migration` table so the migration can resume after a restart.
    /// Returns early if a previous run already completed.
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
        // Number of rows fetched and inserted per transaction.
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // Resume point: `migrated_rows` holds the last view number processed.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_certificate'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum certificates migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum certificates..");
        loop {
            // Fetch a batch with a short-lived read transaction. `view >= offset`
            // re-reads the checkpoint boundary row; that is harmless because the
            // insert below uses ON CONFLICT DO NOTHING.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before deserializing and writing.
            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy certificate and upgrade it in place.
                let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
                let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();

                let view = qc2.view_number().u64() as i64;
                let data = bincode::serialize(&qc2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");

            // Advance the checkpoint to the highest view in this batch (rows are
            // ordered by view, so the last element holds the maximum).
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values.into_iter(), |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            // Write the converted batch and the checkpoint in one transaction.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_certificate".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "Quorum certificates migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means the source table has been drained.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum certificates");

        // Mark the migration complete so future startups skip it entirely.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_certificate".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;
        tracing::info!("updated epoch_migration table for quorum_certificate");

        Ok(())
    }
1968
1969    async fn store_next_epoch_quorum_certificate(
1970        &self,
1971        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1972    ) -> anyhow::Result<()> {
1973        let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1974        let mut tx = self.db.write().await?;
1975        tx.upsert(
1976            "next_epoch_quorum_certificate",
1977            ["id", "data"],
1978            ["id"],
1979            [(true, qc2_bytes)],
1980        )
1981        .await?;
1982        tx.commit().await
1983    }
1984
1985    async fn load_next_epoch_quorum_certificate(
1986        &self,
1987    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1988        let result = self
1989            .db
1990            .read()
1991            .await?
1992            .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
1993            .await?;
1994
1995        result
1996            .map(|row| {
1997                let bytes: Vec<u8> = row.get("data");
1998                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1999            })
2000            .transpose()
2001    }
2002
2003    async fn append_da2(
2004        &self,
2005        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2006        vid_commit: VidCommitment,
2007    ) -> anyhow::Result<()> {
2008        let data = &proposal.data;
2009        let view = data.view_number().u64();
2010        let data_bytes = bincode::serialize(proposal).unwrap();
2011
2012        let now = Instant::now();
2013        let mut tx = self.db.write().await?;
2014        tx.upsert(
2015            "da_proposal2",
2016            ["view", "data", "payload_hash"],
2017            ["view"],
2018            [(view as i64, data_bytes, vid_commit.to_string())],
2019        )
2020        .await?;
2021        let res = tx.commit().await;
2022        self.internal_metrics
2023            .internal_append_da2_duration
2024            .add_point(now.elapsed().as_secs_f64());
2025        res
2026    }
2027
2028    async fn store_drb_result(
2029        &self,
2030        epoch: EpochNumber,
2031        drb_result: DrbResult,
2032    ) -> anyhow::Result<()> {
2033        let drb_result_vec = Vec::from(drb_result);
2034        let mut tx = self.db.write().await?;
2035        tx.upsert(
2036            "epoch_drb_and_root",
2037            ["epoch", "drb_result"],
2038            ["epoch"],
2039            [(epoch.u64() as i64, drb_result_vec)],
2040        )
2041        .await?;
2042        tx.commit().await
2043    }
2044
2045    async fn store_epoch_root(
2046        &self,
2047        epoch: EpochNumber,
2048        block_header: <SeqTypes as NodeType>::BlockHeader,
2049    ) -> anyhow::Result<()> {
2050        let block_header_bytes =
2051            bincode::serialize(&block_header).context("serializing block header")?;
2052
2053        let mut tx = self.db.write().await?;
2054        tx.upsert(
2055            "epoch_drb_and_root",
2056            ["epoch", "block_header"],
2057            ["epoch"],
2058            [(epoch.u64() as i64, block_header_bytes)],
2059        )
2060        .await?;
2061        tx.commit().await
2062    }
2063
2064    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
2065        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
2066            if loaded_drb_input.iteration >= drb_input.iteration {
2067                anyhow::bail!(
2068                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
2069                    loaded_drb_input,
2070                    drb_input
2071                )
2072            }
2073        }
2074
2075        let drb_input_bytes = bincode::serialize(&drb_input)
2076            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;
2077
2078        let mut tx = self.db.write().await?;
2079
2080        tx.upsert(
2081            "drb",
2082            ["epoch", "drb_input"],
2083            ["epoch"],
2084            [(drb_input.epoch as i64, drb_input_bytes)],
2085        )
2086        .await?;
2087        tx.commit().await
2088    }
2089
2090    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2091        let row = self
2092            .db
2093            .read()
2094            .await?
2095            .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2096            .await?;
2097
2098        match row {
2099            None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2100            Some(row) => {
2101                let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2102                let drb_input = bincode::deserialize(&drb_input_bytes)
2103                    .context("Failed to deserialize drb_input from storage")?;
2104
2105                Ok(drb_input)
2106            },
2107        }
2108    }
2109
2110    async fn add_state_cert(
2111        &self,
2112        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2113    ) -> anyhow::Result<()> {
2114        let state_cert_bytes = bincode::serialize(&state_cert)
2115            .context("serializing light client state update certificate")?;
2116
2117        let mut tx = self.db.write().await?;
2118        tx.upsert(
2119            "state_cert",
2120            ["view", "state_cert"],
2121            ["view"],
2122            [(
2123                state_cert.light_client_state.view_number as i64,
2124                state_cert_bytes,
2125            )],
2126        )
2127        .await?;
2128        tx.commit().await
2129    }
2130
2131    async fn load_state_cert(
2132        &self,
2133    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2134        let Some(row) = self
2135            .db
2136            .read()
2137            .await?
2138            .fetch_optional(
2139                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2140            )
2141            .await?
2142        else {
2143            return Ok(None);
2144        };
2145        let bytes: Vec<u8> = row.get("state_cert");
2146
2147        let cert = match bincode::deserialize(&bytes) {
2148            Ok(cert) => cert,
2149            Err(err) => {
2150                tracing::info!(
2151                    error = %err,
2152                    "Failed to deserialize state certificate with v2. attempting with v1"
2153                );
2154
2155                let v1_cert =
2156                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2157                        .with_context(|| {
2158                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2159                    })?;
2160
2161                v1_cert.into()
2162            },
2163        };
2164
2165        Ok(Some(cert))
2166    }
2167
    /// Load per-epoch startup info (DRB result plus optional epoch-root block
    /// header) for the most recent epochs, returned in ascending epoch order.
    ///
    /// At most `RECENT_STAKE_TABLES_LIMIT` epochs are returned; rows that have
    /// no DRB result yet are skipped.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all(
                // DESC + LIMIT selects the newest epochs; order is undone below.
                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
            )
            .await?;

        // reverse the rows vector to return the most recent epochs, but in ascending order
        rows.into_iter()
            .rev()
            .map(|row| {
                let epoch: i64 = row.try_get("epoch")?;
                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
                if let Some(drb_result) = drb_result {
                    // Convert the stored blob into a DrbResult; `try_into`
                    // fails if the blob length does not match.
                    let drb_result_array = drb_result
                        .try_into()
                        .or_else(|_| bail!("invalid drb result"))?;
                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
                        .map(|data| bincode::deserialize(&data))
                        .transpose()?;
                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
                        epoch: <SeqTypes as NodeType>::Epoch::new(epoch as u64),
                        drb_result: drb_result_array,
                        block_header,
                    }))
                } else {
                    // Right now we skip the epoch_drb_and_root row if there is no drb result.
                    // This seems reasonable based on the expected order of events, but please double check!
                    Ok(None)
                }
            })
            // Drop skipped rows (Ok(None)) while still propagating errors.
            .filter_map(|e| match e {
                Err(v) => Some(Err(v)),
                Ok(Some(v)) => Some(Ok(v)),
                Ok(None) => None,
            })
            .collect()
    }
2211
    /// Replace the internal metrics recorder with one backed by `metrics`.
    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
        self.internal_metrics = PersistenceMetricsValue::new(metrics);
    }
2215}
2216
#[async_trait]
impl MembershipPersistence for Persistence {
    /// Load the stake table stored for a single epoch, along with the optional
    /// block reward and stake table hash, or `None` if no row exists.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
        let result = self
            .db
            .read()
            .await?
            .fetch_optional(
                query(
                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
                     epoch = $1",
                )
                .bind(epoch.u64() as i64),
            )
            .await?;

        result
            .map(|row| {
                let stake_table_bytes: Vec<u8> = row.get("stake");
                // The reward and hash columns may be NULL, hence Option.
                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
                let stake_table = bincode::deserialize(&stake_table_bytes)
                    .context("deserializing stake table")?;
                let reward: Option<RewardAmount> = reward_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                    .transpose()?;
                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                    .transpose()?;

                Ok((stake_table, reward, stake_table_hash))
            })
            .transpose()
    }

    /// Load up to `limit` of the most recent stake tables (highest epochs),
    /// decoded together with their optional block reward and hash.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
        let mut tx = self.db.read().await?;

        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root ORDER BY \
             epoch DESC LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(tx.as_mut())
        .await
        {
            Ok(bytes) => bytes,
            Err(err) => {
                // Log before bailing so the failure is visible in the logs
                // even if the caller discards the error.
                tracing::error!("error loading stake tables: {err:#}");
                bail!("{err:#}");
            },
        };

        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
            .into_iter()
            .map(
                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
                    let stake_table =
                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;

                    let block_reward: Option<RewardAmount> = reward_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                        .transpose()?;

                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                        .transpose()?;

                    Ok((
                        EpochNumber::new(id as u64),
                        (stake_table, block_reward),
                        stake_table_hash,
                    ))
                },
            )
            .collect();

        Ok(Some(stakes?))
    }

    /// Upsert the stake table (and optional block reward / stake table hash)
    /// for an epoch into `epoch_drb_and_root`.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
        let reward_bytes = block_reward
            .map(|r| bincode::serialize(&r).context("serializing block reward"))
            .transpose()?;
        let stake_table_hash_bytes = stake_table_hash
            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
            .transpose()?;
        tx.upsert(
            "epoch_drb_and_root",
            ["epoch", "stake", "block_reward", "stake_table_hash"],
            ["epoch"],
            [(
                epoch.u64() as i64,
                stake_table_bytes,
                reward_bytes,
                stake_table_hash_bytes,
            )],
        )
        .await?;
        tx.commit().await
    }

    /// Store stake-table events observed up to L1 block `l1_finalized`, and
    /// advance the `stake_table_events_l1_block` watermark.
    ///
    /// The write is skipped entirely (returning `Ok`) if the stored watermark
    /// is already beyond `l1_finalized`.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut tx = self.db.write().await?;

        // check last l1 block if there is any
        let last_processed_l1_block = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?
        .map(|(l1,)| l1);

        tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");

        // skip events storage if the database already has higher l1 block events
        // (None compares less than Some, so a missing watermark never skips)
        if last_processed_l1_block > Some(l1_finalized.try_into()?) {
            tracing::debug!(
                ?last_processed_l1_block,
                ?l1_finalized,
                ?events,
                "last l1 finalized stored is already higher"
            );
            return Ok(());
        }

        let mut query_builder: sqlx::QueryBuilder<Db> =
            sqlx::QueryBuilder::new("INSERT INTO stake_table_events (l1_block, log_index, event) ");

        // Convert the (block, index) keys to i64 and the events to JSON for storage.
        let events = events
            .into_iter()
            .map(|((block_number, index), event)| {
                Ok((
                    i64::try_from(block_number)?,
                    i64::try_from(index)?,
                    serde_json::to_value(event).context("l1 event to value")?,
                ))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        query_builder.push_values(events.into_iter(), |mut b, (l1_block, log_index, event)| {
            b.push_bind(l1_block).push_bind(log_index).push_bind(event);
        });

        // Re-storing already-known events is a no-op.
        query_builder.push(" ON CONFLICT DO NOTHING");
        let query = query_builder.build();

        query.execute(tx.as_mut()).await?;

        // update l1 block
        tx.upsert(
            "stake_table_events_l1_block",
            ["id", "last_l1_block"],
            ["id"],
            [(0_i32, l1_finalized as i64)],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Loads all events from persistent storage up to the specified L1 block.
    ///
    /// # Returns
    ///
    /// Returns a tuple containing:
    /// - `Option<EventsPersistenceRead>` - `Complete` when events were read all
    ///   the way to the requested block, `UntilL1Block(n)` when only events up
    ///   to the last processed block `n` were available, or `None` when no
    ///   events are stored at all.
    /// - `Vec<(EventKey, StakeTableEvent)>` - The events, keyed by
    ///   (L1 block number, log index), in ascending order.
    ///
    async fn load_events(
        &self,
        to_l1_block: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )> {
        let mut tx = self.db.read().await?;

        // check last l1 block if there is any
        let res = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?;

        let Some((last_processed_l1_block,)) = res else {
            // this just means we dont have any events stored
            return Ok((None, Vec::new()));
        };

        // Determine the L1 block for querying events.
        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
        // Otherwise, query up to the last stored block.
        let to_l1_block = to_l1_block.try_into()?;
        let query_l1_block = if last_processed_l1_block > to_l1_block {
            to_l1_block
        } else {
            last_processed_l1_block
        };

        let rows = query(
            "SELECT l1_block, log_index, event FROM stake_table_events WHERE l1_block <= $1 ORDER \
             BY l1_block ASC, log_index ASC",
        )
        .bind(query_l1_block)
        .fetch_all(tx.as_mut())
        .await?;

        let events = rows
            .into_iter()
            .map(|row| {
                let l1_block: i64 = row.try_get("l1_block")?;
                let log_index: i64 = row.try_get("log_index")?;
                let event = serde_json::from_value(row.try_get("event")?)?;

                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        // Determine the read state based on the queried block range.
        // - If the persistence returned events up to the requested block, the read is complete.
        // - Otherwise, indicate that the read is up to the last processed block.
        if query_l1_block == to_l1_block {
            Ok((Some(EventsPersistenceRead::Complete), events))
        } else {
            Ok((
                Some(EventsPersistenceRead::UntilL1Block(
                    query_l1_block.try_into()?,
                )),
                events,
            ))
        }
    }
}
2475
2476#[async_trait]
2477impl DhtPersistentStorage for Persistence {
2478    /// Save the DHT to the database
2479    ///
2480    /// # Errors
2481    /// - If we fail to serialize the records
2482    /// - If we fail to write the serialized records to the DB
2483    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2484        // Bincode-serialize the records
2485        let to_save =
2486            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2487
2488        // Prepare the statement
2489        let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
2490                    (id) DO UPDATE SET serialized_records = $1";
2491
2492        // Execute the query
2493        let mut tx = self
2494            .db
2495            .write()
2496            .await
2497            .with_context(|| "failed to start an atomic DB transaction")?;
2498        tx.execute(query(stmt).bind(to_save))
2499            .await
2500            .with_context(|| "failed to execute DB query")?;
2501
2502        // Commit the state
2503        tx.commit().await.with_context(|| "failed to commit to DB")
2504    }
2505
2506    /// Load the DHT from the database
2507    ///
2508    /// # Errors
2509    /// - If we fail to read from the DB
2510    /// - If we fail to deserialize the records
2511    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2512        // Fetch the results from the DB
2513        let result = self
2514            .db
2515            .read()
2516            .await
2517            .with_context(|| "failed to start a DB read transaction")?
2518            .fetch_one("SELECT * FROM libp2p_dht where id = 0")
2519            .await
2520            .with_context(|| "failed to fetch from DB")?;
2521
2522        // Get the `serialized_records` row
2523        let serialied_records: Vec<u8> = result.get("serialized_records");
2524
2525        // Deserialize it
2526        let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
2527            .with_context(|| "Failed to deserialize records")?;
2528
2529        Ok(records)
2530    }
2531}
2532
2533#[async_trait]
2534impl Provider<SeqTypes, VidCommonRequest> for Persistence {
2535    #[tracing::instrument(skip(self))]
2536    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
2537        let mut tx = match self.db.read().await {
2538            Ok(tx) => tx,
2539            Err(err) => {
2540                tracing::warn!("could not open transaction: {err:#}");
2541                return None;
2542            },
2543        };
2544
2545        let bytes = match query_as::<(Vec<u8>,)>(
2546            "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
2547        )
2548        .bind(req.0.to_string())
2549        .fetch_optional(tx.as_mut())
2550        .await
2551        {
2552            Ok(Some((bytes,))) => bytes,
2553            Ok(None) => return None,
2554            Err(err) => {
2555                tracing::error!("error loading VID share: {err:#}");
2556                return None;
2557            },
2558        };
2559
2560        let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2561            match bincode::deserialize(&bytes) {
2562                Ok(share) => share,
2563                Err(err) => {
2564                    tracing::warn!("error decoding VID share: {err:#}");
2565                    return None;
2566                },
2567            };
2568
2569        match share.data {
2570            VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
2571            VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
2572        }
2573    }
2574}
2575
2576#[async_trait]
2577impl Provider<SeqTypes, PayloadRequest> for Persistence {
2578    #[tracing::instrument(skip(self))]
2579    async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
2580        let mut tx = match self.db.read().await {
2581            Ok(tx) => tx,
2582            Err(err) => {
2583                tracing::warn!("could not open transaction: {err:#}");
2584                return None;
2585            },
2586        };
2587
2588        let bytes = match query_as::<(Vec<u8>,)>(
2589            "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
2590        )
2591        .bind(req.0.to_string())
2592        .fetch_optional(tx.as_mut())
2593        .await
2594        {
2595            Ok(Some((bytes,))) => bytes,
2596            Ok(None) => return None,
2597            Err(err) => {
2598                tracing::warn!("error loading DA proposal: {err:#}");
2599                return None;
2600            },
2601        };
2602
2603        let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
2604        {
2605            Ok(proposal) => proposal,
2606            Err(err) => {
2607                tracing::error!("error decoding DA proposal: {err:#}");
2608                return None;
2609            },
2610        };
2611
2612        Some(Payload::from_bytes(
2613            &proposal.data.encoded_transactions,
2614            &proposal.data.metadata,
2615        ))
2616    }
2617}
2618
2619#[async_trait]
2620impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
2621    #[tracing::instrument(skip(self))]
2622    async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
2623        let mut tx = match self.db.read().await {
2624            Ok(tx) => tx,
2625            Err(err) => {
2626                tracing::warn!("could not open transaction: {err:#}");
2627                return None;
2628            },
2629        };
2630
2631        let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
2632            Ok(res) => res?,
2633            Err(err) => {
2634                tracing::info!("requested leaf not found in undecided proposals: {err:#}");
2635                return None;
2636            },
2637        };
2638
2639        match LeafQueryData::new(leaf, qc) {
2640            Ok(leaf) => Some(leaf),
2641            Err(err) => {
2642                tracing::warn!("fetched invalid leaf: {err:#}");
2643                None
2644            },
2645        }
2646    }
2647}
2648
2649async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
2650    tx: &mut Transaction<Mode>,
2651    req: LeafRequest<SeqTypes>,
2652) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
2653    // Look for a quorum proposal corresponding to this leaf.
2654    let Some((proposal_bytes,)) =
2655        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
2656            .bind(req.expected_leaf.to_string())
2657            .fetch_optional(tx.as_mut())
2658            .await
2659            .context("fetching proposal")?
2660    else {
2661        return Ok(None);
2662    };
2663
2664    // Look for a QC corresponding to this leaf.
2665    let Some((qc_bytes,)) =
2666        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
2667            .bind(req.expected_leaf.to_string())
2668            .fetch_optional(tx.as_mut())
2669            .await
2670            .context("fetching QC")?
2671    else {
2672        return Ok(None);
2673    };
2674
2675    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
2676        bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
2677    let qc: QuorumCertificate2<SeqTypes> =
2678        bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
2679
2680    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
2681    Ok(Some((leaf, qc)))
2682}
2683
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    #[async_trait]
    impl TestablePersistence for Persistence {
        // Wrapped in `Arc` so tests can share a single temporary database instance.
        type Storage = Arc<TmpDb>;

        /// Spin up a fresh temporary database for a test.
        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        /// Build persistence options pointing at the temporary database. The concrete
        /// option type depends on which storage backend this crate was compiled with.
        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            // Postgres backend: connect to the ephemeral server managed by `TmpDb`.
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            // SQLite backend: point at the temporary database file.
            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions {
                    path: Some(db.path()),
                }
                .into()
            }
        }
    }
}
2723
2724#[cfg(test)]
2725mod test {
2726
2727    use committable::{Commitment, CommitmentBoundsArkless};
2728    use espresso_types::{traits::NullEventConsumer, Header, Leaf, NodeState, ValidatedState};
2729    use futures::stream::TryStreamExt;
2730    use hotshot_example_types::node_types::TestVersions;
2731    use hotshot_types::{
2732        data::{
2733            ns_table::parse_ns_table, vid_disperse::VidDisperseShare2, EpochNumber, QuorumProposal2,
2734        },
2735        message::convert_proposal,
2736        simple_certificate::QuorumCertificate,
2737        simple_vote::QuorumData,
2738        traits::{
2739            block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
2740            node_implementation::Versions,
2741            signature_key::SignatureKey,
2742            EncodeBytes,
2743        },
2744        utils::EpochTransitionIndicator,
2745        vid::{
2746            advz::advz_scheme,
2747            avidm::{init_avidm_param, AvidMScheme},
2748        },
2749    };
2750    use jf_vid::VidScheme;
2751    use vbs::version::StaticVersionType;
2752
2753    use super::*;
2754    use crate::{persistence::tests::TestablePersistence as _, BLSPubKey, PubKey};
2755
    /// Regression test: quorum proposals stored without a `leaf_hash` (NULL) get their
    /// leaf-hash column backfilled when a new `Persistence` connects to the same database.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_quorum_proposals_leaf_hash_migration() {
        // Create some quorum proposals to test with.
        let leaf: Leaf2 =
            Leaf::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock())
                .await
                .into();
        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut quorum_proposal = Proposal {
            data: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
            signature,
            _pd: Default::default(),
        };

        // Two proposals at distinct views (0 and 1), converted to the legacy
        // `QuorumProposal` format that predates the leaf-hash column.
        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());

        quorum_proposal.data.view_number = ViewNumber::new(1);

        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());
        let qps = [qp1, qp2];

        // Create persistence and add the quorum proposals with NULL leaf hash.
        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();
        let params = qps
            .iter()
            .map(|qp| {
                (
                    qp.data.view_number.u64() as i64,
                    bincode::serialize(&qp).unwrap(),
                )
            })
            .collect::<Vec<_>>();
        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Create a new persistence and ensure the commitments get populated.
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.read().await.unwrap();
        let rows = tx
            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(rows.len(), qps.len());
        for (row, qp) in rows.into_iter().zip(qps) {
            // View and serialized data must be untouched; only `leaf_hash` is backfilled
            // with the commitment of the leaf derived from the proposal.
            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
            assert_eq!(
                row.get::<Vec<u8>, _>("data"),
                bincode::serialize(&qp).unwrap()
            );
            assert_eq!(
                row.get::<String, _>("leaf_hash"),
                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
            );
        }
    }
2834
    /// Exercise the three `Provider` impls on `Persistence`: after appending a VID share,
    /// a DA proposal, and quorum proposals, the corresponding VID common data, payload,
    /// and leaf can all be fetched back, as when rebuilding an archive.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetching_providers() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        // Mock up some data.
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload across 2 equal-weight storage nodes.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // A signed VID share proposal for view 0, holding the first share.
        let vid_share = VidDisperseShare2::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param.clone(),
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();

        // A signed quorum proposal wrapping the genesis leaf's header.
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: leaf.view_number(),
                justify_qc: leaf.justify_qc(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        // A signed DA proposal carrying the payload bytes and namespace table.
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // A follow-up proposal for the next view whose justify QC commits to `leaf`,
        // so the leaf provider has a QC to pair with it.
        let mut next_quorum_proposal = quorum_proposal.clone();
        next_quorum_proposal.data.proposal.view_number += 1;
        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
        next_quorum_proposal
            .data
            .proposal
            .justify_qc
            .data
            .leaf_commit = Committable::commit(&leaf.clone());
        let qc = next_quorum_proposal.data.justify_qc();

        // Add to database.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_vid2(&convert_proposal(vid_share.clone()))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Add an extra quorum proposal so we have a QC pointing back at `leaf`.
        storage
            .append_quorum_proposal2(&next_quorum_proposal)
            .await
            .unwrap();

        // Fetch it as if we were rebuilding an archive.
        assert_eq!(
            Some(VidCommon::V1(avidm_param)),
            storage
                .fetch(VidCommonRequest(VidCommitment::V1(
                    vid_share.data.payload_commitment
                )))
                .await
        );
        assert_eq!(
            leaf_payload,
            storage
                .fetch(PayloadRequest(VidCommitment::V1(
                    vid_share.data.payload_commitment
                )))
                .await
                .unwrap()
        );
        assert_eq!(
            LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
            storage
                .fetch(LeafRequest::new(
                    leaf.block_header().block_number(),
                    Committable::commit(&leaf),
                    qc.clone().commit()
                ))
                .await
                .unwrap()
        );
    }
2967
    /// Test conditions that trigger pruning.
    ///
    /// This is a configurable test that can be used to test different configurations of GC,
    /// `pruning_opt`. The test populates the database with some data for view 1, asserts that it is
    /// retained for view 2, and then asserts that it is pruned by view 3. There are various
    /// different configurations that can achieve this behavior, such that the data is retained and
    /// then pruned due to different logic and code paths.
    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        // Install the caller's GC configuration before creating storage.
        opt.consensus_pruning = pruning_opt;
        let storage = opt.create().await.unwrap();

        // All consensus data in this test is associated with view 1.
        let data_view = ViewNumber::new(1);

        // Populate some data.
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload across 2 equal-weight storage nodes.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        // A signed VID share proposal for the data view.
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid = VidDisperseShare2::<SeqTypes> {
            view_number: data_view,
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();
        // A signed quorum proposal for the data view, justified by the genesis QC.
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: data_view,
                justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        // A signed DA proposal for the data view.
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: data_view,
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
        storage.append_vid2(&vid).await.unwrap();
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
        // the target, because of the minimum retention.
        tracing::info!("decide view 1");
        storage
            .append_decided_leaves(data_view + 1, [], &NullEventConsumer)
            .await
            .unwrap();
        assert_eq!(
            storage.load_vid_share(data_view).await.unwrap().unwrap(),
            convert_proposal(vid)
        );
        assert_eq!(
            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
            da_proposal
        );
        assert_eq!(
            storage.load_quorum_proposal(data_view).await.unwrap(),
            quorum_proposal
        );

        // After another view, our data is beyond the minimum retention (though not the target
        // retention) so it gets pruned.
        tracing::info!("decide view 2");
        storage
            .append_decided_leaves(data_view + 2, [], &NullEventConsumer)
            .await
            .unwrap();
        // VID and DA loads report absence via `None`; the quorum proposal load errors instead.
        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
        storage.load_quorum_proposal(data_view).await.unwrap_err();
    }
3095
3096    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3097    async fn test_pruning_minimum_retention() {
3098        test_pruning_helper(ConsensusPruningOptions {
3099            // Use a very low target usage, to show that we still retain data up to the minimum
3100            // retention even when usage is above target.
3101            target_usage: 0,
3102            minimum_retention: 1,
3103            // Use a very high target retention, so that pruning is only triggered by the minimum
3104            // retention.
3105            target_retention: u64::MAX,
3106        })
3107        .await
3108    }
3109
3110    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3111    async fn test_pruning_target_retention() {
3112        test_pruning_helper(ConsensusPruningOptions {
3113            target_retention: 1,
3114            // Use a very low minimum retention, so that data is only kept around due to the target
3115            // retention.
3116            minimum_retention: 0,
3117            // Use a very high target usage, so that pruning is only triggered by the target
3118            // retention.
3119            target_usage: u64::MAX,
3120        })
3121        .await
3122    }
3123
3124    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3125    async fn test_consensus_migration() {
3126        let tmp = Persistence::tmp_storage().await;
3127        let mut opt = Persistence::options(&tmp);
3128
3129        let storage = opt.create().await.unwrap();
3130
3131        let rows = 300;
3132
3133        assert!(storage.load_state_cert().await.unwrap().is_none());
3134
3135        for i in 0..rows {
3136            let view = ViewNumber::new(i);
3137            let validated_state = ValidatedState::default();
3138            let instance_state = NodeState::default();
3139
3140            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
3141            let (payload, metadata) =
3142                Payload::from_transactions([], &validated_state, &instance_state)
3143                    .await
3144                    .unwrap();
3145
3146            let payload_bytes = payload.encode();
3147
3148            let block_header =
3149                Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);
3150
3151            let null_quorum_data = QuorumData {
3152                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
3153            };
3154
3155            let justify_qc = QuorumCertificate::new(
3156                null_quorum_data.clone(),
3157                null_quorum_data.commit(),
3158                view,
3159                None,
3160                std::marker::PhantomData,
3161            );
3162
3163            let quorum_proposal = QuorumProposal {
3164                block_header,
3165                view_number: view,
3166                justify_qc: justify_qc.clone(),
3167                upgrade_certificate: None,
3168                proposal_certificate: None,
3169            };
3170
3171            let quorum_proposal_signature =
3172                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
3173                    .expect("Failed to sign quorum proposal");
3174
3175            let proposal = Proposal {
3176                data: quorum_proposal.clone(),
3177                signature: quorum_proposal_signature,
3178                _pd: std::marker::PhantomData,
3179            };
3180
3181            let proposal_bytes = bincode::serialize(&proposal)
3182                .context("serializing proposal")
3183                .unwrap();
3184
3185            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
3186            leaf.fill_block_payload::<TestVersions>(
3187                payload,
3188                GENESIS_VID_NUM_STORAGE_NODES,
3189                <TestVersions as Versions>::Base::VERSION,
3190            )
3191            .unwrap();
3192
3193            let mut tx = storage.db.write().await.unwrap();
3194
3195            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
3196            let leaf_bytes = bincode::serialize(&leaf).unwrap();
3197
3198            tx.upsert(
3199                "anchor_leaf",
3200                ["view", "leaf", "qc"],
3201                ["view"],
3202                [(i as i64, leaf_bytes, qc_bytes)],
3203            )
3204            .await
3205            .unwrap();
3206
3207            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
3208                epoch: EpochNumber::new(i),
3209                light_client_state: Default::default(), // filling arbitrary value
3210                next_stake_table_state: Default::default(), // filling arbitrary value
3211                signatures: vec![],                     // filling arbitrary value
3212                auth_root: Default::default(),
3213            };
3214            // manually upsert the state cert to the finalized database
3215            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
3216            tx.upsert(
3217                "finalized_state_cert",
3218                ["epoch", "state_cert"],
3219                ["epoch"],
3220                [(i as i64, state_cert_bytes)],
3221            )
3222            .await
3223            .unwrap();
3224
3225            tx.commit().await.unwrap();
3226
3227            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
3228                .disperse(payload_bytes.clone())
3229                .unwrap();
3230
3231            let vid = ADVZDisperseShare::<SeqTypes> {
3232                view_number: ViewNumber::new(i),
3233                payload_commitment: Default::default(),
3234                share: disperse.shares[0].clone(),
3235                common: disperse.common,
3236                recipient_key: pubkey,
3237            };
3238
3239            let (payload, metadata) =
3240                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
3241                    .await
3242                    .unwrap();
3243
3244            let da = DaProposal::<SeqTypes> {
3245                encoded_transactions: payload.encode(),
3246                metadata,
3247                view_number: ViewNumber::new(i),
3248            };
3249
3250            let block_payload_signature =
3251                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
3252
3253            let da_proposal = Proposal {
3254                data: da,
3255                signature: block_payload_signature,
3256                _pd: Default::default(),
3257            };
3258
3259            storage
3260                .append_vid(&vid.to_proposal(&privkey).unwrap())
3261                .await
3262                .unwrap();
3263            storage
3264                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
3265                .await
3266                .unwrap();
3267
3268            let leaf_hash = Committable::commit(&leaf);
3269            let mut tx = storage.db.write().await.expect("failed to start write tx");
3270            tx.upsert(
3271                "quorum_proposals",
3272                ["view", "leaf_hash", "data"],
3273                ["view"],
3274                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
3275            )
3276            .await
3277            .expect("failed to upsert quorum proposal");
3278
3279            let justify_qc = &proposal.data.justify_qc;
3280            let justify_qc_bytes = bincode::serialize(&justify_qc)
3281                .context("serializing QC")
3282                .unwrap();
3283            tx.upsert(
3284                "quorum_certificate",
3285                ["view", "leaf_hash", "data"],
3286                ["view"],
3287                [(
3288                    justify_qc.view_number.u64() as i64,
3289                    justify_qc.data.leaf_commit.to_string(),
3290                    &justify_qc_bytes,
3291                )],
3292            )
3293            .await
3294            .expect("failed to upsert qc");
3295
3296            tx.commit().await.expect("failed to commit");
3297        }
3298
3299        storage.migrate_consensus().await.unwrap();
3300
3301        let mut tx = storage.db.read().await.unwrap();
3302        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
3303            .fetch_one(tx.as_mut())
3304            .await
3305            .unwrap();
3306        assert_eq!(
3307            anchor_leaf2_count, rows as i64,
3308            "anchor leaf count does not match rows",
3309        );
3310
3311        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
3312            .fetch_one(tx.as_mut())
3313            .await
3314            .unwrap();
3315        assert_eq!(
3316            da_proposal_count, rows as i64,
3317            "da proposal count does not match rows",
3318        );
3319
3320        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
3321            .fetch_one(tx.as_mut())
3322            .await
3323            .unwrap();
3324        assert_eq!(
3325            vid_share_count, rows as i64,
3326            "vid share count does not match rows"
3327        );
3328
3329        let (quorum_proposals_count,) =
3330            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
3331                .fetch_one(tx.as_mut())
3332                .await
3333                .unwrap();
3334        assert_eq!(
3335            quorum_proposals_count, rows as i64,
3336            "quorum proposals count does not match rows",
3337        );
3338
3339        let (quorum_certificates_count,) =
3340            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
3341                .fetch_one(tx.as_mut())
3342                .await
3343                .unwrap();
3344        assert_eq!(
3345            quorum_certificates_count, rows as i64,
3346            "quorum certificates count does not match rows",
3347        );
3348
3349        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
3350            .fetch_one(tx.as_mut())
3351            .await
3352            .unwrap();
3353        assert_eq!(
3354            state_cert_count, rows as i64,
3355            "Light client state update certificates count does not match rows",
3356        );
3357        assert_eq!(
3358            storage.load_state_cert().await.unwrap().unwrap(),
3359            LightClientStateUpdateCertificateV2::<SeqTypes> {
3360                epoch: EpochNumber::new(rows - 1),
3361                light_client_state: Default::default(),
3362                next_stake_table_state: Default::default(),
3363                signatures: vec![],
3364                auth_root: Default::default(),
3365            },
3366            "Wrong light client state update certificate in the storage",
3367        );
3368
3369        storage.migrate_consensus().await.unwrap();
3370    }
3371}
3372
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_query_service::{
        availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
            EncodeBytes,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Shared body for the `test_postgres_read_ns_table_*` tests below.
    ///
    /// Persists a single block whose payload spans two namespaces, then
    /// checks that the Postgres-side SQL helpers `get_ns_table` /
    /// `read_ns_id` recover, for every stored transaction row, the same
    /// namespace id the transaction was created with in Rust.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        // Headers can only be built once the first epoch is configured.
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp_storage = Persistence::tmp_storage().await;
        let mut opts = Persistence::options(&tmp_storage);
        let persistence = opts.create().await.unwrap();

        // Three transactions across two namespaces (10001 twice, 10009 once).
        let txns = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        // Build a block extending genesis that carries the payload above.
        let state = Default::default();
        let genesis_qc =
            QuorumCertificate::genesis::<TestVersions>(&state, &instance_state).await;
        let view: ViewNumber = genesis_qc.view_number + 1;
        let parent = Leaf::genesis::<TestVersions>(&state, &instance_state)
            .await
            .into();

        let (payload, namespace_table) =
            Payload::from_transactions(txns.clone(), &state, &instance_state)
                .await
                .unwrap();
        let encoded = payload.encode();
        let payload_commit = vid_commitment::<TestVersions>(
            &encoded,
            &namespace_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commit = payload.builder_commitment(&namespace_table);

        // Zero-amount builder fee signed with a deterministic test key.
        let (builder_account, builder_key) =
            FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature =
            FeeAccount::sign_fee(&builder_key, fee_amount, &namespace_table).unwrap();

        let header = Header::new(
            &state,
            &instance_state,
            &parent,
            payload_commit,
            builder_commit,
            namespace_table,
            BuilderFee {
                fee_amount,
                fee_account: builder_account,
                fee_signature,
            },
            instance_state.current_version,
            view.u64(),
        )
        .await
        .unwrap();

        let proposal = QuorumProposal {
            block_header: header.clone(),
            view_number: view,
            justify_qc: genesis_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();

        // Re-point the upgraded QC at the new leaf so LeafQueryData accepts it.
        let mut qc = genesis_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view;

        // Persist the leaf and block in a single write transaction.
        let mut dbtx = persistence.db.write().await.unwrap();
        dbtx.insert_leaf(LeafQueryData::new(leaf, qc).unwrap())
            .await
            .unwrap();
        dbtx.insert_block(BlockQueryData::<SeqTypes>::new(header, payload))
            .await
            .unwrap();
        dbtx.commit().await.unwrap();

        // Decode the namespace table in SQL and compare with the stored ns_id.
        let mut dbtx = persistence.db.read().await.unwrap();
        let fetched = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
              FROM header AS h
              JOIN transactions AS t ON t.block_height = h.height
              ORDER BY t.ns_index, t.position
        ",
        )
        .fetch_all(dbtx.as_mut())
        .await
        .unwrap();
        assert_eq!(fetched.len(), txns.len());
        // Rows come back in (ns_index, position) order, matching `txns` order.
        for (row, txn) in fetched.into_iter().zip(&txns) {
            let expected = u64::from(txn.namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), expected);
            assert_eq!(row.get::<i64, _>("read_ns_id"), expected);
        }
    }

    /// Namespace-table decoding against a v0.1 header.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        test_postgres_read_ns_table(NodeState::mock()).await;
    }

    /// Namespace-table decoding against a v0.2 header.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        test_postgres_read_ns_table(NodeState::mock_v2()).await;
    }

    /// Namespace-table decoding against a v0.3 header (epochs disabled).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        test_postgres_read_ns_table(NodeState::mock_v3().with_epoch_height(0)).await;
    }
}