1use std::{
10 collections::{BTreeMap, HashMap},
11 future::Future,
12 num::NonZeroUsize,
13 sync::{
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 Arc,
16 },
17 time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29 boxed_sync,
30 constants::{
31 COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32 COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33 },
34 data::ViewNumber,
35 epoch_membership::EpochMembershipCoordinator,
36 traits::{
37 network::{BroadcastDelay, ConnectedNetwork, Topic},
38 node_implementation::NodeType,
39 },
40 BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
50type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57 networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60 message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64 primary_fail_counter: Arc<AtomicU64>,
66
67 primary_down: Arc<AtomicBool>,
69
70 delay_duration: Arc<RwLock<Duration>>,
72
73 delayed_tasks_channels: DelayedTasksChannelsMap,
75
76 no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81 #[must_use]
87 pub fn new(
88 primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89 secondary_network: Libp2pNetwork<TYPES>,
90 delay_duration: Option<Duration>,
91 ) -> Self {
92 let networks = Arc::from(UnderlyingCombinedNetworks(
94 primary_network,
95 secondary_network,
96 ));
97
98 Self {
99 networks,
100 message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101 primary_fail_counter: Arc::new(AtomicU64::new(0)),
102 primary_down: Arc::new(AtomicBool::new(false)),
103 delay_duration: Arc::new(RwLock::new(
104 delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105 )),
106 delayed_tasks_channels: Arc::default(),
107 no_delay_counter: Arc::new(AtomicU64::new(0)),
108 }
109 }
110
111 #[must_use]
113 pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114 &self.networks.0
115 }
116
117 #[must_use]
119 pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120 &self.networks.1
121 }
122
123 async fn send_both_networks(
125 &self,
126 _message: Vec<u8>,
127 primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128 secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129 broadcast_delay: BroadcastDelay,
130 ) -> Result<(), NetworkError> {
131 let mut primary_failed = false;
133 if self.primary_down.load(Ordering::Relaxed) {
134 primary_failed = true;
136 } else if self.primary_fail_counter.load(Ordering::Relaxed)
137 > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138 {
139 info!(
142 "View progression is slower than normally, stop delaying messages on the secondary"
143 );
144 self.primary_down.store(true, Ordering::Relaxed);
145 primary_failed = true;
146 }
147
148 if let Err(e) = primary_future.await {
150 warn!("Error on primary network: {}", e);
152 self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153 primary_failed = true;
154 };
155
156 if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157 let duration = *self.delay_duration.read().await;
159 let primary_down = Arc::clone(&self.primary_down);
160 let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161 let mut receiver = self
164 .delayed_tasks_channels
165 .write()
166 .await
167 .entry(view)
168 .or_insert_with(|| {
169 let (s, r) = broadcast(1);
170 (s, r.deactivate())
171 })
172 .1
173 .activate_cloned();
174 spawn(async move {
176 sleep(duration).await;
177 if receiver.try_recv().is_ok() {
178 debug!(
180 "Not sending on secondary after delay, task was canceled in view update"
181 );
182 match primary_fail_counter.load(Ordering::Relaxed) {
183 0u64 => {
184 primary_down.store(false, Ordering::Relaxed);
186 debug!("primary_fail_counter reached zero, primary_down set to false");
187 },
188 c => {
189 primary_fail_counter.store(c - 1, Ordering::Relaxed);
191 debug!("primary_fail_counter set to {:?}", c - 1);
192 },
193 }
194 return Ok(());
195 }
196 debug!(
199 "Sending on secondary after delay, message possibly has not reached recipient \
200 on primary"
201 );
202 primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203 secondary_future.await
204 });
205 Ok(())
206 } else {
207 if self.primary_down.load(Ordering::Relaxed) {
209 match self.no_delay_counter.load(Ordering::Relaxed) {
213 c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214 self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216 },
217 _ => {
218 debug!(
220 "Sent on secondary without delay more than {} times,try delaying to \
221 check primary",
222 COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223 );
224 self.no_delay_counter.store(0u64, Ordering::Relaxed);
226 self.primary_down.store(false, Ordering::Relaxed);
228 self.primary_fail_counter
230 .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231 },
232 }
233 }
234 secondary_future.await
236 }
237 }
238}
239
240#[derive(Clone)]
244pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
245 pub PushCdnNetwork<TYPES::SignatureKey>,
246 pub Libp2pNetwork<TYPES>,
247);
248
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Returns a generator that builds one combined network per node id for tests.
    ///
    /// A `PushCdnNetwork` generator and a `Libp2pNetwork` generator are created with the same
    /// node counts and network id. `reliability_config` is passed to the libp2p generator only;
    /// the CDN generator receives `None`. `secondary_network_delay` becomes the delay applied
    /// to delayed secondary sends of the resulting combined network.
    ///
    /// # Panics
    /// The generated future panics if the CDN network returned by its generator is still shared
    /// (the `Arc` has other owners).
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            )
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Generate the CDN network and take it out of its `Arc`; it is moved straight
                // into the combined network, so no extra clone is needed.
                let cdn = gen0.await;
                let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::into_inner(cdn)
                    .expect("freshly generated CDN network should have no other owners");

                // Generate the libp2p network.
                let p2p = gen1.await;
                let p2p = Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(p2p);

                // Go through the regular constructor so test instances are initialised exactly
                // like production ones.
                Arc::new(Self::new(cdn, p2p, Some(secondary_network_delay)))
            })
        })
    }

    /// Always returns `None`: the combined network does not report an in-flight message count.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
