hotshot_builder_refactored/
service.rs

1use std::{
2    fmt::Display,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7    time::{Duration, Instant},
8};
9
10pub use async_broadcast::{broadcast, RecvError, TryRecvError};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::Commitment;
14use futures::{
15    future::BoxFuture,
16    stream::{FuturesOrdered, FuturesUnordered, StreamExt},
17    Stream, TryStreamExt,
18};
19use hotshot::types::Event;
20use hotshot_builder_api::{
21    v0_1::{
22        block_info::{AvailableBlockData, AvailableBlockInfo},
23        builder::{
24            define_api, submit_api, BuildError, Error as BuilderApiError, TransactionStatus,
25        },
26        data_source::{AcceptsTxnSubmits, BuilderDataSource},
27    },
28    v0_2::block_info::AvailableBlockHeaderInputV1,
29};
30use hotshot_builder_shared::{
31    block::{BlockId, BuilderStateId, ReceivedTransaction, TransactionSource},
32    coordinator::{BuilderStateCoordinator, BuilderStateLookup},
33    error::Error,
34    state::BuilderState,
35    utils::BuilderKeys,
36};
37use hotshot_types::{
38    data::VidCommitment,
39    event::EventType,
40    traits::{
41        block_contents::{BlockPayload, Transaction},
42        node_implementation::{ConsensusTime, NodeType},
43        signature_key::{BuilderSignatureKey, SignatureKey},
44        EncodeBytes,
45    },
46    utils::BuilderCommitment,
47};
48use tagged_base64::TaggedBase64;
49use tide_disco::{app::AppError, method::ReadState, App};
50use tokio::{
51    spawn,
52    task::JoinHandle,
53    time::{sleep, timeout},
54};
55use tracing::{error, info, instrument, trace, warn};
56use vbs::version::StaticVersion;
57
58use crate::{
59    block_size_limits::BlockSizeLimits,
60    block_store::{BlockInfo, BlockStore},
61};
62
/// Divisor applied to [`GlobalState::max_api_waiting_time`] to obtain the time
/// we are willing to wait for the exactly matching builder state to appear
/// before falling back to the highest view builder state.
const BUILDER_STATE_EXACT_MATCH_DIVISOR: u32 = 2;

/// Governs the duration of sleep in the various retry loops in the API.
/// When retrying something with a timeout of `X`, we sleep for
/// `X / RETRY_LOOP_RESOLUTION` between attempts.
const RETRY_LOOP_RESOLUTION: u32 = 10;

/// The maximum block size is only incremented when a header input response
/// was served with more than
/// [`GlobalState::max_api_waiting_time`] / `VID_RESPONSE_TARGET_MARGIN_DIVISOR`
/// of the allotted [`GlobalState::max_api_waiting_time`] still remaining.
const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;

