// hotshot_query_service/data_source/update.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! A generic algorithm for updating a HotShot Query Service data source with new data.
14use std::iter::once;
15
16use anyhow::{ensure, Context};
17use async_trait::async_trait;
18use futures::future::Future;
19use hotshot::types::{Event, EventType};
20use hotshot_types::{
21    data::{ns_table::parse_ns_table, Leaf2, VidCommitment, VidDisperseShare, VidShare},
22    event::LeafInfo,
23    traits::{
24        block_contents::{BlockHeader, BlockPayload, EncodeBytes, GENESIS_VID_NUM_STORAGE_NODES},
25        node_implementation::{ConsensusTime, NodeType},
26    },
27    vid::{
28        advz::advz_scheme,
29        avidm::{init_avidm_param, AvidMScheme},
30    },
31};
32use jf_vid::VidScheme;
33
34use crate::{
35    availability::{
36        BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload,
37        StateCertQueryDataV2, UpdateAvailabilityData, VidCommonQueryData,
38    },
39    Header, Payload, VidCommon,
40};
41
/// An extension trait for types which implement the update trait for each API module.
///
/// If a type implements [UpdateAvailabilityData] and
/// [UpdateStatusData](crate::status::UpdateStatusData), then it can be fully kept up to date
/// through two interfaces:
/// * [populate_metrics](crate::status::UpdateStatusData::populate_metrics), to get a handle for
///   populating the status metrics, which should be used when initializing a
///   [SystemContextHandle](hotshot::types::SystemContextHandle)
/// * [update](Self::update), provided by this extension trait, to update the query state when a new
///   HotShot event is emitted
#[async_trait]
pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
    /// Update query state based on a new consensus event.
    ///
    /// The caller is responsible for authenticating `event`. This function does not perform any
    /// authentication, and if given an invalid `event` (one which does not follow from the latest
    /// known state of the ledger) it may panic or silently accept the invalid `event`. This allows
    /// the best possible performance in the case where the query service and the HotShot instance
    /// are running in the same process (and thus the event stream, coming directly from HotShot,
    /// is trusted).
    ///
    /// If you want to update the data source with an untrusted event, for example one received from
    /// a peer over the network, you must authenticate it first.
    ///
    /// # Returns
    ///
    /// If all provided data is successfully inserted into the database, returns `Ok(())`. If any
    /// error occurred, the error is logged, and the return value is the height of the first leaf
    /// which failed to be inserted.
    async fn update(&self, event: &Event<Types>) -> Result<(), u64>;
}
73
74#[async_trait]
75impl<Types: NodeType, T> UpdateDataSource<Types> for T
76where
77    T: UpdateAvailabilityData<Types> + Send + Sync,
78    Header<Types>: QueryableHeader<Types>,
79    Payload<Types>: QueryablePayload<Types>,
80{
81    async fn update(&self, event: &Event<Types>) -> Result<(), u64> {
82        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
83            // `qc` justifies the first (most recent) leaf...
84            let qcs = once((**qc).clone())
85                // ...and each leaf in the chain justifies the subsequent leaf (its parent) through
86                // `leaf.justify_qc`.
87                .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc()))
88                // Put the QCs in chronological order.
89                .rev()
90                // The oldest QC is the `justify_qc` of the oldest leaf, which does not justify any
91                // leaf in the new chain, so we don't need it.
92                .skip(1);
93            for (
94                qc2,
95                LeafInfo {
96                    leaf: leaf2,
97                    vid_share,
98                    state_cert,
99                    ..
100                },
101            ) in qcs.zip(leaf_chain.iter().rev())
102            {
103                let height = leaf2.block_header().block_number();
104
105                let leaf_data = match LeafQueryData::new(leaf2.clone(), qc2.clone()) {
106                    Ok(leaf) => leaf,
107                    Err(err) => {
108                        tracing::error!(
109                            height,
110                            ?leaf2,
111                            ?qc,
112                            "inconsistent leaf; cannot append leaf information: {err:#}"
113                        );
114                        return Err(leaf2.block_header().block_number());
115                    },
116                };
117                let block_data = leaf2
118                    .block_payload()
119                    .map(|payload| BlockQueryData::new(leaf2.block_header().clone(), payload));
120                if block_data.is_none() {
121                    tracing::info!(height, "block not available at decide");
122                }
123
124                let (vid_common, vid_share) = match vid_share {
125                    Some(VidDisperseShare::V0(share)) => (
126                        Some(VidCommonQueryData::new(
127                            leaf2.block_header().clone(),
128                            VidCommon::V0(share.common.clone()),
129                        )),
130                        Some(VidShare::V0(share.share.clone())),
131                    ),
132                    Some(VidDisperseShare::V1(share)) => (
133                        Some(VidCommonQueryData::new(
134                            leaf2.block_header().clone(),
135                            VidCommon::V1(share.common.clone()),
136                        )),
137                        Some(VidShare::V1(share.share.clone())),
138                    ),
139                    None => {
140                        if leaf2.view_number().u64() == 0 {
141                            // HotShot does not run VID in consensus for the genesis block. In this case,
142                            // the block payload is guaranteed to always be empty, so VID isn't really
143                            // necessary. But for consistency, we will still store the VID dispersal data,
144                            // computing it ourselves based on the well-known genesis VID commitment.
145                            match genesis_vid(leaf2) {
146                                Ok((common, share)) => (Some(common), Some(share)),
147                                Err(err) => {
148                                    tracing::warn!("failed to compute genesis VID: {err:#}");
149                                    (None, None)
150                                },
151                            }
152                        } else {
153                            (None, None)
154                        }
155                    },
156                };
157
158                if vid_common.is_none() {
159                    tracing::info!(height, "VID not available at decide");
160                }
161
162                if let Err(err) = self
163                    .append(BlockInfo::new(
164                        leaf_data,
165                        block_data,
166                        vid_common,
167                        vid_share,
168                        state_cert.clone().map(StateCertQueryDataV2),
169                    ))
170                    .await
171                {
172                    tracing::error!(height, "failed to append leaf information: {err:#}");
173                    return Err(leaf2.block_header().block_number());
174                }
175            }
176        }
177        Ok(())
178    }
179}
180
181fn genesis_vid<Types: NodeType>(
182    leaf: &Leaf2<Types>,
183) -> anyhow::Result<(VidCommonQueryData<Types>, VidShare)> {
184    let payload = Payload::<Types>::empty().0;
185    let bytes = payload.encode();
186
187    match leaf.block_header().payload_commitment() {
188        VidCommitment::V0(commit) => {
189            let mut disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
190                .disperse(bytes)
191                .context("unable to compute VID dispersal for genesis block")?;
192
193            ensure!(
194                disperse.commit == commit,
195                "computed VID commit {} for genesis block does not match header commit {}",
196                disperse.commit,
197                commit
198            );
199            Ok((
200                VidCommonQueryData::new(
201                    leaf.block_header().clone(),
202                    VidCommon::V0(disperse.common),
203                ),
204                VidShare::V0(disperse.shares.remove(0)),
205            ))
206        },
207        VidCommitment::V1(commit) => {
208            let avidm_param = init_avidm_param(GENESIS_VID_NUM_STORAGE_NODES)?;
209            let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
210            let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
211
212            let (calculated_commit, mut shares) =
213                AvidMScheme::ns_disperse(&avidm_param, &weights, &bytes, ns_table).unwrap();
214
215            ensure!(
216                calculated_commit == commit,
217                "computed VID commit {} for genesis block does not match header commit {}",
218                calculated_commit,
219                commit
220            );
221
222            Ok((
223                VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V1(avidm_param)),
224                VidShare::V1(shares.remove(0)),
225            ))
226        },
227    }
228}
229
/// A data source with an atomic transaction-based synchronization interface.
///
/// Changes are made to a versioned data source through a [`Transaction`]. Any changes made in a
/// [`Transaction`] are initially visible only when queried through that same [`Transaction`]. They
/// are not immediately written back to storage, which means that a new data source object opened
/// against the same persistent storage will not reflect the changes. In particular, this means that
/// if the process restarts and reopens its storage, uncommitted changes will be lost.
///
/// Only when a [`Transaction`] is committed are changes written back to storage, synchronized with
/// any concurrent changes, and made visible to other connections to the same data source.
pub trait VersionedDataSource: Send + Sync {
    /// A transaction which can read and modify the data source.
    type Transaction<'a>: Transaction
    where
        Self: 'a;

