hotshot/traits/networking/
libp2p_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Libp2p based/production networking implementation
8//! This module provides a libp2p based networking implementation where each node in the
9//! network forms a tcp or udp connection to a subset of other nodes in the network
10use std::{
11    cmp::min,
12    collections::{BTreeSet, HashSet},
13    fmt::Debug,
14    net::{IpAddr, ToSocketAddrs},
15    num::NonZeroUsize,
16    sync::{
17        Arc,
18        atomic::{AtomicBool, AtomicU64, Ordering},
19    },
20    time::Duration,
21};
22#[cfg(feature = "hotshot-testing")]
23use std::{collections::HashMap, str::FromStr};
24
25use anyhow::{Context, anyhow};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34    network::{
35        DEFAULT_REPLICATION_FACTOR,
36        NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
37        NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
38        behaviours::dht::{
39            record::{Namespace, RecordKey, RecordValue},
40            store::persistent::DhtPersistentStorage,
41        },
42        spawn_network_node,
43        transport::construct_auth_message,
44    },
45    reexport::Multiaddr,
46};
47use hotshot_types::{
48    BoxSyncFuture, boxed_sync,
49    constants::LOOK_AHEAD,
50    data::{EpochNumber, ViewNumber},
51    network::NetworkConfig,
52    traits::{
53        metrics::{Counter, Gauge, Metrics, NoMetrics},
54        network::{ConnectedNetwork, NetworkError, Topic},
55        node_implementation::NodeType,
56        signature_key::{PrivateSignatureKey, SignatureKey},
57    },
58};
59#[cfg(feature = "hotshot-testing")]
60use hotshot_types::{
61    PeerConnectInfo,
62    traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
63};
64use libp2p_identity::{
65    Keypair, PeerId,
66    ed25519::{self, SecretKey},
67};
68use serde::Serialize;
69use tokio::{
70    select, spawn,
71    sync::{
72        Mutex,
73        mpsc::{Receiver, Sender, channel, error::TrySendError},
74    },
75    time::sleep,
76};
77use tracing::{error, info, instrument, trace, warn};
78
79use crate::{BroadcastDelay, EpochMembershipCoordinator};
80
/// Libp2p-specific metrics
#[derive(Clone, Debug)]
pub struct Libp2pMetricsValue {
    /// The number of currently connected peers
    pub num_connected_peers: Box<dyn Gauge>,
    /// The number of failed messages
    pub num_failed_messages: Box<dyn Counter>,
    /// Whether or not the network is considered ready (set to 1 when ready, 0 otherwise)
    pub is_ready: Box<dyn Gauge>,
}
91
92impl Libp2pMetricsValue {
93    /// Populate the metrics with Libp2p-specific metrics
94    pub fn new(metrics: &dyn Metrics) -> Self {
95        // Create a `libp2p subgroup
96        let subgroup = metrics.subgroup("libp2p".into());
97
98        // Create the metrics
99        Self {
100            num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
101            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
102            is_ready: subgroup.create_gauge("is_ready".into(), None),
103        }
104    }
105}
106
impl Default for Libp2pMetricsValue {
    /// Initialize with empty metrics
    fn default() -> Self {
        // `NoMetrics` is a no-op recorder, so the default value measures nothing
        Self::new(&*NoMetrics::boxed())
    }
}
113
/// convenience alias for the type for bootstrap addresses
/// concurrency primitives are needed for having tests
// NOTE(review): structurally identical to `PeerInfoVec` below — consider unifying the two aliases
pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;

