hotshot/traits/networking/
push_cdn_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#[cfg(feature = "hotshot-testing")]
8use std::{
9    collections::HashMap,
10    sync::atomic::{AtomicBool, Ordering},
11};
12use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
13#[cfg(feature = "hotshot-testing")]
14use std::{path::Path, time::Duration};
15
16use async_trait::async_trait;
17use bincode::config::Options;
18use cdn_broker::reexports::{
19    connection::protocols::{Quic, Tcp},
20    def::{ConnectionDef, RunDef, Topic as TopicTrait, hook::NoMessageHook},
21    discovery::{Embedded, Redis},
22};
23#[cfg(feature = "hotshot-testing")]
24use cdn_broker::{Broker, Config as BrokerConfig};
25pub use cdn_client::reexports::crypto::signature::KeyPair;
26use cdn_client::{
27    Client, Config as ClientConfig,
28    reexports::{
29        crypto::signature::{Serializable, SignatureScheme},
30        message::{Broadcast, Direct, Message as PushCdnMessage},
31    },
32};
33#[cfg(feature = "hotshot-testing")]
34use cdn_marshal::{Config as MarshalConfig, Marshal};
35use hotshot_types::{
36    BoxSyncFuture, boxed_sync,
37    data::ViewNumber,
38    traits::{
39        metrics::{Counter, Metrics, NoMetrics},
40        network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
41        signature_key::SignatureKey,
42    },
43    utils::bincode_opts,
44};
45#[cfg(feature = "hotshot-testing")]
46use hotshot_types::{
47    PeerConnectInfo,
48    traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
49};
50use num_enum::{IntoPrimitive, TryFromPrimitive};
51use parking_lot::Mutex;
52#[cfg(feature = "hotshot-testing")]
53use rand::{RngCore, SeedableRng, rngs::StdRng};
54use test_utils::reserve_tcp_port;
55use tokio::sync::mpsc::error::TrySendError;
56#[cfg(feature = "hotshot-testing")]
57use tokio::{spawn, time::sleep};
58#[cfg(feature = "hotshot-testing")]
59use tracing::error;
60
61use super::NetworkError;
62#[cfg(feature = "hotshot-testing")]
63use crate::NodeType;
64
65/// CDN-specific metrics
66#[derive(Clone)]
67pub struct CdnMetricsValue {
68    /// The number of failed messages
69    pub num_failed_messages: Box<dyn Counter>,
70}
71
72impl CdnMetricsValue {
73    /// Populate the metrics with the CDN-specific ones
74    pub fn new(metrics: &dyn Metrics) -> Self {
75        // Create a subgroup for the CDN
76        let subgroup = metrics.subgroup("cdn".into());
77
78        // Create the CDN-specific metrics
79        Self {
80            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
81        }
82    }
83}
84
85impl Default for CdnMetricsValue {
86    // The default is empty metrics
87    fn default() -> Self {
88        Self::new(&*NoMetrics::boxed())
89    }
90}
91
92/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme`
93/// trait in order to sign and verify messages to/from the CDN.
94#[derive(Clone, Eq, PartialEq)]
95pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
96impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
97    type PrivateKey = T::PrivateKey;
98    type PublicKey = Self;
99
100    /// Sign a message of arbitrary data and return the serialized signature
101    fn sign(
102        private_key: &Self::PrivateKey,
103        namespace: &str,
104        message: &[u8],
105    ) -> anyhow::Result<Vec<u8>> {
106        // Combine the namespace and message into a single byte array
107        let message = [namespace.as_bytes(), message].concat();
108
109        let signature = T::sign(private_key, &message)?;
110        Ok(bincode_opts().serialize(&signature)?)
111    }
112
113    /// Verify a message of arbitrary data and return the result
114    fn verify(
115        public_key: &Self::PublicKey,
116        namespace: &str,
117        message: &[u8],
118        signature: &[u8],
119    ) -> bool {
120        // Deserialize the signature
121        let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
122            Ok(key) => key,
123            Err(_) => return false,
124        };
125
126        // Combine the namespace and message into a single byte array
127        let message = [namespace.as_bytes(), message].concat();
128
129        public_key.0.validate(&signature, &message)
130    }
131}
132
133/// We need to implement the `Serializable` so the Push CDN can serialize the signatures
134/// and public keys and send them over the wire.
135impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
136    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
137        Ok(self.0.to_bytes())
138    }
139
140    fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
141        Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
142    }
143}
144
145/// The production run definition for the Push CDN.
146/// Uses the real protocols and a Redis discovery client.
147pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
148impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
149    type User = UserDefQuic<K>;
150    type User2 = UserDefTcp<K>;
151    type Broker = BrokerDef<K>;
152    type DiscoveryClientType = Redis;
153    type Topic = Topic;
154}
155
156/// The user definition for the Push CDN.
157/// Uses the Quic protocol and untrusted middleware.
158/// RM TODO: Remove this, switching to TCP singularly when everyone has updated
159pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
160impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
161    type Scheme = WrappedSignatureKey<K>;
162    type Protocol = Quic;
163    type MessageHook = NoMessageHook;
164}
165
166/// The user definition for the Push CDN.
167/// Uses the TCP protocol and untrusted middleware.
168pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
169impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
170    type Scheme = WrappedSignatureKey<K>;
171    type Protocol = Tcp;
172    type MessageHook = NoMessageHook;
173}
174
175/// The broker definition for the Push CDN.
176/// Uses the TCP protocol and trusted middleware.
177pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
178impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
179    type Scheme = WrappedSignatureKey<K>;
180    type Protocol = Tcp;
181    type MessageHook = NoMessageHook;
182}
183
184/// The client definition for the Push CDN. Uses the TCP+TLS
185/// protocol and no middleware. Differs from the user
186/// definition in that is on the client-side.
187#[derive(Clone)]
188pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
189impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
190    type Scheme = WrappedSignatureKey<K>;
191    type Protocol = Tcp;
192    type MessageHook = NoMessageHook;
193}
194
195/// The testing run definition for the Push CDN.
196/// Uses the real protocols, but with an embedded discovery client.
197pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
198impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
199    type User = UserDefQuic<K>;
200    type User2 = UserDefTcp<K>;
201    type Broker = BrokerDef<K>;
202    type DiscoveryClientType = Embedded;
203    type Topic = Topic;
204}
205
206/// A communication channel to the Push CDN, which is a collection of brokers and a marshal
207/// that helps organize them all.
208#[derive(Clone)]
209/// Is generic over both the type of key and the network protocol.
210pub struct PushCdnNetwork<K: SignatureKey + 'static> {
211    /// The underlying client
212    client: Client<ClientDef<K>>,
213    /// The CDN-specific metrics
214    metrics: Arc<CdnMetricsValue>,
215    /// The internal queue for messages to ourselves
216    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
217    /// The public key of this node
218    public_key: K,
219    /// Whether or not the underlying network is supposed to be paused
220    #[cfg(feature = "hotshot-testing")]
221    is_paused: Arc<AtomicBool>,
222    // The receiver channel for
223    // request_receiver_channel: TakeReceiver,
224}
225
226/// The enum for the topics we can subscribe to in the Push CDN
227#[repr(u8)]
228#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
229pub enum Topic {
230    /// The global topic
231    Global = 0,
232    /// The DA topic
233    Da = 1,
234}
235
236/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter
237/// topics that are not implemented at the application level.
238impl TopicTrait for Topic {}
239
240impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
241    /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial
242    /// topics we are interested in, and our wrapped keypair that we use to authenticate with the
243    /// marshal.
244    ///
245    /// # Errors
246    /// If we fail to build the config
247    pub fn new(
248        marshal_endpoint: String,
249        topics: Vec<Topic>,
250        keypair: KeyPair<WrappedSignatureKey<K>>,
251        metrics: CdnMetricsValue,
252    ) -> anyhow::Result<Self> {
253        // Build config
254        let config = ClientConfig {
255            endpoint: marshal_endpoint,
256            subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
257            keypair: keypair.clone(),
258            use_local_authority: true,
259        };
260
261        // Create the client from the config
262        let client = Client::new(config);
263
264        Ok(Self {
265            client,
266            metrics: Arc::from(metrics),
267            internal_queue: Arc::new(Mutex::new(VecDeque::new())),
268            public_key: keypair.public_key.0,
269            // Start unpaused
270            #[cfg(feature = "hotshot-testing")]
271            is_paused: Arc::from(AtomicBool::new(false)),
272        })
273    }
274
275    /// Broadcast a message to members of the particular topic. Does not retry.
276    ///
277    /// # Errors
278    /// - If we fail to serialize the message
279    /// - If we fail to send the broadcast message.
280    async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
281        // If the message should also go to us, add it to the internal queue
282        if self
283            .client
284            .subscribed_topics
285            .read()
286            .await
287            .contains(&(topic.clone() as u8))
288        {
289            self.internal_queue.lock().push_back(message.clone());
290        }
291
292        // If we're paused, don't send the message
293        #[cfg(feature = "hotshot-testing")]
294        if self.is_paused.load(Ordering::Relaxed) {
295            return Ok(());
296        }
297
298        // Send the message
299        if let Err(err) = self
300            .client
301            .send_broadcast_message(vec![topic as u8], message)
302            .await
303        {
304            return Err(NetworkError::MessageReceiveError(format!(
305                "failed to send broadcast message: {err}"
306            )));
307        };
308
309        Ok(())
310    }
311}
312
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Generate n Push CDN clients, a marshal, and two brokers (that run locally).
    ///
    /// Broker discovery uses the embedded discovery client pointed at a fresh `SQLite` file in
    /// the OS temporary directory, instead of Redis. The brokers and the marshal are spawned as
    /// detached tasks. The returned generator builds one client per `node_id`; nodes with
    /// `node_id < da_committee_size` also subscribe to the DA topic.
    ///
    /// NOTE(review): the temporary `SQLite` file does not appear to be removed anywhere here.
    ///
    /// # Panics
    /// - If no ephemeral TCP port can be reserved.
    /// - Inside the spawned tasks, if a broker or the marshal fails to start.
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        // The configuration we are using for testing is 2 brokers & 1 marshal

        // A keypair shared between brokers
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // Get the OS temporary directory
        let temp_dir = std::env::temp_dir();

        // Create a uniquely-named SQLite file path inside of the temporary directory
        let discovery_endpoint = temp_dir
            .join(Path::new(&format!(
                "test-{}.sqlite",
                StdRng::from_entropy().next_u64()
            )))
            .to_string_lossy()
            .into_owned();

        // Reserve unused ports for the brokers' public endpoints
        let public_port_1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_port_2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_address_1 = format!("127.0.0.1:{public_port_1}");
        let public_address_2 = format!("127.0.0.1:{public_port_2}");

        // 2 brokers
        for i in 0..2 {
            // Reserve a port for this broker's private endpoint
            let private_port =
                reserve_tcp_port().expect("OS should have ephemeral ports available");

            // Extrapolate addresses
            let private_address = format!("127.0.0.1:{private_port}");
            let (public_address, other_public_address) = if i == 0 {
                (public_address_1.clone(), public_address_2.clone())
            } else {
                (public_address_2.clone(), public_address_1.clone())
            };

            // Calculate the broker identifiers (only used below to order startup)
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");

            // Configure the broker
            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1GB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Create and spawn the broker
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // If we are the first broker by identifier, we need to sleep a bit
                // for discovery to happen first
                if other_broker_identifier > broker_identifier {
                    sleep(Duration::from_secs(2)).await;
                }

                // Error if we stopped unexpectedly
                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // Reserve an available port for the marshal
        let marshal_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        // Configure the marshal
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1GB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Spawn the marshal
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            // Error if we stopped unexpectedly
            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // This function is called for each client we spawn
        Box::pin({
            move |node_id| {
                // Clone this so we can pin the future
                let marshal_endpoint = marshal_endpoint.clone();

                Box::pin(async move {
                    // Derive our public and private keys from our index
                    let private_key =
                        TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
                    let public_key = TYPES::SignatureKey::from_private(&private_key);

                    // Calculate if we're DA or not
                    let topics = if node_id < da_committee_size as u64 {
                        vec![Topic::Da as u8, Topic::Global as u8]
                    } else {
                        vec![Topic::Global as u8]
                    };

                    // Configure our client
                    let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
                        ClientConfig {
                            keypair: KeyPair {
                                public_key: WrappedSignatureKey(public_key.clone()),
                                private_key,
                            },
                            subscribed_topics: topics,
                            endpoint: marshal_endpoint,
                            use_local_authority: true,
                        };

                    // Create our client. This impl is already gated on `hotshot-testing`,
                    // so `is_paused` always exists here.
                    Arc::new(PushCdnNetwork {
                        client: Client::new(client_config),
                        metrics: Arc::new(CdnMetricsValue::default()),
                        internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                        public_key,
                        is_paused: Arc::from(AtomicBool::new(false)),
                    })
                })
            }
        })
    }

    /// The PushCDN does not support in-flight message counts
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
488
489#[async_trait]
490impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
491    /// Pause sending and receiving on the PushCDN network.
492    fn pause(&self) {
493        #[cfg(feature = "hotshot-testing")]
494        self.is_paused.store(true, Ordering::Relaxed);
495    }
496
497    /// Resume sending and receiving on the PushCDN network.
498    fn resume(&self) {
499        #[cfg(feature = "hotshot-testing")]
500        self.is_paused.store(false, Ordering::Relaxed);
501    }
502
503    /// Wait for the client to initialize the connection
504    async fn wait_for_ready(&self) {
505        let _ = self.client.ensure_initialized().await;
506    }
507
508    /// TODO: shut down the networks. Unneeded for testing.
509    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
510    where
511        'a: 'b,
512        Self: 'b,
513    {
514        boxed_sync(async move { self.client.close().await })
515    }
516
517    /// Broadcast a message to all members of the quorum.
518    ///
519    /// # Errors
520    /// - If we fail to serialize the message
521    /// - If we fail to send the broadcast message.
522    async fn broadcast_message(
523        &self,
524        _: ViewNumber,
525        message: Vec<u8>,
526        topic: HotShotTopic,
527        _broadcast_delay: BroadcastDelay,
528    ) -> Result<(), NetworkError> {
529        // If we're paused, don't send the message
530        #[cfg(feature = "hotshot-testing")]
531        if self.is_paused.load(Ordering::Relaxed) {
532            return Ok(());
533        }
534
535        // Broadcast the message
536        self.broadcast_message(message, topic.into())
537            .await
538            .inspect_err(|_e| {
539                self.metrics.num_failed_messages.add(1);
540            })
541    }
542
543    /// Broadcast a message to all members of the DA committee.
544    ///
545    /// # Errors
546    /// - If we fail to serialize the message
547    /// - If we fail to send the broadcast message.
548    async fn da_broadcast_message(
549        &self,
550        _: ViewNumber,
551        message: Vec<u8>,
552        _recipients: Vec<K>,
553        _broadcast_delay: BroadcastDelay,
554    ) -> Result<(), NetworkError> {
555        // If we're paused, don't send the message
556        #[cfg(feature = "hotshot-testing")]
557        if self.is_paused.load(Ordering::Relaxed) {
558            return Ok(());
559        }
560
561        // Broadcast the message
562        self.broadcast_message(message, Topic::Da)
563            .await
564            .inspect_err(|_e| {
565                self.metrics.num_failed_messages.add(1);
566            })
567    }
568
569    /// Send a direct message to a node with a particular key. Does not retry.
570    ///
571    /// - If we fail to serialize the message
572    /// - If we fail to send the direct message
573    async fn direct_message(
574        &self,
575        _: ViewNumber,
576        message: Vec<u8>,
577        recipient: K,
578    ) -> Result<(), NetworkError> {
579        // If the message is to ourselves, just add it to the internal queue
580        if recipient == self.public_key {
581            self.internal_queue.lock().push_back(message);
582            return Ok(());
583        }
584
585        // If we're paused, don't send the message
586        #[cfg(feature = "hotshot-testing")]
587        if self.is_paused.load(Ordering::Relaxed) {
588            return Ok(());
589        }
590
591        // Send the message
592        if let Err(e) = self
593            .client
594            .send_direct_message(&WrappedSignatureKey(recipient), message)
595            .await
596        {
597            self.metrics.num_failed_messages.add(1);
598            return Err(NetworkError::MessageSendError(format!(
599                "failed to send direct message: {e}"
600            )));
601        };
602
603        Ok(())
604    }
605
606    /// Receive a message. Is agnostic over `transmit_type`, which has an issue
607    /// to be removed anyway.
608    ///
609    /// # Errors
610    /// - If we fail to receive messages. Will trigger a retry automatically.
611    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
612        // If we have a message in the internal queue, return it
613        let queued_message = self.internal_queue.lock().pop_front();
614        if let Some(message) = queued_message {
615            return Ok(message);
616        }
617
618        // Receive a message from the network
619        let message = self.client.receive_message().await;
620
621        // If we're paused, receive but don't process messages
622        #[cfg(feature = "hotshot-testing")]
623        if self.is_paused.load(Ordering::Relaxed) {
624            sleep(Duration::from_millis(100)).await;
625            return Ok(vec![]);
626        }
627
628        // If it was an error, wait a bit and retry
629        let message = match message {
630            Ok(message) => message,
631            Err(error) => {
632                return Err(NetworkError::MessageReceiveError(format!(
633                    "failed to receive message: {error}"
634                )));
635            },
636        };
637
638        // Extract the underlying message
639        let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
640        | PushCdnMessage::Direct(Direct {
641            message,
642            recipient: _,
643        })) = message
644        else {
645            return Ok(vec![]);
646        };
647
648        Ok(message)
649    }
650
651    /// Do nothing here, as we don't need to look up nodes.
652    fn queue_node_lookup(
653        &self,
654        _view_number: ViewNumber,
655        _pk: K,
656    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
657        Ok(())
658    }
659}
660
661impl From<HotShotTopic> for Topic {
662    fn from(topic: HotShotTopic) -> Self {
663        match topic {
664            HotShotTopic::Global => Topic::Global,
665            HotShotTopic::Da => Topic::Da,
666        }
667    }
668}