hotshot_libp2p_networking/network/
node.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// configuration for the libp2p network (e.g. how it should be built)
8mod config;
9
10/// libp2p network handle
11/// allows for control over the libp2p network
12mod handle;
13
14use std::{
15    collections::{HashMap, HashSet},
16    iter,
17    num::{NonZeroU32, NonZeroUsize},
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{channel::mpsc, SinkExt, StreamExt};
24use hotshot_types::{
25    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28    autonat,
29    core::{transport::ListenerId, upgrade::Version::V1Lazy},
30    gossipsub::{
31        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33    },
34    identify::{
35        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36        Info as IdentifyInfo,
37    },
38    identity::Keypair,
39    kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
40    request_response::{
41        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42    },
43    swarm::SwarmEvent,
44    Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
45};
46use libp2p_identity::PeerId;
47use parking_lot::Mutex;
48use rand::{prelude::SliceRandom, thread_rng};
49use tokio::{
50    select, spawn,
51    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
52};
53use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
54
55pub use self::{
56    config::{
57        GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
58        RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
59    },
60    handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
61};
62use super::{
63    behaviours::dht::{
64        bootstrap::{DHTBootstrapTask, InputEvent},
65        store::{
66            persistent::{DhtPersistentStorage, PersistentStore},
67            validated::ValidatedStore,
68        },
69    },
70    cbor::Cbor,
71    gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
72    NetworkEventInternal,
73};
74use crate::network::behaviours::{
75    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
76    direct_message::{DMBehaviour, DMRequest},
77    exponential_backoff::ExponentialBackoff,
78};
79
/// Upper bound on the size of a single gossip message
/// (`2_000_000_000`; presumably bytes, confirm against the gossipsub config that consumes it)
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// Number of connections to a single peer before logging an error
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;
/// [`ESTABLISHED_LIMIT_UNWR`] wrapped as a [`NonZeroU32`], so it can be compared directly
/// against connection counts that are reported as `NonZeroU32`
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
87
/// A libp2p network node.
///
/// Owns the libp2p swarm together with the per-protocol state (direct messages, DHT)
/// needed to serve client requests and react to swarm events.
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// peer id of network node
    peer_id: PeerId,
    /// the swarm of networkbehaviours (omitted from the `Debug` output)
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The Kademlia record TTL; added to "now" to compute the expiry of records we publish
    kademlia_record_ttl: Duration,
    /// The map from consensus keys to peer IDs.
    /// Shared with the transport; a peer's entry is removed once its last connection closes
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// the listener id we are listening on, if it exists
    /// (set by `start_listen`, used to remove the listener on shutdown)
    listener_id: Option<ListenerId>,
    /// Handler for direct messages
    direct_message_state: DMBehaviour,
    /// Handler for DHT Events
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel to resend requests, set to Some when we call `spawn_listeners`
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}
109
110impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
111    /// Returns number of peers this node is connected to
112    pub fn num_connected(&self) -> usize {
113        self.swarm.connected_peers().count()
114    }
115
116    /// return hashset of PIDs this node is connected to
117    pub fn connected_pids(&self) -> HashSet<PeerId> {
118        self.swarm.connected_peers().copied().collect()
119    }
120
121    /// starts the swarm listening on `listen_addr`
122    /// and optionally dials into peer `known_peer`
123    /// returns the address the swarm is listening upon
124    #[instrument(skip(self))]
125    pub async fn start_listen(
126        &mut self,
127        listen_addr: Multiaddr,
128    ) -> Result<Multiaddr, NetworkError> {
129        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
130            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
131        })?);
132        let addr = loop {
133            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
134                break address;
135            }
136        };
137        info!("Libp2p listening on {addr:?}");
138        Ok(addr)
139    }
140
141    /// initialize the DHT with known peers
142    /// add the peers to kademlia and then
143    /// the `spawn_listeners` function
144    /// will start connecting to peers
145    #[instrument(skip(self))]
146    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
147        debug!("Adding {} known peers", known_peers.len());
148        let behaviour = self.swarm.behaviour_mut();
149        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
150        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
151        shuffled.shuffle(&mut thread_rng());
152        for (peer_id, addr) in shuffled {
153            if *peer_id != self.peer_id {
154                behaviour.dht.add_address(peer_id, addr.clone());
155                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
156                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
157            }
158        }
159    }
160
161    /// Creates a new `Network` with the given settings.
162    ///
163    /// Currently:
164    ///   * Generates a random key pair and associated [`PeerId`]
165    ///   * Launches a hopefully production ready transport: QUIC v1 (RFC 9000) + DNS
166    ///   * Generates a connection to the "broadcast" topic
167    ///   * Creates a swarm to manage peers and events
168    ///
169    /// # Errors
170    /// - If we fail to generate the transport or any of the behaviours
171    ///
172    /// # Panics
173    /// If 5 < 0
174    #[allow(clippy::too_many_lines)]
175    pub async fn new(
176        config: NetworkNodeConfig<T>,
177        dht_persistent_storage: D,
178        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
179    ) -> Result<Self, NetworkError> {
180        // Generate a random `KeyPair` if one is not specified
181        let keypair = config
182            .keypair
183            .clone()
184            .unwrap_or_else(Keypair::generate_ed25519);
185
186        // Get the `PeerId` from the `KeyPair`
187        let peer_id = PeerId::from(keypair.public());
188
189        // Generate the transport from the keypair, membership, and auth message
190        let transport: BoxedTransport = gen_transport::<T>(
191            keypair.clone(),
192            config.membership.clone(),
193            config.auth_message.clone(),
194            Arc::clone(&consensus_key_to_pid_map),
195        )
196        .await?;
197
198        // Calculate the record republication interval
199        let kademlia_record_republication_interval = config
200            .republication_interval
201            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
202
203        // Calculate the Kademlia record TTL
204        let kademlia_ttl = config
205            .ttl
206            .unwrap_or(16 * kademlia_record_republication_interval);
207
208        // Generate the swarm
209        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
210            // Use the `Blake3` hash of the message's contents as the ID
211            let message_id_fn = |message: &GossipsubMessage| {
212                let hash = blake3::hash(&message.data);
213                MessageId::from(hash.as_bytes().to_vec())
214            };
215
216            // Derive a `Gossipsub` config from our gossip config
217            let gossipsub_config = GossipsubConfigBuilder::default()
218                .message_id_fn(message_id_fn) // Use the (blake3) hash of a message as its ID
219                .validation_mode(ValidationMode::Strict) // Force all messages to have valid signatures
220                .heartbeat_interval(config.gossip_config.heartbeat_interval) // Time between gossip heartbeats
221                .history_gossip(config.gossip_config.history_gossip) // Number of heartbeats to gossip about
222                .history_length(config.gossip_config.history_length) // Number of heartbeats to remember the full message for
223                .mesh_n(config.gossip_config.mesh_n) // Target number of mesh peers
224                .mesh_n_high(config.gossip_config.mesh_n_high) // Upper limit of mesh peers
225                .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
226                .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
227                .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
228                .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
229                .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
230                .published_message_ids_cache_time(
231                    config.gossip_config.published_message_ids_cache_time,
232                ) // Cache duration for published message IDs
233                .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
234                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
235                .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them.
236                .flood_publish(config.gossip_config.flood_publish) // If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score.
237                .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
238                .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
239                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
240                .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
241                .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
242                .build()
243                .map_err(|err| {
244                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
245                })?;
246
247            // - Build a gossipsub network behavior
248            let gossipsub: Gossipsub = Gossipsub::new(
249                MessageAuthenticity::Signed(keypair.clone()),
250                gossipsub_config,
251            )
252            .map_err(|err| {
253                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
254            })?;
255
256            //   Build a identify network behavior needed for own
257            //   node connection information
258            //   E.g. this will answer the question: how are other nodes
259            //   seeing the peer from behind a NAT
260            let identify_cfg =
261                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
262            let identify = IdentifyBehaviour::new(identify_cfg);
263
264            // - Build DHT needed for peer discovery
265            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
266            kconfig
267                .set_parallelism(NonZeroUsize::new(5).unwrap())
268                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
269                .set_publication_interval(Some(kademlia_record_republication_interval))
270                .set_record_ttl(Some(kademlia_ttl));
271
272            // allowing panic here because something is very wrong if this fales
273            #[allow(clippy::panic)]
274            if let Some(factor) = config.replication_factor {
275                kconfig.set_replication_factor(factor);
276            } else {
277                panic!("Replication factor not set");
278            }
279
280            // Create the DHT behaviour with the given persistent storage
281            let mut kadem = Behaviour::with_config(
282                peer_id,
283                PersistentStore::new(
284                    ValidatedStore::new(MemoryStore::new(peer_id)),
285                    dht_persistent_storage,
286                    5,
287                )
288                .await,
289                kconfig,
290            );
291            kadem.set_mode(Some(Mode::Server));
292
293            let rrconfig = Libp2pRequestResponseConfig::default();
294
295            // Create a new `cbor` codec with the given request and response sizes
296            let cbor = Cbor::new(
297                config.request_response_config.request_size_maximum,
298                config.request_response_config.response_size_maximum,
299            );
300
301            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
302                RequestResponse::with_codec(
303                    cbor,
304                    [(
305                        StreamProtocol::new("/HotShot/direct_message/1.0"),
306                        ProtocolSupport::Full,
307                    )]
308                    .into_iter(),
309                    rrconfig.clone(),
310                );
311
312            let autonat_config = autonat::Config {
313                only_global_ips: false,
314                ..Default::default()
315            };
316
317            let network = NetworkDef::new(
318                gossipsub,
319                kadem,
320                identify,
321                direct_message,
322                autonat::Behaviour::new(peer_id, autonat_config),
323            );
324
325            // build swarm
326            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
327            let swarm = swarm.with_tokio();
328
329            swarm
330                .with_other_transport(|_| transport)
331                .unwrap()
332                .with_behaviour(|_| network)
333                .unwrap()
334                .with_swarm_config(|cfg| {
335                    cfg.with_idle_connection_timeout(Duration::from_secs(10))
336                        .with_substream_upgrade_protocol_override(V1Lazy)
337                })
338                .build()
339        };
340        for (peer, addr) in &config.to_connect_addrs {
341            if peer != swarm.local_peer_id() {
342                swarm.behaviour_mut().add_address(peer, addr.clone());
343                swarm.add_peer_address(*peer, addr.clone());
344            }
345        }
346
347        Ok(Self {
348            peer_id,
349            swarm,
350            kademlia_record_ttl: kademlia_ttl,
351            consensus_key_to_pid_map,
352            listener_id: None,
353            direct_message_state: DMBehaviour::default(),
354            dht_handler: DHTBehaviour::new(
355                peer_id,
356                config
357                    .replication_factor
358                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
359            ),
360            resend_tx: None,
361        })
362    }
363
364    /// Publish a key/value to the record store.
365    ///
366    /// # Panics
367    /// If the default replication factor is `None`
368    pub fn put_record(&mut self, mut query: KadPutQuery) {
369        // Create the new record
370        let mut record = Record::new(query.key.clone(), query.value.clone());
371
372        // Set the record's expiration time to the proper time
373        record.expires = Some(Instant::now() + self.kademlia_record_ttl);
374
375        match self.swarm.behaviour_mut().dht.put_record(
376            record,
377            libp2p::kad::Quorum::N(
378                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
379                    .expect("replication factor should be bigger than 0"),
380            ),
381        ) {
382            Err(e) => {
383                // failed try again later
384                query.progress = DHTProgress::NotStarted;
385                query.backoff.start_next(false);
386                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
387            },
388            Ok(qid) => {
389                debug!("Published record to DHT with qid {qid:?}");
390                let query = KadPutQuery {
391                    progress: DHTProgress::InProgress(qid),
392                    ..query
393                };
394                self.dht_handler.put_record(qid, query);
395            },
396        }
397    }
398
399    /// event handler for client events
400    /// currently supported actions include
401    /// - shutting down the swarm
402    /// - gossipping a message to known peers on the `global` topic
403    /// - returning the id of the current peer
404    /// - subscribing to a topic
405    /// - unsubscribing from a toipc
406    /// - direct messaging a peer
407    #[instrument(skip(self))]
408    async fn handle_client_requests(
409        &mut self,
410        msg: Option<ClientRequest>,
411    ) -> Result<bool, NetworkError> {
412        let behaviour = self.swarm.behaviour_mut();
413        match msg {
414            Some(msg) => {
415                match msg {
416                    ClientRequest::BeginBootstrap => {
417                        debug!("Beginning Libp2p bootstrap");
418                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
419                    },
420                    ClientRequest::LookupPeer(pid, chan) => {
421                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
422                        self.dht_handler
423                            .in_progress_get_closest_peers
424                            .insert(id, chan);
425                    },
426                    ClientRequest::GetRoutingTable(chan) => {
427                        self.dht_handler
428                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
429                        if chan.send(()).is_err() {
430                            warn!("Tried to notify client but client not tracking anymore");
431                        }
432                    },
433                    ClientRequest::PutDHT { key, value, notify } => {
434                        let query = KadPutQuery {
435                            progress: DHTProgress::NotStarted,
436                            notify,
437                            key,
438                            value,
439                            backoff: ExponentialBackoff::default(),
440                        };
441                        self.put_record(query);
442                    },
443                    ClientRequest::GetConnectedPeerNum(s) => {
444                        if s.send(self.num_connected()).is_err() {
445                            error!("error sending peer number to client");
446                        }
447                    },
448                    ClientRequest::GetConnectedPeers(s) => {
449                        if s.send(self.connected_pids()).is_err() {
450                            error!("error sending peer set to client");
451                        }
452                    },
453                    ClientRequest::GetDHT {
454                        key,
455                        notify,
456                        retry_count,
457                    } => {
458                        self.dht_handler.get_record(
459                            key,
460                            notify,
461                            ExponentialBackoff::default(),
462                            retry_count,
463                            &mut self.swarm.behaviour_mut().dht,
464                        );
465                    },
466                    ClientRequest::IgnorePeers(_peers) => {
467                        // NOTE used by test with conductor only
468                    },
469                    ClientRequest::Shutdown => {
470                        if let Some(listener_id) = self.listener_id {
471                            self.swarm.remove_listener(listener_id);
472                        }
473
474                        return Ok(true);
475                    },
476                    ClientRequest::GossipMsg(topic, contents) => {
477                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
478                    },
479                    ClientRequest::Subscribe(t, chan) => {
480                        behaviour.subscribe_gossip(&t);
481                        if let Some(chan) = chan {
482                            if chan.send(()).is_err() {
483                                error!("finished subscribing but response channel dropped");
484                            }
485                        }
486                    },
487                    ClientRequest::Unsubscribe(t, chan) => {
488                        behaviour.unsubscribe_gossip(&t);
489                        if let Some(chan) = chan {
490                            if chan.send(()).is_err() {
491                                error!("finished unsubscribing but response channel dropped");
492                            }
493                        }
494                    },
495                    ClientRequest::DirectRequest {
496                        pid,
497                        contents,
498                        retry_count,
499                    } => {
500                        debug!("Sending direct request to {pid:?}");
501                        let id = behaviour.add_direct_request(pid, contents.clone());
502                        let req = DMRequest {
503                            peer_id: pid,
504                            data: contents,
505                            backoff: ExponentialBackoff::default(),
506                            retry_count,
507                        };
508                        self.direct_message_state.add_direct_request(req, id);
509                    },
510                    ClientRequest::DirectResponse(chan, msg) => {
511                        behaviour.add_direct_response(chan, msg);
512                    },
513                    ClientRequest::AddKnownPeers(peers) => {
514                        self.add_known_peers(&peers);
515                    },
516                    ClientRequest::Prune(pid) => {
517                        if self.swarm.disconnect_peer_id(pid).is_err() {
518                            warn!("Could not disconnect from {pid:?}");
519                        }
520                    },
521                }
522            },
523            None => {
524                error!("Error receiving msg in main behaviour loop: channel closed");
525            },
526        }
527        Ok(false)
528    }
529
530    /// event handler for events emitted from the swarm
531    #[allow(clippy::type_complexity)]
532    #[instrument(skip(self))]
533    async fn handle_swarm_events(
534        &mut self,
535        event: SwarmEvent<NetworkEventInternal>,
536        send_to_client: &UnboundedSender<NetworkEvent>,
537    ) -> Result<(), NetworkError> {
538        // Make the match cleaner
539        debug!("Swarm event observed {:?}", event);
540
541        #[allow(deprecated)]
542        match event {
543            SwarmEvent::ConnectionEstablished {
544                connection_id: _,
545                peer_id,
546                endpoint,
547                num_established,
548                concurrent_dial_errors,
549                established_in: _established_in,
550            } => {
551                if num_established > ESTABLISHED_LIMIT {
552                    error!(
553                        "Num concurrent connections to a single peer exceeding \
554                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
555                    );
556                } else {
557                    debug!(
558                        "Connection established with {peer_id:?} at {endpoint:?} with \
559                         {concurrent_dial_errors:?} concurrent dial errors"
560                    );
561                }
562
563                // Send the number of connected peers to the client
564                send_to_client
565                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
566                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
567            },
568            SwarmEvent::ConnectionClosed {
569                connection_id: _,
570                peer_id,
571                endpoint,
572                num_established,
573                cause,
574            } => {
575                if num_established > ESTABLISHED_LIMIT_UNWR {
576                    error!(
577                        "Num concurrent connections to a single peer exceeding \
578                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
579                    );
580                } else {
581                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
582                }
583
584                // If we are no longer connected to the peer, remove the consensus key from the map
585                if num_established == 0 {
586                    self.consensus_key_to_pid_map
587                        .lock()
588                        .remove_by_right(&peer_id);
589                }
590
591                // Send the number of connected peers to the client
592                send_to_client
593                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
594                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
595            },
596            SwarmEvent::Dialing {
597                peer_id,
598                connection_id: _,
599            } => {
600                debug!("Attempting to dial {peer_id:?}");
601            },
602            SwarmEvent::ListenerClosed {
603                listener_id: _,
604                addresses: _,
605                reason: _,
606            }
607            | SwarmEvent::NewListenAddr {
608                listener_id: _,
609                address: _,
610            }
611            | SwarmEvent::ExpiredListenAddr {
612                listener_id: _,
613                address: _,
614            }
615            | SwarmEvent::NewExternalAddrCandidate { .. }
616            | SwarmEvent::ExternalAddrExpired { .. }
617            | SwarmEvent::IncomingConnection {
618                connection_id: _,
619                local_addr: _,
620                send_back_addr: _,
621            } => {},
622            SwarmEvent::Behaviour(b) => {
623                let maybe_event = match b {
624                    NetworkEventInternal::DHTEvent(e) => self
625                        .dht_handler
626                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
627                    NetworkEventInternal::IdentifyEvent(e) => {
628                        // NOTE feed identified peers into kademlia's routing table for peer discovery.
629                        if let IdentifyEvent::Received {
630                            peer_id,
631                            info:
632                                IdentifyInfo {
633                                    listen_addrs,
634                                    protocols: _,
635                                    public_key: _,
636                                    protocol_version: _,
637                                    agent_version: _,
638                                    observed_addr: _,
639                                },
640                            connection_id: _,
641                        } = *e
642                        {
643                            let behaviour = self.swarm.behaviour_mut();
644
645                            // into hashset to delete duplicates (I checked: there are duplicates)
646                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
647                                behaviour.dht.add_address(&peer_id, addr.clone());
648                            }
649                        }
650                        None
651                    },
652                    NetworkEventInternal::GossipEvent(e) => match *e {
653                        GossipEvent::Message {
654                            propagation_source: _peer_id,
655                            message_id: _id,
656                            message,
657                        } => Some(NetworkEvent::GossipMsg(message.data)),
658                        GossipEvent::Subscribed { peer_id, topic } => {
659                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
660                            None
661                        },
662                        GossipEvent::Unsubscribed { peer_id, topic } => {
663                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
664                            None
665                        },
666                        GossipEvent::GossipsubNotSupported { peer_id } => {
667                            warn!("Peer {peer_id:?} does not support gossipsub");
668                            None
669                        },
670                    },
671                    NetworkEventInternal::DMEvent(e) => self
672                        .direct_message_state
673                        .handle_dm_event(e, self.resend_tx.clone()),
674                    NetworkEventInternal::AutonatEvent(e) => {
675                        match e {
676                            autonat::Event::InboundProbe(_) => {},
677                            autonat::Event::OutboundProbe(e) => match e {
678                                autonat::OutboundProbeEvent::Request { .. }
679                                | autonat::OutboundProbeEvent::Response { .. } => {},
680                                autonat::OutboundProbeEvent::Error {
681                                    probe_id: _,
682                                    peer,
683                                    error,
684                                } => {
685                                    warn!(
686                                        "AutoNAT Probe failed to peer {peer:?} with error: \
687                                         {error:?}"
688                                    );
689                                },
690                            },
691                            autonat::Event::StatusChanged { old, new } => {
692                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
693                            },
694                        };
695                        None
696                    },
697                };
698
699                if let Some(event) = maybe_event {
700                    // forward messages directly to Client
701                    send_to_client
702                        .send(event)
703                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
704                }
705            },
706            SwarmEvent::OutgoingConnectionError {
707                connection_id: _,
708                peer_id,
709                error,
710            } => {
711                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
712            },
713            SwarmEvent::IncomingConnectionError {
714                connection_id: _,
715                local_addr: _,
716                send_back_addr: _,
717                error,
718            } => {
719                warn!("Incoming connection error: {error:?}");
720            },
721            SwarmEvent::ListenerError {
722                listener_id: _,
723                error,
724            } => {
725                warn!("Listener error: {error:?}");
726            },
727            SwarmEvent::ExternalAddrConfirmed { address } => {
728                let my_id = *self.swarm.local_peer_id();
729                self.swarm
730                    .behaviour_mut()
731                    .dht
732                    .add_address(&my_id, address.clone());
733            },
734            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
735                self.swarm
736                    .behaviour_mut()
737                    .dht
738                    .add_address(&peer_id, address.clone());
739            },
740            _ => {
741                debug!("Unhandled swarm event {event:?}");
742            },
743        }
744        Ok(())
745    }
746
747    /// Spawn a task to listen for requests on the returned channel
748    /// as well as any events produced by libp2p
749    ///
750    /// # Errors
751    /// - If we fail to create the channels or the bootstrap channel
752    pub fn spawn_listeners(
753        mut self,
754    ) -> Result<
755        (
756            UnboundedSender<ClientRequest>,
757            UnboundedReceiver<NetworkEvent>,
758        ),
759        NetworkError,
760    > {
761        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
762        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
763        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
764        self.resend_tx = Some(s_input.clone());
765        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
766
767        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
768        spawn(
769            async move {
770                loop {
771                    select! {
772                        event = self.swarm.next() => {
773                            debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
774                            if let Some(event) = event {
775                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
776                                self.handle_swarm_events(event, &r_input).await?;
777                            }
778                        },
779                        msg = s_output.recv() => {
780                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
781                            let shutdown = self.handle_client_requests(msg).await?;
782                            if shutdown {
783                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
784                                break
785                            }
786                        }
787                    }
788                }
789                Ok::<(), NetworkError>(())
790            }
791            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
792        );
793        Ok((s_input, r_output))
794    }
795
    /// Returns this network node's peer id.
    ///
    /// The id is returned by value (a copy of the stored `peer_id` field),
    /// not as a reference into the node.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
800}