sequencer/persistence/sql.rs

use std::{
    collections::BTreeMap,
    path::PathBuf,
    str::FromStr,
    sync::Arc,
    time::{Duration, Instant},
};

use anyhow::{bail, Context};
use async_trait::async_trait;
use clap::Parser;
use committable::Committable;
use derivative::Derivative;
use derive_more::derive::{From, Into};
use espresso_types::{
    parse_duration, parse_size,
    traits::{EventsPersistenceRead, MembershipPersistence},
    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
    v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent, Validator},
    BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload, PubKey,
    StakeTableHash, ValidatorMap,
};
use futures::stream::StreamExt;
use hotshot::InitializerEpochInfo;
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
    DhtPersistentStorage, SerializableRecord,
};
use hotshot_query_service::{
    availability::LeafQueryData,
    data_source::{
        storage::{
            pruning::PrunerCfg,
            sql::{
                include_migrations, query_as, syntax_helpers::MAX_FN, Config, Db, Read, SqlStorage,
                StorageConnectionType, Transaction, TransactionMode, Write,
            },
        },
        Transaction as _, VersionedDataSource,
    },
    fetching::{
        request::{LeafRequest, PayloadRequest, VidCommonRequest},
        Provider,
    },
    merklized_state::MerklizedState,
};
use hotshot_types::{
    data::{
        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
        QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
    },
    drb::{DrbInput, DrbResult},
    event::{Event, EventType, HotShotAction, LeafInfo},
    message::{convert_proposal, Proposal},
    simple_certificate::{
        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
    },
    traits::{
        block_contents::{BlockHeader, BlockPayload},
        metrics::Metrics,
        node_implementation::ConsensusTime,
    },
    vote::HasViewNumber,
};
use itertools::Itertools;
use sqlx::{query, Executor, QueryBuilder, Row};

use crate::{
    catchup::SqlStateCatchup,
    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
    NodeType, SeqTypes, ViewNumber, RECENT_STAKE_TABLES_LIMIT,
};

/// Options for Postgres-backed persistence.
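///
/// These options are typically supplied via the `ESPRESSO_SEQUENCER_POSTGRES_*` environment
/// variables listed on each field; for example (hypothetical values),
/// `ESPRESSO_SEQUENCER_POSTGRES_HOST=db.internal` and `ESPRESSO_SEQUENCER_POSTGRES_PORT=5432`.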
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    /// Hostname for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    /// Port for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    /// Name of database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    /// Postgres user to connect as.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    /// Password for Postgres user.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    // Hide from debug output since it may contain sensitive data.
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    /// Use TLS for an encrypted connection to the database.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}

impl Default for PostgresOptions {
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}

#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    /// Base directory for the SQLite database.
    /// The SQLite file will be created in the `sqlite` subdirectory with the filename `database`.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}

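/// Builds the SQLite database file path under the `sqlite` subdirectory of `path`,
/// creating the directory if it does not already exist.
///
/// For example (hypothetical path), `build_sqlite_path("/var/lib/espresso")` yields
/// `/var/lib/espresso/sqlite/database`.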
pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
    let sub_dir = PathBuf::from_str(path)?.join("sqlite");

    // Create the `sqlite` subdirectory if it does not already exist.
    if !sub_dir.exists() {
        std::fs::create_dir_all(&sub_dir)
            .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
    }

    Ok(sub_dir.join("database"))
}

/// Options for database-backed persistence, supporting both Postgres and SQLite.
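///
/// A minimal sketch of programmatic use (assuming the `PersistenceOptions` trait is in scope;
/// the URI value is purely illustrative):
///
/// ```ignore
/// let mut opts = Options::default();
/// opts.uri = Some("postgres://user:password@localhost/espresso".into()); // hypothetical URI
/// let persistence = opts.create().await?;
/// ```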
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    /// Database URI for Postgres or SQLite.
    ///
    /// This is a shorthand for setting a number of other options all at once. The URI has the
    /// following format ([brackets] indicate optional segments):
    ///
    /// - **Postgres:** `postgres[ql]://[username[:password]@][host[:port],]/database[?parameter_list]`
    /// - **SQLite:** `sqlite://path/to/db.sqlite`
    ///
    /// Options set explicitly via other env vars or flags will take precedence, so you can use this
    /// URI to set a baseline and then use other parameters to override or add configuration. In
    /// addition, there are some parameters which cannot be set via the URI, such as TLS.
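    ///
    /// For example (hypothetical values): `postgres://sequencer:secret@db.internal:5432/espresso`.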
    // Hide from debug output since it may contain sensitive data.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    /// Enables the pruner and sets the default pruning parameters unless they are provided explicitly.
    /// Default parameters:
    /// - pruning_threshold: 3 TB
    /// - minimum_retention: 1 day
    /// - target_retention: 7 days
    /// - batch_size: 1000
    /// - max_usage: 80%
    /// - interval: 1 hour
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    /// Pruning parameters.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    /// Pruning parameters for ephemeral consensus storage.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    /// Specifies the maximum number of concurrent fetch requests allowed from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    /// The minimum delay between active fetches in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    /// The minimum delay between loading chunks in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    /// Disable pruning and reconstruct previously pruned data.
    ///
    /// Running without pruning is the default behavior, but by default the service will not try to
    /// reconstruct data that was pruned in a previous run where pruning was enabled. This option
    /// instructs the service to run without pruning _and_ reconstruct all previously pruned data by
    /// fetching it from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    /// Turns on leaf-only data storage.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    /// The maximum idle time of a database connection.
    ///
    /// Any connection which has been open and unused longer than this duration will be
    /// automatically closed to reduce load on the server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    /// The maximum lifetime of a database connection.
    ///
    /// Any connection which has been open longer than this duration will be automatically closed
    /// (and, if needed, replaced), even if it is otherwise healthy. It is good practice to refresh
    /// even healthy connections once in a while (e.g. daily) in case of resource leaks in the
    /// server implementation.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

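    /// Threshold above which a SQL statement is considered slow.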
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    /// The maximum time a single SQL statement is allowed to run before being canceled.
    ///
    /// This helps prevent queries from running indefinitely and consuming resources.
    /// Set to 10 minutes by default.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) statement_timeout: Duration,

    /// The minimum number of database connections to maintain at any time.
    ///
    /// The database client will, to the best of its ability, maintain at least `min` open
    /// connections at all times. This can be used to reduce the latency hit of opening new
    /// connections when at least this many simultaneous connections are frequently needed.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    /// Allows setting a different minimum number of connections for query operations.
    /// A value of `None` means the `min_connections` value is used.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
    pub(crate) query_min_connections: Option<u32>,

    /// The maximum number of database connections to maintain at any time.
    ///
    /// Once `max` connections are in use simultaneously, further attempts to acquire a connection
    /// (or begin a transaction) will block until one of the existing connections is released.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    /// Allows setting a different maximum number of connections for query operations.
    /// A value of `None` means the `max_connections` value is used.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
    pub(crate) query_max_connections: Option<u32>,

    /// Sets the batch size for the types migration.
    /// Determines how many `(leaf, vid)` rows are selected from the old types table
    /// and migrated at once.
    /// Default is `10000` if not set.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_TYPES_MIGRATION_BATCH_SIZE")]
    pub(crate) types_migration_batch_size: Option<u64>,

    // Keep the database connection pool when persistence is created,
    // allowing it to be reused across multiple instances (e.g. the API and
    // consensus storage) instead of creating a new pool each time.
    // This also ensures all storage instances adhere to the MAX_CONNECTIONS limit, if set.
    //
    // Note: Cloning the `Pool` is lightweight and efficient because it simply
    // creates a new reference-counted handle to the underlying pool state.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}

impl Default for Options {
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}

#[cfg(not(feature = "embedded-db"))]
impl From<PostgresOptions> for Config {
    fn from(opt: PostgresOptions) -> Self {
        let mut cfg = Config::default();

        if let Some(host) = opt.host {
            cfg = cfg.host(host);
        }

        if let Some(port) = opt.port {
            cfg = cfg.port(port);
        }

        if let Some(database) = &opt.database {
            cfg = cfg.database(database);
        }

        if let Some(user) = &opt.user {
            cfg = cfg.user(user);
        }

        if let Some(password) = &opt.password {
            cfg = cfg.password(password);
        }

        if opt.use_tls {
            cfg = cfg.tls();
        }

        cfg = cfg.max_connections(20);
        cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
        cfg = cfg.connection_timeout(Duration::from_secs(10240));
        cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
        cfg = cfg.statement_timeout(Duration::from_secs(600)); // 10 minutes default

        cfg
    }
}

#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    fn from(opt: SqliteOptions) -> Self {
        let mut cfg = Config::default();

        cfg = cfg.db_path(opt.path);

        cfg = cfg.max_connections(20);
        cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
        cfg = cfg.connection_timeout(Duration::from_secs(10240));
        cfg = cfg.slow_statement_threshold(Duration::from_secs(2));
        cfg = cfg.statement_timeout(Duration::from_secs(600));
        cfg
    }
}

#[cfg(not(feature = "embedded-db"))]
impl From<PostgresOptions> for Options {
    fn from(opt: PostgresOptions) -> Self {
        Options {
            postgres_options: opt,
            max_connections: 20,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            statement_timeout: Duration::from_secs(600),
            ..Default::default()
        }
    }
}

#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    fn from(opt: SqliteOptions) -> Self {
        Options {
            sqlite_options: opt,
            max_connections: 5,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            uri: None,
            statement_timeout: Duration::from_secs(600),
            prune: false,
            pruning: Default::default(),
            consensus_pruning: Default::default(),
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            archive: false,
            lightweight: false,
            min_connections: 0,
            types_migration_batch_size: None,
            pool: None,
        }
    }
}

impl TryFrom<&Options> for Config {
    type Error = anyhow::Error;

    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
        let mut cfg = match &opt.uri {
            Some(uri) => uri.parse()?,
            None => Self::default(),
        };

        if let Some(pool) = &opt.pool {
            cfg = cfg.pool(pool.clone());
        }

        cfg = cfg.max_connections(opt.max_connections);
        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
        cfg = cfg.min_connections(opt.min_connections);

        #[cfg(not(feature = "embedded-db"))]
        {
            cfg =
                cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
            cfg =
                cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
        }

        cfg = cfg.connection_timeout(opt.connection_timeout);
        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
        cfg = cfg.statement_timeout(opt.statement_timeout);

        #[cfg(not(feature = "embedded-db"))]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
            ));

            let pg_options = &opt.postgres_options;

            if let Some(host) = &pg_options.host {
                cfg = cfg.host(host.clone());
            }

            if let Some(port) = pg_options.port {
                cfg = cfg.port(port);
            }

            if let Some(database) = &pg_options.database {
                cfg = cfg.database(database);
            }

            if let Some(user) = &pg_options.user {
                cfg = cfg.user(user);
            }

            if let Some(password) = &pg_options.password {
                cfg = cfg.password(password);
            }

            if pg_options.use_tls {
                cfg = cfg.tls();
            }
        }

        #[cfg(feature = "embedded-db")]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
            ));

            cfg = cfg.db_path(opt.sqlite_options.path.clone());
        }

        if opt.prune {
            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
        }
        if opt.archive {
            cfg = cfg.archive();
        }

        Ok(cfg)
    }
}

/// Pruning parameters.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    /// Threshold for pruning, specified in bytes.
    /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period.
    /// Pruning continues until the disk usage drops below the MAX USAGE.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    /// Minimum retention period.
    /// Data is retained for at least this duration, even if there's no free disk space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    /// Target retention period.
    /// Data older than this is pruned to free up space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    /// Batch size for pruning.
    /// This is the number of blocks' worth of data to delete in a single transaction.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    /// Maximum disk usage (in basis points).
    ///
    /// Pruning stops once the disk usage falls below this value, even if
    /// some data older than the `MINIMUM_RETENTION` remains. Values range
    /// from 0 (0%) to 10000 (100%).
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    /// Interval for running the pruner.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    /// Number of SQLite pages to vacuum from the freelist
    /// during each pruner cycle.
    /// This value corresponds to `N` in the SQLite PRAGMA `incremental_vacuum(N)`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}

impl Default for PruningOptions {
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}

impl From<PruningOptions> for PrunerCfg {
    fn from(opt: PruningOptions) -> Self {
        let mut cfg = PrunerCfg::new();
        if let Some(threshold) = opt.pruning_threshold {
            cfg = cfg.with_pruning_threshold(threshold);
        }
        if let Some(min) = opt.minimum_retention {
            cfg = cfg.with_minimum_retention(min);
        }
        if let Some(target) = opt.target_retention {
            cfg = cfg.with_target_retention(target);
        }
        if let Some(batch) = opt.batch_size {
            cfg = cfg.with_batch_size(batch);
        }
        if let Some(max) = opt.max_usage {
            cfg = cfg.with_max_usage(max);
        }
        if let Some(interval) = opt.interval {
            cfg = cfg.with_interval(interval);
        }

        if let Some(pages) = opt.pages {
            cfg = cfg.with_incremental_vacuum_pages(pages);
        }

        cfg = cfg.with_state_tables(vec![
            BlockMerkleTree::state_type().to_string(),
            FeeMerkleTree::state_type().to_string(),
        ]);

        cfg
    }
}

/// Pruning parameters for ephemeral consensus storage.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    /// Number of views to try to retain in consensus storage before data that hasn't been archived
    /// is garbage collected.
    ///
    /// The longer this is, the more certain it is that all data will eventually be archived, even if
    /// there are temporary problems with archive storage or partially missing data. This can be set
    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
    /// setting only applies to views which never get decided (i.e., forks in consensus) and views for
    /// which this node is partially offline. These should be exceptionally rare.
    ///
    /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION
    /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long
    /// consensus data will be retained, see MINIMUM_RETENTION.
    ///
    /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    /// Minimum number of views to try to retain in consensus storage before data that hasn't been
    /// archived is garbage collected.
    ///
    /// This bound allows data to be retained even if consensus storage occupies more than
    /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival
    /// storage as necessary, even under extreme circumstances where otherwise garbage collection
    /// would kick in based on TARGET_RETENTION.
    ///
    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more
    /// aggressively.
    ///
    /// See also TARGET_RETENTION and MINIMUM_RETENTION.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}

impl Default for ConsensusPruningOptions {
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}

#[async_trait]
impl PersistenceOptions for Options {
    type Persistence = Persistence;

    fn set_view_retention(&mut self, view_retention: u64) {
        self.consensus_pruning.target_retention = view_retention;
        self.consensus_pruning.minimum_retention = view_retention;
    }

    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
        let config = (&*self).try_into()?;
        let persistence = Persistence {
            db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
            gc_opt: self.consensus_pruning,
            internal_metrics: PersistenceMetricsValue::default(),
        };
        persistence.migrate_quorum_proposal_leaf_hashes().await?;
        self.pool = Some(persistence.db.pool());
        Ok(persistence)
    }

    async fn reset(self) -> anyhow::Result<()> {
        SqlStorage::connect(
            Config::try_from(&self)?.reset_schema(),
            StorageConnectionType::Sequencer,
        )
        .await?;
        Ok(())
    }
}

/// SQL-backed persistence (Postgres, or SQLite with the `embedded-db` feature).
#[derive(Clone, Debug)]
pub struct Persistence {
    db: SqlStorage,
    gc_opt: ConsensusPruningOptions,
    /// A reference to the internal metrics
    internal_metrics: PersistenceMetricsValue,
}

impl Persistence {
    /// Ensure the `leaf_hash` column is populated for all existing quorum proposals.
    ///
    /// This column was added in a migration, but because it requires computing a commitment of the
    /// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we
    /// check if there are any just-migrated quorum proposals with a `NULL` value for this column,
    /// and if so we populate the column manually.
    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");

        let mut updates = vec![];
        while let Some(row) = proposals.next().await {
            let row = row?;

            let hash: Option<String> = row.try_get("leaf_hash")?;
            if hash.is_none() {
                let view: i64 = row.try_get("view")?;
                let data: Vec<u8> = row.try_get("data")?;
                let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let leaf = Leaf::from_quorum_proposal(&proposal.data);
                let leaf_hash = Committable::commit(&leaf);
                tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
                updates.push((view, leaf_hash.to_string()));
            }
        }
        drop(proposals);

        tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
            .await?;

        tx.commit().await
    }

    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            // In SQLite, overlapping read and write transactions can lead to database errors. To
            // avoid this:
            // - Start a read transaction to query and collect all the necessary data.
            // - Commit (or implicitly drop) the read transaction once the data is fetched.
            // - Use the collected data to generate a "decide" event for the consumer.
            // - Begin a write transaction to delete the data and update the event stream.
            let mut tx = self.db.read().await?;

            // Collect a chain of consecutive leaves, starting from the first view after the last
            // decide. This will correspond to a decide event, and defines a range of views which
            // can be garbage collected. This may even include views for which there was no leaf,
            // for which we might still have artifacts like proposals that never finalized.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // If there's an error getting a row, try generating an event with the rows
                        // we do have.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Ensure we are only dealing with a consecutive chain of leaves. We don't want to
                // garbage collect any views for which we missed a leaf or decide event; at least
                // not right away, in case we need to recover that data later.
                if let Some(parent) = parent {
                    if height != parent + 1 {
                        tracing::debug!(
                            height,
                            parent,
                            "ending decide event at non-consecutive leaf"
                        );
                        break;
                    }
                }
                parent = Some(height);
                leaves.push(leaf);
                final_qc = Some(CertificatePair::new(qc, next_epoch_qc));
            }
            drop(rows);

            let Some(final_qc) = final_qc else {
                // End event processing when there are no more decided views.
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Find the range of views encompassed by this leaf chain. All data in this range can be
            // processed by the consumer and then deleted.
            let from_view = leaves[0].view_number();
            let to_view = leaves[leaves.len() - 1].view_number();

            // Collect VID shares for the decide event.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect DA proposals for the decide event.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect state certs for the decide event.
            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;
            drop(tx);

            // Collate all the information by view number and construct a chain of leaves.
            let leaf_chain = leaves
                .into_iter()
                // Go in reverse chronological order, as expected by Decide events.
                .rev()
                .map(|mut leaf| {
                    let view = leaf.view_number();

                    // Include the VID share if available.
                    let vid_share = vid_shares.remove(&view);
                    if vid_share.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }

                    // Fill in the full block payload using the DA proposals we had persisted.
                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // We don't get a DA proposal for the genesis view, but we know what the
                        // payload always is.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    let state_cert = state_certs.get(&view).cloned();

                    LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        // Note: the following fields are not used in Decide event processing, and
                        // should be removed. For now, we just default them.
                        state: Default::default(),
                        delta: Default::default(),
                    }
                })
                .collect();

            {
                // Generate decide event for the consumer.
                tracing::debug!(
                    ?from_view,
                    ?to_view,
                    ?final_qc,
                    ?leaf_chain,
                    "generating decide event"
                );
                // Insert the deciding QC at the appropriate position, with the last decide event in
                // the chain.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == final_qc.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };
                consumer
                    .handle_event(&Event {
                        view_number: to_view,
                        event: EventType::Decide {
                            leaf_chain: Arc::new(leaf_chain),
                            committing_qc: Arc::new(final_qc),
                            deciding_qc,
                            block_size: None,
                        },
                    })
                    .await?;
            }

            let mut tx = self.db.write().await?;

            // Now that we have definitely processed leaves up to `to_view`, we can update
            // `last_processed_view` so we don't process these leaves again. We may still fail at
            // this point, or shut down, and fail to complete this update. At worst this will lead
            // to us sending a duplicate decide event the next time we are called; this is fine as
            // the event consumer is required to be idempotent.
            tx.upsert(
                "event_stream",
                ["id", "last_processed_view"],
                ["id"],
                [(1i32, to_view.u64() as i64)],
            )
            .await?;

            // Store all the finalized state certs
            for (epoch, state_cert) in state_certs {
                let state_cert_bytes = bincode::serialize(&state_cert)?;
                tx.upsert(
                    "finalized_state_cert",
                    ["epoch", "state_cert"],
                    ["epoch"],
                    [(epoch as i64, state_cert_bytes)],
                )
                .await?;
            }

            // Delete the data that has been fully processed.
            tx.execute(
                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            // Clean up leaves, but do not delete the most recent one: delete only leaves with a view
            // number less than `to_view`. This is necessary to ensure that, in case of a restart,
            // we can resume from the last decided leaf.
            tx.execute(
                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            tx.commit().await?;
            last_processed_view = Some(to_view.u64() as i64);
        }
    }

    async fn load_state_certs(
        tx: &mut Transaction<Read>,
        from_view: ViewNumber,
        to_view: ViewNumber,
    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let rows = tx
            .fetch_all(
                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

        let mut result = BTreeMap::new();

        for row in rows {
            let data: Vec<u8> = row.get("state_cert");

            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
                .or_else(|err_v2| {
                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
                    .map(Into::into)
                    .context(format!(
                        "Failed to deserialize LightClientStateUpdateCertificate as either v2 or v1. \
                         error: {err_v2}"
                    ))
            })?;

            result.insert(cert.epoch.u64(), cert);
        }

        Ok(result)
    }

    #[tracing::instrument(skip(self))]
    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        // Prune everything older than the target retention period.
        prune_to_view(
            &mut tx,
            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
        )
        .await?;

        // Check our storage usage; if necessary we will prune more aggressively (up to the minimum
        // retention) to get below the target usage.
        #[cfg(feature = "embedded-db")]
        let usage_query = format!(
            "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
            PRUNE_TABLES
                .iter()
                .map(|table| format!("'{table}'"))
                .join(",")
        );

        #[cfg(not(feature = "embedded-db"))]
        let usage_query = {
            let table_sizes = PRUNE_TABLES
                .iter()
                .map(|table| format!("pg_table_size('{table}')"))
                .join(" + ");
            format!("SELECT {table_sizes}")
        };

        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
        tracing::debug!(usage, "consensus storage usage after pruning");

        if (usage as u64) > self.gc_opt.target_usage {
            tracing::warn!(
                usage,
                gc_opt = ?self.gc_opt,
                "consensus storage is running out of space, pruning to minimum retention"
            );
            prune_to_view(
                &mut tx,
                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
            )
            .await?;
        }

        tx.commit().await
    }
}

const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];

async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
    if view == 0 {
        // Nothing to prune, the entire chain is younger than the retention period.
        return Ok(());
    }
    tracing::debug!(view, "pruning consensus storage");

    for table in PRUNE_TABLES {
        let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
            .bind(view as i64)
            .execute(tx.as_mut())
            .await
            .context(format!("pruning {table}"))?;
        if res.rows_affected() > 0 {
            tracing::info!(
                "garbage collected {} rows from {table}",
                res.rows_affected()
            );
        }
    }

    Ok(())
}

#[async_trait]
impl SequencerPersistence for Persistence {
    fn into_catchup_provider(
        self,
        backoff: BackoffParams,
    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
        Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
    }

    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
        tracing::info!("loading config from database");
1125
1126        // Select the most recent config (although there should only be one).
1127        let Some(row) = self
1128            .db
1129            .read()
1130            .await?
1131            .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1132            .await?
1133        else {
1134            tracing::info!("config not found");
1135            return Ok(None);
1136        };
1137        let json = row.try_get("config")?;
1138
1139        let json = migrate_network_config(json).context("migration of network config failed")?;
1140        let config = serde_json::from_value(json).context("malformed config file")?;
1141
1142        Ok(Some(config))
1143    }
1144
1145    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1146        tracing::info!("saving config to database");
1147        let json = serde_json::to_value(cfg)?;
1148
1149        let mut tx = self.db.write().await?;
1150        tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1151            .await?;
1152        tx.commit().await
1153    }
1154
1155    async fn append_decided_leaves(
1156        &self,
1157        view: ViewNumber,
1158        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
1159        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
1160        consumer: &(impl EventConsumer + 'static),
1161    ) -> anyhow::Result<()> {
1162        let values = leaf_chain
1163            .into_iter()
1164            .map(|(info, cert)| {
1165                // The leaf may come with a large payload attached. We don't care about this payload
1166                // because we already store it separately, as part of the DA proposal. Storing it
1167                // here contributes to load on the DB for no reason, so we remove it before
1168                // serializing the leaf.
1169                let mut leaf = info.leaf.clone();
1170                leaf.unfill_block_payload();
1171
1172                let view = cert.view_number().u64() as i64;
1173                let leaf_bytes = bincode::serialize(&leaf)?;
1174                let qc_bytes = bincode::serialize(cert.qc())?;
1175                let next_epoch_qc_bytes = match cert.next_epoch_qc() {
1176                    Some(qc) => Some(bincode::serialize(qc)?),
1177                    None => None,
1178                };
1179                Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
1180            })
1181            .collect::<anyhow::Result<Vec<_>>>()?;
1182
1183        // First, append the new leaves. We do this in its own transaction because even if GC or the
1184        // event consumer later fails, there is no need to abort the storage of the leaves.
1185        let mut tx = self.db.write().await?;
1186
1187        tx.upsert(
1188            "anchor_leaf2",
1189            ["view", "leaf", "qc", "next_epoch_qc"],
1190            ["view"],
1191            values,
1192        )
1193        .await?;
1194        tx.commit().await?;
1195
1196        // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer
1197        // need.
1198        if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
1199            // GC/event processing failure is not an error, since by this point we have at least
1200            // managed to persist the decided leaves successfully, and GC will just run again at the
1201            // next decide. Log an error but do not return it.
1202            tracing::warn!(?view, "event processing failed: {err:#}");
1203            return Ok(());
1204        }
1205
1206        // Garbage collect data which was not included in any decide event, but which at this point
1207        // is old enough to just forget about.
1208        if let Err(err) = self.prune(view).await {
1209            tracing::warn!(?view, "pruning failed: {err:#}");
1210        }
1211
1212        Ok(())
1213    }
1214
1215    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1216        Ok(self
1217            .db
1218            .read()
1219            .await?
1220            .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1221            .await?
1222            .map(|row| {
1223                let view: i64 = row.get("view");
1224                ViewNumber::new(view as u64)
1225            }))
1226    }
1227
1228    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1229        Ok(self
1230            .db
1231            .read()
1232            .await?
1233            .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1234            .await?
1235            .map(|row| {
1236                let view: i64 = row.get("view");
1237                ViewNumber::new(view as u64)
1238            }))
1239    }
1240
1241    async fn load_anchor_leaf(
1242        &self,
1243    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1244        let Some(row) = self
1245            .db
1246            .read()
1247            .await?
1248            .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1249            .await?
1250        else {
1251            return Ok(None);
1252        };
1253
1254        let leaf_bytes: Vec<u8> = row.get("leaf");
1255        let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1256
1257        let qc_bytes: Vec<u8> = row.get("qc");
1258        let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1259
1260        Ok(Some((leaf2, qc2)))
1261    }
1262
1263    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1264        let mut tx = self.db.read().await?;
1265        let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1266            .fetch_one(tx.as_mut())
1267            .await?;
1268        Ok(ViewNumber::new(view as u64))
1269    }
1270
1271    async fn load_da_proposal(
1272        &self,
1273        view: ViewNumber,
1274    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1275        let result = self
1276            .db
1277            .read()
1278            .await?
1279            .fetch_optional(
1280                query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1281            )
1282            .await?;
1283
1284        result
1285            .map(|row| {
1286                let bytes: Vec<u8> = row.get("data");
1287                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1288            })
1289            .transpose()
1290    }
1291
1292    async fn load_vid_share(
1293        &self,
1294        view: ViewNumber,
1295    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1296        let result = self
1297            .db
1298            .read()
1299            .await?
1300            .fetch_optional(
1301                query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1302            )
1303            .await?;
1304
1305        result
1306            .map(|row| {
1307                let bytes: Vec<u8> = row.get("data");
1308                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1309            })
1310            .transpose()
1311    }
1312
1313    async fn load_quorum_proposals(
1314        &self,
1315    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1316    {
1317        let rows = self
1318            .db
1319            .read()
1320            .await?
1321            .fetch_all("SELECT * FROM quorum_proposals2")
1322            .await?;
1323
1324        Ok(BTreeMap::from_iter(
1325            rows.into_iter()
1326                .map(|row| {
1327                    let view: i64 = row.get("view");
1328                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
1329                    let bytes: Vec<u8> = row.get("data");
1330                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1331                        bincode::deserialize(&bytes).or_else(|error| {
1332                            bincode::deserialize::<
1333                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
1334                            >(&bytes)
1335                            .map(convert_proposal)
1336                            .inspect_err(|err_v3| {
1337                                tracing::warn!(
1338                                    ?view_number,
1339                                    %error,
1340                                    %err_v3,
1341                                    "ignoring malformed quorum proposal DB row"
1342                                );
1343                            })
1344                        })?;
1345                    Ok((view_number, proposal))
1346                })
1347                .collect::<anyhow::Result<Vec<_>>>()?,
1348        ))
1349    }
1350
1351    async fn load_quorum_proposal(
1352        &self,
1353        view: ViewNumber,
1354    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1355        let mut tx = self.db.read().await?;
1356        let (data,) =
1357            query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1358                .bind(view.u64() as i64)
1359                .fetch_one(tx.as_mut())
1360                .await?;
1361        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1362            bincode::deserialize(&data).or_else(|error| {
1363                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1364                    &data,
1365                )
1366                .map(convert_proposal)
1367                .context(format!(
1368                    "Failed to deserialize quorum proposal for view {view}. error={error}"
1369                ))
1370            })?;
1371
1372        Ok(proposal)
1373    }
1374
1375    async fn append_vid(
1376        &self,
1377        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1378    ) -> anyhow::Result<()> {
1379        let view = proposal.data.view_number().u64();
1380        let payload_hash = proposal.data.payload_commitment();
1381        let data_bytes = bincode::serialize(proposal).unwrap();
1382
1383        let now = Instant::now();
1384        let mut tx = self.db.write().await?;
1385        tx.upsert(
1386            "vid_share2",
1387            ["view", "data", "payload_hash"],
1388            ["view"],
1389            [(view as i64, data_bytes, payload_hash.to_string())],
1390        )
1391        .await?;
1392        let res = tx.commit().await;
1393        self.internal_metrics
1394            .internal_append_vid_duration
1395            .add_point(now.elapsed().as_secs_f64());
1396        res
1397    }
1398
1399    async fn append_da(
1400        &self,
1401        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1402        vid_commit: VidCommitment,
1403    ) -> anyhow::Result<()> {
1404        let data = &proposal.data;
1405        let view = data.view_number().u64();
1406        let data_bytes = bincode::serialize(proposal).unwrap();
1407
1408        let now = Instant::now();
1409        let mut tx = self.db.write().await?;
1410        tx.upsert(
1411            "da_proposal",
1412            ["view", "data", "payload_hash"],
1413            ["view"],
1414            [(view as i64, data_bytes, vid_commit.to_string())],
1415        )
1416        .await?;
1417        let res = tx.commit().await;
1418        self.internal_metrics
1419            .internal_append_da_duration
1420            .add_point(now.elapsed().as_secs_f64());
1421        res
1422    }
1423
1424    async fn record_action(
1425        &self,
1426        view: ViewNumber,
1427        _epoch: Option<EpochNumber>,
1428        action: HotShotAction,
1429    ) -> anyhow::Result<()> {
        // TODO: Remove this after https://github.com/EspressoSystems/espresso-sequencer/issues/1931
1431        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1432            return Ok(());
1433        }
1434
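        // `MAX_FN` resolves to the backend's two-argument max (GREATEST on Postgres, MAX on
        // SQLite), so the stored view only ever increases, even with concurrent writers.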
1435        let stmt = format!(
1436            "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1437            ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
1438        );
1439
1440        let mut tx = self.db.write().await?;
1441        tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1442
1443        if matches!(action, HotShotAction::Vote) {
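            // Once we have voted in `view`, it is safe to restart consensus from `view + 1`.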
1444            let restart_view = view + 1;
1445            let stmt = format!(
1446                "INSERT INTO restart_view (id, view) VALUES (0, $1)
1447                ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
1448            );
1449            tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1450                .await?;
1451        }
1452
1453        tx.commit().await
1454    }
1455
1456    async fn append_quorum_proposal2(
1457        &self,
1458        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1459    ) -> anyhow::Result<()> {
1460        let view_number = proposal.data.view_number().u64();
1461
1462        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
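        // Index the proposal by the commitment of the leaf it yields; the leaf provider below
        // looks proposals up by this hash (see `fetch_leaf_from_proposals`).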
1463        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));
1464
1465        let now = Instant::now();
1466        let mut tx = self.db.write().await?;
1467        tx.upsert(
1468            "quorum_proposals2",
1469            ["view", "leaf_hash", "data"],
1470            ["view"],
1471            [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
1472        )
1473        .await?;
1474
1475        // We also keep track of any QC we see in case we need it to recover our archival storage.
1476        let justify_qc = proposal.data.justify_qc();
1477        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
1478        tx.upsert(
1479            "quorum_certificate2",
1480            ["view", "leaf_hash", "data"],
1481            ["view"],
1482            [(
1483                justify_qc.view_number.u64() as i64,
1484                justify_qc.data.leaf_commit.to_string(),
1485                &justify_qc_bytes,
1486            )],
1487        )
1488        .await?;
1489        let res = tx.commit().await;
1490        self.internal_metrics
1491            .internal_append_quorum2_duration
1492            .add_point(now.elapsed().as_secs_f64());
1493        res
1494    }
1495
1496    async fn load_upgrade_certificate(
1497        &self,
1498    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1499        let result = self
1500            .db
1501            .read()
1502            .await?
1503            .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1504            .await?;
1505
1506        result
1507            .map(|row| {
1508                let bytes: Vec<u8> = row.get("data");
1509                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1510            })
1511            .transpose()
1512    }
1513
1514    async fn store_upgrade_certificate(
1515        &self,
1516        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1517    ) -> anyhow::Result<()> {
1518        let certificate = match decided_upgrade_certificate {
1519            Some(cert) => cert,
1520            None => return Ok(()),
1521        };
1522        let upgrade_certificate_bytes =
1523            bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1524        let mut tx = self.db.write().await?;
1525        tx.upsert(
1526            "upgrade_certificate",
1527            ["id", "data"],
1528            ["id"],
1529            [(true, upgrade_certificate_bytes)],
1530        )
1531        .await?;
1532        tx.commit().await
1533    }
1534
1535    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1536        let batch_size: i64 = 10000;
1537        let mut tx = self.db.read().await?;
1538
        // The SQL migration seeds this table with the table name and a default of 0 migrated rows,
        // so fetch_one() always returns a row. The migrated-row count is updated after each batch
        // insert, which lets the types migration resume from where it left off.
1543        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1544            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
1545        )
1546        .fetch_one(tx.as_mut())
1547        .await?;
1548
1549        if is_completed {
1550            tracing::info!("decided leaves already migrated");
1551            return Ok(());
1552        }
1553
1554        tracing::warn!("migrating decided leaves..");
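        // Migrate in batches: read a page of legacy rows, convert the leaf/QC to the new types,
        // bulk-insert them into anchor_leaf2, and persist the new offset so an interrupted
        // migration can resume. The other migrate_* methods below follow the same pattern.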
1555        loop {
1556            let mut tx = self.db.read().await?;
1557            let rows = query(
1558                "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
1559            )
1560            .bind(offset)
1561            .bind(batch_size)
1562            .fetch_all(tx.as_mut())
1563            .await?;
1564
1565            drop(tx);
1566            if rows.is_empty() {
1567                break;
1568            }
1569            let mut values = Vec::new();
1570
1571            for row in rows.iter() {
1572                let leaf: Vec<u8> = row.try_get("leaf")?;
1573                let qc: Vec<u8> = row.try_get("qc")?;
1574                let leaf1: Leaf = bincode::deserialize(&leaf)?;
1575                let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
1576                let view: i64 = row.try_get("view")?;
1577
1578                let leaf2: Leaf2 = leaf1.into();
1579                let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();
1580
1581                let leaf2_bytes = bincode::serialize(&leaf2)?;
1582                let qc2_bytes = bincode::serialize(&qc2)?;
1583
1584                values.push((view, leaf2_bytes, qc2_bytes));
1585            }
1586
1587            let mut query_builder: sqlx::QueryBuilder<Db> =
1588                sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");
1589
1590            offset = values.last().context("last row")?.0;
1591
1592            query_builder.push_values(values, |mut b, (view, leaf, qc)| {
1593                b.push_bind(view).push_bind(leaf).push_bind(qc);
1594            });
1595
            // The next batch is fetched with `view >= offset`, so the boundary row is read again;
            // ON CONFLICT DO NOTHING makes that re-insert a no-op.
1598            query_builder.push(" ON CONFLICT DO NOTHING");
1599
1600            let query = query_builder.build();
1601
1602            let mut tx = self.db.write().await?;
1603            query.execute(tx.as_mut()).await?;
1604
1605            tx.upsert(
1606                "epoch_migration",
1607                ["table_name", "completed", "migrated_rows"],
1608                ["table_name"],
1609                [("anchor_leaf".to_string(), false, offset)],
1610            )
1611            .await?;
1612            tx.commit().await?;
1613
1614            tracing::info!(
1615                "anchor leaf migration progress: rows={} offset={}",
1616                rows.len(),
1617                offset
1618            );
1619
1620            if rows.len() < batch_size as usize {
1621                break;
1622            }
1623        }
1624
1625        tracing::warn!("migrated decided leaves");
1626
1627        let mut tx = self.db.write().await?;
1628        tx.upsert(
1629            "epoch_migration",
1630            ["table_name", "completed", "migrated_rows"],
1631            ["table_name"],
1632            [("anchor_leaf".to_string(), true, offset)],
1633        )
1634        .await?;
1635        tx.commit().await?;
1636
1637        tracing::info!("updated epoch_migration table for anchor_leaf");
1638
1639        Ok(())
1640    }
1641
1642    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1643        let batch_size: i64 = 10000;
1644        let mut tx = self.db.read().await?;
1645
1646        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1647            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
1648        )
1649        .fetch_one(tx.as_mut())
1650        .await?;
1651
1652        if is_completed {
1653            tracing::info!("da proposals migration already done");
1654            return Ok(());
1655        }
1656
1657        tracing::warn!("migrating da proposals..");
1658
1659        loop {
1660            let mut tx = self.db.read().await?;
1661            let rows = query(
1662                "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
1663                 $2",
1664            )
1665            .bind(offset)
1666            .bind(batch_size)
1667            .fetch_all(tx.as_mut())
1668            .await?;
1669
1670            drop(tx);
1671            if rows.is_empty() {
1672                break;
1673            }
1674            let mut values = Vec::new();
1675
1676            for row in rows.iter() {
1677                let data: Vec<u8> = row.try_get("data")?;
1678                let payload_hash: String = row.try_get("payload_hash")?;
1679
1680                let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
1681                    bincode::deserialize(&data)?;
1682                let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
1683                    convert_proposal(da_proposal);
1684
1685                let view = da_proposal2.data.view_number.u64() as i64;
1686                let data = bincode::serialize(&da_proposal2)?;
1687
1688                values.push((view, payload_hash, data));
1689            }
1690
1691            let mut query_builder: sqlx::QueryBuilder<Db> =
1692                sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");
1693
1694            offset = values.last().context("last row")?.0;
1695            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
1696                b.push_bind(view).push_bind(payload_hash).push_bind(data);
1697            });
1698            query_builder.push(" ON CONFLICT DO NOTHING");
1699            let query = query_builder.build();
1700
1701            let mut tx = self.db.write().await?;
1702            query.execute(tx.as_mut()).await?;
1703
1704            tx.upsert(
1705                "epoch_migration",
1706                ["table_name", "completed", "migrated_rows"],
1707                ["table_name"],
1708                [("da_proposal".to_string(), false, offset)],
1709            )
1710            .await?;
1711            tx.commit().await?;
1712
1713            tracing::info!(
1714                "DA proposals migration progress: rows={} offset={}",
1715                rows.len(),
1716                offset
1717            );
1718            if rows.len() < batch_size as usize {
1719                break;
1720            }
1721        }
1722
1723        tracing::warn!("migrated da proposals");
1724
1725        let mut tx = self.db.write().await?;
1726        tx.upsert(
1727            "epoch_migration",
1728            ["table_name", "completed", "migrated_rows"],
1729            ["table_name"],
1730            [("da_proposal".to_string(), true, offset)],
1731        )
1732        .await?;
1733        tx.commit().await?;
1734
1735        tracing::info!("updated epoch_migration table for da_proposal");
1736
1737        Ok(())
1738    }
1739
1740    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1741        let batch_size: i64 = 10000;
1742
1743        let mut tx = self.db.read().await?;
1744
1745        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1746            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
1747        )
1748        .fetch_one(tx.as_mut())
1749        .await?;
1750
1751        if is_completed {
1752            tracing::info!("vid_share migration already done");
1753            return Ok(());
1754        }
1755
1756        tracing::warn!("migrating vid shares..");
1757        loop {
1758            let mut tx = self.db.read().await?;
1759            let rows = query(
1760                "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
1761            )
1762            .bind(offset)
1763            .bind(batch_size)
1764            .fetch_all(tx.as_mut())
1765            .await?;
1766
1767            drop(tx);
1768            if rows.is_empty() {
1769                break;
1770            }
1771            let mut values = Vec::new();
1772
1773            for row in rows.iter() {
1774                let data: Vec<u8> = row.try_get("data")?;
1775                let payload_hash: String = row.try_get("payload_hash")?;
1776
1777                let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
1778                    bincode::deserialize(&data)?;
1779                let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1780                    convert_proposal(vid_share);
1781
1782                let view = vid_share2.data.view_number().u64() as i64;
1783                let data = bincode::serialize(&vid_share2)?;
1784
1785                values.push((view, payload_hash, data));
1786            }
1787
1788            let mut query_builder: sqlx::QueryBuilder<Db> =
1789                sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
1790
1791            offset = values.last().context("last row")?.0;
1792
1793            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
1794                b.push_bind(view).push_bind(payload_hash).push_bind(data);
1795            });
1796
            // Match the other migrations: the boundary row is re-read on the next batch, so make
            // the duplicate insert a no-op instead of a unique-constraint violation.
            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();
1798
1799            let mut tx = self.db.write().await?;
1800            query.execute(tx.as_mut()).await?;
1801
1802            tx.upsert(
1803                "epoch_migration",
1804                ["table_name", "completed", "migrated_rows"],
1805                ["table_name"],
1806                [("vid_share".to_string(), false, offset)],
1807            )
1808            .await?;
1809            tx.commit().await?;
1810
1811            tracing::info!(
1812                "VID shares migration progress: rows={} offset={}",
1813                rows.len(),
1814                offset
1815            );
1816            if rows.len() < batch_size as usize {
1817                break;
1818            }
1819        }
1820
1821        tracing::warn!("migrated vid shares");
1822
1823        let mut tx = self.db.write().await?;
1824        tx.upsert(
1825            "epoch_migration",
1826            ["table_name", "completed", "migrated_rows"],
1827            ["table_name"],
1828            [("vid_share".to_string(), true, offset)],
1829        )
1830        .await?;
1831        tx.commit().await?;
1832
1833        tracing::info!("updated epoch_migration table for vid_share");
1834
1835        Ok(())
1836    }
1837
1838    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1839        let batch_size: i64 = 10000;
1840        let mut tx = self.db.read().await?;
1841
1842        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1843            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1844             'quorum_proposals'",
1845        )
1846        .fetch_one(tx.as_mut())
1847        .await?;
1848
1849        if is_completed {
1850            tracing::info!("quorum proposals migration already done");
1851            return Ok(());
1852        }
1853
1854        tracing::warn!("migrating quorum proposals..");
1855
1856        loop {
1857            let mut tx = self.db.read().await?;
1858            let rows = query(
1859                "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
1860                 view LIMIT $2",
1861            )
1862            .bind(offset)
1863            .bind(batch_size)
1864            .fetch_all(tx.as_mut())
1865            .await?;
1866
1867            drop(tx);
1868
1869            if rows.is_empty() {
1870                break;
1871            }
1872
1873            let mut values = Vec::new();
1874
1875            for row in rows.iter() {
1876                let leaf_hash: String = row.try_get("leaf_hash")?;
1877                let data: Vec<u8> = row.try_get("data")?;
1878
1879                let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
1880                    bincode::deserialize(&data)?;
1881                let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1882                    convert_proposal(quorum_proposal);
1883
1884                let view = quorum_proposal2.data.view_number().u64() as i64;
1885                let data = bincode::serialize(&quorum_proposal2)?;
1886
1887                values.push((view, leaf_hash, data));
1888            }
1889
1890            let mut query_builder: sqlx::QueryBuilder<Db> =
1891                sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");
1892
1893            offset = values.last().context("last row")?.0;
1894            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
1895                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1896            });
1897
1898            query_builder.push(" ON CONFLICT DO NOTHING");
1899
1900            let query = query_builder.build();
1901
1902            let mut tx = self.db.write().await?;
1903            query.execute(tx.as_mut()).await?;
1904
1905            tx.upsert(
1906                "epoch_migration",
1907                ["table_name", "completed", "migrated_rows"],
1908                ["table_name"],
1909                [("quorum_proposals".to_string(), false, offset)],
1910            )
1911            .await?;
1912            tx.commit().await?;
1913
1914            tracing::info!(
1915                "quorum proposals migration progress: rows={} offset={}",
1916                rows.len(),
1917                offset
1918            );
1919
1920            if rows.len() < batch_size as usize {
1921                break;
1922            }
1923        }
1924
1925        tracing::warn!("migrated quorum proposals");
1926
1927        let mut tx = self.db.write().await?;
1928        tx.upsert(
1929            "epoch_migration",
1930            ["table_name", "completed", "migrated_rows"],
1931            ["table_name"],
1932            [("quorum_proposals".to_string(), true, offset)],
1933        )
1934        .await?;
1935        tx.commit().await?;
1936
1937        tracing::info!("updated epoch_migration table for quorum_proposals");
1938
1939        Ok(())
1940    }
1941
1942    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1943        let batch_size: i64 = 10000;
1944        let mut tx = self.db.read().await?;
1945
1946        let (is_completed, mut offset) = query_as::<(bool, i64)>(
1947            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
1948             'quorum_certificate'",
1949        )
1950        .fetch_one(tx.as_mut())
1951        .await?;
1952
1953        if is_completed {
1954            tracing::info!("quorum certificates migration already done");
1955            return Ok(());
1956        }
1957
1958        tracing::warn!("migrating quorum certificates..");
1959        loop {
1960            let mut tx = self.db.read().await?;
1961            let rows = query(
1962                "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
1963                 view LIMIT $2",
1964            )
1965            .bind(offset)
1966            .bind(batch_size)
1967            .fetch_all(tx.as_mut())
1968            .await?;
1969
1970            drop(tx);
1971            if rows.is_empty() {
1972                break;
1973            }
1974            let mut values = Vec::new();
1975
1976            for row in rows.iter() {
1977                let leaf_hash: String = row.try_get("leaf_hash")?;
1978                let data: Vec<u8> = row.try_get("data")?;
1979
1980                let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
1981                let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();
1982
1983                let view = qc2.view_number().u64() as i64;
1984                let data = bincode::serialize(&qc2)?;
1985
1986                values.push((view, leaf_hash, data));
1987            }
1988
1989            let mut query_builder: sqlx::QueryBuilder<Db> =
1990                sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");
1991
1992            offset = values.last().context("last row")?.0;
1993
1994            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
1995                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
1996            });
1997
1998            query_builder.push(" ON CONFLICT DO NOTHING");
1999            let query = query_builder.build();
2000
2001            let mut tx = self.db.write().await?;
2002            query.execute(tx.as_mut()).await?;
2003
2004            tx.upsert(
2005                "epoch_migration",
2006                ["table_name", "completed", "migrated_rows"],
2007                ["table_name"],
2008                [("quorum_certificate".to_string(), false, offset)],
2009            )
2010            .await?;
2011            tx.commit().await?;
2012
2013            tracing::info!(
2014                "Quorum certificates migration progress: rows={} offset={}",
2015                rows.len(),
2016                offset
2017            );
2018
2019            if rows.len() < batch_size as usize {
2020                break;
2021            }
2022        }
2023
2024        tracing::warn!("migrated quorum certificates");
2025
2026        let mut tx = self.db.write().await?;
2027        tx.upsert(
2028            "epoch_migration",
2029            ["table_name", "completed", "migrated_rows"],
2030            ["table_name"],
2031            [("quorum_certificate".to_string(), true, offset)],
2032        )
2033        .await?;
2034        tx.commit().await?;
2035        tracing::info!("updated epoch_migration table for quorum_certificate");
2036
2037        Ok(())
2038    }
2039
2040    async fn store_next_epoch_quorum_certificate(
2041        &self,
2042        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2043    ) -> anyhow::Result<()> {
2044        let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2045        let mut tx = self.db.write().await?;
2046        tx.upsert(
2047            "next_epoch_quorum_certificate",
2048            ["id", "data"],
2049            ["id"],
2050            [(true, qc2_bytes)],
2051        )
2052        .await?;
2053        tx.commit().await
2054    }
2055
2056    async fn load_next_epoch_quorum_certificate(
2057        &self,
2058    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2059        let result = self
2060            .db
2061            .read()
2062            .await?
2063            .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2064            .await?;
2065
2066        result
2067            .map(|row| {
2068                let bytes: Vec<u8> = row.get("data");
2069                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2070            })
2071            .transpose()
2072    }
2073
2074    async fn store_eqc(
2075        &self,
2076        high_qc: QuorumCertificate2<SeqTypes>,
2077        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2078    ) -> anyhow::Result<()> {
2079        let eqc_bytes =
2080            bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2081        let mut tx = self.db.write().await?;
2082        tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes)])
2083            .await?;
2084        tx.commit().await
2085    }
2086
2087    async fn load_eqc(
2088        &self,
2089    ) -> Option<(
2090        QuorumCertificate2<SeqTypes>,
2091        NextEpochQuorumCertificate2<SeqTypes>,
2092    )> {
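        // Any read or deserialization failure here is swallowed and surfaces as `None`.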
2093        let result = self
2094            .db
2095            .read()
2096            .await
2097            .ok()?
2098            .fetch_optional("SELECT * FROM eqc where id = true")
2099            .await
2100            .ok()?;
2101
2102        result
2103            .map(|row| {
2104                let bytes: Vec<u8> = row.get("data");
2105                bincode::deserialize(&bytes)
2106            })
2107            .transpose()
2108            .ok()?
2109    }
2110
2111    async fn append_da2(
2112        &self,
2113        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2114        vid_commit: VidCommitment,
2115    ) -> anyhow::Result<()> {
2116        let data = &proposal.data;
2117        let view = data.view_number().u64();
2118        let data_bytes = bincode::serialize(proposal).unwrap();
2119
2120        let now = Instant::now();
2121        let mut tx = self.db.write().await?;
2122        tx.upsert(
2123            "da_proposal2",
2124            ["view", "data", "payload_hash"],
2125            ["view"],
2126            [(view as i64, data_bytes, vid_commit.to_string())],
2127        )
2128        .await?;
2129        let res = tx.commit().await;
2130        self.internal_metrics
2131            .internal_append_da2_duration
2132            .add_point(now.elapsed().as_secs_f64());
2133        res
2134    }
2135
2136    async fn store_drb_result(
2137        &self,
2138        epoch: EpochNumber,
2139        drb_result: DrbResult,
2140    ) -> anyhow::Result<()> {
2141        let drb_result_vec = Vec::from(drb_result);
2142        let mut tx = self.db.write().await?;
2143        tx.upsert(
2144            "epoch_drb_and_root",
2145            ["epoch", "drb_result"],
2146            ["epoch"],
2147            [(epoch.u64() as i64, drb_result_vec)],
2148        )
2149        .await?;
2150        tx.commit().await
2151    }
2152
2153    async fn store_epoch_root(
2154        &self,
2155        epoch: EpochNumber,
2156        block_header: <SeqTypes as NodeType>::BlockHeader,
2157    ) -> anyhow::Result<()> {
2158        let block_header_bytes =
2159            bincode::serialize(&block_header).context("serializing block header")?;
2160
2161        let mut tx = self.db.write().await?;
2162        tx.upsert(
2163            "epoch_drb_and_root",
2164            ["epoch", "block_header"],
2165            ["epoch"],
2166            [(epoch.u64() as i64, block_header_bytes)],
2167        )
2168        .await?;
2169        tx.commit().await
2170    }
2171
2172    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
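        // Guard against regressing persisted DRB state: a stored input at the same or a later
        // iteration is kept (we refuse to update), unless the difficulty level changed, in which
        // case we overwrite and log an error.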
2173        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
2174            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
2175                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
2176            } else if loaded_drb_input.iteration >= drb_input.iteration {
2177                anyhow::bail!(
2178                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
2179                    loaded_drb_input,
2180                    drb_input
2181                )
2182            }
2183        }
2184
2185        let drb_input_bytes = bincode::serialize(&drb_input)
2186            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;
2187
2188        let mut tx = self.db.write().await?;
2189
2190        tx.upsert(
2191            "drb",
2192            ["epoch", "drb_input"],
2193            ["epoch"],
2194            [(drb_input.epoch as i64, drb_input_bytes)],
2195        )
2196        .await?;
2197        tx.commit().await
2198    }
2199
2200    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2201        let row = self
2202            .db
2203            .read()
2204            .await?
2205            .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2206            .await?;
2207
2208        match row {
2209            None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2210            Some(row) => {
2211                let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2212                let drb_input = bincode::deserialize(&drb_input_bytes)
2213                    .context("Failed to deserialize drb_input from storage")?;
2214
2215                Ok(drb_input)
2216            },
2217        }
2218    }
2219
2220    async fn add_state_cert(
2221        &self,
2222        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2223    ) -> anyhow::Result<()> {
2224        let state_cert_bytes = bincode::serialize(&state_cert)
2225            .context("serializing light client state update certificate")?;
2226
2227        let mut tx = self.db.write().await?;
2228        tx.upsert(
2229            "state_cert",
2230            ["view", "state_cert"],
2231            ["view"],
2232            [(
2233                state_cert.light_client_state.view_number as i64,
2234                state_cert_bytes,
2235            )],
2236        )
2237        .await?;
2238        tx.commit().await
2239    }
2240
2241    async fn load_state_cert(
2242        &self,
2243    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2244        let Some(row) = self
2245            .db
2246            .read()
2247            .await?
2248            .fetch_optional(
2249                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2250            )
2251            .await?
2252        else {
2253            return Ok(None);
2254        };
2255        let bytes: Vec<u8> = row.get("state_cert");
2256
2257        let cert = match bincode::deserialize(&bytes) {
2258            Ok(cert) => cert,
2259            Err(err) => {
2260                tracing::info!(
2261                    error = %err,
                    "Failed to deserialize state certificate as v2; attempting v1"
2263                );
2264
2265                let v1_cert =
2266                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2267                        .with_context(|| {
2268                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2269                    })?;
2270
2271                v1_cert.into()
2272            },
2273        };
2274
2275        Ok(Some(cert))
2276    }
2277
2278    async fn get_state_cert_by_epoch(
2279        &self,
2280        epoch: u64,
2281    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2282        let Some(row) = self
2283            .db
2284            .read()
2285            .await?
2286            .fetch_optional(
2287                query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
2288                    .bind(epoch as i64),
2289            )
2290            .await?
2291        else {
2292            return Ok(None);
2293        };
2294        let bytes: Vec<u8> = row.get("state_cert");
2295
2296        let cert = match bincode::deserialize(&bytes) {
2297            Ok(cert) => cert,
2298            Err(err) => {
2299                tracing::info!(
2300                    error = %err,
                    "Failed to deserialize state certificate as v2; attempting v1"
2302                );
2303
2304                let v1_cert =
2305                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2306                        .with_context(|| {
2307                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2308                    })?;
2309
2310                v1_cert.into()
2311            },
2312        };
2313
2314        Ok(Some(cert))
2315    }
2316
2317    async fn insert_state_cert(
2318        &self,
2319        epoch: u64,
2320        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2321    ) -> anyhow::Result<()> {
2322        let bytes = bincode::serialize(&cert)
2323            .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2324
2325        let mut tx = self.db.write().await?;
2326
2327        tx.upsert(
2328            "finalized_state_cert",
2329            ["epoch", "state_cert"],
2330            ["epoch"],
2331            [(epoch as i64, bytes)],
2332        )
        .await?;
        tx.commit().await
    }
2335
2336    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
2337        let rows = self
2338            .db
2339            .read()
2340            .await?
2341            .fetch_all(
2342                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
2343                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
2344            )
2345            .await?;
2346
2347        // reverse the rows vector to return the most recent epochs, but in ascending order
2348        rows.into_iter()
2349            .rev()
2350            .map(|row| {
2351                let epoch: i64 = row.try_get("epoch")?;
2352                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
2353                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
2354                if let Some(drb_result) = drb_result {
2355                    let drb_result_array = drb_result
2356                        .try_into()
2357                        .or_else(|_| bail!("invalid drb result"))?;
2358                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
2359                        .map(|data| bincode::deserialize(&data))
2360                        .transpose()?;
2361                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
2362                        epoch: <SeqTypes as NodeType>::Epoch::new(epoch as u64),
2363                        drb_result: drb_result_array,
2364                        block_header,
2365                    }))
2366                } else {
2367                    // Right now we skip the epoch_drb_and_root row if there is no drb result.
2368                    // This seems reasonable based on the expected order of events, but please double check!
2369                    Ok(None)
2370                }
2371            })
2372            .filter_map(|e| match e {
2373                Err(v) => Some(Err(v)),
2374                Ok(Some(v)) => Some(Ok(v)),
2375                Ok(None) => None,
2376            })
2377            .collect()
2378    }
2379
2380    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
2381        self.internal_metrics = PersistenceMetricsValue::new(metrics);
2382    }
2383}
2384
2385#[async_trait]
2386impl MembershipPersistence for Persistence {
2387    async fn load_stake(
2388        &self,
2389        epoch: EpochNumber,
2390    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
2391        let result = self
2392            .db
2393            .read()
2394            .await?
2395            .fetch_optional(
2396                query(
2397                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2398                     epoch = $1",
2399                )
2400                .bind(epoch.u64() as i64),
2401            )
2402            .await?;
2403
2404        result
2405            .map(|row| {
2406                let stake_table_bytes: Vec<u8> = row.get("stake");
2407                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
2408                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
2409                let stake_table = bincode::deserialize(&stake_table_bytes)
2410                    .context("deserializing stake table")?;
2411                let reward: Option<RewardAmount> = reward_bytes
2412                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2413                    .transpose()?;
2414                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
2415                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2416                    .transpose()?;
2417
2418                Ok((stake_table, reward, stake_table_hash))
2419            })
2420            .transpose()
2421    }
2422
2423    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
2424        let mut tx = self.db.read().await?;
2425
2426        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
2427            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2428             stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
2429        )
2430        .bind(limit as i64)
2431        .fetch_all(tx.as_mut())
2432        .await
2433        {
2434            Ok(bytes) => bytes,
2435            Err(err) => {
2436                tracing::error!("error loading stake tables: {err:#}");
2437                bail!("{err:#}");
2438            },
2439        };
2440
2441        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
2442            .into_iter()
2443            .map(
2444                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
2445                    let stake_table =
2446                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;
2447
2448                    let block_reward: Option<RewardAmount> = reward_bytes_opt
2449                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2450                        .transpose()?;
2451
2452                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
2453                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2454                        .transpose()?;
2455
2456                    Ok((
2457                        EpochNumber::new(id as u64),
2458                        (stake_table, block_reward),
2459                        stake_table_hash,
2460                    ))
2461                },
2462            )
2463            .collect();
2464
2465        Ok(Some(stakes?))
2466    }
2467
2468    async fn store_stake(
2469        &self,
2470        epoch: EpochNumber,
2471        stake: ValidatorMap,
2472        block_reward: Option<RewardAmount>,
2473        stake_table_hash: Option<StakeTableHash>,
2474    ) -> anyhow::Result<()> {
2475        let mut tx = self.db.write().await?;
2476
2477        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
2478        let reward_bytes = block_reward
2479            .map(|r| bincode::serialize(&r).context("serializing block reward"))
2480            .transpose()?;
2481        let stake_table_hash_bytes = stake_table_hash
2482            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
2483            .transpose()?;
2484        tx.upsert(
2485            "epoch_drb_and_root",
2486            ["epoch", "stake", "block_reward", "stake_table_hash"],
2487            ["epoch"],
2488            [(
2489                epoch.u64() as i64,
2490                stake_table_bytes,
2491                reward_bytes,
2492                stake_table_hash_bytes,
2493            )],
2494        )
2495        .await?;
2496        tx.commit().await
2497    }
2498
2499    async fn store_events(
2500        &self,
2501        l1_finalized: u64,
2502        events: Vec<(EventKey, StakeTableEvent)>,
2503    ) -> anyhow::Result<()> {
2504        if events.is_empty() {
2505            return Ok(());
2506        }
2507
2508        let mut tx = self.db.write().await?;
2509
2510        // check last l1 block if there is any
2511        let last_processed_l1_block = query_as::<(i64,)>(
2512            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2513        )
2514        .fetch_optional(tx.as_mut())
2515        .await?
2516        .map(|(l1,)| l1);
2517
        tracing::debug!("last l1 finalized in database = {last_processed_l1_block:?}");

        // Skip storing these events if the database has already processed a higher L1 block.
2521        if last_processed_l1_block > Some(l1_finalized.try_into()?) {
2522            tracing::debug!(
2523                ?last_processed_l1_block,
2524                ?l1_finalized,
2525                ?events,
2526                "last l1 finalized stored is already higher"
2527            );
2528            return Ok(());
2529        }
2530
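
        // Events are keyed by (l1_block, log_index); the ON CONFLICT DO NOTHING pushed below makes
        // re-storing events that are already present a no-op rather than an error.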
2531        let mut query_builder: sqlx::QueryBuilder<Db> =
2532            sqlx::QueryBuilder::new("INSERT INTO stake_table_events (l1_block, log_index, event) ");
2533
2534        let events = events
2535            .into_iter()
2536            .map(|((block_number, index), event)| {
2537                Ok((
2538                    i64::try_from(block_number)?,
2539                    i64::try_from(index)?,
2540                    serde_json::to_value(event).context("l1 event to value")?,
2541                ))
2542            })
2543            .collect::<anyhow::Result<Vec<_>>>()?;
2544
2545        query_builder.push_values(events, |mut b, (l1_block, log_index, event)| {
2546            b.push_bind(l1_block).push_bind(log_index).push_bind(event);
2547        });
2548
2549        query_builder.push(" ON CONFLICT DO NOTHING");
2550        let query = query_builder.build();
2551
2552        query.execute(tx.as_mut()).await?;
2553
2554        // update l1 block
2555        tx.upsert(
2556            "stake_table_events_l1_block",
2557            ["id", "last_l1_block"],
2558            ["id"],
2559            [(0_i32, l1_finalized as i64)],
2560        )
2561        .await?;
2562
2563        tx.commit().await?;
2564
2565        Ok(())
2566    }
2567
2568    /// Loads all events from persistent storage up to the specified L1 block.
2569    ///
2570    /// # Returns
2571    ///
2572    /// Returns a tuple containing:
    /// - `Option<EventsPersistenceRead>` - How complete the read is: `Complete` if events were
    ///   returned up to the requested L1 block, `UntilL1Block(n)` if events were only available
    ///   up to block `n`, or `None` if no events have been stored yet.
    /// - `Vec<(EventKey, StakeTableEvent)>` - The events, where each entry pairs the event key
    ///   (L1 block number, log index) with the corresponding stake table event.
2577    ///
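    /// For example (hypothetical values): `load_events(0, 100)` when events have only been
    /// processed up to L1 block 80 returns `(Some(EventsPersistenceRead::UntilL1Block(80)), events)`,
    /// with `events` covering blocks 0..=80.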
2578    async fn load_events(
2579        &self,
2580        from_l1_block: u64,
2581        to_l1_block: u64,
2582    ) -> anyhow::Result<(
2583        Option<EventsPersistenceRead>,
2584        Vec<(EventKey, StakeTableEvent)>,
2585    )> {
2586        let mut tx = self.db.read().await?;
2587
2588        // check last l1 block if there is any
2589        let res = query_as::<(i64,)>(
2590            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2591        )
2592        .fetch_optional(tx.as_mut())
2593        .await?;
2594
2595        let Some((last_processed_l1_block,)) = res else {
            // This just means we don't have any events stored yet.
2597            return Ok((None, Vec::new()));
2598        };
2599
2600        // Determine the L1 block for querying events.
2601        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
2602        // Otherwise, query up to the last stored block.
2603        let to_l1_block = to_l1_block.try_into()?;
2604        let query_l1_block = if last_processed_l1_block > to_l1_block {
2605            to_l1_block
2606        } else {
2607            last_processed_l1_block
2608        };
2609
2610        let rows = query(
2611            "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
2612             l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
2613        )
2614        .bind(i64::try_from(from_l1_block)?)
2615        .bind(query_l1_block)
2616        .fetch_all(tx.as_mut())
2617        .await?;
2618
2619        let events = rows
2620            .into_iter()
2621            .map(|row| {
2622                let l1_block: i64 = row.try_get("l1_block")?;
2623                let log_index: i64 = row.try_get("log_index")?;
2624                let event = serde_json::from_value(row.try_get("event")?)?;
2625
2626                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
2627            })
2628            .collect::<anyhow::Result<Vec<_>>>()?;
2629
2630        // Determine the read state based on the queried block range.
2631        // - If the persistence returned events up to the requested block, the read is complete.
2632        // - Otherwise, indicate that the read is up to the last processed block.
2633        if query_l1_block == to_l1_block {
2634            Ok((Some(EventsPersistenceRead::Complete), events))
2635        } else {
2636            Ok((
2637                Some(EventsPersistenceRead::UntilL1Block(
2638                    query_l1_block.try_into()?,
2639                )),
2640                events,
2641            ))
2642        }
2643    }
2644
2645    async fn store_all_validators(
2646        &self,
2647        epoch: EpochNumber,
2648        all_validators: ValidatorMap,
2649    ) -> anyhow::Result<()> {
2650        let mut tx = self.db.write().await?;
2651
2652        if all_validators.is_empty() {
2653            return Ok(());
2654        }
2655
2656        let mut query_builder =
2657            QueryBuilder::new("INSERT INTO stake_table_validators (epoch, address, validator) ");
2658
2659        query_builder.push_values(all_validators, |mut b, (address, validator)| {
2660            let validator_json =
2661                serde_json::to_value(&validator).expect("cannot serialize validator to json");
2662            b.push_bind(epoch.u64() as i64)
2663                .push_bind(address.to_string())
2664                .push_bind(validator_json);
2665        });
2666
2667        query_builder
2668            .push(" ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator");
2669
2670        let query = query_builder.build();
2671
2672        query.execute(tx.as_mut()).await?;
2673
2674        tx.commit().await?;
2675        Ok(())
2676    }
2677
2678    async fn load_all_validators(
2679        &self,
2680        epoch: EpochNumber,
2681        offset: u64,
2682        limit: u64,
2683    ) -> anyhow::Result<Vec<Validator<PubKey>>> {
2684        let mut tx = self.db.read().await?;
2685
        // Use LOWER(address) in ORDER BY to ensure consistent ordering across SQLite and Postgres.
        // The backends' default text collations can differ, so the same page of validators could
        // otherwise come back in a different order. Applying LOWER() makes the ordering consistent.
2689        let rows = query(
2690            "SELECT address, validator
2691         FROM stake_table_validators
2692         WHERE epoch = $1
2693         ORDER BY LOWER(address) ASC
2694         LIMIT $2 OFFSET $3",
2695        )
2696        .bind(epoch.u64() as i64)
2697        .bind(limit as i64)
2698        .bind(offset as i64)
2699        .fetch_all(tx.as_mut())
2700        .await?;
2701        rows.into_iter()
2702            .map(|row| {
2703                let validator_json: serde_json::Value = row.try_get("validator")?;
2704                serde_json::from_value::<Validator<PubKey>>(validator_json).map_err(Into::into)
2705            })
2706            .collect()
2707    }
2708}
2709
2710#[async_trait]
2711impl DhtPersistentStorage for Persistence {
2712    /// Save the DHT to the database
2713    ///
2714    /// # Errors
2715    /// - If we fail to serialize the records
2716    /// - If we fail to write the serialized records to the DB
2717    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2718        // Bincode-serialize the records
2719        let to_save =
2720            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2721
2722        // Prepare the statement
2723        let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
2724                    (id) DO UPDATE SET serialized_records = $1";
2725
2726        // Execute the query
2727        let mut tx = self
2728            .db
2729            .write()
2730            .await
2731            .with_context(|| "failed to start an atomic DB transaction")?;
2732        tx.execute(query(stmt).bind(to_save))
2733            .await
2734            .with_context(|| "failed to execute DB query")?;
2735
2736        // Commit the state
2737        tx.commit().await.with_context(|| "failed to commit to DB")
2738    }
2739
2740    /// Load the DHT from the database
2741    ///
2742    /// # Errors
2743    /// - If we fail to read from the DB
2744    /// - If we fail to deserialize the records
2745    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2746        // Fetch the results from the DB
2747        let result = self
2748            .db
2749            .read()
2750            .await
2751            .with_context(|| "failed to start a DB read transaction")?
2752            .fetch_one("SELECT * FROM libp2p_dht where id = 0")
2753            .await
2754            .with_context(|| "failed to fetch from DB")?;
2755
        // Get the `serialized_records` column
        let serialized_records: Vec<u8> = result.get("serialized_records");

        // Deserialize it
        let records: Vec<SerializableRecord> = bincode::deserialize(&serialized_records)
            .with_context(|| "Failed to deserialize records")?;
2762
2763        Ok(records)
2764    }
2765}
2766
2767#[async_trait]
2768impl Provider<SeqTypes, VidCommonRequest> for Persistence {
2769    #[tracing::instrument(skip(self))]
2770    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
2771        let mut tx = match self.db.read().await {
2772            Ok(tx) => tx,
2773            Err(err) => {
2774                tracing::warn!("could not open transaction: {err:#}");
2775                return None;
2776            },
2777        };
2778
2779        let bytes = match query_as::<(Vec<u8>,)>(
2780            "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
2781        )
2782        .bind(req.0.to_string())
2783        .fetch_optional(tx.as_mut())
2784        .await
2785        {
2786            Ok(Some((bytes,))) => bytes,
2787            Ok(None) => return None,
2788            Err(err) => {
2789                tracing::error!("error loading VID share: {err:#}");
2790                return None;
2791            },
2792        };
2793
2794        let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2795            match bincode::deserialize(&bytes) {
2796                Ok(share) => share,
2797                Err(err) => {
2798                    tracing::warn!("error decoding VID share: {err:#}");
2799                    return None;
2800                },
2801            };
2802
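        // Only the VID common data is needed for this request; extract it from whichever share
        // version was stored.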
2803        match share.data {
2804            VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
2805            VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
2806            VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
2807        }
2808    }
2809}
2810
2811#[async_trait]
2812impl Provider<SeqTypes, PayloadRequest> for Persistence {
2813    #[tracing::instrument(skip(self))]
2814    async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
2815        let mut tx = match self.db.read().await {
2816            Ok(tx) => tx,
2817            Err(err) => {
2818                tracing::warn!("could not open transaction: {err:#}");
2819                return None;
2820            },
2821        };
2822
2823        let bytes = match query_as::<(Vec<u8>,)>(
2824            "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
2825        )
2826        .bind(req.0.to_string())
2827        .fetch_optional(tx.as_mut())
2828        .await
2829        {
2830            Ok(Some((bytes,))) => bytes,
2831            Ok(None) => return None,
2832            Err(err) => {
2833                tracing::warn!("error loading DA proposal: {err:#}");
2834                return None;
2835            },
2836        };
2837
2838        let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
2839        {
2840            Ok(proposal) => proposal,
2841            Err(err) => {
2842                tracing::error!("error decoding DA proposal: {err:#}");
2843                return None;
2844            },
2845        };
2846
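        // Rebuild the payload from the proposal's encoded transactions and payload metadata.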
2847        Some(Payload::from_bytes(
2848            &proposal.data.encoded_transactions,
2849            &proposal.data.metadata,
2850        ))
2851    }
2852}
2853
2854#[async_trait]
2855impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
2856    #[tracing::instrument(skip(self))]
2857    async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
2858        let mut tx = match self.db.read().await {
2859            Ok(tx) => tx,
2860            Err(err) => {
2861                tracing::warn!("could not open transaction: {err:#}");
2862                return None;
2863            },
2864        };
2865
2866        let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
2867            Ok(res) => res?,
2868            Err(err) => {
2869                tracing::info!("requested leaf not found in undecided proposals: {err:#}");
2870                return None;
2871            },
2872        };
2873
2874        match LeafQueryData::new(leaf, qc) {
2875            Ok(leaf) => Some(leaf),
2876            Err(err) => {
2877                tracing::warn!("fetched invalid leaf: {err:#}");
2878                None
2879            },
2880        }
2881    }
2882}
2883
async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
    tx: &mut Transaction<Mode>,
    req: LeafRequest<SeqTypes>,
) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
    // Look for a quorum proposal corresponding to this leaf.
    let Some((proposal_bytes,)) =
        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
            .bind(req.expected_leaf.to_string())
            .fetch_optional(tx.as_mut())
            .await
            .context("fetching proposal")?
    else {
        return Ok(None);
    };

    // Look for a QC corresponding to this leaf.
    let Some((qc_bytes,)) =
        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
            .bind(req.expected_leaf.to_string())
            .fetch_optional(tx.as_mut())
            .await
            .context("fetching QC")?
    else {
        return Ok(None);
    };

    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
        bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
    let qc: QuorumCertificate2<SeqTypes> =
        bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;

    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    Ok(Some((leaf, qc)))
}

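/// Test-only support for constructing a [`Persistence`] backed by a temporary database.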
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    #[async_trait]
    impl TestablePersistence for Persistence {
        type Storage = Arc<TmpDb>;

        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions { path: db.path() }.into()
            }
        }
    }
}

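/// Tests for the SQL persistence layer: leaf hash backfill on reconnect, the local fetching
/// providers, consensus garbage collection, and migration of legacy consensus data.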
#[cfg(test)]
mod test {
    use committable::{Commitment, CommitmentBoundsArkless};
    use espresso_types::{traits::NullEventConsumer, Header, Leaf, NodeState, ValidatedState};
    use futures::stream::TryStreamExt;
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_types::{
        data::{
            ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare, EpochNumber,
            QuorumProposal2,
        },
        message::convert_proposal,
        simple_certificate::QuorumCertificate,
        simple_vote::QuorumData,
        traits::{
            block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
            node_implementation::Versions,
            signature_key::SignatureKey,
            EncodeBytes,
        },
        utils::EpochTransitionIndicator,
        vid::{
            advz::advz_scheme,
            avidm::{init_avidm_param, AvidMScheme},
        },
    };
    use jf_advz::VidScheme;
    use vbs::version::StaticVersionType;

    use super::*;
    use crate::{persistence::tests::TestablePersistence as _, BLSPubKey, PubKey};

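    /// Store quorum proposals without a `leaf_hash` and check that reconnecting to the database
    /// backfills the column from the serialized proposal data.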
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_quorum_proposals_leaf_hash_migration() {
        // Create some quorum proposals to test with.
        let leaf: Leaf2 =
            Leaf::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock())
                .await
                .into();
        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut quorum_proposal = Proposal {
            data: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
            signature,
            _pd: Default::default(),
        };

        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());

        quorum_proposal.data.view_number = ViewNumber::new(1);

        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());
        let qps = [qp1, qp2];

        // Create persistence and add the quorum proposals with NULL leaf hash.
        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();
        let params = qps
            .iter()
            .map(|qp| {
                (
                    qp.data.view_number.u64() as i64,
                    bincode::serialize(&qp).unwrap(),
                )
            })
            .collect::<Vec<_>>();
        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Create a new persistence and ensure the commitments get populated.
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.read().await.unwrap();
        let rows = tx
            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(rows.len(), qps.len());
        for (row, qp) in rows.into_iter().zip(qps) {
            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
            assert_eq!(
                row.get::<Vec<u8>, _>("data"),
                bincode::serialize(&qp).unwrap()
            );
            assert_eq!(
                row.get::<String, _>("leaf_hash"),
                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
            );
        }
    }

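    /// Store a DA proposal, VID share, and quorum proposals, then fetch the VID common data,
    /// payload, and leaf back through the local fetching `Provider` implementations, as the query
    /// service would when rebuilding an archive.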
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetching_providers() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        // Mock up some data.
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid_share = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(0),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param.clone(),
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );

        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: leaf.view_number(),
                justify_qc: leaf.justify_qc(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let mut next_quorum_proposal = quorum_proposal.clone();
        next_quorum_proposal.data.proposal.view_number += 1;
        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
        next_quorum_proposal
            .data
            .proposal
            .justify_qc
            .data
            .leaf_commit = Committable::commit(&leaf.clone());
        let qc = next_quorum_proposal.data.justify_qc();

        // Add to database.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Add an extra quorum proposal so we have a QC pointing back at `leaf`.
        storage
            .append_quorum_proposal2(&next_quorum_proposal)
            .await
            .unwrap();

        // Fetch it as if we were rebuilding an archive.
        assert_eq!(
            Some(VidCommon::V1(avidm_param)),
            storage
                .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
                .await
        );
        assert_eq!(
            leaf_payload,
            storage
                .fetch(PayloadRequest(vid_share.data.payload_commitment()))
                .await
                .unwrap()
        );
        assert_eq!(
            LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
            storage
                .fetch(LeafRequest::new(
                    leaf.block_header().block_number(),
                    Committable::commit(&leaf),
                    qc.clone().commit()
                ))
                .await
                .unwrap()
        );
    }

    /// Test conditions that trigger pruning.
    ///
    /// This is a configurable test that can exercise different GC configurations via
    /// `pruning_opt`. It populates the database with some data for view 1, asserts that the data
    /// is retained as of view 2, and then asserts that it is pruned by view 3. Several different
    /// configurations produce this behavior, with the data being retained and then pruned through
    /// different logic and code paths.
    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        opt.consensus_pruning = pruning_opt;
        let storage = opt.create().await.unwrap();

        let data_view = ViewNumber::new(1);

        // Populate some data.
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: data_view,
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param,
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: data_view,
                justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: data_view,
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
        storage.append_vid(&vid).await.unwrap();
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
        // the target, because of the minimum retention.
        tracing::info!("decide view 1");
        storage
            .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert_eq!(
            storage.load_vid_share(data_view).await.unwrap().unwrap(),
            vid
        );
        assert_eq!(
            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
            da_proposal
        );
        assert_eq!(
            storage.load_quorum_proposal(data_view).await.unwrap(),
            quorum_proposal
        );

        // After another view, our data is beyond the minimum retention (though not the target
        // retention) so it gets pruned.
        tracing::info!("decide view 2");
        storage
            .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert!(storage.load_vid_share(data_view).await.unwrap().is_none());
        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
        storage.load_quorum_proposal(data_view).await.unwrap_err();
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_pruning_minimum_retention() {
        test_pruning_helper(ConsensusPruningOptions {
            // Use a very low target usage, to show that we still retain data up to the minimum
            // retention even when usage is above target.
            target_usage: 0,
            minimum_retention: 1,
            // Use a very high target retention, so that pruning is only triggered by the minimum
            // retention.
            target_retention: u64::MAX,
        })
        .await
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_pruning_target_retention() {
        test_pruning_helper(ConsensusPruningOptions {
            target_retention: 1,
            // Use a very low minimum retention, so that data is only kept around due to the target
            // retention.
            minimum_retention: 0,
            // Use a very high target usage, so that pruning is only triggered by the target
            // retention.
            target_usage: u64::MAX,
        })
        .await
    }

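    /// Populate the legacy (pre-`*2`) consensus tables directly, run `migrate_storage`, and check
    /// that each row has a counterpart in the new tables and that the latest finalized light
    /// client state certificate can be loaded.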
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_consensus_migration() {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);

        let storage = opt.create().await.unwrap();

        let rows = 300;

        assert!(storage.load_state_cert().await.unwrap().is_none());

        for i in 0..rows {
            let view = ViewNumber::new(i);
            let validated_state = ValidatedState::default();
            let instance_state = NodeState::default();

            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
            let (payload, metadata) =
                Payload::from_transactions([], &validated_state, &instance_state)
                    .await
                    .unwrap();

            let payload_bytes = payload.encode();

            let block_header =
                Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);

            let null_quorum_data = QuorumData {
                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
            };

            let justify_qc = QuorumCertificate::new(
                null_quorum_data.clone(),
                null_quorum_data.commit(),
                view,
                None,
                std::marker::PhantomData,
            );

            let quorum_proposal = QuorumProposal {
                block_header,
                view_number: view,
                justify_qc: justify_qc.clone(),
                upgrade_certificate: None,
                proposal_certificate: None,
            };

            let quorum_proposal_signature =
                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum proposal");

            let proposal = Proposal {
                data: quorum_proposal.clone(),
                signature: quorum_proposal_signature,
                _pd: std::marker::PhantomData,
            };

            let proposal_bytes = bincode::serialize(&proposal)
                .context("serializing proposal")
                .unwrap();

            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
            leaf.fill_block_payload::<TestVersions>(
                payload,
                GENESIS_VID_NUM_STORAGE_NODES,
                <TestVersions as Versions>::Base::VERSION,
            )
            .unwrap();

            let mut tx = storage.db.write().await.unwrap();

            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
            let leaf_bytes = bincode::serialize(&leaf).unwrap();

            tx.upsert(
                "anchor_leaf",
                ["view", "leaf", "qc"],
                ["view"],
                [(i as i64, leaf_bytes, qc_bytes)],
            )
            .await
            .unwrap();

            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(i),
                light_client_state: Default::default(), // filling arbitrary value
                next_stake_table_state: Default::default(), // filling arbitrary value
                signatures: vec![],                     // filling arbitrary value
                auth_root: Default::default(),
            };
            // manually upsert the state cert to the finalized database
            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
            tx.upsert(
                "finalized_state_cert",
                ["epoch", "state_cert"],
                ["epoch"],
                [(i as i64, state_cert_bytes)],
            )
            .await
            .unwrap();

            tx.commit().await.unwrap();

            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
                .disperse(payload_bytes.clone())
                .unwrap();

            let vid = VidDisperseShare0::<SeqTypes> {
                view_number: ViewNumber::new(i),
                payload_commitment: Default::default(),
                share: disperse.shares[0].clone(),
                common: disperse.common,
                recipient_key: pubkey,
            };

            let (payload, metadata) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
                    .await
                    .unwrap();

            let da = DaProposal::<SeqTypes> {
                encoded_transactions: payload.encode(),
                metadata,
                view_number: ViewNumber::new(i),
            };

            let block_payload_signature =
                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");

            let da_proposal = Proposal {
                data: da,
                signature: block_payload_signature,
                _pd: Default::default(),
            };

            storage
                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
                .await
                .unwrap();
            storage
                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
                .await
                .unwrap();

            let leaf_hash = Committable::commit(&leaf);
            let mut tx = storage.db.write().await.expect("failed to start write tx");
            tx.upsert(
                "quorum_proposals",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
            )
            .await
            .expect("failed to upsert quorum proposal");

            let justify_qc = &proposal.data.justify_qc;
            let justify_qc_bytes = bincode::serialize(&justify_qc)
                .context("serializing QC")
                .unwrap();
            tx.upsert(
                "quorum_certificate",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(
                    justify_qc.view_number.u64() as i64,
                    justify_qc.data.leaf_commit.to_string(),
                    &justify_qc_bytes,
                )],
            )
            .await
            .expect("failed to upsert qc");

            tx.commit().await.expect("failed to commit");
        }

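        // Run the migration, then check that each legacy table's rows now appear in the
        // corresponding `*2` table.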
        storage.migrate_storage().await.unwrap();

        let mut tx = storage.db.read().await.unwrap();
        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            anchor_leaf2_count, rows as i64,
            "anchor leaf count does not match rows",
        );

        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            da_proposal_count, rows as i64,
            "da proposal count does not match rows",
        );

        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            vid_share_count, rows as i64,
            "vid share count does not match rows"
        );

        let (quorum_proposals_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_proposals_count, rows as i64,
            "quorum proposals count does not match rows",
        );

        let (quorum_certificates_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_certificates_count, rows as i64,
            "quorum certificates count does not match rows",
        );

        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            state_cert_count, rows as i64,
            "Light client state update certificates count does not match rows",
        );
        assert_eq!(
            storage.load_state_cert().await.unwrap().unwrap(),
            LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(rows - 1),
                light_client_state: Default::default(),
                next_stake_table_state: Default::default(),
                signatures: vec![],
                auth_root: Default::default(),
            },
            "Wrong light client state update certificate in the storage",
        );

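        // Running the migration a second time, on already-migrated storage, must also succeed.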
        storage.migrate_storage().await.unwrap();
    }
}

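/// Tests exercising Postgres-specific SQL (the `get_ns_table` and `read_ns_id` helpers); these
/// are compiled out when the `embedded-db` feature is enabled.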
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_query_service::{
        availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
            EncodeBytes,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

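    /// Insert a block whose payload spans multiple namespaces, then check that the
    /// `read_ns_id`/`get_ns_table` SQL helpers recover the same namespace IDs that were recorded
    /// for each transaction.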
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        let txs = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis::<TestVersions>(&validated_state, &instance_state).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis::<TestVersions>(&validated_state, &instance_state)
            .await
            .into();

        let (payload, ns_table) =
            Payload::from_transactions(txs.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_bytes = payload.encode();
        let payload_commitment = vid_commitment::<TestVersions>(
            &payload_bytes,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);
        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            BuilderFee {
                fee_amount,
                fee_account,
                fee_signature,
            },
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();
        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        let mut tx = storage.db.write().await.unwrap();
        tx.insert_leaf(LeafQueryData::new(leaf, qc).unwrap())
            .await
            .unwrap();
        tx.insert_block(BlockQueryData::<SeqTypes>::new(block_header, payload))
            .await
            .unwrap();
        tx.commit().await.unwrap();

        let mut tx = storage.db.read().await.unwrap();
        let rows = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
              FROM header AS h
              JOIN transactions AS t ON t.block_height = h.height
              ORDER BY t.ns_index, t.position
        ",
        )
        .fetch_all(tx.as_mut())
        .await
        .unwrap();
        assert_eq!(rows.len(), txs.len());
        for (i, row) in rows.into_iter().enumerate() {
            let ns = u64::from(txs[i].namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), ns);
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        test_postgres_read_ns_table(NodeState::mock()).await;
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        test_postgres_read_ns_table(NodeState::mock_v2()).await;
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        test_postgres_read_ns_table(NodeState::mock_v3().with_epoch_height(0)).await;
    }
}