hotshot_query_service/data_source/fetching/
transaction.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use derive_more::From;
19use futures::future::{BoxFuture, FutureExt};
20use hotshot_types::traits::node_implementation::NodeType;
21
22use super::{AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Notifiers};
23use crate::{
24 availability::{BlockWithTransaction, QueryableHeader, QueryablePayload, TransactionHash},
25 data_source::{
26 storage::{
27 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
28 UpdateAvailabilityStorage,
29 },
30 update::VersionedDataSource,
31 },
32 types::HeightIndexed,
33 Header, Payload, QueryError, QueryResult,
34};
35
36#[derive(Clone, Copy, Debug, From)]
37pub(super) struct TransactionRequest<Types: NodeType>(TransactionHash<Types>);
38
39impl<Types: NodeType> FetchRequest for TransactionRequest<Types> {}
40
41#[async_trait]
42impl<Types> Fetchable<Types> for BlockWithTransaction<Types>
43where
44 Types: NodeType,
45 Header<Types>: QueryableHeader<Types>,
46 Payload<Types>: QueryablePayload<Types>,
47{
48 type Request = TransactionRequest<Types>;
49
50 fn satisfies(&self, req: Self::Request) -> bool {
51 req.0 == self.transaction.hash()
52 }
53
54 async fn passive_fetch(
55 notifications: &Notifiers<Types>,
56 req: Self::Request,
57 ) -> BoxFuture<'static, Option<Self>> {
58 let wait_block = notifications
61 .block
62 .wait_for(move |block| block.transaction_by_hash(req.0).is_some())
63 .await;
64
65 async move {
66 let block = wait_block.await?;
67 BlockWithTransaction::with_hash(block, req.0)
68 }
69 .boxed()
70 }
71
72 async fn active_fetch<S, P>(
73 _tx: &mut impl AvailabilityStorage<Types>,
74 _fetcher: Arc<Fetcher<Types, S, P>>,
75 req: Self::Request,
76 ) -> anyhow::Result<()>
77 where
78 S: VersionedDataSource + 'static,
79 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
80 for<'a> S::ReadOnly<'a>:
81 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
82 P: AvailabilityProvider<Types>,
83 {
84 tracing::debug!("not fetching unknown transaction {req:?}");
88 Ok(())
89 }
90
91 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
92 where
93 S: AvailabilityStorage<Types>,
94 {
95 let hash = req.0;
96 let block = storage.get_block_with_transaction(hash).await?;
97 let height = block.height();
98 BlockWithTransaction::with_hash(block, hash).ok_or(QueryError::Error {
99 message: format!(
100 "transaction index inconsistent: block {height} contains no transaction {hash}"
101 ),
102 })
103 }
104}