/// Configuration for the libp2p network node
mod config;

/// The handle for interacting with a spawned network node
mod handle;

use std::{
    collections::{HashMap, HashSet},
    iter,
    num::{NonZeroU32, NonZeroUsize},
    sync::Arc,
    time::{Duration, Instant},
};

use bimap::BiMap;
use futures::{channel::mpsc, SinkExt, StreamExt};
use hotshot_types::{
    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
};
use libp2p::{
    autonat,
    core::transport::ListenerId,
    gossipsub::{
        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
    },
    identify::{
        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
        Info as IdentifyInfo,
    },
    identity::Keypair,
    kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
    request_response::{
        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
    },
    swarm::SwarmEvent,
    Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_identity::PeerId;
use parking_lot::Mutex;
use rand::{prelude::SliceRandom, thread_rng};
use tokio::{
    select, spawn,
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

pub use self::{
    config::{
        GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
        RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
    },
    handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
};
use super::{
    behaviours::dht::{
        bootstrap::{DHTBootstrapTask, InputEvent},
        store::{
            persistent::{DhtPersistentStorage, PersistentStore},
            validated::ValidatedStore,
        },
    },
    cbor::Cbor,
    gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
    NetworkEventInternal,
};
use crate::network::behaviours::{
    dht::{DHTBehaviour, DHTProgress, KadPutQuery},
    direct_message::{DMBehaviour, DMRequest},
    exponential_backoff::ExponentialBackoff,
};

/// The maximum gossip message size we will accept
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// The maximum number of concurrent connections we allow with a single peer
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// The raw `u32` value behind [`ESTABLISHED_LIMIT`]
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

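/// A libp2p network node: wraps the libp2p [`Swarm`] and drives the gossip,
/// Kademlia DHT, direct-message, identify, and AutoNAT behaviours.
///
/// A sketch of the typical lifecycle (surrounding variable names are
/// illustrative, not part of this API):
///
/// ```rust,ignore
/// let mut node = NetworkNode::new(config, dht_storage, key_map).await?;
/// let addr = node.start_listen(bind_addr).await?;
/// let (request_tx, mut event_rx) = node.spawn_listeners()?;
/// request_tx.send(ClientRequest::BeginBootstrap)?;
/// ```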
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Our peer id
    peer_id: PeerId,
    /// The swarm of network behaviours
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The time-to-live we set on Kademlia records we publish
    kademlia_record_ttl: Duration,
    /// The shared map from consensus keys to peer ids
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// The id of the listener we spawned (if any), so it can be removed on shutdown
    listener_id: Option<ListenerId>,
    /// State for in-flight direct messages
    direct_message_state: DMBehaviour,
    /// Handler for DHT queries and events
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel used to re-enqueue client requests (set when listeners are spawned)
    resend_tx: Option<UnboundedSender<ClientRequest>>,
}

impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
    /// Returns the number of peers we are currently connected to
    pub fn num_connected(&self) -> usize {
        self.swarm.connected_peers().count()
    }

    /// Returns the set of peer ids we are currently connected to
    pub fn connected_pids(&self) -> HashSet<PeerId> {
        self.swarm.connected_peers().copied().collect()
    }

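    /// Starts the swarm listening on the given address and blocks until the
    /// first `NewListenAddr` event reports the concrete address we bound to.
    ///
    /// # Errors
    /// If the swarm fails to start listening on the address
    ///
    /// A usage sketch (the address literal is illustrative):
    ///
    /// ```rust,ignore
    /// let bound = node.start_listen("/ip4/0.0.0.0/tcp/0".parse()?).await?;
    /// ```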
    #[instrument(skip(self))]
    pub async fn start_listen(
        &mut self,
        listen_addr: Multiaddr,
    ) -> Result<Multiaddr, NetworkError> {
        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
        })?);
        // Wait for the swarm to report the address it actually bound to
        let addr = loop {
            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
                break address;
            }
        };
        info!("Libp2p listening on {addr:?}");
        Ok(addr)
    }

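    /// Adds the given peers to the DHT routing table and registers them as
    /// AutoNAT servers, skipping our own peer id.
    ///
    /// A usage sketch (the address is illustrative):
    ///
    /// ```rust,ignore
    /// let addr: Multiaddr = "/ip4/10.0.0.1/tcp/9000".parse()?;
    /// node.add_known_peers(&[(peer_id, addr)]);
    /// ```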
    #[instrument(skip(self))]
    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
        debug!("Adding {} known peers", known_peers.len());
        let behaviour = self.swarm.behaviour_mut();
        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
        // Shuffle so we don't always contact the peers in the same order
        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
        shuffled.shuffle(&mut thread_rng());
        for (peer_id, addr) in shuffled {
            if *peer_id != self.peer_id {
                behaviour.dht.add_address(peer_id, addr.clone());
                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
            }
        }
    }

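    /// Creates a new `NetworkNode` from the given config, DHT persistence
    /// layer, and shared consensus-key-to-peer-id map. Note that
    /// `config.replication_factor` must be set, or construction panics.
    ///
    /// # Errors
    /// If the transport or gossipsub configuration is invalid
    ///
    /// A construction sketch (the builder setters shown here assume the
    /// derive_builder-style API of `NetworkNodeConfigBuilder`; adjust to the
    /// actual config fields):
    ///
    /// ```rust,ignore
    /// let config = NetworkNodeConfigBuilder::default()
    ///     .replication_factor(DEFAULT_REPLICATION_FACTOR)
    ///     .build()?;
    /// let node = NetworkNode::new(config, dht_storage, key_map).await?;
    /// ```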
    #[allow(clippy::too_many_lines)]
    pub async fn new(
        config: NetworkNodeConfig<T>,
        dht_persistent_storage: D,
        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    ) -> Result<Self, NetworkError> {
        // Use the provided keypair, or generate a fresh ed25519 one
        let keypair = config
            .keypair
            .clone()
            .unwrap_or_else(Keypair::generate_ed25519);

        // Derive our peer id from the public key
        let peer_id = PeerId::from(keypair.public());

        // Generate the transport from the keypair, membership, and auth message
        let transport: BoxedTransport = gen_transport::<T>(
            keypair.clone(),
            config.membership.clone(),
            config.auth_message.clone(),
            Arc::clone(&consensus_key_to_pid_map),
        )
        .await?;

        // Use the configured republication interval, defaulting to the
        // Kademlia-standard interval
        let kademlia_record_republication_interval = config
            .republication_interval
            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));

        // Records live for 16 republication intervals unless a TTL is configured
        let kademlia_ttl = config
            .ttl
            .unwrap_or(16 * kademlia_record_republication_interval);

        // Build the swarm out of the transport and our combined network behaviour
        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
            // Content-address gossip messages: hash the payload so identical
            // messages share a message id and are deduplicated
            let message_id_fn = |message: &GossipsubMessage| {
                let hash = blake3::hash(&message.data);
                MessageId::from(hash.as_bytes().to_vec())
            };

            // Build the gossipsub configuration from our `GossipConfig`
            let gossipsub_config = GossipsubConfigBuilder::default()
                .message_id_fn(message_id_fn)
                .validation_mode(ValidationMode::Strict)
                .heartbeat_interval(config.gossip_config.heartbeat_interval)
                .history_gossip(config.gossip_config.history_gossip)
                .history_length(config.gossip_config.history_length)
                .mesh_n(config.gossip_config.mesh_n)
                .mesh_n_high(config.gossip_config.mesh_n_high)
                .mesh_n_low(config.gossip_config.mesh_n_low)
                .mesh_outbound_min(config.gossip_config.mesh_outbound_min)
                .max_transmit_size(config.gossip_config.max_transmit_size)
                .max_ihave_length(config.gossip_config.max_ihave_length)
                .max_ihave_messages(config.gossip_config.max_ihave_messages)
                .published_message_ids_cache_time(
                    config.gossip_config.published_message_ids_cache_time,
                )
                .iwant_followup_time(config.gossip_config.iwant_followup_time)
                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc)
                // (`gossip_retransimission` is the method's spelling in libp2p)
                .gossip_retransimission(config.gossip_config.gossip_retransmission)
                .flood_publish(config.gossip_config.flood_publish)
                .duplicate_cache_time(config.gossip_config.duplicate_cache_time)
                .fanout_ttl(config.gossip_config.fanout_ttl)
                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay)
                .gossip_factor(config.gossip_config.gossip_factor)
                .gossip_lazy(config.gossip_config.gossip_lazy)
                .build()
                .map_err(|err| {
                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
                })?;

            // Create the gossipsub behaviour, signing outgoing messages with our keypair
            let gossipsub: Gossipsub = Gossipsub::new(
                MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )
            .map_err(|err| {
                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
            })?;

            // Create the identify behaviour so peers can exchange listen addresses
            let identify_cfg =
                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
            let identify = IdentifyBehaviour::new(identify_cfg);

            // Configure Kademlia with our republication interval and record TTL
            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
            kconfig
                .set_parallelism(NonZeroUsize::new(5).unwrap())
                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
                .set_publication_interval(Some(kademlia_record_republication_interval))
                .set_record_ttl(Some(kademlia_ttl));

            // The replication factor is required; fail loudly if it is missing
            #[allow(clippy::panic)]
            if let Some(factor) = config.replication_factor {
                kconfig.set_replication_factor(factor);
            } else {
                panic!("Replication factor not set");
            }

            // Back the DHT with a validated, persistently stored record store
            let mut kadem = Behaviour::with_config(
                peer_id,
                PersistentStore::new(
                    ValidatedStore::new(MemoryStore::new(peer_id)),
                    dht_persistent_storage,
                    5,
                )
                .await,
                kconfig,
            );
            kadem.set_mode(Some(Mode::Server));

            let rrconfig = Libp2pRequestResponseConfig::default();

            // CBOR codec with the configured request/response size limits
            let cbor = Cbor::new(
                config.request_response_config.request_size_maximum,
                config.request_response_config.response_size_maximum,
            );

            // Request/response behaviour used for direct messages
            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
                RequestResponse::with_codec(
                    cbor,
                    [(
                        StreamProtocol::new("/HotShot/direct_message/1.0"),
                        ProtocolSupport::Full,
                    )]
                    .into_iter(),
                    rrconfig.clone(),
                );

            let autonat_config = autonat::Config {
                only_global_ips: false,
                ..Default::default()
            };

            // Combine the behaviours into one `NetworkDef`
            let network = NetworkDef::new(
                gossipsub,
                kadem,
                identify,
                direct_message,
                autonat::Behaviour::new(peer_id, autonat_config),
            );

            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
            let swarm = swarm.with_tokio();

            swarm
                .with_other_transport(|_| transport)
                .unwrap()
                .with_behaviour(|_| network)
                .unwrap()
                .build()
        };
        // Add any configured peers to the DHT routing table, skipping ourselves
        for (peer, addr) in &config.to_connect_addrs {
            if peer != swarm.local_peer_id() {
                swarm.behaviour_mut().add_address(peer, addr.clone());
            }
        }

        Ok(Self {
            peer_id,
            swarm,
            kademlia_record_ttl: kademlia_ttl,
            consensus_key_to_pid_map,
            listener_id: None,
            direct_message_state: DMBehaviour::default(),
            dht_handler: DHTBehaviour::new(
                peer_id,
                config
                    .replication_factor
                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
            ),
            resend_tx: None,
        })
    }

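    /// Publishes a record to the Kademlia DHT with a quorum of half the
    /// replication factor, registering the query with the DHT handler so it
    /// can be retried with backoff on failure.
    ///
    /// A sketch of how this is usually driven from the client side (`notify`
    /// is whatever sender type `ClientRequest::PutDHT` expects):
    ///
    /// ```rust,ignore
    /// request_tx.send(ClientRequest::PutDHT { key, value, notify })?;
    /// ```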
    pub fn put_record(&mut self, mut query: KadPutQuery) {
        // Create the record, expiring after the configured TTL
        let mut record = Record::new(query.key.clone(), query.value.clone());
        record.expires = Some(Instant::now() + self.kademlia_record_ttl);

        match self.swarm.behaviour_mut().dht.put_record(
            record,
            // Require acknowledgement from half the replication factor
            libp2p::kad::Quorum::N(
                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
                    .expect("replication factor should be at least 2"),
            ),
        ) {
            Err(e) => {
                // Reset progress and start the backoff timer so the query can be retried
                query.progress = DHTProgress::NotStarted;
                query.backoff.start_next(false);
                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
            },
            Ok(qid) => {
                debug!("Published record to DHT with qid {qid:?}");
                let query = KadPutQuery {
                    progress: DHTProgress::InProgress(qid),
                    ..query
                };
                self.dht_handler.put_record(qid, query);
            },
        }
    }

    /// Handles a single client request, returning `Ok(true)` if the node
    /// should shut down
    #[instrument(skip(self))]
    async fn handle_client_requests(
        &mut self,
        msg: Option<ClientRequest>,
    ) -> Result<bool, NetworkError> {
        let behaviour = self.swarm.behaviour_mut();
        match msg {
            Some(msg) => {
                match msg {
                    ClientRequest::BeginBootstrap => {
                        debug!("Beginning Libp2p bootstrap");
                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
                    },
                    ClientRequest::LookupPeer(pid, chan) => {
                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
                        self.dht_handler
                            .in_progress_get_closest_peers
                            .insert(id, chan);
                    },
                    ClientRequest::GetRoutingTable(chan) => {
                        self.dht_handler
                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
                        if chan.send(()).is_err() {
                            warn!("Tried to notify client but client is no longer listening");
                        }
                    },
                    ClientRequest::PutDHT { key, value, notify } => {
                        let query = KadPutQuery {
                            progress: DHTProgress::NotStarted,
                            notify,
                            key,
                            value,
                            backoff: ExponentialBackoff::default(),
                        };
                        self.put_record(query);
                    },
                    ClientRequest::GetConnectedPeerNum(s) => {
                        if s.send(self.num_connected()).is_err() {
                            error!("error sending peer number to client");
                        }
                    },
                    ClientRequest::GetConnectedPeers(s) => {
                        if s.send(self.connected_pids()).is_err() {
                            error!("error sending peer set to client");
                        }
                    },
                    ClientRequest::GetDHT {
                        key,
                        notify,
                        retry_count,
                    } => {
                        self.dht_handler.get_record(
                            key,
                            notify,
                            ExponentialBackoff::default(),
                            retry_count,
                            &mut self.swarm.behaviour_mut().dht,
                        );
                    },
                    ClientRequest::IgnorePeers(_peers) => {
                        // Currently a no-op
                    },
                    ClientRequest::Shutdown => {
                        // Stop listening before shutting down
                        if let Some(listener_id) = self.listener_id {
                            self.swarm.remove_listener(listener_id);
                        }
                        return Ok(true);
                    },
                    ClientRequest::GossipMsg(topic, contents) => {
                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
                    },
                    ClientRequest::Subscribe(t, chan) => {
                        behaviour.subscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished subscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::Unsubscribe(t, chan) => {
                        behaviour.unsubscribe_gossip(&t);
                        if let Some(chan) = chan {
                            if chan.send(()).is_err() {
                                error!("finished unsubscribing but response channel dropped");
                            }
                        }
                    },
                    ClientRequest::DirectRequest {
                        pid,
                        contents,
                        retry_count,
                    } => {
                        debug!("Sending direct request to {pid:?}");
                        let id = behaviour.add_direct_request(pid, contents.clone());
                        let req = DMRequest {
                            peer_id: pid,
                            data: contents,
                            backoff: ExponentialBackoff::default(),
                            retry_count,
                        };
                        self.direct_message_state.add_direct_request(req, id);
                    },
                    ClientRequest::DirectResponse(chan, msg) => {
                        behaviour.add_direct_response(chan, msg);
                    },
                    ClientRequest::AddKnownPeers(peers) => {
                        self.add_known_peers(&peers);
                    },
                    ClientRequest::Prune(pid) => {
                        if self.swarm.disconnect_peer_id(pid).is_err() {
                            warn!("Could not disconnect from {pid:?}");
                        }
                    },
                }
            },
            None => {
                error!("Error receiving msg in main behaviour loop: channel closed");
            },
        }
        Ok(false)
    }

    /// Handles a single event from the libp2p swarm, forwarding any resulting
    /// `NetworkEvent` to the client.
    ///
    /// # Errors
    /// If the event cannot be sent to the client
    #[allow(clippy::type_complexity)]
    #[instrument(skip(self))]
    async fn handle_swarm_events(
        &mut self,
        event: SwarmEvent<NetworkEventInternal>,
        send_to_client: &UnboundedSender<NetworkEvent>,
    ) -> Result<(), NetworkError> {
        debug!("Swarm event observed {:?}", event);

        #[allow(deprecated)]
        match event {
            SwarmEvent::ConnectionEstablished {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                concurrent_dial_errors,
                established_in: _established_in,
            } => {
                if num_established > ESTABLISHED_LIMIT {
                    error!(
                        "Num concurrent connections to a single peer exceeding {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!(
                        "Connection established with {peer_id:?} at {endpoint:?} with {concurrent_dial_errors:?} concurrent dial errors"
                    );
                }

                // Let the client know the connected peer count changed
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::ConnectionClosed {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                cause,
            } => {
                if num_established > ESTABLISHED_LIMIT_UNWR {
                    error!(
                        "Num concurrent connections to a single peer exceeding {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
                }

                // If this was the last connection to the peer, drop it from the
                // consensus key to peer id map
                if num_established == 0 {
                    self.consensus_key_to_pid_map
                        .lock()
                        .remove_by_right(&peer_id);
                }

                // Let the client know the connected peer count changed
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::Dialing {
                peer_id,
                connection_id: _,
            } => {
                debug!("Attempting to dial {peer_id:?}");
            },
            SwarmEvent::ListenerClosed {
                listener_id: _,
                addresses: _,
                reason: _,
            }
            | SwarmEvent::NewListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::ExpiredListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::IncomingConnection {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
            } => {},
            SwarmEvent::Behaviour(b) => {
                let maybe_event = match b {
                    NetworkEventInternal::DHTEvent(e) => self
                        .dht_handler
                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
                    NetworkEventInternal::IdentifyEvent(e) => {
                        if let IdentifyEvent::Received {
                            peer_id,
                            info:
                                IdentifyInfo {
                                    listen_addrs,
                                    protocols: _,
                                    public_key: _,
                                    protocol_version: _,
                                    agent_version: _,
                                    observed_addr: _,
                                },
                            connection_id: _,
                        } = *e
                        {
                            let behaviour = self.swarm.behaviour_mut();

                            // Add the peer's (deduplicated) listen addresses to
                            // our DHT routing table
                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
                                behaviour.dht.add_address(&peer_id, addr.clone());
                            }
                        }
                        None
                    },
                    NetworkEventInternal::GossipEvent(e) => match *e {
                        GossipEvent::Message {
                            propagation_source: _peer_id,
                            message_id: _id,
                            message,
                        } => Some(NetworkEvent::GossipMsg(message.data)),
                        GossipEvent::Subscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
                            None
                        },
                        GossipEvent::Unsubscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
                            None
                        },
                        GossipEvent::GossipsubNotSupported { peer_id } => {
                            warn!("Peer {peer_id:?} does not support gossipsub");
                            None
                        },
                    },
                    NetworkEventInternal::DMEvent(e) => self
                        .direct_message_state
                        .handle_dm_event(e, self.resend_tx.clone()),
                    NetworkEventInternal::AutonatEvent(e) => {
                        match e {
                            autonat::Event::InboundProbe(_) => {},
                            autonat::Event::OutboundProbe(e) => match e {
                                autonat::OutboundProbeEvent::Request { .. }
                                | autonat::OutboundProbeEvent::Response { .. } => {},
                                autonat::OutboundProbeEvent::Error {
                                    probe_id: _,
                                    peer,
                                    error,
                                } => {
                                    warn!(
                                        "AutoNAT probe to peer {peer:?} failed with error: {error:?}"
                                    );
                                },
                            },
                            autonat::Event::StatusChanged { old, new } => {
                                debug!("AutoNAT Status changed. Old: {old:?}, New: {new:?}");
                            },
                        };
                        None
                    },
                };

                // Forward the event to the client, if one was produced
                if let Some(event) = maybe_event {
                    send_to_client
                        .send(event)
                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
                }
            },
            SwarmEvent::OutgoingConnectionError {
                connection_id: _,
                peer_id,
                error,
            } => {
                warn!("Outgoing connection error to {peer_id:?}: {error:?}");
            },
            SwarmEvent::IncomingConnectionError {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
                error,
            } => {
                warn!("Incoming connection error: {error:?}");
            },
            SwarmEvent::ListenerError {
                listener_id: _,
                error,
            } => {
                warn!("Listener error: {error:?}");
            },
            SwarmEvent::ExternalAddrConfirmed { address } => {
                // Advertise our confirmed external address via the DHT
                let my_id = *self.swarm.local_peer_id();
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&my_id, address.clone());
            },
            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&peer_id, address.clone());
            },
            _ => {
                debug!("Unhandled swarm event {event:?}");
            },
        }
        Ok(())
    }

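    /// Spawns the main event-handling task, consuming the node. Returns a
    /// sender for issuing `ClientRequest`s to the node and a receiver for
    /// `NetworkEvent`s coming back from it.
    ///
    /// A consumption sketch (channel names are illustrative):
    ///
    /// ```rust,ignore
    /// let (request_tx, mut event_rx) = node.spawn_listeners()?;
    /// request_tx.send(ClientRequest::BeginBootstrap)?;
    /// while let Some(event) = event_rx.recv().await {
    ///     // handle NetworkEvent::GossipMsg, ConnectedPeersUpdate, ...
    /// }
    /// ```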
    pub fn spawn_listeners(
        mut self,
    ) -> Result<
        (
            UnboundedSender<ClientRequest>,
            UnboundedReceiver<NetworkEvent>,
        ),
        NetworkError,
    > {
        // Channels for client requests (in) and network events (out)
        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
        self.resend_tx = Some(s_input.clone());
        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());

        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
        spawn(
            async move {
                loop {
                    select! {
                        event = self.swarm.next() => {
                            if let Some(event) = event {
                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
                                self.handle_swarm_events(event, &r_input).await?;
                            }
                        },
                        msg = s_output.recv() => {
                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
                            let shutdown = self.handle_client_requests(msg).await?;
                            if shutdown {
                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
                                break
                            }
                        }
                    }
                }
                Ok::<(), NetworkError>(())
            }
            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
        );
        Ok((s_input, r_output))
    }

    /// Returns the peer id of this node
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
}