hotshot_task_impls/
transactions.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{stream::FuturesUnordered, StreamExt};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{null_block, PackedBundle, VidCommitment},
20    epoch_membership::EpochMembershipCoordinator,
21    event::{Event, EventType},
22    message::UpgradeLock,
23    traits::{
24        block_contents::{BlockHeader, BuilderFee, EncodeBytes},
25        node_implementation::{ConsensusTime, NodeType, Versions},
26        signature_key::{BuilderSignatureKey, SignatureKey},
27        BlockPayload,
28    },
29    utils::{is_epoch_transition, is_last_block, ViewInner},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::{StaticVersionType, Version};
35
36use crate::{
37    builder::v0_1::BuilderClient as BuilderClientBase,
38    events::{HotShotEvent, HotShotTaskCompleted},
39    helpers::broadcast_event,
40};
41
// Tuning parameters for the builder querying algorithm (see `get_available_blocks`).

/// Numerator of the fraction of builders whose responses form the first (main) batch.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Denominator of the fraction of builders whose responses form the first (main) batch.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// Once this much time has passed, we stop waiting for further main-batch responses.
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// The remaining builders get this fraction of the time already spent on the main batch
/// as additional time to respond.
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Lower bound on the total time spent querying both batches; the query is not cut shorter
/// than this even when the main batch responds extremely fast.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Pause inserted between attempts after an unsuccessful call.
const RETRY_DELAY: Duration = Duration::from_millis(100);
57
58/// Builder Provided Responses
59pub struct BuilderResponse<TYPES: NodeType> {
60    /// Fee information
61    pub fee: BuilderFee<TYPES>,
62
63    /// Block payload
64    pub block_payload: TYPES::BlockPayload,
65
66    /// Block metadata
67    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
68}
69
70/// Tracks state of a Transaction task
71pub struct TransactionTaskState<TYPES: NodeType, V: Versions> {
72    /// The state's api
73    pub builder_timeout: Duration,
74
75    /// Output events to application
76    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
77
78    /// View number this view is executing in.
79    pub cur_view: TYPES::View,
80
81    /// Epoch number this node is executing in.
82    pub cur_epoch: Option<TYPES::Epoch>,
83
84    /// Reference to consensus. Leader will require a read lock on this.
85    pub consensus: OuterConsensus<TYPES>,
86
87    /// Membership for the quorum
88    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
89
90    /// Builder 0.1 API clients
91    pub builder_clients: Vec<BuilderClientBase<TYPES>>,
92
93    /// This Nodes Public Key
94    pub public_key: TYPES::SignatureKey,
95
96    /// Our Private Key
97    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
98
99    /// InstanceState
100    pub instance_state: Arc<TYPES::InstanceState>,
101
102    /// This state's ID
103    pub id: u64,
104
105    /// Lock for a decided upgrade
106    pub upgrade_lock: UpgradeLock<TYPES, V>,
107
108    /// Number of blocks in an epoch, zero means there are no epochs
109    pub epoch_height: u64,
110}
111
112impl<TYPES: NodeType, V: Versions> TransactionTaskState<TYPES, V> {
113    /// handle view change decide legacy or not
114    pub async fn handle_view_change(
115        &mut self,
116        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
117        block_view: TYPES::View,
118        block_epoch: Option<TYPES::Epoch>,
119        vid: Option<VidCommitment>,
120    ) -> Option<HotShotTaskCompleted> {
121        self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
122            .await
123    }
124
125    /// legacy view change handler
126    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
127    pub async fn handle_view_change_legacy(
128        &mut self,
129        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
130        block_view: TYPES::View,
131        block_epoch: Option<TYPES::Epoch>,
132        vid: Option<VidCommitment>,
133    ) -> Option<HotShotTaskCompleted> {
134        let version = match self.upgrade_lock.version(block_view).await {
135            Ok(v) => v,
136            Err(err) => {
137                tracing::error!(
138                    "Upgrade certificate requires unsupported version, refusing to request \
139                     blocks: {err}"
140                );
141                return None;
142            },
143        };
144
145        // Short circuit if we are in epochs and we are likely proposing a transition block
146        // If it's the first view of the upgrade, we don't need to check for transition blocks
147        if version >= V::Epochs::VERSION {
148            let Some(epoch) = block_epoch else {
149                tracing::error!("Epoch is required for epoch-based view change");
150                return None;
151            };
152            let high_qc = self.consensus.read().await.high_qc().clone();
153            let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
154                bn
155            } else {
156                // If it's the first view after the upgrade the high QC won't have a block number
157                // So just use the highest_block number we've stored
158                if block_view
159                    > self
160                        .upgrade_lock
161                        .upgrade_view()
162                        .await
163                        .unwrap_or(TYPES::View::new(0))
164                        + 1
165                {
166                    tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167                    self.send_empty_block(event_stream, block_view, block_epoch, version)
168                        .await;
169                    return None;
170                }
171                // 0 here so we use the highest block number in the calculation below
172                0
173            };
174            high_qc_block_number = std::cmp::max(
175                high_qc_block_number,
176                self.consensus.read().await.highest_block,
177            );
178            if self
179                .consensus
180                .read()
181                .await
182                .transition_qc()
183                .is_some_and(|qc| {
184                    let Some(e) = qc.0.data.epoch else {
185                        return false;
186                    };
187                    e == epoch
188                })
189                || is_epoch_transition(high_qc_block_number, self.epoch_height)
190            {
191                // We are proposing a transition block it should be empty
192                if !is_last_block(high_qc_block_number, self.epoch_height) {
193                    tracing::info!(
194                        "Sending empty block event. View number: {block_view}. Parent Block \
195                         number: {high_qc_block_number}"
196                    );
197                    self.send_empty_block(event_stream, block_view, block_epoch, version)
198                        .await;
199                    return None;
200                }
201            }
202        }
203
204        // Request a block from the builder unless we are between versions.
205        let block = {
206            if self
207                .upgrade_lock
208                .decided_upgrade_certificate
209                .read()
210                .await
211                .as_ref()
212                .is_some_and(|cert| cert.upgrading_in(block_view))
213            {
214                None
215            } else {
216                self.wait_for_block(block_view, vid).await
217            }
218        };
219
220        if let Some(BuilderResponse {
221            block_payload,
222            metadata,
223            fee,
224        }) = block
225        {
226            broadcast_event(
227                Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
228                    block_payload.encode(),
229                    metadata,
230                    block_view,
231                    block_epoch,
232                    vec1::vec1![fee],
233                ))),
234                event_stream,
235            )
236            .await;
237        } else {
238            self.send_empty_block(event_stream, block_view, block_epoch, version)
239                .await;
240        };
241
242        return None;
243    }
244
245    /// Send the event to the event stream that we are proposing an empty block
246    async fn send_empty_block(
247        &self,
248        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
249        block_view: TYPES::View,
250        block_epoch: Option<TYPES::Epoch>,
251        version: Version,
252    ) {
253        // If we couldn't get a block, send an empty block
254        tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
255
256        // Increment the metric for number of empty blocks proposed
257        self.consensus
258            .write()
259            .await
260            .metrics
261            .number_of_empty_blocks_proposed
262            .add(1);
263
264        let num_storage_nodes = match self
265            .membership_coordinator
266            .stake_table_for_epoch(block_epoch)
267            .await
268        {
269            Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await,
270            Err(e) => {
271                tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
272                return;
273            },
274        };
275
276        let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
277            tracing::error!("Failed to get null fee");
278            return;
279        };
280
281        // Create an empty block payload and metadata
282        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
283
284        // Broadcast the empty block
285        broadcast_event(
286            Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
287                vec![].into(),
288                metadata,
289                block_view,
290                block_epoch,
291                vec1::vec1![null_fee],
292            ))),
293            event_stream,
294        )
295        .await;
296    }
297
298    /// Produce a null block
299    pub async fn null_block(
300        &self,
301        block_view: TYPES::View,
302        block_epoch: Option<TYPES::Epoch>,
303        version: Version,
304        num_storage_nodes: usize,
305    ) -> Option<PackedBundle<TYPES>> {
306        let Some(null_fee) = null_block::builder_fee::<TYPES, V>(num_storage_nodes, version) else {
307            tracing::error!("Failed to calculate null block fee.");
308            return None;
309        };
310
311        // Create an empty block payload and metadata
312        let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
313
314        Some(PackedBundle::new(
315            vec![].into(),
316            metadata,
317            block_view,
318            block_epoch,
319            vec1::vec1![null_fee],
320        ))
321    }
322
323    /// main task event handler
324    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
325    pub async fn handle(
326        &mut self,
327        event: Arc<HotShotEvent<TYPES>>,
328        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
329    ) -> Result<()> {
330        match event.as_ref() {
331            HotShotEvent::TransactionsRecv(transactions) => {
332                broadcast_event(
333                    Event {
334                        view_number: self.cur_view,
335                        event: EventType::Transactions {
336                            transactions: transactions.clone(),
337                        },
338                    },
339                    &self.output_event_stream,
340                )
341                .await;
342            },
343            HotShotEvent::ViewChange(view, epoch) => {
344                let view = TYPES::View::new(std::cmp::max(1, **view));
345                ensure!(
346                    *view > *self.cur_view && *epoch >= self.cur_epoch,
347                    debug!(
348                        "Received a view change to an older view and epoch: tried to change view \
349                         to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
350                        self.cur_view, self.cur_epoch
351                    )
352                );
353                self.cur_view = view;
354                self.cur_epoch = *epoch;
355
356                let leader = self
357                    .membership_coordinator
358                    .membership_for_epoch(*epoch)
359                    .await?
360                    .leader(view)
361                    .await?;
362                if leader == self.public_key {
363                    self.handle_view_change(&event_stream, view, *epoch, None)
364                        .await;
365                    return Ok(());
366                }
367            },
368            HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
369                let view_number = proposal.data.view_number();
370                let next_view = view_number + 1;
371
372                let version = match self.upgrade_lock.version(next_view).await {
373                    Ok(v) => v,
374                    Err(e) => {
375                        tracing::error!("Failed to calculate version: {e:?}");
376                        return Ok(());
377                    },
378                };
379
380                if version < V::DrbAndHeaderUpgrade::VERSION {
381                    return Ok(());
382                }
383
384                let vid = proposal.data.block_header().payload_commitment();
385                let block_height = proposal.data.block_header().block_number();
386                if is_epoch_transition(block_height, self.epoch_height) {
387                    return Ok(());
388                }
389                if next_view <= self.cur_view {
390                    return Ok(());
391                }
392                // move to next view for this task only
393                self.cur_view = next_view;
394
395                let leader = self
396                    .membership_coordinator
397                    .membership_for_epoch(self.cur_epoch)
398                    .await?
399                    .leader(next_view)
400                    .await?;
401                if leader == self.public_key {
402                    self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
403                        .await;
404                    return Ok(());
405                }
406            },
407            _ => {},
408        }
409        Ok(())
410    }
411
412    /// Get VID commitment for the last successful view before `block_view`.
413    /// Returns None if we don't have said commitment recorded.
414    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
415    async fn last_vid_commitment_retry(
416        &self,
417        block_view: TYPES::View,
418        task_start_time: Instant,
419    ) -> Result<(TYPES::View, VidCommitment)> {
420        loop {
421            match self.last_vid_commitment(block_view).await {
422                Ok((view, comm)) => break Ok((view, comm)),
423                Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
424                _ => {
425                    // We still have time, will re-try in a bit
426                    sleep(RETRY_DELAY).await;
427                    continue;
428                },
429            }
430        }
431    }
432
433    /// Get VID commitment for the last successful view before `block_view`.
434    /// Returns None if we don't have said commitment recorded.
435    #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
436    async fn last_vid_commitment(
437        &self,
438        block_view: TYPES::View,
439    ) -> Result<(TYPES::View, VidCommitment)> {
440        let consensus_reader = self.consensus.read().await;
441        let mut target_view = TYPES::View::new(block_view.saturating_sub(1));
442
443        loop {
444            let view_data = consensus_reader
445                .validated_state_map()
446                .get(&target_view)
447                .context(info!(
448                    "Missing record for view {target_view} in validated state",
449                ))?;
450
451            match &view_data.view_inner {
452                ViewInner::Da {
453                    payload_commitment, ..
454                } => return Ok((target_view, *payload_commitment)),
455                ViewInner::Leaf {
456                    leaf: leaf_commitment,
457                    ..
458                } => {
459                    let leaf = consensus_reader
460                        .saved_leaves()
461                        .get(leaf_commitment)
462                        .context(info!(
463                            "Missing leaf with commitment {leaf_commitment} for view \
464                             {target_view} in saved_leaves",
465                        ))?;
466                    return Ok((target_view, leaf.payload_commitment()));
467                },
468                ViewInner::Failed => {
469                    // For failed views, backtrack
470                    target_view = TYPES::View::new(target_view.checked_sub(1).context(warn!(
471                        "Reached genesis. Something is wrong -- have we not decided any blocks \
472                         since genesis?"
473                    ))?);
474                    continue;
475                },
476            }
477        }
478    }
479
480    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
481    async fn wait_for_block(
482        &self,
483        block_view: TYPES::View,
484        vid: Option<VidCommitment>,
485    ) -> Option<BuilderResponse<TYPES>> {
486        let task_start_time = Instant::now();
487
488        // Find commitment to the block we want to build upon
489        let (parent_view, parent_comm) = if let Some(vid) = vid {
490            (block_view - 1, vid)
491        } else {
492            match self
493                .last_vid_commitment_retry(block_view, task_start_time)
494                .await
495            {
496                Ok((v, c)) => (v, c),
497                Err(e) => {
498                    tracing::warn!("Failed to find last vid commitment in time: {e}");
499                    return None;
500                },
501            }
502        };
503
504        let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
505            &self.private_key,
506            parent_comm.as_ref(),
507        ) {
508            Ok(sig) => sig,
509            Err(err) => {
510                tracing::error!(%err, "Failed to sign block hash");
511                return None;
512            },
513        };
514
515        while task_start_time.elapsed() < self.builder_timeout {
516            match timeout(
517                self.builder_timeout
518                    .saturating_sub(task_start_time.elapsed()),
519                self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
520            )
521            .await
522            {
523                // We got a block
524                Ok(Ok(block)) => {
525                    return Some(block);
526                },
527
528                // We failed to get a block
529                Ok(Err(err)) => {
530                    tracing::info!("Couldn't get a block: {err:#}");
531                    // pause a bit
532                    sleep(RETRY_DELAY).await;
533                    continue;
534                },
535
536                // We timed out while getting available blocks
537                Err(err) => {
538                    tracing::info!(%err, "Timeout while getting available blocks");
539                    return None;
540                },
541            }
542        }
543
544        tracing::warn!("could not get a block from the builder in time");
545        None
546    }
547
548    /// Query the builders for available blocks. Queries only fraction of the builders
549    /// based on the response time.
550    async fn get_available_blocks(
551        &self,
552        parent_comm: VidCommitment,
553        view_number: TYPES::View,
554        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
555    ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
556        let tasks = self
557            .builder_clients
558            .iter()
559            .enumerate()
560            .map(|(builder_idx, client)| async move {
561                client
562                    .available_blocks(
563                        parent_comm,
564                        view_number.u64(),
565                        self.public_key.clone(),
566                        parent_comm_sig,
567                    )
568                    .await
569                    .map(move |blocks| {
570                        blocks
571                            .into_iter()
572                            .map(move |block_info| (block_info, builder_idx))
573                    })
574            })
575            .collect::<FuturesUnordered<_>>();
576        let mut results = Vec::with_capacity(self.builder_clients.len());
577        let query_start = Instant::now();
578        let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
579            .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
580        let mut tasks = tasks.take(threshold);
581        while let Some(result) = tasks.next().await {
582            results.push(result);
583            if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
584                break;
585            }
586        }
587        let timeout = sleep(std::cmp::max(
588            query_start
589                .elapsed()
590                .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
591            BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
592        ));
593        futures::pin_mut!(timeout);
594        let mut tasks = tasks.into_inner().take_until(timeout);
595        while let Some(result) = tasks.next().await {
596            results.push(result);
597        }
598        results
599            .into_iter()
600            .filter_map(|result| result.ok())
601            .flatten()
602            .collect::<Vec<_>>()
603    }
604
605    /// Get a block from builder.
606    /// Queries the sufficiently fast builders for available blocks and chooses the one with the
607    /// best fee/byte ratio, re-trying with the next best one in case of failure.
608    ///
609    /// # Errors
610    /// If none of the builder reports any available blocks or claiming block fails for all of the
611    /// builders.
612    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
613    async fn block_from_builder(
614        &self,
615        parent_comm: VidCommitment,
616        view_number: TYPES::View,
617        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
618    ) -> Result<BuilderResponse<TYPES>> {
619        let mut available_blocks = self
620            .get_available_blocks(parent_comm, view_number, parent_comm_sig)
621            .await;
622
623        available_blocks.sort_by(|(l, _), (r, _)| {
624            // We want the block with the highest fee per byte of data we're going to have to
625            // process, thus our comparison function is:
626            //      (l.offered_fee / l.block_size) < (r.offered_fee / r.block_size)
627            // To avoid floating point math (which doesn't even have an `Ord` impl) we multiply
628            // through by the denominators to get
629            //      l.offered_fee * r.block_size < r.offered_fee * l.block_size
630            // We cast up to u128 to avoid overflow.
631            (u128::from(l.offered_fee) * u128::from(r.block_size))
632                .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
633        });
634
635        if available_blocks.is_empty() {
636            tracing::info!("No available blocks");
637            bail!("No available blocks");
638        }
639
640        for (block_info, builder_idx) in available_blocks {
641            // Verify signature over chosen block.
642            if !block_info.sender.validate_block_info_signature(
643                &block_info.signature,
644                block_info.block_size,
645                block_info.offered_fee,
646                &block_info.block_hash,
647            ) {
648                tracing::warn!("Failed to verify available block info response message signature");
649                continue;
650            }
651
652            let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
653                &self.private_key,
654                block_info.block_hash.as_ref(),
655            ) {
656                Ok(request_signature) => request_signature,
657                Err(err) => {
658                    tracing::error!(%err, "Failed to sign block hash");
659                    continue;
660                },
661            };
662
663            let response = {
664                let client = &self.builder_clients[builder_idx];
665
666                let (block, either_header_input) = futures::join! {
667                    client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
668                    client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
669                };
670
671                let block_data = match block {
672                    Ok(block_data) => block_data,
673                    Err(err) => {
674                        tracing::warn!(%err, "Error claiming block data");
675                        continue;
676                    },
677                };
678
679                let Ok(either_header_input) = either_header_input
680                    .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
681                else {
682                    continue;
683                };
684
685                let Some(header_input) = either_header_input
686                    .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
687                else {
688                    tracing::warn!(
689                        "Failed to verify available new or legacy block header input data \
690                         response message signature"
691                    );
692                    continue;
693                };
694
695                // verify the signature over the message
696                if !block_data.validate_signature() {
697                    tracing::warn!(
698                        "Failed to verify available block data response message signature"
699                    );
700                    continue;
701                }
702
703                let fee = BuilderFee {
704                    fee_amount: block_info.offered_fee,
705                    fee_account: header_input.sender,
706                    fee_signature: header_input.fee_signature,
707                };
708
709                BuilderResponse {
710                    fee,
711                    block_payload: block_data.block_payload,
712                    metadata: block_data.metadata,
713                }
714            };
715
716            return Ok(response);
717        }
718
719        bail!("Couldn't claim a block from any of the builders");
720    }
721}
722
723#[async_trait]
724/// task state implementation for Transactions Task
725impl<TYPES: NodeType, V: Versions> TaskState for TransactionTaskState<TYPES, V> {
726    type Event = HotShotEvent<TYPES>;
727
728    async fn handle_event(
729        &mut self,
730        event: Arc<Self::Event>,
731        sender: &Sender<Arc<Self::Event>>,
732        _receiver: &Receiver<Arc<Self::Event>>,
733    ) -> Result<()> {
734        self.handle(event, sender.clone()).await
735    }
736
737    fn cancel_subtasks(&mut self) {}
738}