1use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{
19 future::{join_all, Future},
20 stream::StreamExt,
21};
22use hotshot::{
23 traits::implementations::{MasterMap, MemoryNetwork},
24 types::{Event, SystemContextHandle},
25 HotShotInitializer, SystemContext,
26};
27use hotshot_example_types::{state_types::TestInstanceState, storage_types::TestStorage};
28use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
29use hotshot_types::{
30 consensus::ConsensusMetricsValue,
31 data::ViewNumber,
32 drb::INITIAL_DRB_RESULT,
33 epoch_membership::EpochMembershipCoordinator,
34 light_client::StateKeyPair,
35 signature_key::BLSPubKey,
36 storage_metrics::StorageMetricsValue,
37 traits::{
38 election::Membership,
39 network::Topic,
40 node_implementation::{ConsensusTime, Versions},
41 signature_key::SignatureKey as _,
42 },
43 HotShotConfig, PeerConfig,
44};
45use tokio::{
46 runtime::Handle,
47 task::{block_in_place, yield_now},
48};
49use tracing::{info_span, Instrument};
50use url::Url;
51
52use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
53use crate::{
54 availability::{AvailabilityDataSource, UpdateAvailabilityData},
55 data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource},
56 fetching::provider::NoFetching,
57 node::NodeDataSource,
58 status::{StatusDataSource, UpdateStatusData},
59 task::BackgroundTask,
60 SignatureKey,
61};
62
/// A single participant in a [`MockNetwork`].
///
/// Bundles the HotShot consensus handle for the node with the data source under test and the
/// storage that data source was connected to.
struct MockNode<D: DataSourceLifeCycle, V: Versions> {
    /// Handle to this node's consensus instance.
    hotshot: SystemContextHandle<MockTypes, MockNodeImpl, V>,
    /// The data source under test, obtained from `D::connect` or `D::leaf_only_ds`.
    data_source: D,
    /// The storage created by `D::create` for this node; `data_source` was built from it.
    storage: D::Storage,
}
68
/// An in-process consensus network used to drive data sources in tests.
///
/// The network owns one [`MockNode`] per consensus key plus any background tasks spawned on its
/// behalf. Dropping the network attempts to shut down every node (see the `Drop` impl).
pub struct MockNetwork<D: DataSourceLifeCycle, V: Versions> {
    /// Background tasks registered through `spawn` and `start`.
    ///
    /// NOTE(review): struct fields drop in declaration order, so these are dropped before
    /// `nodes`; presumably dropping a `BackgroundTask` cancels it — confirm before reordering.
    tasks: Vec<BackgroundTask>,
    /// The nodes in the network; index 0 is the one exposed by `handle`, `data_source` and
    /// `storage`.
    nodes: Vec<MockNode<D, V>>,
    /// Consensus public keys, indexed the same way as `nodes`.
    pub_keys: Vec<BLSPubKey>,
}
74
/// File-system-backed data source over [`MockTypes`], parameterized with the [`NoFetching`]
/// provider.
pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
/// SQL-backed data source over [`MockTypes`], parameterized with the [`NoFetching`] provider.
pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
79
/// Number of consensus nodes (and key pairs) created by `MockNetwork::init_with_config`.
pub const NUM_NODES: usize = 2;
/// Default `epoch_height` placed in the HotShot config, and the value reported by
/// `MockNetwork::epoch_height`.
const EPOCH_HEIGHT: u64 = 10;
/// Value used for both `drb_difficulty` and `drb_upgrade_difficulty` in the HotShot config.
const DIFFICULTY_LEVEL: u64 = 10;
83
84impl<D: DataSourceLifeCycle + UpdateStatusData, V: Versions> MockNetwork<D, V> {
85 pub async fn init() -> Self {
86 Self::init_with_config(|_| {}, false).await
87 }
88
89 pub async fn init_with_leaf_ds() -> Self {
90 Self::init_with_config(|_| {}, true).await
91 }
92
93 pub async fn init_with_config(
94 update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
95 leaf_only: bool,
96 ) -> Self {
97 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
98 .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
99 .unzip();
100 let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
101 let state_key_pairs = (0..num_staked_nodes.into())
102 .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
103 .collect::<Vec<_>>();
104 let master_map = MasterMap::new();
105 let stake = 1u64;
106 let known_nodes_with_stake = (0..num_staked_nodes.into())
107 .map(|id| PeerConfig {
108 stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
109 state_ver_key: state_key_pairs[id].ver_key(),
110 })
111 .collect::<Vec<_>>();
112
113 let builder_port = portpicker::pick_unused_port().expect("No ports available");
115
116 let builder_url =
118 Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
119
120 let builder_task =
122 <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
123 NUM_NODES,
124 builder_url.clone(),
125 (),
126 Default::default(),
127 )
128 .await;
129
130 let mut config = HotShotConfig {
131 builder_urls: vec1::vec1![builder_url.clone()],
132 fixed_leader_for_gpuvid: 0,
133 num_nodes_with_stake: num_staked_nodes,
134 known_nodes_with_stake: known_nodes_with_stake.clone(),
135 next_view_timeout: 10000,
136 num_bootstrap: 0,
137 da_staked_committee_size: pub_keys.len(),
138 known_da_nodes: known_nodes_with_stake.clone(),
139 da_committees: Default::default(),
140 data_request_delay: Duration::from_millis(200),
141 view_sync_timeout: Duration::from_millis(250),
142 start_threshold: (
143 known_nodes_with_stake.len() as u64,
144 known_nodes_with_stake.len() as u64,
145 ),
146 builder_timeout: Duration::from_secs(1),
147 start_proposing_view: 0,
148 stop_proposing_view: 0,
149 start_voting_view: 0,
150 stop_voting_view: 0,
151 start_proposing_time: 0,
152 stop_proposing_time: 0,
153 start_voting_time: 0,
154 stop_voting_time: 0,
155 epoch_height: EPOCH_HEIGHT,
156 epoch_start_block: 0,
157 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
158 drb_difficulty: DIFFICULTY_LEVEL,
159 drb_upgrade_difficulty: DIFFICULTY_LEVEL,
160 };
161 update_config(&mut config);
162
163 let nodes = join_all(
164 priv_keys
165 .into_iter()
166 .enumerate()
167 .map(|(node_id, priv_key)| {
168 let config = config.clone();
169
170 let pub_keys = pub_keys.clone();
171 let master_map = master_map.clone();
172 let state_priv_keys = state_key_pairs
173 .iter()
174 .map(|kp| kp.sign_key())
175 .collect::<Vec<_>>();
176
177 let span = info_span!("initialize node", node_id);
178 let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
179 async move {
180 let storage = D::create(node_id).await;
181 let data_source = if leaf_only {
182 D::leaf_only_ds(&storage).await
183 } else {
184 D::connect(&storage).await
185 };
186
187 let network = Arc::new(MemoryNetwork::new(
188 &pub_keys[node_id],
189 &master_map.clone(),
190 &[Topic::Global, Topic::Da],
191 None,
192 ));
193 let hs_storage: TestStorage<MockTypes> = TestStorage::default();
194
195 let membership =
196 Arc::new(RwLock::new(MockMembership::new::<MockNodeImpl>(
197 known_nodes_with_stake_clone.clone(),
198 known_nodes_with_stake_clone,
199 hs_storage.clone(),
200 network.clone(),
201 pub_keys[node_id],
202 config.epoch_height,
203 )));
204
205 membership
206 .write()
207 .await
208 .set_first_epoch(ViewNumber::new(0), INITIAL_DRB_RESULT);
209 let memberships = EpochMembershipCoordinator::new(
210 membership,
211 config.epoch_height,
212 &hs_storage.clone(),
213 );
214
215 let hotshot = SystemContext::init(
216 pub_keys[node_id],
217 priv_key,
218 state_priv_keys[node_id].clone(),
219 node_id as u64,
220 config,
221 memberships,
222 network,
223 HotShotInitializer::from_genesis::<V>(
224 TestInstanceState::default(),
225 0,
226 0,
227 vec![],
228 )
229 .await
230 .unwrap(),
231 ConsensusMetricsValue::new(&*data_source.populate_metrics()),
232 hs_storage,
233 StorageMetricsValue::new(&*data_source.populate_metrics()),
234 )
235 .await
236 .unwrap()
237 .0;
238
239 MockNode {
240 hotshot,
241 data_source,
242 storage,
243 }
244 }
245 .instrument(span)
246 }),
247 )
248 .await;
249
250 builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
253
254 let mut network = Self {
255 nodes,
256 pub_keys,
257 tasks: Default::default(),
258 };
259 D::setup(&mut network).await;
260 network
261 }
262}
263
264impl<D: DataSourceLifeCycle, V: Versions> MockNetwork<D, V> {
265 pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl, V> {
266 &self.nodes[0].hotshot
267 }
268
269 pub async fn submit_transaction(&self, tx: MockTransaction) {
270 self.handle().submit_transaction(tx).await.unwrap();
271 }
272
273 pub fn num_nodes(&self) -> usize {
274 self.pub_keys.len()
275 }
276
277 pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
278 self.pub_keys[i]
279 }
280
281 pub fn data_source_index(&self, i: usize) -> D {
282 self.nodes[i].data_source.clone()
283 }
284
285 pub fn data_source(&self) -> D {
286 self.data_source_index(0)
287 }
288
289 pub fn storage(&self) -> &D::Storage {
290 &self.nodes[0].storage
291 }
292
293 pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
294 self.tasks.push(BackgroundTask::spawn(name, task));
295 }
296
297 pub async fn shut_down(mut self) {
298 self.shut_down_impl().await
299 }
300
301 async fn shut_down_impl(&mut self) {
302 for node in &mut self.nodes {
303 node.hotshot.shut_down().await;
304 }
305 }
306
307 pub fn epoch_height(&self) -> u64 {
308 EPOCH_HEIGHT
309 }
310}
311
312impl<D: DataSourceLifeCycle, V: Versions> MockNetwork<D, V> {
313 pub async fn start(&mut self) {
314 for (i, node) in self.nodes.iter_mut().enumerate() {
316 let ds = node.data_source.clone();
317 let mut events = node.hotshot.event_stream();
318 self.tasks.push(BackgroundTask::spawn(
319 format!("update node {i}"),
320 async move {
321 while let Some(event) = events.next().await {
322 tracing::info!(node = i, event = ?event.event, "EVENT");
323 {
324 ds.handle_event(&event).await;
325 }
326 yield_now().await;
327 }
328 },
329 ));
330 }
331
332 join_all(
333 self.nodes
334 .iter()
335 .map(|node| node.hotshot.hotshot.start_consensus()),
336 )
337 .await;
338 }
339}
340
341impl<D: DataSourceLifeCycle, V: Versions> Drop for MockNetwork<D, V> {
342 fn drop(&mut self) {
343 if let Ok(handle) = Handle::try_current() {
344 block_in_place(move || handle.block_on(self.shut_down_impl()));
345 }
346 }
347}
348
/// Life-cycle hooks that let [`MockNetwork`] create, connect and feed a data source under test.
#[async_trait]
pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
    /// Persistent storage from which data sources are built; one instance is created per node
    /// and kept alive alongside that node's data source.
    type Storage: Send + Sync;

    /// Create the storage for the node with the given index.
    async fn create(node_id: usize) -> Self::Storage;
    /// Build a data source on top of existing `storage`.
    async fn connect(storage: &Self::Storage) -> Self;
    /// Build a data source on top of `storage` after resetting it.
    ///
    /// NOTE(review): not called anywhere in this file; presumably clears previously stored
    /// state before connecting — confirm against the implementors.
    async fn reset(storage: &Self::Storage) -> Self;
    /// Process one consensus event; `MockNetwork::start` calls this for every event emitted by
    /// the node this data source is attached to.
    async fn handle_event(&self, event: &Event<MockTypes>);
    /// Build a leaf-only data source on top of `storage`.
    ///
    /// Used by `MockNetwork::init_with_config` when `leaf_only` is `true`. The default
    /// implementation panics with "not supported"; data sources that support this mode must
    /// override it.
    async fn leaf_only_ds(_storage: &Self::Storage) -> Self {
        panic!("not supported")
    }

    /// Hook invoked on the freshly built network at the end of `MockNetwork::init_with_config`,
    /// before it is returned to the caller. The default implementation does nothing.
    async fn setup<V: Versions>(_network: &mut MockNetwork<Self, V>) {}
}
368
/// Convenience alias for the full set of traits a data source must implement to be exercised
/// by the generic tests: life-cycle management plus the availability, node, status and
/// versioning interfaces over [`MockTypes`].
///
/// The trait has no items of its own; it only bundles its supertraits.
pub trait TestableDataSource:
    DataSourceLifeCycle
    + AvailabilityDataSource<MockTypes>
    + UpdateAvailabilityData<MockTypes>
    + NodeDataSource<MockTypes>
    + StatusDataSource
    + VersionedDataSource
{
}
378
/// Blanket implementation: any type satisfying all of the supertraits is automatically a
/// [`TestableDataSource`], so implementors never need to opt in explicitly.
impl<T> TestableDataSource for T where
    T: DataSourceLifeCycle
        + AvailabilityDataSource<MockTypes>
        + UpdateAvailabilityData<MockTypes>
        + NodeDataSource<MockTypes>
        + StatusDataSource
        + VersionedDataSource
{
}