hotshot_query_service/data_source/
update.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! A generic algorithm for updating a HotShot Query Service data source with new data.
14use std::iter::once;
15
16use anyhow::{Context, ensure};
17use async_trait::async_trait;
18use futures::future::Future;
19use hotshot::types::{Event, EventType};
20use hotshot_types::{
21    data::{Leaf2, VidCommitment, VidCommon, VidDisperseShare, VidShare, ns_table::parse_ns_table},
22    event::LeafInfo,
23    traits::{
24        block_contents::{BlockHeader, BlockPayload, EncodeBytes, GENESIS_VID_NUM_STORAGE_NODES},
25        node_implementation::NodeType,
26    },
27    vid::{
28        advz::advz_scheme,
29        avidm::{AvidMScheme, init_avidm_param},
30        avidm_gf2::{AvidmGf2Scheme, init_avidm_gf2_param},
31    },
32    vote::HasViewNumber,
33};
34use jf_advz::VidScheme;
35
36use crate::{
37    Header, Payload,
38    availability::{
39        BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload,
40        UpdateAvailabilityData, VidCommonQueryData,
41    },
42};
43
/// An extension trait for types which implement the update trait for each API module.
///
/// If a type implements [UpdateAvailabilityData] and
/// [UpdateStatusData](crate::status::UpdateStatusData), then it can be fully kept up to date
/// through two interfaces:
/// * [populate_metrics](crate::status::UpdateStatusData::populate_metrics), to get a handle for
///   populating the status metrics, which should be used when initializing a
///   [SystemContextHandle](hotshot::types::SystemContextHandle)
/// * [update](Self::update), provided by this extension trait, to update the query state when a new
///   HotShot event is emitted
#[async_trait]
pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
    /// Update query state based on a new consensus event.
    ///
    /// The caller is responsible for authenticating `event`. This function does not perform any
    /// authentication, and if given an invalid `event` (one which does not follow from the latest
    /// known state of the ledger) it may panic or silently accept the invalid `event`. This allows
    /// the best possible performance in the case where the query service and the HotShot instance
    /// are running in the same process (and thus the event stream, coming directly from HotShot,
    /// is trusted).
    ///
    /// If you want to update the data source with an untrusted event, for example one received from
    /// a peer over the network, you must authenticate it first.
    ///
    /// # Returns
    ///
    /// If all provided data is successfully inserted into the database, returns `Ok(())`. If any
    /// error occurred, the error is logged, and the return value is `Err(height)`, where `height`
    /// is the block height of the first leaf which failed to be inserted.
    ///
    /// The blanket implementation in this module only acts on [`EventType::Decide`] events; any
    /// other event returns `Ok(())` without modifying the data source.
    async fn update(&self, event: &Event<Types>) -> Result<(), u64>;
}
75
76#[async_trait]
77impl<Types: NodeType, T> UpdateDataSource<Types> for T
78where
79    T: UpdateAvailabilityData<Types> + Send + Sync,
80    Header<Types>: QueryableHeader<Types>,
81    Payload<Types>: QueryablePayload<Types>,
82{
83    async fn update(&self, event: &Event<Types>) -> Result<(), u64> {
84        if let EventType::Decide {
85            leaf_chain,
86            committing_qc,
87            deciding_qc,
88            ..
89        } = &event.event
90        {
91            // `qc` justifies the first (most recent) leaf...
92            let qcs = once(committing_qc.qc().clone())
93                // ...and each leaf in the chain justifies the subsequent leaf (its parent) through
94                // `leaf.justify_qc`.
95                .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc()))
96                // Put the QCs in chronological order.
97                .rev()
98                // The oldest QC is the `justify_qc` of the oldest leaf, which does not justify any
99                // leaf in the new chain, so we don't need it.
100                .skip(1);
101            for (
102                qc2,
103                LeafInfo {
104                    leaf: leaf2,
105                    vid_share,
106                    state_cert: _,
107                    ..
108                },
109            ) in qcs.zip(leaf_chain.iter().rev())
110            {
111                let height = leaf2.block_header().block_number();
112
113                let leaf_data = match LeafQueryData::new(leaf2.clone(), qc2.clone()) {
114                    Ok(leaf) => leaf,
115                    Err(err) => {
116                        tracing::error!(
117                            height,
118                            ?leaf2,
119                            ?committing_qc,
120                            "inconsistent leaf; cannot append leaf information: {err:#}"
121                        );
122                        return Err(leaf2.block_header().block_number());
123                    },
124                };
125                let block_data = leaf2
126                    .block_payload()
127                    .map(|payload| BlockQueryData::new(leaf2.block_header().clone(), payload));
128                if block_data.is_none() {
129                    tracing::info!(height, "block not available at decide");
130                }
131
132                let (vid_common, vid_share) = match vid_share {
133                    Some(VidDisperseShare::V0(share)) => (
134                        Some(VidCommonQueryData::new(
135                            leaf2.block_header().clone(),
136                            VidCommon::V0(share.common.clone()),
137                        )),
138                        Some(VidShare::V0(share.share.clone())),
139                    ),
140                    Some(VidDisperseShare::V1(share)) => (
141                        Some(VidCommonQueryData::new(
142                            leaf2.block_header().clone(),
143                            VidCommon::V1(share.common.clone()),
144                        )),
145                        Some(VidShare::V1(share.share.clone())),
146                    ),
147                    Some(VidDisperseShare::V2(share)) => (
148                        Some(VidCommonQueryData::new(
149                            leaf2.block_header().clone(),
150                            VidCommon::V2(share.common.clone()),
151                        )),
152                        Some(VidShare::V2(share.share.clone())),
153                    ),
154                    None => {
155                        if leaf2.view_number().u64() == 0 {
156                            // HotShot does not run VID in consensus for the genesis block. In this case,
157                            // the block payload is guaranteed to always be empty, so VID isn't really
158                            // necessary. But for consistency, we will still store the VID dispersal data,
159                            // computing it ourselves based on the well-known genesis VID commitment.
160                            match genesis_vid(leaf2) {
161                                Ok((common, share)) => (Some(common), Some(share)),
162                                Err(err) => {
163                                    tracing::warn!("failed to compute genesis VID: {err:#}");
164                                    (None, None)
165                                },
166                            }
167                        } else {
168                            (None, None)
169                        }
170                    },
171                };
172
173                if vid_common.is_none() {
174                    tracing::info!(height, "VID not available at decide");
175                }
176
177                let mut info = BlockInfo::new(leaf_data, block_data, vid_common, vid_share);
178                if let Some(deciding_qc) = deciding_qc
179                    && committing_qc.view_number() == info.leaf.leaf().view_number()
180                {
181                    let qc_chain = [committing_qc.as_ref().clone(), deciding_qc.as_ref().clone()];
182                    info = info.with_qc_chain(qc_chain);
183                }
184                if let Err(err) = self.append(info).await {
185                    tracing::error!(height, "failed to append leaf information: {err:#}");
186                    return Err(leaf2.block_header().block_number());
187                }
188            }
189        }
190        Ok(())
191    }
192}
193
194fn genesis_vid<Types: NodeType>(
195    leaf: &Leaf2<Types>,
196) -> anyhow::Result<(VidCommonQueryData<Types>, VidShare)> {
197    let payload = Payload::<Types>::empty().0;
198    let bytes = payload.encode();
199
200    match leaf.block_header().payload_commitment() {
201        VidCommitment::V0(commit) => {
202            let mut disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
203                .disperse(bytes)
204                .context("unable to compute VID dispersal for genesis block")?;
205
206            ensure!(
207                disperse.commit == commit,
208                "computed VID commit {} for genesis block does not match header commit {}",
209                disperse.commit,
210                commit
211            );
212            Ok((
213                VidCommonQueryData::new(
214                    leaf.block_header().clone(),
215                    VidCommon::V0(disperse.common),
216                ),
217                VidShare::V0(disperse.shares.remove(0)),
218            ))
219        },
220        VidCommitment::V1(commit) => {
221            let avidm_param = init_avidm_param(GENESIS_VID_NUM_STORAGE_NODES)?;
222            let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
223            let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
224
225            let (calculated_commit, mut shares) =
226                AvidMScheme::ns_disperse(&avidm_param, &weights, &bytes, ns_table).unwrap();
227
228            ensure!(
229                calculated_commit == commit,
230                "computed VID commit {} for genesis block does not match header commit {}",
231                calculated_commit,
232                commit
233            );
234
235            Ok((
236                VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V1(avidm_param)),
237                VidShare::V1(shares.remove(0)),
238            ))
239        },
240        VidCommitment::V2(commit) => {
241            let avidm_gf2_param = init_avidm_gf2_param(GENESIS_VID_NUM_STORAGE_NODES)?;
242            let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
243            let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
244
245            let (calculated_commit, common, mut shares) =
246                AvidmGf2Scheme::ns_disperse(&avidm_gf2_param, &weights, &bytes, ns_table).unwrap();
247
248            ensure!(
249                calculated_commit == commit,
250                "computed VID commit {} for genesis block does not match header commit {}",
251                calculated_commit,
252                commit
253            );
254
255            Ok((
256                VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V2(common)),
257                VidShare::V2(shares.remove(0)),
258            ))
259        },
260    }
261}
262
/// A data source with an atomic transaction-based synchronization interface.
///
/// Changes are made to a versioned data source through a [`Transaction`]. Any changes made in a
/// [`Transaction`] are initially visible only when queried through that same [`Transaction`]. They
/// are not immediately written back to storage, which means that a new data source object opened
/// against the same persistent storage will not reflect the changes. In particular, this means that
/// if the process restarts and reopens its storage, uncommitted changes will be lost.
///
/// Only when a [`Transaction`] is committed are changes written back to storage, synchronized with
/// any concurrent changes, and made visible to other connections to the same data source.
pub trait VersionedDataSource: Send + Sync {
    /// A transaction which can read and modify the data source.
    ///
    /// This is the type produced by [`write`](Self::write).
    type Transaction<'a>: Transaction
    where
        Self: 'a;

