hotshot/traits/networking/
cliquenet_network.rs

1#[cfg(feature = "hotshot-testing")]
2use std::sync::Arc;
3#[cfg(feature = "hotshot-testing")]
4use std::time::Duration;
5
6use async_trait::async_trait;
7use cliquenet::{Address, Keypair, NetConf, PublicKey, Retry, SecretKey};
8use futures::future::ready;
9#[cfg(feature = "hotshot-testing")]
10use hotshot_types::traits::network::{
11    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
12};
13use hotshot_types::{
14    boxed_sync,
15    data::{EpochNumber, ViewNumber},
16    epoch_membership::EpochMembershipCoordinator,
17    traits::{
18        metrics::Metrics,
19        network::{BroadcastDelay, ConnectedNetwork, NetworkError, Topic},
20        node_implementation::NodeType,
21        signature_key::{PrivateSignatureKey, SignatureKey},
22    },
23    BoxSyncFuture,
24};
25
26#[derive(Clone)]
27pub struct Cliquenet<T: NodeType> {
28    net: Retry<T::SignatureKey>,
29}
30
31impl<T: NodeType> Cliquenet<T> {
32    pub async fn create<A, B, P, M>(
33        name: &'static str,
34        key: T::SignatureKey,
35        keypair: Keypair,
36        addr: A,
37        parties: P,
38        metrics: M,
39    ) -> Result<Self, NetworkError>
40    where
41        A: Into<Address>,
42        B: Into<Address>,
43        P: IntoIterator<Item = (T::SignatureKey, PublicKey, B)>,
44        M: Metrics + 'static,
45    {
46        let cfg = NetConf::builder()
47            .name(name)
48            .label(key)
49            .keypair(keypair)
50            .bind(addr.into())
51            .parties(parties.into_iter().map(|(k, x, a)| (k, x, a.into())))
52            .metrics(Box::new(metrics))
53            .build();
54        let net = Retry::create(cfg)
55            .await
56            .map_err(|e| NetworkError::ListenError(format!("cliquenet creation failed: {e}")))?;
57        Ok(Self { net })
58    }
59}
60
61pub fn derive_keypair<K: SignatureKey>(k: &K::PrivateKey) -> Keypair {
62    SecretKey::from(blake3::derive_key("cliquenet key", &k.to_bytes())).into()
63}
64
65#[async_trait]
66impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Cliquenet<T> {
67    async fn broadcast_message(
68        &self,
69        v: ViewNumber,
70        m: Vec<u8>,
71        _: Topic,
72        _: BroadcastDelay,
73    ) -> Result<(), NetworkError> {
74        self.net.broadcast(*v, m).await.map_err(|e| {
75            NetworkError::MessageSendError(format!("cliquenet broadcast error: {e}"))
76        })?;
77        Ok(())
78    }
79
80    async fn da_broadcast_message(
81        &self,
82        v: ViewNumber,
83        m: Vec<u8>,
84        recipients: Vec<T::SignatureKey>,
85        _: BroadcastDelay,
86    ) -> Result<(), NetworkError> {
87        self.net.multicast(recipients, *v, m).await.map_err(|e| {
88            NetworkError::MessageSendError(format!("cliquenet da_broadcast error: {e}"))
89        })?;
90        Ok(())
91    }
92
93    async fn direct_message(
94        &self,
95        v: ViewNumber,
96        m: Vec<u8>,
97        recipient: T::SignatureKey,
98    ) -> Result<(), NetworkError> {
99        self.net
100            .unicast(recipient, *v, m)
101            .await
102            .map_err(|e| NetworkError::MessageSendError(format!("cliquenet unicast error: {e}")))?;
103        Ok(())
104    }
105
106    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
107        let (_src, data) =
108            self.net.receive().await.map_err(|e| {
109                NetworkError::MessageSendError(format!("cliquenet receive error: {e}"))
110            })?;
111        Ok(Vec::from(&data[..]))
112    }
113
114    async fn update_view<U>(
115        &self,
116        v: ViewNumber,
117        _: Option<EpochNumber>,
118        _: EpochMembershipCoordinator<U>,
119    ) where
120        U: NodeType<SignatureKey = T::SignatureKey>,
121    {
122        self.net.gc(*v)
123    }
124
125    async fn wait_for_ready(&self) {}
126
127    fn pause(&self) {
128        unimplemented!("Pausing not implemented for cliquenet");
129    }
130
131    fn resume(&self) {
132        unimplemented!("Resuming not implemented for cliquenet");
133    }
134
135    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
136    where
137        'a: 'b,
138        Self: 'b,
139    {
140        boxed_sync(ready(()))
141    }
142}
143
144#[cfg(feature = "hotshot-testing")]
145impl<T: NodeType> TestableNetworkingImplementation<T> for Cliquenet<T> {
146    fn generator(
147        expected_node_count: usize,
148        _num_bootstrap: usize,
149        _network_id: usize,
150        _da_committee_size: usize,
151        _reliability_config: Option<Box<dyn NetworkReliability>>,
152        _secondary_network_delay: Duration,
153    ) -> AsyncGenerator<Arc<Self>> {
154        let mut parties = Vec::new();
155        for i in 0..expected_node_count {
156            use std::net::Ipv4Addr;
157
158            use cliquenet::Address;
159
160            let secret = T::SignatureKey::generated_from_seed_indexed([0u8; 32], i as u64).1;
161            let public = T::SignatureKey::from_private(&secret);
162            let kpair = derive_keypair::<<T as NodeType>::SignatureKey>(&secret);
163            let port = portpicker::pick_unused_port().expect("an unused port is available");
164            let addr = Address::Inet(Ipv4Addr::LOCALHOST.into(), port);
165
166            parties.push((kpair, public, addr));
167        }
168
169        let parties = Arc::new(parties);
170
171        Box::pin(move |i| {
172            let parties = parties.clone();
173            let future = async move {
174                use hotshot_types::traits::metrics::NoMetrics;
175
176                let (s, k, a) = &parties[i as usize];
177                let it = parties
178                    .iter()
179                    .map(|(s, k, a)| (k.clone(), s.public_key(), a.clone()));
180                let net = Cliquenet::create("test", k.clone(), s.clone(), a.clone(), it, NoMetrics)
181                    .await
182                    .unwrap();
183                Arc::new(net)
184            };
185            Box::pin(future)
186        })
187    }
188
189    fn in_flight_message_count(&self) -> Option<usize> {
190        None
191    }
192}