1use std::{
12 collections::HashMap,
13 fmt::{Debug, Display},
14 hash::Hash,
15 pin::Pin,
16 sync::Arc,
17 time::Duration,
18};
19
20use async_trait::async_trait;
21use dyn_clone::DynClone;
22use futures::{future::join_all, Future};
23use rand::{
24 distributions::{Bernoulli, Uniform},
25 prelude::Distribution,
26};
27use serde::{Deserialize, Serialize};
28use thiserror::Error;
29use tokio::{sync::mpsc::error::TrySendError, time::sleep};
30
31use super::{node_implementation::NodeType, signature_key::SignatureKey};
32use crate::{
33 data::ViewNumber, epoch_membership::EpochMembershipCoordinator, message::SequencingMessage,
34 BoxSyncFuture,
35};
36
/// Error type named for the Push CDN network.
///
/// The enum has no variants, so no value of this type can be constructed.
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum PushCdnNetworkError {}
40
/// Describes how a message is to be transmitted.
///
/// NOTE(review): the variant meanings below are read from their names and payloads only;
/// confirm against the code that consumes this enum.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransmitType<TYPES: NodeType> {
    /// Carries a single `TYPES::SignatureKey`, presumably identifying the one recipient.
    Direct(TYPES::SignatureKey),
    /// No payload; presumably addressed to every node.
    Broadcast,
    /// No payload; presumably addressed to the DA committee only.
    DaCommitteeBroadcast,
}
51
/// Errors reported by networking code.
///
/// The `Display` text of every variant is taken from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum NetworkError {
    /// Several errors bundled together; they are rendered with their `Debug` output.
    #[error("Multiple errors: {0:?}")]
    Multiple(Vec<NetworkError>),

    /// A configuration problem, described by the contained string.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// A message could not be sent; the string carries the details.
    #[error("Failed to send message: {0}")]
    MessageSendError(String),

    /// A message could not be received; the string carries the details.
    #[error("Failed to receive message: {0}")]
    MessageReceiveError(String),

    /// The requested functionality is not implemented.
    #[error("Unimplemented")]
    Unimplemented,

    /// Listening failed; the string carries the details.
    #[error("Listen error: {0}")]
    ListenError(String),

    /// Sending on a channel failed; the string carries the details.
    #[error("Channel send error: {0}")]
    ChannelSendError(String),

    /// Receiving from a channel failed; the string carries the details.
    #[error("Channel receive error: {0}")]
    ChannelReceiveError(String),

    /// The network has been shut down.
    #[error("Network has been shut down")]
    ShutDown,

    /// Serialization failed; the string carries the details.
    #[error("Failed to serialize: {0}")]
    FailedToSerialize(String),

    /// Deserialization failed; the string carries the details.
    #[error("Failed to deserialize: {0}")]
    FailedToDeserialize(String),

    /// An operation timed out; the string carries the details.
    #[error("Timeout: {0}")]
    Timeout(String),

    /// The request was cancelled before it could be fulfilled.
    #[error("The request was cancelled before it could be fulfilled")]
    RequestCancelled,

    /// The node has no peers yet.
    #[error("This node does not have any peers yet")]
    NoPeersYet,

    /// Looking up a node failed; the string carries the details.
    #[error("Node lookup failed: {0}")]
    LookupError(String),
}
115
/// Marker trait with no methods; implementors must be `Eq`, `PartialEq` and `Hash`.
pub trait Id: Eq + PartialEq + Hash {}
118
/// A message that can report a view number.
pub trait ViewMessage<TYPES: NodeType> {
    /// Returns the view number (of type `TYPES::View`) associated with this message.
    fn view_number(&self) -> TYPES::View;
}
124
/// A request for data, carrying the kind of data wanted, a view and a signature.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// Replace the `Deserialize` bounds serde would infer for `TYPES` with an empty bound.
#[serde(bound(deserialize = ""))]
pub struct DataRequest<TYPES: NodeType> {
    /// Which piece of data is being requested.
    pub request: RequestKind<TYPES>,
    /// A view attached to the request.
    /// NOTE(review): presumably the requester's current view — confirm with callers.
    pub view: TYPES::View,
    /// A signature of the key type's `PureAssembledSignatureType`.
    /// NOTE(review): presumably produced by the requester; confirm what exactly is signed.
    pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
}
137
/// The kinds of data that can be requested.
///
/// `Debug` is not derived here; it is implemented by hand for this type.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub enum RequestKind<TYPES: NodeType> {
    /// A VID-related request identified by a view and a signature key.
    /// NOTE(review): presumably the key names the node whose VID data is wanted — confirm.
    Vid(TYPES::View, TYPES::SignatureKey),
    /// A request identified by a view; by its name, for a DA proposal.
    DaProposal(TYPES::View),
    /// A request identified by a view; by its name, for a proposal.
    Proposal(TYPES::View),
}
148
149impl<TYPES: NodeType> std::fmt::Debug for RequestKind<TYPES> {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 match self {
152 RequestKind::Vid(view, key) => write!(f, "Vid({view:?}, {key})"),
153 RequestKind::DaProposal(view) => write!(f, "DaProposal({view:?})"),
154 RequestKind::Proposal(view) => write!(f, "Proposal({view:?})"),
155 }
156 }
157}
158
/// The possible answers to a data request.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
// Replace the `Deserialize` bounds serde would infer for `TYPES` with an empty bound.
#[serde(bound(deserialize = ""))]
// `Found` stores a whole `SequencingMessage` inline while the other variants are empty.
#[allow(clippy::large_enum_variant)]
pub enum ResponseMessage<TYPES: NodeType> {
    /// The requested data was found; it is carried as a `SequencingMessage`.
    Found(SequencingMessage<TYPES>),
    /// The requested data was not found.
    NotFound,
    /// The request was denied.
    Denied,
}
173
/// Delay option passed to the broadcast methods of `ConnectedNetwork`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BroadcastDelay {
    /// No delay is requested.
    None,
    /// A delay expressed through a `u64`.
    /// NOTE(review): presumably the view number the broadcast is deferred to — confirm.
    View(u64),
}
184
185#[async_trait]
186pub trait ConnectedNetwork<K: SignatureKey + 'static>: Clone + Send + Sync + 'static {
191 fn pause(&self);
193
194 fn resume(&self);
196
197 async fn wait_for_ready(&self);
199
200 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
202 where
203 'a: 'b,
204 Self: 'b;
205
206 async fn broadcast_message(
209 &self,
210 message: Vec<u8>,
211 topic: Topic,
212 broadcast_delay: BroadcastDelay,
213 ) -> Result<(), NetworkError>;
214
215 async fn da_broadcast_message(
218 &self,
219 message: Vec<u8>,
220 recipients: Vec<K>,
221 broadcast_delay: BroadcastDelay,
222 ) -> Result<(), NetworkError>;
223
224 async fn vid_broadcast_message(
227 &self,
228 messages: HashMap<K, Vec<u8>>,
229 ) -> Result<(), NetworkError> {
230 let future_results = messages
231 .into_iter()
232 .map(|(recipient_key, message)| self.direct_message(message, recipient_key));
233 let results = join_all(future_results).await;
234
235 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
236
237 if errors.is_empty() {
238 Ok(())
239 } else {
240 Err(NetworkError::Multiple(errors))
241 }
242 }
243
244 async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError>;
247
248 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError>;
253
254 fn queue_node_lookup(
259 &self,
260 _view_number: ViewNumber,
261 _pk: K,
262 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
263 Ok(())
264 }
265
266 async fn update_view<'a, TYPES>(
269 &'a self,
270 _view: u64,
271 _epoch: Option<u64>,
272 _membership_coordinator: EpochMembershipCoordinator<TYPES>,
273 ) where
274 TYPES: NodeType<SignatureKey = K> + 'a,
275 {
276 }
277
278 fn is_primary_down(&self) -> bool {
280 false
281 }
282}
283
/// A pinned, boxed, `Send + Sync` closure that takes a `u64` and returns a pinned, boxed
/// `Send` future resolving to `T`.
///
/// NOTE(review): the `u64` argument is presumably a node identifier — confirm with callers.
pub type AsyncGenerator<T> =
    Pin<Box<dyn Fn(u64) -> Pin<Box<dyn Future<Output = T> + Send>> + Send + Sync>>;
