hotshot/traits/networking/
combined_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
//! Networking implementation that has a primary and a fallback network. If the primary
//! errors, we use the backup to send or receive. Messages are received from both networks,
//! and duplicates are dropped.
9use std::{
10    collections::{BTreeMap, HashMap},
11    future::Future,
12    num::NonZeroUsize,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19
20use async_broadcast::{broadcast, InactiveReceiver, Sender};
21use async_lock::RwLock;
22use async_trait::async_trait;
23use futures::{join, select, FutureExt};
24#[cfg(feature = "hotshot-testing")]
25use hotshot_types::traits::network::{
26    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
27};
28use hotshot_types::{
29    boxed_sync,
30    constants::{
31        COMBINED_NETWORK_CACHE_SIZE, COMBINED_NETWORK_DELAY_DURATION,
32        COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
33    },
34    data::ViewNumber,
35    epoch_membership::EpochMembershipCoordinator,
36    traits::{
37        network::{BroadcastDelay, ConnectedNetwork, Topic},
38        node_implementation::NodeType,
39    },
40    BoxSyncFuture,
41};
42use lru::LruCache;
43use parking_lot::RwLock as PlRwLock;
44use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
45use tracing::{debug, info, warn};
46
47use super::{push_cdn_network::PushCdnNetwork, NetworkError};
48use crate::traits::implementations::Libp2pNetwork;
49
/// Thread-safe ref counted lock to a map of channels to the delayed tasks.
///
/// Keyed by view number. For each view the map holds the broadcast `Sender` that
/// `update_view` uses to cancel that view's delayed secondary sends, and an
/// `InactiveReceiver` from which each delayed task clones its own receiver.
type DelayedTasksChannelsMap = Arc<RwLock<BTreeMap<u64, (Sender<()>, InactiveReceiver<()>)>>>;
52
/// A communication channel with 2 networks, where we can fall back to the slower network if the
/// primary fails
///
/// Every field is reference counted (`Arc`), so clones share the same networks, deduplication
/// cache, health counters and delayed-task channels.
#[derive(Clone)]
pub struct CombinedNetworks<TYPES: NodeType> {
    /// The two networks we'll use for send/recv: `.0` is the primary (CDN), `.1` the
    /// secondary (libp2p)
    networks: Arc<UnderlyingCombinedNetworks<TYPES>>,

    /// The message deduplication cache that we use to prevent duplicate messages between
    /// sources; consulted by `recv_message` for every received message
    message_deduplication_cache: Arc<PlRwLock<MessageDeduplicationCache>>,

    /// How many times primary failed to deliver. Incremented when a primary send errors or a
    /// delayed secondary send was not cancelled in time, decremented when a delayed send is
    /// cancelled. Once above `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` the primary is marked down.
    primary_fail_counter: Arc<AtomicU64>,

    /// Whether primary is considered down; while set, secondary sends are not delayed
    primary_down: Arc<AtomicBool>,

    /// How long a delayable secondary send waits before it is sent (unless cancelled)
    delay_duration: Arc<RwLock<Duration>>,

    /// Channels to the delayed tasks, used to cancel them when the view advances
    delayed_tasks_channels: DelayedTasksChannelsMap,

    /// How many times messages were sent on secondary without delay because primary is down
    no_delay_counter: Arc<AtomicU64>,
}
79
80impl<TYPES: NodeType> CombinedNetworks<TYPES> {
81    /// Constructor
82    ///
83    /// # Panics
84    ///
85    /// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
86    #[must_use]
87    pub fn new(
88        primary_network: PushCdnNetwork<TYPES::SignatureKey>,
89        secondary_network: Libp2pNetwork<TYPES>,
90        delay_duration: Option<Duration>,
91    ) -> Self {
92        // Create networks from the ones passed in
93        let networks = Arc::from(UnderlyingCombinedNetworks(
94            primary_network,
95            secondary_network,
96        ));
97
98        Self {
99            networks,
100            message_deduplication_cache: Arc::new(PlRwLock::new(MessageDeduplicationCache::new())),
101            primary_fail_counter: Arc::new(AtomicU64::new(0)),
102            primary_down: Arc::new(AtomicBool::new(false)),
103            delay_duration: Arc::new(RwLock::new(
104                delay_duration.unwrap_or(Duration::from_millis(COMBINED_NETWORK_DELAY_DURATION)),
105            )),
106            delayed_tasks_channels: Arc::default(),
107            no_delay_counter: Arc::new(AtomicU64::new(0)),
108        }
109    }
110
111    /// Get a ref to the primary network
112    #[must_use]
113    pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
114        &self.networks.0
115    }
116
117    /// Get a ref to the backup network
118    #[must_use]
119    pub fn secondary(&self) -> &Libp2pNetwork<TYPES> {
120        &self.networks.1
121    }
122
123    /// a helper function to send messages through both networks (possibly delayed)
124    async fn send_both_networks(
125        &self,
126        _message: Vec<u8>,
127        primary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
128        secondary_future: impl Future<Output = Result<(), NetworkError>> + Send + 'static,
129        broadcast_delay: BroadcastDelay,
130    ) -> Result<(), NetworkError> {
131        // A local variable used to decide whether to delay this message or not
132        let mut primary_failed = false;
133        if self.primary_down.load(Ordering::Relaxed) {
134            // If the primary is considered down, we don't want to delay
135            primary_failed = true;
136        } else if self.primary_fail_counter.load(Ordering::Relaxed)
137            > COMBINED_NETWORK_MIN_PRIMARY_FAILURES
138        {
139            // If the primary failed more than `COMBINED_NETWORK_MIN_PRIMARY_FAILURES` times,
140            // we don't want to delay this message, and from now on we consider the primary as down
141            info!(
142                "View progression is slower than normally, stop delaying messages on the secondary"
143            );
144            self.primary_down.store(true, Ordering::Relaxed);
145            primary_failed = true;
146        }
147
148        // Always send on the primary network
149        if let Err(e) = primary_future.await {
150            // If the primary failed right away, we don't want to delay this message
151            warn!("Error on primary network: {}", e);
152            self.primary_fail_counter.fetch_add(1, Ordering::Relaxed);
153            primary_failed = true;
154        };
155
156        if let (BroadcastDelay::View(view), false) = (broadcast_delay, primary_failed) {
157            // We are delaying this message
158            let duration = *self.delay_duration.read().await;
159            let primary_down = Arc::clone(&self.primary_down);
160            let primary_fail_counter = Arc::clone(&self.primary_fail_counter);
161            // Each delayed task gets its own receiver clone to get a signal cancelling all tasks
162            // related to the given view.
163            let mut receiver = self
164                .delayed_tasks_channels
165                .write()
166                .await
167                .entry(view)
168                .or_insert_with(|| {
169                    let (s, r) = broadcast(1);
170                    (s, r.deactivate())
171                })
172                .1
173                .activate_cloned();
174            // Spawn a task that sleeps for `duration` and then sends the message if it wasn't cancelled
175            spawn(async move {
176                sleep(duration).await;
177                if receiver.try_recv().is_ok() {
178                    // The task has been cancelled because the view progressed, it means the primary is working fine
179                    debug!(
180                        "Not sending on secondary after delay, task was canceled in view update"
181                    );
182                    match primary_fail_counter.load(Ordering::Relaxed) {
183                        0u64 => {
184                            // The primary fail counter reached 0, the primary is now considered up
185                            primary_down.store(false, Ordering::Relaxed);
186                            debug!("primary_fail_counter reached zero, primary_down set to false");
187                        },
188                        c => {
189                            // Decrement the primary fail counter
190                            primary_fail_counter.store(c - 1, Ordering::Relaxed);
191                            debug!("primary_fail_counter set to {:?}", c - 1);
192                        },
193                    }
194                    return Ok(());
195                }
196                // The task hasn't been cancelled, the primary probably failed.
197                // Increment the primary fail counter and send the message.
198                debug!(
199                    "Sending on secondary after delay, message possibly has not reached recipient \
200                     on primary"
201                );
202                primary_fail_counter.fetch_add(1, Ordering::Relaxed);
203                secondary_future.await
204            });
205            Ok(())
206        } else {
207            // We will send without delay
208            if self.primary_down.load(Ordering::Relaxed) {
209                // If the primary is considered down, we want to periodically delay sending
210                // on the secondary to check whether the primary is able to deliver.
211                // This message will be sent without delay but the next might be delayed.
212                match self.no_delay_counter.load(Ordering::Relaxed) {
213                    c if c < COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL => {
214                        // Just increment the 'no delay counter'
215                        self.no_delay_counter.store(c + 1, Ordering::Relaxed);
216                    },
217                    _ => {
218                        // The 'no delay counter' reached the threshold
219                        debug!(
220                            "Sent on secondary without delay more than {} times,try delaying to \
221                             check primary",
222                            COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL
223                        );
224                        // Reset the 'no delay counter'
225                        self.no_delay_counter.store(0u64, Ordering::Relaxed);
226                        // The primary is not considered down for the moment
227                        self.primary_down.store(false, Ordering::Relaxed);
228                        // The primary fail counter is set just below the threshold to delay the next message
229                        self.primary_fail_counter
230                            .store(COMBINED_NETWORK_MIN_PRIMARY_FAILURES, Ordering::Relaxed);
231                    },
232                }
233            }
234            // Send the message
235            secondary_future.await
236        }
237    }
238}
239
/// Wrapper for the tuple of `PushCdnNetwork` and `Libp2pNetwork`
/// We need this so we can impl `TestableNetworkingImplementation`
/// on the tuple
///
/// Field `.0` is the primary (CDN) network and field `.1` the secondary (libp2p) network.
///
/// NOTE(review): in this file `TestableNetworkingImplementation` is implemented on
/// `CombinedNetworks`, not on this wrapper. Confirm whether the rationale above is still
/// accurate.
#[derive(Clone)]
pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
    pub PushCdnNetwork<TYPES::SignatureKey>,
    pub Libp2pNetwork<TYPES>,
);
248
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetworks<TYPES> {
    /// Build a generator that, for each node id, creates a CDN (primary) and a libp2p
    /// (secondary) test network and combines them.
    ///
    /// `reliability_config` is passed only to the libp2p generator. Both underlying
    /// generators get a zero delay, and `secondary_network_delay` becomes the combined
    /// network's delay before secondary sends.
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        let generators = (
            <PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                None,
                Duration::default(),
            ),
            <Libp2pNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
                expected_node_count,
                num_bootstrap,
                network_id,
                da_committee_size,
                reliability_config,
                Duration::default(),
            ),
        );
        Box::pin(move |node_id| {
            let gen0 = generators.0(node_id);
            let gen1 = generators.1(node_id);

            Box::pin(async move {
                // Generate the CDN network, then the p2p network. If another reference to a
                // network is still alive, `unwrap_or_clone` clones it instead of panicking.
                let cdn =
                    Arc::<PushCdnNetwork<TYPES::SignatureKey>>::unwrap_or_clone(gen0.await);
                let p2p = Arc::<Libp2pNetwork<TYPES>>::unwrap_or_clone(gen1.await);

                // Combine the two. `new` creates a fresh deduplication cache and zeroed health
                // counters, exactly like a production combined network.
                Arc::new(Self::new(cdn, p2p, Some(secondary_network_delay)))
            })
        })
    }

    /// Get the number of messages in-flight.
    ///
    /// Some implementations will not be able to tell how many messages there are in-flight. These implementations should return `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