/// hardcoded topic of QC used; every node subscribes to it (see `spawn_connect`)
pub const QC_TOPIC: &str = "global";
120
/// Stubbed out Ack
///
/// Note: as part of versioning for upgradability,
/// all network messages must begin with a 4-byte version number.
///
/// Hence:
///   * `Empty` *must* be a struct (enums are serialized with a leading byte for the variant), and
///   * we must have an explicit version field.
#[derive(Serialize)]
pub struct Empty {
    /// This should not be required, but it is. Version automatically gets prepended.
    /// Perhaps this could be replaced with something zero-sized and serializable.
    // Always serialized as 0 (see the ack in `handle_recvd_events`)
    byte: u8,
}
135
impl<T: NodeType> Debug for Libp2pNetwork<T> {
    /// Renders as `Libp2p { inner: "inner" }`; the inner state is deliberately elided
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Libp2p").field("inner", &"inner").finish()
    }
}
141
/// Type alias for a shared collection of peerid, multiaddrs
// NOTE(review): same underlying type as `BootstrapAddrs` above — consider unifying
pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
144
/// The underlying state of the libp2p network
#[derive(Debug)]
struct Libp2pNetworkInner<T: NodeType> {
    /// this node's public key
    pk: T::SignatureKey,
    /// handle to control the network
    handle: Arc<NetworkNodeHandle<T>>,
    /// Message Receiver (mutex-guarded so receiving needs only `&self`)
    receiver: Mutex<Receiver<Vec<u8>>>,
    /// Sender for broadcast messages (also used to loop messages back to ourselves)
    sender: Sender<Vec<u8>>,
    /// Sender for node lookup (relevant view number, key of node) (None for shutdown)
    node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
    /// this is really cheating to enable local tests
    /// shared vec of (peer_id, bootstrap_addr) pairs, including our own entry
    bootstrap_addrs: PeerInfoVec,
    /// whether or not the network is ready to send
    is_ready: Arc<AtomicBool>,
    /// max time before dropping message due to DHT error
    dht_timeout: Duration,
    /// whether or not we've bootstrapped into the DHT yet
    is_bootstrapped: Arc<AtomicBool>,
    /// The Libp2p metrics we're managing
    metrics: Libp2pMetricsValue,
    /// The list of topics we're subscribed to
    subscribed_topics: HashSet<String>,
    /// the latest view number (for node lookup purposes)
    /// NOTE: supposed to represent a ViewNumber but we
    /// haven't made that atomic yet and we prefer lock-free
    latest_seen_view: Arc<AtomicU64>,
    #[cfg(feature = "hotshot-testing")]
    /// reliability_config
    reliability_config: Option<Box<dyn NetworkReliability>>,
    /// Killswitch sender (signals the event-generator task to stop)
    kill_switch: Sender<()>,
}
181
/// Networking implementation that uses libp2p
/// generic over `M` which is the message type
// Cloning is cheap: only the `Arc` is duplicated, all clones share one inner state
#[derive(Clone)]
pub struct Libp2pNetwork<T: NodeType> {
    /// holds the state of the libp2p network
    inner: Arc<Libp2pNetworkInner<T>>,
}
189
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a boxed function `f(node_id, public_key) -> Libp2pNetwork`
    /// with the purpose of generating libp2p networks.
    /// Generates `num_bootstrap` bootstrap nodes. The remainder of nodes are normal
    /// nodes with sane defaults.
    /// # Panics
    /// Returned function may panic either:
    /// - An invalid configuration
    ///   (probably an issue with the defaults of this function)
    /// - An inability to spin up the replica's network
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );
        // Shared across all generated nodes: every node pushes its own
        // (peer id, listen addr) here and reads everyone else's on startup
        let bootstrap_addrs: PeerInfoVec = Arc::default();
        // Tracks which node ids we've already generated, to detect restarts
        let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        Box::pin({
            move |node_id| {
                info!(
                    "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                    node_id,
                    node_id < num_bootstrap as u64
                );

                // Grab a free OS-assigned UDP port for this node's listener.
                // UDP has no TIME_WAIT, so there's a tiny race before libp2p binds.
                let port = std::net::UdpSocket::bind("127.0.0.1:0")
                    .expect("UDP socket should bind")
                    .local_addr()
                    .expect("UDP socket should have local addr")
                    .port();

                let addr =
                    Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();

                // We assign node's public key and stake value rather than read from config file since it's a test
                let privkey = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
                let pubkey = T::SignatureKey::from_private(&privkey);

                // Derive the Libp2p keypair from the private key
                let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                    .expect("Failed to derive libp2p keypair");

                // Sign the lookup record (maps our consensus key to our Libp2p peer id)
                let lookup_record_value = RecordValue::new_signed(
                    &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()),
                    libp2p_keypair.public().to_peer_id().to_bytes(),
                    &privkey,
                )
                .expect("Failed to sign DHT lookup record");

                // We want at least 2/3 of the nodes to have any given record in the DHT
                let replication_factor =
                    NonZeroUsize::new((2 * expected_node_count).div_ceil(3)).unwrap();

                // Build the network node configuration
                let config = NetworkNodeConfigBuilder::default()
                    .keypair(libp2p_keypair)
                    .replication_factor(replication_factor)
                    .bind_address(Some(addr))
                    .to_connect_addrs(HashSet::default())
                    .republication_interval(None)
                    .build()
                    .expect("Failed to build network node config");

                // Clone the shared state into the async block below
                let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
                let node_ids_ref = Arc::clone(&node_ids);
                let reliability_config_dup = reliability_config.clone();

                Box::pin(async move {
                    // If it's the second time we are starting this network, clear the bootstrap info
                    let mut write_ids = node_ids_ref.write().await;
                    if write_ids.contains(&node_id) {
                        write_ids.clear();
                    }
                    write_ids.insert(node_id);
                    drop(write_ids);
                    Arc::new(
                        match Libp2pNetwork::new(
                            Libp2pMetricsValue::default(),
                            DhtNoPersistence,
                            config,
                            pubkey.clone(),
                            lookup_record_value,
                            bootstrap_addrs_ref,
                            usize::try_from(node_id).unwrap(),
                            #[cfg(feature = "hotshot-testing")]
                            reliability_config_dup,
                        )
                        .await
                        {
                            Ok(network) => network,
                            Err(err) => {
                                panic!("Failed to create libp2p network: {err:?}");
                            },
                        },
                    )
                })
            }
        })
    }

    /// Not tracked for libp2p; always `None`
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
309
310/// Derive a Libp2p keypair from a given private key
311///
312/// # Errors
313/// If we are unable to derive a new `SecretKey` from the `blake3`-derived
314/// bytes.
315pub fn derive_libp2p_keypair<K: SignatureKey>(
316    private_key: &K::PrivateKey,
317) -> anyhow::Result<Keypair> {
318    // Derive a secondary key from our primary private key
319    let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
320    let derived_key = SecretKey::try_from_bytes(derived_key)?;
321
322    // Create an `ed25519` keypair from the derived key
323    Ok(ed25519::Keypair::from(derived_key).into())
324}
325
326/// Derive a Libp2p Peer ID from a given private key
327///
328/// # Errors
329/// If we are unable to derive a Libp2p keypair
330pub fn derive_libp2p_peer_id<K: SignatureKey>(
331    private_key: &K::PrivateKey,
332) -> anyhow::Result<PeerId> {
333    // Get the derived keypair
334    let keypair = derive_libp2p_keypair::<K>(private_key)?;
335
336    // Return the PeerID derived from the public key
337    Ok(PeerId::from_public_key(&keypair.public()))
338}
339
340/// Parse a Libp2p Multiaddr from a string. The input string should be in the format
341/// `hostname:port` or `ip:port`. This function derives a `Multiaddr` from the input string.
342///
343/// This borrows from Rust's implementation of `to_socket_addrs` but will only warn if the domain
344/// does not yet resolve.
345///
346/// # Errors
347/// - If the input string is not in the correct format
348pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
349    // Split the address into the host and port parts
350    let (host, port) = match addr.rfind(':') {
351        Some(idx) => (&addr[..idx], &addr[idx + 1..]),
352        None => return Err(anyhow!("Invalid address format, no port supplied")),
353    };
354
355    // Try parsing the host as an IP address
356    let ip = host.parse::<IpAddr>();
357
358    // Conditionally build the multiaddr string
359    let multiaddr_string = match ip {
360        Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
361        Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
362        Err(_) => {
363            // Try resolving the host. If it fails, continue but warn the user
364            let lookup_result = addr.to_socket_addrs();
365
366            // See if the lookup failed
367            let failed = lookup_result
368                .map(|result| result.collect::<Vec<_>>().is_empty())
369                .unwrap_or(true);
370
371            // If it did, warn the user
372            if failed {
373                warn!(
374                    "Failed to resolve domain name {host}, assuming it has not yet been \
375                     provisioned"
376                );
377            }
378
379            format!("/dns/{host}/udp/{port}/quic-v1")
380        },
381    };
382
383    // Convert the multiaddr string to a `Multiaddr`
384    multiaddr_string.parse().with_context(|| {
385        format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
386    })
387}
388
389impl<T: NodeType> Libp2pNetwork<T> {
    /// Create and return a Libp2p network from a network config file
    /// and various other configuration-specific values.
    ///
    /// # Errors
    /// If we are unable to parse a Multiaddress
    ///
    /// # Panics
    /// If we are unable to calculate the replication factor
    #[allow(clippy::too_many_arguments)]
    pub async fn from_config<D: DhtPersistentStorage>(
        mut config: NetworkConfig<T>,
        dht_persistent_storage: D,
        gossip_config: GossipConfig,
        request_response_config: RequestResponseConfig,
        bind_address: Multiaddr,
        pub_key: &T::SignatureKey,
        priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
        metrics: Libp2pMetricsValue,
    ) -> anyhow::Result<Self> {
        // Try to take our Libp2p config from our broader network config
        let libp2p_config = config
            .libp2p_config
            .take()
            .ok_or(anyhow!("Libp2p config not supplied"))?;

        // Derive our Libp2p keypair from our supplied private key
        let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;

        // Build our libp2p configuration
        let mut config_builder = NetworkNodeConfigBuilder::default();

        // Set the gossip configuration
        config_builder.gossip_config(gossip_config.clone());
        config_builder.request_response_config(request_response_config);

        // Construct the auth message (binds our consensus key to our Libp2p peer id)
        let auth_message =
            construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
                .with_context(|| "Failed to construct auth message")?;

        // Set the auth message and stake table
        config_builder.auth_message(Some(auth_message));

        // The replication factor is the minimum of [the default and half the number of
        // staked nodes]
        // NOTE(review): the original comment said "2/3 the number of nodes" but the code
        // divides by 2; the test generator above uses the 2/3 formula — confirm which is intended
        let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
            return Err(anyhow!("Default replication factor not supplied"));
        };

        let replication_factor = NonZeroUsize::new(min(
            default_replication_factor.get(),
            config.config.num_nodes_with_stake.get() / 2,
        ))
        .with_context(|| "Failed to calculate replication factor")?;

        // Sign our DHT lookup record
        let lookup_record_value = RecordValue::new_signed(
            &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
            // The value is our Libp2p Peer ID
            keypair.public().to_peer_id().to_bytes(),
            priv_key,
        )
        .with_context(|| "Failed to sign DHT lookup record")?;

        config_builder
            .keypair(keypair)
            .replication_factor(replication_factor)
            .bind_address(Some(bind_address.clone()));

        // Connect to the provided bootstrap nodes
        config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));

        // Build the node's configuration
        let node_config = config_builder.build()?;

        // Calculate all keys so we can keep track of direct message recipients
        let mut all_keys = BTreeSet::new();

        // Insert all known nodes into the set of all keys
        for node in config.config.known_nodes_with_stake {
            all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
        }

        Ok(Libp2pNetwork::new(
            metrics,
            dht_persistent_storage,
            node_config,
            pub_key.clone(),
            lookup_record_value,
            Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
            usize::try_from(config.node_index)?,
            #[cfg(feature = "hotshot-testing")]
            None,
        )
        .await?)
    }
