hotshot/traits/networking/
libp2p_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Libp2p based/production networking implementation
8//! This module provides a libp2p based networking implementation where each node in the
9//! network forms a tcp or udp connection to a subset of other nodes in the network
10#[cfg(feature = "hotshot-testing")]
11use std::str::FromStr;
12use std::{
13    cmp::min,
14    collections::{BTreeSet, HashSet},
15    fmt::Debug,
16    net::{IpAddr, ToSocketAddrs},
17    num::NonZeroUsize,
18    sync::{
19        atomic::{AtomicBool, AtomicU64, Ordering},
20        Arc,
21    },
22    time::Duration,
23};
24
25use anyhow::{anyhow, Context};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34    network::{
35        behaviours::dht::{
36            record::{Namespace, RecordKey, RecordValue},
37            store::persistent::DhtPersistentStorage,
38        },
39        spawn_network_node,
40        transport::construct_auth_message,
41        NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
42        NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
43        DEFAULT_REPLICATION_FACTOR,
44    },
45    reexport::Multiaddr,
46};
47#[cfg(feature = "hotshot-testing")]
48use hotshot_types::traits::network::{
49    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
50};
51use hotshot_types::{
52    boxed_sync,
53    constants::LOOK_AHEAD,
54    data::ViewNumber,
55    network::NetworkConfig,
56    traits::{
57        metrics::{Counter, Gauge, Metrics, NoMetrics},
58        network::{ConnectedNetwork, NetworkError, Topic},
59        node_implementation::{ConsensusTime, NodeType},
60        signature_key::{PrivateSignatureKey, SignatureKey},
61    },
62    BoxSyncFuture,
63};
64use libp2p_identity::{
65    ed25519::{self, SecretKey},
66    Keypair, PeerId,
67};
68use serde::Serialize;
69use tokio::{
70    select, spawn,
71    sync::{
72        mpsc::{channel, error::TrySendError, Receiver, Sender},
73        Mutex,
74    },
75    time::sleep,
76};
77use tracing::{error, info, instrument, trace, warn};
78
79use crate::{BroadcastDelay, EpochMembershipCoordinator};
80
81/// Libp2p-specific metrics
82#[derive(Clone, Debug)]
83pub struct Libp2pMetricsValue {
84    /// The number of currently connected peers
85    pub num_connected_peers: Box<dyn Gauge>,
86    /// The number of failed messages
87    pub num_failed_messages: Box<dyn Counter>,
88    /// Whether or not the network is considered ready
89    pub is_ready: Box<dyn Gauge>,
90}
91
92impl Libp2pMetricsValue {
93    /// Populate the metrics with Libp2p-specific metrics
94    pub fn new(metrics: &dyn Metrics) -> Self {
95        // Create a `libp2p subgroup
96        let subgroup = metrics.subgroup("libp2p".into());
97
98        // Create the metrics
99        Self {
100            num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
101            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
102            is_ready: subgroup.create_gauge("is_ready".into(), None),
103        }
104    }
105}
106
107impl Default for Libp2pMetricsValue {
108    /// Initialize with empty metrics
109    fn default() -> Self {
110        Self::new(&*NoMetrics::boxed())
111    }
112}
113
114/// convenience alias for the type for bootstrap addresses
115/// concurrency primitives are needed for having tests
116pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
117
118/// hardcoded topic of QC used
119pub const QC_TOPIC: &str = "global";
120
121/// Stubbed out Ack
122///
123/// Note: as part of versioning for upgradability,
124/// all network messages must begin with a 4-byte version number.
125///
126/// Hence:
127///   * `Empty` *must* be a struct (enums are serialized with a leading byte for the variant), and
128///   * we must have an explicit version field.
129#[derive(Serialize)]
130pub struct Empty {
131    /// This should not be required, but it is. Version automatically gets prepended.
132    /// Perhaps this could be replaced with something zero-sized and serializable.
133    byte: u8,
134}
135
136impl<T: NodeType> Debug for Libp2pNetwork<T> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("Libp2p").field("inner", &"inner").finish()
139    }
140}
141
142/// Type alias for a shared collection of peerid, multiaddrs
143pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
144
/// The underlying state of the libp2p network.
///
/// Shared through an `Arc` by every clone of `Libp2pNetwork` and by the background
/// tasks it spawns.
#[derive(Debug)]
struct Libp2pNetworkInner<T: NodeType> {
    /// this node's public (consensus signature) key
    pk: T::SignatureKey,
    /// handle to control the underlying Libp2p network node
    handle: Arc<NetworkNodeHandle<T>>,
    /// Receiving end of the channel that incoming gossip and direct-request
    /// payloads are forwarded into
    receiver: Mutex<Receiver<Vec<u8>>>,
    /// Sending half of the `receiver` channel. Incoming network messages are forwarded
    /// through it, and broadcasts on a topic we are subscribed to are also delivered
    /// to ourselves through it.
    sender: Sender<Vec<u8>>,
    /// Sender for node lookup requests (relevant view number, key of node);
    /// sending `None` stops the lookup task (used on shutdown)
    node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
    /// Shared list of `(PeerId, Multiaddr)` bootstrap addresses. Each network appends its
    /// own address on creation; the test generator shares one list across all of its
    /// nodes so they can find each other locally.
    bootstrap_addrs: PeerInfoVec,
    /// whether or not the network is ready to send (set once connected to at least one peer)
    is_ready: Arc<AtomicBool>,
    /// Timeout applied to DHT operations such as node lookups
    dht_timeout: Duration,
    /// whether or not we've bootstrapped into the DHT yet
    is_bootstrapped: Arc<AtomicBool>,
    /// The Libp2p metrics we're managing
    metrics: Libp2pMetricsValue,
    /// The list of topics we're subscribed to
    subscribed_topics: HashSet<String>,
    /// the latest view number (for node lookup purposes)
    /// NOTE: supposed to represent a ViewNumber but we
    /// haven't made that atomic yet and we prefer lock-free
    latest_seen_view: Arc<AtomicU64>,
    #[cfg(feature = "hotshot-testing")]
    /// Optional reliability (chaos) configuration applied when sending messages in tests
    reliability_config: Option<Box<dyn NetworkReliability>>,
    /// Killswitch sender; sending on it stops the event handler task
    kill_switch: Sender<()>,
}

