1use std::{
11 cmp::min,
12 collections::{BTreeSet, HashSet},
13 fmt::Debug,
14 net::{IpAddr, ToSocketAddrs},
15 num::NonZeroUsize,
16 sync::{
17 Arc,
18 atomic::{AtomicBool, AtomicU64, Ordering},
19 },
20 time::Duration,
21};
22#[cfg(feature = "hotshot-testing")]
23use std::{collections::HashMap, str::FromStr};
24
25use anyhow::{Context, anyhow};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34 network::{
35 DEFAULT_REPLICATION_FACTOR,
36 NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
37 NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
38 behaviours::dht::{
39 record::{Namespace, RecordKey, RecordValue},
40 store::persistent::DhtPersistentStorage,
41 },
42 spawn_network_node,
43 transport::construct_auth_message,
44 },
45 reexport::Multiaddr,
46};
47use hotshot_types::{
48 BoxSyncFuture, boxed_sync,
49 constants::LOOK_AHEAD,
50 data::{EpochNumber, ViewNumber},
51 network::NetworkConfig,
52 traits::{
53 metrics::{Counter, Gauge, Metrics, NoMetrics},
54 network::{ConnectedNetwork, NetworkError, Topic},
55 node_implementation::NodeType,
56 signature_key::{PrivateSignatureKey, SignatureKey},
57 },
58};
59#[cfg(feature = "hotshot-testing")]
60use hotshot_types::{
61 PeerConnectInfo,
62 traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
63};
64use libp2p_identity::{
65 Keypair, PeerId,
66 ed25519::{self, SecretKey},
67};
68use serde::Serialize;
69use tokio::{
70 select, spawn,
71 sync::{
72 Mutex,
73 mpsc::{Receiver, Sender, channel, error::TrySendError},
74 },
75 time::sleep,
76};
77use tracing::{error, info, instrument, trace, warn};
78
79use crate::{BroadcastDelay, EpochMembershipCoordinator};
80
/// Metrics for the Libp2p network layer.
#[derive(Clone, Debug)]
pub struct Libp2pMetricsValue {
    /// The number of peers currently connected to this node
    pub num_connected_peers: Box<dyn Gauge>,
    /// The number of messages that failed to send
    pub num_failed_messages: Box<dyn Counter>,
    /// Whether or not the network is considered ready (1) or not (0)
    pub is_ready: Box<dyn Gauge>,
}
91
92impl Libp2pMetricsValue {
93 pub fn new(metrics: &dyn Metrics) -> Self {
95 let subgroup = metrics.subgroup("libp2p".into());
97
98 Self {
100 num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
101 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
102 is_ready: subgroup.create_gauge("is_ready".into(), None),
103 }
104 }
105}
106
impl Default for Libp2pMetricsValue {
    /// Initialize with a no-op metrics sink, so metrics calls are harmless no-ops.
    fn default() -> Self {
        Self::new(&*NoMetrics::boxed())
    }
}
113
/// Shared list of `(PeerId, Multiaddr)` bootstrap peers, writable so new nodes
/// can append their own address for others to discover.
pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;

/// The gossip topic used for global (quorum) messages
pub const QC_TOPIC: &str = "global";

