use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};

use async_broadcast::broadcast;
use async_lock::RwLock;
use async_trait::async_trait;
use futures::future::join_all;
use hotshot::{
    traits::TestableNodeImplementation, types::EventType, HotShotInitializer, InitializerEpochInfo,
    SystemContext,
};
use hotshot_example_types::{
    block_types::TestBlockHeader,
    state_types::{TestInstanceState, TestValidatedState},
    storage_types::TestStorage,
    testable_delay::DelayConfig,
};
use hotshot_types::{
    constants::EVENT_CHANNEL_SIZE,
    data::Leaf2,
    event::Event,
    message::convert_proposal,
    simple_certificate::{
        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
    },
    traits::{
        election::Membership,
        network::{AsyncGenerator, ConnectedNetwork},
        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
    },
    utils::genesis_epoch_from_version,
    vote::HasViewNumber,
    ValidatorConfig,
};
use hotshot_utils::anytrace::*;

use crate::{
    node_stake::TestNodeStakes,
    test_launcher::Network,
    test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
    test_task::{TestResult, TestTaskState},
};

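/// Convenience alias for a list of states and the corresponding list of blocks.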
pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);

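/// A test task that spins nodes up and down (and pauses or resumes their networks)
/// according to the schedule of [`ChangeNode`] operations in `changes`.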
pub struct SpinningTask<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
    V: Versions,
> {
    /// The epoch height
    pub epoch_height: u64,
    /// The block height at which the first epoch starts
    pub epoch_start_block: u64,
    /// Epoch information handed to the initializer when starting late or restarted nodes
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    /// Handles to the running nodes
    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
    /// Nodes that start late, keyed by node id
    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I, V>>,
    /// Node changes to apply, keyed by the view in which they take effect
    pub(crate) changes: BTreeMap<TYPES::View, Vec<ChangeNode>>,
    /// The latest view whose changes have already been applied
    pub(crate) latest_view: Option<TYPES::View>,
    /// The most recently decided leaf observed by this task
    pub(crate) last_decided_leaf: Leaf2<TYPES>,
    /// The highest quorum certificate observed by this task
    pub(crate) high_qc: QuorumCertificate2<TYPES>,
    /// The highest next-epoch quorum certificate observed, if any
    pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
    /// Per-node `DelayConfig` used when building the test instance state
    pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
    /// Contexts for nodes that have been shut down and are waiting to restart
    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I, V>>,
    /// Generator used to create networks for late-starting or restarting nodes
    pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
    /// The latest light client state update certificate, if any
    pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
    /// Stake assigned to each test node
    pub(crate) node_stakes: TestNodeStakes,
}

#[async_trait]
impl<
    TYPES: NodeType<
        InstanceState = TestInstanceState,
        ValidatedState = TestValidatedState,
        BlockHeader = TestBlockHeader,
    >,
    I: TestableNodeImplementation<TYPES>,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    V: Versions,
> TestTaskState for SpinningTask<TYPES, N, I, V>
where
    I: TestableNodeImplementation<TYPES>,
    I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
    <TYPES as NodeType>::Membership: Membership<TYPES, Storage = TestStorage<TYPES>>,
{
    type Event = Event<TYPES>;
    type Error = Error;

    async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
        let Event { view_number, event } = message;

        if let EventType::Decide { leaf_chain, .. } = event {
            let leaf = leaf_chain.first().unwrap().leaf.clone();
            if leaf.view_number() > self.last_decided_leaf.view_number() {
                self.last_decided_leaf = leaf;
            }
        } else if let EventType::QuorumProposal {
            proposal,
            sender: _,
        } = event
        {
            if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
                self.high_qc = proposal.data.justify_qc().clone();
            }
        } else if let EventType::ViewTimeout { view_number } = event {
            tracing::error!("View timeout for view {view_number}");
        }

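        // Collect nodes and networks to (re)start, then apply any node changes scheduled for
        // this view; each view's changes are applied at most once.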
        let mut new_nodes = vec![];
        let mut new_networks = vec![];
        if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
            if let Some(operations) = self.changes.remove(&view_number) {
                for ChangeNode { idx, updown } in operations {
                    match updown {
                        NodeAction::Up => {
                            let node_id = idx.try_into().unwrap();
                            if let Some(node) = self.late_start.remove(&node_id) {
                                tracing::error!("Node {idx} spinning up late");
                                let network = if let Some(network) = node.network {
                                    network
                                } else {
                                    let generated_network = (self.channel_generator)(node_id).await;
                                    generated_network.wait_for_ready().await;
                                    generated_network
                                };
                                let node_id = idx.try_into().unwrap();
                                let context = match node.context {
                                    LateNodeContext::InitializedContext(context) => context,
                                    LateNodeContext::UninitializedContext(late_context_params) => {
                                        let LateNodeContextParameters {
                                            storage,
                                            memberships,
                                            config,
                                        } = late_context_params;

                                        let initializer = HotShotInitializer::<TYPES>::load(
                                            TestInstanceState::new(
                                                self.async_delay_config
                                                    .get(&node_id)
                                                    .cloned()
                                                    .unwrap_or_default(),
                                            ),
                                            self.epoch_height,
                                            self.epoch_start_block,
                                            self.start_epoch_info.clone(),
                                            self.last_decided_leaf.clone(),
                                            (
                                                TYPES::View::genesis(),
                                                genesis_epoch_from_version::<V, TYPES>(),
                                            ),
                                            (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
                                            TYPES::View::genesis(),
                                            BTreeMap::new(),
                                            BTreeMap::new(),
                                            None,
                                            self.state_cert.clone(),
                                        );
                                        let validator_config =
                                            ValidatorConfig::generated_from_seed_indexed(
                                                [0u8; 32],
                                                node_id,
                                                self.node_stakes.get(node_id),
                                                node_id < config.da_staked_committee_size as u64,
                                            );

                                        TestRunner::add_node_with_config(
                                            node_id,
                                            network.clone(),
                                            memberships,
                                            initializer,
                                            config,
                                            validator_config,
                                            storage,
                                        )
                                        .await
                                    },
                                    LateNodeContext::Restart => {
                                        panic!("Cannot spin up a node with Restart context")
                                    },
                                };

                                let handle = context.run_tasks().await;

                                let node = Node {
                                    node_id,
                                    network,
                                    handle,
                                };
                                node.handle.hotshot.start_consensus().await;

                                self.handles.write().await.push(node);
                            }
                        },
                        NodeAction::Down => {
                            if let Some(node) = self.handles.write().await.get_mut(idx) {
                                tracing::error!("Node {idx} shutting down in view {view_number}");
                                node.handle.shut_down().await;
                            }
                        },
                        NodeAction::RestartDown(delay_views) => {
                            let node_id = idx.try_into().unwrap();
                            if let Some(node) = self.handles.write().await.get_mut(idx) {
                                tracing::error!("Node {idx} shutting down in view {view_number}");
                                node.handle.shut_down().await;
                                let generated_network = (self.channel_generator)(node_id).await;

                                let Some(LateStartNode {
                                    network: _,
                                    context: LateNodeContext::Restart,
                                }) = self.late_start.get(&node_id)
                                else {
                                    panic!("Restarted nodes must have a `Restart` context");
                                };
                                let storage = node.handle.storage().clone();

                                let membership = Arc::new(RwLock::new(
                                    <TYPES as NodeType>::Membership::new::<I>(
                                        node.handle.hotshot.config.known_nodes_with_stake.clone(),
                                        node.handle.hotshot.config.known_da_nodes.clone(),
                                        node.handle.storage().clone(),
                                        generated_network.clone(),
                                        node.handle.public_key().clone(),
                                        node.handle.hotshot.config.epoch_height,
                                    ),
                                ));

                                let config = node.handle.hotshot.config.clone();

                                let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
                                let start_view = storage.restart_view().await;
                                let last_actioned_view = storage.last_actioned_view().await;
                                let start_epoch = storage.last_actioned_epoch().await;
                                let high_qc = storage.high_qc_cloned().await.unwrap_or(
                                    QuorumCertificate2::genesis::<V>(
                                        &TestValidatedState::default(),
                                        &TestInstanceState::default(),
                                    )
                                    .await,
                                );
                                let state_cert = storage.state_cert_cloned().await;
                                let saved_proposals = storage.proposals_cloned().await;
                                let mut vid_shares = BTreeMap::new();
                                for (view, hash_map) in storage.vids_cloned().await {
                                    let mut converted_hash_map = HashMap::new();
                                    for (key, proposal) in hash_map {
                                        converted_hash_map
                                            .entry(key)
                                            .or_insert_with(BTreeMap::new)
                                            .insert(
                                                proposal.data.target_epoch,
                                                convert_proposal(proposal),
                                            );
                                    }
                                    vid_shares.insert(view, converted_hash_map);
                                }
                                let decided_upgrade_certificate =
                                    storage.decided_upgrade_certificate().await;

                                let initializer = HotShotInitializer::<TYPES>::load(
                                    TestInstanceState::new(
                                        self.async_delay_config
                                            .get(&node_id)
                                            .cloned()
                                            .unwrap_or_default(),
                                    ),
                                    self.epoch_height,
                                    self.epoch_start_block,
                                    self.start_epoch_info.clone(),
                                    self.last_decided_leaf.clone(),
                                    (start_view, start_epoch),
                                    (high_qc, next_epoch_high_qc),
                                    last_actioned_view,
                                    saved_proposals,
                                    vid_shares,
                                    decided_upgrade_certificate,
                                    state_cert,
                                );
                                let validator_config = ValidatorConfig::generated_from_seed_indexed(
                                    [0u8; 32],
                                    node_id,
                                    self.node_stakes.get(node_id),
                                    node_id < config.da_staked_committee_size as u64,
                                );
                                let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
                                let context =
                                    TestRunner::<TYPES, I, V, N>::add_node_with_config_and_channels(
                                        node_id,
                                        generated_network.clone(),
                                        Arc::clone(&membership),
                                        initializer,
                                        config,
                                        validator_config,
                                        storage.clone(),
                                        internal_chan,
                                        (
                                            node.handle.external_channel_sender(),
                                            node.handle.event_stream_known_impl().new_receiver(),
                                        ),
                                    )
                                    .await;
                                tracing::info!(
                                    "Node {} restarting in view {} with start view {}",
                                    idx,
                                    view_number + delay_views,
                                    start_view
                                );
                                if delay_views == 0 {
                                    new_nodes.push((context, idx));
                                    new_networks.push(generated_network.clone());
                                } else {
                                    let up_view = view_number + delay_views;
                                    let change = ChangeNode {
                                        idx,
                                        updown: NodeAction::RestartUp,
                                    };
                                    self.changes.entry(up_view).or_default().push(change);
                                    let new_ctx = RestartContext {
                                        context,
                                        network: generated_network.clone(),
                                    };
                                    self.restart_contexts.insert(idx, new_ctx);
                                }
                            }
                        },
                        NodeAction::RestartUp => {
                            if let Some(ctx) = self.restart_contexts.remove(&idx) {
                                new_nodes.push((ctx.context, idx));
                                new_networks.push(ctx.network.clone());
                            }
                        },
                        NodeAction::NetworkUp => {
                            if let Some(handle) = self.handles.write().await.get(idx) {
                                tracing::error!("Node {idx} networks resuming");
                                handle.network.resume();
                            }
                        },
                        NodeAction::NetworkDown => {
                            if let Some(handle) = self.handles.write().await.get(idx) {
                                tracing::error!("Node {idx} networks pausing");
                                handle.network.pause();
                            }
                        },
                    }
                }
            }
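            // Wait for all newly created networks to be ready before restarting their nodes.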
            let mut ready_futs = vec![];
            while let Some(net) = new_networks.pop() {
                ready_futs.push(async move {
                    net.wait_for_ready().await;
                });
            }
            join_all(ready_futs).await;

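            // Restart each node: wait for its network, spawn its tasks, start consensus, and
            // swap the new handle into `self.handles`.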
            let mut start_futs = vec![];

            while let Some((node, id)) = new_nodes.pop() {
                let handles = self.handles.clone();
                let fut = async move {
                    tracing::info!("Starting node {id} back up");
                    node.network.wait_for_ready().await;
                    let handle = node.run_tasks().await;

                    let node = Node {
                        node_id: id.try_into().unwrap(),
                        network: node.network.clone(),
                        handle,
                    };
                    node.handle.hotshot.start_consensus().await;

                    handles.write().await[id] = node;
                };
                start_futs.push(fut);
            }
            if !start_futs.is_empty() {
                join_all(start_futs).await;
                tracing::info!("Nodes all started");
            }

            self.latest_view = Some(view_number);
        }

        Ok(())
    }

    async fn check(&self) -> TestResult {
        TestResult::Pass
    }
}

/// Context needed to restart a node that was taken down with [`NodeAction::RestartDown`].
#[derive(Clone)]
pub(crate) struct RestartContext<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
    V: Versions,
> {
    /// The restarted node's system context
    context: Arc<SystemContext<TYPES, I, V>>,
    /// The network the restarted node will use
    network: Arc<N>,
}

/// An action to apply to a node at a given view.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// Start a node that was configured to start late
    Up,
    /// Shut a node down
    Down,
    /// Resume a node's network
    NetworkUp,
    /// Pause a node's network
    NetworkDown,
    /// Shut a node down and schedule it to restart after the given number of views
    RestartDown(u64),
    /// Start a node that was previously taken down with `RestartDown`
    RestartUp,
}

/// A node index paired with the action to apply to it.
#[derive(Clone, Debug)]
pub struct ChangeNode {
    /// The index of the node to change
    pub idx: usize,
    /// The action to apply to the node
    pub updown: NodeAction,
}

/// Description of a spinning task: which nodes change, and in which views.
#[derive(Clone, Debug)]
pub struct SpinningTaskDescription {
    /// The views at which changes take effect, paired with the changes to apply
    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
}