hotshot_libp2p_networking/network/
node.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// configuration for the libp2p network (e.g. how it should be built)
8mod config;
9
10/// libp2p network handle
11/// allows for control over the libp2p network
12mod handle;
13
14use std::{
15    collections::{HashMap, HashSet},
16    iter,
17    num::{NonZeroU32, NonZeroUsize},
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{SinkExt, StreamExt, channel::mpsc};
24use hotshot_types::{
25    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28    Multiaddr, StreamProtocol, Swarm, SwarmBuilder, autonat,
29    core::{transport::ListenerId, upgrade::Version::V1Lazy},
30    gossipsub::{
31        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33    },
34    identify::{
35        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36        Info as IdentifyInfo,
37    },
38    identity::Keypair,
39    kad::{Behaviour, Config, Mode, Record, store::MemoryStore},
40    request_response::{
41        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42    },
43    swarm::SwarmEvent,
44};
45use libp2p_identity::PeerId;
46use parking_lot::Mutex;
47use rand::{prelude::SliceRandom, thread_rng};
48use tokio::{
49    select, spawn,
50    sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
51};
52use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
53
54pub use self::{
55    config::{
56        DEFAULT_REPLICATION_FACTOR, GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder,
57        NetworkNodeConfigBuilderError, RequestResponseConfig,
58    },
59    handle::{NetworkNodeHandle, NetworkNodeReceiver, spawn_network_node},
60};
61use super::{
62    BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent, NetworkEventInternal,
63    behaviours::dht::{
64        bootstrap::{DHTBootstrapTask, InputEvent},
65        store::{
66            persistent::{DhtPersistentStorage, PersistentStore},
67            validated::ValidatedStore,
68        },
69    },
70    cbor::Cbor,
71    gen_transport,
72};
73use crate::network::behaviours::{
74    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
75    direct_message::{DMBehaviour, DMRequest},
76    exponential_backoff::ExponentialBackoff,
77};
78
/// Maximum size (in bytes) of a single gossip message
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// [`ESTABLISHED_LIMIT_UNWR`] wrapped into a `NonZeroU32` for APIs that require one
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// Number of concurrent connections to a single peer before logging an error
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;
86
/// Network definition
///
/// Wraps a libp2p [`Swarm`] together with the auxiliary state needed to service
/// client requests (DHT queries, direct messages, gossip).
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// peer id of network node
    peer_id: PeerId,
    /// the swarm of networkbehaviours (skipped in `Debug` output)
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The Kademlia record TTL; records published via `put_record` expire this long after creation
    kademlia_record_ttl: Duration,
    /// The map from consensus keys to peer IDs. Shared with the transport
    /// (see `new`); an entry is removed when the last connection to its peer closes
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// the listener id we are listening on, if it exists (removed again on `Shutdown`)
    listener_id: Option<ListenerId>,
    /// Handler for direct messages
    direct_message_state: DMBehaviour,
    /// Handler for DHT Events
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel to resend requests, set to Some when we call `spawn_listeners`
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}
108
109impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
110    /// Returns number of peers this node is connected to
111    pub fn num_connected(&self) -> usize {
112        self.swarm.connected_peers().count()
113    }
114
115    /// return hashset of PIDs this node is connected to
116    pub fn connected_pids(&self) -> HashSet<PeerId> {
117        self.swarm.connected_peers().copied().collect()
118    }
119
    /// Starts the swarm listening on `listen_addr` and waits for the swarm to
    /// report the first address it is actually listening on, which is then
    /// returned (useful when `listen_addr` contains a wildcard port).
    ///
    /// # Errors
    /// Returns [`NetworkError::ListenError`] if libp2p rejects `listen_addr`
    ///
    /// NOTE(review): the loop below discards any swarm events that arrive
    /// before the `NewListenAddr` event -- confirm callers do not rely on them
    #[instrument(skip(self))]
    pub async fn start_listen(
        &mut self,
        listen_addr: Multiaddr,
    ) -> Result<Multiaddr, NetworkError> {
        // Remember the listener id so a later `Shutdown` request can remove it
        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
        })?);
        // Drive the swarm until it reports the address it bound to
        let addr = loop {
            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
                break address;
            }
        };
        info!("Libp2p listening on {addr:?}");
        Ok(addr)
    }