485
    /// Returns whether or not the network has any peers.
    // Reads the readiness flag set by `spawn_connect` once bootstrap and the
    // first peer connection succeed
    #[must_use]
    pub fn has_peers(&self) -> bool {
        self.inner.is_ready.load(Ordering::Relaxed)
    }
491
492    /// Returns only when the network is ready.
493    pub async fn wait_for_peers(&self) {
494        loop {
495            if self.has_peers() {
496                break;
497            }
498            sleep(Duration::from_secs(1)).await;
499        }
500    }
501
    /// Constructs new network for a node. Note that this network is unconnected.
    /// One must call `connect` in order to connect.
    /// * `config`: the configuration of the node
    /// * `pk`: public key associated with the node
    /// * `bootstrap_addrs`: rwlock containing the bootstrap addrs
    /// # Errors
    /// Returns error in the event that the underlying libp2p network
    /// is unable to create a network.
    ///
    /// # Panics
    ///
    /// This will panic if there are less than 5 bootstrap nodes
    #[allow(clippy::too_many_arguments)]
    pub async fn new<D: DhtPersistentStorage>(
        metrics: Libp2pMetricsValue,
        dht_persistent_storage: D,
        config: NetworkNodeConfig,
        pk: T::SignatureKey,
        lookup_record_value: RecordValue<T::SignatureKey>,
        bootstrap_addrs: BootstrapAddrs,
        id: usize,
        #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
    ) -> Result<Libp2pNetwork<T>, NetworkError> {
        // Create a map from consensus keys to Libp2p peer IDs
        let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));

        // Spawn the underlying network node; `rx` delivers its events to us
        let (mut rx, network_handle) = spawn_network_node::<T, D>(
            config.clone(),
            dht_persistent_storage,
            Arc::clone(&consensus_key_to_pid_map),
            id,
        )
        .await
        .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;

        // Add our own address to the bootstrap addresses
        let addr = network_handle.listen_addr();
        let pid = network_handle.peer_id();
        bootstrap_addrs.write().await.push((pid, addr));

        // Subscribe to the relevant topics
        let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);

        // Bounded channels: sends go through `try_send`, so messages are dropped
        // (with an error surfaced to the caller) rather than queued unboundedly
        // when a channel is full
        let (sender, receiver) = channel(1000);
        let (node_lookup_send, node_lookup_recv) = channel(10);
        let (kill_tx, kill_rx) = channel(1);
        rx.set_kill_switch(kill_rx);

        let mut result = Libp2pNetwork {
            inner: Arc::new(Libp2pNetworkInner {
                handle: Arc::new(network_handle),
                receiver: Mutex::new(receiver),
                sender: sender.clone(),
                pk,
                bootstrap_addrs,
                is_ready: Arc::new(AtomicBool::new(false)),
                // This is optimal for 10-30 nodes. TODO: parameterize this for both tests and examples
                dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
                is_bootstrapped: Arc::new(AtomicBool::new(false)),
                metrics,
                subscribed_topics,
                node_lookup_send,
                // Start the latest view from 0. "Latest" refers to "most recent view we are polling for
                // proposals on". We need this because to have consensus info injected we need a working
                // network already. In the worst case, we send a few lookups we don't need.
                latest_seen_view: Arc::new(AtomicU64::new(0)),
                #[cfg(feature = "hotshot-testing")]
                reliability_config,
                kill_switch: kill_tx,
            }),
        };

        // Set the network as not ready
        result.inner.metrics.is_ready.set(0);

        // Spawn the background tasks: event dispatch, pre-emptive node lookup,
        // and the bootstrap/connect sequence
        result.handle_event_generator(sender, rx);
        result.spawn_node_lookup(node_lookup_recv);
        result.spawn_connect(id, lookup_record_value);

        Ok(result)
    }
