hotshot_testing/block_builder/
random.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::HashMap,
9    num::NonZeroUsize,
10    ops::Deref,
11    sync::{
12        atomic::{AtomicBool, Ordering},
13        Arc,
14    },
15    time::Duration,
16};
17
18use async_broadcast::{broadcast, Sender};
19use async_lock::RwLock;
20use async_trait::async_trait;
21use futures::{future::BoxFuture, Stream, StreamExt};
22use hotshot::types::{Event, EventType, SignatureKey};
23use hotshot_builder_api::{
24    v0_1::{
25        block_info::{AvailableBlockData, AvailableBlockInfo},
26        builder::BuildError,
27        data_source::BuilderDataSource,
28    },
29    v0_2::block_info::AvailableBlockHeaderInputV1,
30};
31use hotshot_example_types::block_types::TestTransaction;
32use hotshot_types::{
33    data::VidCommitment,
34    network::RandomBuilderConfig,
35    traits::{node_implementation::NodeType, signature_key::BuilderSignatureKey},
36    utils::BuilderCommitment,
37};
38use lru::LruCache;
39use rand::{rngs::SmallRng, Rng, RngCore, SeedableRng};
40use tide_disco::{method::ReadState, Url};
41use tokio::{spawn, time::sleep};
42
43use super::{
44    build_block, run_builder_source_0_1, BlockEntry, BuilderTask, TestBuilderImplementation,
45};
46use crate::test_builder::BuilderChange;
47
48pub struct RandomBuilderImplementation;
49
50impl RandomBuilderImplementation {
51    pub async fn create<TYPES: NodeType<Transaction = TestTransaction>>(
52        num_nodes: usize,
53        config: RandomBuilderConfig,
54        changes: HashMap<u64, BuilderChange>,
55        change_sender: Sender<BuilderChange>,
56    ) -> (RandomBuilderTask<TYPES>, RandomBuilderSource<TYPES>)
57    where
58        <TYPES as NodeType>::InstanceState: Default,
59    {
60        let (pub_key, priv_key) =
61            TYPES::BuilderSignatureKey::generated_from_seed_indexed([1; 32], 0);
62        let blocks = Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(256).unwrap())));
63        let num_nodes = Arc::new(RwLock::new(num_nodes));
64        let source = RandomBuilderSource {
65            blocks: Arc::clone(&blocks),
66            pub_key: pub_key.clone(),
67            num_nodes: num_nodes.clone(),
68            should_fail_claims: Arc::new(AtomicBool::new(false)),
69        };
70        let task = RandomBuilderTask {
71            blocks,
72            config,
73            changes,
74            change_sender,
75            pub_key,
76            priv_key,
77        };
78        (task, source)
79    }
80}
81
82#[async_trait]
83impl<TYPES> TestBuilderImplementation<TYPES> for RandomBuilderImplementation
84where
85    TYPES: NodeType<Transaction = TestTransaction>,
86    <TYPES as NodeType>::InstanceState: Default,
87{
88    type Config = RandomBuilderConfig;
89
90    async fn start(
91        num_nodes: usize,
92        url: Url,
93        config: RandomBuilderConfig,
94        changes: HashMap<u64, BuilderChange>,
95    ) -> Box<dyn BuilderTask<TYPES>> {
96        let (change_sender, change_receiver) = broadcast(128);
97
98        let (task, source) = Self::create(num_nodes, config, changes, change_sender).await;
99        run_builder_source_0_1(url, change_receiver, source);
100        Box::new(task)
101    }
102}
103
104pub struct RandomBuilderTask<TYPES: NodeType<Transaction = TestTransaction>> {
105    config: RandomBuilderConfig,
106    changes: HashMap<u64, BuilderChange>,
107    change_sender: Sender<BuilderChange>,
108    pub_key: TYPES::BuilderSignatureKey,
109    priv_key: <TYPES::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
110    blocks: Arc<RwLock<LruCache<BuilderCommitment, BlockEntry<TYPES>>>>,
111}
112
113impl<TYPES: NodeType<Transaction = TestTransaction>> RandomBuilderTask<TYPES> {
114    async fn build_blocks(
115        options: RandomBuilderConfig,
116        pub_key: <TYPES as NodeType>::BuilderSignatureKey,
117        priv_key: <<TYPES as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
118        blocks: Arc<RwLock<LruCache<BuilderCommitment, BlockEntry<TYPES>>>>,
119    ) where
120        <TYPES as NodeType>::InstanceState: Default,
121    {
122        let mut rng = SmallRng::from_entropy();
123        let time_per_block = Duration::from_secs(1) / options.blocks_per_second;
124        loop {
125            let start = std::time::Instant::now();
126            let transactions: Vec<TestTransaction> = (0..options.txn_in_block)
127                .map(|_| {
128                    let mut bytes = vec![
129                        0;
130                        rng.gen_range(options.txn_size.clone())
131                            .try_into()
132                            .expect("We are NOT running on a 16-bit platform")
133                    ];
134                    rng.fill_bytes(&mut bytes);
135                    TestTransaction::new(bytes)
136                })
137                .collect();
138
139            // Let new VID scheme ship with Epochs upgrade.
140            let block = build_block::<TYPES>(transactions, pub_key.clone(), priv_key.clone()).await;
141
142            let push_result = blocks
143                .write()
144                .await
145                .push(block.metadata.block_hash.clone(), block);
146            if let Some((hash, _)) = push_result {
147                tracing::warn!("Block {hash} evicted");
148            };
149            if time_per_block < start.elapsed() {
150                tracing::warn!(
151                    "Can't keep up: last block built in {}ms, target time per block: {}",
152                    start.elapsed().as_millis(),
153                    time_per_block.as_millis(),
154                );
155            }
156            sleep(time_per_block.saturating_sub(start.elapsed())).await;
157        }
158    }
159}
160
161impl<TYPES: NodeType<Transaction = TestTransaction>> BuilderTask<TYPES> for RandomBuilderTask<TYPES>
162where
163    <TYPES as NodeType>::InstanceState: Default,
164{
165    fn start(
166        mut self: Box<Self>,
167        mut stream: Box<dyn Stream<Item = Event<TYPES>> + std::marker::Unpin + Send + 'static>,
168    ) {
169        let mut task = Some(spawn(Self::build_blocks(
170            self.config.clone(),
171            self.pub_key.clone(),
172            self.priv_key.clone(),
173            self.blocks.clone(),
174        )));
175
176        spawn(async move {
177            loop {
178                match stream.next().await {
179                    None => {
180                        break;
181                    },
182                    Some(evt) => {
183                        if let EventType::ViewFinished { view_number } = evt.event {
184                            if let Some(change) = self.changes.remove(&view_number) {
185                                match change {
186                                    BuilderChange::Up => {
187                                        if task.is_none() {
188                                            task = Some(spawn(Self::build_blocks(
189                                                self.config.clone(),
190                                                self.pub_key.clone(),
191                                                self.priv_key.clone(),
192                                                self.blocks.clone(),
193                                            )))
194                                        }
195                                    },
196                                    BuilderChange::Down => {
197                                        if let Some(handle) = task.take() {
198                                            handle.abort();
199                                        }
200                                    },
201                                    BuilderChange::FailClaims(_) => {},
202                                }
203                                let _ = self.change_sender.broadcast(change).await;
204                            }
205                        }
206                    },
207                }
208            }
209        });
210    }
211}
212
213/// A mock implementation of the builder data source.
214/// Builds random blocks, doesn't track HotShot state at all.
215/// Evicts old available blocks if HotShot doesn't keep up.
216#[derive(Clone, Debug)]
217pub struct RandomBuilderSource<TYPES: NodeType> {
218    /// Built blocks
219    blocks: Arc<
220        RwLock<
221            // Isn't strictly speaking used as a cache,
222            // just as a HashMap that evicts old blocks
223            LruCache<BuilderCommitment, BlockEntry<TYPES>>,
224        >,
225    >,
226    pub_key: TYPES::BuilderSignatureKey,
227    num_nodes: Arc<RwLock<usize>>,
228    should_fail_claims: Arc<AtomicBool>,
229}
230
231impl<TYPES> RandomBuilderSource<TYPES>
232where
233    TYPES: NodeType<Transaction = TestTransaction>,
234    <TYPES as NodeType>::InstanceState: Default,
235{
236    /// Create new [`RandomBuilderSource`]
237    #[must_use]
238    #[allow(clippy::missing_panics_doc)] // only panics if 256 == 0
239    pub fn new(pub_key: TYPES::BuilderSignatureKey, num_nodes: Arc<RwLock<usize>>) -> Self {
240        Self {
241            blocks: Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(256).unwrap()))),
242            pub_key,
243            num_nodes,
244            should_fail_claims: Arc::new(AtomicBool::new(false)),
245        }
246    }
247}
248
249#[async_trait]
250impl<TYPES: NodeType> ReadState for RandomBuilderSource<TYPES> {
251    type State = Self;
252
253    async fn read<T>(
254        &self,
255        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
256    ) -> T {
257        op(self).await
258    }
259}
260
261#[async_trait]
262impl<TYPES: NodeType> BuilderDataSource<TYPES> for RandomBuilderSource<TYPES> {
263    async fn available_blocks(
264        &self,
265        _for_parent: &VidCommitment,
266        _view_number: u64,
267        _sender: TYPES::SignatureKey,
268        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
269    ) -> Result<Vec<AvailableBlockInfo<TYPES>>, BuildError> {
270        Ok(self
271            .blocks
272            .deref()
273            .read()
274            .await
275            .iter()
276            .map(|(_, BlockEntry { metadata, .. })| metadata.clone())
277            .collect())
278    }
279
280    async fn claim_block(
281        &self,
282        block_hash: &BuilderCommitment,
283        _view_number: u64,
284        _sender: TYPES::SignatureKey,
285        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
286    ) -> Result<AvailableBlockData<TYPES>, BuildError> {
287        if self.should_fail_claims.load(Ordering::Relaxed) {
288            return Err(BuildError::Missing);
289        }
290
291        let mut blocks = self.blocks.write().await;
292        let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
293        let payload = entry.payload.take().ok_or(BuildError::Missing)?;
294        // Check if header input is claimed as well, if yes, then evict block
295        if entry.header_input.is_none() {
296            blocks.pop(block_hash);
297        };
298        Ok(payload)
299    }
300
301    async fn claim_block_with_num_nodes(
302        &self,
303        block_hash: &BuilderCommitment,
304        view_number: u64,
305        sender: TYPES::SignatureKey,
306        signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
307        num_nodes: usize,
308    ) -> Result<AvailableBlockData<TYPES>, BuildError> {
309        *self.num_nodes.write().await = num_nodes;
310        self.claim_block(block_hash, view_number, sender, signature)
311            .await
312    }
313
314    async fn claim_block_header_input(
315        &self,
316        block_hash: &BuilderCommitment,
317        _view_number: u64,
318        _sender: TYPES::SignatureKey,
319        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
320    ) -> Result<AvailableBlockHeaderInputV1<TYPES>, BuildError> {
321        if self.should_fail_claims.load(Ordering::Relaxed) {
322            return Err(BuildError::Missing);
323        }
324
325        let mut blocks = self.blocks.write().await;
326        let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
327        let header_input = entry.header_input.take().ok_or(BuildError::Missing)?;
328        // Check if payload is claimed as well, if yes, then evict block
329        if entry.payload.is_none() {
330            blocks.pop(block_hash);
331        };
332        Ok(header_input)
333    }
334
335    async fn builder_address(&self) -> Result<TYPES::BuilderSignatureKey, BuildError> {
336        Ok(self.pub_key.clone())
337    }
338}