hotshot_types/traits/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Network access compatibility
8//!
9//! Contains types and traits used by `HotShot` to abstract over network access
10
11use std::{
12    collections::HashMap,
13    fmt::{Debug, Display},
14    hash::Hash,
15    pin::Pin,
16    sync::Arc,
17    time::Duration,
18};
19
20use async_trait::async_trait;
21use dyn_clone::DynClone;
22use futures::{Future, future::join_all};
23use rand::{
24    distributions::{Bernoulli, Uniform},
25    prelude::Distribution,
26};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tokio::{sync::mpsc::error::TrySendError, time::sleep};
30
31use super::{node_implementation::NodeType, signature_key::SignatureKey};
32use crate::{
33    BoxSyncFuture, PeerConnectInfo,
34    data::{EpochNumber, ViewNumber},
35    epoch_membership::EpochMembershipCoordinator,
36    message::SequencingMessage,
37};
38
39/// Centralized server specific errors
40#[derive(Debug, Error, Serialize, Deserialize)]
41pub enum PushCdnNetworkError {}
42
43/// the type of transmission
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub enum TransmitType<TYPES: NodeType> {
46    /// directly transmit
47    Direct(TYPES::SignatureKey),
48    /// broadcast the message to all
49    Broadcast,
50    /// broadcast to DA committee
51    DaCommitteeBroadcast,
52}
53
54/// Errors that can occur in the network
55#[derive(Debug, Error)]
56pub enum NetworkError {
57    /// Multiple errors. Allows us to roll up multiple errors into one.
58    #[error("Multiple errors: {0:?}")]
59    Multiple(Vec<NetworkError>),
60
61    /// A configuration error
62    #[error("Configuration error: {0}")]
63    ConfigError(String),
64
65    /// An error occurred while sending a message
66    #[error("Failed to send message: {0}")]
67    MessageSendError(String),
68
69    /// An error occurred while receiving a message
70    #[error("Failed to receive message: {0}")]
71    MessageReceiveError(String),
72
73    /// The feature is unimplemented
74    #[error("Unimplemented")]
75    Unimplemented,
76
77    /// An error occurred while attempting to listen
78    #[error("Listen error: {0}")]
79    ListenError(String),
80
81    /// Failed to send over a channel
82    #[error("Channel send error: {0}")]
83    ChannelSendError(String),
84
85    /// Failed to receive over a channel
86    #[error("Channel receive error: {0}")]
87    ChannelReceiveError(String),
88
89    /// The network has been shut down and can no longer be used
90    #[error("Network has been shut down")]
91    ShutDown,
92
93    /// Failed to serialize
94    #[error("Failed to serialize: {0}")]
95    FailedToSerialize(String),
96
97    /// Failed to deserialize
98    #[error("Failed to deserialize: {0}")]
99    FailedToDeserialize(String),
100
101    /// Timed out performing an operation
102    #[error("Timeout: {0}")]
103    Timeout(String),
104
105    /// The network request had been cancelled before it could be fulfilled
106    #[error("The request was cancelled before it could be fulfilled")]
107    RequestCancelled,
108
109    /// This node does not have any peers yet
110    #[error("This node does not have any peers yet")]
111    NoPeersYet,
112
113    /// Failed to look up a node on the network
114    #[error("Node lookup failed: {0}")]
115    LookupError(String),
116}
117
/// Marker trait bundling the bounds required of a request ID:
/// equality comparison and hashing.
pub trait Id: Hash + Eq + PartialEq {}
120
121/// a message
122pub trait ViewMessage<TYPES: NodeType> {
123    /// get the view out of the message
124    fn view_number(&self) -> ViewNumber;
125}
126
127/// A request for some data that the consensus layer is asking for.
128#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
129#[serde(bound(deserialize = ""))]
130pub struct DataRequest<TYPES: NodeType> {
131    /// Request
132    pub request: RequestKind<TYPES>,
133    /// View this message is for
134    pub view: ViewNumber,
135    /// signature of the Sha256 hash of the data so outsiders can't use know
136    /// public keys with stake.
137    pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
138}
139
140/// Underlying data request
141#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
142pub enum RequestKind<TYPES: NodeType> {
143    /// Request VID data by our key and the VID commitment
144    Vid(ViewNumber, TYPES::SignatureKey),
145    /// Request a DA proposal for a certain view
146    DaProposal(ViewNumber),
147    /// Request for quorum proposal for a view
148    Proposal(ViewNumber),
149}
150
151impl<TYPES: NodeType> std::fmt::Debug for RequestKind<TYPES> {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        match self {
154            RequestKind::Vid(view, key) => write!(f, "Vid({view:?}, {key})"),
155            RequestKind::DaProposal(view) => write!(f, "DaProposal({view:?})"),
156            RequestKind::Proposal(view) => write!(f, "Proposal({view:?})"),
157        }
158    }
159}
160
161/// A response for a request.  `SequencingMessage` is the same as other network messages
162/// The kind of message `M` is determined by what we requested
163#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
164#[serde(bound(deserialize = ""))]
165#[allow(clippy::large_enum_variant)]
166/// TODO: Put `Found` content in a `Box` to make enum smaller
167pub enum ResponseMessage<TYPES: NodeType> {
168    /// Peer returned us some data
169    Found(SequencingMessage<TYPES>),
170    /// Peer failed to get us data
171    NotFound,
172    /// The Request was denied
173    Denied,
174}
175
/// Specifies when a message should be broadcast to the network.
///
/// Honoring this is at the discretion of each network implementation;
/// some may ignore it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BroadcastDelay {
    /// Send the broadcast right away.
    None,
    /// Hold the broadcast back until the given view.
    View(u64),
}
186
187/// represents a networking implmentration
188/// exposes low level API for interacting with a network
189/// intended to be implemented for libp2p, the centralized server,
190/// and memory network
191#[async_trait]
192pub trait ConnectedNetwork<K: SignatureKey + 'static>: Clone + Send + Sync + 'static {
193    /// Pauses the underlying network
194    fn pause(&self);
195
196    /// Resumes the underlying network
197    fn resume(&self);
198
199    /// Blocks until the network is successfully initialized
200    async fn wait_for_ready(&self);
201
202    /// Blocks until the network is shut down
203    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
204    where
205        'a: 'b,
206        Self: 'b;
207
208    /// broadcast message to some subset of nodes
209    /// blocking
210    async fn broadcast_message(
211        &self,
212        view: ViewNumber,
213        message: Vec<u8>,
214        topic: Topic,
215        broadcast_delay: BroadcastDelay,
216    ) -> Result<(), NetworkError>;
217
218    /// broadcast a message only to a DA committee
219    /// blocking
220    async fn da_broadcast_message(
221        &self,
222        view: ViewNumber,
223        message: Vec<u8>,
224        recipients: Vec<K>,
225        broadcast_delay: BroadcastDelay,
226    ) -> Result<(), NetworkError>;
227
228    /// send messages with vid shares to its recipients
229    /// blocking
230    async fn vid_broadcast_message(
231        &self,
232        messages: HashMap<K, (ViewNumber, Vec<u8>)>,
233    ) -> Result<(), NetworkError> {
234        let future_results = messages
235            .into_iter()
236            .map(|(recipient_key, (v, m))| self.direct_message(v, m, recipient_key));
237        let results = join_all(future_results).await;
238
239        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
240
241        if errors.is_empty() {
242            Ok(())
243        } else {
244            Err(NetworkError::Multiple(errors))
245        }
246    }
247
248    /// Sends a direct message to a specific node
249    /// blocking
250    async fn direct_message(
251        &self,
252        view: ViewNumber,
253        message: Vec<u8>,
254        recipient: K,
255    ) -> Result<(), NetworkError>;
256
257    /// Receive one or many messages from the underlying network.
258    ///
259    /// # Errors
260    /// If there is a network-related failure.
261    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError>;
262
263    /// queues lookup of a node
264    ///
265    /// # Errors
266    /// Does not error.
267    fn queue_node_lookup(
268        &self,
269        _: ViewNumber,
270        _: K,
271    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
272        Ok(())
273    }
274
275    /// Update view can be used for any reason, but mostly it's for canceling tasks,
276    /// and looking up the address of the leader of a future view.
277    async fn update_view<TYPES>(
278        &self,
279        _: ViewNumber,
280        _: Option<EpochNumber>,
281        _: EpochMembershipCoordinator<TYPES>,
282    ) where
283        TYPES: NodeType<SignatureKey = K>,
284    {
285    }
286
287    /// Is primary network down? Makes sense only for combined network
288    fn is_primary_down(&self) -> bool {
289        false
290    }
291}
292
/// A generator for values of type `T` that need asynchronous construction.
///
/// It is a pinned, boxed, thread-safe function taking a `u64` (presumably a
/// node identifier — confirm against callers) and returning a boxed `Send`
/// future that resolves to `T`.
pub type AsyncGenerator<T> =
    Pin<Box<dyn Fn(u64) -> Pin<Box<dyn Future<Output = T> + Send>> + Send + Sync>>;
