hotshot/traits/networking/
cliquenet_network.rs

1#[cfg(feature = "hotshot-testing")]
2use std::time::Duration;
3use std::{
4    collections::{HashMap, HashSet},
5    sync::Arc,
6};
7
8use async_trait::async_trait;
9use cliquenet::{NetConf, NetworkDown, Retry, Role};
10#[cfg(feature = "hotshot-testing")]
11use hotshot_types::traits::network::{
12    AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
13};
14use hotshot_types::{
15    BoxSyncFuture, PeerConnectInfo,
16    addr::NetAddr,
17    boxed_sync,
18    data::{EpochNumber, ViewNumber},
19    epoch_membership::EpochMembershipCoordinator,
20    stake_table::HSStakeTable,
21    traits::{
22        metrics::Metrics,
23        network::{BroadcastDelay, ConnectedNetwork, NetworkError, Topic},
24        node_implementation::NodeType,
25        signature_key::{SignatureKey, StakeTableEntryType},
26    },
27    x25519::Keypair,
28};
29use tokio::sync::Mutex;
30use tracing::{error, info};
31
/// Network implementation backed by cliquenet.
///
/// Clones share the same peer bookkeeping, since `inner` is behind an `Arc`.
#[derive(Clone)]
pub struct Cliquenet<K> {
    /// Underlying cliquenet handle, used for sending, receiving and adding or
    /// removing peers.
    net: Retry<K>,
    /// Epoch and peer bookkeeping, updated on epoch changes.
    inner: Arc<Mutex<Inner<K>>>,
}
37
/// Mutable state of a [`Cliquenet`], guarded by its mutex.
#[derive(Clone)]
struct Inner<K> {
    /// Most recent epoch whose peer set has been applied (genesis initially).
    epoch: EpochNumber,
    /// Peers registered with cliquenet, with their connection information.
    peers: HashMap<K, PeerConnectInfo>,
}
43
44impl<K: SignatureKey + 'static> Cliquenet<K> {
45    pub async fn create<A, P>(
46        name: &'static str,
47        key: K,
48        keypair: Keypair,
49        addr: A,
50        parties: P,
51        metrics: Box<dyn Metrics>,
52    ) -> Result<Self, NetworkError>
53    where
54        A: Into<NetAddr>,
55        P: IntoIterator<Item = (K, PeerConnectInfo)>,
56    {
57        let parties: HashMap<K, PeerConnectInfo> = parties.into_iter().collect();
58
59        let cfg = NetConf::builder()
60            .name(name)
61            .label(key)
62            .keypair(keypair)
63            .bind(addr.into())
64            .parties(
65                parties
66                    .iter()
67                    .map(|(k, info)| (k.clone(), info.x25519_key, info.p2p_addr.clone())),
68            )
69            .metrics(metrics)
70            .build();
71
72        let net = Retry::create(cfg)
73            .await
74            .map_err(|e| NetworkError::ListenError(format!("cliquenet creation failed: {e}")))?;
75
76        info!(peers = %parties.len(), "cliquenet created");
77
78        Ok(Self {
79            net,
80            inner: Arc::new(Mutex::new(Inner {
81                epoch: EpochNumber::genesis(),
82                peers: parties,
83            })),
84        })
85    }
86
87    /// Get the current network peers.
88    pub fn peers(&self) -> Vec<K> {
89        self.net.parties(None)
90    }
91
92    /// Update peers on every epoch change.
93    ///
94    /// For any given epoch `e` we collect the validators of `e`, `e-1` and
95    /// `e+1` from the stake tables and merge their connection information.
96    ///
97    /// We keep validator that were in `e-1` but not in `e` for one additional
98    /// epoch and eagerly connect to new validators of `e+1`.
99    async fn on_epoch_change<U>(&self, epoch: EpochNumber, coord: &EpochMembershipCoordinator<U>)
100    where
101        U: NodeType<SignatureKey = K>,
102    {
103        // Collect peer connect infos from stake table.
104        let connect_infos = |a: HSStakeTable<U>, b: HSStakeTable<U>| {
105            a.0.into_iter()
106                .chain(b.0)
107                .map(|m| (m.stake_table_entry.public_key(), m.connect_info))
108                .collect()
109        };
110
111        let mut inner = self.inner.lock().await;
112
113        if epoch <= inner.epoch {
114            info!(%epoch, ours = %inner.epoch, "epoch already seen");
115            return;
116        }
117
118        // Validators of the new epoch.
119        let curr_infos = {
120            let Ok(membership) = coord.stake_table_for_epoch(Some(epoch)).await else {
121                error!(%epoch, "no stake table available");
122                return;
123            };
124            let st = membership.stake_table().await;
125            let da = membership.da_stake_table().await;
126            connect_infos(st, da)
127        };
128
129        // Validators leaving are retained as peers for one additional epoch.
130        let prev_infos = if *epoch > 0 {
131            if let Ok(membership) = coord.stake_table_for_epoch(Some(epoch - 1)).await {
132                let st = membership.stake_table().await;
133                let da = membership.da_stake_table().await;
134                connect_infos(st, da)
135            } else {
136                info!(%epoch, "previous epoch's stake table unavailable");
137                HashMap::new()
138            }
139        } else {
140            HashMap::new()
141        };
142
143        // Validators joining in the next epoch are connected to early.
144        let next_infos = {
145            if let Ok(membership) = coord.stake_table_for_epoch(Some(epoch + 1)).await {
146                let st = membership.stake_table().await;
147                let da = membership.da_stake_table().await;
148                connect_infos(st, da)
149            } else {
150                info!(%epoch, "next epoch's stake table not available");
151                HashMap::new()
152            }
153        };
154
155        // Since connection information may be updated, we need to merge them,
156        // preferring the newest epoch's data, i.e. `next(curr(prev))`.
157        let mut merged_infos = prev_infos.clone();
158        for (k, v) in curr_infos.iter().chain(&next_infos) {
159            merged_infos.insert(k.clone(), v.clone());
160        }
161
162        let wanted: HashSet<K> = curr_infos
163            .keys()
164            .chain(next_infos.keys())
165            .cloned()
166            .collect();
167
168        let retained: HashSet<K> = curr_infos
169            .keys()
170            .chain(prev_infos.keys())
171            .cloned()
172            .collect();
173
174        let mut to_add = Vec::new();
175        let mut to_del = Vec::new();
176
177        for k in &wanted {
178            if let Some(Some(new_info)) = merged_infos.get(k) {
179                if Some(new_info) != inner.peers.get(k) {
180                    info!(%epoch, peer = %k, "adding/updating network peer");
181                    to_add.push((k.clone(), new_info.x25519_key, new_info.p2p_addr.clone()));
182                } else {
183                    info!(%epoch, peer = %k, "peer unchanged");
184                }
185            } else {
186                info!(%epoch, peer  = %k, "ignoring peer without connection info");
187            }
188        }
189
190        // Remove peers that have left both the current and previous epoch's stake tables.
191        for p in inner.peers.keys() {
192            if !(retained.contains(p) || wanted.contains(p)) {
193                info!(%epoch, peer = %p, "removing network peer");
194                to_del.push(p.clone());
195            }
196        }
197
198        // Perform the updates:
199
200        for k in &to_del {
201            inner.peers.remove(k);
202        }
203
204        for (k, x, a) in to_add.iter().cloned() {
205            inner.peers.insert(
206                k,
207                PeerConnectInfo {
208                    x25519_key: x,
209                    p2p_addr: a,
210                },
211            );
212        }
213
214        if let Err(err) = self.net.add(Role::Active, to_add).await {
215            let _: NetworkDown = err;
216            error!(%epoch, "network down; could not add peers to network");
217            return;
218        }
219
220        if let Err(err) = self.net.remove(to_del).await {
221            let _: NetworkDown = err;
222            error!(%epoch, "network down; could not remove peers from network");
223            return;
224        }
225
226        debug_assert_eq! {
227            HashSet::<K>::from_iter(self.net.parties(None)),
228            HashSet::<K>::from_iter(inner.peers.keys().cloned())
229        }
230
231        info!(%epoch, peers = %inner.peers.len());
232
233        inner.epoch = EpochNumber::from(*epoch);
234    }
235}
236
237#[async_trait]
238impl<K: SignatureKey + 'static> ConnectedNetwork<K> for Cliquenet<K> {
239    async fn broadcast_message(
240        &self,
241        v: ViewNumber,
242        m: Vec<u8>,
243        _: Topic,
244        _: BroadcastDelay,
245    ) -> Result<(), NetworkError> {
246        self.net.broadcast(*v, m).await.map_err(|e| {
247            NetworkError::MessageSendError(format!("cliquenet broadcast error: {e}"))
248        })?;
249        Ok(())
250    }
251
252    async fn da_broadcast_message(
253        &self,
254        v: ViewNumber,
255        m: Vec<u8>,
256        recipients: Vec<K>,
257        _: BroadcastDelay,
258    ) -> Result<(), NetworkError> {
259        self.net.multicast(recipients, *v, m).await.map_err(|e| {
260            NetworkError::MessageSendError(format!("cliquenet da_broadcast error: {e}"))
261        })?;
262        Ok(())
263    }
264
265    async fn direct_message(
266        &self,
267        v: ViewNumber,
268        m: Vec<u8>,
269        recipient: K,
270    ) -> Result<(), NetworkError> {
271        self.net
272            .unicast(recipient, *v, m)
273            .await
274            .map_err(|e| NetworkError::MessageSendError(format!("cliquenet unicast error: {e}")))?;
275        Ok(())
276    }
277
278    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
279        let (_src, data) =
280            self.net.receive().await.map_err(|e| {
281                NetworkError::MessageSendError(format!("cliquenet receive error: {e}"))
282            })?;
283        Ok(Vec::from(&data[..]))
284    }
285
286    async fn update_view<U>(
287        &self,
288        v: ViewNumber,
289        e: Option<EpochNumber>,
290        m: EpochMembershipCoordinator<U>,
291    ) where
292        U: NodeType<SignatureKey = K>,
293    {
294        self.net.gc(*v);
295
296        if let Some(e) = e {
297            self.on_epoch_change(e, &m).await
298        }
299    }
300
301    async fn wait_for_ready(&self) {}
302
303    fn pause(&self) {}
304
305    fn resume(&self) {}
306
307    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
308    where
309        'a: 'b,
310        Self: 'b,
311    {
312        boxed_sync(self.net.close())
313    }
314}
315
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Cliquenet<T::SignatureKey> {
    /// Build a generator for `nodes` local cliquenet instances.
    ///
    /// All parties are generated up front and their connection information is
    /// written into `connect_infos`. Calling the generator with index `i`
    /// creates the network of party `i`, configured with every generated
    /// party as a peer. The remaining parameters are ignored.
    fn generator(
        nodes: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        _da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        // Turn a generated party into its signing key and connection info.
        fn party_info<K: Clone>(
            (keypair, key, addr): &(Keypair, K, NetAddr),
        ) -> (K, PeerConnectInfo) {
            let info = PeerConnectInfo {
                x25519_key: keypair.public_key(),
                p2p_addr: addr.clone(),
            };
            (key.clone(), info)
        }

        let all: Arc<Vec<(Keypair, T::SignatureKey, NetAddr)>> =
            Arc::new(gen_parties::<T::SignatureKey>().take(nodes).collect());

        connect_infos.extend(all.iter().map(party_info));

        Box::pin(move |i| {
            let all = Arc::clone(&all);
            let future = async move {
                use hotshot_types::traits::metrics::NoMetrics;

                let (keypair, key, addr) = &all[i as usize];
                let peers = all.iter().map(party_info);
                let net = Cliquenet::create(
                    "test",
                    key.clone(),
                    keypair.clone(),
                    addr.clone(),
                    peers,
                    Box::new(NoMetrics),
                )
                .await
                .unwrap();
                Arc::new(net)
            };
            Box::pin(future)
        })
    }

    /// Cliquenet does not expose an in-flight message count.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
371
/// Generate an unbounded sequence of network parties for tests.
///
/// Party `i` consists of an X25519 keypair and a public signing key, both
/// derived from the private key generated from the all-zero seed at index
/// `i`, plus a localhost address on a freshly reserved TCP port. Callers
/// `take` as many parties as they need.
#[cfg(feature = "hotshot-testing")]
fn gen_parties<K: SignatureKey>() -> impl Iterator<Item = (Keypair, K, NetAddr)> {
    (0u64..).map(|index| {
        let private_key = K::generated_from_seed_indexed([0u8; 32], index).1;
        let signing_key = K::from_private(&private_key);
        let x25519_pair = Keypair::derive_from::<K>(&private_key);
        let port =
            test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
        let address = NetAddr::Inet(std::net::Ipv4Addr::LOCALHOST.into(), port);
        (x25519_pair, signing_key, address)
    })
}