585
    /// Spawns task for looking up nodes pre-emptively
    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
    fn spawn_node_lookup(
        &self,
        mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
    ) {
        let handle = Arc::clone(&self.inner.handle);
        let dht_timeout = self.inner.dht_timeout;
        let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);

        // deals with handling lookup queue. should be infallible
        spawn(async move {
            // The loop exits (cancels) when a `None` is received or the channel closes
            while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
                /// defines lookahead threshold based on the constant
                #[allow(clippy::cast_possible_truncation)]
                const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;

                trace!("Performing lookup for peer {pk}");

                // only run if we are not too close to the next view number
                if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
                    // Perform the DHT lookup; a failure is logged but not fatal
                    if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
                        warn!("Failed to perform lookup for key {pk}: {err}");
                    };
                }
            }
        });
    }
616
    /// Initiates connection to the outside world
    ///
    /// Spawns a background task that: adds the known bootstrap peers, retries
    /// bootstrapping once per second until the DHT reports bootstrapped,
    /// subscribes to the QC topic, publishes our lookup record, waits for at
    /// least one peer connection, and finally marks the network as ready.
    fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
        let pk = self.inner.pk.clone();
        let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
        let handle = Arc::clone(&self.inner.handle);
        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
        let inner = Arc::clone(&self.inner);

        spawn({
            let is_ready = Arc::clone(&self.inner.is_ready);
            async move {
                let bs_addrs = bootstrap_ref.read().await.clone();

                // Add known peers to the network
                // NOTE(review): this `unwrap` (and the `subscribe` one below) will panic the
                // spawned task while neighboring calls use `?` — consider propagating instead
                handle.add_known_peers(bs_addrs).unwrap();

                // Begin the bootstrap process, re-kicking it once per second
                // until the event handler observes `IsBootstrapped`
                handle.begin_bootstrap()?;
                while !is_bootstrapped.load(Ordering::Relaxed) {
                    sleep(Duration::from_secs(1)).await;
                    handle.begin_bootstrap()?;
                }

                // Subscribe to the QC topic
                handle.subscribe(QC_TOPIC.to_string()).await.unwrap();

                // Map our staking key to our Libp2p Peer ID so we can properly
                // route direct messages; retry every second until the DHT accepts it
                while handle
                    .put_record(
                        RecordKey::new(Namespace::Lookup, pk.to_bytes()),
                        lookup_record_value.clone(),
                    )
                    .await
                    .is_err()
                {
                    sleep(Duration::from_secs(1)).await;
                }

                // Wait for the network to connect to at least 1 peer
                if let Err(e) = handle.wait_to_connect(1, id).await {
                    error!("Failed to connect to peers: {e:?}");
                    return Err::<(), NetworkError>(e);
                }
                info!("Connected to required number of peers");

                // Set the network as ready (both the flag and the metric)
                is_ready.store(true, Ordering::Relaxed);
                inner.metrics.is_ready.set(1);

                Ok::<(), NetworkError>(())
            }
        });
    }
