1mod config;
9
10mod handle;
13
14use std::{
15 collections::{HashMap, HashSet},
16 iter,
17 num::{NonZeroU32, NonZeroUsize},
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{channel::mpsc, SinkExt, StreamExt};
24use hotshot_types::{
25 constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28 autonat,
29 core::{transport::ListenerId, upgrade::Version::V1Lazy},
30 gossipsub::{
31 Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32 Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33 },
34 identify::{
35 Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36 Info as IdentifyInfo,
37 },
38 identity::Keypair,
39 kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
40 request_response::{
41 Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42 },
43 swarm::SwarmEvent,
44 Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
45};
46use libp2p_identity::PeerId;
47use parking_lot::Mutex;
48use rand::{prelude::SliceRandom, thread_rng};
49use tokio::{
50 select, spawn,
51 sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
52};
53use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
54
55pub use self::{
56 config::{
57 GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
58 RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
59 },
60 handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
61};
62use super::{
63 behaviours::dht::{
64 bootstrap::{DHTBootstrapTask, InputEvent},
65 store::{
66 persistent::{DhtPersistentStorage, PersistentStore},
67 validated::ValidatedStore,
68 },
69 },
70 cbor::Cbor,
71 gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
72 NetworkEventInternal,
73};
74use crate::network::behaviours::{
75 dht::{DHTBehaviour, DHTProgress, KadPutQuery},
76 direct_message::{DMBehaviour, DMRequest},
77 exponential_backoff::ExponentialBackoff,
78};
79
/// Size cap for gossip messages.
///
/// NOTE(review): not referenced in this file; the unit is presumably bytes — confirm at the
/// use site.
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// Raw `u32` form of [`ESTABLISHED_LIMIT`].
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// Number of concurrent connections to a single peer above which the swarm event handler
/// logs an error.
pub const ESTABLISHED_LIMIT: NonZeroU32 = match NonZeroU32::new(ESTABLISHED_LIMIT_UNWR) {
    Some(limit) => limit,
    // Evaluated at compile time: a zero limit is rejected during the build.
    None => panic!("called `Option::unwrap()` on a `None` value"),
};
87
/// A libp2p network node: owns the swarm plus the bookkeeping needed to serve client
/// requests and react to swarm events.
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Peer id derived from this node's keypair.
    peer_id: PeerId,
    /// The libp2p swarm driving all behaviours (omitted from `Debug` output).
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// Time-to-live applied to records published through `put_record`.
    kademlia_record_ttl: Duration,
    /// Shared two-way map between consensus keys and peer ids; an entry is removed when the
    /// last connection to that peer closes.
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// Listener created by `start_listen`, removed again on shutdown.
    listener_id: Option<ListenerId>,
    /// State for in-flight direct-message requests.
    direct_message_state: DMBehaviour,
    /// State for in-flight DHT queries.
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Clone of the client-request sender, set by `spawn_listeners` and handed to the
    /// direct-message handler.
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}
109
110impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
111 pub fn num_connected(&self) -> usize {
113 self.swarm.connected_peers().count()
114 }
115
116 pub fn connected_pids(&self) -> HashSet<PeerId> {
118 self.swarm.connected_peers().copied().collect()
119 }
120
121 #[instrument(skip(self))]
125 pub async fn start_listen(
126 &mut self,
127 listen_addr: Multiaddr,
128 ) -> Result<Multiaddr, NetworkError> {
129 self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
130 NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
131 })?);
132 let addr = loop {
133 if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
134 break address;
135 }
136 };
137 info!("Libp2p listening on {addr:?}");
138 Ok(addr)
139 }
140
141 #[instrument(skip(self))]
146 pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
147 debug!("Adding {} known peers", known_peers.len());
148 let behaviour = self.swarm.behaviour_mut();
149 let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
150 let mut shuffled = known_peers.iter().collect::<Vec<_>>();
151 shuffled.shuffle(&mut thread_rng());
152 for (peer_id, addr) in shuffled {
153 if *peer_id != self.peer_id {
154 behaviour.dht.add_address(peer_id, addr.clone());
155 behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
156 bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
157 }
158 }
159 }
160
161 #[allow(clippy::too_many_lines)]
175 pub async fn new(
176 config: NetworkNodeConfig<T>,
177 dht_persistent_storage: D,
178 consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
179 ) -> Result<Self, NetworkError> {
180 let keypair = config
182 .keypair
183 .clone()
184 .unwrap_or_else(Keypair::generate_ed25519);
185
186 let peer_id = PeerId::from(keypair.public());
188
189 let transport: BoxedTransport = gen_transport::<T>(
191 keypair.clone(),
192 config.membership.clone(),
193 config.auth_message.clone(),
194 Arc::clone(&consensus_key_to_pid_map),
195 )
196 .await?;
197
198 let kademlia_record_republication_interval = config
200 .republication_interval
201 .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
202
203 let kademlia_ttl = config
205 .ttl
206 .unwrap_or(16 * kademlia_record_republication_interval);
207
208 let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
210 let message_id_fn = |message: &GossipsubMessage| {
212 let hash = blake3::hash(&message.data);
213 MessageId::from(hash.as_bytes().to_vec())
214 };
215
216 let gossipsub_config = GossipsubConfigBuilder::default()
218 .message_id_fn(message_id_fn) .validation_mode(ValidationMode::Strict) .heartbeat_interval(config.gossip_config.heartbeat_interval) .history_gossip(config.gossip_config.history_gossip) .history_length(config.gossip_config.history_length) .mesh_n(config.gossip_config.mesh_n) .mesh_n_high(config.gossip_config.mesh_n_high) .mesh_n_low(config.gossip_config.mesh_n_low) .mesh_outbound_min(config.gossip_config.mesh_outbound_min) .max_transmit_size(config.gossip_config.max_transmit_size) .max_ihave_length(config.gossip_config.max_ihave_length) .max_ihave_messages(config.gossip_config.max_ihave_messages) .published_message_ids_cache_time(
231 config.gossip_config.published_message_ids_cache_time,
232 ) .iwant_followup_time(config.gossip_config.iwant_followup_time) .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) .gossip_retransimission(config.gossip_config.gossip_retransmission) .flood_publish(config.gossip_config.flood_publish) .duplicate_cache_time(config.gossip_config.duplicate_cache_time) .fanout_ttl(config.gossip_config.fanout_ttl) .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) .gossip_factor(config.gossip_config.gossip_factor) .gossip_lazy(config.gossip_config.gossip_lazy) .build()
243 .map_err(|err| {
244 NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
245 })?;
246
247 let gossipsub: Gossipsub = Gossipsub::new(
249 MessageAuthenticity::Signed(keypair.clone()),
250 gossipsub_config,
251 )
252 .map_err(|err| {
253 NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
254 })?;
255
256 let identify_cfg =
261 IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
262 let identify = IdentifyBehaviour::new(identify_cfg);
263
264 let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
266 kconfig
267 .set_parallelism(NonZeroUsize::new(5).unwrap())
268 .set_provider_publication_interval(Some(kademlia_record_republication_interval))
269 .set_publication_interval(Some(kademlia_record_republication_interval))
270 .set_record_ttl(Some(kademlia_ttl));
271
272 #[allow(clippy::panic)]
274 if let Some(factor) = config.replication_factor {
275 kconfig.set_replication_factor(factor);
276 } else {
277 panic!("Replication factor not set");
278 }
279
280 let mut kadem = Behaviour::with_config(
282 peer_id,
283 PersistentStore::new(
284 ValidatedStore::new(MemoryStore::new(peer_id)),
285 dht_persistent_storage,
286 5,
287 )
288 .await,
289 kconfig,
290 );
291 kadem.set_mode(Some(Mode::Server));
292
293 let rrconfig = Libp2pRequestResponseConfig::default();
294
295 let cbor = Cbor::new(
297 config.request_response_config.request_size_maximum,
298 config.request_response_config.response_size_maximum,
299 );
300
301 let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
302 RequestResponse::with_codec(
303 cbor,
304 [(
305 StreamProtocol::new("/HotShot/direct_message/1.0"),
306 ProtocolSupport::Full,
307 )]
308 .into_iter(),
309 rrconfig.clone(),
310 );
311
312 let autonat_config = autonat::Config {
313 only_global_ips: false,
314 ..Default::default()
315 };
316
317 let network = NetworkDef::new(
318 gossipsub,
319 kadem,
320 identify,
321 direct_message,
322 autonat::Behaviour::new(peer_id, autonat_config),
323 );
324
325 let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
327 let swarm = swarm.with_tokio();
328
329 swarm
330 .with_other_transport(|_| transport)
331 .unwrap()
332 .with_behaviour(|_| network)
333 .unwrap()
334 .with_swarm_config(|cfg| {
335 cfg.with_idle_connection_timeout(Duration::from_secs(10))
336 .with_substream_upgrade_protocol_override(V1Lazy)
337 })
338 .build()
339 };
340 for (peer, addr) in &config.to_connect_addrs {
341 if peer != swarm.local_peer_id() {
342 swarm.behaviour_mut().add_address(peer, addr.clone());
343 swarm.add_peer_address(*peer, addr.clone());
344 }
345 }
346
347 Ok(Self {
348 peer_id,
349 swarm,
350 kademlia_record_ttl: kademlia_ttl,
351 consensus_key_to_pid_map,
352 listener_id: None,
353 direct_message_state: DMBehaviour::default(),
354 dht_handler: DHTBehaviour::new(
355 peer_id,
356 config
357 .replication_factor
358 .unwrap_or(NonZeroUsize::new(4).unwrap()),
359 ),
360 resend_tx: None,
361 })
362 }
363
364 pub fn put_record(&mut self, mut query: KadPutQuery) {
369 let mut record = Record::new(query.key.clone(), query.value.clone());
371
372 record.expires = Some(Instant::now() + self.kademlia_record_ttl);
374
375 match self.swarm.behaviour_mut().dht.put_record(
376 record,
377 libp2p::kad::Quorum::N(
378 NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
379 .expect("replication factor should be bigger than 0"),
380 ),
381 ) {
382 Err(e) => {
383 query.progress = DHTProgress::NotStarted;
385 query.backoff.start_next(false);
386 error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
387 },
388 Ok(qid) => {
389 debug!("Published record to DHT with qid {qid:?}");
390 let query = KadPutQuery {
391 progress: DHTProgress::InProgress(qid),
392 ..query
393 };
394 self.dht_handler.put_record(qid, query);
395 },
396 }
397 }
398
399 #[instrument(skip(self))]
408 async fn handle_client_requests(
409 &mut self,
410 msg: Option<ClientRequest>,
411 ) -> Result<bool, NetworkError> {
412 let behaviour = self.swarm.behaviour_mut();
413 match msg {
414 Some(msg) => {
415 match msg {
416 ClientRequest::BeginBootstrap => {
417 debug!("Beginning Libp2p bootstrap");
418 let _ = self.swarm.behaviour_mut().dht.bootstrap();
419 },
420 ClientRequest::LookupPeer(pid, chan) => {
421 let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
422 self.dht_handler
423 .in_progress_get_closest_peers
424 .insert(id, chan);
425 },
426 ClientRequest::GetRoutingTable(chan) => {
427 self.dht_handler
428 .print_routing_table(&mut self.swarm.behaviour_mut().dht);
429 if chan.send(()).is_err() {
430 warn!("Tried to notify client but client not tracking anymore");
431 }
432 },
433 ClientRequest::PutDHT { key, value, notify } => {
434 let query = KadPutQuery {
435 progress: DHTProgress::NotStarted,
436 notify,
437 key,
438 value,
439 backoff: ExponentialBackoff::default(),
440 };
441 self.put_record(query);
442 },
443 ClientRequest::GetConnectedPeerNum(s) => {
444 if s.send(self.num_connected()).is_err() {
445 error!("error sending peer number to client");
446 }
447 },
448 ClientRequest::GetConnectedPeers(s) => {
449 if s.send(self.connected_pids()).is_err() {
450 error!("error sending peer set to client");
451 }
452 },
453 ClientRequest::GetDHT {
454 key,
455 notify,
456 retry_count,
457 } => {
458 self.dht_handler.get_record(
459 key,
460 notify,
461 ExponentialBackoff::default(),
462 retry_count,
463 &mut self.swarm.behaviour_mut().dht,
464 );
465 },
466 ClientRequest::IgnorePeers(_peers) => {
467 },
469 ClientRequest::Shutdown => {
470 if let Some(listener_id) = self.listener_id {
471 self.swarm.remove_listener(listener_id);
472 }
473
474 return Ok(true);
475 },
476 ClientRequest::GossipMsg(topic, contents) => {
477 behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
478 },
479 ClientRequest::Subscribe(t, chan) => {
480 behaviour.subscribe_gossip(&t);
481 if let Some(chan) = chan {
482 if chan.send(()).is_err() {
483 error!("finished subscribing but response channel dropped");
484 }
485 }
486 },
487 ClientRequest::Unsubscribe(t, chan) => {
488 behaviour.unsubscribe_gossip(&t);
489 if let Some(chan) = chan {
490 if chan.send(()).is_err() {
491 error!("finished unsubscribing but response channel dropped");
492 }
493 }
494 },
495 ClientRequest::DirectRequest {
496 pid,
497 contents,
498 retry_count,
499 } => {
500 debug!("Sending direct request to {pid:?}");
501 let id = behaviour.add_direct_request(pid, contents.clone());
502 let req = DMRequest {
503 peer_id: pid,
504 data: contents,
505 backoff: ExponentialBackoff::default(),
506 retry_count,
507 };
508 self.direct_message_state.add_direct_request(req, id);
509 },
510 ClientRequest::DirectResponse(chan, msg) => {
511 behaviour.add_direct_response(chan, msg);
512 },
513 ClientRequest::AddKnownPeers(peers) => {
514 self.add_known_peers(&peers);
515 },
516 ClientRequest::Prune(pid) => {
517 if self.swarm.disconnect_peer_id(pid).is_err() {
518 warn!("Could not disconnect from {pid:?}");
519 }
520 },
521 }
522 },
523 None => {
524 error!("Error receiving msg in main behaviour loop: channel closed");
525 },
526 }
527 Ok(false)
528 }
529
530 #[allow(clippy::type_complexity)]
532 #[instrument(skip(self))]
533 async fn handle_swarm_events(
534 &mut self,
535 event: SwarmEvent<NetworkEventInternal>,
536 send_to_client: &UnboundedSender<NetworkEvent>,
537 ) -> Result<(), NetworkError> {
538 debug!("Swarm event observed {:?}", event);
540
541 #[allow(deprecated)]
542 match event {
543 SwarmEvent::ConnectionEstablished {
544 connection_id: _,
545 peer_id,
546 endpoint,
547 num_established,
548 concurrent_dial_errors,
549 established_in: _established_in,
550 } => {
551 if num_established > ESTABLISHED_LIMIT {
552 error!(
553 "Num concurrent connections to a single peer exceeding \
554 {ESTABLISHED_LIMIT:?} at {num_established:?}!"
555 );
556 } else {
557 debug!(
558 "Connection established with {peer_id:?} at {endpoint:?} with \
559 {concurrent_dial_errors:?} concurrent dial errors"
560 );
561 }
562
563 send_to_client
565 .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
566 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
567 },
568 SwarmEvent::ConnectionClosed {
569 connection_id: _,
570 peer_id,
571 endpoint,
572 num_established,
573 cause,
574 } => {
575 if num_established > ESTABLISHED_LIMIT_UNWR {
576 error!(
577 "Num concurrent connections to a single peer exceeding \
578 {ESTABLISHED_LIMIT:?} at {num_established:?}!"
579 );
580 } else {
581 debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
582 }
583
584 if num_established == 0 {
586 self.consensus_key_to_pid_map
587 .lock()
588 .remove_by_right(&peer_id);
589 }
590
591 send_to_client
593 .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
594 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
595 },
596 SwarmEvent::Dialing {
597 peer_id,
598 connection_id: _,
599 } => {
600 debug!("Attempting to dial {peer_id:?}");
601 },
602 SwarmEvent::ListenerClosed {
603 listener_id: _,
604 addresses: _,
605 reason: _,
606 }
607 | SwarmEvent::NewListenAddr {
608 listener_id: _,
609 address: _,
610 }
611 | SwarmEvent::ExpiredListenAddr {
612 listener_id: _,
613 address: _,
614 }
615 | SwarmEvent::NewExternalAddrCandidate { .. }
616 | SwarmEvent::ExternalAddrExpired { .. }
617 | SwarmEvent::IncomingConnection {
618 connection_id: _,
619 local_addr: _,
620 send_back_addr: _,
621 } => {},
622 SwarmEvent::Behaviour(b) => {
623 let maybe_event = match b {
624 NetworkEventInternal::DHTEvent(e) => self
625 .dht_handler
626 .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
627 NetworkEventInternal::IdentifyEvent(e) => {
628 if let IdentifyEvent::Received {
630 peer_id,
631 info:
632 IdentifyInfo {
633 listen_addrs,
634 protocols: _,
635 public_key: _,
636 protocol_version: _,
637 agent_version: _,
638 observed_addr: _,
639 },
640 connection_id: _,
641 } = *e
642 {
643 let behaviour = self.swarm.behaviour_mut();
644
645 for addr in listen_addrs.iter().collect::<HashSet<_>>() {
647 behaviour.dht.add_address(&peer_id, addr.clone());
648 }
649 }
650 None
651 },
652 NetworkEventInternal::GossipEvent(e) => match *e {
653 GossipEvent::Message {
654 propagation_source: _peer_id,
655 message_id: _id,
656 message,
657 } => Some(NetworkEvent::GossipMsg(message.data)),
658 GossipEvent::Subscribed { peer_id, topic } => {
659 debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
660 None
661 },
662 GossipEvent::Unsubscribed { peer_id, topic } => {
663 debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
664 None
665 },
666 GossipEvent::GossipsubNotSupported { peer_id } => {
667 warn!("Peer {peer_id:?} does not support gossipsub");
668 None
669 },
670 },
671 NetworkEventInternal::DMEvent(e) => self
672 .direct_message_state
673 .handle_dm_event(e, self.resend_tx.clone()),
674 NetworkEventInternal::AutonatEvent(e) => {
675 match e {
676 autonat::Event::InboundProbe(_) => {},
677 autonat::Event::OutboundProbe(e) => match e {
678 autonat::OutboundProbeEvent::Request { .. }
679 | autonat::OutboundProbeEvent::Response { .. } => {},
680 autonat::OutboundProbeEvent::Error {
681 probe_id: _,
682 peer,
683 error,
684 } => {
685 warn!(
686 "AutoNAT Probe failed to peer {peer:?} with error: \
687 {error:?}"
688 );
689 },
690 },
691 autonat::Event::StatusChanged { old, new } => {
692 debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
693 },
694 };
695 None
696 },
697 };
698
699 if let Some(event) = maybe_event {
700 send_to_client
702 .send(event)
703 .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
704 }
705 },
706 SwarmEvent::OutgoingConnectionError {
707 connection_id: _,
708 peer_id,
709 error,
710 } => {
711 warn!("Outgoing connection error to {peer_id:?}: {error:?}");
712 },
713 SwarmEvent::IncomingConnectionError {
714 connection_id: _,
715 local_addr: _,
716 send_back_addr: _,
717 error,
718 } => {
719 warn!("Incoming connection error: {error:?}");
720 },
721 SwarmEvent::ListenerError {
722 listener_id: _,
723 error,
724 } => {
725 warn!("Listener error: {error:?}");
726 },
727 SwarmEvent::ExternalAddrConfirmed { address } => {
728 let my_id = *self.swarm.local_peer_id();
729 self.swarm
730 .behaviour_mut()
731 .dht
732 .add_address(&my_id, address.clone());
733 },
734 SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
735 self.swarm
736 .behaviour_mut()
737 .dht
738 .add_address(&peer_id, address.clone());
739 },
740 _ => {
741 debug!("Unhandled swarm event {event:?}");
742 },
743 }
744 Ok(())
745 }
746
747 pub fn spawn_listeners(
753 mut self,
754 ) -> Result<
755 (
756 UnboundedSender<ClientRequest>,
757 UnboundedReceiver<NetworkEvent>,
758 ),
759 NetworkError,
760 > {
761 let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
762 let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
763 let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
764 self.resend_tx = Some(s_input.clone());
765 self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
766
767 DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
768 spawn(
769 async move {
770 loop {
771 select! {
772 event = self.swarm.next() => {
773 debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
774 if let Some(event) = event {
775 debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
776 self.handle_swarm_events(event, &r_input).await?;
777 }
778 },
779 msg = s_output.recv() => {
780 debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
781 let shutdown = self.handle_client_requests(msg).await?;
782 if shutdown {
783 let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
784 break
785 }
786 }
787 }
788 }
789 Ok::<(), NetworkError>(())
790 }
791 .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
792 );
793 Ok((s_input, r_output))
794 }
795
796 pub fn peer_id(&self) -> PeerId {
798 self.peer_id
799 }
800}