sequencer/request_response/
data_source.rs1use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{bail, Context, Result};
8use async_trait::async_trait;
9use espresso_types::{
10 retain_accounts,
11 traits::SequencerPersistence,
12 v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13 v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14 NodeState, PubKey, SeqTypes,
15};
16use hotshot::{traits::NodeImplementation, SystemContext};
17use hotshot_query_service::{
18 data_source::{
19 storage::{FileSystemStorage, NodeStorage, SqlStorage},
20 VersionedDataSource,
21 },
22 node::BlockId,
23};
24use hotshot_types::{
25 data::ViewNumber,
26 traits::{
27 network::ConnectedNetwork,
28 node_implementation::{ConsensusTime, Versions},
29 },
30 vote::HasViewNumber,
31};
32use itertools::Itertools;
33use jf_merkle_tree::{
34 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
35 MerkleTreeScheme, UniversalMerkleTreeScheme,
36};
37use request_response::data_source::DataSource as DataSourceTrait;
38
39use super::request::{Request, Response};
40use crate::{
41 api::BlocksFrontier,
42 catchup::{
43 add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
44 add_v2_reward_accounts_to_state, CatchupStorage,
45 },
46};
47
/// Persistent storage backends the request-response [`DataSource`] can read from.
///
/// Both variants wrap the backend in an [`Arc`], so cloning a `Storage` only clones the
/// shared handle.
#[derive(Clone)]
pub enum Storage {
    /// SQL-backed storage.
    Sql(Arc<SqlStorage>),
    /// File-system-backed storage, parameterized over [`SeqTypes`].
    Fs(Arc<FileSystemStorage<SeqTypes>>),
}
54
/// Shorthand for a shared ([`Arc`]) HotShot [`SystemContext`] specialized to [`SeqTypes`].
type Consensus<I, V> = Arc<SystemContext<SeqTypes, I, V>>;
57
58#[derive(Clone)]
59pub struct DataSource<
60 I: NodeImplementation<SeqTypes>,
61 V: Versions,
62 N: ConnectedNetwork<PubKey>,
63 P: SequencerPersistence,
64> {
65 pub consensus: Consensus<I, V>,
67 pub node_state: NodeState,
69 pub storage: Option<Storage>,
71 pub phantom: PhantomData<(N, P)>,
73}
74
75#[async_trait]
77impl<
78 I: NodeImplementation<SeqTypes>,
79 V: Versions,
80 N: ConnectedNetwork<PubKey>,
81 P: SequencerPersistence,
82 > DataSourceTrait<Request> for DataSource<I, V, N, P>
83{
84 async fn derive_response_for(&self, request: &Request) -> Result<Response> {
85 match request {
86 Request::Accounts(height, view, accounts) => {
87 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
89 if let Ok(accounts) =
90 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
91 {
92 return Ok(Response::Accounts(accounts));
93 }
94 }
95
96 let (merkle_tree, leaf) = match &self.storage {
98 Some(Storage::Sql(storage)) => storage
99 .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
100 .await
101 .with_context(|| "failed to get accounts from sql storage")?,
102 Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
103 _ => bail!("storage was not initialized"),
104 };
105
106 if let Err(err) = add_fee_accounts_to_state::<N, V, P>(
109 &self.consensus.consensus(),
110 &ViewNumber::new(*view),
111 accounts,
112 &merkle_tree,
113 leaf,
114 )
115 .await
116 {
117 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
118 }
119
120 Ok(Response::Accounts(merkle_tree))
121 },
122
123 Request::Leaf(height) => {
124 let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
126 leaves.sort_by_key(|l| l.view_number());
127
128 if let Some((position, mut last_leaf)) =
129 leaves.iter().find_position(|l| l.height() == *height)
130 {
131 let mut leaf_chain = vec![last_leaf.clone()];
132 for leaf in leaves.iter().skip(position + 1) {
133 if leaf.justify_qc().view_number() == last_leaf.view_number() {
134 leaf_chain.push(leaf.clone());
135 } else {
136 continue;
137 }
138 if leaf.view_number() == last_leaf.view_number() + 1 {
139 last_leaf = leaf;
141 break;
142 }
143 last_leaf = leaf;
144 }
145
146 for leaf in leaves
148 .iter()
149 .skip_while(|l| l.view_number() <= last_leaf.view_number())
150 {
151 if leaf.justify_qc().view_number() == last_leaf.view_number() {
152 leaf_chain.push(leaf.clone());
153 return Ok(Response::Leaf(leaf_chain));
154 }
155 }
156 }
157
158 let leaf_chain = match &self.storage {
160 Some(Storage::Sql(storage)) => storage
161 .get_leaf_chain(*height)
162 .await
163 .with_context(|| "failed to get leaf from sql storage")?,
164 Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
166 _ => bail!("storage was not initialized"),
167 };
168
169 Ok(Response::Leaf(leaf_chain))
170 },
171 Request::ChainConfig(commitment) => {
172 let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
174 if chain_config_from_memory.commit() == *commitment {
175 if let Some(chain_config) = chain_config_from_memory.resolve() {
176 return Ok(Response::ChainConfig(chain_config));
177 }
178 }
179
180 Ok(Response::ChainConfig(match &self.storage {
182 Some(Storage::Sql(storage)) => storage
183 .get_chain_config(*commitment)
184 .await
185 .with_context(|| "failed to get chain config from sql storage")?,
186 Some(Storage::Fs(_)) => {
187 bail!("fs storage not supported for chain config")
188 },
189 _ => bail!("storage was not initialized"),
190 }))
191 },
192 Request::BlocksFrontier(height, view) => {
193 let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
195 .consensus
196 .state(ViewNumber::new(*view))
197 .await
198 .map(|state| {
199 let tree = &state.block_merkle_tree;
200 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
201 Ok(frontier)
202 });
203
204 if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
205 return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
206 } else {
207 let blocks_frontier_from_storage = match &self.storage {
209 Some(Storage::Sql(storage)) => storage
210 .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
211 .await
212 .with_context(|| "failed to get blocks frontier from sql storage")?,
213 Some(Storage::Fs(_)) => {
214 bail!("fs storage not supported for blocks frontier")
215 },
216 _ => bail!("storage was not initialized"),
217 };
218
219 Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
220 }
221 },
222 Request::RewardAccountsV2(height, view, accounts) => {
223 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
225 if let Ok(reward_accounts) = retain_v2_reward_accounts(
226 &state.reward_merkle_tree_v2,
227 accounts.iter().copied(),
228 ) {
229 return Ok(Response::RewardAccountsV2(reward_accounts));
230 }
231 }
232
233 let (merkle_tree, leaf) = match &self.storage {
235 Some(Storage::Sql(storage)) => storage
236 .get_reward_accounts_v2(
237 &self.node_state,
238 *height,
239 ViewNumber::new(*view),
240 accounts,
241 )
242 .await
243 .with_context(|| "failed to get accounts from sql storage")?,
244 Some(Storage::Fs(_)) => {
245 bail!("fs storage not supported for reward accounts")
246 },
247 _ => bail!("storage was not initialized"),
248 };
249
250 if let Err(err) = add_v2_reward_accounts_to_state::<N, V, P>(
253 &self.consensus.consensus(),
254 &ViewNumber::new(*view),
255 accounts,
256 &merkle_tree,
257 leaf,
258 )
259 .await
260 {
261 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
262 }
263
264 Ok(Response::RewardAccountsV2(merkle_tree))
265 },
266
267 Request::RewardAccountsV1(height, view, accounts) => {
268 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
270 if let Ok(reward_accounts) = retain_v1_reward_accounts(
271 &state.reward_merkle_tree_v1,
272 accounts.iter().copied(),
273 ) {
274 return Ok(Response::RewardAccountsV1(reward_accounts));
275 }
276 }
277
278 let (merkle_tree, leaf) = match &self.storage {
280 Some(Storage::Sql(storage)) => storage
281 .get_reward_accounts_v1(
282 &self.node_state,
283 *height,
284 ViewNumber::new(*view),
285 accounts,
286 )
287 .await
288 .with_context(|| "failed to get v1 reward accounts from sql storage")?,
289 Some(Storage::Fs(_)) => {
290 bail!("fs storage not supported for v1 reward accounts")
291 },
292 _ => bail!("storage was not initialized"),
293 };
294
295 if let Err(err) = add_v1_reward_accounts_to_state::<N, V, P>(
298 &self.consensus.consensus(),
299 &ViewNumber::new(*view),
300 accounts,
301 &merkle_tree,
302 leaf,
303 )
304 .await
305 {
306 tracing::warn!(
307 ?view,
308 "Cannot update fetched v1 reward account state: {err:#}"
309 );
310 }
311
312 Ok(Response::RewardAccountsV1(merkle_tree))
313 },
314 Request::VidShare(block_number, _request_id) => {
315 let vid_share = match &self.storage {
317 Some(Storage::Sql(storage)) => storage
318 .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
319 .await
320 .with_context(|| "failed to get vid share from sql storage")?,
321 Some(Storage::Fs(storage)) => {
322 let mut transaction = storage
324 .read()
325 .await
326 .with_context(|| "failed to open fs storage transaction")?;
327
328 transaction
330 .vid_share(BlockId::Number(*block_number as usize))
331 .await
332 .with_context(|| "failed to get vid share from fs storage")?
333 },
334 _ => bail!("storage was not initialized"),
335 };
336
337 Ok(Response::VidShare(vid_share))
338 },
339 }
340 }
341}
342
343pub fn retain_v2_reward_accounts(
347 state: &RewardMerkleTreeV2,
348 accounts: impl IntoIterator<Item = RewardAccountV2>,
349) -> anyhow::Result<RewardMerkleTreeV2> {
350 let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
351 for account in accounts {
352 match state.universal_lookup(account) {
353 LookupResult::Ok(elem, proof) => {
354 snapshot.remember(account, *elem, proof).unwrap();
357 },
358 LookupResult::NotFound(proof) => {
359 snapshot.non_membership_remember(account, proof).unwrap()
361 },
362 LookupResult::NotInMemory => {
363 bail!("missing account {account}");
364 },
365 }
366 }
367
368 Ok(snapshot)
369}
370
371pub fn retain_v1_reward_accounts(
375 state: &RewardMerkleTreeV1,
376 accounts: impl IntoIterator<Item = RewardAccountV1>,
377) -> anyhow::Result<RewardMerkleTreeV1> {
378 let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
379 for account in accounts {
380 match state.universal_lookup(account) {
381 LookupResult::Ok(elem, proof) => {
382 snapshot.remember(account, *elem, proof).unwrap();
385 },
386 LookupResult::NotFound(proof) => {
387 snapshot.non_membership_remember(account, proof).unwrap()
389 },
390 LookupResult::NotInMemory => {
391 bail!("missing account {account}");
392 },
393 }
394 }
395
396 Ok(snapshot)
397}