139
140    /// initialize the DHT with known peers
141    /// add the peers to kademlia and then
142    /// the `spawn_listeners` function
143    /// will start connecting to peers
144    #[instrument(skip(self))]
145    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
146        debug!("Adding {} known peers", known_peers.len());
147        let behaviour = self.swarm.behaviour_mut();
148        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
149        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
150        shuffled.shuffle(&mut thread_rng());
151        for (peer_id, addr) in shuffled {
152            if *peer_id != self.peer_id {
153                behaviour.dht.add_address(peer_id, addr.clone());
154                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
155                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
156            }
157        }
158    }
159
    /// Creates a new `Network` with the given settings.
    ///
    /// Currently:
    ///   * Uses the configured key pair (or generates a random one) and derives the [`PeerId`]
    ///   * Launches a hopefully production ready transport: QUIC v1 (RFC 9000) + DNS
    ///   * Generates a connection to the "broadcast" topic
    ///   * Creates a swarm to manage peers and events
    ///
    /// # Errors
    /// - If we fail to generate the transport or any of the behaviours
    ///
    /// # Panics
    /// - If `config.replication_factor` is `None`
    /// - If installing the transport or behaviour on the swarm builder fails
    ///   (the `unwrap`s below; infallible for this builder configuration)
    #[allow(clippy::too_many_lines)]
    pub async fn new(
        config: NetworkNodeConfig,
        dht_persistent_storage: D,
        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    ) -> Result<Self, NetworkError> {
        // Generate a random `KeyPair` if one is not specified
        let keypair = config
            .keypair
            .clone()
            .unwrap_or_else(Keypair::generate_ed25519);

        // Get the `PeerId` from the `KeyPair`
        let peer_id = PeerId::from(keypair.public());

        // Generate the transport from the keypair and auth message. The
        // consensus-key map is shared with the transport (hence the `Arc::clone`)
        let transport: BoxedTransport = gen_transport::<T>(
            keypair.clone(),
            config.auth_message.clone(),
            Arc::clone(&consensus_key_to_pid_map),
        )
        .await?;

        // Calculate the record republication interval
        let kademlia_record_republication_interval = config
            .republication_interval
            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));

        // Calculate the Kademlia record TTL (default: 16 republication intervals)
        let kademlia_ttl = config
            .ttl
            .unwrap_or(16 * kademlia_record_republication_interval);

        // Generate the swarm
        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
            // Use the `Blake3` hash of the message's contents as the ID
            let message_id_fn = |message: &GossipsubMessage| {
                let hash = blake3::hash(&message.data);
                MessageId::from(hash.as_bytes().to_vec())
            };

            // Derive a `Gossipsub` config from our gossip config
            let gossipsub_config = GossipsubConfigBuilder::default()
                .message_id_fn(message_id_fn) // Use the (blake3) hash of a message as its ID
                .validation_mode(ValidationMode::Strict) // Force all messages to have valid signatures
                .heartbeat_interval(config.gossip_config.heartbeat_interval) // Time between gossip heartbeats
                .history_gossip(config.gossip_config.history_gossip) // Number of heartbeats to gossip about
                .history_length(config.gossip_config.history_length) // Number of heartbeats to remember the full message for
                .mesh_n(config.gossip_config.mesh_n) // Target number of mesh peers
                .mesh_n_high(config.gossip_config.mesh_n_high) // Upper limit of mesh peers
                .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
                .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
                .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
                .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
                .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
                .published_message_ids_cache_time(
                    config.gossip_config.published_message_ids_cache_time,
                ) // Cache duration for published message IDs
                .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
                .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them. (NB: the misspelling is libp2p's API name)
                .flood_publish(config.gossip_config.flood_publish) // If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score.
                .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
                .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
                .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
                .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
                .build()
                .map_err(|err| {
                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
                })?;

            // - Build a gossipsub network behavior
            let gossipsub: Gossipsub = Gossipsub::new(
                MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )
            .map_err(|err| {
                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
            })?;

            //   Build an identify network behaviour needed for our own
            //   node connection information
            //   E.g. this will answer the question: how are other nodes
            //   seeing the peer from behind a NAT
            let identify_cfg =
                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
            let identify = IdentifyBehaviour::new(identify_cfg);

            // - Build DHT needed for peer discovery
            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
            kconfig
                .set_parallelism(NonZeroUsize::new(5).unwrap())
                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
                .set_publication_interval(Some(kademlia_record_republication_interval))
                .set_record_ttl(Some(kademlia_ttl));

            // allowing panic here because something is very wrong if this fails
            #[allow(clippy::panic)]
            if let Some(factor) = config.replication_factor {
                kconfig.set_replication_factor(factor);
            } else {
                panic!("Replication factor not set");
            }

            // Create the DHT behaviour with the given persistent storage
            // NOTE(review): `5` is `PersistentStore::new`'s third argument --
            // confirm its meaning against that constructor's docs
            let mut kadem = Behaviour::with_config(
                peer_id,
                PersistentStore::new(
                    ValidatedStore::new(MemoryStore::new(peer_id)),
                    dht_persistent_storage,
                    5,
                )
                .await,
                kconfig,
            );
            kadem.set_mode(Some(Mode::Server));

            let rrconfig = Libp2pRequestResponseConfig::default();

            // Create a new `cbor` codec with the given request and response sizes
            let cbor = Cbor::new(
                config.request_response_config.request_size_maximum,
                config.request_response_config.response_size_maximum,
            );

            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
                RequestResponse::with_codec(
                    cbor,
                    [(
                        StreamProtocol::new("/HotShot/direct_message/1.0"),
                        ProtocolSupport::Full,
                    )]
                    .into_iter(),
                    rrconfig.clone(),
                );

            // Allow probing non-global addresses (e.g. for local testing)
            let autonat_config = autonat::Config {
                only_global_ips: false,
                ..Default::default()
            };

            let network = NetworkDef::new(
                gossipsub,
                kadem,
                identify,
                direct_message,
                autonat::Behaviour::new(peer_id, autonat_config),
            );

            // build swarm
            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
            let swarm = swarm.with_tokio();

            swarm
                .with_other_transport(|_| transport)
                .unwrap()
                .with_behaviour(|_| network)
                .unwrap()
                .with_swarm_config(|cfg| {
                    cfg.with_idle_connection_timeout(Duration::from_secs(10))
                        .with_substream_upgrade_protocol_override(V1Lazy)
                })
                .build()
        };
        // Seed the DHT and the swarm's address book with the configured peers
        for (peer, addr) in &config.to_connect_addrs {
            if peer != swarm.local_peer_id() {
                swarm.behaviour_mut().add_address(peer, addr.clone());
                swarm.add_peer_address(*peer, addr.clone());
            }
        }

        Ok(Self {
            peer_id,
            swarm,
            kademlia_record_ttl: kademlia_ttl,
            consensus_key_to_pid_map,
            listener_id: None,
            direct_message_state: DMBehaviour::default(),
            dht_handler: DHTBehaviour::new(
                peer_id,
                config
                    .replication_factor
                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
            ),
            resend_tx: None,
        })
    }