    /// A read-only transaction on the data source, as returned by [`read`](Self::read).
    type ReadOnly<'a>: Transaction
    where
        Self: 'a;

    /// Start an atomic transaction on the data source.
    fn write(&self) -> impl Future<Output = anyhow::Result<Self::Transaction<'_>>> + Send;

    /// Start a read-only transaction on the data source.
    ///
    /// A read-only transaction allows the owner to string together multiple queries of the data
    /// source, which otherwise would not be atomic with respect to concurrent writes, in an atomic
    /// fashion. Upon returning, [`read`](Self::read) locks in a fully consistent snapshot of the
    /// data source, and any read operations performed upon the transaction thereafter read from the
    /// same consistent snapshot. Concurrent modifications to the data source may occur (for
    /// example, from concurrent [`write`](Self::write) transactions being committed), but their
    /// results will not be reflected in a successful read-only transaction which was opened before
    /// the write was committed.
    ///
    /// Read-only transactions do not need to be committed, and reverting has no effect.
    fn read(&self) -> impl Future<Output = anyhow::Result<Self::ReadOnly<'_>>> + Send;
}
267
/// A unit of atomicity for updating a shared data source.
///
/// The methods provided by this trait can be used to write such pending changes back to persistent
/// storage ([commit](Self::commit)) so that they become visible to other clients of the same
/// underlying storage, and are saved if the process restarts. It also allows pending changes to be
/// rolled back ([revert](Self::revert)) so that they are never written back to storage and are no
/// longer reflected even through the data source object which was used to make the changes.
pub trait Transaction: Send + Sync + Sized {
    /// Write all changes made through this transaction back to the underlying storage.
    ///
    /// Consumes the transaction; returns an error if the changes could not be persisted.
    fn commit(self) -> impl Future<Output = anyhow::Result<()>> + Send;
    /// Discard all changes made through this transaction, consuming it.
    ///
    /// The returned future has no meaningful output; reverting does not report an error to the
    /// caller.
    fn revert(self) -> impl Future + Send;
}