1use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{stream::FuturesUnordered, StreamExt};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{null_block, PackedBundle, VidCommitment},
20 epoch_membership::EpochMembershipCoordinator,
21 event::{Event, EventType},
22 message::UpgradeLock,
23 traits::{
24 block_contents::{BuilderFee, EncodeBytes},
25 node_implementation::{ConsensusTime, NodeType, Versions},
26 signature_key::{BuilderSignatureKey, SignatureKey},
27 BlockPayload,
28 },
29 utils::{is_epoch_transition, is_last_block, ViewInner},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::{StaticVersionType, Version};
35
36use crate::{
37 builder::v0_1::BuilderClient as BuilderClientBase,
38 events::{HotShotEvent, HotShotTaskCompleted},
39 helpers::broadcast_event,
40};
41
42const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
46const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
48const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
50const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
52const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
55const RETRY_DELAY: Duration = Duration::from_millis(100);
57
58pub struct BuilderResponse<TYPES: NodeType> {
60 pub fee: BuilderFee<TYPES>,
62
63 pub block_payload: TYPES::BlockPayload,
65
66 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
68}
69
70pub struct TransactionTaskState<TYPES: NodeType, V: Versions> {
72 pub builder_timeout: Duration,
74
75 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
77
78 pub cur_view: TYPES::View,
80
81 pub cur_epoch: Option<TYPES::Epoch>,
83
84 pub consensus: OuterConsensus<TYPES>,
86
87 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
89
90 pub builder_clients: Vec<BuilderClientBase<TYPES>>,
92
93 pub public_key: TYPES::SignatureKey,
95
96 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
98
99 pub instance_state: Arc<TYPES::InstanceState>,
101
102 pub id: u64,
104
105 pub upgrade_lock: UpgradeLock<TYPES, V>,
107
108 pub epoch_height: u64,
110}
111
112impl<TYPES: NodeType, V: Versions> TransactionTaskState<TYPES, V> {
113 pub async fn handle_view_change(
115 &mut self,
116 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
117 block_view: TYPES::View,
118 block_epoch: Option<TYPES::Epoch>,
119 ) -> Option<HotShotTaskCompleted> {
120 let _version = match self.upgrade_lock.version(block_view).await {
121 Ok(v) => v,
122 Err(e) => {
123 tracing::error!("Failed to calculate version: {e:?}");
124 return None;
125 },
126 };
127
128 self.handle_view_change_legacy(event_stream, block_view, block_epoch)
129 .await
130 }
131
132 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
134 pub async fn handle_view_change_legacy(
135 &mut self,
136 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
137 block_view: TYPES::View,
138 block_epoch: Option<TYPES::Epoch>,
139 ) -> Option<HotShotTaskCompleted> {
140 let version = match self.upgrade_lock.version(block_view).await {
141 Ok(v) => v,
142 Err(err) => {
143 tracing::error!(
144 "Upgrade certificate requires unsupported version, refusing to request \
145 blocks: {err}"
146 );
147 return None;
148 },
149 };
150
151 if version >= V::Epochs::VERSION {
154 let Some(epoch) = block_epoch else {
155 tracing::error!("Epoch is required for epoch-based view change");
156 return None;
157 };
158 let high_qc = self.consensus.read().await.high_qc().clone();
159 let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
160 bn
161 } else {
162 if block_view
165 > self
166 .upgrade_lock
167 .upgrade_view()
168 .await
169 .unwrap_or(TYPES::View::new(0))
170 + 1
171 {
172 tracing::warn!("High QC in epoch version and not the first QC after upgrade");
173 self.send_empty_block(event_stream, block_view, block_epoch, version)
174 .await;
175 return None;
176 }
177 0
179 };
180 high_qc_block_number = std::cmp::max(
181 high_qc_block_number,
182 self.consensus.read().await.highest_block,
183 );
184 if self
185 .consensus
186 .read()
187 .await
188 .transition_qc()
189 .is_some_and(|qc| {
190 let Some(e) = qc.0.data.epoch else {
191 return false;
192 };
193 e == epoch
194 })
195 || is_epoch_transition(high_qc_block_number, self.epoch_height)
196 {
197 if !is_last_block(high_qc_block_number, self.epoch_height) {
199 tracing::info!(
200 "Sending empty block event. View number: {block_view}. Parent Block \
201 number: {high_qc_block_number}"
202 );
203 self.send_empty_block(event_stream, block_view, block_epoch, version)
204 .await;
205 return None;
206 }
207 }
208 }
209
210 let block = {
212 if self
213 .upgrade_lock
214 .decided_upgrade_certificate
215 .read()
216 .await
217 .as_ref()
218 .is_some_and(|cert| cert.upgrading_in(block_view))
219 {
220 None
221 } else {
222 self.wait_for_block(block_view).await
223 }
224 };
225
226 if let Some(BuilderResponse {
227 block_payload,
228 metadata,
229 fee,
230 }) = block
231 {
232 broadcast_event(
233 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
234 block_payload.encode(),
235 metadata,
236 block_view,
237 block_epoch,
238 vec1::vec1![fee],
239 ))),
240 event_stream,
241 )
242 .await;
243 } else {
244 self.send_empty_block(event_stream, block_view, block_epoch, version)
245 .await;
246 };
247
248 return None;
249 }
250
251 async fn send_empty_block(
253 &self,
254 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255 block_view: TYPES::View,
256 block_epoch: Option<TYPES::Epoch>,
257 version: Version,
258 ) {
259 tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
261
262 self.consensus
264 .write()
265 .await
266 .metrics
267 .number_of_empty_blocks_proposed
268 .add(1);
269
270 let num_storage_nodes = match self
271 .membership_coordinator
272 .stake_table_for_epoch(block_epoch)
273 .await
274 {
275 Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
276 Err(e) => {
277 tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
278 return;
279 },
280 };
281
282 let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
283 tracing::error!("Failed to get null fee");
284 return;
285 };
286
287 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
289
290 broadcast_event(
292 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
293 vec![].into(),
294 metadata,
295 block_view,
296 block_epoch,
297 vec1::vec1![null_fee],
298 ))),
299 event_stream,
300 )
301 .await;
302 }
303
304 pub async fn null_block(
306 &self,
307 block_view: TYPES::View,
308 block_epoch: Option<TYPES::Epoch>,
309 version: Version,
310 num_storage_nodes: usize,
311 ) -> Option<PackedBundle<TYPES>> {
312 let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
313 tracing::error!("Failed to calculate null block fee.");
314 return None;
315 };
316
317 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
319
320 Some(PackedBundle::new(
321 vec![].into(),
322 metadata,
323 block_view,
324 block_epoch,
325 vec1::vec1![null_fee],
326 ))
327 }
328
329 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
331 pub async fn handle(
332 &mut self,
333 event: Arc<HotShotEvent<TYPES>>,
334 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
335 ) -> Result<()> {
336 match event.as_ref() {
337 HotShotEvent::TransactionsRecv(transactions) => {
338 broadcast_event(
339 Event {
340 view_number: self.cur_view,
341 event: EventType::Transactions {
342 transactions: transactions.clone(),
343 },
344 },
345 &self.output_event_stream,
346 )
347 .await;
348 },
349 HotShotEvent::ViewChange(view, epoch) => {
350 let view = TYPES::View::new(std::cmp::max(1, **view));
351 ensure!(
352 *view > *self.cur_view && *epoch >= self.cur_epoch,
353 debug!(
354 "Received a view change to an older view and epoch: tried to change view \
355 to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
356 self.cur_view, self.cur_epoch
357 )
358 );
359 self.cur_view = view;
360 self.cur_epoch = *epoch;
361
362 let leader = self
363 .membership_coordinator
364 .membership_for_epoch(*epoch)
365 .await?
366 .leader(view)
367 .await?;
368 if leader == self.public_key {
369 self.handle_view_change(&event_stream, view, *epoch).await;
370 return Ok(());
371 }
372 },
373 _ => {},
374 }
375 Ok(())
376 }
377
378 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
381 async fn last_vid_commitment_retry(
382 &self,
383 block_view: TYPES::View,
384 task_start_time: Instant,
385 ) -> Result<(TYPES::View, VidCommitment)> {
386 loop {
387 match self.last_vid_commitment(block_view).await {
388 Ok((view, comm)) => break Ok((view, comm)),
389 Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
390 _ => {
391 sleep(RETRY_DELAY).await;
393 continue;
394 },
395 }
396 }
397 }
398
399 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
402 async fn last_vid_commitment(
403 &self,
404 block_view: TYPES::View,
405 ) -> Result<(TYPES::View, VidCommitment)> {
406 let consensus_reader = self.consensus.read().await;
407 let mut target_view = TYPES::View::new(block_view.saturating_sub(1));
408
409 loop {
410 let view_data = consensus_reader
411 .validated_state_map()
412 .get(&target_view)
413 .context(info!(
414 "Missing record for view {target_view} in validated state",
415 ))?;
416
417 match &view_data.view_inner {
418 ViewInner::Da {
419 payload_commitment, ..
420 } => return Ok((target_view, *payload_commitment)),
421 ViewInner::Leaf {
422 leaf: leaf_commitment,
423 ..
424 } => {
425 let leaf = consensus_reader
426 .saved_leaves()
427 .get(leaf_commitment)
428 .context(info!(
429 "Missing leaf with commitment {leaf_commitment} for view \
430 {target_view} in saved_leaves",
431 ))?;
432 return Ok((target_view, leaf.payload_commitment()));
433 },
434 ViewInner::Failed => {
435 target_view = TYPES::View::new(target_view.checked_sub(1).context(warn!(
437 "Reached genesis. Something is wrong -- have we not decided any blocks \
438 since genesis?"
439 ))?);
440 continue;
441 },
442 }
443 }
444 }
445
446 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
447 async fn wait_for_block(&self, block_view: TYPES::View) -> Option<BuilderResponse<TYPES>> {
448 let task_start_time = Instant::now();
449
450 let (parent_view, parent_comm) = match self
452 .last_vid_commitment_retry(block_view, task_start_time)
453 .await
454 {
455 Ok((v, c)) => (v, c),
456 Err(e) => {
457 tracing::warn!("Failed to find last vid commitment in time: {e}");
458 return None;
459 },
460 };
461
462 let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
463 &self.private_key,
464 parent_comm.as_ref(),
465 ) {
466 Ok(sig) => sig,
467 Err(err) => {
468 tracing::error!(%err, "Failed to sign block hash");
469 return None;
470 },
471 };
472
473 while task_start_time.elapsed() < self.builder_timeout {
474 match timeout(
475 self.builder_timeout
476 .saturating_sub(task_start_time.elapsed()),
477 self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
478 )
479 .await
480 {
481 Ok(Ok(block)) => {
483 return Some(block);
484 },
485
486 Ok(Err(err)) => {
488 tracing::info!("Couldn't get a block: {err:#}");
489 sleep(RETRY_DELAY).await;
491 continue;
492 },
493
494 Err(err) => {
496 tracing::info!(%err, "Timeout while getting available blocks");
497 return None;
498 },
499 }
500 }
501
502 tracing::warn!("could not get a block from the builder in time");
503 None
504 }
505
506 async fn get_available_blocks(
509 &self,
510 parent_comm: VidCommitment,
511 view_number: TYPES::View,
512 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
513 ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
514 let tasks = self
515 .builder_clients
516 .iter()
517 .enumerate()
518 .map(|(builder_idx, client)| async move {
519 client
520 .available_blocks(
521 parent_comm,
522 view_number.u64(),
523 self.public_key.clone(),
524 parent_comm_sig,
525 )
526 .await
527 .map(move |blocks| {
528 blocks
529 .into_iter()
530 .map(move |block_info| (block_info, builder_idx))
531 })
532 })
533 .collect::<FuturesUnordered<_>>();
534 let mut results = Vec::with_capacity(self.builder_clients.len());
535 let query_start = Instant::now();
536 let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
537 .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
538 let mut tasks = tasks.take(threshold);
539 while let Some(result) = tasks.next().await {
540 results.push(result);
541 if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
542 break;
543 }
544 }
545 let timeout = sleep(std::cmp::max(
546 query_start
547 .elapsed()
548 .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
549 BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
550 ));
551 futures::pin_mut!(timeout);
552 let mut tasks = tasks.into_inner().take_until(timeout);
553 while let Some(result) = tasks.next().await {
554 results.push(result);
555 }
556 results
557 .into_iter()
558 .filter_map(|result| result.ok())
559 .flatten()
560 .collect::<Vec<_>>()
561 }
562
563 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
571 async fn block_from_builder(
572 &self,
573 parent_comm: VidCommitment,
574 view_number: TYPES::View,
575 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
576 ) -> Result<BuilderResponse<TYPES>> {
577 let mut available_blocks = self
578 .get_available_blocks(parent_comm, view_number, parent_comm_sig)
579 .await;
580
581 available_blocks.sort_by(|(l, _), (r, _)| {
582 (u128::from(l.offered_fee) * u128::from(r.block_size))
590 .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
591 });
592
593 if available_blocks.is_empty() {
594 tracing::info!("No available blocks");
595 bail!("No available blocks");
596 }
597
598 for (block_info, builder_idx) in available_blocks {
599 if !block_info.sender.validate_block_info_signature(
601 &block_info.signature,
602 block_info.block_size,
603 block_info.offered_fee,
604 &block_info.block_hash,
605 ) {
606 tracing::warn!("Failed to verify available block info response message signature");
607 continue;
608 }
609
610 let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
611 &self.private_key,
612 block_info.block_hash.as_ref(),
613 ) {
614 Ok(request_signature) => request_signature,
615 Err(err) => {
616 tracing::error!(%err, "Failed to sign block hash");
617 continue;
618 },
619 };
620
621 let response = {
622 let client = &self.builder_clients[builder_idx];
623
624 let (block, either_header_input) = futures::join! {
625 client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
626 client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
627 };
628
629 let block_data = match block {
630 Ok(block_data) => block_data,
631 Err(err) => {
632 tracing::warn!(%err, "Error claiming block data");
633 continue;
634 },
635 };
636
637 let Ok(either_header_input) = either_header_input
638 .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
639 else {
640 continue;
641 };
642
643 let Some(header_input) = either_header_input
644 .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
645 else {
646 tracing::warn!(
647 "Failed to verify available new or legacy block header input data \
648 response message signature"
649 );
650 continue;
651 };
652
653 if !block_data.validate_signature() {
655 tracing::warn!(
656 "Failed to verify available block data response message signature"
657 );
658 continue;
659 }
660
661 if !header_input.validate_signature(block_info.offered_fee, &block_data.metadata) {
663 tracing::warn!(
664 "Failed to verify available block header input data response message \
665 signature"
666 );
667 continue;
668 }
669
670 let fee = BuilderFee {
671 fee_amount: block_info.offered_fee,
672 fee_account: header_input.sender,
673 fee_signature: header_input.fee_signature,
674 };
675
676 BuilderResponse {
677 fee,
678 block_payload: block_data.block_payload,
679 metadata: block_data.metadata,
680 }
681 };
682
683 return Ok(response);
684 }
685
686 bail!("Couldn't claim a block from any of the builders");
687 }
688}
689
690#[async_trait]
691impl<TYPES: NodeType, V: Versions> TaskState for TransactionTaskState<TYPES, V> {
693 type Event = HotShotEvent<TYPES>;
694
695 async fn handle_event(
696 &mut self,
697 event: Arc<Self::Event>,
698 sender: &Sender<Arc<Self::Event>>,
699 _receiver: &Receiver<Arc<Self::Event>>,
700 ) -> Result<()> {
701 self.handle(event, sender.clone()).await
702 }
703
704 fn cancel_subtasks(&mut self) {}
705}