/// A trivial serializable payload; used in this file as the acknowledgement
/// body for direct requests (serialized with `byte: 0`).
#[derive(Serialize)]
pub struct Empty {
    // Single placeholder byte; carries no information
    byte: u8,
}
135
impl<T: NodeType> Debug for Libp2pNetwork<T> {
    /// Formats as `Libp2p { inner: "inner" }`; the inner state is deliberately
    /// opaque (it is not itself fully `Debug`-printable in a useful way).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Libp2p").field("inner", &"inner").finish()
    }
}
141
/// Shared collection of `(PeerId, Multiaddr)` peer info (same shape as [`BootstrapAddrs`])
pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
144
/// The internal, shared state of a [`Libp2pNetwork`].
#[derive(Debug)]
struct Libp2pNetworkInner<T: NodeType> {
    /// This node's consensus public key
    pk: T::SignatureKey,
    /// Handle to the underlying libp2p network node
    handle: Arc<NetworkNodeHandle<T>>,
    /// Receiving end for inbound message payloads
    receiver: Mutex<Receiver<Vec<u8>>>,
    /// Sending end for message payloads; also used to loop messages back to ourselves
    sender: Sender<Vec<u8>>,
    /// Queue of pre-emptive leader lookups; `None` signals the lookup task to stop
    node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
    /// The known bootstrap addresses (shared so test nodes can discover each other)
    bootstrap_addrs: PeerInfoVec,
    /// Whether we have connected to enough peers to be considered ready
    is_ready: Arc<AtomicBool>,
    /// Timeout used for DHT lookups
    dht_timeout: Duration,
    /// Whether the initial DHT bootstrap has completed
    is_bootstrapped: Arc<AtomicBool>,
    /// Metrics for this network instance
    metrics: Libp2pMetricsValue,
    /// The gossip topics we are subscribed to
    subscribed_topics: HashSet<String>,
    /// The latest view seen; used to skip lookups that are no longer far enough ahead
    latest_seen_view: Arc<AtomicU64>,
    /// Simulated network unreliability, testing builds only
    #[cfg(feature = "hotshot-testing")]
    reliability_config: Option<Box<dyn NetworkReliability>>,
    /// Sender used to signal the background event loop to shut down
    kill_switch: Sender<()>,
}
181
/// A HotShot networking implementation backed by libp2p.
/// Cheap to clone: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct Libp2pNetwork<T: NodeType> {
    /// The shared inner state
    inner: Arc<Libp2pNetworkInner<T>>,
}
189
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a generator that, given a `node_id`, produces a fresh `Libp2pNetwork`
    /// for testing. Each node binds an OS-assigned local UDP port, derives
    /// deterministic keys from a fixed seed + node id, and publishes its address
    /// into a shared bootstrap list so nodes can discover each other.
    ///
    /// # Panics
    /// If any setup step fails (port binding, key derivation, record signing,
    /// config build, or network creation).
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );
        // State shared by every generated node: the bootstrap address list and
        // the set of node ids we have already generated.
        let bootstrap_addrs: PeerInfoVec = Arc::default();
        let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        Box::pin({
            move |node_id| {
                info!(
                    "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                    node_id,
                    node_id < num_bootstrap as u64
                );

                // Bind to an OS-assigned local UDP port (QUIC runs over UDP)
                let port = std::net::UdpSocket::bind("127.0.0.1:0")
                    .expect("UDP socket should bind")
                    .local_addr()
                    .expect("UDP socket should have local addr")
                    .port();

                let addr =
                    Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();

                // Deterministic keys: fixed seed, indexed by node id
                let privkey = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
                let pubkey = T::SignatureKey::from_private(&privkey);

                let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                    .expect("Failed to derive libp2p keypair");

                // Sign the DHT record mapping our consensus key to our libp2p peer id
                let lookup_record_value = RecordValue::new_signed(
                    &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()),
                    libp2p_keypair.public().to_peer_id().to_bytes(),
                    &privkey,
                )
                .expect("Failed to sign DHT lookup record");

                // Replication factor: ceil(2/3) of the expected node count
                let replication_factor =
                    NonZeroUsize::new((2 * expected_node_count).div_ceil(3)).unwrap();

                let config = NetworkNodeConfigBuilder::default()
                    .keypair(libp2p_keypair)
                    .replication_factor(replication_factor)
                    .bind_address(Some(addr))
                    .to_connect_addrs(HashSet::default())
                    .republication_interval(None)
                    .build()
                    .expect("Failed to build network node config");

                let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
                let node_ids_ref = Arc::clone(&node_ids);
                let reliability_config_dup = reliability_config.clone();

                Box::pin(async move {
                    // If this node id has been generated before, the test is
                    // presumably restarting the network, so reset the seen set.
                    // NOTE(review): a single repeated id clears ALL seen ids —
                    // confirm this is the intended restart semantics.
                    let mut write_ids = node_ids_ref.write().await;
                    if write_ids.contains(&node_id) {
                        write_ids.clear();
                    }
                    write_ids.insert(node_id);
                    drop(write_ids);
                    Arc::new(
                        match Libp2pNetwork::new(
                            Libp2pMetricsValue::default(),
                            DhtNoPersistence,
                            config,
                            pubkey.clone(),
                            lookup_record_value,
                            bootstrap_addrs_ref,
                            usize::try_from(node_id).unwrap(),
                            #[cfg(feature = "hotshot-testing")]
                            reliability_config_dup,
                        )
                        .await
                        {
                            Ok(network) => network,
                            Err(err) => {
                                panic!("Failed to create libp2p network: {err:?}");
                            },
                        },
                    )
                })
            }
        })
    }

    /// Libp2p does not track in-flight message counts.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
