hotshot/traits/networking/
combined_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
//! Networking implementation that has a primary and a fallback network. If the primary
//! errors, we will use the backup to send or receive.
9use std::{
10    collections::{BTreeMap, HashMap},
11    future::Future,
12    num::NonZeroUsize,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29    boxed_sync,
30    constants::{
31        COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32        COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33    },
34    data::ViewNumber,
35    epoch_membership::EpochMembershipCoordinator,
36    traits::{
37        network::{BroadcastDelay, ConnectedNetwork, Topic},
38        node_implementation::NodeType,
39    },
40    BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
50/// Thread-safe ref counted lock to a map of channels to the delayed tasks
51type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
53/// A communication channel with 2 networks, where we can fall back to the slower network if the
54/// primary fails
55#[derive(Clone)]
56pub struct CombinedNetworks<TYPES: NodeType> {
57    /// The two networks we'll use for send/recv
58    networks: Arc<UnderlyingCombinedNetworks<TYPES>>,
59
60    /// The message deduplication cache that we use to prevent duplicate messages between
61    /// sources
62    message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,
63
64    /// How many times primary failed to deliver
65    primary_fail_counter: Arc<AtomicU64>,
66
67    /// Whether primary is considered down
68    primary_down: Arc<AtomicBool>,
69
70    /// How long to delay
71    delay_duration: Arc<RwLock<Duration>>,
72
73    /// Channels to the delayed tasks
74    delayed_tasks_channels: DelayedTasksChannelsMap,
75
76    /// How many times messages were sent on secondary without delay because primary is down
77    no_delay_counter: Arc<AtomicU64>,
78}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81    /// Constructor
82    ///
83    /// # Panics
84    ///
85    /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
86    #[must_use]
87    pub fn new(
88        primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89        secondary_network: Libp2pNetwork<TYPES>,
90        delay_duration: Option<Duration>,
91    ) -> Self {
92        // Create networks from the ones passed in
93        let networks = Arc::from(UnderlyingCombinedNetworks(
94            primary_network,
95            secondary_network,
96        ));
97
98        Self {
99            networks,
100            message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101            primary_fail_counter: Arc::new(AtomicU64::new(0)),
102            primary_down: Arc::new(AtomicBool::new(false)),
103            delay_duration: Arc::new(RwLock::new(
104                delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105            )),
106            delayed_tasks_channels: Arc::default(),
107            no_delay_counter: Arc::new(AtomicU64::new(0)),
108        }
109    }
110
111    /// Get a ref to the primary network
112    #[must_use]
113    pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114        &self.networks.0
115    }
116
117    /// Get a ref to the backup network
118    #[must_use]
119    pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120        &self.networks.1
121    }
122
123    /// a helper function to send messages through both networks (possibly delayed)
124    async fn send_both_networks(
125        &self,
126        _message: Vec<u8>,
127        primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128        secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129        broadcast_delay: BroadcastDelay,
130    ) -> Result<(), NetworkError> {
131        // A local variable used to decide whether to delay this message or not
132        let mut primary_failed = false;
133        if self.primary_down.load(Ordering::Relaxed) {
134            // If the primary is considered down, we don't want to delay
135            primary_failed = true;
136        } else if self.primary_fail_counter.load(Ordering::Relaxed)
137            > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138        {
139            // If the primary failed more than `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` times,
140            // we don't want to delay this message, and from now on we consider the primary as down
141            info!(
142                "View progression is slower than normally, stop delaying messages on the secondary"
143            );
144            self.primary_down.store(true, Ordering::Relaxed);
145            primary_failed = true;
146        }
147
148        // Always send on the primary network
149        if let Err(e) = primary_future.await {
150            // If the primary failed right away, we don't want to delay this message
151            warn!("Error on primary network: {}", e);
152            self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153            primary_failed = true;
154        };
155
156        if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157            // We are delaying this message
158            let duration = *self.delay_duration.read().await;
159            let primary_down = Arc::clone(&self.primary_down);
160            let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161            // Each delayed task gets its own receiver clone to get a signal cancelling all tasks
162            // related to the given view.
163            let mut receiver = self
164                .delayed_tasks_channels
165                .write()
166                .await
167                .entry(view)
168                .or_insert_with(|| {
169                    let (s, r) = broadcast(1);
170                    (s, r.deactivate())
171                })
172                .1
173                .activate_cloned();
174            // Spawn a task that sleeps for `duration` and then sends the message if it wasn't cancelled
175            spawn(async move {
176                sleep(duration).await;
177                if receiver.try_recv().is_ok() {
178                    // The task has been cancelled because the view progressed, it means the primary is working fine
179                    debug!(
180                        "Not sending on secondary after delay, task was canceled in view update"
181                    );
182                    match primary_fail_counter.load(Ordering::Relaxed) {
183                        0u64 => {
184                            // The primary fail counter reached 0, the primary is now considered up
185                            primary_down.store(false, Ordering::Relaxed);
186                            debug!("primary_fail_counter reached zero, primary_down set to false");
187                        },
188                        c => {
189                            // Decrement the primary fail counter
190                            primary_fail_counter.store(c - 1, Ordering::Relaxed);
191                            debug!("primary_fail_counter set to {:?}", c - 1);
192                        },
193                    }
194                    return Ok(());
195                }
196                // The task hasn't been cancelled, the primary probably failed.
197                // Increment the primary fail counter and send the message.
198                debug!("Sending on secondary after delay, message possibly has not reached recipient on primary");
199                primary_fail_counter.fetch_add(1, Ordering::Relaxed);
200                secondary_future.await
201            });
202            Ok(())
203        } else {
204            // We will send without delay
205            if self.primary_down.load(Ordering::Relaxed) {
206                // If the primary is considered down, we want to periodically delay sending
207                // on the secondary to check whether the primary is able to deliver.
208                // This message will be sent without delay but the next might be delayed.
209                match self.no_delay_counter.load(Ordering::Relaxed) {
210                    c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
211                        // Just increment the 'no delay counter'
212                        self.no_delay_counter.store(c + 1, Ordering::Relaxed);
213                    },
214                    _ => {
215                        // The 'no delay counter' reached the threshold
216                        debug!(
217                            "Sent on secondary without delay more than {} times,\
218                            try delaying to check primary",
219                            COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
220                        );
221                        // Reset the 'no delay counter'
222                        self.no_delay_counter.store(0u64, Ordering::Relaxed);
223                        // The primary is not considered down for the moment
224                        self.primary_down.store(false, Ordering::Relaxed);
225                        // The primary fail counter is set just below the threshold to delay the next message
226                        self.primary_fail_counter
227                            .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
228                    },
229                }
230            }
231            // Send the message
232            secondary_future.await
233        }
234    }
235}
236
237/// Wrapper for the tuple of `PushCdnNetwork` and `Libp2pNetwork`
238/// We need this so we can impl `TestableNetworkingImplementation`
239/// on the tuple
240#[derive(Clone)]
241pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
242    pub PushCdnNetwork<TYPES::SignatureKey>,
243    pub Libp2pNetwork<TYPES>,
244);
245
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Build a generator of combined networks for testing.
    ///
    /// Each node gets a CDN network and a libp2p network from their own generators.
    /// `reliability_config` is passed to the libp2p generator only, and both underlying
    /// generators get a zero delay. `secondary_network_delay` becomes the combined network's
    /// delay before sending on the secondary.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            )
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Take ownership of both generated networks, cloning only if a generator
                // kept another reference instead of panicking
                let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::unwrap_or_clone(gen0.await);
                let p2p = Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(gen1.await);

                // `new` gives every node its own deduplication cache and fresh failure tracking
                Arc::new(Self::new(cdn, p2p, Some(secondary_network_delay)))
            })
        })
    }

    /// Get the number of messages in-flight.
    ///
    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
