hotshot/traits/networking/
libp2p_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Libp2p based/production networking implementation
8//! This module provides a libp2p based networking implementation where each node in the
9//! network forms a tcp or udp connection to a subset of other nodes in the network
10#[cfg(feature = "hotshot-testing")]
11use std::str::FromStr;
12use std::{
13    cmp::min,
14    collections::{BTreeSet, HashSet},
15    fmt::Debug,
16    net::{IpAddr, ToSocketAddrs},
17    num::NonZeroUsize,
18    sync::{
19        atomic::{AtomicBool, AtomicU64, Ordering},
20        Arc,
21    },
22    time::Duration,
23};
24
25use anyhow::{anyhow, Context};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34    network::{
35        behaviours::dht::{
36            record::{Namespace, RecordKey, RecordValue},
37            store::persistent::DhtPersistentStorage,
38        },
39        spawn_network_node,
40        transport::construct_auth_message,
41        NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
42        NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
43        DEFAULT_REPLICATION_FACTOR,
44    },
45    reexport::Multiaddr,
46};
47#[cfg(feature = "hotshot-testing")]
48use hotshot_types::traits::network::{
49    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
50};
51use hotshot_types::{
52    boxed_sync,
53    constants::LOOK_AHEAD,
54    data::ViewNumber,
55    network::NetworkConfig,
56    traits::{
57        metrics::{Counter, Gauge, Metrics, NoMetrics},
58        network::{ConnectedNetwork, NetworkError, Topic},
59        node_implementation::{ConsensusTime, NodeType},
60        signature_key::{PrivateSignatureKey, SignatureKey},
61    },
62    BoxSyncFuture,
63};
64use libp2p_identity::{
65    ed25519::{self, SecretKey},
66    Keypair, PeerId,
67};
68use serde::Serialize;
69use tokio::{
70    select, spawn,
71    sync::{
72        mpsc::{channel, error::TrySendError, Receiver, Sender},
73        Mutex,
74    },
75    time::sleep,
76};
77use tracing::{error, info, instrument, trace, warn};
78
79use crate::{BroadcastDelay, EpochMembershipCoordinator};
80
/// Libp2p-specific metrics
///
/// All of these are registered under the `libp2p` metrics subgroup
/// (see `Libp2pMetricsValue::new`).
#[derive(Clone, Debug)]
pub struct Libp2pMetricsValue {
    /// The number of currently connected peers
    /// (updated on every `NetworkEvent::ConnectedPeersUpdate`)
    pub num_connected_peers: Box<dyn Gauge>,
    /// The number of failed messages (e.g. a broadcast attempted before the network is ready)
    pub num_failed_messages: Box<dyn Counter>,
    /// Whether or not the network is considered ready (`0` until ready, then `1`)
    pub is_ready: Box<dyn Gauge>,
}
91
92impl Libp2pMetricsValue {
93    /// Populate the metrics with Libp2p-specific metrics
94    pub fn new(metrics: &dyn Metrics) -> Self {
95        // Create a `libp2p subgroup
96        let subgroup = metrics.subgroup("libp2p".into());
97
98        // Create the metrics
99        Self {
100            num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
101            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
102            is_ready: subgroup.create_gauge("is_ready".into(), None),
103        }
104    }
105}
106
107impl Default for Libp2pMetricsValue {
108    /// Initialize with empty metrics
109    fn default() -> Self {
110        Self::new(&*NoMetrics::boxed())
111    }
112}
113
/// convenience alias for the type for bootstrap addresses: a shared, lockable list of
/// `(PeerId, Multiaddr)` pairs (structurally the same type as [`PeerInfoVec`])
/// concurrency primitives are needed for having tests
pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
117
/// Hardcoded gossip topic used for QCs.
///
/// This is the only entry in a node's set of subscribed topics, and every node
/// subscribes to it once it has bootstrapped into the DHT.
pub const QC_TOPIC: &str = "global";
120
/// Stubbed out Ack
///
/// Serialized and sent back as the acknowledgement to direct requests
/// (in `Libp2pNetwork::handle_recvd_events`).
///
/// Note: as part of versioning for upgradability,
/// all network messages must begin with a 4-byte version number.
///
/// Hence:
///   * `Empty` *must* be a struct (enums are serialized with a leading byte for the variant), and
///   * we must have an explicit version field.
#[derive(Serialize)]
pub struct Empty {
    /// This should not be required, but it is. Version automatically gets prepended.
    /// Perhaps this could be replaced with something zero-sized and serializable.
    /// NOTE(review): the ack in `handle_recvd_events` is produced with plain
    /// `bincode::serialize`, which does not itself prepend a version — confirm where
    /// (or whether) the version prefix is added.
    byte: u8,
}
135
136impl<T: NodeType> Debug for Libp2pNetwork<T> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("Libp2p").field("inner", &"inner").finish()
139    }
140}
141
/// Type alias for a shared, lockable list of `(PeerId, Multiaddr)` pairs
pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
144
/// The underlying state of the libp2p network
#[derive(Debug)]
struct Libp2pNetworkInner<T: NodeType> {
    /// this node's public key
    pk: T::SignatureKey,
    /// handle to control the network
    handle: Arc<NetworkNodeHandle<T>>,
    /// Receiving half of the channel on which messages delivered to this node are queued
    /// (gossip messages, direct requests, and our own broadcasts on subscribed topics)
    receiver: Mutex<Receiver<Vec<u8>>>,
    /// Sending half of the `receiver` channel; also used to loop our own broadcasts
    /// back to ourselves when we are subscribed to the topic
    sender: Sender<Vec<u8>>,
    /// Sender for node lookup requests (relevant view number, key of node);
    /// sending `None` tells the lookup task to shut down
    node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
    /// this is really cheating to enable local tests
    /// list of (peer_id, bootstrap_addr) pairs; our own listen address is appended on creation
    bootstrap_addrs: PeerInfoVec,
    /// whether or not the network is ready to send (set once we have bootstrapped,
    /// published our lookup record and connected to at least one peer)
    is_ready: Arc<AtomicBool>,
    /// max time before dropping message due to DHT error
    /// (also passed as the timeout for pre-emptive node lookups)
    dht_timeout: Duration,
    /// whether or not we've bootstrapped into the DHT yet
    is_bootstrapped: Arc<AtomicBool>,
    /// The Libp2p metrics we're managing
    metrics: Libp2pMetricsValue,
    /// The list of topics we're subscribed to
    subscribed_topics: HashSet<String>,
    /// the latest view number (for node lookup purposes)
    /// NOTE: supposed to represent a ViewNumber but we
    /// haven't made that atomic yet and we prefer lock-free
    latest_seen_view: Arc<AtomicU64>,
    #[cfg(feature = "hotshot-testing")]
    /// reliability_config: when set, outgoing broadcasts are routed through its
    /// `chaos_send_msg` (testing only)
    reliability_config: Option<Box<dyn NetworkReliability>>,
    /// Killswitch sender; signalling it stops the event handler task
    kill_switch: Sender<()>,
}
181
/// Networking implementation that uses libp2p
/// generic over the node type `T`
///
/// Cloning only bumps a reference count: all clones share the same
/// underlying state through an `Arc`.
#[derive(Clone)]
pub struct Libp2pNetwork<T: NodeType> {
    /// holds the state of the libp2p network
    inner: Arc<Libp2pNetworkInner<T>>,
}
189
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a boxed generator `f(node_id)` that asynchronously builds an
    /// `Arc<Libp2pNetwork>` for the given node.
    ///
    /// Every generated node:
    /// - listens on a freshly picked, unused local UDP port (QUIC),
    /// - derives its keys deterministically from `node_id` (seed `[0; 32]`),
    /// - uses a DHT replication factor of ceil(2/3 * `expected_node_count`), and
    /// - shares one bootstrap list with all other generated nodes (each node appends its
    ///   own listen address to it when created).
    ///
    /// `num_bootstrap` is currently only used for logging. If a `node_id` is generated a
    /// second time, the shared node-id and bootstrap lists are cleared first so the
    /// network can be started again from scratch.
    ///
    /// # Panics
    /// - If `da_committee_size > expected_node_count`, or `expected_node_count == 0`
    /// - Returned function may panic either:
    ///   - An invalid configuration
    ///     (probably an issue with the defaults of this function)
    ///   - No free port could be found
    ///   - An inability to spin up the replica's network
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );
        let bootstrap_addrs: PeerInfoVec = Arc::default();
        let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        // We want at least 2/3 of the nodes to have any given record in the DHT. This does
        // not depend on the node being generated, so compute it once up front.
        let replication_factor = NonZeroUsize::new((2 * expected_node_count).div_ceil(3))
            .expect("expected_node_count must be non-zero");

        Box::pin(move |node_id| {
            info!(
                "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                node_id,
                node_id < num_bootstrap as u64
            );

            // pick a free, unused UDP port for testing
            let port = portpicker::pick_unused_port().expect("Could not find an open port");

            let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1"))
                .expect("Failed to parse local QUIC multiaddr");

            // We assign node's public key and stake value rather than read from config file since it's a test
            let privkey = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
            let pubkey = T::SignatureKey::from_private(&privkey);

            // Derive the Libp2p keypair from the private key
            let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                .expect("Failed to derive libp2p keypair");

            // Sign the lookup record
            let lookup_record_value = RecordValue::new_signed(
                &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()),
                libp2p_keypair.public().to_peer_id().to_bytes(),
                &privkey,
            )
            .expect("Failed to sign DHT lookup record");

            // Build the network node configuration
            let config = NetworkNodeConfigBuilder::default()
                .keypair(libp2p_keypair)
                .replication_factor(replication_factor)
                .bind_address(Some(addr))
                .to_connect_addrs(HashSet::default())
                .republication_interval(None)
                .build()
                .expect("Failed to build network node config");

            let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
            let node_ids_ref = Arc::clone(&node_ids);
            let reliability_config_dup = reliability_config.clone();

            Box::pin(async move {
                // If it's the second time we are starting this network, clear the bootstrap info
                let mut write_ids = node_ids_ref.write().await;
                if write_ids.contains(&node_id) {
                    write_ids.clear();
                    bootstrap_addrs_ref.write().await.clear();
                }
                write_ids.insert(node_id);
                drop(write_ids);

                let network = Libp2pNetwork::new(
                    Libp2pMetricsValue::default(),
                    DhtNoPersistence,
                    config,
                    pubkey.clone(),
                    lookup_record_value,
                    bootstrap_addrs_ref,
                    usize::try_from(node_id).expect("node id must fit in a usize"),
                    // This impl is already gated on `hotshot-testing`, so the argument exists
                    reliability_config_dup,
                )
                .await
                .unwrap_or_else(|err| panic!("Failed to create libp2p network: {err:?}"));

                Arc::new(network)
            })
        })
    }

    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