309
310pub fn derive_libp2p_keypair<K: SignatureKey>(
316 private_key: &K::PrivateKey,
317) -> anyhow::Result<Keypair> {
318 let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
320 let derived_key = SecretKey::try_from_bytes(derived_key)?;
321
322 Ok(ed25519::Keypair::from(derived_key).into())
324}
325
326pub fn derive_libp2p_peer_id<K: SignatureKey>(
331 private_key: &K::PrivateKey,
332) -> anyhow::Result<PeerId> {
333 let keypair = derive_libp2p_keypair::<K>(private_key)?;
335
336 Ok(PeerId::from_public_key(&keypair.public()))
338}
339
340pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
349 let (host, port) = match addr.rfind(':') {
351 Some(idx) => (&addr[..idx], &addr[idx + 1..]),
352 None => return Err(anyhow!("Invalid address format, no port supplied")),
353 };
354
355 let ip = host.parse::<IpAddr>();
357
358 let multiaddr_string = match ip {
360 Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
361 Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
362 Err(_) => {
363 let lookup_result = addr.to_socket_addrs();
365
366 let failed = lookup_result
368 .map(|result| result.collect::<Vec<_>>().is_empty())
369 .unwrap_or(true);
370
371 if failed {
373 warn!(
374 "Failed to resolve domain name {host}, assuming it has not yet been \
375 provisioned"
376 );
377 }
378
379 format!("/dns/{host}/udp/{port}/quic-v1")
380 },
381 };
382
383 multiaddr_string.parse().with_context(|| {
385 format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
386 })
387}
388
impl<T: NodeType> Libp2pNetwork<T> {
    /// Create a Libp2p network from a HotShot `NetworkConfig` plus the various
    /// libp2p-specific configurations, deriving the libp2p identity from the
    /// supplied consensus private key.
    ///
    /// # Errors
    /// If the config is missing its libp2p section, key derivation or record
    /// signing fails, or the network node cannot be created.
    #[allow(clippy::too_many_arguments)]
    pub async fn from_config<D: DhtPersistentStorage>(
        mut config: NetworkConfig<T>,
        dht_persistent_storage: D,
        gossip_config: GossipConfig,
        request_response_config: RequestResponseConfig,
        bind_address: Multiaddr,
        pub_key: &T::SignatureKey,
        priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
        metrics: Libp2pMetricsValue,
    ) -> anyhow::Result<Self> {
        // Take our Libp2p-specific config out of the broader network config
        let libp2p_config = config
            .libp2p_config
            .take()
            .ok_or(anyhow!("Libp2p config not supplied"))?;

        // Derive our Libp2p keypair from the consensus private key
        let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;

        let mut config_builder = NetworkNodeConfigBuilder::default();

        // Set the gossip and request/response configurations
        config_builder.gossip_config(gossip_config.clone());
        config_builder.request_response_config(request_response_config);

        // Construct the auth message proving ownership of both the consensus
        // key and the derived peer id
        let auth_message =
            construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
                .with_context(|| "Failed to construct auth message")?;

        config_builder.auth_message(Some(auth_message));

        // Replication factor: half the staked node count, capped at the default
        let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
            return Err(anyhow!("Default replication factor not supplied"));
        };

