hotshot_task_impls/
transactions.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{StreamExt, stream::FuturesUnordered};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{EpochNumber, PackedBundle, VidCommitment, ViewNumber, null_block},
20    epoch_membership::EpochMembershipCoordinator,
21    event::{Event, EventType},
22    message::UpgradeLock,
23    traits::{
24        BlockPayload,
25        block_contents::{BlockHeader, BuilderFee, EncodeBytes},
26        node_implementation::NodeType,
27        signature_key::{BuilderSignatureKey, SignatureKey},
28    },
29    utils::{ViewInner, is_epoch_transition, is_last_block},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::Version;
35use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
36
37use crate::{
38    builder::v0_1::BuilderClient as BuilderClientBase,
39    events::{HotShotEvent, HotShotTaskCompleted},
40    helpers::broadcast_event,
41};
42
// Parameters for builder querying algorithm

/// Proportion of builders queried in first batch, dividend
/// (together with the divisor below, the first batch covers 2/3 of the builders)
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Proportion of builders queried in the first batch, divisor
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// Time the first batch of builders has to respond before we start the second batch
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// Multiplier for extra time to give to the second batch of builders,
/// applied to the elapsed time of the first batch
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Minimum amount of time allotted to both batches, cannot be cut shorter if the first batch
/// responds extremely fast.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Delay between re-tries on unsuccessful calls
const RETRY_DELAY: Duration = Duration::from_millis(100);
58
/// Builder Provided Responses: a claimed block plus the fee the builder offers.
pub struct BuilderResponse<TYPES: NodeType> {
    /// Fee information (amount, account, and fee signature) for this block
    pub fee: BuilderFee<TYPES>,

    /// Block payload returned by the builder
    pub block_payload: TYPES::BlockPayload,

    /// Block metadata accompanying the payload
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
70
/// Tracks state of a Transaction task
pub struct TransactionTaskState<TYPES: NodeType> {
    /// Maximum time to wait for builders to provide a block before falling
    /// back to proposing an empty block
    pub builder_timeout: Duration,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// View number this view is executing in.
    pub cur_view: ViewNumber,

    /// Epoch number this node is executing in.
    pub cur_epoch: Option<EpochNumber>,

    /// Reference to consensus. Leader will require a read lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Membership for the quorum
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Builder 0.1 API clients, queried in parallel for available blocks
    pub builder_clients: Vec<BuilderClientBase<TYPES>>,

    /// This Nodes Public Key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key, used to sign block requests sent to builders
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// InstanceState
    pub instance_state: Arc<TYPES::InstanceState>,

