hotshot/traits/networking/
push_cdn_network.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#[cfg(feature = "hotshot-testing")]
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
10#[cfg(feature = "hotshot-testing")]
11use std::{path::Path, time::Duration};
12
13use async_trait::async_trait;
14use bincode::config::Options;
15use cdn_broker::reexports::{
16    connection::protocols::{Quic, Tcp},
17    def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait},
18    discovery::{Embedded, Redis},
19};
20#[cfg(feature = "hotshot-testing")]
21use cdn_broker::{Broker, Config as BrokerConfig};
22pub use cdn_client::reexports::crypto::signature::KeyPair;
23use cdn_client::{
24    reexports::{
25        crypto::signature::{Serializable, SignatureScheme},
26        message::{Broadcast, Direct, Message as PushCdnMessage},
27    },
28    Client, Config as ClientConfig,
29};
30#[cfg(feature = "hotshot-testing")]
31use cdn_marshal::{Config as MarshalConfig, Marshal};
32#[cfg(feature = "hotshot-testing")]
33use hotshot_types::traits::network::{
34    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
35};
36use hotshot_types::{
37    boxed_sync,
38    data::ViewNumber,
39    traits::{
40        metrics::{Counter, Metrics, NoMetrics},
41        network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
42        signature_key::SignatureKey,
43    },
44    utils::bincode_opts,
45    BoxSyncFuture,
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use parking_lot::Mutex;
49#[cfg(feature = "hotshot-testing")]
50use rand::{rngs::StdRng, RngCore, SeedableRng};
51use tokio::sync::mpsc::error::TrySendError;
52#[cfg(feature = "hotshot-testing")]
53use tokio::{spawn, time::sleep};
54#[cfg(feature = "hotshot-testing")]
55use tracing::error;
56
57use super::NetworkError;
58#[cfg(feature = "hotshot-testing")]
59use crate::NodeType;
60
61/// CDN-specific metrics
62#[derive(Clone)]
63pub struct CdnMetricsValue {
64    /// The number of failed messages
65    pub num_failed_messages: Box<dyn Counter>,
66}
67
68impl CdnMetricsValue {
69    /// Populate the metrics with the CDN-specific ones
70    pub fn new(metrics: &dyn Metrics) -> Self {
71        // Create a subgroup for the CDN
72        let subgroup = metrics.subgroup("cdn".into());
73
74        // Create the CDN-specific metrics
75        Self {
76            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
77        }
78    }
79}
80
81impl Default for CdnMetricsValue {
82    // The default is empty metrics
83    fn default() -> Self {
84        Self::new(&*NoMetrics::boxed())
85    }
86}
87
88/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme`
89/// trait in order to sign and verify messages to/from the CDN.
90#[derive(Clone, Eq, PartialEq)]
91pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
92impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
93    type PrivateKey = T::PrivateKey;
94    type PublicKey = Self;
95
96    /// Sign a message of arbitrary data and return the serialized signature
97    fn sign(
98        private_key: &Self::PrivateKey,
99        namespace: &str,
100        message: &[u8],
101    ) -> anyhow::Result<Vec<u8>> {
102        // Combine the namespace and message into a single byte array
103        let message = [namespace.as_bytes(), message].concat();
104
105        let signature = T::sign(private_key, &message)?;
106        Ok(bincode_opts().serialize(&signature)?)
107    }
108
109    /// Verify a message of arbitrary data and return the result
110    fn verify(
111        public_key: &Self::PublicKey,
112        namespace: &str,
113        message: &[u8],
114        signature: &[u8],
115    ) -> bool {
116        // Deserialize the signature
117        let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
118            Ok(key) => key,
119            Err(_) => return false,
120        };
121
122        // Combine the namespace and message into a single byte array
123        let message = [namespace.as_bytes(), message].concat();
124
125        public_key.0.validate(&signature, &message)
126    }
127}
128
129/// We need to implement the `Serializable` so the Push CDN can serialize the signatures
130/// and public keys and send them over the wire.
131impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
132    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
133        Ok(self.0.to_bytes())
134    }
135
136    fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
137        Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
138    }
139}
140
141/// The production run definition for the Push CDN.
142/// Uses the real protocols and a Redis discovery client.
143pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
144impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
145    type User = UserDef<K>;
146    type Broker = BrokerDef<K>;
147    type DiscoveryClientType = Redis;
148    type Topic = Topic;
149}
150
151/// The user definition for the Push CDN.
152/// Uses the TCP+TLS protocol and untrusted middleware.
153pub struct UserDef<K: SignatureKey + 'static>(PhantomData<K>);
154impl<K: SignatureKey + 'static> ConnectionDef for UserDef<K> {
155    type Scheme = WrappedSignatureKey<K>;
156    type Protocol = Quic;
157    type MessageHook = NoMessageHook;
158}
159
160/// The broker definition for the Push CDN.
161/// Uses the TCP protocol and trusted middleware.
162pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
163impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
164    type Scheme = WrappedSignatureKey<K>;
165    type Protocol = Tcp;
166    type MessageHook = NoMessageHook;
167}
168
169/// The client definition for the Push CDN. Uses the TCP+TLS
170/// protocol and no middleware. Differs from the user
171/// definition in that is on the client-side.
172#[derive(Clone)]
173pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
174impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
175    type Scheme = WrappedSignatureKey<K>;
176    type Protocol = Quic;
177    type MessageHook = NoMessageHook;
178}
179
180/// The testing run definition for the Push CDN.
181/// Uses the real protocols, but with an embedded discovery client.
182pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
183impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
184    type User = UserDef<K>;
185    type Broker = BrokerDef<K>;
186    type DiscoveryClientType = Embedded;
187    type Topic = Topic;
188}
189
190/// A communication channel to the Push CDN, which is a collection of brokers and a marshal
191/// that helps organize them all.
192#[derive(Clone)]
193/// Is generic over both the type of key and the network protocol.
194pub struct PushCdnNetwork<K: SignatureKey + 'static> {
195    /// The underlying client
196    client: Client<ClientDef<K>>,
197    /// The CDN-specific metrics
198    metrics: Arc<CdnMetricsValue>,
199    /// The internal queue for messages to ourselves
200    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
201    /// The public key of this node
202    public_key: K,
203    /// Whether or not the underlying network is supposed to be paused
204    #[cfg(feature = "hotshot-testing")]
205    is_paused: Arc<AtomicBool>,
206    // The receiver channel for
207    // request_receiver_channel: TakeReceiver,
208}
209
210/// The enum for the topics we can subscribe to in the Push CDN
211#[repr(u8)]
212#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
213pub enum Topic {
214    /// The global topic
215    Global = 0,
216    /// The DA topic
217    Da = 1,
218}
219
220/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter
221/// topics that are not implemented at the application level.
222impl TopicTrait for Topic {}
223
224impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
225    /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial
226    /// topics we are interested in, and our wrapped keypair that we use to authenticate with the
227    /// marshal.
228    ///
229    /// # Errors
230    /// If we fail to build the config
231    pub fn new(
232        marshal_endpoint: String,
233        topics: Vec<Topic>,
234        keypair: KeyPair<WrappedSignatureKey<K>>,
235        metrics: CdnMetricsValue,
236    ) -> anyhow::Result<Self> {
237        // Build config
238        let config = ClientConfig {
239            endpoint: marshal_endpoint,
240            subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
241            keypair: keypair.clone(),
242            use_local_authority: true,
243        };
244
245        // Create the client from the config
246        let client = Client::new(config);
247
248        Ok(Self {
249            client,
250            metrics: Arc::from(metrics),
251            internal_queue: Arc::new(Mutex::new(VecDeque::new())),
252            public_key: keypair.public_key.0,
253            // Start unpaused
254            #[cfg(feature = "hotshot-testing")]
255            is_paused: Arc::from(AtomicBool::new(false)),
256        })
257    }
258
259    /// Broadcast a message to members of the particular topic. Does not retry.
260    ///
261    /// # Errors
262    /// - If we fail to serialize the message
263    /// - If we fail to send the broadcast message.
264    async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
265        // If we're paused, don't send the message
266        #[cfg(feature = "hotshot-testing")]
267        if self.is_paused.load(Ordering::Relaxed) {
268            return Ok(());
269        }
270
271        // Send the message
272        if let Err(err) = self
273            .client
274            .send_broadcast_message(vec![topic as u8], message)
275            .await
276        {
277            return Err(NetworkError::MessageReceiveError(format!(
278                "failed to send broadcast message: {err}"
279            )));
280        };
281
282        Ok(())
283    }
284}
285
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Spin up a local Push CDN (one marshal and two brokers) and return a generator
    /// that creates one Push CDN client per node id.
    /// Discovery goes through a `SQLite` file in the OS temporary directory instead of Redis.
    ///
    /// Nodes whose id is below `da_committee_size` subscribe to both the DA and the
    /// global topic; all other nodes subscribe to the global topic only.
    ///
    /// # Panics
    /// - If no unused port can be found
    /// - Inside the spawned tasks, if a broker or the marshal fails to be created
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        // Test topology: 2 brokers & 1 marshal

        // Both brokers use the same keypair
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // The discovery "endpoint" is a randomly named SQLite file in the OS temp directory
        let sqlite_file_name = format!("test-{}.sqlite", StdRng::from_entropy().next_u64());
        let discovery_endpoint = std::env::temp_dir()
            .join(Path::new(&sqlite_file_name))
            .to_string_lossy()
            .into_owned();

        // Reserve a public endpoint for each of the two brokers
        let public_address_1 = format!(
            "127.0.0.1:{}",
            portpicker::pick_unused_port().expect("could not find an open port")
        );
        let public_address_2 = format!(
            "127.0.0.1:{}",
            portpicker::pick_unused_port().expect("could not find an open port")
        );

        // Each broker gets (its own public address, the other broker's public address)
        let broker_addresses = [
            (public_address_1.clone(), public_address_2.clone()),
            (public_address_2, public_address_1),
        ];

        for (public_address, other_public_address) in broker_addresses {
            // Every broker also needs a private endpoint of its own
            let private_port = portpicker::pick_unused_port().expect("could not find an open port");
            let private_address = format!("127.0.0.1:{private_port}");

            // Work out both broker identifiers and whether this broker sorts first
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");
            let is_first_by_identifier = other_broker_identifier > broker_identifier;

            // Put together the broker's configuration
            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1GB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Run the broker in a background task
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // The broker that sorts first waits a little so that discovery
                // can happen before it starts
                if is_first_by_identifier {
                    sleep(Duration::from_secs(2)).await;
                }

                // Log if the broker stopped unexpectedly
                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // The marshal gets its own port
        let marshal_port = portpicker::pick_unused_port().expect("could not find an open port");
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");

        // Put together the marshal's configuration
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1GB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Run the marshal in a background task
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            // Log if the marshal stopped unexpectedly
            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // The returned generator is invoked once per client we want to create
        Box::pin(move |node_id| {
            // Each future needs its own copy of the endpoint
            let marshal_endpoint = marshal_endpoint.clone();

            Box::pin(async move {
                // Our public and private keys are derived from the node index
                let private_key =
                    TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
                let public_key = TYPES::SignatureKey::from_private(&private_key);

                // The first `da_committee_size` nodes are part of the DA committee
                let is_da = node_id < da_committee_size as u64;
                let subscribed_topics = if is_da {
                    vec![Topic::Da as u8, Topic::Global as u8]
                } else {
                    vec![Topic::Global as u8]
                };

                // Put together the client's configuration
                let keypair = KeyPair {
                    public_key: WrappedSignatureKey(public_key.clone()),
                    private_key,
                };
                let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> = ClientConfig {
                    keypair,
                    subscribed_topics,
                    endpoint: marshal_endpoint,
                    use_local_authority: true,
                };

                // Wrap the client in a fresh, unpaused network handle
                Arc::new(PushCdnNetwork {
                    client: Client::new(client_config),
                    metrics: Arc::new(CdnMetricsValue::default()),
                    internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                    public_key,
                    #[cfg(feature = "hotshot-testing")]
                    is_paused: Arc::from(AtomicBool::new(false)),
                })
            })
        })
    }

    /// In-flight message counts are not supported by the PushCDN, so this is always `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
463
464#[async_trait]
465impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
466    /// Pause sending and receiving on the PushCDN network.
467    fn pause(&self) {
468        #[cfg(feature = "hotshot-testing")]
469        self.is_paused.store(true, Ordering::Relaxed);
470    }
471
472    /// Resume sending and receiving on the PushCDN network.
473    fn resume(&self) {
474        #[cfg(feature = "hotshot-testing")]
475        self.is_paused.store(false, Ordering::Relaxed);
476    }
477
478    /// Wait for the client to initialize the connection
479    async fn wait_for_ready(&self) {
480        let _ = self.client.ensure_initialized().await;
481    }
482
483    /// TODO: shut down the networks. Unneeded for testing.
484    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
485    where
486        'a: 'b,
487        Self: 'b,
488    {
489        boxed_sync(async move { self.client.close().await })
490    }
491
492    /// Broadcast a message to all members of the quorum.
493    ///
494    /// # Errors
495    /// - If we fail to serialize the message
496    /// - If we fail to send the broadcast message.
497    async fn broadcast_message(
498        &self,
499        message: Vec<u8>,
500        topic: HotShotTopic,
501        _broadcast_delay: BroadcastDelay,
502    ) -> Result<(), NetworkError> {
503        // If we're paused, don't send the message
504        #[cfg(feature = "hotshot-testing")]
505        if self.is_paused.load(Ordering::Relaxed) {
506            return Ok(());
507        }
508        self.broadcast_message(message, topic.into())
509            .await
510            .inspect_err(|_e| {
511                self.metrics.num_failed_messages.add(1);
512            })
513    }
514
515    /// Broadcast a message to all members of the DA committee.
516    ///
517    /// # Errors
518    /// - If we fail to serialize the message
519    /// - If we fail to send the broadcast message.
520    async fn da_broadcast_message(
521        &self,
522        message: Vec<u8>,
523        _recipients: Vec<K>,
524        _broadcast_delay: BroadcastDelay,
525    ) -> Result<(), NetworkError> {
526        // If we're paused, don't send the message
527        #[cfg(feature = "hotshot-testing")]
528        if self.is_paused.load(Ordering::Relaxed) {
529            return Ok(());
530        }
531        self.broadcast_message(message, Topic::Da)
532            .await
533            .inspect_err(|_e| {
534                self.metrics.num_failed_messages.add(1);
535            })
536    }
537
538    /// Send a direct message to a node with a particular key. Does not retry.
539    ///
540    /// - If we fail to serialize the message
541    /// - If we fail to send the direct message
542    async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
543        // If the message is to ourselves, just add it to the internal queue
544        if recipient == self.public_key {
545            self.internal_queue.lock().push_back(message);
546            return Ok(());
547        }
548
549        // If we're paused, don't send the message
550        #[cfg(feature = "hotshot-testing")]
551        if self.is_paused.load(Ordering::Relaxed) {
552            return Ok(());
553        }
554
555        // Send the message
556        if let Err(e) = self
557            .client
558            .send_direct_message(&WrappedSignatureKey(recipient), message)
559            .await
560        {
561            self.metrics.num_failed_messages.add(1);
562            return Err(NetworkError::MessageSendError(format!(
563                "failed to send direct message: {e}"
564            )));
565        };
566
567        Ok(())
568    }
569
570    /// Receive a message. Is agnostic over `transmit_type`, which has an issue
571    /// to be removed anyway.
572    ///
573    /// # Errors
574    /// - If we fail to receive messages. Will trigger a retry automatically.
575    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
576        // If we have a message in the internal queue, return it
577        let queued_message = self.internal_queue.lock().pop_front();
578        if let Some(message) = queued_message {
579            return Ok(message);
580        }
581
582        // Receive a message from the network
583        let message = self.client.receive_message().await;
584
585        // If we're paused, receive but don't process messages
586        #[cfg(feature = "hotshot-testing")]
587        if self.is_paused.load(Ordering::Relaxed) {
588            sleep(Duration::from_millis(100)).await;
589            return Ok(vec![]);
590        }
591
592        // If it was an error, wait a bit and retry
593        let message = match message {
594            Ok(message) => message,
595            Err(error) => {
596                return Err(NetworkError::MessageReceiveError(format!(
597                    "failed to receive message: {error}"
598                )));
599            },
600        };
601
602        // Extract the underlying message
603        let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
604        | PushCdnMessage::Direct(Direct {
605            message,
606            recipient: _,
607        })) = message
608        else {
609            return Ok(vec![]);
610        };
611
612        Ok(message)
613    }
614
615    /// Do nothing here, as we don't need to look up nodes.
616    fn queue_node_lookup(
617        &self,
618        _view_number: ViewNumber,
619        _pk: K,
620    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
621        Ok(())
622    }
623}
624
625impl From<HotShotTopic> for Topic {
626    fn from(topic: HotShotTopic) -> Self {
627        match topic {
628            HotShotTopic::Global => Topic::Global,
629            HotShotTopic::Da => Topic::Da,
630        }
631    }
632}