hotshot_example_types/membership/
fetcher.rs1use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13 data::{Leaf2, ViewNumber},
14 event::{Event, EventType},
15 message::{Message, MessageKind},
16 traits::{
17 block_contents::BlockHeader,
18 network::ConnectedNetwork,
19 node_implementation::{ConsensusTime, NodeType},
20 },
21 vote::HasViewNumber,
22};
23use tokio::task::JoinHandle;
24use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
25
26use crate::storage_types::TestStorage;
27
28pub struct Leaf2Fetcher<TYPES: NodeType> {
29 pub network_functions: NetworkFunctions<TYPES>,
30 pub storage: TestStorage<TYPES>,
31 pub listener: Option<JoinHandle<()>>,
32 pub public_key: TYPES::SignatureKey,
33 pub network_receiver: Option<Receiver<Event<TYPES>>>,
34}
35
36pub type RecvMessageFn =
37 std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
38
39pub type DirectMessageFn<TYPES> = std::sync::Arc<
40 dyn Fn(
41 ViewNumber,
42 Vec<u8>,
43 <TYPES as NodeType>::SignatureKey,
44 ) -> BoxFuture<'static, anyhow::Result<()>>
45 + Send
46 + Sync,
47>;
48
49#[derive(Clone)]
50pub struct NetworkFunctions<TYPES: NodeType> {
51 direct_message: DirectMessageFn<TYPES>,
52}
53
54pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
55 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
56 view: ViewNumber,
57 message: Vec<u8>,
58 recipient: <TYPES as NodeType>::SignatureKey,
59) -> anyhow::Result<()> {
60 network
61 .direct_message(view, message, recipient.clone())
62 .await
63 .context(format!("Failed to send message to recipient {recipient}"))
64}
65
66pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
67 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
68) -> DirectMessageFn<TYPES> {
69 Arc::new(move |view, message, recipient| {
70 let network = network.clone();
71 Box::pin(direct_message_impl::<TYPES, I>(
72 network, view, message, recipient,
73 ))
74 })
75}
76
77pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
78 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
79) -> NetworkFunctions<TYPES> {
80 let direct_message = direct_message_fn::<TYPES, I>(network.clone());
81
82 NetworkFunctions { direct_message }
83}
84
85impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
86 pub fn new<I: NodeImplementation<TYPES>>(
87 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
88 storage: TestStorage<TYPES>,
89 public_key: TYPES::SignatureKey,
90 ) -> Self {
91 let listener = None;
92
93 let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
94 Self {
95 network_functions,
96 storage,
97 listener,
98 public_key,
99 network_receiver: None,
100 }
101 }
102
103 pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
104 let public_key = self.public_key.clone();
105 let storage = self.storage.clone();
106 let network_functions = self.network_functions.clone();
107
108 self.network_receiver = Some(network_receiver.clone());
109
110 let listener = tokio::spawn(async move {
111 loop {
112 match network_receiver.recv_direct().await {
113 Ok(Event {
114 view_number: view,
115 event: EventType::ExternalMessageReceived { sender: _, data },
116 }) => {
117 let (requested_height, requester): (u64, TYPES::SignatureKey) =
118 match bincode::deserialize(&data) {
119 Ok(message) => message,
120 Err(e) => {
121 tracing::debug!("Failed to deserialize message: {e:?}");
122 continue;
123 },
124 };
125
126 let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
127 .inner
128 .read()
129 .await
130 .proposals_wrapper
131 .values()
132 .map(|proposal| {
133 (
134 proposal.data.block_header().block_number(),
135 Leaf2::from_quorum_proposal(&proposal.data.clone()),
136 )
137 })
138 .collect();
139
140 let heights = leaves.keys().collect::<Vec<_>>();
141
142 let Some(leaf) = leaves.get(&requested_height) else {
143 tracing::error!(
144 "Block at height {requested_height} not found in storage.\n\n \
145 stored leaf heights: {heights:?}"
146 );
147 continue;
148 };
149
150 let leaf_response = Message {
151 sender: public_key.clone(),
152 kind: MessageKind::<TYPES>::External(
153 bincode::serialize(&leaf).expect("Failed to serialize leaf"),
154 ),
155 };
156
157 let serialized_leaf_response =
158 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
159 .expect("Failed to serialize leaf response");
160
161 if let Err(e) = (network_functions.direct_message)(
162 view.u64().into(),
163 serialized_leaf_response,
164 requester,
165 )
166 .await
167 {
168 tracing::error!(
169 "Failed to send leaf response in test membership fetcher: {e}, \
170 requested height: {requested_height}"
171 );
172 };
173 },
174 Err(RecvError::Closed) => {
175 break;
176 },
177 _ => {
178 continue;
179 },
180 }
181 }
182 });
183
184 self.listener = Some(listener);
185 }
186
187 pub async fn fetch_leaf(
188 &self,
189 height: u64,
190 source: TYPES::SignatureKey,
191 ) -> anyhow::Result<Leaf2<TYPES>> {
192 let leaf_request = Message {
193 sender: self.public_key.clone(),
194 kind: MessageKind::<TYPES>::External(
195 bincode::serialize(&(height, self.public_key.clone()))
196 .expect("Failed to serialize leaf request"),
197 ),
198 };
199 let view = leaf_request.view_number();
200
201 let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
202 .storage
203 .inner
204 .read()
205 .await
206 .proposals_wrapper
207 .values()
208 .map(|proposal| {
209 (
210 proposal.data.block_header().block_number(),
211 Leaf2::from_quorum_proposal(&proposal.data.clone()),
212 )
213 })
214 .collect();
215
216 let heights = leaves.keys().collect::<Vec<_>>();
217
218 if let Some(leaf) = leaves.get(&height) {
219 return Ok(leaf.clone());
220 };
221 tracing::debug!(
222 "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
223 );
224
225 let mut network_receiver = self
226 .network_receiver
227 .clone()
228 .expect("Tried to fetch leaf before calling `set_external_channel`");
229
230 let serialized_leaf_request =
231 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
232 .expect("Failed to serialize leaf request");
233
234 if let Err(e) = (self.network_functions.direct_message)(
235 view.u64().into(),
236 serialized_leaf_request,
237 source,
238 )
239 .await
240 {
241 tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
242 };
243
244 tokio::time::timeout(std::time::Duration::from_millis(20), async {
245 loop {
246 match network_receiver.recv_direct().await {
247 Ok(Event {
248 view_number: _,
249 event: EventType::ExternalMessageReceived { sender: _, data },
250 }) => {
251 let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
252 Ok(message) => message,
253 Err(e) => {
254 tracing::debug!("Failed to deserialize message: {e:?}");
255 continue;
256 },
257 };
258
259 if leaf.height() == height {
260 return Ok(leaf);
261 }
262 },
263 Err(RecvError::Closed) => {
264 break Err(anyhow::anyhow!(
265 "Failed to fetch leaf: network task receiver closed"
266 ));
267 },
268 _ => {
269 continue;
270 },
271 }
272 }
273 })
274 .await
275 .context("Leaf fetch timed out")?
276 }
277}