1use std::{
2 fmt::Display,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7 time::{Duration, Instant},
8};
9
10pub use async_broadcast::{broadcast, RecvError, TryRecvError};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::Commitment;
14use futures::{
15 future::BoxFuture,
16 stream::{FuturesOrdered, FuturesUnordered, StreamExt},
17 Stream, TryStreamExt,
18};
19use hotshot::types::Event;
20use hotshot_builder_api::{
21 v0_1::{
22 block_info::{AvailableBlockData, AvailableBlockInfo},
23 builder::{
24 define_api, submit_api, BuildError, Error as BuilderApiError, TransactionStatus,
25 },
26 data_source::{AcceptsTxnSubmits, BuilderDataSource},
27 },
28 v0_2::block_info::AvailableBlockHeaderInputV1,
29};
30use hotshot_builder_shared::{
31 block::{BlockId, BuilderStateId, ReceivedTransaction, TransactionSource},
32 coordinator::{BuilderStateCoordinator, BuilderStateLookup},
33 error::Error,
34 state::BuilderState,
35 utils::BuilderKeys,
36};
37use hotshot_types::{
38 data::VidCommitment,
39 event::EventType,
40 traits::{
41 block_contents::{BlockPayload, Transaction},
42 node_implementation::{ConsensusTime, NodeType},
43 signature_key::{BuilderSignatureKey, SignatureKey},
44 EncodeBytes,
45 },
46 utils::BuilderCommitment,
47};
48use tagged_base64::TaggedBase64;
49use tide_disco::{app::AppError, method::ReadState, App};
50use tokio::{
51 spawn,
52 task::JoinHandle,
53 time::{sleep, timeout},
54};
55use tracing::{error, info, instrument, trace, warn};
56use vbs::version::StaticVersion;
57
58use crate::{
59 block_size_limits::BlockSizeLimits,
60 block_store::{BlockInfo, BlockStore},
61};
62
63const BUILDER_STATE_EXACT_MATCH_DIVISOR: u32 = 2;
66
67const RETRY_LOOP_RESOLUTION: u32 = 10;
71
72const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;
76
77pub(crate) const ALLOW_EMPTY_BLOCK_PERIOD: u64 = 3;
84
85#[derive(Debug, Clone)]
87pub struct BuilderConfig<Types: NodeType> {
88 pub builder_keys: BuilderKeys<Types>,
90 pub max_api_waiting_time: Duration,
94 pub max_block_size_increment_period: Duration,
97 pub maximize_txn_capture_timeout: Duration,
101 pub txn_garbage_collect_duration: Duration,
104 pub txn_channel_capacity: usize,
106 pub tx_status_cache_capacity: usize,
108 pub base_fee: u64,
110}
111
112#[cfg(test)]
113impl<Types: NodeType> BuilderConfig<Types> {
114 pub(crate) fn test() -> Self {
115 use hotshot_builder_shared::testing::constants::*;
116 Self {
117 builder_keys:
118 <Types::BuilderSignatureKey as BuilderSignatureKey>::generated_from_seed_indexed(
119 [0u8; 32], 42,
120 ),
121 max_api_waiting_time: TEST_API_TIMEOUT,
122 max_block_size_increment_period: TEST_MAX_BLOCK_SIZE_INCREMENT_PERIOD,
123 maximize_txn_capture_timeout: TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT,
124 txn_garbage_collect_duration: TEST_INCLUDED_TX_GC_PERIOD,
125 txn_channel_capacity: TEST_CHANNEL_BUFFER_SIZE,
126 tx_status_cache_capacity: TEST_TX_STATUS_CACHE_CAPACITY,
127 base_fee: TEST_BASE_FEE,
128 }
129 }
130}
131
132pub struct GlobalState<Types: NodeType> {
133 pub(crate) coordinator: Arc<BuilderStateCoordinator<Types>>,
135 pub(crate) builder_keys: BuilderKeys<Types>,
137 pub(crate) block_store: RwLock<BlockStore<Types>>,
139 pub(crate) block_size_limits: BlockSizeLimits,
141 pub(crate) num_nodes: AtomicUsize,
143 pub(crate) instance_state: Types::InstanceState,
145 pub(crate) max_api_waiting_time: Duration,
147 pub(crate) maximize_txn_capture_timeout: Duration,
149 pub(crate) base_fee: u64,
151}
152
153impl<Types: NodeType> GlobalState<Types>
154where
155 for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
156 &'a TaggedBase64,
157 >>::Error: Display,
158 for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
159{
160 pub fn new(
161 config: BuilderConfig<Types>,
162 instance_state: Types::InstanceState,
163 protocol_max_block_size: u64,
164 num_nodes: usize,
165 ) -> Arc<Self> {
166 Arc::new(Self {
167 coordinator: Arc::new(BuilderStateCoordinator::new(
168 config.txn_channel_capacity,
169 config.txn_garbage_collect_duration,
170 config.tx_status_cache_capacity,
171 )),
172 block_store: RwLock::new(BlockStore::new()),
173 block_size_limits: BlockSizeLimits::new(
174 protocol_max_block_size,
175 config.max_block_size_increment_period,
176 ),
177 num_nodes: num_nodes.into(),
178 builder_keys: config.builder_keys,
179 max_api_waiting_time: config.max_api_waiting_time,
180 maximize_txn_capture_timeout: config.maximize_txn_capture_timeout,
181 instance_state,
182 base_fee: config.base_fee,
183 })
184 }
185
186 pub fn start_event_loop(
189 self: Arc<Self>,
190 event_stream: impl Stream<Item = Event<Types>> + Unpin + Send + 'static,
191 ) -> JoinHandle<anyhow::Result<()>> {
192 spawn(self.event_loop(event_stream))
193 }
194
195 async fn event_loop(
198 self: Arc<Self>,
199 mut event_stream: impl Stream<Item = Event<Types>> + Unpin + Send + 'static,
200 ) -> anyhow::Result<()> {
201 loop {
202 let Some(event) = event_stream.next().await else {
203 anyhow::bail!("Event stream ended");
204 };
205
206 match event.event {
207 EventType::Error { error } => {
208 error!("Error event in HotShot: {error:?}");
209 },
210 EventType::Transactions { transactions } => {
211 let this = Arc::clone(&self);
212 spawn(async move {
213 transactions
214 .into_iter()
215 .map(|txn| {
216 this.handle_transaction(ReceivedTransaction::new(
217 txn,
218 TransactionSource::Public,
219 ))
220 })
221 .collect::<FuturesUnordered<_>>()
222 .collect::<Vec<_>>()
223 .await;
224 });
225 },
226 EventType::Decide { leaf_chain, .. } => {
227 let prune_cutoff = leaf_chain[0].leaf.view_number();
228
229 let coordinator = Arc::clone(&self.coordinator);
230 spawn(async move { coordinator.handle_decide(leaf_chain).await });
231
232 let this = Arc::clone(&self);
233 spawn(async move { this.block_store.write().await.prune(prune_cutoff) });
234 },
235 EventType::DaProposal { proposal, .. } => {
236 let coordinator = Arc::clone(&self.coordinator);
237 spawn(async move { coordinator.handle_da_proposal(proposal.data).await });
238 },
239 EventType::QuorumProposal { proposal, .. } => {
240 let coordinator = Arc::clone(&self.coordinator);
241 spawn(async move { coordinator.handle_quorum_proposal(proposal.data).await });
242 },
243 _ => {},
244 }
245 }
246 }
247
248 pub fn into_app(
250 self: Arc<Self>,
251 ) -> Result<App<ProxyGlobalState<Types>, BuilderApiError>, AppError> {
252 let proxy = ProxyGlobalState(self);
253 let builder_api = define_api::<ProxyGlobalState<Types>, Types>(&Default::default())?;
254
255 let private_mempool_api =
257 submit_api::<ProxyGlobalState<Types>, Types, StaticVersion<0, 1>>(&Default::default())?;
258
259 let mut app: App<ProxyGlobalState<Types>, BuilderApiError> = App::with_state(proxy);
260
261 app.register_module(hotshot_types::constants::LEGACY_BUILDER_MODULE, builder_api)?;
262
263 app.register_module("txn_submit", private_mempool_api)?;
264
265 Ok(app)
266 }
267
268 async fn handle_transaction(&self, tx: ReceivedTransaction<Types>) -> Result<(), Error<Types>> {
269 let len = tx.transaction.minimum_block_size();
270 let max_tx_len = self.block_size_limits.max_block_size();
271 if len > max_tx_len {
272 tracing::warn!(%tx.commit, %len, %max_tx_len, "Transaction too big");
273 let error = Error::TxTooBig { len, max_tx_len };
274 self.coordinator.update_txn_status(
275 &tx.commit,
276 TransactionStatus::Rejected {
277 reason: error.to_string(),
278 },
279 );
280 return Err(error);
281 }
282 self.coordinator.handle_transaction(tx).await
283 }
284
285 async fn wait_for_builder_state(
286 &self,
287 state_id: &BuilderStateId<Types>,
288 check_period: Duration,
289 ) -> Result<Arc<BuilderState<Types>>, Error<Types>> {
290 loop {
291 match self.coordinator.lookup_builder_state(state_id).await {
292 BuilderStateLookup::Found(builder) => break Ok(builder),
293 BuilderStateLookup::Decided => {
294 return Err(Error::AlreadyDecided);
295 },
296 BuilderStateLookup::NotFound => {
297 sleep(check_period).await;
298 },
299 };
300 }
301 }
302
303 pub(crate) async fn build_block(
308 &self,
309 builder_state: Arc<BuilderState<Types>>,
310 ) -> Result<Option<BlockInfo<Types>>, Error<Types>> {
311 let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
312 let sleep_interval = self.maximize_txn_capture_timeout / RETRY_LOOP_RESOLUTION;
313
314 while Instant::now() <= timeout_after {
315 let queue_populated = builder_state.collect_txns(timeout_after).await;
316
317 if queue_populated || Instant::now() + sleep_interval > timeout_after {
318 break;
320 }
321
322 sleep(sleep_interval).await
323 }
324
325 let should_prioritize_finalization = builder_state.parent_block_references.tx_count != 0
329 && builder_state
330 .parent_block_references
331 .last_nonempty_view
332 .map(|nonempty_view| {
333 nonempty_view.saturating_sub(*builder_state.parent_block_references.view_number)
334 < ALLOW_EMPTY_BLOCK_PERIOD
335 })
336 .unwrap_or(false);
337
338 let builder: &Arc<BuilderState<Types>> = &builder_state;
339 let max_block_size = self.block_size_limits.max_block_size();
340
341 let transactions_to_include = {
342 let txn_queue = builder.txn_queue.read().await;
343 if txn_queue.is_empty() && !should_prioritize_finalization {
344 return Ok(None);
346 }
347 txn_queue
348 .iter()
349 .scan(0, |total_size, tx| {
350 let prev_size = *total_size;
351 *total_size += tx.min_block_size;
352 if *total_size >= max_block_size && prev_size != 0 {
357 None
358 } else {
359 Some(Arc::clone(tx))
363 }
364 })
365 .collect::<Vec<_>>()
366 };
367
368 let (payload, metadata) =
369 match <Types::BlockPayload as BlockPayload<Types>>::from_transactions(
370 transactions_to_include
371 .into_iter()
372 .map(|tx| tx.transaction.clone()),
373 &builder.validated_state,
374 &self.instance_state,
375 )
376 .await
377 {
378 Ok((payload, metadata)) => (payload, metadata),
379 Err(error) => {
380 warn!(?error, "Failed to build block payload");
381 return Err(Error::BuildBlock(error));
382 },
383 };
384
385 let actual_txn_count = payload.num_transactions(&metadata);
387 let truncated = actual_txn_count == 0;
388
389 if truncated {
400 builder.txn_queue.write().await.pop_front();
401 if !should_prioritize_finalization {
402 return Ok(None);
403 }
404 }
405
406 let encoded_txns: Vec<u8> = payload.encode().to_vec();
407 let block_size: u64 = encoded_txns.len() as u64;
408 let offered_fee: u64 = self.base_fee * block_size;
409
410 info!(
411 builder_id = %builder.id(),
412 txn_count = actual_txn_count,
413 block_size,
414 "Built a block",
415 );
416
417 Ok(Some(BlockInfo {
418 block_payload: payload,
419 block_size,
420 metadata,
421 offered_fee,
422 truncated,
423 }))
424 }
425
426 #[instrument(skip_all,
427 fields(state_id = %state_id)
428 )]
429 pub(crate) async fn available_blocks_implementation(
430 &self,
431 state_id: BuilderStateId<Types>,
432 ) -> Result<Vec<AvailableBlockInfo<Types>>, Error<Types>> {
433 let start = Instant::now();
434
435 let check_period = self.max_api_waiting_time / RETRY_LOOP_RESOLUTION;
436 let time_to_wait_for_matching_builder =
437 self.max_api_waiting_time / BUILDER_STATE_EXACT_MATCH_DIVISOR;
438
439 let builder = match timeout(
440 time_to_wait_for_matching_builder,
441 self.wait_for_builder_state(&state_id, check_period),
442 )
443 .await
444 {
445 Ok(Ok(builder)) => Some(builder),
446 Err(_) => {
447 warn!("Couldn't find the ideal builder state");
449 self.coordinator.highest_view_builder().await
450 },
451 Ok(Err(e)) => {
452 let lowest_view = self.coordinator.lowest_view().await;
454 warn!(
455 ?lowest_view,
456 "get_available_blocks request for decided view"
457 );
458 return Err(e);
459 },
460 };
461
462 let Some(builder) = builder else {
463 if let Some(cached_block) = self.block_store.read().await.get_cached(&state_id) {
464 return Ok(vec![cached_block.signed_response(&self.builder_keys)?]);
465 } else {
466 return Err(Error::NotFound);
467 };
468 };
469
470 let build_block_timeout = self
471 .max_api_waiting_time
472 .saturating_sub(start.elapsed())
473 .div_f32(1.1);
474 match timeout(build_block_timeout, self.build_block(builder))
475 .await
476 .map_err(|_| Error::ApiTimeout)
477 {
478 Ok(Ok(Some(info))) => {
480 let block_id = BlockId {
481 hash: info.block_payload.builder_commitment(&info.metadata),
482 view: state_id.parent_view,
483 };
484
485 let response = info.signed_response(&self.builder_keys)?;
486
487 {
488 let mut mutable_state = self.block_store.write().await;
489 mutable_state.update(state_id, block_id, info);
490 }
491
492 Ok(vec![response])
493 },
494 Ok(Ok(None)) => Ok(vec![]),
496 Ok(Err(e)) | Err(e) => {
498 if let Some(cached_block) = self.block_store.read().await.get_cached(&state_id) {
499 Ok(vec![cached_block.signed_response(&self.builder_keys)?])
500 } else {
501 Err(e)
502 }
503 },
504 }
505 }
506
507 #[instrument(skip_all,
508 fields(block_id = %block_id)
509 )]
510 pub(crate) async fn claim_block_implementation(
511 &self,
512 block_id: BlockId<Types>,
513 ) -> Result<AvailableBlockData<Types>, Error<Types>> {
514 let (block_payload, metadata) = {
515 let mutable_state_read = self.block_store.read().await;
524 let block_info = mutable_state_read
525 .get_block(&block_id)
526 .ok_or(Error::NotFound)?;
527
528 (
529 block_info.block_payload.clone(),
530 block_info.metadata.clone(),
531 )
532 };
533
534 let (pub_key, sign_key) = self.builder_keys.clone();
535
536 let response_block_hash = block_payload.builder_commitment(&metadata);
539 let signature_over_builder_commitment =
540 <Types as NodeType>::BuilderSignatureKey::sign_builder_message(
541 &sign_key,
542 response_block_hash.as_ref(),
543 )
544 .map_err(Error::Signing)?;
545
546 let block_data = AvailableBlockData::<Types> {
547 block_payload,
548 metadata,
549 signature: signature_over_builder_commitment,
550 sender: pub_key,
551 };
552
553 info!("Sending Claim Block data for {block_id}");
554 Ok(block_data)
555 }
556
557 #[instrument(skip_all,
558 fields(block_id = %block_id)
559 )]
560 pub(crate) async fn claim_block_header_input_implementation(
561 &self,
562 block_id: BlockId<Types>,
563 ) -> Result<(bool, AvailableBlockHeaderInputV1<Types>), Error<Types>> {
564 let metadata;
565 let offered_fee;
566 let truncated;
567 {
568 let mutable_state_read_lock_guard = self.block_store.read().await;
577 let block_info = mutable_state_read_lock_guard
578 .get_block(&block_id)
579 .ok_or(Error::NotFound)?;
580
581 metadata = block_info.metadata.clone();
582 offered_fee = block_info.offered_fee;
583 truncated = block_info.truncated;
584 };
585
586 let signature_over_fee_info =
589 Types::BuilderSignatureKey::sign_fee(&self.builder_keys.1, offered_fee, &metadata)
590 .map_err(Error::Signing)?;
591
592 let response = AvailableBlockHeaderInputV1::<Types> {
593 fee_signature: signature_over_fee_info,
594 sender: self.builder_keys.0.clone(),
595 };
596 info!("Sending Claim Block Header Input response");
597 Ok((truncated, response))
598 }
599}
600
601#[derive(derive_more::Deref, derive_more::DerefMut)]
602#[deref(forward)]
603#[deref_mut(forward)]
604pub struct ProxyGlobalState<Types: NodeType>(pub Arc<GlobalState<Types>>);
605
606#[async_trait]
610impl<Types: NodeType> BuilderDataSource<Types> for ProxyGlobalState<Types>
611where
612 for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
613 &'a TaggedBase64,
614 >>::Error: Display,
615 for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
616{
617 #[tracing::instrument(skip_all)]
618 async fn available_blocks(
619 &self,
620 parent_block: &VidCommitment,
621 parent_view: u64,
622 sender: Types::SignatureKey,
623 signature: &<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType,
624 ) -> Result<Vec<AvailableBlockInfo<Types>>, BuildError> {
625 if !sender.validate(signature, parent_block.as_ref()) {
627 warn!("Signature validation failed");
628 return Err(Error::<Types>::SignatureValidation.into());
629 }
630
631 let state_id = BuilderStateId {
632 parent_commitment: *parent_block,
633 parent_view: Types::View::new(parent_view),
634 };
635
636 trace!("Requesting available blocks");
637
638 let available_blocks = timeout(
639 self.max_api_waiting_time,
640 self.available_blocks_implementation(state_id),
641 )
642 .await
643 .map_err(|_| Error::<Types>::ApiTimeout)??;
644
645 Ok(available_blocks)
646 }
647
648 #[tracing::instrument(skip_all)]
649 async fn claim_block(
650 &self,
651 block_hash: &BuilderCommitment,
652 view_number: u64,
653 sender: Types::SignatureKey,
654 signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
655 ) -> Result<AvailableBlockData<Types>, BuildError> {
656 if !sender.validate(signature, block_hash.as_ref()) {
658 warn!("Signature validation failed");
659 return Err(Error::<Types>::SignatureValidation.into());
660 }
661
662 let block_id = BlockId {
663 hash: block_hash.clone(),
664 view: Types::View::new(view_number),
665 };
666
667 trace!("Processing claim block request");
668
669 let block = timeout(
670 self.max_api_waiting_time,
671 self.claim_block_implementation(block_id),
672 )
673 .await
674 .map_err(|_| Error::<Types>::ApiTimeout)??;
675
676 Ok(block)
677 }
678
679 #[tracing::instrument(skip_all)]
680 async fn claim_block_with_num_nodes(
681 &self,
682 block_hash: &BuilderCommitment,
683 view_number: u64,
684 sender: <Types as NodeType>::SignatureKey,
685 signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
686 num_nodes: usize,
687 ) -> Result<AvailableBlockData<Types>, BuildError> {
688 trace!(
690 new_num_nodes = num_nodes,
691 old_num_nodes = self.num_nodes.load(Ordering::Relaxed),
692 "Updating num_nodes"
693 );
694
695 self.num_nodes.store(num_nodes, Ordering::Relaxed);
696
697 self.claim_block(block_hash, view_number, sender, signature)
698 .await
699 }
700
701 #[tracing::instrument(skip_all)]
702 async fn claim_block_header_input(
703 &self,
704 block_hash: &BuilderCommitment,
705 view_number: u64,
706 sender: Types::SignatureKey,
707 signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
708 ) -> Result<AvailableBlockHeaderInputV1<Types>, BuildError> {
709 let start = Instant::now();
710 if !sender.validate(signature, block_hash.as_ref()) {
712 warn!("Signature validation failed in claim_block_header_input");
713 return Err(Error::<Types>::SignatureValidation.into());
714 }
715
716 let block_id = BlockId {
717 hash: block_hash.clone(),
718 view: Types::View::new(view_number),
719 };
720
721 trace!("Processing claim_block_header_input request");
722
723 let (truncated, info) = timeout(
724 self.max_api_waiting_time,
725 self.claim_block_header_input_implementation(block_id),
726 )
727 .await
728 .inspect_err(|_| {
729 self.block_size_limits.decrement_block_size();
731 })
732 .map_err(|_| Error::<Types>::ApiTimeout)??;
733
734 if self.max_api_waiting_time.saturating_sub(start.elapsed())
735 > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
736 {
737 self.block_size_limits.try_increment_block_size(truncated);
739 }
740
741 Ok(info)
742 }
743
744 #[tracing::instrument(skip_all)]
746 async fn builder_address(
747 &self,
748 ) -> Result<<Types as NodeType>::BuilderSignatureKey, BuildError> {
749 Ok(self.builder_keys.0.clone())
750 }
751}
752
753#[async_trait]
754impl<Types: NodeType> AcceptsTxnSubmits<Types> for ProxyGlobalState<Types>
755where
756 for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
757 &'a TaggedBase64,
758 >>::Error: Display,
759 for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
760{
761 async fn submit_txns(
762 &self,
763 txns: Vec<<Types as NodeType>::Transaction>,
764 ) -> Result<Vec<Commitment<<Types as NodeType>::Transaction>>, BuildError> {
765 txns.into_iter()
766 .map(|txn| ReceivedTransaction::new(txn, TransactionSource::Private))
767 .map(|txn| async {
768 let commit = txn.commit;
769 self.0.handle_transaction(txn).await?;
770 Ok(commit)
771 })
772 .collect::<FuturesOrdered<_>>()
773 .try_collect()
774 .await
775 }
776
777 async fn txn_status(
778 &self,
779 txn_hash: Commitment<<Types as NodeType>::Transaction>,
780 ) -> Result<TransactionStatus, BuildError> {
781 Ok(self.coordinator.tx_status(&txn_hash))
782 }
783}
784
785#[async_trait]
786impl<Types: NodeType> ReadState for ProxyGlobalState<Types> {
787 type State = ProxyGlobalState<Types>;
788
789 async fn read<T>(
790 &self,
791 op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
792 ) -> T {
793 op(self).await
794 }
795}