322
323#[async_trait]
324impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks<TYPES> {
325    fn pause(&self) {
326        self.networks.0.pause();
327    }
328
329    fn resume(&self) {
330        self.networks.0.resume();
331    }
332
333    async fn wait_for_ready(&self) {
334        join!(
335            self.primary().wait_for_ready(),
336            self.secondary().wait_for_ready()
337        );
338    }
339
340    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
341    where
342        'a: 'b,
343        Self: 'b,
344    {
345        let closure = async move {
346            join!(self.primary().shut_down(), self.secondary().shut_down());
347        };
348        boxed_sync(closure)
349    }
350
351    async fn broadcast_message(
352        &self,
353        message: Vec<u8>,
354        topic: Topic,
355        broadcast_delay: BroadcastDelay,
356    ) -> Result<(), NetworkError> {
357        let primary = self.primary().clone();
358        let secondary = self.secondary().clone();
359        let primary_message = message.clone();
360        let secondary_message = message.clone();
361        let topic_clone = topic.clone();
362        self.send_both_networks(
363            message,
364            async move {
365                primary
366                    .broadcast_message(primary_message, topic_clone, BroadcastDelay::None)
367                    .await
368            },
369            async move {
370                secondary
371                    .broadcast_message(secondary_message, topic, BroadcastDelay::None)
372                    .await
373            },
374            broadcast_delay,
375        )
376        .await
377    }
378
379    async fn da_broadcast_message(
380        &self,
381        message: Vec<u8>,
382        recipients: Vec<TYPES::SignatureKey>,
383        broadcast_delay: BroadcastDelay,
384    ) -> Result<(), NetworkError> {
385        let primary = self.primary().clone();
386        let secondary = self.secondary().clone();
387        let primary_message = message.clone();
388        let secondary_message = message.clone();
389        let primary_recipients = recipients.clone();
390        self.send_both_networks(
391            message,
392            async move {
393                primary
394                    .da_broadcast_message(primary_message, primary_recipients, BroadcastDelay::None)
395                    .await
396            },
397            async move {
398                secondary
399                    .da_broadcast_message(secondary_message, recipients, BroadcastDelay::None)
400                    .await
401            },
402            broadcast_delay,
403        )
404        .await
405    }
406
407    async fn direct_message(
408        &self,
409        message: Vec<u8>,
410        recipient: TYPES::SignatureKey,
411    ) -> Result<(), NetworkError> {
412        let primary = self.primary().clone();
413        let secondary = self.secondary().clone();
414        let primary_message = message.clone();
415        let secondary_message = message.clone();
416        let primary_recipient = recipient.clone();
417        self.send_both_networks(
418            message,
419            async move {
420                primary
421                    .direct_message(primary_message, primary_recipient)
422                    .await
423            },
424            async move { secondary.direct_message(secondary_message, recipient).await },
425            BroadcastDelay::None,
426        )
427        .await
428    }
429
430    async fn vid_broadcast_message(
431        &self,
432        messages: HashMap<TYPES::SignatureKey, Vec<u8>>,
433    ) -> Result<(), NetworkError> {
434        self.networks.0.vid_broadcast_message(messages).await
435    }
436
437    /// Receive one or many messages from the underlying network.
438    ///
439    /// # Errors
440    /// Does not error
441    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
442        loop {
443            // Receive from both networks
444            let mut primary_fut = self.primary().recv_message().fuse();
445            let mut secondary_fut = self.secondary().recv_message().fuse();
446
447            // Wait for one to return a message
448            let (message, from_primary) = select! {
449                p = primary_fut => (p?, true),
450                s = secondary_fut => (s?, false),
451            };
452
453            // See if we should process the message or not based on the cache
454            if self
455                .message_deduplication_cache
456                .write()
457                .is_unique(&message, from_primary)
458            {
459                break Ok(message);
460            }
461        }
462    }
463
464    fn queue_node_lookup(
465        &self,
466        view_number: ViewNumber,
467        pk: TYPES::SignatureKey,
468    ) -> Result<(), TrySendError<Option<(ViewNumber, TYPES::SignatureKey)>>> {
469        self.primary().queue_node_lookup(view_number, pk.clone())?;
470        self.secondary().queue_node_lookup(view_number, pk)
471    }
472
473    async fn update_view<'a, T>(
474        &'a self,
475        view: u64,
476        epoch: Option<u64>,
477        membership: EpochMembershipCoordinator<T>,
478    ) where
479        T: NodeType<SignatureKey = TYPES::SignatureKey> + 'a,
480    {
481        let delayed_tasks_channels = Arc::clone(&self.delayed_tasks_channels);
482        spawn(async move {
483            let mut map_lock = delayed_tasks_channels.write().await;
484            while let Some((first_view, _)) = map_lock.first_key_value() {
485                // Broadcast a cancelling signal to all the tasks related to each view older than the new one
486                if *first_view < view {
487                    if let Some((_, (sender, _))) = map_lock.pop_first() {
488                        let _ = sender.try_broadcast(());
489                    } else {
490                        break;
491                    }
492                } else {
493                    break;
494                }
495            }
496        });
497        // Run `update_view` logic for the libp2p network
498        self.networks
499            .1
500            .update_view::<T>(view, epoch, membership)
501            .await;
502    }
503
504    fn is_primary_down(&self) -> bool {
505        self.primary_down.load(Ordering::Relaxed)
506    }
507}
508
/// Per-network records of recently received message hashes (BLAKE3), used by `recv_message`
/// to drop a message that was already received from the other network.
///
/// Both caches are LRU caches used as sets (the value is `()`); each holds at most
/// `COMBINED_NETWORK_CACHE_SIZE` hashes.
struct MessageDeduplicationCache {
    /// Last n seen messages from the primary network (to prevent processing a duplicate
    /// received from the secondary network)
    primary_message_cache: LruCache<blake3::Hash, ()>,

