hotshot/traits/networking/
push_cdn_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#[cfg(feature = "hotshot-testing")]
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
10#[cfg(feature = "hotshot-testing")]
11use std::{path::Path, time::Duration};
12
13use async_trait::async_trait;
14use bincode::config::Options;
15use cdn_broker::reexports::{
16    connection::protocols::{Quic, Tcp},
17    def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait},
18    discovery::{Embedded, Redis},
19};
20#[cfg(feature = "hotshot-testing")]
21use cdn_broker::{Broker, Config as BrokerConfig};
22pub use cdn_client::reexports::crypto::signature::KeyPair;
23use cdn_client::{
24    reexports::{
25        crypto::signature::{Serializable, SignatureScheme},
26        message::{Broadcast, Direct, Message as PushCdnMessage},
27    },
28    Client, Config as ClientConfig,
29};
30#[cfg(feature = "hotshot-testing")]
31use cdn_marshal::{Config as MarshalConfig, Marshal};
32#[cfg(feature = "hotshot-testing")]
33use hotshot_types::traits::network::{
34    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
35};
36use hotshot_types::{
37    boxed_sync,
38    data::ViewNumber,
39    traits::{
40        metrics::{Counter, Metrics, NoMetrics},
41        network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
42        node_implementation::NodeType,
43        signature_key::SignatureKey,
44    },
45    utils::bincode_opts,
46    BoxSyncFuture,
47};
48use num_enum::{IntoPrimitive, TryFromPrimitive};
49use parking_lot::Mutex;
50#[cfg(feature = "hotshot-testing")]
51use rand::{rngs::StdRng, RngCore, SeedableRng};
52use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep};
53#[cfg(feature = "hotshot-testing")]
54use tracing::error;
55
56use super::NetworkError;
57
58/// CDN-specific metrics
59#[derive(Clone)]
60pub struct CdnMetricsValue {
61    /// The number of failed messages
62    pub num_failed_messages: Box<dyn Counter>,
63}
64
65impl CdnMetricsValue {
66    /// Populate the metrics with the CDN-specific ones
67    pub fn new(metrics: &dyn Metrics) -> Self {
68        // Create a subgroup for the CDN
69        let subgroup = metrics.subgroup("cdn".into());
70
71        // Create the CDN-specific metrics
72        Self {
73            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
74        }
75    }
76}
77
78impl Default for CdnMetricsValue {
79    // The default is empty metrics
80    fn default() -> Self {
81        Self::new(&*NoMetrics::boxed())
82    }
83}
84
85/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme`
86/// trait in order to sign and verify messages to/from the CDN.
87#[derive(Clone, Eq, PartialEq)]
88pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
89impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
90    type PrivateKey = T::PrivateKey;
91    type PublicKey = Self;
92
93    /// Sign a message of arbitrary data and return the serialized signature
94    fn sign(
95        private_key: &Self::PrivateKey,
96        namespace: &str,
97        message: &[u8],
98    ) -> anyhow::Result<Vec<u8>> {
99        // Combine the namespace and message into a single byte array
100        let message = [namespace.as_bytes(), message].concat();
101
102        let signature = T::sign(private_key, &message)?;
103        Ok(bincode_opts().serialize(&signature)?)
104    }
105
106    /// Verify a message of arbitrary data and return the result
107    fn verify(
108        public_key: &Self::PublicKey,
109        namespace: &str,
110        message: &[u8],
111        signature: &[u8],
112    ) -> bool {
113        // Deserialize the signature
114        let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
115            Ok(key) => key,
116            Err(_) => return false,
117        };
118
119        // Combine the namespace and message into a single byte array
120        let message = [namespace.as_bytes(), message].concat();
121
122        public_key.0.validate(&signature, &message)
123    }
124}
125
126/// We need to implement the `Serializable` so the Push CDN can serialize the signatures
127/// and public keys and send them over the wire.
128impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
129    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
130        Ok(self.0.to_bytes())
131    }
132
133    fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
134        Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
135    }
136}
137
138/// The production run definition for the Push CDN.
139/// Uses the real protocols and a Redis discovery client.
140pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
141impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
142    type User = UserDef<K>;
143    type Broker = BrokerDef<K>;
144    type DiscoveryClientType = Redis;
145    type Topic = Topic;
146}
147
148/// The user definition for the Push CDN.
149/// Uses the TCP+TLS protocol and untrusted middleware.
150pub struct UserDef<K: SignatureKey + 'static>(PhantomData<K>);
151impl<K: SignatureKey + 'static> ConnectionDef for UserDef<K> {
152    type Scheme = WrappedSignatureKey<K>;
153    type Protocol = Quic;
154    type MessageHook = NoMessageHook;
155}
156
157/// The broker definition for the Push CDN.
158/// Uses the TCP protocol and trusted middleware.
159pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
160impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
161    type Scheme = WrappedSignatureKey<K>;
162    type Protocol = Tcp;
163    type MessageHook = NoMessageHook;
164}
165
166/// The client definition for the Push CDN. Uses the TCP+TLS
167/// protocol and no middleware. Differs from the user
168/// definition in that is on the client-side.
169#[derive(Clone)]
170pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
171impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
172    type Scheme = WrappedSignatureKey<K>;
173    type Protocol = Quic;
174    type MessageHook = NoMessageHook;
175}
176
177/// The testing run definition for the Push CDN.
178/// Uses the real protocols, but with an embedded discovery client.
179pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
180impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
181    type User = UserDef<K>;
182    type Broker = BrokerDef<K>;
183    type DiscoveryClientType = Embedded;
184    type Topic = Topic;
185}
186
187/// A communication channel to the Push CDN, which is a collection of brokers and a marshal
188/// that helps organize them all.
189#[derive(Clone)]
190/// Is generic over both the type of key and the network protocol.
191pub struct PushCdnNetwork<K: SignatureKey + 'static> {
192    /// The underlying client
193    client: Client<ClientDef<K>>,
194    /// The CDN-specific metrics
195    metrics: Arc<CdnMetricsValue>,
196    /// The internal queue for messages to ourselves
197    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
198    /// The public key of this node
199    public_key: K,
200    /// Whether or not the underlying network is supposed to be paused
201    #[cfg(feature = "hotshot-testing")]
202    is_paused: Arc<AtomicBool>,
203    // The receiver channel for
204    // request_receiver_channel: TakeReceiver,
205}
206
207/// The enum for the topics we can subscribe to in the Push CDN
208#[repr(u8)]
209#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
210pub enum Topic {
211    /// The global topic
212    Global = 0,
213    /// The DA topic
214    Da = 1,
215}
216
217/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter
218/// topics that are not implemented at the application level.
219impl TopicTrait for Topic {}
220
221impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
222    /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial
223    /// topics we are interested in, and our wrapped keypair that we use to authenticate with the
224    /// marshal.
225    ///
226    /// # Errors
227    /// If we fail to build the config
228    pub fn new(
229        marshal_endpoint: String,
230        topics: Vec<Topic>,
231        keypair: KeyPair<WrappedSignatureKey<K>>,
232        metrics: CdnMetricsValue,
233    ) -> anyhow::Result<Self> {
234        // Build config
235        let config = ClientConfig {
236            endpoint: marshal_endpoint,
237            subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
238            keypair: keypair.clone(),
239            use_local_authority: true,
240        };
241
242        // Create the client from the config
243        let client = Client::new(config);
244
245        Ok(Self {
246            client,
247            metrics: Arc::from(metrics),
248            internal_queue: Arc::new(Mutex::new(VecDeque::new())),
249            public_key: keypair.public_key.0,
250            // Start unpaused
251            #[cfg(feature = "hotshot-testing")]
252            is_paused: Arc::from(AtomicBool::new(false)),
253        })
254    }
255
256    /// Broadcast a message to members of the particular topic. Does not retry.
257    ///
258    /// # Errors
259    /// - If we fail to serialize the message
260    /// - If we fail to send the broadcast message.
261    async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
262        // If we're paused, don't send the message
263        #[cfg(feature = "hotshot-testing")]
264        if self.is_paused.load(Ordering::Relaxed) {
265            return Ok(());
266        }
267
268        // Send the message
269        if let Err(err) = self
270            .client
271            .send_broadcast_message(vec![topic as u8], message)
272            .await
273        {
274            return Err(NetworkError::MessageReceiveError(format!(
275                "failed to send broadcast message: {err}"
276            )));
277        };
278
279        Ok(())
280    }
281}
282
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Spawn a local Push CDN (two brokers and one marshal) and return a generator that
    /// builds one Push CDN client per node ID.
    ///
    /// Discovery uses an `SQLite` file in the OS temporary directory instead of Redis.
    /// Nodes whose ID is below `da_committee_size` subscribe to the DA topic in addition
    /// to the global topic. All other parameters are ignored.
    ///
    /// # Panics
    /// If no unused local port can be found. The spawned tasks panic if a broker or the
    /// marshal fails to be created.
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        // Test topology: 2 brokers & 1 marshal, all running locally

        // Both brokers share one keypair
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // The discovery "endpoint" is a randomly named SQLite file in the OS temporary directory
        let database_file_name = format!("test-{}.sqlite", StdRng::from_entropy().next_u64());
        let discovery_endpoint = std::env::temp_dir()
            .join(Path::new(&database_file_name))
            .to_string_lossy()
            .into_owned();

        // Reserve one public port per broker up front, since each broker's identifier
        // is compared against the other's below
        let public_port_1 = portpicker::pick_unused_port().expect("could not find an open port");
        let public_address_1 = format!("127.0.0.1:{public_port_1}");
        let public_port_2 = portpicker::pick_unused_port().expect("could not find an open port");
        let public_address_2 = format!("127.0.0.1:{public_port_2}");

        // (own public address, other broker's public address) for each of the 2 brokers
        let broker_addresses = [
            (public_address_1.clone(), public_address_2.clone()),
            (public_address_2, public_address_1),
        ];

        for (public_address, other_public_address) in broker_addresses {
            // Every broker additionally gets its own private port
            let private_port = portpicker::pick_unused_port().expect("could not find an open port");
            let private_address = format!("127.0.0.1:{private_port}");

            // Calculate the broker identifiers
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");

            // Configure the broker
            let broker_keypair = KeyPair {
                public_key: WrappedSignatureKey(broker_public_key.clone()),
                private_key: broker_private_key.clone(),
            };
            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: broker_keypair,
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1GB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Run the broker in its own background task
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // The broker whose identifier sorts first waits a bit before starting,
                // so that discovery can happen first
                let is_first_by_identifier = other_broker_identifier > broker_identifier;
                if is_first_by_identifier {
                    sleep(Duration::from_secs(2)).await;
                }

                // Error if we stopped unexpectedly
                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // The marshal listens on its own unused port
        let marshal_port = portpicker::pick_unused_port().expect("could not find an open port");
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");

        // Configure the marshal
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1GB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Run the marshal in its own background task
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            // Error if we stopped unexpectedly
            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // The returned generator is invoked once for each client we spawn
        Box::pin(move |node_id| {
            // Each generated future needs its own copy of the endpoint
            let marshal_endpoint = marshal_endpoint.clone();

            Box::pin(async move {
                // Derive our public and private keys from our index
                let (_, private_key) =
                    TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
                let public_key = TYPES::SignatureKey::from_private(&private_key);

                // The first `da_committee_size` nodes are part of the DA committee
                let is_da_node = node_id < da_committee_size as u64;
                let subscribed_topics = if is_da_node {
                    vec![Topic::Da as u8, Topic::Global as u8]
                } else {
                    vec![Topic::Global as u8]
                };

                // Configure our client
                let keypair = KeyPair {
                    public_key: WrappedSignatureKey(public_key.clone()),
                    private_key,
                };
                let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> = ClientConfig {
                    keypair,
                    subscribed_topics,
                    endpoint: marshal_endpoint,
                    use_local_authority: true,
                };

                // Create our client. This whole impl is only compiled with the
                // `hotshot-testing` feature, so `is_paused` is always present here.
                Arc::new(PushCdnNetwork {
                    client: Client::new(client_config),
                    metrics: Arc::new(CdnMetricsValue::default()),
                    internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                    public_key,
                    is_paused: Arc::new(AtomicBool::new(false)),
                })
            })
        })
    }

    /// The PushCDN does not support in-flight message counts, so this is always `None`
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
460
461#[async_trait]
462impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
463    /// Pause sending and receiving on the PushCDN network.
464    fn pause(&self) {
465        #[cfg(feature = "hotshot-testing")]
466        self.is_paused.store(true, Ordering::Relaxed);
467    }
468
469    /// Resume sending and receiving on the PushCDN network.
470    fn resume(&self) {
471        #[cfg(feature = "hotshot-testing")]
472        self.is_paused.store(false, Ordering::Relaxed);
473    }
474
475    /// Wait for the client to initialize the connection
476    async fn wait_for_ready(&self) {
477        let _ = self.client.ensure_initialized().await;
478    }
479
480    /// TODO: shut down the networks. Unneeded for testing.
481    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
482    where
483        'a: 'b,
484        Self: 'b,
485    {
486        boxed_sync(async move { self.client.close().await })
487    }
488
489    /// Broadcast a message to all members of the quorum.
490    ///
491    /// # Errors
492    /// - If we fail to serialize the message
493    /// - If we fail to send the broadcast message.
494    async fn broadcast_message(
495        &self,
496        message: Vec<u8>,
497        topic: HotShotTopic,
498        _broadcast_delay: BroadcastDelay,
499    ) -> Result<(), NetworkError> {
500        // If we're paused, don't send the message
501        #[cfg(feature = "hotshot-testing")]
502        if self.is_paused.load(Ordering::Relaxed) {
503            return Ok(());
504        }
505        self.broadcast_message(message, topic.into())
506            .await
507            .inspect_err(|_e| {
508                self.metrics.num_failed_messages.add(1);
509            })
510    }
511
512    /// Broadcast a message to all members of the DA committee.
513    ///
514    /// # Errors
515    /// - If we fail to serialize the message
516    /// - If we fail to send the broadcast message.
517    async fn da_broadcast_message(
518        &self,
519        message: Vec<u8>,
520        _recipients: Vec<K>,
521        _broadcast_delay: BroadcastDelay,
522    ) -> Result<(), NetworkError> {
523        // If we're paused, don't send the message
524        #[cfg(feature = "hotshot-testing")]
525        if self.is_paused.load(Ordering::Relaxed) {
526            return Ok(());
527        }
528        self.broadcast_message(message, Topic::Da)
529            .await
530            .inspect_err(|_e| {
531                self.metrics.num_failed_messages.add(1);
532            })
533    }
534
535    /// Send a direct message to a node with a particular key. Does not retry.
536    ///
537    /// - If we fail to serialize the message
538    /// - If we fail to send the direct message
539    async fn direct_message(&self, message: Vec<u8>, recipient: K) -> Result<(), NetworkError> {
540        // If the message is to ourselves, just add it to the internal queue
541        if recipient == self.public_key {
542            self.internal_queue.lock().push_back(message);
543            return Ok(());
544        }
545
546        // If we're paused, don't send the message
547        #[cfg(feature = "hotshot-testing")]
548        if self.is_paused.load(Ordering::Relaxed) {
549            return Ok(());
550        }
551
552        // Send the message
553        if let Err(e) = self
554            .client
555            .send_direct_message(&WrappedSignatureKey(recipient), message)
556            .await
557        {
558            self.metrics.num_failed_messages.add(1);
559            return Err(NetworkError::MessageSendError(format!(
560                "failed to send direct message: {e}"
561            )));
562        };
563
564        Ok(())
565    }
566
567    /// Receive a message. Is agnostic over `transmit_type`, which has an issue
568    /// to be removed anyway.
569    ///
570    /// # Errors
571    /// - If we fail to receive messages. Will trigger a retry automatically.
572    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
573        // If we have a message in the internal queue, return it
574        if let Some(message) = self.internal_queue.lock().pop_front() {
575            return Ok(message);
576        }
577
578        // Receive a message from the network
579        let message = self.client.receive_message().await;
580
581        // If we're paused, receive but don't process messages
582        #[cfg(feature = "hotshot-testing")]
583        if self.is_paused.load(Ordering::Relaxed) {
584            sleep(Duration::from_millis(100)).await;
585            return Ok(vec![]);
586        }
587
588        // If it was an error, wait a bit and retry
589        let message = match message {
590            Ok(message) => message,
591            Err(error) => {
592                return Err(NetworkError::MessageReceiveError(format!(
593                    "failed to receive message: {error}"
594                )));
595            },
596        };
597
598        // Extract the underlying message
599        let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
600        | PushCdnMessage::Direct(Direct {
601            message,
602            recipient: _,
603        })) = message
604        else {
605            return Ok(vec![]);
606        };
607
608        Ok(message)
609    }
610
611    /// Do nothing here, as we don't need to look up nodes.
612    fn queue_node_lookup(
613        &self,
614        _view_number: ViewNumber,
615        _pk: K,
616    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
617        Ok(())
618    }
619}
620
621impl From<HotShotTopic> for Topic {
622    fn from(topic: HotShotTopic) -> Self {
623        match topic {
624            HotShotTopic::Global => Topic::Global,
625            HotShotTopic::Da => Topic::Da,
626        }
627    }
628}