1#[cfg(feature = "hotshot-testing")]
11use std::str::FromStr;
12use std::{
13 cmp::min,
14 collections::{BTreeSet, HashSet},
15 fmt::Debug,
16 net::{IpAddr, ToSocketAddrs},
17 num::NonZeroUsize,
18 sync::{
19 atomic::{AtomicBool, AtomicU64, Ordering},
20 Arc,
21 },
22 time::Duration,
23};
24
25use anyhow::{anyhow, Context};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34 network::{
35 behaviours::dht::{
36 record::{Namespace, RecordKey, RecordValue},
37 store::persistent::DhtPersistentStorage,
38 },
39 spawn_network_node,
40 transport::construct_auth_message,
41 NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
42 NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
43 DEFAULT_REPLICATION_FACTOR,
44 },
45 reexport::Multiaddr,
46};
47#[cfg(feature = "hotshot-testing")]
48use hotshot_types::traits::network::{
49 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
50};
51use hotshot_types::{
52 boxed_sync,
53 constants::LOOK_AHEAD,
54 data::{EpochNumber, ViewNumber},
55 network::NetworkConfig,
56 traits::{
57 metrics::{Counter, Gauge, Metrics, NoMetrics},
58 network::{ConnectedNetwork, NetworkError, Topic},
59 node_implementation::{ConsensusTime, NodeType},
60 signature_key::{PrivateSignatureKey, SignatureKey},
61 },
62 BoxSyncFuture,
63};
64use libp2p_identity::{
65 ed25519::{self, SecretKey},
66 Keypair, PeerId,
67};
68use serde::Serialize;
69use tokio::{
70 select, spawn,
71 sync::{
72 mpsc::{channel, error::TrySendError, Receiver, Sender},
73 Mutex,
74 },
75 time::sleep,
76};
77use tracing::{error, info, instrument, trace, warn};
78
79use crate::{BroadcastDelay, EpochMembershipCoordinator};
80
81#[derive(Clone, Debug)]
83pub struct Libp2pMetricsValue {
84 pub num_connected_peers: Box<dyn Gauge>,
86 pub num_failed_messages: Box<dyn Counter>,
88 pub is_ready: Box<dyn Gauge>,
90}
91
92impl Libp2pMetricsValue {
93 pub fn new(metrics: &dyn Metrics) -> Self {
95 let subgroup = metrics.subgroup("libp2p".into());
97
98 Self {
100 num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
101 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
102 is_ready: subgroup.create_gauge("is_ready".into(), None),
103 }
104 }
105}
106
107impl Default for Libp2pMetricsValue {
108 fn default() -> Self {
110 Self::new(&*NoMetrics::boxed())
111 }
112}
113
114pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
117
/// Name of the gossip topic every node subscribes to once it has bootstrapped.
pub const QC_TOPIC: &str = "global";
120
121#[derive(Serialize)]
130pub struct Empty {
131 byte: u8,
134}
135
136impl<T: NodeType> Debug for Libp2pNetwork<T> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("Libp2p").field("inner", &"inner").finish()
139 }
140}
141
142pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
144
145#[derive(Debug)]
147struct Libp2pNetworkInner<T: NodeType> {
148 pk: T::SignatureKey,
150 handle: Arc<NetworkNodeHandle<T>>,
152 receiver: Mutex<Receiver<Vec<u8>>>,
154 sender: Sender<Vec<u8>>,
156 node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
158 bootstrap_addrs: PeerInfoVec,
161 is_ready: Arc<AtomicBool>,
163 dht_timeout: Duration,
165 is_bootstrapped: Arc<AtomicBool>,
167 metrics: Libp2pMetricsValue,
169 subscribed_topics: HashSet<String>,
171 latest_seen_view: Arc<AtomicU64>,
175 #[cfg(feature = "hotshot-testing")]
176 reliability_config: Option<Box<dyn NetworkReliability>>,
178 kill_switch: Sender<()>,
180}
181
182#[derive(Clone)]
185pub struct Libp2pNetwork<T: NodeType> {
186 inner: Arc<Libp2pNetworkInner<T>>,
188}
189
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a generator that creates one local libp2p network per node id.
    ///
    /// Every generated node binds to `127.0.0.1` on a freshly picked UDP port
    /// (QUIC), derives its keys deterministically from its node id, and
    /// registers itself in a bootstrap address list shared by all nodes
    /// produced by the same generator.
    ///
    /// `num_bootstrap` is only used for logging here, and `da_committee_size`
    /// is only validated.
    ///
    /// # Panics
    /// Panics if `da_committee_size > expected_node_count`, if no unused port
    /// can be found, if key derivation / record signing / config building
    /// fails, if `expected_node_count` is zero, or if the network itself
    /// cannot be created.
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );

        // State shared by every node this generator produces.
        let bootstrap_addrs: PeerInfoVec = Arc::default();
        let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        Box::pin(move |node_id| {
            let is_bootstrap = node_id < num_bootstrap as u64;
            info!(
                "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                node_id, is_bootstrap
            );

            // Bind to a local QUIC address on an unused port.
            let port = portpicker::pick_unused_port().expect("Could not find an open port");
            let bind_addr =
                Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();

            // Deterministic consensus keys for this node id.
            let (_, privkey) = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
            let pubkey = T::SignatureKey::from_private(&privkey);

            let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                .expect("Failed to derive libp2p keypair");

            // Signed DHT record mapping the consensus key to the libp2p peer id.
            let lookup_key = RecordKey::new(Namespace::Lookup, pubkey.to_bytes());
            let lookup_record_value = RecordValue::new_signed(
                &lookup_key,
                libp2p_keypair.public().to_peer_id().to_bytes(),
                &privkey,
            )
            .expect("Failed to sign DHT lookup record");

            // Replicate records to two thirds of the nodes, rounded up.
            let replication_factor =
                NonZeroUsize::new((2 * expected_node_count).div_ceil(3)).unwrap();

            let config = NetworkNodeConfigBuilder::default()
                .keypair(libp2p_keypair)
                .replication_factor(replication_factor)
                .bind_address(Some(bind_addr))
                .to_connect_addrs(HashSet::default())
                .republication_interval(None)
                .build()
                .expect("Failed to build network node config");

            let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
            let node_ids_ref = Arc::clone(&node_ids);
            let reliability_config_dup = reliability_config.clone();

            Box::pin(async move {
                {
                    // A node id seen for a second time resets the set of known ids.
                    let mut seen_ids = node_ids_ref.write().await;
                    if seen_ids.contains(&node_id) {
                        seen_ids.clear();
                    }
                    seen_ids.insert(node_id);
                }

                let network = Libp2pNetwork::new(
                    Libp2pMetricsValue::default(),
                    DhtNoPersistence,
                    config,
                    pubkey.clone(),
                    lookup_record_value,
                    bootstrap_addrs_ref,
                    usize::try_from(node_id).unwrap(),
                    #[cfg(feature = "hotshot-testing")]
                    reliability_config_dup,
                )
                .await
                .unwrap_or_else(|err| panic!("Failed to create libp2p network: {err:?}"));

                Arc::new(network)
            })
        })
    }

    /// The number of in-flight messages is not tracked for libp2p; always `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