    /// This state's ID
    pub id: u64,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}
112
113impl<TYPES: NodeType> TransactionTaskState<TYPES> {
    /// Handle a view change for `block_view`: decide whether to request a
    /// block from the builders. Currently always delegates to the legacy
    /// handler.
    pub async fn handle_view_change(
        &mut self,
        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
        block_view: ViewNumber,
        block_epoch: Option<EpochNumber>,
        vid: Option<VidCommitment>,
    ) -> Option<HotShotTaskCompleted> {
        self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
            .await
    }
125
126    /// legacy view change handler
127    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
128    pub async fn handle_view_change_legacy(
129        &mut self,
130        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
131        block_view: ViewNumber,
132        block_epoch: Option<EpochNumber>,
133        vid: Option<VidCommitment>,
134    ) -> Option<HotShotTaskCompleted> {
135        let version = match self.upgrade_lock.version(block_view) {
136            Ok(v) => v,
137            Err(err) => {
138                tracing::error!(
139                    "Upgrade certificate requires unsupported version, refusing to request \
140                     blocks: {err}"
141                );
142                return None;
143            },
144        };
145
146        // Short circuit if we are in epochs and we are likely proposing a transition block
147        // If it's the first view of the upgrade, we don't need to check for transition blocks
148        if version >= EPOCH_VERSION {
149            let Some(epoch) = block_epoch else {
150                tracing::error!("Epoch is required for epoch-based view change");
151                return None;
152            };
153            let high_qc = self.consensus.read().await.high_qc().clone();
154            let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
155                bn
156            } else {
157                // If it's the first view after the upgrade the high QC won't have a block number
158                // So just use the highest_block number we've stored
159                if block_view
160                    > self
161                        .upgrade_lock
162                        .upgrade_view()
163                        .unwrap_or(ViewNumber::new(0))
164                        + 1
165                {
166                    tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167                    self.send_empty_block(event_stream, block_view, block_epoch, version)
168                        .await;
169                    return None;
170                }
171                // 0 here so we use the highest block number in the calculation below
172                0
173            };
174            high_qc_block_number = std::cmp::max(
175                high_qc_block_number,
176                self.consensus.read().await.highest_block,
177            );
178            if self
179                .consensus
180                .read()
181                .await
182                .transition_qc()
183                .is_some_and(|qc| {
184                    let Some(e) = qc.0.data.epoch else {
185                        return false;
186                    };
187                    e == epoch
188                })
189                || is_epoch_transition(high_qc_block_number, self.epoch_height)
190            {
191                // We are proposing a transition block it should be empty
192                if !is_last_block(high_qc_block_number, self.epoch_height) {
193                    tracing::info!(
194                        "Sending empty block event. View number: {block_view}. Parent Block \
195                         number: {high_qc_block_number}"
196                    );
197                    self.send_empty_block(event_stream, block_view, block_epoch, version)
198                        .await;
199                    return None;
200                }
201            }
202        }
203
204        // Request a block from the builder unless we are between versions.
205        let block = {
206            if self
207                .upgrade_lock
208                .decided_upgrade_cert()
209                .as_ref()
210                .is_some_and(|cert| cert.upgrading_in(block_view))
211            {
212                None
213            } else {
214                self.wait_for_block(block_view, vid).await
215            }
216        };
217
218        if let Some(BuilderResponse {
219            block_payload,
220            metadata,
221            fee,
222        }) = block
223        {
224            broadcast_event(
225                Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
226                    block_payload.encode(),
227                    metadata,
228                    block_view,
229                    block_epoch,
230                    vec1::vec1![fee],
231                ))),
232                event_stream,
233            )
234            .await;
235        } else {
236            self.send_empty_block(event_stream, block_view, block_epoch, version)
237                .await;
238        };
239
240        return None;
241    }
242
243    /// Send the event to the event stream that we are proposing an empty block
244    async fn send_empty_block(
245        &self,
246        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
247        block_view: ViewNumber,
248        block_epoch: Option<EpochNumber>,
249        version: Version,
250    ) {
251        // If we couldn't get a block, send an empty block
252        tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
253
254        // Increment the metric for number of empty blocks proposed
255        self.consensus
256            .write()
257            .await
258            .metrics
259            .number_of_empty_blocks_proposed
260            .add(1);
261
262        let num_storage_nodes = match self
263            .membership_coordinator
264            .stake_table_for_epoch(block_epoch)
265            .await
266        {
267            Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
268            Err(e) => {
269                tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
270                return;
271            },
272        };
273
274        let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
275            tracing::error!("Failed to get null fee");
276            return;
277        };
278
279        // Create an empty block payload and metadata
280        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
281
282        // Broadcast the empty block
283        broadcast_event(
284            Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
285                vec![].into(),
286                metadata,
287                block_view,
288                block_epoch,
289                vec1::vec1![null_fee],
290            ))),
291            event_stream,
292        )
293        .await;
294    }
295
296    /// Produce a null block
297    pub async fn null_block(
298        &self,
299        block_view: ViewNumber,
300        block_epoch: Option<EpochNumber>,
301        version: Version,
302        num_storage_nodes: usize,
303    ) -> Option<PackedBundle<TYPES>> {
304        let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
305            tracing::error!("Failed to calculate null block fee.");
306            return None;
307        };
308
309        // Create an empty block payload and metadata
310        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
311
312        Some(PackedBundle::new(
313            vec![].into(),
314            metadata,
315            block_view,
316            block_epoch,
317            vec1::vec1![null_fee],
318        ))
319    }
320
321    /// main task event handler
322    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
323    pub async fn handle(
324        &mut self,
325        event: Arc<HotShotEvent<TYPES>>,
326        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
327    ) -> Result<()> {
328        match event.as_ref() {
329            HotShotEvent::TransactionsRecv(transactions) => {
330                broadcast_event(
331                    Event {
332                        view_number: self.cur_view,
333                        event: EventType::Transactions {
334                            transactions: transactions.clone(),
335                        },
336                    },
337                    &self.output_event_stream,
338                )
339                .await;
340            },
341            HotShotEvent::ViewChange(view, epoch) => {
342                let view = ViewNumber::new(std::cmp::max(1, **view));
343                ensure!(
344                    *view > *self.cur_view && *epoch >= self.cur_epoch,
345                    debug!(
346                        "Received a view change to an older view and epoch: tried to change view \
347                         to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
348                        self.cur_view, self.cur_epoch
349                    )
350                );
351                self.cur_view = view;
352                self.cur_epoch = *epoch;
353
354                let leader = self
355                    .membership_coordinator
356                    .membership_for_epoch(*epoch)
357                    .await?
358                    .leader(view)
359                    .await?;
360                if leader == self.public_key {
361                    self.handle_view_change(&event_stream, view, *epoch, None)
362                        .await;
363                    return Ok(());
364                }
365            },
366            HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
367                let view_number = proposal.data.view_number();
368                let next_view = view_number + 1;
369
370                let version = match self.upgrade_lock.version(next_view) {
371                    Ok(v) => v,
372                    Err(e) => {
373                        tracing::error!("Failed to calculate version: {e:?}");
374                        return Ok(());
375                    },
376                };
377
378                if version < DRB_AND_HEADER_UPGRADE_VERSION {
379                    return Ok(());
380                }
381
382                let vid = proposal.data.block_header().payload_commitment();
383                let block_height = proposal.data.block_header().block_number();
384                if is_epoch_transition(block_height, self.epoch_height) {
385                    return Ok(());
386                }
387                if next_view <= self.cur_view {
388                    return Ok(());
389                }
390                // move to next view for this task only
391                self.cur_view = next_view;
392
393                let leader = self
394                    .membership_coordinator
395                    .membership_for_epoch(self.cur_epoch)
396                    .await?
397                    .leader(next_view)
398                    .await?;
399                if leader == self.public_key {
400                    self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
401                        .await;
402                    return Ok(());
403                }
404            },
405            _ => {},
406        }
407        Ok(())
408    }
409
410    /// Get VID commitment for the last successful view before `block_view`.
411    /// Returns None if we don't have said commitment recorded.
412    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
413    async fn last_vid_commitment_retry(
414        &self,
415        block_view: ViewNumber,
416        task_start_time: Instant,
417    ) -> Result<(ViewNumber, VidCommitment)> {
418        loop {
419            match self.last_vid_commitment(block_view).await {
420                Ok((view, comm)) => break Ok((view, comm)),
421                Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
422                _ => {
423                    // We still have time, will re-try in a bit
424                    sleep(RETRY_DELAY).await;
425                    continue;
426                },
427            }
428        }
429    }
430
431    /// Get VID commitment for the last successful view before `block_view`.
432    /// Returns None if we don't have said commitment recorded.
433    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
434    async fn last_vid_commitment(
435        &self,
436        block_view: ViewNumber,
437    ) -> Result<(ViewNumber, VidCommitment)> {
438        let consensus_reader = self.consensus.read().await;
439        let mut target_view = ViewNumber::new(block_view.saturating_sub(1));
440
441        loop {
442            let view_data = consensus_reader
443                .validated_state_map()
444                .get(&target_view)
445                .context(info!(
446                    "Missing record for view {target_view} in validated state",
447                ))?;
448
449            match &view_data.view_inner {
450                ViewInner::Da {
451                    payload_commitment, ..
452                } => return Ok((target_view, *payload_commitment)),
453                ViewInner::Leaf {
454                    leaf: leaf_commitment,
455                    ..
456                } => {
457                    let leaf = consensus_reader
458                        .saved_leaves()
459                        .get(leaf_commitment)
460                        .context(info!(
461                            "Missing leaf with commitment {leaf_commitment} for view \
462                             {target_view} in saved_leaves",
463                        ))?;
464                    return Ok((target_view, leaf.payload_commitment()));
465                },
466                ViewInner::Failed => {
467                    // For failed views, backtrack
468                    target_view = ViewNumber::new(target_view.checked_sub(1).context(warn!(
469                        "Reached genesis. Something is wrong -- have we not decided any blocks \
470                         since genesis?"
471                    ))?);
472                    continue;
473                },
474            }
475        }
476    }
477
478    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
479    async fn wait_for_block(
480        &self,
481        block_view: ViewNumber,
482        vid: Option<VidCommitment>,
483    ) -> Option<BuilderResponse<TYPES>> {
484        let task_start_time = Instant::now();
485
486        // Find commitment to the block we want to build upon
487        let (parent_view, parent_comm) = if let Some(vid) = vid {
488            (block_view - 1, vid)
489        } else {
490            match self
491                .last_vid_commitment_retry(block_view, task_start_time)
492                .await
493            {
494                Ok((v, c)) => (v, c),
495                Err(e) => {
496                    tracing::warn!("Failed to find last vid commitment in time: {e}");
497                    return None;
498                },
499            }
500        };
501
502        let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
503            &self.private_key,
504            parent_comm.as_ref(),
505        ) {
506            Ok(sig) => sig,
507            Err(err) => {
508                tracing::error!(%err, "Failed to sign block hash");
509                return None;
510            },
511        };
512
513        while task_start_time.elapsed() < self.builder_timeout {
514            match timeout(
515                self.builder_timeout
516                    .saturating_sub(task_start_time.elapsed()),
517                self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
518            )
519            .await
520            {
521                // We got a block
522                Ok(Ok(block)) => {
523                    return Some(block);
524                },
525
526                // We failed to get a block
527                Ok(Err(err)) => {
528                    tracing::info!("Couldn't get a block: {err:#}");
529                    // pause a bit
530                    sleep(RETRY_DELAY).await;
531                    continue;
532                },
533
534                // We timed out while getting available blocks
535                Err(err) => {
536                    tracing::info!(%err, "Timeout while getting available blocks");
537                    return None;
538                },
539            }
540        }
541
542        tracing::warn!("could not get a block from the builder in time");
543        None
544    }
545
    /// Query the builders for available blocks. Queries only fraction of the builders
    /// based on the response time.
    ///
    /// Returns `(block_info, builder_index)` pairs, where the index identifies
    /// the entry in `self.builder_clients` that offered the block.
    async fn get_available_blocks(
        &self,
        parent_comm: VidCommitment,
        view_number: ViewNumber,
        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
        // Kick off one `available_blocks` request per builder; results arrive
        // in completion order via `FuturesUnordered`.
        let tasks = self
            .builder_clients
            .iter()
            .enumerate()
            .map(|(builder_idx, client)| async move {
                client
                    .available_blocks(
                        parent_comm,
                        view_number.u64(),
                        self.public_key.clone(),
                        parent_comm_sig,
                    )
                    .await
                    .map(move |blocks| {
                        blocks
                            .into_iter()
                            .map(move |block_info| (block_info, builder_idx))
                    })
            })
            .collect::<FuturesUnordered<_>>();
        let mut results = Vec::with_capacity(self.builder_clients.len());
        let query_start = Instant::now();
        // First batch: wait for up to DIVIDEND/DIVISOR (2/3) of the builders,
        // but stop early once BUILDER_MAIN_BATCH_CUTOFF has elapsed.
        let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
            .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
        let mut tasks = tasks.take(threshold);
        while let Some(result) = tasks.next().await {
            results.push(result);
            if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
                break;
            }
        }
        // Second batch: give the remaining builders extra time proportional to
        // how long the first batch took, but never let the total query time
        // drop below BUILDER_MINIMUM_QUERY_TIME.
        let timeout = sleep(std::cmp::max(
            query_start
                .elapsed()
                .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
            BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
        ));
        futures::pin_mut!(timeout);
        // `into_inner` recovers the underlying stream from the `Take` adaptor
        // so the builders not yet polled in the first batch are still queried.
        let mut tasks = tasks.into_inner().take_until(timeout);
        while let Some(result) = tasks.next().await {
            results.push(result);
        }
        // Drop failed queries and flatten each builder's block list.
        results
            .into_iter()
            .filter_map(|result| result.ok())
            .flatten()
            .collect::<Vec<_>>()
    }
