hotshot_task_impls/
transactions.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{stream::FuturesUnordered, StreamExt};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{null_block, PackedBundle, VidCommitment},
20    epoch_membership::EpochMembershipCoordinator,
21    event::{Event, EventType},
22    message::UpgradeLock,
23    traits::{
24        block_contents::{BuilderFee, EncodeBytes},
25        node_implementation::{ConsensusTime, NodeType, Versions},
26        signature_key::{BuilderSignatureKey, SignatureKey},
27        BlockPayload,
28    },
29    utils::{is_epoch_transition, is_last_block, ViewInner},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::{StaticVersionType, Version};
35
36use crate::{
37    builder::v0_1::BuilderClient as BuilderClientBase,
38    events::{HotShotEvent, HotShotTaskCompleted},
39    helpers::broadcast_event,
40};
41
// ---------------------------------------------------------------------------------------------
// Tuning knobs for how the leader queries the builders for available blocks.
// ---------------------------------------------------------------------------------------------

/// Numerator of the fraction of builders whose answers form the first (main) batch.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Denominator of the fraction of builders whose answers form the first (main) batch.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// How long the main batch of builders gets to respond.
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// Share of the time already spent that is granted as extra time to the remaining builders.
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Lower bound on the time spent querying both batches together, so a very fast first batch
/// cannot cut the query short.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Pause between consecutive attempts after an unsuccessful call.
const RETRY_DELAY: Duration = Duration::from_millis(100);
57
58/// Builder Provided Responses
59pub struct BuilderResponse<TYPES: NodeType> {
60    /// Fee information
61    pub fee: BuilderFee<TYPES>,
62
63    /// Block payload
64    pub block_payload: TYPES::BlockPayload,
65
66    /// Block metadata
67    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
68}
69
70/// Tracks state of a Transaction task
71pub struct TransactionTaskState<TYPES: NodeType, V: Versions> {
72    /// The state's api
73    pub builder_timeout: Duration,
74
75    /// Output events to application
76    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
77
78    /// View number this view is executing in.
79    pub cur_view: TYPES::View,
80
81    /// Epoch number this node is executing in.
82    pub cur_epoch: Option<TYPES::Epoch>,
83
84    /// Reference to consensus. Leader will require a read lock on this.
85    pub consensus: OuterConsensus<TYPES>,
86
87    /// Membership for the quorum
88    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
89
90    /// Builder 0.1 API clients
91    pub builder_clients: Vec<BuilderClientBase<TYPES>>,
92
93    /// This Nodes Public Key
94    pub public_key: TYPES::SignatureKey,
95
96    /// Our Private Key
97    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
98
99    /// InstanceState
100    pub instance_state: Arc<TYPES::InstanceState>,
101
102    /// This state's ID
103    pub id: u64,
104
105    /// Lock for a decided upgrade
106    pub upgrade_lock: UpgradeLock<TYPES, V>,
107
108    /// Number of blocks in an epoch, zero means there are no epochs
109    pub epoch_height: u64,
110}
111
112impl<TYPES: NodeType, V: Versions> TransactionTaskState<TYPES, V> {
113    /// handle view change decide legacy or not
114    pub async fn handle_view_change(
115        &mut self,
116        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
117        block_view: TYPES::View,
118        block_epoch: Option<TYPES::Epoch>,
119    ) -> Option<HotShotTaskCompleted> {
120        let _version = match self.upgrade_lock.version(block_view).await {
121            Ok(v) => v,
122            Err(e) => {
123                tracing::error!("Failed to calculate version: {e:?}");
124                return None;
125            },
126        };
127
128        self.handle_view_change_legacy(event_stream, block_view, block_epoch)
129            .await
130    }
131
132    /// legacy view change handler
133    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
134    pub async fn handle_view_change_legacy(
135        &mut self,
136        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
137        block_view: TYPES::View,
138        block_epoch: Option<TYPES::Epoch>,
139    ) -> Option<HotShotTaskCompleted> {
140        let version = match self.upgrade_lock.version(block_view).await {
141            Ok(v) => v,
142            Err(err) => {
143                tracing::error!(
144                    "Upgrade certificate requires unsupported version, refusing to request \
145                     blocks: {err}"
146                );
147                return None;
148            },
149        };
150
151        // Short circuit if we are in epochs and we are likely proposing a transition block
152        // If it's the first view of the upgrade, we don't need to check for transition blocks
153        if version >= V::Epochs::VERSION {
154            let Some(epoch) = block_epoch else {
155                tracing::error!("Epoch is required for epoch-based view change");
156                return None;
157            };
158            let high_qc = self.consensus.read().await.high_qc().clone();
159            let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
160                bn
161            } else {
162                // If it's the first view after the upgrade the high QC won't have a block number
163                // So just use the highest_block number we've stored
164                if block_view
165                    > self
166                        .upgrade_lock
167                        .upgrade_view()
168                        .await
169                        .unwrap_or(TYPES::View::new(0))
170                        + 1
171                {
172                    tracing::warn!("High QC in epoch version and not the first QC after upgrade");
173                    self.send_empty_block(event_stream, block_view, block_epoch, version)
174                        .await;
175                    return None;
176                }
177                // 0 here so we use the highest block number in the calculation below
178                0
179            };
180            high_qc_block_number = std::cmp::max(
181                high_qc_block_number,
182                self.consensus.read().await.highest_block,
183            );
184            if self
185                .consensus
186                .read()
187                .await
188                .transition_qc()
189                .is_some_and(|qc| {
190                    let Some(e) = qc.0.data.epoch else {
191                        return false;
192                    };
193                    e == epoch
194                })
195                || is_epoch_transition(high_qc_block_number, self.epoch_height)
196            {
197                // We are proposing a transition block it should be empty
198                if !is_last_block(high_qc_block_number, self.epoch_height) {
199                    tracing::info!(
200                        "Sending empty block event. View number: {block_view}. Parent Block \
201                         number: {high_qc_block_number}"
202                    );
203                    self.send_empty_block(event_stream, block_view, block_epoch, version)
204                        .await;
205                    return None;
206                }
207            }
208        }
209
210        // Request a block from the builder unless we are between versions.
211        let block = {
212            if self
213                .upgrade_lock
214                .decided_upgrade_certificate
215                .read()
216                .await
217                .as_ref()
218                .is_some_and(|cert| cert.upgrading_in(block_view))
219            {
220                None
221            } else {
222                self.wait_for_block(block_view).await
223            }
224        };
225
226        if let Some(BuilderResponse {
227            block_payload,
228            metadata,
229            fee,
230        }) = block
231        {
232            broadcast_event(
233                Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
234                    block_payload.encode(),
235                    metadata,
236                    block_view,
237                    block_epoch,
238                    vec1::vec1![fee],
239                ))),
240                event_stream,
241            )
242            .await;
243        } else {
244            self.send_empty_block(event_stream, block_view, block_epoch, version)
245                .await;
246        };
247
248        return None;
249    }
250
251    /// Send the event to the event stream that we are proposing an empty block
252    async fn send_empty_block(
253        &self,
254        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255        block_view: TYPES::View,
256        block_epoch: Option<TYPES::Epoch>,
257        version: Version,
258    ) {
259        // If we couldn't get a block, send an empty block
260        tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
261
262        // Increment the metric for number of empty blocks proposed
263        self.consensus
264            .write()
265            .await
266            .metrics
267            .number_of_empty_blocks_proposed
268            .add(1);
269
270        let num_storage_nodes = match self
271            .membership_coordinator
272            .stake_table_for_epoch(block_epoch)
273            .await
274        {
275            Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
276            Err(e) => {
277                tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
278                return;
279            },
280        };
281
282        let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
283            tracing::error!("Failed to get null fee");
284            return;
285        };
286
287        // Create an empty block payload and metadata
288        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
289
290        // Broadcast the empty block
291        broadcast_event(
292            Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
293                vec![].into(),
294                metadata,
295                block_view,
296                block_epoch,
297                vec1::vec1![null_fee],
298            ))),
299            event_stream,
300        )
301        .await;
302    }
303
304    /// Produce a null block
305    pub async fn null_block(
306        &self,
307        block_view: TYPES::View,
308        block_epoch: Option<TYPES::Epoch>,
309        version: Version,
310        num_storage_nodes: usize,
311    ) -> Option<PackedBundle<TYPES>> {
312        let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
313            tracing::error!("Failed to calculate null block fee.");
314            return None;
315        };
316
317        // Create an empty block payload and metadata
318        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
319
320        Some(PackedBundle::new(
321            vec![].into(),
322            metadata,
323            block_view,
324            block_epoch,
325            vec1::vec1![null_fee],
326        ))
327    }
328
329    /// main task event handler
330    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
331    pub async fn handle(
332        &mut self,
333        event: Arc<HotShotEvent<TYPES>>,
334        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
335    ) -> Result<()> {
336        match event.as_ref() {
337            HotShotEvent::TransactionsRecv(transactions) => {
338                broadcast_event(
339                    Event {
340                        view_number: self.cur_view,
341                        event: EventType::Transactions {
342                            transactions: transactions.clone(),
343                        },
344                    },
345                    &self.output_event_stream,
346                )
347                .await;
348            },
349            HotShotEvent::ViewChange(view, epoch) => {
350                let view = TYPES::View::new(std::cmp::max(1, **view));
351                ensure!(
352                    *view > *self.cur_view && *epoch >= self.cur_epoch,
353                    debug!(
354                        "Received a view change to an older view and epoch: tried to change view \
355                         to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
356                        self.cur_view, self.cur_epoch
357                    )
358                );
359                self.cur_view = view;
360                self.cur_epoch = *epoch;
361
362                let leader = self
363                    .membership_coordinator
364                    .membership_for_epoch(*epoch)
365                    .await?
366                    .leader(view)
367                    .await?;
368                if leader == self.public_key {
369                    self.handle_view_change(&event_stream, view, *epoch).await;
370                    return Ok(());
371                }
372            },
373            _ => {},
374        }
375        Ok(())
376    }
377
378    /// Get VID commitment for the last successful view before `block_view`.
379    /// Returns None if we don't have said commitment recorded.
380    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
381    async fn last_vid_commitment_retry(
382        &self,
383        block_view: TYPES::View,
384        task_start_time: Instant,
385    ) -> Result<(TYPES::View, VidCommitment)> {
386        loop {
387            match self.last_vid_commitment(block_view).await {
388                Ok((view, comm)) => break Ok((view, comm)),
389                Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
390                _ => {
391                    // We still have time, will re-try in a bit
392                    sleep(RETRY_DELAY).await;
393                    continue;
394                },
395            }
396        }
397    }
398
399    /// Get VID commitment for the last successful view before `block_view`.
400    /// Returns None if we don't have said commitment recorded.
401    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
402    async fn last_vid_commitment(
403        &self,
404        block_view: TYPES::View,
405    ) -> Result<(TYPES::View, VidCommitment)> {
406        let consensus_reader = self.consensus.read().await;
407        let mut target_view = TYPES::View::new(block_view.saturating_sub(1));
408
409        loop {
410            let view_data = consensus_reader
411                .validated_state_map()
412                .get(&target_view)
413                .context(info!(
414                    "Missing record for view {target_view} in validated state",
415                ))?;
416
417            match &view_data.view_inner {
418                ViewInner::Da {
419                    payload_commitment, ..
420                } => return Ok((target_view, *payload_commitment)),
421                ViewInner::Leaf {
422                    leaf: leaf_commitment,
423                    ..
424                } => {
425                    let leaf = consensus_reader
426                        .saved_leaves()
427                        .get(leaf_commitment)
428                        .context(info!(
429                            "Missing leaf with commitment {leaf_commitment} for view \
430                             {target_view} in saved_leaves",
431                        ))?;
432                    return Ok((target_view, leaf.payload_commitment()));
433                },
434                ViewInner::Failed => {
435                    // For failed views, backtrack
436                    target_view = TYPES::View::new(target_view.checked_sub(1).context(warn!(
437                        "Reached genesis. Something is wrong -- have we not decided any blocks \
438                         since genesis?"
439                    ))?);
440                    continue;
441                },
442            }
443        }
444    }
445
446    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
447    async fn wait_for_block(&self, block_view: TYPES::View) -> Option<BuilderResponse<TYPES>> {
448        let task_start_time = Instant::now();
449
450        // Find commitment to the block we want to build upon
451        let (parent_view, parent_comm) = match self
452            .last_vid_commitment_retry(block_view, task_start_time)
453            .await
454        {
455            Ok((v, c)) => (v, c),
456            Err(e) => {
457                tracing::warn!("Failed to find last vid commitment in time: {e}");
458                return None;
459            },
460        };
461
462        let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
463            &self.private_key,
464            parent_comm.as_ref(),
465        ) {
466            Ok(sig) => sig,
467            Err(err) => {
468                tracing::error!(%err, "Failed to sign block hash");
469                return None;
470            },
471        };
472
473        while task_start_time.elapsed() < self.builder_timeout {
474            match timeout(
475                self.builder_timeout
476                    .saturating_sub(task_start_time.elapsed()),
477                self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
478            )
479            .await
480            {
481                // We got a block
482                Ok(Ok(block)) => {
483                    return Some(block);
484                },
485
486                // We failed to get a block
487                Ok(Err(err)) => {
488                    tracing::info!("Couldn't get a block: {err:#}");
489                    // pause a bit
490                    sleep(RETRY_DELAY).await;
491                    continue;
492                },
493
494                // We timed out while getting available blocks
495                Err(err) => {
496                    tracing::info!(%err, "Timeout while getting available blocks");
497                    return None;
498                },
499            }
500        }
501
502        tracing::warn!("could not get a block from the builder in time");
503        None
504    }
505
506    /// Query the builders for available blocks. Queries only fraction of the builders
507    /// based on the response time.
508    async fn get_available_blocks(
509        &self,
510        parent_comm: VidCommitment,
511        view_number: TYPES::View,
512        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
513    ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
514        let tasks = self
515            .builder_clients
516            .iter()
517            .enumerate()
518            .map(|(builder_idx, client)| async move {
519                client
520                    .available_blocks(
521                        parent_comm,
522                        view_number.u64(),
523                        self.public_key.clone(),
524                        parent_comm_sig,
525                    )
526                    .await
527                    .map(move |blocks| {
528                        blocks
529                            .into_iter()
530                            .map(move |block_info| (block_info, builder_idx))
531                    })
532            })
533            .collect::<FuturesUnordered<_>>();
534        let mut results = Vec::with_capacity(self.builder_clients.len());
535        let query_start = Instant::now();
536        let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
537            .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
538        let mut tasks = tasks.take(threshold);
539        while let Some(result) = tasks.next().await {
540            results.push(result);
541            if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
542                break;
543            }
544        }
545        let timeout = sleep(std::cmp::max(
546            query_start
547                .elapsed()
548                .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
549            BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
550        ));
551        futures::pin_mut!(timeout);
552        let mut tasks = tasks.into_inner().take_until(timeout);
553        while let Some(result) = tasks.next().await {
554            results.push(result);
555        }
556        results
557            .into_iter()
558            .filter_map(|result| result.ok())
559            .flatten()
560            .collect::<Vec<_>>()
561    }
562
563    /// Get a block from builder.
564    /// Queries the sufficiently fast builders for available blocks and chooses the one with the
565    /// best fee/byte ratio, re-trying with the next best one in case of failure.
566    ///
567    /// # Errors
568    /// If none of the builder reports any available blocks or claiming block fails for all of the
569    /// builders.
570    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
571    async fn block_from_builder(
572        &self,
573        parent_comm: VidCommitment,
574        view_number: TYPES::View,
575        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
576    ) -> Result<BuilderResponse<TYPES>> {
577        let mut available_blocks = self
578            .get_available_blocks(parent_comm, view_number, parent_comm_sig)
579            .await;
580
581        available_blocks.sort_by(|(l, _), (r, _)| {
582            // We want the block with the highest fee per byte of data we're going to have to
583            // process, thus our comparison function is:
584            //      (l.offered_fee / l.block_size) < (r.offered_fee / r.block_size)
585            // To avoid floating point math (which doesn't even have an `Ord` impl) we multiply
586            // through by the denominators to get
587            //      l.offered_fee * r.block_size < r.offered_fee * l.block_size
588            // We cast up to u128 to avoid overflow.
589            (u128::from(l.offered_fee) * u128::from(r.block_size))
590                .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
591        });
592
593        if available_blocks.is_empty() {
594            tracing::info!("No available blocks");
595            bail!("No available blocks");
596        }
597
598        for (block_info, builder_idx) in available_blocks {
599            // Verify signature over chosen block.
600            if !block_info.sender.validate_block_info_signature(
601                &block_info.signature,
602                block_info.block_size,
603                block_info.offered_fee,
604                &block_info.block_hash,
605            ) {
606                tracing::warn!("Failed to verify available block info response message signature");
607                continue;
608            }
609
610            let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
611                &self.private_key,
612                block_info.block_hash.as_ref(),
613            ) {
614                Ok(request_signature) => request_signature,
615                Err(err) => {
616                    tracing::error!(%err, "Failed to sign block hash");
617                    continue;
618                },
619            };
620
621            let response = {
622                let client = &self.builder_clients[builder_idx];
623
624                let (block, either_header_input) = futures::join! {
625                    client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
626                    client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
627                };
628
629                let block_data = match block {
630                    Ok(block_data) => block_data,
631                    Err(err) => {
632                        tracing::warn!(%err, "Error claiming block data");
633                        continue;
634                    },
635                };
636
637                let Ok(either_header_input) = either_header_input
638                    .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
639                else {
640                    continue;
641                };
642
643                let Some(header_input) = either_header_input
644                    .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
645                else {
646                    tracing::warn!(
647                        "Failed to verify available new or legacy block header input data \
648                         response message signature"
649                    );
650                    continue;
651                };
652
653                // verify the signature over the message
654                if !block_data.validate_signature() {
655                    tracing::warn!(
656                        "Failed to verify available block data response message signature"
657                    );
658                    continue;
659                }
660
661                // verify the message signature and the fee_signature
662                if !header_input.validate_signature(block_info.offered_fee, &block_data.metadata) {
663                    tracing::warn!(
664                        "Failed to verify available block header input data response message \
665                         signature"
666                    );
667                    continue;
668                }
669
670                let fee = BuilderFee {
671                    fee_amount: block_info.offered_fee,
672                    fee_account: header_input.sender,
673                    fee_signature: header_input.fee_signature,
674                };
675
676                BuilderResponse {
677                    fee,
678                    block_payload: block_data.block_payload,
679                    metadata: block_data.metadata,
680                }
681            };
682
683            return Ok(response);
684        }
685
686        bail!("Couldn't claim a block from any of the builders");
687    }
688}
689
690#[async_trait]
691/// task state implementation for Transactions Task
692impl<TYPES: NodeType, V: Versions> TaskState for TransactionTaskState<TYPES, V> {
693    type Event = HotShotEvent<TYPES>;
694
695    async fn handle_event(
696        &mut self,
697        event: Arc<Self::Event>,
698        sender: &Sender<Arc<Self::Event>>,
699        _receiver: &Receiver<Arc<Self::Event>>,
700    ) -> Result<()> {
701        self.handle(event, sender.clone()).await
702    }
703
704    fn cancel_subtasks(&mut self) {}
705}