hotshot_libp2p_networking/network/
node.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// configuration for the libp2p network (e.g. how it should be built)
8mod config;
9
10/// libp2p network handle
11/// allows for control over the libp2p network
12mod handle;
13
14use std::{
15    collections::{HashMap, HashSet},
16    iter,
17    num::{NonZeroU32, NonZeroUsize},
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{channel::mpsc, SinkExt, StreamExt};
24use hotshot_types::{
25    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28    autonat,
29    core::{transport::ListenerId, upgrade::Version::V1Lazy},
30    gossipsub::{
31        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33    },
34    identify::{
35        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36        Info as IdentifyInfo,
37    },
38    identity::Keypair,
39    kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
40    request_response::{
41        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42    },
43    swarm::SwarmEvent,
44    Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
45};
46use libp2p_identity::PeerId;
47use parking_lot::Mutex;
48use rand::{prelude::SliceRandom, thread_rng};
49use tokio::{
50    select, spawn,
51    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
52};
53use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
54
55pub use self::{
56    config::{
57        GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
58        RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
59    },
60    handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
61};
62use super::{
63    behaviours::dht::{
64        bootstrap::{DHTBootstrapTask, InputEvent},
65        store::{
66            persistent::{DhtPersistentStorage, PersistentStore},
67            validated::ValidatedStore,
68        },
69    },
70    cbor::Cbor,
71    gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
72    NetworkEventInternal,
73};
74use crate::network::behaviours::{
75    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
76    direct_message::{DMBehaviour, DMRequest},
77    exponential_backoff::ExponentialBackoff,
78};
79
/// Maximum size of a gossip message
/// (presumably in bytes — TODO confirm against the gossipsub `max_transmit_size` setting)
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// [`ESTABLISHED_LIMIT_UNWR`] wrapped in a [`NonZeroU32`], for comparing against connection
/// counts that are themselves `NonZeroU32`. The `unwrap` is evaluated in a const context
/// and cannot fail because the wrapped value is non-zero.
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// Number of connections to a single peer before logging an error
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;
87
/// Network definition: owns the libp2p [`Swarm`] together with the per-protocol state
/// (DHT and direct messages) that is needed to drive it.
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Peer ID of this network node
    peer_id: PeerId,
    /// The swarm of network behaviours (skipped in the `Debug` output)
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The Kademlia record TTL; used as the expiry of the records this node publishes
    kademlia_record_ttl: Duration,
    /// The map from consensus keys to peer IDs. It is shared (via `Arc`) with the
    /// transport, and an entry is removed once the last connection to its peer closes.
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// The listener ID we are listening on, if it exists
    listener_id: Option<ListenerId>,
    /// Handler for direct messages; tracks outstanding direct requests
    direct_message_state: DMBehaviour,
    /// Handler for DHT events; tracks in-flight DHT queries
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel to resend requests, set to `Some` when we call `spawn_listeners`
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}
109
110impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
111    /// Returns number of peers this node is connected to
112    pub fn num_connected(&self) -> usize {
113        self.swarm.connected_peers().count()
114    }
115
116    /// return hashset of PIDs this node is connected to
117    pub fn connected_pids(&self) -> HashSet<PeerId> {
118        self.swarm.connected_peers().copied().collect()
119    }
120
121    /// starts the swarm listening on `listen_addr`
122    /// and optionally dials into peer `known_peer`
123    /// returns the address the swarm is listening upon
124    #[instrument(skip(self))]
125    pub async fn start_listen(
126        &mut self,
127        listen_addr: Multiaddr,
128    ) -> Result<Multiaddr, NetworkError> {
129        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
130            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
131        })?);
132        let addr = loop {
133            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
134                break address;
135            }
136        };
137        info!("Libp2p listening on {addr:?}");
138        Ok(addr)
139    }
140
141    /// initialize the DHT with known peers
142    /// add the peers to kademlia and then
143    /// the `spawn_listeners` function
144    /// will start connecting to peers
145    #[instrument(skip(self))]
146    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
147        debug!("Adding {} known peers", known_peers.len());
148        let behaviour = self.swarm.behaviour_mut();
149        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
150        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
151        shuffled.shuffle(&mut thread_rng());
152        for (peer_id, addr) in shuffled {
153            if *peer_id != self.peer_id {
154                behaviour.dht.add_address(peer_id, addr.clone());
155                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
156                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
157            }
158        }
159    }
160
161    /// Creates a new `Network` with the given settings.
162    ///
163    /// Currently:
164    ///   * Generates a random key pair and associated [`PeerId`]
165    ///   * Launches a hopefully production ready transport: QUIC v1 (RFC 9000) + DNS
166    ///   * Generates a connection to the "broadcast" topic
167    ///   * Creates a swarm to manage peers and events
168    ///
169    /// # Errors
170    /// - If we fail to generate the transport or any of the behaviours
171    ///
172    /// # Panics
173    /// If 5 < 0
174    #[allow(clippy::too_many_lines)]
175    pub async fn new(
176        config: NetworkNodeConfig,
177        dht_persistent_storage: D,
178        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
179    ) -> Result<Self, NetworkError> {
180        // Generate a random `KeyPair` if one is not specified
181        let keypair = config
182            .keypair
183            .clone()
184            .unwrap_or_else(Keypair::generate_ed25519);
185
186        // Get the `PeerId` from the `KeyPair`
187        let peer_id = PeerId::from(keypair.public());
188
189        // Generate the transport from the keypair and auth message
190        let transport: BoxedTransport = gen_transport::<T>(
191            keypair.clone(),
192            config.auth_message.clone(),
193            Arc::clone(&consensus_key_to_pid_map),
194        )
195        .await?;
196
197        // Calculate the record republication interval
198        let kademlia_record_republication_interval = config
199            .republication_interval
200            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
201
202        // Calculate the Kademlia record TTL
203        let kademlia_ttl = config
204            .ttl
205            .unwrap_or(16 * kademlia_record_republication_interval);
206
207        // Generate the swarm
208        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
209            // Use the `Blake3` hash of the message's contents as the ID
210            let message_id_fn = |message: &GossipsubMessage| {
211                let hash = blake3::hash(&message.data);
212                MessageId::from(hash.as_bytes().to_vec())
213            };
214
215            // Derive a `Gossipsub` config from our gossip config
216            let gossipsub_config = GossipsubConfigBuilder::default()
217                .message_id_fn(message_id_fn) // Use the (blake3) hash of a message as its ID
218                .validation_mode(ValidationMode::Strict) // Force all messages to have valid signatures
219                .heartbeat_interval(config.gossip_config.heartbeat_interval) // Time between gossip heartbeats
220                .history_gossip(config.gossip_config.history_gossip) // Number of heartbeats to gossip about
221                .history_length(config.gossip_config.history_length) // Number of heartbeats to remember the full message for
222                .mesh_n(config.gossip_config.mesh_n) // Target number of mesh peers
223                .mesh_n_high(config.gossip_config.mesh_n_high) // Upper limit of mesh peers
224                .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
225                .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
226                .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
227                .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
228                .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
229                .published_message_ids_cache_time(
230                    config.gossip_config.published_message_ids_cache_time,
231                ) // Cache duration for published message IDs
232                .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
233                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
234                .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them.
235                .flood_publish(config.gossip_config.flood_publish) // If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score.
236                .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
237                .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
238                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
239                .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
240                .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
241                .build()
242                .map_err(|err| {
243                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
244                })?;
245
246            // - Build a gossipsub network behavior
247            let gossipsub: Gossipsub = Gossipsub::new(
248                MessageAuthenticity::Signed(keypair.clone()),
249                gossipsub_config,
250            )
251            .map_err(|err| {
252                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
253            })?;
254
255            //   Build a identify network behavior needed for own
256            //   node connection information
257            //   E.g. this will answer the question: how are other nodes
258            //   seeing the peer from behind a NAT
259            let identify_cfg =
260                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
261            let identify = IdentifyBehaviour::new(identify_cfg);
262
263            // - Build DHT needed for peer discovery
264            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
265            kconfig
266                .set_parallelism(NonZeroUsize::new(5).unwrap())
267                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
268                .set_publication_interval(Some(kademlia_record_republication_interval))
269                .set_record_ttl(Some(kademlia_ttl));
270
271            // allowing panic here because something is very wrong if this fales
272            #[allow(clippy::panic)]
273            if let Some(factor) = config.replication_factor {
274                kconfig.set_replication_factor(factor);
275            } else {
276                panic!("Replication factor not set");
277            }
278
279            // Create the DHT behaviour with the given persistent storage
280            let mut kadem = Behaviour::with_config(
281                peer_id,
282                PersistentStore::new(
283                    ValidatedStore::new(MemoryStore::new(peer_id)),
284                    dht_persistent_storage,
285                    5,
286                )
287                .await,
288                kconfig,
289            );
290            kadem.set_mode(Some(Mode::Server));
291
292            let rrconfig = Libp2pRequestResponseConfig::default();
293
294            // Create a new `cbor` codec with the given request and response sizes
295            let cbor = Cbor::new(
296                config.request_response_config.request_size_maximum,
297                config.request_response_config.response_size_maximum,
298            );
299
300            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
301                RequestResponse::with_codec(
302                    cbor,
303                    [(
304                        StreamProtocol::new("/HotShot/direct_message/1.0"),
305                        ProtocolSupport::Full,
306                    )]
307                    .into_iter(),
308                    rrconfig.clone(),
309                );
310
311            let autonat_config = autonat::Config {
312                only_global_ips: false,
313                ..Default::default()
314            };
315
316            let network = NetworkDef::new(
317                gossipsub,
318                kadem,
319                identify,
320                direct_message,
321                autonat::Behaviour::new(peer_id, autonat_config),
322            );
323
324            // build swarm
325            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
326            let swarm = swarm.with_tokio();
327
328            swarm
329                .with_other_transport(|_| transport)
330                .unwrap()
331                .with_behaviour(|_| network)
332                .unwrap()
333                .with_swarm_config(|cfg| {
334                    cfg.with_idle_connection_timeout(Duration::from_secs(10))
335                        .with_substream_upgrade_protocol_override(V1Lazy)
336                })
337                .build()
338        };
339        for (peer, addr) in &config.to_connect_addrs {
340            if peer != swarm.local_peer_id() {
341                swarm.behaviour_mut().add_address(peer, addr.clone());
342                swarm.add_peer_address(*peer, addr.clone());
343            }
344        }
345
346        Ok(Self {
347            peer_id,
348            swarm,
349            kademlia_record_ttl: kademlia_ttl,
350            consensus_key_to_pid_map,
351            listener_id: None,
352            direct_message_state: DMBehaviour::default(),
353            dht_handler: DHTBehaviour::new(
354                peer_id,
355                config
356                    .replication_factor
357                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
358            ),
359            resend_tx: None,
360        })
361    }
362
363    /// Publish a key/value to the record store.
364    ///
365    /// # Panics
366    /// If the default replication factor is `None`
367    pub fn put_record(&mut self, mut query: KadPutQuery) {
368        // Create the new record
369        let mut record = Record::new(query.key.clone(), query.value.clone());
370
371        // Set the record's expiration time to the proper time
372        record.expires = Some(Instant::now() + self.kademlia_record_ttl);
373
374        match self.swarm.behaviour_mut().dht.put_record(
375            record,
376            libp2p::kad::Quorum::N(
377                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
378                    .expect("replication factor should be bigger than 0"),
379            ),
380        ) {
381            Err(e) => {
382                // failed try again later
383                query.progress = DHTProgress::NotStarted;
384                query.backoff.start_next(false);
385                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
386            },
387            Ok(qid) => {
388                debug!("Published record to DHT with qid {qid:?}");
389                let query = KadPutQuery {
390                    progress: DHTProgress::InProgress(qid),
391                    ..query
392                };
393                self.dht_handler.put_record(qid, query);
394            },
395        }
396    }
397
398    /// event handler for client events
399    /// currently supported actions include
400    /// - shutting down the swarm
401    /// - gossipping a message to known peers on the `global` topic
402    /// - returning the id of the current peer
403    /// - subscribing to a topic
404    /// - unsubscribing from a toipc
405    /// - direct messaging a peer
406    #[instrument(skip(self))]
407    async fn handle_client_requests(
408        &mut self,
409        msg: Option<ClientRequest>,
410    ) -> Result<bool, NetworkError> {
411        let behaviour = self.swarm.behaviour_mut();
412        match msg {
413            Some(msg) => {
414                match msg {
415                    ClientRequest::BeginBootstrap => {
416                        debug!("Beginning Libp2p bootstrap");
417                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
418                    },
419                    ClientRequest::LookupPeer(pid, chan) => {
420                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
421                        self.dht_handler
422                            .in_progress_get_closest_peers
423                            .insert(id, chan);
424                    },
425                    ClientRequest::GetRoutingTable(chan) => {
426                        self.dht_handler
427                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
428                        if chan.send(()).is_err() {
429                            warn!("Tried to notify client but client not tracking anymore");
430                        }
431                    },
432                    ClientRequest::PutDHT { key, value, notify } => {
433                        let query = KadPutQuery {
434                            progress: DHTProgress::NotStarted,
435                            notify,
436                            key,
437                            value,
438                            backoff: ExponentialBackoff::default(),
439                        };
440                        self.put_record(query);
441                    },
442                    ClientRequest::GetConnectedPeerNum(s) => {
443                        if s.send(self.num_connected()).is_err() {
444                            error!("error sending peer number to client");
445                        }
446                    },
447                    ClientRequest::GetConnectedPeers(s) => {
448                        if s.send(self.connected_pids()).is_err() {
449                            error!("error sending peer set to client");
450                        }
451                    },
452                    ClientRequest::GetDHT {
453                        key,
454                        notify,
455                        retry_count,
456                    } => {
457                        self.dht_handler.get_record(
458                            key,
459                            notify,
460                            ExponentialBackoff::default(),
461                            retry_count,
462                            &mut self.swarm.behaviour_mut().dht,
463                        );
464                    },
465                    ClientRequest::IgnorePeers(_peers) => {
466                        // NOTE used by test with conductor only
467                    },
468                    ClientRequest::Shutdown => {
469                        if let Some(listener_id) = self.listener_id {
470                            self.swarm.remove_listener(listener_id);
471                        }
472
473                        return Ok(true);
474                    },
475                    ClientRequest::GossipMsg(topic, contents) => {
476                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
477                    },
478                    ClientRequest::Subscribe(t, chan) => {
479                        behaviour.subscribe_gossip(&t);
480                        if let Some(chan) = chan {
481                            if chan.send(()).is_err() {
482                                error!("finished subscribing but response channel dropped");
483                            }
484                        }
485                    },
486                    ClientRequest::Unsubscribe(t, chan) => {
487                        behaviour.unsubscribe_gossip(&t);
488                        if let Some(chan) = chan {
489                            if chan.send(()).is_err() {
490                                error!("finished unsubscribing but response channel dropped");
491                            }
492                        }
493                    },
494                    ClientRequest::DirectRequest {
495                        pid,
496                        contents,
497                        retry_count,
498                    } => {
499                        debug!("Sending direct request to {pid:?}");
500                        let id = behaviour.add_direct_request(pid, contents.clone());
501                        let req = DMRequest {
502                            peer_id: pid,
503                            data: contents,
504                            backoff: ExponentialBackoff::default(),
505                            retry_count,
506                        };
507                        self.direct_message_state.add_direct_request(req, id);
508                    },
509                    ClientRequest::DirectResponse(chan, msg) => {
510                        behaviour.add_direct_response(chan, msg);
511                    },
512                    ClientRequest::AddKnownPeers(peers) => {
513                        self.add_known_peers(&peers);
514                    },
515                    ClientRequest::Prune(pid) => {
516                        if self.swarm.disconnect_peer_id(pid).is_err() {
517                            warn!("Could not disconnect from {pid:?}");
518                        }
519                    },
520                }
521            },
522            None => {
523                error!("Error receiving msg in main behaviour loop: channel closed");
524            },
525        }
526        Ok(false)
527    }
528
529    /// event handler for events emitted from the swarm
530    #[allow(clippy::type_complexity)]
531    #[instrument(skip(self))]
532    async fn handle_swarm_events(
533        &mut self,
534        event: SwarmEvent<NetworkEventInternal>,
535        send_to_client: &UnboundedSender<NetworkEvent>,
536    ) -> Result<(), NetworkError> {
537        // Make the match cleaner
538        debug!("Swarm event observed {:?}", event);
539
540        #[allow(deprecated)]
541        match event {
542            SwarmEvent::ConnectionEstablished {
543                connection_id: _,
544                peer_id,
545                endpoint,
546                num_established,
547                concurrent_dial_errors,
548                established_in: _established_in,
549            } => {
550                if num_established > ESTABLISHED_LIMIT {
551                    error!(
552                        "Num concurrent connections to a single peer exceeding \
553                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
554                    );
555                } else {
556                    debug!(
557                        "Connection established with {peer_id:?} at {endpoint:?} with \
558                         {concurrent_dial_errors:?} concurrent dial errors"
559                    );
560                }
561
562                // Send the number of connected peers to the client
563                send_to_client
564                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
565                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
566            },
567            SwarmEvent::ConnectionClosed {
568                connection_id: _,
569                peer_id,
570                endpoint,
571                num_established,
572                cause,
573            } => {
574                if num_established > ESTABLISHED_LIMIT_UNWR {
575                    error!(
576                        "Num concurrent connections to a single peer exceeding \
577                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
578                    );
579                } else {
580                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
581                }
582
583                // If we are no longer connected to the peer, remove the consensus key from the map
584                if num_established == 0 {
585                    self.consensus_key_to_pid_map
586                        .lock()
587                        .remove_by_right(&peer_id);
588                }
589
590                // Send the number of connected peers to the client
591                send_to_client
592                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
593                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
594            },
595            SwarmEvent::Dialing {
596                peer_id,
597                connection_id: _,
598            } => {
599                debug!("Attempting to dial {peer_id:?}");
600            },
601            SwarmEvent::ListenerClosed {
602                listener_id: _,
603                addresses: _,
604                reason: _,
605            }
606            | SwarmEvent::NewListenAddr {
607                listener_id: _,
608                address: _,
609            }
610            | SwarmEvent::ExpiredListenAddr {
611                listener_id: _,
612                address: _,
613            }
614            | SwarmEvent::NewExternalAddrCandidate { .. }
615            | SwarmEvent::ExternalAddrExpired { .. }
616            | SwarmEvent::IncomingConnection {
617                connection_id: _,
618                local_addr: _,
619                send_back_addr: _,
620            } => {},
621            SwarmEvent::Behaviour(b) => {
622                let maybe_event = match b {
623                    NetworkEventInternal::DHTEvent(e) => self
624                        .dht_handler
625                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
626                    NetworkEventInternal::IdentifyEvent(e) => {
627                        // NOTE feed identified peers into kademlia's routing table for peer discovery.
628                        if let IdentifyEvent::Received {
629                            peer_id,
630                            info:
631                                IdentifyInfo {
632                                    listen_addrs,
633                                    protocols: _,
634                                    public_key: _,
635                                    protocol_version: _,
636                                    agent_version: _,
637                                    observed_addr: _,
638                                    signed_peer_record: _,
639                                },
640                            connection_id: _,
641                        } = *e
642                        {
643                            let behaviour = self.swarm.behaviour_mut();
644
645                            // into hashset to delete duplicates (I checked: there are duplicates)
646                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
647                                behaviour.dht.add_address(&peer_id, addr.clone());
648                            }
649                        }
650                        None
651                    },
652                    NetworkEventInternal::GossipEvent(e) => match *e {
653                        GossipEvent::Message {
654                            propagation_source: _peer_id,
655                            message_id: _id,
656                            message,
657                        } => Some(NetworkEvent::GossipMsg(message.data)),
658                        GossipEvent::Subscribed { peer_id, topic } => {
659                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
660                            None
661                        },
662                        GossipEvent::Unsubscribed { peer_id, topic } => {
663                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
664                            None
665                        },
666                        GossipEvent::GossipsubNotSupported { peer_id } => {
667                            warn!("Peer {peer_id:?} does not support gossipsub");
668                            None
669                        },
670                        GossipEvent::SlowPeer {
671                            peer_id,
672                            failed_messages: _,
673                        } => {
674                            warn!("Peer {peer_id:?} is slow");
675                            None
676                        },
677                    },
678                    NetworkEventInternal::DMEvent(e) => self
679                        .direct_message_state
680                        .handle_dm_event(e, self.resend_tx.clone()),
681                    NetworkEventInternal::AutonatEvent(e) => {
682                        match e {
683                            autonat::Event::InboundProbe(_) => {},
684                            autonat::Event::OutboundProbe(e) => match e {
685                                autonat::OutboundProbeEvent::Request { .. }
686                                | autonat::OutboundProbeEvent::Response { .. } => {},
687                                autonat::OutboundProbeEvent::Error {
688                                    probe_id: _,
689                                    peer,
690                                    error,
691                                } => {
692                                    warn!(
693                                        "AutoNAT Probe failed to peer {peer:?} with error: \
694                                         {error:?}"
695                                    );
696                                },
697                            },
698                            autonat::Event::StatusChanged { old, new } => {
699                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
700                            },
701                        };
702                        None
703                    },
704                };
705
706                if let Some(event) = maybe_event {
707                    // forward messages directly to Client
708                    send_to_client
709                        .send(event)
710                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
711                }
712            },
713            SwarmEvent::OutgoingConnectionError {
714                connection_id: _,
715                peer_id,
716                error,
717            } => {
718                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
719            },
720            SwarmEvent::IncomingConnectionError {
721                connection_id: _,
722                local_addr: _,
723                send_back_addr: _,
724                error,
725                peer_id: _,
726            } => {
727                warn!("Incoming connection error: {error:?}");
728            },
729            SwarmEvent::ListenerError {
730                listener_id: _,
731                error,
732            } => {
733                warn!("Listener error: {error:?}");
734            },
735            SwarmEvent::ExternalAddrConfirmed { address } => {
736                let my_id = *self.swarm.local_peer_id();
737                self.swarm
738                    .behaviour_mut()
739                    .dht
740                    .add_address(&my_id, address.clone());
741            },
742            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
743                self.swarm
744                    .behaviour_mut()
745                    .dht
746                    .add_address(&peer_id, address.clone());
747            },
748            _ => {
749                debug!("Unhandled swarm event {event:?}");
750            },
751        }
752        Ok(())
753    }
754
755    /// Spawn a task to listen for requests on the returned channel
756    /// as well as any events produced by libp2p
757    ///
758    /// # Errors
759    /// - If we fail to create the channels or the bootstrap channel
760    pub fn spawn_listeners(
761        mut self,
762    ) -> Result<
763        (
764            UnboundedSender<ClientRequest>,
765            UnboundedReceiver<NetworkEvent>,
766        ),
767        NetworkError,
768    > {
769        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
770        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
771        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
772        self.resend_tx = Some(s_input.clone());
773        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
774
775        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
776        spawn(
777            async move {
778                loop {
779                    select! {
780                        event = self.swarm.next() => {
781                            debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
782                            if let Some(event) = event {
783                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
784                                self.handle_swarm_events(event, &r_input).await?;
785                            }
786                        },
787                        msg = s_output.recv() => {
788                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
789                            let shutdown = self.handle_client_requests(msg).await?;
790                            if shutdown {
791                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
792                                break
793                            }
794                        }
795                    }
796                }
797                Ok::<(), NetworkError>(())
798            }
799            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
800        );
801        Ok((s_input, r_output))
802    }
803
804    /// Get a reference to the network node's peer id.
805    pub fn peer_id(&self) -> PeerId {
806        self.peer_id
807    }
808}