hotshot_testing/
spinning_task.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::Arc,
10};
11
12use async_broadcast::broadcast;
13use async_lock::RwLock;
14use async_trait::async_trait;
15use futures::future::join_all;
16use hotshot::{
17    HotShotInitializer, InitializerEpochInfo, SystemContext, traits::TestableNodeImplementation,
18    types::EventType,
19};
20use hotshot_example_types::{
21    block_types::TestBlockHeader,
22    state_types::{TestInstanceState, TestValidatedState},
23    storage_types::TestStorage,
24    testable_delay::DelayConfig,
25};
26use hotshot_types::{
27    ValidatorConfig,
28    constants::EVENT_CHANNEL_SIZE,
29    data::{Leaf2, ViewNumber},
30    event::Event,
31    message::convert_proposal,
32    simple_certificate::{
33        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
34    },
35    traits::{
36        election::Membership,
37        network::{AsyncGenerator, ConnectedNetwork},
38        node_implementation::{NodeImplementation, NodeType},
39    },
40    utils::genesis_epoch_from_version,
41    vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44
45use crate::{
46    node_stake::TestNodeStakes,
47    test_launcher::Network,
48    test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
49    test_task::{TestResult, TestTaskState},
50};
51
/// Convenience alias for a pair of vectors: the first holds states (`S`),
/// the second holds blocks (`B`).
pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);
54
/// Spinning task state.
///
/// Holds the schedule of node changes (spin up, shut down, restart, pause or
/// resume networking) together with the data needed to initialize nodes that
/// are started or restarted while the test is running.
pub struct SpinningTask<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
> {
    /// Epoch height, passed to the initializer of nodes started by this task.
    pub epoch_height: u64,
    /// Epoch start block, passed to the initializer of nodes started by this task.
    pub epoch_start_block: u64,
    /// Saved epoch information. This must be sorted ascending by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    /// Shared handle to the running nodes. Node indices used in `ChangeNode`
    /// index into this vector.
    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I>>>>,
    /// Late start nodes, keyed by node id. An entry is removed when the node
    /// is spun up by `NodeAction::Up`.
    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
    /// View based changes: the operations to perform when an event for the
    /// given view is first seen. The entry for a view is removed once applied.
    pub(crate) changes: BTreeMap<ViewNumber, Vec<ChangeNode>>,
    /// Most recent view seen by the spinning task. Events whose view is not
    /// greater than this do not trigger any node changes.
    pub(crate) latest_view: Option<ViewNumber>,
    /// Last decided leaf that can be used as the anchor leaf to initialize the node.
    /// Updated from `Decide` events that carry a leaf with a higher view.
    pub(crate) last_decided_leaf: Leaf2<TYPES>,
    /// Highest qc seen in the test for restarting nodes. Updated from the
    /// justify QC of `QuorumProposal` events.
    pub(crate) high_qc: QuorumCertificate2<TYPES>,
    /// Next epoch highest qc seen in the test for restarting nodes
    pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
    /// Delay to add to async calls, keyed by node id. Nodes without an entry
    /// use the default `DelayConfig`.
    pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
    /// Context stored for nodes to be restarted with, keyed by node index.
    /// Filled by `NodeAction::RestartDown` with a non-zero delay and consumed
    /// by `NodeAction::RestartUp`.
    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I>>,
    /// Generates a network for a node id; used for late-start nodes without a
    /// network and for restarted nodes.
    pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
    /// The light client state update certificate
    pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
    /// Node stakes, looked up by node id when building a validator config.
    pub(crate) node_stakes: TestNodeStakes,
    /// Configured version upgrade
    pub(crate) upgrade: versions::Upgrade,
}
94
95#[async_trait]
96impl<
97    TYPES: NodeType<
98            InstanceState = TestInstanceState,
99            ValidatedState = TestValidatedState,
100            BlockHeader = TestBlockHeader,
101        >,
102    I: TestableNodeImplementation<TYPES>,
103    N: ConnectedNetwork<TYPES::SignatureKey>,
104> TestTaskState for SpinningTask<TYPES, N, I>
105where
106    I: TestableNodeImplementation<TYPES>,
107    I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
108    <TYPES as NodeType>::Membership: Membership<TYPES, Storage = TestStorage<TYPES>>,
109{
110    type Event = Event<TYPES>;
111    type Error = Error;
112
113    async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
114        let Event { view_number, event } = message;
115
116        if let EventType::Decide { leaf_chain, .. } = event {
117            let leaf = leaf_chain.first().unwrap().leaf.clone();
118            if leaf.view_number() > self.last_decided_leaf.view_number() {
119                self.last_decided_leaf = leaf;
120            }
121        } else if let EventType::QuorumProposal {
122            proposal,
123            sender: _,
124        } = event
125        {
126            if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
127                self.high_qc = proposal.data.justify_qc().clone();
128            }
129        } else if let EventType::ViewTimeout { view_number } = event {
130            tracing::error!("View timeout for view {view_number}");
131        }
132
133        let mut new_nodes = vec![];
134        let mut new_networks = vec![];
135        // if we have not seen this view before
136        if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
137            // perform operations on the nodes
138            if let Some(operations) = self.changes.remove(&view_number) {
139                for ChangeNode { idx, updown } in operations {
140                    match updown {
141                        NodeAction::Up => {
142                            let node_id = idx.try_into().unwrap();
143                            if let Some(node) = self.late_start.remove(&node_id) {
144                                tracing::error!("Node {idx} spinning up late");
145                                let network = if let Some(network) = node.network {
146                                    network
147                                } else {
148                                    let generated_network = (self.channel_generator)(node_id).await;
149                                    generated_network.wait_for_ready().await;
150                                    generated_network
151                                };
152                                let node_id = idx.try_into().unwrap();
153                                let context = match node.context {
154                                    LateNodeContext::InitializedContext(context) => context,
155                                    // Node not initialized. Initialize it
156                                    // based on the received leaf.
157                                    LateNodeContext::UninitializedContext(late_context_params) => {
158                                        let LateNodeContextParameters { storage, config } =
159                                            late_context_params;
160
161                                        // We assign node's public key and stake value rather than read from config file since it's a test
162                                        let validator_config: ValidatorConfig<TYPES> =
163                                            ValidatorConfig::generated_from_seed_indexed(
164                                                [0u8; 32],
165                                                node_id,
166                                                self.node_stakes.get(node_id),
167                                                // For tests, make the node DA based on its index
168                                                node_id < config.da_staked_committee_size as u64,
169                                            );
170
171                                        let memberships = <TYPES as NodeType>::Membership::new::<I>(
172                                            config.known_nodes_with_stake.clone(),
173                                            config.known_da_nodes.clone(),
174                                            storage.clone(),
175                                            network.clone(),
176                                            validator_config.public_key.clone(),
177                                            config.epoch_height,
178                                        );
179
180                                        let initializer = HotShotInitializer::<TYPES>::load(
181                                            TestInstanceState::new(
182                                                self.async_delay_config
183                                                    .get(&node_id)
184                                                    .cloned()
185                                                    .unwrap_or_default(),
186                                            ),
187                                            self.epoch_height,
188                                            self.epoch_start_block,
189                                            self.start_epoch_info.clone(),
190                                            self.last_decided_leaf.clone(),
191                                            (
192                                                ViewNumber::genesis(),
193                                                genesis_epoch_from_version(self.upgrade.base),
194                                            ),
195                                            (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
196                                            ViewNumber::genesis(),
197                                            BTreeMap::new(),
198                                            BTreeMap::new(),
199                                            None,
200                                            self.state_cert.clone(),
201                                        );
202
203                                        TestRunner::add_node_with_config(
204                                            node_id,
205                                            network.clone(),
206                                            memberships,
207                                            initializer,
208                                            config,
209                                            self.upgrade,
210                                            validator_config,
211                                            storage,
212                                        )
213                                        .await
214                                    },
215                                    LateNodeContext::Restart => {
216                                        panic!("Cannot spin up a node with Restart context")
217                                    },
218                                };
219
220                                let handle = context.run_tasks().await;
221
222                                // Create the node and add it to the state, so we can shut them
223                                // down properly later to avoid the overflow error in the overall
224                                // safety task.
225                                let node = Node {
226                                    node_id,
227                                    network,
228                                    handle,
229                                };
230                                node.handle.hotshot.start_consensus().await;
231
232                                self.handles.write().await.push(node);
233                            }
234                        },
235                        NodeAction::Down => {
236                            if let Some(node) = self.handles.write().await.get_mut(idx) {
237                                tracing::error!("Node {idx} shutting down in view {view_number}");
238                                node.handle.shut_down().await;
239                            }
240                        },
241                        NodeAction::RestartDown(delay_views) => {
242                            let node_id = idx.try_into().unwrap();
243                            if let Some(node) = self.handles.write().await.get_mut(idx) {
244                                tracing::error!("Node {idx} shutting down in view {view_number}");
245                                node.handle.shut_down().await;
246                                // For restarted nodes generate the network on correct view
247                                let generated_network = (self.channel_generator)(node_id).await;
248
249                                let Some(LateStartNode {
250                                    network: _,
251                                    context: LateNodeContext::Restart,
252                                }) = self.late_start.get(&node_id)
253                                else {
254                                    panic!("Restarted Nodes must have an uninitialized context");
255                                };
256
257                                let storage = node.handle.storage().clone();
258
259                                let membership = Arc::new(RwLock::new(
260                                    <TYPES as NodeType>::Membership::new::<I>(
261                                        node.handle.hotshot.config.known_nodes_with_stake.clone(),
262                                        node.handle.hotshot.config.known_da_nodes.clone(),
263                                        node.handle.storage().clone(),
264                                        generated_network.clone(),
265                                        node.handle.public_key().clone(),
266                                        node.handle.hotshot.config.epoch_height,
267                                    ),
268                                ));
269
270                                let config = node.handle.hotshot.config.clone();
271
272                                let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
273                                let start_view = storage.restart_view().await;
274                                let last_actioned_view = storage.last_actioned_view().await;
275                                let start_epoch = storage.last_actioned_epoch().await;
276                                let high_qc = storage.high_qc_cloned().await.unwrap_or(
277                                    QuorumCertificate2::genesis(
278                                        &TestValidatedState::default(),
279                                        &TestInstanceState::default(),
280                                        self.upgrade,
281                                    )
282                                    .await,
283                                );
284                                let state_cert = storage.state_cert_cloned().await;
285                                let saved_proposals = storage.proposals_cloned().await;
286                                let mut vid_shares = BTreeMap::new();
287                                for (view, hash_map) in storage.vids_cloned().await {
288                                    let mut converted_hash_map = HashMap::new();
289                                    for (key, proposal) in hash_map {
290                                        converted_hash_map
291                                            .entry(key)
292                                            .or_insert_with(BTreeMap::new)
293                                            .insert(
294                                                proposal.data.target_epoch(),
295                                                convert_proposal(proposal),
296                                            );
297                                    }
298                                    vid_shares.insert(view, converted_hash_map);
299                                }
300                                let decided_upgrade_certificate =
301                                    storage.decided_upgrade_certificate().await;
302
303                                let initializer = HotShotInitializer::<TYPES>::load(
304                                    TestInstanceState::new(
305                                        self.async_delay_config
306                                            .get(&node_id)
307                                            .cloned()
308                                            .unwrap_or_default(),
309                                    ),
310                                    self.epoch_height,
311                                    self.epoch_start_block,
312                                    self.start_epoch_info.clone(),
313                                    self.last_decided_leaf.clone(),
314                                    (start_view, start_epoch),
315                                    (high_qc, next_epoch_high_qc),
316                                    last_actioned_view,
317                                    saved_proposals,
318                                    vid_shares,
319                                    decided_upgrade_certificate,
320                                    state_cert,
321                                );
322                                // We assign node's public key and stake value rather than read from config file since it's a test
323                                let validator_config = ValidatorConfig::generated_from_seed_indexed(
324                                    [0u8; 32],
325                                    node_id,
326                                    self.node_stakes.get(node_id),
327                                    // For tests, make the node DA based on its index
328                                    node_id < config.da_staked_committee_size as u64,
329                                );
330                                let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
331                                let context =
332                                    TestRunner::<TYPES, I, N>::add_node_with_config_and_channels(
333                                        node_id,
334                                        generated_network.clone(),
335                                        Arc::clone(&membership),
336                                        initializer,
337                                        config,
338                                        self.upgrade,
339                                        validator_config,
340                                        storage.clone(),
341                                        internal_chan,
342                                        (
343                                            node.handle.external_channel_sender(),
344                                            node.handle.event_stream_known_impl().new_receiver(),
345                                        ),
346                                    )
347                                    .await;
348                                tracing::info!(
349                                    "Node {} restarting in view {} with start view {}",
350                                    idx,
351                                    view_number + delay_views,
352                                    start_view
353                                );
354                                if delay_views == 0 {
355                                    new_nodes.push((context, idx));
356                                    new_networks.push(generated_network.clone());
357                                } else {
358                                    let up_view = view_number + delay_views;
359                                    let change = ChangeNode {
360                                        idx,
361                                        updown: NodeAction::RestartUp,
362                                    };
363                                    self.changes.entry(up_view).or_default().push(change);
364                                    let new_ctx = RestartContext {
365                                        context,
366                                        network: generated_network.clone(),
367                                    };
368                                    self.restart_contexts.insert(idx, new_ctx);
369                                }
370                            }
371                        },
372                        NodeAction::RestartUp => {
373                            if let Some(ctx) = self.restart_contexts.remove(&idx) {
374                                new_nodes.push((ctx.context, idx));
375                                new_networks.push(ctx.network.clone());
376                            }
377                        },
378                        NodeAction::NetworkUp => {
379                            if let Some(handle) = self.handles.write().await.get(idx) {
380                                tracing::error!("Node {idx} networks resuming");
381                                handle.network.resume();
382                            }
383                        },
384                        NodeAction::NetworkDown => {
385                            if let Some(handle) = self.handles.write().await.get(idx) {
386                                tracing::error!("Node {idx} networks pausing");
387                                handle.network.pause();
388                            }
389                        },
390                    }
391                }
392            }
393            let mut ready_futs = vec![];
394            while let Some(net) = new_networks.pop() {
395                ready_futs.push(async move {
396                    net.wait_for_ready().await;
397                });
398            }
399            join_all(ready_futs).await;
400
401            let mut start_futs = vec![];
402
403            while let Some((node, id)) = new_nodes.pop() {
404                let handles = self.handles.clone();
405                let fut = async move {
406                    tracing::info!("Starting node {id} back up");
407                    node.network.wait_for_ready().await;
408                    let handle = node.run_tasks().await;
409
410                    // Create the node and add it to the state, so we can shut them
411                    // down properly later to avoid the overflow error in the overall
412                    // safety task.
413                    let node = Node {
414                        node_id: id.try_into().unwrap(),
415                        network: node.network.clone(),
416                        handle,
417                    };
418                    node.handle.hotshot.start_consensus().await;
419
420                    handles.write().await[id] = node;
421                };
422                start_futs.push(fut);
423            }
424            if !start_futs.is_empty() {
425                join_all(start_futs).await;
426                tracing::info!("Nodes all started");
427            }
428
429            // update our latest view
430            self.latest_view = Some(view_number);
431        }
432
433        Ok(())
434    }
435
436    async fn check(&self) -> TestResult {
437        TestResult::Pass
438    }
439}
440
/// What is kept for a node between a delayed `NodeAction::RestartDown` and the
/// matching `NodeAction::RestartUp`: the prepared system context and the
/// network generated for the restarted node.
#[derive(Clone)]
pub(crate) struct RestartContext<
    TYPES: NodeType,
    N: ConnectedNetwork<TYPES::SignatureKey>,
    I: TestableNodeImplementation<TYPES>,
