hotshot_testing/
txn_task.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Receiver;
10use async_lock::RwLock;
11use hotshot::traits::TestableNodeImplementation;
12use hotshot_types::{error::HotShotError, traits::node_implementation::NodeType};
13use rand::thread_rng;
14use tokio::{spawn, task::JoinHandle, time::sleep};
15
16use crate::{test_runner::Node, test_task::TestEvent};
17
18// the obvious idea here is to pass in a "stream" that completes every `n` seconds
19// the stream construction can definitely be fancier but that's the baseline idea
20
21/// state of task that decides when things are completed
22pub struct TxnTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
23    // TODO should this be in a rwlock? Or maybe a similar abstraction to the registry is in order
24    /// Handles for all nodes.
25    pub handles: Arc<RwLock<Vec<Node<TYPES, I>>>>,
26    /// Optional index of the next node.
27    pub next_node_idx: Option<usize>,
28    /// time to wait between txns
29    pub duration: Duration,
30    /// Receiver for the shutdown signal from the testing harness
31    pub shutdown_chan: Receiver<TestEvent>,
32}
33
34impl<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> TxnTask<TYPES, I> {
35    pub fn run(mut self) -> JoinHandle<()> {
36        spawn(async move {
37            loop {
38                sleep(self.duration).await;
39                if let Ok(TestEvent::Shutdown) = self.shutdown_chan.try_recv() {
40                    break;
41                }
42                self.submit_tx().await;
43            }
44        })
45    }
46    async fn submit_tx(&mut self) {
47        if let Some(idx) = self.next_node_idx {
48            let handles = &self.handles.read().await;
49            // submit to idx handle
50            // increment state
51            self.next_node_idx = Some((idx + 1) % handles.len());
52            match handles.get(idx) {
53                None => {
54                    tracing::error!("couldn't get node in txn task");
55                },
56                Some(node) => {
57                    // use rand::seq::IteratorRandom;
58                    // we're assuming all nodes have the same leaf.
59                    // If they don't match, this is probably fine since
60                    // it should be caught by an assertion (and the txn will be rejected anyway)
61                    let leaf = node.handle.decided_leaf().await;
62                    let txn = I::leaf_create_random_transaction(&leaf, &mut thread_rng(), 0);
63
64                    let res = node.handle.submit_transaction(txn.clone()).await;
65                    if let Err(HotShotError::InvalidState(e)) = res.as_ref()
66                        && (e.contains("Catchup already in progress")
67                            || e.contains("Starting catchup"))
68                    {
69                        return;
70                    }
71                    res.expect("Could not send transaction");
72                },
73            }
74        }
75    }
76}
77
78/// build the transaction task
79#[derive(Clone, Debug)]
80pub enum TxnTaskDescription {
81    /// submit transactions in a round robin style using
82    /// every `Duration` seconds
83    RoundRobinTimeBased(Duration),
84    /// TODO
85    DistributionBased, // others?
86}