/// Networking implementation that uses libp2p.
///
/// Generic over the node type `T`. Cloning is cheap: all clones share the same
/// inner state through an `Arc`.
#[derive(Clone)]
pub struct Libp2pNetwork<T: NodeType> {
    /// holds the state of the libp2p network
    inner: Arc<Libp2pNetworkInner<T>>,
}
189
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a boxed function `f(node_id, public_key) -> Libp2pNetwork`
    /// that generates libp2p networks for tests.
    ///
    /// Nodes whose id is below `num_bootstrap` are reported as bootstrap nodes; every node
    /// is built with sane test defaults, listens on a fresh local QUIC port, and shares one
    /// list of bootstrap addresses with every other node from this generator.
    ///
    /// # Panics
    /// Returned function may panic either:
    /// - An invalid configuration
    ///   (probably an issue with the defaults of this function)
    /// - An inability to spin up the replica's network
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );

        // State shared by every node this generator produces
        let shared_bootstrap: PeerInfoVec = Arc::default();
        let shared_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        Box::pin(move |node_id| {
            let is_bootstrap = node_id < num_bootstrap as u64;
            info!(
                "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                node_id, is_bootstrap
            );

            // Any free local UDP port will do for a test node
            let port = portpicker::pick_unused_port().expect("Could not find an open port");
            let listen_addr =
                Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();

            // Keys come from the node index rather than a config file since this is a test
            let privkey = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
            let pubkey = T::SignatureKey::from_private(&privkey);
            let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                .expect("Failed to derive libp2p keypair");
            let peer_id_bytes = libp2p_keypair.public().to_peer_id().to_bytes();

            // Signed DHT record mapping our consensus key to our Libp2p peer ID
            let lookup_record_value = RecordValue::new_signed(
                &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()),
                peer_id_bytes,
                &privkey,
            )
            .expect("Failed to sign DHT lookup record");

            // Ask for at least 2/3 of the nodes to hold any given DHT record
            let replication_factor =
                NonZeroUsize::new((2 * expected_node_count).div_ceil(3)).unwrap();

            let config = NetworkNodeConfigBuilder::default()
                .keypair(libp2p_keypair)
                .replication_factor(replication_factor)
                .bind_address(Some(listen_addr))
                .to_connect_addrs(HashSet::default())
                .republication_interval(None)
                .build()
                .expect("Failed to build network node config");

            let bootstrap_for_node = Arc::clone(&shared_bootstrap);
            let ids_for_node = Arc::clone(&shared_ids);
            let reliability_for_node = reliability_config.clone();

            Box::pin(async move {
                // Seeing a node id twice means the network is being restarted:
                // forget every id and all stale bootstrap info from the previous run
                {
                    let mut seen_ids = ids_for_node.write().await;
                    if seen_ids.contains(&node_id) {
                        seen_ids.clear();
                        bootstrap_for_node.write().await.clear();
                    }
                    seen_ids.insert(node_id);
                }

                let network = Libp2pNetwork::new(
                    Libp2pMetricsValue::default(),
                    DhtNoPersistence,
                    config,
                    pubkey.clone(),
                    lookup_record_value,
                    bootstrap_for_node,
                    usize::try_from(node_id).unwrap(),
                    #[cfg(feature = "hotshot-testing")]
                    reliability_for_node,
                )
                .await
                .unwrap_or_else(|err| panic!("Failed to create libp2p network: {err:?}"));

                Arc::new(network)
            })
        })
    }

    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
