1use std::{
10 collections::{BTreeMap, HashMap},
11 future::Future,
12 num::NonZeroUsize,
13 sync::{
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 Arc,
16 },
17 time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29 boxed_sync,
30 constants::{
31 COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32 COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33 },
34 data::ViewNumber,
35 epoch_membership::EpochMembershipCoordinator,
36 traits::{
37 network::{BroadcastDelay, ConnectedNetwork, Topic},
38 node_implementation::NodeType,
39 },
40 BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
/// Shared, lock-protected map from a view number to that view's broadcast channel, stored as
/// the `Sender` together with a deactivated (`InactiveReceiver`) receiver handle.
type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57 networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60 message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64 primary_fail_counter: Arc<AtomicU64>,
66
67 primary_down: Arc<AtomicBool>,
69
70 delay_duration: Arc<RwLock<Duration>>,
72
73 delayed_tasks_channels: DelayedTasksChannelsMap,
75
76 no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81 #[must_use]
87 pub fn new(
88 primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89 secondary_network: Libp2pNetwork<TYPES>,
90 delay_duration: Option<Duration>,
91 ) -> Self {
92 let networks = Arc::from(UnderlyingCombinedNetworks(
94 primary_network,
95 secondary_network,
96 ));
97
98 Self {
99 networks,
100 message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101 primary_fail_counter: Arc::new(AtomicU64::new(0)),
102 primary_down: Arc::new(AtomicBool::new(false)),
103 delay_duration: Arc::new(RwLock::new(
104 delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105 )),
106 delayed_tasks_channels: Arc::default(),
107 no_delay_counter: Arc::new(AtomicU64::new(0)),
108 }
109 }
110
111 #[must_use]
113 pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114 &self.networks.0
115 }
116
117 #[must_use]
119 pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120 &self.networks.1
121 }
122
123 async fn send_both_networks(
125 &self,
126 _message: Vec<u8>,
127 primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128 secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129 broadcast_delay: BroadcastDelay,
130 ) -> Result<(), NetworkError> {
131 let mut primary_failed = false;
133 if self.primary_down.load(Ordering::Relaxed) {
134 primary_failed = true;
136 } else if self.primary_fail_counter.load(Ordering::Relaxed)
137 > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138 {
139 info!(
142 "View progression is slower than normally, stop delaying messages on the secondary"
143 );
144 self.primary_down.store(true, Ordering::Relaxed);
145 primary_failed = true;
146 }
147
148 if let Err(e) = primary_future.await {
150 warn!("Error on primary network: {}", e);
152 self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153 primary_failed = true;
154 };
155
156 if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157 let duration = *self.delay_duration.read().await;
159 let primary_down = Arc::clone(&self.primary_down);
160 let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161 let mut receiver = self
164 .delayed_tasks_channels
165 .write()
166 .await
167 .entry(view)
168 .or_insert_with(|| {
169 let (s, r) = broadcast(1);
170 (s, r.deactivate())
171 })
172 .1
173 .activate_cloned();
174 spawn(async move {
176 sleep(duration).await;
177 if receiver.try_recv().is_ok() {
178 debug!(
180 "Not sending on secondary after delay, task was canceled in view update"
181 );
182 match primary_fail_counter.load(Ordering::Relaxed) {
183 0u64 => {
184 primary_down.store(false, Ordering::Relaxed);
186 debug!("primary_fail_counter reached zero, primary_down set to false");
187 },
188 c => {
189 primary_fail_counter.store(c - 1, Ordering::Relaxed);
191 debug!("primary_fail_counter set to {:?}", c - 1);
192 },
193 }
194 return Ok(());
195 }
196 debug!("Sending on secondary after delay, message possibly has not reached recipient on primary");
199 primary_fail_counter.fetch_add(1, Ordering::Relaxed);
200 secondary_future.await
201 });
202 Ok(())
203 } else {
204 if self.primary_down.load(Ordering::Relaxed) {
206 match self.no_delay_counter.load(Ordering::Relaxed) {
210 c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
211 self.no_delay_counter.store(c + 1, Ordering::Relaxed);
213 },
214 _ => {
215 debug!(
217 "Sent on secondary without delay more than {} times,\
218 try delaying to check primary",
219 COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
220 );
221 self.no_delay_counter.store(0u64, Ordering::Relaxed);
223 self.primary_down.store(false, Ordering::Relaxed);
225 self.primary_fail_counter
227 .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
228 },
229 }
230 }
231 secondary_future.await
233 }
234 }
235}
236
/// The pair of networks wrapped by [`CombinedNetworks`]: field `0` is the primary
/// ([`PushCdnNetwork`]) and field `1` is the secondary ([`Libp2pNetwork`]).
#[derive(Clone)]
pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
    pub PushCdnNetwork<TYPES::SignatureKey>,
    pub Libp2pNetwork<TYPES>,
);
245
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Returns a generator that builds one combined network per node id, backed by the test
    /// generators of the primary (CDN) and secondary (libp2p) networks.
    ///
    /// `reliability_config` is forwarded to the libp2p generator only; the CDN generator gets
    /// `None`. `secondary_network_delay` becomes the combined network's delay for secondary
    /// sends; both underlying generators receive `Duration::default()`.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            )
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Take each network out of its `Arc`, cloning only if the `Arc` is shared.
                // This never panics, unlike `Arc::into_inner(..).unwrap()`.
                let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::unwrap_or_clone(gen0.await);
                let p2p = Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(gen1.await);

                let combined_network = Self {
                    networks: Arc::new(UnderlyingCombinedNetworks(cdn, p2p)),
                    primary_fail_counter: Arc::new(AtomicU64::new(0)),
                    primary_down: Arc::new(AtomicBool::new(false)),
                    message_deduplication_cache: Arc::new(PlRwLock::new(
                        MessageDeduplicationCache::new(),
                    )),
                    delay_duration: Arc::new(RwLock::new(secondary_network_delay)),
                    delayed_tasks_channels: Arc::default(),
                    no_delay_counter: Arc::new(AtomicU64::new(0)),
                };

                Arc::new(combined_network)
            })
        })
    }

    /// The combined network does not report an in-flight message count.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
