1#[cfg(feature = "hotshot-testing")]
8use std::{
9 collections::HashMap,
10 sync::atomic::{AtomicBool, Ordering},
11};
12use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
13#[cfg(feature = "hotshot-testing")]
14use std::{path::Path, time::Duration};
15
16use async_trait::async_trait;
17use bincode::config::Options;
18use cdn_broker::reexports::{
19 connection::protocols::{Quic, Tcp},
20 def::{ConnectionDef, RunDef, Topic as TopicTrait, hook::NoMessageHook},
21 discovery::{Embedded, Redis},
22};
23#[cfg(feature = "hotshot-testing")]
24use cdn_broker::{Broker, Config as BrokerConfig};
25pub use cdn_client::reexports::crypto::signature::KeyPair;
26use cdn_client::{
27 Client, Config as ClientConfig,
28 reexports::{
29 crypto::signature::{Serializable, SignatureScheme},
30 message::{Broadcast, Direct, Message as PushCdnMessage},
31 },
32};
33#[cfg(feature = "hotshot-testing")]
34use cdn_marshal::{Config as MarshalConfig, Marshal};
35use hotshot_types::{
36 BoxSyncFuture, boxed_sync,
37 data::ViewNumber,
38 traits::{
39 metrics::{Counter, Metrics, NoMetrics},
40 network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
41 signature_key::SignatureKey,
42 },
43 utils::bincode_opts,
44};
45#[cfg(feature = "hotshot-testing")]
46use hotshot_types::{
47 PeerConnectInfo,
48 traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
49};
50use num_enum::{IntoPrimitive, TryFromPrimitive};
51use parking_lot::Mutex;
52#[cfg(feature = "hotshot-testing")]
53use rand::{RngCore, SeedableRng, rngs::StdRng};
54use test_utils::reserve_tcp_port;
55use tokio::sync::mpsc::error::TrySendError;
56#[cfg(feature = "hotshot-testing")]
57use tokio::{spawn, time::sleep};
58#[cfg(feature = "hotshot-testing")]
59use tracing::error;
60
61use super::NetworkError;
62#[cfg(feature = "hotshot-testing")]
63use crate::NodeType;
64
65#[derive(Clone)]
67pub struct CdnMetricsValue {
68 pub num_failed_messages: Box<dyn Counter>,
70}
71
72impl CdnMetricsValue {
73 pub fn new(metrics: &dyn Metrics) -> Self {
75 let subgroup = metrics.subgroup("cdn".into());
77
78 Self {
80 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
81 }
82 }
83}
84
85impl Default for CdnMetricsValue {
86 fn default() -> Self {
88 Self::new(&*NoMetrics::boxed())
89 }
90}
91
92#[derive(Clone, Eq, PartialEq)]
95pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
96impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
97 type PrivateKey = T::PrivateKey;
98 type PublicKey = Self;
99
100 fn sign(
102 private_key: &Self::PrivateKey,
103 namespace: &str,
104 message: &[u8],
105 ) -> anyhow::Result<Vec<u8>> {
106 let message = [namespace.as_bytes(), message].concat();
108
109 let signature = T::sign(private_key, &message)?;
110 Ok(bincode_opts().serialize(&signature)?)
111 }
112
113 fn verify(
115 public_key: &Self::PublicKey,
116 namespace: &str,
117 message: &[u8],
118 signature: &[u8],
119 ) -> bool {
120 let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
122 Ok(key) => key,
123 Err(_) => return false,
124 };
125
126 let message = [namespace.as_bytes(), message].concat();
128
129 public_key.0.validate(&signature, &message)
130 }
131}
132
133impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
136 fn serialize(&self) -> anyhow::Result<Vec<u8>> {
137 Ok(self.0.to_bytes())
138 }
139
140 fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
141 Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
142 }
143}
144
145pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
148impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
149 type User = UserDefQuic<K>;
150 type User2 = UserDefTcp<K>;
151 type Broker = BrokerDef<K>;
152 type DiscoveryClientType = Redis;
153 type Topic = Topic;
154}
155
156pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
160impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
161 type Scheme = WrappedSignatureKey<K>;
162 type Protocol = Quic;
163 type MessageHook = NoMessageHook;
164}
165
166pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
169impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
170 type Scheme = WrappedSignatureKey<K>;
171 type Protocol = Tcp;
172 type MessageHook = NoMessageHook;
173}
174
175pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
178impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
179 type Scheme = WrappedSignatureKey<K>;
180 type Protocol = Tcp;
181 type MessageHook = NoMessageHook;
182}
183
184#[derive(Clone)]
188pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
189impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
190 type Scheme = WrappedSignatureKey<K>;
191 type Protocol = Tcp;
192 type MessageHook = NoMessageHook;
193}
194
195pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
198impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
199 type User = UserDefQuic<K>;
200 type User2 = UserDefTcp<K>;
201 type Broker = BrokerDef<K>;
202 type DiscoveryClientType = Embedded;
203 type Topic = Topic;
204}
205
206#[derive(Clone)]
209pub struct PushCdnNetwork<K: SignatureKey + 'static> {
211 client: Client<ClientDef<K>>,
213 metrics: Arc<CdnMetricsValue>,
215 internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
217 public_key: K,
219 #[cfg(feature = "hotshot-testing")]
221 is_paused: Arc<AtomicBool>,
222 }
225
226#[repr(u8)]
228#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
229pub enum Topic {
230 Global = 0,
232 Da = 1,
234}
235
236impl TopicTrait for Topic {}
239
240impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
241 pub fn new(
248 marshal_endpoint: String,
249 topics: Vec<Topic>,
250 keypair: KeyPair<WrappedSignatureKey<K>>,
251 metrics: CdnMetricsValue,
252 ) -> anyhow::Result<Self> {
253 let config = ClientConfig {
255 endpoint: marshal_endpoint,
256 subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
257 keypair: keypair.clone(),
258 use_local_authority: true,
259 };
260
261 let client = Client::new(config);
263
264 Ok(Self {
265 client,
266 metrics: Arc::from(metrics),
267 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
268 public_key: keypair.public_key.0,
269 #[cfg(feature = "hotshot-testing")]
271 is_paused: Arc::from(AtomicBool::new(false)),
272 })
273 }
274
275 async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
281 if self
283 .client
284 .subscribed_topics
285 .read()
286 .await
287 .contains(&(topic.clone() as u8))
288 {
289 self.internal_queue.lock().push_back(message.clone());
290 }
291
292 #[cfg(feature = "hotshot-testing")]
294 if self.is_paused.load(Ordering::Relaxed) {
295 return Ok(());
296 }
297
298 if let Err(err) = self
300 .client
301 .send_broadcast_message(vec![topic as u8], message)
302 .await
303 {
304 return Err(NetworkError::MessageReceiveError(format!(
305 "failed to send broadcast message: {err}"
306 )));
307 };
308
309 Ok(())
310 }
311}
312
313#[cfg(feature = "hotshot-testing")]
314impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
315 for PushCdnNetwork<TYPES::SignatureKey>
316{
317 #[allow(clippy::too_many_lines)]
320 fn generator(
321 _expected_node_count: usize,
322 _num_bootstrap: usize,
323 _network_id: usize,
324 da_committee_size: usize,
325 _reliability_config: Option<Box<dyn NetworkReliability>>,
326 _secondary_network_delay: Duration,
327 _connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
328 ) -> AsyncGenerator<Arc<Self>> {
329 let (broker_public_key, broker_private_key) =
333 TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);
334
335 let temp_dir = std::env::temp_dir();
337
338 let discovery_endpoint = temp_dir
340 .join(Path::new(&format!(
341 "test-{}.sqlite",
342 StdRng::from_entropy().next_u64()
343 )))
344 .to_string_lossy()
345 .into_owned();
346
347 let public_port_1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
349 let public_port_2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
350 let public_address_1 = format!("127.0.0.1:{public_port_1}");
351 let public_address_2 = format!("127.0.0.1:{public_port_2}");
352
353 for i in 0..2 {
355 let private_port =
357 reserve_tcp_port().expect("OS should have ephemeral ports available");
358
359 let private_address = format!("127.0.0.1:{private_port}");
361 let (public_address, other_public_address) = if i == 0 {
362 (public_address_1.clone(), public_address_2.clone())
363 } else {
364 (public_address_2.clone(), public_address_1.clone())
365 };
366
367 let broker_identifier = format!("{public_address}/{public_address}");
369 let other_broker_identifier = format!("{other_public_address}/{other_public_address}");
370
371 let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
373 public_advertise_endpoint: public_address.clone(),
374 public_bind_endpoint: public_address,
375 private_advertise_endpoint: private_address.clone(),
376 private_bind_endpoint: private_address,
377 metrics_bind_endpoint: None,
378 keypair: KeyPair {
379 public_key: WrappedSignatureKey(broker_public_key.clone()),
380 private_key: broker_private_key.clone(),
381 },
382 discovery_endpoint: discovery_endpoint.clone(),
383
384 user_message_hook: NoMessageHook,
385 broker_message_hook: NoMessageHook,
386
387 ca_cert_path: None,
388 ca_key_path: None,
389 global_memory_pool_size: Some(1024 * 1024 * 1024),
391 };
392
393 spawn(async move {
395 let broker: Broker<TestingDef<TYPES::SignatureKey>> =
396 Broker::new(config).await.expect("broker failed to start");
397
398 if other_broker_identifier > broker_identifier {
401 sleep(Duration::from_secs(2)).await;
402 }
403
404 if let Err(err) = broker.start().await {
406 error!("broker stopped: {err}");
407 }
408 });
409 }
410
411 let marshal_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
413
414 let marshal_endpoint = format!("127.0.0.1:{marshal_port}");
416 let marshal_config = MarshalConfig {
417 bind_endpoint: marshal_endpoint.clone(),
418 discovery_endpoint,
419 metrics_bind_endpoint: None,
420 ca_cert_path: None,
421 ca_key_path: None,
422 global_memory_pool_size: Some(1024 * 1024 * 1024),
424 };
425
426 spawn(async move {
428 let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
429 .await
430 .expect("failed to spawn marshal");
431
432 if let Err(err) = marshal.start().await {
434 error!("marshal stopped: {err}");
435 }
436 });
437
438 Box::pin({
440 move |node_id| {
441 let marshal_endpoint = marshal_endpoint.clone();
443
444 Box::pin(async move {
445 let private_key =
447 TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
448 let public_key = TYPES::SignatureKey::from_private(&private_key);
449
450 let topics = if node_id < da_committee_size as u64 {
452 vec![Topic::Da as u8, Topic::Global as u8]
453 } else {
454 vec![Topic::Global as u8]
455 };
456
457 let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
459 ClientConfig {
460 keypair: KeyPair {
461 public_key: WrappedSignatureKey(public_key.clone()),
462 private_key,
463 },
464 subscribed_topics: topics,
465 endpoint: marshal_endpoint,
466 use_local_authority: true,
467 };
468
469 Arc::new(PushCdnNetwork {
471 client: Client::new(client_config),
472 metrics: Arc::new(CdnMetricsValue::default()),
473 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
474 public_key,
475 #[cfg(feature = "hotshot-testing")]
476 is_paused: Arc::from(AtomicBool::new(false)),
477 })
478 })
479 }
480 })
481 }
482
483 fn in_flight_message_count(&self) -> Option<usize> {
485 None
486 }
487}
488
489#[async_trait]
490impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
491 fn pause(&self) {
493 #[cfg(feature = "hotshot-testing")]
494 self.is_paused.store(true, Ordering::Relaxed);
495 }
496
497 fn resume(&self) {
499 #[cfg(feature = "hotshot-testing")]
500 self.is_paused.store(false, Ordering::Relaxed);
501 }
502
503 async fn wait_for_ready(&self) {
505 let _ = self.client.ensure_initialized().await;
506 }
507
508 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
510 where
511 'a: 'b,
512 Self: 'b,
513 {
514 boxed_sync(async move { self.client.close().await })
515 }
516
517 async fn broadcast_message(
523 &self,
524 _: ViewNumber,
525 message: Vec<u8>,
526 topic: HotShotTopic,
527 _broadcast_delay: BroadcastDelay,
528 ) -> Result<(), NetworkError> {
529 #[cfg(feature = "hotshot-testing")]
531 if self.is_paused.load(Ordering::Relaxed) {
532 return Ok(());
533 }
534
535 self.broadcast_message(message, topic.into())
537 .await
538 .inspect_err(|_e| {
539 self.metrics.num_failed_messages.add(1);
540 })
541 }
542
543 async fn da_broadcast_message(
549 &self,
550 _: ViewNumber,
551 message: Vec<u8>,
552 _recipients: Vec<K>,
553 _broadcast_delay: BroadcastDelay,
554 ) -> Result<(), NetworkError> {
555 #[cfg(feature = "hotshot-testing")]
557 if self.is_paused.load(Ordering::Relaxed) {
558 return Ok(());
559 }
560
561 self.broadcast_message(message, Topic::Da)
563 .await
564 .inspect_err(|_e| {
565 self.metrics.num_failed_messages.add(1);
566 })
567 }
568
569 async fn direct_message(
574 &self,
575 _: ViewNumber,
576 message: Vec<u8>,
577 recipient: K,
578 ) -> Result<(), NetworkError> {
579 if recipient == self.public_key {
581 self.internal_queue.lock().push_back(message);
582 return Ok(());
583 }
584
585 #[cfg(feature = "hotshot-testing")]
587 if self.is_paused.load(Ordering::Relaxed) {
588 return Ok(());
589 }
590
591 if let Err(e) = self
593 .client
594 .send_direct_message(&WrappedSignatureKey(recipient), message)
595 .await
596 {
597 self.metrics.num_failed_messages.add(1);
598 return Err(NetworkError::MessageSendError(format!(
599 "failed to send direct message: {e}"
600 )));
601 };
602
603 Ok(())
604 }
605
606 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
612 let queued_message = self.internal_queue.lock().pop_front();
614 if let Some(message) = queued_message {
615 return Ok(message);
616 }
617
618 let message = self.client.receive_message().await;
620
621 #[cfg(feature = "hotshot-testing")]
623 if self.is_paused.load(Ordering::Relaxed) {
624 sleep(Duration::from_millis(100)).await;
625 return Ok(vec![]);
626 }
627
628 let message = match message {
630 Ok(message) => message,
631 Err(error) => {
632 return Err(NetworkError::MessageReceiveError(format!(
633 "failed to receive message: {error}"
634 )));
635 },
636 };
637
638 let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
640 | PushCdnMessage::Direct(Direct {
641 message,
642 recipient: _,
643 })) = message
644 else {
645 return Ok(vec![]);
646 };
647
648 Ok(message)
649 }
650
651 fn queue_node_lookup(
653 &self,
654 _view_number: ViewNumber,
655 _pk: K,
656 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
657 Ok(())
658 }
659}
660
661impl From<HotShotTopic> for Topic {
662 fn from(topic: HotShotTopic) -> Self {
663 match topic {
664 HotShotTopic::Global => Topic::Global,
665 HotShotTopic::Da => Topic::Da,
666 }
667 }
668}