hotshot_task_impls/quorum_proposal/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency, OrDependency},
14    dependency_task::DependencyTask,
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::OuterConsensus,
19    epoch_membership::EpochMembershipCoordinator,
20    message::UpgradeLock,
21    simple_certificate::{
22        EpochRootQuorumCertificate, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::StakeTableEntries,
26    traits::{
27        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32    vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39    events::HotShotEvent, helpers::broadcast_view_change,
40    quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
45/// The state for the quorum proposal task.
46pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
47    /// Latest view number that has been proposed for.
48    pub latest_proposed_view: TYPES::View,
49
50    /// Current epoch
51    pub cur_epoch: Option<TYPES::Epoch>,
52
53    /// Table for the in-progress proposal dependency tasks.
54    pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,
55
56    /// Formed QCs
57    pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,
58
59    /// Formed QCs for the next epoch
60    pub formed_next_epoch_quorum_certificates:
61        BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,
62
63    /// Immutable instance state
64    pub instance_state: Arc<TYPES::InstanceState>,
65
66    /// Membership for Quorum Certs/votes
67    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
68
69    /// Our public key
70    pub public_key: TYPES::SignatureKey,
71
72    /// Our Private Key
73    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
74
75    /// View timeout from config.
76    pub timeout: u64,
77
78    /// This node's storage ref
79    pub storage: I::Storage,
80
81    /// Shared consensus task state
82    pub consensus: OuterConsensus<TYPES>,
83
84    /// The node's id
85    pub id: u64,
86
87    /// The most recent upgrade certificate this node formed.
88    /// Note: this is ONLY for certificates that have been formed internally,
89    /// so that we can propose with them.
90    ///
91    /// Certificates received from other nodes will get reattached regardless of this fields,
92    /// since they will be present in the leaf we propose off of.
93    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
94
95    /// Lock for a decided upgrade
96    pub upgrade_lock: UpgradeLock<TYPES, V>,
97
98    /// Number of blocks in an epoch, zero means there are no epochs
99    pub epoch_height: u64,
100
101    /// Formed light client state update certificates
102    pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificate<TYPES>>,
103
104    /// First view in which epoch version takes effect
105    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
106}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109    QuorumProposalTaskState<TYPES, I, V>
110{
111    /// Create an event dependency
112    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113    fn create_event_dependency(
114        &self,
115        dependency_type: ProposalDependency,
116        view_number: TYPES::View,
117        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118        cancel_receiver: Receiver<()>,
119    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120        let id = self.id;
121        EventDependency::new(
122            event_receiver,
123            cancel_receiver,
124            format!(
125                "ProposalDependency::{:?} for view {:?}, my id {:?}",
126                dependency_type, view_number, self.id
127            ),
128            Box::new(move |event| {
129                let event = event.as_ref();
130                let event_view = match dependency_type {
131                    ProposalDependency::Qc => {
132                        if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133                            qc.view_number() + 1
134                        } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135                            root_qc.view_number() + 1
136                        } else {
137                            return false;
138                        }
139                    },
140                    ProposalDependency::TimeoutCert => {
141                        if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142                            timeout.view_number() + 1
143                        } else {
144                            return false;
145                        }
146                    },
147                    ProposalDependency::ViewSyncCert => {
148                        if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149                        {
150                            view_sync_cert.view_number()
151                        } else {
152                            return false;
153                        }
154                    },
155                    ProposalDependency::Proposal => {
156                        if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157                        {
158                            proposal.data.view_number() + 1
159                        } else {
160                            return false;
161                        }
162                    },
163                    ProposalDependency::PayloadAndMetadata => {
164                        if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165                            _payload_commitment,
166                            _builder_commitment,
167                            _metadata,
168                            view_number,
169                            _fee,
170                        ) = event
171                        {
172                            *view_number
173                        } else {
174                            return false;
175                        }
176                    },
177                    ProposalDependency::VidShare => {
178                        if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179                            vid_disperse.data.view_number()
180                        } else {
181                            return false;
182                        }
183                    },
184                };
185                let valid = event_view == view_number;
186                if valid {
187                    tracing::debug!(
188                        "Dependency {dependency_type:?} is complete for view {event_view:?}, my id is {id:?}!",
189                    );
190                }
191                valid
192            }),
193        )
194    }
195
196    /// Creates the requisite dependencies for the Quorum Proposal task. It also handles any event forwarding.
197    fn create_and_complete_dependencies(
198        &self,
199        view_number: TYPES::View,
200        event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
201        event: Arc<HotShotEvent<TYPES>>,
202        cancel_receiver: &Receiver<()>,
203    ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
204        let mut proposal_dependency = self.create_event_dependency(
205            ProposalDependency::Proposal,
206            view_number,
207            event_receiver.clone(),
208            cancel_receiver.clone(),
209        );
210
211        let mut qc_dependency = self.create_event_dependency(
212            ProposalDependency::Qc,
213            view_number,
214            event_receiver.clone(),
215            cancel_receiver.clone(),
216        );
217
218        let mut view_sync_dependency = self.create_event_dependency(
219            ProposalDependency::ViewSyncCert,
220            view_number,
221            event_receiver.clone(),
222            cancel_receiver.clone(),
223        );
224
225        let mut timeout_dependency = self.create_event_dependency(
226            ProposalDependency::TimeoutCert,
227            view_number,
228            event_receiver.clone(),
229            cancel_receiver.clone(),
230        );
231
232        let mut payload_commitment_dependency = self.create_event_dependency(
233            ProposalDependency::PayloadAndMetadata,
234            view_number,
235            event_receiver.clone(),
236            cancel_receiver.clone(),
237        );
238
239        let mut vid_share_dependency = self.create_event_dependency(
240            ProposalDependency::VidShare,
241            view_number,
242            event_receiver.clone(),
243            cancel_receiver.clone(),
244        );
245
246        let epoch_height = self.epoch_height;
247
248        // Next epoch QC dependency is fulfilled if we get the next epoch QC or
249        // form a current qc that isn't during transition
250        let mut next_epoch_qc_dependency = EventDependency::new(
251            event_receiver.clone(),
252            cancel_receiver.clone(),
253            format!(
254                "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
255                view_number, self.id
256            ),
257            Box::new(move |event| {
258                if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
259                    event.as_ref()
260                {
261                    return next_epoch_qc.view_number() + 1 == view_number;
262                }
263                if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
264                    // Epoch root QC is always not in epoch transition
265                    return true;
266                }
267                if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
268                    if qc.view_number() + 1 == view_number {
269                        return qc
270                            .data
271                            .block_number
272                            .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
273                    }
274                }
275                false
276            }),
277        );
278
279        match event.as_ref() {
280            HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
281                payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
282            },
283            HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
284                proposal_dependency.mark_as_completed(event);
285            },
286            HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
287                Either::Right(_) => timeout_dependency.mark_as_completed(event),
288                Either::Left(qc) => {
289                    if qc
290                        .data
291                        .block_number
292                        .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
293                    {
294                        next_epoch_qc_dependency.mark_as_completed(event.clone());
295                    }
296                    qc_dependency.mark_as_completed(event);
297                },
298            },
299            HotShotEvent::EpochRootQcFormed(..) => {
300                // Epoch root QC is always not in epoch transition
301                next_epoch_qc_dependency.mark_as_completed(event.clone());
302                qc_dependency.mark_as_completed(event);
303            },
304            HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
305                view_sync_dependency.mark_as_completed(event);
306            },
307            HotShotEvent::VidDisperseSend(..) => {
308                vid_share_dependency.mark_as_completed(event);
309            },
310            HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
311                next_epoch_qc_dependency.mark_as_completed(event);
312            },
313            _ => {},
314        };
315
316        // We have three cases to consider:
317        let mut secondary_deps = vec![
318            // 1. A timeout cert was received
319            AndDependency::from_deps(vec![timeout_dependency]),
320            // 2. A view sync cert was received.
321            AndDependency::from_deps(vec![view_sync_dependency]),
322        ];
323        // 3. A `Qc2Formed`` event (and `QuorumProposalRecv` event)
324        if *view_number > 1 {
325            secondary_deps.push(AndDependency::from_deps(vec![
326                qc_dependency,
327                proposal_dependency,
328                next_epoch_qc_dependency,
329            ]));
330        } else {
331            secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
332        }
333
334        let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
335
336        AndDependency::from_deps(vec![OrDependency::from_deps(vec![
337            AndDependency::from_deps(vec![
338                OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
339                OrDependency::from_deps(secondary_deps),
340            ]),
341        ])])
342    }
343
344    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
345    /// given view number if it doesn't exist. Also takes in the received `event` to seed a
346    /// dependency as already completed. This allows for the task to receive a proposable event
347    /// without losing the data that it received, as the dependency task would otherwise have no
348    /// ability to receive the event and, thus, would never propose.
349    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
350    async fn create_dependency_task_if_new(
351        &mut self,
352        view_number: TYPES::View,
353        epoch_number: Option<TYPES::Epoch>,
354        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
355        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
356        event: Arc<HotShotEvent<TYPES>>,
357        epoch_transition_indicator: EpochTransitionIndicator,
358    ) -> Result<()> {
359        let epoch_membership = self
360            .membership_coordinator
361            .membership_for_epoch(epoch_number)
362            .await?;
363        let leader_in_current_epoch =
364            epoch_membership.leader(view_number).await? == self.public_key;
365        // If we are in the epoch transition and we are the leader in the next epoch,
366        // we might want to start collecting dependencies for our next epoch proposal.
367
368        let leader_in_next_epoch = !leader_in_current_epoch
369            && epoch_number.is_some()
370            && matches!(
371                epoch_transition_indicator,
372                EpochTransitionIndicator::InTransition
373            )
374            && epoch_membership
375                .next_epoch()
376                .await
377                .context(warn!(
378                    "Missing the randomized stake table for epoch {}",
379                    epoch_number.unwrap() + 1
380                ))?
381                .leader(view_number)
382                .await?
383                == self.public_key;
384
385        // Don't even bother making the task if we are not entitled to propose anyway.
386        ensure!(
387            leader_in_current_epoch || leader_in_next_epoch,
388            debug!("We are not the leader of the next view")
389        );
390
391        // Don't try to propose twice for the same view.
392        ensure!(
393            view_number > self.latest_proposed_view,
394            "We have already proposed for this view"
395        );
396
397        tracing::debug!(
398            "Attempting to make dependency task for view {view_number} and event {event:?}"
399        );
400
401        ensure!(
402            !self.proposal_dependencies.contains_key(&view_number),
403            "Task already exists"
404        );
405
406        let (cancel_sender, cancel_receiver) = broadcast(1);
407
408        let dependency_chain = self.create_and_complete_dependencies(
409            view_number,
410            &event_receiver,
411            event,
412            &cancel_receiver,
413        );
414
415        let dependency_task = DependencyTask::new(
416            dependency_chain,
417            ProposalDependencyHandle {
418                latest_proposed_view: self.latest_proposed_view,
419                view_number,
420                sender: event_sender,
421                receiver: event_receiver,
422                membership: epoch_membership,
423                public_key: self.public_key.clone(),
424                private_key: self.private_key.clone(),
425                instance_state: Arc::clone(&self.instance_state),
426                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
427                timeout: self.timeout,
428                formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
429                upgrade_lock: self.upgrade_lock.clone(),
430                id: self.id,
431                view_start_time: Instant::now(),
432                epoch_height: self.epoch_height,
433            },
434        );
435        self.proposal_dependencies
436            .insert(view_number, cancel_sender);
437
438        dependency_task.run();
439
440        Ok(())
441    }
442
443    /// Update the latest proposed view number.
444    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
445    async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
446        if *self.latest_proposed_view < *new_view {
447            tracing::debug!(
448                "Updating latest proposed view from {} to {}",
449                *self.latest_proposed_view,
450                *new_view
451            );
452
453            // Cancel the old dependency tasks.
454            for view in (*self.latest_proposed_view + 1)..=(*new_view) {
455                let maybe_cancel_sender =
456                    self.proposal_dependencies.remove(&TYPES::View::new(view));
457                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
458                    tracing::error!("Aborting proposal dependency task for view {view}");
459                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
460                }
461            }
462
463            self.latest_proposed_view = new_view;
464
465            return true;
466        }
467        false
468    }
469
470    /// Handles a consensus event received on the event stream
471    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
472    pub async fn handle(
473        &mut self,
474        event: Arc<HotShotEvent<TYPES>>,
475        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
476        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
477    ) -> Result<()> {
478        let epoch_number = self.cur_epoch;
479        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
480        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
481            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
482        }) {
483            EpochTransitionIndicator::InTransition
484        } else {
485            EpochTransitionIndicator::NotInTransition
486        };
487        match event.as_ref() {
488            HotShotEvent::UpgradeCertificateFormed(cert) => {
489                tracing::debug!(
490                    "Upgrade certificate received for view {}!",
491                    *cert.view_number
492                );
493                // Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time.
494                if cert.data.decide_by >= self.latest_proposed_view + 3 {
495                    tracing::debug!("Updating current formed_upgrade_certificate");
496
497                    self.formed_upgrade_certificate = Some(cert.clone());
498                }
499            },
500            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
501                either::Right(timeout_cert) => {
502                    let view_number = timeout_cert.view_number + 1;
503                    self.create_dependency_task_if_new(
504                        view_number,
505                        epoch_number,
506                        event_receiver,
507                        event_sender,
508                        Arc::clone(&event),
509                        epoch_transition_indicator,
510                    )
511                    .await?;
512                },
513                either::Left(qc) => {
514                    // Only update if the qc is from a newer view
515                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
516                        tracing::trace!(
517                            "Received a QC for a view that was not > than our current high QC"
518                        );
519                    }
520
521                    self.formed_quorum_certificates
522                        .insert(qc.view_number(), qc.clone());
523
524                    handle_eqc_formed(
525                        qc.view_number(),
526                        qc.data.leaf_commit,
527                        qc.data.block_number,
528                        self,
529                        &event_sender,
530                    )
531                    .await;
532
533                    let view_number = qc.view_number() + 1;
534                    if !qc
535                        .data
536                        .block_number
537                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
538                    {
539                        broadcast_view_change(
540                            &event_sender,
541                            view_number,
542                            qc.data.epoch,
543                            self.first_epoch,
544                        )
545                        .await;
546                    }
547                    self.create_dependency_task_if_new(
548                        view_number,
549                        epoch_number,
550                        event_receiver,
551                        event_sender,
552                        Arc::clone(&event),
553                        epoch_transition_indicator,
554                    )
555                    .await?;
556                },
557            },
558
559            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificate { qc, state_cert }) => {
560                // Only update if the qc is from a newer view
561                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
562                    tracing::trace!(
563                        "Received a QC for a view that was not > than our current high QC"
564                    );
565                }
566
567                self.formed_quorum_certificates
568                    .insert(qc.view_number(), qc.clone());
569                self.formed_state_cert
570                    .insert(state_cert.epoch, state_cert.clone());
571
572                self.storage
573                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
574                    .await
575                    .wrap()
576                    .context(error!(
577                        "Failed to update the epoch root QC and state cert in storage!"
578                    ))?;
579
580                let view_number = qc.view_number() + 1;
581                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
582                    .await;
583                self.create_dependency_task_if_new(
584                    view_number,
585                    epoch_number,
586                    event_receiver,
587                    event_sender,
588                    Arc::clone(&event),
589                    epoch_transition_indicator,
590                )
591                .await?;
592            },
593            HotShotEvent::SendPayloadCommitmentAndMetadata(
594                _payload_commitment,
595                _builder_commitment,
596                _metadata,
597                view_number,
598                _fee,
599            ) => {
600                let view_number = *view_number;
601
602                self.create_dependency_task_if_new(
603                    view_number,
604                    epoch_number,
605                    event_receiver,
606                    event_sender,
607                    Arc::clone(&event),
608                    epoch_transition_indicator,
609                )
610                .await?;
611            },
612            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
613                let epoch_number = certificate.data.epoch;
614                let epoch_membership = self
615                    .membership_coordinator
616                    .stake_table_for_epoch(epoch_number)
617                    .await
618                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
619
620                let membership_stake_table = epoch_membership.stake_table().await;
621                let membership_success_threshold = epoch_membership.success_threshold().await;
622
623                certificate
624                    .is_valid_cert(
625                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
626                        membership_success_threshold,
627                        &self.upgrade_lock,
628                    )
629                    .await
630                    .context(|e| {
631                        warn!(
632                            "View Sync Finalize certificate {:?} was invalid: {}",
633                            certificate.data(),
634                            e
635                        )
636                    })?;
637
638                let view_number = certificate.view_number;
639
640                self.create_dependency_task_if_new(
641                    view_number,
642                    epoch_number,
643                    event_receiver,
644                    event_sender,
645                    event,
646                    epoch_transition_indicator,
647                )
648                .await?;
649            },
650            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
651                let view_number = proposal.data.view_number();
652                // All nodes get the latest proposed view as a proxy of `cur_view` of old.
653                if !self.update_latest_proposed_view(view_number).await {
654                    tracing::trace!("Failed to update latest proposed view");
655                }
656
657                self.create_dependency_task_if_new(
658                    view_number + 1,
659                    epoch_number,
660                    event_receiver,
661                    event_sender,
662                    Arc::clone(&event),
663                    epoch_transition_indicator,
664                )
665                .await?;
666            },
667            HotShotEvent::QuorumProposalSend(proposal, _) => {
668                let view = proposal.data.view_number();
669
670                ensure!(
671                    self.update_latest_proposed_view(view).await,
672                    "Failed to update latest proposed view"
673                );
674            },
675            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
676                let view_number = vid_disperse.data.view_number();
677                self.create_dependency_task_if_new(
678                    view_number,
679                    epoch_number,
680                    event_receiver,
681                    event_sender,
682                    Arc::clone(&event),
683                    epoch_transition_indicator,
684                )
685                .await?;
686            },
687            HotShotEvent::ViewChange(view, epoch) => {
688                if epoch > &self.cur_epoch {
689                    self.cur_epoch = *epoch;
690                }
691                let keep_view = TYPES::View::new(view.saturating_sub(1));
692                self.cancel_tasks(keep_view);
693            },
694            HotShotEvent::Timeout(view, ..) => {
695                let keep_view = TYPES::View::new(view.saturating_sub(1));
696                self.cancel_tasks(keep_view);
697            },
698            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
699                // Only update if the qc is from a newer view
700                let current_next_epoch_qc =
701                    self.consensus.read().await.next_epoch_high_qc().cloned();
702                ensure!(current_next_epoch_qc.is_none() ||
703                    next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
704                    debug!("Received a next epoch QC for a view that was not > than our current next epoch high QC")
705                );
706
707                self.formed_next_epoch_quorum_certificates
708                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
709
710                handle_eqc_formed(
711                    next_epoch_qc.view_number(),
712                    next_epoch_qc.data.leaf_commit,
713                    next_epoch_qc.data.block_number,
714                    self,
715                    &event_sender,
716                )
717                .await;
718
719                let view_number = next_epoch_qc.view_number() + 1;
720                self.create_dependency_task_if_new(
721                    view_number,
722                    epoch_number,
723                    event_receiver,
724                    event_sender,
725                    Arc::clone(&event),
726                    epoch_transition_indicator,
727                )
728                .await?;
729            },
730            HotShotEvent::SetFirstEpoch(view, epoch) => {
731                self.first_epoch = Some((*view, *epoch));
732            },
733            _ => {},
734        }
735        Ok(())
736    }
737
738    /// Cancel all tasks the consensus tasks has spawned before the given view
739    pub fn cancel_tasks(&mut self, view: TYPES::View) {
740        let keep = self.proposal_dependencies.split_off(&view);
741        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
742            if !cancel_sender.is_closed() {
743                tracing::error!("Aborting proposal dependency task for view {view}");
744                let _ = cancel_sender.try_broadcast(());
745            }
746        }
747        self.proposal_dependencies = keep;
748    }
749}
750
751#[async_trait]
752impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
753    for QuorumProposalTaskState<TYPES, I, V>
754{
755    type Event = HotShotEvent<TYPES>;
756
757    async fn handle_event(
758        &mut self,
759        event: Arc<Self::Event>,
760        sender: &Sender<Arc<Self::Event>>,
761        receiver: &Receiver<Arc<Self::Event>>,
762    ) -> Result<()> {
763        self.handle(event, receiver.clone(), sender.clone()).await
764    }
765
766    fn cancel_subtasks(&mut self) {
767        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
768            if !cancel_sender.is_closed() {
769                tracing::error!("Aborting proposal dependency task for view {view}");
770                let _ = cancel_sender.try_broadcast(());
771            }
772        }
773    }
774}