hotshot/traits/networking/
combined_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
//! Networking implementation that has a primary and a fallback network. If the primary
//! errors, we will use the backup to send or receive.
9use std::{
10    collections::{BTreeMap, HashMap},
11    future::Future,
12    num::NonZeroUsize,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29    boxed_sync,
30    constants::{
31        COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32        COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33    },
34    data::{EpochNumber, ViewNumber},
35    epoch_membership::EpochMembershipCoordinator,
36    traits::{
37        network::{BroadcastDelay, ConnectedNetwork, Topic},
38        node_implementation::NodeType,
39    },
40    BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
/// Thread-safe ref counted lock to a map of channels to the delayed tasks.
///
/// The map is keyed by the `u64` carried in `BroadcastDelay::View`. Each value pairs the
/// `Sender` used to broadcast a cancellation signal with an `InactiveReceiver` from which
/// every delayed task for that key clones its own active receiver.
type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
/// A communication channel with 2 networks, where we can fall back to the slower network if the
/// primary fails.
///
/// Every field is behind an `Arc`, so clones of this struct share the same networks, counters,
/// deduplication cache and delayed-task channels.
#[derive(Clone)]
pub struct CombinedNetworks<TYPES: NodeType> {
    /// The two networks we'll use for send/recv (primary first, secondary second)
    networks: Arc<UnderlyingCombinedNetworks<TYPES>>,

    /// The message deduplication cache that we use to prevent duplicate messages between
    /// sources
    message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,

    /// How many times primary failed to deliver. Incremented when a send on the primary errors
    /// or a delayed send was not cancelled; decremented when a delayed send was cancelled.
    primary_fail_counter: Arc<AtomicU64>,

    /// Whether primary is considered down. While set, messages are sent on the secondary
    /// without delay.
    primary_down: Arc<AtomicBool>,

    /// How long a delayed message waits before it is sent on the secondary network
    delay_duration: Arc<RwLock<Duration>>,

    /// Channels to the delayed tasks, used to cancel them when the view is updated
    delayed_tasks_channels: DelayedTasksChannelsMap,

    /// How many times messages were sent on secondary without delay because primary is down
    no_delay_counter: Arc<AtomicU64>,
}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81    /// Constructor
82    ///
83    /// # Panics
84    ///
85    /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
86    #[must_use]
87    pub fn new(
88        primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89        secondary_network: Libp2pNetwork<TYPES>,
90        delay_duration: Option<Duration>,
91    ) -> Self {
92        // Create networks from the ones passed in
93        let networks = Arc::from(UnderlyingCombinedNetworks(
94            primary_network,
95            secondary_network,
96        ));
97
98        Self {
99            networks,
100            message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101            primary_fail_counter: Arc::new(AtomicU64::new(0)),
102            primary_down: Arc::new(AtomicBool::new(false)),
103            delay_duration: Arc::new(RwLock::new(
104                delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105            )),
106            delayed_tasks_channels: Arc::default(),
107            no_delay_counter: Arc::new(AtomicU64::new(0)),
108        }
109    }
110
111    /// Get a ref to the primary network
112    #[must_use]
113    pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114        &self.networks.0
115    }
116
117    /// Get a ref to the backup network
118    #[must_use]
119    pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120        &self.networks.1
121    }
122
123    /// a helper function to send messages through both networks (possibly delayed)
124    async fn send_both_networks(
125        &self,
126        _message: Vec<u8>,
127        primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128        secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129        broadcast_delay: BroadcastDelay,
130    ) -> Result<(), NetworkError> {
131        // A local variable used to decide whether to delay this message or not
132        let mut primary_failed = false;
133        if self.primary_down.load(Ordering::Relaxed) {
134            // If the primary is considered down, we don't want to delay
135            primary_failed = true;
136        } else if self.primary_fail_counter.load(Ordering::Relaxed)
137            > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138        {
139            // If the primary failed more than `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` times,
140            // we don't want to delay this message, and from now on we consider the primary as down
141            info!(
142                "View progression is slower than normally, stop delaying messages on the secondary"
143            );
144            self.primary_down.store(true, Ordering::Relaxed);
145            primary_failed = true;
146        }
147
148        // Always send on the primary network
149        if let Err(e) = primary_future.await {
150            // If the primary failed right away, we don't want to delay this message
151            warn!("Error on primary network: {}", e);
152            self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153            primary_failed = true;
154        };
155
156        if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157            // We are delaying this message
158            let duration = *self.delay_duration.read().await;
159            let primary_down = Arc::clone(&self.primary_down);
160            let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161            // Each delayed task gets its own receiver clone to get a signal cancelling all tasks
162            // related to the given view.
163            let mut receiver = self
164                .delayed_tasks_channels
165                .write()
166                .await
167                .entry(view)
168                .or_insert_with(|| {
169                    let (s, r) = broadcast(1);
170                    (s, r.deactivate())
171                })
172                .1
173                .activate_cloned();
174            // Spawn a task that sleeps for `duration` and then sends the message if it wasn't cancelled
175            spawn(async move {
176                sleep(duration).await;
177                if receiver.try_recv().is_ok() {
178                    // The task has been cancelled because the view progressed, it means the primary is working fine
179                    debug!(
180                        "Not sending on secondary after delay, task was canceled in view update"
181                    );
182                    match primary_fail_counter.load(Ordering::Relaxed) {
183                        0u64 => {
184                            // The primary fail counter reached 0, the primary is now considered up
185                            primary_down.store(false, Ordering::Relaxed);
186                            debug!("primary_fail_counter reached zero, primary_down set to false");
187                        },
188                        c => {
189                            // Decrement the primary fail counter
190                            primary_fail_counter.store(c - 1, Ordering::Relaxed);
191                            debug!("primary_fail_counter set to {:?}", c - 1);
192                        },
193                    }
194                    return Ok(());
195                }
196                // The task hasn't been cancelled, the primary probably failed.
197                // Increment the primary fail counter and send the message.
198                debug!(
199                    "Sending on secondary after delay, message possibly has not reached recipient \
200                     on primary"
201                );
202                primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203                secondary_future.await
204            });
205            Ok(())
206        } else {
207            // We will send without delay
208            if self.primary_down.load(Ordering::Relaxed) {
209                // If the primary is considered down, we want to periodically delay sending
210                // on the secondary to check whether the primary is able to deliver.
211                // This message will be sent without delay but the next might be delayed.
212                match self.no_delay_counter.load(Ordering::Relaxed) {
213                    c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214                        // Just increment the 'no delay counter'
215                        self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216                    },
217                    _ => {
218                        // The 'no delay counter' reached the threshold
219                        debug!(
220                            "Sent on secondary without delay more than {} times,try delaying to \
221                             check primary",
222                            COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223                        );
224                        // Reset the 'no delay counter'
225                        self.no_delay_counter.store(0u64, Ordering::Relaxed);
226                        // The primary is not considered down for the moment
227                        self.primary_down.store(false, Ordering::Relaxed);
228                        // The primary fail counter is set just below the threshold to delay the next message
229                        self.primary_fail_counter
230                            .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231                    },
232                }
233            }
234            // Send the message
235            secondary_future.await
236        }
237    }
238}
239
/// Wrapper for the tuple of `PushCdnNetwork` and `Libp2pNetwork`
/// We need this so we can impl `TestableNetworkingImplementation`
/// on the tuple
#[derive(Clone)]
pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
    /// The primary network
    pub PushCdnNetwork<TYPES::SignatureKey>,
    /// The secondary (backup) network
    pub Libp2pNetwork<TYPES>,
);
248
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Build a generator that produces one combined network per node id.
    ///
    /// The node-count, bootstrap, network-id and DA-committee arguments are forwarded unchanged
    /// to the generators of both underlying networks. `reliability_config` is forwarded to the
    /// libp2p generator only (the CDN generator receives `None`), and
    /// `secondary_network_delay` becomes the delay of the resulting combined network.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            )
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Generate the CDN network, then the p2p network. Both are taken out of their
                // `Arc` the same way: moved out when uniquely owned, cloned otherwise.
                let cdn = Arc::unwrap_or_clone(gen0.await);
                let p2p = Arc::unwrap_or_clone(gen1.await);

                // Combine the two through the regular constructor so the test network starts
                // from exactly the same state as a production one
                Arc::new(Self::new(cdn, p2p, Some(secondary_network_delay)))
            })
        })
    }

    /// Get the number of messages in-flight.
    ///
    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
    /// This implementation always returns `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