        let replication_factor = NonZeroUsize::new(min(
            default_replication_factor.get(),
            config.config.num_nodes_with_stake.get() / 2,
        ))
        .with_context(|| "Failed to calculate replication factor")?;

        // Sign the DHT record mapping our consensus key to our peer id
        let lookup_record_value = RecordValue::new_signed(
            &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
            keypair.public().to_peer_id().to_bytes(),
            priv_key,
        )
        .with_context(|| "Failed to sign DHT lookup record")?;

        config_builder
            .keypair(keypair)
            .replication_factor(replication_factor)
            .bind_address(Some(bind_address.clone()));

        // Seed the node with the configured bootstrap addresses
        config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));

        let node_config = config_builder.build()?;

        // Collect the consensus keys of all known staked nodes.
        // NOTE(review): `all_keys` is built here but never used below —
        // confirm whether this is dead code or a missing argument.
        let mut all_keys = BTreeSet::new();

        for node in config.config.known_nodes_with_stake {
            all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
        }

        Ok(Libp2pNetwork::new(
            metrics,
            dht_persistent_storage,
            node_config,
            pub_key.clone(),
            lookup_record_value,
            Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
            usize::try_from(config.node_index)?,
            #[cfg(feature = "hotshot-testing")]
            None,
        )
        .await?)
    }

    /// Whether the network has connected to enough peers to be usable.
    #[must_use]
    pub fn has_peers(&self) -> bool {
        self.inner.is_ready.load(Ordering::Relaxed)
    }

    /// Block (polling once per second) until [`Self::has_peers`] is true.
    pub async fn wait_for_peers(&self) {
        loop {
            if self.has_peers() {
                break;
            }
            sleep(Duration::from_secs(1)).await;
        }
    }

    /// Create a new Libp2p network from an already-built node config, spawning
    /// the underlying network node and the background tasks (event loop, leader
    /// lookup, and connect/bootstrap).
    ///
    /// # Errors
    /// If the network node fails to spawn.
    #[allow(clippy::too_many_arguments)]
    pub async fn new<D: DhtPersistentStorage>(
        metrics: Libp2pMetricsValue,
        dht_persistent_storage: D,
        config: NetworkNodeConfig,
        pk: T::SignatureKey,
        lookup_record_value: RecordValue<T::SignatureKey>,
        bootstrap_addrs: BootstrapAddrs,
        id: usize,
        #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
    ) -> Result<Libp2pNetwork<T>, NetworkError> {
        // Bidirectional map of consensus keys <-> peer ids, shared with the node
        let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));

        let (mut rx, network_handle) = spawn_network_node::<T, D>(
            config.clone(),
            dht_persistent_storage,
            Arc::clone(&consensus_key_to_pid_map),
            id,
        )
        .await
        .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;

        // Publish our own address into the shared bootstrap list so other
        // (locally spawned) nodes can find us
        let addr = network_handle.listen_addr();
        let pid = network_handle.peer_id();
        bootstrap_addrs.write().await.push((pid, addr));

        // We always subscribe to the global (quorum) topic
        let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);

        // Channels: inbound messages, leader lookups, and task shutdown
        let (sender, receiver) = channel(1000);
        let (node_lookup_send, node_lookup_recv) = channel(10);
        let (kill_tx, kill_rx) = channel(1);
        rx.set_kill_switch(kill_rx);

        let mut result = Libp2pNetwork {
            inner: Arc::new(Libp2pNetworkInner {
                handle: Arc::new(network_handle),
                receiver: Mutex::new(receiver),
                sender: sender.clone(),
                pk,
                bootstrap_addrs,
                is_ready: Arc::new(AtomicBool::new(false)),
                // Default to a 120-second DHT timeout if none is configured
                dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
                is_bootstrapped: Arc::new(AtomicBool::new(false)),
                metrics,
                subscribed_topics,
                node_lookup_send,
                latest_seen_view: Arc::new(AtomicU64::new(0)),
                #[cfg(feature = "hotshot-testing")]
                reliability_config,
                kill_switch: kill_tx,
            }),
        };

        // Not ready until the connect task has found peers
        result.inner.metrics.is_ready.set(0);

        // Spawn the background tasks
        result.handle_event_generator(sender, rx);
        result.spawn_node_lookup(node_lookup_recv);
        result.spawn_connect(id, lookup_record_value);

        Ok(result)
    }

    /// Spawn the task that performs DHT lookups for upcoming leaders.
    /// Receiving `None` (or the channel closing) stops the task.
    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
    fn spawn_node_lookup(
        &self,
        mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
    ) {
        let handle = Arc::clone(&self.inner.handle);
        let dht_timeout = self.inner.dht_timeout;
        let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);

        spawn(async move {
            while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
                // Only look up views at least 80% of LOOK_AHEAD past the latest
                // seen view; closer lookups are unlikely to finish in time
                #[allow(clippy::cast_possible_truncation)]
                const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;

                trace!("Performing lookup for peer {pk}");

                if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
                    if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
                        warn!("Failed to perform lookup for key {pk}: {err}");
                    };
                }
            }
        });
    }

    /// Spawn the connect task: bootstrap the DHT, subscribe to the global topic,
    /// publish our lookup record, and mark the network ready once at least one
    /// peer is connected.
    fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
        let pk = self.inner.pk.clone();
        let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
        let handle = Arc::clone(&self.inner.handle);
        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
        let inner = Arc::clone(&self.inner);

        spawn({
            let is_ready = Arc::clone(&self.inner.is_ready);
            async move {
                let bs_addrs = bootstrap_ref.read().await.clone();

                // Seed the node with the known bootstrap addresses
                handle.add_known_peers(bs_addrs).unwrap();

                // Begin bootstrap, re-kicking it every second until the event
                // loop reports completion (via `is_bootstrapped`)
                handle.begin_bootstrap()?;
                while !is_bootstrapped.load(Ordering::Relaxed) {
                    sleep(Duration::from_secs(1)).await;
                    handle.begin_bootstrap()?;
                }

                // Subscribe to the global (quorum) topic
                handle.subscribe(QC_TOPIC.to_string()).await.unwrap();

                // Publish our lookup record (consensus key -> peer id),
                // retrying every second until it succeeds
                while handle
                    .put_record(
                        RecordKey::new(Namespace::Lookup, pk.to_bytes()),
                        lookup_record_value.clone(),
                    )
                    .await
                    .is_err()
                {
                    sleep(Duration::from_secs(1)).await;
                }

                // Wait for at least one peer connection before declaring ready
                if let Err(e) = handle.wait_to_connect(1, id).await {
                    error!("Failed to connect to peers: {e:?}");
                    return Err::<(), NetworkError>(e);
                }
                info!("Connected to required number of peers");

                // Flip the readiness flag and mirror it into the metrics
                is_ready.store(true, Ordering::Relaxed);
                inner.metrics.is_ready.set(1);

                Ok::<(), NetworkError>(())
            }
        });
    }

    /// Forward an inbound network event's payload into the message channel,
    /// acknowledging direct requests with an empty response.
    ///
    /// # Errors
    /// If the channel send fails or the acknowledgement fails to serialize.
    fn handle_recvd_events(
        &self,
        msg: NetworkEvent,
        sender: &Sender<Vec<u8>>,
    ) -> Result<(), NetworkError> {
        match msg {
            GossipMsg(msg) => {
                sender.try_send(msg).map_err(|err| {
                    NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
                })?;
            },
            DirectRequest(msg, _pid, chan) => {
                sender.try_send(msg).map_err(|err| {
                    NetworkError::ChannelSendError(format!(
                        "failed to send direct request message: {err}"
                    ))
                })?;
                // Acknowledge the request with a serialized `Empty` payload
                if self
                    .inner
                    .handle
                    .direct_response(
                        chan,
                        &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
                            NetworkError::FailedToSerialize(format!(
                                "failed to serialize acknowledgement: {e}"
                            ))
                        })?,
                    )
                    .is_err()
                {
                    error!("failed to ack!");
                };
            },
            // Direct responses (acks) require no action
            DirectResponse(_msg, _) => {},
            NetworkEvent::IsBootstrapped => {
                // Bootstrap events are consumed by the event generator task
                error!(
                    "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
                     impossible."
                );
            },
            NetworkEvent::ConnectedPeersUpdate(_) => {},
        }
        Ok::<(), NetworkError>(())
    }

    /// Spawn the event loop that drains events from the network node, updating
    /// bootstrap state and peer-count metrics, and forwarding messages. Exits
    /// when the receiver shuts down or the kill switch fires.
    fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
        let handle = self.clone();
        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
        spawn(async move {
            let Some(mut kill_switch) = network_rx.take_kill_switch() else {
                tracing::error!(
                    "`spawn_handle` was called on a network handle that was already closed"
                );
                return;
            };

            loop {
                select! {
                    msg = network_rx.recv() => {
                        let Ok(message) = msg else {
                            warn!("Network receiver shut down!");
                            return;
                        };

                        match message {
                            NetworkEvent::IsBootstrapped => {
                                is_bootstrapped.store(true, Ordering::Relaxed);
                            }
                            GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
                                // A single failed forward must not kill the loop
                                let _ = handle.handle_recvd_events(message, &sender);
                            }
                            NetworkEvent::ConnectedPeersUpdate(num_peers) => {
                                handle.inner.metrics.num_connected_peers.set(num_peers);
                            }
                        }
                    }

                    _kill_switch = kill_switch.recv() => {
                        warn!("Event Handler shutdown");
                        return;
                    }
                }
            }
        });
    }
}
761
#[async_trait]
impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
    /// Block until the network has connected to enough peers to be usable.
    #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
    async fn wait_for_ready(&self) {
        self.wait_for_peers().await;
    }

    /// Pausing is not supported for the Libp2p network.
    fn pause(&self) {
        unimplemented!("Pausing not implemented for the Libp2p network");
    }

    /// Resuming is not supported for the Libp2p network.
    fn resume(&self) {
        unimplemented!("Resuming not implemented for the Libp2p network");
    }

    /// Shut down: stop the libp2p node, then signal the lookup task (via `None`)
    /// and the event loop (via the kill switch). All steps are best-effort.
    #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
    where
        'a: 'b,
        Self: 'b,
    {
        let closure = async move {
            let _ = self.inner.handle.shutdown().await;
            let _ = self.inner.node_lookup_send.send(None).await;
            let _ = self.inner.kill_switch.send(()).await;
        };
        boxed_sync(closure)
    }

    /// Gossip a message to all subscribers of `topic`, looping the message back
    /// locally if we are subscribed to the topic ourselves.
    ///
    /// # Errors
    /// If we have no peers yet, the loopback channel is closed, or gossip fails.
    #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
    async fn broadcast_message(
        &self,
        _: ViewNumber,
        message: Vec<u8>,
        topic: Topic,
        _broadcast_delay: BroadcastDelay,
    ) -> Result<(), NetworkError> {
        // Refuse to send before we are connected to peers
        if !self.has_peers() {
            self.inner.metrics.num_failed_messages.add(1);
            return Err(NetworkError::NoPeersYet);
        };

        // Loop the message back to ourselves if we are subscribed to the topic
        let topic = topic.to_string();
        if self.inner.subscribed_topics.contains(&topic) {
            self.inner.sender.try_send(message.clone()).map_err(|_| {
                self.inner.metrics.num_failed_messages.add(1);
                NetworkError::ShutDown
            })?;
        }

        // In testing builds, optionally route the send through the simulated
        // unreliable-network layer instead of gossiping directly
        #[cfg(feature = "hotshot-testing")]
        {
            let metrics = self.inner.metrics.clone();
            if let Some(config) = &self.inner.reliability_config {
                let handle = Arc::clone(&self.inner.handle);

                let fut = config.clone().chaos_send_msg(
                    message,
                    Arc::new(move |msg: Vec<u8>| {
                        let topic_2 = topic.clone();
                        let handle_2 = Arc::clone(&handle);
                        let metrics_2 = metrics.clone();
                        boxed_sync(async move {
                            if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
                                metrics_2.num_failed_messages.add(1);
                                warn!("Failed to broadcast to libp2p: {e:?}");
                            }
                        })
                    }),
                );
                spawn(fut);
                return Ok(());
            }
        }

        if let Err(e) = self.inner.handle.gossip(topic, &message) {
            self.inner.metrics.num_failed_messages.add(1);
            return Err(e);
        }

        Ok(())
    }

    /// Broadcast to the DA committee by direct-messaging each recipient
    /// concurrently (plus a local loopback if we are subscribed to the DA topic).
    ///
    /// # Errors
    /// If we have no peers yet, or if any direct message fails (failures are
    /// collected into `NetworkError::Multiple`).
    #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
    async fn da_broadcast_message(
        &self,
        view: ViewNumber,
        message: Vec<u8>,
        recipients: Vec<T::SignatureKey>,
        _broadcast_delay: BroadcastDelay,
    ) -> Result<(), NetworkError> {
        if !self.has_peers() {
            self.inner.metrics.num_failed_messages.add(1);
            return Err(NetworkError::NoPeersYet);
        };

        // Loop the message back to ourselves if we are subscribed to the DA topic
        let topic = Topic::Da.to_string();
        if self.inner.subscribed_topics.contains(&topic) {
            self.inner.sender.try_send(message.clone()).map_err(|_| {
                self.inner.metrics.num_failed_messages.add(1);
                NetworkError::ShutDown
            })?;
        }

        // Send to every recipient concurrently and collect any failures
        let future_results = recipients
            .into_iter()
            .map(|r| self.direct_message(view, message.clone(), r));
        let results = join_all(future_results).await;

        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();

        if errors.is_empty() {
            Ok(())
        } else {
            Err(NetworkError::Multiple(errors))
        }
    }

    /// Send a message directly to `recipient`, resolving their consensus key to
    /// a peer id via the DHT. Messages to ourselves bypass the network.
    ///
    /// # Errors
    /// If we have no peers yet, the DHT lookup fails, or the request fails.
    #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
    async fn direct_message(
        &self,
        _: ViewNumber,
        message: Vec<u8>,
        recipient: T::SignatureKey,
    ) -> Result<(), NetworkError> {
        if !self.has_peers() {
            self.inner.metrics.num_failed_messages.add(1);
            return Err(NetworkError::NoPeersYet);
        };

        // Short-circuit if the message is addressed to ourselves
        if recipient == self.inner.pk {
            self.inner.sender.try_send(message).map_err(|_x| {
                self.inner.metrics.num_failed_messages.add(1);
                NetworkError::ShutDown
            })?;
            return Ok(());
        }

        // Resolve the recipient's consensus key to a libp2p peer id (2s timeout)
        let pid = match self
            .inner
            .handle
            .lookup_node(&recipient, Duration::from_secs(2))
            .await
        {
            Ok(pid) => pid,
            Err(err) => {
                self.inner.metrics.num_failed_messages.add(1);
                return Err(NetworkError::LookupError(format!(
                    "failed to look up node for direct message: {err}"
                )));
            },
        };

        // In testing builds, optionally route through the simulated unreliable network
        #[cfg(feature = "hotshot-testing")]
        {
            let metrics = self.inner.metrics.clone();
            if let Some(config) = &self.inner.reliability_config {
                let handle = Arc::clone(&self.inner.handle);

                let fut = config.clone().chaos_send_msg(
                    message,
                    Arc::new(move |msg: Vec<u8>| {
                        let handle_2 = Arc::clone(&handle);
                        let metrics_2 = metrics.clone();
                        boxed_sync(async move {
                            if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
                                metrics_2.num_failed_messages.add(1);
                                warn!("Failed to broadcast to libp2p: {e:?}");
                            }
                        })
                    }),
                );
                spawn(fut);
                return Ok(());
            }
        }

        match self.inner.handle.direct_request(pid, &message) {
            Ok(()) => Ok(()),
            Err(e) => {
                self.inner.metrics.num_failed_messages.add(1);
                Err(e)
            },
        }
    }

    /// Receive the next inbound message payload.
    ///
    /// # Errors
    /// If the receive channel has been closed (network shut down).
    #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
        let result = self
            .inner
            .receiver
            .lock()
            .await
            .recv()
            .await
            .ok_or(NetworkError::ShutDown)?;

        Ok(result)
    }

    /// Queue a non-blocking DHT lookup for the leader of `view_number`.
    ///
    /// # Errors
    /// If the lookup channel is full or closed.
    #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
    #[allow(clippy::type_complexity)]
    fn queue_node_lookup(
        &self,
        view_number: ViewNumber,
        pk: T::SignatureKey,
    ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
        self.inner
            .node_lookup_send
            .try_send(Some((view_number, pk)))
    }

    /// On a view change, pre-emptively queue a lookup for the leader `LOOK_AHEAD`
    /// views ahead so their address is warm by the time it is needed.
    /// All failures here are logged and ignored (best-effort optimization).
    async fn update_view<TYPES>(
        &self,
        view: ViewNumber,
        epoch: Option<EpochNumber>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
    ) where
        TYPES: NodeType<SignatureKey = T::SignatureKey>,
    {
        let future_view = ViewNumber::new(*view) + LOOK_AHEAD;
        let epoch = epoch.map(|e| EpochNumber::new(*e));

        let membership = match membership_coordinator.membership_for_epoch(epoch).await {
            Ok(m) => m,
            Err(e) => {
                return tracing::warn!(e.message);
            },
        };
        let future_leader = match membership.leader(future_view).await {
            Ok(l) => l,
            Err(e) => {
                return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
            },
        };

        let _ = self
            .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
            .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
    }
}
1028
#[cfg(test)]
mod test {
    /// Tests for [`derive_libp2p_multiaddr`](super::derive_libp2p_multiaddr).
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        // NOTE: the previous `.expect(...)` messages ended in a stray ", {}" —
        // leftover from a `format!` attempt that `expect` never interpolates.
        // The messages below drop the dead placeholder.

