1use std::{
10 collections::{BTreeMap, HashMap},
11 future::Future,
12 num::NonZeroUsize,
13 sync::{
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 Arc,
16 },
17 time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29 boxed_sync,
30 constants::{
31 COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32 COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33 },
34 data::ViewNumber,
35 epoch_membership::EpochMembershipCoordinator,
36 traits::{
37 network::{BroadcastDelay, ConnectedNetwork, Topic},
38 node_implementation::NodeType,
39 },
40 BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
50type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57 networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60 message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64 primary_fail_counter: Arc<AtomicU64>,
66
67 primary_down: Arc<AtomicBool>,
69
70 delay_duration: Arc<RwLock<Duration>>,
72
73 delayed_tasks_channels: DelayedTasksChannelsMap,
75
76 no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81 #[must_use]
87 pub fn new(
88 primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89 secondary_network: Libp2pNetwork<TYPES>,
90 delay_duration: Option<Duration>,
91 ) -> Self {
92 let networks = Arc::from(UnderlyingCombinedNetworks(
94 primary_network,
95 secondary_network,
96 ));
97
98 Self {
99 networks,
100 message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101 primary_fail_counter: Arc::new(AtomicU64::new(0)),
102 primary_down: Arc::new(AtomicBool::new(false)),
103 delay_duration: Arc::new(RwLock::new(
104 delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105 )),
106 delayed_tasks_channels: Arc::default(),
107 no_delay_counter: Arc::new(AtomicU64::new(0)),
108 }
109 }
110
111 #[must_use]
113 pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114 &self.networks.0
115 }
116
117 #[must_use]
119 pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120 &self.networks.1
121 }
122
123 async fn send_both_networks(
125 &self,
126 _message: Vec<u8>,
127 primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128 secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129 broadcast_delay: BroadcastDelay,
130 ) -> Result<(), NetworkError> {
131 let mut primary_failed = false;
133 if self.primary_down.load(Ordering::Relaxed) {
134 primary_failed = true;
136 } else if self.primary_fail_counter.load(Ordering::Relaxed)
137 > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138 {
139 info!(
142 "View progression is slower than normally, stop delaying messages on the secondary"
143 );
144 self.primary_down.store(true, Ordering::Relaxed);
145 primary_failed = true;
146 }
147
148 if let Err(e) = primary_future.await {
150 warn!("Error on primary network: {}", e);
152 self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153 primary_failed = true;
154 };
155
156 if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157 let duration = *self.delay_duration.read().await;
159 let primary_down = Arc::clone(&self.primary_down);
160 let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161 let mut receiver = self
164 .delayed_tasks_channels
165 .write()
166 .await
167 .entry(view)
168 .or_insert_with(|| {
169 let (s, r) = broadcast(1);
170 (s, r.deactivate())
171 })
172 .1
173 .activate_cloned();
174 spawn(async move {
176 sleep(duration).await;
177 if receiver.try_recv().is_ok() {
178 debug!(
180 "Not sending on secondary after delay, task was canceled in view update"
181 );
182 match primary_fail_counter.load(Ordering::Relaxed) {
183 0u64 => {
184 primary_down.store(false, Ordering::Relaxed);
186 debug!("primary_fail_counter reached zero, primary_down set to false");
187 },
188 c => {
189 primary_fail_counter.store(c - 1, Ordering::Relaxed);
191 debug!("primary_fail_counter set to {:?}", c - 1);
192 },
193 }
194 return Ok(());
195 }
196 debug!(
199 "Sending on secondary after delay, message possibly has not reached recipient \
200 on primary"
201 );
202 primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203 secondary_future.await
204 });
205 Ok(())
206 } else {
207 if self.primary_down.load(Ordering::Relaxed) {
209 match self.no_delay_counter.load(Ordering::Relaxed) {
213 c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214 self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216 },
217 _ => {
218 debug!(
220 "Sent on secondary without delay more than {} times,try delaying to \
221 check primary",
222 COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223 );
224 self.no_delay_counter.store(0u64, Ordering::Relaxed);
226 self.primary_down.store(false, Ordering::Relaxed);
228 self.primary_fail_counter
230 .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231 },
232 }
233 }
234 secondary_future.await
236 }
237 }
238}
239
240#[derive(Clone)]
244pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
245 pub PushCdnNetwork<TYPES::SignatureKey>,
246 pub Libp2pNetwork<TYPES>,
247);
248
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Builds a generator that creates a Push CDN network and a libp2p network
    /// for each node and wraps them in a fresh [`CombinedNetworks`].
    ///
    /// `reliability_config` is forwarded to the libp2p (secondary) generator
    /// only. `secondary_network_delay` becomes the combined network's delay
    /// before delayed messages are sent on the secondary.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            )
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Take ownership of each network. Fall back to a clone if the
                // generator still shares the `Arc`, instead of panicking.
                let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::unwrap_or_clone(gen0.await);
                let p2p = Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(gen1.await);

                Arc::new(Self {
                    networks: Arc::new(UnderlyingCombinedNetworks(cdn, p2p)),
                    primary_fail_counter: Arc::new(AtomicU64::new(0)),
                    primary_down: Arc::new(AtomicBool::new(false)),
                    message_deduplication_cache: Arc::new(PlRwLock::new(
                        MessageDeduplicationCache::new(),
                    )),
                    delay_duration: Arc::new(RwLock::new(secondary_network_delay)),
                    delayed_tasks_channels: Arc::default(),
                    no_delay_counter: Arc::new(AtomicU64::new(0)),
                })
            })
        })
    }

    /// The combined network does not track in-flight messages.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
