hotshot/traits/networking/
cliquenet_network.rs1#[cfg(feature = "hotshot-testing")]
2use std::sync::Arc;
3#[cfg(feature = "hotshot-testing")]
4use std::time::Duration;
5
6use async_trait::async_trait;
7use cliquenet::{Address, Keypair, NetConf, PublicKey, Retry, SecretKey};
8use futures::future::ready;
9#[cfg(feature = "hotshot-testing")]
10use hotshot_types::traits::network::{
11 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
12};
13use hotshot_types::{
14 boxed_sync,
15 data::{EpochNumber, ViewNumber},
16 epoch_membership::EpochMembershipCoordinator,
17 traits::{
18 metrics::Metrics,
19 network::{BroadcastDelay, ConnectedNetwork, NetworkError, Topic},
20 node_implementation::NodeType,
21 signature_key::{PrivateSignatureKey, SignatureKey},
22 },
23 BoxSyncFuture,
24};
25
26#[derive(Clone)]
27pub struct Cliquenet<T: NodeType> {
28 net: Retry<T::SignatureKey>,
29}
30
31impl<T: NodeType> Cliquenet<T> {
32 pub async fn create<A, B, P, M>(
33 name: &'static str,
34 key: T::SignatureKey,
35 keypair: Keypair,
36 addr: A,
37 parties: P,
38 metrics: M,
39 ) -> Result<Self, NetworkError>
40 where
41 A: Into<Address>,
42 B: Into<Address>,
43 P: IntoIterator<Item = (T::SignatureKey, PublicKey, B)>,
44 M: Metrics + 'static,
45 {
46 let cfg = NetConf::builder()
47 .name(name)
48 .label(key)
49 .keypair(keypair)
50 .bind(addr.into())
51 .parties(parties.into_iter().map(|(k, x, a)| (k, x, a.into())))
52 .metrics(Box::new(metrics))
53 .build();
54 let net = Retry::create(cfg)
55 .await
56 .map_err(|e| NetworkError::ListenError(format!("cliquenet creation failed: {e}")))?;
57 Ok(Self { net })
58 }
59}
60
61pub fn derive_keypair<K: SignatureKey>(k: &K::PrivateKey) -> Keypair {
62 SecretKey::from(blake3::derive_key("cliquenet key", &k.to_bytes())).into()
63}
64
65#[async_trait]
66impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Cliquenet<T> {
67 async fn broadcast_message(
68 &self,
69 v: ViewNumber,
70 m: Vec<u8>,
71 _: Topic,
72 _: BroadcastDelay,
73 ) -> Result<(), NetworkError> {
74 self.net.broadcast(*v, m).await.map_err(|e| {
75 NetworkError::MessageSendError(format!("cliquenet broadcast error: {e}"))
76 })?;
77 Ok(())
78 }
79
80 async fn da_broadcast_message(
81 &self,
82 v: ViewNumber,
83 m: Vec<u8>,
84 recipients: Vec<T::SignatureKey>,
85 _: BroadcastDelay,
86 ) -> Result<(), NetworkError> {
87 self.net.multicast(recipients, *v, m).await.map_err(|e| {
88 NetworkError::MessageSendError(format!("cliquenet da_broadcast error: {e}"))
89 })?;
90 Ok(())
91 }
92
93 async fn direct_message(
94 &self,
95 v: ViewNumber,
96 m: Vec<u8>,
97 recipient: T::SignatureKey,
98 ) -> Result<(), NetworkError> {
99 self.net
100 .unicast(recipient, *v, m)
101 .await
102 .map_err(|e| NetworkError::MessageSendError(format!("cliquenet unicast error: {e}")))?;
103 Ok(())
104 }
105
106 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
107 let (_src, data) =
108 self.net.receive().await.map_err(|e| {
109 NetworkError::MessageSendError(format!("cliquenet receive error: {e}"))
110 })?;
111 Ok(Vec::from(&data[..]))
112 }
113
114 async fn update_view<U>(
115 &self,
116 v: ViewNumber,
117 _: Option<EpochNumber>,
118 _: EpochMembershipCoordinator<U>,
119 ) where
120 U: NodeType<SignatureKey = T::SignatureKey>,
121 {
122 self.net.gc(*v)
123 }
124
125 async fn wait_for_ready(&self) {}
126
127 fn pause(&self) {
128 unimplemented!("Pausing not implemented for cliquenet");
129 }
130
131 fn resume(&self) {
132 unimplemented!("Resuming not implemented for cliquenet");
133 }
134
135 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
136 where
137 'a: 'b,
138 Self: 'b,
139 {
140 boxed_sync(ready(()))
141 }
142}
143
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Cliquenet<T> {
    /// Returns a generator that creates the test network for a given node
    /// index.
    ///
    /// Keys and localhost addresses for all `expected_node_count` nodes are
    /// prepared up front: each signature key comes from an all-zero seed plus
    /// the node index, and each node gets a port chosen by `portpicker`.
    /// Every generated network is configured with the full party list.
    /// All other arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics if no unused port can be picked. The returned generator panics
    /// if the index is out of range or if network creation fails.
    fn generator(
        expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        _da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
    ) -> AsyncGenerator<Arc<Self>> {
        use std::net::Ipv4Addr;

        use cliquenet::Address;
        use hotshot_types::traits::metrics::NoMetrics;

        // One `(cliquenet keypair, signature public key, bind address)` entry per node.
        let parties: Vec<_> = (0..expected_node_count)
            .map(|idx| {
                let secret =
                    T::SignatureKey::generated_from_seed_indexed([0u8; 32], idx as u64).1;
                let public = T::SignatureKey::from_private(&secret);
                let keypair = derive_keypair::<T::SignatureKey>(&secret);
                let port = portpicker::pick_unused_port().expect("an unused port is available");
                (
                    keypair,
                    public,
                    Address::Inet(Ipv4Addr::LOCALHOST.into(), port),
                )
            })
            .collect();
        let parties = Arc::new(parties);

        Box::pin(move |node_id| {
            let parties = parties.clone();
            Box::pin(async move {
                let (keypair, label, bind_addr) = &parties[node_id as usize];
                // Peer list in the shape `create` expects:
                // (signature key, cliquenet public key, address).
                let peers = parties.iter().map(|(peer_keypair, peer_label, peer_addr)| {
                    (
                        peer_label.clone(),
                        peer_keypair.public_key(),
                        peer_addr.clone(),
                    )
                });
                let net = Cliquenet::create(
                    "test",
                    label.clone(),
                    keypair.clone(),
                    bind_addr.clone(),
                    peers,
                    NoMetrics,
                )
                .await
                .unwrap();
                Arc::new(net)
            })
        })
    }

    /// Always returns `None`; this implementation reports no in-flight count.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}