hotshot_types/traits/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Network access compatibility
8//!
9//! Contains types and traits used by `HotShot` to abstract over network access
10
11use std::{
12    collections::HashMap,
13    fmt::{Debug, Display},
14    hash::Hash,
15    pin::Pin,
16    sync::Arc,
17    time::Duration,
18};
19
20use async_trait::async_trait;
21use dyn_clone::DynClone;
22use futures::{future::join_all, Future};
23use rand::{
24    distributions::{Bernoulli, Uniform},
25    prelude::Distribution,
26};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tokio::{sync::mpsc::error::TrySendError, time::sleep};
30
31use super::{node_implementation::NodeType, signature_key::SignatureKey};
32use crate::{
33    data::ViewNumber, epoch_membership::EpochMembershipCoordinator, message::SequencingMessage,
34    BoxSyncFuture,
35};
36
37/// Centralized server specific errors
38#[derive(Debug, Error, Serialize, Deserialize)]
39pub enum PushCdnNetworkError {}
40
41/// the type of transmission
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub enum TransmitType<TYPES: NodeType> {
44    /// directly transmit
45    Direct(TYPES::SignatureKey),
46    /// broadcast the message to all
47    Broadcast,
48    /// broadcast to DA committee
49    DaCommitteeBroadcast,
50}
51
52/// Errors that can occur in the network
53#[derive(Debug, Error)]
54pub enum NetworkError {
55    /// Multiple errors. Allows us to roll up multiple errors into one.
56    #[error("Multiple errors: {0:?}")]
57    Multiple(Vec<NetworkError>),
58
59    /// A configuration error
60    #[error("Configuration error: {0}")]
61    ConfigError(String),
62
63    /// An error occurred while sending a message
64    #[error("Failed to send message: {0}")]
65    MessageSendError(String),
66
67    /// An error occurred while receiving a message
68    #[error("Failed to receive message: {0}")]
69    MessageReceiveError(String),
70
71    /// The feature is unimplemented
72    #[error("Unimplemented")]
73    Unimplemented,
74
75    /// An error occurred while attempting to listen
76    #[error("Listen error: {0}")]
77    ListenError(String),
78
79    /// Failed to send over a channel
80    #[error("Channel send error: {0}")]
81    ChannelSendError(String),
82
83    /// Failed to receive over a channel
84    #[error("Channel receive error: {0}")]
85    ChannelReceiveError(String),
86
87    /// The network has been shut down and can no longer be used
88    #[error("Network has been shut down")]
89    ShutDown,
90
91    /// Failed to serialize
92    #[error("Failed to serialize: {0}")]
93    FailedToSerialize(String),
94
95    /// Failed to deserialize
96    #[error("Failed to deserialize: {0}")]
97    FailedToDeserialize(String),
98
99    /// Timed out performing an operation
100    #[error("Timeout: {0}")]
101    Timeout(String),
102
103    /// The network request had been cancelled before it could be fulfilled
104    #[error("The request was cancelled before it could be fulfilled")]
105    RequestCancelled,
106
107    /// This node does not have any peers yet
108    #[error("This node does not have any peers yet")]
109    NoPeersYet,
110
111    /// Failed to look up a node on the network
112    #[error("Node lookup failed: {0}")]
113    LookupError(String),
114}
115
/// Marker trait bundling what a request identifier must support.
///
/// Implementors can be compared for equality and hashed, so they can serve as
/// `HashMap`/`HashSet` keys. `PartialEq` is implied by the `Eq` supertrait.
pub trait Id: Hash + Eq {}
118
119/// a message
120pub trait ViewMessage<TYPES: NodeType> {
121    /// get the view out of the message
122    fn view_number(&self) -> TYPES::View;
123}
124
125/// A request for some data that the consensus layer is asking for.
126#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
127#[serde(bound(deserialize = ""))]
128pub struct DataRequest<TYPES: NodeType> {
129    /// Request
130    pub request: RequestKind<TYPES>,
131    /// View this message is for
132    pub view: TYPES::View,
133    /// signature of the Sha256 hash of the data so outsiders can't use know
134    /// public keys with stake.
135    pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
136}
137
138/// Underlying data request
139#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
140pub enum RequestKind<TYPES: NodeType> {
141    /// Request VID data by our key and the VID commitment
142    Vid(TYPES::View, TYPES::SignatureKey),
143    /// Request a DA proposal for a certain view
144    DaProposal(TYPES::View),
145    /// Request for quorum proposal for a view
146    Proposal(TYPES::View),
147}
148
149impl<TYPES: NodeType> std::fmt::Debug for RequestKind<TYPES> {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        match self {
152            RequestKind::Vid(view, key) => write!(f, "Vid({view:?}, {key})"),
153            RequestKind::DaProposal(view) => write!(f, "DaProposal({view:?})"),
154            RequestKind::Proposal(view) => write!(f, "Proposal({view:?})"),
155        }
156    }
157}
158
159/// A response for a request.  `SequencingMessage` is the same as other network messages
160/// The kind of message `M` is determined by what we requested
161#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
162#[serde(bound(deserialize = ""))]
163#[allow(clippy::large_enum_variant)]
164/// TODO: Put `Found` content in a `Box` to make enum smaller
165pub enum ResponseMessage<TYPES: NodeType> {
166    /// Peer returned us some data
167    Found(SequencingMessage<TYPES>),
168    /// Peer failed to get us data
169    NotFound,
170    /// The Request was denied
171    Denied,
172}
173
/// When a broadcast should actually go out.
///
/// This is only a hint: network implementations may ignore it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BroadcastDelay {
    /// Send right away.
    None,
    /// Hold the broadcast until the given view.
    View(u64),
}
184
185#[async_trait]
186/// represents a networking implmentration
187/// exposes low level API for interacting with a network
188/// intended to be implemented for libp2p, the centralized server,
189/// and memory network
190pub trait ConnectedNetwork<K: SignatureKey + 'static>: Clone + Send + Sync + 'static {
191    /// Pauses the underlying network
192    fn pause(&self);
193
194    /// Resumes the underlying network
195    fn resume(&self);
196
197    /// Blocks until the network is successfully initialized
198    async fn wait_for_ready(&self);
199
200    /// Blocks until the network is shut down
201    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
202    where
203        'a: 'b,
204        Self: 'b;
205
206    /// broadcast message to some subset of nodes
207    /// blocking
208    async fn broadcast_message(
209        &self,
210        message: Vec<u8>,
211        topic: Topic,
212        broadcast_delay: BroadcastDelay,
213    ) -> Result<(), NetworkError>;
214
215    /// broadcast a message only to a DA committee
216    /// blocking
217    async fn da_broadcast_message(
218        &self,
219        message: Vec<u8>,
220        recipients: Vec<K>,
221        broadcast_delay: BroadcastDelay,
222    ) -> Result<(), NetworkError>;
223
224    /// send messages with vid shares to its recipients
225    /// blocking
226    async fn vid_broadcast_message(
227        &self,
228        messages: HashMap<K, Vec<u8>>,
229    ) -> Result<(), NetworkError> {
230        let future_results = messages
231            .into_iter()
232            .map(|(recipient_key, message)| self.direct_message(message, recipient_key));
233        let results = join_all(future_results).await;
234
235        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
236
237        if errors.is_empty() {
238            Ok(())
239        } else {
240            Err(NetworkError::Multiple(errors))
241        }
242    }
243
244    /// Sends a direct message to a specific node
245    /// blocking
246    async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError>;
247
248    /// Receive one or many messages from the underlying network.
249    ///
250    /// # Errors
251    /// If there is a network-related failure.
252    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError>;
253
254    /// queues lookup of a node
255    ///
256    /// # Errors
257    /// Does not error.
258    fn queue_node_lookup(
259        &self,
260        _view_number: ViewNumber,
261        _pk: K,
262    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
263        Ok(())
264    }
265
266    /// Update view can be used for any reason, but mostly it's for canceling tasks,
267    /// and looking up the address of the leader of a future view.
268    async fn update_view<'a, TYPES>(
269        &'a self,
270        _view: u64,
271        _epoch: Option<u64>,
272        _membership_coordinator: EpochMembershipCoordinator<TYPES>,
273    ) where
274        TYPES: NodeType<SignatureKey = K> + 'a,
275    {
276    }
277
278    /// Is primary network down? Makes sense only for combined network
279    fn is_primary_down(&self) -> bool {
280        false
281    }
282}
283
/// Factory for values that must be built asynchronously.
///
/// Calling it with a `u64` (presumably a node index; confirm against callers) returns a boxed,
/// `Send` future that resolves to a `T`. The factory itself is pinned, boxed, `Send`, and `Sync`,
/// so it can be shared between threads.
pub type AsyncGenerator<T> = Pin<
    Box<
        dyn Fn(u64) -> Pin<Box<dyn Future<Output = T> + Send>>
            + Send
            + Sync,
    >,