361
    /// Publish a key/value to the record store.
    ///
    /// The record expires `self.kademlia_record_ttl` after now and is written
    /// with a quorum of `replication_factor / 2` nodes. On failure the query's
    /// progress is reset to `NotStarted` and its backoff advanced so the caller
    /// can retry later; on success the in-flight query is tracked by
    /// `dht_handler` under its query id.
    ///
    /// # Panics
    /// If the replication factor is less than 2: the computed quorum
    /// `replication_factor / 2` would be zero, which `NonZeroUsize` rejects
    pub fn put_record(&mut self, mut query: KadPutQuery) {
        // Create the new record
        let mut record = Record::new(query.key.clone(), query.value.clone());

        // Set the record's expiration time to the proper time
        record.expires = Some(Instant::now() + self.kademlia_record_ttl);

        match self.swarm.behaviour_mut().dht.put_record(
            record,
            libp2p::kad::Quorum::N(
                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
                    .expect("replication factor should be bigger than 0"),
            ),
        ) {
            Err(e) => {
                // failed: reset progress and advance the backoff so it can be retried later
                query.progress = DHTProgress::NotStarted;
                query.backoff.start_next(false);
                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
            },
            Ok(qid) => {
                debug!("Published record to DHT with qid {qid:?}");
                let query = KadPutQuery {
                    progress: DHTProgress::InProgress(qid),
                    ..query
                };
                self.dht_handler.put_record(qid, query);
            },
        }
    }
