hotshot_query_service/data_source/update.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! A generic algorithm for updating a HotShot Query Service data source with new data.
14use std::iter::once;
15
16use anyhow::{ensure, Context};
17use async_trait::async_trait;
18use futures::future::Future;
19use hotshot::types::{Event, EventType};
20use hotshot_types::{
21 data::{ns_table::parse_ns_table, Leaf2, VidCommitment, VidDisperseShare, VidShare},
22 event::LeafInfo,
23 traits::{
24 block_contents::{BlockHeader, BlockPayload, EncodeBytes, GENESIS_VID_NUM_STORAGE_NODES},
25 node_implementation::{ConsensusTime, NodeType},
26 },
27 vid::{
28 advz::advz_scheme,
29 avidm::{init_avidm_param, AvidMScheme},
30 },
31 vote::HasViewNumber,
32};
33use jf_advz::VidScheme;
34
35use crate::{
36 availability::{
37 BlockInfo, BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload,
38 UpdateAvailabilityData, VidCommonQueryData,
39 },
40 Header, Payload, VidCommon,
41};
42
43/// An extension trait for types which implement the update trait for each API module.
44///
45/// If a type implements [UpdateAvailabilityData] and
46/// [UpdateStatusData](crate::status::UpdateStatusData), then it can be fully kept up to date
47/// through two interfaces:
48/// * [populate_metrics](crate::status::UpdateStatusData::populate_metrics), to get a handle for
49/// populating the status metrics, which should be used when initializing a
50/// [SystemContextHandle](hotshot::types::SystemContextHandle)
51/// * [update](Self::update), provided by this extension trait, to update the query state when a new
52/// HotShot event is emitted
53#[async_trait]
54pub trait UpdateDataSource<Types: NodeType>: UpdateAvailabilityData<Types> {
55 /// Update query state based on a new consensus event.
56 ///
57 /// The caller is responsible for authenticating `event`. This function does not perform any
58 /// authentication, and if given an invalid `event` (one which does not follow from the latest
59 /// known state of the ledger) it may panic or silently accept the invalid `event`. This allows
60 /// the best possible performance in the case where the query service and the HotShot instance
61 /// are running in the same process (and thus the event stream, directly from HotShot) is
62 /// trusted.
63 ///
64 /// If you want to update the data source with an untrusted event, for example one received from
65 /// a peer over the network, you must authenticate it first.
66 ///
67 /// # Returns
68 ///
69 /// If all provided data is successfully inserted into the database, returns `Ok(())`. If any
70 /// error occurred, the error is logged, and the return value is the height of the first leaf
71 /// which failed to be inserted.
72 async fn update(&self, event: &Event<Types>) -> Result<(), u64>;
73}
74
75#[async_trait]
76impl<Types: NodeType, T> UpdateDataSource<Types> for T
77where
78 T: UpdateAvailabilityData<Types> + Send + Sync,
79 Header<Types>: QueryableHeader<Types>,
80 Payload<Types>: QueryablePayload<Types>,
81{
82 async fn update(&self, event: &Event<Types>) -> Result<(), u64> {
83 if let EventType::Decide {
84 leaf_chain,
85 committing_qc,
86 deciding_qc,
87 ..
88 } = &event.event
89 {
90 // `qc` justifies the first (most recent) leaf...
91 let qcs = once(committing_qc.qc().clone())
92 // ...and each leaf in the chain justifies the subsequent leaf (its parent) through
93 // `leaf.justify_qc`.
94 .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc()))
95 // Put the QCs in chronological order.
96 .rev()
97 // The oldest QC is the `justify_qc` of the oldest leaf, which does not justify any
98 // leaf in the new chain, so we don't need it.
99 .skip(1);
100 for (
101 qc2,
102 LeafInfo {
103 leaf: leaf2,
104 vid_share,
105 state_cert: _,
106 ..
107 },
108 ) in qcs.zip(leaf_chain.iter().rev())
109 {
110 let height = leaf2.block_header().block_number();
111
112 let leaf_data = match LeafQueryData::new(leaf2.clone(), qc2.clone()) {
113 Ok(leaf) => leaf,
114 Err(err) => {
115 tracing::error!(
116 height,
117 ?leaf2,
118 ?committing_qc,
119 "inconsistent leaf; cannot append leaf information: {err:#}"
120 );
121 return Err(leaf2.block_header().block_number());
122 },
123 };
124 let block_data = leaf2
125 .block_payload()
126 .map(|payload| BlockQueryData::new(leaf2.block_header().clone(), payload));
127 if block_data.is_none() {
128 tracing::info!(height, "block not available at decide");
129 }
130
131 let (vid_common, vid_share) = match vid_share {
132 Some(VidDisperseShare::V0(share)) => (
133 Some(VidCommonQueryData::new(
134 leaf2.block_header().clone(),
135 VidCommon::V0(share.common.clone()),
136 )),
137 Some(VidShare::V0(share.share.clone())),
138 ),
139 Some(VidDisperseShare::V1(share)) => (
140 Some(VidCommonQueryData::new(
141 leaf2.block_header().clone(),
142 VidCommon::V1(share.common.clone()),
143 )),
144 Some(VidShare::V1(share.share.clone())),
145 ),
146 None => {
147 if leaf2.view_number().u64() == 0 {
148 // HotShot does not run VID in consensus for the genesis block. In this case,
149 // the block payload is guaranteed to always be empty, so VID isn't really
150 // necessary. But for consistency, we will still store the VID dispersal data,
151 // computing it ourselves based on the well-known genesis VID commitment.
152 match genesis_vid(leaf2) {
153 Ok((common, share)) => (Some(common), Some(share)),
154 Err(err) => {
155 tracing::warn!("failed to compute genesis VID: {err:#}");
156 (None, None)
157 },
158 }
159 } else {
160 (None, None)
161 }
162 },
163 };
164
165 if vid_common.is_none() {
166 tracing::info!(height, "VID not available at decide");
167 }
168
169 let mut info = BlockInfo::new(leaf_data, block_data, vid_common, vid_share);
170 if let Some(deciding_qc) = deciding_qc {
171 if committing_qc.view_number() == info.leaf.leaf().view_number() {
172 let qc_chain =
173 [committing_qc.as_ref().clone(), deciding_qc.as_ref().clone()];
174 info = info.with_qc_chain(qc_chain);
175 }
176 }
177 if let Err(err) = self.append(info).await {
178 tracing::error!(height, "failed to append leaf information: {err:#}");
179 return Err(leaf2.block_header().block_number());
180 }
181 }
182 }
183 Ok(())
184 }
185}
186
187fn genesis_vid<Types: NodeType>(
188 leaf: &Leaf2<Types>,
189) -> anyhow::Result<(VidCommonQueryData<Types>, VidShare)> {
190 let payload = Payload::<Types>::empty().0;
191 let bytes = payload.encode();
192
193 match leaf.block_header().payload_commitment() {
194 VidCommitment::V0(commit) => {
195 let mut disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
196 .disperse(bytes)
197 .context("unable to compute VID dispersal for genesis block")?;
198
199 ensure!(
200 disperse.commit == commit,
201 "computed VID commit {} for genesis block does not match header commit {}",
202 disperse.commit,
203 commit
204 );
205 Ok((
206 VidCommonQueryData::new(
207 leaf.block_header().clone(),
208 VidCommon::V0(disperse.common),
209 ),
210 VidShare::V0(disperse.shares.remove(0)),
211 ))
212 },
213 VidCommitment::V1(commit) => {
214 let avidm_param = init_avidm_param(GENESIS_VID_NUM_STORAGE_NODES)?;
215 let weights = vec![1; GENESIS_VID_NUM_STORAGE_NODES];
216 let ns_table = parse_ns_table(bytes.len(), &leaf.block_header().metadata().encode());
217
218 let (calculated_commit, mut shares) =
219 AvidMScheme::ns_disperse(&avidm_param, &weights, &bytes, ns_table).unwrap();
220
221 ensure!(
222 calculated_commit == commit,
223 "computed VID commit {} for genesis block does not match header commit {}",
224 calculated_commit,
225 commit
226 );
227
228 Ok((
229 VidCommonQueryData::new(leaf.block_header().clone(), VidCommon::V1(avidm_param)),
230 VidShare::V1(shares.remove(0)),
231 ))
232 },
233 }
234}
235
236/// A data source with an atomic transaction-based synchronization interface.
237///
238/// Changes are made to a versioned data source through a [`Transaction`]. Any changes made in a
239/// [`Transaction`] are initially visible only when queried through that same [`Transaction`]. They
240/// are not immediately written back to storage, which means that a new data source object opened
241/// against the same persistent storage will not reflect the changes. In particular, this means that
242/// if the process restarts and reopens its storage, uncommitted changes will be lost.
243///
244/// Only when a [`Transaction`] is committed are changes written back to storage, synchronized with
245/// any concurrent changes, and made visible to other connections to the same data source.
246pub trait VersionedDataSource: Send + Sync {
247 /// A transaction which can read and modify the data source.
248 type Transaction<'a>: Transaction
249 where
250 Self: 'a;
251
252 type ReadOnly<'a>: Transaction
253 where
254 Self: 'a;
255
256 /// Start an atomic transaction on the data source.
257 fn write(&self) -> impl Future<Output = anyhow::Result<Self::Transaction<'_>>> + Send;
258
259 /// Start a read-only transaction on the data source.
260 ///
261 /// A read-only transaction allows the owner to string together multiple queries of the data
262 /// source, which otherwise would not be atomic with respect to concurrent writes, in an atomic
263 /// fashion. Upon returning, [`read`](Self::read) locks in a fully consistent snapshot of the
264 /// data source, and any read operations performed upon the transaction thereafter read from the
265 /// same consistent snapshot. Concurrent modifications to the data source may occur (for
266 /// example, from concurrent [`write`](Self::write) transactions being committed), but their
267 /// results will not be reflected in a successful read-only transaction which was opened before
268 /// the write was committed.
269 ///
270 /// Read-only transactions do not need to be committed, and reverting has no effect.
271 fn read(&self) -> impl Future<Output = anyhow::Result<Self::ReadOnly<'_>>> + Send;
272}
273
274/// A unit of atomicity for updating a shared data source.
275///
276/// The methods provided by this trait can be used to write such pending changes back to persistent
277/// storage ([commit](Self::commit)) so that they become visible to other clients of the same
278/// underlying storage, and are saved if the process restarts. It also allows pending changes to be
279/// rolled back ([revert](Self::revert)) so that they are never written back to storage and are no
280/// longer reflected even through the data source object which was used to make the changes.
281pub trait Transaction: Send + Sync + Sized {
282 fn commit(self) -> impl Future<Output = anyhow::Result<()>> + Send;
283 fn revert(self) -> impl Future + Send;
284}