    /// A transaction which is used only for reading from the data source.
    ///
    /// This is the type produced by [`read`](Self::read).
    type ReadOnly<'a>: Transaction
    where
        Self: 'a;

    /// Start an atomic transaction on the data source.
    ///
    /// The returned future resolves to an error if the transaction could not be started.
    fn write(&self) -> impl Future<Output = anyhow::Result<Self::Transaction<'_>>> + Send;

    /// Start a read-only transaction on the data source.
    ///
    /// A read-only transaction allows the owner to string together multiple queries of the data
    /// source, which otherwise would not be atomic with respect to concurrent writes, in an atomic
    /// fashion. Upon returning, [`read`](Self::read) locks in a fully consistent snapshot of the
    /// data source, and any read operations performed upon the transaction thereafter read from the
    /// same consistent snapshot. Concurrent modifications to the data source may occur (for
    /// example, from concurrent [`write`](Self::write) transactions being committed), but their
    /// results will not be reflected in a successful read-only transaction which was opened before
    /// the write was committed.
    ///
    /// Read-only transactions do not need to be committed, and reverting has no effect.
    fn read(&self) -> impl Future<Output = anyhow::Result<Self::ReadOnly<'_>>> + Send;
}
300
/// A unit of atomicity for updating a shared data source.
///
/// The methods provided by this trait can be used to write such pending changes back to persistent
/// storage ([commit](Self::commit)) so that they become visible to other clients of the same
/// underlying storage, and are saved if the process restarts. It also allows pending changes to be
/// rolled back ([revert](Self::revert)) so that they are never written back to storage and are no
/// longer reflected even through the data source object which was used to make the changes.
pub trait Transaction: Send + Sync + Sized {
    /// Write the pending changes of this transaction back to persistent storage.
    ///
    /// Consumes the transaction. The returned future resolves to `Ok(())` on success and to an
    /// error otherwise.
    fn commit(self) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Roll back the pending changes of this transaction so they are never written to storage.
    ///
    /// Consumes the transaction. The output type of the returned future is not constrained by
    /// this trait.
    fn revert(self) -> impl Future + Send;
}