hotshot_task_impls/quorum_vote/mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency},
14    dependency_task::{DependencyTask, HandleDepOutput},
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::{ConsensusMetricsValue, OuterConsensus},
19    data::{vid_disperse::vid_total_weight, Leaf2},
20    epoch_membership::EpochMembershipCoordinator,
21    event::Event,
22    message::UpgradeLock,
23    simple_vote::HasEpoch,
24    stake_table::StakeTableEntries,
25    storage_metrics::StorageMetricsValue,
26    traits::{
27        block_contents::BlockHeader,
28        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29        signature_key::{SignatureKey, StateSignatureKey},
30        storage::Storage,
31    },
32    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33    vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use crate::{
39    events::HotShotEvent,
40    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
41    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
42};
43
/// Event handlers for `QuorumProposalValidated`.
mod handlers;

/// Vote dependency types.
///
/// All three dependencies must be satisfied for a given view before the
/// quorum vote for that view can be cast.
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// For the `QuorumProposalValidated` event after validating `QuorumProposalRecv`.
    QuorumProposal,
    /// For the `DaCertificateRecv` event.
    Dac,
    /// For the `VidShareRecv` event.
    Vid,
}
57
/// Handler for the vote dependency.
///
/// Holds everything needed to validate the collected dependency outputs and
/// submit a quorum vote for a single view once all dependencies complete.
pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum certs/votes.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// View number to vote on.
    pub view_number: TYPES::View,

    /// Event sender.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Event receiver.
    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// The node's id
    pub id: u64,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,

    /// Receiver on which a `()` broadcast signals this vote dependency task to abort.
    pub cancel_receiver: Receiver<()>,
}
113
impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
    for VoteDependencyHandle<TYPES, I, V>
{
    type Output = Vec<Arc<HotShotEvent<TYPES>>>;

    #[allow(clippy::too_many_lines)]
    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
    async fn handle_dep_result(self, res: Self::Output) {
        // Try to cast the vote from the collected dependency events; on
        // failure, log the error and dump the events we gathered so the
        // failed vote can be debugged.
        let result = self.handle_vote_deps(&res).await;
        if result.is_err() {
            log!(result);
            self.print_vote_events(&res)
        }
    }
}
129
130impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
131    VoteDependencyHandle<TYPES, I, V>
132{
133    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
134        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
135        tracing::warn!("Failed to vote, events: {:#?}", events);
136    }
137
    /// Collect the outputs of the completed vote dependencies, cross-check the
    /// payload commitments reported by the proposal, DAC, and VID share(s),
    /// persist the proposal and update shared state, then submit a quorum vote.
    ///
    /// # Errors
    /// Returns an error (so no vote is cast) if any commitment consistency
    /// check fails, the proposal cannot be stored, a required epoch membership
    /// cannot be acquired, or the shared-state update / vote submission fails.
    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
        // Accumulators filled in from the dependency events gathered below.
        let mut payload_commitment = None;
        let mut next_epoch_payload_commitment = None;
        let mut leaf = None;
        let mut vid_share = None;
        let mut da_cert = None;
        let mut parent_view_number = None;
        for event in res.iter() {
            match event.as_ref() {
                #[allow(unused_assignments)]
                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
                    let parent_commitment = parent_leaf.commit();
                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);

                    // The proposal's payload commitment must agree with any
                    // commitment already recorded from the DAC or VID share.
                    if let Some(ref comm) = payload_commitment {
                        ensure!(proposal_payload_comm == *comm,
                            error!("Quorum proposal has inconsistent payload commitment with DAC or VID."));
                    } else {
                        payload_commitment = Some(proposal_payload_comm);
                    }

                    ensure!(proposed_leaf.parent_commitment() == parent_commitment,
                        warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote."));

                    let now = Instant::now();
                    // Update our persistent storage of the proposal. If we cannot store the proposal return
                    // and error so we don't vote
                    self.storage
                        .append_proposal_wrapper(proposal)
                        .await
                        .map_err(|e| {
                            error!("failed to store proposal, not voting.  error = {e:#}")
                        })?;
                    self.storage_metrics
                        .append_quorum_duration
                        .add_point(now.elapsed().as_secs_f64());

                    leaf = Some(proposed_leaf);
                    parent_view_number = Some(parent_leaf.view_number());
                },
                HotShotEvent::DaCertificateValidated(cert) => {
                    let cert_payload_comm = &cert.data().payload_commit;
                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
                    if let Some(ref comm) = payload_commitment {
                        ensure!(cert_payload_comm == comm, error!("DAC has inconsistent payload commitment with quorum proposal or VID."));
                    } else {
                        payload_commitment = Some(*cert_payload_comm);
                    }
                    // The (optional) next-epoch commitment must also agree
                    // with whatever the VID share reported, if seen first.
                    if next_epoch_payload_commitment.is_some()
                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
                    {
                        bail!(error!(
                            "DAC has inconsistent next epoch payload commitment with VID."
                        ));
                    } else {
                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
                    }
                    da_cert = Some(cert.clone());
                },
                HotShotEvent::VidShareValidated(share) => {
                    let vid_payload_commitment = &share.data.payload_commitment();
                    vid_share = Some(share.clone());
                    // A share whose target epoch differs from its own epoch is
                    // a share intended for the next epoch's membership.
                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
                    if is_next_epoch_vid {
                        if let Some(ref comm) = next_epoch_payload_commitment {
                            ensure!(
                                vid_payload_commitment == comm,
                                error!(
                                    "VID has inconsistent next epoch payload commitment with DAC."
                                )
                            );
                        } else {
                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
                        }
                    } else if let Some(ref comm) = payload_commitment {
                        ensure!(vid_payload_commitment == comm, error!("VID has inconsistent payload commitment with quorum proposal or DAC."));
                    } else {
                        payload_commitment = Some(*vid_payload_commitment);
                    }
                },
                _ => {},
            }
        }

        // All three dependencies must have produced their artifacts by now.
        let Some(vid_share) = vid_share else {
            bail!(error!(
                "We don't have the VID share for this view {}, but we should, because the vote dependencies have completed.",
                self.view_number
            ));
        };

        let Some(leaf) = leaf else {
            bail!(error!(
                "We don't have the leaf for this view {}, but we should, because the vote dependencies have completed.",
                self.view_number
            ));
        };

        let Some(da_cert) = da_cert else {
            bail!(error!(
                "We don't have the DA cert for this view {}, but we should, because the vote dependencies have completed.",
                self.view_number
            ));
        };

        let mut maybe_current_epoch_vid_share = None;
        // If this is an epoch transition block, we might need two VID shares.
        if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
        {
            let current_epoch = option_epoch_from_block_number::<TYPES>(
                leaf.with_epoch,
                leaf.block_header().block_number(),
                self.epoch_height,
            );
            let next_epoch = current_epoch.map(|e| e + 1);

            let Ok(current_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(current_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire current epoch membership. Do not vote!"
                ));
            };
            let Ok(next_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(next_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire next epoch membership. Do not vote!"
                ));
            };

            // If we belong to both epochs, we require VID shares from both epochs.
            if current_epoch_membership.has_stake(&self.public_key).await
                && next_epoch_membership.has_stake(&self.public_key).await
            {
                // Determine which epoch's share we already hold, then wait for
                // the share targeted at the other epoch.
                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
                    maybe_current_epoch_vid_share = Some(vid_share.clone());
                    next_epoch
                } else {
                    current_epoch
                };
                match wait_for_second_vid_share(
                    other_target_epoch,
                    &vid_share,
                    &da_cert,
                    &self.consensus,
                    &self.receiver.activate_cloned(),
                    self.cancel_receiver.clone(),
                    self.id,
                )
                .await
                {
                    Ok(other_vid_share) => {
                        if maybe_current_epoch_vid_share.is_none() {
                            maybe_current_epoch_vid_share = Some(other_vid_share);
                        }
                        ensure!(
                            leaf.block_header().payload_commitment()
                                == maybe_current_epoch_vid_share
                                    .as_ref()
                                    .unwrap()
                                    .data
                                    .payload_commitment(),
                            error!(
                                "We have both epochs vid shares but the leaf's vid commit doesn't \
                                match the old epoch vid share's commit. It should never happen."
                            )
                        );
                    },
                    Err(e) => {
                        bail!(warn!(
                            "This is an epoch transition block, we are in both epochs \
                             but we received only one VID share. Do not vote! Error: {e:?}"
                        ));
                    },
                }
            }
        }

        // Update internal state
        update_shared_state::<TYPES, I, V>(
            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
            self.sender.clone(),
            self.receiver.clone(),
            self.membership_coordinator.clone(),
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            Arc::clone(&self.instance_state),
            &leaf,
            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
            parent_view_number,
            self.epoch_height,
        )
        .await
        .context(error!("Failed to update shared consensus state"))?;

        let cur_epoch = option_epoch_from_block_number::<TYPES>(
            leaf.with_epoch,
            leaf.height(),
            self.epoch_height,
        );

        let now = Instant::now();
        // We use this `epoch_membership` to vote,
        // meaning that we must know the leader for the current view in the current epoch
        // and must therefore perform the full DRB catchup.
        let epoch_membership = self
            .membership_coordinator
            .membership_for_epoch(cur_epoch)
            .await?;

        let duration = now.elapsed();
        tracing::info!("membership_for_epoch time: {duration:?}");

        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
        if cur_epoch.is_none() || !is_vote_leaf_extended {
            // Skip the view change when voting for the proposal that will probably
            // form the eQC; the view will change when the eQC forms, so the
            // transaction task handles the view change event only once.
            broadcast_view_change(
                &self.sender,
                leaf.view_number() + 1,
                cur_epoch,
                self.first_epoch,
            )
            .await;
        }

        submit_vote::<TYPES, I, V>(
            self.sender.clone(),
            epoch_membership,
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            self.storage.clone(),
            Arc::clone(&self.storage_metrics),
            leaf,
            maybe_current_epoch_vid_share.unwrap_or(vid_share),
            is_vote_leaf_extended,
            is_vote_epoch_root,
            self.epoch_height,
            &self.state_private_key,
            self.stake_table_capacity,
        )
        .await
    }
