hotshot_task_impls/quorum_vote/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency},
14    dependency_task::{DependencyTask, HandleDepOutput},
15    task::TaskState,
16};
17use hotshot_types::{
18    VersionedDaCommittee,
19    consensus::{ConsensusMetricsValue, OuterConsensus},
20    data::{EpochNumber, Leaf2, ViewNumber, vid_disperse::vid_total_weight},
21    epoch_membership::EpochMembershipCoordinator,
22    event::Event,
23    message::UpgradeLock,
24    simple_vote::HasEpoch,
25    stake_table::StakeTableEntries,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        block_contents::BlockHeader,
29        node_implementation::{NodeImplementation, NodeType},
30        signature_key::{SignatureKey, StateSignatureKey},
31        storage::Storage,
32    },
33    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
34    vote::{Certificate, HasViewNumber},
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45/// Event handlers for `QuorumProposalValidated`.
46mod handlers;
47
/// Vote dependency types: the three events that must all be observed before a
/// quorum vote can be cast for a view (combined via an `AndDependency`).
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// For the `QuorumProposalValidated` event after validating `QuorumProposalRecv`.
    QuorumProposal,
    /// For the `DaCertificateRecv` event.
    Dac,
    /// For the `VidShareRecv` event.
    Vid,
}
58
/// Handler for the vote dependency.
///
/// Holds everything needed to validate the completed dependencies and submit a
/// quorum vote for `view_number` once all of them have resolved.
pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum certs/votes.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// View number to vote on.
    pub view_number: ViewNumber,

    /// Event sender.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Event receiver.
    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// The node's id
    pub id: u64,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,

    /// Receiver on which a cancellation signal arrives; when it fires, the
    /// in-flight vote handling is abandoned (see `handle_dep_result`).
    pub cancel_receiver: Receiver<()>,
}
114
impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> HandleDepOutput
    for VoteDependencyHandle<TYPES, I>
{
    type Output = Vec<Arc<HotShotEvent<TYPES>>>;

    #[allow(clippy::too_many_lines)]
    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
    async fn handle_dep_result(self, res: Self::Output) {
        let mut cancel_receiver = self.cancel_receiver.clone();
        // Race vote handling against the cancellation signal: if the task is
        // cancelled (e.g. the view has moved on), drop the vote attempt entirely.
        let result = tokio::select! { result = self.handle_vote_deps(&res) => {
            result
        }
        _ = cancel_receiver.recv() => {
            tracing::warn!("Vote dependency task cancelled");
            return;
        }
        };
        // On failure, log the error and dump the collected dependency events
        // to aid debugging; the vote is simply not cast.
        if result.is_err() {
            log!(result);
            self.print_vote_events(&res)
        }
    }
}
138
impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> VoteDependencyHandle<TYPES, I> {
    /// Log the dependency events that were collected for a failed vote attempt.
    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
        tracing::warn!("Failed to vote, events: {:?}", events);
    }

    /// Process the completed vote dependencies: cross-check the payload
    /// commitments of the proposal, DAC and VID share, persist the proposal,
    /// update shared consensus state, and finally submit the vote.
    ///
    /// # Errors
    /// Returns an error (and does not vote) if any dependency is missing, the
    /// commitments are inconsistent, persistence fails, memberships cannot be
    /// acquired, or the shared-state update fails.
    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
        let mut payload_commitment = None;
        let mut next_epoch_payload_commitment = None;
        let mut leaf = None;
        let mut vid_share = None;
        let mut da_cert = None;
        let mut parent_view_number = None;
        // Gather the proposal, DAC and VID share from the dependency results,
        // checking on the way that they all agree on the payload commitment.
        for event in res.iter() {
            match event.as_ref() {
                #[allow(unused_assignments)]
                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
                    let parent_commitment = parent_leaf.commit();
                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);

                    if let Some(ref comm) = payload_commitment {
                        ensure!(
                            proposal_payload_comm == *comm,
                            error!(
                                "Quorum proposal has inconsistent payload commitment with DAC or \
                                 VID."
                            )
                        );
                    } else {
                        payload_commitment = Some(proposal_payload_comm);
                    }

                    // The proposed leaf must extend the parent leaf we validated against.
                    ensure!(
                        proposed_leaf.parent_commitment() == parent_commitment,
                        warn!(
                            "Proposed leaf parent commitment does not match parent leaf payload \
                             commitment. Aborting vote."
                        )
                    );

                    let now = Instant::now();
                    // Update our persistent storage of the proposal. If we cannot store the
                    // proposal, return an error so we don't vote.
                    self.storage
                        .append_proposal_wrapper(proposal)
                        .await
                        .map_err(|e| {
                            error!("failed to store proposal, not voting.  error = {e:#}")
                        })?;
                    self.storage_metrics
                        .append_quorum_duration
                        .add_point(now.elapsed().as_secs_f64());

                    leaf = Some(proposed_leaf);
                    parent_view_number = Some(parent_leaf.view_number());
                },
                HotShotEvent::DaCertificateValidated(cert) => {
                    let cert_payload_comm = &cert.data().payload_commit;
                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
                    if let Some(ref comm) = payload_commitment {
                        ensure!(
                            cert_payload_comm == comm,
                            error!(
                                "DAC has inconsistent payload commitment with quorum proposal or \
                                 VID."
                            )
                        );
                    } else {
                        payload_commitment = Some(*cert_payload_comm);
                    }
                    // The next-epoch commitment (if already seen via VID) must match
                    // the one carried by the DAC.
                    if next_epoch_payload_commitment.is_some()
                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
                    {
                        bail!(error!(
                            "DAC has inconsistent next epoch payload commitment with VID."
                        ));
                    } else {
                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
                    }
                    da_cert = Some(cert.clone());
                },
                HotShotEvent::VidShareValidated(share) => {
                    let vid_payload_commitment = &share.data.payload_commitment();
                    vid_share = Some(share.clone());
                    // A share whose target epoch differs from its epoch belongs to the
                    // *next* epoch; compare it against the next-epoch commitment.
                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
                    if is_next_epoch_vid {
                        if let Some(ref comm) = next_epoch_payload_commitment {
                            ensure!(
                                vid_payload_commitment == comm,
                                error!(
                                    "VID has inconsistent next epoch payload commitment with DAC."
                                )
                            );
                        } else {
                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
                        }
                    } else if let Some(ref comm) = payload_commitment {
                        ensure!(
                            vid_payload_commitment == comm,
                            error!(
                                "VID has inconsistent payload commitment with quorum proposal or \
                                 DAC."
                            )
                        );
                    } else {
                        payload_commitment = Some(*vid_payload_commitment);
                    }
                },
                _ => {},
            }
        }

        // All three dependencies completed, so each of these must be present.
        let Some(vid_share) = vid_share else {
            bail!(error!(
                "We don't have the VID share for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let Some(leaf) = leaf else {
            bail!(error!(
                "We don't have the leaf for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let Some(da_cert) = da_cert else {
            bail!(error!(
                "We don't have the DA cert for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let mut maybe_current_epoch_vid_share = None;
        // If this is an epoch transition block, we might need two VID shares.
        if self.upgrade_lock.epochs_enabled(leaf.view_number())
            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
        {
            let current_epoch = option_epoch_from_block_number(
                leaf.with_epoch,
                leaf.block_header().block_number(),
                self.epoch_height,
            );
            let next_epoch = current_epoch.map(|e| e + 1);

            let Ok(current_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(current_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire current epoch membership. Do not vote!"
                ));
            };
            let Ok(next_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(next_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire next epoch membership. Do not vote!"
                ));
            };

            // If we belong to both epochs, we require VID shares from both epochs.
            if current_epoch_membership.has_stake(&self.public_key).await
                && next_epoch_membership.has_stake(&self.public_key).await
            {
                // We already hold one share; figure out which epoch the missing one
                // belongs to and wait for it.
                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
                    maybe_current_epoch_vid_share = Some(vid_share.clone());
                    next_epoch
                } else {
                    current_epoch
                };
                match wait_for_second_vid_share(
                    other_target_epoch,
                    &vid_share,
                    &da_cert,
                    &self.consensus,
                    &self.receiver.activate_cloned(),
                    self.cancel_receiver.clone(),
                    self.id,
                )
                .await
                {
                    Ok(other_vid_share) => {
                        if maybe_current_epoch_vid_share.is_none() {
                            maybe_current_epoch_vid_share = Some(other_vid_share);
                        }
                        ensure!(
                            leaf.block_header().payload_commitment()
                                == maybe_current_epoch_vid_share
                                    .as_ref()
                                    .unwrap()
                                    .data
                                    .payload_commitment(),
                            error!(
                                "We have both epochs vid shares but the leaf's vid commit doesn't \
                                 match the old epoch vid share's commit. It should never happen."
                            )
                        );
                    },
                    Err(e) => {
                        bail!(warn!(
                            "This is an epoch transition block, we are in both epochs but we \
                             received only one VID share. Do not vote! Error: {e:?}"
                        ));
                    },
                }
            }
        }

        // Update internal state
        update_shared_state::<TYPES>(
            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
            self.sender.clone(),
            self.receiver.clone(),
            self.membership_coordinator.clone(),
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            Arc::clone(&self.instance_state),
            &leaf,
            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
            parent_view_number,
            self.epoch_height,
        )
        .await
        .context(error!("Failed to update shared consensus state"))?;

        let cur_epoch =
            option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);

        let now = Instant::now();
        // We use this `epoch_membership` to vote,
        // meaning that we must know the leader for the current view in the current epoch
        // and must therefore perform the full DRB catchup.
        let epoch_membership = self
            .membership_coordinator
            .membership_for_epoch(cur_epoch)
            .await?;

        let duration = now.elapsed();
        tracing::info!("membership_for_epoch time: {duration:?}");

        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
        if cur_epoch.is_none() || !is_vote_leaf_extended {
            // We're voting for the proposal that will probably form the eQC. We don't want to change
            // the view here because we will probably change it when we form the eQC.
            // The main reason is to handle view change event only once in the transaction task.
            broadcast_view_change(
                &self.sender,
                leaf.view_number() + 1,
                cur_epoch,
                self.first_epoch,
            )
            .await;
        }

        // Record that the current leader participated (was seen) this view.
        let leader = epoch_membership.leader(self.view_number).await;
        if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
            self.consensus
                .write()
                .await
                .update_validator_participation(leader_key, cur_epoch, true);
        }

        submit_vote::<TYPES, I>(
            self.sender.clone(),
            epoch_membership,
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            self.storage.clone(),
            Arc::clone(&self.storage_metrics),
            leaf,
            maybe_current_epoch_vid_share.unwrap_or(vid_share),
            is_vote_leaf_extended,
            is_vote_epoch_root,
            self.epoch_height,
            &self.state_private_key,
            self.stake_table_capacity,
        )
        .await
    }
}
432
/// The state for the quorum vote task.
///
/// Contains all of the information for the quorum vote.
pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Latest view number that has been voted for.
    pub latest_voted_view: ViewNumber,

    /// Table for the in-progress dependency tasks.
    /// Maps each pending view to the cancel sender used to abort its task.
    pub vote_dependencies: BTreeMap<ViewNumber, Sender<()>>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Membership for Quorum certs/votes and DA committee certs/votes.
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// The node's id
    pub id: u64,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,

    /// DA committees from HotShotConfig, to apply when an upgrade is decided
    pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
}
494
495impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumVoteTaskState<TYPES, I> {
496    /// Create an event dependency.
497    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
498    fn create_event_dependency(
499        &self,
500        dependency_type: VoteDependency,
501        view_number: ViewNumber,
502        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
503        cancel_receiver: Receiver<()>,
504    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
505        let id = self.id;
506        EventDependency::new(
507            event_receiver,
508            cancel_receiver,
509            format!(
510                "VoteDependency::{:?} for view {:?}, my id {:?}",
511                dependency_type, view_number, self.id
512            ),
513            Box::new(move |event| {
514                let event = event.as_ref();
515                let event_view = match dependency_type {
516                    VoteDependency::QuorumProposal => {
517                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
518                            proposal.data.view_number()
519                        } else {
520                            return false;
521                        }
522                    },
523                    VoteDependency::Dac => {
524                        if let HotShotEvent::DaCertificateValidated(cert) = event {
525                            cert.view_number
526                        } else {
527                            return false;
528                        }
529                    },
530                    VoteDependency::Vid => {
531                        if let HotShotEvent::VidShareValidated(disperse) = event {
532                            disperse.data.view_number()
533                        } else {
534                            return false;
535                        }
536                    },
537                };
538                if event_view == view_number {
539                    tracing::debug!(
540                        "Vote dependency {dependency_type:?} completed for view {view_number}, my \
541                         id is {id}"
542                    );
543                    return true;
544                }
545                false
546            }),
547        )
548    }
549
550    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
551    /// given view number if it doesn't exist.
552    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
553    fn create_dependency_task_if_new(
554        &mut self,
555        view_number: ViewNumber,
556        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
557        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
558        event: Arc<HotShotEvent<TYPES>>,
559    ) {
560        tracing::debug!(
561            "Attempting to make dependency task for view {view_number} and event {event:?}"
562        );
563
564        if self.vote_dependencies.contains_key(&view_number) {
565            return;
566        }
567
568        let (cancel_sender, cancel_receiver) = broadcast(1);
569
570        let mut quorum_proposal_dependency = self.create_event_dependency(
571            VoteDependency::QuorumProposal,
572            view_number,
573            event_receiver.clone(),
574            cancel_receiver.clone(),
575        );
576        let dac_dependency = self.create_event_dependency(
577            VoteDependency::Dac,
578            view_number,
579            event_receiver.clone(),
580            cancel_receiver.clone(),
581        );
582        let vid_dependency = self.create_event_dependency(
583            VoteDependency::Vid,
584            view_number,
585            event_receiver.clone(),
586            cancel_receiver.clone(),
587        );
588        // If we have an event provided to us
589        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
590            quorum_proposal_dependency.mark_as_completed(event);
591        }
592
593        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
594
595        let dependency_chain = AndDependency::from_deps(deps);
596
597        let dependency_task = DependencyTask::new(
598            dependency_chain,
599            VoteDependencyHandle::<TYPES, I> {
600                public_key: self.public_key.clone(),
601                private_key: self.private_key.clone(),
602                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
603                instance_state: Arc::clone(&self.instance_state),
604                membership_coordinator: self.membership.clone(),
605                storage: self.storage.clone(),
606                storage_metrics: Arc::clone(&self.storage_metrics),
607                view_number,
608                sender: event_sender.clone(),
609                receiver: event_receiver.clone().deactivate(),
610                upgrade_lock: self.upgrade_lock.clone(),
611                id: self.id,
612                epoch_height: self.epoch_height,
613                consensus_metrics: Arc::clone(&self.consensus_metrics),
614                state_private_key: self.state_private_key.clone(),
615                first_epoch: self.first_epoch,
616                stake_table_capacity: self.stake_table_capacity,
617                cancel_receiver,
618            },
619        );
620        self.vote_dependencies.insert(view_number, cancel_sender);
621
622        dependency_task.run();
623    }
624
625    /// Update the latest voted view number.
626    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
627    async fn update_latest_voted_view(&mut self, new_view: ViewNumber) -> bool {
628        if *self.latest_voted_view < *new_view {
629            tracing::debug!(
630                "Updating next vote view from {} to {} in the quorum vote task",
631                *self.latest_voted_view,
632                *new_view
633            );
634
635            // Cancel the old dependency tasks.
636            for view in *self.latest_voted_view..(*new_view) {
637                let maybe_cancel_sender = self.vote_dependencies.remove(&ViewNumber::new(view));
638                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
639                    tracing::warn!("Aborting vote dependency task for view {view}");
640                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
641                }
642            }
643
644            // Update the metric for the last voted view
645            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
646                self.consensus_metrics
647                    .last_voted_view
648                    .set(last_voted_view_usize);
649            } else {
650                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
651            }
652
653            self.latest_voted_view = new_view;
654
655            return true;
656        }
657        false
658    }
659
660    /// Handle a vote dependent event received on the event stream
661    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
662    pub async fn handle(
663        &mut self,
664        event: Arc<HotShotEvent<TYPES>>,
665        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
666        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
667    ) -> Result<()> {
668        match event.as_ref() {
669            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
670                tracing::trace!(
671                    "Received Proposal for view {}",
672                    *proposal.data.view_number()
673                );
674
675                // Handle the event before creating the dependency task.
676                if let Err(e) =
677                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
678                {
679                    tracing::debug!(
680                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
681                    );
682                }
683
684                ensure!(
685                    proposal.data.view_number() > self.latest_voted_view,
686                    "We have already voted for this view"
687                );
688
689                self.create_dependency_task_if_new(
690                    proposal.data.view_number(),
691                    event_receiver,
692                    &event_sender,
693                    Arc::clone(&event),
694                );
695            },
696            HotShotEvent::DaCertificateRecv(cert) => {
697                let view = cert.view_number;
698
699                tracing::trace!("Received DAC for view {view}");
700                // Do nothing if the DAC is old
701                ensure!(
702                    view > self.latest_voted_view,
703                    "Received DAC for an older view."
704                );
705
706                let cert_epoch = cert.data.epoch;
707
708                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
709                let membership_da_stake_table = epoch_membership.da_stake_table().await;
710                let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
711
712                // Validate the DAC.
713                cert.is_valid_cert(
714                    &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
715                    membership_da_success_threshold,
716                    &self.upgrade_lock,
717                )
718                .await
719                .context(|e| warn!("Invalid DAC: {e}"))?;
720
721                // Add to the storage.
722                self.consensus
723                    .write()
724                    .await
725                    .update_saved_da_certs(view, cert.clone());
726
727                broadcast_event(
728                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
729                    &event_sender.clone(),
730                )
731                .await;
732                self.create_dependency_task_if_new(
733                    view,
734                    event_receiver,
735                    &event_sender,
736                    Arc::clone(&event),
737                );
738            },
739            HotShotEvent::VidShareRecv(sender, share) => {
740                let view = share.data.view_number();
741                // Do nothing if the VID share is old
742                tracing::trace!("Received VID share for view {view}");
743                ensure!(
744                    view > self.latest_voted_view,
745                    "Received VID share for an older view."
746                );
747
748                // Validate the VID share.
749                let payload_commitment = share.data.payload_commitment_ref();
750
751                // Check that the signature is valid
752                ensure!(
753                    sender.validate(&share.signature, payload_commitment.as_ref()),
754                    warn!(
755                        "VID share signature is invalid, sender: {}, signature: {:?}, \
756                         payload_commitment: {:?}",
757                        sender, share.signature, payload_commitment
758                    )
759                );
760
761                let vid_epoch = share.data.epoch();
762                let target_epoch = share.data.target_epoch();
763                let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
764                // ensure that the VID share was sent by a DA member OR the view leader
765                ensure!(
766                    membership_reader
767                        .da_committee_members(view)
768                        .await
769                        .contains(sender)
770                        || *sender == membership_reader.leader(view).await?,
771                    "VID share was not sent by a DA member or the view leader."
772                );
773
774                let total_weight = vid_total_weight::<TYPES>(
775                    &self
776                        .membership
777                        .membership_for_epoch(target_epoch)
778                        .await?
779                        .stake_table()
780                        .await,
781                    target_epoch,
782                );
783
784                if !share.data.verify(total_weight) {
785                    bail!("Failed to verify VID share");
786                }
787
788                self.consensus
789                    .write()
790                    .await
791                    .update_vid_shares(view, share.clone());
792
793                ensure!(
794                    *share.data.recipient_key() == self.public_key,
795                    "Got a Valid VID share but it's not for our key"
796                );
797
798                broadcast_event(
799                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
800                    &event_sender.clone(),
801                )
802                .await;
803                self.create_dependency_task_if_new(
804                    view,
805                    event_receiver,
806                    &event_sender,
807                    Arc::clone(&event),
808                );
809            },
810            HotShotEvent::Timeout(view, ..) => {
811                let view = ViewNumber::new(view.saturating_sub(1));
812                // cancel old tasks
813                let current_tasks = self.vote_dependencies.split_off(&view);
814                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
815                    if !cancel_sender.is_closed() {
816                        tracing::error!("Aborting vote dependency task for view {view}");
817                        let _ = cancel_sender.try_broadcast(());
818                    }
819                }
820                self.vote_dependencies = current_tasks;
821            },
822            &HotShotEvent::ViewChange(mut view, _) => {
823                view = ViewNumber::new(view.saturating_sub(1));
824                if !self.update_latest_voted_view(view).await {
825                    tracing::debug!("view not updated");
826                }
827                // cancel old tasks
828                let current_tasks = self.vote_dependencies.split_off(&view);
829                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
830                    if !cancel_sender.is_closed() {
831                        tracing::error!("Aborting vote dependency task for view {view}");
832                        let _ = cancel_sender.try_broadcast(());
833                    }
834                }
835                self.vote_dependencies = current_tasks;
836            },
837            HotShotEvent::SetFirstEpoch(view, epoch) => {
838                self.first_epoch = Some((*view, *epoch));
839            },
840            _ => {},
841        }
842        Ok(())
843    }
844}
845
846#[async_trait]
847impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for QuorumVoteTaskState<TYPES, I> {
848    type Event = HotShotEvent<TYPES>;
849
850    async fn handle_event(
851        &mut self,
852        event: Arc<Self::Event>,
853        sender: &Sender<Arc<Self::Event>>,
854        receiver: &Receiver<Arc<Self::Event>>,
855    ) -> Result<()> {
856        self.handle(event, receiver.clone(), sender.clone()).await
857    }
858
859    fn cancel_subtasks(&mut self) {
860        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
861            if !cancel_sender.is_closed() {
862                tracing::error!("Aborting vote dependency task for view {view}");
863                let _ = cancel_sender.try_broadcast(());
864            }
865        }
866    }
867}