hotshot/traits/networking/
memory_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! In memory network simulator
8//!
9//! This module provides an in-memory only simulation of an actual network, useful for unit and
10//! integration tests.
11
12use core::time::Duration;
13use std::{
14    fmt::Debug,
15    sync::{
16        atomic::{AtomicUsize, Ordering},
17        Arc,
18    },
19};
20
21use async_lock::{Mutex, RwLock};
22use async_trait::async_trait;
23use dashmap::DashMap;
24use hotshot_types::{
25    boxed_sync,
26    traits::{
27        network::{
28            AsyncGenerator, BroadcastDelay, ConnectedNetwork, TestableNetworkingImplementation,
29            Topic,
30        },
31        node_implementation::NodeType,
32        signature_key::SignatureKey,
33    },
34    BoxSyncFuture,
35};
36use tokio::{
37    spawn,
38    sync::mpsc::{channel, error::SendError, Receiver, Sender},
39};
40use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
41
42use super::{NetworkError, NetworkReliability};
43
/// Shared state for in-memory mock networking.
///
/// This type is responsible for keeping track of the channels to each [`MemoryNetwork`], and is
/// used to group the [`MemoryNetwork`] instances.
///
/// Every [`MemoryNetwork`] created against a given `MasterMap` registers itself in it, which is
/// how the instances of one group find each other when sending.
#[derive(derive_more::Debug)]
pub struct MasterMap<K: SignatureKey> {
    /// Every registered `MemoryNetwork`, keyed by the public key it was created with.
    ///
    /// Used to resolve the recipient of a direct message. Skipped in the `Debug` output.
    #[debug(skip)]
    map: DashMap<K, MemoryNetwork<K>>,

    /// The registered `MemoryNetwork`s aggregated by subscribed topic, each paired with its
    /// public key.
    ///
    /// Used to resolve the recipients of a broadcast.
    subscribed_map: DashMap<Topic, Vec<(K, MemoryNetwork<K>)>>,
}
57
58impl<K: SignatureKey> MasterMap<K> {
59    /// Create a new, empty, `MasterMap`
60    #[must_use]
61    pub fn new() -> Arc<MasterMap<K>> {
62        Arc::new(MasterMap {
63            map: DashMap::new(),
64            subscribed_map: DashMap::new(),
65        })
66    }
67}
68
/// Internal state for a `MemoryNetwork` instance
#[derive(Debug)]
struct MemoryNetworkInner<K: SignatureKey> {
    /// Input for messages: the sending half of this network's inbound channel.
    ///
    /// Set to `None` on shutdown, after which attempts to send to this network are rejected.
    input: RwLock<Option<Sender<Vec<u8>>>>,
    /// Output for messages: the receiving half of the queue drained by `recv_message`
    output: Mutex<Receiver<Vec<u8>>>,
    /// The master map this network is registered in
    master_map: Arc<MasterMap<K>>,

    /// Count of messages that are in-flight (sent but not processed yet)
    in_flight_message_count: AtomicUsize,