671
    /// Handle events
    ///
    /// Forwards gossip and direct-request payloads into the internal message
    /// channel; direct requests are additionally acknowledged with an `Empty` ack.
    ///
    /// # Errors
    /// - `ChannelSendError` if the internal channel is full or closed
    /// - `FailedToSerialize` if the ack cannot be serialized
    fn handle_recvd_events(
        &self,
        msg: NetworkEvent,
        sender: &Sender<Vec<u8>>,
    ) -> Result<(), NetworkError> {
        match msg {
            GossipMsg(msg) => {
                // `try_send` never blocks: a full channel drops the message with an error
                sender.try_send(msg).map_err(|err| {
                    NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
                })?;
            },
            DirectRequest(msg, _pid, chan) => {
                sender.try_send(msg).map_err(|err| {
                    NetworkError::ChannelSendError(format!(
                        "failed to send direct request message: {err}"
                    ))
                })?;
                // Acknowledge the request with a stubbed-out `Empty` response
                if self
                    .inner
                    .handle
                    .direct_response(
                        chan,
                        &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
                            NetworkError::FailedToSerialize(format!(
                                "failed to serialize acknowledgement: {e}"
                            ))
                        })?,
                    )
                    .is_err()
                {
                    // A failed ack is logged but does not fail the whole event
                    error!("failed to ack!");
                };
            },
            // Responses (acks) carry no payload we act on
            DirectResponse(_msg, _) => {},
            NetworkEvent::IsBootstrapped => {
                // `handle_event_generator` consumes this variant before dispatching here
                error!(
                    "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
                     impossible."
                );
            },
            // Peer-count updates are handled by the event generator's metrics path
            NetworkEvent::ConnectedPeersUpdate(_) => {},
        }
        Ok::<(), NetworkError>(())
    }