296
297/// Describes additional functionality needed by the test network implementation
298pub trait TestableNetworkingImplementation<TYPES: NodeType>
299where
300    Self: Sized,
301{
302    /// generates a network given an expected node count
303    #[allow(clippy::type_complexity)]
304    fn generator(
305        expected_node_count: usize,
306        num_bootstrap: usize,
307        network_id: usize,
308        da_committee_size: usize,
309        reliability_config: Option<Box<dyn NetworkReliability>>,
310        secondary_network_delay: Duration,
311        connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
312    ) -> AsyncGenerator<Arc<Self>>;
313
314    /// Get the number of messages in-flight.
315    ///
316    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
317    fn in_flight_message_count(&self) -> Option<usize>;
318}
319
320/// Changes that can occur in the network
321#[derive(Debug)]
322pub enum NetworkChange<P: SignatureKey> {
323    /// A node is connected
324    NodeConnected(P),
325
326    /// A node is disconnected
327    NodeDisconnected(P),
328}
329
330/// interface describing how reliable the network is
331#[async_trait]
332pub trait NetworkReliability: Debug + Sync + std::marker::Send + DynClone + 'static {
333    /// Sample from bernoulli distribution to decide whether
334    /// or not to keep a packet
335    /// # Panics
336    ///
337    /// Panics if `self.keep_numerator > self.keep_denominator`
338    ///
339    fn sample_keep(&self) -> bool {
340        true
341    }
342
343    /// sample from uniform distribution to decide whether
344    /// or not to keep a packet
345    fn sample_delay(&self) -> Duration {
346        std::time::Duration::ZERO
347    }
348
349    /// scramble the packet
350    fn scramble(&self, msg: Vec<u8>) -> Vec<u8> {
351        msg
352    }
353
354    /// number of times to repeat the packet
355    fn sample_repeat(&self) -> usize {
356        1
357    }
358
359    /// given a message and a way to send the message,
360    /// decide whether or not to send the message
361    /// how long to delay the message
362    /// whether or not to send duplicates
363    /// and whether or not to include noise with the message
364    /// then send the message
365    /// note: usually self is stored in a rwlock
366    /// so instead of doing the sending part, we just fiddle with the message
367    /// then return a future that does the sending and delaying
368    fn chaos_send_msg(
369        &self,
370        msg: Vec<u8>,
371        send_fn: Arc<dyn Send + Sync + 'static + Fn(Vec<u8>) -> BoxSyncFuture<'static, ()>>,
372    ) -> BoxSyncFuture<'static, ()> {
373        let sample_keep = self.sample_keep();
374        let delay = self.sample_delay();
375        let repeats = self.sample_repeat();
376        let mut msgs = Vec::new();
377        for _idx in 0..repeats {
378            let scrambled = self.scramble(msg.clone());
379            msgs.push(scrambled);
380        }
381        let closure = async move {
382            if sample_keep {
383                sleep(delay).await;
384                for msg in msgs {
385                    send_fn(msg).await;
386                }
387            }
388        };
389        Box::pin(closure)
390    }
391}
392
393// hack to get clone
394dyn_clone::clone_trait_object!(NetworkReliability);
395
396/// ideal network
397#[derive(Clone, Copy, Debug, Default)]
398pub struct PerfectNetwork {}
399
400impl NetworkReliability for PerfectNetwork {}
401
/// A synchronous network. Packets may be delayed, but the delay is bounded:
/// it never exceeds `delay_high_ms` milliseconds.
#[derive(Clone, Copy, Debug, Default)]
pub struct SynchronousNetwork {
    /// Upper bound, in milliseconds, on the delay applied to a packet
    pub delay_high_ms: u64,
    /// Lower bound, in milliseconds, on the delay applied to a packet
    pub delay_low_ms: u64,
}
411
412impl NetworkReliability for SynchronousNetwork {
413    /// never drop a packet
414    fn sample_keep(&self) -> bool {
415        true
416    }
417    fn sample_delay(&self) -> Duration {
418        Duration::from_millis(
419            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
420                .sample(&mut rand::thread_rng()),
421        )
422    }
423}
424
/// An asynchronous network. Packets may be dropped entirely
/// or delayed for arbitrarily long periods.
///
/// A packet is kept with probability `keep_numerator` / `keep_denominator`.
/// Its delay is sampled from a uniform distribution between
/// `delay_low_ms` and `delay_high_ms`, inclusive.
#[derive(Debug, Clone, Copy)]
pub struct AsynchronousNetwork {
    /// Numerator of the probability that a packet is kept
    pub keep_numerator: u32,
    /// Denominator of the probability that a packet is kept
    pub keep_denominator: u32,
    /// Smallest delay, in milliseconds, applied to a packet
    pub delay_low_ms: u64,
    /// Largest delay, in milliseconds, applied to a packet
    pub delay_high_ms: u64,
}
441
442impl NetworkReliability for AsynchronousNetwork {
443    fn sample_keep(&self) -> bool {
444        Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
445            .unwrap()
446            .sample(&mut rand::thread_rng())
447    }
448    fn sample_delay(&self) -> Duration {
449        Duration::from_millis(
450            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
451                .sample(&mut rand::thread_rng()),
452        )
453    }
454}
455
456/// An partially synchronous network. Behaves asynchronously
457/// until some arbitrary time bound, GST,
458/// then synchronously after GST
459#[allow(clippy::similar_names)]
460#[derive(Debug, Clone, Copy)]
461pub struct PartiallySynchronousNetwork {
462    /// asynchronous portion of network
463    pub asynchronous: AsynchronousNetwork,
464    /// synchronous portion of network
465    pub synchronous: SynchronousNetwork,
466    /// time when GST occurs
467    pub gst: std::time::Duration,
468    /// when the network was started
469    pub start: std::time::Instant,
470}
471
472impl NetworkReliability for PartiallySynchronousNetwork {
473    /// never drop a packet
474    fn sample_keep(&self) -> bool {
475        true
476    }
477    fn sample_delay(&self) -> Duration {
478        // act asynchronous before gst
479        if self.start.elapsed() < self.gst {
480            if self.asynchronous.sample_keep() {
481                self.asynchronous.sample_delay()
482            } else {
483                // assume packet was "dropped" and will arrive after gst
484                self.synchronous.sample_delay() + self.gst
485            }
486        } else {
487            // act synchronous after gst
488            self.synchronous.sample_delay()
489        }
490    }
491}
492
493impl Default for AsynchronousNetwork {
494    // disable all chance of failure
495    fn default() -> Self {
496        AsynchronousNetwork {
497            keep_numerator: 1,
498            keep_denominator: 1,
499            delay_low_ms: 0,
500            delay_high_ms: 0,
501        }
502    }
503}
504
505impl Default for PartiallySynchronousNetwork {
506    fn default() -> Self {
507        PartiallySynchronousNetwork {
508            synchronous: SynchronousNetwork::default(),
509            asynchronous: AsynchronousNetwork::default(),
510            gst: std::time::Duration::new(0, 0),
511            start: std::time::Instant::now(),
512        }
513    }
514}
515
516impl SynchronousNetwork {
517    /// create new `SynchronousNetwork`
518    #[must_use]
519    pub fn new(timeout: u64, delay_low_ms: u64) -> Self {
520        SynchronousNetwork {
521            delay_high_ms: timeout,
522            delay_low_ms,
523        }
524    }
525}
526
527impl AsynchronousNetwork {
528    /// create new `AsynchronousNetwork`
529    #[must_use]
530    pub fn new(
531        keep_numerator: u32,
532        keep_denominator: u32,
533        delay_low_ms: u64,
534        delay_high_ms: u64,
535    ) -> Self {
536        AsynchronousNetwork {
537            keep_numerator,
538            keep_denominator,
539            delay_low_ms,
540            delay_high_ms,
541        }
542    }
543}
544
545impl PartiallySynchronousNetwork {
546    /// create new `PartiallySynchronousNetwork`
547    #[allow(clippy::similar_names)]
548    #[must_use]
549    pub fn new(
550        asynchronous: AsynchronousNetwork,
551        synchronous: SynchronousNetwork,
552        gst: std::time::Duration,
553    ) -> Self {
554        PartiallySynchronousNetwork {
555            asynchronous,
556            synchronous,
557            gst,
558            start: std::time::Instant::now(),
559        }
560    }
561}
562
/// A chaotic network that exercises all of the networking calls:
/// packets may be dropped, delayed and repeated.
#[derive(Debug, Clone)]
pub struct ChaosNetwork {
    /// Numerator of the probability that a packet is kept
    pub keep_numerator: u32,
    /// Denominator of the probability that a packet is kept
    pub keep_denominator: u32,
    /// Smallest delay, in milliseconds, applied to a packet
    pub delay_low_ms: u64,
    /// Largest delay, in milliseconds, applied to a packet
    pub delay_high_ms: u64,
    /// Smallest number of repeats for a message
    pub repeat_low: usize,
    /// Largest number of repeats for a message
    pub repeat_high: usize,
}
579
580impl NetworkReliability for ChaosNetwork {
581    fn sample_keep(&self) -> bool {
582        Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
583            .unwrap()
584            .sample(&mut rand::thread_rng())
585    }
586
587    fn sample_delay(&self) -> Duration {
588        Duration::from_millis(
589            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
590                .sample(&mut rand::thread_rng()),
591        )
592    }
593
594    fn sample_repeat(&self) -> usize {
595        Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng())
596    }
597}
598
/// The topic a broadcast message is published on.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
pub enum Topic {
    /// Messages on the `Global` topic go out to all nodes
    Global,
    /// Messages on the `Da` topic go out to the DA committee only
    Da,
}
607
608/// Libp2p topics require a string, so we need to convert our enum to a string
609impl Display for Topic {
610    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611        match self {
612            Topic::Global => write!(f, "global"),
613            Topic::Da => write!(f, "DA"),
614        }
615    }
616}