394}
395
/// The state for the quorum vote task.
///
/// Contains all of the information for the quorum vote.
pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Latest view number that has been voted for.
    pub latest_voted_view: TYPES::View,

    /// Table for the in-progress dependency tasks, keyed by view number.
    /// Broadcasting `()` on the stored sender cancels that view's task.
    pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Membership for Quorum certs/votes and DA committee certs/votes.
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// The node's id
    pub id: u64,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,
}
454
455impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
456    /// Create an event dependency.
457    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
458    fn create_event_dependency(
459        &self,
460        dependency_type: VoteDependency,
461        view_number: TYPES::View,
462        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
463        cancel_receiver: Receiver<()>,
464    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
465        let id = self.id;
466        EventDependency::new(
467            event_receiver,
468            cancel_receiver,
469            format!(
470                "VoteDependency::{:?} for view {:?}, my id {:?}",
471                dependency_type, view_number, self.id
472            ),
473            Box::new(move |event| {
474                let event = event.as_ref();
475                let event_view = match dependency_type {
476                    VoteDependency::QuorumProposal => {
477                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
478                            proposal.data.view_number()
479                        } else {
480                            return false;
481                        }
482                    },
483                    VoteDependency::Dac => {
484                        if let HotShotEvent::DaCertificateValidated(cert) = event {
485                            cert.view_number
486                        } else {
487                            return false;
488                        }
489                    },
490                    VoteDependency::Vid => {
491                        if let HotShotEvent::VidShareValidated(disperse) = event {
492                            disperse.data.view_number()
493                        } else {
494                            return false;
495                        }
496                    },
497                };
498                if event_view == view_number {
499                    tracing::debug!(
500                        "Vote dependency {dependency_type:?} completed for view {view_number}, my id is {id}");
501                    return true;
502                }
503                false
504            }),
505        )
506    }
507
508    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
509    /// given view number if it doesn't exist.
510    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote crete dependency task if new", level = "error")]
511    fn create_dependency_task_if_new(
512        &mut self,
513        view_number: TYPES::View,
514        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
515        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
516        event: Arc<HotShotEvent<TYPES>>,
517    ) {
518        tracing::debug!(
519            "Attempting to make dependency task for view {view_number} and event {event:?}"
520        );
521
522        if self.vote_dependencies.contains_key(&view_number) {
523            return;
524        }
525
526        let (cancel_sender, cancel_receiver) = broadcast(1);
527
528        let mut quorum_proposal_dependency = self.create_event_dependency(
529            VoteDependency::QuorumProposal,
530            view_number,
531            event_receiver.clone(),
532            cancel_receiver.clone(),
533        );
534        let dac_dependency = self.create_event_dependency(
535            VoteDependency::Dac,
536            view_number,
537            event_receiver.clone(),
538            cancel_receiver.clone(),
539        );
540        let vid_dependency = self.create_event_dependency(
541            VoteDependency::Vid,
542            view_number,
543            event_receiver.clone(),
544            cancel_receiver.clone(),
545        );
546        // If we have an event provided to us
547        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
548            quorum_proposal_dependency.mark_as_completed(event);
549        }
550
551        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
552
553        let dependency_chain = AndDependency::from_deps(deps);
554
555        let dependency_task = DependencyTask::new(
556            dependency_chain,
557            VoteDependencyHandle::<TYPES, I, V> {
558                public_key: self.public_key.clone(),
559                private_key: self.private_key.clone(),
560                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
561                instance_state: Arc::clone(&self.instance_state),
562                membership_coordinator: self.membership.clone(),
563                storage: self.storage.clone(),
564                storage_metrics: Arc::clone(&self.storage_metrics),
565                view_number,
566                sender: event_sender.clone(),
567                receiver: event_receiver.clone().deactivate(),
568                upgrade_lock: self.upgrade_lock.clone(),
569                id: self.id,
570                epoch_height: self.epoch_height,
571                consensus_metrics: Arc::clone(&self.consensus_metrics),
572                state_private_key: self.state_private_key.clone(),
573                first_epoch: self.first_epoch,
574                stake_table_capacity: self.stake_table_capacity,
575                cancel_receiver,
576            },
577        );
578        self.vote_dependencies.insert(view_number, cancel_sender);
579
580        dependency_task.run();
581    }
582
583    /// Update the latest voted view number.
584    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
585    async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
586        if *self.latest_voted_view < *new_view {
587            tracing::debug!(
588                "Updating next vote view from {} to {} in the quorum vote task",
589                *self.latest_voted_view,
590                *new_view
591            );
592
593            // Cancel the old dependency tasks.
594            for view in *self.latest_voted_view..(*new_view) {
595                let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
596                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
597                    tracing::error!("Aborting vote dependency task for view {view}");
598                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
599                }
600            }
601
602            // Update the metric for the last voted view
603            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
604                self.consensus_metrics
605                    .last_voted_view
606                    .set(last_voted_view_usize);
607            } else {
608                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
609            }
610
611            self.latest_voted_view = new_view;
612
613            return true;
614        }
615        false
616    }
617
    /// Handle a vote dependent event received on the event stream.
    ///
    /// Validates incoming proposals, DA certificates, and VID shares, records
    /// them, and spawns (or completes) the per-view dependency task that will
    /// eventually cast the quorum vote. Timeouts and view changes cancel
    /// dependency tasks for views that can no longer be voted on.
    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
                tracing::trace!(
                    "Received Proposal for view {}",
                    *proposal.data.view_number()
                );

                // Handle the event before creating the dependency task.
                if let Err(e) =
                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
                {
                    tracing::debug!(
                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
                    );
                }

                ensure!(
                    proposal.data.view_number() > self.latest_voted_view,
                    "We have already voted for this view"
                );

                self.create_dependency_task_if_new(
                    proposal.data.view_number(),
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::DaCertificateRecv(cert) => {
                let view = cert.view_number;

                tracing::trace!("Received DAC for view {view}");
                // Do nothing if the DAC is old
                ensure!(
                    view > self.latest_voted_view,
                    "Received DAC for an older view."
                );

                let cert_epoch = cert.data.epoch;

                // Fetch the DA stake table and threshold for the cert's epoch.
                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
                let membership_da_stake_table = epoch_membership.da_stake_table().await;
                let membership_da_success_threshold = epoch_membership.da_success_threshold().await;

                // Validate the DAC.
                cert.is_valid_cert(
                    &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
                    membership_da_success_threshold,
                    &self.upgrade_lock,
                )
                .await
                .context(|e| warn!("Invalid DAC: {e}"))?;

                // Add to the storage.
                self.consensus
                    .write()
                    .await
                    .update_saved_da_certs(view, cert.clone());

                // Announce the validated DAC so the view's dependency task
                // (and any other listener) can observe it.
                broadcast_event(
                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
                    &event_sender.clone(),
                )
                .await;
                self.create_dependency_task_if_new(
                    view,
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::VidShareRecv(sender, share) => {
                let view = share.data.view_number();
                // Do nothing if the VID share is old
                tracing::trace!("Received VID share for view {view}");
                ensure!(
                    view > self.latest_voted_view,
                    "Received VID share for an older view."
                );

                // Validate the VID share.
                let payload_commitment = share.data.payload_commitment_ref();

                // Check that the signature is valid
                ensure!(
                    sender.validate(&share.signature, payload_commitment.as_ref()),
                    "VID share signature is invalid, sender: {}, signature: {:?}, payload_commitment: {:?}",
                    sender,
                    share.signature,
                    payload_commitment
                );

                let vid_epoch = share.data.epoch();
                let target_epoch = share.data.target_epoch();
                let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
                // ensure that the VID share was sent by a DA member OR the view leader
                ensure!(
                    membership_reader
                        .da_committee_members(view)
                        .await
                        .contains(sender)
                        || *sender == membership_reader.leader(view).await?,
                    "VID share was not sent by a DA member or the view leader."
                );

                // Verify the share against the total VID weight of the target
                // epoch's stake table.
                let total_weight = vid_total_weight::<TYPES>(
                    &self
                        .membership
                        .membership_for_epoch(target_epoch)
                        .await?
                        .stake_table()
                        .await,
                    target_epoch,
                );

                if let Err(()) = share.data.verify_share(total_weight) {
                    bail!("Failed to verify VID share");
                }

                self.consensus
                    .write()
                    .await
                    .update_vid_shares(view, share.clone());

                // NOTE(review): the share is saved to consensus state above even
                // when it is not addressed to our key — presumably intentional so
                // it can be served to peers; confirm against callers.
                ensure!(
                    *share.data.recipient_key() == self.public_key,
                    "Got a Valid VID share but it's not for our key"
                );

                broadcast_event(
                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
                    &event_sender.clone(),
                )
                .await;
                self.create_dependency_task_if_new(
                    view,
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::Timeout(view, ..) => {
                let view = TYPES::View::new(view.saturating_sub(1));
                // cancel old tasks
                // `split_off` keeps tasks for `view` and newer; everything older
                // is popped (newest first) and cancelled.
                let current_tasks = self.vote_dependencies.split_off(&view);
                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
                    if !cancel_sender.is_closed() {
                        tracing::error!("Aborting vote dependency task for view {view}");
                        let _ = cancel_sender.try_broadcast(());
                    }
                }
                self.vote_dependencies = current_tasks;
            },
            HotShotEvent::ViewChange(mut view, _) => {
                view = TYPES::View::new(view.saturating_sub(1));
                if !self.update_latest_voted_view(view).await {
                    tracing::debug!("view not updated");
                }
                // cancel old tasks
                let current_tasks = self.vote_dependencies.split_off(&view);
                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
                    if !cancel_sender.is_closed() {
                        tracing::error!("Aborting vote dependency task for view {view}");
                        let _ = cancel_sender.try_broadcast(());
                    }
                }
                self.vote_dependencies = current_tasks;
            },
            HotShotEvent::SetFirstEpoch(view, epoch) => {
                // Record the first view/epoch at which the epoch upgrade takes effect.
                self.first_epoch = Some((*view, *epoch));
            },
            _ => {},
        }
        Ok(())
    }
801}
802
803#[async_trait]
804impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
805    for QuorumVoteTaskState<TYPES, I, V>
806{
807    type Event = HotShotEvent<TYPES>;
808
809    async fn handle_event(
810        &mut self,
811        event: Arc<Self::Event>,
812        sender: &Sender<Arc<Self::Event>>,
813        receiver: &Receiver<Arc<Self::Event>>,
814    ) -> Result<()> {
815        self.handle(event, receiver.clone(), sender.clone()).await
816    }
817
818    fn cancel_subtasks(&mut self) {
819        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
820            if !cancel_sender.is_closed() {
821                tracing::error!("Aborting vote dependency task for view {view}");
822                let _ = cancel_sender.try_broadcast(());
823            }
824        }
825    }
826}