hotshot_testing/
txn_task.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Receiver;
10use async_lock::RwLock;
11use hotshot::traits::TestableNodeImplementation;
12use hotshot_types::{
13    error::HotShotError,
14    traits::node_implementation::{NodeType, Versions},
15};
16use rand::thread_rng;
17use tokio::{spawn, task::JoinHandle, time::sleep};
18
19use crate::{test_runner::Node, test_task::TestEvent};
20
21// the obvious idea here is to pass in a "stream" that completes every `n` seconds
22// the stream construction can definitely be fancier but that's the baseline idea
23
24/// state of task that decides when things are completed
25pub struct TxnTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> {
26    // TODO should this be in a rwlock? Or maybe a similar abstraction to the registry is in order
27    /// Handles for all nodes.
28    pub handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
29    /// Optional index of the next node.
30    pub next_node_idx: Option<usize>,
31    /// time to wait between txns
32    pub duration: Duration,
33    /// Receiver for the shutdown signal from the testing harness
34    pub shutdown_chan: Receiver<TestEvent>,
35}
36
37impl<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> TxnTask<TYPES, I, V> {
38    pub fn run(mut self) -> JoinHandle<()> {
39        spawn(async move {
40            loop {
41                sleep(self.duration).await;
42                if let Ok(TestEvent::Shutdown) = self.shutdown_chan.try_recv() {
43                    break;
44                }
45                self.submit_tx().await;
46            }
47        })
48    }
49    async fn submit_tx(&mut self) {
50        if let Some(idx) = self.next_node_idx {
51            let handles = &self.handles.read().await;
52            // submit to idx handle
53            // increment state
54            self.next_node_idx = Some((idx + 1) % handles.len());
55            match handles.get(idx) {
56                None => {
57                    tracing::error!("couldn't get node in txn task");
58                },
59                Some(node) => {
60                    // use rand::seq::IteratorRandom;
61                    // we're assuming all nodes have the same leaf.
62                    // If they don't match, this is probably fine since
63                    // it should be caught by an assertion (and the txn will be rejected anyway)
64                    let leaf = node.handle.decided_leaf().await;
65                    let txn = I::leaf_create_random_transaction(&leaf, &mut thread_rng(), 0);
66
67                    let res = node.handle.submit_transaction(txn.clone()).await;
68                    if let Err(HotShotError::InvalidState(e)) = res.as_ref() {
69                        if e.contains("Catchup already in progress")
70                            || e.contains("Starting catchup")
71                        {
72                            return;
73                        }
74                    }
75                    res.expect("Could not send transaction");
76                },
77            }
78        }
79    }
80}
81
82/// build the transaction task
83#[derive(Clone, Debug)]
84pub enum TxnTaskDescription {
85    /// submit transactions in a round robin style using
86    /// every `Duration` seconds
87    RoundRobinTimeBased(Duration),
88    /// TODO
89    DistributionBased, // others?
90}