hotshot_testing/
spinning_task.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::Arc,
10};
11
12use async_broadcast::broadcast;
13use async_lock::RwLock;
14use async_trait::async_trait;
15use futures::future::join_all;
16use hotshot::{
17    traits::TestableNodeImplementation, types::EventType, HotShotInitializer, InitializerEpochInfo,
18    SystemContext,
19};
20use hotshot_example_types::{
21    block_types::TestBlockHeader,
22    state_types::{TestInstanceState, TestValidatedState},
23    storage_types::TestStorage,
24    testable_delay::DelayConfig,
25};
26use hotshot_types::{
27    constants::EVENT_CHANNEL_SIZE,
28    data::Leaf2,
29    event::Event,
30    message::convert_proposal,
31    simple_certificate::{
32        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
33    },
34    traits::{
35        election::Membership,
36        network::{AsyncGenerator, ConnectedNetwork},
37        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
38    },
39    utils::genesis_epoch_from_version,
40    vote::HasViewNumber,
41    ValidatorConfig,
42};
43use hotshot_utils::anytrace::*;
44
45use crate::{
46    node_stake::TestNodeStakes,
47    test_launcher::Network,
48    test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
49    test_task::{TestResult, TestTaskState},
50};
51
/// Convenience pair of a list of states and the list of blocks that goes with them.
pub type StateAndBlock<State, Block> = (Vec<State>, Vec<Block>);
54
55/// Spinning task state
56pub struct SpinningTask<
57    TYPES: NodeType,
58    N: ConnectedNetwork<TYPES::SignatureKey>,
59    I: TestableNodeImplementation<TYPES>,
60    V: Versions,
61> {
62    /// epoch height
63    pub epoch_height: u64,
64    /// Epoch start block
65    pub epoch_start_block: u64,
66    /// Saved epoch information. This must be sorted ascending by epoch.
67    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
68    /// handle to the nodes
69    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
70    /// late start nodes
71    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I, V>>,
72    /// time based changes
73    pub(crate) changes: BTreeMap<TYPES::View, Vec<ChangeNode>>,
74    /// most recent view seen by spinning task
75    pub(crate) latest_view: Option<TYPES::View>,
76    /// Last decided leaf that can be used as the anchor leaf to initialize the node.
77    pub(crate) last_decided_leaf: Leaf2<TYPES>,
78    /// Highest qc seen in the test for restarting nodes
79    pub(crate) high_qc: QuorumCertificate2<TYPES>,
80    /// Next epoch highest qc seen in the test for restarting nodes
81    pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
82    /// Add specified delay to async calls
83    pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
84    /// Context stored for nodes to be restarted with
85    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I, V>>,
86    /// Generate network channel for restart nodes
87    pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
88    /// The light client state update certificate
89    pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
90    /// Node stakes
91    pub(crate) node_stakes: TestNodeStakes,
92}
93
94#[async_trait]
95impl<
96        TYPES: NodeType<
97            InstanceState = TestInstanceState,
98            ValidatedState = TestValidatedState,
99            BlockHeader = TestBlockHeader,
100        >,
101        I: TestableNodeImplementation<TYPES>,
102        N: ConnectedNetwork<TYPES::SignatureKey>,
103        V: Versions,
104    > TestTaskState for SpinningTask<TYPES, N, I, V>
105where
106    I: TestableNodeImplementation<TYPES>,
107    I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
108    <TYPES as NodeType>::Membership: Membership<TYPES, Storage = TestStorage<TYPES>>,
109{
110    type Event = Event<TYPES>;
111    type Error = Error;
112
113    async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
114        let Event { view_number, event } = message;
115
116        if let EventType::Decide { leaf_chain, .. } = event {
117            let leaf = leaf_chain.first().unwrap().leaf.clone();
118            if leaf.view_number() > self.last_decided_leaf.view_number() {
119                self.last_decided_leaf = leaf;
120            }
121        } else if let EventType::QuorumProposal {
122            proposal,
123            sender: _,
124        } = event
125        {
126            if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
127                self.high_qc = proposal.data.justify_qc().clone();
128            }
129        } else if let EventType::ViewTimeout { view_number } = event {
130            tracing::error!("View timeout for view {view_number}");
131        }
132
133        let mut new_nodes = vec![];
134        let mut new_networks = vec![];
135        // if we have not seen this view before
136        if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
137            // perform operations on the nodes
138            if let Some(operations) = self.changes.remove(&view_number) {
139                for ChangeNode { idx, updown } in operations {
140                    match updown {
141                        NodeAction::Up => {
142                            let node_id = idx.try_into().unwrap();
143                            if let Some(node) = self.late_start.remove(&node_id) {
144                                tracing::error!("Node {idx} spinning up late");
145                                let network = if let Some(network) = node.network {
146                                    network
147                                } else {
148                                    let generated_network = (self.channel_generator)(node_id).await;
149                                    generated_network.wait_for_ready().await;
150                                    generated_network
151                                };
152                                let node_id = idx.try_into().unwrap();
153                                let context = match node.context {
154                                    LateNodeContext::InitializedContext(context) => context,
155                                    // Node not initialized. Initialize it
156                                    // based on the received leaf.
157                                    LateNodeContext::UninitializedContext(late_context_params) => {
158                                        let LateNodeContextParameters {
159                                            storage,
160                                            memberships,
161                                            config,
162                                        } = late_context_params;
163
164                                        let initializer = HotShotInitializer::<TYPES>::load(
165                                            TestInstanceState::new(
166                                                self.async_delay_config
167                                                    .get(&node_id)
168                                                    .cloned()
169                                                    .unwrap_or_default(),
170                                            ),
171                                            self.epoch_height,
172                                            self.epoch_start_block,
173                                            self.start_epoch_info.clone(),
174                                            self.last_decided_leaf.clone(),
175                                            (
176                                                TYPES::View::genesis(),
177                                                genesis_epoch_from_version::<V, TYPES>(),
178                                            ),
179                                            (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
180                                            TYPES::View::genesis(),
181                                            BTreeMap::new(),
182                                            BTreeMap::new(),
183                                            None,
184                                            self.state_cert.clone(),
185                                        );
186                                        // We assign node's public key and stake value rather than read from config file since it's a test
187                                        let validator_config =
188                                            ValidatorConfig::generated_from_seed_indexed(
189                                                [0u8; 32],
190                                                node_id,
191                                                self.node_stakes.get(node_id),
192                                                // For tests, make the node DA based on its index
193                                                node_id < config.da_staked_committee_size as u64,
194                                            );
195
196                                        TestRunner::add_node_with_config(
197                                            node_id,
198                                            network.clone(),
199                                            memberships,
200                                            initializer,
201                                            config,
202                                            validator_config,
203                                            storage,
204                                        )
205                                        .await
206                                    },
207                                    LateNodeContext::Restart => {
208                                        panic!("Cannot spin up a node with Restart context")
209                                    },
210                                };
211
212                                let handle = context.run_tasks().await;
213
214                                // Create the node and add it to the state, so we can shut them
215                                // down properly later to avoid the overflow error in the overall
216                                // safety task.
217                                let node = Node {
218                                    node_id,
219                                    network,
220                                    handle,
221                                };
222                                node.handle.hotshot.start_consensus().await;
223
224                                self.handles.write().await.push(node);
225                            }
226                        },
227                        NodeAction::Down => {
228                            if let Some(node) = self.handles.write().await.get_mut(idx) {
229                                tracing::error!("Node {idx} shutting down in view {view_number}");
230                                node.handle.shut_down().await;
231                            }
232                        },
233                        NodeAction::RestartDown(delay_views) => {
234                            let node_id = idx.try_into().unwrap();
235                            if let Some(node) = self.handles.write().await.get_mut(idx) {
236                                tracing::error!("Node {idx} shutting down in view {view_number}");
237                                node.handle.shut_down().await;
238                                // For restarted nodes generate the network on correct view
239                                let generated_network = (self.channel_generator)(node_id).await;
240
241                                let Some(LateStartNode {
242                                    network: _,
243                                    context: LateNodeContext::Restart,
244                                }) = self.late_start.get(&node_id)
245                                else {
246                                    panic!("Restarted Nodes must have an uninitialized context");
247                                };
248
249                                let storage = node.handle.storage().clone();
250
251                                let membership = Arc::new(RwLock::new(
252                                    <TYPES as NodeType>::Membership::new::<I>(
253                                        node.handle.hotshot.config.known_nodes_with_stake.clone(),
254                                        node.handle.hotshot.config.known_da_nodes.clone(),
255                                        node.handle.storage().clone(),
256                                        generated_network.clone(),
257                                        node.handle.public_key().clone(),
258                                        node.handle.hotshot.config.epoch_height,
259                                    ),
260                                ));
261
262                                let config = node.handle.hotshot.config.clone();
263
264                                let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
265                                let start_view = storage.restart_view().await;
266                                let last_actioned_view = storage.last_actioned_view().await;
267                                let start_epoch = storage.last_actioned_epoch().await;
268                                let high_qc = storage.high_qc_cloned().await.unwrap_or(
269                                    QuorumCertificate2::genesis::<V>(
270                                        &TestValidatedState::default(),
271                                        &TestInstanceState::default(),
272                                    )
273                                    .await,
274                                );
275                                let state_cert = storage.state_cert_cloned().await;
276                                let saved_proposals = storage.proposals_cloned().await;
277                                let mut vid_shares = BTreeMap::new();
278                                for (view, hash_map) in storage.vids_cloned().await {
279                                    let mut converted_hash_map = HashMap::new();
280                                    for (key, proposal) in hash_map {
281                                        converted_hash_map
282                                            .entry(key)
283                                            .or_insert_with(BTreeMap::new)
284                                            .insert(
285                                                proposal.data.target_epoch,
286                                                convert_proposal(proposal),
287                                            );
288                                    }
289                                    vid_shares.insert(view, converted_hash_map);
290                                }
291                                let decided_upgrade_certificate =
292                                    storage.decided_upgrade_certificate().await;
293
294                                let initializer = HotShotInitializer::<TYPES>::load(
295                                    TestInstanceState::new(
296                                        self.async_delay_config
297                                            .get(&node_id)
298                                            .cloned()
299                                            .unwrap_or_default(),
300                                    ),
301                                    self.epoch_height,
302                                    self.epoch_start_block,
303                                    self.start_epoch_info.clone(),
304                                    self.last_decided_leaf.clone(),
305                                    (start_view, start_epoch),
306                                    (high_qc, next_epoch_high_qc),
307                                    last_actioned_view,
308                                    saved_proposals,
309                                    vid_shares,
310                                    decided_upgrade_certificate,
311                                    state_cert,
312                                );
313                                // We assign node's public key and stake value rather than read from config file since it's a test
314                                let validator_config = ValidatorConfig::generated_from_seed_indexed(
315                                    [0u8; 32],
316                                    node_id,
317                                    self.node_stakes.get(node_id),
318                                    // For tests, make the node DA based on its index
319                                    node_id < config.da_staked_committee_size as u64,
320                                );
321                                let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
322                                let context =
323                                    TestRunner::<TYPES, I, V, N>::add_node_with_config_and_channels(
324                                        node_id,
325                                        generated_network.clone(),
326                                        Arc::clone(&membership),
327                                        initializer,
328                                        config,
329                                        validator_config,
330                                        storage.clone(),
331                                        internal_chan,
332                                        (
333                                            node.handle.external_channel_sender(),
334                                            node.handle.event_stream_known_impl().new_receiver(),
335                                        ),
336                                    )
337                                    .await;
338                                tracing::info!(
339                                    "Node {} restarting in view {} with start view {}",
340                                    idx,
341                                    view_number + delay_views,
342                                    start_view
343                                );
344                                if delay_views == 0 {
345                                    new_nodes.push((context, idx));
346                                    new_networks.push(generated_network.clone());
347                                } else {
348                                    let up_view = view_number + delay_views;
349                                    let change = ChangeNode {
350                                        idx,
351                                        updown: NodeAction::RestartUp,
352                                    };
353                                    self.changes.entry(up_view).or_default().push(change);
354                                    let new_ctx = RestartContext {
355                                        context,
356                                        network: generated_network.clone(),
357                                    };
358                                    self.restart_contexts.insert(idx, new_ctx);
359                                }
360                            }
361                        },
362                        NodeAction::RestartUp => {
363                            if let Some(ctx) = self.restart_contexts.remove(&idx) {
364                                new_nodes.push((ctx.context, idx));
365                                new_networks.push(ctx.network.clone());
366                            }
367                        },
368                        NodeAction::NetworkUp => {
369                            if let Some(handle) = self.handles.write().await.get(idx) {
370                                tracing::error!("Node {idx} networks resuming");
371                                handle.network.resume();
372                            }
373                        },
374                        NodeAction::NetworkDown => {
375                            if let Some(handle) = self.handles.write().await.get(idx) {
376                                tracing::error!("Node {idx} networks pausing");
377                                handle.network.pause();
378                            }
379                        },
380                    }
381                }
382            }
383            let mut ready_futs = vec![];
384            while let Some(net) = new_networks.pop() {
385                ready_futs.push(async move {
386                    net.wait_for_ready().await;
387                });
388            }
389            join_all(ready_futs).await;
390
391            let mut start_futs = vec![];
392
393            while let Some((node, id)) = new_nodes.pop() {
394                let handles = self.handles.clone();
395                let fut = async move {
396                    tracing::info!("Starting node {id} back up");
397                    node.network.wait_for_ready().await;
398                    let handle = node.run_tasks().await;
399
400                    // Create the node and add it to the state, so we can shut them
401                    // down properly later to avoid the overflow error in the overall
402                    // safety task.
403                    let node = Node {
404                        node_id: id.try_into().unwrap(),
405                        network: node.network.clone(),
406                        handle,
407                    };
408                    node.handle.hotshot.start_consensus().await;
409
410                    handles.write().await[id] = node;
411                };
412                start_futs.push(fut);
413            }
414            if !start_futs.is_empty() {
415                join_all(start_futs).await;
416                tracing::info!("Nodes all started");
417            }
418
419            // update our latest view
420            self.latest_view = Some(view_number);
421        }
422
423        Ok(())
424    }
425
426    async fn check(&self) -> TestResult {
427        TestResult::Pass
428    }
429}
430
431#[derive(Clone)]
432pub(crate) struct RestartContext<
433    TYPES: NodeType,
434    N: ConnectedNetwork<TYPES::SignatureKey>,
435    I: TestableNodeImplementation<TYPES>,
436    V: Versions,
437> {
438    context: Arc<SystemContext<TYPES, I, V>>,
439    network: Arc<N>,
440}
441
/// Action the spinning task applies to a single node.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// Start a node from the late-start set.
    Up,
    /// Shut the node down (no restart is scheduled).
    Down,
    /// Resume the node's network.
    NetworkUp,
    /// Pause the node's network.
    NetworkDown,
    /// Shut the node down and restart it after the given number of views;
    /// a value of zero restarts it within the same view.
    RestartDown(u64),
    /// Start a node again after it was shut down for restart. Should only be
    /// scheduled following a `RestartDown`.
    RestartUp,
}
459
460/// denotes a change in node state
461#[derive(Clone, Debug)]
462pub struct ChangeNode {
463    /// the index of the node
464    pub idx: usize,
465    /// spin the node or node's network up or down
466    pub updown: NodeAction,
467}
468
469/// description of the spinning task
470/// (used to build a spinning task)
471#[derive(Clone, Debug)]
472pub struct SpinningTaskDescription {
473    /// the changes in node status, time -> changes
474    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
475}