1use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{stream::FuturesUnordered, StreamExt};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{null_block, PackedBundle, VidCommitment},
20 epoch_membership::EpochMembershipCoordinator,
21 event::{Event, EventType},
22 message::UpgradeLock,
23 traits::{
24 block_contents::{BlockHeader, BuilderFee, EncodeBytes},
25 node_implementation::{ConsensusTime, NodeType, Versions},
26 signature_key::{BuilderSignatureKey, SignatureKey},
27 BlockPayload,
28 },
29 utils::{is_epoch_transition, is_last_block, ViewInner},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::{StaticVersionType, Version};
35
36use crate::{
37 builder::v0_1::BuilderClient as BuilderClientBase,
38 events::{HotShotEvent, HotShotTaskCompleted},
39 helpers::broadcast_event,
40};
41
/// Numerator of the fraction of builder clients that makes up the "main batch"
/// of responses awaited in `get_available_blocks` (dividend / divisor, rounded up).
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Denominator of the main-batch fraction; see `BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND`.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// Once this much time has passed since the query started, `get_available_blocks`
/// stops waiting for further main-batch responses.
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// Fraction of the time already spent on the main batch that may additionally be
/// spent waiting for the remaining builders.
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Lower bound used when computing the extra waiting time: if the main batch
/// finished sooner than this, the remainder is granted to the other builders.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Pause between retries when looking up the parent commitment or querying builders.
const RETRY_DELAY: Duration = Duration::from_millis(100);
57
/// A block successfully claimed from a builder, as assembled by
/// `block_from_builder`.
pub struct BuilderResponse<TYPES: NodeType> {
    /// Fee record built from the offered fee amount plus the account and fee
    /// signature taken from the builder's header input.
    pub fee: BuilderFee<TYPES>,

    /// Payload of the claimed block.
    pub block_payload: TYPES::BlockPayload,

    /// Metadata that accompanies `block_payload`.
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
69
/// State of the task that obtains a block (from a builder, or an empty one) when
/// this node is the leader of a view and publishes it as a `BlockRecv` event.
pub struct TransactionTaskState<TYPES: NodeType, V: Versions> {
    /// Overall time budget for finding the parent commitment and obtaining a
    /// block from the builders.
    pub builder_timeout: Duration,

    /// External event stream; received transactions are forwarded to it.
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// Latest view this task has moved to.
    pub cur_view: TYPES::View,

    /// Epoch recorded at the last accepted view change, if any.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Shared consensus state (high QC, validated state map, saved leaves, metrics).
    pub consensus: OuterConsensus<TYPES>,

    /// Used to look up the leader of a view and the stake table of an epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Clients for the builders that are queried for available blocks.
    pub builder_clients: Vec<BuilderClientBase<TYPES>>,

    /// This node's public key; compared against the view leader and sent to builders.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; signs the parent commitment and block hashes.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Instance state handle (not read by the methods in this file).
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Node id, recorded in tracing spans.
    pub id: u64,

