hotshot_task_impls/quorum_proposal/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency, OrDependency},
14    dependency_task::DependencyTask,
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::OuterConsensus,
19    epoch_membership::EpochMembershipCoordinator,
20    message::UpgradeLock,
21    simple_certificate::{
22        EpochRootQuorumCertificate, LightClientStateUpdateCertificateV2,
23        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::StakeTableEntries,
26    traits::{
27        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32    vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39    events::HotShotEvent, helpers::broadcast_view_change,
40    quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
/// The state for the quorum proposal task.
///
/// Holds the per-view bookkeeping needed to decide when this node may propose:
/// the certificates it has formed, the cancellation handles of the in-flight
/// proposal dependency tasks, and the keys/configuration used to build a proposal.
pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Latest view number that has been proposed for.
    ///
    /// Advanced when a `QuorumProposalPreliminarilyValidated` or `QuorumProposalSend`
    /// event is handled; no dependency task is created for a view at or below it.
    pub latest_proposed_view: TYPES::View,

    /// Current epoch. Raised when a `ViewChange` event carries a greater epoch.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Table for the in-progress proposal dependency tasks.
    ///
    /// Maps the view a task would propose for to the sender used to cancel that task.
    pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// Formed QCs, keyed by the view number of the QC.
    pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,

    /// Formed QCs for the next epoch, keyed by the view number of the QC.
    pub formed_next_epoch_quorum_certificates:
        BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum Certs/votes
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Our public key. Compared against the leader of a view to decide whether to propose.
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// View timeout from config.
    pub timeout: u64,

    /// This node's storage ref
    pub storage: I::Storage,

    /// Shared consensus task state
    pub consensus: OuterConsensus<TYPES>,

    /// The node's id
    pub id: u64,

    /// The most recent upgrade certificate this node formed.
    /// Note: this is ONLY for certificates that have been formed internally,
    /// so that we can propose with them.
    ///
    /// Certificates received from other nodes will get reattached regardless of this field,
    /// since they will be present in the leaf we propose off of.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Formed light client state update certificates, keyed by the certificate's epoch.
    pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,

    /// First view in which epoch version takes effect.
    ///
    /// Set when a `SetFirstEpoch` event is handled.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109    QuorumProposalTaskState<TYPES, I, V>
110{
111    /// Create an event dependency
112    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113    fn create_event_dependency(
114        &self,
115        dependency_type: ProposalDependency,
116        view_number: TYPES::View,
117        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118        cancel_receiver: Receiver<()>,
119    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120        let id = self.id;
121        EventDependency::new(
122            event_receiver,
123            cancel_receiver,
124            format!(
125                "ProposalDependency::{:?} for view {:?}, my id {:?}",
126                dependency_type, view_number, self.id
127            ),
128            Box::new(move |event| {
129                let event = event.as_ref();
130                let event_view = match dependency_type {
131                    ProposalDependency::Qc => {
132                        if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133                            qc.view_number() + 1
134                        } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135                            root_qc.view_number() + 1
136                        } else {
137                            return false;
138                        }
139                    },
140                    ProposalDependency::TimeoutCert => {
141                        if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142                            timeout.view_number() + 1
143                        } else {
144                            return false;
145                        }
146                    },
147                    ProposalDependency::ViewSyncCert => {
148                        if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149                        {
150                            view_sync_cert.view_number()
151                        } else {
152                            return false;
153                        }
154                    },
155                    ProposalDependency::Proposal => {
156                        if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157                        {
158                            proposal.data.view_number() + 1
159                        } else {
160                            return false;
161                        }
162                    },
163                    ProposalDependency::PayloadAndMetadata => {
164                        if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165                            _payload_commitment,
166                            _builder_commitment,
167                            _metadata,
168                            view_number,
169                            _fee,
170                        ) = event
171                        {
172                            *view_number
173                        } else {
174                            return false;
175                        }
176                    },
177                    ProposalDependency::VidShare => {
178                        if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179                            vid_disperse.data.view_number()
180                        } else {
181                            return false;
182                        }
183                    },
184                };
185                let valid = event_view == view_number;
186                if valid {
187                    tracing::debug!(
188                        "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
189                         id is {id:?}!",
190                    );
191                }
192                valid
193            }),
194        )
195    }
196
197    /// Creates the requisite dependencies for the Quorum Proposal task. It also handles any event forwarding.
198    fn create_and_complete_dependencies(
199        &self,
200        view_number: TYPES::View,
201        event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
202        event: Arc<HotShotEvent<TYPES>>,
203        cancel_receiver: &Receiver<()>,
204    ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
205        let mut proposal_dependency = self.create_event_dependency(
206            ProposalDependency::Proposal,
207            view_number,
208            event_receiver.clone(),
209            cancel_receiver.clone(),
210        );
211
212        let mut qc_dependency = self.create_event_dependency(
213            ProposalDependency::Qc,
214            view_number,
215            event_receiver.clone(),
216            cancel_receiver.clone(),
217        );
218
219        let mut view_sync_dependency = self.create_event_dependency(
220            ProposalDependency::ViewSyncCert,
221            view_number,
222            event_receiver.clone(),
223            cancel_receiver.clone(),
224        );
225
226        let mut timeout_dependency = self.create_event_dependency(
227            ProposalDependency::TimeoutCert,
228            view_number,
229            event_receiver.clone(),
230            cancel_receiver.clone(),
231        );
232
233        let mut payload_commitment_dependency = self.create_event_dependency(
234            ProposalDependency::PayloadAndMetadata,
235            view_number,
236            event_receiver.clone(),
237            cancel_receiver.clone(),
238        );
239
240        let mut vid_share_dependency = self.create_event_dependency(
241            ProposalDependency::VidShare,
242            view_number,
243            event_receiver.clone(),
244            cancel_receiver.clone(),
245        );
246
247        let epoch_height = self.epoch_height;
248
249        // Next epoch QC dependency is fulfilled if we get the next epoch QC or
250        // form a current qc that isn't during transition
251        let mut next_epoch_qc_dependency = EventDependency::new(
252            event_receiver.clone(),
253            cancel_receiver.clone(),
254            format!(
255                "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
256                view_number, self.id
257            ),
258            Box::new(move |event| {
259                if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
260                    event.as_ref()
261                {
262                    return next_epoch_qc.view_number() + 1 == view_number;
263                }
264                if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
265                    // Epoch root QC is always not in epoch transition
266                    return true;
267                }
268                if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
269                    if qc.view_number() + 1 == view_number {
270                        return qc
271                            .data
272                            .block_number
273                            .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274                    }
275                }
276                false
277            }),
278        );
279
280        match event.as_ref() {
281            HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
282                payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
283            },
284            HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
285                proposal_dependency.mark_as_completed(event);
286            },
287            HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
288                Either::Right(_) => timeout_dependency.mark_as_completed(event),
289                Either::Left(qc) => {
290                    if qc
291                        .data
292                        .block_number
293                        .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
294                    {
295                        next_epoch_qc_dependency.mark_as_completed(event.clone());
296                    }
297                    qc_dependency.mark_as_completed(event);
298                },
299            },
300            HotShotEvent::EpochRootQcFormed(..) => {
301                // Epoch root QC is always not in epoch transition
302                next_epoch_qc_dependency.mark_as_completed(event.clone());
303                qc_dependency.mark_as_completed(event);
304            },
305            HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
306                view_sync_dependency.mark_as_completed(event);
307            },
308            HotShotEvent::VidDisperseSend(..) => {
309                vid_share_dependency.mark_as_completed(event);
310            },
311            HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
312                next_epoch_qc_dependency.mark_as_completed(event);
313            },
314            _ => {},
315        };
316
317        // We have three cases to consider:
318        let mut secondary_deps = vec![
319            // 1. A timeout cert was received
320            AndDependency::from_deps(vec![timeout_dependency]),
321            // 2. A view sync cert was received.
322            AndDependency::from_deps(vec![view_sync_dependency]),
323        ];
324        // 3. A `Qc2Formed`` event (and `QuorumProposalRecv` event)
325        if *view_number > 1 {
326            secondary_deps.push(AndDependency::from_deps(vec![
327                qc_dependency,
328                proposal_dependency,
329                next_epoch_qc_dependency,
330            ]));
331        } else {
332            secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
333        }
334
335        let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
336
337        AndDependency::from_deps(vec![OrDependency::from_deps(vec![
338            AndDependency::from_deps(vec![
339                OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
340                OrDependency::from_deps(secondary_deps),
341            ]),
342        ])])
343    }
344
345    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
346    /// given view number if it doesn't exist. Also takes in the received `event` to seed a
347    /// dependency as already completed. This allows for the task to receive a proposable event
348    /// without losing the data that it received, as the dependency task would otherwise have no
349    /// ability to receive the event and, thus, would never propose.
350    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
351    async fn create_dependency_task_if_new(
352        &mut self,
353        view_number: TYPES::View,
354        epoch_number: Option<TYPES::Epoch>,
355        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
356        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
357        event: Arc<HotShotEvent<TYPES>>,
358        epoch_transition_indicator: EpochTransitionIndicator,
359    ) -> Result<()> {
360        let epoch_membership = self
361            .membership_coordinator
362            .membership_for_epoch(epoch_number)
363            .await?;
364        let leader_in_current_epoch =
365            epoch_membership.leader(view_number).await? == self.public_key;
366        // If we are in the epoch transition and we are the leader in the next epoch,
367        // we might want to start collecting dependencies for our next epoch proposal.
368
369        let leader_in_next_epoch = !leader_in_current_epoch
370            && epoch_number.is_some()
371            && matches!(
372                epoch_transition_indicator,
373                EpochTransitionIndicator::InTransition
374            )
375            && epoch_membership
376                .next_epoch()
377                .await
378                .context(warn!(
379                    "Missing the randomized stake table for epoch {}",
380                    epoch_number.unwrap() + 1
381                ))?
382                .leader(view_number)
383                .await?
384                == self.public_key;
385
386        // Don't even bother making the task if we are not entitled to propose anyway.
387        ensure!(
388            leader_in_current_epoch || leader_in_next_epoch,
389            debug!("We are not the leader of the next view")
390        );
391
392        // Don't try to propose twice for the same view.
393        ensure!(
394            view_number > self.latest_proposed_view,
395            "We have already proposed for this view"
396        );
397
398        tracing::debug!(
399            "Attempting to make dependency task for view {view_number} and event {event:?}"
400        );
401
402        ensure!(
403            !self.proposal_dependencies.contains_key(&view_number),
404            "Task already exists"
405        );
406
407        let (cancel_sender, cancel_receiver) = broadcast(1);
408
409        let dependency_chain = self.create_and_complete_dependencies(
410            view_number,
411            &event_receiver,
412            event,
413            &cancel_receiver,
414        );
415
416        let dependency_task = DependencyTask::new(
417            dependency_chain,
418            ProposalDependencyHandle {
419                latest_proposed_view: self.latest_proposed_view,
420                view_number,
421                sender: event_sender,
422                receiver: event_receiver,
423                membership: epoch_membership,
424                public_key: self.public_key.clone(),
425                private_key: self.private_key.clone(),
426                instance_state: Arc::clone(&self.instance_state),
427                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
428                timeout: self.timeout,
429                formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
430                upgrade_lock: self.upgrade_lock.clone(),
431                id: self.id,
432                view_start_time: Instant::now(),
433                epoch_height: self.epoch_height,
434            },
435        );
436        self.proposal_dependencies
437            .insert(view_number, cancel_sender);
438
439        dependency_task.run();
440
441        Ok(())
442    }
443
444    /// Update the latest proposed view number.
445    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
446    async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
447        if *self.latest_proposed_view < *new_view {
448            tracing::debug!(
449                "Updating latest proposed view from {} to {}",
450                *self.latest_proposed_view,
451                *new_view
452            );
453
454            // Cancel the old dependency tasks.
455            for view in (*self.latest_proposed_view + 1)..=(*new_view) {
456                let maybe_cancel_sender =
457                    self.proposal_dependencies.remove(&TYPES::View::new(view));
458                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
459                    tracing::error!("Aborting proposal dependency task for view {view}");
460                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
461                }
462            }
463
464            self.latest_proposed_view = new_view;
465
466            return true;
467        }
468        false
469    }
470
471    /// Handles a consensus event received on the event stream
472    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
473    pub async fn handle(
474        &mut self,
475        event: Arc<HotShotEvent<TYPES>>,
476        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
477        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
478    ) -> Result<()> {
479        let epoch_number = self.cur_epoch;
480        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
481        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
482            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
483        }) {
484            EpochTransitionIndicator::InTransition
485        } else {
486            EpochTransitionIndicator::NotInTransition
487        };
488        match event.as_ref() {
489            HotShotEvent::UpgradeCertificateFormed(cert) => {
490                tracing::debug!(
491                    "Upgrade certificate received for view {}!",
492                    *cert.view_number
493                );
494                // Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time.
495                if cert.data.decide_by >= self.latest_proposed_view + 3 {
496                    tracing::debug!("Updating current formed_upgrade_certificate");
497
498                    self.formed_upgrade_certificate = Some(cert.clone());
499                }
500            },
501            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
502                either::Right(timeout_cert) => {
503                    let view_number = timeout_cert.view_number + 1;
504                    self.create_dependency_task_if_new(
505                        view_number,
506                        epoch_number,
507                        event_receiver,
508                        event_sender,
509                        Arc::clone(&event),
510                        epoch_transition_indicator,
511                    )
512                    .await?;
513                },
514                either::Left(qc) => {
515                    // Only update if the qc is from a newer view
516                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
517                        tracing::trace!(
518                            "Received a QC for a view that was not > than our current high QC"
519                        );
520                    }
521
522                    self.formed_quorum_certificates
523                        .insert(qc.view_number(), qc.clone());
524
525                    handle_eqc_formed(
526                        qc.view_number(),
527                        qc.data.leaf_commit,
528                        qc.data.block_number,
529                        self,
530                        &event_sender,
531                    )
532                    .await;
533
534                    let view_number = qc.view_number() + 1;
535                    if !qc
536                        .data
537                        .block_number
538                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
539                    {
540                        broadcast_view_change(
541                            &event_sender,
542                            view_number,
543                            qc.data.epoch,
544                            self.first_epoch,
545                        )
546                        .await;
547                    }
548                    self.create_dependency_task_if_new(
549                        view_number,
550                        epoch_number,
551                        event_receiver,
552                        event_sender,
553                        Arc::clone(&event),
554                        epoch_transition_indicator,
555                    )
556                    .await?;
557                },
558            },
559
560            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificate { qc, state_cert }) => {
561                // Only update if the qc is from a newer view
562                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
563                    tracing::trace!(
564                        "Received a QC for a view that was not > than our current high QC"
565                    );
566                }
567
568                self.formed_quorum_certificates
569                    .insert(qc.view_number(), qc.clone());
570                self.formed_state_cert
571                    .insert(state_cert.epoch, state_cert.clone());
572
573                self.storage
574                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
575                    .await
576                    .wrap()
577                    .context(error!(
578                        "Failed to update the epoch root QC and state cert in storage!"
579                    ))?;
580
581                let view_number = qc.view_number() + 1;
582                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
583                    .await;
584                self.create_dependency_task_if_new(
585                    view_number,
586                    epoch_number,
587                    event_receiver,
588                    event_sender,
589                    Arc::clone(&event),
590                    epoch_transition_indicator,
591                )
592                .await?;
593            },
594            HotShotEvent::SendPayloadCommitmentAndMetadata(
595                _payload_commitment,
596                _builder_commitment,
597                _metadata,
598                view_number,
599                _fee,
600            ) => {
601                let view_number = *view_number;
602
603                self.create_dependency_task_if_new(
604                    view_number,
605                    epoch_number,
606                    event_receiver,
607                    event_sender,
608                    Arc::clone(&event),
609                    epoch_transition_indicator,
610                )
611                .await?;
612            },
613            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
614                let epoch_number = certificate.data.epoch;
615                let epoch_membership = self
616                    .membership_coordinator
617                    .stake_table_for_epoch(epoch_number)
618                    .await
619                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
620
621                let membership_stake_table = epoch_membership.stake_table().await;
622                let membership_success_threshold = epoch_membership.success_threshold().await;
623
624                certificate
625                    .is_valid_cert(
626                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
627                        membership_success_threshold,
628                        &self.upgrade_lock,
629                    )
630                    .await
631                    .context(|e| {
632                        warn!(
633                            "View Sync Finalize certificate {:?} was invalid: {}",
634                            certificate.data(),
635                            e
636                        )
637                    })?;
638
639                let view_number = certificate.view_number;
640
641                self.create_dependency_task_if_new(
642                    view_number,
643                    epoch_number,
644                    event_receiver,
645                    event_sender,
646                    event,
647                    epoch_transition_indicator,
648                )
649                .await?;
650            },
651            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
652                let view_number = proposal.data.view_number();
653                // All nodes get the latest proposed view as a proxy of `cur_view` of old.
654                if !self.update_latest_proposed_view(view_number).await {
655                    tracing::trace!("Failed to update latest proposed view");
656                }
657
658                self.create_dependency_task_if_new(
659                    view_number + 1,
660                    epoch_number,
661                    event_receiver,
662                    event_sender,
663                    Arc::clone(&event),
664                    epoch_transition_indicator,
665                )
666                .await?;
667            },
668            HotShotEvent::QuorumProposalSend(proposal, _) => {
669                let view = proposal.data.view_number();
670
671                ensure!(
672                    self.update_latest_proposed_view(view).await,
673                    "Failed to update latest proposed view"
674                );
675            },
676            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
677                let view_number = vid_disperse.data.view_number();
678                self.create_dependency_task_if_new(
679                    view_number,
680                    epoch_number,
681                    event_receiver,
682                    event_sender,
683                    Arc::clone(&event),
684                    epoch_transition_indicator,
685                )
686                .await?;
687            },
688            HotShotEvent::ViewChange(view, epoch) => {
689                if epoch > &self.cur_epoch {
690                    self.cur_epoch = *epoch;
691                }
692                let keep_view = TYPES::View::new(view.saturating_sub(1));
693                self.cancel_tasks(keep_view);
694            },
695            HotShotEvent::Timeout(view, ..) => {
696                let keep_view = TYPES::View::new(view.saturating_sub(1));
697                self.cancel_tasks(keep_view);
698            },
699            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
700                // Only update if the qc is from a newer view
701                let current_next_epoch_qc =
702                    self.consensus.read().await.next_epoch_high_qc().cloned();
703                ensure!(
704                    current_next_epoch_qc.is_none()
705                        || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
706                    debug!(
707                        "Received a next epoch QC for a view that was not > than our current next \
708                         epoch high QC"
709                    )
710                );
711
712                self.formed_next_epoch_quorum_certificates
713                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
714
715                handle_eqc_formed(
716                    next_epoch_qc.view_number(),
717                    next_epoch_qc.data.leaf_commit,
718                    next_epoch_qc.data.block_number,
719                    self,
720                    &event_sender,
721                )
722                .await;
723
724                let view_number = next_epoch_qc.view_number() + 1;
725                self.create_dependency_task_if_new(
726                    view_number,
727                    epoch_number,
728                    event_receiver,
729                    event_sender,
730                    Arc::clone(&event),
731                    epoch_transition_indicator,
732                )
733                .await?;
734            },
735            HotShotEvent::SetFirstEpoch(view, epoch) => {
736                self.first_epoch = Some((*view, *epoch));
737            },
738            _ => {},
739        }
740        Ok(())
741    }
742
743    /// Cancel all tasks the consensus tasks has spawned before the given view
744    pub fn cancel_tasks(&mut self, view: TYPES::View) {
745        let keep = self.proposal_dependencies.split_off(&view);
746        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
747            if !cancel_sender.is_closed() {
748                tracing::error!("Aborting proposal dependency task for view {view}");
749                let _ = cancel_sender.try_broadcast(());
750            }
751        }
752        self.proposal_dependencies = keep;
753    }
754}
755
756#[async_trait]
757impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
758    for QuorumProposalTaskState<TYPES, I, V>
759{
760    type Event = HotShotEvent<TYPES>;
761
762    async fn handle_event(
763        &mut self,
764        event: Arc<Self::Event>,
765        sender: &Sender<Arc<Self::Event>>,
766        receiver: &Receiver<Arc<Self::Event>>,
767    ) -> Result<()> {
768        self.handle(event, receiver.clone(), sender.clone()).await
769    }
770
771    fn cancel_subtasks(&mut self) {
772        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
773            if !cancel_sender.is_closed() {
774                tracing::error!("Aborting proposal dependency task for view {view}");
775                let _ = cancel_sender.try_broadcast(());
776            }
777        }
778    }
779}