287
/// Hooks a networking implementation provides so that it can be driven from tests.
pub trait TestableNetworkingImplementation<TYPES: NodeType>
where
    Self: Sized,
{
    /// Builds an [`AsyncGenerator`] that yields `Arc<Self>` network instances.
    ///
    /// NOTE(review): the parameter meanings are inferred from their names only
    /// (expected number of nodes, number of bootstrap nodes, a network identifier, the DA
    /// committee size, an optional reliability model, and a delay for a secondary
    /// network) — confirm against the implementations.
    #[allow(clippy::type_complexity)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>>;

    /// Returns the number of in-flight messages as `Some(count)`, or `None`.
    ///
    /// NOTE(review): `None` presumably means the implementation cannot report a count.
    fn in_flight_message_count(&self) -> Option<usize>;
}
309
/// A change in network membership, carrying the signature key `P` of the node concerned.
#[derive(Debug)]
pub enum NetworkChange<P: SignatureKey> {
    /// The node with the given key connected.
    NodeConnected(P),

    /// The node with the given key disconnected.
    NodeDisconnected(P),
}
319
320#[async_trait]
322pub trait NetworkReliability: Debug + Sync + std::marker::Send + DynClone + 'static {
323 fn sample_keep(&self) -> bool {
330 true
331 }
332
333 fn sample_delay(&self) -> Duration {
336 std::time::Duration::ZERO
337 }
338
339 fn scramble(&self, msg: Vec<u8>) -> Vec<u8> {
341 msg
342 }
343
344 fn sample_repeat(&self) -> usize {
346 1
347 }
348
349 fn chaos_send_msg(
359 &self,
360 msg: Vec<u8>,
361 send_fn: Arc<dyn Send + Sync + 'static + Fn(Vec<u8>) -> BoxSyncFuture<'static, ()>>,
362 ) -> BoxSyncFuture<'static, ()> {
363 let sample_keep = self.sample_keep();
364 let delay = self.sample_delay();
365 let repeats = self.sample_repeat();
366 let mut msgs = Vec::new();
367 for _idx in 0..repeats {
368 let scrambled = self.scramble(msg.clone());
369 msgs.push(scrambled);
370 }
371 let closure = async move {
372 if sample_keep {
373 sleep(delay).await;
374 for msg in msgs {
375 send_fn(msg).await;
376 }
377 }
378 };
379 Box::pin(closure)
380 }
381}
382
// Implements `Clone` for boxed `dyn NetworkReliability` trait objects via `dyn_clone`.
dyn_clone::clone_trait_object!(NetworkReliability);
385
/// A reliability model with no configuration; it has no fields.
#[derive(Clone, Copy, Debug, Default)]
pub struct PerfectNetwork {}
389
// Overrides nothing, so every `NetworkReliability` default applies: all messages are kept,
// no delay is added, payloads are unchanged and each message is sent once.
impl NetworkReliability for PerfectNetwork {}
391
/// A reliability model that keeps every message and delays it by a uniformly sampled
/// number of milliseconds.
#[derive(Clone, Copy, Debug, Default)]
pub struct SynchronousNetwork {
    /// Inclusive upper bound of the sampled delay, in milliseconds.
    pub delay_high_ms: u64,
    /// Inclusive lower bound of the sampled delay, in milliseconds.
    pub delay_low_ms: u64,
}
401
402impl NetworkReliability for SynchronousNetwork {
403 fn sample_keep(&self) -> bool {
405 true
406 }
407 fn sample_delay(&self) -> Duration {
408 Duration::from_millis(
409 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
410 .sample(&mut rand::thread_rng()),
411 )
412 }
413}
414
/// A reliability model that keeps a message with probability
/// `keep_numerator / keep_denominator` and delays it by a uniformly sampled number of
/// milliseconds.
#[derive(Debug, Clone, Copy)]
pub struct AsynchronousNetwork {
    /// Numerator of the keep probability.
    pub keep_numerator: u32,
    /// Denominator of the keep probability.
    pub keep_denominator: u32,
    /// Inclusive lower bound of the sampled delay, in milliseconds.
    pub delay_low_ms: u64,
    /// Inclusive upper bound of the sampled delay, in milliseconds.
    pub delay_high_ms: u64,
}
431
432impl NetworkReliability for AsynchronousNetwork {
433 fn sample_keep(&self) -> bool {
434 Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
435 .unwrap()
436 .sample(&mut rand::thread_rng())
437 }
438 fn sample_delay(&self) -> Duration {
439 Duration::from_millis(
440 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
441 .sample(&mut rand::thread_rng()),
442 )
443 }
444}
445
/// A reliability model that samples delays from `asynchronous` until `gst` has elapsed
/// since `start`, and from `synchronous` afterwards.
// The field names `asynchronous` and `synchronous` are intentionally alike.
#[allow(clippy::similar_names)]
#[derive(Debug, Clone, Copy)]
pub struct PartiallySynchronousNetwork {
    /// Model consulted while less than `gst` has elapsed since `start`.
    pub asynchronous: AsynchronousNetwork,
    /// Model used once `gst` has elapsed, and for messages the asynchronous model would drop.
    pub synchronous: SynchronousNetwork,
    /// How long after `start` the switch to the synchronous model happens.
    pub gst: std::time::Duration,
    /// Reference instant from which `gst` is measured.
    pub start: std::time::Instant,
}
461
462impl NetworkReliability for PartiallySynchronousNetwork {
463 fn sample_keep(&self) -> bool {
465 true
466 }
467 fn sample_delay(&self) -> Duration {
468 if self.start.elapsed() < self.gst {
470 if self.asynchronous.sample_keep() {
471 self.asynchronous.sample_delay()
472 } else {
473 self.synchronous.sample_delay() + self.gst
475 }
476 } else {
477 self.synchronous.sample_delay()
479 }
480 }
481}
482
483impl Default for AsynchronousNetwork {
484 fn default() -> Self {
486 AsynchronousNetwork {
487 keep_numerator: 1,
488 keep_denominator: 1,
489 delay_low_ms: 0,
490 delay_high_ms: 0,
491 }
492 }
493}
494
495impl Default for PartiallySynchronousNetwork {
496 fn default() -> Self {
497 PartiallySynchronousNetwork {
498 synchronous: SynchronousNetwork::default(),
499 asynchronous: AsynchronousNetwork::default(),
500 gst: std::time::Duration::new(0, 0),
501 start: std::time::Instant::now(),
502 }
503 }
504}
505
506impl SynchronousNetwork {
507 #[must_use]
509 pub fn new(timeout: u64, delay_low_ms: u64) -> Self {
510 SynchronousNetwork {
511 delay_high_ms: timeout,
512 delay_low_ms,
513 }
514 }
515}
516
517impl AsynchronousNetwork {
518 #[must_use]
520 pub fn new(
521 keep_numerator: u32,
522 keep_denominator: u32,
523 delay_low_ms: u64,
524 delay_high_ms: u64,
525 ) -> Self {
526 AsynchronousNetwork {
527 keep_numerator,
528 keep_denominator,
529 delay_low_ms,
530 delay_high_ms,
531 }
532 }
533}
534
535impl PartiallySynchronousNetwork {
536 #[allow(clippy::similar_names)]
538 #[must_use]
539 pub fn new(
540 asynchronous: AsynchronousNetwork,
541 synchronous: SynchronousNetwork,
542 gst: std::time::Duration,
543 ) -> Self {
544 PartiallySynchronousNetwork {
545 asynchronous,
546 synchronous,
547 gst,
548 start: std::time::Instant::now(),
549 }
550 }
551}
552
/// A reliability model that randomly drops, delays and repeats messages.
#[derive(Debug, Clone)]
pub struct ChaosNetwork {
    /// Numerator of the keep probability.
    pub keep_numerator: u32,
    /// Denominator of the keep probability.
    pub keep_denominator: u32,
    /// Inclusive lower bound of the sampled delay, in milliseconds.
    pub delay_low_ms: u64,
    /// Inclusive upper bound of the sampled delay, in milliseconds.
    pub delay_high_ms: u64,
    /// Inclusive lower bound of the sampled number of copies sent.
    pub repeat_low: usize,
    /// Inclusive upper bound of the sampled number of copies sent.
    pub repeat_high: usize,
}
569
570impl NetworkReliability for ChaosNetwork {
571 fn sample_keep(&self) -> bool {
572 Bernoulli::from_ratio(self.keep_numerator, self.keep_denominator)
573 .unwrap()
574 .sample(&mut rand::thread_rng())
575 }
576
577 fn sample_delay(&self) -> Duration {
578 Duration::from_millis(
579 Uniform::new_inclusive(self.delay_low_ms, self.delay_high_ms)
580 .sample(&mut rand::thread_rng()),
581 )
582 }
583
584 fn sample_repeat(&self) -> usize {
585 Uniform::new_inclusive(self.repeat_low, self.repeat_high).sample(&mut rand::thread_rng())
586 }
587}
588
/// The topics a broadcast can be addressed to.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Topic {
    /// Displayed as `global`.
    Global,
    /// Displayed as `DA`.
    Da,
}

/// Renders each topic as its fixed textual name.
impl Display for Topic {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Global => "global",
            Self::Da => "DA",
        };
        f.write_str(name)
    }
}