hotshot/traits/networking/
memory_network.rs1use core::time::Duration;
13use std::{
14 collections::HashMap,
15 fmt::Debug,
16 sync::{
17 Arc,
18 atomic::{AtomicUsize, Ordering},
19 },
20};
21
22use async_lock::{Mutex, RwLock};
23use async_trait::async_trait;
24use dashmap::DashMap;
25use hotshot_types::{
26 BoxSyncFuture, PeerConnectInfo, boxed_sync,
27 data::ViewNumber,
28 traits::{
29 network::{
30 AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation,
31 Topic,
32 },
33 node_implementation::NodeType,
34 signature_key::SignatureKey,
35 },
36};
37use tokio::{
38 spawn,
39 sync::mpsc::{Receiver, Sender, channel, error::SendError},
40};
41use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
42
43use super::{NetworkError, NetworkReliability};
44
45#[derive(derive_more::Debug)]
50pub struct MasterMap<K: SignatureKey> {
51 #[debug(skip)]
53 map: DashMap<K, MemoryNetwork<K>>,
54
55 subscribed_map: DashMap<Topic, Vec<(K, MemoryNetwork<K>)>>,
57}
58
59impl<K: SignatureKey> MasterMap<K> {
60 #[must_use]
62 pub fn new() -> Arc<MasterMap<K>> {
63 Arc::new(MasterMap {
64 map: DashMap::new(),
65 subscribed_map: DashMap::new(),
66 })
67 }
68}
69
70#[derive(Debug)]
72struct MemoryNetworkInner<K: SignatureKey> {
73 input: RwLock<Option<Sender<Vec<u8>>>>,
75 output: Mutex<Receiver<Vec<u8>>>,
77 master_map: Arc<MasterMap<K>>,
79
80 in_flight_message_count: AtomicUsize,
82
83 reliability_config: Option<Box<dyn NetworkReliability>>,
85}
86
87#[derive(Clone)]
95pub struct MemoryNetwork<K: SignatureKey> {
96 inner: Arc<MemoryNetworkInner<K>>,
98}
99
100impl<K: SignatureKey> Debug for MemoryNetwork<K> {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("MemoryNetwork")
103 .field("inner", &"inner")
104 .finish()
105 }
106}
107
108impl<K: SignatureKey> MemoryNetwork<K> {
109 pub fn new(
111 pub_key: &K,
112 master_map: &Arc<MasterMap<K>>,
113 subscribed_topics: &[Topic],
114 reliability_config: Option<Box<dyn NetworkReliability>>,
115 ) -> MemoryNetwork<K> {
116 info!("Attaching new MemoryNetwork");
117 let (input, mut task_recv) = channel(128);
118 let (task_send, output) = channel(128);
119 let in_flight_message_count = AtomicUsize::new(0);
120 trace!("Channels open, spawning background task");
121
122 spawn(
123 async move {
124 debug!("Starting background task");
125 trace!("Entering processing loop");
126 while let Some(vec) = task_recv.recv().await {
127 trace!(?vec, "Incoming message");
128 let ts = task_send.clone();
130 let res = ts.send(vec).await;
131 if res.is_ok() {
132 trace!("Passed message to output queue");
133 } else {
134 error!("Output queue receivers are shutdown");
135 }
136 }
137 }
138 .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)),
139 );
140 trace!("Notifying other networks of the new connected peer");
141 trace!("Task spawned, creating MemoryNetwork");
142 let mn = MemoryNetwork {
143 inner: Arc::new(MemoryNetworkInner {
144 input: RwLock::new(Some(input)),
145 output: Mutex::new(output),
146 master_map: Arc::clone(master_map),
147 in_flight_message_count,
148 reliability_config,
149 }),
150 };
151 master_map.map.insert(pub_key.clone(), mn.clone());
153 for topic in subscribed_topics {
155 master_map
156 .subscribed_map
157 .entry(*topic)
158 .or_default()
159 .push((pub_key.clone(), mn.clone()));
160 }
161
162 mn
163 }
164
165 async fn input(&self, message: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
167 self.inner
168 .in_flight_message_count
169 .fetch_add(1, Ordering::Relaxed);
170 let input = self.inner.input.read().await;
171 if let Some(input) = &*input {
172 input.send(message).await
173 } else {
174 Err(SendError(message))
175 }
176 }
177}
178
179impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
180 for MemoryNetwork<TYPES::SignatureKey>
181{
182 fn generator(
183 _expected_node_count: usize,
184 _num_bootstrap: usize,
185 _network_id: usize,
186 da_committee_size: usize,
187 reliability_config: Option<Box<dyn NetworkReliability>>,
188 _secondary_network_delay: Duration,
189 _connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
190 ) -> AsyncGenerator<Arc<Self>> {
191 let master: Arc<_> = MasterMap::new();
192 Box::pin(move |node_id| {
194 let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
195 let pubkey = TYPES::SignatureKey::from_private(&privkey);
196
197 let subscribed_topics = if node_id < da_committee_size as u64 {
199 vec![Topic::Da, Topic::Global]
201 } else {
202 vec![Topic::Global]
204 };
205
206 let net = MemoryNetwork::new(
207 &pubkey,
208 &master,
209 &subscribed_topics,
210 reliability_config.clone(),
211 );
212 Box::pin(async move { net.into() })
213 })
214 }
215
216 fn in_flight_message_count(&self) -> Option<usize> {
217 Some(self.inner.in_flight_message_count.load(Ordering::Relaxed))
218 }
219}
220
221#[async_trait]
223impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
224 #[instrument(name = "MemoryNetwork::ready_blocking")]
225 async fn wait_for_ready(&self) {}
226
227 fn pause(&self) {
228 unimplemented!("Pausing not implemented for the Memory network");
229 }
230
231 fn resume(&self) {
232 unimplemented!("Resuming not implemented for the Memory network");
233 }
234
235 #[instrument(name = "MemoryNetwork::shut_down")]
236 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
237 where
238 'a: 'b,
239 Self: 'b,
240 {
241 let closure = async move {
242 *self.inner.input.write().await = None;
243 };
244 boxed_sync(closure)
245 }
246
247 #[instrument(name = "MemoryNetwork::broadcast_message")]
248 async fn broadcast_message(
249 &self,
250 _: ViewNumber,
251 message: Vec<u8>,
252 topic: Topic,
253 _broadcast_delay: BroadcastDelay,
254 ) -> Result<(), NetworkError> {
255 trace!(?message, "Broadcasting message");
256 for node in self
257 .inner
258 .master_map
259 .subscribed_map
260 .entry(topic)
261 .or_default()
262 .iter()
263 {
264 let (key, node) = node;
266 trace!(?key, "Sending message to node");
267 if let Some(config) = &self.inner.reliability_config {
268 {
269 let node2 = node.clone();
270 let fut = config.chaos_send_msg(
271 message.clone(),
272 Arc::new(move |msg: Vec<u8>| {
273 let node3 = (node2).clone();
274 boxed_sync(async move {
275 let _res = node3.input(msg).await;
276 })
279 }),
280 );
281 spawn(fut);
282 }
283 } else {
284 let res = node.input(message.clone()).await;
285 match res {
286 Ok(()) => {
287 trace!(?key, "Delivered message to remote");
288 },
289 Err(e) => {
290 warn!(?e, ?key, "Error sending broadcast message to node");
291 },
292 }
293 }
294 }
295 Ok(())
296 }
297
298 #[instrument(name = "MemoryNetwork::da_broadcast_message")]
299 async fn da_broadcast_message(
300 &self,
301 _: ViewNumber,
302 message: Vec<u8>,
303 recipients: Vec<K>,
304 _broadcast_delay: BroadcastDelay,
305 ) -> Result<(), NetworkError> {
306 trace!(?message, "Broadcasting message to DA");
307 for node in self
308 .inner
309 .master_map
310 .subscribed_map
311 .entry(Topic::Da)
312 .or_default()
313 .iter()
314 {
315 if !recipients.contains(&node.0) {
316 tracing::trace!("Skipping node because not in recipient list: {:?}", node.0);
317 continue;
318 }
319 let (key, node) = node;
321 trace!(?key, "Sending message to node");
322 if let Some(config) = &self.inner.reliability_config {
323 {
324 let node2 = node.clone();
325 let fut = config.chaos_send_msg(
326 message.clone(),
327 Arc::new(move |msg: Vec<u8>| {
328 let node3 = (node2).clone();
329 boxed_sync(async move {
330 let _res = node3.input(msg).await;
331 })
334 }),
335 );
336 spawn(fut);
337 }
338 } else {
339 let res = node.input(message.clone()).await;
340 match res {
341 Ok(()) => {
342 trace!(?key, "Delivered message to remote");
343 },
344 Err(e) => {
345 warn!(?e, ?key, "Error sending broadcast message to node");
346 },
347 }
348 }
349 }
350 Ok(())
351 }
352
353 #[instrument(name = "MemoryNetwork::direct_message")]
354 async fn direct_message(
355 &self,
356 _: ViewNumber,
357 message: Vec<u8>,
358 recipient: K,
359 ) -> Result<(), NetworkError> {
360 trace!("Message bincoded, finding recipient");
363 if let Some(node) = self.inner.master_map.map.get(&recipient) {
364 let node = node.value().clone();
365 if let Some(config) = &self.inner.reliability_config {
366 {
367 let fut = config.chaos_send_msg(
368 message.clone(),
369 Arc::new(move |msg: Vec<u8>| {
370 let node2 = node.clone();
371 boxed_sync(async move {
372 let _res = node2.input(msg).await;
373 })
376 }),
377 );
378 spawn(fut);
379 }
380 Ok(())
381 } else {
382 let res = node.input(message).await;
383 match res {
384 Ok(()) => {
385 trace!(?recipient, "Delivered message to remote");
386 Ok(())
387 },
388 Err(e) => Err(NetworkError::MessageSendError(format!(
389 "error sending direct message to node: {e}",
390 ))),
391 }
392 }
393 } else {
394 Err(NetworkError::MessageSendError(
395 "node does not exist".to_string(),
396 ))
397 }
398 }
399
400 #[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
405 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
406 let ret = self
407 .inner
408 .output
409 .lock()
410 .await
411 .recv()
412 .await
413 .ok_or(NetworkError::ShutDown)?;
414 self.inner
415 .in_flight_message_count
416 .fetch_sub(1, Ordering::Relaxed);
417 Ok(ret)
418 }
419}