        /// An IPv4 literal maps to an `/ip4/…/udp/…/quic-v1` multiaddr.
        #[test]
        fn test_v4_valid() {
            let addr = "1.1.1.1:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            assert_eq!(multiaddr.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// An IPv6 literal maps to an `/ip6/…/udp/…/quic-v1` multiaddr.
        #[test]
        fn test_v6_valid() {
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            let addr = format!("{ipv6_addr}:8080");
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            assert_eq!(
                multiaddr.to_string(),
                format!("/ip6/{ipv6_addr}/udp/8080/quic-v1")
            );
        }

        /// An IP address without a port is rejected.
        #[test]
        fn test_no_port() {
            let addr = "1.1.1.1".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            assert!(multiaddr.is_err());
        }

        /// A resolvable domain name maps to a `/dns/…` multiaddr.
        #[test]
        fn test_fqdn_exists() {
            let addr = "example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            assert_eq!(multiaddr.to_string(), "/dns/example.com/udp/8080/quic-v1");
        }

        /// A non-resolving domain name still maps to a `/dns/…` multiaddr
        /// (resolution failure only warns; the host may not be provisioned yet).
        #[test]
        fn test_fqdn_does_not_exist() {
            let addr = "libp2p.example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            assert_eq!(
                multiaddr.to_string(),
                "/dns/libp2p.example.com/udp/8080/quic-v1"
            );
        }

        /// A domain name without a port is rejected.
        #[test]
        fn test_fqdn_no_port() {
            let addr = "example.com".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            assert!(multiaddr.is_err());
        }
    }
}
1113}