322
323#[async_trait]
324impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
325    fn pause(&self) {
326        self.networks.0.pause();
327    }
328
329    fn resume(&self) {
330        self.networks.0.resume();
331    }
332
333    async fn wait_for_ready(&self) {
334        join!(
335            self.primary().wait_for_ready(),
336            self.secondary().wait_for_ready()
337        );
338    }
339
340    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
341    where
342        'a: 'b,
343        Self: 'b,
344    {
345        let closure = async move {
346            join!(self.primary().shut_down(), self.secondary().shut_down());
347        };
348        boxed_sync(closure)
349    }
350
351    async fn broadcast_message(
352        &self,
353        view: ViewNumber,
354        message: Vec<u8>,
355        topic: Topic,
356        broadcast_delay: BroadcastDelay,
357    ) -> Result<(), NetworkError> {
358        let primary = self.primary().clone();
359        let secondary = self.secondary().clone();
360        let primary_message = message.clone();
361        let secondary_message = message.clone();
362        self.send_both_networks(
363            message,
364            async move {
365                primary
366                    .broadcast_message(view, primary_message, topic, BroadcastDelay::None)
367                    .await
368            },
369            async move {
370                secondary
371                    .broadcast_message(view, secondary_message, topic, BroadcastDelay::None)
372                    .await
373            },
374            broadcast_delay,
375        )
376        .await
377    }
378
379    async fn da_broadcast_message(
380        &self,
381        view: ViewNumber,
382        message: Vec<u8>,
383        recipients: Vec<TYPES::SignatureKey>,
384        broadcast_delay: BroadcastDelay,
385    ) -> Result<(), NetworkError> {
386        let primary = self.primary().clone();
387        let secondary = self.secondary().clone();
388        let primary_message = message.clone();
389        let secondary_message = message.clone();
390        let primary_recipients = recipients.clone();
391        self.send_both_networks(
392            message,
393            async move {
394                primary
395                    .da_broadcast_message(
396                        view,
397                        primary_message,
398                        primary_recipients,
399                        BroadcastDelay::None,
400                    )
401                    .await
402            },
403            async move {
404                secondary
405                    .da_broadcast_message(view, secondary_message, recipients, BroadcastDelay::None)
406                    .await
407            },
408            broadcast_delay,
409        )
410        .await
411    }
412
413    async fn direct_message(
414        &self,
415        view: ViewNumber,
416        message: Vec<u8>,
417        recipient: TYPES::SignatureKey,
418    ) -> Result<(), NetworkError> {
419        let primary = self.primary().clone();
420        let secondary = self.secondary().clone();
421        let primary_message = message.clone();
422        let secondary_message = message.clone();
423        let primary_recipient = recipient.clone();
424        self.send_both_networks(
425            message,
426            async move {
427                primary
428                    .direct_message(view, primary_message, primary_recipient)
429                    .await
430            },
431            async move {
432                secondary
433                    .direct_message(view, secondary_message, recipient)
434                    .await
435            },
436            BroadcastDelay::None,
437        )
438        .await
439    }
440
441    async fn vid_broadcast_message(
442        &self,
443        messages: HashMap<TYPES::SignatureKey, (ViewNumber, Vec<u8>)>,
444    ) -> Result<(), NetworkError> {
445        self.networks.0.vid_broadcast_message(messages).await
446    }
447
448    /// Receive one or many messages from the underlying network.
449    ///
450    /// # Errors
451    /// Does not error
452    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
453        loop {
454            // Receive from both networks
455            let mut primary_fut = self.primary().recv_message().fuse();
456            let mut secondary_fut = self.secondary().recv_message().fuse();
457
458            // Wait for one to return a message
459            let (message, from_primary) = select! {
460                p = primary_fut => (p?, true),
461                s = secondary_fut => (s?, false),
462            };
463
464            // See if we should process the message or not based on the cache
465            if self
466                .message_deduplication_cache
467                .write()
468                .is_unique(&message, from_primary)
469            {
470                break Ok(message);
471            }
472        }
473    }
474
475    fn queue_node_lookup(
476        &self,
477        view_number: ViewNumber,
478        pk: TYPES::SignatureKey,
479    ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
480        self.primary().queue_node_lookup(view_number, pk.clone())?;
481        self.secondary().queue_node_lookup(view_number, pk)
482    }
483
484    async fn update_view<T>(
485        &self,
486        view: ViewNumber,
487        epoch: Option<EpochNumber>,
488        membership: EpochMembershipCoordinator<T>,
489    ) where
490        T: NodeType<SignatureKey = TYPES::SignatureKey>,
491    {
492        let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
493        spawn(async move {
494            let mut map_lock = delayed_tasks_channels.write().await;
495            while let Some((first_view, _)) = map_lock.first_key_value() {
496                // Broadcast a cancelling signal to all the tasks related to each view older than the new one
497                if *first_view < *view {
498                    if let Some((_, (sender, _))) = map_lock.pop_first() {
499                        let _ = sender.try_broadcast(());
500                    } else {
501                        break;
502                    }
503                } else {
504                    break;
505                }
506            }
507        });
508        // Run `update_view` logic for the libp2p network
509        self.networks
510            .1
511            .update_view::<T>(view, epoch, membership)
512            .await;
513    }
514
515    fn is_primary_down(&self) -> bool {
516        self.primary_down.load(Ordering::Relaxed)
517    }
518}
519
/// Remembers the hashes of recently received messages per source network, so that a message
/// already received from one network is not processed again when it arrives from the other.
struct MessageDeduplicationCache {
    /// Last n seen messages from the primary network (to prevent processing a duplicate
    /// received from the secondary network)
    primary_message_cache: LruCache<blake3::Hash, ()>,