322
323#[async_trait]
324impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
325 fn pause(&self) {
326 self.networks.0.pause();
327 }
328
329 fn resume(&self) {
330 self.networks.0.resume();
331 }
332
333 async fn wait_for_ready(&self) {
334 join!(
335 self.primary().wait_for_ready(),
336 self.secondary().wait_for_ready()
337 );
338 }
339
340 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
341 where
342 'a: 'b,
343 Self: 'b,
344 {
345 let closure = async move {
346 join!(self.primary().shut_down(), self.secondary().shut_down());
347 };
348 boxed_sync(closure)
349 }
350
351 async fn broadcast_message(
352 &self,
353 message: Vec<u8>,
354 topic: Topic,
355 broadcast_delay: BroadcastDelay,
356 ) -> Result<(), NetworkError> {
357 let primary = self.primary().clone();
358 let secondary = self.secondary().clone();
359 let primary_message = message.clone();
360 let secondary_message = message.clone();
361 self.send_both_networks(
362 message,
363 async move {
364 primary
365 .broadcast_message(primary_message, topic, BroadcastDelay::None)
366 .await
367 },
368 async move {
369 secondary
370 .broadcast_message(secondary_message, topic, BroadcastDelay::None)
371 .await
372 },
373 broadcast_delay,
374 )
375 .await
376 }
377
378 async fn da_broadcast_message(
379 &self,
380 message: Vec<u8>,
381 recipients: Vec<TYPES::SignatureKey>,
382 broadcast_delay: BroadcastDelay,
383 ) -> Result<(), NetworkError> {
384 let primary = self.primary().clone();
385 let secondary = self.secondary().clone();
386 let primary_message = message.clone();
387 let secondary_message = message.clone();
388 let primary_recipients = recipients.clone();
389 self.send_both_networks(
390 message,
391 async move {
392 primary
393 .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
394 .await
395 },
396 async move {
397 secondary
398 .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
399 .await
400 },
401 broadcast_delay,
402 )
403 .await
404 }
405
406 async fn direct_message(
407 &self,
408 message: Vec<u8>,
409 recipient: TYPES::SignatureKey,
410 ) -> Result<(), NetworkError> {
411 let primary = self.primary().clone();
412 let secondary = self.secondary().clone();
413 let primary_message = message.clone();
414 let secondary_message = message.clone();
415 let primary_recipient = recipient.clone();
416 self.send_both_networks(
417 message,
418 async move {
419 primary
420 .direct_message(primary_message, primary_recipient)
421 .await
422 },
423 async move { secondary.direct_message(secondary_message, recipient).await },
424 BroadcastDelay::None,
425 )
426 .await
427 }
428
429 async fn vid_broadcast_message(
430 &self,
431 messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
432 ) -> Result<(), NetworkError> {
433 self.networks.0.vid_broadcast_message(messages).await
434 }
435
436 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
441 loop {
442 let mut primary_fut = self.primary().recv_message().fuse();
444 let mut secondary_fut = self.secondary().recv_message().fuse();
445
446 let (message, from_primary) = select! {
448 p = primary_fut => (p?, true),
449 s = secondary_fut => (s?, false),
450 };
451
452 if self
454 .message_deduplication_cache
455 .write()
456 .is_unique(&message, from_primary)
457 {
458 break Ok(message);
459 }
460 }
461 }
462
463 fn queue_node_lookup(
464 &self,
465 view_number: ViewNumber,
466 pk: TYPES::SignatureKey,
467 ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
468 self.primary().queue_node_lookup(view_number, pk.clone())?;
469 self.secondary().queue_node_lookup(view_number, pk)
470 }
471
472 async fn update_view<'a, T>(
473 &'a self,
474 view: u64,
475 epoch: Option<u64>,
476 membership: EpochMembershipCoordinator<T>,
477 ) where
478 T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
479 {
480 let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
481 spawn(async move {
482 let mut map_lock = delayed_tasks_channels.write().await;
483 while let Some((first_view, _)) = map_lock.first_key_value() {
484 if *first_view < view {
486 if let Some((_, (sender, _))) = map_lock.pop_first() {
487 let _ = sender.try_broadcast(());
488 } else {
489 break;
490 }
491 } else {
492 break;
493 }
494 }
495 });
496 self.networks
498 .1
499 .update_view::<T>(view, epoch, membership)
500 .await;
501 }
502
503 fn is_primary_down(&self) -> bool {
504 self.primary_down.load(Ordering::Relaxed)
505 }
506}
507
508struct MessageDeduplicationCache {
509 primary_message_cache: LruCache<blake3::Hash, ()>,
512
513 secondary_message_cache: LruCache<blake3::Hash, ()>,
516}
517
518impl MessageDeduplicationCache {
519 fn new() -> Self {
521 Self {
522 primary_message_cache: LruCache::new(
523 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
524 ),
525 secondary_message_cache: LruCache::new(
526 NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
527 ),
528 }
529 }
530
531 fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
533 let message_hash = blake3::hash(message);
535
536 let (this_cache, other_cache) = if from_primary {
538 (
539 &mut self.primary_message_cache,
540 &mut self.secondary_message_cache,
541 )
542 } else {
543 (
544 &mut self.secondary_message_cache,
545 &mut self.primary_message_cache,
546 )
547 };
548
549 if other_cache.pop(&message_hash).is_some() {
552 false
554 } else {
555 this_cache.put(message_hash, ());
557 true
558 }
559 }
560}
561
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds one message through a single cache from alternating networks and
    /// checks which deliveries are reported as unique.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";

        // (received on primary?, expected `is_unique` result), applied in order.
        let steps = [
            // Primary first; the secondary copy is suppressed.
            (true, true),
            (false, false),
            // The match consumed the entry, so the pattern repeats.
            (true, true),
            (false, false),
            // Same behaviour when the secondary arrives first.
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // Repeats on one network share one entry, so only the first
            // secondary copy is suppressed.
            (true, true),
            (true, true),
            (true, true),
            (false, false),
            (false, true),
        ];

        let mut cache = MessageDeduplicationCache::new();
        for (step, &(from_primary, expected)) in steps.iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected,
                "step {step}: from_primary = {from_primary}"
            );
        }
    }
}