hotshot_example_types/membership/
fetcher.rs1use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13 data::Leaf2,
14 event::{Event, EventType},
15 message::{Message, MessageKind},
16 traits::{
17 block_contents::BlockHeader, network::ConnectedNetwork, node_implementation::NodeType,
18 },
19};
20use tokio::task::JoinHandle;
21use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
22
23use crate::storage_types::TestStorage;
24
25pub struct Leaf2Fetcher<TYPES: NodeType> {
26 pub network_functions: NetworkFunctions<TYPES>,
27 pub storage: TestStorage<TYPES>,
28 pub listener: Option<JoinHandle<()>>,
29 pub public_key: TYPES::SignatureKey,
30 pub network_receiver: Option<Receiver<Event<TYPES>>>,
31}
32
33pub type RecvMessageFn =
34 std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
35
36pub type DirectMessageFn<TYPES> = std::sync::Arc<
37 dyn Fn(Vec<u8>, <TYPES as NodeType>::SignatureKey) -> BoxFuture<'static, anyhow::Result<()>>
38 + Send
39 + Sync,
40>;
41
42#[derive(Clone)]
43pub struct NetworkFunctions<TYPES: NodeType> {
44 direct_message: DirectMessageFn<TYPES>,
45}
46
47pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
48 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
49 message: Vec<u8>,
50 recipient: <TYPES as NodeType>::SignatureKey,
51) -> anyhow::Result<()> {
52 network
53 .direct_message(message, recipient.clone())
54 .await
55 .context(format!("Failed to send message to recipient {recipient}"))
56}
57
58pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
59 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
60) -> DirectMessageFn<TYPES> {
61 Arc::new(move |message, recipient| {
62 let network = network.clone();
63 Box::pin(direct_message_impl::<TYPES, I>(network, message, recipient))
64 })
65}
66
67pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
68 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
69) -> NetworkFunctions<TYPES> {
70 let direct_message = direct_message_fn::<TYPES, I>(network.clone());
71
72 NetworkFunctions { direct_message }
73}
74
75impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
76 pub fn new<I: NodeImplementation<TYPES>>(
77 network: Arc<<I as NodeImplementation<TYPES>>::Network>,
78 storage: TestStorage<TYPES>,
79 public_key: TYPES::SignatureKey,
80 ) -> Self {
81 let listener = None;
82
83 let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
84 Self {
85 network_functions,
86 storage,
87 listener,
88 public_key,
89 network_receiver: None,
90 }
91 }
92
93 pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
94 let public_key = self.public_key.clone();
95 let storage = self.storage.clone();
96 let network_functions = self.network_functions.clone();
97
98 self.network_receiver = Some(network_receiver.clone());
99
100 let listener = tokio::spawn(async move {
101 loop {
102 match network_receiver.recv_direct().await {
103 Ok(Event {
104 view_number: _,
105 event: EventType::ExternalMessageReceived { sender: _, data },
106 }) => {
107 let (requested_height, requester): (u64, TYPES::SignatureKey) =
108 match bincode::deserialize(&data) {
109 Ok(message) => message,
110 Err(e) => {
111 tracing::debug!("Failed to deserialize message: {e:?}");
112 continue;
113 },
114 };
115
116 let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
117 .inner
118 .read()
119 .await
120 .proposals_wrapper
121 .values()
122 .map(|proposal| {
123 (
124 proposal.data.block_header().block_number(),
125 Leaf2::from_quorum_proposal(&proposal.data.clone()),
126 )
127 })
128 .collect();
129
130 let heights = leaves.keys().collect::<Vec<_>>();
131
132 let Some(leaf) = leaves.get(&requested_height) else {
133 tracing::error!(
134 "Block at height {requested_height} not found in storage.\n\n \
135 stored leaf heights: {heights:?}"
136 );
137 continue;
138 };
139
140 let leaf_response = Message {
141 sender: public_key.clone(),
142 kind: MessageKind::<TYPES>::External(
143 bincode::serialize(&leaf).expect("Failed to serialize leaf"),
144 ),
145 };
146
147 let serialized_leaf_response =
148 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
149 .expect("Failed to serialize leaf response");
150
151 if let Err(e) =
152 (network_functions.direct_message)(serialized_leaf_response, requester)
153 .await
154 {
155 tracing::error!(
156 "Failed to send leaf response in test membership fetcher: {e}, \
157 requested height: {requested_height}"
158 );
159 };
160 },
161 Err(RecvError::Closed) => {
162 break;
163 },
164 _ => {
165 continue;
166 },
167 }
168 }
169 });
170
171 self.listener = Some(listener);
172 }
173
174 pub async fn fetch_leaf(
175 &self,
176 height: u64,
177 source: TYPES::SignatureKey,
178 ) -> anyhow::Result<Leaf2<TYPES>> {
179 let leaf_request = Message {
180 sender: self.public_key.clone(),
181 kind: MessageKind::<TYPES>::External(
182 bincode::serialize(&(height, self.public_key.clone()))
183 .expect("Failed to serialize leaf request"),
184 ),
185 };
186
187 let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
188 .storage
189 .inner
190 .read()
191 .await
192 .proposals_wrapper
193 .values()
194 .map(|proposal| {
195 (
196 proposal.data.block_header().block_number(),
197 Leaf2::from_quorum_proposal(&proposal.data.clone()),
198 )
199 })
200 .collect();
201
202 let heights = leaves.keys().collect::<Vec<_>>();
203
204 if let Some(leaf) = leaves.get(&height) {
205 return Ok(leaf.clone());
206 };
207 tracing::debug!(
208 "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
209 );
210
211 let mut network_receiver = self
212 .network_receiver
213 .clone()
214 .expect("Tried to fetch leaf before calling `set_external_channel`");
215
216 let serialized_leaf_request =
217 BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
218 .expect("Failed to serialize leaf request");
219
220 if let Err(e) =
221 (self.network_functions.direct_message)(serialized_leaf_request, source).await
222 {
223 tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
224 };
225
226 tokio::time::timeout(std::time::Duration::from_millis(20), async {
227 loop {
228 match network_receiver.recv_direct().await {
229 Ok(Event {
230 view_number: _,
231 event: EventType::ExternalMessageReceived { sender: _, data },
232 }) => {
233 let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
234 Ok(message) => message,
235 Err(e) => {
236 tracing::debug!("Failed to deserialize message: {e:?}");
237 continue;
238 },
239 };
240
241 if leaf.height() == height {
242 return Ok(leaf);
243 }
244 },
245 Err(RecvError::Closed) => {
246 break Err(anyhow::anyhow!(
247 "Failed to fetch leaf: network task receiver closed"
248 ));
249 },
250 _ => {
251 continue;
252 },
253 }
254 }
255 })
256 .await
257 .context("Leaf fetch timed out")?
258 }
259}