hotshot/traits/networking/
memory_network.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! In-memory network simulator
//!
//! This module provides an in-memory-only simulation of an actual network, useful for unit and
//! integration tests.
11
12use core::time::Duration;
13use std::{
14    fmt::Debug,
15    sync::{
16        atomic::{AtomicUsize, Ordering},
17        Arc,
18    },
19};
20
21use async_lock::{Mutex, RwLock};
22use async_trait::async_trait;
23use dashmap::DashMap;
24use hotshot_types::{
25    boxed_sync,
26    data::ViewNumber,
27    traits::{
28        network::{
29            AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation,
30            Topic,
31        },
32        node_implementation::NodeType,
33        signature_key::SignatureKey,
34    },
35    BoxSyncFuture,
36};
37use tokio::{
38    spawn,
39    sync::mpsc::{channel, error::SendError, Receiver, Sender},
40};
41use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
42
43use super::{NetworkError, NetworkReliability};
44
45/// Shared state for in-memory mock networking.
46///
47/// This type is responsible for keeping track of the channels to each [`MemoryNetwork`], and is
48/// used to group the [`MemoryNetwork`] instances.
49#[derive(derive_more::Debug)]
50pub struct MasterMap<K: SignatureKey> {
51    /// The list of `MemoryNetwork`s
52    #[debug(skip)]
53    map: DashMap<K, MemoryNetwork<K>>,
54
55    /// The list of `MemoryNetwork`s aggregated by topic
56    subscribed_map: DashMap<Topic, Vec<(K, MemoryNetwork<K>)>>,
57}
58
59impl<K: SignatureKey> MasterMap<K> {
60    /// Create a new, empty, `MasterMap`
61    #[must_use]
62    pub fn new() -> Arc<MasterMap<K>> {
63        Arc::new(MasterMap {
64            map: DashMap::new(),
65            subscribed_map: DashMap::new(),
66        })
67    }
68}
69
70/// Internal state for a `MemoryNetwork` instance
71#[derive(Debug)]
72struct MemoryNetworkInner<K: SignatureKey> {
73    /// Input for messages
74    input: RwLock<Option<Sender<Vec<u8>>>>,
75    /// Output for messages
76    output: Mutex<Receiver<Vec<u8>>>,
77    /// The master map
78    master_map: Arc<MasterMap<K>>,
79
80    /// Count of messages that are in-flight (send but not processed yet)
81    in_flight_message_count: AtomicUsize,
82
83    /// config to introduce unreliability to the network
84    reliability_config: Option<Box<dyn NetworkReliability>>,
85}
86
87/// In memory only network simulator.
88///
89/// This provides an in memory simulation of a networking implementation, allowing nodes running on
90/// the same machine to mock networking while testing other functionality.
91///
92/// Under the hood, this simply maintains mpmc channels to every other `MemoryNetwork` instance of the
93/// same group.
94#[derive(Clone)]
95pub struct MemoryNetwork<K: SignatureKey> {
96    /// The actual internal state
97    inner: Arc<MemoryNetworkInner<K>>,
98}
99
100impl<K: SignatureKey> Debug for MemoryNetwork<K> {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("MemoryNetwork")
103            .field("inner", &"inner")
104            .finish()
105    }
106}
107
108impl<K: SignatureKey> MemoryNetwork<K> {
109    /// Creates a new `MemoryNetwork` and hooks it up to the group through the provided `MasterMap`
110    pub fn new(
111        pub_key: &K,
112        master_map: &Arc<MasterMap<K>>,
113        subscribed_topics: &[Topic],
114        reliability_config: Option<Box<dyn NetworkReliability>>,
115    ) -> MemoryNetwork<K> {
116        info!("Attaching new MemoryNetwork");
117        let (input, mut task_recv) = channel(128);
118        let (task_send, output) = channel(128);
119        let in_flight_message_count = AtomicUsize::new(0);
120        trace!("Channels open, spawning background task");
121
122        spawn(
123            async move {
124                debug!("Starting background task");
125                trace!("Entering processing loop");
126                while let Some(vec) = task_recv.recv().await {
127                    trace!(?vec, "Incoming message");
128                    // Attempt to decode message
129                    let ts = task_send.clone();
130                    let res = ts.send(vec).await;
131                    if res.is_ok() {
132                        trace!("Passed message to output queue");
133                    } else {
134                        error!("Output queue receivers are shutdown");
135                    }
136                }
137            }
138            .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)),
139        );
140        trace!("Notifying other networks of the new connected peer");
141        trace!("Task spawned, creating MemoryNetwork");
142        let mn = MemoryNetwork {
143            inner: Arc::new(MemoryNetworkInner {
144                input: RwLock::new(Some(input)),
145                output: Mutex::new(output),
146                master_map: Arc::clone(master_map),
147                in_flight_message_count,
148                reliability_config,
149            }),
150        };
151        // Insert our public key into the master map
152        master_map.map.insert(pub_key.clone(), mn.clone());
153        // Insert our subscribed topics into the master map
154        for topic in subscribed_topics {
155            master_map
156                .subscribed_map
157                .entry(*topic)
158                .or_default()
159                .push((pub_key.clone(), mn.clone()));
160        }
161
162        mn
163    }
164
165    /// Send a [`Vec<u8>`] message to the inner `input`
166    async fn input(&self, message: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
167        self.inner
168            .in_flight_message_count
169            .fetch_add(1, Ordering::Relaxed);
170        let input = self.inner.input.read().await;
171        if let Some(input) = &*input {
172            input.send(message).await
173        } else {
174            Err(SendError(message))
175        }
176    }
177}
178
179impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
180    for MemoryNetwork<TYPES::SignatureKey>
181{
182    fn generator(
183        _expected_node_count: usize,
184        _num_bootstrap: usize,
185        _network_id: usize,
186        da_committee_size: usize,
187        reliability_config: Option<Box<dyn NetworkReliability>>,
188        _secondary_network_delay: Duration,
189    ) -> AsyncGenerator<Arc<Self>> {
190        let master: Arc<_> = MasterMap::new();
191        // We assign known_nodes' public key and stake value rather than read from config file since it's a test
192        Box::pin(move |node_id| {
193            let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
194            let pubkey = TYPES::SignatureKey::from_private(&privkey);
195
196            // Subscribe to topics based on our index
197            let subscribed_topics = if node_id < da_committee_size as u64 {
198                // DA node
199                vec![Topic::Da, Topic::Global]
200            } else {
201                // Non-DA node
202                vec![Topic::Global]
203            };
204
205            let net = MemoryNetwork::new(
206                &pubkey,
207                &master,
208                &subscribed_topics,
209                reliability_config.clone(),
210            );
211            Box::pin(async move { net.into() })
212        })
213    }
214
215    fn in_flight_message_count(&self) -> Option<usize> {
216        Some(self.inner.in_flight_message_count.load(Ordering::Relaxed))
217    }
218}
219
220// TODO instrument these functions
221#[async_trait]
222impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
223    #[instrument(name = "MemoryNetwork::ready_blocking")]
224    async fn wait_for_ready(&self) {}
225
226    fn pause(&self) {
227        unimplemented!("Pausing not implemented for the Memory network");
228    }
229
230    fn resume(&self) {
231        unimplemented!("Resuming not implemented for the Memory network");
232    }
233
234    #[instrument(name = "MemoryNetwork::shut_down")]
235    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
236    where
237        'a: 'b,
238        Self: 'b,
239    {
240        let closure = async move {
241            *self.inner.input.write().await = None;
242        };
243        boxed_sync(closure)
244    }
245
246    #[instrument(name = "MemoryNetwork::broadcast_message")]
247    async fn broadcast_message(
248        &self,
249        _: ViewNumber,
250        message: Vec<u8>,
251        topic: Topic,
252        _broadcast_delay: BroadcastDelay,
253    ) -> Result<(), NetworkError> {
254        trace!(?message, "Broadcasting message");
255        for node in self
256            .inner
257            .master_map
258            .subscribed_map
259            .entry(topic)
260            .or_default()
261            .iter()
262        {
263            // TODO delay/drop etc here
264            let (key, node) = node;
265            trace!(?key, "Sending message to node");
266            if let Some(ref config) = &self.inner.reliability_config {
267                {
268                    let node2 = node.clone();
269                    let fut = config.chaos_send_msg(
270                        message.clone(),
271                        Arc::new(move |msg: Vec<u8>| {
272                            let node3 = (node2).clone();
273                            boxed_sync(async move {
274                                let _res = node3.input(msg).await;
275                                // NOTE we're dropping metrics here but this is only for testing
276                                // purposes. I think that should be okay
277                            })
278                        }),
279                    );
280                    spawn(fut);
281                }
282            } else {
283                let res = node.input(message.clone()).await;
284                match res {
285                    Ok(()) => {
286                        trace!(?key, "Delivered message to remote");
287                    },
288                    Err(e) => {
289                        warn!(?e, ?key, "Error sending broadcast message to node");
290                    },
291                }
292            }
293        }
294        Ok(())
295    }
296
297    #[instrument(name = "MemoryNetwork::da_broadcast_message")]
298    async fn da_broadcast_message(
299        &self,
300        _: ViewNumber,
301        message: Vec<u8>,
302        recipients: Vec<K>,
303        _broadcast_delay: BroadcastDelay,
304    ) -> Result<(), NetworkError> {
305        trace!(?message, "Broadcasting message to DA");
306        for node in self
307            .inner
308            .master_map
309            .subscribed_map
310            .entry(Topic::Da)
311            .or_default()
312            .iter()
313        {
314            if !recipients.contains(&node.0) {
315                tracing::trace!("Skipping node because not in recipient list: {:?}", node.0);
316                continue;
317            }
318            // TODO delay/drop etc here
319            let (key, node) = node;
320            trace!(?key, "Sending message to node");
321            if let Some(ref config) = &self.inner.reliability_config {
322                {
323                    let node2 = node.clone();
324                    let fut = config.chaos_send_msg(
325                        message.clone(),
326                        Arc::new(move |msg: Vec<u8>| {
327                            let node3 = (node2).clone();
328                            boxed_sync(async move {
329                                let _res = node3.input(msg).await;
330                                // NOTE we're dropping metrics here but this is only for testing
331                                // purposes. I think that should be okay
332                            })
333                        }),
334                    );
335                    spawn(fut);
336                }
337            } else {
338                let res = node.input(message.clone()).await;
339                match res {
340                    Ok(()) => {
341                        trace!(?key, "Delivered message to remote");
342                    },
343                    Err(e) => {
344                        warn!(?e, ?key, "Error sending broadcast message to node");
345                    },
346                }
347            }
348        }
349        Ok(())
350    }
351
352    #[instrument(name = "MemoryNetwork::direct_message")]
353    async fn direct_message(
354        &self,
355        _: ViewNumber,
356        message: Vec<u8>,
357        recipient: K,
358    ) -> Result<(), NetworkError> {
359        // debug!(?message, ?recipient, "Sending direct message");
360        // Bincode the message
361        trace!("Message bincoded, finding recipient");
362        if let Some(node) = self.inner.master_map.map.get(&recipient) {
363            let node = node.value().clone();
364            if let Some(ref config) = &self.inner.reliability_config {
365                {
366                    let fut = config.chaos_send_msg(
367                        message.clone(),
368                        Arc::new(move |msg: Vec<u8>| {
369                            let node2 = node.clone();
370                            boxed_sync(async move {
371                                let _res = node2.input(msg).await;
372                                // NOTE we're dropping metrics here but this is only for testing
373                                // purposes. I think that should be okay
374                            })
375                        }),
376                    );
377                    spawn(fut);
378                }
379                Ok(())
380            } else {
381                let res = node.input(message).await;
382                match res {
383                    Ok(()) => {
384                        trace!(?recipient, "Delivered message to remote");
385                        Ok(())
386                    },
387                    Err(e) => Err(NetworkError::MessageSendError(format!(
388                        "error sending direct message to node: {e}",
389                    ))),
390                }
391            }
392        } else {
393            Err(NetworkError::MessageSendError(
394                "node does not exist".to_string(),
395            ))
396        }
397    }
398
399    /// Receive one or many messages from the underlying network.
400    ///
401    /// # Errors
402    /// If the other side of the channel is closed
403    #[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
404    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
405        let ret = self
406            .inner
407            .output
408            .lock()
409            .await
410            .recv()
411            .await
412            .ok_or(NetworkError::ShutDown)?;
413        self.inner
414            .in_flight_message_count
415            .fetch_sub(1, Ordering::Relaxed);
416        Ok(ret)
417    }
418}