1#[cfg(feature = "hotshot-testing")]
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
10#[cfg(feature = "hotshot-testing")]
11use std::{path::Path, time::Duration};
12
13use async_trait::async_trait;
14use bincode::config::Options;
15use cdn_broker::reexports::{
16 connection::protocols::{Quic, Tcp},
17 def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait},
18 discovery::{Embedded, Redis},
19};
20#[cfg(feature = "hotshot-testing")]
21use cdn_broker::{Broker, Config as BrokerConfig};
22pub use cdn_client::reexports::crypto::signature::KeyPair;
23use cdn_client::{
24 reexports::{
25 crypto::signature::{Serializable, SignatureScheme},
26 message::{Broadcast, Direct, Message as PushCdnMessage},
27 },
28 Client, Config as ClientConfig,
29};
30#[cfg(feature = "hotshot-testing")]
31use cdn_marshal::{Config as MarshalConfig, Marshal};
32#[cfg(feature = "hotshot-testing")]
33use hotshot_types::traits::network::{
34 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
35};
36use hotshot_types::{
37 boxed_sync,
38 data::ViewNumber,
39 traits::{
40 metrics::{Counter, Metrics, NoMetrics},
41 network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
42 node_implementation::NodeType,
43 signature_key::SignatureKey,
44 },
45 utils::bincode_opts,
46 BoxSyncFuture,
47};
48use num_enum::{IntoPrimitive, TryFromPrimitive};
49use parking_lot::Mutex;
50#[cfg(feature = "hotshot-testing")]
51use rand::{rngs::StdRng, RngCore, SeedableRng};
52use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
53#[cfg(feature = "hotshot-testing")]
54use tracing::error;
55
56use super::NetworkError;
57
58#[derive(Clone)]
60pub struct CdnMetricsValue {
61 pub num_failed_messages: Box<dyn Counter>,
63}
64
65impl CdnMetricsValue {
66 pub fn new(metrics: &dyn Metrics) -> Self {
68 let subgroup = metrics.subgroup("cdn".into());
70
71 Self {
73 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
74 }
75 }
76}
77
78impl Default for CdnMetricsValue {
79 fn default() -> Self {
81 Self::new(&*NoMetrics::boxed())
82 }
83}
84
85#[derive(Clone, Eq, PartialEq)]
88pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
89impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
90 type PrivateKey = T::PrivateKey;
91 type PublicKey = Self;
92
93 fn sign(
95 private_key: &Self::PrivateKey,
96 namespace: &str,
97 message: &[u8],
98 ) -> anyhow::Result<Vec<u8>> {
99 let message = [namespace.as_bytes(), message].concat();
101
102 let signature = T::sign(private_key, &message)?;
103 Ok(bincode_opts().serialize(&signature)?)
104 }
105
106 fn verify(
108 public_key: &Self::PublicKey,
109 namespace: &str,
110 message: &[u8],
111 signature: &[u8],
112 ) -> bool {
113 let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
115 Ok(key) => key,
116 Err(_) => return false,
117 };
118
119 let message = [namespace.as_bytes(), message].concat();
121
122 public_key.0.validate(&signature, &message)
123 }
124}
125
126impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
129 fn serialize(&self) -> anyhow::Result<Vec<u8>> {
130 Ok(self.0.to_bytes())
131 }
132
133 fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
134 Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
135 }
136}
137
138pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
141impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
142 type User = UserDef<K>;
143 type Broker = BrokerDef<K>;
144 type DiscoveryClientType = Redis;
145 type Topic = Topic;
146}
147
148pub struct UserDef<K: SignatureKey + 'static>(PhantomData<K>);
151impl<K: SignatureKey + 'static> ConnectionDef for UserDef<K> {
152 type Scheme = WrappedSignatureKey<K>;
153 type Protocol = Quic;
154 type MessageHook = NoMessageHook;
155}
156
157pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
160impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
161 type Scheme = WrappedSignatureKey<K>;
162 type Protocol = Tcp;
163 type MessageHook = NoMessageHook;
164}
165
166#[derive(Clone)]
170pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
171impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
172 type Scheme = WrappedSignatureKey<K>;
173 type Protocol = Quic;
174 type MessageHook = NoMessageHook;
175}
176
177pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
180impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
181 type User = UserDef<K>;
182 type Broker = BrokerDef<K>;
183 type DiscoveryClientType = Embedded;
184 type Topic = Topic;
185}
186
187#[derive(Clone)]
190pub struct PushCdnNetwork<K: SignatureKey + 'static> {
192 client: Client<ClientDef<K>>,
194 metrics: Arc<CdnMetricsValue>,
196 internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
198 public_key: K,
200 #[cfg(feature = "hotshot-testing")]
202 is_paused: Arc<AtomicBool>,
203 }
206
207#[repr(u8)]
209#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
210pub enum Topic {
211 Global = 0,
213 Da = 1,
215}
216
217impl TopicTrait for Topic {}
220
221impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
222 pub fn new(
229 marshal_endpoint: String,
230 topics: Vec<Topic>,
231 keypair: KeyPair<WrappedSignatureKey<K>>,
232 metrics: CdnMetricsValue,
233 ) -> anyhow::Result<Self> {
234 let config = ClientConfig {
236 endpoint: marshal_endpoint,
237 subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
238 keypair: keypair.clone(),
239 use_local_authority: true,
240 };
241
242 let client = Client::new(config);
244
245 Ok(Self {
246 client,
247 metrics: Arc::from(metrics),
248 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
249 public_key: keypair.public_key.0,
250 #[cfg(feature = "hotshot-testing")]
252 is_paused: Arc::from(AtomicBool::new(false)),
253 })
254 }
255
256 async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
262 #[cfg(feature = "hotshot-testing")]
264 if self.is_paused.load(Ordering::Relaxed) {
265 return Ok(());
266 }
267
268 if let Err(err) = self
270 .client
271 .send_broadcast_message(vec![topic as u8], message)
272 .await
273 {
274 return Err(NetworkError::MessageReceiveError(format!(
275 "failed to send broadcast message: {err}"
276 )));
277 };
278
279 Ok(())
280 }
281}
282
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Starts a local test CDN (two brokers and one marshal, all bound to
    /// `127.0.0.1` on freshly picked ports) and returns a generator that
    /// builds one [`PushCdnNetwork`] per node id.
    ///
    /// Nodes whose id is below `da_committee_size` subscribe to both the DA
    /// and the global topic; all others subscribe to the global topic only.
    /// The remaining parameters are unused.
    ///
    /// # Panics
    /// Panics if no unused port can be found. The spawned broker and marshal
    /// tasks panic if their respective constructors fail.
    // Kept from the original; the function is a single long setup routine.
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        // Both brokers share one keypair derived from a fixed seed and index
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // Discovery endpoint: a randomly named `.sqlite` path in the temp directory
        let db_file_name = format!("test-{}.sqlite", StdRng::from_entropy().next_u64());
        let discovery_endpoint = std::env::temp_dir()
            .join(Path::new(&db_file_name))
            .to_string_lossy()
            .into_owned();

        // Picks an unused local port and renders it as a loopback address
        let pick_local_address = || {
            format!(
                "127.0.0.1:{}",
                portpicker::pick_unused_port().expect("could not find an open port")
            )
        };

        // Public addresses of the two brokers
        let public_address_1 = pick_local_address();
        let public_address_2 = pick_local_address();

        // Each broker is described by (its own public address, the other broker's)
        let broker_addresses = [
            (public_address_1.clone(), public_address_2.clone()),
            (public_address_2, public_address_1),
        ];

        for (public_address, other_public_address) in broker_addresses {
            // Every broker gets its own private address
            let private_address = pick_local_address();

            // Identifiers of this broker and of its peer
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");

            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Run the broker in the background
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // The broker whose identifier sorts lower waits two seconds
                // before starting (presumably to stagger the two brokers'
                // startup; confirm against the broker implementation).
                if other_broker_identifier > broker_identifier {
                    sleep(Duration::from_secs(2)).await;
                }

                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // The marshal listens on its own freshly picked port
        let marshal_endpoint = pick_local_address();
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Run the marshal in the background
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // The generator: builds a client-backed network for a given node id
        Box::pin({
            move |node_id| {
                // Each generated network needs its own copy of the endpoint
                let marshal_endpoint = marshal_endpoint.clone();

                Box::pin(async move {
                    // Derive the node's keypair from the fixed seed and its id
                    let (_, private_key) =
                        TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
                    let public_key = TYPES::SignatureKey::from_private(&private_key);

                    // Nodes below the DA committee size also listen on the DA topic
                    let in_da_committee = node_id < da_committee_size as u64;
                    let topics = if in_da_committee {
                        vec![Topic::Da as u8, Topic::Global as u8]
                    } else {
                        vec![Topic::Global as u8]
                    };

                    let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
                        ClientConfig {
                            keypair: KeyPair {
                                public_key: WrappedSignatureKey(public_key.clone()),
                                private_key,
                            },
                            subscribed_topics: topics,
                            endpoint: marshal_endpoint,
                            use_local_authority: true,
                        };

                    Arc::new(PushCdnNetwork {
                        client: Client::new(client_config),
                        metrics: Arc::new(CdnMetricsValue::default()),
                        internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                        public_key,
                        #[cfg(feature = "hotshot-testing")]
                        is_paused: Arc::new(AtomicBool::new(false)),
                    })
                })
            }
        })
    }

    /// Always returns `None`: this implementation does not report an
    /// in-flight message count.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
460
461#[async_trait]
462impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
463 fn pause(&self) {
465 #[cfg(feature = "hotshot-testing")]
466 self.is_paused.store(true, Ordering::Relaxed);
467 }
468
469 fn resume(&self) {
471 #[cfg(feature = "hotshot-testing")]
472 self.is_paused.store(false, Ordering::Relaxed);
473 }
474
475 async fn wait_for_ready(&self) {
477 let _ = self.client.ensure_initialized().await;
478 }
479
480 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
482 where
483 'a: 'b,
484 Self: 'b,
485 {
486 boxed_sync(async move { self.client.close().await })
487 }
488
489 async fn broadcast_message(
495 &self,
496 message: Vec<u8>,
497 topic: HotShotTopic,
498 _broadcast_delay: BroadcastDelay,
499 ) -> Result<(), NetworkError> {
500 #[cfg(feature = "hotshot-testing")]
502 if self.is_paused.load(Ordering::Relaxed) {
503 return Ok(());
504 }
505 self.broadcast_message(message, topic.into())
506 .await
507 .inspect_err(|_e| {
508 self.metrics.num_failed_messages.add(1);
509 })
510 }
511
512 async fn da_broadcast_message(
518 &self,
519 message: Vec<u8>,
520 _recipients: Vec<K>,
521 _broadcast_delay: BroadcastDelay,
522 ) -> Result<(), NetworkError> {
523 #[cfg(feature = "hotshot-testing")]
525 if self.is_paused.load(Ordering::Relaxed) {
526 return Ok(());
527 }
528 self.broadcast_message(message, Topic::Da)
529 .await
530 .inspect_err(|_e| {
531 self.metrics.num_failed_messages.add(1);
532 })
533 }
534
535 async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
540 if recipient == self.public_key {
542 self.internal_queue.lock().push_back(message);
543 return Ok(());
544 }
545
546 #[cfg(feature = "hotshot-testing")]
548 if self.is_paused.load(Ordering::Relaxed) {
549 return Ok(());
550 }
551
552 if let Err(e) = self
554 .client
555 .send_direct_message(&WrappedSignatureKey(recipient), message)
556 .await
557 {
558 self.metrics.num_failed_messages.add(1);
559 return Err(NetworkError::MessageSendError(format!(
560 "failed to send direct message: {e}"
561 )));
562 };
563
564 Ok(())
565 }
566
567 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
573 if let Some(message) = self.internal_queue.lock().pop_front() {
575 return Ok(message);
576 }
577
578 let message = self.client.receive_message().await;
580
581 #[cfg(feature = "hotshot-testing")]
583 if self.is_paused.load(Ordering::Relaxed) {
584 sleep(Duration::from_millis(100)).await;
585 return Ok(vec![]);
586 }
587
588 let message = match message {
590 Ok(message) => message,
591 Err(error) => {
592 return Err(NetworkError::MessageReceiveError(format!(
593 "failed to receive message: {error}"
594 )));
595 },
596 };
597
598 let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
600 | PushCdnMessage::Direct(Direct {
601 message,
602 recipient: _,
603 })) = message
604 else {
605 return Ok(vec![]);
606 };
607
608 Ok(message)
609 }
610
611 fn queue_node_lookup(
613 &self,
614 _view_number: ViewNumber,
615 _pk: K,
616 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
617 Ok(())
618 }
619}
620
621impl From<HotShotTopic> for Topic {
622 fn from(topic: HotShotTopic) -> Self {
623 match topic {
624 HotShotTopic::Global => Topic::Global,
625 HotShotTopic::Da => Topic::Da,
626 }
627 }
628}