    /// Used to determine the protocol version in effect for a view and whether
    /// an upgrade is in progress.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Epoch height passed to `is_epoch_transition` / `is_last_block`.
    pub epoch_height: u64,
}
111
112impl<TYPES: NodeType, V: Versions> TransactionTaskState<TYPES, V> {
113 pub async fn handle_view_change(
115 &mut self,
116 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
117 block_view: TYPES::View,
118 block_epoch: Option<TYPES::Epoch>,
119 vid: Option<VidCommitment>,
120 ) -> Option<HotShotTaskCompleted> {
121 self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
122 .await
123 }
124
125 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
127 pub async fn handle_view_change_legacy(
128 &mut self,
129 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
130 block_view: TYPES::View,
131 block_epoch: Option<TYPES::Epoch>,
132 vid: Option<VidCommitment>,
133 ) -> Option<HotShotTaskCompleted> {
134 let version = match self.upgrade_lock.version(block_view).await {
135 Ok(v) => v,
136 Err(err) => {
137 tracing::error!(
138 "Upgrade certificate requires unsupported version, refusing to request \
139 blocks: {err}"
140 );
141 return None;
142 },
143 };
144
145 if version >= V::Epochs::VERSION {
148 let Some(epoch) = block_epoch else {
149 tracing::error!("Epoch is required for epoch-based view change");
150 return None;
151 };
152 let high_qc = self.consensus.read().await.high_qc().clone();
153 let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
154 bn
155 } else {
156 if block_view
159 > self
160 .upgrade_lock
161 .upgrade_view()
162 .await
163 .unwrap_or(TYPES::View::new(0))
164 + 1
165 {
166 tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167 self.send_empty_block(event_stream, block_view, block_epoch, version)
168 .await;
169 return None;
170 }
171 0
173 };
174 high_qc_block_number = std::cmp::max(
175 high_qc_block_number,
176 self.consensus.read().await.highest_block,
177 );
178 if self
179 .consensus
180 .read()
181 .await
182 .transition_qc()
183 .is_some_and(|qc| {
184 let Some(e) = qc.0.data.epoch else {
185 return false;
186 };
187 e == epoch
188 })
189 || is_epoch_transition(high_qc_block_number, self.epoch_height)
190 {
191 if !is_last_block(high_qc_block_number, self.epoch_height) {
193 tracing::info!(
194 "Sending empty block event. View number: {block_view}. Parent Block \
195 number: {high_qc_block_number}"
196 );
197 self.send_empty_block(event_stream, block_view, block_epoch, version)
198 .await;
199 return None;
200 }
201 }
202 }
203
204 let block = {
206 if self
207 .upgrade_lock
208 .decided_upgrade_certificate
209 .read()
210 .await
211 .as_ref()
212 .is_some_and(|cert| cert.upgrading_in(block_view))
213 {
214 None
215 } else {
216 self.wait_for_block(block_view, vid).await
217 }
218 };
219
220 if let Some(BuilderResponse {
221 block_payload,
222 metadata,
223 fee,
224 }) = block
225 {
226 broadcast_event(
227 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
228 block_payload.encode(),
229 metadata,
230 block_view,
231 block_epoch,
232 vec1::vec1![fee],
233 ))),
234 event_stream,
235 )
236 .await;
237 } else {
238 self.send_empty_block(event_stream, block_view, block_epoch, version)
239 .await;
240 };
241
242 return None;
243 }
244
245 async fn send_empty_block(
247 &self,
248 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
249 block_view: TYPES::View,
250 block_epoch: Option<TYPES::Epoch>,
251 version: Version,
252 ) {
253 tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
255
256 self.consensus
258 .write()
259 .await
260 .metrics
261 .number_of_empty_blocks_proposed
262 .add(1);
263
264 let num_storage_nodes = match self
265 .membership_coordinator
266 .stake_table_for_epoch(block_epoch)
267 .await
268 {
269 Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
270 Err(e) => {
271 tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
272 return;
273 },
274 };
275
276 let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
277 tracing::error!("Failed to get null fee");
278 return;
279 };
280
281 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
283
284 broadcast_event(
286 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
287 vec![].into(),
288 metadata,
289 block_view,
290 block_epoch,
291 vec1::vec1![null_fee],
292 ))),
293 event_stream,
294 )
295 .await;
296 }
297
298 pub async fn null_block(
300 &self,
301 block_view: TYPES::View,
302 block_epoch: Option<TYPES::Epoch>,
303 version: Version,
304 num_storage_nodes: usize,
305 ) -> Option<PackedBundle<TYPES>> {
306 let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
307 tracing::error!("Failed to calculate null block fee.");
308 return None;
309 };
310
311 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
313
314 Some(PackedBundle::new(
315 vec![].into(),
316 metadata,
317 block_view,
318 block_epoch,
319 vec1::vec1![null_fee],
320 ))
321 }
322
323 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
325 pub async fn handle(
326 &mut self,
327 event: Arc<HotShotEvent<TYPES>>,
328 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
329 ) -> Result<()> {
330 match event.as_ref() {
331 HotShotEvent::TransactionsRecv(transactions) => {
332 broadcast_event(
333 Event {
334 view_number: self.cur_view,
335 event: EventType::Transactions {
336 transactions: transactions.clone(),
337 },
338 },
339 &self.output_event_stream,
340 )
341 .await;
342 },
343 HotShotEvent::ViewChange(view, epoch) => {
344 let view = TYPES::View::new(std::cmp::max(1, **view));
345 ensure!(
346 *view > *self.cur_view && *epoch >= self.cur_epoch,
347 debug!(
348 "Received a view change to an older view and epoch: tried to change view \
349 to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
350 self.cur_view, self.cur_epoch
351 )
352 );
353 self.cur_view = view;
354 self.cur_epoch = *epoch;
355
356 let leader = self
357 .membership_coordinator
358 .membership_for_epoch(*epoch)
359 .await?
360 .leader(view)
361 .await?;
362 if leader == self.public_key {
363 self.handle_view_change(&event_stream, view, *epoch, None)
364 .await;
365 return Ok(());
366 }
367 },
368 HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
369 let view_number = proposal.data.view_number();
370 let next_view = view_number + 1;
371
372 let version = match self.upgrade_lock.version(next_view).await {
373 Ok(v) => v,
374 Err(e) => {
375 tracing::error!("Failed to calculate version: {e:?}");
376 return Ok(());
377 },
378 };
379
380 if version < V::DrbAndHeaderUpgrade::VERSION {
381 return Ok(());
382 }
383
384 let vid = proposal.data.block_header().payload_commitment();
385 let block_height = proposal.data.block_header().block_number();
386 if is_epoch_transition(block_height, self.epoch_height) {
387 return Ok(());
388 }
389 if next_view <= self.cur_view {
390 return Ok(());
391 }
392 self.cur_view = next_view;
394
395 let leader = self
396 .membership_coordinator
397 .membership_for_epoch(self.cur_epoch)
398 .await?
399 .leader(next_view)
400 .await?;
401 if leader == self.public_key {
402 self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
403 .await;
404 return Ok(());
405 }
406 },
407 _ => {},
408 }
409 Ok(())
410 }
411
412 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
415 async fn last_vid_commitment_retry(
416 &self,
417 block_view: TYPES::View,
418 task_start_time: Instant,
419 ) -> Result<(TYPES::View, VidCommitment)> {
420 loop {
421 match self.last_vid_commitment(block_view).await {
422 Ok((view, comm)) => break Ok((view, comm)),
423 Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
424 _ => {
425 sleep(RETRY_DELAY).await;
427 continue;
428 },
429 }
430 }
431 }
432
433 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
436 async fn last_vid_commitment(
437 &self,
438 block_view: TYPES::View,
439 ) -> Result<(TYPES::View, VidCommitment)> {
440 let consensus_reader = self.consensus.read().await;
441 let mut target_view = TYPES::View::new(block_view.saturating_sub(1));
442
443 loop {
444 let view_data = consensus_reader
445 .validated_state_map()
446 .get(&target_view)
447 .context(info!(
448 "Missing record for view {target_view} in validated state",
449 ))?;
450
451 match &view_data.view_inner {
452 ViewInner::Da {
453 payload_commitment, ..
454 } => return Ok((target_view, *payload_commitment)),
455 ViewInner::Leaf {
456 leaf: leaf_commitment,
457 ..
458 } => {
459 let leaf = consensus_reader
460 .saved_leaves()
461 .get(leaf_commitment)
462 .context(info!(
463 "Missing leaf with commitment {leaf_commitment} for view \
464 {target_view} in saved_leaves",
465 ))?;
466 return Ok((target_view, leaf.payload_commitment()));
467 },
468 ViewInner::Failed => {
469 target_view = TYPES::View::new(target_view.checked_sub(1).context(warn!(
471 "Reached genesis. Something is wrong -- have we not decided any blocks \
472 since genesis?"
473 ))?);
474 continue;
475 },
476 }
477 }
478 }
479
480 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
481 async fn wait_for_block(
482 &self,
483 block_view: TYPES::View,
484 vid: Option<VidCommitment>,
485 ) -> Option<BuilderResponse<TYPES>> {
486 let task_start_time = Instant::now();
487
488 let (parent_view, parent_comm) = if let Some(vid) = vid {
490 (block_view - 1, vid)
491 } else {
492 match self
493 .last_vid_commitment_retry(block_view, task_start_time)
494 .await
495 {
496 Ok((v, c)) => (v, c),
497 Err(e) => {
498 tracing::warn!("Failed to find last vid commitment in time: {e}");
499 return None;
500 },
501 }
502 };
503
504 let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
505 &self.private_key,
506 parent_comm.as_ref(),
507 ) {
508 Ok(sig) => sig,
509 Err(err) => {
510 tracing::error!(%err, "Failed to sign block hash");
511 return None;
512 },
513 };
514
515 while task_start_time.elapsed() < self.builder_timeout {
516 match timeout(
517 self.builder_timeout
518 .saturating_sub(task_start_time.elapsed()),
519 self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
520 )
521 .await
522 {
523 Ok(Ok(block)) => {
525 return Some(block);
526 },
527
528 Ok(Err(err)) => {
530 tracing::info!("Couldn't get a block: {err:#}");
531 sleep(RETRY_DELAY).await;
533 continue;
534 },
535
536 Err(err) => {
538 tracing::info!(%err, "Timeout while getting available blocks");
539 return None;
540 },
541 }
542 }
543
544 tracing::warn!("could not get a block from the builder in time");
545 None
546 }
547
548 async fn get_available_blocks(
551 &self,
552 parent_comm: VidCommitment,
553 view_number: TYPES::View,
554 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
555 ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
556 let tasks = self
557 .builder_clients
558 .iter()
559 .enumerate()
560 .map(|(builder_idx, client)| async move {
561 client
562 .available_blocks(
563 parent_comm,
564 view_number.u64(),
565 self.public_key.clone(),
566 parent_comm_sig,
567 )
568 .await
569 .map(move |blocks| {
570 blocks
571 .into_iter()
572 .map(move |block_info| (block_info, builder_idx))
573 })
574 })
575 .collect::<FuturesUnordered<_>>();
576 let mut results = Vec::with_capacity(self.builder_clients.len());
577 let query_start = Instant::now();
578 let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
579 .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
580 let mut tasks = tasks.take(threshold);
581 while let Some(result) = tasks.next().await {
582 results.push(result);
583 if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
584 break;
585 }
586 }
587 let timeout = sleep(std::cmp::max(
588 query_start
589 .elapsed()
590 .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
591 BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
592 ));
593 futures::pin_mut!(timeout);
594 let mut tasks = tasks.into_inner().take_until(timeout);
595 while let Some(result) = tasks.next().await {
596 results.push(result);
597 }
598 results
599 .into_iter()
600 .filter_map(|result| result.ok())
601 .flatten()
602 .collect::<Vec<_>>()
603 }
604
605 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
613 async fn block_from_builder(
614 &self,
615 parent_comm: VidCommitment,
616 view_number: TYPES::View,
617 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
618 ) -> Result<BuilderResponse<TYPES>> {
619 let mut available_blocks = self
620 .get_available_blocks(parent_comm, view_number, parent_comm_sig)
621 .await;
622
623 available_blocks.sort_by(|(l, _), (r, _)| {
624 (u128::from(l.offered_fee) * u128::from(r.block_size))
632 .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
633 });
634
635 if available_blocks.is_empty() {
636 tracing::info!("No available blocks");
637 bail!("No available blocks");
638 }
639
640 for (block_info, builder_idx) in available_blocks {
641 if !block_info.sender.validate_block_info_signature(
643 &block_info.signature,
644 block_info.block_size,
645 block_info.offered_fee,
646 &block_info.block_hash,
647 ) {
648 tracing::warn!("Failed to verify available block info response message signature");
649 continue;
650 }
651
652 let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
653 &self.private_key,
654 block_info.block_hash.as_ref(),
655 ) {
656 Ok(request_signature) => request_signature,
657 Err(err) => {
658 tracing::error!(%err, "Failed to sign block hash");
659 continue;
660 },
661 };
662
663 let response = {
664 let client = &self.builder_clients[builder_idx];
665
666 let (block, either_header_input) = futures::join! {
667 client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
668 client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
669 };
670
671 let block_data = match block {
672 Ok(block_data) => block_data,
673 Err(err) => {
674 tracing::warn!(%err, "Error claiming block data");
675 continue;
676 },
677 };
678
679 let Ok(either_header_input) = either_header_input
680 .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
681 else {
682 continue;
683 };
684
685 let Some(header_input) = either_header_input
686 .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
687 else {
688 tracing::warn!(
689 "Failed to verify available new or legacy block header input data \
690 response message signature"
691 );
692 continue;
693 };
694
695 if !block_data.validate_signature() {
697 tracing::warn!(
698 "Failed to verify available block data response message signature"
699 );
700 continue;
701 }
702
703 let fee = BuilderFee {
704 fee_amount: block_info.offered_fee,
705 fee_account: header_input.sender,
706 fee_signature: header_input.fee_signature,
707 };
708
709 BuilderResponse {
710 fee,
711 block_payload: block_data.block_payload,
712 metadata: block_data.metadata,
713 }
714 };
715
716 return Ok(response);
717 }
718
719 bail!("Couldn't claim a block from any of the builders");
720 }
721}
722
723#[async_trait]
724impl<TYPES: NodeType, V: Versions> TaskState for TransactionTaskState<TYPES, V> {
726 type Event = HotShotEvent<TYPES>;
727
728 async fn handle_event(
729 &mut self,
730 event: Arc<Self::Event>,
731 sender: &Sender<Arc<Self::Event>>,
732 _receiver: &Receiver<Arc<Self::Event>>,
733 ) -> Result<()> {
734 self.handle(event, sender.clone()).await
735 }
736
737 fn cancel_subtasks(&mut self) {}
738}