hotshot_testing/
txn_task.rs1use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Receiver;
10use async_lock::RwLock;
11use hotshot::traits::TestableNodeImplementation;
12use hotshot_types::{
13 error::HotShotError,
14 traits::node_implementation::{NodeType, Versions},
15};
16use rand::thread_rng;
17use tokio::{spawn, task::JoinHandle, time::sleep};
18
19use crate::{test_runner::Node, test_task::TestEvent};
20
21pub struct TxnTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> {
26 pub handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
29 pub next_node_idx: Option<usize>,
31 pub duration: Duration,
33 pub shutdown_chan: Receiver<TestEvent>,
35}
36
37impl<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> TxnTask<TYPES, I, V> {
38 pub fn run(mut self) -> JoinHandle<()> {
39 spawn(async move {
40 loop {
41 sleep(self.duration).await;
42 if let Ok(TestEvent::Shutdown) = self.shutdown_chan.try_recv() {
43 break;
44 }
45 self.submit_tx().await;
46 }
47 })
48 }
49 async fn submit_tx(&mut self) {
50 if let Some(idx) = self.next_node_idx {
51 let handles = &self.handles.read().await;
52 self.next_node_idx = Some((idx + 1) % handles.len());
55 match handles.get(idx) {
56 None => {
57 tracing::error!("couldn't get node in txn task");
58 },
59 Some(node) => {
60 let leaf = node.handle.decided_leaf().await;
65 let txn = I::leaf_create_random_transaction(&leaf, &mut thread_rng(), 0);
66
67 let res = node.handle.submit_transaction(txn.clone()).await;
68 if let Err(HotShotError::InvalidState(e)) = res.as_ref() {
69 if e.contains("Catchup already in progress")
70 || e.contains("Starting catchup")
71 {
72 return;
73 }
74 }
75 res.expect("Could not send transaction");
76 },
77 }
78 }
79 }
80}
81
82#[derive(Clone, Debug)]
84pub enum TxnTaskDescription {
85 RoundRobinTimeBased(Duration),
88 DistributionBased, }