hotshot_query_service/testing/
consensus.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{
19    future::{Future, join_all},
20    stream::StreamExt,
21};
22use hotshot::{
23    HotShotInitializer, SystemContext,
24    traits::implementations::{MasterMap, MemoryNetwork},
25    types::{Event, SystemContextHandle},
26};
27use hotshot_example_types::{state_types::TestInstanceState, storage_types::TestStorage};
28use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
29use hotshot_types::{
30    HotShotConfig, PeerConfig,
31    consensus::ConsensusMetricsValue,
32    data::EpochNumber,
33    drb::INITIAL_DRB_RESULT,
34    epoch_membership::EpochMembershipCoordinator,
35    light_client::StateKeyPair,
36    signature_key::BLSPubKey,
37    storage_metrics::StorageMetricsValue,
38    traits::{election::Membership, network::Topic, signature_key::SignatureKey as _},
39};
40use test_utils::reserve_tcp_port;
41use tokio::{
42    runtime::Handle,
43    task::{block_in_place, yield_now},
44};
45use tracing::{Instrument, info_span};
46use url::Url;
47use versions::{MIN_SUPPORTED_VERSION, Upgrade};
48
49use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
50use crate::{
51    SignatureKey,
52    availability::{AvailabilityDataSource, UpdateAvailabilityData},
53    data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource},
54    fetching::provider::NoFetching,
55    node::NodeDataSource,
56    status::{StatusDataSource, UpdateStatusData},
57    task::BackgroundTask,
58};
59
/// A single consensus node in a [`MockNetwork`], paired with the data source under test.
struct MockNode<D: DataSourceLifeCycle> {
    /// Handle to this node's running HotShot instance.
    hotshot: SystemContextHandle<MockTypes, MockNodeImpl>,
    /// Data source that is fed this node's consensus events once the network is started.
    data_source: D,
    /// Backing storage returned by `D::create`; held here so it lives as long as `data_source`.
    storage: D::Storage,
}
65
/// An in-process HotShot network whose nodes each drive a data source of type `D`.
///
/// Dropping the network attempts to shut down every node (see the `Drop` impl).
pub struct MockNetwork<D: DataSourceLifeCycle> {
    /// Background tasks owned by the network (per-node update loops and user-spawned tasks).
    tasks: Vec<BackgroundTask>,
    /// The consensus nodes, indexed by node ID.
    nodes: Vec<MockNode<D>>,
    /// Public keys of the nodes, in the same order as `nodes`.
    pub_keys: Vec<BLSPubKey>,
}
71
// MockNetwork can be used with any DataSourceLifeCycle, but it's nice to have a default with a
// convenient type alias.
/// [`FileSystemDataSource`] specialized to the mock types, using the [`NoFetching`] provider.
pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
/// [`SqlDataSource`] specialized to the mock types, using the [`NoFetching`] provider.
pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
76
/// Number of consensus nodes created by [`MockNetwork`]; also passed to the test builder.
pub const NUM_NODES: usize = 2;
/// Default `epoch_height` placed in the HotShot config before any caller overrides are applied.
const EPOCH_HEIGHT: u64 = 10;
/// Value used for both `drb_difficulty` and `drb_upgrade_difficulty` in the HotShot config.
const DIFFICULTY_LEVEL: u64 = 10;
80
81impl<D: DataSourceLifeCycle + UpdateStatusData> MockNetwork<D> {
82    pub async fn init() -> Self {
83        Self::init_with_config(|_| {}, false).await
84    }
85
86    pub async fn init_with_leaf_ds() -> Self {
87        Self::init_with_config(|_| {}, true).await
88    }
89
90    pub async fn init_with_config(
91        update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
92        leaf_only: bool,
93    ) -> Self {
94        let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
95            .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
96            .unzip();
97        let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
98        let state_key_pairs = (0..num_staked_nodes.into())
99            .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
100            .collect::<Vec<_>>();
101        let master_map = MasterMap::new();
102        let stake = 1u64;
103        let known_nodes_with_stake = (0..num_staked_nodes.into())
104            .map(|id| PeerConfig {
105                stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
106                state_ver_key: state_key_pairs[id].ver_key(),
107                connect_info: None,
108            })
109            .collect::<Vec<_>>();
110
111        // Pick a random, unused port for the builder server
112        let builder_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
113
114        // Create the bind URL from the random port
115        let builder_url =
116            Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
117
118        // Start the builder server
119        let builder_task =
120            <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
121                NUM_NODES,
122                builder_url.clone(),
123                (),
124                Default::default(),
125            )
126            .await;
127
128        let mut config = HotShotConfig {
129            builder_urls: vec1::vec1![builder_url.clone()],
130            fixed_leader_for_gpuvid: 0,
131            num_nodes_with_stake: num_staked_nodes,
132            known_nodes_with_stake: known_nodes_with_stake.clone(),
133            next_view_timeout: 10000,
134            num_bootstrap: 0,
135            da_staked_committee_size: pub_keys.len(),
136            known_da_nodes: known_nodes_with_stake.clone(),
137            da_committees: Default::default(),
138            data_request_delay: Duration::from_millis(200),
139            view_sync_timeout: Duration::from_millis(250),
140            start_threshold: (
141                known_nodes_with_stake.len() as u64,
142                known_nodes_with_stake.len() as u64,
143            ),
144            builder_timeout: Duration::from_secs(1),
145            start_proposing_view: 0,
146            stop_proposing_view: 0,
147            start_voting_view: 0,
148            stop_voting_view: 0,
149            start_proposing_time: 0,
150            stop_proposing_time: 0,
151            start_voting_time: 0,
152            stop_voting_time: 0,
153            epoch_height: EPOCH_HEIGHT,
154            epoch_start_block: 0,
155            stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
156            drb_difficulty: DIFFICULTY_LEVEL,
157            drb_upgrade_difficulty: DIFFICULTY_LEVEL,
158        };
159        update_config(&mut config);
160        let upgrade = Upgrade::trivial(MIN_SUPPORTED_VERSION);
161
162        let nodes = join_all(
163            priv_keys
164                .into_iter()
165                .enumerate()
166                .map(|(node_id, priv_key)| {
167                    let config = config.clone();
168
169                    let pub_keys = pub_keys.clone();
170                    let master_map = master_map.clone();
171                    let state_priv_keys = state_key_pairs
172                        .iter()
173                        .map(|kp| kp.sign_key())
174                        .collect::<Vec<_>>();
175
176                    let span = info_span!("initialize node", node_id);
177                    let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
178                    async move {
179                        let storage = D::create(node_id).await;
180                        let data_source = if leaf_only {
181                            D::leaf_only_ds(&storage).await
182                        } else {
183                            D::connect(&storage).await
184                        };
185
186                        let network = Arc::new(MemoryNetwork::new(
187                            &pub_keys[node_id],
188                            &master_map.clone(),
189                            &[Topic::Global, Topic::Da],
190                            None,
191                        ));
192                        let hs_storage: TestStorage<MockTypes> = TestStorage::default();
193
194                        let membership =
195                            Arc::new(RwLock::new(MockMembership::new::<MockNodeImpl>(
196                                known_nodes_with_stake_clone.clone(),
197                                known_nodes_with_stake_clone,
198                                hs_storage.clone(),
199                                network.clone(),
200                                pub_keys[node_id],
201                                config.epoch_height,
202                            )));
203
204                        membership
205                            .write()
206                            .await
207                            .set_first_epoch(EpochNumber::new(0), INITIAL_DRB_RESULT);
208                        let memberships = EpochMembershipCoordinator::new(
209                            membership,
210                            config.epoch_height,
211                            &hs_storage.clone(),
212                        );
213
214                        let init = HotShotInitializer::from_genesis(
215                            TestInstanceState::default(),
216                            0,
217                            0,
218                            vec![],
219                            upgrade,
220                        )
221                        .await
222                        .unwrap();
223
224                        let hotshot = SystemContext::init(
225                            pub_keys[node_id],
226                            priv_key,
227                            state_priv_keys[node_id].clone(),
228                            node_id as u64,
229                            config,
230                            upgrade,
231                            memberships,
232                            network,
233                            init,
234                            ConsensusMetricsValue::new(&*data_source.populate_metrics()),
235                            hs_storage,
236                            StorageMetricsValue::new(&*data_source.populate_metrics()),
237                        )
238                        .await
239                        .unwrap()
240                        .0;
241
242                        MockNode {
243                            hotshot,
244                            data_source,
245                            storage,
246                        }
247                    }
248                    .instrument(span)
249                }),
250        )
251        .await;
252
253        // Hook the builder up to the event stream from the first node
254
255        builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
256
257        let mut network = Self {
258            nodes,
259            pub_keys,
260            tasks: Default::default(),
261        };
262        D::setup(&mut network).await;
263        network
264    }
265}
266
267impl<D: DataSourceLifeCycle> MockNetwork<D> {
268    pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl> {
269        &self.nodes[0].hotshot
270    }
271
272    pub async fn submit_transaction(&self, tx: MockTransaction) {
273        self.handle().submit_transaction(tx).await.unwrap();
274    }
275
276    pub fn num_nodes(&self) -> usize {
277        self.pub_keys.len()
278    }
279
280    pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
281        self.pub_keys[i]
282    }
283
284    pub fn data_source_index(&self, i: usize) -> D {
285        self.nodes[i].data_source.clone()
286    }
287
288    pub fn data_source(&self) -> D {
289        self.data_source_index(0)
290    }
291
292    pub fn storage(&self) -> &D::Storage {
293        &self.nodes[0].storage
294    }
295
296    pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
297        self.tasks.push(BackgroundTask::spawn(name, task));
298    }
299
300    pub async fn shut_down(mut self) {
301        self.shut_down_impl().await
302    }
303
304    async fn shut_down_impl(&mut self) {
305        for node in &mut self.nodes {
306            node.hotshot.shut_down().await;
307        }
308    }
309
310    pub fn epoch_height(&self) -> u64 {
311        EPOCH_HEIGHT
312    }
313}
314
315impl<D: DataSourceLifeCycle> MockNetwork<D> {
316    pub async fn start(&mut self) {
317        // Spawn the update tasks.
318        for (i, node) in self.nodes.iter_mut().enumerate() {
319            let ds = node.data_source.clone();
320            let mut events = node.hotshot.event_stream();
321            self.tasks.push(BackgroundTask::spawn(
322                format!("update node {i}"),
323                async move {
324                    while let Some(event) = events.next().await {
325                        tracing::info!(node = i, event = ?event.event, "EVENT");
326                        {
327                            ds.handle_event(&event).await;
328                        }
329                        yield_now().await;
330                    }
331                },
332            ));
333        }
334
335        join_all(
336            self.nodes
337                .iter()
338                .map(|node| node.hotshot.hotshot.start_consensus()),
339        )
340        .await;
341    }
342}
343
344impl<D: DataSourceLifeCycle> Drop for MockNetwork<D> {
345    fn drop(&mut self) {
346        if let Ok(handle) = Handle::try_current() {
347            block_in_place(move || handle.block_on(self.shut_down_impl()));
348        }
349    }
350}
351
/// Hooks that let [`MockNetwork`] create, connect and feed a data source under test.
#[async_trait]
pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
    /// Backing storage for the data source.
    ///
    /// This can be used to connect multiple data sources to the same underlying data. It must be
    /// kept alive as long as the related data sources are open.
    type Storage: Send + Sync;

    /// Creates the backing storage for the node with the given ID.
    async fn create(node_id: usize) -> Self::Storage;
    /// Opens a data source on top of existing `storage`.
    async fn connect(storage: &Self::Storage) -> Self;
    /// Returns a data source for `storage` after a reset.
    ///
    /// NOTE(review): not called anywhere in this file; presumably it clears existing data before
    /// reconnecting — confirm against the implementations.
    async fn reset(storage: &Self::Storage) -> Self;
    /// Processes one consensus event; called for every event a node emits once the network has
    /// been started.
    async fn handle_event(&self, event: &Event<MockTypes>);
    /// Opens a leaf-only data source on top of `storage`.
    ///
    /// The default implementation panics; implementors that support this mode must override it.
    async fn leaf_only_ds(_storage: &Self::Storage) -> Self {
        panic!("not supported")
    }

    /// Setup runs after setting up the network but before starting a test.
    ///
    /// The default implementation does nothing.
    async fn setup(_network: &mut MockNetwork<Self>) {}
}
371
/// Shorthand for a data source that has a test life cycle and implements the availability, node
/// and status APIs (plus availability updates and versioning) over the mock types.
///
/// The trait has no items of its own; it only bundles the listed bounds.
pub trait TestableDataSource:
    DataSourceLifeCycle
    + AvailabilityDataSource<MockTypes>
    + UpdateAvailabilityData<MockTypes>
    + NodeDataSource<MockTypes>
    + StatusDataSource
    + VersionedDataSource
{
}
381
// Blanket implementation: any type that satisfies all of the bounds is automatically a
// `TestableDataSource`, so implementors never need to opt in explicitly.
impl<T> TestableDataSource for T where
    T: DataSourceLifeCycle
        + AvailabilityDataSource<MockTypes>
        + UpdateAvailabilityData<MockTypes>
        + NodeDataSource<MockTypes>
        + StatusDataSource
        + VersionedDataSource
{
}