1use std::{
8 collections::{BTreeMap, HashMap},
9 sync::Arc,
10};
11
12use async_broadcast::broadcast;
13use async_lock::RwLock;
14use async_trait::async_trait;
15use futures::future::join_all;
16use hotshot::{
17 HotShotInitializer, InitializerEpochInfo, SystemContext, traits::TestableNodeImplementation,
18 types::EventType,
19};
20use hotshot_example_types::{
21 block_types::TestBlockHeader,
22 state_types::{TestInstanceState, TestValidatedState},
23 storage_types::TestStorage,
24 testable_delay::DelayConfig,
25};
26use hotshot_types::{
27 ValidatorConfig,
28 constants::EVENT_CHANNEL_SIZE,
29 data::{Leaf2, ViewNumber},
30 event::Event,
31 message::convert_proposal,
32 simple_certificate::{
33 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
34 },
35 traits::{
36 election::Membership,
37 network::{AsyncGenerator, ConnectedNetwork},
38 node_implementation::{NodeImplementation, NodeType},
39 },
40 utils::genesis_epoch_from_version,
41 vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44
45use crate::{
46 node_stake::TestNodeStakes,
47 test_launcher::Network,
48 test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
49 test_task::{TestResult, TestTaskState},
50};
51
/// A pair of vectors: `S` values (presumably states) and `B` values
/// (presumably blocks), as the alias name suggests — confirm at call sites.
pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);
54
/// State of the spinning task: it applies scheduled node changes (spinning
/// nodes up or down, pausing/resuming their networks, restarting them) as the
/// observed view number advances.
pub struct SpinningTask<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
> {
    /// Epoch height passed to the initializer of late-started and restarted nodes.
    pub epoch_height: u64,
    /// Epoch start block passed to the initializer of late-started and restarted nodes.
    pub epoch_start_block: u64,
    /// Epoch info passed to the initializer of late-started and restarted nodes.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    /// Shared list of running nodes; changes address nodes by their index here.
    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I>>>>,
    /// Nodes that have not been started yet, keyed by node id.
    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
    /// Node changes still to be applied, keyed by the view in which to apply them.
    pub(crate) changes: BTreeMap<ViewNumber, Vec<ChangeNode>>,
    /// Newest view for which scheduled changes have been processed, if any.
    pub(crate) latest_view: Option<ViewNumber>,
    /// Highest-view leaf observed in `Decide` events; used to initialize new nodes.
    pub(crate) last_decided_leaf: Leaf2<TYPES>,
    /// Highest justify QC observed in quorum proposals; used to initialize late nodes.
    pub(crate) high_qc: QuorumCertificate2<TYPES>,
    /// Next-epoch high QC passed to the initializer of late-started nodes.
    pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
    /// Per-node delay configuration keyed by node id (default when absent).
    pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
    /// Contexts of restarted nodes waiting for their scheduled `RestartUp`, keyed by node index.
    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I>>,
    /// Generates the network for a node from its node id.
    pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
    /// Light-client state certificate passed to the initializer of late-started nodes.
    pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
    /// Stake table used when generating validator configs for new nodes.
    pub(crate) node_stakes: TestNodeStakes,
    /// Version upgrade configuration; `base` selects the genesis epoch.
    pub(crate) upgrade: versions::Upgrade,
}
94
95#[async_trait]
96impl<
97 TYPES: NodeType<
98 InstanceState = TestInstanceState,
99 ValidatedState = TestValidatedState,
100 BlockHeader = TestBlockHeader,
101 >,
102 I: TestableNodeImplementation<TYPES>,
103 N: ConnectedNetwork<TYPES::SignatureKey>,
104> TestTaskState for SpinningTask<TYPES, N, I>
105where
106 I: TestableNodeImplementation<TYPES>,
107 I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
108 <TYPES as NodeType>::Membership: Membership<TYPES, Storage = TestStorage<TYPES>>,
109{
110 type Event = Event<TYPES>;
111 type Error = Error;
112
113 async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
114 let Event { view_number, event } = message;
115
116 if let EventType::Decide { leaf_chain, .. } = event {
117 let leaf = leaf_chain.first().unwrap().leaf.clone();
118 if leaf.view_number() > self.last_decided_leaf.view_number() {
119 self.last_decided_leaf = leaf;
120 }
121 } else if let EventType::QuorumProposal {
122 proposal,
123 sender: _,
124 } = event
125 {
126 if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
127 self.high_qc = proposal.data.justify_qc().clone();
128 }
129 } else if let EventType::ViewTimeout { view_number } = event {
130 tracing::error!("View timeout for view {view_number}");
131 }
132
133 let mut new_nodes = vec![];
134 let mut new_networks = vec![];
135 if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
137 if let Some(operations) = self.changes.remove(&view_number) {
139 for ChangeNode { idx, updown } in operations {
140 match updown {
141 NodeAction::Up => {
142 let node_id = idx.try_into().unwrap();
143 if let Some(node) = self.late_start.remove(&node_id) {
144 tracing::error!("Node {idx} spinning up late");
145 let network = if let Some(network) = node.network {
146 network
147 } else {
148 let generated_network = (self.channel_generator)(node_id).await;
149 generated_network.wait_for_ready().await;
150 generated_network
151 };
152 let node_id = idx.try_into().unwrap();
153 let context = match node.context {
154 LateNodeContext::InitializedContext(context) => context,
155 LateNodeContext::UninitializedContext(late_context_params) => {
158 let LateNodeContextParameters { storage, config } =
159 late_context_params;
160
161 let validator_config: ValidatorConfig<TYPES> =
163 ValidatorConfig::generated_from_seed_indexed(
164 [0u8; 32],
165 node_id,
166 self.node_stakes.get(node_id),
167 node_id < config.da_staked_committee_size as u64,
169 );
170
171 let memberships = <TYPES as NodeType>::Membership::new::<I>(
172 config.known_nodes_with_stake.clone(),
173 config.known_da_nodes.clone(),
174 storage.clone(),
175 network.clone(),
176 validator_config.public_key.clone(),
177 config.epoch_height,
178 );
179
180 let initializer = HotShotInitializer::<TYPES>::load(
181 TestInstanceState::new(
182 self.async_delay_config
183 .get(&node_id)
184 .cloned()
185 .unwrap_or_default(),
186 ),
187 self.epoch_height,
188 self.epoch_start_block,
189 self.start_epoch_info.clone(),
190 self.last_decided_leaf.clone(),
191 (
192 ViewNumber::genesis(),
193 genesis_epoch_from_version(self.upgrade.base),
194 ),
195 (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
196 ViewNumber::genesis(),
197 BTreeMap::new(),
198 BTreeMap::new(),
199 None,
200 self.state_cert.clone(),
201 );
202
203 TestRunner::add_node_with_config(
204 node_id,
205 network.clone(),
206 memberships,
207 initializer,
208 config,
209 self.upgrade,
210 validator_config,
211 storage,
212 )
213 .await
214 },
215 LateNodeContext::Restart => {
216 panic!("Cannot spin up a node with Restart context")
217 },
218 };
219
220 let handle = context.run_tasks().await;
221
222 let node = Node {
226 node_id,
227 network,
228 handle,
229 };
230 node.handle.hotshot.start_consensus().await;
231
232 self.handles.write().await.push(node);
233 }
234 },
235 NodeAction::Down => {
236 if let Some(node) = self.handles.write().await.get_mut(idx) {
237 tracing::error!("Node {idx} shutting down in view {view_number}");
238 node.handle.shut_down().await;
239 }
240 },
241 NodeAction::RestartDown(delay_views) => {
242 let node_id = idx.try_into().unwrap();
243 if let Some(node) = self.handles.write().await.get_mut(idx) {
244 tracing::error!("Node {idx} shutting down in view {view_number}");
245 node.handle.shut_down().await;
246 let generated_network = (self.channel_generator)(node_id).await;
248
249 let Some(LateStartNode {
250 network: _,
251 context: LateNodeContext::Restart,
252 }) = self.late_start.get(&node_id)
253 else {
254 panic!("Restarted Nodes must have an uninitialized context");
255 };
256
257 let storage = node.handle.storage().clone();
258
259 let membership = Arc::new(RwLock::new(
260 <TYPES as NodeType>::Membership::new::<I>(
261 node.handle.hotshot.config.known_nodes_with_stake.clone(),
262 node.handle.hotshot.config.known_da_nodes.clone(),
263 node.handle.storage().clone(),
264 generated_network.clone(),
265 node.handle.public_key().clone(),
266 node.handle.hotshot.config.epoch_height,
267 ),
268 ));
269
270 let config = node.handle.hotshot.config.clone();
271
272 let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
273 let start_view = storage.restart_view().await;
274 let last_actioned_view = storage.last_actioned_view().await;
275 let start_epoch = storage.last_actioned_epoch().await;
276 let high_qc = storage.high_qc_cloned().await.unwrap_or(
277 QuorumCertificate2::genesis(
278 &TestValidatedState::default(),
279 &TestInstanceState::default(),
280 self.upgrade,
281 )
282 .await,
283 );
284 let state_cert = storage.state_cert_cloned().await;
285 let saved_proposals = storage.proposals_cloned().await;
286 let mut vid_shares = BTreeMap::new();
287 for (view, hash_map) in storage.vids_cloned().await {
288 let mut converted_hash_map = HashMap::new();
289 for (key, proposal) in hash_map {
290 converted_hash_map
291 .entry(key)
292 .or_insert_with(BTreeMap::new)
293 .insert(
294 proposal.data.target_epoch(),
295 convert_proposal(proposal),
296 );
297 }
298 vid_shares.insert(view, converted_hash_map);
299 }
300 let decided_upgrade_certificate =
301 storage.decided_upgrade_certificate().await;
302
303 let initializer = HotShotInitializer::<TYPES>::load(
304 TestInstanceState::new(
305 self.async_delay_config
306 .get(&node_id)
307 .cloned()
308 .unwrap_or_default(),
309 ),
310 self.epoch_height,
311 self.epoch_start_block,
312 self.start_epoch_info.clone(),
313 self.last_decided_leaf.clone(),
314 (start_view, start_epoch),
315 (high_qc, next_epoch_high_qc),
316 last_actioned_view,
317 saved_proposals,
318 vid_shares,
319 decided_upgrade_certificate,
320 state_cert,
321 );
322 let validator_config = ValidatorConfig::generated_from_seed_indexed(
324 [0u8; 32],
325 node_id,
326 self.node_stakes.get(node_id),
327 node_id < config.da_staked_committee_size as u64,
329 );
330 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
331 let context =
332 TestRunner::<TYPES, I, N>::add_node_with_config_and_channels(
333 node_id,
334 generated_network.clone(),
335 Arc::clone(&membership),
336 initializer,
337 config,
338 self.upgrade,
339 validator_config,
340 storage.clone(),
341 internal_chan,
342 (
343 node.handle.external_channel_sender(),
344 node.handle.event_stream_known_impl().new_receiver(),
345 ),
346 )
347 .await;
348 tracing::info!(
349 "Node {} restarting in view {} with start view {}",
350 idx,
351 view_number + delay_views,
352 start_view
353 );
354 if delay_views == 0 {
355 new_nodes.push((context, idx));
356 new_networks.push(generated_network.clone());
357 } else {
358 let up_view = view_number + delay_views;
359 let change = ChangeNode {
360 idx,
361 updown: NodeAction::RestartUp,
362 };
363 self.changes.entry(up_view).or_default().push(change);
364 let new_ctx = RestartContext {
365 context,
366 network: generated_network.clone(),
367 };
368 self.restart_contexts.insert(idx, new_ctx);
369 }
370 }
371 },
372 NodeAction::RestartUp => {
373 if let Some(ctx) = self.restart_contexts.remove(&idx) {
374 new_nodes.push((ctx.context, idx));
375 new_networks.push(ctx.network.clone());
376 }
377 },
378 NodeAction::NetworkUp => {
379 if let Some(handle) = self.handles.write().await.get(idx) {
380 tracing::error!("Node {idx} networks resuming");
381 handle.network.resume();
382 }
383 },
384 NodeAction::NetworkDown => {
385 if let Some(handle) = self.handles.write().await.get(idx) {
386 tracing::error!("Node {idx} networks pausing");
387 handle.network.pause();
388 }
389 },
390 }
391 }
392 }
393 let mut ready_futs = vec![];
394 while let Some(net) = new_networks.pop() {
395 ready_futs.push(async move {
396 net.wait_for_ready().await;
397 });
398 }
399 join_all(ready_futs).await;
400
401 let mut start_futs = vec![];
402
403 while let Some((node, id)) = new_nodes.pop() {
404 let handles = self.handles.clone();
405 let fut = async move {
406 tracing::info!("Starting node {id} back up");
407 node.network.wait_for_ready().await;
408 let handle = node.run_tasks().await;
409
410 let node = Node {
414 node_id: id.try_into().unwrap(),
415 network: node.network.clone(),
416 handle,
417 };
418 node.handle.hotshot.start_consensus().await;
419
420 handles.write().await[id] = node;
421 };
422 start_futs.push(fut);
423 }
424 if !start_futs.is_empty() {
425 join_all(start_futs).await;
426 tracing::info!("Nodes all started");
427 }
428
429 self.latest_view = Some(view_number);
431 }
432
433 Ok(())
434 }
435
436 async fn check(&self) -> TestResult {
437 TestResult::Pass
438 }
439}
440
/// A newly built context for a node shut down by `NodeAction::RestartDown`,
/// kept until the node is started again.
#[derive(Clone)]
pub(crate) struct RestartContext<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
> {
    /// System context to run when the node comes back up.
    context: Arc<SystemContext<TYPES, I>>,
    /// Network generated for the restarted node.
    network: Arc<N>,
}
450
/// An action the spinning task can apply to a node.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// Start a node that was registered as a late-start node.
    Up,
    /// Shut the node down.
    Down,
    /// Resume the node's network.
    NetworkUp,
    /// Pause the node's network.
    NetworkDown,
    /// Shut the node down and restart it from its storage after the given
    /// number of views (`0` restarts it while handling the same view).
    RestartDown(u64),
    /// Start a node previously shut down by `RestartDown`; the spinning task
    /// schedules this itself for delayed restarts.
    RestartUp,
}
468
/// A single scheduled change: which node to act on and how.
#[derive(Clone, Debug)]
pub struct ChangeNode {
    /// Index of the node the action applies to.
    pub idx: usize,
    /// The action to apply.
    pub updown: NodeAction,
}
477
/// Description of the node changes a spinning task should apply.
#[derive(Clone, Debug)]
pub struct SpinningTaskDescription {
    /// Pairs of a view number (as a raw `u64`) and the changes to apply in that
    /// view — presumably converted into the task's per-view change map by the
    /// caller; confirm where this description is consumed.
    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
}