1use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{StreamExt, stream::FuturesUnordered};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{EpochNumber, PackedBundle, VidCommitment, ViewNumber, null_block},
20 epoch_membership::EpochMembershipCoordinator,
21 event::{Event, EventType},
22 message::UpgradeLock,
23 traits::{
24 BlockPayload,
25 block_contents::{BlockHeader, BuilderFee, EncodeBytes},
26 node_implementation::NodeType,
27 signature_key::{BuilderSignatureKey, SignatureKey},
28 },
29 utils::{ViewInner, is_epoch_transition, is_last_block},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::Version;
35use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
36
37use crate::{
38 builder::v0_1::BuilderClient as BuilderClientBase,
39 events::{HotShotEvent, HotShotTaskCompleted},
40 helpers::broadcast_event,
41};
42
43const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
47const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
49const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
51const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
53const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
56const RETRY_DELAY: Duration = Duration::from_millis(100);
58
59pub struct BuilderResponse<TYPES: NodeType> {
61 pub fee: BuilderFee<TYPES>,
63
64 pub block_payload: TYPES::BlockPayload,
66
67 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
69}
70
71pub struct TransactionTaskState<TYPES: NodeType> {
73 pub builder_timeout: Duration,
75
76 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
78
79 pub cur_view: ViewNumber,
81
82 pub cur_epoch: Option<EpochNumber>,
84
85 pub consensus: OuterConsensus<TYPES>,
87
88 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
90
91 pub builder_clients: Vec<BuilderClientBase<TYPES>>,
93
94 pub public_key: TYPES::SignatureKey,
96
97 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
99
100 pub instance_state: Arc<TYPES::InstanceState>,
102
103 pub id: u64,
105
106 pub upgrade_lock: UpgradeLock<TYPES>,
108
109 pub epoch_height: u64,
111}
112
113impl<TYPES: NodeType> TransactionTaskState<TYPES> {
114 pub async fn handle_view_change(
116 &mut self,
117 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
118 block_view: ViewNumber,
119 block_epoch: Option<EpochNumber>,
120 vid: Option<VidCommitment>,
121 ) -> Option<HotShotTaskCompleted> {
122 self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
123 .await
124 }
125
126 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
128 pub async fn handle_view_change_legacy(
129 &mut self,
130 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
131 block_view: ViewNumber,
132 block_epoch: Option<EpochNumber>,
133 vid: Option<VidCommitment>,
134 ) -> Option<HotShotTaskCompleted> {
135 let version = match self.upgrade_lock.version(block_view) {
136 Ok(v) => v,
137 Err(err) => {
138 tracing::error!(
139 "Upgrade certificate requires unsupported version, refusing to request \
140 blocks: {err}"
141 );
142 return None;
143 },
144 };
145
146 if version >= EPOCH_VERSION {
149 let Some(epoch) = block_epoch else {
150 tracing::error!("Epoch is required for epoch-based view change");
151 return None;
152 };
153 let high_qc = self.consensus.read().await.high_qc().clone();
154 let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
155 bn
156 } else {
157 if block_view
160 > self
161 .upgrade_lock
162 .upgrade_view()
163 .unwrap_or(ViewNumber::new(0))
164 + 1
165 {
166 tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167 self.send_empty_block(event_stream, block_view, block_epoch, version)
168 .await;
169 return None;
170 }
171 0
173 };
174 high_qc_block_number = std::cmp::max(
175 high_qc_block_number,
176 self.consensus.read().await.highest_block,
177 );
178 if self
179 .consensus
180 .read()
181 .await
182 .transition_qc()
183 .is_some_and(|qc| {
184 let Some(e) = qc.0.data.epoch else {
185 return false;
186 };
187 e == epoch
188 })
189 || is_epoch_transition(high_qc_block_number, self.epoch_height)
190 {
191 if !is_last_block(high_qc_block_number, self.epoch_height) {
193 tracing::info!(
194 "Sending empty block event. View number: {block_view}. Parent Block \
195 number: {high_qc_block_number}"
196 );
197 self.send_empty_block(event_stream, block_view, block_epoch, version)
198 .await;
199 return None;
200 }
201 }
202 }
203
204 let block = {
206 if self
207 .upgrade_lock
208 .decided_upgrade_cert()
209 .as_ref()
210 .is_some_and(|cert| cert.upgrading_in(block_view))
211 {
212 None
213 } else {
214 self.wait_for_block(block_view, vid).await
215 }
216 };
217
218 if let Some(BuilderResponse {
219 block_payload,
220 metadata,
221 fee,
222 }) = block
223 {
224 broadcast_event(
225 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
226 block_payload.encode(),
227 metadata,
228 block_view,
229 block_epoch,
230 vec1::vec1![fee],
231 ))),
232 event_stream,
233 )
234 .await;
235 } else {
236 self.send_empty_block(event_stream, block_view, block_epoch, version)
237 .await;
238 };
239
240 return None;
241 }
242
243 async fn send_empty_block(
245 &self,
246 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
247 block_view: ViewNumber,
248 block_epoch: Option<EpochNumber>,
249 version: Version,
250 ) {
251 tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
253
254 self.consensus
256 .write()
257 .await
258 .metrics
259 .number_of_empty_blocks_proposed
260 .add(1);
261
262 let num_storage_nodes = match self
263 .membership_coordinator
264 .stake_table_for_epoch(block_epoch)
265 .await
266 {
267 Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
268 Err(e) => {
269 tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
270 return;
271 },
272 };
273
274 let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
275 tracing::error!("Failed to get null fee");
276 return;
277 };
278
279 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
281
282 broadcast_event(
284 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
285 vec![].into(),
286 metadata,
287 block_view,
288 block_epoch,
289 vec1::vec1![null_fee],
290 ))),
291 event_stream,
292 )
293 .await;
294 }
295
296 pub async fn null_block(
298 &self,
299 block_view: ViewNumber,
300 block_epoch: Option<EpochNumber>,
301 version: Version,
302 num_storage_nodes: usize,
303 ) -> Option<PackedBundle<TYPES>> {
304 let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
305 tracing::error!("Failed to calculate null block fee.");
306 return None;
307 };
308
309 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
311
312 Some(PackedBundle::new(
313 vec![].into(),
314 metadata,
315 block_view,
316 block_epoch,
317 vec1::vec1![null_fee],
318 ))
319 }
320
321 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
323 pub async fn handle(
324 &mut self,
325 event: Arc<HotShotEvent<TYPES>>,
326 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
327 ) -> Result<()> {
328 match event.as_ref() {
329 HotShotEvent::TransactionsRecv(transactions) => {
330 broadcast_event(
331 Event {
332 view_number: self.cur_view,
333 event: EventType::Transactions {
334 transactions: transactions.clone(),
335 },
336 },
337 &self.output_event_stream,
338 )
339 .await;
340 },
341 HotShotEvent::ViewChange(view, epoch) => {
342 let view = ViewNumber::new(std::cmp::max(1, **view));
343 ensure!(
344 *view > *self.cur_view && *epoch >= self.cur_epoch,
345 debug!(
346 "Received a view change to an older view and epoch: tried to change view \
347 to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
348 self.cur_view, self.cur_epoch
349 )
350 );
351 self.cur_view = view;
352 self.cur_epoch = *epoch;
353
354 let leader = self
355 .membership_coordinator
356 .membership_for_epoch(*epoch)
357 .await?
358 .leader(view)
359 .await?;
360 if leader == self.public_key {
361 self.handle_view_change(&event_stream, view, *epoch, None)
362 .await;
363 return Ok(());
364 }
365 },
366 HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
367 let view_number = proposal.data.view_number();
368 let next_view = view_number + 1;
369
370 let version = match self.upgrade_lock.version(next_view) {
371 Ok(v) => v,
372 Err(e) => {
373 tracing::error!("Failed to calculate version: {e:?}");
374 return Ok(());
375 },
376 };
377
378 if version < DRB_AND_HEADER_UPGRADE_VERSION {
379 return Ok(());
380 }
381
382 let vid = proposal.data.block_header().payload_commitment();
383 let block_height = proposal.data.block_header().block_number();
384 if is_epoch_transition(block_height, self.epoch_height) {
385 return Ok(());
386 }
387 if next_view <= self.cur_view {
388 return Ok(());
389 }
390 self.cur_view = next_view;
392
393 let leader = self
394 .membership_coordinator
395 .membership_for_epoch(self.cur_epoch)
396 .await?
397 .leader(next_view)
398 .await?;
399 if leader == self.public_key {
400 self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
401 .await;
402 return Ok(());
403 }
404 },
405 _ => {},
406 }
407 Ok(())
408 }
409
410 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
413 async fn last_vid_commitment_retry(
414 &self,
415 block_view: ViewNumber,
416 task_start_time: Instant,
417 ) -> Result<(ViewNumber, VidCommitment)> {
418 loop {
419 match self.last_vid_commitment(block_view).await {
420 Ok((view, comm)) => break Ok((view, comm)),
421 Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
422 _ => {
423 sleep(RETRY_DELAY).await;
425 continue;
426 },
427 }
428 }
429 }
430
431 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
434 async fn last_vid_commitment(
435 &self,
436 block_view: ViewNumber,
437 ) -> Result<(ViewNumber, VidCommitment)> {
438 let consensus_reader = self.consensus.read().await;
439 let mut target_view = ViewNumber::new(block_view.saturating_sub(1));
440
441 loop {
442 let view_data = consensus_reader
443 .validated_state_map()
444 .get(&target_view)
445 .context(info!(
446 "Missing record for view {target_view} in validated state",
447 ))?;
448
449 match &view_data.view_inner {
450 ViewInner::Da {
451 payload_commitment, ..
452 } => return Ok((target_view, *payload_commitment)),
453 ViewInner::Leaf {
454 leaf: leaf_commitment,
455 ..
456 } => {
457 let leaf = consensus_reader
458 .saved_leaves()
459 .get(leaf_commitment)
460 .context(info!(
461 "Missing leaf with commitment {leaf_commitment} for view \
462 {target_view} in saved_leaves",
463 ))?;
464 return Ok((target_view, leaf.payload_commitment()));
465 },
466 ViewInner::Failed => {
467 target_view = ViewNumber::new(target_view.checked_sub(1).context(warn!(
469 "Reached genesis. Something is wrong -- have we not decided any blocks \
470 since genesis?"
471 ))?);
472 continue;
473 },
474 }
475 }
476 }
477
478 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
479 async fn wait_for_block(
480 &self,
481 block_view: ViewNumber,
482 vid: Option<VidCommitment>,
483 ) -> Option<BuilderResponse<TYPES>> {
484 let task_start_time = Instant::now();
485
486 let (parent_view, parent_comm) = if let Some(vid) = vid {
488 (block_view - 1, vid)
489 } else {
490 match self
491 .last_vid_commitment_retry(block_view, task_start_time)
492 .await
493 {
494 Ok((v, c)) => (v, c),
495 Err(e) => {
496 tracing::warn!("Failed to find last vid commitment in time: {e}");
497 return None;
498 },
499 }
500 };
501
502 let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
503 &self.private_key,
504 parent_comm.as_ref(),
505 ) {
506 Ok(sig) => sig,
507 Err(err) => {
508 tracing::error!(%err, "Failed to sign block hash");
509 return None;
510 },
511 };
512
513 while task_start_time.elapsed() < self.builder_timeout {
514 match timeout(
515 self.builder_timeout
516 .saturating_sub(task_start_time.elapsed()),
517 self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
518 )
519 .await
520 {
521 Ok(Ok(block)) => {
523 return Some(block);
524 },
525
526 Ok(Err(err)) => {
528 tracing::info!("Couldn't get a block: {err:#}");
529 sleep(RETRY_DELAY).await;
531 continue;
532 },
533
534 Err(err) => {
536 tracing::info!(%err, "Timeout while getting available blocks");
537 return None;
538 },
539 }
540 }
541
542 tracing::warn!("could not get a block from the builder in time");
543 None
544 }
545
546 async fn get_available_blocks(
549 &self,
550 parent_comm: VidCommitment,
551 view_number: ViewNumber,
552 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
553 ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
554 let tasks = self
555 .builder_clients
556 .iter()
557 .enumerate()
558 .map(|(builder_idx, client)| async move {
559 client
560 .available_blocks(
561 parent_comm,
562 view_number.u64(),
563 self.public_key.clone(),
564 parent_comm_sig,
565 )
566 .await
567 .map(move |blocks| {
568 blocks
569 .into_iter()
570 .map(move |block_info| (block_info, builder_idx))
571 })
572 })
573 .collect::<FuturesUnordered<_>>();
574 let mut results = Vec::with_capacity(self.builder_clients.len());
575 let query_start = Instant::now();
576 let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
577 .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
578 let mut tasks = tasks.take(threshold);
579 while let Some(result) = tasks.next().await {
580 results.push(result);
581 if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
582 break;
583 }
584 }
585 let timeout = sleep(std::cmp::max(
586 query_start
587 .elapsed()
588 .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
589 BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
590 ));
591 futures::pin_mut!(timeout);
592 let mut tasks = tasks.into_inner().take_until(timeout);
593 while let Some(result) = tasks.next().await {
594 results.push(result);
595 }
596 results
597 .into_iter()
598 .filter_map(|result| result.ok())
599 .flatten()
600 .collect::<Vec<_>>()
601 }
602
603 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
611 async fn block_from_builder(
612 &self,
613 parent_comm: VidCommitment,
614 view_number: ViewNumber,
615 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
616 ) -> Result<BuilderResponse<TYPES>> {
617 let mut available_blocks = self
618 .get_available_blocks(parent_comm, view_number, parent_comm_sig)
619 .await;
620
621 available_blocks.sort_by(|(l, _), (r, _)| {
622 (u128::from(l.offered_fee) * u128::from(r.block_size))
630 .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
631 });
632
633 if available_blocks.is_empty() {
634 tracing::info!("No available blocks");
635 bail!("No available blocks");
636 }
637
638 for (block_info, builder_idx) in available_blocks {
639 if !block_info.sender.validate_block_info_signature(
641 &block_info.signature,
642 block_info.block_size,
643 block_info.offered_fee,
644 &block_info.block_hash,
645 ) {
646 tracing::warn!("Failed to verify available block info response message signature");
647 continue;
648 }
649
650 let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
651 &self.private_key,
652 block_info.block_hash.as_ref(),
653 ) {
654 Ok(request_signature) => request_signature,
655 Err(err) => {
656 tracing::error!(%err, "Failed to sign block hash");
657 continue;
658 },
659 };
660
661 let response = {
662 let client = &self.builder_clients[builder_idx];
663
664 let (block, either_header_input) = futures::join! {
665 client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
666 client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
667 };
668
669 let block_data = match block {
670 Ok(block_data) => block_data,
671 Err(err) => {
672 tracing::warn!(%err, "Error claiming block data");
673 continue;
674 },
675 };
676
677 let Ok(either_header_input) = either_header_input
678 .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
679 else {
680 continue;
681 };
682
683 let Some(header_input) = either_header_input
684 .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
685 else {
686 tracing::warn!(
687 "Failed to verify available new or legacy block header input data \
688 response message signature"
689 );
690 continue;
691 };
692
693 if !block_data.validate_signature() {
695 tracing::warn!(
696 "Failed to verify available block data response message signature"
697 );
698 continue;
699 }
700
701 let fee = BuilderFee {
702 fee_amount: block_info.offered_fee,
703 fee_account: header_input.sender,
704 fee_signature: header_input.fee_signature,
705 };
706
707 BuilderResponse {
708 fee,
709 block_payload: block_data.block_payload,
710 metadata: block_data.metadata,
711 }
712 };
713
714 return Ok(response);
715 }
716
717 bail!("Couldn't claim a block from any of the builders");
718 }
719}
720
721#[async_trait]
722impl<TYPES: NodeType> TaskState for TransactionTaskState<TYPES> {
724 type Event = HotShotEvent<TYPES>;
725
726 async fn handle_event(
727 &mut self,
728 event: Arc<Self::Event>,
729 sender: &Sender<Arc<Self::Event>>,
730 _receiver: &Receiver<Arc<Self::Event>>,
731 ) -> Result<()> {
732 self.handle(event, sender.clone()).await
733 }
734
735 fn cancel_subtasks(&mut self) {}
736}