305
306/// Derive a Libp2p keypair from a given private key
307///
308/// # Errors
309/// If we are unable to derive a new `SecretKey` from the `blake3`-derived
310/// bytes.
311pub fn derive_libp2p_keypair<K: SignatureKey>(
312    private_key: &K::PrivateKey,
313) -> anyhow::Result<Keypair> {
314    // Derive a secondary key from our primary private key
315    let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
316    let derived_key = SecretKey::try_from_bytes(derived_key)?;
317
318    // Create an `ed25519` keypair from the derived key
319    Ok(ed25519::Keypair::from(derived_key).into())
320}
321
322/// Derive a Libp2p Peer ID from a given private key
323///
324/// # Errors
325/// If we are unable to derive a Libp2p keypair
326pub fn derive_libp2p_peer_id<K: SignatureKey>(
327    private_key: &K::PrivateKey,
328) -> anyhow::Result<PeerId> {
329    // Get the derived keypair
330    let keypair = derive_libp2p_keypair::<K>(private_key)?;
331
332    // Return the PeerID derived from the public key
333    Ok(PeerId::from_public_key(&keypair.public()))
334}
335
336/// Parse a Libp2p Multiaddr from a string. The input string should be in the format
337/// `hostname:port` or `ip:port`. This function derives a `Multiaddr` from the input string.
338///
339/// This borrows from Rust's implementation of `to_socket_addrs` but will only warn if the domain
340/// does not yet resolve.
341///
342/// # Errors
343/// - If the input string is not in the correct format
344pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
345    // Split the address into the host and port parts
346    let (host, port) = match addr.rfind(':') {
347        Some(idx) => (&addr[..idx], &addr[idx + 1..]),
348        None => return Err(anyhow!("Invalid address format, no port supplied")),
349    };
350
351    // Try parsing the host as an IP address
352    let ip = host.parse::<IpAddr>();
353
354    // Conditionally build the multiaddr string
355    let multiaddr_string = match ip {
356        Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
357        Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
358        Err(_) => {
359            // Try resolving the host. If it fails, continue but warn the user
360            let lookup_result = addr.to_socket_addrs();
361
362            // See if the lookup failed
363            let failed = lookup_result
364                .map(|result| result.collect::<Vec<_>>().is_empty())
365                .unwrap_or(true);
366
367            // If it did, warn the user
368            if failed {
369                warn!(
370                    "Failed to resolve domain name {host}, assuming it has not yet been \
371                     provisioned"
372                );
373            }
374
375            format!("/dns/{host}/udp/{port}/quic-v1")
376        },
377    };
378
379    // Convert the multiaddr string to a `Multiaddr`
380    multiaddr_string.parse().with_context(|| {
381        format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
382    })
383}
384
385impl<T: NodeType> Libp2pNetwork<T> {
386    /// Create and return a Libp2p network from a network config file
387    /// and various other configuration-specific values.
388    ///
389    /// # Errors
390    /// If we are unable to parse a Multiaddress
391    ///
392    /// # Panics
393    /// If we are unable to calculate the replication factor
394    #[allow(clippy::too_many_arguments)]
395    pub async fn from_config<D: DhtPersistentStorage>(
396        mut config: NetworkConfig<T>,
397        dht_persistent_storage: D,
398        quorum_membership: Arc<RwLock<T::Membership>>,
399        gossip_config: GossipConfig,
400        request_response_config: RequestResponseConfig,
401        bind_address: Multiaddr,
402        pub_key: &T::SignatureKey,
403        priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
404        metrics: Libp2pMetricsValue,
405    ) -> anyhow::Result<Self> {
406        // Try to take our Libp2p config from our broader network config
407        let libp2p_config = config
408            .libp2p_config
409            .take()
410            .ok_or(anyhow!("Libp2p config not supplied"))?;
411
412        // Derive our Libp2p keypair from our supplied private key
413        let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;
414
415        // Build our libp2p configuration
416        let mut config_builder = NetworkNodeConfigBuilder::default();
417
418        // Set the gossip configuration
419        config_builder.gossip_config(gossip_config.clone());
420        config_builder.request_response_config(request_response_config);
421
422        // Construct the auth message
423        let auth_message =
424            construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
425                .with_context(|| "Failed to construct auth message")?;
426
427        // Set the auth message and stake table
428        config_builder
429            .membership(Some(quorum_membership))
430            .auth_message(Some(auth_message));
431
432        // The replication factor is the minimum of [the default and 2/3 the number of nodes]
433        let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
434            return Err(anyhow!("Default replication factor not supplied"));
435        };
436
437        let replication_factor = NonZeroUsize::new(min(
438            default_replication_factor.get(),
439            config.config.num_nodes_with_stake.get() / 2,
440        ))
441        .with_context(|| "Failed to calculate replication factor")?;
442
443        // Sign our DHT lookup record
444        let lookup_record_value = RecordValue::new_signed(
445            &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
446            // The value is our Libp2p Peer ID
447            keypair.public().to_peer_id().to_bytes(),
448            priv_key,
449        )
450        .with_context(|| "Failed to sign DHT lookup record")?;
451
452        config_builder
453            .keypair(keypair)
454            .replication_factor(replication_factor)
455            .bind_address(Some(bind_address.clone()));
456
457        // Connect to the provided bootstrap nodes
458        config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));
459
460        // Build the node's configuration
461        let node_config = config_builder.build()?;
462
463        // Calculate all keys so we can keep track of direct message recipients
464        let mut all_keys = BTreeSet::new();
465
466        // Insert all known nodes into the set of all keys
467        for node in config.config.known_nodes_with_stake {
468            all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
469        }
470
471        Ok(Libp2pNetwork::new(
472            metrics,
473            dht_persistent_storage,
474            node_config,
475            pub_key.clone(),
476            lookup_record_value,
477            Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
478            usize::try_from(config.node_index)?,
479            #[cfg(feature = "hotshot-testing")]
480            None,
481        )
482        .await?)
483    }
484
485    /// Returns whether or not the network has any peers.
486    #[must_use]
487    pub fn has_peers(&self) -> bool {
488        self.inner.is_ready.load(Ordering::Relaxed)
489    }
490
491    /// Returns only when the network is ready.
492    pub async fn wait_for_peers(&self) {
493        loop {
494            if self.has_peers() {
495                break;
496            }
497            sleep(Duration::from_secs(1)).await;
498        }
499    }
500
501    /// Constructs new network for a node. Note that this network is unconnected.
502    /// One must call `connect` in order to connect.
503    /// * `config`: the configuration of the node
504    /// * `pk`: public key associated with the node
505    /// * `bootstrap_addrs`: rwlock containing the bootstrap addrs
506    /// # Errors
507    /// Returns error in the event that the underlying libp2p network
508    /// is unable to create a network.
509    ///
510    /// # Panics
511    ///
512    /// This will panic if there are less than 5 bootstrap nodes
513    #[allow(clippy::too_many_arguments)]
514    pub async fn new<D: DhtPersistentStorage>(
515        metrics: Libp2pMetricsValue,
516        dht_persistent_storage: D,
517        config: NetworkNodeConfig<T>,
518        pk: T::SignatureKey,
519        lookup_record_value: RecordValue<T::SignatureKey>,
520        bootstrap_addrs: BootstrapAddrs,
521        id: usize,
522        #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
523    ) -> Result<Libp2pNetwork<T>, NetworkError> {
524        // Create a map from consensus keys to Libp2p peer IDs
525        let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));
526
527        let (mut rx, network_handle) = spawn_network_node::<T, D>(
528            config.clone(),
529            dht_persistent_storage,
530            Arc::clone(&consensus_key_to_pid_map),
531            id,
532        )
533        .await
534        .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
535
536        // Add our own address to the bootstrap addresses
537        let addr = network_handle.listen_addr();
538        let pid = network_handle.peer_id();
539        bootstrap_addrs.write().await.push((pid, addr));
540
541        // Subscribe to the relevant topics
542        let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
543
544        // unbounded channels may not be the best choice (spammed?)
545        // if bounded figure out a way to log dropped msgs
546        let (sender, receiver) = channel(1000);
547        let (node_lookup_send, node_lookup_recv) = channel(10);
548        let (kill_tx, kill_rx) = channel(1);
549        rx.set_kill_switch(kill_rx);
550
551        let mut result = Libp2pNetwork {
552            inner: Arc::new(Libp2pNetworkInner {
553                handle: Arc::new(network_handle),
554                receiver: Mutex::new(receiver),
555                sender: sender.clone(),
556                pk,
557                bootstrap_addrs,
558                is_ready: Arc::new(AtomicBool::new(false)),
559                // This is optimal for 10-30 nodes. TODO: parameterize this for both tests and examples
560                dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
561                is_bootstrapped: Arc::new(AtomicBool::new(false)),
562                metrics,
563                subscribed_topics,
564                node_lookup_send,
565                // Start the latest view from 0. "Latest" refers to "most recent view we are polling for
566                // proposals on". We need this because to have consensus info injected we need a working
567                // network already. In the worst case, we send a few lookups we don't need.
568                latest_seen_view: Arc::new(AtomicU64::new(0)),
569                #[cfg(feature = "hotshot-testing")]
570                reliability_config,
571                kill_switch: kill_tx,
572            }),
573        };
574
575        // Set the network as not ready
576        result.inner.metrics.is_ready.set(0);
577
578        result.handle_event_generator(sender, rx);
579        result.spawn_node_lookup(node_lookup_recv);
580        result.spawn_connect(id, lookup_record_value);
581
582        Ok(result)
583    }
584
585    /// Spawns task for looking up nodes pre-emptively
586    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
587    fn spawn_node_lookup(
588        &self,
589        mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
590    ) {
591        let handle = Arc::clone(&self.inner.handle);
592        let dht_timeout = self.inner.dht_timeout;
593        let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);
594
595        // deals with handling lookup queue. should be infallible
596        spawn(async move {
597            // cancels on shutdown
598            while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
599                /// defines lookahead threshold based on the constant
600                #[allow(clippy::cast_possible_truncation)]
601                const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;
602
603                trace!("Performing lookup for peer {pk}");
604
605                // only run if we are not too close to the next view number
606                if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
607                    // look up
608                    if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
609                        warn!("Failed to perform lookup for key {pk}: {err}");
610                    };
611                }
612            }
613        });
614    }
615
616    /// Initiates connection to the outside world
617    fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
618        let pk = self.inner.pk.clone();
619        let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
620        let handle = Arc::clone(&self.inner.handle);
621        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
622        let inner = Arc::clone(&self.inner);
623
624        spawn({
625            let is_ready = Arc::clone(&self.inner.is_ready);
626            async move {
627                let bs_addrs = bootstrap_ref.read().await.clone();
628
629                // Add known peers to the network
630                handle.add_known_peers(bs_addrs).unwrap();
631
632                // Begin the bootstrap process
633                handle.begin_bootstrap()?;
634                while !is_bootstrapped.load(Ordering::Relaxed) {
635                    sleep(Duration::from_secs(1)).await;
636                    handle.begin_bootstrap()?;
637                }
638
639                // Subscribe to the QC topic
640                handle.subscribe(QC_TOPIC.to_string()).await.unwrap();
641
642                // Map our staking key to our Libp2p Peer ID so we can properly
643                // route direct messages
644                while handle
645                    .put_record(
646                        RecordKey::new(Namespace::Lookup, pk.to_bytes()),
647                        lookup_record_value.clone(),
648                    )
649                    .await
650                    .is_err()
651                {
652                    sleep(Duration::from_secs(1)).await;
653                }
654
655                // Wait for the network to connect to at least 1 peer
656                if let Err(e) = handle.wait_to_connect(1, id).await {
657                    error!("Failed to connect to peers: {e:?}");
658                    return Err::<(), NetworkError>(e);
659                }
660                info!("Connected to required number of peers");
661
662                // Set the network as ready
663                is_ready.store(true, Ordering::Relaxed);
664                inner.metrics.is_ready.set(1);
665
666                Ok::<(), NetworkError>(())
667            }
668        });
669    }
670
671    /// Handle events
672    fn handle_recvd_events(
673        &self,
674        msg: NetworkEvent,
675        sender: &Sender<Vec<u8>>,
676    ) -> Result<(), NetworkError> {
677        match msg {
678            GossipMsg(msg) => {
679                sender.try_send(msg).map_err(|err| {
680                    NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
681                })?;
682            },
683            DirectRequest(msg, _pid, chan) => {
684                sender.try_send(msg).map_err(|err| {
685                    NetworkError::ChannelSendError(format!(
686                        "failed to send direct request message: {err}"
687                    ))
688                })?;
689                if self
690                    .inner
691                    .handle
692                    .direct_response(
693                        chan,
694                        &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
695                            NetworkError::FailedToSerialize(format!(
696                                "failed to serialize acknowledgement: {e}"
697                            ))
698                        })?,
699                    )
700                    .is_err()
701                {
702                    error!("failed to ack!");
703                };
704            },
705            DirectResponse(_msg, _) => {},
706            NetworkEvent::IsBootstrapped => {
707                error!(
708                    "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
709                     impossible."
710                );
711            },
712            NetworkEvent::ConnectedPeersUpdate(_) => {},
713        }
714        Ok::<(), NetworkError>(())
715    }
716
717    /// task to propagate messages to handlers
718    /// terminates on shut down of network
719    fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
720        let handle = self.clone();
721        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
722        spawn(async move {
723            let Some(mut kill_switch) = network_rx.take_kill_switch() else {
724                tracing::error!(
725                    "`spawn_handle` was called on a network handle that was already closed"
726                );
727                return;
728            };
729
730            loop {
731                select! {
732                    msg = network_rx.recv() => {
733                        let Ok(message) = msg else {
734                            warn!("Network receiver shut down!");
735                            return;
736                        };
737
738                        match message {
739                            NetworkEvent::IsBootstrapped => {
740                                is_bootstrapped.store(true, Ordering::Relaxed);
741                            }
742                            GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
743                                let _ = handle.handle_recvd_events(message, &sender);
744                            }
745                            NetworkEvent::ConnectedPeersUpdate(num_peers) => {
746                                handle.inner.metrics.num_connected_peers.set(num_peers);
747                            }
748                        }
749                    }
750
751                    _kill_switch = kill_switch.recv() => {
752                        warn!("Event Handler shutdown");
753                        return;
754                    }
755                }
756            }
757        });
758    }
759}
760
761#[async_trait]
762impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
763    #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
764    async fn wait_for_ready(&self) {
765        self.wait_for_peers().await;
766    }
767
    /// Pausing is not supported by the Libp2p network.
    ///
    /// # Panics
    /// Always panics via `unimplemented!`.
    fn pause(&self) {
        unimplemented!("Pausing not implemented for the Libp2p network");
    }