319
320#[async_trait]
321impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
322 fn pause(&self) {
323 self.networks.0.pause();
324 }
325
326 fn resume(&self) {
327 self.networks.0.resume();
328 }
329
330 async fn wait_for_ready(&self) {
331 join!(
332 self.primary().wait_for_ready(),
333 self.secondary().wait_for_ready()
334 );
335 }
336
337 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
338 where
339 'a: 'b,
340 Self: 'b,
341 {
342 let closure = async move {
343 join!(self.primary().shut_down(), self.secondary().shut_down());
344 };
345 boxed_sync(closure)
346 }
347
348 async fn broadcast_message(
349 &self,
350 message: Vec<u8>,
351 topic: Topic,
352 broadcast_delay: BroadcastDelay,
353 ) -> Result<(), NetworkError> {
354 let primary = self.primary().clone();
355 let secondary = self.secondary().clone();
356 let primary_message = message.clone();
357 let secondary_message = message.clone();
358 let topic_clone = topic.clone();
359 self.send_both_networks(
360 message,
361 async move {
362 primary
363 .broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
364 .await
365 },
366 async move {
367 secondary
368 .broadcast_message(secondary_message, topic, BroadcastDelay::None)
369 .await
370 },
371 broadcast_delay,
372 )
373 .await
374 }
375
376 async fn da_broadcast_message(
377 &self,
378 message: Vec<u8>,
379 recipients: Vec<TYPES::SignatureKey>,
380 broadcast_delay: BroadcastDelay,
381 ) -> Result<(), NetworkError> {
382 let primary = self.primary().clone();
383 let secondary = self.secondary().clone();
384 let primary_message = message.clone();
385 let secondary_message = message.clone();
386 let primary_recipients = recipients.clone();
387 self.send_both_networks(
388 message,
389 async move {
390 primary
391 .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
392 .await
393 },
394 async move {
395 secondary
396 .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
397 .await
398 },
399 broadcast_delay,
400 )
401 .await
402 }
403
404 async fn direct_message(
405 &self,
406 message: Vec<u8>,
407 recipient: TYPES::SignatureKey,
408 ) -> Result<(), NetworkError> {
409 let primary = self.primary().clone();
410 let secondary = self.secondary().clone();
411 let primary_message = message.clone();
412 let secondary_message = message.clone();
413 let primary_recipient = recipient.clone();
414 self.send_both_networks(
415 message,
416 async move {
417 primary
418 .direct_message(primary_message, primary_recipient)
419 .await
420 },
421 async move { secondary.direct_message(secondary_message, recipient).await },
422 BroadcastDelay::None,
423 )
424 .await
425 }
426
427 async fn vid_broadcast_message(
428 &self,
429 messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
430 ) -> Result<(), NetworkError> {
431 self.networks.0.vid_broadcast_message(messages).await
432 }
433
434 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
439 loop {
440 let mut primary_fut = self.primary().recv_message().fuse();
442 let mut secondary_fut = self.secondary().recv_message().fuse();
443
444 let (message, from_primary) = select! {
446 p = primary_fut => (p?, true),
447 s = secondary_fut => (s?, false),
448 };
449
450 if self
452 .message_deduplication_cache
453 .write()
454 .is_unique(&message, from_primary)
455 {
456 break Ok(message);
457 }
458 }
459 }
460
461 fn queue_node_lookup(
462 &self,
463 view_number: ViewNumber,
464 pk: TYPES::SignatureKey,
465 ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
466 self.primary().queue_node_lookup(view_number, pk.clone())?;
467 self.secondary().queue_node_lookup(view_number, pk)
468 }
469
470 async fn update_view<'a, T>(
471 &'a self,
472 view: u64,
473 epoch: Option<u64>,
474 membership: EpochMembershipCoordinator<T>,
475 ) where
476 T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
477 {
478 let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
479 spawn(async move {
480 let mut map_lock = delayed_tasks_channels.write().await;
481 while let Some((first_view, _)) = map_lock.first_key_value() {
482 if *first_view < view {
484 if let Some((_, (sender, _))) = map_lock.pop_first() {
485 let _ = sender.try_broadcast(());
486 } else {
487 break;
488 }
489 } else {
490 break;
491 }
492 }
493 });
494 self.networks
496 .1
497 .update_view::<T>(view, epoch, membership)
498 .await;
499 }
500
501 fn is_primary_down(&self) -> bool {
502 self.primary_down.load(Ordering::Relaxed)
503 }
504}
505
506struct MessageDeduplicationCache {
507 primary_message_cache: LruCache<blake3::Hash, ()>,
510
511 secondary_message_cache: LruCache<blake3::Hash, ()>,
514}
515
516impl MessageDeduplicationCache {
517 fn new() -> Self {
519 Self {
520 primary_message_cache: LruCache::new(
521 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
522 ),
523 secondary_message_cache: LruCache::new(
524 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
525 ),
526 }
527 }
528
529 fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
531 let message_hash = blake3::hash(message);
533
534 let (this_cache, other_cache) = if from_primary {
536 (
537 &mut self.primary_message_cache,
538 &mut self.secondary_message_cache,
539 )
540 } else {
541 (
542 &mut self.secondary_message_cache,
543 &mut self.primary_message_cache,
544 )
545 };
546
547 if other_cache.pop(&message_hash).is_some() {
550 false
552 } else {
553 this_cache.put(message_hash, ());
555 true
556 }
557 }
558}
559
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds the same message through the cache in several primary/secondary arrival orders
    /// and checks which arrivals are reported as unique.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";

        let mut cache = MessageDeduplicationCache::new();
        // First arrival (from the primary) is unique; the copy from the secondary is not.
        assert!(cache.is_unique(message, true));
        assert!(!cache.is_unique(message, false));

        // The duplicate consumed the primary's entry, so the same sequence repeats.
        assert!(cache.is_unique(message, true));
        assert!(!cache.is_unique(message, false));

        // Mirror case: the secondary delivers first and the primary's copy is the duplicate.
        assert!(cache.is_unique(message, false));
        assert!(!cache.is_unique(message, true));
        assert!(cache.is_unique(message, false));
        assert!(!cache.is_unique(message, true));

        // Repeated arrivals on one network are each unique. A single copy from the other
        // network is then a duplicate, and the next copy from that network is unique again.
        assert!(cache.is_unique(message, true));
        assert!(cache.is_unique(message, true));
        assert!(cache.is_unique(message, true));
        assert!(!cache.is_unique(message, false));
        assert!(cache.is_unique(message, false));
    }
}