/// [`ALLOW_EMPTY_BLOCK_PERIOD`] determines the number of future views
/// for which we will allow building empty blocks.
///
/// It lets the builder prioritize finalizing transactions by producing
/// empty blocks, instead of refusing to create them, in the views following
/// a proposal that contains transactions.
pub(crate) const ALLOW_EMPTY_BLOCK_PERIOD: u64 = 3;
84
85/// Configuration to initialize the builder
86#[derive(Debug, Clone)]
87pub struct BuilderConfig<Types: NodeType> {
88    /// Keys that this builder will use to sign responses
89    pub builder_keys: BuilderKeys<Types>,
90    /// Maximum time allotted for the builder to respond to an API call.
91    /// If the response isn't ready by this time, an error will be returned
92    /// to the caller.
93    pub max_api_waiting_time: Duration,
94    /// Interval at which the builder will optimistically increment its maximum
95    /// allowed block size in case it becomes lower than the protocol maximum.
96    pub max_block_size_increment_period: Duration,
97    /// Time the builder will wait for new transactions before answering an
98    /// `available_blocks` API call if the builder doesn't have any transactions at the moment
99    /// of the call. Should be less than [`Self::max_api_waiting_time`]
100    pub maximize_txn_capture_timeout: Duration,
101    /// (Approximate) duration over which included transaction hashes will be stored
102    /// by the builder for deduplication of incoming transactions.
103    pub txn_garbage_collect_duration: Duration,
104    /// Channel capacity for incoming transactions for a single builder state.
105    pub txn_channel_capacity: usize,
106    /// Capacity of cache storing information for transaction status API
107    pub tx_status_cache_capacity: usize,
108    /// Base fee; the sequencing fee for a block is calculated as block size × base fee
109    pub base_fee: u64,
110}
111
#[cfg(test)]
impl<Types: NodeType> BuilderConfig<Types> {
    /// Test-only configuration: builder keys are derived from a fixed
    /// seed and index, every other setting comes from the shared testing constants.
    pub(crate) fn test() -> Self {
        use hotshot_builder_shared::testing::constants::*;

        let builder_keys =
            <Types::BuilderSignatureKey as BuilderSignatureKey>::generated_from_seed_indexed(
                [0u8; 32], 42,
            );

        Self {
            builder_keys,
            max_api_waiting_time: TEST_API_TIMEOUT,
            max_block_size_increment_period: TEST_MAX_BLOCK_SIZE_INCREMENT_PERIOD,
            maximize_txn_capture_timeout: TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT,
            txn_garbage_collect_duration: TEST_INCLUDED_TX_GC_PERIOD,
            txn_channel_capacity: TEST_CHANNEL_BUFFER_SIZE,
            tx_status_cache_capacity: TEST_TX_STATUS_CACHE_CAPACITY,
            base_fee: TEST_BASE_FEE,
        }
    }
}
131
132pub struct GlobalState<Types: NodeType> {
133    /// Underlying coordinator, responsible for builder state lifecycle
134    pub(crate) coordinator: Arc<BuilderStateCoordinator<Types>>,
135    /// Keys that this builder will use to sign responses
136    pub(crate) builder_keys: BuilderKeys<Types>,
137    /// Stores blocks built by this builder
138    pub(crate) block_store: RwLock<BlockStore<Types>>,
139    /// Limits on block size. See [`BlockSizeLimits`] documentation for more details.
140    pub(crate) block_size_limits: BlockSizeLimits,
141    /// Number of DA nodes used in VID computation
142    pub(crate) num_nodes: AtomicUsize,
143    /// Instance state, used to construct new blocks
144    pub(crate) instance_state: Types::InstanceState,
145    /// See [`BuilderConfig::max_api_waiting_time`]
146    pub(crate) max_api_waiting_time: Duration,
147    /// See [`BuilderConfig::maximize_txn_capture_timeout`]
148    pub(crate) maximize_txn_capture_timeout: Duration,
149    /// See [`BuilderConfig::base_fee`]
150    pub(crate) base_fee: u64,
151}
152
153impl<Types: NodeType> GlobalState<Types>
154where
155    for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
156        &'a TaggedBase64,
157    >>::Error: Display,
158    for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
159{
160    pub fn new(
161        config: BuilderConfig<Types>,
162        instance_state: Types::InstanceState,
163        protocol_max_block_size: u64,
164        num_nodes: usize,
165    ) -> Arc<Self> {
166        Arc::new(Self {
167            coordinator: Arc::new(BuilderStateCoordinator::new(
168                config.txn_channel_capacity,
169                config.txn_garbage_collect_duration,
170                config.tx_status_cache_capacity,
171            )),
172            block_store: RwLock::new(BlockStore::new()),
173            block_size_limits: BlockSizeLimits::new(
174                protocol_max_block_size,
175                config.max_block_size_increment_period,
176            ),
177            num_nodes: num_nodes.into(),
178            builder_keys: config.builder_keys,
179            max_api_waiting_time: config.max_api_waiting_time,
180            maximize_txn_capture_timeout: config.maximize_txn_capture_timeout,
181            instance_state,
182            base_fee: config.base_fee,
183        })
184    }
185
186    /// Spawns an event loop handling HotShot events from the provided stream.
187    /// Returns a handle for the spawned task.
188    pub fn start_event_loop(
189        self: Arc<Self>,
190        event_stream: impl Stream<Item = Event<Types>> + Unpin + Send + 'static,
191    ) -> JoinHandle<anyhow::Result<()>> {
192        spawn(self.event_loop(event_stream))
193    }
194
195    /// Internal implementation of the event loop, drives the underlying coordinator
196    /// and runs hooks
197    async fn event_loop(
198        self: Arc<Self>,
199        mut event_stream: impl Stream<Item = Event<Types>> + Unpin + Send + 'static,
200    ) -> anyhow::Result<()> {
201        loop {
202            let Some(event) = event_stream.next().await else {
203                anyhow::bail!("Event stream ended");
204            };
205
206            match event.event {
207                EventType::Error { error } => {
208                    error!("Error event in HotShot: {error:?}");
209                },
210                EventType::Transactions { transactions } => {
211                    let this = Arc::clone(&self);
212                    spawn(async move {
213                        transactions
214                            .into_iter()
215                            .map(|txn| {
216                                this.handle_transaction(ReceivedTransaction::new(
217                                    txn,
218                                    TransactionSource::Public,
219                                ))
220                            })
221                            .collect::<FuturesUnordered<_>>()
222                            .collect::<Vec<_>>()
223                            .await;
224                    });
225                },
226                EventType::Decide { leaf_chain, .. } => {
227                    let prune_cutoff = leaf_chain[0].leaf.view_number();
228
229                    let coordinator = Arc::clone(&self.coordinator);
230                    spawn(async move { coordinator.handle_decide(leaf_chain).await });
231
232                    let this = Arc::clone(&self);
233                    spawn(async move { this.block_store.write().await.prune(prune_cutoff) });
234                },
235                EventType::DaProposal { proposal, .. } => {
236                    let coordinator = Arc::clone(&self.coordinator);
237                    spawn(async move { coordinator.handle_da_proposal(proposal.data).await });
238                },
239                EventType::QuorumProposal { proposal, .. } => {
240                    let coordinator = Arc::clone(&self.coordinator);
241                    spawn(async move { coordinator.handle_quorum_proposal(proposal.data).await });
242                },
243                _ => {},
244            }
245        }
246    }
247
248    /// Consumes `self` and returns a `tide_disco` [`App`] with builder and private mempool APIs registered
249    pub fn into_app(
250        self: Arc<Self>,
251    ) -> Result<App<ProxyGlobalState<Types>, BuilderApiError>, AppError> {
252        let proxy = ProxyGlobalState(self);
253        let builder_api = define_api::<ProxyGlobalState<Types>, Types>(&Default::default())?;
254
255        // TODO: Replace StaticVersion with proper constant when added in HotShot
256        let private_mempool_api =
257            submit_api::<ProxyGlobalState<Types>, Types, StaticVersion<0, 1>>(&Default::default())?;
258
259        let mut app: App<ProxyGlobalState<Types>, BuilderApiError> = App::with_state(proxy);
260
261        app.register_module(hotshot_types::constants::LEGACY_BUILDER_MODULE, builder_api)?;
262
263        app.register_module("txn_submit", private_mempool_api)?;
264
265        Ok(app)
266    }
267
268    async fn handle_transaction(&self, tx: ReceivedTransaction<Types>) -> Result<(), Error<Types>> {
269        let len = tx.transaction.minimum_block_size();
270        let max_tx_len = self.block_size_limits.max_block_size();
271        if len > max_tx_len {
272            tracing::warn!(%tx.commit, %len, %max_tx_len, "Transaction too big");
273            let error = Error::TxTooBig { len, max_tx_len };
274            self.coordinator.update_txn_status(
275                &tx.commit,
276                TransactionStatus::Rejected {
277                    reason: error.to_string(),
278                },
279            );
280            return Err(error);
281        }
282        self.coordinator.handle_transaction(tx).await
283    }
284
285    async fn wait_for_builder_state(
286        &self,
287        state_id: &BuilderStateId<Types>,
288        check_period: Duration,
289    ) -> Result<Arc<BuilderState<Types>>, Error<Types>> {
290        loop {
291            match self.coordinator.lookup_builder_state(state_id).await {
292                BuilderStateLookup::Found(builder) => break Ok(builder),
293                BuilderStateLookup::Decided => {
294                    return Err(Error::AlreadyDecided);
295                },
296                BuilderStateLookup::NotFound => {
297                    sleep(check_period).await;
298                },
299            };
300        }
301    }
302
303    /// Build a block with provided builder state
304    ///
305    /// Returns None if there are no transactions to include
306    /// and we aren't prioritizing finalization for this builder state
307    pub(crate) async fn build_block(
308        &self,
309        builder_state: Arc<BuilderState<Types>>,
310    ) -> Result<Option<BlockInfo<Types>>, Error<Types>> {
311        let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
312        let sleep_interval = self.maximize_txn_capture_timeout / RETRY_LOOP_RESOLUTION;
313
314        while Instant::now() <= timeout_after {
315            let queue_populated = builder_state.collect_txns(timeout_after).await;
316
317            if queue_populated || Instant::now() + sleep_interval > timeout_after {
318                // we don't have time for another iteration
319                break;
320            }
321
322            sleep(sleep_interval).await
323        }
324
325        // If the parent block had transactions included and [`ALLOW_EMPTY_BLOCK_PERIOD`] views has not
326        // passed since, we will allow building empty blocks. This is done to allow for faster finalization
327        // of previous blocks that have had transactions included in them.
328        let should_prioritize_finalization = builder_state.parent_block_references.tx_count != 0
329            && builder_state
330                .parent_block_references
331                .last_nonempty_view
332                .map(|nonempty_view| {
333                    nonempty_view.saturating_sub(*builder_state.parent_block_references.view_number)
334                        < ALLOW_EMPTY_BLOCK_PERIOD
335                })
336                .unwrap_or(false);
337
338        let builder: &Arc<BuilderState<Types>> = &builder_state;
339        let max_block_size = self.block_size_limits.max_block_size();
340
341        let transactions_to_include = {
342            let txn_queue = builder.txn_queue.read().await;
343            if txn_queue.is_empty() && !should_prioritize_finalization {
344                // Don't build an empty block
345                return Ok(None);
346            }
347            txn_queue
348                .iter()
349                .scan(0, |total_size, tx| {
350                    let prev_size = *total_size;
351                    *total_size += tx.min_block_size;
352                    // We will include one transaction over our target block length
353                    // if it's the first transaction in queue, otherwise we'd have a possible failure
354                    // state where a single transaction larger than target block state is stuck in
355                    // queue and we just build empty blocks forever
356                    if *total_size >= max_block_size && prev_size != 0 {
357                        None
358                    } else {
359                        // Note: we're going to map from ReceivedTransaction to
360                        // Transaction it contains later, so we can just clone
361                        // the Arc here to reduce the time we hold the lock
362                        Some(Arc::clone(tx))
363                    }
364                })
365                .collect::<Vec<_>>()
366        };
367
368        let (payload, metadata) =
369            match <Types::BlockPayload as BlockPayload<Types>>::from_transactions(
370                transactions_to_include
371                    .into_iter()
372                    .map(|tx| tx.transaction.clone()),
373                &builder.validated_state,
374                &self.instance_state,
375            )
376            .await
377            {
378                Ok((payload, metadata)) => (payload, metadata),
379                Err(error) => {
380                    warn!(?error, "Failed to build block payload");
381                    return Err(Error::BuildBlock(error));
382                },
383            };
384
385        // count the number of txns
386        let actual_txn_count = payload.num_transactions(&metadata);
387        let truncated = actual_txn_count == 0;
388
389        // Payload is empty despite us checking that tx_queue isn't empty earlier.
390        //
391        // This means that the block was truncated due to *sequencer* block length
392        // limits, which are different from our `max_block_size`. There's no good way
393        // for us to check for this in advance, so we detect transactions too big for
394        // the sequencer indirectly, by observing that we passed some transactions
395        // to `<Types::BlockPayload as BlockPayload<Types>>::from_transactions`, but
396        // it returned an empty block.
397        // Thus we deduce that the first transaction in our queue is too big to *ever*
398        // be included, because it alone goes over sequencer's block size limit.
399        if truncated {
400            builder.txn_queue.write().await.pop_front();
401            if !should_prioritize_finalization {
402                return Ok(None);
403            }
404        }
405
406        let encoded_txns: Vec<u8> = payload.encode().to_vec();
407        let block_size: u64 = encoded_txns.len() as u64;
408        let offered_fee: u64 = self.base_fee * block_size;
409
410        info!(
411            builder_id = %builder.id(),
412            txn_count = actual_txn_count,
413            block_size,
414            "Built a block",
415        );
416
417        Ok(Some(BlockInfo {
418            block_payload: payload,
419            block_size,
420            metadata,
421            offered_fee,
422            truncated,
423        }))
424    }
425
426    #[instrument(skip_all,
427        fields(state_id = %state_id)
428    )]
429    pub(crate) async fn available_blocks_implementation(
430        &self,
431        state_id: BuilderStateId<Types>,
432    ) -> Result<Vec<AvailableBlockInfo<Types>>, Error<Types>> {
433        let start = Instant::now();
434
435        let check_period = self.max_api_waiting_time / RETRY_LOOP_RESOLUTION;
436        let time_to_wait_for_matching_builder =
437            self.max_api_waiting_time / BUILDER_STATE_EXACT_MATCH_DIVISOR;
438
439        let builder = match timeout(
440            time_to_wait_for_matching_builder,
441            self.wait_for_builder_state(&state_id, check_period),
442        )
443        .await
444        {
445            Ok(Ok(builder)) => Some(builder),
446            Err(_) => {
447                // Timeout waiting for ideal state, get the highest view builder instead
448                warn!("Couldn't find the ideal builder state");
449                self.coordinator.highest_view_builder().await
450            },
451            Ok(Err(e)) => {
452                // State already decided
453                let lowest_view = self.coordinator.lowest_view().await;
454                warn!(
455                    ?lowest_view,
456                    "get_available_blocks request for decided view"
457                );
458                return Err(e);
459            },
460        };
461
462        let Some(builder) = builder else {
463            if let Some(cached_block) = self.block_store.read().await.get_cached(&state_id) {
464                return Ok(vec![cached_block.signed_response(&self.builder_keys)?]);
465            } else {
466                return Err(Error::NotFound);
467            };
468        };
469
470        let build_block_timeout = self
471            .max_api_waiting_time
472            .saturating_sub(start.elapsed())
473            .div_f32(1.1);
474        match timeout(build_block_timeout, self.build_block(builder))
475            .await
476            .map_err(|_| Error::ApiTimeout)
477        {
478            // Success
479            Ok(Ok(Some(info))) => {
480                let block_id = BlockId {
481                    hash: info.block_payload.builder_commitment(&info.metadata),
482                    view: state_id.parent_view,
483                };
484
485                let response = info.signed_response(&self.builder_keys)?;
486
487                {
488                    let mut mutable_state = self.block_store.write().await;
489                    mutable_state.update(state_id, block_id, info);
490                }
491
492                Ok(vec![response])
493            },
494            // Success, but no block: we don't have transactions and aren't prioritizing finalization
495            Ok(Ok(None)) => Ok(vec![]),
496            // Error building block, try to respond with a cached one as last-ditch attempt
497            Ok(Err(e)) | Err(e) => {
498                if let Some(cached_block) = self.block_store.read().await.get_cached(&state_id) {
499                    Ok(vec![cached_block.signed_response(&self.builder_keys)?])
500                } else {
501                    Err(e)
502                }
503            },
504        }
505    }
506
507    #[instrument(skip_all,
508        fields(block_id = %block_id)
509    )]
510    pub(crate) async fn claim_block_implementation(
511        &self,
512        block_id: BlockId<Types>,
513    ) -> Result<AvailableBlockData<Types>, Error<Types>> {
514        let (block_payload, metadata) = {
515            // We store this read lock guard separately to make it explicit
516            // that this will end up holding a lock for the duration of this
517            // closure.
518            //
519            // Additionally, we clone the properties from the block_info that
520            // end up being cloned if found anyway.  Since we know this already
521            // we can perform the clone here to avoid holding the lock for
522            // longer than needed.
523            let mutable_state_read = self.block_store.read().await;
524            let block_info = mutable_state_read
525                .get_block(&block_id)
526                .ok_or(Error::NotFound)?;
527
528            (
529                block_info.block_payload.clone(),
530                block_info.metadata.clone(),
531            )
532        };
533
534        let (pub_key, sign_key) = self.builder_keys.clone();
535
536        // sign over the builder commitment, as the proposer can computer it based on provide block_payload
537        // and the metadata
538        let response_block_hash = block_payload.builder_commitment(&metadata);
539        let signature_over_builder_commitment =
540            <Types as NodeType>::BuilderSignatureKey::sign_builder_message(
541                &sign_key,
542                response_block_hash.as_ref(),
543            )
544            .map_err(Error::Signing)?;
545
546        let block_data = AvailableBlockData::<Types> {
547            block_payload,
548            metadata,
549            signature: signature_over_builder_commitment,
550            sender: pub_key,
551        };
552
553        info!("Sending Claim Block data for {block_id}");
554        Ok(block_data)
555    }
556
557    #[instrument(skip_all,
558        fields(block_id = %block_id)
559    )]
560    pub(crate) async fn claim_block_header_input_implementation(
561        &self,
562        block_id: BlockId<Types>,
563    ) -> Result<(bool, AvailableBlockHeaderInputV1<Types>), Error<Types>> {
564        let metadata;
565        let offered_fee;
566        let truncated;
567        {
568            // We store this read lock guard separately to make it explicit
569            // that this will end up holding a lock for the duration of this
570            // closure.
571            //
572            // Additionally, we clone the properties from the block_info that
573            // end up being cloned if found anyway.  Since we know this already
574            // we can perform the clone here to avoid holding the lock for
575            // longer than needed.
576            let mutable_state_read_lock_guard = self.block_store.read().await;
577            let block_info = mutable_state_read_lock_guard
578                .get_block(&block_id)
579                .ok_or(Error::NotFound)?;
580
581            metadata = block_info.metadata.clone();
582            offered_fee = block_info.offered_fee;
583            truncated = block_info.truncated;
584        };
585
586        // TODO Add precompute back.
587
588        let signature_over_fee_info =
589            Types::BuilderSignatureKey::sign_fee(&self.builder_keys.1, offered_fee, &metadata)
590                .map_err(Error::Signing)?;
591
592        let response = AvailableBlockHeaderInputV1::<Types> {
593            fee_signature: signature_over_fee_info,
594            sender: self.builder_keys.0.clone(),
595        };
596        info!("Sending Claim Block Header Input response");
597        Ok((truncated, response))
598    }
599}
600
601#[derive(derive_more::Deref, derive_more::DerefMut)]
602#[deref(forward)]
603#[deref_mut(forward)]
604pub struct ProxyGlobalState<Types: NodeType>(pub Arc<GlobalState<Types>>);
605
606/*
607Handling Builder API responses
608*/
609#[async_trait]
610impl<Types: NodeType> BuilderDataSource<Types> for ProxyGlobalState<Types>
611where
612    for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
613        &'a TaggedBase64,
614    >>::Error: Display,
615    for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
616{
617    #[tracing::instrument(skip_all)]
618    async fn available_blocks(
619        &self,
620        parent_block: &VidCommitment,
621        parent_view: u64,
622        sender: Types::SignatureKey,
623        signature: &<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType,
624    ) -> Result<Vec<AvailableBlockInfo<Types>>, BuildError> {
625        // verify the signature
626        if !sender.validate(signature, parent_block.as_ref()) {
627            warn!("Signature validation failed");
628            return Err(Error::<Types>::SignatureValidation.into());
629        }
630
631        let state_id = BuilderStateId {
632            parent_commitment: *parent_block,
633            parent_view: Types::View::new(parent_view),
634        };
635
636        trace!("Requesting available blocks");
637
638        let available_blocks = timeout(
639            self.max_api_waiting_time,
640            self.available_blocks_implementation(state_id),
641        )
642        .await
643        .map_err(|_| Error::<Types>::ApiTimeout)??;
644
645        Ok(available_blocks)
646    }
647
648    #[tracing::instrument(skip_all)]
649    async fn claim_block(
650        &self,
651        block_hash: &BuilderCommitment,
652        view_number: u64,
653        sender: Types::SignatureKey,
654        signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
655    ) -> Result<AvailableBlockData<Types>, BuildError> {
656        // verify the signature
657        if !sender.validate(signature, block_hash.as_ref()) {
658            warn!("Signature validation failed");
659            return Err(Error::<Types>::SignatureValidation.into());
660        }
661
662        let block_id = BlockId {
663            hash: block_hash.clone(),
664            view: Types::View::new(view_number),
665        };
666
667        trace!("Processing claim block request");
668
669        let block = timeout(
670            self.max_api_waiting_time,
671            self.claim_block_implementation(block_id),
672        )
673        .await
674        .map_err(|_| Error::<Types>::ApiTimeout)??;
675
676        Ok(block)
677    }
678
679    #[tracing::instrument(skip_all)]
680    async fn claim_block_with_num_nodes(
681        &self,
682        block_hash: &BuilderCommitment,
683        view_number: u64,
684        sender: <Types as NodeType>::SignatureKey,
685        signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
686        num_nodes: usize,
687    ) -> Result<AvailableBlockData<Types>, BuildError> {
688        // Update the stored `num_nodes` with the given value, which will be used for VID computation.
689        trace!(
690            new_num_nodes = num_nodes,
691            old_num_nodes = self.num_nodes.load(Ordering::Relaxed),
692            "Updating num_nodes"
693        );
694
695        self.num_nodes.store(num_nodes, Ordering::Relaxed);
696
697        self.claim_block(block_hash, view_number, sender, signature)
698            .await
699    }
700
701    #[tracing::instrument(skip_all)]
702    async fn claim_block_header_input(
703        &self,
704        block_hash: &BuilderCommitment,
705        view_number: u64,
706        sender: Types::SignatureKey,
707        signature: &<<Types as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
708    ) -> Result<AvailableBlockHeaderInputV1<Types>, BuildError> {
709        let start = Instant::now();
710        // verify the signature
711        if !sender.validate(signature, block_hash.as_ref()) {
712            warn!("Signature validation failed in claim_block_header_input");
713            return Err(Error::<Types>::SignatureValidation.into());
714        }
715
716        let block_id = BlockId {
717            hash: block_hash.clone(),
718            view: Types::View::new(view_number),
719        };
720
721        trace!("Processing claim_block_header_input request");
722
723        let (truncated, info) = timeout(
724            self.max_api_waiting_time,
725            self.claim_block_header_input_implementation(block_id),
726        )
727        .await
728        .inspect_err(|_| {
729            // we can't keep up with this block size, reduce max block size
730            self.block_size_limits.decrement_block_size();
731        })
732        .map_err(|_| Error::<Types>::ApiTimeout)??;
733
734        if self.max_api_waiting_time.saturating_sub(start.elapsed())
735            > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
736        {
737            // Increase max block size
738            self.block_size_limits.try_increment_block_size(truncated);
739        }
740
741        Ok(info)
742    }
743
744    /// Returns the public key of the builder
745    #[tracing::instrument(skip_all)]
746    async fn builder_address(
747        &self,
748    ) -> Result<<Types as NodeType>::BuilderSignatureKey, BuildError> {
749        Ok(self.builder_keys.0.clone())
750    }
751}
752
753#[async_trait]
754impl<Types: NodeType> AcceptsTxnSubmits<Types> for ProxyGlobalState<Types>
755where
756    for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
757        &'a TaggedBase64,
758    >>::Error: Display,
759    for<'a> <Types::SignatureKey as TryFrom<&'a TaggedBase64>>::Error: Display,
760{
761    async fn submit_txns(
762        &self,
763        txns: Vec<<Types as NodeType>::Transaction>,
764    ) -> Result<Vec<Commitment<<Types as NodeType>::Transaction>>, BuildError> {
765        txns.into_iter()
766            .map(|txn| ReceivedTransaction::new(txn, TransactionSource::Private))
767            .map(|txn| async {
768                let commit = txn.commit;
769                self.0.handle_transaction(txn).await?;
770                Ok(commit)
771            })
772            .collect::<FuturesOrdered<_>>()
773            .try_collect()
774            .await
775    }
776
777    async fn txn_status(
778        &self,
779        txn_hash: Commitment<<Types as NodeType>::Transaction>,
780    ) -> Result<TransactionStatus, BuildError> {
781        Ok(self.coordinator.tx_status(&txn_hash))
782    }
783}
784
785#[async_trait]
786impl<Types: NodeType> ReadState for ProxyGlobalState<Types> {
787    type State = ProxyGlobalState<Types>;
788
789    async fn read<T>(
790        &self,
791        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
792    ) -> T {
793        op(self).await
794    }
795}