305
306/// Derive a Libp2p keypair from a given private key
307///
308/// # Errors
309/// If we are unable to derive a new `SecretKey` from the `blake3`-derived
310/// bytes.
311pub fn derive_libp2p_keypair<K: SignatureKey>(
312    private_key: &K::PrivateKey,
313) -> anyhow::Result<Keypair> {
314    // Derive a secondary key from our primary private key
315    let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
316    let derived_key = SecretKey::try_from_bytes(derived_key)?;
317
318    // Create an `ed25519` keypair from the derived key
319    Ok(ed25519::Keypair::from(derived_key).into())
320}
321
322/// Derive a Libp2p Peer ID from a given private key
323///
324/// # Errors
325/// If we are unable to derive a Libp2p keypair
326pub fn derive_libp2p_peer_id<K: SignatureKey>(
327    private_key: &K::PrivateKey,
328) -> anyhow::Result<PeerId> {
329    // Get the derived keypair
330    let keypair = derive_libp2p_keypair::<K>(private_key)?;
331
332    // Return the PeerID derived from the public key
333    Ok(PeerId::from_public_key(&keypair.public()))
334}
335
336/// Parse a Libp2p Multiaddr from a string. The input string should be in the format
337/// `hostname:port` or `ip:port`. This function derives a `Multiaddr` from the input string.
338///
339/// This borrows from Rust's implementation of `to_socket_addrs` but will only warn if the domain
340/// does not yet resolve.
341///
342/// # Errors
343/// - If the input string is not in the correct format
344pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
345    // Split the address into the host and port parts
346    let (host, port) = match addr.rfind(':') {
347        Some(idx) => (&addr[..idx], &addr[idx + 1..]),
348        None => return Err(anyhow!("Invalid address format, no port supplied")),
349    };
350
351    // Try parsing the host as an IP address
352    let ip = host.parse::<IpAddr>();
353
354    // Conditionally build the multiaddr string
355    let multiaddr_string = match ip {
356        Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
357        Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
358        Err(_) => {
359            // Try resolving the host. If it fails, continue but warn the user
360            let lookup_result = addr.to_socket_addrs();
361
362            // See if the lookup failed
363            let failed = lookup_result
364                .map(|result| result.collect::<Vec<_>>().is_empty())
365                .unwrap_or(true);
366
367            // If it did, warn the user
368            if failed {
369                warn!(
370                    "Failed to resolve domain name {host}, assuming it has not yet been provisioned"
371                );
372            }
373
374            format!("/dns/{host}/udp/{port}/quic-v1")
375        },
376    };
377
378    // Convert the multiaddr string to a `Multiaddr`
379    multiaddr_string.parse().with_context(|| {
380        format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
381    })
382}
383
384impl<T: NodeType> Libp2pNetwork<T> {
385    /// Create and return a Libp2p network from a network config file
386    /// and various other configuration-specific values.
387    ///
388    /// # Errors
389    /// If we are unable to parse a Multiaddress
390    ///
391    /// # Panics
392    /// If we are unable to calculate the replication factor
393    #[allow(clippy::too_many_arguments)]
394    pub async fn from_config<D: DhtPersistentStorage>(
395        mut config: NetworkConfig<T>,
396        dht_persistent_storage: D,
397        quorum_membership: Arc<RwLock<T::Membership>>,
398        gossip_config: GossipConfig,
399        request_response_config: RequestResponseConfig,
400        bind_address: Multiaddr,
401        pub_key: &T::SignatureKey,
402        priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
403        metrics: Libp2pMetricsValue,
404    ) -> anyhow::Result<Self> {
405        // Try to take our Libp2p config from our broader network config
406        let libp2p_config = config
407            .libp2p_config
408            .take()
409            .ok_or(anyhow!("Libp2p config not supplied"))?;
410
411        // Derive our Libp2p keypair from our supplied private key
412        let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;
413
414        // Build our libp2p configuration
415        let mut config_builder = NetworkNodeConfigBuilder::default();
416
417        // Set the gossip configuration
418        config_builder.gossip_config(gossip_config.clone());
419        config_builder.request_response_config(request_response_config);
420
421        // Construct the auth message
422        let auth_message =
423            construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
424                .with_context(|| "Failed to construct auth message")?;
425
426        // Set the auth message and stake table
427        config_builder
428            .membership(Some(quorum_membership))
429            .auth_message(Some(auth_message));
430
431        // The replication factor is the minimum of [the default and 2/3 the number of nodes]
432        let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
433            return Err(anyhow!("Default replication factor not supplied"));
434        };
435
436        let replication_factor = NonZeroUsize::new(min(
437            default_replication_factor.get(),
438            config.config.num_nodes_with_stake.get() / 2,
439        ))
440        .with_context(|| "Failed to calculate replication factor")?;
441
442        // Sign our DHT lookup record
443        let lookup_record_value = RecordValue::new_signed(
444            &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
445            // The value is our Libp2p Peer ID
446            keypair.public().to_peer_id().to_bytes(),
447            priv_key,
448        )
449        .with_context(|| "Failed to sign DHT lookup record")?;
450
451        config_builder
452            .keypair(keypair)
453            .replication_factor(replication_factor)
454            .bind_address(Some(bind_address.clone()));
455
456        // Connect to the provided bootstrap nodes
457        config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));
458
459        // Build the node's configuration
460        let node_config = config_builder.build()?;
461
462        // Calculate all keys so we can keep track of direct message recipients
463        let mut all_keys = BTreeSet::new();
464
465        // Insert all known nodes into the set of all keys
466        for node in config.config.known_nodes_with_stake {
467            all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
468        }
469
470        Ok(Libp2pNetwork::new(
471            metrics,
472            dht_persistent_storage,
473            node_config,
474            pub_key.clone(),
475            lookup_record_value,
476            Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
477            usize::try_from(config.node_index)?,
478            #[cfg(feature = "hotshot-testing")]
479            None,
480        )
481        .await?)
482    }
483
484    /// Returns whether or not the network has any peers.
485    #[must_use]
486    pub fn has_peers(&self) -> bool {
487        self.inner.is_ready.load(Ordering::Relaxed)
488    }
489
490    /// Returns only when the network is ready.
491    pub async fn wait_for_peers(&self) {
492        loop {
493            if self.has_peers() {
494                break;
495            }
496            sleep(Duration::from_secs(1)).await;
497        }
498    }
499
500    /// Constructs new network for a node. Note that this network is unconnected.
501    /// One must call `connect` in order to connect.
502    /// * `config`: the configuration of the node
503    /// * `pk`: public key associated with the node
504    /// * `bootstrap_addrs`: rwlock containing the bootstrap addrs
505    /// # Errors
506    /// Returns error in the event that the underlying libp2p network
507    /// is unable to create a network.
508    ///
509    /// # Panics
510    ///
511    /// This will panic if there are less than 5 bootstrap nodes
512    #[allow(clippy::too_many_arguments)]
513    pub async fn new<D: DhtPersistentStorage>(
514        metrics: Libp2pMetricsValue,
515        dht_persistent_storage: D,
516        config: NetworkNodeConfig<T>,
517        pk: T::SignatureKey,
518        lookup_record_value: RecordValue<T::SignatureKey>,
519        bootstrap_addrs: BootstrapAddrs,
520        id: usize,
521        #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
522    ) -> Result<Libp2pNetwork<T>, NetworkError> {
523        // Create a map from consensus keys to Libp2p peer IDs
524        let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));
525
526        let (mut rx, network_handle) = spawn_network_node::<T, D>(
527            config.clone(),
528            dht_persistent_storage,
529            Arc::clone(&consensus_key_to_pid_map),
530            id,
531        )
532        .await
533        .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
534
535        // Add our own address to the bootstrap addresses
536        let addr = network_handle.listen_addr();
537        let pid = network_handle.peer_id();
538        bootstrap_addrs.write().await.push((pid, addr));
539
540        // Subscribe to the relevant topics
541        let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
542
543        // unbounded channels may not be the best choice (spammed?)
544        // if bounded figure out a way to log dropped msgs
545        let (sender, receiver) = channel(1000);
546        let (node_lookup_send, node_lookup_recv) = channel(10);
547        let (kill_tx, kill_rx) = channel(1);
548        rx.set_kill_switch(kill_rx);
549
550        let mut result = Libp2pNetwork {
551            inner: Arc::new(Libp2pNetworkInner {
552                handle: Arc::new(network_handle),
553                receiver: Mutex::new(receiver),
554                sender: sender.clone(),
555                pk,
556                bootstrap_addrs,
557                is_ready: Arc::new(AtomicBool::new(false)),
558                // This is optimal for 10-30 nodes. TODO: parameterize this for both tests and examples
559                dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
560                is_bootstrapped: Arc::new(AtomicBool::new(false)),
561                metrics,
562                subscribed_topics,
563                node_lookup_send,
564                // Start the latest view from 0. "Latest" refers to "most recent view we are polling for
565                // proposals on". We need this because to have consensus info injected we need a working
566                // network already. In the worst case, we send a few lookups we don't need.
567                latest_seen_view: Arc::new(AtomicU64::new(0)),
568                #[cfg(feature = "hotshot-testing")]
569                reliability_config,
570                kill_switch: kill_tx,
571            }),
572        };
573
574        // Set the network as not ready
575        result.inner.metrics.is_ready.set(0);
576
577        result.handle_event_generator(sender, rx);
578        result.spawn_node_lookup(node_lookup_recv);
579        result.spawn_connect(id, lookup_record_value);
580
581        Ok(result)
582    }
583
584    /// Spawns task for looking up nodes pre-emptively
585    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
586    fn spawn_node_lookup(
587        &self,
588        mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
589    ) {
590        let handle = Arc::clone(&self.inner.handle);
591        let dht_timeout = self.inner.dht_timeout;
592        let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);
593
594        // deals with handling lookup queue. should be infallible
595        spawn(async move {
596            // cancels on shutdown
597            while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
598                /// defines lookahead threshold based on the constant
599                #[allow(clippy::cast_possible_truncation)]
600                const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;
601
602                trace!("Performing lookup for peer {pk}");
603
604                // only run if we are not too close to the next view number
605                if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
606                    // look up
607                    if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
608                        warn!("Failed to perform lookup for key {pk}: {err}");
609                    };
610                }
611            }
612        });
613    }
614
615    /// Initiates connection to the outside world
616    fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
617        let pk = self.inner.pk.clone();
618        let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
619        let handle = Arc::clone(&self.inner.handle);
620        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
621        let inner = Arc::clone(&self.inner);
622
623        spawn({
624            let is_ready = Arc::clone(&self.inner.is_ready);
625            async move {
626                let bs_addrs = bootstrap_ref.read().await.clone();
627
628                // Add known peers to the network
629                handle.add_known_peers(bs_addrs).unwrap();
630
631                // Begin the bootstrap process
632                handle.begin_bootstrap()?;
633                while !is_bootstrapped.load(Ordering::Relaxed) {
634                    sleep(Duration::from_secs(1)).await;
635                    handle.begin_bootstrap()?;
636                }
637
638                // Subscribe to the QC topic
639                handle.subscribe(QC_TOPIC.to_string()).await.unwrap();
640
641                // Map our staking key to our Libp2p Peer ID so we can properly
642                // route direct messages
643                while handle
644                    .put_record(
645                        RecordKey::new(Namespace::Lookup, pk.to_bytes()),
646                        lookup_record_value.clone(),
647                    )
648                    .await
649                    .is_err()
650                {
651                    sleep(Duration::from_secs(1)).await;
652                }
653
654                // Wait for the network to connect to at least 1 peer
655                if let Err(e) = handle.wait_to_connect(1, id).await {
656                    error!("Failed to connect to peers: {e:?}");
657                    return Err::<(), NetworkError>(e);
658                }
659                info!("Connected to required number of peers");
660
661                // Set the network as ready
662                is_ready.store(true, Ordering::Relaxed);
663                inner.metrics.is_ready.set(1);
664
665                Ok::<(), NetworkError>(())
666            }
667        });
668    }
669
670    /// Handle events
671    fn handle_recvd_events(
672        &self,
673        msg: NetworkEvent,
674        sender: &Sender<Vec<u8>>,
675    ) -> Result<(), NetworkError> {
676        match msg {
677            GossipMsg(msg) => {
678                sender.try_send(msg).map_err(|err| {
679                    NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
680                })?;
681            },
682            DirectRequest(msg, _pid, chan) => {
683                sender.try_send(msg).map_err(|err| {
684                    NetworkError::ChannelSendError(format!(
685                        "failed to send direct request message: {err}"
686                    ))
687                })?;
688                if self
689                    .inner
690                    .handle
691                    .direct_response(
692                        chan,
693                        &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
694                            NetworkError::FailedToSerialize(format!(
695                                "failed to serialize acknowledgement: {e}"
696                            ))
697                        })?,
698                    )
699                    .is_err()
700                {
701                    error!("failed to ack!");
702                };
703            },
704            DirectResponse(_msg, _) => {},
705            NetworkEvent::IsBootstrapped => {
706                error!("handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be impossible.");
707            },
708            NetworkEvent::ConnectedPeersUpdate(_) => {},
709        }
710        Ok::<(), NetworkError>(())
711    }
712
713    /// task to propagate messages to handlers
714    /// terminates on shut down of network
715    fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
716        let handle = self.clone();
717        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
718        spawn(async move {
719            let Some(mut kill_switch) = network_rx.take_kill_switch() else {
720                tracing::error!(
721                    "`spawn_handle` was called on a network handle that was already closed"
722                );
723                return;
724            };
725
726            loop {
727                select! {
728                    msg = network_rx.recv() => {
729                        let Ok(message) = msg else {
730                            warn!("Network receiver shut down!");
731                            return;
732                        };
733
734                        match message {
735                            NetworkEvent::IsBootstrapped => {
736                                is_bootstrapped.store(true, Ordering::Relaxed);
737                            }
738                            GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
739                                let _ = handle.handle_recvd_events(message, &sender);
740                            }
741                            NetworkEvent::ConnectedPeersUpdate(num_peers) => {
742                                handle.inner.metrics.num_connected_peers.set(num_peers);
743                            }
744                        }
745                    }
746
747                    _kill_switch = kill_switch.recv() => {
748                        warn!("Event Handler shutdown");
749                        return;
750                    }
751                }
752            }
753        });
754    }
755}
756
757#[async_trait]
758impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
759    #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
760    async fn wait_for_ready(&self) {
761        self.wait_for_peers().await;
762    }
763
764    fn pause(&self) {
765        unimplemented!("Pausing not implemented for the Libp2p network");
766    }
767
768    fn resume(&self) {
769        unimplemented!("Resuming not implemented for the Libp2p network");
770    }
771
772    #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
773    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
774    where
775        'a: 'b,
776        Self: 'b,
777    {
778        let closure = async move {
779            let _ = self.inner.handle.shutdown().await;
780            let _ = self.inner.node_lookup_send.send(None).await;
781            let _ = self.inner.kill_switch.send(()).await;
782        };
783        boxed_sync(closure)
784    }
785
786    #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
787    async fn broadcast_message(
788        &self,
789        message: Vec<u8>,
790        topic: Topic,
791        _broadcast_delay: BroadcastDelay,
792    ) -> Result<(), NetworkError> {
793        // If we're not ready yet (we don't have any peers, error)
794        if !self.has_peers() {
795            self.inner.metrics.num_failed_messages.add(1);
796            return Err(NetworkError::NoPeersYet);
797        };
798
799        // If we are subscribed to the topic,
800        let topic = topic.to_string();
801        if self.inner.subscribed_topics.contains(&topic) {
802            // Short-circuit-send the message to ourselves
803            self.inner.sender.try_send(message.clone()).map_err(|_| {
804                self.inner.metrics.num_failed_messages.add(1);
805                NetworkError::ShutDown
806            })?;
807        }
808
809        // NOTE: metrics is threadsafe, so clone is fine (and lightweight)
810        #[cfg(feature = "hotshot-testing")]
811        {
812            let metrics = self.inner.metrics.clone();
813            if let Some(ref config) = &self.inner.reliability_config {
814                let handle = Arc::clone(&self.inner.handle);
815
816                let fut = config.clone().chaos_send_msg(
817                    message,
818                    Arc::new(move |msg: Vec<u8>| {
819                        let topic_2 = topic.clone();
820                        let handle_2 = Arc::clone(&handle);
821                        let metrics_2 = metrics.clone();
822                        boxed_sync(async move {
823                            if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
824                                metrics_2.num_failed_messages.add(1);
825                                warn!("Failed to broadcast to libp2p: {e:?}");
826                            }
827                        })
828                    }),
829                );
830                spawn(fut);
831                return Ok(());
832            }
833        }
834
835        if let Err(e) = self.inner.handle.gossip(topic, &message) {
836            self.inner.metrics.num_failed_messages.add(1);
837            return Err(e);
838        }
839
840        Ok(())
841    }
842
843    #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
844    async fn da_broadcast_message(
845        &self,
846        message: Vec<u8>,
847        recipients: Vec<T::SignatureKey>,
848        _broadcast_delay: BroadcastDelay,
849    ) -> Result<(), NetworkError> {
850        // If we're not ready yet (we don't have any peers, error)
851        if !self.has_peers() {
852            self.inner.metrics.num_failed_messages.add(1);
853            return Err(NetworkError::NoPeersYet);
854        };
855
856        let future_results = recipients
857            .into_iter()
858            .map(|r| self.direct_message(message.clone(), r));
859        let results = join_all(future_results).await;
860
861        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
862
863        if errors.is_empty() {
864            Ok(())
865        } else {
866            Err(NetworkError::Multiple(errors))
867        }
868    }
869
870    #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
871    async fn direct_message(
872        &self,
873        message: Vec<u8>,
874        recipient: T::SignatureKey,
875    ) -> Result<(), NetworkError> {
876        // If we're not ready yet (we don't have any peers, error)
877        if !self.has_peers() {
878            self.inner.metrics.num_failed_messages.add(1);
879            return Err(NetworkError::NoPeersYet);
880        };
881
882        // short circuit if we're dming ourselves
883        if recipient == self.inner.pk {
884            // panic if we already shut down?
885            self.inner.sender.try_send(message).map_err(|_x| {
886                self.inner.metrics.num_failed_messages.add(1);
887                NetworkError::ShutDown
888            })?;
889            return Ok(());
890        }
891
892        let pid = match self
893            .inner
894            .handle
895            .lookup_node(&recipient, self.inner.dht_timeout)
896            .await
897        {
898            Ok(pid) => pid,
899            Err(err) => {
900                self.inner.metrics.num_failed_messages.add(1);
901                return Err(NetworkError::LookupError(format!(
902                    "failed to look up node for direct message: {err}"
903                )));
904            },
905        };
906
907        #[cfg(feature = "hotshot-testing")]
908        {
909            let metrics = self.inner.metrics.clone();
910            if let Some(ref config) = &self.inner.reliability_config {
911                let handle = Arc::clone(&self.inner.handle);
912
913                let fut = config.clone().chaos_send_msg(
914                    message,
915                    Arc::new(move |msg: Vec<u8>| {
916                        let handle_2 = Arc::clone(&handle);
917                        let metrics_2 = metrics.clone();
918                        boxed_sync(async move {
919                            if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
920                                metrics_2.num_failed_messages.add(1);
921                                warn!("Failed to broadcast to libp2p: {e:?}");
922                            }
923                        })
924                    }),
925                );
926                spawn(fut);
927                return Ok(());
928            }
929        }
930
931        match self.inner.handle.direct_request(pid, &message) {
932            Ok(()) => Ok(()),
933            Err(e) => {
934                self.inner.metrics.num_failed_messages.add(1);
935                Err(e)
936            },
937        }
938    }
939
940    /// Receive one or many messages from the underlying network.
941    ///
942    /// # Errors
943    /// If there is a network-related failure.
944    #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
945    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
946        let result = self
947            .inner
948            .receiver
949            .lock()
950            .await
951            .recv()
952            .await
953            .ok_or(NetworkError::ShutDown)?;
954
955        Ok(result)
956    }
957
958    #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
959    #[allow(clippy::type_complexity)]
960    fn queue_node_lookup(
961        &self,
962        view_number: ViewNumber,
963        pk: T::SignatureKey,
964    ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
965        self.inner
966            .node_lookup_send
967            .try_send(Some((view_number, pk)))
968    }
969
970    /// The libp2p view update is a special operation intrinsic to its internal behavior.
971    ///
972    /// Libp2p needs to do a lookup because a libp2p address is not related to
973    /// hotshot keys. So in libp2p we store a mapping of HotShot key to libp2p address
974    /// in a distributed hash table.
975    ///
976    /// This means to directly message someone on libp2p we need to lookup in the hash
977    /// table what their libp2p address is, using their HotShot public key as the key.
978    ///
979    /// So the logic with libp2p is to prefetch upcoming leaders libp2p address to
980    /// save time when we later need to direct message the leader our vote. Hence the
981    /// use of the future view and leader to queue the lookups.
982    async fn update_view<'a, TYPES>(
983        &'a self,
984        view: u64,
985        epoch: Option<u64>,
986        membership_coordinator: EpochMembershipCoordinator<TYPES>,
987    ) where
988        TYPES: NodeType<SignatureKey = T::SignatureKey> + 'a,
989    {
990        let future_view = <TYPES as NodeType>::View::new(view) + LOOK_AHEAD;
991        let epoch = epoch.map(<TYPES as NodeType>::Epoch::new);
992
993        let membership = match membership_coordinator.membership_for_epoch(epoch).await {
994            Ok(m) => m,
995            Err(e) => {
996                return tracing::warn!(e.message);
997            },
998        };
999        let future_leader = match membership.leader(future_view).await {
1000            Ok(l) => l,
1001            Err(e) => {
1002                return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1003            },
1004        };
1005
1006        let _ = self
1007            .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1008            .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1009    }
1010}
1011
#[cfg(test)]
mod test {
    /// Tests for `derive_libp2p_multiaddr`, which maps `host:port` strings to QUIC multiaddrs.
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        /// Test derivation of a valid IPv4 address -> Multiaddr
        #[test]
        fn test_v4_valid() {
            // Derive a multiaddr from a valid IPv4 address
            let addr = "1.1.1.1:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// Test derivation of a valid IPv6 address -> Multiaddr
        #[test]
        fn test_v6_valid() {
            // Derive a multiaddr from a valid IPv6 address
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            let addr = format!("{ipv6_addr}:8080");
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(
                multiaddr.to_string(),
                format!("/ip6/{ipv6_addr}/udp/8080/quic-v1")
            );
        }

        /// Test that an address without a port fails to derive to a Multiaddr
        #[test]
        fn test_no_port() {
            // Try to derive a multiaddr from an IPv4 address with no port
            let addr = "1.1.1.1".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }

        /// Test that an existing domain name resolves to a Multiaddr
        #[test]
        fn test_fqdn_exists() {
            // Derive a multiaddr from a valid FQDN
            let addr = "example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/dns/example.com/udp/8080/quic-v1");
        }

        /// Test that a non-existent domain name still resolves to a Multiaddr
        #[test]
        fn test_fqdn_does_not_exist() {
            // Derive a multiaddr from an FQDN that does not exist
            let addr = "libp2p.example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it still worked
            assert_eq!(
                multiaddr.to_string(),
                "/dns/libp2p.example.com/udp/8080/quic-v1"
            );
        }

        /// Test that a domain name without a port fails to derive to a Multiaddr
        #[test]
        fn test_fqdn_no_port() {
            // Try to derive a multiaddr from a domain name with no port
            let addr = "example.com".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }
    }
}