319
320#[async_trait]
321impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
322    fn pause(&self) {
323        self.networks.0.pause();
324    }
325
326    fn resume(&self) {
327        self.networks.0.resume();
328    }
329
330    async fn wait_for_ready(&self) {
331        join!(
332            self.primary().wait_for_ready(),
333            self.secondary().wait_for_ready()
334        );
335    }
336
337    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
338    where
339        'a: 'b,
340        Self: 'b,
341    {
342        let closure = async move {
343            join!(self.primary().shut_down(), self.secondary().shut_down());
344        };
345        boxed_sync(closure)
346    }
347
348    async fn broadcast_message(
349        &self,
350        message: Vec<u8>,
351        topic: Topic,
352        broadcast_delay: BroadcastDelay,
353    ) -> Result<(), NetworkError> {
354        let primary = self.primary().clone();
355        let secondary = self.secondary().clone();
356        let primary_message = message.clone();
357        let secondary_message = message.clone();
358        let topic_clone = topic.clone();
359        self.send_both_networks(
360            message,
361            async move {
362                primary
363                    .broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
364                    .await
365            },
366            async move {
367                secondary
368                    .broadcast_message(secondary_message, topic, BroadcastDelay::None)
369                    .await
370            },
371            broadcast_delay,
372        )
373        .await
374    }
375
376    async fn da_broadcast_message(
377        &self,
378        message: Vec<u8>,
379        recipients: Vec<TYPES::SignatureKey>,
380        broadcast_delay: BroadcastDelay,
381    ) -> Result<(), NetworkError> {
382        let primary = self.primary().clone();
383        let secondary = self.secondary().clone();
384        let primary_message = message.clone();
385        let secondary_message = message.clone();
386        let primary_recipients = recipients.clone();
387        self.send_both_networks(
388            message,
389            async move {
390                primary
391                    .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
392                    .await
393            },
394            async move {
395                secondary
396                    .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
397                    .await
398            },
399            broadcast_delay,
400        )
401        .await
402    }
403
404    async fn direct_message(
405        &self,
406        message: Vec<u8>,
407        recipient: TYPES::SignatureKey,
408    ) -> Result<(), NetworkError> {
409        let primary = self.primary().clone();
410        let secondary = self.secondary().clone();
411        let primary_message = message.clone();
412        let secondary_message = message.clone();
413        let primary_recipient = recipient.clone();
414        self.send_both_networks(
415            message,
416            async move {
417                primary
418                    .direct_message(primary_message, primary_recipient)
419                    .await
420            },
421            async move { secondary.direct_message(secondary_message, recipient).await },
422            BroadcastDelay::None,
423        )
424        .await
425    }
426
427    async fn vid_broadcast_message(
428        &self,
429        messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
430    ) -> Result<(), NetworkError> {
431        self.networks.0.vid_broadcast_message(messages).await
432    }
433
434    /// Receive one or many messages from the underlying network.
435    ///
436    /// # Errors
437    /// Does not error
438    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
439        loop {
440            // Receive from both networks
441            let mut primary_fut = self.primary().recv_message().fuse();
442            let mut secondary_fut = self.secondary().recv_message().fuse();
443
444            // Wait for one to return a message
445            let (message, from_primary) = select! {
446                p = primary_fut => (p?, true),
447                s = secondary_fut => (s?, false),
448            };
449
450            // See if we should process the message or not based on the cache
451            if self
452                .message_deduplication_cache
453                .write()
454                .is_unique(&message, from_primary)
455            {
456                break Ok(message);
457            }
458        }
459    }
460
461    fn queue_node_lookup(
462        &self,
463        view_number: ViewNumber,
464        pk: TYPES::SignatureKey,
465    ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
466        self.primary().queue_node_lookup(view_number, pk.clone())?;
467        self.secondary().queue_node_lookup(view_number, pk)
468    }
469
470    async fn update_view<'a, T>(
471        &'a self,
472        view: u64,
473        epoch: Option<u64>,
474        membership: EpochMembershipCoordinator<T>,
475    ) where
476        T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
477    {
478        let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
479        spawn(async move {
480            let mut map_lock = delayed_tasks_channels.write().await;
481            while let Some((first_view, _)) = map_lock.first_key_value() {
482                // Broadcast a cancelling signal to all the tasks related to each view older than the new one
483                if *first_view < view {
484                    if let Some((_, (sender, _))) = map_lock.pop_first() {
485                        let _ = sender.try_broadcast(());
486                    } else {
487                        break;
488                    }
489                } else {
490                    break;
491                }
492            }
493        });
494        // Run `update_view` logic for the libp2p network
495        self.networks
496            .1
497            .update_view::<T>(view, epoch, membership)
498            .await;
499    }
500
501    fn is_primary_down(&self) -> bool {
502        self.primary_down.load(Ordering::Relaxed)
503    }
504}
505
506struct MessageDeduplicationCache {
507    /// Last n seen messages from the primary network (to prevent processing a duplicate
508    /// received from the secondary network)
509    primary_message_cache: LruCache<blake3::Hash, ()>,
510
511    /// Last n seen messages from the secondary network (to prevent processing a duplicate
512    /// received from the primary network)
513    secondary_message_cache: LruCache<blake3::Hash, ()>,
514}
515
516impl MessageDeduplicationCache {
517    /// Create a new, empty message cache
518    fn new() -> Self {
519        Self {
520            primary_message_cache: LruCache::new(
521                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
522            ),
523            secondary_message_cache: LruCache::new(
524                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
525            ),
526        }
527    }
528
529    /// Determine if a message is unique between two sources
530    fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
531        // Calculate the hash of the message
532        let message_hash = blake3::hash(message);
533
534        // Determine which cache to use based on the source of the message
535        let (this_cache, other_cache) = if from_primary {
536            (
537                &mut self.primary_message_cache,
538                &mut self.secondary_message_cache,
539            )
540        } else {
541            (
542                &mut self.secondary_message_cache,
543                &mut self.primary_message_cache,
544            )
545        };
546
547        // Check if we've seen this message from the other source. We want to use `pop` because we
548        // still want to process the message if it gets received again from the same source.
549        if other_cache.pop(&message_hash).is_some() {
550            // We've seen this message from the other source, don't process it again
551            false
552        } else {
553            // First time seeing from this source or already processed duplicate
554            this_cache.put(message_hash, ());
555            true
556        }
557    }
558}
559
#[cfg(test)]
mod tests {
    use super::*;

