hotshot_example_types/membership/
fetcher.rs1use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13 data::{Leaf2, ViewNumber},
14 event::{Event, EventType},
15 message::{Message, MessageKind},
16 traits::{
17 block_contents::BlockHeader, network::ConnectedNetwork, node_implementation::NodeType,
18 },
19 vote::HasViewNumber,
20};
21use tokio::task::JoinHandle;
22use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
23
24use crate::storage_types::TestStorage;
25
26pub struct Leaf2Fetcher<TYPES: NodeType> {
27 pub network_functions: NetworkFunctions<TYPES>,
28 pub storage: TestStorage<TYPES>,
29 pub listener: Option<JoinHandle<()>>,
30 pub public_key: TYPES::SignatureKey,
31 pub network_receiver: Option<Receiver<Event<TYPES>>>,
32}
33
34pub type RecvMessageFn =
35 std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
36
37pub type DirectMessageFn<TYPES> = std::sync::Arc<
38 dyn Fn(
39 ViewNumber,
40 Vec<u8>,
41 <TYPES as NodeType>::SignatureKey,
42 ) -> BoxFuture<'static, anyhow::Result<()>>
43 + Send
44 + Sync,
45>;
46
47#[derive(Clone)]
48pub struct NetworkFunctions<TYPES: NodeType> {
49 direct_message: DirectMessageFn<TYPES>,
50}
51
52pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
53 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
54 view: ViewNumber,
55 message: Vec<u8>,
56 recipient: <TYPES as NodeType>::SignatureKey,
57) -> anyhow::Result<()> {
58 network
59 .direct_message(view, message, recipient.clone())
60 .await
61 .context(format!("Failed to send message to recipient {recipient}"))
62}
63
64pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
65 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
66) -> DirectMessageFn<TYPES> {
67 Arc::new(move |view, message, recipient| {
68 let network = network.clone();
69 Box::pin(direct_message_impl::<TYPES, I>(
70 network, view, message, recipient,
71 ))
72 })
73}
74
75pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
76 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
77) -> NetworkFunctions<TYPES> {
78 let direct_message = direct_message_fn::<TYPES, I>(network.clone());
79
80 NetworkFunctions { direct_message }
81}
82
83impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
84 pub fn new<I: NodeImplementation<TYPES>>(
85 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
86 storage: TestStorage<TYPES>,
87 public_key: TYPES::SignatureKey,
88 ) -> Self {
89 let listener = None;
90
91 let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
92 Self {
93 network_functions,
94 storage,
95 listener,
96 public_key,
97 network_receiver: None,
98 }
99 }
100
101 pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
102 let public_key = self.public_key.clone();
103 let storage = self.storage.clone();
104 let network_functions = self.network_functions.clone();
105
106 self.network_receiver = Some(network_receiver.clone());
107
108 let listener = tokio::spawn(async move {
109 loop {
110 match network_receiver.recv_direct().await {
111 Ok(Event {
112 view_number: view,
113 event: EventType::ExternalMessageReceived { sender: _, data },
114 }) => {
115 let (requested_height, requester): (u64, TYPES::SignatureKey) =
116 match bincode::deserialize(&data) {
117 Ok(message) => message,
118 Err(e) => {
119 tracing::debug!("Failed to deserialize message: {e:?}");
120 continue;
121 },
122 };
123
124 let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
125 .inner
126 .read()
127 .await
128 .proposals_wrapper
129 .values()
130 .map(|proposal| {
131 (
132 proposal.data.block_header().block_number(),
133 Leaf2::from_quorum_proposal(&proposal.data.clone()),
134 )
135 })
136 .collect();
137
138 let heights = leaves.keys().collect::<Vec<_>>();
139
140 let Some(leaf) = leaves.get(&requested_height) else {
141 tracing::error!(
142 "Block at height {requested_height} not found in storage.\n\n \
143 stored leaf heights: {heights:?}"
144 );
145 continue;
146 };
147
148 let leaf_response = Message {
149 sender: public_key.clone(),
150 kind: MessageKind::<TYPES>::External(
151 bincode::serialize(&leaf).expect("Failed to serialize leaf"),
152 ),
153 };
154
155 let serialized_leaf_response =
156 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
157 .expect("Failed to serialize leaf response");
158
159 if let Err(e) = (network_functions.direct_message)(
160 view.u64().into(),
161 serialized_leaf_response,
162 requester,
163 )
164 .await
165 {
166 tracing::error!(
167 "Failed to send leaf response in test membership fetcher: {e}, \
168 requested height: {requested_height}"
169 );
170 };
171 },
172 Err(RecvError::Closed) => {
173 break;
174 },
175 _ => {
176 continue;
177 },
178 }
179 }
180 });
181
182 self.listener = Some(listener);
183 }
184
185 pub async fn fetch_leaf(
186 &self,
187 height: u64,
188 source: TYPES::SignatureKey,
189 ) -> anyhow::Result<Leaf2<TYPES>> {
190 let leaf_request = Message {
191 sender: self.public_key.clone(),
192 kind: MessageKind::<TYPES>::External(
193 bincode::serialize(&(height, self.public_key.clone()))
194 .expect("Failed to serialize leaf request"),
195 ),
196 };
197 let view = leaf_request.view_number();
198
199 let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
200 .storage
201 .inner
202 .read()
203 .await
204 .proposals_wrapper
205 .values()
206 .map(|proposal| {
207 (
208 proposal.data.block_header().block_number(),
209 Leaf2::from_quorum_proposal(&proposal.data.clone()),
210 )
211 })
212 .collect();
213
214 let heights = leaves.keys().collect::<Vec<_>>();
215
216 if let Some(leaf) = leaves.get(&height) {
217 return Ok(leaf.clone());
218 };
219 tracing::debug!(
220 "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
221 );
222
223 let mut network_receiver = self
224 .network_receiver
225 .clone()
226 .expect("Tried to fetch leaf before calling `set_external_channel`");
227
228 let serialized_leaf_request =
229 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
230 .expect("Failed to serialize leaf request");
231
232 if let Err(e) = (self.network_functions.direct_message)(
233 view.u64().into(),
234 serialized_leaf_request,
235 source,
236 )
237 .await
238 {
239 tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
240 };
241
242 tokio::time::timeout(std::time::Duration::from_millis(20), async {
243 loop {
244 match network_receiver.recv_direct().await {
245 Ok(Event {
246 view_number: _,
247 event: EventType::ExternalMessageReceived { sender: _, data },
248 }) => {
249 let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
250 Ok(message) => message,
251 Err(e) => {
252 tracing::debug!("Failed to deserialize message: {e:?}");
253 continue;
254 },
255 };
256
257 if leaf.height() == height {
258 return Ok(leaf);
259 }
260 },
261 Err(RecvError::Closed) => {
262 break Err(anyhow::anyhow!(
263 "Failed to fetch leaf: network task receiver closed"
264 ));
265 },
266 _ => {
267 continue;
268 },
269 }
270 }
271 })
272 .await
273 .context("Leaf fetch timed out")?
274 }
275}