hotshot_libp2p_networking/network/node.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

/// configuration for the libp2p network (e.g. how it should be built)
mod config;

/// libp2p network handle
/// allows for control over the libp2p network
mod handle;

use std::{
    collections::{HashMap, HashSet},
    iter,
    num::{NonZeroU32, NonZeroUsize},
    sync::Arc,
    time::{Duration, Instant},
};

use bimap::BiMap;
use futures::{channel::mpsc, SinkExt, StreamExt};
use hotshot_types::{
    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
};
use libp2p::{
    autonat,
    core::transport::ListenerId,
    gossipsub::{
        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
    },
    identify::{
        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
        Info as IdentifyInfo,
    },
    identity::Keypair,
    kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
    request_response::{
        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
    },
    swarm::SwarmEvent,
    Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_identity::PeerId;
use parking_lot::Mutex;
use rand::{prelude::SliceRandom, thread_rng};
use tokio::{
    select, spawn,
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

pub use self::{
    config::{
        GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
        RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
    },
    handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
};
use super::{
    behaviours::dht::{
        bootstrap::{DHTBootstrapTask, InputEvent},
        store::{
            persistent::{DhtPersistentStorage, PersistentStore},
            validated::ValidatedStore,
        },
    },
    cbor::Cbor,
    gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
    NetworkEventInternal,
};
use crate::network::behaviours::{
    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
    direct_message::{DMBehaviour, DMRequest},
    exponential_backoff::ExponentialBackoff,
};

/// Maximum size of a gossip message, in bytes
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// [`ESTABLISHED_LIMIT_UNWR`] wrapped as a `NonZeroU32`
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// Number of connections to a single peer before logging an error
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// Network definition
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// peer id of the network node
    peer_id: PeerId,
    /// the swarm managing the set of network behaviours
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The Kademlia record TTL
    kademlia_record_ttl: Duration,
    /// The map from consensus keys to peer IDs
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// the listener id we are listening on, if it exists
    listener_id: Option<ListenerId>,
    /// Handler for direct messages
    direct_message_state: DMBehaviour,
    /// Handler for DHT events
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel to resend requests; set to `Some` when we call `spawn_listeners`
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}

impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
    /// Returns the number of peers this node is connected to
    pub fn num_connected(&self) -> usize {
        self.swarm.connected_peers().count()
    }

    /// Returns the set of peer IDs this node is connected to
    pub fn connected_pids(&self) -> HashSet<PeerId> {
        self.swarm.connected_peers().copied().collect()
    }

    /// starts the swarm listening on `listen_addr`
    /// and returns the address the swarm ends up listening on
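    ///
    /// # Example
    /// A minimal sketch, assuming an already-constructed `node` (the multiaddr
    /// format matches the QUIC transport set up in [`Self::new`]):
    /// ```ignore
    /// let addr = node.start_listen("/ip4/0.0.0.0/udp/0/quic-v1".parse()?).await?;
    /// ```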
    #[instrument(skip(self))]
    pub async fn start_listen(
        &mut self,
        listen_addr: Multiaddr,
    ) -> Result<Multiaddr, NetworkError> {
        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
        })?);
        let addr = loop {
            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
                break address;
            }
        };
        info!("Libp2p listening on {addr:?}");
        Ok(addr)
    }

    /// initialize the DHT with known peers:
    /// adds the peers to Kademlia; the `spawn_listeners` function
    /// will then start connecting to them
    #[instrument(skip(self))]
    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
        debug!("Adding {} known peers", known_peers.len());
        let behaviour = self.swarm.behaviour_mut();
        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
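        // Shuffle the peer list first, presumably so that every node does not
        // dial the same bootstrap peers in the same order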
        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
        shuffled.shuffle(&mut thread_rng());
        for (peer_id, addr) in shuffled {
            if *peer_id != self.peer_id {
                behaviour.dht.add_address(peer_id, addr.clone());
                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
            }
        }
    }

    /// Creates a new `Network` with the given settings.
    ///
    /// Currently:
    ///   * Generates a random key pair and associated [`PeerId`]
    ///   * Launches a hopefully production ready transport: QUIC v1 (RFC 9000) + DNS
    ///   * Generates a connection to the "broadcast" topic
    ///   * Creates a swarm to manage peers and events
    ///
    /// # Errors
    /// - If we fail to generate the transport or any of the behaviours
    ///
    /// # Panics
    /// If the replication factor is not set in the config
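    ///
    /// # Example
    /// A minimal sketch; the builder methods and the `MyTypes`/`MyDhtStorage`
    /// types are illustrative placeholders (see `config.rs` for the actual
    /// config fields):
    /// ```ignore
    /// let config = NetworkNodeConfigBuilder::default()
    ///     .replication_factor(NonZeroUsize::new(10).unwrap()) // must be set, or `new` panics
    ///     .build()?;
    /// let node = NetworkNode::<MyTypes, MyDhtStorage>::new(
    ///     config,
    ///     dht_storage,
    ///     Arc::new(Mutex::new(BiMap::new())),
    /// )
    /// .await?;
    /// ```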
    #[allow(clippy::too_many_lines)]
    pub async fn new(
        config: NetworkNodeConfig<T>,
        dht_persistent_storage: D,
        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    ) -> Result<Self, NetworkError> {
        // Generate a random `KeyPair` if one is not specified
        let keypair = config
            .keypair
            .clone()
            .unwrap_or_else(Keypair::generate_ed25519);

        // Get the `PeerId` from the `KeyPair`
        let peer_id = PeerId::from(keypair.public());

        // Generate the transport from the keypair, membership, and auth message
        let transport: BoxedTransport = gen_transport::<T>(
            keypair.clone(),
            config.membership.clone(),
            config.auth_message.clone(),
            Arc::clone(&consensus_key_to_pid_map),
        )
        .await?;

        // Calculate the record republication interval
        let kademlia_record_republication_interval = config
            .republication_interval
            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));

        // Calculate the Kademlia record TTL
        let kademlia_ttl = config
            .ttl
            .unwrap_or(16 * kademlia_record_republication_interval);
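        // (if `config.ttl` is unset, fall back to 16 republication intervals so
        // a record comfortably outlives several republish rounds before expiring)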

        // Generate the swarm
        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
            // Use the `Blake3` hash of the message's contents as the ID
            let message_id_fn = |message: &GossipsubMessage| {
                let hash = blake3::hash(&message.data);
                MessageId::from(hash.as_bytes().to_vec())
            };

            // Derive a `Gossipsub` config from our gossip config
            let gossipsub_config = GossipsubConfigBuilder::default()
                .message_id_fn(message_id_fn) // Use the (blake3) hash of a message as its ID
                .validation_mode(ValidationMode::Strict) // Force all messages to have valid signatures
                .heartbeat_interval(config.gossip_config.heartbeat_interval) // Time between gossip heartbeats
                .history_gossip(config.gossip_config.history_gossip) // Number of heartbeats to gossip about
                .history_length(config.gossip_config.history_length) // Number of heartbeats to remember the full message for
                .mesh_n(config.gossip_config.mesh_n) // Target number of mesh peers
                .mesh_n_high(config.gossip_config.mesh_n_high) // Upper limit of mesh peers
                .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
                .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
                .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
                .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
                .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
                .published_message_ids_cache_time(
                    config.gossip_config.published_message_ids_cache_time,
                ) // Cache duration for published message IDs
                .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
                .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them.
                .flood_publish(config.gossip_config.flood_publish) // If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score.
                .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
                .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
                .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
                .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
                .build()
                .map_err(|err| {
                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
                })?;

            // - Build a gossipsub network behaviour
            let gossipsub: Gossipsub = Gossipsub::new(
                MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )
            .map_err(|err| {
                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
            })?;

            // Build an identify network behaviour needed for our own
            // node connection information
            // E.g. this will answer the question: how are other nodes
            // seeing the peer from behind a NAT
            let identify_cfg =
                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
            let identify = IdentifyBehaviour::new(identify_cfg);

            // - Build DHT needed for peer discovery
            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
            kconfig
                .set_parallelism(NonZeroUsize::new(5).unwrap())
                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
                .set_publication_interval(Some(kademlia_record_republication_interval))
                .set_record_ttl(Some(kademlia_ttl));

            // allowing panic here because something is very wrong if this fails
            #[allow(clippy::panic)]
            if let Some(factor) = config.replication_factor {
                kconfig.set_replication_factor(factor);
            } else {
                panic!("Replication factor not set");
            }

            // Create the DHT behaviour with the given persistent storage
            let mut kadem = Behaviour::with_config(
                peer_id,
                PersistentStore::new(
                    ValidatedStore::new(MemoryStore::new(peer_id)),
                    dht_persistent_storage,
                    5,
                )
                .await,
                kconfig,
            );
            kadem.set_mode(Some(Mode::Server));

            let rrconfig = Libp2pRequestResponseConfig::default();

            // Create a new `cbor` codec with the given request and response sizes
            let cbor = Cbor::new(
                config.request_response_config.request_size_maximum,
                config.request_response_config.response_size_maximum,
            );

            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
                RequestResponse::with_codec(
                    cbor,
                    [(
                        StreamProtocol::new("/HotShot/direct_message/1.0"),
                        ProtocolSupport::Full,
                    )]
                    .into_iter(),
                    rrconfig.clone(),
                );

            let autonat_config = autonat::Config {
                only_global_ips: false,
                ..Default::default()
            };

            let network = NetworkDef::new(
                gossipsub,
                kadem,
                identify,
                direct_message,
                autonat::Behaviour::new(peer_id, autonat_config),
            );

            // build swarm
            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
            let swarm = swarm.with_tokio();

            swarm
                .with_other_transport(|_| transport)
                .unwrap()
                .with_behaviour(|_| network)
                .unwrap()
                .build()
        };
        for (peer, addr) in &config.to_connect_addrs {
            if peer != swarm.local_peer_id() {
                swarm.behaviour_mut().add_address(peer, addr.clone());
            }
        }

        Ok(Self {
            peer_id,
            swarm,
            kademlia_record_ttl: kademlia_ttl,
            consensus_key_to_pid_map,
            listener_id: None,
            direct_message_state: DMBehaviour::default(),
            dht_handler: DHTBehaviour::new(
                peer_id,
                config
                    .replication_factor
                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
            ),
            resend_tx: None,
        })
    }

    /// Publish a key/value to the record store.
    ///
    /// # Panics
    /// If half the replication factor (rounded down) is zero,
    /// i.e. if the replication factor is less than 2
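    ///
    /// # Example
    /// A minimal sketch (the field set mirrors the `PutDHT` handler below;
    /// `notify` stands in for whatever response channel the caller uses):
    /// ```ignore
    /// node.put_record(KadPutQuery {
    ///     progress: DHTProgress::NotStarted,
    ///     notify,
    ///     key: b"key".to_vec(),
    ///     value: b"value".to_vec(),
    ///     backoff: ExponentialBackoff::default(),
    /// });
    /// ```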
    pub fn put_record(&mut self, mut query: KadPutQuery) {
        // Create the new record
        let mut record = Record::new(query.key.clone(), query.value.clone());

        // Set the record's expiration time to the proper time
        record.expires = Some(Instant::now() + self.kademlia_record_ttl);

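        // Publish with a quorum of half the replication factor (rounded down):
        // the put is considered successful once that many peers have stored the record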
        match self.swarm.behaviour_mut().dht.put_record(
            record,
            libp2p::kad::Quorum::N(
                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
                    .expect("replication factor should be bigger than 0"),
            ),
        ) {
            Err(e) => {
                // failed; try again later
                query.progress = DHTProgress::NotStarted;
                query.backoff.start_next(false);
                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
            },
            Ok(qid) => {
                debug!("Published record to DHT with qid {qid:?}");
                let query = KadPutQuery {
                    progress: DHTProgress::InProgress(qid),
                    ..query
                };
                self.dht_handler.put_record(qid, query);
            },
        }
    }

    /// event handler for client events
    /// currently supported actions include
    /// - shutting down the swarm
    /// - gossiping a message to known peers on the `global` topic
    /// - returning the id of the current peer
    /// - subscribing to a topic
    /// - unsubscribing from a topic
    /// - direct messaging a peer
    #[instrument(skip(self))]
    async fn handle_client_requests(
        &mut self,
        msg: Option<ClientRequest>,
    ) -> Result<bool, NetworkError> {
        let behaviour = self.swarm.behaviour_mut();
        match msg {
            Some(msg) => {
                match msg {
                    ClientRequest::BeginBootstrap => {
                        debug!("Beginning Libp2p bootstrap");
                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
                    },
                    ClientRequest::LookupPeer(pid, chan) => {
                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
                        self.dht_handler
                            .in_progress_get_closest_peers
                            .insert(id, chan);
                    },
                    ClientRequest::GetRoutingTable(chan) => {
                        self.dht_handler
                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
                        if chan.send(()).is_err() {
                            warn!("Tried to notify the client, but it is no longer listening");
                        }
                    },
                    ClientRequest::PutDHT { key, value, notify } => {
                        let query = KadPutQuery {
                            progress: DHTProgress::NotStarted,
                            notify,
                            key,
                            value,
                            backoff: ExponentialBackoff::default(),
                        };
                        self.put_record(query);
                    },
                    ClientRequest::GetConnectedPeerNum(s) => {
                        if s.send(self.num_connected()).is_err() {
                            error!("error sending peer number to client");
                        }
                    },
                    ClientRequest::GetConnectedPeers(s) => {
                        if s.send(self.connected_pids()).is_err() {
                            error!("error sending peer set to client");
                        }
                    },
                    ClientRequest::GetDHT {
                        key,
                        notify,
                        retry_count,
                    } => {
                        self.dht_handler.get_record(
                            key,
                            notify,
                            ExponentialBackoff::default(),
                            retry_count,
                            &mut self.swarm.behaviour_mut().dht,
                        );
                    },
                    ClientRequest::IgnorePeers(_peers) => {
                        // NOTE used by test with conductor only
                    },
                    ClientRequest::Shutdown => {
                        if let Some(listener_id) = self.listener_id {
                            self.swarm.remove_listener(listener_id);
                        }

                        return Ok(true);
                    },
                    ClientRequest::GossipMsg(topic, contents) => {
                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
                    },
                    ClientRequest::Subscribe(t, chan) => {
                        behaviour.subscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished subscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::Unsubscribe(t, chan) => {
                        behaviour.unsubscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished unsubscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::DirectRequest {
                        pid,
                        contents,
                        retry_count,
                    } => {
                        debug!("Sending direct request to {pid:?}");
                        let id = behaviour.add_direct_request(pid, contents.clone());
                        let req = DMRequest {
                            peer_id: pid,
                            data: contents,
                            backoff: ExponentialBackoff::default(),
                            retry_count,
                        };
                        self.direct_message_state.add_direct_request(req, id);
                    },
                    ClientRequest::DirectResponse(chan, msg) => {
                        behaviour.add_direct_response(chan, msg);
                    },
                    ClientRequest::AddKnownPeers(peers) => {
                        self.add_known_peers(&peers);
                    },
                    ClientRequest::Prune(pid) => {
                        if self.swarm.disconnect_peer_id(pid).is_err() {
                            warn!("Could not disconnect from {pid:?}");
                        }
                    },
                }
            },
            None => {
                error!("Error receiving msg in main behaviour loop: channel closed");
            },
        }
        Ok(false)
    }

    /// event handler for events emitted from the swarm
    #[allow(clippy::type_complexity)]
    #[instrument(skip(self))]
    async fn handle_swarm_events(
        &mut self,
        event: SwarmEvent<NetworkEventInternal>,
        send_to_client: &UnboundedSender<NetworkEvent>,
    ) -> Result<(), NetworkError> {
        debug!("Swarm event observed {:?}", event);

        // Allow deprecated events here to make the match cleaner
        #[allow(deprecated)]
        match event {
            SwarmEvent::ConnectionEstablished {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                concurrent_dial_errors,
                established_in: _established_in,
            } => {
                if num_established > ESTABLISHED_LIMIT {
                    error!(
                        "Num concurrent connections to a single peer exceeding {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!(
                        "Connection established with {peer_id:?} at {endpoint:?} with {concurrent_dial_errors:?} concurrent dial errors"
                    );
                }

                // Send the number of connected peers to the client
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::ConnectionClosed {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                cause,
            } => {
                if num_established > ESTABLISHED_LIMIT_UNWR {
                    error!(
                        "Num concurrent connections to a single peer exceeding {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
                }

                // If we are no longer connected to the peer, remove the consensus key from the map
                if num_established == 0 {
                    self.consensus_key_to_pid_map
                        .lock()
                        .remove_by_right(&peer_id);
                }

                // Send the number of connected peers to the client
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::Dialing {
                peer_id,
                connection_id: _,
            } => {
                debug!("Attempting to dial {peer_id:?}");
            },
            SwarmEvent::ListenerClosed {
                listener_id: _,
                addresses: _,
                reason: _,
            }
            | SwarmEvent::NewListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::ExpiredListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::IncomingConnection {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
            } => {},
            SwarmEvent::Behaviour(b) => {
                let maybe_event = match b {
                    NetworkEventInternal::DHTEvent(e) => self
                        .dht_handler
                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
                    NetworkEventInternal::IdentifyEvent(e) => {
                        // NOTE feed identified peers into kademlia's routing table for peer discovery.
                        if let IdentifyEvent::Received {
                            peer_id,
                            info:
                                IdentifyInfo {
                                    listen_addrs,
                                    protocols: _,
                                    public_key: _,
                                    protocol_version: _,
                                    agent_version: _,
                                    observed_addr: _,
                                },
                            connection_id: _,
                        } = *e
                        {
                            let behaviour = self.swarm.behaviour_mut();

                            // into hashset to delete duplicates (I checked: there are duplicates)
                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
                                behaviour.dht.add_address(&peer_id, addr.clone());
                            }
                        }
                        None
                    },
                    NetworkEventInternal::GossipEvent(e) => match *e {
                        GossipEvent::Message {
                            propagation_source: _peer_id,
                            message_id: _id,
                            message,
                        } => Some(NetworkEvent::GossipMsg(message.data)),
                        GossipEvent::Subscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
                            None
                        },
                        GossipEvent::Unsubscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
                            None
                        },
                        GossipEvent::GossipsubNotSupported { peer_id } => {
                            warn!("Peer {peer_id:?} does not support gossipsub");
                            None
                        },
                    },
                    NetworkEventInternal::DMEvent(e) => self
                        .direct_message_state
                        .handle_dm_event(e, self.resend_tx.clone()),
                    NetworkEventInternal::AutonatEvent(e) => {
                        match e {
                            autonat::Event::InboundProbe(_) => {},
                            autonat::Event::OutboundProbe(e) => match e {
                                autonat::OutboundProbeEvent::Request { .. }
                                | autonat::OutboundProbeEvent::Response { .. } => {},
                                autonat::OutboundProbeEvent::Error {
                                    probe_id: _,
                                    peer,
                                    error,
                                } => {
                                    warn!(
                                        "AutoNAT Probe failed to peer {peer:?} with error: {error:?}"
                                    );
                                },
                            },
                            autonat::Event::StatusChanged { old, new } => {
                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
                            },
                        };
                        None
                    },
                };

                if let Some(event) = maybe_event {
                    // forward messages directly to Client
                    send_to_client
                        .send(event)
                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
                }
            },
            SwarmEvent::OutgoingConnectionError {
                connection_id: _,
                peer_id,
                error,
            } => {
                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
            },
            SwarmEvent::IncomingConnectionError {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
                error,
            } => {
                warn!("Incoming connection error: {error:?}");
            },
            SwarmEvent::ListenerError {
                listener_id: _,
                error,
            } => {
                warn!("Listener error: {error:?}");
            },
            SwarmEvent::ExternalAddrConfirmed { address } => {
                let my_id = *self.swarm.local_peer_id();
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&my_id, address.clone());
            },
            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&peer_id, address.clone());
            },
            _ => {
                debug!("Unhandled swarm event {event:?}");
            },
        }
        Ok(())
    }

    /// Spawn a task to listen for requests on the returned channel
    /// as well as any events produced by libp2p
    ///
    /// # Errors
    /// - If we fail to create the channels or the bootstrap channel
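    ///
    /// # Example
    /// A minimal sketch of driving the node through the returned channels
    /// (`node` and the payload bytes are illustrative; `global` is the gossip
    /// topic named in the client-request handler's docs):
    /// ```ignore
    /// let (tx, mut rx) = node.spawn_listeners()?;
    /// tx.send(ClientRequest::BeginBootstrap)?;
    /// tx.send(ClientRequest::GossipMsg("global".to_string(), vec![1, 2, 3]))?;
    /// while let Some(event) = rx.recv().await {
    ///     if let NetworkEvent::GossipMsg(data) = event {
    ///         println!("received {data:?}");
    ///     }
    /// }
    /// ```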
    pub fn spawn_listeners(
        mut self,
    ) -> Result<
        (
            UnboundedSender<ClientRequest>,
            UnboundedReceiver<NetworkEvent>,
        ),
        NetworkError,
    > {
        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
        self.resend_tx = Some(s_input.clone());
        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());

        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
        spawn(
            async move {
                loop {
                    select! {
                        event = self.swarm.next() => {
                            debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
                            if let Some(event) = event {
                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
                                self.handle_swarm_events(event, &r_input).await?;
                            }
                        },
                        msg = s_output.recv() => {
                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
                            let shutdown = self.handle_client_requests(msg).await?;
                            if shutdown {
                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
                                break
                            }
                        }
                    }
                }
                Ok::<(), NetworkError>(())
            }
            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
        );
        Ok((s_input, r_output))
    }

    /// Get the network node's peer id
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
}