1use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{bail, Context, Result};
8use async_trait::async_trait;
9use espresso_types::{
10 retain_accounts,
11 traits::SequencerPersistence,
12 v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13 v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14 NodeState, PubKey, SeqTypes,
15};
16use hotshot::{traits::NodeImplementation, SystemContext};
17use hotshot_query_service::{
18 data_source::{
19 storage::{FileSystemStorage, NodeStorage, SqlStorage},
20 VersionedDataSource,
21 },
22 node::BlockId,
23};
24use hotshot_types::{
25 data::ViewNumber,
26 traits::{
27 network::ConnectedNetwork,
28 node_implementation::{ConsensusTime, Versions},
29 },
30 vote::HasViewNumber,
31};
32use itertools::Itertools;
33use jf_merkle_tree_compat::{
34 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
35 MerkleTreeScheme, UniversalMerkleTreeScheme,
36};
37use request_response::data_source::DataSource as DataSourceTrait;
38
39use super::request::{Request, Response};
40use crate::{
41 api::BlocksFrontier,
42 catchup::{
43 add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
44 add_v2_reward_accounts_to_state, CatchupStorage,
45 },
46};
47
/// The storage backend a [`DataSource`] can read from when a request cannot be
/// answered from in-memory consensus state.
///
/// Each variant wraps its backend in an [`Arc`], so cloning a `Storage` only
/// clones the handle.
#[derive(Clone)]
pub enum Storage {
    /// SQL-backed storage.
    Sql(Arc<SqlStorage>),
    /// File-system-backed storage.
    Fs(Arc<FileSystemStorage<SeqTypes>>),
}
54
/// Shared handle to the HotShot [`SystemContext`] specialised to [`SeqTypes`].
type Consensus<I, V> = Arc<SystemContext<SeqTypes, I, V>>;
57
/// The data source used to derive responses for request-response [`Request`]s.
///
/// Responses are derived from in-memory consensus state where possible, and
/// from `storage` / `persistence` otherwise.
#[derive(Clone)]
pub struct DataSource<
    I: NodeImplementation<SeqTypes>,
    V: Versions,
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
> {
    /// Handle to the consensus system context, used to read in-memory state.
    pub consensus: Consensus<I, V>,
    /// The node state, passed to storage lookups.
    pub node_state: NodeState,
    /// The storage backend, if any. Requests that need storage fail with
    /// "storage was not initialized" when this is `None`.
    pub storage: Option<Storage>,
    /// The persistence layer, queried for state certificates.
    pub persistence: Arc<P>,
    /// Marker for the network type `N`, which is only used as a type parameter.
    pub phantom: PhantomData<N>,
}
76
77#[async_trait]
79impl<
80 I: NodeImplementation<SeqTypes>,
81 V: Versions,
82 N: ConnectedNetwork<PubKey>,
83 P: SequencerPersistence,
84 > DataSourceTrait<Request> for DataSource<I, V, N, P>
85{
86 async fn derive_response_for(&self, request: &Request) -> Result<Response> {
87 match request {
88 Request::Accounts(height, view, accounts) => {
89 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
91 if let Ok(accounts) =
92 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
93 {
94 return Ok(Response::Accounts(accounts));
95 }
96 }
97
98 let (merkle_tree, leaf) = match &self.storage {
100 Some(Storage::Sql(storage)) => storage
101 .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
102 .await
103 .with_context(|| "failed to get accounts from sql storage")?,
104 Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
105 _ => bail!("storage was not initialized"),
106 };
107
108 if let Err(err) = add_fee_accounts_to_state::<N, V, P>(
111 &self.consensus.consensus(),
112 &ViewNumber::new(*view),
113 accounts,
114 &merkle_tree,
115 leaf,
116 )
117 .await
118 {
119 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
120 }
121
122 Ok(Response::Accounts(merkle_tree))
123 },
124
125 Request::Leaf(height) => {
126 let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
128 leaves.sort_by_key(|l| l.view_number());
129
130 if let Some((position, mut last_leaf)) =
131 leaves.iter().find_position(|l| l.height() == *height)
132 {
133 let mut leaf_chain = vec![last_leaf.clone()];
134 for leaf in leaves.iter().skip(position + 1) {
135 if leaf.justify_qc().view_number() == last_leaf.view_number() {
136 leaf_chain.push(leaf.clone());
137 } else {
138 continue;
139 }
140 if leaf.view_number() == last_leaf.view_number() + 1 {
141 last_leaf = leaf;
143 break;
144 }
145 last_leaf = leaf;
146 }
147
148 for leaf in leaves
150 .iter()
151 .skip_while(|l| l.view_number() <= last_leaf.view_number())
152 {
153 if leaf.justify_qc().view_number() == last_leaf.view_number() {
154 leaf_chain.push(leaf.clone());
155 return Ok(Response::Leaf(leaf_chain));
156 }
157 }
158 }
159
160 let leaf_chain = match &self.storage {
162 Some(Storage::Sql(storage)) => storage
163 .get_leaf_chain(*height)
164 .await
165 .with_context(|| "failed to get leaf from sql storage")?,
166 Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
168 _ => bail!("storage was not initialized"),
169 };
170
171 Ok(Response::Leaf(leaf_chain))
172 },
173 Request::ChainConfig(commitment) => {
174 let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
176 if chain_config_from_memory.commit() == *commitment {
177 if let Some(chain_config) = chain_config_from_memory.resolve() {
178 return Ok(Response::ChainConfig(chain_config));
179 }
180 }
181
182 Ok(Response::ChainConfig(match &self.storage {
184 Some(Storage::Sql(storage)) => storage
185 .get_chain_config(*commitment)
186 .await
187 .with_context(|| "failed to get chain config from sql storage")?,
188 Some(Storage::Fs(_)) => {
189 bail!("fs storage not supported for chain config")
190 },
191 _ => bail!("storage was not initialized"),
192 }))
193 },
194 Request::BlocksFrontier(height, view) => {
195 let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
197 .consensus
198 .state(ViewNumber::new(*view))
199 .await
200 .map(|state| {
201 let tree = &state.block_merkle_tree;
202 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
203 Ok(frontier)
204 });
205
206 if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
207 return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
208 } else {
209 let blocks_frontier_from_storage = match &self.storage {
211 Some(Storage::Sql(storage)) => storage
212 .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
213 .await
214 .with_context(|| "failed to get blocks frontier from sql storage")?,
215 Some(Storage::Fs(_)) => {
216 bail!("fs storage not supported for blocks frontier")
217 },
218 _ => bail!("storage was not initialized"),
219 };
220
221 Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
222 }
223 },
224 Request::RewardAccountsV2(height, view, accounts) => {
225 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
227 if let Ok(reward_accounts) = retain_v2_reward_accounts(
228 &state.reward_merkle_tree_v2,
229 accounts.iter().copied(),
230 ) {
231 return Ok(Response::RewardAccountsV2(reward_accounts));
232 }
233 }
234
235 let (merkle_tree, leaf) = match &self.storage {
237 Some(Storage::Sql(storage)) => storage
238 .get_reward_accounts_v2(
239 &self.node_state,
240 *height,
241 ViewNumber::new(*view),
242 accounts,
243 )
244 .await
245 .with_context(|| "failed to get accounts from sql storage")?,
246 Some(Storage::Fs(_)) => {
247 bail!("fs storage not supported for reward accounts")
248 },
249 _ => bail!("storage was not initialized"),
250 };
251
252 if let Err(err) = add_v2_reward_accounts_to_state::<N, V, P>(
255 &self.consensus.consensus(),
256 &ViewNumber::new(*view),
257 accounts,
258 &merkle_tree,
259 leaf,
260 )
261 .await
262 {
263 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
264 }
265
266 Ok(Response::RewardAccountsV2(merkle_tree))
267 },
268
269 Request::RewardAccountsV1(height, view, accounts) => {
270 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
272 if let Ok(reward_accounts) = retain_v1_reward_accounts(
273 &state.reward_merkle_tree_v1,
274 accounts.iter().copied(),
275 ) {
276 return Ok(Response::RewardAccountsV1(reward_accounts));
277 }
278 }
279
280 let (merkle_tree, leaf) = match &self.storage {
282 Some(Storage::Sql(storage)) => storage
283 .get_reward_accounts_v1(
284 &self.node_state,
285 *height,
286 ViewNumber::new(*view),
287 accounts,
288 )
289 .await
290 .with_context(|| "failed to get v1 reward accounts from sql storage")?,
291 Some(Storage::Fs(_)) => {
292 bail!("fs storage not supported for v1 reward accounts")
293 },
294 _ => bail!("storage was not initialized"),
295 };
296
297 if let Err(err) = add_v1_reward_accounts_to_state::<N, V, P>(
300 &self.consensus.consensus(),
301 &ViewNumber::new(*view),
302 accounts,
303 &merkle_tree,
304 leaf,
305 )
306 .await
307 {
308 tracing::warn!(
309 ?view,
310 "Cannot update fetched v1 reward account state: {err:#}"
311 );
312 }
313
314 Ok(Response::RewardAccountsV1(merkle_tree))
315 },
316 Request::VidShare(block_number, _request_id) => {
317 let vid_share = match &self.storage {
319 Some(Storage::Sql(storage)) => storage
320 .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
321 .await
322 .with_context(|| "failed to get vid share from sql storage")?,
323 Some(Storage::Fs(storage)) => {
324 let mut transaction = storage
326 .read()
327 .await
328 .with_context(|| "failed to open fs storage transaction")?;
329
330 transaction
332 .vid_share(BlockId::Number(*block_number as usize))
333 .await
334 .with_context(|| "failed to get vid share from fs storage")?
335 },
336 _ => bail!("storage was not initialized"),
337 };
338
339 Ok(Response::VidShare(vid_share))
340 },
341 Request::StateCert(epoch) => {
342 let state_cert = self
343 .persistence
344 .get_state_cert_by_epoch(*epoch)
345 .await
346 .with_context(|| {
347 format!("failed to get state cert for epoch {epoch} from persistence")
348 })?;
349
350 match state_cert {
351 Some(cert) => Ok(Response::StateCert(cert)),
352 None => bail!("State certificate for epoch {epoch} not found"),
353 }
354 },
355 }
356 }
357}
358
359pub fn retain_v2_reward_accounts(
363 state: &RewardMerkleTreeV2,
364 accounts: impl IntoIterator<Item = RewardAccountV2>,
365) -> anyhow::Result<RewardMerkleTreeV2> {
366 let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
367 for account in accounts {
368 match state.universal_lookup(account) {
369 LookupResult::Ok(elem, proof) => {
370 snapshot.remember(account, *elem, proof).unwrap();
373 },
374 LookupResult::NotFound(proof) => {
375 snapshot.non_membership_remember(account, proof).unwrap()
377 },
378 LookupResult::NotInMemory => {
379 bail!("missing account {account}");
380 },
381 }
382 }
383
384 Ok(snapshot)
385}
386
387pub fn retain_v1_reward_accounts(
391 state: &RewardMerkleTreeV1,
392 accounts: impl IntoIterator<Item = RewardAccountV1>,
393) -> anyhow::Result<RewardMerkleTreeV1> {
394 let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
395 for account in accounts {
396 match state.universal_lookup(account) {
397 LookupResult::Ok(elem, proof) => {
398 snapshot.remember(account, *elem, proof).unwrap();
401 },
402 LookupResult::NotFound(proof) => {
403 snapshot.non_membership_remember(account, proof).unwrap()
405 },
406 LookupResult::NotInMemory => {
407 bail!("missing account {account}");
408 },
409 }
410 }
411
412 Ok(snapshot)
413}