sequencer/request_response/
data_source.rs

1//! This file contains the [`DataSource`] trait. This trait allows the [`RequestResponseProtocol`]
2//! to calculate/derive a response for a specific request. In the confirmation layer the implementer
3//! would be something like a [`FeeMerkleTree`] for fee catchup
4
5use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{bail, Context, Result};
8use async_trait::async_trait;
9use espresso_types::{
10    retain_accounts,
11    traits::SequencerPersistence,
12    v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13    v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14    NodeState, PubKey, SeqTypes,
15};
16use hotshot::{traits::NodeImplementation, SystemContext};
17use hotshot_query_service::{
18    data_source::{
19        storage::{FileSystemStorage, NodeStorage, SqlStorage},
20        VersionedDataSource,
21    },
22    node::BlockId,
23};
24use hotshot_types::{
25    data::ViewNumber,
26    traits::{
27        network::ConnectedNetwork,
28        node_implementation::{ConsensusTime, Versions},
29    },
30    vote::HasViewNumber,
31};
32use itertools::Itertools;
33use jf_merkle_tree_compat::{
34    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
35    MerkleTreeScheme, UniversalMerkleTreeScheme,
36};
37use request_response::data_source::DataSource as DataSourceTrait;
38
39use super::request::{Request, Response};
40use crate::{
41    api::BlocksFrontier,
42    catchup::{
43        add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
44        add_v2_reward_accounts_to_state, CatchupStorage,
45    },
46};
47
48/// Query Service Storage types that can be used for request-response data source
49#[derive(Clone)]
50pub enum Storage {
51    Sql(Arc<SqlStorage>),
52    Fs(Arc<FileSystemStorage<SeqTypes>>),
53}
54
55/// A type alias for the consensus handle
56type Consensus<I, V> = Arc<SystemContext<SeqTypes, I, V>>;
57
58#[derive(Clone)]
59pub struct DataSource<
60    I: NodeImplementation<SeqTypes>,
61    V: Versions,
62    N: ConnectedNetwork<PubKey>,
63    P: SequencerPersistence,
64> {
65    /// The consensus handle
66    pub consensus: Consensus<I, V>,
67    /// The node's state
68    pub node_state: NodeState,
69    /// The storage
70    pub storage: Option<Storage>,
71    /// sequencer persistence
72    pub persistence: Arc<P>,
73    /// Phantom data
74    pub phantom: PhantomData<N>,
75}
76
77/// Implement the trait that allows the [`RequestResponseProtocol`] to calculate/derive a response for a specific request
78#[async_trait]
79impl<
80        I: NodeImplementation<SeqTypes>,
81        V: Versions,
82        N: ConnectedNetwork<PubKey>,
83        P: SequencerPersistence,
84    > DataSourceTrait<Request> for DataSource<I, V, N, P>
85{
86    async fn derive_response_for(&self, request: &Request) -> Result<Response> {
87        match request {
88            Request::Accounts(height, view, accounts) => {
89                // Try to get accounts from memory first, then fall back to storage
90                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
91                    if let Ok(accounts) =
92                        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
93                    {
94                        return Ok(Response::Accounts(accounts));
95                    }
96                }
97
98                // Fall back to storage
99                let (merkle_tree, leaf) = match &self.storage {
100                    Some(Storage::Sql(storage)) => storage
101                        .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
102                        .await
103                        .with_context(|| "failed to get accounts from sql storage")?,
104                    Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
105                    _ => bail!("storage was not initialized"),
106                };
107
108                // If we successfully fetched accounts from storage, try to add them back into the in-memory
109                // state.
110                if let Err(err) = add_fee_accounts_to_state::<N, V, P>(
111                    &self.consensus.consensus(),
112                    &ViewNumber::new(*view),
113                    accounts,
114                    &merkle_tree,
115                    leaf,
116                )
117                .await
118                {
119                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
120                }
121
122                Ok(Response::Accounts(merkle_tree))
123            },
124
125            Request::Leaf(height) => {
126                // Try to get the leaves from memory first, then fall back to storage
127                let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
128                leaves.sort_by_key(|l| l.view_number());
129
130                if let Some((position, mut last_leaf)) =
131                    leaves.iter().find_position(|l| l.height() == *height)
132                {
133                    let mut leaf_chain = vec![last_leaf.clone()];
134                    for leaf in leaves.iter().skip(position + 1) {
135                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
136                            leaf_chain.push(leaf.clone());
137                        } else {
138                            continue;
139                        }
140                        if leaf.view_number() == last_leaf.view_number() + 1 {
141                            // one away from decide
142                            last_leaf = leaf;
143                            break;
144                        }
145                        last_leaf = leaf;
146                    }
147
148                    // Make sure we got one more leaf to confirm the decide
149                    for leaf in leaves
150                        .iter()
151                        .skip_while(|l| l.view_number() <= last_leaf.view_number())
152                    {
153                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
154                            leaf_chain.push(leaf.clone());
155                            return Ok(Response::Leaf(leaf_chain));
156                        }
157                    }
158                }
159
160                // Fall back to storage
161                let leaf_chain = match &self.storage {
162                    Some(Storage::Sql(storage)) => storage
163                        .get_leaf_chain(*height)
164                        .await
165                        .with_context(|| "failed to get leaf from sql storage")?,
166                    // TODO: Actually implement FS storage for some of these
167                    Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
168                    _ => bail!("storage was not initialized"),
169                };
170
171                Ok(Response::Leaf(leaf_chain))
172            },
173            Request::ChainConfig(commitment) => {
174                // Try to get the chain config from memory first, then fall back to storage
175                let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
176                if chain_config_from_memory.commit() == *commitment {
177                    if let Some(chain_config) = chain_config_from_memory.resolve() {
178                        return Ok(Response::ChainConfig(chain_config));
179                    }
180                }
181
182                // Fall back to storage
183                Ok(Response::ChainConfig(match &self.storage {
184                    Some(Storage::Sql(storage)) => storage
185                        .get_chain_config(*commitment)
186                        .await
187                        .with_context(|| "failed to get chain config from sql storage")?,
188                    Some(Storage::Fs(_)) => {
189                        bail!("fs storage not supported for chain config")
190                    },
191                    _ => bail!("storage was not initialized"),
192                }))
193            },
194            Request::BlocksFrontier(height, view) => {
195                // First try to respond from memory
196                let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
197                    .consensus
198                    .state(ViewNumber::new(*view))
199                    .await
200                    .map(|state| {
201                        let tree = &state.block_merkle_tree;
202                        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
203                        Ok(frontier)
204                    });
205
206                if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
207                    return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
208                } else {
209                    // If we can't get the blocks frontier from memory, fall through to storage
210                    let blocks_frontier_from_storage = match &self.storage {
211                        Some(Storage::Sql(storage)) => storage
212                            .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
213                            .await
214                            .with_context(|| "failed to get blocks frontier from sql storage")?,
215                        Some(Storage::Fs(_)) => {
216                            bail!("fs storage not supported for blocks frontier")
217                        },
218                        _ => bail!("storage was not initialized"),
219                    };
220
221                    Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
222                }
223            },
224            Request::RewardAccountsV2(height, view, accounts) => {
225                // Try to get the reward accounts from memory first, then fall back to storage
226                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
227                    if let Ok(reward_accounts) = retain_v2_reward_accounts(
228                        &state.reward_merkle_tree_v2,
229                        accounts.iter().copied(),
230                    ) {
231                        return Ok(Response::RewardAccountsV2(reward_accounts));
232                    }
233                }
234
235                // Fall back to storage
236                let (merkle_tree, leaf) = match &self.storage {
237                    Some(Storage::Sql(storage)) => storage
238                        .get_reward_accounts_v2(
239                            &self.node_state,
240                            *height,
241                            ViewNumber::new(*view),
242                            accounts,
243                        )
244                        .await
245                        .with_context(|| "failed to get accounts from sql storage")?,
246                    Some(Storage::Fs(_)) => {
247                        bail!("fs storage not supported for reward accounts")
248                    },
249                    _ => bail!("storage was not initialized"),
250                };
251
252                // If we successfully fetched accounts from storage, try to add them back into the in-memory
253                // state.
254                if let Err(err) = add_v2_reward_accounts_to_state::<N, V, P>(
255                    &self.consensus.consensus(),
256                    &ViewNumber::new(*view),
257                    accounts,
258                    &merkle_tree,
259                    leaf,
260                )
261                .await
262                {
263                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
264                }
265
266                Ok(Response::RewardAccountsV2(merkle_tree))
267            },
268
269            Request::RewardAccountsV1(height, view, accounts) => {
270                // Try to get the reward accounts from memory first, then fall back to storage
271                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
272                    if let Ok(reward_accounts) = retain_v1_reward_accounts(
273                        &state.reward_merkle_tree_v1,
274                        accounts.iter().copied(),
275                    ) {
276                        return Ok(Response::RewardAccountsV1(reward_accounts));
277                    }
278                }
279
280                // Fall back to storage
281                let (merkle_tree, leaf) = match &self.storage {
282                    Some(Storage::Sql(storage)) => storage
283                        .get_reward_accounts_v1(
284                            &self.node_state,
285                            *height,
286                            ViewNumber::new(*view),
287                            accounts,
288                        )
289                        .await
290                        .with_context(|| "failed to get v1 reward accounts from sql storage")?,
291                    Some(Storage::Fs(_)) => {
292                        bail!("fs storage not supported for v1 reward accounts")
293                    },
294                    _ => bail!("storage was not initialized"),
295                };
296
297                // If we successfully fetched accounts from storage, try to add them back into the in-memory
298                // state.
299                if let Err(err) = add_v1_reward_accounts_to_state::<N, V, P>(
300                    &self.consensus.consensus(),
301                    &ViewNumber::new(*view),
302                    accounts,
303                    &merkle_tree,
304                    leaf,
305                )
306                .await
307                {
308                    tracing::warn!(
309                        ?view,
310                        "Cannot update fetched v1 reward account state: {err:#}"
311                    );
312                }
313
314                Ok(Response::RewardAccountsV1(merkle_tree))
315            },
316            Request::VidShare(block_number, _request_id) => {
317                // Load the VID share from storage
318                let vid_share = match &self.storage {
319                    Some(Storage::Sql(storage)) => storage
320                        .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
321                        .await
322                        .with_context(|| "failed to get vid share from sql storage")?,
323                    Some(Storage::Fs(storage)) => {
324                        // Open a read transaction
325                        let mut transaction = storage
326                            .read()
327                            .await
328                            .with_context(|| "failed to open fs storage transaction")?;
329
330                        // Get the VID share
331                        transaction
332                            .vid_share(BlockId::Number(*block_number as usize))
333                            .await
334                            .with_context(|| "failed to get vid share from fs storage")?
335                    },
336                    _ => bail!("storage was not initialized"),
337                };
338
339                Ok(Response::VidShare(vid_share))
340            },
341            Request::StateCert(epoch) => {
342                let state_cert = self
343                    .persistence
344                    .get_state_cert_by_epoch(*epoch)
345                    .await
346                    .with_context(|| {
347                        format!("failed to get state cert for epoch {epoch} from persistence")
348                    })?;
349
350                match state_cert {
351                    Some(cert) => Ok(Response::StateCert(cert)),
352                    None => bail!("State certificate for epoch {epoch} not found"),
353                }
354            },
355        }
356    }
357}
358
359/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
360///
361/// Fails if one of the requested accounts is not represented in the original `state`.
362pub fn retain_v2_reward_accounts(
363    state: &RewardMerkleTreeV2,
364    accounts: impl IntoIterator<Item = RewardAccountV2>,
365) -> anyhow::Result<RewardMerkleTreeV2> {
366    let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
367    for account in accounts {
368        match state.universal_lookup(account) {
369            LookupResult::Ok(elem, proof) => {
370                // This remember cannot fail, since we just constructed a valid proof, and are
371                // remembering into a tree with the same commitment.
372                snapshot.remember(account, *elem, proof).unwrap();
373            },
374            LookupResult::NotFound(proof) => {
375                // Likewise this cannot fail.
376                snapshot.non_membership_remember(account, proof).unwrap()
377            },
378            LookupResult::NotInMemory => {
379                bail!("missing account {account}");
380            },
381        }
382    }
383
384    Ok(snapshot)
385}
386
387/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
388///
389/// Fails if one of the requested accounts is not represented in the original `state`.
390pub fn retain_v1_reward_accounts(
391    state: &RewardMerkleTreeV1,
392    accounts: impl IntoIterator<Item = RewardAccountV1>,
393) -> anyhow::Result<RewardMerkleTreeV1> {
394    let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
395    for account in accounts {
396        match state.universal_lookup(account) {
397            LookupResult::Ok(elem, proof) => {
398                // This remember cannot fail, since we just constructed a valid proof, and are
399                // remembering into a tree with the same commitment.
400                snapshot.remember(account, *elem, proof).unwrap();
401            },
402            LookupResult::NotFound(proof) => {
403                // Likewise this cannot fail.
404                snapshot.non_membership_remember(account, proof).unwrap()
405            },
406            LookupResult::NotInMemory => {
407                bail!("missing account {account}");
408            },
409        }
410    }
411
412    Ok(snapshot)
413}