/// Configuration for the libp2p network (e.g. how it should be built)
mod config;

/// A handle for interacting with and controlling the libp2p network
mod handle;

use std::{
    collections::HashSet,
    num::{NonZeroU32, NonZeroUsize},
    sync::Arc,
    time::{Duration, Instant},
};

use bimap::BiMap;
use futures::{channel::mpsc, SinkExt, StreamExt};
use hotshot_types::{
    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
};
use libp2p::{
    autonat,
    core::{transport::ListenerId, upgrade::Version::V1Lazy},
    gossipsub::{
        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
    },
    identify::{
        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
        Info as IdentifyInfo,
    },
    identity::Keypair,
    kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
    request_response::{
        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
    },
    swarm::SwarmEvent,
    Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_identity::PeerId;
use parking_lot::Mutex;
use rand::{prelude::SliceRandom, thread_rng};
use tokio::{
    select, spawn,
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

pub use self::{
    config::{
        GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
        RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
    },
    handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
};
use super::{
    behaviours::dht::{
        bootstrap::{DHTBootstrapTask, InputEvent},
        store::{
            persistent::{DhtPersistentStorage, PersistentStore},
            validated::ValidatedStore,
        },
    },
    cbor::Cbor,
    gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
    NetworkEventInternal,
};
use crate::network::behaviours::{
    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
    direct_message::{DMBehaviour, DMRequest},
    exponential_backoff::ExponentialBackoff,
};

/// The maximum size of a gossiped message (in bytes)
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// The maximum number of concurrent connections we allow with a single peer
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// The raw value of [`ESTABLISHED_LIMIT`], for comparisons against plain `u32` counts
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// A node on the libp2p network providing gossip, direct messaging, and DHT capabilities
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Our peer id, derived from our keypair
    peer_id: PeerId,
    /// The libp2p swarm that drives all of our network behaviours
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The time-to-live applied to records we publish to the DHT
    kademlia_record_ttl: Duration,
    /// The bidirectional map from consensus keys to peer ids
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// The id of the current swarm listener, if we are listening
    listener_id: Option<ListenerId>,
    /// Tracks in-flight direct messages and their retries
    direct_message_state: DMBehaviour,
    /// Tracks in-flight DHT queries and their retries
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel used to re-enqueue requests to ourselves; set once the
    /// listener task is spawned
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}

impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
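    /// Returns the number of peers we are currently connected to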
    pub fn num_connected(&self) -> usize {
        self.swarm.connected_peers().count()
    }

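    /// Returns the set of peer ids we are currently connected to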
    pub fn connected_pids(&self) -> HashSet<PeerId> {
        self.swarm.connected_peers().copied().collect()
    }

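    /// Start listening on the given address, returning the address the swarm
    /// actually bound to once it is reported.
    ///
    /// # Errors
    /// Returns an error if the swarm fails to begin listening on the address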
    #[instrument(skip(self))]
    pub async fn start_listen(
        &mut self,
        listen_addr: Multiaddr,
    ) -> Result<Multiaddr, NetworkError> {
        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
        })?);
        // Drive the swarm until it reports the address it is listening on
        let addr = loop {
            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
                break address;
            }
        };
        info!("Libp2p listening on {addr:?}");
        Ok(addr)
    }

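    /// Add the given peers to the DHT and AutoNAT behaviours, in shuffled
    /// order, skipping our own peer id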
    #[instrument(skip(self))]
    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
        debug!("Adding {} known peers", known_peers.len());
        let behaviour = self.swarm.behaviour_mut();
        // Shuffle so that all nodes do not contact the same peers first
        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
        shuffled.shuffle(&mut thread_rng());
        for (peer_id, addr) in shuffled {
            if *peer_id != self.peer_id {
                behaviour.dht.add_address(peer_id, addr.clone());
                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
            }
        }
    }

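    /// Create a new libp2p network node from the given configuration:
    /// generates a keypair if one was not provided, builds the authenticated
    /// transport, and wires up the gossipsub, identify, Kademlia,
    /// direct message, and AutoNAT behaviours.
    ///
    /// # Errors
    /// Returns an error if building the transport or any behaviour fails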
    #[allow(clippy::too_many_lines)]
    pub async fn new(
        config: NetworkNodeConfig,
        dht_persistent_storage: D,
        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    ) -> Result<Self, NetworkError> {
        // Generate a random keypair if one was not specified
        let keypair = config
            .keypair
            .clone()
            .unwrap_or_else(Keypair::generate_ed25519);

        // Get the `PeerId` from the `KeyPair`
        let peer_id = PeerId::from(keypair.public());

        // Generate the authenticated transport
        let transport: BoxedTransport = gen_transport::<T>(
            keypair.clone(),
            config.auth_message.clone(),
            Arc::clone(&consensus_key_to_pid_map),
        )
        .await?;

        // Use the configured Kademlia republication interval, or the default
        let kademlia_record_republication_interval = config
            .republication_interval
            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));

        // Records expire after 16 republication intervals unless a TTL is configured
        let kademlia_ttl = config
            .ttl
            .unwrap_or(16 * kademlia_record_republication_interval);

        // Build the swarm and its behaviours
        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
            // Use the blake3 hash of a message's contents as its unique id
            let message_id_fn = |message: &GossipsubMessage| {
                let hash = blake3::hash(&message.data);
                MessageId::from(hash.as_bytes().to_vec())
            };

            // Derive the gossipsub configuration from our `GossipConfig`
            let gossipsub_config = GossipsubConfigBuilder::default()
                .message_id_fn(message_id_fn)
                .validation_mode(ValidationMode::Strict)
                .heartbeat_interval(config.gossip_config.heartbeat_interval)
                .history_gossip(config.gossip_config.history_gossip)
                .history_length(config.gossip_config.history_length)
                .mesh_n(config.gossip_config.mesh_n)
                .mesh_n_high(config.gossip_config.mesh_n_high)
                .mesh_n_low(config.gossip_config.mesh_n_low)
                .mesh_outbound_min(config.gossip_config.mesh_outbound_min)
                .max_transmit_size(config.gossip_config.max_transmit_size)
                .max_ihave_length(config.gossip_config.max_ihave_length)
                .max_ihave_messages(config.gossip_config.max_ihave_messages)
                .published_message_ids_cache_time(
                    config.gossip_config.published_message_ids_cache_time,
                )
                .iwant_followup_time(config.gossip_config.iwant_followup_time)
                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc)
                // NB: `gossip_retransimission` is the upstream libp2p builder
                // method's (misspelled) name
                .gossip_retransimission(config.gossip_config.gossip_retransmission)
                .flood_publish(config.gossip_config.flood_publish)
                .duplicate_cache_time(config.gossip_config.duplicate_cache_time)
                .fanout_ttl(config.gossip_config.fanout_ttl)
                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay)
                .gossip_factor(config.gossip_config.gossip_factor)
                .gossip_lazy(config.gossip_config.gossip_lazy)
                .build()
                .map_err(|err| {
                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
                })?;

            // Create the gossipsub behaviour, signing outgoing messages with our keypair
            let gossipsub: Gossipsub = Gossipsub::new(
                MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )
            .map_err(|err| {
                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
            })?;

            // Configure the identify protocol, used to advertise and learn peer addresses
            let identify_cfg =
                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
            let identify = IdentifyBehaviour::new(identify_cfg);

            // Configure Kademlia with the standard protocol name and our
            // publication intervals and record TTL
            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
            kconfig
                .set_parallelism(NonZeroUsize::new(5).unwrap())
                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
                .set_publication_interval(Some(kademlia_record_republication_interval))
                .set_record_ttl(Some(kademlia_ttl));

            // Panicking is acceptable here: a missing replication factor is an
            // unrecoverable misconfiguration
            #[allow(clippy::panic)]
            if let Some(factor) = config.replication_factor {
                kconfig.set_replication_factor(factor);
            } else {
                panic!("Replication factor not set");
            }

            // Create the Kademlia behaviour, wrapping the in-memory store with
            // validation and persistence
            let mut kadem = Behaviour::with_config(
                peer_id,
                PersistentStore::new(
                    ValidatedStore::new(MemoryStore::new(peer_id)),
                    dht_persistent_storage,
                    5,
                )
                .await,
                kconfig,
            );
            kadem.set_mode(Some(Mode::Server));

            let rrconfig = Libp2pRequestResponseConfig::default();

            // CBOR codec with the configured request and response size limits
            let cbor = Cbor::new(
                config.request_response_config.request_size_maximum,
                config.request_response_config.response_size_maximum,
            );

            // The request/response behaviour used for direct messages
            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
                RequestResponse::with_codec(
                    cbor,
                    [(
                        StreamProtocol::new("/HotShot/direct_message/1.0"),
                        ProtocolSupport::Full,
                    )]
                    .into_iter(),
                    rrconfig.clone(),
                );

            // Allow AutoNAT to use non-global IPs (we may be on a local network)
            let autonat_config = autonat::Config {
                only_global_ips: false,
                ..Default::default()
            };

            // Compose all behaviours into the network definition
            let network = NetworkDef::new(
                gossipsub,
                kadem,
                identify,
                direct_message,
                autonat::Behaviour::new(peer_id, autonat_config),
            );

            // Build the swarm on the tokio executor with our custom transport
            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
            let swarm = swarm.with_tokio();

            swarm
                .with_other_transport(|_| transport)
                .unwrap()
                .with_behaviour(|_| network)
                .unwrap()
                .with_swarm_config(|cfg| {
                    cfg.with_idle_connection_timeout(Duration::from_secs(10))
                        .with_substream_upgrade_protocol_override(V1Lazy)
                })
                .build()
        };

        // Seed the swarm and DHT with the configured bootstrap addresses,
        // skipping ourselves
        for (peer, addr) in &config.to_connect_addrs {
            if peer != swarm.local_peer_id() {
                swarm.behaviour_mut().add_address(peer, addr.clone());
                swarm.add_peer_address(*peer, addr.clone());
            }
        }

        Ok(Self {
            peer_id,
            swarm,
            kademlia_record_ttl: kademlia_ttl,
            consensus_key_to_pid_map,
            listener_id: None,
            direct_message_state: DMBehaviour::default(),
            dht_handler: DHTBehaviour::new(
                peer_id,
                config
                    .replication_factor
                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
            ),
            resend_tx: None,
        })
    }

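    /// Publish a key/value pair to the Kademlia DHT, recording the query so it
    /// can be retried if it fails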
    pub fn put_record(&mut self, mut query: KadPutQuery) {
        // Create the record, expiring after the configured TTL
        let mut record = Record::new(query.key.clone(), query.value.clone());
        record.expires = Some(Instant::now() + self.kademlia_record_ttl);

        // Require half of the replication factor to acknowledge the put
        match self.swarm.behaviour_mut().dht.put_record(
            record,
            libp2p::kad::Quorum::N(
                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
                    .expect("half the replication factor should be nonzero"),
            ),
        ) {
            Err(e) => {
                // Reset the query's progress and back off before retrying
                query.progress = DHTProgress::NotStarted;
                query.backoff.start_next(false);
                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
            },
            Ok(qid) => {
                debug!("Published record to DHT with qid {qid:?}");
                let query = KadPutQuery {
                    progress: DHTProgress::InProgress(qid),
                    ..query
                };
                self.dht_handler.put_record(qid, query);
            },
        }
    }

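    /// Handle a request from the client, returning `true` if the node
    /// should shut down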
    #[instrument(skip(self))]
    async fn handle_client_requests(
        &mut self,
        msg: Option<ClientRequest>,
    ) -> Result<bool, NetworkError> {
        let behaviour = self.swarm.behaviour_mut();
        match msg {
            Some(msg) => {
                match msg {
                    ClientRequest::BeginBootstrap => {
                        debug!("Beginning Libp2p bootstrap");
                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
                    },
                    ClientRequest::LookupPeer(pid, chan) => {
                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
                        self.dht_handler
                            .in_progress_get_closest_peers
                            .insert(id, chan);
                    },
                    ClientRequest::GetRoutingTable(chan) => {
                        self.dht_handler
                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
                        if chan.send(()).is_err() {
                            warn!("Tried to notify client but client not tracking anymore");
                        }
                    },
                    ClientRequest::PutDHT { key, value, notify } => {
                        let query = KadPutQuery {
                            progress: DHTProgress::NotStarted,
                            notify,
                            key,
                            value,
                            backoff: ExponentialBackoff::default(),
                        };
                        self.put_record(query);
                    },
                    ClientRequest::GetConnectedPeerNum(s) => {
                        if s.send(self.num_connected()).is_err() {
                            error!("error sending peer number to client");
                        }
                    },
                    ClientRequest::GetConnectedPeers(s) => {
                        if s.send(self.connected_pids()).is_err() {
                            error!("error sending peer set to client");
                        }
                    },
                    ClientRequest::GetDHT {
                        key,
                        notify,
                        retry_count,
                    } => {
                        self.dht_handler.get_record(
                            key,
                            notify,
                            ExponentialBackoff::default(),
                            retry_count,
                            &mut self.swarm.behaviour_mut().dht,
                        );
                    },
                    ClientRequest::IgnorePeers(_peers) => {
                        // NOTE: currently a no-op
                    },
                    ClientRequest::Shutdown => {
                        if let Some(listener_id) = self.listener_id {
                            self.swarm.remove_listener(listener_id);
                        }

                        return Ok(true);
                    },
                    ClientRequest::GossipMsg(topic, contents) => {
                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
                    },
                    ClientRequest::Subscribe(t, chan) => {
                        behaviour.subscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished subscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::Unsubscribe(t, chan) => {
                        behaviour.unsubscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished unsubscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::DirectRequest {
                        pid,
                        contents,
                        retry_count,
                    } => {
                        debug!("Sending direct request to {pid:?}");
                        let id = behaviour.add_direct_request(pid, contents.clone());
                        let req = DMRequest {
                            peer_id: pid,
                            data: contents,
                            backoff: ExponentialBackoff::default(),
                            retry_count,
                        };
                        self.direct_message_state.add_direct_request(req, id);
                    },
                    ClientRequest::DirectResponse(chan, msg) => {
                        behaviour.add_direct_response(chan, msg);
                    },
                    ClientRequest::AddKnownPeers(peers) => {
                        self.add_known_peers(&peers);
                    },
                    ClientRequest::Prune(pid) => {
                        if self.swarm.disconnect_peer_id(pid).is_err() {
                            warn!("Could not disconnect from {pid:?}");
                        }
                    },
                }
            },
            None => {
                // The client request channel has closed; shut down rather than
                // spin on a closed channel
                error!("Error receiving msg in main behaviour loop: channel closed");
                return Ok(true);
            },
        }
        Ok(false)
    }

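    /// Handle a single event from the libp2p swarm, forwarding anything
    /// relevant to the client.
    ///
    /// # Errors
    /// Returns an error if an event could not be sent to the client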
    #[allow(clippy::type_complexity)]
    #[instrument(skip(self))]
    async fn handle_swarm_events(
        &mut self,
        event: SwarmEvent<NetworkEventInternal>,
        send_to_client: &UnboundedSender<NetworkEvent>,
    ) -> Result<(), NetworkError> {
        debug!("Swarm event observed {:?}", event);

        #[allow(deprecated)]
        match event {
            SwarmEvent::ConnectionEstablished {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                concurrent_dial_errors,
                established_in: _established_in,
            } => {
                if num_established > ESTABLISHED_LIMIT {
                    error!(
                        "Num concurrent connections to a single peer exceeding \
                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!(
                        "Connection established with {peer_id:?} at {endpoint:?} with \
                         {concurrent_dial_errors:?} concurrent dial errors"
                    );
                }

                // Send the updated connected-peer count to the client
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::ConnectionClosed {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                cause,
            } => {
                if num_established > ESTABLISHED_LIMIT_UNWR {
                    error!(
                        "Num concurrent connections to a single peer exceeding \
                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
                }

                // If we are no longer connected to the peer, remove its
                // consensus key from the mapping
                if num_established == 0 {
                    self.consensus_key_to_pid_map
                        .lock()
                        .remove_by_right(&peer_id);
                }

                // Send the updated connected-peer count to the client
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::Dialing {
                peer_id,
                connection_id: _,
            } => {
                debug!("Attempting to dial {peer_id:?}");
            },
            SwarmEvent::ListenerClosed {
                listener_id: _,
                addresses: _,
                reason: _,
            }
            | SwarmEvent::NewListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::ExpiredListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::IncomingConnection {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
            } => {},
            SwarmEvent::Behaviour(b) => {
                let maybe_event = match b {
                    NetworkEventInternal::DHTEvent(e) => self
                        .dht_handler
                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
                    NetworkEventInternal::IdentifyEvent(e) => {
                        // When a peer identifies itself, add its advertised
                        // listen addresses to the DHT
                        if let IdentifyEvent::Received {
                            peer_id,
                            info:
                                IdentifyInfo {
                                    listen_addrs,
                                    protocols: _,
                                    public_key: _,
                                    protocol_version: _,
                                    agent_version: _,
                                    observed_addr: _,
                                    signed_peer_record: _,
                                },
                            connection_id: _,
                        } = *e
                        {
                            let behaviour = self.swarm.behaviour_mut();

                            // Deduplicate the addresses before adding them
                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
                                behaviour.dht.add_address(&peer_id, addr.clone());
                            }
                        }
                        None
                    },
                    NetworkEventInternal::GossipEvent(e) => match *e {
                        GossipEvent::Message {
                            propagation_source: _peer_id,
                            message_id: _id,
                            message,
                        } => Some(NetworkEvent::GossipMsg(message.data)),
                        GossipEvent::Subscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
                            None
                        },
                        GossipEvent::Unsubscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
                            None
                        },
                        GossipEvent::GossipsubNotSupported { peer_id } => {
                            warn!("Peer {peer_id:?} does not support gossipsub");
                            None
                        },
                        GossipEvent::SlowPeer {
                            peer_id,
                            failed_messages: _,
                        } => {
                            warn!("Peer {peer_id:?} is slow");
                            None
                        },
                    },
                    NetworkEventInternal::DMEvent(e) => self
                        .direct_message_state
                        .handle_dm_event(e, self.resend_tx.clone()),
                    NetworkEventInternal::AutonatEvent(e) => {
                        match e {
                            autonat::Event::InboundProbe(_) => {},
                            autonat::Event::OutboundProbe(e) => match e {
                                autonat::OutboundProbeEvent::Request { .. }
                                | autonat::OutboundProbeEvent::Response { .. } => {},
                                autonat::OutboundProbeEvent::Error {
                                    probe_id: _,
                                    peer,
                                    error,
                                } => {
                                    warn!(
                                        "AutoNAT Probe failed to peer {peer:?} with error: \
                                         {error:?}"
                                    );
                                },
                            },
                            autonat::Event::StatusChanged { old, new } => {
                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
                            },
                        };
                        None
                    },
                };

                // Forward the event to the client if it was meaningful
                if let Some(event) = maybe_event {
                    send_to_client
                        .send(event)
                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
                }
            },
            SwarmEvent::OutgoingConnectionError {
                connection_id: _,
                peer_id,
                error,
            } => {
                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
            },
            SwarmEvent::IncomingConnectionError {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
                error,
                peer_id: _,
            } => {
                warn!("Incoming connection error: {error:?}");
            },
            SwarmEvent::ListenerError {
                listener_id: _,
                error,
            } => {
                warn!("Listener error: {error:?}");
            },
            SwarmEvent::ExternalAddrConfirmed { address } => {
                // Our external address was confirmed; advertise it on the DHT
                let my_id = *self.swarm.local_peer_id();
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&my_id, address.clone());
            },
            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&peer_id, address.clone());
            },
            _ => {
                debug!("Unhandled swarm event {event:?}");
            },
        }
        Ok(())
    }

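    /// Spawn a task that drives the libp2p swarm and services client
    /// requests, returning the channel on which to send requests and the
    /// channel on which network events are received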
    pub fn spawn_listeners(
        mut self,
    ) -> Result<
        (
            UnboundedSender<ClientRequest>,
            UnboundedReceiver<NetworkEvent>,
        ),
        NetworkError,
    > {
        // The channel the task receives client requests on
        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
        // The channel the task sends network events on
        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
        // The channel used to signal the DHT bootstrap task
        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
        self.resend_tx = Some(s_input.clone());
        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());

        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
        spawn(
            async move {
                loop {
                    select! {
                        event = self.swarm.next() => {
                            if let Some(event) = event {
                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
                                self.handle_swarm_events(event, &r_input).await?;
                            }
                        },
                        msg = s_output.recv() => {
                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
                            let shutdown = self.handle_client_requests(msg).await?;
                            if shutdown {
                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
                                break
                            }
                        }
                    }
                }
                Ok::<(), NetworkError>(())
            }
            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
        );
        Ok((s_input, r_output))
    }

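    /// Returns the `PeerId` of this node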
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
}