hotshot/traits/networking/
combined_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
//! Networking implementation that has a primary and a fallback network. If the primary
//! errors, we will use the backup to send or receive.
9use std::{
10    collections::{BTreeMap, HashMap},
11    future::Future,
12    num::NonZeroUsize,
13    sync::{
14        Arc,
15        atomic::{AtomicBool, AtomicU64, Ordering},
16    },
17    time::Duration,
18};
19
20use async_broadcast::{InactiveReceiver, Sender, broadcast};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{FutureExt, join, select};
24use hotshot_types::{
25    BoxSyncFuture, boxed_sync,
26    constants::{
27        COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
28        COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
29    },
30    data::{EpochNumber, ViewNumber},
31    epoch_membership::EpochMembershipCoordinator,
32    traits::{
33        network::{BroadcastDelay, ConnectedNetwork, Topic},
34        node_implementation::NodeType,
35    },
36};
37#[cfg(feature = "hotshot-testing")]
38use hotshot_types::{
39    PeerConnectInfo,
40    traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{NetworkError, push_cdn_network::PushCdnNetwork};
48use crate::traits::implementations::Libp2pNetwork;
49
50/// Thread-safe ref counted lock to a map of channels to the delayed tasks
51type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53/// A communication channel with 2 networks, where we can fall back to the slower network if the
54/// primary fails
55#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57    /// The two networks we'll use for send/recv
58    networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60    /// The message deduplication cache that we use to prevent duplicate messages between
61    /// sources
62    message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64    /// How many times primary failed to deliver
65    primary_fail_counter: Arc<AtomicU64>,
66
67    /// Whether primary is considered down
68    primary_down: Arc<AtomicBool>,
69
70    /// How long to delay
71    delay_duration: Arc<RwLock<Duration>>,
72
73    /// Channels to the delayed tasks
74    delayed_tasks_channels: DelayedTasksChannelsMap,
75
76    /// How many times messages were sent on secondary without delay because primary is down
77    no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81    /// Constructor
82    ///
83    /// # Panics
84    ///
85    /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
86    #[must_use]
87    pub fn new(
88        primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89        secondary_network: Libp2pNetwork<TYPES>,
90        delay_duration: Option<Duration>,
91    ) -> Self {
92        // Create networks from the ones passed in
93        let networks = Arc::from(UnderlyingCombinedNetworks(
94            primary_network,
95            secondary_network,
96        ));
97
98        Self {
99            networks,
100            message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101            primary_fail_counter: Arc::new(AtomicU64::new(0)),
102            primary_down: Arc::new(AtomicBool::new(false)),
103            delay_duration: Arc::new(RwLock::new(
104                delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105            )),
106            delayed_tasks_channels: Arc::default(),
107            no_delay_counter: Arc::new(AtomicU64::new(0)),
108        }
109    }
110
111    /// Get a ref to the primary network
112    #[must_use]
113    pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114        &self.networks.0
115    }
116
117    /// Get a ref to the backup network
118    #[must_use]
119    pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120        &self.networks.1
121    }
122
123    /// a helper function to send messages through both networks (possibly delayed)
124    async fn send_both_networks(
125        &self,
126        _message: Vec<u8>,
127        primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128        secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129        broadcast_delay: BroadcastDelay,
130    ) -> Result<(), NetworkError> {
131        // A local variable used to decide whether to delay this message or not
132        let mut primary_failed = false;
133        if self.primary_down.load(Ordering::Relaxed) {
134            // If the primary is considered down, we don't want to delay
135            primary_failed = true;
136        } else if self.primary_fail_counter.load(Ordering::Relaxed)
137            > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138        {
139            // If the primary failed more than `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` times,
140            // we don't want to delay this message, and from now on we consider the primary as down
141            info!(
142                "View progression is slower than normally, stop delaying messages on the secondary"
143            );
144            self.primary_down.store(true, Ordering::Relaxed);
145            primary_failed = true;
146        }
147
148        // Always send on the primary network
149        if let Err(e) = primary_future.await {
150            // If the primary failed right away, we don't want to delay this message
151            warn!("Error on primary network: {}", e);
152            self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153            primary_failed = true;
154        };
155
156        if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157            // We are delaying this message
158            let duration = *self.delay_duration.read().await;
159            let primary_down = Arc::clone(&self.primary_down);
160            let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161            // Each delayed task gets its own receiver clone to get a signal cancelling all tasks
162            // related to the given view.
163            let mut receiver = self
164                .delayed_tasks_channels
165                .write()
166                .await
167                .entry(view)
168                .or_insert_with(|| {
169                    let (s, r) = broadcast(1);
170                    (s, r.deactivate())
171                })
172                .1
173                .activate_cloned();
174            // Spawn a task that sleeps for `duration` and then sends the message if it wasn't cancelled
175            spawn(async move {
176                sleep(duration).await;
177                if receiver.try_recv().is_ok() {
178                    // The task has been cancelled because the view progressed, it means the primary is working fine
179                    debug!(
180                        "Not sending on secondary after delay, task was canceled in view update"
181                    );
182                    match primary_fail_counter.load(Ordering::Relaxed) {
183                        0u64 => {
184                            // The primary fail counter reached 0, the primary is now considered up
185                            primary_down.store(false, Ordering::Relaxed);
186                            debug!("primary_fail_counter reached zero, primary_down set to false");
187                        },
188                        c => {
189                            // Decrement the primary fail counter
190                            primary_fail_counter.store(c - 1, Ordering::Relaxed);
191                            debug!("primary_fail_counter set to {:?}", c - 1);
192                        },
193                    }
194                    return Ok(());
195                }
196                // The task hasn't been cancelled, the primary probably failed.
197                // Increment the primary fail counter and send the message.
198                debug!(
199                    "Sending on secondary after delay, message possibly has not reached recipient \
200                     on primary"
201                );
202                primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203                secondary_future.await
204            });
205            Ok(())
206        } else {
207            // We will send without delay
208            if self.primary_down.load(Ordering::Relaxed) {
209                // If the primary is considered down, we want to periodically delay sending
210                // on the secondary to check whether the primary is able to deliver.
211                // This message will be sent without delay but the next might be delayed.
212                match self.no_delay_counter.load(Ordering::Relaxed) {
213                    c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214                        // Just increment the 'no delay counter'
215                        self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216                    },
217                    _ => {
218                        // The 'no delay counter' reached the threshold
219                        debug!(
220                            "Sent on secondary without delay more than {} times,try delaying to \
221                             check primary",
222                            COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223                        );
224                        // Reset the 'no delay counter'
225                        self.no_delay_counter.store(0u64, Ordering::Relaxed);
226                        // The primary is not considered down for the moment
227                        self.primary_down.store(false, Ordering::Relaxed);
228                        // The primary fail counter is set just below the threshold to delay the next message
229                        self.primary_fail_counter
230                            .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231                    },
232                }
233            }
234            // Send the message
235            tokio::time::timeout(Duration::from_secs(2), secondary_future)
236                .await
237                .map_err(|e| NetworkError::Timeout(e.to_string()))?
238        }
239    }
240}
241
242/// Wrapper for the tuple of `PushCdnNetwork` and `Libp2pNetwork`
243/// We need this so we can impl `TestableNetworkingImplementation`
244/// on the tuple
245#[derive(Clone)]
246pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
247    pub PushCdnNetwork<TYPES::SignatureKey>,
248    pub Libp2pNetwork<TYPES>,
249);
250
251#[cfg(feature = "hotshot-testing")]
252impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
253    fn generator(
254        expected_node_count: usize,
255        num_bootstrap: usize,
256        network_id: usize,
257        da_committee_size: usize,
258        reliability_config: Option<Box<dyn NetworkReliability>>,
259        secondary_network_delay: Duration,
260        connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
261    ) -> AsyncGenerator<Arc<Self>> {
262        let generators = (
263            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
264                expected_node_count,
265                num_bootstrap,
266                network_id,
267                da_committee_size,
268                None,
269                Duration::default(),
270                connect_infos
271            ),
272            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
273                expected_node_count,
274                num_bootstrap,
275                network_id,
276                da_committee_size,
277                reliability_config,
278                Duration::default(),
279                connect_infos
280            )
281        );
282        Box::pin(move |node_id| {
283            let gen0 = generators.0(node_id);
284            let gen1 = generators.1(node_id);
285
286            Box::pin(async move {
287                // Generate the CDN network
288                let cdn = gen0.await;
289                let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::into_inner(cdn).unwrap();
290
291                // Generate the p2p network
292                let p2p = gen1.await;
293
294                // Combine the two
295                let underlying_combined = UnderlyingCombinedNetworks(
296                    cdn.clone(),
297                    Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(p2p),
298                );
299
300                // Create a new message deduplication cache
301                let message_deduplication_cache =
302                    Arc::new(PlRwLock::new(MessageDeduplicationCache::new()));
303
304                // Combine the two networks with the same cache
305                let combined_network = Self {
306                    networks: Arc::new(underlying_combined),
307                    primary_fail_counter: Arc::new(AtomicU64::new(0)),
308                    primary_down: Arc::new(AtomicBool::new(false)),
309                    message_deduplication_cache: Arc::clone(&message_deduplication_cache),
310                    delay_duration: Arc::new(RwLock::new(secondary_network_delay)),
311                    delayed_tasks_channels: Arc::default(),
312                    no_delay_counter: Arc::new(AtomicU64::new(0)),
313                };
314
315                Arc::new(combined_network)
316            })
317        })
318    }
319
320    /// Get the number of messages in-flight.
321    ///
322    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
323    fn in_flight_message_count(&self) -> Option<usize> {
324        None
325    }
326}
327
328#[async_trait]
329impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
330    fn pause(&self) {
331        self.networks.0.pause();
332    }
333
334    fn resume(&self) {
335        self.networks.0.resume();
336    }
337
338    async fn wait_for_ready(&self) {
339        join!(
340            self.primary().wait_for_ready(),
341            self.secondary().wait_for_ready()
342        );
343    }
344
345    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
346    where
347        'a: 'b,
348        Self: 'b,
349    {
350        let closure = async move {
351            join!(self.primary().shut_down(), self.secondary().shut_down());
352        };
353        boxed_sync(closure)
354    }
355
356    async fn broadcast_message(
357        &self,
358        view: ViewNumber,
359        message: Vec<u8>,
360        topic: Topic,
361        broadcast_delay: BroadcastDelay,
362    ) -> Result<(), NetworkError> {
363        let primary = self.primary().clone();
364        let secondary = self.secondary().clone();
365        let primary_message = message.clone();
366        let secondary_message = message.clone();
367        self.send_both_networks(
368            message,
369            async move {
370                primary
371                    .broadcast_message(view, primary_message, topic, BroadcastDelay::None)
372                    .await
373            },
374            async move {
375                secondary
376                    .broadcast_message(view, secondary_message, topic, BroadcastDelay::None)
377                    .await
378            },
379            broadcast_delay,
380        )
381        .await
382    }
383
384    async fn da_broadcast_message(
385        &self,
386        view: ViewNumber,
387        message: Vec<u8>,
388        recipients: Vec<TYPES::SignatureKey>,
389        broadcast_delay: BroadcastDelay,
390    ) -> Result<(), NetworkError> {
391        let primary = self.primary().clone();
392        let secondary = self.secondary().clone();
393        let primary_message = message.clone();
394        let secondary_message = message.clone();
395        let primary_recipients = recipients.clone();
396        self.send_both_networks(
397            message,
398            async move {
399                primary
400                    .da_broadcast_message(
401                        view,
402                        primary_message,
403                        primary_recipients,
404                        BroadcastDelay::None,
405                    )
406                    .await
407            },
408            async move {
409                secondary
410                    .da_broadcast_message(view, secondary_message, recipients, BroadcastDelay::None)
411                    .await
412            },
413            broadcast_delay,
414        )
415        .await
416    }
417
418    async fn direct_message(
419        &self,
420        view: ViewNumber,
421        message: Vec<u8>,
422        recipient: TYPES::SignatureKey,
423    ) -> Result<(), NetworkError> {
424        let primary = self.primary().clone();
425        let secondary = self.secondary().clone();
426        let primary_message = message.clone();
427        let secondary_message = message.clone();
428        let primary_recipient = recipient.clone();
429        self.send_both_networks(
430            message,
431            async move {
432                primary
433                    .direct_message(view, primary_message, primary_recipient)
434                    .await
435            },
436            async move {
437                secondary
438                    .direct_message(view, secondary_message, recipient)
439                    .await
440            },
441            BroadcastDelay::None,
442        )
443        .await
444    }
445
446    async fn vid_broadcast_message(
447        &self,
448        messages: HashMap<TYPES::SignatureKey, (ViewNumber, Vec<u8>)>,
449    ) -> Result<(), NetworkError> {
450        self.networks.0.vid_broadcast_message(messages).await
451    }
452
453    /// Receive one or many messages from the underlying network.
454    ///
455    /// # Errors
456    /// Does not error
457    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
458        loop {
459            // Receive from both networks
460            let mut primary_fut = self.primary().recv_message().fuse();
461            let mut secondary_fut = self.secondary().recv_message().fuse();
462
463            // Wait for one to return a message
464            let (message, from_primary) = select! {
465                p = primary_fut => (p?, true),
466                s = secondary_fut => (s?, false),
467            };
468
469            // See if we should process the message or not based on the cache
470            if self
471                .message_deduplication_cache
472                .write()
473                .is_unique(&message, from_primary)
474            {
475                break Ok(message);
476            }
477        }
478    }
479
480    fn queue_node_lookup(
481        &self,
482        view_number: ViewNumber,
483        pk: TYPES::SignatureKey,
484    ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
485        self.primary().queue_node_lookup(view_number, pk.clone())?;
486        self.secondary().queue_node_lookup(view_number, pk)
487    }
488
489    async fn update_view<T>(
490        &self,
491        view: ViewNumber,
492        epoch: Option<EpochNumber>,
493        membership: EpochMembershipCoordinator<T>,
494    ) where
495        T: NodeType<SignatureKey = TYPES::SignatureKey>,
496    {
497        let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
498        spawn(async move {
499            let mut map_lock = delayed_tasks_channels.write().await;
500            while let Some((first_view, _)) = map_lock.first_key_value() {
501                // Broadcast a cancelling signal to all the tasks related to each view older than the new one
502                if *first_view < *view {
503                    if let Some((_, (sender, _))) = map_lock.pop_first() {
504                        let _ = sender.try_broadcast(());
505                    } else {
506                        break;
507                    }
508                } else {
509                    break;
510                }
511            }
512        });
513        // Run `update_view` logic for the libp2p network
514        self.networks
515            .1
516            .update_view::<T>(view, epoch, membership)
517            .await;
518    }
519
520    fn is_primary_down(&self) -> bool {
521        self.primary_down.load(Ordering::Relaxed)
522    }
523}
524
525struct MessageDeduplicationCache {
526    /// Last n seen messages from the primary network (to prevent processing a duplicate
527    /// received from the secondary network)
528    primary_message_cache: LruCache<blake3::Hash, ()>,
529
530    /// Last n seen messages from the secondary network (to prevent processing a duplicate
531    /// received from the primary network)
532    secondary_message_cache: LruCache<blake3::Hash, ()>,
533}
534
535impl MessageDeduplicationCache {
536    /// Create a new, empty message cache
537    fn new() -> Self {
538        Self {
539            primary_message_cache: LruCache::new(
540                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
541            ),
542            secondary_message_cache: LruCache::new(
543                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
544            ),
545        }
546    }
547
548    /// Determine if a message is unique between two sources
549    fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
550        // Calculate the hash of the message
551        let message_hash = blake3::hash(message);
552
553        // Determine which cache to use based on the source of the message
554        let (this_cache, other_cache) = if from_primary {
555            (
556                &mut self.primary_message_cache,
557                &mut self.secondary_message_cache,
558            )
559        } else {
560            (
561                &mut self.secondary_message_cache,
562                &mut self.primary_message_cache,
563            )
564        };
565
566        // Check if we've seen this message from the other source. We want to use `pop` because we
567        // still want to process the message if it gets received again from the same source.
568        if other_cache.pop(&message_hash).is_some() {
569            // We've seen this message from the other source, don't process it again
570            false
571        } else {
572            // First time seeing from this source or already processed duplicate
573            this_cache.put(message_hash, ());
574            true
575        }
576    }
577}
578
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds one message through the cache from alternating sources and checks, step by step,
    /// whether it is reported as unique.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Each step is `(from_primary, expected_is_unique)`, applied in order to the same cache
        let steps = [
            // Process one message from both. Only the first one should be unique
            (true, true),
            (false, false),
            // The previous pair cancelled out, so the same pattern repeats
            (true, true),
            (false, false),
            // Same as above, the other way around
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // The same message from the same source a few times is always treated as unique
            (true, true),
            (true, true),
            (true, true),
            // The first receive from the other source is then a duplicate, the next one is not
            (false, false),
            (false, true),
        ];

        for (step, (from_primary, expected)) in steps.into_iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected,
                "unexpected result at step {step}"
            );
        }
    }
}