717
    /// task to propagate messages to handlers
    /// terminates on shut down of network
    fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
        let handle = self.clone();
        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
        spawn(async move {
            // The kill switch must still be present; it is consumed exactly once here
            let Some(mut kill_switch) = network_rx.take_kill_switch() else {
                tracing::error!(
                    "`spawn_handle` was called on a network handle that was already closed"
                );
                return;
            };

            loop {
                // Race the next network event against the shutdown signal
                select! {
                    msg = network_rx.recv() => {
                        let Ok(message) = msg else {
                            warn!("Network receiver shut down!");
                            return;
                        };

                        match message {
                            // Flip the flag that `spawn_connect` polls
                            NetworkEvent::IsBootstrapped => {
                                is_bootstrapped.store(true, Ordering::Relaxed);
                            }
                            // Payload-bearing events; dispatch errors are intentionally ignored
                            GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
                                let _ = handle.handle_recvd_events(message, &sender);
                            }
                            NetworkEvent::ConnectedPeersUpdate(num_peers) => {
                                handle.inner.metrics.num_connected_peers.set(num_peers);
                            }
                        }
                    }

                    _kill_switch = kill_switch.recv() => {
                        warn!("Event Handler shutdown");
                        return;
                    }
                }
            }
        });
    }
760}
761
762#[async_trait]
763impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
    /// Blocks until the network has at least one peer (delegates to `wait_for_peers`)
    #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
    async fn wait_for_ready(&self) {
        self.wait_for_peers().await;
    }
768
    /// Not supported for libp2p; always panics
    fn pause(&self) {
        unimplemented!("Pausing not implemented for the Libp2p network");
    }

    /// Not supported for libp2p; always panics
    fn resume(&self) {
        unimplemented!("Resuming not implemented for the Libp2p network");
    }
776
    /// Shuts down the network: stops the node handle, then signals the lookup
    /// and event-generator tasks to exit. All errors are deliberately ignored
    /// (best-effort teardown).
    #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
    where
        'a: 'b,
        Self: 'b,
    {
        let closure = async move {
            let _ = self.inner.handle.shutdown().await;
            // `None` is the sentinel telling `spawn_node_lookup` to stop
            let _ = self.inner.node_lookup_send.send(None).await;
            let _ = self.inner.kill_switch.send(()).await;
        };
        boxed_sync(closure)
    }
