hotshot_testing/block_builder/
simple.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::HashMap,
9    num::NonZeroUsize,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14    time::{Duration, Instant},
15};
16
17use async_broadcast::{broadcast, Sender};
18use async_lock::RwLock;
19use async_trait::async_trait;
20use committable::{Commitment, Committable};
21use futures::{future::BoxFuture, Stream, StreamExt};
22use hotshot::{
23    traits::BlockPayload,
24    types::{Event, EventType, SignatureKey},
25};
26use hotshot_builder_api::{
27    v0_1::{
28        self,
29        block_info::{AvailableBlockData, AvailableBlockInfo},
30        builder::{BuildError, Error, Options},
31    },
32    v0_2::block_info::AvailableBlockHeaderInputV1,
33};
34use hotshot_types::{
35    constants::LEGACY_BUILDER_MODULE,
36    data::VidCommitment,
37    traits::{
38        block_contents::BlockHeader, node_implementation::NodeType,
39        signature_key::BuilderSignatureKey,
40    },
41    utils::BuilderCommitment,
42};
43use lru::LruCache;
44use tide_disco::{method::ReadState, App, Url};
45use tokio::spawn;
46use vbs::version::StaticVersionType;
47
48use super::{build_block, run_builder_source, BlockEntry, BuilderTask, TestBuilderImplementation};
49use crate::test_builder::BuilderChange;
50
51pub struct SimpleBuilderImplementation;
52
53impl SimpleBuilderImplementation {
54    pub async fn create<TYPES: NodeType>(
55        num_nodes: usize,
56        changes: HashMap<u64, BuilderChange>,
57        change_sender: Sender<BuilderChange>,
58    ) -> (SimpleBuilderSource<TYPES>, SimpleBuilderTask<TYPES>) {
59        let (pub_key, priv_key) =
60            TYPES::BuilderSignatureKey::generated_from_seed_indexed([1; 32], 0);
61
62        let transactions = Arc::new(RwLock::new(HashMap::new()));
63        let blocks = Arc::new(RwLock::new(HashMap::new()));
64        let should_fail_claims = Arc::new(AtomicBool::new(false));
65
66        let source = SimpleBuilderSource {
67            pub_key,
68            priv_key,
69            transactions: transactions.clone(),
70            blocks: blocks.clone(),
71            num_nodes: Arc::new(RwLock::new(num_nodes)),
72            should_fail_claims: Arc::clone(&should_fail_claims),
73        };
74
75        let task = SimpleBuilderTask {
76            transactions,
77            blocks,
78            decided_transactions: LruCache::new(NonZeroUsize::new(u16::MAX.into()).expect("> 0")),
79            should_fail_claims,
80            change_sender,
81            changes,
82        };
83
84        (source, task)
85    }
86}
87
88#[async_trait]
89impl<TYPES: NodeType> TestBuilderImplementation<TYPES> for SimpleBuilderImplementation
90where
91    <TYPES as NodeType>::InstanceState: Default,
92{
93    type Config = ();
94
95    async fn start(
96        num_nodes: usize,
97        url: Url,
98        _config: Self::Config,
99        changes: HashMap<u64, BuilderChange>,
100    ) -> Box<dyn BuilderTask<TYPES>> {
101        let (change_sender, change_receiver) = broadcast(128);
102        let (source, task) = Self::create(num_nodes, changes, change_sender).await;
103        run_builder_source(url, change_receiver, source);
104
105        Box::new(task)
106    }
107}
108
109#[derive(Debug, Clone)]
110pub struct SimpleBuilderSource<TYPES: NodeType> {
111    pub_key: TYPES::BuilderSignatureKey,
112    priv_key: <TYPES::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
113    num_nodes: Arc<RwLock<usize>>,
114    #[allow(clippy::type_complexity)]
115    transactions: Arc<RwLock<HashMap<Commitment<TYPES::Transaction>, SubmittedTransaction<TYPES>>>>,
116    blocks: Arc<RwLock<HashMap<BuilderCommitment, BlockEntry<TYPES>>>>,
117    should_fail_claims: Arc<AtomicBool>,
118}
119
120#[async_trait]
121impl<TYPES: NodeType> ReadState for SimpleBuilderSource<TYPES> {
122    type State = Self;
123
124    async fn read<T>(
125        &self,
126        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
127    ) -> T {
128        op(self).await
129    }
130}
131
132#[async_trait]
133impl<TYPES: NodeType> v0_1::data_source::BuilderDataSource<TYPES> for SimpleBuilderSource<TYPES>
134where
135    <TYPES as NodeType>::InstanceState: Default,
136{
137    async fn available_blocks(
138        &self,
139        _for_parent: &VidCommitment,
140        _view_number: u64,
141        _sender: TYPES::SignatureKey,
142        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
143    ) -> Result<Vec<AvailableBlockInfo<TYPES>>, BuildError> {
144        let transactions = self
145            .transactions
146            .read(|txns| {
147                Box::pin(async {
148                    txns.values()
149                        .filter(|txn| {
150                            // We want transactions that are either unclaimed, or claimed long ago
151                            // and thus probably not included, or they would've been decided on
152                            // already and removed from the queue
153                            txn.claimed
154                                .map(|claim_time| claim_time.elapsed() > Duration::from_secs(30))
155                                .unwrap_or(true)
156                        })
157                        .cloned()
158                        .map(|txn| txn.transaction)
159                        .collect::<Vec<TYPES::Transaction>>()
160                })
161            })
162            .await;
163
164        if transactions.is_empty() {
165            // We don't want to return an empty block if we have no transactions, as we would end up
166            // driving consensus to produce empty blocks extremely quickly when mempool is empty.
167            // Instead, we return no blocks, so that view leader will keep asking for blocks until
168            // either we have something non-trivial to propose, or leader runs out of time to propose,
169            // in which case view leader will finally propose an empty block themselves.
170            return Ok(vec![]);
171        }
172
173        let block_entry =
174            build_block::<TYPES>(transactions, self.pub_key.clone(), self.priv_key.clone()).await;
175
176        let metadata = block_entry.metadata.clone();
177
178        self.blocks
179            .write()
180            .await
181            .insert(block_entry.metadata.block_hash.clone(), block_entry);
182
183        Ok(vec![metadata])
184    }
185
186    async fn claim_block(
187        &self,
188        block_hash: &BuilderCommitment,
189        _view_number: u64,
190        _sender: TYPES::SignatureKey,
191        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
192    ) -> Result<AvailableBlockData<TYPES>, BuildError> {
193        if self.should_fail_claims.load(Ordering::Relaxed) {
194            return Err(BuildError::Missing);
195        }
196
197        let payload = {
198            let mut blocks = self.blocks.write().await;
199            let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
200            entry.payload.take().ok_or(BuildError::Missing)?
201        };
202
203        let now = Instant::now();
204
205        let claimed_transactions = payload
206            .block_payload
207            .transaction_commitments(&payload.metadata);
208
209        let mut transactions = self.transactions.write().await;
210        for txn_hash in claimed_transactions {
211            if let Some(txn) = transactions.get_mut(&txn_hash) {
212                txn.claimed = Some(now);
213            }
214        }
215
216        Ok(payload)
217    }
218
219    async fn claim_block_with_num_nodes(
220        &self,
221        block_hash: &BuilderCommitment,
222        view_number: u64,
223        sender: TYPES::SignatureKey,
224        signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
225        num_nodes: usize,
226    ) -> Result<AvailableBlockData<TYPES>, BuildError> {
227        *self.num_nodes.write().await = num_nodes;
228        self.claim_block(block_hash, view_number, sender, signature)
229            .await
230    }
231
232    async fn claim_block_header_input(
233        &self,
234        block_hash: &BuilderCommitment,
235        _view_number: u64,
236        _sender: TYPES::SignatureKey,
237        _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
238    ) -> Result<AvailableBlockHeaderInputV1<TYPES>, BuildError> {
239        if self.should_fail_claims.load(Ordering::Relaxed) {
240            return Err(BuildError::Missing);
241        }
242
243        let mut blocks = self.blocks.write().await;
244        let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
245        Ok(entry.header_input.clone().ok_or(BuildError::Missing)?)
246    }
247
248    async fn builder_address(&self) -> Result<TYPES::BuilderSignatureKey, BuildError> {
249        Ok(self.pub_key.clone())
250    }
251}
252
253impl<TYPES: NodeType> SimpleBuilderSource<TYPES> {
254    pub async fn run(self, url: Url)
255    where
256        <TYPES as NodeType>::InstanceState: Default,
257    {
258        let builder_api_0_1 = hotshot_builder_api::v0_1::builder::define_api::<
259            SimpleBuilderSource<TYPES>,
260            TYPES,
261        >(&Options::default())
262        .expect("Failed to construct the builder API");
263
264        let mut app: App<SimpleBuilderSource<TYPES>, Error> = App::with_state(self);
265        app.register_module::<Error, _>(LEGACY_BUILDER_MODULE, builder_api_0_1)
266            .expect("Failed to register builder API 0.1");
267
268        spawn(app.serve(url, hotshot_builder_api::v0_1::Version::instance()));
269    }
270}
271
272#[derive(Debug, Clone)]
273struct SubmittedTransaction<TYPES: NodeType> {
274    claimed: Option<Instant>,
275    transaction: TYPES::Transaction,
276}
277
278#[derive(Clone)]
279pub struct SimpleBuilderTask<TYPES: NodeType> {
280    #[allow(clippy::type_complexity)]
281    transactions: Arc<RwLock<HashMap<Commitment<TYPES::Transaction>, SubmittedTransaction<TYPES>>>>,
282    blocks: Arc<RwLock<HashMap<BuilderCommitment, BlockEntry<TYPES>>>>,
283    decided_transactions: LruCache<Commitment<TYPES::Transaction>, ()>,
284    should_fail_claims: Arc<AtomicBool>,
285    changes: HashMap<u64, BuilderChange>,
286    change_sender: Sender<BuilderChange>,
287}
288
289impl<TYPES: NodeType> BuilderTask<TYPES> for SimpleBuilderTask<TYPES> {
290    fn start(
291        mut self: Box<Self>,
292        mut stream: Box<dyn Stream<Item = Event<TYPES>> + std::marker::Unpin + Send + 'static>,
293    ) {
294        spawn(async move {
295            let mut should_build_blocks = true;
296            loop {
297                match stream.next().await {
298                    None => {
299                        break;
300                    },
301                    Some(evt) => match evt.event {
302                        EventType::ViewFinished { view_number } => {
303                            if let Some(change) = self.changes.remove(&view_number) {
304                                match change {
305                                    BuilderChange::Up => should_build_blocks = true,
306                                    BuilderChange::Down => {
307                                        should_build_blocks = false;
308                                        self.transactions.write().await.clear();
309                                        self.blocks.write().await.clear();
310                                    },
311                                    BuilderChange::FailClaims(value) => {
312                                        self.should_fail_claims.store(value, Ordering::Relaxed);
313                                    },
314                                }
315                                let _ = self.change_sender.broadcast(change).await;
316                            }
317                        },
318                        EventType::Decide { leaf_chain, .. } if should_build_blocks => {
319                            let mut queue = self.transactions.write().await;
320                            for leaf_info in leaf_chain.iter() {
321                                if let Some(ref payload) = leaf_info.leaf.block_payload() {
322                                    for txn in payload.transaction_commitments(
323                                        leaf_info.leaf.block_header().metadata(),
324                                    ) {
325                                        self.decided_transactions.put(txn, ());
326                                        queue.remove(&txn);
327                                    }
328                                }
329                            }
330                            self.blocks.write().await.clear();
331                        },
332                        EventType::DaProposal { proposal, .. } if should_build_blocks => {
333                            let payload = TYPES::BlockPayload::from_bytes(
334                                &proposal.data.encoded_transactions,
335                                &proposal.data.metadata,
336                            );
337                            let now = Instant::now();
338
339                            let mut queue = self.transactions.write().await;
340                            for commitment in
341                                payload.transaction_commitments(&proposal.data.metadata)
342                            {
343                                if let Some(txn) = queue.get_mut(&commitment) {
344                                    txn.claimed = Some(now);
345                                }
346                            }
347                        },
348                        EventType::Transactions { transactions } if should_build_blocks => {
349                            let mut queue = self.transactions.write().await;
350                            for transaction in transactions {
351                                if !self.decided_transactions.contains(&transaction.commit()) {
352                                    queue.insert(
353                                        transaction.commit(),
354                                        SubmittedTransaction {
355                                            claimed: None,
356                                            transaction: transaction.clone(),
357                                        },
358                                    );
359                                }
360                            }
361                        },
362                        _ => {},
363                    },
364                }
365            }
366        });
367    }
368}