602
603    /// Get a block from builder.
604    /// Queries the sufficiently fast builders for available blocks and chooses the one with the
605    /// best fee/byte ratio, re-trying with the next best one in case of failure.
606    ///
607    /// # Errors
608    /// If none of the builder reports any available blocks or claiming block fails for all of the
609    /// builders.
610    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
611    async fn block_from_builder(
612        &self,
613        parent_comm: VidCommitment,
614        view_number: ViewNumber,
615        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
616    ) -> Result<BuilderResponse<TYPES>> {
617        let mut available_blocks = self
618            .get_available_blocks(parent_comm, view_number, parent_comm_sig)
619            .await;
620
621        available_blocks.sort_by(|(l, _), (r, _)| {
622            // We want the block with the highest fee per byte of data we're going to have to
623            // process, thus our comparison function is:
624            //      (l.offered_fee / l.block_size) < (r.offered_fee / r.block_size)
625            // To avoid floating point math (which doesn't even have an `Ord` impl) we multiply
626            // through by the denominators to get
627            //      l.offered_fee * r.block_size < r.offered_fee * l.block_size
628            // We cast up to u128 to avoid overflow.
629            (u128::from(l.offered_fee) * u128::from(r.block_size))
630                .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
631        });
632
633        if available_blocks.is_empty() {
634            tracing::info!("No available blocks");
635            bail!("No available blocks");
636        }
637
638        for (block_info, builder_idx) in available_blocks {
639            // Verify signature over chosen block.
640            if !block_info.sender.validate_block_info_signature(
641                &block_info.signature,
642                block_info.block_size,
643                block_info.offered_fee,
644                &block_info.block_hash,
645            ) {
646                tracing::warn!("Failed to verify available block info response message signature");
647                continue;
648            }
649
650            let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
651                &self.private_key,
652                block_info.block_hash.as_ref(),
653            ) {
654                Ok(request_signature) => request_signature,
655                Err(err) => {
656                    tracing::error!(%err, "Failed to sign block hash");
657                    continue;
658                },
659            };
660
661            let response = {
662                let client = &self.builder_clients[builder_idx];
663
664                let (block, either_header_input) = futures::join! {
665                    client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
666                    client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
667                };
668
669                let block_data = match block {
670                    Ok(block_data) => block_data,
671                    Err(err) => {
672                        tracing::warn!(%err, "Error claiming block data");
673                        continue;
674                    },
675                };
676
677                let Ok(either_header_input) = either_header_input
678                    .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
679                else {
680                    continue;
681                };
682
683                let Some(header_input) = either_header_input
684                    .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
685                else {
686                    tracing::warn!(
687                        "Failed to verify available new or legacy block header input data \
688                         response message signature"
689                    );
690                    continue;
691                };
692
693                // verify the signature over the message
694                if !block_data.validate_signature() {
695                    tracing::warn!(
696                        "Failed to verify available block data response message signature"
697                    );
698                    continue;
699                }
700
701                let fee = BuilderFee {
702                    fee_amount: block_info.offered_fee,
703                    fee_account: header_input.sender,
704                    fee_signature: header_input.fee_signature,
705                };
706
707                BuilderResponse {
708                    fee,
709                    block_payload: block_data.block_payload,
710                    metadata: block_data.metadata,
711                }
712            };
713
714            return Ok(response);
715        }
716
717        bail!("Couldn't claim a block from any of the builders");
718    }
719}
720
#[async_trait]
/// task state implementation for Transactions Task
impl<TYPES: NodeType> TaskState for TransactionTaskState<TYPES> {
    type Event = HotShotEvent<TYPES>;

    /// Forward each incoming event to [`TransactionTaskState::handle`],
    /// re-using the task's event sender for any resulting broadcasts.
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        self.handle(event, sender.clone()).await
    }

    /// This task spawns no long-running subtasks, so there is nothing to cancel.
    fn cancel_subtasks(&mut self) {}
}