hotshot_query_service/testing/
consensus.rs1use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{
19 future::{Future, join_all},
20 stream::StreamExt,
21};
22use hotshot::{
23 HotShotInitializer, SystemContext,
24 traits::implementations::{MasterMap, MemoryNetwork},
25 types::{Event, SystemContextHandle},
26};
27use hotshot_example_types::{state_types::TestInstanceState, storage_types::TestStorage};
28use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
29use hotshot_types::{
30 HotShotConfig, PeerConfig,
31 consensus::ConsensusMetricsValue,
32 data::EpochNumber,
33 drb::INITIAL_DRB_RESULT,
34 epoch_membership::EpochMembershipCoordinator,
35 light_client::StateKeyPair,
36 signature_key::BLSPubKey,
37 storage_metrics::StorageMetricsValue,
38 traits::{election::Membership, network::Topic, signature_key::SignatureKey as _},
39};
40use test_utils::reserve_tcp_port;
41use tokio::{
42 runtime::Handle,
43 task::{block_in_place, yield_now},
44};
45use tracing::{Instrument, info_span};
46use url::Url;
47use versions::{MIN_SUPPORTED_VERSION, Upgrade};
48
49use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
50use crate::{
51 SignatureKey,
52 availability::{AvailabilityDataSource, UpdateAvailabilityData},
53 data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource},
54 fetching::provider::NoFetching,
55 node::NodeDataSource,
56 status::{StatusDataSource, UpdateStatusData},
57 task::BackgroundTask,
58};
59
60struct MockNode<D: DataSourceLifeCycle> {
61 hotshot: SystemContextHandle<MockTypes, MockNodeImpl>,
62 data_source: D,
63 storage: D::Storage,
64}
65
66pub struct MockNetwork<D: DataSourceLifeCycle> {
67 tasks: Vec<BackgroundTask>,
68 nodes: Vec<MockNode<D>>,
69 pub_keys: Vec<BLSPubKey>,
70}
71
72pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
75pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
76
77pub const NUM_NODES: usize = 2;
78const EPOCH_HEIGHT: u64 = 10;
79const DIFFICULTY_LEVEL: u64 = 10;
80
81impl<D: DataSourceLifeCycle + UpdateStatusData> MockNetwork<D> {
82 pub async fn init() -> Self {
83 Self::init_with_config(|_| {}, false).await
84 }
85
86 pub async fn init_with_leaf_ds() -> Self {
87 Self::init_with_config(|_| {}, true).await
88 }
89
90 pub async fn init_with_config(
91 update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
92 leaf_only: bool,
93 ) -> Self {
94 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
95 .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
96 .unzip();
97 let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
98 let state_key_pairs = (0..num_staked_nodes.into())
99 .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
100 .collect::<Vec<_>>();
101 let master_map = MasterMap::new();
102 let stake = 1u64;
103 let known_nodes_with_stake = (0..num_staked_nodes.into())
104 .map(|id| PeerConfig {
105 stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
106 state_ver_key: state_key_pairs[id].ver_key(),
107 connect_info: None,
108 })
109 .collect::<Vec<_>>();
110
111 let builder_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
113
114 let builder_url =
116 Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
117
118 let builder_task =
120 <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
121 NUM_NODES,
122 builder_url.clone(),
123 (),
124 Default::default(),
125 )
126 .await;
127
128 let mut config = HotShotConfig {
129 builder_urls: vec1::vec1![builder_url.clone()],
130 fixed_leader_for_gpuvid: 0,
131 num_nodes_with_stake: num_staked_nodes,
132 known_nodes_with_stake: known_nodes_with_stake.clone(),
133 next_view_timeout: 10000,
134 num_bootstrap: 0,
135 da_staked_committee_size: pub_keys.len(),
136 known_da_nodes: known_nodes_with_stake.clone(),
137 da_committees: Default::default(),
138 data_request_delay: Duration::from_millis(200),
139 view_sync_timeout: Duration::from_millis(250),
140 start_threshold: (
141 known_nodes_with_stake.len() as u64,
142 known_nodes_with_stake.len() as u64,
143 ),
144 builder_timeout: Duration::from_secs(1),
145 start_proposing_view: 0,
146 stop_proposing_view: 0,
147 start_voting_view: 0,
148 stop_voting_view: 0,
149 start_proposing_time: 0,
150 stop_proposing_time: 0,
151 start_voting_time: 0,
152 stop_voting_time: 0,
153 epoch_height: EPOCH_HEIGHT,
154 epoch_start_block: 0,
155 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
156 drb_difficulty: DIFFICULTY_LEVEL,
157 drb_upgrade_difficulty: DIFFICULTY_LEVEL,
158 };
159 update_config(&mut config);
160 let upgrade = Upgrade::trivial(MIN_SUPPORTED_VERSION);
161
162 let nodes = join_all(
163 priv_keys
164 .into_iter()
165 .enumerate()
166 .map(|(node_id, priv_key)| {
167 let config = config.clone();
168
169 let pub_keys = pub_keys.clone();
170 let master_map = master_map.clone();
171 let state_priv_keys = state_key_pairs
172 .iter()
173 .map(|kp| kp.sign_key())
174 .collect::<Vec<_>>();
175
176 let span = info_span!("initialize node", node_id);
177 let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
178 async move {
179 let storage = D::create(node_id).await;
180 let data_source = if leaf_only {
181 D::leaf_only_ds(&storage).await
182 } else {
183 D::connect(&storage).await
184 };
185
186 let network = Arc::new(MemoryNetwork::new(
187 &pub_keys[node_id],
188 &master_map.clone(),
189 &[Topic::Global, Topic::Da],
190 None,
191 ));
192 let hs_storage: TestStorage<MockTypes> = TestStorage::default();
193
194 let membership =
195 Arc::new(RwLock::new(MockMembership::new::<MockNodeImpl>(
196 known_nodes_with_stake_clone.clone(),
197 known_nodes_with_stake_clone,
198 hs_storage.clone(),
199 network.clone(),
200 pub_keys[node_id],
201 config.epoch_height,
202 )));
203
204 membership
205 .write()
206 .await
207 .set_first_epoch(EpochNumber::new(0), INITIAL_DRB_RESULT);
208 let memberships = EpochMembershipCoordinator::new(
209 membership,
210 config.epoch_height,
211 &hs_storage.clone(),
212 );
213
214 let init = HotShotInitializer::from_genesis(
215 TestInstanceState::default(),
216 0,
217 0,
218 vec![],
219 upgrade,
220 )
221 .await
222 .unwrap();
223
224 let hotshot = SystemContext::init(
225 pub_keys[node_id],
226 priv_key,
227 state_priv_keys[node_id].clone(),
228 node_id as u64,
229 config,
230 upgrade,
231 memberships,
232 network,
233 init,
234 ConsensusMetricsValue::new(&*data_source.populate_metrics()),
235 hs_storage,
236 StorageMetricsValue::new(&*data_source.populate_metrics()),
237 )
238 .await
239 .unwrap()
240 .0;
241
242 MockNode {
243 hotshot,
244 data_source,
245 storage,
246 }
247 }
248 .instrument(span)
249 }),
250 )
251 .await;
252
253 builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
256
257 let mut network = Self {
258 nodes,
259 pub_keys,
260 tasks: Default::default(),
261 };
262 D::setup(&mut network).await;
263 network
264 }
265}
266
267impl<D: DataSourceLifeCycle> MockNetwork<D> {
268 pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl> {
269 &self.nodes[0].hotshot
270 }
271
272 pub async fn submit_transaction(&self, tx: MockTransaction) {
273 self.handle().submit_transaction(tx).await.unwrap();
274 }
275
276 pub fn num_nodes(&self) -> usize {
277 self.pub_keys.len()
278 }
279
280 pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
281 self.pub_keys[i]
282 }
283
284 pub fn data_source_index(&self, i: usize) -> D {
285 self.nodes[i].data_source.clone()
286 }
287
288 pub fn data_source(&self) -> D {
289 self.data_source_index(0)
290 }
291
292 pub fn storage(&self) -> &D::Storage {
293 &self.nodes[0].storage
294 }
295
296 pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
297 self.tasks.push(BackgroundTask::spawn(name, task));
298 }
299
300 pub async fn shut_down(mut self) {
301 self.shut_down_impl().await
302 }
303
304 async fn shut_down_impl(&mut self) {
305 for node in &mut self.nodes {
306 node.hotshot.shut_down().await;
307 }
308 }
309
310 pub fn epoch_height(&self) -> u64 {
311 EPOCH_HEIGHT
312 }
313}
314
315impl<D: DataSourceLifeCycle> MockNetwork<D> {
316 pub async fn start(&mut self) {
317 for (i, node) in self.nodes.iter_mut().enumerate() {
319 let ds = node.data_source.clone();
320 let mut events = node.hotshot.event_stream();
321 self.tasks.push(BackgroundTask::spawn(
322 format!("update node {i}"),
323 async move {
324 while let Some(event) = events.next().await {
325 tracing::info!(node = i, event = ?event.event, "EVENT");
326 {
327 ds.handle_event(&event).await;
328 }
329 yield_now().await;
330 }
331 },
332 ));
333 }
334
335 join_all(
336 self.nodes
337 .iter()
338 .map(|node| node.hotshot.hotshot.start_consensus()),
339 )
340 .await;
341 }
342}
343
344impl<D: DataSourceLifeCycle> Drop for MockNetwork<D> {
345 fn drop(&mut self) {
346 if let Ok(handle) = Handle::try_current() {
347 block_in_place(move || handle.block_on(self.shut_down_impl()));
348 }
349 }
350}
351
352#[async_trait]
353pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
354 type Storage: Send + Sync;
359
360 async fn create(node_id: usize) -> Self::Storage;
361 async fn connect(storage: &Self::Storage) -> Self;
362 async fn reset(storage: &Self::Storage) -> Self;
363 async fn handle_event(&self, event: &Event<MockTypes>);
364 async fn leaf_only_ds(_storage: &Self::Storage) -> Self {
365 panic!("not supported")
366 }
367
368 async fn setup(_network: &mut MockNetwork<Self>) {}
370}
371
372pub trait TestableDataSource:
373 DataSourceLifeCycle
374 + AvailabilityDataSource<MockTypes>
375 + UpdateAvailabilityData<MockTypes>
376 + NodeDataSource<MockTypes>
377 + StatusDataSource
378 + VersionedDataSource
379{
380}
381
382impl<T> TestableDataSource for T where
383 T: DataSourceLifeCycle
384 + AvailabilityDataSource<MockTypes>
385 + UpdateAvailabilityData<MockTypes>
386 + NodeDataSource<MockTypes>
387 + StatusDataSource
388 + VersionedDataSource
389{
390}