>;
287
288/// Describes additional functionality needed by the test network implementation
289pub trait TestableNetworkingImplementation<TYPES: NodeType>
290where
291    Self: Sized,
292{
293    /// generates a network given an expected node count
294    #[allow(clippy::type_complexity)]
295    fn generator(
296        expected_node_count: usize,
297        num_bootstrap: usize,
298        network_id: usize,
299        da_committee_size: usize,
300        reliability_config: Option<Box<dyn NetworkReliability>>,
301        secondary_network_delay: Duration,
302    ) -> AsyncGenerator<Arc<Self>>;
303
304    /// Get the number of messages in-flight.
305    ///
306    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
307    fn in_flight_message_count(&self) -> Option<usize>;
308}
309
310/// Changes that can occur in the network
311#[derive(Debug)]
312pub enum NetworkChange<P: SignatureKey> {
313    /// A node is connected
314    NodeConnected(P),
315
316    /// A node is disconnected
317    NodeDisconnected(P),
318}
319
320/// interface describing how reliable the network is
321#[async_trait]
322pub trait NetworkReliability: Debug + Sync + std::marker::Send + DynClone + 'static {
323    /// Sample from bernoulli distribution to decide whether
324    /// or not to keep a packet
325    /// # Panics
326    ///
327    /// Panics if `self.keep_numerator > self.keep_denominator`
328    ///
329    fn sample_keep(&self) -> bool {
330        true
331    }
332
333    /// sample from uniform distribution to decide whether
334    /// or not to keep a packet
335    fn sample_delay(&self) -> Duration {
336        std::time::Duration::ZERO
337    }
338
339    /// scramble the packet
340    fn scramble(&self, msg: Vec<u8>) -> Vec<u8> {
341        msg
342    }
343
344    /// number of times to repeat the packet
345    fn sample_repeat(&self) -> usize {
346        1
347    }
348
349    /// given a message and a way to send the message,
350    /// decide whether or not to send the message
351    /// how long to delay the message
352    /// whether or not to send duplicates
353    /// and whether or not to include noise with the message
354    /// then send the message
355    /// note: usually self is stored in a rwlock
356    /// so instead of doing the sending part, we just fiddle with the message
357    /// then return a future that does the sending and delaying
358    fn chaos_send_msg(
359        &self,
360        msg: Vec<u8>,
361        send_fn: Arc<dyn Send + Sync + 'static + Fn(Vec<u8>) -> BoxSyncFuture<'static, ()>>,
362    ) -> BoxSyncFuture<'static, ()> {
363        let sample_keep = self.sample_keep();
364        let delay = self.sample_delay();
365        let repeats = self.sample_repeat();
366        let mut msgs = Vec::new();
367        for _idx in 0..repeats {
368            let scrambled = self.scramble(msg.clone());
369            msgs.push(scrambled);
370        }
371        let closure = async move {
372            if sample_keep {
373                sleep(delay).await;
374                for msg in msgs {
375                    send_fn(msg).await;
376                }
377            }
378        };
379        Box::pin(closure)
380    }
381}
382
383// hack to get clone
384dyn_clone::clone_trait_object!(NetworkReliability);
385
386/// ideal network
387#[derive(Clone, Copy, Debug, Default)]
388pub struct PerfectNetwork {}
389
390impl NetworkReliability for PerfectNetwork {}
391
392/// A synchronous network. Packets may be delayed, but are guaranteed
393/// to arrive within `timeout` ns
394#[derive(Clone, Copy, Debug, Default)]
395pub struct SynchronousNetwork {
396    /// Max value in milliseconds that a packet may be delayed
397    pub delay_high_ms: u64,
398    /// Lowest value in milliseconds that a packet may be delayed
399    pub delay_low_ms: u64,
400}
401
402impl NetworkReliability for SynchronousNetwork {
403    /// never drop a packet
404    fn sample_keep(&self) -> bool {
405        true
406    }
407    fn sample_delay(&self) -> Duration {
408        Duration::from_millis(
409            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
410                .sample(&mut rand::thread_rng()),
411        )
412    }
413}
414
415/// An asynchronous network. Packets may be dropped entirely
416/// or delayed for arbitrarily long periods
417/// probability that packet is kept = `keep_numerator` / `keep_denominator`
418/// packet delay is obtained by sampling from a uniform distribution
419/// between `delay_low_ms` and `delay_high_ms`, inclusive
420#[derive(Debug, Clone, Copy)]
421pub struct AsynchronousNetwork {
422    /// numerator for probability of keeping packets
423    pub keep_numerator: u32,
424    /// denominator for probability of keeping packets
425    pub keep_denominator: u32,
426    /// lowest value in milliseconds that a packet may be delayed
427    pub delay_low_ms: u64,
428    /// highest value in milliseconds that a packet may be delayed
429    pub delay_high_ms: u64,
430}
431
432impl NetworkReliability for AsynchronousNetwork {
433    fn sample_keep(&self) -> bool {
434        Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
435            .unwrap()
436            .sample(&mut rand::thread_rng())
437    }
438    fn sample_delay(&self) -> Duration {
439        Duration::from_millis(
440            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
441                .sample(&mut rand::thread_rng()),
442        )
443    }
444}
445
446/// An partially synchronous network. Behaves asynchronously
447/// until some arbitrary time bound, GST,
448/// then synchronously after GST
449#[allow(clippy::similar_names)]
450#[derive(Debug, Clone, Copy)]
451pub struct PartiallySynchronousNetwork {
452    /// asynchronous portion of network
453    pub asynchronous: AsynchronousNetwork,
454    /// synchronous portion of network
455    pub synchronous: SynchronousNetwork,
456    /// time when GST occurs
457    pub gst: std::time::Duration,
458    /// when the network was started
459    pub start: std::time::Instant,
460}
461
462impl NetworkReliability for PartiallySynchronousNetwork {
463    /// never drop a packet
464    fn sample_keep(&self) -> bool {
465        true
466    }
467    fn sample_delay(&self) -> Duration {
468        // act asynchronous before gst
469        if self.start.elapsed() < self.gst {
470            if self.asynchronous.sample_keep() {
471                self.asynchronous.sample_delay()
472            } else {
473                // assume packet was "dropped" and will arrive after gst
474                self.synchronous.sample_delay() + self.gst
475            }
476        } else {
477            // act synchronous after gst
478            self.synchronous.sample_delay()
479        }
480    }
481}
482
483impl Default for AsynchronousNetwork {
484    // disable all chance of failure
485    fn default() -> Self {
486        AsynchronousNetwork {
487            keep_numerator: 1,
488            keep_denominator: 1,
489            delay_low_ms: 0,
490            delay_high_ms: 0,
491        }
492    }
493}
494
495impl Default for PartiallySynchronousNetwork {
496    fn default() -> Self {
497        PartiallySynchronousNetwork {
498            synchronous: SynchronousNetwork::default(),
499            asynchronous: AsynchronousNetwork::default(),
500            gst: std::time::Duration::new(0, 0),
501            start: std::time::Instant::now(),
502        }
503    }
504}
505
506impl SynchronousNetwork {
507    /// create new `SynchronousNetwork`
508    #[must_use]
509    pub fn new(timeout: u64, delay_low_ms: u64) -> Self {
510        SynchronousNetwork {
511            delay_high_ms: timeout,
512            delay_low_ms,
513        }
514    }
515}
516
517impl AsynchronousNetwork {
518    /// create new `AsynchronousNetwork`
519    #[must_use]
520    pub fn new(
521        keep_numerator: u32,
522        keep_denominator: u32,
523        delay_low_ms: u64,
524        delay_high_ms: u64,
525    ) -> Self {
526        AsynchronousNetwork {
527            keep_numerator,
528            keep_denominator,
529            delay_low_ms,
530            delay_high_ms,
531        }
532    }
533}
534
535impl PartiallySynchronousNetwork {
536    /// create new `PartiallySynchronousNetwork`
537    #[allow(clippy::similar_names)]
538    #[must_use]
539    pub fn new(
540        asynchronous: AsynchronousNetwork,
541        synchronous: SynchronousNetwork,
542        gst: std::time::Duration,
543    ) -> Self {
544        PartiallySynchronousNetwork {
545            asynchronous,
546            synchronous,
547            gst,
548            start: std::time::Instant::now(),
549        }
550    }
551}
552
553/// A chaotic network using all the networking calls
554#[derive(Debug, Clone)]
555pub struct ChaosNetwork {
556    /// numerator for probability of keeping packets
557    pub keep_numerator: u32,
558    /// denominator for probability of keeping packets
559    pub keep_denominator: u32,
560    /// lowest value in milliseconds that a packet may be delayed
561    pub delay_low_ms: u64,
562    /// highest value in milliseconds that a packet may be delayed
563    pub delay_high_ms: u64,
564    /// lowest value of repeats for a message
565    pub repeat_low: usize,
566    /// highest value of repeats for a message
567    pub repeat_high: usize,
568}
569
570impl NetworkReliability for ChaosNetwork {
571    fn sample_keep(&self) -> bool {
572        Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
573            .unwrap()
574            .sample(&mut rand::thread_rng())
575    }
576
577    fn sample_delay(&self) -> Duration {
578        Duration::from_millis(
579            Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
580                .sample(&mut rand::thread_rng()),
581        )
582    }
583
584    fn sample_repeat(&self) -> usize {
585        Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng())
586    }
587}
588
/// The audience of a broadcast.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Topic {
    /// Goes out to every node.
    Global,
    /// Goes out to the DA committee only.
    Da,
}

/// Libp2p topics are identified by strings, so each variant maps to a fixed topic name.
impl Display for Topic {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Global => "global",
            Self::Da => "DA",
        };
        f.write_str(name)
    }
}