304
305pub fn derive_libp2p_keypair<K: SignatureKey>(
311 private_key: &K::PrivateKey,
312) -> anyhow::Result<Keypair> {
313 let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
315 let derived_key = SecretKey::try_from_bytes(derived_key)?;
316
317 Ok(ed25519::Keypair::from(derived_key).into())
319}
320
321pub fn derive_libp2p_peer_id<K: SignatureKey>(
326 private_key: &K::PrivateKey,
327) -> anyhow::Result<PeerId> {
328 let keypair = derive_libp2p_keypair::<K>(private_key)?;
330
331 Ok(PeerId::from_public_key(&keypair.public()))
333}
334
335pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
344 let (host, port) = match addr.rfind(':') {
346 Some(idx) => (&addr[..idx], &addr[idx + 1..]),
347 None => return Err(anyhow!("Invalid address format, no port supplied")),
348 };
349
350 let ip = host.parse::<IpAddr>();
352
353 let multiaddr_string = match ip {
355 Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
356 Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
357 Err(_) => {
358 let lookup_result = addr.to_socket_addrs();
360
361 let failed = lookup_result
363 .map(|result| result.collect::<Vec<_>>().is_empty())
364 .unwrap_or(true);
365
366 if failed {
368 warn!(
369 "Failed to resolve domain name {host}, assuming it has not yet been \
370 provisioned"
371 );
372 }
373
374 format!("/dns/{host}/udp/{port}/quic-v1")
375 },
376 };
377
378 multiaddr_string.parse().with_context(|| {
380 format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
381 })
382}
383
384impl<T: NodeType> Libp2pNetwork<T> {
385 #[allow(clippy::too_many_arguments)]
394 pub async fn from_config<D: DhtPersistentStorage>(
395 mut config: NetworkConfig<T>,
396 dht_persistent_storage: D,
397 gossip_config: GossipConfig,
398 request_response_config: RequestResponseConfig,
399 bind_address: Multiaddr,
400 pub_key: &T::SignatureKey,
401 priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
402 metrics: Libp2pMetricsValue,
403 ) -> anyhow::Result<Self> {
404 let libp2p_config = config
406 .libp2p_config
407 .take()
408 .ok_or(anyhow!("Libp2p config not supplied"))?;
409
410 let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;
412
413 let mut config_builder = NetworkNodeConfigBuilder::default();
415
416 config_builder.gossip_config(gossip_config.clone());
418 config_builder.request_response_config(request_response_config);
419
420 let auth_message =
422 construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
423 .with_context(|| "Failed to construct auth message")?;
424
425 config_builder.auth_message(Some(auth_message));
427
428 let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
430 return Err(anyhow!("Default replication factor not supplied"));
431 };
432
433 let replication_factor = NonZeroUsize::new(min(
434 default_replication_factor.get(),
435 config.config.num_nodes_with_stake.get() / 2,
436 ))
437 .with_context(|| "Failed to calculate replication factor")?;
438
439 let lookup_record_value = RecordValue::new_signed(
441 &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
442 keypair.public().to_peer_id().to_bytes(),
444 priv_key,
445 )
446 .with_context(|| "Failed to sign DHT lookup record")?;
447
448 config_builder
449 .keypair(keypair)
450 .replication_factor(replication_factor)
451 .bind_address(Some(bind_address.clone()));
452
453 config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));
455
456 let node_config = config_builder.build()?;
458
459 let mut all_keys = BTreeSet::new();
461
462 for node in config.config.known_nodes_with_stake {
464 all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
465 }
466
467 Ok(Libp2pNetwork::new(
468 metrics,
469 dht_persistent_storage,
470 node_config,
471 pub_key.clone(),
472 lookup_record_value,
473 Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
474 usize::try_from(config.node_index)?,
475 #[cfg(feature = "hotshot-testing")]
476 None,
477 )
478 .await?)
479 }
480
481 #[must_use]
483 pub fn has_peers(&self) -> bool {
484 self.inner.is_ready.load(Ordering::Relaxed)
485 }
486
487 pub async fn wait_for_peers(&self) {
489 loop {
490 if self.has_peers() {
491 break;
492 }
493 sleep(Duration::from_secs(1)).await;
494 }
495 }
496
497 #[allow(clippy::too_many_arguments)]
510 pub async fn new<D: DhtPersistentStorage>(
511 metrics: Libp2pMetricsValue,
512 dht_persistent_storage: D,
513 config: NetworkNodeConfig,
514 pk: T::SignatureKey,
515 lookup_record_value: RecordValue<T::SignatureKey>,
516 bootstrap_addrs: BootstrapAddrs,
517 id: usize,
518 #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
519 ) -> Result<Libp2pNetwork<T>, NetworkError> {
520 let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));
522
523 let (mut rx, network_handle) = spawn_network_node::<T, D>(
524 config.clone(),
525 dht_persistent_storage,
526 Arc::clone(&consensus_key_to_pid_map),
527 id,
528 )
529 .await
530 .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
531
532 let addr = network_handle.listen_addr();
534 let pid = network_handle.peer_id();
535 bootstrap_addrs.write().await.push((pid, addr));
536
537 let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
539
540 let (sender, receiver) = channel(1000);
543 let (node_lookup_send, node_lookup_recv) = channel(10);
544 let (kill_tx, kill_rx) = channel(1);
545 rx.set_kill_switch(kill_rx);
546
547 let mut result = Libp2pNetwork {
548 inner: Arc::new(Libp2pNetworkInner {
549 handle: Arc::new(network_handle),
550 receiver: Mutex::new(receiver),
551 sender: sender.clone(),
552 pk,
553 bootstrap_addrs,
554 is_ready: Arc::new(AtomicBool::new(false)),
555 dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
557 is_bootstrapped: Arc::new(AtomicBool::new(false)),
558 metrics,
559 subscribed_topics,
560 node_lookup_send,
561 latest_seen_view: Arc::new(AtomicU64::new(0)),
565 #[cfg(feature = "hotshot-testing")]
566 reliability_config,
567 kill_switch: kill_tx,
568 }),
569 };
570
571 result.inner.metrics.is_ready.set(0);
573
574 result.handle_event_generator(sender, rx);
575 result.spawn_node_lookup(node_lookup_recv);
576 result.spawn_connect(id, lookup_record_value);
577
578 Ok(result)
579 }
580
581 #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
583 fn spawn_node_lookup(
584 &self,
585 mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
586 ) {
587 let handle = Arc::clone(&self.inner.handle);
588 let dht_timeout = self.inner.dht_timeout;
589 let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);
590
591 spawn(async move {
593 while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
595 #[allow(clippy::cast_possible_truncation)]
597 const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;
598
599 trace!("Performing lookup for peer {pk}");
600
601 if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
603 if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
605 warn!("Failed to perform lookup for key {pk}: {err}");
606 };
607 }
608 }
609 });
610 }
611
612 fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
614 let pk = self.inner.pk.clone();
615 let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
616 let handle = Arc::clone(&self.inner.handle);
617 let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
618 let inner = Arc::clone(&self.inner);
619
620 spawn({
621 let is_ready = Arc::clone(&self.inner.is_ready);
622 async move {
623 let bs_addrs = bootstrap_ref.read().await.clone();
624
625 handle.add_known_peers(bs_addrs).unwrap();
627
628 handle.begin_bootstrap()?;
630 while !is_bootstrapped.load(Ordering::Relaxed) {
631 sleep(Duration::from_secs(1)).await;
632 handle.begin_bootstrap()?;
633 }
634
635 handle.subscribe(QC_TOPIC.to_string()).await.unwrap();
637
638 while handle
641 .put_record(
642 RecordKey::new(Namespace::Lookup, pk.to_bytes()),
643 lookup_record_value.clone(),
644 )
645 .await
646 .is_err()
647 {
648 sleep(Duration::from_secs(1)).await;
649 }
650
651 if let Err(e) = handle.wait_to_connect(1, id).await {
653 error!("Failed to connect to peers: {e:?}");
654 return Err::<(), NetworkError>(e);
655 }
656 info!("Connected to required number of peers");
657
658 is_ready.store(true, Ordering::Relaxed);
660 inner.metrics.is_ready.set(1);
661
662 Ok::<(), NetworkError>(())
663 }
664 });
665 }
666
667 fn handle_recvd_events(
669 &self,
670 msg: NetworkEvent,
671 sender: &Sender<Vec<u8>>,
672 ) -> Result<(), NetworkError> {
673 match msg {
674 GossipMsg(msg) => {
675 sender.try_send(msg).map_err(|err| {
676 NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
677 })?;
678 },
679 DirectRequest(msg, _pid, chan) => {
680 sender.try_send(msg).map_err(|err| {
681 NetworkError::ChannelSendError(format!(
682 "failed to send direct request message: {err}"
683 ))
684 })?;
685 if self
686 .inner
687 .handle
688 .direct_response(
689 chan,
690 &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
691 NetworkError::FailedToSerialize(format!(
692 "failed to serialize acknowledgement: {e}"
693 ))
694 })?,
695 )
696 .is_err()
697 {
698 error!("failed to ack!");
699 };
700 },
701 DirectResponse(_msg, _) => {},
702 NetworkEvent::IsBootstrapped => {
703 error!(
704 "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
705 impossible."
706 );
707 },
708 NetworkEvent::ConnectedPeersUpdate(_) => {},
709 }
710 Ok::<(), NetworkError>(())
711 }
712
713 fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
716 let handle = self.clone();
717 let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
718 spawn(async move {
719 let Some(mut kill_switch) = network_rx.take_kill_switch() else {
720 tracing::error!(
721 "`spawn_handle` was called on a network handle that was already closed"
722 );
723 return;
724 };
725
726 loop {
727 select! {
728 msg = network_rx.recv() => {
729 let Ok(message) = msg else {
730 warn!("Network receiver shut down!");
731 return;
732 };
733
734 match message {
735 NetworkEvent::IsBootstrapped => {
736 is_bootstrapped.store(true, Ordering::Relaxed);
737 }
738 GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
739 let _ = handle.handle_recvd_events(message, &sender);
740 }
741 NetworkEvent::ConnectedPeersUpdate(num_peers) => {
742 handle.inner.metrics.num_connected_peers.set(num_peers);
743 }
744 }
745 }
746
747 _kill_switch = kill_switch.recv() => {
748 warn!("Event Handler shutdown");
749 return;
750 }
751 }
752 }
753 });
754 }
755}
756
757#[async_trait]
758impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
759 #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
760 async fn wait_for_ready(&self) {
761 self.wait_for_peers().await;
762 }
763
764 fn pause(&self) {
765 unimplemented!("Pausing not implemented for the Libp2p network");
766 }
767
768 fn resume(&self) {
769 unimplemented!("Resuming not implemented for the Libp2p network");
770 }
771
772 #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
773 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
774 where
775 'a: 'b,
776 Self: 'b,
777 {
778 let closure = async move {
779 let _ = self.inner.handle.shutdown().await;
780 let _ = self.inner.node_lookup_send.send(None).await;
781 let _ = self.inner.kill_switch.send(()).await;
782 };
783 boxed_sync(closure)
784 }
785
786 #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
787 async fn broadcast_message(
788 &self,
789 _: ViewNumber,
790 message: Vec<u8>,
791 topic: Topic,
792 _broadcast_delay: BroadcastDelay,
793 ) -> Result<(), NetworkError> {
794 if !self.has_peers() {
796 self.inner.metrics.num_failed_messages.add(1);
797 return Err(NetworkError::NoPeersYet);
798 };
799
800 let topic = topic.to_string();
802 if self.inner.subscribed_topics.contains(&topic) {
803 self.inner.sender.try_send(message.clone()).map_err(|_| {
805 self.inner.metrics.num_failed_messages.add(1);
806 NetworkError::ShutDown
807 })?;
808 }
809
810 #[cfg(feature = "hotshot-testing")]
812 {
813 let metrics = self.inner.metrics.clone();
814 if let Some(ref config) = &self.inner.reliability_config {
815 let handle = Arc::clone(&self.inner.handle);
816
817 let fut = config.clone().chaos_send_msg(
818 message,
819 Arc::new(move |msg: Vec<u8>| {
820 let topic_2 = topic.clone();
821 let handle_2 = Arc::clone(&handle);
822 let metrics_2 = metrics.clone();
823 boxed_sync(async move {
824 if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
825 metrics_2.num_failed_messages.add(1);
826 warn!("Failed to broadcast to libp2p: {e:?}");
827 }
828 })
829 }),
830 );
831 spawn(fut);
832 return Ok(());
833 }
834 }
835
836 if let Err(e) = self.inner.handle.gossip(topic, &message) {
837 self.inner.metrics.num_failed_messages.add(1);
838 return Err(e);
839 }
840
841 Ok(())
842 }
843
844 #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
845 async fn da_broadcast_message(
846 &self,
847 view: ViewNumber,
848 message: Vec<u8>,
849 recipients: Vec<T::SignatureKey>,
850 _broadcast_delay: BroadcastDelay,
851 ) -> Result<(), NetworkError> {
852 if !self.has_peers() {
854 self.inner.metrics.num_failed_messages.add(1);
855 return Err(NetworkError::NoPeersYet);
856 };
857
858 let topic = Topic::Da.to_string();
860 if self.inner.subscribed_topics.contains(&topic) {
861 self.inner.sender.try_send(message.clone()).map_err(|_| {
862 self.inner.metrics.num_failed_messages.add(1);
863 NetworkError::ShutDown
864 })?;
865 }
866
867 let future_results = recipients
868 .into_iter()
869 .map(|r| self.direct_message(view, message.clone(), r));
870 let results = join_all(future_results).await;
871
872 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
873
874 if errors.is_empty() {
875 Ok(())
876 } else {
877 Err(NetworkError::Multiple(errors))
878 }
879 }
880
881 #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
882 async fn direct_message(
883 &self,
884 _: ViewNumber,
885 message: Vec<u8>,
886 recipient: T::SignatureKey,
887 ) -> Result<(), NetworkError> {
888 if !self.has_peers() {
890 self.inner.metrics.num_failed_messages.add(1);
891 return Err(NetworkError::NoPeersYet);
892 };
893
894 if recipient == self.inner.pk {
896 self.inner.sender.try_send(message).map_err(|_x| {
898 self.inner.metrics.num_failed_messages.add(1);
899 NetworkError::ShutDown
900 })?;
901 return Ok(());
902 }
903
904 let pid = match self
905 .inner
906 .handle
907 .lookup_node(&recipient, self.inner.dht_timeout)
908 .await
909 {
910 Ok(pid) => pid,
911 Err(err) => {
912 self.inner.metrics.num_failed_messages.add(1);
913 return Err(NetworkError::LookupError(format!(
914 "failed to look up node for direct message: {err}"
915 )));
916 },
917 };
918
919 #[cfg(feature = "hotshot-testing")]
920 {
921 let metrics = self.inner.metrics.clone();
922 if let Some(ref config) = &self.inner.reliability_config {
923 let handle = Arc::clone(&self.inner.handle);
924
925 let fut = config.clone().chaos_send_msg(
926 message,
927 Arc::new(move |msg: Vec<u8>| {
928 let handle_2 = Arc::clone(&handle);
929 let metrics_2 = metrics.clone();
930 boxed_sync(async move {
931 if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
932 metrics_2.num_failed_messages.add(1);
933 warn!("Failed to broadcast to libp2p: {e:?}");
934 }
935 })
936 }),
937 );
938 spawn(fut);
939 return Ok(());
940 }
941 }
942
943 match self.inner.handle.direct_request(pid, &message) {
944 Ok(()) => Ok(()),
945 Err(e) => {
946 self.inner.metrics.num_failed_messages.add(1);
947 Err(e)
948 },
949 }
950 }
951
952 #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
957 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
958 let result = self
959 .inner
960 .receiver
961 .lock()
962 .await
963 .recv()
964 .await
965 .ok_or(NetworkError::ShutDown)?;
966
967 Ok(result)
968 }
969
970 #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
971 #[allow(clippy::type_complexity)]
972 fn queue_node_lookup(
973 &self,
974 view_number: ViewNumber,
975 pk: T::SignatureKey,
976 ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
977 self.inner
978 .node_lookup_send
979 .try_send(Some((view_number, pk)))
980 }
981
982 async fn update_view<TYPES>(
995 &self,
996 view: ViewNumber,
997 epoch: Option<EpochNumber>,
998 membership_coordinator: EpochMembershipCoordinator<TYPES>,
999 ) where
1000 TYPES: NodeType<SignatureKey = T::SignatureKey>,
1001 {
1002 let future_view = <TYPES as NodeType>::View::new(*view) + LOOK_AHEAD;
1003 let epoch = epoch.map(|e| <TYPES as NodeType>::Epoch::new(*e));
1004
1005 let membership = match membership_coordinator.membership_for_epoch(epoch).await {
1006 Ok(m) => m,
1007 Err(e) => {
1008 return tracing::warn!(e.message);
1009 },
1010 };
1011 let future_leader = match membership.leader(future_view).await {
1012 Ok(l) => l,
1013 Err(e) => {
1014 return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1015 },
1016 };
1017
1018 let _ = self
1019 .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1020 .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1021 }
1022}
1023
#[cfg(test)]
mod test {
    /// Tests for `derive_libp2p_multiaddr`.
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        /// A valid IPv4 `host:port` pair becomes an `/ip4/...` multiaddr.
        #[test]
        fn test_v4_valid() {
            let input = String::from("1.1.1.1:8080");

            let derived =
                derive_libp2p_multiaddr(&input).expect("Failed to derive valid multiaddr, {}");

            assert_eq!(derived.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// A valid (unbracketed) IPv6 `host:port` pair becomes an `/ip6/...` multiaddr.
        #[test]
        fn test_v6_valid() {
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            let input = format!("{ipv6_addr}:8080");
            let expected = format!("/ip6/{ipv6_addr}/udp/8080/quic-v1");

            let derived =
                derive_libp2p_multiaddr(&input).expect("Failed to derive valid multiaddr, {}");

            assert_eq!(derived.to_string(), expected);
        }

        /// An IP address without a port is rejected.
        #[test]
        fn test_no_port() {
            let input = String::from("1.1.1.1");

            assert!(derive_libp2p_multiaddr(&input).is_err());
        }

        /// A resolvable domain name becomes a `/dns/...` multiaddr.
        #[test]
        fn test_fqdn_exists() {
            let input = String::from("example.com:8080");

            let derived =
                derive_libp2p_multiaddr(&input).expect("Failed to derive valid multiaddr, {}");

            assert_eq!(derived.to_string(), "/dns/example.com/udp/8080/quic-v1");
        }

        /// A domain name that does not resolve still becomes a `/dns/...` multiaddr.
        #[test]
        fn test_fqdn_does_not_exist() {
            let input = String::from("libp2p.example.com:8080");

            let derived =
                derive_libp2p_multiaddr(&input).expect("Failed to derive valid multiaddr, {}");

            assert_eq!(
                derived.to_string(),
                "/dns/libp2p.example.com/udp/8080/quic-v1"
            );
        }

        /// A domain name without a port is rejected.
        #[test]
        fn test_fqdn_no_port() {
            let input = String::from("example.com");

            assert!(derive_libp2p_multiaddr(&input).is_err());
        }
    }
}