hotshot_task_impls/quorum_proposal/
mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency, OrDependency},
14    dependency_task::DependencyTask,
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::OuterConsensus,
19    epoch_membership::EpochMembershipCoordinator,
20    message::UpgradeLock,
21    simple_certificate::{
22        EpochRootQuorumCertificateV2, LightClientStateUpdateCertificateV2,
23        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::StakeTableEntries,
26    traits::{
27        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32    vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39    events::HotShotEvent, helpers::broadcast_view_change,
40    quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
45/// The state for the quorum proposal task.
46pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
47    /// Latest view number that has been proposed for.
48    pub latest_proposed_view: TYPES::View,
49
50    /// Current epoch
51    pub cur_epoch: Option<TYPES::Epoch>,
52
53    /// Table for the in-progress proposal dependency tasks.
54    pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,
55
56    /// Formed QCs
57    pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,
58
59    /// Formed QCs for the next epoch
60    pub formed_next_epoch_quorum_certificates:
61        BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,
62
63    /// Immutable instance state
64    pub instance_state: Arc<TYPES::InstanceState>,
65
66    /// Membership for Quorum Certs/votes
67    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
68
69    /// Our public key
70    pub public_key: TYPES::SignatureKey,
71
72    /// Our Private Key
73    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
74
75    /// View timeout from config.
76    pub timeout: u64,
77
78    /// This node's storage ref
79    pub storage: I::Storage,
80
81    /// Shared consensus task state
82    pub consensus: OuterConsensus<TYPES>,
83
84    /// The node's id
85    pub id: u64,
86
87    /// The most recent upgrade certificate this node formed.
88    /// Note: this is ONLY for certificates that have been formed internally,
89    /// so that we can propose with them.
90    ///
91    /// Certificates received from other nodes will get reattached regardless of this fields,
92    /// since they will be present in the leaf we propose off of.
93    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
94
95    /// Lock for a decided upgrade
96    pub upgrade_lock: UpgradeLock<TYPES, V>,
97
98    /// Number of blocks in an epoch, zero means there are no epochs
99    pub epoch_height: u64,
100
101    /// Formed light client state update certificates
102    pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
103
104    /// First view in which epoch version takes effect
105    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
106}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109    QuorumProposalTaskState<TYPES, I, V>
110{
111    /// Create an event dependency
112    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113    fn create_event_dependency(
114        &self,
115        dependency_type: ProposalDependency,
116        view_number: TYPES::View,
117        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118        cancel_receiver: Receiver<()>,
119    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120        let id = self.id;
121        EventDependency::new(
122            event_receiver,
123            cancel_receiver,
124            format!(
125                "ProposalDependency::{:?} for view {:?}, my id {:?}",
126                dependency_type, view_number, self.id
127            ),
128            Box::new(move |event| {
129                let event = event.as_ref();
130                let event_view = match dependency_type {
131                    ProposalDependency::Qc => {
132                        if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133                            qc.view_number() + 1
134                        } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135                            root_qc.view_number() + 1
136                        } else {
137                            return false;
138                        }
139                    },
140                    ProposalDependency::TimeoutCert => {
141                        if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142                            timeout.view_number() + 1
143                        } else {
144                            return false;
145                        }
146                    },
147                    ProposalDependency::ViewSyncCert => {
148                        if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149                        {
150                            view_sync_cert.view_number()
151                        } else {
152                            return false;
153                        }
154                    },
155                    ProposalDependency::Proposal => {
156                        if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157                        {
158                            proposal.data.view_number() + 1
159                        } else {
160                            return false;
161                        }
162                    },
163                    ProposalDependency::PayloadAndMetadata => {
164                        if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165                            _payload_commitment,
166                            _builder_commitment,
167                            _metadata,
168                            view_number,
169                            _fee,
170                        ) = event
171                        {
172                            *view_number
173                        } else {
174                            return false;
175                        }
176                    },
177                    ProposalDependency::VidShare => {
178                        if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179                            vid_disperse.data.view_number()
180                        } else {
181                            return false;
182                        }
183                    },
184                };
185                let valid = event_view == view_number;
186                if valid {
187                    tracing::debug!(
188                        "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
189                         id is {id:?}!",
190                    );
191                }
192                valid
193            }),
194        )
195    }
196
197    /// Creates the requisite dependencies for the Quorum Proposal task. It also handles any event forwarding.
198    fn create_and_complete_dependencies(
199        &self,
200        view_number: TYPES::View,
201        event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
202        event: Arc<HotShotEvent<TYPES>>,
203        cancel_receiver: &Receiver<()>,
204    ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
205        let mut proposal_dependency = self.create_event_dependency(
206            ProposalDependency::Proposal,
207            view_number,
208            event_receiver.clone(),
209            cancel_receiver.clone(),
210        );
211
212        let mut qc_dependency = self.create_event_dependency(
213            ProposalDependency::Qc,
214            view_number,
215            event_receiver.clone(),
216            cancel_receiver.clone(),
217        );
218
219        let mut view_sync_dependency = self.create_event_dependency(
220            ProposalDependency::ViewSyncCert,
221            view_number,
222            event_receiver.clone(),
223            cancel_receiver.clone(),
224        );
225
226        let mut timeout_dependency = self.create_event_dependency(
227            ProposalDependency::TimeoutCert,
228            view_number,
229            event_receiver.clone(),
230            cancel_receiver.clone(),
231        );
232
233        let mut payload_commitment_dependency = self.create_event_dependency(
234            ProposalDependency::PayloadAndMetadata,
235            view_number,
236            event_receiver.clone(),
237            cancel_receiver.clone(),
238        );
239
240        let mut vid_share_dependency = self.create_event_dependency(
241            ProposalDependency::VidShare,
242            view_number,
243            event_receiver.clone(),
244            cancel_receiver.clone(),
245        );
246
247        let epoch_height = self.epoch_height;
248
249        // Next epoch QC dependency is fulfilled if we get the next epoch QC or
250        // form a current qc that isn't during transition
251        let mut next_epoch_qc_dependency = EventDependency::new(
252            event_receiver.clone(),
253            cancel_receiver.clone(),
254            format!(
255                "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
256                view_number, self.id
257            ),
258            Box::new(move |event| {
259                if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
260                    event.as_ref()
261                {
262                    return next_epoch_qc.view_number() + 1 == view_number;
263                }
264                if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
265                    // Epoch root QC is always not in epoch transition
266                    return true;
267                }
268                if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
269                    if qc.view_number() + 1 == view_number {
270                        return qc
271                            .data
272                            .block_number
273                            .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274                    }
275                }
276                false
277            }),
278        );
279
280        match event.as_ref() {
281            HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
282                payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
283            },
284            HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
285                proposal_dependency.mark_as_completed(event);
286            },
287            HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
288                Either::Right(_) => timeout_dependency.mark_as_completed(event),
289                Either::Left(qc) => {
290                    if qc
291                        .data
292                        .block_number
293                        .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
294                    {
295                        next_epoch_qc_dependency.mark_as_completed(event.clone());
296                    }
297                    qc_dependency.mark_as_completed(event);
298                },
299            },
300            HotShotEvent::EpochRootQcFormed(..) => {
301                // Epoch root QC is always not in epoch transition
302                next_epoch_qc_dependency.mark_as_completed(event.clone());
303                qc_dependency.mark_as_completed(event);
304            },
305            HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
306                view_sync_dependency.mark_as_completed(event);
307            },
308            HotShotEvent::VidDisperseSend(..) => {
309                vid_share_dependency.mark_as_completed(event);
310            },
311            HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
312                next_epoch_qc_dependency.mark_as_completed(event);
313            },
314            _ => {},
315        };
316
317        // We have three cases to consider:
318        let mut secondary_deps = vec![
319            // 1. A timeout cert was received
320            AndDependency::from_deps(vec![timeout_dependency]),
321            // 2. A view sync cert was received.
322            AndDependency::from_deps(vec![view_sync_dependency]),
323        ];
324        // 3. A `Qc2Formed`` event (and `QuorumProposalRecv` event)
325        if *view_number > 1 {
326            secondary_deps.push(AndDependency::from_deps(vec![
327                qc_dependency,
328                proposal_dependency,
329                next_epoch_qc_dependency,
330            ]));
331        } else {
332            secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
333        }
334
335        let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
336
337        AndDependency::from_deps(vec![OrDependency::from_deps(vec![
338            AndDependency::from_deps(vec![
339                OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
340                OrDependency::from_deps(secondary_deps),
341            ]),
342        ])])
343    }
344
345    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
346    /// given view number if it doesn't exist. Also takes in the received `event` to seed a
347    /// dependency as already completed. This allows for the task to receive a proposable event
348    /// without losing the data that it received, as the dependency task would otherwise have no
349    /// ability to receive the event and, thus, would never propose.
350    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
351    async fn create_dependency_task_if_new(
352        &mut self,
353        view_number: TYPES::View,
354        epoch_number: Option<TYPES::Epoch>,
355        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
356        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
357        event: Arc<HotShotEvent<TYPES>>,
358        epoch_transition_indicator: EpochTransitionIndicator,
359    ) -> Result<()> {
360        let epoch_membership = self
361            .membership_coordinator
362            .membership_for_epoch(epoch_number)
363            .await?;
364        let leader_in_current_epoch =
365            epoch_membership.leader(view_number).await? == self.public_key;
366        // If we are in the epoch transition and we are the leader in the next epoch,
367        // we might want to start collecting dependencies for our next epoch proposal.
368
369        let leader_in_next_epoch = !leader_in_current_epoch
370            && epoch_number.is_some()
371            && matches!(
372                epoch_transition_indicator,
373                EpochTransitionIndicator::InTransition
374            )
375            && epoch_membership
376                .next_epoch()
377                .await
378                .context(warn!(
379                    "Missing the randomized stake table for epoch {}",
380                    epoch_number.unwrap() + 1
381                ))?
382                .leader(view_number)
383                .await?
384                == self.public_key;
385
386        // Don't even bother making the task if we are not entitled to propose anyway.
387        ensure!(
388            leader_in_current_epoch || leader_in_next_epoch,
389            debug!("We are not the leader of the next view")
390        );
391
392        // Don't try to propose twice for the same view.
393        ensure!(
394            view_number > self.latest_proposed_view,
395            "We have already proposed for this view"
396        );
397
398        tracing::debug!(
399            "Attempting to make dependency task for view {view_number} and event {event:?}"
400        );
401
402        ensure!(
403            !self.proposal_dependencies.contains_key(&view_number),
404            "Task already exists"
405        );
406
407        let (cancel_sender, cancel_receiver) = broadcast(1);
408
409        let dependency_chain = self.create_and_complete_dependencies(
410            view_number,
411            &event_receiver,
412            event,
413            &cancel_receiver,
414        );
415
416        let dependency_task = DependencyTask::new(
417            dependency_chain,
418            ProposalDependencyHandle {
419                latest_proposed_view: self.latest_proposed_view,
420                view_number,
421                sender: event_sender,
422                receiver: event_receiver,
423                membership: epoch_membership,
424                public_key: self.public_key.clone(),
425                private_key: self.private_key.clone(),
426                instance_state: Arc::clone(&self.instance_state),
427                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
428                timeout: self.timeout,
429                formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
430                upgrade_lock: self.upgrade_lock.clone(),
431                id: self.id,
432                view_start_time: Instant::now(),
433                epoch_height: self.epoch_height,
434                cancel_receiver,
435            },
436        );
437        self.proposal_dependencies
438            .insert(view_number, cancel_sender);
439
440        dependency_task.run();
441
442        Ok(())
443    }
444
445    /// Update the latest proposed view number.
446    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
447    async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
448        if *self.latest_proposed_view < *new_view {
449            tracing::debug!(
450                "Updating latest proposed view from {} to {}",
451                *self.latest_proposed_view,
452                *new_view
453            );
454
455            // Cancel the old dependency tasks.
456            for view in (*self.latest_proposed_view + 1)..=(*new_view) {
457                let maybe_cancel_sender =
458                    self.proposal_dependencies.remove(&TYPES::View::new(view));
459                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
460                    tracing::error!("Aborting proposal dependency task for view {view}");
461                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
462                }
463            }
464
465            self.latest_proposed_view = new_view;
466
467            return true;
468        }
469        false
470    }
471
472    /// Handles a consensus event received on the event stream
473    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
474    pub async fn handle(
475        &mut self,
476        event: Arc<HotShotEvent<TYPES>>,
477        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
478        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
479    ) -> Result<()> {
480        let epoch_number = self.cur_epoch;
481        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
482        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
483            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
484        }) {
485            EpochTransitionIndicator::InTransition
486        } else {
487            EpochTransitionIndicator::NotInTransition
488        };
489        match event.as_ref() {
490            HotShotEvent::UpgradeCertificateFormed(cert) => {
491                tracing::debug!(
492                    "Upgrade certificate received for view {}!",
493                    *cert.view_number
494                );
495                // Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time.
496                if cert.data.decide_by >= self.latest_proposed_view + 3 {
497                    tracing::debug!("Updating current formed_upgrade_certificate");
498
499                    self.formed_upgrade_certificate = Some(cert.clone());
500                }
501            },
502            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
503                either::Right(timeout_cert) => {
504                    let view_number = timeout_cert.view_number + 1;
505                    self.create_dependency_task_if_new(
506                        view_number,
507                        epoch_number,
508                        event_receiver,
509                        event_sender,
510                        Arc::clone(&event),
511                        epoch_transition_indicator,
512                    )
513                    .await?;
514                },
515                either::Left(qc) => {
516                    // Only update if the qc is from a newer view
517                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
518                        tracing::trace!(
519                            "Received a QC for a view that was not > than our current high QC"
520                        );
521                    }
522
523                    self.formed_quorum_certificates
524                        .insert(qc.view_number(), qc.clone());
525
526                    handle_eqc_formed(
527                        qc.view_number(),
528                        qc.data.leaf_commit,
529                        qc.data.block_number,
530                        self,
531                        &event_sender,
532                    )
533                    .await;
534
535                    let view_number = qc.view_number() + 1;
536                    if !qc
537                        .data
538                        .block_number
539                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
540                    {
541                        broadcast_view_change(
542                            &event_sender,
543                            view_number,
544                            qc.data.epoch,
545                            self.first_epoch,
546                        )
547                        .await;
548                    }
549                    self.create_dependency_task_if_new(
550                        view_number,
551                        epoch_number,
552                        event_receiver,
553                        event_sender,
554                        Arc::clone(&event),
555                        epoch_transition_indicator,
556                    )
557                    .await?;
558                },
559            },
560
561            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificateV2 { qc, state_cert }) => {
562                // Only update if the qc is from a newer view
563                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
564                    tracing::trace!(
565                        "Received a QC for a view that was not > than our current high QC"
566                    );
567                }
568
569                self.formed_quorum_certificates
570                    .insert(qc.view_number(), qc.clone());
571                self.formed_state_cert
572                    .insert(state_cert.epoch, state_cert.clone());
573
574                self.storage
575                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
576                    .await
577                    .wrap()
578                    .context(error!(
579                        "Failed to update the epoch root QC and state cert in storage!"
580                    ))?;
581
582                let view_number = qc.view_number() + 1;
583                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
584                    .await;
585                self.create_dependency_task_if_new(
586                    view_number,
587                    epoch_number,
588                    event_receiver,
589                    event_sender,
590                    Arc::clone(&event),
591                    epoch_transition_indicator,
592                )
593                .await?;
594            },
595            HotShotEvent::SendPayloadCommitmentAndMetadata(
596                _payload_commitment,
597                _builder_commitment,
598                _metadata,
599                view_number,
600                _fee,
601            ) => {
602                let view_number = *view_number;
603
604                self.create_dependency_task_if_new(
605                    view_number,
606                    epoch_number,
607                    event_receiver,
608                    event_sender,
609                    Arc::clone(&event),
610                    epoch_transition_indicator,
611                )
612                .await?;
613            },
614            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
615                let epoch_number = certificate.data.epoch;
616                let epoch_membership = self
617                    .membership_coordinator
618                    .stake_table_for_epoch(epoch_number)
619                    .await
620                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
621
622                let membership_stake_table = epoch_membership.stake_table().await;
623                let membership_success_threshold = epoch_membership.success_threshold().await;
624
625                certificate
626                    .is_valid_cert(
627                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
628                        membership_success_threshold,
629                        &self.upgrade_lock,
630                    )
631                    .await
632                    .context(|e| {
633                        warn!(
634                            "View Sync Finalize certificate {:?} was invalid: {}",
635                            certificate.data(),
636                            e
637                        )
638                    })?;
639
640                let view_number = certificate.view_number;
641
642                self.create_dependency_task_if_new(
643                    view_number,
644                    epoch_number,
645                    event_receiver,
646                    event_sender,
647                    event,
648                    epoch_transition_indicator,
649                )
650                .await?;
651            },
652            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
653                let view_number = proposal.data.view_number();
654                // All nodes get the latest proposed view as a proxy of `cur_view` of old.
655                if !self.update_latest_proposed_view(view_number).await {
656                    tracing::trace!("Failed to update latest proposed view");
657                }
658
659                self.create_dependency_task_if_new(
660                    view_number + 1,
661                    epoch_number,
662                    event_receiver,
663                    event_sender,
664                    Arc::clone(&event),
665                    epoch_transition_indicator,
666                )
667                .await?;
668            },
669            HotShotEvent::QuorumProposalSend(proposal, _) => {
670                let view = proposal.data.view_number();
671
672                ensure!(
673                    self.update_latest_proposed_view(view).await,
674                    "Failed to update latest proposed view"
675                );
676            },
677            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
678                let view_number = vid_disperse.data.view_number();
679                self.create_dependency_task_if_new(
680                    view_number,
681                    epoch_number,
682                    event_receiver,
683                    event_sender,
684                    Arc::clone(&event),
685                    epoch_transition_indicator,
686                )
687                .await?;
688            },
689            HotShotEvent::ViewChange(view, epoch) => {
690                if epoch > &self.cur_epoch {
691                    self.cur_epoch = *epoch;
692                }
693                let keep_view = TYPES::View::new(view.saturating_sub(1));
694                self.cancel_tasks(keep_view);
695            },
696            HotShotEvent::Timeout(view, ..) => {
697                let keep_view = TYPES::View::new(view.saturating_sub(1));
698                self.cancel_tasks(keep_view);
699            },
700            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
701                // Only update if the qc is from a newer view
702                let current_next_epoch_qc =
703                    self.consensus.read().await.next_epoch_high_qc().cloned();
704                ensure!(
705                    current_next_epoch_qc.is_none()
706                        || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
707                    debug!(
708                        "Received a next epoch QC for a view that was not > than our current next \
709                         epoch high QC"
710                    )
711                );
712
713                self.formed_next_epoch_quorum_certificates
714                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
715
716                handle_eqc_formed(
717                    next_epoch_qc.view_number(),
718                    next_epoch_qc.data.leaf_commit,
719                    next_epoch_qc.data.block_number,
720                    self,
721                    &event_sender,
722                )
723                .await;
724
725                let view_number = next_epoch_qc.view_number() + 1;
726                self.create_dependency_task_if_new(
727                    view_number,
728                    epoch_number,
729                    event_receiver,
730                    event_sender,
731                    Arc::clone(&event),
732                    epoch_transition_indicator,
733                )
734                .await?;
735            },
736            HotShotEvent::SetFirstEpoch(view, epoch) => {
737                self.first_epoch = Some((*view, *epoch));
738            },
739            _ => {},
740        }
741        Ok(())
742    }
743
744    /// Cancel all tasks the consensus tasks has spawned before the given view
745    pub fn cancel_tasks(&mut self, view: TYPES::View) {
746        let keep = self.proposal_dependencies.split_off(&view);
747        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
748            if !cancel_sender.is_closed() {
749                tracing::error!("Aborting proposal dependency task for view {view}");
750                let _ = cancel_sender.try_broadcast(());
751            }
752        }
753        self.proposal_dependencies = keep;
754    }
755}
756
757#[async_trait]
758impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
759    for QuorumProposalTaskState<TYPES, I, V>
760{
761    type Event = HotShotEvent<TYPES>;
762
763    async fn handle_event(
764        &mut self,
765        event: Arc<Self::Event>,
766        sender: &Sender<Arc<Self::Event>>,
767        receiver: &Receiver<Arc<Self::Event>>,
768    ) -> Result<()> {
769        self.handle(event, receiver.clone(), sender.clone()).await
770    }
771
772    fn cancel_subtasks(&mut self) {
773        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
774            if !cancel_sender.is_closed() {
775                tracing::error!("Aborting proposal dependency task for view {view}");
776                let _ = cancel_sender.try_broadcast(());
777            }
778        }
779    }
780}