1use std::{
12 collections::HashMap,
13 fmt::{Debug, Display},
14 hash::Hash,
15 pin::Pin,
16 sync::Arc,
17 time::Duration,
18};
19
20use async_trait::async_trait;
21use dyn_clone::DynClone;
22use futures::{Future, future::join_all};
23use rand::{
24 distributions::{Bernoulli, Uniform},
25 prelude::Distribution,
26};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tokio::{sync::mpsc::error::TrySendError, time::sleep};
30
31use super::{node_implementation::NodeType, signature_key::SignatureKey};
32use crate::{
33 BoxSyncFuture, PeerConnectInfo,
34 data::{EpochNumber, ViewNumber},
35 epoch_membership::EpochMembershipCoordinator,
36 message::SequencingMessage,
37};
38
39#[derive(Debug, Error, Serialize, Deserialize)]
41pub enum PushCdnNetworkError {}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub enum TransmitType<TYPES: NodeType> {
46 Direct(TYPES::SignatureKey),
48 Broadcast,
50 DaCommitteeBroadcast,
52}
53
54#[derive(Debug, Error)]
56pub enum NetworkError {
57 #[error("Multiple errors: {0:?}")]
59 Multiple(Vec<NetworkError>),
60
61 #[error("Configuration error: {0}")]
63 ConfigError(String),
64
65 #[error("Failed to send message: {0}")]
67 MessageSendError(String),
68
69 #[error("Failed to receive message: {0}")]
71 MessageReceiveError(String),
72
73 #[error("Unimplemented")]
75 Unimplemented,
76
77 #[error("Listen error: {0}")]
79 ListenError(String),
80
81 #[error("Channel send error: {0}")]
83 ChannelSendError(String),
84
85 #[error("Channel receive error: {0}")]
87 ChannelReceiveError(String),
88
89 #[error("Network has been shut down")]
91 ShutDown,
92
93 #[error("Failed to serialize: {0}")]
95 FailedToSerialize(String),
96
97 #[error("Failed to deserialize: {0}")]
99 FailedToDeserialize(String),
100
101 #[error("Timeout: {0}")]
103 Timeout(String),
104
105 #[error("The request was cancelled before it could be fulfilled")]
107 RequestCancelled,
108
109 #[error("This node does not have any peers yet")]
111 NoPeersYet,
112
113 #[error("Node lookup failed: {0}")]
115 LookupError(String),
116}
117
118pub trait Id: Eq + PartialEq + Hash {}
120
121pub trait ViewMessage<TYPES: NodeType> {
123 fn view_number(&self) -> ViewNumber;
125}
126
127#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
129#[serde(bound(deserialize = ""))]
130pub struct DataRequest<TYPES: NodeType> {
131 pub request: RequestKind<TYPES>,
133 pub view: ViewNumber,
135 pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
138}
139
140#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
142pub enum RequestKind<TYPES: NodeType> {
143 Vid(ViewNumber, TYPES::SignatureKey),
145 DaProposal(ViewNumber),
147 Proposal(ViewNumber),
149}
150
151impl<TYPES: NodeType> std::fmt::Debug for RequestKind<TYPES> {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 match self {
154 RequestKind::Vid(view, key) => write!(f, "Vid({view:?}, {key})"),
155 RequestKind::DaProposal(view) => write!(f, "DaProposal({view:?})"),
156 RequestKind::Proposal(view) => write!(f, "Proposal({view:?})"),
157 }
158 }
159}
160
161#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
164#[serde(bound(deserialize = ""))]
165#[allow(clippy::large_enum_variant)]
166pub enum ResponseMessage<TYPES: NodeType> {
168 Found(SequencingMessage<TYPES>),
170 NotFound,
172 Denied,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
180pub enum BroadcastDelay {
181 None,
183 View(u64),
185}
186
187#[async_trait]
192pub trait ConnectedNetwork<K: SignatureKey + 'static>: Clone + Send + Sync + 'static {
193 fn pause(&self);
195
196 fn resume(&self);
198
199 async fn wait_for_ready(&self);
201
202 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
204 where
205 'a: 'b,
206 Self: 'b;
207
208 async fn broadcast_message(
211 &self,
212 view: ViewNumber,
213 message: Vec<u8>,
214 topic: Topic,
215 broadcast_delay: BroadcastDelay,
216 ) -> Result<(), NetworkError>;
217
218 async fn da_broadcast_message(
221 &self,
222 view: ViewNumber,
223 message: Vec<u8>,
224 recipients: Vec<K>,
225 broadcast_delay: BroadcastDelay,
226 ) -> Result<(), NetworkError>;
227
228 async fn vid_broadcast_message(
231 &self,
232 messages: HashMap<K, (ViewNumber, Vec<u8>)>,
233 ) -> Result<(), NetworkError> {
234 let future_results = messages
235 .into_iter()
236 .map(|(recipient_key, (v, m))| self.direct_message(v, m, recipient_key));
237 let results = join_all(future_results).await;
238
239 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
240
241 if errors.is_empty() {
242 Ok(())
243 } else {
244 Err(NetworkError::Multiple(errors))
245 }
246 }
247
248 async fn direct_message(
251 &self,
252 view: ViewNumber,
253 message: Vec<u8>,
254 recipient: K,
255 ) -> Result<(), NetworkError>;
256
257 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError>;
262
263 fn queue_node_lookup(
268 &self,
269 _: ViewNumber,
270 _: K,
271 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
272 Ok(())
273 }
274
275 async fn update_view<TYPES>(
278 &self,
279 _: ViewNumber,
280 _: Option<EpochNumber>,
281 _: EpochMembershipCoordinator<TYPES>,
282 ) where
283 TYPES: NodeType<SignatureKey = K>,
284 {
285 }
286
287 fn is_primary_down(&self) -> bool {
289 false
290 }
291}
292
293pub type AsyncGenerator<T> =
295 Pin<Box<dyn Fn(u64) -> Pin<Box<dyn Future<Output = T> + Send>> + Send + Sync>>;
296
297pub trait TestableNetworkingImplementation<TYPES: NodeType>
299where
300 Self: Sized,
301{
302 #[allow(clippy::type_complexity)]
304 fn generator(
305 expected_node_count: usize,
306 num_bootstrap: usize,
307 network_id: usize,
308 da_committee_size: usize,
309 reliability_config: Option<Box<dyn NetworkReliability>>,
310 secondary_network_delay: Duration,
311 connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
312 ) -> AsyncGenerator<Arc<Self>>;
313
314 fn in_flight_message_count(&self) -> Option<usize>;
318}
319
320#[derive(Debug)]
322pub enum NetworkChange<P: SignatureKey> {
323 NodeConnected(P),
325
326 NodeDisconnected(P),
328}
329
330#[async_trait]
332pub trait NetworkReliability: Debug + Sync + std::marker::Send + DynClone + 'static {
333 fn sample_keep(&self) -> bool {
340 true
341 }
342
343 fn sample_delay(&self) -> Duration {
346 std::time::Duration::ZERO
347 }
348
349 fn scramble(&self, msg: Vec<u8>) -> Vec<u8> {
351 msg
352 }
353
354 fn sample_repeat(&self) -> usize {
356 1
357 }
358
359 fn chaos_send_msg(
369 &self,
370 msg: Vec<u8>,
371 send_fn: Arc<dyn Send + Sync + 'static + Fn(Vec<u8>) -> BoxSyncFuture<'static, ()>>,
372 ) -> BoxSyncFuture<'static, ()> {
373 let sample_keep = self.sample_keep();
374 let delay = self.sample_delay();
375 let repeats = self.sample_repeat();
376 let mut msgs = Vec::new();
377 for _idx in 0..repeats {
378 let scrambled = self.scramble(msg.clone());
379 msgs.push(scrambled);
380 }
381 let closure = async move {
382 if sample_keep {
383 sleep(delay).await;
384 for msg in msgs {
385 send_fn(msg).await;
386 }
387 }
388 };
389 Box::pin(closure)
390 }
391}
392
393dyn_clone::clone_trait_object!(NetworkReliability);
395
396#[derive(Clone, Copy, Debug, Default)]
398pub struct PerfectNetwork {}
399
400impl NetworkReliability for PerfectNetwork {}
401
402#[derive(Clone, Copy, Debug, Default)]
405pub struct SynchronousNetwork {
406 pub delay_high_ms: u64,
408 pub delay_low_ms: u64,
410}
411
412impl NetworkReliability for SynchronousNetwork {
413 fn sample_keep(&self) -> bool {
415 true
416 }
417 fn sample_delay(&self) -> Duration {
418 Duration::from_millis(
419 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
420 .sample(&mut rand::thread_rng()),
421 )
422 }
423}
424
425#[derive(Debug, Clone, Copy)]
431pub struct AsynchronousNetwork {
432 pub keep_numerator: u32,
434 pub keep_denominator: u32,
436 pub delay_low_ms: u64,
438 pub delay_high_ms: u64,
440}
441
442impl NetworkReliability for AsynchronousNetwork {
443 fn sample_keep(&self) -> bool {
444 Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
445 .unwrap()
446 .sample(&mut rand::thread_rng())
447 }
448 fn sample_delay(&self) -> Duration {
449 Duration::from_millis(
450 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
451 .sample(&mut rand::thread_rng()),
452 )
453 }
454}
455
456#[allow(clippy::similar_names)]
460#[derive(Debug, Clone, Copy)]
461pub struct PartiallySynchronousNetwork {
462 pub asynchronous: AsynchronousNetwork,
464 pub synchronous: SynchronousNetwork,
466 pub gst: std::time::Duration,
468 pub start: std::time::Instant,
470}
471
472impl NetworkReliability for PartiallySynchronousNetwork {
473 fn sample_keep(&self) -> bool {
475 true
476 }
477 fn sample_delay(&self) -> Duration {
478 if self.start.elapsed() < self.gst {
480 if self.asynchronous.sample_keep() {
481 self.asynchronous.sample_delay()
482 } else {
483 self.synchronous.sample_delay() + self.gst
485 }
486 } else {
487 self.synchronous.sample_delay()
489 }
490 }
491}
492
493impl Default for AsynchronousNetwork {
494 fn default() -> Self {
496 AsynchronousNetwork {
497 keep_numerator: 1,
498 keep_denominator: 1,
499 delay_low_ms: 0,
500 delay_high_ms: 0,
501 }
502 }
503}
504
505impl Default for PartiallySynchronousNetwork {
506 fn default() -> Self {
507 PartiallySynchronousNetwork {
508 synchronous: SynchronousNetwork::default(),
509 asynchronous: AsynchronousNetwork::default(),
510 gst: std::time::Duration::new(0, 0),
511 start: std::time::Instant::now(),
512 }
513 }
514}
515
516impl SynchronousNetwork {
517 #[must_use]
519 pub fn new(timeout: u64, delay_low_ms: u64) -> Self {
520 SynchronousNetwork {
521 delay_high_ms: timeout,
522 delay_low_ms,
523 }
524 }
525}
526
527impl AsynchronousNetwork {
528 #[must_use]
530 pub fn new(
531 keep_numerator: u32,
532 keep_denominator: u32,
533 delay_low_ms: u64,
534 delay_high_ms: u64,
535 ) -> Self {
536 AsynchronousNetwork {
537 keep_numerator,
538 keep_denominator,
539 delay_low_ms,
540 delay_high_ms,
541 }
542 }
543}
544
545impl PartiallySynchronousNetwork {
546 #[allow(clippy::similar_names)]
548 #[must_use]
549 pub fn new(
550 asynchronous: AsynchronousNetwork,
551 synchronous: SynchronousNetwork,
552 gst: std::time::Duration,
553 ) -> Self {
554 PartiallySynchronousNetwork {
555 asynchronous,
556 synchronous,
557 gst,
558 start: std::time::Instant::now(),
559 }
560 }
561}
562
563#[derive(Debug, Clone)]
565pub struct ChaosNetwork {
566 pub keep_numerator: u32,
568 pub keep_denominator: u32,
570 pub delay_low_ms: u64,
572 pub delay_high_ms: u64,
574 pub repeat_low: usize,
576 pub repeat_high: usize,
578}
579
580impl NetworkReliability for ChaosNetwork {
581 fn sample_keep(&self) -> bool {
582 Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
583 .unwrap()
584 .sample(&mut rand::thread_rng())
585 }
586
587 fn sample_delay(&self) -> Duration {
588 Duration::from_millis(
589 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
590 .sample(&mut rand::thread_rng()),
591 )
592 }
593
594 fn sample_repeat(&self) -> usize {
595 Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng())
596 }
597}
598
599#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
601pub enum Topic {
602 Global,
604 Da,
606}
607
608impl Display for Topic {
610 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
611 match self {
612 Topic::Global => write!(f, "global"),
613 Topic::Da => write!(f, "DA"),
614 }
615 }
616}