    /// Last n seen messages from the secondary network (to prevent processing a duplicate
    /// received from the primary network)
    secondary_message_cache: LruCache<blake3::Hash, ()>,
}
529
530impl MessageDeduplicationCache {
531    /// Create a new, empty message cache
532    fn new() -> Self {
533        Self {
534            primary_message_cache: LruCache::new(
535                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
536            ),
537            secondary_message_cache: LruCache::new(
538                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
539            ),
540        }
541    }
542
543    /// Determine if a message is unique between two sources
544    fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
545        // Calculate the hash of the message
546        let message_hash = blake3::hash(message);
547
548        // Determine which cache to use based on the source of the message
549        let (this_cache, other_cache) = if from_primary {
550            (
551                &mut self.primary_message_cache,
552                &mut self.secondary_message_cache,
553            )
554        } else {
555            (
556                &mut self.secondary_message_cache,
557                &mut self.primary_message_cache,
558            )
559        };
560
561        // Check if we've seen this message from the other source. We want to use `pop` because we
562        // still want to process the message if it gets received again from the same source.
563        if other_cache.pop(&message_hash).is_some() {
564            // We've seen this message from the other source, don't process it again
565            false
566        } else {
567            // First time seeing from this source or already processed duplicate
568            this_cache.put(message_hash, ());
569            true
570        }
571    }
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds the same message to one cache from alternating sources and checks, step by step,
    /// whether it is reported as unique.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Each step is `(from_primary, expected_unique)`, applied in order to the same cache
        let steps = [
            // One message from both sources: only the first one is unique
            (true, true),
            (false, false),
            // Received once on both already, so the pattern repeats on the second round
            (true, true),
            (false, false),
            // The same two rounds the other way around
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // Repeats from the same source are always unique...
            (true, true),
            (true, true),
            (true, true),
            // ...the other source is then rejected once and accepted afterwards
            (false, false),
            (false, true),
        ];

        for (step, (from_primary, expected_unique)) in steps.into_iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected_unique,
                "unexpected result at step {step} (from_primary = {from_primary})"
            );
        }
    }
}