771
    /// Resuming is not supported by the Libp2p network.
    ///
    /// # Panics
    /// Always panics via `unimplemented!`.
    fn resume(&self) {
        unimplemented!("Resuming not implemented for the Libp2p network");
    }
775
776    #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
777    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
778    where
779        'a: 'b,
780        Self: 'b,
781    {
782        let closure = async move {
783            let _ = self.inner.handle.shutdown().await;
784            let _ = self.inner.node_lookup_send.send(None).await;
785            let _ = self.inner.kill_switch.send(()).await;
786        };
787        boxed_sync(closure)
788    }
789
790    #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
791    async fn broadcast_message(
792        &self,
793        message: Vec<u8>,
794        topic: Topic,
795        _broadcast_delay: BroadcastDelay,
796    ) -> Result<(), NetworkError> {
797        // If we're not ready yet (we don't have any peers, error)
798        if !self.has_peers() {
799            self.inner.metrics.num_failed_messages.add(1);
800            return Err(NetworkError::NoPeersYet);
801        };
802
803        // If we are subscribed to the topic,
804        let topic = topic.to_string();
805        if self.inner.subscribed_topics.contains(&topic) {
806            // Short-circuit-send the message to ourselves
807            self.inner.sender.try_send(message.clone()).map_err(|_| {
808                self.inner.metrics.num_failed_messages.add(1);
809                NetworkError::ShutDown
810            })?;
811        }
812
813        // NOTE: metrics is threadsafe, so clone is fine (and lightweight)
814        #[cfg(feature = "hotshot-testing")]
815        {
816            let metrics = self.inner.metrics.clone();
817            if let Some(ref config) = &self.inner.reliability_config {
818                let handle = Arc::clone(&self.inner.handle);
819
820                let fut = config.clone().chaos_send_msg(
821                    message,
822                    Arc::new(move |msg: Vec<u8>| {
823                        let topic_2 = topic.clone();
824                        let handle_2 = Arc::clone(&handle);
825                        let metrics_2 = metrics.clone();
826                        boxed_sync(async move {
827                            if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
828                                metrics_2.num_failed_messages.add(1);
829                                warn!("Failed to broadcast to libp2p: {e:?}");
830                            }
831                        })
832                    }),
833                );
834                spawn(fut);
835                return Ok(());
836            }
837        }
838
839        if let Err(e) = self.inner.handle.gossip(topic, &message) {
840            self.inner.metrics.num_failed_messages.add(1);
841            return Err(e);
842        }
843
844        Ok(())
845    }
846
847    #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
848    async fn da_broadcast_message(
849        &self,
850        message: Vec<u8>,
851        recipients: Vec<T::SignatureKey>,
852        _broadcast_delay: BroadcastDelay,
853    ) -> Result<(), NetworkError> {
854        // If we're not ready yet (we don't have any peers, error)
855        if !self.has_peers() {
856            self.inner.metrics.num_failed_messages.add(1);
857            return Err(NetworkError::NoPeersYet);
858        };
859
860        let future_results = recipients
861            .into_iter()
862            .map(|r| self.direct_message(message.clone(), r));
863        let results = join_all(future_results).await;
864
865        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
866
867        if errors.is_empty() {
868            Ok(())
869        } else {
870            Err(NetworkError::Multiple(errors))
871        }
872    }
873
874    #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
875    async fn direct_message(
876        &self,
877        message: Vec<u8>,
878        recipient: T::SignatureKey,
879    ) -> Result<(), NetworkError> {
880        // If we're not ready yet (we don't have any peers, error)
881        if !self.has_peers() {
882            self.inner.metrics.num_failed_messages.add(1);
883            return Err(NetworkError::NoPeersYet);
884        };
885
886        // short circuit if we're dming ourselves
887        if recipient == self.inner.pk {
888            // panic if we already shut down?
889            self.inner.sender.try_send(message).map_err(|_x| {
890                self.inner.metrics.num_failed_messages.add(1);
891                NetworkError::ShutDown
892            })?;
893            return Ok(());
894        }
895
896        let pid = match self
897            .inner
898            .handle
899            .lookup_node(&recipient, self.inner.dht_timeout)
900            .await
901        {
902            Ok(pid) => pid,
903            Err(err) => {
904                self.inner.metrics.num_failed_messages.add(1);
905                return Err(NetworkError::LookupError(format!(
906                    "failed to look up node for direct message: {err}"
907                )));
908            },
909        };
910
911        #[cfg(feature = "hotshot-testing")]
912        {
913            let metrics = self.inner.metrics.clone();
914            if let Some(ref config) = &self.inner.reliability_config {
915                let handle = Arc::clone(&self.inner.handle);
916
917                let fut = config.clone().chaos_send_msg(
918                    message,
919                    Arc::new(move |msg: Vec<u8>| {
920                        let handle_2 = Arc::clone(&handle);
921                        let metrics_2 = metrics.clone();
922                        boxed_sync(async move {
923                            if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
924                                metrics_2.num_failed_messages.add(1);
925                                warn!("Failed to broadcast to libp2p: {e:?}");
926                            }
927                        })
928                    }),
929                );
930                spawn(fut);
931                return Ok(());
932            }
933        }
934
935        match self.inner.handle.direct_request(pid, &message) {
936            Ok(()) => Ok(()),
937            Err(e) => {
938                self.inner.metrics.num_failed_messages.add(1);
939                Err(e)
940            },
941        }
942    }
943
944    /// Receive one or many messages from the underlying network.
945    ///
946    /// # Errors
947    /// If there is a network-related failure.
948    #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
949    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
950        let result = self
951            .inner
952            .receiver
953            .lock()
954            .await
955            .recv()
956            .await
957            .ok_or(NetworkError::ShutDown)?;
958
959        Ok(result)
960    }
961
962    #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
963    #[allow(clippy::type_complexity)]
964    fn queue_node_lookup(
965        &self,
966        view_number: ViewNumber,
967        pk: T::SignatureKey,
968    ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
969        self.inner
970            .node_lookup_send
971            .try_send(Some((view_number, pk)))
972    }
973
974    /// The libp2p view update is a special operation intrinsic to its internal behavior.
975    ///
976    /// Libp2p needs to do a lookup because a libp2p address is not related to
977    /// hotshot keys. So in libp2p we store a mapping of HotShot key to libp2p address
978    /// in a distributed hash table.
979    ///
980    /// This means to directly message someone on libp2p we need to lookup in the hash
981    /// table what their libp2p address is, using their HotShot public key as the key.
982    ///
983    /// So the logic with libp2p is to prefetch upcoming leaders libp2p address to
984    /// save time when we later need to direct message the leader our vote. Hence the
985    /// use of the future view and leader to queue the lookups.
986    async fn update_view<'a, TYPES>(
987        &'a self,
988        view: u64,
989        epoch: Option<u64>,
990        membership_coordinator: EpochMembershipCoordinator<TYPES>,
991    ) where
992        TYPES: NodeType<SignatureKey = T::SignatureKey> + 'a,
993    {
994        let future_view = <TYPES as NodeType>::View::new(view) + LOOK_AHEAD;
995        let epoch = epoch.map(<TYPES as NodeType>::Epoch::new);
996
997        let membership = match membership_coordinator.membership_for_epoch(epoch).await {
998            Ok(m) => m,
999            Err(e) => {
1000                return tracing::warn!(e.message);
1001            },
1002        };
1003        let future_leader = match membership.leader(future_view).await {
1004            Ok(l) => l,
1005            Err(e) => {
1006                return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1007            },
1008        };
1009
1010        let _ = self
1011            .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1012            .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1013    }
1014}
1015
#[cfg(test)]
mod test {
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        /// Derive a multiaddr from `addr` and assert that it renders as `expected`.
        ///
        /// Panics with the offending address and the error if derivation fails.
        fn assert_derives_to(addr: &str, expected: &str) {
            let addr = addr.to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr)
                .unwrap_or_else(|e| panic!("Failed to derive valid multiaddr from {addr}: {e:?}"));

