sequencer/request_response/
data_source.rs

//! This file contains the sequencer's implementation of the [`DataSource`] trait. This trait
//! allows the [`RequestResponseProtocol`] to calculate/derive a response for a specific request.
//! In the confirmation layer the implementer would be something like a [`FeeMerkleTree`] for fee
//! catchup.
4
5use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{bail, Context, Result};
8use async_trait::async_trait;
9use espresso_types::{
10    retain_accounts,
11    traits::SequencerPersistence,
12    v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13    v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14    NodeState, PubKey, SeqTypes,
15};
16use hotshot::{traits::NodeImplementation, SystemContext};
17use hotshot_query_service::{
18    data_source::{
19        storage::{FileSystemStorage, NodeStorage, SqlStorage},
20        VersionedDataSource,
21    },
22    node::BlockId,
23};
24use hotshot_types::{
25    data::ViewNumber,
26    traits::{
27        network::ConnectedNetwork,
28        node_implementation::{ConsensusTime, Versions},
29    },
30    vote::HasViewNumber,
31};
32use itertools::Itertools;
33use jf_merkle_tree::{
34    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
35    MerkleTreeScheme, UniversalMerkleTreeScheme,
36};
37use request_response::data_source::DataSource as DataSourceTrait;
38
39use super::request::{Request, Response};
40use crate::{
41    api::BlocksFrontier,
42    catchup::{
43        add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
44        add_v2_reward_accounts_to_state, CatchupStorage,
45    },
46};
47
/// The storage backend that requests fall back to when they cannot be served from memory.
///
/// Each variant wraps a shared (`Arc`) handle, so cloning a `Storage` does not copy the
/// underlying storage object.
#[derive(Clone)]
pub enum Storage {
    /// SQL-backed storage.
    Sql(Arc<SqlStorage>),
    /// File-system-backed storage.
    Fs(Arc<FileSystemStorage<SeqTypes>>),
}
54
/// A type alias for the consensus handle: a shared (`Arc`) [`SystemContext`] specialized to
/// [`SeqTypes`].
type Consensus<I, V> = Arc<SystemContext<SeqTypes, I, V>>;
57
58#[derive(Clone)]
59pub struct DataSource<
60    I: NodeImplementation<SeqTypes>,
61    V: Versions,
62    N: ConnectedNetwork<PubKey>,
63    P: SequencerPersistence,
64> {
65    /// The consensus handle
66    pub consensus: Consensus<I, V>,
67    /// The node's state
68    pub node_state: NodeState,
69    /// The storage
70    pub storage: Option<Storage>,
71    /// Phantom data
72    pub phantom: PhantomData<(N, P)>,
73}
74
75/// Implement the trait that allows the [`RequestResponseProtocol`] to calculate/derive a response for a specific request
76#[async_trait]
77impl<
78        I: NodeImplementation<SeqTypes>,
79        V: Versions,
80        N: ConnectedNetwork<PubKey>,
81        P: SequencerPersistence,
82    > DataSourceTrait<Request> for DataSource<I, V, N, P>
83{
84    async fn derive_response_for(&self, request: &Request) -> Result<Response> {
85        match request {
86            Request::Accounts(height, view, accounts) => {
87                // Try to get accounts from memory first, then fall back to storage
88                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
89                    if let Ok(accounts) =
90                        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
91                    {
92                        return Ok(Response::Accounts(accounts));
93                    }
94                }
95
96                // Fall back to storage
97                let (merkle_tree, leaf) = match &self.storage {
98                    Some(Storage::Sql(storage)) => storage
99                        .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
100                        .await
101                        .with_context(|| "failed to get accounts from sql storage")?,
102                    Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
103                    _ => bail!("storage was not initialized"),
104                };
105
106                // If we successfully fetched accounts from storage, try to add them back into the in-memory
107                // state.
108                if let Err(err) = add_fee_accounts_to_state::<N, V, P>(
109                    &self.consensus.consensus(),
110                    &ViewNumber::new(*view),
111                    accounts,
112                    &merkle_tree,
113                    leaf,
114                )
115                .await
116                {
117                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
118                }
119
120                Ok(Response::Accounts(merkle_tree))
121            },
122
123            Request::Leaf(height) => {
124                // Try to get the leaves from memory first, then fall back to storage
125                let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
126                leaves.sort_by_key(|l| l.view_number());
127
128                if let Some((position, mut last_leaf)) =
129                    leaves.iter().find_position(|l| l.height() == *height)
130                {
131                    let mut leaf_chain = vec![last_leaf.clone()];
132                    for leaf in leaves.iter().skip(position + 1) {
133                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
134                            leaf_chain.push(leaf.clone());
135                        } else {
136                            continue;
137                        }
138                        if leaf.view_number() == last_leaf.view_number() + 1 {
139                            // one away from decide
140                            last_leaf = leaf;
141                            break;
142                        }
143                        last_leaf = leaf;
144                    }
145
146                    // Make sure we got one more leaf to confirm the decide
147                    for leaf in leaves
148                        .iter()
149                        .skip_while(|l| l.view_number() <= last_leaf.view_number())
150                    {
151                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
152                            leaf_chain.push(leaf.clone());
153                            return Ok(Response::Leaf(leaf_chain));
154                        }
155                    }
156                }
157
158                // Fall back to storage
159                let leaf_chain = match &self.storage {
160                    Some(Storage::Sql(storage)) => storage
161                        .get_leaf_chain(*height)
162                        .await
163                        .with_context(|| "failed to get leaf from sql storage")?,
164                    // TODO: Actually implement FS storage for some of these
165                    Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
166                    _ => bail!("storage was not initialized"),
167                };
168
169                Ok(Response::Leaf(leaf_chain))
170            },
171            Request::ChainConfig(commitment) => {
172                // Try to get the chain config from memory first, then fall back to storage
173                let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
174                if chain_config_from_memory.commit() == *commitment {
175                    if let Some(chain_config) = chain_config_from_memory.resolve() {
176                        return Ok(Response::ChainConfig(chain_config));
177                    }
178                }
179
180                // Fall back to storage
181                Ok(Response::ChainConfig(match &self.storage {
182                    Some(Storage::Sql(storage)) => storage
183                        .get_chain_config(*commitment)
184                        .await
185                        .with_context(|| "failed to get chain config from sql storage")?,
186                    Some(Storage::Fs(_)) => {
187                        bail!("fs storage not supported for chain config")
188                    },
189                    _ => bail!("storage was not initialized"),
190                }))
191            },
192            Request::BlocksFrontier(height, view) => {
193                // First try to respond from memory
194                let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
195                    .consensus
196                    .state(ViewNumber::new(*view))
197                    .await
198                    .map(|state| {
199                        let tree = &state.block_merkle_tree;
200                        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
201                        Ok(frontier)
202                    });
203
204                if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
205                    return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
206                } else {
207                    // If we can't get the blocks frontier from memory, fall through to storage
208                    let blocks_frontier_from_storage = match &self.storage {
209                        Some(Storage::Sql(storage)) => storage
210                            .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
211                            .await
212                            .with_context(|| "failed to get blocks frontier from sql storage")?,
213                        Some(Storage::Fs(_)) => {
214                            bail!("fs storage not supported for blocks frontier")
215                        },
216                        _ => bail!("storage was not initialized"),
217                    };
218
219                    Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
220                }
221            },
222            Request::RewardAccountsV2(height, view, accounts) => {
223                // Try to get the reward accounts from memory first, then fall back to storage
224                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
225                    if let Ok(reward_accounts) = retain_v2_reward_accounts(
226                        &state.reward_merkle_tree_v2,
227                        accounts.iter().copied(),
228                    ) {
229                        return Ok(Response::RewardAccountsV2(reward_accounts));
230                    }
231                }
232
233                // Fall back to storage
234                let (merkle_tree, leaf) = match &self.storage {
235                    Some(Storage::Sql(storage)) => storage
236                        .get_reward_accounts_v2(
237                            &self.node_state,
238                            *height,
239                            ViewNumber::new(*view),
240                            accounts,
241                        )
242                        .await
243                        .with_context(|| "failed to get accounts from sql storage")?,
244                    Some(Storage::Fs(_)) => {
245                        bail!("fs storage not supported for reward accounts")
246                    },
247                    _ => bail!("storage was not initialized"),
248                };
249
250                // If we successfully fetched accounts from storage, try to add them back into the in-memory
251                // state.
252                if let Err(err) = add_v2_reward_accounts_to_state::<N, V, P>(
253                    &self.consensus.consensus(),
254                    &ViewNumber::new(*view),
255                    accounts,
256                    &merkle_tree,
257                    leaf,
258                )
259                .await
260                {
261                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
262                }
263
264                Ok(Response::RewardAccountsV2(merkle_tree))
265            },
266
267            Request::RewardAccountsV1(height, view, accounts) => {
268                // Try to get the reward accounts from memory first, then fall back to storage
269                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
270                    if let Ok(reward_accounts) = retain_v1_reward_accounts(
271                        &state.reward_merkle_tree_v1,
272                        accounts.iter().copied(),
273                    ) {
274                        return Ok(Response::RewardAccountsV1(reward_accounts));
275                    }
276                }
277
278                // Fall back to storage
279                let (merkle_tree, leaf) = match &self.storage {
280                    Some(Storage::Sql(storage)) => storage
281                        .get_reward_accounts_v1(
282                            &self.node_state,
283                            *height,
284                            ViewNumber::new(*view),
285                            accounts,
286                        )
287                        .await
288                        .with_context(|| "failed to get v1 reward accounts from sql storage")?,
289                    Some(Storage::Fs(_)) => {
290                        bail!("fs storage not supported for v1 reward accounts")
291                    },
292                    _ => bail!("storage was not initialized"),
293                };
294
295                // If we successfully fetched accounts from storage, try to add them back into the in-memory
296                // state.
297                if let Err(err) = add_v1_reward_accounts_to_state::<N, V, P>(
298                    &self.consensus.consensus(),
299                    &ViewNumber::new(*view),
300                    accounts,
301                    &merkle_tree,
302                    leaf,
303                )
304                .await
305                {
306                    tracing::warn!(
307                        ?view,
308                        "Cannot update fetched v1 reward account state: {err:#}"
309                    );
310                }
311
312                Ok(Response::RewardAccountsV1(merkle_tree))
313            },
314            Request::VidShare(block_number, _request_id) => {
315                // Load the VID share from storage
316                let vid_share = match &self.storage {
317                    Some(Storage::Sql(storage)) => storage
318                        .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
319                        .await
320                        .with_context(|| "failed to get vid share from sql storage")?,
321                    Some(Storage::Fs(storage)) => {
322                        // Open a read transaction
323                        let mut transaction = storage
324                            .read()
325                            .await
326                            .with_context(|| "failed to open fs storage transaction")?;
327
328                        // Get the VID share
329                        transaction
330                            .vid_share(BlockId::Number(*block_number as usize))
331                            .await
332                            .with_context(|| "failed to get vid share from fs storage")?
333                    },
334                    _ => bail!("storage was not initialized"),
335                };
336
337                Ok(Response::VidShare(vid_share))
338            },
339        }
340    }
341}
342
343/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
344///
345/// Fails if one of the requested accounts is not represented in the original `state`.
346pub fn retain_v2_reward_accounts(
347    state: &RewardMerkleTreeV2,
348    accounts: impl IntoIterator<Item = RewardAccountV2>,
349) -> anyhow::Result<RewardMerkleTreeV2> {
350    let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
351    for account in accounts {
352        match state.universal_lookup(account) {
353            LookupResult::Ok(elem, proof) => {
354                // This remember cannot fail, since we just constructed a valid proof, and are
355                // remembering into a tree with the same commitment.
356                snapshot.remember(account, *elem, proof).unwrap();
357            },
358            LookupResult::NotFound(proof) => {
359                // Likewise this cannot fail.
360                snapshot.non_membership_remember(account, proof).unwrap()
361            },
362            LookupResult::NotInMemory => {
363                bail!("missing account {account}");
364            },
365        }
366    }
367
368    Ok(snapshot)
369}
370
371/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
372///
373/// Fails if one of the requested accounts is not represented in the original `state`.
374pub fn retain_v1_reward_accounts(
375    state: &RewardMerkleTreeV1,
376    accounts: impl IntoIterator<Item = RewardAccountV1>,
377) -> anyhow::Result<RewardMerkleTreeV1> {
378    let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
379    for account in accounts {
380        match state.universal_lookup(account) {
381            LookupResult::Ok(elem, proof) => {
382                // This remember cannot fail, since we just constructed a valid proof, and are
383                // remembering into a tree with the same commitment.
384                snapshot.remember(account, *elem, proof).unwrap();
385            },
386            LookupResult::NotFound(proof) => {
387                // Likewise this cannot fail.
388                snapshot.non_membership_remember(account, proof).unwrap()
389            },
390            LookupResult::NotInMemory => {
391                bail!("missing account {account}");
392            },
393        }
394    }
395
396    Ok(snapshot)
397}