322
323#[async_trait]
324impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
325 fn pause(&self) {
326 self.networks.0.pause();
327 }
328
329 fn resume(&self) {
330 self.networks.0.resume();
331 }
332
333 async fn wait_for_ready(&self) {
334 join!(
335 self.primary().wait_for_ready(),
336 self.secondary().wait_for_ready()
337 );
338 }
339
340 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
341 where
342 'a: 'b,
343 Self: 'b,
344 {
345 let closure = async move {
346 join!(self.primary().shut_down(), self.secondary().shut_down());
347 };
348 boxed_sync(closure)
349 }
350
351 async fn broadcast_message(
352 &self,
353 message: Vec<u8>,
354 topic: Topic,
355 broadcast_delay: BroadcastDelay,
356 ) -> Result<(), NetworkError> {
357 let primary = self.primary().clone();
358 let secondary = self.secondary().clone();
359 let primary_message = message.clone();
360 let secondary_message = message.clone();
361 let topic_clone = topic.clone();
362 self.send_both_networks(
363 message,
364 async move {
365 primary
366 .broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
367 .await
368 },
369 async move {
370 secondary
371 .broadcast_message(secondary_message, topic, BroadcastDelay::None)
372 .await
373 },
374 broadcast_delay,
375 )
376 .await
377 }
378
379 async fn da_broadcast_message(
380 &self,
381 message: Vec<u8>,
382 recipients: Vec<TYPES::SignatureKey>,
383 broadcast_delay: BroadcastDelay,
384 ) -> Result<(), NetworkError> {
385 let primary = self.primary().clone();
386 let secondary = self.secondary().clone();
387 let primary_message = message.clone();
388 let secondary_message = message.clone();
389 let primary_recipients = recipients.clone();
390 self.send_both_networks(
391 message,
392 async move {
393 primary
394 .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
395 .await
396 },
397 async move {
398 secondary
399 .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
400 .await
401 },
402 broadcast_delay,
403 )
404 .await
405 }
406
407 async fn direct_message(
408 &self,
409 message: Vec<u8>,
410 recipient: TYPES::SignatureKey,
411 ) -> Result<(), NetworkError> {
412 let primary = self.primary().clone();
413 let secondary = self.secondary().clone();
414 let primary_message = message.clone();
415 let secondary_message = message.clone();
416 let primary_recipient = recipient.clone();
417 self.send_both_networks(
418 message,
419 async move {
420 primary
421 .direct_message(primary_message, primary_recipient)
422 .await
423 },
424 async move { secondary.direct_message(secondary_message, recipient).await },
425 BroadcastDelay::None,
426 )
427 .await
428 }
429
430 async fn vid_broadcast_message(
431 &self,
432 messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
433 ) -> Result<(), NetworkError> {
434 self.networks.0.vid_broadcast_message(messages).await
435 }
436
437 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
442 loop {
443 let mut primary_fut = self.primary().recv_message().fuse();
445 let mut secondary_fut = self.secondary().recv_message().fuse();
446
447 let (message, from_primary) = select! {
449 p = primary_fut => (p?, true),
450 s = secondary_fut => (s?, false),
451 };
452
453 if self
455 .message_deduplication_cache
456 .write()
457 .is_unique(&message, from_primary)
458 {
459 break Ok(message);
460 }
461 }
462 }
463
464 fn queue_node_lookup(
465 &self,
466 view_number: ViewNumber,
467 pk: TYPES::SignatureKey,
468 ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
469 self.primary().queue_node_lookup(view_number, pk.clone())?;
470 self.secondary().queue_node_lookup(view_number, pk)
471 }
472
473 async fn update_view<'a, T>(
474 &'a self,
475 view: u64,
476 epoch: Option<u64>,
477 membership: EpochMembershipCoordinator<T>,
478 ) where
479 T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
480 {
481 let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
482 spawn(async move {
483 let mut map_lock = delayed_tasks_channels.write().await;
484 while let Some((first_view, _)) = map_lock.first_key_value() {
485 if *first_view < view {
487 if let Some((_, (sender, _))) = map_lock.pop_first() {
488 let _ = sender.try_broadcast(());
489 } else {
490 break;
491 }
492 } else {
493 break;
494 }
495 }
496 });
497 self.networks
499 .1
500 .update_view::<T>(view, epoch, membership)
501 .await;
502 }
503
504 fn is_primary_down(&self) -> bool {
505 self.primary_down.load(Ordering::Relaxed)
506 }
507}
508
509struct MessageDeduplicationCache {
510 primary_message_cache: LruCache<blake3::Hash, ()>,
513
514 secondary_message_cache: LruCache<blake3::Hash, ()>,
517}
518
519impl MessageDeduplicationCache {
520 fn new() -> Self {
522 Self {
523 primary_message_cache: LruCache::new(
524 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
525 ),
526 secondary_message_cache: LruCache::new(
527 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
528 ),
529 }
530 }
531
532 fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
534 let message_hash = blake3::hash(message);
536
537 let (this_cache, other_cache) = if from_primary {
539 (
540 &mut self.primary_message_cache,
541 &mut self.secondary_message_cache,
542 )
543 } else {
544 (
545 &mut self.secondary_message_cache,
546 &mut self.primary_message_cache,
547 )
548 };
549
550 if other_cache.pop(&message_hash).is_some() {
553 false
555 } else {
556 this_cache.put(message_hash, ());
558 true
559 }
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds the same message through the cache in a fixed sequence of arrivals and checks the
    /// uniqueness verdict after each one.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Each entry is (arrived on primary?, expected `is_unique` result), in arrival order.
        let arrivals = [
            // Primary first, then the secondary copy is a duplicate.
            (true, true),
            (false, false),
            // The same pattern works again once the pair has been consumed.
            (true, true),
            (false, false),
            // Secondary first, then the primary copy is a duplicate (twice).
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // Repeated primary copies are all unique; they cancel only one secondary copy.
            (true, true),
            (true, true),
            (true, true),
            (false, false),
            (false, true),
        ];

        for (step, (from_primary, expected)) in arrivals.into_iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected,
                "arrival #{step}"
            );
        }
    }
}