396
397    /// event handler for client events
398    /// currently supported actions include
399    /// - shutting down the swarm
400    /// - gossipping a message to known peers on the `global` topic
401    /// - returning the id of the current peer
402    /// - subscribing to a topic
403    /// - unsubscribing from a toipc
404    /// - direct messaging a peer
405    #[instrument(skip(self))]
406    async fn handle_client_requests(
407        &mut self,
408        msg: Option<ClientRequest>,
409    ) -> Result<bool, NetworkError> {
410        let behaviour = self.swarm.behaviour_mut();
411        match msg {
412            Some(msg) => {
413                match msg {
414                    ClientRequest::BeginBootstrap => {
415                        debug!("Beginning Libp2p bootstrap");
416                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
417                    },
418                    ClientRequest::LookupPeer(pid, chan) => {
419                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
420                        self.dht_handler
421                            .in_progress_get_closest_peers
422                            .insert(id, chan);
423                    },
424                    ClientRequest::GetRoutingTable(chan) => {
425                        self.dht_handler
426                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
427                        if chan.send(()).is_err() {
428                            warn!("Tried to notify client but client not tracking anymore");
429                        }
430                    },
431                    ClientRequest::PutDHT { key, value, notify } => {
432                        let query = KadPutQuery {
433                            progress: DHTProgress::NotStarted,
434                            notify,
435                            key,
436                            value,
437                            backoff: ExponentialBackoff::default(),
438                        };
439                        self.put_record(query);
440                    },
441                    ClientRequest::GetConnectedPeerNum(s) => {
442                        if s.send(self.num_connected()).is_err() {
443                            error!("error sending peer number to client");
444                        }
445                    },
446                    ClientRequest::GetConnectedPeers(s) => {
447                        if s.send(self.connected_pids()).is_err() {
448                            error!("error sending peer set to client");
449                        }
450                    },
451                    ClientRequest::GetDHT {
452                        key,
453                        notify,
454                        retry_count,
455                    } => {
456                        self.dht_handler.get_record(
457                            key,
458                            notify,
459                            ExponentialBackoff::default(),
460                            retry_count,
461                            &mut self.swarm.behaviour_mut().dht,
462                        );
463                    },
464                    ClientRequest::IgnorePeers(_peers) => {
465                        // NOTE used by test with conductor only
466                    },
467                    ClientRequest::Shutdown => {
468                        if let Some(listener_id) = self.listener_id {
469                            self.swarm.remove_listener(listener_id);
470                        }
471
472                        return Ok(true);
473                    },
474                    ClientRequest::GossipMsg(topic, contents) => {
475                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
476                    },
477                    ClientRequest::Subscribe(t, chan) => {
478                        behaviour.subscribe_gossip(&t);
479                        if let Some(chan) = chan
480                            && chan.send(()).is_err()
481                        {
482                            error!("finished subscribing but response channel dropped");
483                        }
484                    },
485                    ClientRequest::Unsubscribe(t, chan) => {
486                        behaviour.unsubscribe_gossip(&t);
487                        if let Some(chan) = chan
488                            && chan.send(()).is_err()
489                        {
490                            error!("finished unsubscribing but response channel dropped");
491                        }
492                    },
493                    ClientRequest::DirectRequest {
494                        pid,
495                        contents,
496                        retry_count,
497                    } => {
498                        debug!("Sending direct request to {pid:?}");
499                        let id = behaviour.add_direct_request(pid, contents.clone());
500                        let req = DMRequest {
501                            peer_id: pid,
502                            data: contents,
503                            backoff: ExponentialBackoff::default(),
504                            retry_count,
505                        };
506                        self.direct_message_state.add_direct_request(req, id);
507                    },
508                    ClientRequest::DirectResponse(chan, msg) => {
509                        behaviour.add_direct_response(chan, msg);
510                    },
511                    ClientRequest::AddKnownPeers(peers) => {
512                        self.add_known_peers(&peers);
513                    },
514                    ClientRequest::Prune(pid) => {
515                        if self.swarm.disconnect_peer_id(pid).is_err() {
516                            warn!("Could not disconnect from {pid:?}");
517                        }
518                    },
519                }
520            },
521            None => {
522                error!("Error receiving msg in main behaviour loop: channel closed");
523            },
524        }
525        Ok(false)
526    }
527
528    /// event handler for events emitted from the swarm
529    #[allow(clippy::type_complexity)]
530    #[instrument(skip(self))]
531    async fn handle_swarm_events(
532        &mut self,
533        event: SwarmEvent<NetworkEventInternal>,
534        send_to_client: &UnboundedSender<NetworkEvent>,
535    ) -> Result<(), NetworkError> {
536        // Make the match cleaner
537        debug!("Swarm event observed {:?}", event);
538
539        #[allow(deprecated)]
540        match event {
541            SwarmEvent::ConnectionEstablished {
542                connection_id: _,
543                peer_id,
544                endpoint,
545                num_established,
546                concurrent_dial_errors,
547                established_in: _established_in,
548            } => {
549                if num_established > ESTABLISHED_LIMIT {
550                    error!(
551                        "Num concurrent connections to a single peer exceeding \
552                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
553                    );
554                } else {
555                    debug!(
556                        "Connection established with {peer_id:?} at {endpoint:?} with \
557                         {concurrent_dial_errors:?} concurrent dial errors"
558                    );
559                }
560
561                // Send the number of connected peers to the client
562                send_to_client
563                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
564                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
565            },
566            SwarmEvent::ConnectionClosed {
567                connection_id: _,
568                peer_id,
569                endpoint,
570                num_established,
571                cause,
572            } => {
573                if num_established > ESTABLISHED_LIMIT_UNWR {
574                    error!(
575                        "Num concurrent connections to a single peer exceeding \
576                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
577                    );
578                } else {
579                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
580                }
581
582                // If we are no longer connected to the peer, remove the consensus key from the map
583                if num_established == 0 {
584                    self.consensus_key_to_pid_map
585                        .lock()
586                        .remove_by_right(&peer_id);
587                }
588
589                // Send the number of connected peers to the client
590                send_to_client
591                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
592                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
593            },
594            SwarmEvent::Dialing {
595                peer_id,
596                connection_id: _,
597            } => {
598                debug!("Attempting to dial {peer_id:?}");
599            },
600            SwarmEvent::ListenerClosed {
601                listener_id: _,
602                addresses: _,
603                reason: _,
604            }
605            | SwarmEvent::NewListenAddr {
606                listener_id: _,
607                address: _,
608            }
609            | SwarmEvent::ExpiredListenAddr {
610                listener_id: _,
611                address: _,
612            }
613            | SwarmEvent::NewExternalAddrCandidate { .. }
614            | SwarmEvent::ExternalAddrExpired { .. }
615            | SwarmEvent::IncomingConnection {
616                connection_id: _,
617                local_addr: _,
618                send_back_addr: _,
619            } => {},
620            SwarmEvent::Behaviour(b) => {
621                let maybe_event = match b {
622                    NetworkEventInternal::DHTEvent(e) => self
623                        .dht_handler
624                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
625                    NetworkEventInternal::IdentifyEvent(e) => {
626                        // NOTE feed identified peers into kademlia's routing table for peer discovery.
627                        if let IdentifyEvent::Received {
628                            peer_id,
629                            info:
630                                IdentifyInfo {
631                                    listen_addrs,
632                                    protocols: _,
633                                    public_key: _,
634                                    protocol_version: _,
635                                    agent_version: _,
636                                    observed_addr: _,
637                                    signed_peer_record: _,
638                                },
639                            connection_id: _,
640                        } = *e
641                        {
642                            let behaviour = self.swarm.behaviour_mut();
643
644                            // into hashset to delete duplicates (I checked: there are duplicates)
645                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
646                                behaviour.dht.add_address(&peer_id, addr.clone());
647                            }
648                        }
649                        None
650                    },
651                    NetworkEventInternal::GossipEvent(e) => match *e {
652                        GossipEvent::Message {
653                            propagation_source: _peer_id,
654                            message_id: _id,
655                            message,
656                        } => Some(NetworkEvent::GossipMsg(message.data)),
657                        GossipEvent::Subscribed { peer_id, topic } => {
658                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
659                            None
660                        },
661                        GossipEvent::Unsubscribed { peer_id, topic } => {
662                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
663                            None
664                        },
665                        GossipEvent::GossipsubNotSupported { peer_id } => {
666                            warn!("Peer {peer_id:?} does not support gossipsub");
667                            None
668                        },
669                        GossipEvent::SlowPeer {
670                            peer_id,
671                            failed_messages: _,
672                        } => {
673                            warn!("Peer {peer_id:?} is slow");
674                            None
675                        },
676                    },
677                    NetworkEventInternal::DMEvent(e) => self
678                        .direct_message_state
679                        .handle_dm_event(e, self.resend_tx.clone()),
680                    NetworkEventInternal::AutonatEvent(e) => {
681                        match e {
682                            autonat::Event::InboundProbe(_) => {},
683                            autonat::Event::OutboundProbe(e) => match e {
684                                autonat::OutboundProbeEvent::Request { .. }
685                                | autonat::OutboundProbeEvent::Response { .. } => {},
686                                autonat::OutboundProbeEvent::Error {
687                                    probe_id: _,
688                                    peer,
689                                    error,
690                                } => {
691                                    warn!(
692                                        "AutoNAT Probe failed to peer {peer:?} with error: \
693                                         {error:?}"
694                                    );
695                                },
696                            },
697                            autonat::Event::StatusChanged { old, new } => {
698                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
699                            },
700                        };
701                        None
702                    },
703                };
704
705                if let Some(event) = maybe_event {
706                    // forward messages directly to Client
707                    send_to_client
708                        .send(event)
709                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
710                }
711            },
712            SwarmEvent::OutgoingConnectionError {
713                connection_id: _,
714                peer_id,
715                error,
716            } => {
717                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
718            },
719            SwarmEvent::IncomingConnectionError {
720                connection_id: _,
721                local_addr: _,
722                send_back_addr: _,
723                error,
724                peer_id: _,
725            } => {
726                warn!("Incoming connection error: {error:?}");
727            },
728            SwarmEvent::ListenerError {
729                listener_id: _,
730                error,
731            } => {
732                warn!("Listener error: {error:?}");
733            },
734            SwarmEvent::ExternalAddrConfirmed { address } => {
735                let my_id = *self.swarm.local_peer_id();
736                self.swarm
737                    .behaviour_mut()
738                    .dht
739                    .add_address(&my_id, address.clone());
740            },
741            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
742                self.swarm
743                    .behaviour_mut()
744                    .dht
745                    .add_address(&peer_id, address.clone());
746            },
747            _ => {
748                debug!("Unhandled swarm event {event:?}");
749            },
750        }
751        Ok(())
752    }
753
754    /// Spawn a task to listen for requests on the returned channel
755    /// as well as any events produced by libp2p
756    ///
757    /// # Errors
758    /// - If we fail to create the channels or the bootstrap channel
759    pub fn spawn_listeners(
760        mut self,
761    ) -> Result<
762        (
763            UnboundedSender<ClientRequest>,
764            UnboundedReceiver<NetworkEvent>,
765        ),
766        NetworkError,
767    > {
768        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
769        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
770        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
771        self.resend_tx = Some(s_input.clone());
772        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
773
774        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
775        spawn(
776            async move {
777                loop {
778                    select! {
779                        event = self.swarm.next() => {
780                            debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
781                            if let Some(event) = event {
782                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
783                                self.handle_swarm_events(event, &r_input).await?;
784                            }
785                        },
786                        msg = s_output.recv() => {
787                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
788                            let shutdown = self.handle_client_requests(msg).await?;
789                            if shutdown {
790                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
791                                break
792                            }
793                        }
794                    }
795                }
796                Ok::<(), NetworkError>(())
797            }
798            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
799        );
800        Ok((s_input, r_output))
801    }
802
    /// Returns this node's local peer id (returned by value; `PeerId` is
    /// cheaply copyable, so no reference is handed out).
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
807}