hotshot/traits/networking/
cliquenet_network.rs1#[cfg(feature = "hotshot-testing")]
2use std::time::Duration;
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6};
7
8use async_trait::async_trait;
9use cliquenet::{NetConf, NetworkDown, Retry, Role};
10#[cfg(feature = "hotshot-testing")]
11use hotshot_types::traits::network::{
12 AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
13};
14use hotshot_types::{
15 BoxSyncFuture, PeerConnectInfo,
16 addr::NetAddr,
17 boxed_sync,
18 data::{EpochNumber, ViewNumber},
19 epoch_membership::EpochMembershipCoordinator,
20 stake_table::HSStakeTable,
21 traits::{
22 metrics::Metrics,
23 network::{BroadcastDelay, ConnectedNetwork, NetworkError, Topic},
24 node_implementation::NodeType,
25 signature_key::{SignatureKey, StakeTableEntryType},
26 },
27 x25519::Keypair,
28};
29use tokio::sync::Mutex;
30use tracing::{error, info};
31
32#[derive(Clone)]
33pub struct Cliquenet<K> {
34 net: Retry<K>,
35 inner: Arc<Mutex<Inner<K>>>,
36}
37
38#[derive(Clone)]
39struct Inner<K> {
40 epoch: EpochNumber,
41 peers: HashMap<K, PeerConnectInfo>,
42}
43
44impl<K: SignatureKey + 'static> Cliquenet<K> {
45 pub async fn create<A, P>(
46 name: &'static str,
47 key: K,
48 keypair: Keypair,
49 addr: A,
50 parties: P,
51 metrics: Box<dyn Metrics>,
52 ) -> Result<Self, NetworkError>
53 where
54 A: Into<NetAddr>,
55 P: IntoIterator<Item = (K, PeerConnectInfo)>,
56 {
57 let parties: HashMap<K, PeerConnectInfo> = parties.into_iter().collect();
58
59 let cfg = NetConf::builder()
60 .name(name)
61 .label(key)
62 .keypair(keypair)
63 .bind(addr.into())
64 .parties(
65 parties
66 .iter()
67 .map(|(k, info)| (k.clone(), info.x25519_key, info.p2p_addr.clone())),
68 )
69 .metrics(metrics)
70 .build();
71
72 let net = Retry::create(cfg)
73 .await
74 .map_err(|e| NetworkError::ListenError(format!("cliquenet creation failed: {e}")))?;
75
76 info!(peers = %parties.len(), "cliquenet created");
77
78 Ok(Self {
79 net,
80 inner: Arc::new(Mutex::new(Inner {
81 epoch: EpochNumber::genesis(),
82 peers: parties,
83 })),
84 })
85 }
86
87 pub fn peers(&self) -> Vec<K> {
89 self.net.parties(None)
90 }
91
92 async fn on_epoch_change<U>(&self, epoch: EpochNumber, coord: &EpochMembershipCoordinator<U>)
100 where
101 U: NodeType<SignatureKey = K>,
102 {
103 let connect_infos = |a: HSStakeTable<U>, b: HSStakeTable<U>| {
105 a.0.into_iter()
106 .chain(b.0)
107 .map(|m| (m.stake_table_entry.public_key(), m.connect_info))
108 .collect()
109 };
110
111 let mut inner = self.inner.lock().await;
112
113 if epoch <= inner.epoch {
114 info!(%epoch, ours = %inner.epoch, "epoch already seen");
115 return;
116 }
117
118 let curr_infos = {
120 let Ok(membership) = coord.stake_table_for_epoch(Some(epoch)).await else {
121 error!(%epoch, "no stake table available");
122 return;
123 };
124 let st = membership.stake_table().await;
125 let da = membership.da_stake_table().await;
126 connect_infos(st, da)
127 };
128
129 let prev_infos = if *epoch > 0 {
131 if let Ok(membership) = coord.stake_table_for_epoch(Some(epoch - 1)).await {
132 let st = membership.stake_table().await;
133 let da = membership.da_stake_table().await;
134 connect_infos(st, da)
135 } else {
136 info!(%epoch, "previous epoch's stake table unavailable");
137 HashMap::new()
138 }
139 } else {
140 HashMap::new()
141 };
142
143 let next_infos = {
145 if let Ok(membership) = coord.stake_table_for_epoch(Some(epoch + 1)).await {
146 let st = membership.stake_table().await;
147 let da = membership.da_stake_table().await;
148 connect_infos(st, da)
149 } else {
150 info!(%epoch, "next epoch's stake table not available");
151 HashMap::new()
152 }
153 };
154
155 let mut merged_infos = prev_infos.clone();
158 for (k, v) in curr_infos.iter().chain(&next_infos) {
159 merged_infos.insert(k.clone(), v.clone());
160 }
161
162 let wanted: HashSet<K> = curr_infos
163 .keys()
164 .chain(next_infos.keys())
165 .cloned()
166 .collect();
167
168 let retained: HashSet<K> = curr_infos
169 .keys()
170 .chain(prev_infos.keys())
171 .cloned()
172 .collect();
173
174 let mut to_add = Vec::new();
175 let mut to_del = Vec::new();
176
177 for k in &wanted {
178 if let Some(Some(new_info)) = merged_infos.get(k) {
179 if Some(new_info) != inner.peers.get(k) {
180 info!(%epoch, peer = %k, "adding/updating network peer");
181 to_add.push((k.clone(), new_info.x25519_key, new_info.p2p_addr.clone()));
182 } else {
183 info!(%epoch, peer = %k, "peer unchanged");
184 }
185 } else {
186 info!(%epoch, peer = %k, "ignoring peer without connection info");
187 }
188 }
189
190 for p in inner.peers.keys() {
192 if !(retained.contains(p) || wanted.contains(p)) {
193 info!(%epoch, peer = %p, "removing network peer");
194 to_del.push(p.clone());
195 }
196 }
197
198 for k in &to_del {
201 inner.peers.remove(k);
202 }
203
204 for (k, x, a) in to_add.iter().cloned() {
205 inner.peers.insert(
206 k,
207 PeerConnectInfo {
208 x25519_key: x,
209 p2p_addr: a,
210 },
211 );
212 }
213
214 if let Err(err) = self.net.add(Role::Active, to_add).await {
215 let _: NetworkDown = err;
216 error!(%epoch, "network down; could not add peers to network");
217 return;
218 }
219
220 if let Err(err) = self.net.remove(to_del).await {
221 let _: NetworkDown = err;
222 error!(%epoch, "network down; could not remove peers from network");
223 return;
224 }
225
226 debug_assert_eq! {
227 HashSet::<K>::from_iter(self.net.parties(None)),
228 HashSet::<K>::from_iter(inner.peers.keys().cloned())
229 }
230
231 info!(%epoch, peers = %inner.peers.len());
232
233 inner.epoch = EpochNumber::from(*epoch);
234 }
235}
236
237#[async_trait]
238impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Cliquenet<K> {
239 async fn broadcast_message(
240 &self,
241 v: ViewNumber,
242 m: Vec<u8>,
243 _: Topic,
244 _: BroadcastDelay,
245 ) -> Result<(), NetworkError> {
246 self.net.broadcast(*v, m).await.map_err(|e| {
247 NetworkError::MessageSendError(format!("cliquenet broadcast error: {e}"))
248 })?;
249 Ok(())
250 }
251
252 async fn da_broadcast_message(
253 &self,
254 v: ViewNumber,
255 m: Vec<u8>,
256 recipients: Vec<K>,
257 _: BroadcastDelay,
258 ) -> Result<(), NetworkError> {
259 self.net.multicast(recipients, *v, m).await.map_err(|e| {
260 NetworkError::MessageSendError(format!("cliquenet da_broadcast error: {e}"))
261 })?;
262 Ok(())
263 }
264
265 async fn direct_message(
266 &self,
267 v: ViewNumber,
268 m: Vec<u8>,
269 recipient: K,
270 ) -> Result<(), NetworkError> {
271 self.net
272 .unicast(recipient, *v, m)
273 .await
274 .map_err(|e| NetworkError::MessageSendError(format!("cliquenet unicast error: {e}")))?;
275 Ok(())
276 }
277
278 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
279 let (_src, data) =
280 self.net.receive().await.map_err(|e| {
281 NetworkError::MessageSendError(format!("cliquenet receive error: {e}"))
282 })?;
283 Ok(Vec::from(&data[..]))
284 }
285
286 async fn update_view<U>(
287 &self,
288 v: ViewNumber,
289 e: Option<EpochNumber>,
290 m: EpochMembershipCoordinator<U>,
291 ) where
292 U: NodeType<SignatureKey = K>,
293 {
294 self.net.gc(*v);
295
296 if let Some(e) = e {
297 self.on_epoch_change(e, &m).await
298 }
299 }
300
301 async fn wait_for_ready(&self) {}
302
303 fn pause(&self) {}
304
305 fn resume(&self) {}
306
307 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
308 where
309 'a: 'b,
310 Self: 'b,
311 {
312 boxed_sync(self.net.close())
313 }
314}
315
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Cliquenet<T::SignatureKey> {
    /// Returns a generator that builds the `i`-th of `nodes` local cliquenet instances.
    ///
    /// All instances share the same party list. Every party's connection info is
    /// also written into `connect_infos`. Only `nodes` is used; the other settings
    /// are ignored.
    ///
    /// # Panics
    ///
    /// The generated future panics if network creation fails.
    fn generator(
        nodes: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        _da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        let parties: Arc<Vec<(Keypair, T::SignatureKey, NetAddr)>> =
            Arc::new(gen_parties::<T::SignatureKey>().take(nodes).collect());

        // Connection info of every party, built once and shared by all nodes.
        let peer_infos: Arc<Vec<(T::SignatureKey, PeerConnectInfo)>> = Arc::new(
            parties
                .iter()
                .map(|(kpair, key, addr)| {
                    let info = PeerConnectInfo {
                        x25519_key: kpair.public_key(),
                        p2p_addr: addr.clone(),
                    };
                    (key.clone(), info)
                })
                .collect(),
        );

        connect_infos.extend(peer_infos.iter().cloned());

        Box::pin(move |index| {
            let parties = Arc::clone(&parties);
            let peer_infos = Arc::clone(&peer_infos);
            Box::pin(async move {
                use hotshot_types::traits::metrics::NoMetrics;

                let (kpair, key, addr) = &parties[index as usize];
                let network = Cliquenet::create(
                    "test",
                    key.clone(),
                    kpair.clone(),
                    addr.clone(),
                    peer_infos.iter().cloned(),
                    Box::new(NoMetrics),
                )
                .await
                .unwrap();
                Arc::new(network)
            })
        })
    }

    /// Not tracked by this implementation.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
371
/// Yields an endless sequence of deterministic test parties.
///
/// The party at position `n` (counting from 0) consists of:
///
/// * an x25519 keypair derived from its signing key,
/// * the signing public key derived from the all-zero seed at index `n`,
/// * a localhost address on a freshly reserved TCP port.
///
/// # Panics
///
/// Panics if the OS cannot provide an ephemeral TCP port.
#[cfg(feature = "hotshot-testing")]
fn gen_parties<K: SignatureKey>() -> impl Iterator<Item = (Keypair, K, NetAddr)> {
    (0u64..).map(|index| {
        let private_key = K::generated_from_seed_indexed([0u8; 32], index).1;
        let public_key = K::from_private(&private_key);
        let x25519 = Keypair::derive_from::<K>(&private_key);
        let port = test_utils::reserve_tcp_port()
            .expect("OS should have ephemeral ports available");
        let address = NetAddr::Inet(std::net::Ipv4Addr::LOCALHOST.into(), port);
        (x25519, public_key, address)
    })
}