790
791    #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
792    async fn broadcast_message(
793        &self,
794        _: ViewNumber,
795        message: Vec<u8>,
796        topic: Topic,
797        _broadcast_delay: BroadcastDelay,
798    ) -> Result<(), NetworkError> {
799        // If we're not ready yet (we don't have any peers, error)
800        if !self.has_peers() {
801            self.inner.metrics.num_failed_messages.add(1);
802            return Err(NetworkError::NoPeersYet);
803        };
804
805        // If we are subscribed to the topic,
806        let topic = topic.to_string();
807        if self.inner.subscribed_topics.contains(&topic) {
808            // Short-circuit-send the message to ourselves
809            self.inner.sender.try_send(message.clone()).map_err(|_| {
810                self.inner.metrics.num_failed_messages.add(1);
811                NetworkError::ShutDown
812            })?;
813        }
814
815        // NOTE: metrics is threadsafe, so clone is fine (and lightweight)
816        #[cfg(feature = "hotshot-testing")]
817        {
818            let metrics = self.inner.metrics.clone();
819            if let Some(config) = &self.inner.reliability_config {
820                let handle = Arc::clone(&self.inner.handle);
821
822                let fut = config.clone().chaos_send_msg(
823                    message,
824                    Arc::new(move |msg: Vec<u8>| {
825                        let topic_2 = topic.clone();
826                        let handle_2 = Arc::clone(&handle);
827                        let metrics_2 = metrics.clone();
828                        boxed_sync(async move {
829                            if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
830                                metrics_2.num_failed_messages.add(1);
831                                warn!("Failed to broadcast to libp2p: {e:?}");
832                            }
833                        })
834                    }),
835                );
836                spawn(fut);
837                return Ok(());
838            }
839        }
840
841        if let Err(e) = self.inner.handle.gossip(topic, &message) {
842            self.inner.metrics.num_failed_messages.add(1);
843            return Err(e);
844        }
845
846        Ok(())
847    }
848
849    #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
850    async fn da_broadcast_message(
851        &self,
852        view: ViewNumber,
853        message: Vec<u8>,
854        recipients: Vec<T::SignatureKey>,
855        _broadcast_delay: BroadcastDelay,
856    ) -> Result<(), NetworkError> {
857        // If we're not ready yet (we don't have any peers, error)
858        if !self.has_peers() {
859            self.inner.metrics.num_failed_messages.add(1);
860            return Err(NetworkError::NoPeersYet);
861        };
862
863        // If we are subscribed to the DA topic, send the message to ourselves first
864        let topic = Topic::Da.to_string();
865        if self.inner.subscribed_topics.contains(&topic) {
866            self.inner.sender.try_send(message.clone()).map_err(|_| {
867                self.inner.metrics.num_failed_messages.add(1);
868                NetworkError::ShutDown
869            })?;
870        }
871
872        let future_results = recipients
873            .into_iter()
874            .map(|r| self.direct_message(view, message.clone(), r));
875        let results = join_all(future_results).await;
876
877        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
878
879        if errors.is_empty() {
880            Ok(())
881        } else {
882            Err(NetworkError::Multiple(errors))
883        }
884    }
885
886    #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
887    async fn direct_message(
888        &self,
889        _: ViewNumber,
890        message: Vec<u8>,
891        recipient: T::SignatureKey,
892    ) -> Result<(), NetworkError> {
893        // If we're not ready yet (we don't have any peers, error)
894        if !self.has_peers() {
895            self.inner.metrics.num_failed_messages.add(1);
896            return Err(NetworkError::NoPeersYet);
897        };
898
899        // short circuit if we're dming ourselves
900        if recipient == self.inner.pk {
901            // panic if we already shut down?
902            self.inner.sender.try_send(message).map_err(|_x| {
903                self.inner.metrics.num_failed_messages.add(1);
904                NetworkError::ShutDown
905            })?;
906            return Ok(());
907        }
908
909        let pid = match self
910            .inner
911            .handle
912            .lookup_node(&recipient, Duration::from_secs(2))
913            .await
914        {
915            Ok(pid) => pid,
916            Err(err) => {
917                self.inner.metrics.num_failed_messages.add(1);
918                return Err(NetworkError::LookupError(format!(
919                    "failed to look up node for direct message: {err}"
920                )));
921            },
922        };
923
924        #[cfg(feature = "hotshot-testing")]
925        {
926            let metrics = self.inner.metrics.clone();
927            if let Some(config) = &self.inner.reliability_config {
928                let handle = Arc::clone(&self.inner.handle);
929
930                let fut = config.clone().chaos_send_msg(
931                    message,
932                    Arc::new(move |msg: Vec<u8>| {
933                        let handle_2 = Arc::clone(&handle);
934                        let metrics_2 = metrics.clone();
935                        boxed_sync(async move {
936                            if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
937                                metrics_2.num_failed_messages.add(1);
938                                warn!("Failed to broadcast to libp2p: {e:?}");
939                            }
940                        })
941                    }),
942                );
943                spawn(fut);
944                return Ok(());
945            }
946        }
947
948        match self.inner.handle.direct_request(pid, &message) {
949            Ok(()) => Ok(()),
950            Err(e) => {
951                self.inner.metrics.num_failed_messages.add(1);
952                Err(e)
953            },
954        }
955    }
956
957    /// Receive one or many messages from the underlying network.
958    ///
959    /// # Errors
960    /// If there is a network-related failure.
961    #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
962    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
963        let result = self
964            .inner
965            .receiver
966            .lock()
967            .await
968            .recv()
969            .await
970            .ok_or(NetworkError::ShutDown)?;
971
972        Ok(result)
973    }
974
975    #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
976    #[allow(clippy::type_complexity)]
977    fn queue_node_lookup(
978        &self,
979        view_number: ViewNumber,
980        pk: T::SignatureKey,
981    ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
982        self.inner
983            .node_lookup_send
984            .try_send(Some((view_number, pk)))
985    }
986
987    /// The libp2p view update is a special operation intrinsic to its internal behavior.
988    ///
989    /// Libp2p needs to do a lookup because a libp2p address is not related to
990    /// hotshot keys. So in libp2p we store a mapping of HotShot key to libp2p address
991    /// in a distributed hash table.
992    ///
993    /// This means to directly message someone on libp2p we need to lookup in the hash
994    /// table what their libp2p address is, using their HotShot public key as the key.
995    ///
996    /// So the logic with libp2p is to prefetch upcoming leaders libp2p address to
997    /// save time when we later need to direct message the leader our vote. Hence the
998    /// use of the future view and leader to queue the lookups.
999    async fn update_view<TYPES>(
1000        &self,
1001        view: ViewNumber,
1002        epoch: Option<EpochNumber>,
1003        membership_coordinator: EpochMembershipCoordinator<TYPES>,
1004    ) where
1005        TYPES: NodeType<SignatureKey = T::SignatureKey>,
1006    {
1007        let future_view = ViewNumber::new(*view) + LOOK_AHEAD;
1008        let epoch = epoch.map(|e| EpochNumber::new(*e));
1009
1010        let membership = match membership_coordinator.membership_for_epoch(epoch).await {
1011            Ok(m) => m,
1012            Err(e) => {
1013                return tracing::warn!(e.message);
1014            },
1015        };
1016        let future_leader = match membership.leader(future_view).await {
1017            Ok(l) => l,
1018            Err(e) => {
1019                return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1020            },
1021        };
1022
1023        let _ = self
1024            .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1025            .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1026    }
1027}
1028
#[cfg(test)]
mod test {
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        /// Test derivation of a valid IPv4 address -> Multiaddr
        #[test]
        fn test_v4_valid() {
            // Derive a multiaddr from a valid IPv4 address
            let addr = "1.1.1.1:8080".to_string();
            // NOTE: `expect` takes a plain message, not a format string; the
            // previous ", {}" suffix was a leftover placeholder
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// Test derivation of a valid IPv6 address -> Multiaddr
        #[test]
        fn test_v6_valid() {
            // Derive a multiaddr from a valid IPv6 address
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            let addr = format!("{ipv6_addr}:8080");
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(
                multiaddr.to_string(),
                format!("/ip6/{ipv6_addr}/udp/8080/quic-v1")
            );
        }

        /// Test that an invalid address fails to derive to a Multiaddr
        #[test]
        fn test_no_port() {
            // Derive a multiaddr from an address with no port
            let addr = "1.1.1.1".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }

        /// Test that an existing domain name resolves to a Multiaddr
        #[test]
        fn test_fqdn_exists() {
            // Derive a multiaddr from a valid FQDN
            let addr = "example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/dns/example.com/udp/8080/quic-v1");
        }

        /// Test that a non-existent domain name still resolves to a Multiaddr
        #[test]
        fn test_fqdn_does_not_exist() {
            // Derive a multiaddr from a (probably) nonexistent FQDN
            let addr = "libp2p.example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it still worked
            assert_eq!(
                multiaddr.to_string(),
                "/dns/libp2p.example.com/udp/8080/quic-v1"
            );
        }

        /// Test that a domain name without a port fails to derive to a Multiaddr
        #[test]
        fn test_fqdn_no_port() {
            // Derive a multiaddr from a domain with no port
            let addr = "example.com".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }
    }
}