    /// Feed `message` through `cache` once per step, asserting each step's
    /// `(from_primary, expected_unique)` outcome in order
    fn assert_steps(
        cache: &mut MessageDeduplicationCache,
        message: &[u8],
        steps: &[(bool, bool)],
    ) {
        for (index, &(from_primary, expected_unique)) in steps.iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected_unique,
                "step {index}: from_primary = {from_primary}"
            );
        }
    }

    #[test]
    fn test_message_deduplication() {
        const PRIMARY: bool = true;
        const SECONDARY: bool = false;

        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Primary first, then its copy on the secondary: only the first is unique. Once the
        // duplicate has been consumed, the same pattern repeats.
        assert_steps(
            &mut cache,
            message,
            &[
                (PRIMARY, true),
                (SECONDARY, false),
                (PRIMARY, true),
                (SECONDARY, false),
            ],
        );

        // The same sequence with the sources swapped
        assert_steps(
            &mut cache,
            message,
            &[
                (SECONDARY, true),
                (PRIMARY, false),
                (SECONDARY, true),
                (PRIMARY, false),
            ],
        );

        // Repeats from one source stay unique; only the first cross-source copy is dropped
        assert_steps(
            &mut cache,
            message,
            &[
                (PRIMARY, true),
                (PRIMARY, true),
                (PRIMARY, true),
                (SECONDARY, false),
                (SECONDARY, true),
            ],
        );
    }
}