hotshot_query_service/testing/
consensus.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{
19    future::{join_all, Future},
20    stream::StreamExt,
21};
22use hotshot::{
23    traits::implementations::{MasterMap, MemoryNetwork},
24    types::{Event, SystemContextHandle},
25    HotShotInitializer, SystemContext,
26};
27use hotshot_example_types::{state_types::TestInstanceState, storage_types::TestStorage};
28use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
29use hotshot_types::{
30    consensus::ConsensusMetricsValue,
31    data::ViewNumber,
32    drb::INITIAL_DRB_RESULT,
33    epoch_membership::EpochMembershipCoordinator,
34    light_client::StateKeyPair,
35    signature_key::BLSPubKey,
36    storage_metrics::StorageMetricsValue,
37    traits::{
38        election::Membership,
39        network::Topic,
40        node_implementation::{ConsensusTime, Versions},
41        signature_key::SignatureKey as _,
42    },
43    HotShotConfig, PeerConfig,
44};
45use tokio::{
46    runtime::Handle,
47    task::{block_in_place, yield_now},
48};
49use tracing::{info_span, Instrument};
50use url::Url;
51
52use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
53use crate::{
54    availability::{AvailabilityDataSource, UpdateAvailabilityData},
55    data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource},
56    fetching::provider::NoFetching,
57    node::NodeDataSource,
58    status::{StatusDataSource, UpdateStatusData},
59    task::BackgroundTask,
60    SignatureKey,
61};
62
/// A single participant in a [`MockNetwork`]: a consensus handle together with the data source
/// under test and the storage backing that data source.
struct MockNode<D: DataSourceLifeCycle, V: Versions> {
    /// Handle to this node's HotShot consensus instance.
    hotshot: SystemContextHandle<MockTypes, MockNodeImpl, V>,
    /// Data source for this node; `MockNetwork::start` feeds it the node's consensus events.
    data_source: D,
    /// Storage produced by `D::create` for this node, from which `data_source` was opened.
    storage: D::Storage,
}
68
/// An in-process consensus network for tests, where every node is paired with a data source of
/// type `D`.
pub struct MockNetwork<D: DataSourceLifeCycle, V: Versions> {
    /// Background tasks owned by the network: the per-node update tasks spawned by `start`, plus
    /// anything registered through `spawn`.
    tasks: Vec<BackgroundTask>,
    /// The nodes, in node ID order.
    nodes: Vec<MockNode<D, V>>,
    /// Public keys of the nodes, indexed by node ID.
    pub_keys: Vec<BLSPubKey>,
}
74
// MockNetwork can be used with any DataSourceLifeCycle, but it's nice to have a default with a
// convenient type alias.
/// [`FileSystemDataSource`] specialized to [`MockTypes`] with the [`NoFetching`] provider.
pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
/// [`SqlDataSource`] specialized to [`MockTypes`] with the [`NoFetching`] provider.
pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
79
/// Number of consensus nodes created for a [`MockNetwork`].
pub const NUM_NODES: usize = 2;
/// Epoch height written into the default consensus config and returned by
/// `MockNetwork::epoch_height`.
const EPOCH_HEIGHT: u64 = 10;
/// Value used for both `drb_difficulty` and `drb_upgrade_difficulty` in the default consensus
/// config.
const DIFFICULTY_LEVEL: u64 = 10;
83
84impl<D: DataSourceLifeCycle + UpdateStatusData, V: Versions> MockNetwork<D, V> {
85    pub async fn init() -> Self {
86        Self::init_with_config(|_| {}, false).await
87    }
88
89    pub async fn init_with_leaf_ds() -> Self {
90        Self::init_with_config(|_| {}, true).await
91    }
92
93    pub async fn init_with_config(
94        update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
95        leaf_only: bool,
96    ) -> Self {
97        let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
98            .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
99            .unzip();
100        let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
101        let state_key_pairs = (0..num_staked_nodes.into())
102            .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
103            .collect::<Vec<_>>();
104        let master_map = MasterMap::new();
105        let stake = 1u64;
106        let known_nodes_with_stake = (0..num_staked_nodes.into())
107            .map(|id| PeerConfig {
108                stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
109                state_ver_key: state_key_pairs[id].ver_key(),
110            })
111            .collect::<Vec<_>>();
112
113        // Pick a random, unused port for the builder server
114        let builder_port = portpicker::pick_unused_port().expect("No ports available");
115
116        // Create the bind URL from the random port
117        let builder_url =
118            Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
119
120        // Start the builder server
121        let builder_task =
122            <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
123                NUM_NODES,
124                builder_url.clone(),
125                (),
126                Default::default(),
127            )
128            .await;
129
130        let mut config = HotShotConfig {
131            builder_urls: vec1::vec1![builder_url.clone()],
132            fixed_leader_for_gpuvid: 0,
133            num_nodes_with_stake: num_staked_nodes,
134            known_nodes_with_stake: known_nodes_with_stake.clone(),
135            next_view_timeout: 10000,
136            num_bootstrap: 0,
137            da_staked_committee_size: pub_keys.len(),
138            known_da_nodes: known_nodes_with_stake.clone(),
139            da_committees: Default::default(),
140            data_request_delay: Duration::from_millis(200),
141            view_sync_timeout: Duration::from_millis(250),
142            start_threshold: (
143                known_nodes_with_stake.len() as u64,
144                known_nodes_with_stake.len() as u64,
145            ),
146            builder_timeout: Duration::from_secs(1),
147            start_proposing_view: 0,
148            stop_proposing_view: 0,
149            start_voting_view: 0,
150            stop_voting_view: 0,
151            start_proposing_time: 0,
152            stop_proposing_time: 0,
153            start_voting_time: 0,
154            stop_voting_time: 0,
155            epoch_height: EPOCH_HEIGHT,
156            epoch_start_block: 0,
157            stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
158            drb_difficulty: DIFFICULTY_LEVEL,
159            drb_upgrade_difficulty: DIFFICULTY_LEVEL,
160        };
161        update_config(&mut config);
162
163        let nodes = join_all(
164            priv_keys
165                .into_iter()
166                .enumerate()
167                .map(|(node_id, priv_key)| {
168                    let config = config.clone();
169
170                    let pub_keys = pub_keys.clone();
171                    let master_map = master_map.clone();
172                    let state_priv_keys = state_key_pairs
173                        .iter()
174                        .map(|kp| kp.sign_key())
175                        .collect::<Vec<_>>();
176
177                    let span = info_span!("initialize node", node_id);
178                    let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
179                    async move {
180                        let storage = D::create(node_id).await;
181                        let data_source = if leaf_only {
182                            D::leaf_only_ds(&storage).await
183                        } else {
184                            D::connect(&storage).await
185                        };
186
187                        let network = Arc::new(MemoryNetwork::new(
188                            &pub_keys[node_id],
189                            &master_map.clone(),
190                            &[Topic::Global, Topic::Da],
191                            None,
192                        ));
193                        let hs_storage: TestStorage<MockTypes> = TestStorage::default();
194
195                        let membership =
196                            Arc::new(RwLock::new(MockMembership::new::<MockNodeImpl>(
197                                known_nodes_with_stake_clone.clone(),
198                                known_nodes_with_stake_clone,
199                                hs_storage.clone(),
200                                network.clone(),
201                                pub_keys[node_id],
202                                config.epoch_height,
203                            )));
204
205                        membership
206                            .write()
207                            .await
208                            .set_first_epoch(ViewNumber::new(0), INITIAL_DRB_RESULT);
209                        let memberships = EpochMembershipCoordinator::new(
210                            membership,
211                            config.epoch_height,
212                            &hs_storage.clone(),
213                        );
214
215                        let hotshot = SystemContext::init(
216                            pub_keys[node_id],
217                            priv_key,
218                            state_priv_keys[node_id].clone(),
219                            node_id as u64,
220                            config,
221                            memberships,
222                            network,
223                            HotShotInitializer::from_genesis::<V>(
224                                TestInstanceState::default(),
225                                0,
226                                0,
227                                vec![],
228                            )
229                            .await
230                            .unwrap(),
231                            ConsensusMetricsValue::new(&*data_source.populate_metrics()),
232                            hs_storage,
233                            StorageMetricsValue::new(&*data_source.populate_metrics()),
234                        )
235                        .await
236                        .unwrap()
237                        .0;
238
239                        MockNode {
240                            hotshot,
241                            data_source,
242                            storage,
243                        }
244                    }
245                    .instrument(span)
246                }),
247        )
248        .await;
249
250        // Hook the builder up to the event stream from the first node
251
252        builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
253
254        let mut network = Self {
255            nodes,
256            pub_keys,
257            tasks: Default::default(),
258        };
259        D::setup(&mut network).await;
260        network
261    }
262}
263
264impl<D: DataSourceLifeCycle, V: Versions> MockNetwork<D, V> {
265    pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl, V> {
266        &self.nodes[0].hotshot
267    }
268
269    pub async fn submit_transaction(&self, tx: MockTransaction) {
270        self.handle().submit_transaction(tx).await.unwrap();
271    }
272
273    pub fn num_nodes(&self) -> usize {
274        self.pub_keys.len()
275    }
276
277    pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
278        self.pub_keys[i]
279    }
280
281    pub fn data_source_index(&self, i: usize) -> D {
282        self.nodes[i].data_source.clone()
283    }
284
285    pub fn data_source(&self) -> D {
286        self.data_source_index(0)
287    }
288
289    pub fn storage(&self) -> &D::Storage {
290        &self.nodes[0].storage
291    }
292
293    pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
294        self.tasks.push(BackgroundTask::spawn(name, task));
295    }
296
297    pub async fn shut_down(mut self) {
298        self.shut_down_impl().await
299    }
300
301    async fn shut_down_impl(&mut self) {
302        for node in &mut self.nodes {
303            node.hotshot.shut_down().await;
304        }
305    }
306
307    pub fn epoch_height(&self) -> u64 {
308        EPOCH_HEIGHT
309    }
310}
311
312impl<D: DataSourceLifeCycle, V: Versions> MockNetwork<D, V> {
313    pub async fn start(&mut self) {
314        // Spawn the update tasks.
315        for (i, node) in self.nodes.iter_mut().enumerate() {
316            let ds = node.data_source.clone();
317            let mut events = node.hotshot.event_stream();
318            self.tasks.push(BackgroundTask::spawn(
319                format!("update node {i}"),
320                async move {
321                    while let Some(event) = events.next().await {
322                        tracing::info!(node = i, event = ?event.event, "EVENT");
323                        {
324                            ds.handle_event(&event).await;
325                        }
326                        yield_now().await;
327                    }
328                },
329            ));
330        }
331
332        join_all(
333            self.nodes
334                .iter()
335                .map(|node| node.hotshot.hotshot.start_consensus()),
336        )
337        .await;
338    }
339}
340
341impl<D: DataSourceLifeCycle, V: Versions> Drop for MockNetwork<D, V> {
342    fn drop(&mut self) {
343        if let Ok(handle) = Handle::try_current() {
344            block_in_place(move || handle.block_on(self.shut_down_impl()));
345        }
346    }
347}
348
/// Hooks that let [`MockNetwork`] create, open and drive a data source under test.
#[async_trait]
pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
    /// Backing storage for the data source.
    ///
    /// This can be used to connect multiple data sources to the same underlying data. It must be
    /// kept alive as long as the related data sources are open.
    type Storage: Send + Sync;

    /// Create the backing storage for the node with ID `node_id`.
    async fn create(node_id: usize) -> Self::Storage;

    /// Open a data source on top of `storage`.
    async fn connect(storage: &Self::Storage) -> Self;

    /// Produce a data source from existing `storage`.
    ///
    /// NOTE(review): presumably this discards or reinitializes previously stored state before
    /// reopening — confirm against the implementors, as nothing in this module calls it.
    async fn reset(storage: &Self::Storage) -> Self;

    /// Process one consensus event.
    ///
    /// `MockNetwork::start` calls this for every event emitted by the owning node.
    async fn handle_event(&self, event: &Event<MockTypes>);

    /// Open a leaf-only data source on top of `storage`.
    ///
    /// The default implementation panics with "not supported"; data sources that support this
    /// mode must override it.
    async fn leaf_only_ds(_storage: &Self::Storage) -> Self {
        panic!("not supported")
    }

    /// Setup runs after setting up the network but before starting a test.
    ///
    /// The default implementation does nothing.
    async fn setup<V: Versions>(_network: &mut MockNetwork<Self, V>) {}
}
368
/// Shorthand for a data source that can be managed by [`MockNetwork`] and also implements the
/// availability, node, status and versioning interfaces over [`MockTypes`].
///
/// The trait has no items of its own; it only bundles its supertraits into a single bound.
pub trait TestableDataSource:
    DataSourceLifeCycle
    + AvailabilityDataSource<MockTypes>
    + UpdateAvailabilityData<MockTypes>
    + NodeDataSource<MockTypes>
    + StatusDataSource
    + VersionedDataSource
{
}
378
// Blanket implementation: every type that satisfies all of the supertrait bounds is
// automatically a `TestableDataSource`, so implementors never need to opt in explicitly.
impl<T> TestableDataSource for T where
    T: DataSourceLifeCycle
        + AvailabilityDataSource<MockTypes>
        + UpdateAvailabilityData<MockTypes>
        + NodeDataSource<MockTypes>
        + StatusDataSource
        + VersionedDataSource
{
}