1#[cfg(feature = "hotshot-testing")]
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
10#[cfg(feature = "hotshot-testing")]
11use std::{path::Path, time::Duration};
12
13use async_trait::async_trait;
14use bincode::config::Options;
15use cdn_broker::reexports::{
16 connection::protocols::{Quic, Tcp},
17 def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait},
18 discovery::{Embedded, Redis},
19};
20#[cfg(feature = "hotshot-testing")]
21use cdn_broker::{Broker, Config as BrokerConfig};
22pub use cdn_client::reexports::crypto::signature::KeyPair;
23use cdn_client::{
24 reexports::{
25 crypto::signature::{Serializable, SignatureScheme},
26 message::{Broadcast, Direct, Message as PushCdnMessage},
27 },
28 Client, Config as ClientConfig,
29};
30#[cfg(feature = "hotshot-testing")]
31use cdn_marshal::{Config as MarshalConfig, Marshal};
32#[cfg(feature = "hotshot-testing")]
33use hotshot_types::traits::network::{
34 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
35};
36use hotshot_types::{
37 boxed_sync,
38 data::ViewNumber,
39 traits::{
40 metrics::{Counter, Metrics, NoMetrics},
41 network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
42 signature_key::SignatureKey,
43 },
44 utils::bincode_opts,
45 BoxSyncFuture,
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use parking_lot::Mutex;
49#[cfg(feature = "hotshot-testing")]
50use rand::{rngs::StdRng, RngCore, SeedableRng};
51use tokio::sync::mpsc::error::TrySendError;
52#[cfg(feature = "hotshot-testing")]
53use tokio::{spawn, time::sleep};
54#[cfg(feature = "hotshot-testing")]
55use tracing::error;
56
57use super::NetworkError;
58#[cfg(feature = "hotshot-testing")]
59use crate::NodeType;
60
61#[derive(Clone)]
63pub struct CdnMetricsValue {
64 pub num_failed_messages: Box<dyn Counter>,
66}
67
68impl CdnMetricsValue {
69 pub fn new(metrics: &dyn Metrics) -> Self {
71 let subgroup = metrics.subgroup("cdn".into());
73
74 Self {
76 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
77 }
78 }
79}
80
81impl Default for CdnMetricsValue {
82 fn default() -> Self {
84 Self::new(&*NoMetrics::boxed())
85 }
86}
87
88#[derive(Clone, Eq, PartialEq)]
91pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
92impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
93 type PrivateKey = T::PrivateKey;
94 type PublicKey = Self;
95
96 fn sign(
98 private_key: &Self::PrivateKey,
99 namespace: &str,
100 message: &[u8],
101 ) -> anyhow::Result<Vec<u8>> {
102 let message = [namespace.as_bytes(), message].concat();
104
105 let signature = T::sign(private_key, &message)?;
106 Ok(bincode_opts().serialize(&signature)?)
107 }
108
109 fn verify(
111 public_key: &Self::PublicKey,
112 namespace: &str,
113 message: &[u8],
114 signature: &[u8],
115 ) -> bool {
116 let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
118 Ok(key) => key,
119 Err(_) => return false,
120 };
121
122 let message = [namespace.as_bytes(), message].concat();
124
125 public_key.0.validate(&signature, &message)
126 }
127}
128
129impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
132 fn serialize(&self) -> anyhow::Result<Vec<u8>> {
133 Ok(self.0.to_bytes())
134 }
135
136 fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
137 Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
138 }
139}
140
141pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
144impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
145 type User = UserDefQuic<K>;
146 type User2 = UserDefTcp<K>;
147 type Broker = BrokerDef<K>;
148 type DiscoveryClientType = Redis;
149 type Topic = Topic;
150}
151
152pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
156impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
157 type Scheme = WrappedSignatureKey<K>;
158 type Protocol = Quic;
159 type MessageHook = NoMessageHook;
160}
161
162pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
165impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
166 type Scheme = WrappedSignatureKey<K>;
167 type Protocol = Tcp;
168 type MessageHook = NoMessageHook;
169}
170
171pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
174impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
175 type Scheme = WrappedSignatureKey<K>;
176 type Protocol = Tcp;
177 type MessageHook = NoMessageHook;
178}
179
180#[derive(Clone)]
184pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
185impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
186 type Scheme = WrappedSignatureKey<K>;
187 type Protocol = Tcp;
188 type MessageHook = NoMessageHook;
189}
190
191pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
194impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
195 type User = UserDefQuic<K>;
196 type User2 = UserDefTcp<K>;
197 type Broker = BrokerDef<K>;
198 type DiscoveryClientType = Embedded;
199 type Topic = Topic;
200}
201
202#[derive(Clone)]
205pub struct PushCdnNetwork<K: SignatureKey + 'static> {
207 client: Client<ClientDef<K>>,
209 metrics: Arc<CdnMetricsValue>,
211 internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
213 public_key: K,
215 #[cfg(feature = "hotshot-testing")]
217 is_paused: Arc<AtomicBool>,
218 }
221
222#[repr(u8)]
224#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
225pub enum Topic {
226 Global = 0,
228 Da = 1,
230}
231
232impl TopicTrait for Topic {}
235
236impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
237 pub fn new(
244 marshal_endpoint: String,
245 topics: Vec<Topic>,
246 keypair: KeyPair<WrappedSignatureKey<K>>,
247 metrics: CdnMetricsValue,
248 ) -> anyhow::Result<Self> {
249 let config = ClientConfig {
251 endpoint: marshal_endpoint,
252 subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
253 keypair: keypair.clone(),
254 use_local_authority: true,
255 };
256
257 let client = Client::new(config);
259
260 Ok(Self {
261 client,
262 metrics: Arc::from(metrics),
263 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
264 public_key: keypair.public_key.0,
265 #[cfg(feature = "hotshot-testing")]
267 is_paused: Arc::from(AtomicBool::new(false)),
268 })
269 }
270
271 async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
277 if self
279 .client
280 .subscribed_topics
281 .read()
282 .await
283 .contains(&(topic.clone() as u8))
284 {
285 self.internal_queue.lock().push_back(message.clone());
286 }
287
288 #[cfg(feature = "hotshot-testing")]
290 if self.is_paused.load(Ordering::Relaxed) {
291 return Ok(());
292 }
293
294 if let Err(err) = self
296 .client
297 .send_broadcast_message(vec![topic as u8], message)
298 .await
299 {
300 return Err(NetworkError::MessageReceiveError(format!(
301 "failed to send broadcast message: {err}"
302 )));
303 };
304
305 Ok(())
306 }
307}
308
309#[cfg(feature = "hotshot-testing")]
310impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
311 for PushCdnNetwork<TYPES::SignatureKey>
312{
313 #[allow(clippy::too_many_lines)]
316 fn generator(
317 _expected_node_count: usize,
318 _num_bootstrap: usize,
319 _network_id: usize,
320 da_committee_size: usize,
321 _reliability_config: Option<Box<dyn NetworkReliability>>,
322 _secondary_network_delay: Duration,
323 ) -> AsyncGenerator<Arc<Self>> {
324 let (broker_public_key, broker_private_key) =
328 TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);
329
330 let temp_dir = std::env::temp_dir();
332
333 let discovery_endpoint = temp_dir
335 .join(Path::new(&format!(
336 "test-{}.sqlite",
337 StdRng::from_entropy().next_u64()
338 )))
339 .to_string_lossy()
340 .into_owned();
341
342 let public_address_1 = format!(
344 "127.0.0.1:{}",
345 portpicker::pick_unused_port().expect("could not find an open port")
346 );
347 let public_address_2 = format!(
348 "127.0.0.1:{}",
349 portpicker::pick_unused_port().expect("could not find an open port")
350 );
351
352 for i in 0..2 {
354 let private_port = portpicker::pick_unused_port().expect("could not find an open port");
356
357 let private_address = format!("127.0.0.1:{private_port}");
359 let (public_address, other_public_address) = if i == 0 {
360 (public_address_1.clone(), public_address_2.clone())
361 } else {
362 (public_address_2.clone(), public_address_1.clone())
363 };
364
365 let broker_identifier = format!("{public_address}/{public_address}");
367 let other_broker_identifier = format!("{other_public_address}/{other_public_address}");
368
369 let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
371 public_advertise_endpoint: public_address.clone(),
372 public_bind_endpoint: public_address,
373 private_advertise_endpoint: private_address.clone(),
374 private_bind_endpoint: private_address,
375 metrics_bind_endpoint: None,
376 keypair: KeyPair {
377 public_key: WrappedSignatureKey(broker_public_key.clone()),
378 private_key: broker_private_key.clone(),
379 },
380 discovery_endpoint: discovery_endpoint.clone(),
381
382 user_message_hook: NoMessageHook,
383 broker_message_hook: NoMessageHook,
384
385 ca_cert_path: None,
386 ca_key_path: None,
387 global_memory_pool_size: Some(1024 * 1024 * 1024),
389 };
390
391 spawn(async move {
393 let broker: Broker<TestingDef<TYPES::SignatureKey>> =
394 Broker::new(config).await.expect("broker failed to start");
395
396 if other_broker_identifier > broker_identifier {
399 sleep(Duration::from_secs(2)).await;
400 }
401
402 if let Err(err) = broker.start().await {
404 error!("broker stopped: {err}");
405 }
406 });
407 }
408
409 let marshal_port = portpicker::pick_unused_port().expect("could not find an open port");
411
412 let marshal_endpoint = format!("127.0.0.1:{marshal_port}");
414 let marshal_config = MarshalConfig {
415 bind_endpoint: marshal_endpoint.clone(),
416 discovery_endpoint,
417 metrics_bind_endpoint: None,
418 ca_cert_path: None,
419 ca_key_path: None,
420 global_memory_pool_size: Some(1024 * 1024 * 1024),
422 };
423
424 spawn(async move {
426 let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
427 .await
428 .expect("failed to spawn marshal");
429
430 if let Err(err) = marshal.start().await {
432 error!("marshal stopped: {err}");
433 }
434 });
435
436 Box::pin({
438 move |node_id| {
439 let marshal_endpoint = marshal_endpoint.clone();
441
442 Box::pin(async move {
443 let private_key =
445 TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
446 let public_key = TYPES::SignatureKey::from_private(&private_key);
447
448 let topics = if node_id < da_committee_size as u64 {
450 vec![Topic::Da as u8, Topic::Global as u8]
451 } else {
452 vec![Topic::Global as u8]
453 };
454
455 let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
457 ClientConfig {
458 keypair: KeyPair {
459 public_key: WrappedSignatureKey(public_key.clone()),
460 private_key,
461 },
462 subscribed_topics: topics,
463 endpoint: marshal_endpoint,
464 use_local_authority: true,
465 };
466
467 Arc::new(PushCdnNetwork {
469 client: Client::new(client_config),
470 metrics: Arc::new(CdnMetricsValue::default()),
471 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
472 public_key,
473 #[cfg(feature = "hotshot-testing")]
474 is_paused: Arc::from(AtomicBool::new(false)),
475 })
476 })
477 }
478 })
479 }
480
481 fn in_flight_message_count(&self) -> Option<usize> {
483 None
484 }
485}
486
487#[async_trait]
488impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
489 fn pause(&self) {
491 #[cfg(feature = "hotshot-testing")]
492 self.is_paused.store(true, Ordering::Relaxed);
493 }
494
495 fn resume(&self) {
497 #[cfg(feature = "hotshot-testing")]
498 self.is_paused.store(false, Ordering::Relaxed);
499 }
500
501 async fn wait_for_ready(&self) {
503 let _ = self.client.ensure_initialized().await;
504 }
505
506 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
508 where
509 'a: 'b,
510 Self: 'b,
511 {
512 boxed_sync(async move { self.client.close().await })
513 }
514
515 async fn broadcast_message(
521 &self,
522 _: ViewNumber,
523 message: Vec<u8>,
524 topic: HotShotTopic,
525 _broadcast_delay: BroadcastDelay,
526 ) -> Result<(), NetworkError> {
527 #[cfg(feature = "hotshot-testing")]
529 if self.is_paused.load(Ordering::Relaxed) {
530 return Ok(());
531 }
532
533 self.broadcast_message(message, topic.into())
535 .await
536 .inspect_err(|_e| {
537 self.metrics.num_failed_messages.add(1);
538 })
539 }
540
541 async fn da_broadcast_message(
547 &self,
548 _: ViewNumber,
549 message: Vec<u8>,
550 _recipients: Vec<K>,
551 _broadcast_delay: BroadcastDelay,
552 ) -> Result<(), NetworkError> {
553 #[cfg(feature = "hotshot-testing")]
555 if self.is_paused.load(Ordering::Relaxed) {
556 return Ok(());
557 }
558
559 self.broadcast_message(message, Topic::Da)
561 .await
562 .inspect_err(|_e| {
563 self.metrics.num_failed_messages.add(1);
564 })
565 }
566
567 async fn direct_message(
572 &self,
573 _: ViewNumber,
574 message: Vec<u8>,
575 recipient: K,
576 ) -> Result<(), NetworkError> {
577 if recipient == self.public_key {
579 self.internal_queue.lock().push_back(message);
580 return Ok(());
581 }
582
583 #[cfg(feature = "hotshot-testing")]
585 if self.is_paused.load(Ordering::Relaxed) {
586 return Ok(());
587 }
588
589 if let Err(e) = self
591 .client
592 .send_direct_message(&WrappedSignatureKey(recipient), message)
593 .await
594 {
595 self.metrics.num_failed_messages.add(1);
596 return Err(NetworkError::MessageSendError(format!(
597 "failed to send direct message: {e}"
598 )));
599 };
600
601 Ok(())
602 }
603
604 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
610 let queued_message = self.internal_queue.lock().pop_front();
612 if let Some(message) = queued_message {
613 return Ok(message);
614 }
615
616 let message = self.client.receive_message().await;
618
619 #[cfg(feature = "hotshot-testing")]
621 if self.is_paused.load(Ordering::Relaxed) {
622 sleep(Duration::from_millis(100)).await;
623 return Ok(vec![]);
624 }
625
626 let message = match message {
628 Ok(message) => message,
629 Err(error) => {
630 return Err(NetworkError::MessageReceiveError(format!(
631 "failed to receive message: {error}"
632 )));
633 },
634 };
635
636 let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
638 | PushCdnMessage::Direct(Direct {
639 message,
640 recipient: _,
641 })) = message
642 else {
643 return Ok(vec![]);
644 };
645
646 Ok(message)
647 }
648
649 fn queue_node_lookup(
651 &self,
652 _view_number: ViewNumber,
653 _pk: K,
654 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
655 Ok(())
656 }
657}
658
659impl From<HotShotTopic> for Topic {
660 fn from(topic: HotShotTopic) -> Self {
661 match topic {
662 HotShotTopic::Global => Topic::Global,
663 HotShotTopic::Da => Topic::Da,
664 }
665 }
666}