// hotshot/traits/networking/memory_network.rs
use core::time::Duration;
13use std::{
14 fmt::Debug,
15 sync::{
16 atomic::{AtomicUsize, Ordering},
17 Arc,
18 },
19};
20
21use async_lock::{Mutex, RwLock};
22use async_trait::async_trait;
23use dashmap::DashMap;
24use hotshot_types::{
25 boxed_sync,
26 traits::{
27 network::{
28 AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation,
29 Topic,
30 },
31 node_implementation::NodeType,
32 signature_key::SignatureKey,
33 },
34 BoxSyncFuture,
35};
36use tokio::{
37 spawn,
38 sync::mpsc::{channel, error::SendError, Receiver, Sender},
39};
40use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
41
42use super::{NetworkError, NetworkReliability};
43
44#[derive(derive_more::Debug)]
49pub struct MasterMap<K: SignatureKey> {
50 #[debug(skip)]
52 map: DashMap<K, MemoryNetwork<K>>,
53
54 subscribed_map: DashMap<Topic, Vec<(K, MemoryNetwork<K>)>>,
56}
57
58impl<K: SignatureKey> MasterMap<K> {
59 #[must_use]
61 pub fn new() -> Arc<MasterMap<K>> {
62 Arc::new(MasterMap {
63 map: DashMap::new(),
64 subscribed_map: DashMap::new(),
65 })
66 }
67}
68
69#[derive(Debug)]
71struct MemoryNetworkInner<K: SignatureKey> {
72 input: RwLock<Option<Sender<Vec<u8>>>>,
74 output: Mutex<Receiver<Vec<u8>>>,
76 master_map: Arc<MasterMap<K>>,
78
79 in_flight_message_count: AtomicUsize,
81
82 reliability_config: Option<Box<dyn NetworkReliability>>,
84}
85
86#[derive(Clone)]
94pub struct MemoryNetwork<K: SignatureKey> {
95 inner: Arc<MemoryNetworkInner<K>>,
97}
98
99impl<K: SignatureKey> Debug for MemoryNetwork<K> {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct("MemoryNetwork")
102 .field("inner", &"inner")
103 .finish()
104 }
105}
106
107impl<K: SignatureKey> MemoryNetwork<K> {
108 pub fn new(
110 pub_key: &K,
111 master_map: &Arc<MasterMap<K>>,
112 subscribed_topics: &[Topic],
113 reliability_config: Option<Box<dyn NetworkReliability>>,
114 ) -> MemoryNetwork<K> {
115 info!("Attaching new MemoryNetwork");
116 let (input, mut task_recv) = channel(128);
117 let (task_send, output) = channel(128);
118 let in_flight_message_count = AtomicUsize::new(0);
119 trace!("Channels open, spawning background task");
120
121 spawn(
122 async move {
123 debug!("Starting background task");
124 trace!("Entering processing loop");
125 while let Some(vec) = task_recv.recv().await {
126 trace!(?vec, "Incoming message");
127 let ts = task_send.clone();
129 let res = ts.send(vec).await;
130 if res.is_ok() {
131 trace!("Passed message to output queue");
132 } else {
133 error!("Output queue receivers are shutdown");
134 }
135 }
136 }
137 .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)),
138 );
139 trace!("Notifying other networks of the new connected peer");
140 trace!("Task spawned, creating MemoryNetwork");
141 let mn = MemoryNetwork {
142 inner: Arc::new(MemoryNetworkInner {
143 input: RwLock::new(Some(input)),
144 output: Mutex::new(output),
145 master_map: Arc::clone(master_map),
146 in_flight_message_count,
147 reliability_config,
148 }),
149 };
150 master_map.map.insert(pub_key.clone(), mn.clone());
152 for topic in subscribed_topics {
154 master_map
155 .subscribed_map
156 .entry(topic.clone())
157 .or_default()
158 .push((pub_key.clone(), mn.clone()));
159 }
160
161 mn
162 }
163
164 async fn input(&self, message: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
166 self.inner
167 .in_flight_message_count
168 .fetch_add(1, Ordering::Relaxed);
169 let input = self.inner.input.read().await;
170 if let Some(input) = &*input {
171 input.send(message).await
172 } else {
173 Err(SendError(message))
174 }
175 }
176}
177
178impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
179 for MemoryNetwork<TYPES::SignatureKey>
180{
181 fn generator(
182 _expected_node_count: usize,
183 _num_bootstrap: usize,
184 _network_id: usize,
185 da_committee_size: usize,
186 reliability_config: Option<Box<dyn NetworkReliability>>,
187 _secondary_network_delay: Duration,
188 ) -> AsyncGenerator<Arc<Self>> {
189 let master: Arc<_> = MasterMap::new();
190 Box::pin(move |node_id| {
192 let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
193 let pubkey = TYPES::SignatureKey::from_private(&privkey);
194
195 let subscribed_topics = if node_id < da_committee_size as u64 {
197 vec![Topic::Da, Topic::Global]
199 } else {
200 vec![Topic::Global]
202 };
203
204 let net = MemoryNetwork::new(
205 &pubkey,
206 &master,
207 &subscribed_topics,
208 reliability_config.clone(),
209 );
210 Box::pin(async move { net.into() })
211 })
212 }
213
214 fn in_flight_message_count(&self) -> Option<usize> {
215 Some(self.inner.in_flight_message_count.load(Ordering::Relaxed))
216 }
217}
218
219#[async_trait]
221impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
222 #[instrument(name = "MemoryNetwork::ready_blocking")]
223 async fn wait_for_ready(&self) {}
224
225 fn pause(&self) {
226 unimplemented!("Pausing not implemented for the Memory network");
227 }
228
229 fn resume(&self) {
230 unimplemented!("Resuming not implemented for the Memory network");
231 }
232
233 #[instrument(name = "MemoryNetwork::shut_down")]
234 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
235 where
236 'a: 'b,
237 Self: 'b,
238 {
239 let closure = async move {
240 *self.inner.input.write().await = None;
241 };
242 boxed_sync(closure)
243 }
244
245 #[instrument(name = "MemoryNetwork::broadcast_message")]
246 async fn broadcast_message(
247 &self,
248 message: Vec<u8>,
249 topic: Topic,
250 _broadcast_delay: BroadcastDelay,
251 ) -> Result<(), NetworkError> {
252 trace!(?message, "Broadcasting message");
253 for node in self
254 .inner
255 .master_map
256 .subscribed_map
257 .entry(topic)
258 .or_default()
259 .iter()
260 {
261 let (key, node) = node;
263 trace!(?key, "Sending message to node");
264 if let Some(ref config) = &self.inner.reliability_config {
265 {
266 let node2 = node.clone();
267 let fut = config.chaos_send_msg(
268 message.clone(),
269 Arc::new(move |msg: Vec<u8>| {
270 let node3 = (node2).clone();
271 boxed_sync(async move {
272 let _res = node3.input(msg).await;
273 })
276 }),
277 );
278 spawn(fut);
279 }
280 } else {
281 let res = node.input(message.clone()).await;
282 match res {
283 Ok(()) => {
284 trace!(?key, "Delivered message to remote");
285 },
286 Err(e) => {
287 warn!(?e, ?key, "Error sending broadcast message to node");
288 },
289 }
290 }
291 }
292 Ok(())
293 }
294
295 #[instrument(name = "MemoryNetwork::da_broadcast_message")]
296 async fn da_broadcast_message(
297 &self,
298 message: Vec<u8>,
299 recipients: Vec<K>,
300 _broadcast_delay: BroadcastDelay,
301 ) -> Result<(), NetworkError> {
302 trace!(?message, "Broadcasting message to DA");
303 for node in self
304 .inner
305 .master_map
306 .subscribed_map
307 .entry(Topic::Da)
308 .or_default()
309 .iter()
310 {
311 if !recipients.contains(&node.0) {
312 tracing::trace!("Skipping node because not in recipient list: {:?}", node.0);
313 continue;
314 }
315 let (key, node) = node;
317 trace!(?key, "Sending message to node");
318 if let Some(ref config) = &self.inner.reliability_config {
319 {
320 let node2 = node.clone();
321 let fut = config.chaos_send_msg(
322 message.clone(),
323 Arc::new(move |msg: Vec<u8>| {
324 let node3 = (node2).clone();
325 boxed_sync(async move {
326 let _res = node3.input(msg).await;
327 })
330 }),
331 );
332 spawn(fut);
333 }
334 } else {
335 let res = node.input(message.clone()).await;
336 match res {
337 Ok(()) => {
338 trace!(?key, "Delivered message to remote");
339 },
340 Err(e) => {
341 warn!(?e, ?key, "Error sending broadcast message to node");
342 },
343 }
344 }
345 }
346 Ok(())
347 }
348
349 #[instrument(name = "MemoryNetwork::direct_message")]
350 async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
351 trace!("Message bincoded, finding recipient");
354 if let Some(node) = self.inner.master_map.map.get(&recipient) {
355 let node = node.value().clone();
356 if let Some(ref config) = &self.inner.reliability_config {
357 {
358 let fut = config.chaos_send_msg(
359 message.clone(),
360 Arc::new(move |msg: Vec<u8>| {
361 let node2 = node.clone();
362 boxed_sync(async move {
363 let _res = node2.input(msg).await;
364 })
367 }),
368 );
369 spawn(fut);
370 }
371 Ok(())
372 } else {
373 let res = node.input(message).await;
374 match res {
375 Ok(()) => {
376 trace!(?recipient, "Delivered message to remote");
377 Ok(())
378 },
379 Err(e) => Err(NetworkError::MessageSendError(format!(
380 "error sending direct message to node: {e}",
381 ))),
382 }
383 }
384 } else {
385 Err(NetworkError::MessageSendError(
386 "node does not exist".to_string(),
387 ))
388 }
389 }
390
391 #[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
396 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
397 let ret = self
398 .inner
399 .output
400 .lock()
401 .await
402 .recv()
403 .await
404 .ok_or(NetworkError::ShutDown)?;
405 self.inner
406 .in_flight_message_count
407 .fetch_sub(1, Ordering::Relaxed);
408 Ok(ret)
409 }
410}