> {
    /// System context the node's tasks are run from when it comes back up.
    context: Arc<SystemContext<TYPES, I>>,
    /// Network generated for the restarted node; waited on for readiness
    /// before the node is started.
    network: Arc<N>,
}
450
/// An action the spinning task performs on a node: spin the node or its
/// network up or down, or restart the node.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// Spin the node up. Only has an effect for nodes registered as late-start nodes.
    Up,
    /// Spin the node down by shutting down its handle.
    Down,
    /// Spin the node's network up (resume it).
    NetworkUp,
    /// Spin the node's network down (pause it).
    NetworkDown,
    /// Take a node down to be restarted after the given number of views.
    /// A value of `0` restarts the node while handling the same view.
    RestartDown(u64),
    /// Start a node up again after it has been shut down for restart. This
    /// should only be created following a `RestartDown`.
    RestartUp,
}
468
/// Denotes a change in node state: which node to act on and what to do to it.
#[derive(Clone, Debug)]
pub struct ChangeNode {
    /// The index of the node (its position in the list of node handles).
    pub idx: usize,
    /// Whether to spin the node or the node's network up or down, or restart it.
    pub updown: NodeAction,
}
477
/// Description of the spinning task
/// (used to build a spinning task).
#[derive(Clone, Debug)]
pub struct SpinningTaskDescription {
    /// The changes in node status, as `(time, changes)` pairs.
    /// NOTE(review): the `u64` time is presumably a view number — confirm
    /// against the code that builds `SpinningTask::changes`.
    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
}