            assert_eq!(multiaddr.to_string(), expected);
        }

        /// Assert that deriving a multiaddr from `addr` fails.
        fn assert_derive_fails(addr: &str) {
            let addr = addr.to_string();
            assert!(
                derive_libp2p_multiaddr(&addr).is_err(),
                "Expected multiaddr derivation to fail for {addr}"
            );
        }

        /// A valid IPv4 socket address derives to an `/ip4/.../quic-v1` multiaddr.
        #[test]
        fn test_v4_valid() {
            assert_derives_to("1.1.1.1:8080", "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// A valid IPv6 address with a port derives to an `/ip6/.../quic-v1` multiaddr.
        #[test]
        fn test_v6_valid() {
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            assert_derives_to(
                &format!("{ipv6_addr}:8080"),
                &format!("/ip6/{ipv6_addr}/udp/8080/quic-v1"),
            );
        }

        /// An IP address without a port fails to derive.
        #[test]
        fn test_no_port() {
            assert_derive_fails("1.1.1.1");
        }

        /// A domain name with a port derives to a `/dns/.../quic-v1` multiaddr.
        #[test]
        fn test_fqdn_exists() {
            assert_derives_to("example.com:8080", "/dns/example.com/udp/8080/quic-v1");
        }

        /// A domain name that does not exist still derives to a `/dns/...` multiaddr,
        /// since the name is kept as-is rather than resolved.
        #[test]
        fn test_fqdn_does_not_exist() {
            assert_derives_to(
                "libp2p.example.com:8080",
                "/dns/libp2p.example.com/udp/8080/quic-v1",
            );
        }

        /// A domain name without a port fails to derive.
        #[test]
        fn test_fqdn_no_port() {
            assert_derive_fails("example.com");
        }
    }
}