    /// Last n seen messages from the secondary network (to prevent processing a duplicate
    /// received from the primary network)
    secondary_message_cache: LruCache<blake3::Hash, ()>,
}
518
519impl MessageDeduplicationCache {
520    /// Create a new, empty message cache
521    fn new() -> Self {
522        Self {
523            primary_message_cache: LruCache::new(
524                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
525            ),
526            secondary_message_cache: LruCache::new(
527                NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(),
528            ),
529        }
530    }
531
532    /// Determine if a message is unique between two sources
533    fn is_unique(&mut self, message: &[u8], from_primary: bool) -> bool {
534        // Calculate the hash of the message
535        let message_hash = blake3::hash(message);
536
537        // Determine which cache to use based on the source of the message
538        let (this_cache, other_cache) = if from_primary {
539            (
540                &mut self.primary_message_cache,
541                &mut self.secondary_message_cache,
542            )
543        } else {
544            (
545                &mut self.secondary_message_cache,
546                &mut self.primary_message_cache,
547            )
548        };
549
550        // Check if we've seen this message from the other source. We want to use `pop` because we
551        // still want to process the message if it gets received again from the same source.
552        if other_cache.pop(&message_hash).is_some() {
553            // We've seen this message from the other source, don't process it again
554            false
555        } else {
556            // First time seeing from this source or already processed duplicate
557            this_cache.put(message_hash, ());
558            true
559        }
560    }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_message_deduplication() {
        let message = b"hello";
        let mut cache = MessageDeduplicationCache::new();

        // Each step is (received from the primary?, expected `is_unique` result), applied in
        // order against the same cache.
        let steps = [
            // One message from both: only the first one should be unique
            (true, true),
            (false, false),
            // Having received it once on both, the next pair behaves the same way
            (true, true),
            (false, false),
            // The same checks the other way around
            (false, true),
            (true, false),
            (false, true),
            (true, false),
            // Repeats from the same source stay unique; the other source's copy is consumed once
            (true, true),
            (true, true),
            (true, true),
            (false, false),
            (false, true),
        ];

        for (step, (from_primary, expected)) in steps.into_iter().enumerate() {
            assert_eq!(
                cache.is_unique(message, from_primary),
                expected,
                "unexpected result at step {step}"
            );
        }
    }
}