1mod config;
9
10mod handle;
13
14use std::{
15 collections::{HashMap, HashSet},
16 iter,
17 num::{NonZeroU32, NonZeroUsize},
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{SinkExt, StreamExt, channel::mpsc};
24use hotshot_types::{
25 constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28 Multiaddr, StreamProtocol, Swarm, SwarmBuilder, autonat,
29 core::{transport::ListenerId, upgrade::Version::V1Lazy},
30 gossipsub::{
31 Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32 Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33 },
34 identify::{
35 Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36 Info as IdentifyInfo,
37 },
38 identity::Keypair,
39 kad::{Behaviour, Config, Mode, Record, store::MemoryStore},
40 request_response::{
41 Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42 },
43 swarm::SwarmEvent,
44};
45use libp2p_identity::PeerId;
46use parking_lot::Mutex;
47use rand::{prelude::SliceRandom, thread_rng};
48use tokio::{
49 select, spawn,
50 sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
51};
52use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
53
54pub use self::{
55 config::{
56 DEFAULT_REPLICATION_FACTOR, GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder,
57 NetworkNodeConfigBuilderError, RequestResponseConfig,
58 },
59 handle::{NetworkNodeHandle, NetworkNodeReceiver, spawn_network_node},
60};
61use super::{
62 BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent, NetworkEventInternal,
63 behaviours::dht::{
64 bootstrap::{DHTBootstrapTask, InputEvent},
65 store::{
66 persistent::{DhtPersistentStorage, PersistentStore},
67 validated::ValidatedStore,
68 },
69 },
70 cbor::Cbor,
71 gen_transport,
72};
73use crate::network::behaviours::{
74 dht::{DHTBehaviour, DHTProgress, KadPutQuery},
75 direct_message::{DMBehaviour, DMRequest},
76 exponential_backoff::ExponentialBackoff,
77};
78
/// Upper bound on the size of a gossip message (presumably in bytes; confirm against the
/// gossip configuration that consumes it).
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// Plain `u32` form of the per-peer connection threshold. Compared against the `u32`
/// connection count reported when a connection is closed.
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// Same threshold as [`ESTABLISHED_LIMIT_UNWR`], as a `NonZeroU32`. Compared against the
/// `NonZeroU32` connection count reported when a connection is established.
pub const ESTABLISHED_LIMIT: NonZeroU32 = match NonZeroU32::new(ESTABLISHED_LIMIT_UNWR) {
    Some(limit) => limit,
    // Evaluated at compile time: a zero threshold fails the build rather than the node.
    None => panic!("ESTABLISHED_LIMIT_UNWR must be non-zero"),
};
86
/// A libp2p network node: owns the swarm plus the bookkeeping needed to serve client
/// requests and to translate swarm events into client-facing network events.
87#[derive(derive_more::Debug)]
89pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
/// Peer id of this node, derived from the node's keypair at construction.
90 peer_id: PeerId,
/// The libp2p swarm driving all behaviours. Left out of the `Debug` output.
92 #[debug(skip)]
94 swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
/// Time-to-live used to compute the expiry of records published via `put_record`.
95 kademlia_record_ttl: Duration,
/// Shared bidirectional map between consensus (signature) keys and peer ids. The entry for
/// a peer is removed once its last connection closes.
97 consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
/// Id of the active listener, set by `start_listen` and removed from the swarm on shutdown.
99 listener_id: Option<ListenerId>,
/// State for in-flight direct-message requests and handling of direct-message events.
101 direct_message_state: DMBehaviour,
/// State for DHT queries (puts, gets, closest-peer lookups) and handling of DHT events.
103 dht_handler: DHTBehaviour<T::SignatureKey, D>,
/// Sender half of the client-request channel; populated by `spawn_listeners` and handed to
/// the direct-message handler when processing its events.
105 resend_tx: Option<UnboundedSender<ClientRequest>>,
107}
108
109impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
110 pub fn num_connected(&self) -> usize {
112 self.swarm.connected_peers().count()
113 }
114
115 pub fn connected_pids(&self) -> HashSet<PeerId> {
117 self.swarm.connected_peers().copied().collect()
118 }
119
120 #[instrument(skip(self))]
124 pub async fn start_listen(
125 &mut self,
126 listen_addr: Multiaddr,
127 ) -> Result<Multiaddr, NetworkError> {
128 self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
129 NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
130 })?);
131 let addr = loop {
132 if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
133 break address;
134 }
135 };
136 info!("Libp2p listening on {addr:?}");
137 Ok(addr)
138 }
139
140 #[instrument(skip(self))]
145 pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
146 debug!("Adding {} known peers", known_peers.len());
147 let behaviour = self.swarm.behaviour_mut();
148 let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
149 let mut shuffled = known_peers.iter().collect::<Vec<_>>();
150 shuffled.shuffle(&mut thread_rng());
151 for (peer_id, addr) in shuffled {
152 if *peer_id != self.peer_id {
153 behaviour.dht.add_address(peer_id, addr.clone());
154 behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
155 bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
156 }
157 }
158 }
159
160 #[allow(clippy::too_many_lines)]
174 pub async fn new(
175 config: NetworkNodeConfig,
176 dht_persistent_storage: D,
177 consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
178 ) -> Result<Self, NetworkError> {
179 let keypair = config
181 .keypair
182 .clone()
183 .unwrap_or_else(Keypair::generate_ed25519);
184
185 let peer_id = PeerId::from(keypair.public());
187
188 let transport: BoxedTransport = gen_transport::<T>(
190 keypair.clone(),
191 config.auth_message.clone(),
192 Arc::clone(&consensus_key_to_pid_map),
193 )
194 .await?;
195
196 let kademlia_record_republication_interval = config
198 .republication_interval
199 .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
200
201 let kademlia_ttl = config
203 .ttl
204 .unwrap_or(16 * kademlia_record_republication_interval);
205
206 let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
208 let message_id_fn = |message: &GossipsubMessage| {
210 let hash = blake3::hash(&message.data);
211 MessageId::from(hash.as_bytes().to_vec())
212 };
213
214 let gossipsub_config = GossipsubConfigBuilder::default()
216 .message_id_fn(message_id_fn) .validation_mode(ValidationMode::Strict) .heartbeat_interval(config.gossip_config.heartbeat_interval) .history_gossip(config.gossip_config.history_gossip) .history_length(config.gossip_config.history_length) .mesh_n(config.gossip_config.mesh_n) .mesh_n_high(config.gossip_config.mesh_n_high) .mesh_n_low(config.gossip_config.mesh_n_low) .mesh_outbound_min(config.gossip_config.mesh_outbound_min) .max_transmit_size(config.gossip_config.max_transmit_size) .max_ihave_length(config.gossip_config.max_ihave_length) .max_ihave_messages(config.gossip_config.max_ihave_messages) .published_message_ids_cache_time(
229 config.gossip_config.published_message_ids_cache_time,
230 ) .iwant_followup_time(config.gossip_config.iwant_followup_time) .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) .gossip_retransimission(config.gossip_config.gossip_retransmission) .flood_publish(config.gossip_config.flood_publish) .duplicate_cache_time(config.gossip_config.duplicate_cache_time) .fanout_ttl(config.gossip_config.fanout_ttl) .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) .gossip_factor(config.gossip_config.gossip_factor) .gossip_lazy(config.gossip_config.gossip_lazy) .build()
241 .map_err(|err| {
242 NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
243 })?;
244
245 let gossipsub: Gossipsub = Gossipsub::new(
247 MessageAuthenticity::Signed(keypair.clone()),
248 gossipsub_config,
249 )
250 .map_err(|err| {
251 NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
252 })?;
253
254 let identify_cfg =
259 IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
260 let identify = IdentifyBehaviour::new(identify_cfg);
261
262 let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
264 kconfig
265 .set_parallelism(NonZeroUsize::new(5).unwrap())
266 .set_provider_publication_interval(Some(kademlia_record_republication_interval))
267 .set_publication_interval(Some(kademlia_record_republication_interval))
268 .set_record_ttl(Some(kademlia_ttl));
269
270 #[allow(clippy::panic)]
272 if let Some(factor) = config.replication_factor {
273 kconfig.set_replication_factor(factor);
274 } else {
275 panic!("Replication factor not set");
276 }
277
278 let mut kadem = Behaviour::with_config(
280 peer_id,
281 PersistentStore::new(
282 ValidatedStore::new(MemoryStore::new(peer_id)),
283 dht_persistent_storage,
284 5,
285 )
286 .await,
287 kconfig,
288 );
289 kadem.set_mode(Some(Mode::Server));
290
291 let rrconfig = Libp2pRequestResponseConfig::default();
292
293 let cbor = Cbor::new(
295 config.request_response_config.request_size_maximum,
296 config.request_response_config.response_size_maximum,
297 );
298
299 let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
300 RequestResponse::with_codec(
301 cbor,
302 [(
303 StreamProtocol::new("/HotShot/direct_message/1.0"),
304 ProtocolSupport::Full,
305 )]
306 .into_iter(),
307 rrconfig.clone(),
308 );
309
310 let autonat_config = autonat::Config {
311 only_global_ips: false,
312 ..Default::default()
313 };
314
315 let network = NetworkDef::new(
316 gossipsub,
317 kadem,
318 identify,
319 direct_message,
320 autonat::Behaviour::new(peer_id, autonat_config),
321 );
322
323 let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
325 let swarm = swarm.with_tokio();
326
327 swarm
328 .with_other_transport(|_| transport)
329 .unwrap()
330 .with_behaviour(|_| network)
331 .unwrap()
332 .with_swarm_config(|cfg| {
333 cfg.with_idle_connection_timeout(Duration::from_secs(10))
334 .with_substream_upgrade_protocol_override(V1Lazy)
335 })
336 .build()
337 };
338 for (peer, addr) in &config.to_connect_addrs {
339 if peer != swarm.local_peer_id() {
340 swarm.behaviour_mut().add_address(peer, addr.clone());
341 swarm.add_peer_address(*peer, addr.clone());
342 }
343 }
344
345 Ok(Self {
346 peer_id,
347 swarm,
348 kademlia_record_ttl: kademlia_ttl,
349 consensus_key_to_pid_map,
350 listener_id: None,
351 direct_message_state: DMBehaviour::default(),
352 dht_handler: DHTBehaviour::new(
353 peer_id,
354 config
355 .replication_factor
356 .unwrap_or(NonZeroUsize::new(4).unwrap()),
357 ),
358 resend_tx: None,
359 })
360 }
361
362 pub fn put_record(&mut self, mut query: KadPutQuery) {
367 let mut record = Record::new(query.key.clone(), query.value.clone());
369
370 record.expires = Some(Instant::now() + self.kademlia_record_ttl);
372
373 match self.swarm.behaviour_mut().dht.put_record(
374 record,
375 libp2p::kad::Quorum::N(
376 NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
377 .expect("replication factor should be bigger than 0"),
378 ),
379 ) {
380 Err(e) => {
381 query.progress = DHTProgress::NotStarted;
383 query.backoff.start_next(false);
384 error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
385 },
386 Ok(qid) => {
387 debug!("Published record to DHT with qid {qid:?}");
388 let query = KadPutQuery {
389 progress: DHTProgress::InProgress(qid),
390 ..query
391 };
392 self.dht_handler.put_record(qid, query);
393 },
394 }
395 }
396
397 #[instrument(skip(self))]
406 async fn handle_client_requests(
407 &mut self,
408 msg: Option<ClientRequest>,
409 ) -> Result<bool, NetworkError> {
410 let behaviour = self.swarm.behaviour_mut();
411 match msg {
412 Some(msg) => {
413 match msg {
414 ClientRequest::BeginBootstrap => {
415 debug!("Beginning Libp2p bootstrap");
416 let _ = self.swarm.behaviour_mut().dht.bootstrap();
417 },
418 ClientRequest::LookupPeer(pid, chan) => {
419 let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
420 self.dht_handler
421 .in_progress_get_closest_peers
422 .insert(id, chan);
423 },
424 ClientRequest::GetRoutingTable(chan) => {
425 self.dht_handler
426 .print_routing_table(&mut self.swarm.behaviour_mut().dht);
427 if chan.send(()).is_err() {
428 warn!("Tried to notify client but client not tracking anymore");
429 }
430 },
431 ClientRequest::PutDHT { key, value, notify } => {
432 let query = KadPutQuery {
433 progress: DHTProgress::NotStarted,
434 notify,
435 key,
436 value,
437 backoff: ExponentialBackoff::default(),
438 };
439 self.put_record(query);
440 },
441 ClientRequest::GetConnectedPeerNum(s) => {
442 if s.send(self.num_connected()).is_err() {
443 error!("error sending peer number to client");
444 }
445 },
446 ClientRequest::GetConnectedPeers(s) => {
447 if s.send(self.connected_pids()).is_err() {
448 error!("error sending peer set to client");
449 }
450 },
451 ClientRequest::GetDHT {
452 key,
453 notify,
454 retry_count,
455 } => {
456 self.dht_handler.get_record(
457 key,
458 notify,
459 ExponentialBackoff::default(),
460 retry_count,
461 &mut self.swarm.behaviour_mut().dht,
462 );
463 },
464 ClientRequest::IgnorePeers(_peers) => {
465 },
467 ClientRequest::Shutdown => {
468 if let Some(listener_id) = self.listener_id {
469 self.swarm.remove_listener(listener_id);
470 }
471
472 return Ok(true);
473 },
474 ClientRequest::GossipMsg(topic, contents) => {
475 behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
476 },
477 ClientRequest::Subscribe(t, chan) => {
478 behaviour.subscribe_gossip(&t);
479 if let Some(chan) = chan
480 && chan.send(()).is_err()
481 {
482 error!("finished subscribing but response channel dropped");
483 }
484 },
485 ClientRequest::Unsubscribe(t, chan) => {
486 behaviour.unsubscribe_gossip(&t);
487 if let Some(chan) = chan
488 && chan.send(()).is_err()
489 {
490 error!("finished unsubscribing but response channel dropped");
491 }
492 },
493 ClientRequest::DirectRequest {
494 pid,
495 contents,
496 retry_count,
497 } => {
498 debug!("Sending direct request to {pid:?}");
499 let id = behaviour.add_direct_request(pid, contents.clone());
500 let req = DMRequest {
501 peer_id: pid,
502 data: contents,
503 backoff: ExponentialBackoff::default(),
504 retry_count,
505 };
506 self.direct_message_state.add_direct_request(req, id);
507 },
508 ClientRequest::DirectResponse(chan, msg) => {
509 behaviour.add_direct_response(chan, msg);
510 },
511 ClientRequest::AddKnownPeers(peers) => {
512 self.add_known_peers(&peers);
513 },
514 ClientRequest::Prune(pid) => {
515 if self.swarm.disconnect_peer_id(pid).is_err() {
516 warn!("Could not disconnect from {pid:?}");
517 }
518 },
519 }
520 },
521 None => {
522 error!("Error receiving msg in main behaviour loop: channel closed");
523 },
524 }
525 Ok(false)
526 }
527
528 #[allow(clippy::type_complexity)]
530 #[instrument(skip(self))]
531 async fn handle_swarm_events(
532 &mut self,
533 event: SwarmEvent<NetworkEventInternal>,
534 send_to_client: &UnboundedSender<NetworkEvent>,
535 ) -> Result<(), NetworkError> {
536 debug!("Swarm event observed {:?}", event);
538
539 #[allow(deprecated)]
540 match event {
541 SwarmEvent::ConnectionEstablished {
542 connection_id: _,
543 peer_id,
544 endpoint,
545 num_established,
546 concurrent_dial_errors,
547 established_in: _established_in,
548 } => {
549 if num_established > ESTABLISHED_LIMIT {
550 error!(
551 "Num concurrent connections to a single peer exceeding \
552 {ESTABLISHED_LIMIT:?} at {num_established:?}!"
553 );
554 } else {
555 debug!(
556 "Connection established with {peer_id:?} at {endpoint:?} with \
557 {concurrent_dial_errors:?} concurrent dial errors"
558 );
559 }
560
561 send_to_client
563 .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
564 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
565 },
566 SwarmEvent::ConnectionClosed {
567 connection_id: _,
568 peer_id,
569 endpoint,
570 num_established,
571 cause,
572 } => {
573 if num_established > ESTABLISHED_LIMIT_UNWR {
574 error!(
575 "Num concurrent connections to a single peer exceeding \
576 {ESTABLISHED_LIMIT:?} at {num_established:?}!"
577 );
578 } else {
579 debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
580 }
581
582 if num_established == 0 {
584 self.consensus_key_to_pid_map
585 .lock()
586 .remove_by_right(&peer_id);
587 }
588
589 send_to_client
591 .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
592 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
593 },
594 SwarmEvent::Dialing {
595 peer_id,
596 connection_id: _,
597 } => {
598 debug!("Attempting to dial {peer_id:?}");
599 },
600 SwarmEvent::ListenerClosed {
601 listener_id: _,
602 addresses: _,
603 reason: _,
604 }
605 | SwarmEvent::NewListenAddr {
606 listener_id: _,
607 address: _,
608 }
609 | SwarmEvent::ExpiredListenAddr {
610 listener_id: _,
611 address: _,
612 }
613 | SwarmEvent::NewExternalAddrCandidate { .. }
614 | SwarmEvent::ExternalAddrExpired { .. }
615 | SwarmEvent::IncomingConnection {
616 connection_id: _,
617 local_addr: _,
618 send_back_addr: _,
619 } => {},
620 SwarmEvent::Behaviour(b) => {
621 let maybe_event = match b {
622 NetworkEventInternal::DHTEvent(e) => self
623 .dht_handler
624 .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
625 NetworkEventInternal::IdentifyEvent(e) => {
626 if let IdentifyEvent::Received {
628 peer_id,
629 info:
630 IdentifyInfo {
631 listen_addrs,
632 protocols: _,
633 public_key: _,
634 protocol_version: _,
635 agent_version: _,
636 observed_addr: _,
637 signed_peer_record: _,
638 },
639 connection_id: _,
640 } = *e
641 {
642 let behaviour = self.swarm.behaviour_mut();
643
644 for addr in listen_addrs.iter().collect::<HashSet<_>>() {
646 behaviour.dht.add_address(&peer_id, addr.clone());
647 }
648 }
649 None
650 },
651 NetworkEventInternal::GossipEvent(e) => match *e {
652 GossipEvent::Message {
653 propagation_source: _peer_id,
654 message_id: _id,
655 message,
656 } => Some(NetworkEvent::GossipMsg(message.data)),
657 GossipEvent::Subscribed { peer_id, topic } => {
658 debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
659 None
660 },
661 GossipEvent::Unsubscribed { peer_id, topic } => {
662 debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
663 None
664 },
665 GossipEvent::GossipsubNotSupported { peer_id } => {
666 warn!("Peer {peer_id:?} does not support gossipsub");
667 None
668 },
669 GossipEvent::SlowPeer {
670 peer_id,
671 failed_messages: _,
672 } => {
673 warn!("Peer {peer_id:?} is slow");
674 None
675 },
676 },
677 NetworkEventInternal::DMEvent(e) => self
678 .direct_message_state
679 .handle_dm_event(e, self.resend_tx.clone()),
680 NetworkEventInternal::AutonatEvent(e) => {
681 match e {
682 autonat::Event::InboundProbe(_) => {},
683 autonat::Event::OutboundProbe(e) => match e {
684 autonat::OutboundProbeEvent::Request { .. }
685 | autonat::OutboundProbeEvent::Response { .. } => {},
686 autonat::OutboundProbeEvent::Error {
687 probe_id: _,
688 peer,
689 error,
690 } => {
691 warn!(
692 "AutoNAT Probe failed to peer {peer:?} with error: \
693 {error:?}"
694 );
695 },
696 },
697 autonat::Event::StatusChanged { old, new } => {
698 debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
699 },
700 };
701 None
702 },
703 };
704
705 if let Some(event) = maybe_event {
706 send_to_client
708 .send(event)
709 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
710 }
711 },
712 SwarmEvent::OutgoingConnectionError {
713 connection_id: _,
714 peer_id,
715 error,
716 } => {
717 warn!("Outgoing connection error to {peer_id:?}: {error:?}");
718 },
719 SwarmEvent::IncomingConnectionError {
720 connection_id: _,
721 local_addr: _,
722 send_back_addr: _,
723 error,
724 peer_id: _,
725 } => {
726 warn!("Incoming connection error: {error:?}");
727 },
728 SwarmEvent::ListenerError {
729 listener_id: _,
730 error,
731 } => {
732 warn!("Listener error: {error:?}");
733 },
734 SwarmEvent::ExternalAddrConfirmed { address } => {
735 let my_id = *self.swarm.local_peer_id();
736 self.swarm
737 .behaviour_mut()
738 .dht
739 .add_address(&my_id, address.clone());
740 },
741 SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
742 self.swarm
743 .behaviour_mut()
744 .dht
745 .add_address(&peer_id, address.clone());
746 },
747 _ => {
748 debug!("Unhandled swarm event {event:?}");
749 },
750 }
751 Ok(())
752 }
753
754 pub fn spawn_listeners(
760 mut self,
761 ) -> Result<
762 (
763 UnboundedSender<ClientRequest>,
764 UnboundedReceiver<NetworkEvent>,
765 ),
766 NetworkError,
767 > {
768 let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
769 let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
770 let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
771 self.resend_tx = Some(s_input.clone());
772 self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
773
774 DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
775 spawn(
776 async move {
777 loop {
778 select! {
779 event = self.swarm.next() => {
780 debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
781 if let Some(event) = event {
782 debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
783 self.handle_swarm_events(event, &r_input).await?;
784 }
785 },
786 msg = s_output.recv() => {
787 debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
788 let shutdown = self.handle_client_requests(msg).await?;
789 if shutdown {
790 let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
791 break
792 }
793 }
794 }
795 }
796 Ok::<(), NetworkError>(())
797 }
798 .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
799 );
800 Ok((s_input, r_output))
801 }
802
803 pub fn peer_id(&self) -> PeerId {
805 self.peer_id
806 }
807}