hotshot/traits/networking/
push_cdn_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#[cfg(feature = "hotshot-testing")]
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
10#[cfg(feature = "hotshot-testing")]
11use std::{path::Path, time::Duration};
12
13use async_trait::async_trait;
14use bincode::config::Options;
15use cdn_broker::reexports::{
16    connection::protocols::{Quic, Tcp},
17    def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait},
18    discovery::{Embedded, Redis},
19};
20#[cfg(feature = "hotshot-testing")]
21use cdn_broker::{Broker, Config as BrokerConfig};
22pub use cdn_client::reexports::crypto::signature::KeyPair;
23use cdn_client::{
24    reexports::{
25        crypto::signature::{Serializable, SignatureScheme},
26        message::{Broadcast, Direct, Message as PushCdnMessage},
27    },
28    Client, Config as ClientConfig,
29};
30#[cfg(feature = "hotshot-testing")]
31use cdn_marshal::{Config as MarshalConfig, Marshal};
32#[cfg(feature = "hotshot-testing")]
33use hotshot_types::traits::network::{
34    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
35};
36use hotshot_types::{
37    boxed_sync,
38    data::ViewNumber,
39    traits::{
40        metrics::{Counter, Metrics, NoMetrics},
41        network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
42        signature_key::SignatureKey,
43    },
44    utils::bincode_opts,
45    BoxSyncFuture,
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use parking_lot::Mutex;
49#[cfg(feature = "hotshot-testing")]
50use rand::{rngs::StdRng, RngCore, SeedableRng};
51use tokio::sync::mpsc::error::TrySendError;
52#[cfg(feature = "hotshot-testing")]
53use tokio::{spawn, time::sleep};
54#[cfg(feature = "hotshot-testing")]
55use tracing::error;
56
57use super::NetworkError;
58#[cfg(feature = "hotshot-testing")]
59use crate::NodeType;
60
/// CDN-specific metrics.
///
/// Groups the counters this module reports about its interaction with the Push CDN.
#[derive(Clone)]
pub struct CdnMetricsValue {
    /// The number of failed messages (incremented when sending a broadcast or
    /// direct message fails)
    pub num_failed_messages: Box<dyn Counter>,
}
67
68impl CdnMetricsValue {
69    /// Populate the metrics with the CDN-specific ones
70    pub fn new(metrics: &dyn Metrics) -> Self {
71        // Create a subgroup for the CDN
72        let subgroup = metrics.subgroup("cdn".into());
73
74        // Create the CDN-specific metrics
75        Self {
76            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
77        }
78    }
79}
80
81impl Default for CdnMetricsValue {
82    // The default is empty metrics
83    fn default() -> Self {
84        Self::new(&*NoMetrics::boxed())
85    }
86}
87
/// A newtype around a `SignatureKey`. The Push CDN's `SignatureScheme` trait is
/// implemented for this wrapper so the key can sign and verify messages
/// to/from the CDN.
#[derive(Clone, Eq, PartialEq)]
pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
92impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
93    type PrivateKey = T::PrivateKey;
94    type PublicKey = Self;
95
96    /// Sign a message of arbitrary data and return the serialized signature
97    fn sign(
98        private_key: &Self::PrivateKey,
99        namespace: &str,
100        message: &[u8],
101    ) -> anyhow::Result<Vec<u8>> {
102        // Combine the namespace and message into a single byte array
103        let message = [namespace.as_bytes(), message].concat();
104
105        let signature = T::sign(private_key, &message)?;
106        Ok(bincode_opts().serialize(&signature)?)
107    }
108
109    /// Verify a message of arbitrary data and return the result
110    fn verify(
111        public_key: &Self::PublicKey,
112        namespace: &str,
113        message: &[u8],
114        signature: &[u8],
115    ) -> bool {
116        // Deserialize the signature
117        let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
118            Ok(key) => key,
119            Err(_) => return false,
120        };
121
122        // Combine the namespace and message into a single byte array
123        let message = [namespace.as_bytes(), message].concat();
124
125        public_key.0.validate(&signature, &message)
126    }
127}
128
129/// We need to implement the `Serializable` so the Push CDN can serialize the signatures
130/// and public keys and send them over the wire.
131impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
132    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
133        Ok(self.0.to_bytes())
134    }
135
136    fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
137        Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
138    }
139}
140
/// The production run definition for the Push CDN.
///
/// Users connect through `UserDefQuic` or `UserDefTcp`, brokers through
/// `BrokerDef`, and discovery uses the `Redis` client.
pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
    type User = UserDefQuic<K>;
    type User2 = UserDefTcp<K>;
    type Broker = BrokerDef<K>;
    type DiscoveryClientType = Redis;
    type Topic = Topic;
}
151
/// The QUIC user definition for the Push CDN.
///
/// Uses the `Quic` protocol with `WrappedSignatureKey<K>` as the signature
/// scheme and no message hook.
///
/// TODO(RM): remove this and use TCP only once everyone has updated.
pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
    type Scheme = WrappedSignatureKey<K>;
    type Protocol = Quic;
    type MessageHook = NoMessageHook;
}
161
/// The TCP user definition for the Push CDN.
///
/// Uses the `Tcp` protocol with `WrappedSignatureKey<K>` as the signature
/// scheme and no message hook.
pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
    type Scheme = WrappedSignatureKey<K>;
    type Protocol = Tcp;
    type MessageHook = NoMessageHook;
}
170
/// The broker definition for the Push CDN.
///
/// Uses the `Tcp` protocol with `WrappedSignatureKey<K>` as the signature
/// scheme and no message hook.
pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
    type Scheme = WrappedSignatureKey<K>;
    type Protocol = Tcp;
    type MessageHook = NoMessageHook;
}
179
/// The client definition for the Push CDN.
///
/// Uses the `Tcp` protocol with `WrappedSignatureKey<K>` as the signature
/// scheme and no message hook. It differs from the user definitions in that
/// it is the one used on the client side (see `PushCdnNetwork::client`).
#[derive(Clone)]
pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
    type Scheme = WrappedSignatureKey<K>;
    type Protocol = Tcp;
    type MessageHook = NoMessageHook;
}
190
/// The testing run definition for the Push CDN.
///
/// Identical to `ProductionDef` except that discovery uses the `Embedded`
/// client instead of `Redis`.
pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
    type User = UserDefQuic<K>;
    type User2 = UserDefTcp<K>;
    type Broker = BrokerDef<K>;
    type DiscoveryClientType = Embedded;
    type Topic = Topic;
}
201
/// A communication channel to the Push CDN, which is a collection of brokers and a marshal
/// that helps organize them all.
///
/// Is generic over the type of signature key.
#[derive(Clone)]
pub struct PushCdnNetwork<K: SignatureKey + 'static> {
    /// The underlying client
    client: Client<ClientDef<K>>,
    /// The CDN-specific metrics
    metrics: Arc<CdnMetricsValue>,
    /// The internal queue for messages to ourselves. `recv_message` returns
    /// entries from this queue before asking the client for network messages.
    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    /// The public key of this node
    public_key: K,
    /// Whether or not the underlying network is supposed to be paused
    #[cfg(feature = "hotshot-testing")]
    is_paused: Arc<AtomicBool>,
}
221
/// The topics we can subscribe to in the Push CDN.
///
/// The enum is `repr(u8)` with explicit discriminants, and conversions to and
/// from `u8` are derived via `IntoPrimitive` / `TryFromPrimitive`.
#[repr(u8)]
#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
pub enum Topic {
    /// The global topic
    Global = 0,
    /// The DA topic
    Da = 1,
}
231
/// Implement the CDN's `TopicTrait` marker for our `Topic` enum. We need this to
/// filter topics that are not implemented at the application level.
impl TopicTrait for Topic {}
235
236impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
237    /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial
238    /// topics we are interested in, and our wrapped keypair that we use to authenticate with the
239    /// marshal.
240    ///
241    /// # Errors
242    /// If we fail to build the config
243    pub fn new(
244        marshal_endpoint: String,
245        topics: Vec<Topic>,
246        keypair: KeyPair<WrappedSignatureKey<K>>,
247        metrics: CdnMetricsValue,
248    ) -> anyhow::Result<Self> {
249        // Build config
250        let config = ClientConfig {
251            endpoint: marshal_endpoint,
252            subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
253            keypair: keypair.clone(),
254            use_local_authority: true,
255        };
256
257        // Create the client from the config
258        let client = Client::new(config);
259
260        Ok(Self {
261            client,
262            metrics: Arc::from(metrics),
263            internal_queue: Arc::new(Mutex::new(VecDeque::new())),
264            public_key: keypair.public_key.0,
265            // Start unpaused
266            #[cfg(feature = "hotshot-testing")]
267            is_paused: Arc::from(AtomicBool::new(false)),
268        })
269    }
270
271    /// Broadcast a message to members of the particular topic. Does not retry.
272    ///
273    /// # Errors
274    /// - If we fail to serialize the message
275    /// - If we fail to send the broadcast message.
276    async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
277        // If the message should also go to us, add it to the internal queue
278        if self
279            .client
280            .subscribed_topics
281            .read()
282            .await
283            .contains(&(topic.clone() as u8))
284        {
285            self.internal_queue.lock().push_back(message.clone());
286        }
287
288        // If we're paused, don't send the message
289        #[cfg(feature = "hotshot-testing")]
290        if self.is_paused.load(Ordering::Relaxed) {
291            return Ok(());
292        }
293
294        // Send the message
295        if let Err(err) = self
296            .client
297            .send_broadcast_message(vec![topic as u8], message)
298            .await
299        {
300            return Err(NetworkError::MessageReceiveError(format!(
301                "failed to send broadcast message: {err}"
302            )));
303        };
304
305        Ok(())
306    }
307}
308
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Start a local Push CDN (two brokers and one marshal) and return a generator that
    /// builds one client per node ID.
    ///
    /// Discovery goes through a randomly named `SQLite` file in the OS temporary
    /// directory instead of Redis. Nodes whose ID is below `da_committee_size` subscribe
    /// to both the DA and global topics; all others subscribe to the global topic only.
    ///
    /// # Panics
    /// If no unused local port can be found. Broker and marshal start-up failures panic
    /// inside their spawned tasks.
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        // Test topology: 2 brokers & 1 marshal

        // Both brokers use the same keypair
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // The discovery database is a randomly named SQLite file in the OS temp directory
        let database_file_name = format!("test-{}.sqlite", StdRng::from_entropy().next_u64());
        let discovery_endpoint = std::env::temp_dir()
            .join(Path::new(&database_file_name))
            .to_string_lossy()
            .into_owned();

        // Reserve a public port for each broker
        let first_public_port =
            portpicker::pick_unused_port().expect("could not find an open port");
        let public_address_1 = format!("127.0.0.1:{first_public_port}");
        let second_public_port =
            portpicker::pick_unused_port().expect("could not find an open port");
        let public_address_2 = format!("127.0.0.1:{second_public_port}");

        // Each broker is described by (its own public address, its peer's public address)
        let broker_endpoints = [
            (public_address_1.clone(), public_address_2.clone()),
            (public_address_2, public_address_1),
        ];

        for (public_address, other_public_address) in broker_endpoints {
            // Every broker also needs its own private endpoint
            let private_port = portpicker::pick_unused_port().expect("could not find an open port");
            let private_address = format!("127.0.0.1:{private_port}");

            // The broker whose identifier sorts first waits a bit before starting so that
            // discovery can happen first
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");
            let delay_start = other_broker_identifier > broker_identifier;

            // Configure the broker
            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1GB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Run the broker in the background
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                if delay_start {
                    sleep(Duration::from_secs(2)).await;
                }

                // Log if the broker stopped unexpectedly
                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // Reserve a port for the marshal and configure it
        let marshal_port = portpicker::pick_unused_port().expect("could not find an open port");
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1GB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Run the marshal in the background
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            // Log if the marshal stopped unexpectedly
            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // The returned generator is invoked once for each client we spawn
        Box::pin({
            move |node_id| {
                // Each returned future needs its own copy of the endpoint
                let marshal_endpoint = marshal_endpoint.clone();

                Box::pin(async move {
                    // Derive our public and private keys from our index
                    let (_, private_key) =
                        TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
                    let public_key = TYPES::SignatureKey::from_private(&private_key);

                    // The first `da_committee_size` nodes are DA members
                    let is_da_member = node_id < da_committee_size as u64;
                    let topics = if is_da_member {
                        vec![Topic::Da as u8, Topic::Global as u8]
                    } else {
                        vec![Topic::Global as u8]
                    };

                    // Configure our client
                    let keypair = KeyPair {
                        public_key: WrappedSignatureKey(public_key.clone()),
                        private_key,
                    };
                    let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
                        ClientConfig {
                            keypair,
                            subscribed_topics: topics,
                            endpoint: marshal_endpoint,
                            use_local_authority: true,
                        };

                    // Create our client; it starts unpaused with an empty internal queue
                    Arc::new(PushCdnNetwork {
                        client: Client::new(client_config),
                        metrics: Arc::new(CdnMetricsValue::default()),
                        internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                        public_key,
                        is_paused: Arc::new(AtomicBool::new(false)),
                    })
                })
            }
        })
    }

    /// The PushCDN does not support in-flight message counts, so this is always `None`
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
486
487#[async_trait]
488impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
489    /// Pause sending and receiving on the PushCDN network.
490    fn pause(&self) {
491        #[cfg(feature = "hotshot-testing")]
492        self.is_paused.store(true, Ordering::Relaxed);
493    }
494
495    /// Resume sending and receiving on the PushCDN network.
496    fn resume(&self) {
497        #[cfg(feature = "hotshot-testing")]
498        self.is_paused.store(false, Ordering::Relaxed);
499    }
500
501    /// Wait for the client to initialize the connection
502    async fn wait_for_ready(&self) {
503        let _ = self.client.ensure_initialized().await;
504    }
505
506    /// TODO: shut down the networks. Unneeded for testing.
507    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
508    where
509        'a: 'b,
510        Self: 'b,
511    {
512        boxed_sync(async move { self.client.close().await })
513    }
514
515    /// Broadcast a message to all members of the quorum.
516    ///
517    /// # Errors
518    /// - If we fail to serialize the message
519    /// - If we fail to send the broadcast message.
520    async fn broadcast_message(
521        &self,
522        message: Vec<u8>,
523        topic: HotShotTopic,
524        _broadcast_delay: BroadcastDelay,
525    ) -> Result<(), NetworkError> {
526        // If we're paused, don't send the message
527        #[cfg(feature = "hotshot-testing")]
528        if self.is_paused.load(Ordering::Relaxed) {
529            return Ok(());
530        }
531
532        // Broadcast the message
533        self.broadcast_message(message, topic.into())
534            .await
535            .inspect_err(|_e| {
536                self.metrics.num_failed_messages.add(1);
537            })
538    }
539
540    /// Broadcast a message to all members of the DA committee.
541    ///
542    /// # Errors
543    /// - If we fail to serialize the message
544    /// - If we fail to send the broadcast message.
545    async fn da_broadcast_message(
546        &self,
547        message: Vec<u8>,
548        _recipients: Vec<K>,
549        _broadcast_delay: BroadcastDelay,
550    ) -> Result<(), NetworkError> {
551        // If we're paused, don't send the message
552        #[cfg(feature = "hotshot-testing")]
553        if self.is_paused.load(Ordering::Relaxed) {
554            return Ok(());
555        }
556
557        // Broadcast the message
558        self.broadcast_message(message, Topic::Da)
559            .await
560            .inspect_err(|_e| {
561                self.metrics.num_failed_messages.add(1);
562            })
563    }
564
565    /// Send a direct message to a node with a particular key. Does not retry.
566    ///
567    /// - If we fail to serialize the message
568    /// - If we fail to send the direct message
569    async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
570        // If the message is to ourselves, just add it to the internal queue
571        if recipient == self.public_key {
572            self.internal_queue.lock().push_back(message);
573            return Ok(());
574        }
575
576        // If we're paused, don't send the message
577        #[cfg(feature = "hotshot-testing")]
578        if self.is_paused.load(Ordering::Relaxed) {
579            return Ok(());
580        }
581
582        // Send the message
583        if let Err(e) = self
584            .client
585            .send_direct_message(&WrappedSignatureKey(recipient), message)
586            .await
587        {
588            self.metrics.num_failed_messages.add(1);
589            return Err(NetworkError::MessageSendError(format!(
590                "failed to send direct message: {e}"
591            )));
592        };
593
594        Ok(())
595    }
596
597    /// Receive a message. Is agnostic over `transmit_type`, which has an issue
598    /// to be removed anyway.
599    ///
600    /// # Errors
601    /// - If we fail to receive messages. Will trigger a retry automatically.
602    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
603        // If we have a message in the internal queue, return it
604        let queued_message = self.internal_queue.lock().pop_front();
605        if let Some(message) = queued_message {
606            return Ok(message);
607        }
608
609        // Receive a message from the network
610        let message = self.client.receive_message().await;
611
612        // If we're paused, receive but don't process messages
613        #[cfg(feature = "hotshot-testing")]
614        if self.is_paused.load(Ordering::Relaxed) {
615            sleep(Duration::from_millis(100)).await;
616            return Ok(vec![]);
617        }
618
619        // If it was an error, wait a bit and retry
620        let message = match message {
621            Ok(message) => message,
622            Err(error) => {
623                return Err(NetworkError::MessageReceiveError(format!(
624                    "failed to receive message: {error}"
625                )));
626            },
627        };
628
629        // Extract the underlying message
630        let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
631        | PushCdnMessage::Direct(Direct {
632            message,
633            recipient: _,
634        })) = message
635        else {
636            return Ok(vec![]);
637        };
638
639        Ok(message)
640    }
641
642    /// Do nothing here, as we don't need to look up nodes.
643    fn queue_node_lookup(
644        &self,
645        _view_number: ViewNumber,
646        _pk: K,
647    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
648        Ok(())
649    }
650}
651
652impl From<HotShotTopic> for Topic {
653    fn from(topic: HotShotTopic) -> Self {
654        match topic {
655            HotShotTopic::Global => Topic::Global,
656            HotShotTopic::Da => Topic::Da,
657        }
658    }
659}