hotshot/traits/networking/
memory_network.rs1use core::time::Duration;
13use std::{
14 fmt::Debug,
15 sync::{
16 atomic::{AtomicUsize, Ordering},
17 Arc,
18 },
19};
20
21use async_lock::{Mutex, RwLock};
22use async_trait::async_trait;
23use dashmap::DashMap;
24use hotshot_types::{
25 boxed_sync,
26 data::ViewNumber,
27 traits::{
28 network::{
29 AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation,
30 Topic,
31 },
32 node_implementation::NodeType,
33 signature_key::SignatureKey,
34 },
35 BoxSyncFuture,
36};
37use tokio::{
38 spawn,
39 sync::mpsc::{channel, error::SendError, Receiver, Sender},
40};
41use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
42
43use super::{NetworkError, NetworkReliability};
44
45#[derive(derive_more::Debug)]
50pub struct MasterMap<K: SignatureKey> {
51 #[debug(skip)]
53 map: DashMap<K, MemoryNetwork<K>>,
54
55 subscribed_map: DashMap<Topic, Vec<(K, MemoryNetwork<K>)>>,
57}
58
59impl<K: SignatureKey> MasterMap<K> {
60 #[must_use]
62 pub fn new() -> Arc<MasterMap<K>> {
63 Arc::new(MasterMap {
64 map: DashMap::new(),
65 subscribed_map: DashMap::new(),
66 })
67 }
68}
69
70#[derive(Debug)]
72struct MemoryNetworkInner<K: SignatureKey> {
73 input: RwLock<Option<Sender<Vec<u8>>>>,
75 output: Mutex<Receiver<Vec<u8>>>,
77 master_map: Arc<MasterMap<K>>,
79
80 in_flight_message_count: AtomicUsize,
82
83 reliability_config: Option<Box<dyn NetworkReliability>>,
85}
86
87#[derive(Clone)]
95pub struct MemoryNetwork<K: SignatureKey> {
96 inner: Arc<MemoryNetworkInner<K>>,
98}
99
100impl<K: SignatureKey> Debug for MemoryNetwork<K> {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("MemoryNetwork")
103 .field("inner", &"inner")
104 .finish()
105 }
106}
107
108impl<K: SignatureKey> MemoryNetwork<K> {
109 pub fn new(
111 pub_key: &K,
112 master_map: &Arc<MasterMap<K>>,
113 subscribed_topics: &[Topic],
114 reliability_config: Option<Box<dyn NetworkReliability>>,
115 ) -> MemoryNetwork<K> {
116 info!("Attaching new MemoryNetwork");
117 let (input, mut task_recv) = channel(128);
118 let (task_send, output) = channel(128);
119 let in_flight_message_count = AtomicUsize::new(0);
120 trace!("Channels open, spawning background task");
121
122 spawn(
123 async move {
124 debug!("Starting background task");
125 trace!("Entering processing loop");
126 while let Some(vec) = task_recv.recv().await {
127 trace!(?vec, "Incoming message");
128 let ts = task_send.clone();
130 let res = ts.send(vec).await;
131 if res.is_ok() {
132 trace!("Passed message to output queue");
133 } else {
134 error!("Output queue receivers are shutdown");
135 }
136 }
137 }
138 .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)),
139 );
140 trace!("Notifying other networks of the new connected peer");
141 trace!("Task spawned, creating MemoryNetwork");
142 let mn = MemoryNetwork {
143 inner: Arc::new(MemoryNetworkInner {
144 input: RwLock::new(Some(input)),
145 output: Mutex::new(output),
146 master_map: Arc::clone(master_map),
147 in_flight_message_count,
148 reliability_config,
149 }),
150 };
151 master_map.map.insert(pub_key.clone(), mn.clone());
153 for topic in subscribed_topics {
155 master_map
156 .subscribed_map
157 .entry(*topic)
158 .or_default()
159 .push((pub_key.clone(), mn.clone()));
160 }
161
162 mn
163 }
164
165 async fn input(&self, message: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
167 self.inner
168 .in_flight_message_count
169 .fetch_add(1, Ordering::Relaxed);
170 let input = self.inner.input.read().await;
171 if let Some(input) = &*input {
172 input.send(message).await
173 } else {
174 Err(SendError(message))
175 }
176 }
177}
178
179impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
180 for MemoryNetwork<TYPES::SignatureKey>
181{
182 fn generator(
183 _expected_node_count: usize,
184 _num_bootstrap: usize,
185 _network_id: usize,
186 da_committee_size: usize,
187 reliability_config: Option<Box<dyn NetworkReliability>>,
188 _secondary_network_delay: Duration,
189 ) -> AsyncGenerator<Arc<Self>> {
190 let master: Arc<_> = MasterMap::new();
191 Box::pin(move |node_id| {
193 let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
194 let pubkey = TYPES::SignatureKey::from_private(&privkey);
195
196 let subscribed_topics = if node_id < da_committee_size as u64 {
198 vec![Topic::Da, Topic::Global]
200 } else {
201 vec![Topic::Global]
203 };
204
205 let net = MemoryNetwork::new(
206 &pubkey,
207 &master,
208 &subscribed_topics,
209 reliability_config.clone(),
210 );
211 Box::pin(async move { net.into() })
212 })
213 }
214
215 fn in_flight_message_count(&self) -> Option<usize> {
216 Some(self.inner.in_flight_message_count.load(Ordering::Relaxed))
217 }
218}
219
220#[async_trait]
222impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
223 #[instrument(name = "MemoryNetwork::ready_blocking")]
224 async fn wait_for_ready(&self) {}
225
226 fn pause(&self) {
227 unimplemented!("Pausing not implemented for the Memory network");
228 }
229
230 fn resume(&self) {
231 unimplemented!("Resuming not implemented for the Memory network");
232 }
233
234 #[instrument(name = "MemoryNetwork::shut_down")]
235 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
236 where
237 'a: 'b,
238 Self: 'b,
239 {
240 let closure = async move {
241 *self.inner.input.write().await = None;
242 };
243 boxed_sync(closure)
244 }
245
246 #[instrument(name = "MemoryNetwork::broadcast_message")]
247 async fn broadcast_message(
248 &self,
249 _: ViewNumber,
250 message: Vec<u8>,
251 topic: Topic,
252 _broadcast_delay: BroadcastDelay,
253 ) -> Result<(), NetworkError> {
254 trace!(?message, "Broadcasting message");
255 for node in self
256 .inner
257 .master_map
258 .subscribed_map
259 .entry(topic)
260 .or_default()
261 .iter()
262 {
263 let (key, node) = node;
265 trace!(?key, "Sending message to node");
266 if let Some(ref config) = &self.inner.reliability_config {
267 {
268 let node2 = node.clone();
269 let fut = config.chaos_send_msg(
270 message.clone(),
271 Arc::new(move |msg: Vec<u8>| {
272 let node3 = (node2).clone();
273 boxed_sync(async move {
274 let _res = node3.input(msg).await;
275 })
278 }),
279 );
280 spawn(fut);
281 }
282 } else {
283 let res = node.input(message.clone()).await;
284 match res {
285 Ok(()) => {
286 trace!(?key, "Delivered message to remote");
287 },
288 Err(e) => {
289 warn!(?e, ?key, "Error sending broadcast message to node");
290 },
291 }
292 }
293 }
294 Ok(())
295 }
296
297 #[instrument(name = "MemoryNetwork::da_broadcast_message")]
298 async fn da_broadcast_message(
299 &self,
300 _: ViewNumber,
301 message: Vec<u8>,
302 recipients: Vec<K>,
303 _broadcast_delay: BroadcastDelay,
304 ) -> Result<(), NetworkError> {
305 trace!(?message, "Broadcasting message to DA");
306 for node in self
307 .inner
308 .master_map
309 .subscribed_map
310 .entry(Topic::Da)
311 .or_default()
312 .iter()
313 {
314 if !recipients.contains(&node.0) {
315 tracing::trace!("Skipping node because not in recipient list: {:?}", node.0);
316 continue;
317 }
318 let (key, node) = node;
320 trace!(?key, "Sending message to node");
321 if let Some(ref config) = &self.inner.reliability_config {
322 {
323 let node2 = node.clone();
324 let fut = config.chaos_send_msg(
325 message.clone(),
326 Arc::new(move |msg: Vec<u8>| {
327 let node3 = (node2).clone();
328 boxed_sync(async move {
329 let _res = node3.input(msg).await;
330 })
333 }),
334 );
335 spawn(fut);
336 }
337 } else {
338 let res = node.input(message.clone()).await;
339 match res {
340 Ok(()) => {
341 trace!(?key, "Delivered message to remote");
342 },
343 Err(e) => {
344 warn!(?e, ?key, "Error sending broadcast message to node");
345 },
346 }
347 }
348 }
349 Ok(())
350 }
351
352 #[instrument(name = "MemoryNetwork::direct_message")]
353 async fn direct_message(
354 &self,
355 _: ViewNumber,
356 message: Vec<u8>,
357 recipient: K,
358 ) -> Result<(), NetworkError> {
359 trace!("Message bincoded, finding recipient");
362 if let Some(node) = self.inner.master_map.map.get(&recipient) {
363 let node = node.value().clone();
364 if let Some(ref config) = &self.inner.reliability_config {
365 {
366 let fut = config.chaos_send_msg(
367 message.clone(),
368 Arc::new(move |msg: Vec<u8>| {
369 let node2 = node.clone();
370 boxed_sync(async move {
371 let _res = node2.input(msg).await;
372 })
375 }),
376 );
377 spawn(fut);
378 }
379 Ok(())
380 } else {
381 let res = node.input(message).await;
382 match res {
383 Ok(()) => {
384 trace!(?recipient, "Delivered message to remote");
385 Ok(())
386 },
387 Err(e) => Err(NetworkError::MessageSendError(format!(
388 "error sending direct message to node: {e}",
389 ))),
390 }
391 }
392 } else {
393 Err(NetworkError::MessageSendError(
394 "node does not exist".to_string(),
395 ))
396 }
397 }
398
399 #[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
404 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
405 let ret = self
406 .inner
407 .output
408 .lock()
409 .await
410 .recv()
411 .await
412 .ok_or(NetworkError::ShutDown)?;
413 self.inner
414 .in_flight_message_count
415 .fetch_sub(1, Ordering::Relaxed);
416 Ok(ret)
417 }
418}