hotshot_testing/block_builder/
simple.rs1use std::{
8 collections::HashMap,
9 num::NonZeroUsize,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14 time::{Duration, Instant},
15};
16
17use async_broadcast::{broadcast, Sender};
18use async_lock::RwLock;
19use async_trait::async_trait;
20use committable::{Commitment, Committable};
21use futures::{future::BoxFuture, Stream, StreamExt};
22use hotshot::{
23 traits::BlockPayload,
24 types::{Event, EventType, SignatureKey},
25};
26use hotshot_builder_api::{
27 v0_1::{
28 self,
29 block_info::{AvailableBlockData, AvailableBlockInfo},
30 builder::{BuildError, Error, Options},
31 },
32 v0_2::block_info::AvailableBlockHeaderInputV1,
33};
34use hotshot_types::{
35 constants::LEGACY_BUILDER_MODULE,
36 data::VidCommitment,
37 traits::{
38 block_contents::BlockHeader, node_implementation::NodeType,
39 signature_key::BuilderSignatureKey,
40 },
41 utils::BuilderCommitment,
42};
43use lru::LruCache;
44use tide_disco::{method::ReadState, App, Url};
45use tokio::spawn;
46use vbs::version::StaticVersionType;
47
48use super::{build_block, run_builder_source, BlockEntry, BuilderTask, TestBuilderImplementation};
49use crate::test_builder::BuilderChange;
50
51pub struct SimpleBuilderImplementation;
52
53impl SimpleBuilderImplementation {
54 pub async fn create<TYPES: NodeType>(
55 num_nodes: usize,
56 changes: HashMap<u64, BuilderChange>,
57 change_sender: Sender<BuilderChange>,
58 ) -> (SimpleBuilderSource<TYPES>, SimpleBuilderTask<TYPES>) {
59 let (pub_key, priv_key) =
60 TYPES::BuilderSignatureKey::generated_from_seed_indexed([1; 32], 0);
61
62 let transactions = Arc::new(RwLock::new(HashMap::new()));
63 let blocks = Arc::new(RwLock::new(HashMap::new()));
64 let should_fail_claims = Arc::new(AtomicBool::new(false));
65
66 let source = SimpleBuilderSource {
67 pub_key,
68 priv_key,
69 transactions: transactions.clone(),
70 blocks: blocks.clone(),
71 num_nodes: Arc::new(RwLock::new(num_nodes)),
72 should_fail_claims: Arc::clone(&should_fail_claims),
73 };
74
75 let task = SimpleBuilderTask {
76 transactions,
77 blocks,
78 decided_transactions: LruCache::new(NonZeroUsize::new(u16::MAX.into()).expect("> 0")),
79 should_fail_claims,
80 change_sender,
81 changes,
82 };
83
84 (source, task)
85 }
86}
87
88#[async_trait]
89impl<TYPES: NodeType> TestBuilderImplementation<TYPES> for SimpleBuilderImplementation
90where
91 <TYPES as NodeType>::InstanceState: Default,
92{
93 type Config = ();
94
95 async fn start(
96 num_nodes: usize,
97 url: Url,
98 _config: Self::Config,
99 changes: HashMap<u64, BuilderChange>,
100 ) -> Box<dyn BuilderTask<TYPES>> {
101 let (change_sender, change_receiver) = broadcast(128);
102 let (source, task) = Self::create(num_nodes, changes, change_sender).await;
103 run_builder_source(url, change_receiver, source);
104
105 Box::new(task)
106 }
107}
108
109#[derive(Debug, Clone)]
110pub struct SimpleBuilderSource<TYPES: NodeType> {
111 pub_key: TYPES::BuilderSignatureKey,
112 priv_key: <TYPES::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
113 num_nodes: Arc<RwLock<usize>>,
114 #[allow(clippy::type_complexity)]
115 transactions: Arc<RwLock<HashMap<Commitment<TYPES::Transaction>, SubmittedTransaction<TYPES>>>>,
116 blocks: Arc<RwLock<HashMap<BuilderCommitment, BlockEntry<TYPES>>>>,
117 should_fail_claims: Arc<AtomicBool>,
118}
119
120#[async_trait]
121impl<TYPES: NodeType> ReadState for SimpleBuilderSource<TYPES> {
122 type State = Self;
123
124 async fn read<T>(
125 &self,
126 op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
127 ) -> T {
128 op(self).await
129 }
130}
131
132#[async_trait]
133impl<TYPES: NodeType> v0_1::data_source::BuilderDataSource<TYPES> for SimpleBuilderSource<TYPES>
134where
135 <TYPES as NodeType>::InstanceState: Default,
136{
137 async fn available_blocks(
138 &self,
139 _for_parent: &VidCommitment,
140 _view_number: u64,
141 _sender: TYPES::SignatureKey,
142 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
143 ) -> Result<Vec<AvailableBlockInfo<TYPES>>, BuildError> {
144 let transactions = self
145 .transactions
146 .read(|txns| {
147 Box::pin(async {
148 txns.values()
149 .filter(|txn| {
150 txn.claimed
154 .map(|claim_time| claim_time.elapsed() > Duration::from_secs(30))
155 .unwrap_or(true)
156 })
157 .cloned()
158 .map(|txn| txn.transaction)
159 .collect::<Vec<TYPES::Transaction>>()
160 })
161 })
162 .await;
163
164 if transactions.is_empty() {
165 return Ok(vec![]);
171 }
172
173 let block_entry =
174 build_block::<TYPES>(transactions, self.pub_key.clone(), self.priv_key.clone()).await;
175
176 let metadata = block_entry.metadata.clone();
177
178 self.blocks
179 .write()
180 .await
181 .insert(block_entry.metadata.block_hash.clone(), block_entry);
182
183 Ok(vec![metadata])
184 }
185
186 async fn claim_block(
187 &self,
188 block_hash: &BuilderCommitment,
189 _view_number: u64,
190 _sender: TYPES::SignatureKey,
191 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
192 ) -> Result<AvailableBlockData<TYPES>, BuildError> {
193 if self.should_fail_claims.load(Ordering::Relaxed) {
194 return Err(BuildError::Missing);
195 }
196
197 let payload = {
198 let mut blocks = self.blocks.write().await;
199 let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
200 entry.payload.take().ok_or(BuildError::Missing)?
201 };
202
203 let now = Instant::now();
204
205 let claimed_transactions = payload
206 .block_payload
207 .transaction_commitments(&payload.metadata);
208
209 let mut transactions = self.transactions.write().await;
210 for txn_hash in claimed_transactions {
211 if let Some(txn) = transactions.get_mut(&txn_hash) {
212 txn.claimed = Some(now);
213 }
214 }
215
216 Ok(payload)
217 }
218
219 async fn claim_block_with_num_nodes(
220 &self,
221 block_hash: &BuilderCommitment,
222 view_number: u64,
223 sender: TYPES::SignatureKey,
224 signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
225 num_nodes: usize,
226 ) -> Result<AvailableBlockData<TYPES>, BuildError> {
227 *self.num_nodes.write().await = num_nodes;
228 self.claim_block(block_hash, view_number, sender, signature)
229 .await
230 }
231
232 async fn claim_block_header_input(
233 &self,
234 block_hash: &BuilderCommitment,
235 _view_number: u64,
236 _sender: TYPES::SignatureKey,
237 _signature: &<TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
238 ) -> Result<AvailableBlockHeaderInputV1<TYPES>, BuildError> {
239 if self.should_fail_claims.load(Ordering::Relaxed) {
240 return Err(BuildError::Missing);
241 }
242
243 let mut blocks = self.blocks.write().await;
244 let entry = blocks.get_mut(block_hash).ok_or(BuildError::NotFound)?;
245 Ok(entry.header_input.clone().ok_or(BuildError::Missing)?)
246 }
247
248 async fn builder_address(&self) -> Result<TYPES::BuilderSignatureKey, BuildError> {
249 Ok(self.pub_key.clone())
250 }
251}
252
253impl<TYPES: NodeType> SimpleBuilderSource<TYPES> {
254 pub async fn run(self, url: Url)
255 where
256 <TYPES as NodeType>::InstanceState: Default,
257 {
258 let builder_api_0_1 = hotshot_builder_api::v0_1::builder::define_api::<
259 SimpleBuilderSource<TYPES>,
260 TYPES,
261 >(&Options::default())
262 .expect("Failed to construct the builder API");
263
264 let mut app: App<SimpleBuilderSource<TYPES>, Error> = App::with_state(self);
265 app.register_module::<Error, _>(LEGACY_BUILDER_MODULE, builder_api_0_1)
266 .expect("Failed to register builder API 0.1");
267
268 spawn(app.serve(url, hotshot_builder_api::v0_1::Version::instance()));
269 }
270}
271
272#[derive(Debug, Clone)]
273struct SubmittedTransaction<TYPES: NodeType> {
274 claimed: Option<Instant>,
275 transaction: TYPES::Transaction,
276}
277
278#[derive(Clone)]
279pub struct SimpleBuilderTask<TYPES: NodeType> {
280 #[allow(clippy::type_complexity)]
281 transactions: Arc<RwLock<HashMap<Commitment<TYPES::Transaction>, SubmittedTransaction<TYPES>>>>,
282 blocks: Arc<RwLock<HashMap<BuilderCommitment, BlockEntry<TYPES>>>>,
283 decided_transactions: LruCache<Commitment<TYPES::Transaction>, ()>,
284 should_fail_claims: Arc<AtomicBool>,
285 changes: HashMap<u64, BuilderChange>,
286 change_sender: Sender<BuilderChange>,
287}
288
289impl<TYPES: NodeType> BuilderTask<TYPES> for SimpleBuilderTask<TYPES> {
290 fn start(
291 mut self: Box<Self>,
292 mut stream: Box<dyn Stream<Item = Event<TYPES>> + std::marker::Unpin + Send + 'static>,
293 ) {
294 spawn(async move {
295 let mut should_build_blocks = true;
296 loop {
297 match stream.next().await {
298 None => {
299 break;
300 },
301 Some(evt) => match evt.event {
302 EventType::ViewFinished { view_number } => {
303 if let Some(change) = self.changes.remove(&view_number) {
304 match change {
305 BuilderChange::Up => should_build_blocks = true,
306 BuilderChange::Down => {
307 should_build_blocks = false;
308 self.transactions.write().await.clear();
309 self.blocks.write().await.clear();
310 },
311 BuilderChange::FailClaims(value) => {
312 self.should_fail_claims.store(value, Ordering::Relaxed);
313 },
314 }
315 let _ = self.change_sender.broadcast(change).await;
316 }
317 },
318 EventType::Decide { leaf_chain, .. } if should_build_blocks => {
319 let mut queue = self.transactions.write().await;
320 for leaf_info in leaf_chain.iter() {
321 if let Some(ref payload) = leaf_info.leaf.block_payload() {
322 for txn in payload.transaction_commitments(
323 leaf_info.leaf.block_header().metadata(),
324 ) {
325 self.decided_transactions.put(txn, ());
326 queue.remove(&txn);
327 }
328 }
329 }
330 self.blocks.write().await.clear();
331 },
332 EventType::DaProposal { proposal, .. } if should_build_blocks => {
333 let payload = TYPES::BlockPayload::from_bytes(
334 &proposal.data.encoded_transactions,
335 &proposal.data.metadata,
336 );
337 let now = Instant::now();
338
339 let mut queue = self.transactions.write().await;
340 for commitment in
341 payload.transaction_commitments(&proposal.data.metadata)
342 {
343 if let Some(txn) = queue.get_mut(&commitment) {
344 txn.claimed = Some(now);
345 }
346 }
347 },
348 EventType::Transactions { transactions } if should_build_blocks => {
349 let mut queue = self.transactions.write().await;
350 for transaction in transactions {
351 if !self.decided_transactions.contains(&transaction.commit()) {
352 queue.insert(
353 transaction.commit(),
354 SubmittedTransaction {
355 claimed: None,
356 transaction: transaction.clone(),
357 },
358 );
359 }
360 }
361 },
362 _ => {},
363 },
364 }
365 }
366 });
367 }
368}