hotshot_testing/block_builder/
random.rs1use std::{
8 collections::HashMap,
9 num::NonZeroUsize,
10 ops::Deref,
11 sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14 },
15 time::Duration,
16};
17
18use async_broadcast::{broadcast, Sender};
19use async_lock::RwLock;
20use async_trait::async_trait;
21use futures::{future::BoxFuture, Stream, StreamExt};
22use hotshot::types::{Event, EventType, SignatureKey};
23use hotshot_builder_api::{
24 v0_1::{
25 block_info::{AvailableBlockData, AvailableBlockInfo},
26 builder::BuildError,
27 data_source::BuilderDataSource,
28 },
29 v0_2::block_info::AvailableBlockHeaderInputV1,
30};
31use hotshot_example_types::block_types::TestTransaction;
32use hotshot_types::{
33 data::VidCommitment,
34 network::RandomBuilderConfig,
35 traits::{node_implementation::NodeType, signature_key::BuilderSignatureKey},
36 utils::BuilderCommitment,
37};
38use lru::LruCache;
39use rand::{rngs::SmallRng, Rng, RngCore, SeedableRng};
40use tide_disco::{method::ReadState, Url};
41use tokio::{spawn, time::sleep};
42
43use super::{
44 build_block, run_builder_source_0_1, BlockEntry, BuilderTask, TestBuilderImplementation,
45};
46use crate::test_builder::BuilderChange;
47
48pub struct RandomBuilderImplementation;
49
50impl RandomBuilderImplementation {
51 pub async fn create<TYPES: NodeType<Transaction = TestTransaction>>(
52 num_nodes: usize,
53 config: RandomBuilderConfig,
54 changes: HashMap<u64, BuilderChange>,
55 change_sender: Sender<BuilderChange>,
56 ) -> (RandomBuilderTask<TYPES>, RandomBuilderSource<TYPES>)
57 where
58 <TYPES as NodeType>::InstanceState: Default,
59 {
60 let (pub_key, priv_key) =
61 TYPES::BuilderSignatureKey::generated_from_seed_indexed([1; 32], 0);
62 let blocks = Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(256).unwrap())));
63 let num_nodes = Arc::new(RwLock::new(num_nodes));
64 let source = RandomBuilderSource {
65 blocks: Arc::clone(&blocks),
66 pub_key: pub_key.clone(),
67 num_nodes: num_nodes.clone(),
68 should_fail_claims: Arc::new(AtomicBool::new(false)),
69 };
70 let task = RandomBuilderTask {
71 blocks,
72 config,
73 changes,
74 change_sender,
75 pub_key,
76 priv_key,
77 };
78 (task, source)
79 }
80}
81
82#[async_trait]
83impl<TYPES> TestBuilderImplementation<TYPES> for RandomBuilderImplementation
84where
85 TYPES: NodeType<Transaction = TestTransaction>,
86 <TYPES as NodeType>::InstanceState: Default,
87{
88 type Config = RandomBuilderConfig;
89
90 async fn start(
91 num_nodes: usize,
92 url: Url,
93 config: RandomBuilderConfig,
94 changes: HashMap<u64, BuilderChange>,
95 ) -> Box<dyn BuilderTask<TYPES>> {
96 let (change_sender, change_receiver) = broadcast(128);
97
98 let (task, source) = Self::create(num_nodes, config, changes, change_sender).await;
99 run_builder_source_0_1(url, change_receiver, source);
100 Box::new(task)
101 }
102}
103
104pub struct RandomBuilderTask<TYPES: NodeType<Transaction = TestTransaction>> {
105 config: RandomBuilderConfig,
106 changes: HashMap<u64, BuilderChange>,
107 change_sender: Sender<BuilderChange>,
108 pub_key: TYPES::BuilderSignatureKey,
109 priv_key: <TYPES::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
110 blocks: Arc<RwLock<LruCache<BuilderCommitment, BlockEntry<TYPES>>>>,
111}
112
113impl<TYPES: NodeType<Transaction = TestTransaction>> RandomBuilderTask<TYPES> {
114 async fn build_blocks(
115 options: RandomBuilderConfig,
116 pub_key: <TYPES as NodeType>::BuilderSignatureKey,
117 priv_key: <<TYPES as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
118 blocks: Arc<RwLock<LruCache<BuilderCommitment, BlockEntry<TYPES>>>>,
119 ) where
120 <TYPES as NodeType>::InstanceState: Default,
121 {
122 let mut rng = SmallRng::from_entropy();
123 let time_per_block = Duration::from_secs(1) / options.blocks_per_second;
124 loop {
125 let start = std::time::Instant::now();
126 let transactions: Vec<TestTransaction> = (0..options.txn_in_block)
127 .map(|_| {
128 let mut bytes = vec![
129 0;
130 rng.gen_range(options.txn_size.clone())
131 .try_into()
132 .expect("We are NOT running on a 16-bit platform")
133 ];
134 rng.fill_bytes(&mut bytes);
135 TestTransaction::new(bytes)
136 })
137 .collect();
138
139 let block = build_block::<TYPES>(transactions, pub_key.clone(), priv_key.clone()).await;
141
142 let push_result = blocks
143 .write()
144 .await
145 .push(block.metadata.block_hash.clone(), block);
146 if let Some((hash, _)) = push_result {
147 tracing::warn!("Block {hash} evicted");
148 };
149 if time_per_block < start.elapsed() {
150 tracing::warn!(
151 "Can't keep up: last block built in {}ms, target time per block: {}",
152 start.elapsed().as_millis(),
153 time_per_block.as_millis(),
154 );
155 }
156 sleep(time_per_block.saturating_sub(start.elapsed())).await;
157 }
158 }
159}
160
161impl<TYPES: NodeType<Transaction = TestTransaction>> BuilderTask<TYPES> for RandomBuilderTask<TYPES>
162where
163 <TYPES as NodeType>::InstanceState: Default,
164{
165 fn start(
166 mut self: Box<Self>,
167 mut stream: Box<dyn Stream<Item = Event<TYPES>> + std::marker::Unpin + Send + 'static>,
168 ) {
169 let mut task = Some(spawn(Self::build_blocks(
170 self.config.clone(),
171 self.pub_key.clone(),
172 self.priv_key.clone(),
173 self.blocks.clone(),
174 )));
175
176 spawn(async move {
177 loop {
178 match stream.next().await {
179 None => {
180 break;
181 },
182 Some(evt) => {
183 if let EventType::ViewFinished { view_number } = evt.event {
184 if let Some(change) = self.changes.remove(&view_number) {
185 match change {
186 BuilderChange::Up => {
187 if task.is_none() {
188 task = Some(spawn(Self::build_blocks(
189 self.config.clone(),
190 self.pub_key.clone(),
191 self.priv_key.clone(),
192 self.blocks.clone(),
193 )))
194 }
195 },
196 BuilderChange::Down => {
197 if let Some(handle) = task.take() {
198 handle.abort();
199 }
200 },
201 BuilderChange::FailClaims(_) => {},
202 }
203 let _ = self.change_sender.broadcast(change).await;
204 }
205 }
206 },
207 }
208 }
209 });
210 }
211}
212
213#[derive(Clone, Debug)]
217pub struct RandomBuilderSource<TYPES: NodeType> {
218 blocks: Arc<
220 RwLock<
221 LruCache<BuilderCommitment, BlockEntry<TYPES>>,
224 >,
225 >,
226 pub_key: TYPES::BuilderSignatureKey,
227 num_nodes: Arc<RwLock<usize>>,
228 should_fail_claims: Arc<AtomicBool>,
229}
230
231impl<TYPES> RandomBuilderSource<TYPES>
232where
233 TYPES: NodeType<Transaction = TestTransaction>,
234 <TYPES as NodeType>::InstanceState: Default,
235{
236 #[must_use]
238 #[allow(clippy::missing_panics_doc)] pub fn new(pub_key: TYPES::BuilderSignatureKey, num_nodes: Arc<RwLock<usize>>) -> Self {
240 Self {
241 blocks: Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
242 pub_key,
243 num_nodes,
244 should_fail_claims: Arc::new(AtomicBool::new(false)),
245 }
246 }
247}
248
249#[async_trait]
250impl<TYPES: NodeType> ReadState for RandomBuilderSource<TYPES> {
251 type State = Self;
252
253 async fn read<T>(
254 &self,
255 op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
256 ) -> T {
257 op(self).await
258 }
259}
260
261#[async_trait]
262impl<TYPES: NodeType> BuilderDataSource<TYPES> for RandomBuilderSource<TYPES> {
263 async fn available_blocks(
264 &self,
265 _for_parent: &VidCommitment,
266 _view_number: u64,
267 _sender: TYPES::SignatureKey,
268 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
269 ) -> Result<Vec<AvailableBlockInfo<TYPES>>, BuildError> {
270 Ok(self
271 .blocks
272 .deref()
273 .read()
274 .await
275 .iter()
276 .map(|(_, BlockEntry { metadata, .. })| metadata.clone())
277 .collect())
278 }
279
280 async fn claim_block(
281 &self,
282 block_hash: &BuilderCommitment,
283 _view_number: u64,
284 _sender: TYPES::SignatureKey,
285 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
286 ) -> Result<AvailableBlockData<TYPES>, BuildError> {
287 if self.should_fail_claims.load(Ordering::Relaxed) {
288 return Err(BuildError::Missing);
289 }
290
291 let mut blocks = self.blocks.write().await;
292 let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
293 let payload = entry.payload.take().ok_or(BuildError::Missing)?;
294 if entry.header_input.is_none() {
296 blocks.pop(block_hash);
297 };
298 Ok(payload)
299 }
300
301 async fn claim_block_with_num_nodes(
302 &self,
303 block_hash: &BuilderCommitment,
304 view_number: u64,
305 sender: TYPES::SignatureKey,
306 signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
307 num_nodes: usize,
308 ) -> Result<AvailableBlockData<TYPES>, BuildError> {
309 *self.num_nodes.write().await = num_nodes;
310 self.claim_block(block_hash, view_number, sender, signature)
311 .await
312 }
313
314 async fn claim_block_header_input(
315 &self,
316 block_hash: &BuilderCommitment,
317 _view_number: u64,
318 _sender: TYPES::SignatureKey,
319 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
320 ) -> Result<AvailableBlockHeaderInputV1<TYPES>, BuildError> {
321 if self.should_fail_claims.load(Ordering::Relaxed) {
322 return Err(BuildError::Missing);
323 }
324
325 let mut blocks = self.blocks.write().await;
326 let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
327 let header_input = entry.header_input.take().ok_or(BuildError::Missing)?;
328 if entry.payload.is_none() {
330 blocks.pop(block_hash);
331 };
332 Ok(header_input)
333 }
334
335 async fn builder_address(&self) -> Result<TYPES::BuilderSignatureKey, BuildError> {
336 Ok(self.pub_key.clone())
337 }
338}