hotshot/traits/networking/
combined_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
//! Networking implementation that has a primary and a fallback network. If the primary
//! errors, we will use the backup to send or receive.
9use std::{
10    collections::{BTreeMap, HashMap},
11    future::Future,
12    num::NonZeroUsize,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29    boxed_sync,
30    constants::{
31        COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32        COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33    },
34    data::ViewNumber,
35    epoch_membership::EpochMembershipCoordinator,
36    traits::{
37        network::{BroadcastDelay, ConnectedNetwork, Topic},
38        node_implementation::NodeType,
39    },
40    BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
50/// Thread-safe ref counted lock to a map of channels to the delayed tasks
51type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53/// A communication channel with 2 networks, where we can fall back to the slower network if the
54/// primary fails
55#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57    /// The two networks we'll use for send/recv
58    networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60    /// The message deduplication cache that we use to prevent duplicate messages between
61    /// sources
62    message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64    /// How many times primary failed to deliver
65    primary_fail_counter: Arc<AtomicU64>,
66
67    /// Whether primary is considered down
68    primary_down: Arc<AtomicBool>,
69
70    /// How long to delay
71    delay_duration: Arc<RwLock<Duration>>,
72
73    /// Channels to the delayed tasks
74    delayed_tasks_channels: DelayedTasksChannelsMap,
75
76    /// How many times messages were sent on secondary without delay because primary is down
77    no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81    /// Constructor
82    ///
83    /// # Panics
84    ///
85    /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
86    #[must_use]
87    pub fn new(
88        primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89        secondary_network: Libp2pNetwork<TYPES>,
90        delay_duration: Option<Duration>,
91    ) -> Self {
92        // Create networks from the ones passed in
93        let networks = Arc::from(UnderlyingCombinedNetworks(
94            primary_network,
95            secondary_network,
96        ));
97
98        Self {
99            networks,
100            message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101            primary_fail_counter: Arc::new(AtomicU64::new(0)),
102            primary_down: Arc::new(AtomicBool::new(false)),
103            delay_duration: Arc::new(RwLock::new(
104                delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105            )),
106            delayed_tasks_channels: Arc::default(),
107            no_delay_counter: Arc::new(AtomicU64::new(0)),
108        }
109    }
110
111    /// Get a ref to the primary network
112    #[must_use]
113    pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114        &self.networks.0
115    }
116
117    /// Get a ref to the backup network
118    #[must_use]
119    pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120        &self.networks.1
121    }
122
123    /// a helper function to send messages through both networks (possibly delayed)
124    async fn send_both_networks(
125        &self,
126        _message: Vec<u8>,
127        primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128        secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129        broadcast_delay: BroadcastDelay,
130    ) -> Result<(), NetworkError> {
131        // A local variable used to decide whether to delay this message or not
132        let mut primary_failed = false;
133        if self.primary_down.load(Ordering::Relaxed) {
134            // If the primary is considered down, we don't want to delay
135            primary_failed = true;
136        } else if self.primary_fail_counter.load(Ordering::Relaxed)
137            > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138        {
139            // If the primary failed more than `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` times,
140            // we don't want to delay this message, and from now on we consider the primary as down
141            info!(
142                "View progression is slower than normally, stop delaying messages on the secondary"
143            );
144            self.primary_down.store(true, Ordering::Relaxed);
145            primary_failed = true;
146        }
147
148        // Always send on the primary network
149        if let Err(e) = primary_future.await {
150            // If the primary failed right away, we don't want to delay this message
151            warn!("Error on primary network: {}", e);
152            self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153            primary_failed = true;
154        };
155
156        if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157            // We are delaying this message
158            let duration = *self.delay_duration.read().await;
159            let primary_down = Arc::clone(&self.primary_down);
160            let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161            // Each delayed task gets its own receiver clone to get a signal cancelling all tasks
162            // related to the given view.
163            let mut receiver = self
164                .delayed_tasks_channels
165                .write()
166                .await
167                .entry(view)
168                .or_insert_with(|| {
169                    let (s, r) = broadcast(1);
170                    (s, r.deactivate())
171                })
172                .1
173                .activate_cloned();
174            // Spawn a task that sleeps for `duration` and then sends the message if it wasn't cancelled
175            spawn(async move {
176                sleep(duration).await;
177                if receiver.try_recv().is_ok() {
178                    // The task has been cancelled because the view progressed, it means the primary is working fine
179                    debug!(
180                        "Not sending on secondary after delay, task was canceled in view update"
181                    );
182                    match primary_fail_counter.load(Ordering::Relaxed) {
183                        0u64 => {
184                            // The primary fail counter reached 0, the primary is now considered up
185                            primary_down.store(false, Ordering::Relaxed);
186                            debug!("primary_fail_counter reached zero, primary_down set to false");
187                        },
188                        c => {
189                            // Decrement the primary fail counter
190                            primary_fail_counter.store(c - 1, Ordering::Relaxed);
191                            debug!("primary_fail_counter set to {:?}", c - 1);
192                        },
193                    }
194                    return Ok(());
195                }
196                // The task hasn't been cancelled, the primary probably failed.
197                // Increment the primary fail counter and send the message.
198                debug!(
199                    "Sending on secondary after delay, message possibly has not reached recipient \
200                     on primary"
201                );
202                primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203                secondary_future.await
204            });
205            Ok(())
206        } else {
207            // We will send without delay
208            if self.primary_down.load(Ordering::Relaxed) {
209                // If the primary is considered down, we want to periodically delay sending
210                // on the secondary to check whether the primary is able to deliver.
211                // This message will be sent without delay but the next might be delayed.
212                match self.no_delay_counter.load(Ordering::Relaxed) {
213                    c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214                        // Just increment the 'no delay counter'
215                        self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216                    },
217                    _ => {
218                        // The 'no delay counter' reached the threshold
219                        debug!(
220                            "Sent on secondary without delay more than {} times,try delaying to \
221                             check primary",
222                            COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223                        );
224                        // Reset the 'no delay counter'
225                        self.no_delay_counter.store(0u64, Ordering::Relaxed);
226                        // The primary is not considered down for the moment
227                        self.primary_down.store(false, Ordering::Relaxed);
228                        // The primary fail counter is set just below the threshold to delay the next message
229                        self.primary_fail_counter
230                            .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231                    },
232                }
233            }
234            // Send the message
235            secondary_future.await
236        }
237    }
238}
239
240/// Wrapper for the tuple of `PushCdnNetwork` and `Libp2pNetwork`
241/// We need this so we can impl `TestableNetworkingImplementation`
242/// on the tuple
243#[derive(Clone)]
244pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
245    pub PushCdnNetwork<TYPES::SignatureKey>,
246    pub Libp2pNetwork<TYPES>,
247);
248
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Build a generator that produces one `CombinedNetworks` per node id.
    ///
    /// The node-count, bootstrap, network-id and DA-committee arguments are forwarded
    /// unchanged to both underlying generators. `reliability_config` is handed to the libp2p
    /// generator only (the CDN generator receives `None`), and both underlying generators get
    /// `Duration::default()` as their delay. `secondary_network_delay` becomes the combined
    /// network's own delay before falling back to the secondary.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        // Generator for the primary (CDN) network
        let cdn_generator =
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            );
        // Generator for the secondary (libp2p) network
        let p2p_generator =
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            );

        Box::pin(move |node_id| {
            let cdn_future = cdn_generator(node_id);
            let p2p_future = p2p_generator(node_id);

            Box::pin(async move {
                // Take each network out of its `Arc`, cloning only if the `Arc` is shared.
                // This treats both networks the same way and cannot panic.
                let cdn = Arc::unwrap_or_clone(cdn_future.await);
                let p2p = Arc::unwrap_or_clone(p2p_future.await);

                // Reuse the regular constructor so the test network is initialised exactly
                // like a production one (fresh deduplication cache, zeroed counters).
                Arc::new(Self::new(cdn, p2p, Some(secondary_network_delay)))
            })
        })
    }

    /// Get the number of messages in-flight.
    ///
    /// Some implementations will not be able to tell how many messages there are in-flight.
    /// These implementations should return `None`, which is what the combined network does.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