    /// config to introduce unreliability to the network; when `None`, messages are handed to
    /// the recipient directly
    reliability_config: Option<Box<dyn NetworkReliability>>,
}
85
/// In memory only network simulator.
///
/// This provides an in memory simulation of a networking implementation, allowing nodes running on
/// the same machine to mock networking while testing other functionality.
///
/// Under the hood, each instance owns a pair of bounded `tokio` mpsc channels (one inbound, one
/// feeding its output queue) and reaches the other `MemoryNetwork` instances of the same group
/// through the shared [`MasterMap`].
///
/// Clones share the same inner state.
#[derive(Clone)]
pub struct MemoryNetwork<K: SignatureKey> {
    /// The actual internal state, shared by every clone of this handle
    inner: Arc<MemoryNetworkInner<K>>,
}
98
99impl<K: SignatureKey> Debug for MemoryNetwork<K> {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.debug_struct("MemoryNetwork")
102            .field("inner", &"inner")
103            .finish()
104    }
105}
106
107impl<K: SignatureKey> MemoryNetwork<K> {
108    /// Creates a new `MemoryNetwork` and hooks it up to the group through the provided `MasterMap`
109    pub fn new(
110        pub_key: &K,
111        master_map: &Arc<MasterMap<K>>,
112        subscribed_topics: &[Topic],
113        reliability_config: Option<Box<dyn NetworkReliability>>,
114    ) -> MemoryNetwork<K> {
115        info!("Attaching new MemoryNetwork");
116        let (input, mut task_recv) = channel(128);
117        let (task_send, output) = channel(128);
118        let in_flight_message_count = AtomicUsize::new(0);
119        trace!("Channels open, spawning background task");
120
121        spawn(
122            async move {
123                debug!("Starting background task");
124                trace!("Entering processing loop");
125                while let Some(vec) = task_recv.recv().await {
126                    trace!(?vec, "Incoming message");
127                    // Attempt to decode message
128                    let ts = task_send.clone();
129                    let res = ts.send(vec).await;
130                    if res.is_ok() {
131                        trace!("Passed message to output queue");
132                    } else {
133                        error!("Output queue receivers are shutdown");
134                    }
135                }
136            }
137            .instrument(info_span!("MemoryNetwork Background task", map = ?master_map)),
138        );
139        trace!("Notifying other networks of the new connected peer");
140        trace!("Task spawned, creating MemoryNetwork");
141        let mn = MemoryNetwork {
142            inner: Arc::new(MemoryNetworkInner {
143                input: RwLock::new(Some(input)),
144                output: Mutex::new(output),
145                master_map: Arc::clone(master_map),
146                in_flight_message_count,
147                reliability_config,
148            }),
149        };
150        // Insert our public key into the master map
151        master_map.map.insert(pub_key.clone(), mn.clone());
152        // Insert our subscribed topics into the master map
153        for topic in subscribed_topics {
154            master_map
155                .subscribed_map
156                .entry(topic.clone())
157                .or_default()
158                .push((pub_key.clone(), mn.clone()));
159        }
160
161        mn
162    }
163
164    /// Send a [`Vec<u8>`] message to the inner `input`
165    async fn input(&self, message: Vec<u8>) -> Result<(), SendError<Vec<u8>>> {
166        self.inner
167            .in_flight_message_count
168            .fetch_add(1, Ordering::Relaxed);
169        let input = self.inner.input.read().await;
170        if let Some(input) = &*input {
171            input.send(message).await
172        } else {
173            Err(SendError(message))
174        }
175    }
176}
177
178impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
179    for MemoryNetwork<TYPES::SignatureKey>
180{
181    fn generator(
182        _expected_node_count: usize,
183        _num_bootstrap: usize,
184        _network_id: usize,
185        da_committee_size: usize,
186        reliability_config: Option<Box<dyn NetworkReliability>>,
187        _secondary_network_delay: Duration,
188    ) -> AsyncGenerator<Arc<Self>> {
189        let master: Arc<_> = MasterMap::new();
190        // We assign known_nodes' public key and stake value rather than read from config file since it's a test
191        Box::pin(move |node_id| {
192            let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
193            let pubkey = TYPES::SignatureKey::from_private(&privkey);
194
195            // Subscribe to topics based on our index
196            let subscribed_topics = if node_id < da_committee_size as u64 {
197                // DA node
198                vec![Topic::Da, Topic::Global]
199            } else {
200                // Non-DA node
201                vec![Topic::Global]
202            };
203
204            let net = MemoryNetwork::new(
205                &pubkey,
206                &master,
207                &subscribed_topics,
208                reliability_config.clone(),
209            );
210            Box::pin(async move { net.into() })
211        })
212    }
213
214    fn in_flight_message_count(&self) -> Option<usize> {
215        Some(self.inner.in_flight_message_count.load(Ordering::Relaxed))
216    }
217}
218
219// TODO instrument these functions
220#[async_trait]
221impl<K: SignatureKey + 'static> ConnectedNetwork<K> for MemoryNetwork<K> {
222    #[instrument(name = "MemoryNetwork::ready_blocking")]
223    async fn wait_for_ready(&self) {}
224
225    fn pause(&self) {
226        unimplemented!("Pausing not implemented for the Memory network");
227    }
228
229    fn resume(&self) {
230        unimplemented!("Resuming not implemented for the Memory network");
231    }
232
233    #[instrument(name = "MemoryNetwork::shut_down")]
234    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
235    where
236        'a: 'b,
237        Self: 'b,
238    {
239        let closure = async move {
240            *self.inner.input.write().await = None;
241        };
242        boxed_sync(closure)
243    }
244
245    #[instrument(name = "MemoryNetwork::broadcast_message")]
246    async fn broadcast_message(
247        &self,
248        message: Vec<u8>,
249        topic: Topic,
250        _broadcast_delay: BroadcastDelay,
251    ) -> Result<(), NetworkError> {
252        trace!(?message, "Broadcasting message");
253        for node in self
254            .inner
255            .master_map
256            .subscribed_map
257            .entry(topic)
258            .or_default()
259            .iter()
260        {
261            // TODO delay/drop etc here
262            let (key, node) = node;
263            trace!(?key, "Sending message to node");
264            if let Some(ref config) = &self.inner.reliability_config {
265                {
266                    let node2 = node.clone();
267                    let fut = config.chaos_send_msg(
268                        message.clone(),
269                        Arc::new(move |msg: Vec<u8>| {
270                            let node3 = (node2).clone();
271                            boxed_sync(async move {
272                                let _res = node3.input(msg).await;
273                                // NOTE we're dropping metrics here but this is only for testing
274                                // purposes. I think that should be okay
275                            })
276                        }),
277                    );
278                    spawn(fut);
279                }
280            } else {
281                let res = node.input(message.clone()).await;
282                match res {
283                    Ok(()) => {
284                        trace!(?key, "Delivered message to remote");
285                    },
286                    Err(e) => {
287                        warn!(?e, ?key, "Error sending broadcast message to node");
288                    },
289                }
290            }
291        }
292        Ok(())
293    }
294
295    #[instrument(name = "MemoryNetwork::da_broadcast_message")]
296    async fn da_broadcast_message(
297        &self,
298        message: Vec<u8>,
299        recipients: Vec<K>,
300        _broadcast_delay: BroadcastDelay,
301    ) -> Result<(), NetworkError> {
302        trace!(?message, "Broadcasting message to DA");
303        for node in self
304            .inner
305            .master_map
306            .subscribed_map
307            .entry(Topic::Da)
308            .or_default()
309            .iter()
310        {
311            if !recipients.contains(&node.0) {
312                tracing::trace!("Skipping node because not in recipient list: {:?}", node.0);
313                continue;
314            }
315            // TODO delay/drop etc here
316            let (key, node) = node;
317            trace!(?key, "Sending message to node");
318            if let Some(ref config) = &self.inner.reliability_config {
319                {
320                    let node2 = node.clone();
321                    let fut = config.chaos_send_msg(
322                        message.clone(),
323                        Arc::new(move |msg: Vec<u8>| {
324                            let node3 = (node2).clone();
325                            boxed_sync(async move {
326                                let _res = node3.input(msg).await;
327                                // NOTE we're dropping metrics here but this is only for testing
328                                // purposes. I think that should be okay
329                            })
330                        }),
331                    );
332                    spawn(fut);
333                }
334            } else {
335                let res = node.input(message.clone()).await;
336                match res {
337                    Ok(()) => {
338                        trace!(?key, "Delivered message to remote");
339                    },
340                    Err(e) => {
341                        warn!(?e, ?key, "Error sending broadcast message to node");
342                    },
343                }
344            }
345        }
346        Ok(())
347    }
348
349    #[instrument(name = "MemoryNetwork::direct_message")]
350    async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
351        // debug!(?message, ?recipient, "Sending direct message");
352        // Bincode the message
353        trace!("Message bincoded, finding recipient");
354        if let Some(node) = self.inner.master_map.map.get(&recipient) {
355            let node = node.value().clone();
356            if let Some(ref config) = &self.inner.reliability_config {
357                {
358                    let fut = config.chaos_send_msg(
359                        message.clone(),
360                        Arc::new(move |msg: Vec<u8>| {
361                            let node2 = node.clone();
362                            boxed_sync(async move {
363                                let _res = node2.input(msg).await;
364                                // NOTE we're dropping metrics here but this is only for testing
365                                // purposes. I think that should be okay
366                            })
367                        }),
368                    );
369                    spawn(fut);
370                }
371                Ok(())
372            } else {
373                let res = node.input(message).await;
374                match res {
375                    Ok(()) => {
376                        trace!(?recipient, "Delivered message to remote");
377                        Ok(())
378                    },
379                    Err(e) => Err(NetworkError::MessageSendError(format!(
380                        "error sending direct message to node: {e}",
381                    ))),
382                }
383            }
384        } else {
385            Err(NetworkError::MessageSendError(
386                "node does not exist".to_string(),
387            ))
388        }
389    }
390
391    /// Receive one or many messages from the underlying network.
392    ///
393    /// # Errors
394    /// If the other side of the channel is closed
395    #[instrument(name = "MemoryNetwork::recv_messages", skip_all)]
396    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
397        let ret = self
398            .inner
399            .output
400            .lock()
401            .await
402            .recv()
403            .await
404            .ok_or(NetworkError::ShutDown)?;
405        self.inner
406            .in_flight_message_count
407            .fetch_sub(1, Ordering::Relaxed);
408        Ok(ret)
409    }
410}