sequencer/request_response/
mod.rs1use std::future::Future;
2
3use data_source::DataSource;
4use derive_more::derive::Deref;
5use espresso_types::{traits::SequencerPersistence, PubKey, SeqTypes};
6use hotshot::{traits::NodeImplementation, types::BLSPrivKey};
7use hotshot_types::traits::{network::ConnectedNetwork, node_implementation::Versions};
8use network::Sender;
9use recipient_source::RecipientSource;
10use request::{Request, Response};
11use request_response::{
12 network::Bytes, RequestError, RequestResponse, RequestResponseConfig, RequestType,
13};
14use tokio::sync::mpsc::Receiver;
15
16pub mod catchup;
17pub mod data_source;
18pub mod network;
19pub mod recipient_source;
20pub mod request;
21
22#[derive(Clone, Deref)]
25pub struct RequestResponseProtocol<
26 I: NodeImplementation<SeqTypes>,
27 V: Versions,
28 N: ConnectedNetwork<PubKey>,
29 P: SequencerPersistence,
30> {
31 #[deref]
32 #[allow(clippy::type_complexity)]
33 inner: RequestResponse<
35 Sender,
36 Receiver<Bytes>,
37 Request,
38 RecipientSource<I, V>,
39 DataSource<I, V, N, P>,
40 PubKey,
41 >,
42
43 config: RequestResponseConfig,
46
47 public_key: PubKey,
49 private_key: BLSPrivKey,
51}
52
53impl<
54 I: NodeImplementation<SeqTypes>,
55 V: Versions,
56 N: ConnectedNetwork<PubKey>,
57 P: SequencerPersistence,
58 > RequestResponseProtocol<I, V, N, P>
59{
60 pub fn new(
62 config: RequestResponseConfig,
64 sender: Sender,
66 receiver: Receiver<Bytes>,
68 recipient_source: RecipientSource<I, V>,
71 data_source: DataSource<I, V, N, P>,
74 public_key: PubKey,
76 private_key: BLSPrivKey,
78 ) -> Self {
79 Self {
80 inner: RequestResponse::new(
81 config.clone(),
82 sender,
83 receiver,
84 recipient_source,
85 data_source,
86 ),
87 config,
88 public_key,
89 private_key,
90 }
91 }
92}
93
94impl<
95 I: NodeImplementation<SeqTypes>,
96 V: Versions,
97 N: ConnectedNetwork<PubKey>,
98 P: SequencerPersistence,
99 > RequestResponseProtocol<I, V, N, P>
100{
101 pub async fn request_indefinitely<F, Fut, O>(
102 &self,
103 request: Request,
105 request_type: RequestType,
107 response_validation_fn: F,
109 ) -> std::result::Result<O, RequestError>
110 where
111 F: Fn(&Request, Response) -> Fut + Send + Sync + 'static + Clone,
112 Fut: Future<Output = anyhow::Result<O>> + Send + Sync + 'static,
113 O: Send + Sync + 'static + Clone,
114 {
115 self.inner
117 .request_indefinitely(
118 &self.public_key,
119 &self.private_key,
120 request_type,
121 self.config.incoming_request_ttl,
122 request,
123 response_validation_fn,
124 )
125 .await
126 }
127}