322
323#[async_trait]
324impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
325    fn pause(&self) {
326        self.networks.0.pause();
327    }
328
329    fn resume(&self) {
330        self.networks.0.resume();
331    }
332
333    async fn wait_for_ready(&self) {
334        join!(
335            self.primary().wait_for_ready(),
336            self.secondary().wait_for_ready()
337        );
338    }
339
340    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
341    where
342        'a: 'b,
343        Self: 'b,
344    {
345        let closure = async move {
346            join!(self.primary().shut_down(), self.secondary().shut_down());
347        };
348        boxed_sync(closure)
349    }
350
351    async fn broadcast_message(
352        &self,
353        message: Vec<u8>,
354        topic: Topic,
355        broadcast_delay: BroadcastDelay,
356    ) -> Result<(), NetworkError> {
357        let primary = self.primary().clone();
358        let secondary = self.secondary().clone();
359        let primary_message = message.clone();
360        let secondary_message = message.clone();
361        self.send_both_networks(
362            message,
363            async move {
364                primary
365                    .broadcast_message(primary_message, topic, BroadcastDelay::None)
366                    .await
367            },
368            async move {
369                secondary
370                    .broadcast_message(secondary_message, topic, BroadcastDelay::None)
371                    .await
372            },
373            broadcast_delay,
374        )
375        .await
376    }
377
378    async fn da_broadcast_message(
379        &self,
380        message: Vec<u8>,
381        recipients: Vec<TYPES::SignatureKey>,
382        broadcast_delay: BroadcastDelay,
383    ) -> Result<(), NetworkError> {
384        let primary = self.primary().clone();
385        let secondary = self.secondary().clone();
386        let primary_message = message.clone();
387        let secondary_message = message.clone();
388        let primary_recipients = recipients.clone();
389        self.send_both_networks(
390            message,
391            async move {
392                primary
393                    .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
394                    .await
395            },
396            async move {
397                secondary
398                    .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
399                    .await
400            },
401            broadcast_delay,
402        )
403        .await
404    }
405
406    async fn direct_message(
407        &self,
408        message: Vec<u8>,
409        recipient: TYPES::SignatureKey,
410    ) -> Result<(), NetworkError> {
411        let primary = self.primary().clone();
412        let secondary = self.secondary().clone();
413        let primary_message = message.clone();
414        let secondary_message = message.clone();
415        let primary_recipient = recipient.clone();
416        self.send_both_networks(
417            message,
418            async move {
419                primary
420                    .direct_message(primary_message, primary_recipient)
421                    .await
422            },
423            async move { secondary.direct_message(secondary_message, recipient).await },
424            BroadcastDelay::None,
425        )
426        .await
427    }
428
429    async fn vid_broadcast_message(
430        &self,
431        messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
432    ) -> Result<(), NetworkError> {
433        self.networks.0.vid_broadcast_message(messages).await
434    }
435
436    /// Receive one or many messages from the underlying network.
437    ///
438    /// # Errors
439    /// Does not error
440    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
441        loop {
442            // Receive from both networks
443            let mut primary_fut = self.primary().recv_message().fuse();
444            let mut secondary_fut = self.secondary().recv_message().fuse();
445
446            // Wait for one to return a message
447            let (message, from_primary) = select! {
448                p = primary_fut => (p?, true),
449                s = secondary_fut => (s?, false),
450            };
451
452            // See if we should process the message or not based on the cache
453            if self
454                .message_deduplication_cache
455                .write()
456                .is_unique(&message, from_primary)
457            {
458                break Ok(message);
459            }
460        }
461    }
462
463    fn queue_node_lookup(
464        &self,
465        view_number: ViewNumber,
466        pk: TYPES::SignatureKey,
467    ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
468        self.primary().queue_node_lookup(view_number, pk.clone())?;
469        self.secondary().queue_node_lookup(view_number, pk)
470    }
471
472    async fn update_view<'a, T>(
473        &'a self,
474        view: u64,
475        epoch: Option<u64>,
476        membership: EpochMembershipCoordinator<T>,
477    ) where
478        T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
479    {
480        let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
481        spawn(async move {
482            let mut map_lock = delayed_tasks_channels.write().await;
483            while let Some((first_view, _)) = map_lock.first_key_value() {
484                // Broadcast a cancelling signal to all the tasks related to each view older than the new one
485                if *first_view < view {
486                    if let Some((_, (sender, _))) = map_lock.pop_first() {
487                        let _ = sender.try_broadcast(());
488                    } else {
489                        break;
490                    }
491                } else {
492                    break;
493                }
494            }
495        });
496        // Run `update_view` logic for the libp2p network
497        self.networks
498            .1
499            .update_view::<T>(view, epoch, membership)
500            .await;
501    }
502
503    fn is_primary_down(&self) -> bool {
504        self.primary_down.load(Ordering::Relaxed)
505    }
506}
507
508struct MessageDeduplicationCache {
509    /// Last n seen messages from the primary network (to prevent processing a duplicate
510    /// received from the secondary network)
511    primary_message_cache: LruCache<blake3::Hash, ()>,
512
513    /// Last n seen messages from the secondary network (to prevent processing a duplicate
514    /// received from the primary network)
515    secondary_message_cache: LruCache<blake3::Hash, ()>,
516}
517
518impl MessageDeduplicationCache {
519    /// Create a new, empty message cache
520    fn new() -> Self {
521        Self {
522            primary_message_cache: LruCache::new(
523                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
524            ),
525            secondary_message_cache: LruCache::new(
526                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
527            ),
528        }
529    }
530
531    /// Determine if a message is unique between two sources
532    fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
533        // Calculate the hash of the message
534        let message_hash = blake3::hash(message);
535
536        // Determine which cache to use based on the source of the message
537        let (this_cache, other_cache) = if from_primary {
538            (
539                &mut self.primary_message_cache,
540                &mut self.secondary_message_cache,
541            )
542        } else {
543            (
544                &mut self.secondary_message_cache,
545                &mut self.primary_message_cache,
546            )
547        };
548
549        // Check if we've seen this message from the other source. We want to use `pop` because we
550        // still want to process the message if it gets received again from the same source.
551        if other_cache.pop(&message_hash).is_some() {
552            // We've seen this message from the other source, don't process it again
553            false
554        } else {
555            // First time seeing from this source or already processed duplicate
556            this_cache.put(message_hash, ());
557            true
558        }
559    }
560}
561
#[cfg(test)]
mod tests {
    use super::*;

    /// Replays one message through a single cache from alternating sources and checks the
    /// verdict of every arrival.
    #[test]
    fn test_message_deduplication() {
        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Each entry is (from_primary, expected `is_unique` result), in arrival order
        let arrivals = [
            // One arrival from each source: only the first one is unique
            (true, true),
            (false, false),
            // Both sides have seen it once, so the same pattern repeats
            (true, true),
            (false, false),
            // The same two rounds with the sources swapped
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // Repeats from one source are always unique; the other source is then suppressed
            // exactly once
            (true, true),
            (true, true),
            (true, true),
            (false, false),
            (false, true),
        ];

        for (step, (from_primary, expected)) in arrivals.into_iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected,
                "unexpected verdict for arrival {step}"
            );
        }
    }
}