hotshot_task_impls/quorum_vote/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency},
14    dependency_task::{DependencyTask, HandleDepOutput},
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::{ConsensusMetricsValue, OuterConsensus},
19    data::{vid_disperse::vid_total_weight, Leaf2},
20    epoch_membership::EpochMembershipCoordinator,
21    event::Event,
22    message::UpgradeLock,
23    simple_vote::HasEpoch,
24    stake_table::StakeTableEntries,
25    storage_metrics::StorageMetricsValue,
26    traits::{
27        block_contents::BlockHeader,
28        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29        signature_key::{SignatureKey, StateSignatureKey},
30        storage::Storage,
31    },
32    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33    vote::{Certificate, HasViewNumber},
34    VersionedDaCommittee,
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45/// Event handlers for `QuorumProposalValidated`.
46mod handlers;
47
/// The three kinds of event a quorum vote has to wait for before it can be cast.
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// Satisfied by a `QuorumProposalValidated` event for the view.
    QuorumProposal,
    /// Satisfied by a `DaCertificateValidated` event for the view.
    Dac,
    /// Satisfied by a `VidShareValidated` event for the view.
    Vid,
}
58
59/// Handler for the vote dependency.
60pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
61    /// Public key.
62    pub public_key: TYPES::SignatureKey,
63
64    /// Private Key.
65    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
66
67    /// Reference to consensus. The replica will require a write lock on this.
68    pub consensus: OuterConsensus<TYPES>,
69
70    /// Immutable instance state
71    pub instance_state: Arc<TYPES::InstanceState>,
72
73    /// Membership for Quorum certs/votes.
74    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76    /// Reference to the storage.
77    pub storage: I::Storage,
78
79    /// Storage metrics
80    pub storage_metrics: Arc<StorageMetricsValue>,
81
82    /// View number to vote on.
83    pub view_number: TYPES::View,
84
85    /// Event sender.
86    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
87
88    /// Event receiver.
89    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
90
91    /// Lock for a decided upgrade
92    pub upgrade_lock: UpgradeLock<TYPES, V>,
93
94    /// The consensus metrics
95    pub consensus_metrics: Arc<ConsensusMetricsValue>,
96
97    /// The node's id
98    pub id: u64,
99
100    /// Number of blocks in an epoch, zero means there are no epochs
101    pub epoch_height: u64,
102
103    /// Signature key for light client state
104    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106    /// First view in which epoch version takes effect
107    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
108
109    /// Stake table capacity for light client use
110    pub stake_table_capacity: usize,
111
112    pub cancel_receiver: Receiver<()>,
113}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
116    for VoteDependencyHandle<TYPES, I, V>
117{
118    type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120    #[allow(clippy::too_many_lines)]
121    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122    async fn handle_dep_result(self, res: Self::Output) {
123        let mut cancel_receiver = self.cancel_receiver.clone();
124        let result = tokio::select! { result = self.handle_vote_deps(&res) => {
125            result
126        }
127        _ = cancel_receiver.recv() => {
128            tracing::warn!("Vote dependency task cancelled");
129            return;
130        }
131        };
132        if result.is_err() {
133            log!(result);
134            self.print_vote_events(&res)
135        }
136    }
137}
138
139impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
140    VoteDependencyHandle<TYPES, I, V>
141{
142    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
143        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
144        tracing::warn!("Failed to vote, events: {:?}", events);
145    }
146
147    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
148        let mut payload_commitment = None;
149        let mut next_epoch_payload_commitment = None;
150        let mut leaf = None;
151        let mut vid_share = None;
152        let mut da_cert = None;
153        let mut parent_view_number = None;
154        for event in res.iter() {
155            match event.as_ref() {
156                #[allow(unused_assignments)]
157                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
158                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
159                    let parent_commitment = parent_leaf.commit();
160                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
161
162                    if let Some(ref comm) = payload_commitment {
163                        ensure!(
164                            proposal_payload_comm == *comm,
165                            error!(
166                                "Quorum proposal has inconsistent payload commitment with DAC or \
167                                 VID."
168                            )
169                        );
170                    } else {
171                        payload_commitment = Some(proposal_payload_comm);
172                    }
173
174                    ensure!(
175                        proposed_leaf.parent_commitment() == parent_commitment,
176                        warn!(
177                            "Proposed leaf parent commitment does not match parent leaf payload \
178                             commitment. Aborting vote."
179                        )
180                    );
181
182                    let now = Instant::now();
183                    // Update our persistent storage of the proposal. If we cannot store the proposal return
184                    // and error so we don't vote
185                    self.storage
186                        .append_proposal_wrapper(proposal)
187                        .await
188                        .map_err(|e| {
189                            error!("failed to store proposal, not voting.  error = {e:#}")
190                        })?;
191                    self.storage_metrics
192                        .append_quorum_duration
193                        .add_point(now.elapsed().as_secs_f64());
194
195                    leaf = Some(proposed_leaf);
196                    parent_view_number = Some(parent_leaf.view_number());
197                },
198                HotShotEvent::DaCertificateValidated(cert) => {
199                    let cert_payload_comm = &cert.data().payload_commit;
200                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
201                    if let Some(ref comm) = payload_commitment {
202                        ensure!(
203                            cert_payload_comm == comm,
204                            error!(
205                                "DAC has inconsistent payload commitment with quorum proposal or \
206                                 VID."
207                            )
208                        );
209                    } else {
210                        payload_commitment = Some(*cert_payload_comm);
211                    }
212                    if next_epoch_payload_commitment.is_some()
213                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
214                    {
215                        bail!(error!(
216                            "DAC has inconsistent next epoch payload commitment with VID."
217                        ));
218                    } else {
219                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
220                    }
221                    da_cert = Some(cert.clone());
222                },
223                HotShotEvent::VidShareValidated(share) => {
224                    let vid_payload_commitment = &share.data.payload_commitment();
225                    vid_share = Some(share.clone());
226                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
227                    if is_next_epoch_vid {
228                        if let Some(ref comm) = next_epoch_payload_commitment {
229                            ensure!(
230                                vid_payload_commitment == comm,
231                                error!(
232                                    "VID has inconsistent next epoch payload commitment with DAC."
233                                )
234                            );
235                        } else {
236                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
237                        }
238                    } else if let Some(ref comm) = payload_commitment {
239                        ensure!(
240                            vid_payload_commitment == comm,
241                            error!(
242                                "VID has inconsistent payload commitment with quorum proposal or \
243                                 DAC."
244                            )
245                        );
246                    } else {
247                        payload_commitment = Some(*vid_payload_commitment);
248                    }
249                },
250                _ => {},
251            }
252        }
253
254        let Some(vid_share) = vid_share else {
255            bail!(error!(
256                "We don't have the VID share for this view {}, but we should, because the vote \
257                 dependencies have completed.",
258                self.view_number
259            ));
260        };
261
262        let Some(leaf) = leaf else {
263            bail!(error!(
264                "We don't have the leaf for this view {}, but we should, because the vote \
265                 dependencies have completed.",
266                self.view_number
267            ));
268        };
269
270        let Some(da_cert) = da_cert else {
271            bail!(error!(
272                "We don't have the DA cert for this view {}, but we should, because the vote \
273                 dependencies have completed.",
274                self.view_number
275            ));
276        };
277
278        let mut maybe_current_epoch_vid_share = None;
279        // If this is an epoch transition block, we might need two VID shares.
280        if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
281            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
282        {
283            let current_epoch = option_epoch_from_block_number::<TYPES>(
284                leaf.with_epoch,
285                leaf.block_header().block_number(),
286                self.epoch_height,
287            );
288            let next_epoch = current_epoch.map(|e| e + 1);
289
290            let Ok(current_epoch_membership) = self
291                .membership_coordinator
292                .stake_table_for_epoch(current_epoch)
293                .await
294            else {
295                bail!(warn!(
296                    "Couldn't acquire current epoch membership. Do not vote!"
297                ));
298            };
299            let Ok(next_epoch_membership) = self
300                .membership_coordinator
301                .stake_table_for_epoch(next_epoch)
302                .await
303            else {
304                bail!(warn!(
305                    "Couldn't acquire next epoch membership. Do not vote!"
306                ));
307            };
308
309            // If we belong to both epochs, we require VID shares from both epochs.
310            if current_epoch_membership.has_stake(&self.public_key).await
311                && next_epoch_membership.has_stake(&self.public_key).await
312            {
313                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
314                    maybe_current_epoch_vid_share = Some(vid_share.clone());
315                    next_epoch
316                } else {
317                    current_epoch
318                };
319                match wait_for_second_vid_share(
320                    other_target_epoch,
321                    &vid_share,
322                    &da_cert,
323                    &self.consensus,
324                    &self.receiver.activate_cloned(),
325                    self.cancel_receiver.clone(),
326                    self.id,
327                )
328                .await
329                {
330                    Ok(other_vid_share) => {
331                        if maybe_current_epoch_vid_share.is_none() {
332                            maybe_current_epoch_vid_share = Some(other_vid_share);
333                        }
334                        ensure!(
335                            leaf.block_header().payload_commitment()
336                                == maybe_current_epoch_vid_share
337                                    .as_ref()
338                                    .unwrap()
339                                    .data
340                                    .payload_commitment(),
341                            error!(
342                                "We have both epochs vid shares but the leaf's vid commit doesn't \
343                                 match the old epoch vid share's commit. It should never happen."
344                            )
345                        );
346                    },
347                    Err(e) => {
348                        bail!(warn!(
349                            "This is an epoch transition block, we are in both epochs but we \
350                             received only one VID share. Do not vote! Error: {e:?}"
351                        ));
352                    },
353                }
354            }
355        }
356
357        // Update internal state
358        update_shared_state::<TYPES, V>(
359            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
360            self.sender.clone(),
361            self.receiver.clone(),
362            self.membership_coordinator.clone(),
363            self.public_key.clone(),
364            self.private_key.clone(),
365            self.upgrade_lock.clone(),
366            self.view_number,
367            Arc::clone(&self.instance_state),
368            &leaf,
369            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
370            parent_view_number,
371            self.epoch_height,
372        )
373        .await
374        .context(error!("Failed to update shared consensus state"))?;
375
376        let cur_epoch = option_epoch_from_block_number::<TYPES>(
377            leaf.with_epoch,
378            leaf.height(),
379            self.epoch_height,
380        );
381
382        let now = Instant::now();
383        // We use this `epoch_membership` to vote,
384        // meaning that we must know the leader for the current view in the current epoch
385        // and must therefore perform the full DRB catchup.
386        let epoch_membership = self
387            .membership_coordinator
388            .membership_for_epoch(cur_epoch)
389            .await?;
390
391        let duration = now.elapsed();
392        tracing::info!("membership_for_epoch time: {duration:?}");
393
394        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
395        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
396        if cur_epoch.is_none() || !is_vote_leaf_extended {
397            // We're voting for the proposal that will probably form the eQC. We don't want to change
398            // the view here because we will probably change it when we form the eQC.
399            // The main reason is to handle view change event only once in the transaction task.
400            broadcast_view_change(
401                &self.sender,
402                leaf.view_number() + 1,
403                cur_epoch,
404                self.first_epoch,
405            )
406            .await;
407        }
408
409        let leader = epoch_membership.leader(self.view_number).await;
410        if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
411            self.consensus
412                .write()
413                .await
414                .update_validator_participation(leader_key, cur_epoch, true);
415        }
416
417        submit_vote::<TYPES, I, V>(
418            self.sender.clone(),
419            epoch_membership,
420            self.public_key.clone(),
421            self.private_key.clone(),
422            self.upgrade_lock.clone(),
423            self.view_number,
424            self.storage.clone(),
425            Arc::clone(&self.storage_metrics),
426            leaf,
427            maybe_current_epoch_vid_share.unwrap_or(vid_share),
428            is_vote_leaf_extended,
429            is_vote_epoch_root,
430            self.epoch_height,
431            &self.state_private_key,
432            self.stake_table_capacity,
433        )
434        .await
435    }
436}
437
438/// The state for the quorum vote task.
439///
440/// Contains all of the information for the quorum vote.
441pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
442    /// Public key.
443    pub public_key: TYPES::SignatureKey,
444
445    /// Private Key.
446    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
447
448    /// Reference to consensus. The replica will require a write lock on this.
449    pub consensus: OuterConsensus<TYPES>,
450
451    /// Immutable instance state
452    pub instance_state: Arc<TYPES::InstanceState>,
453
454    /// Latest view number that has been voted for.
455    pub latest_voted_view: TYPES::View,
456
457    /// Table for the in-progress dependency tasks.
458    pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,
459
460    /// The underlying network
461    pub network: Arc<I::Network>,
462
463    /// Membership for Quorum certs/votes and DA committee certs/votes.
464    pub membership: EpochMembershipCoordinator<TYPES>,
465
466    /// Output events to application
467    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
468
469    /// The node's id
470    pub id: u64,
471
472    /// The consensus metrics
473    pub consensus_metrics: Arc<ConsensusMetricsValue>,
474
475    /// Reference to the storage.
476    pub storage: I::Storage,
477
478    /// Storage metrics
479    pub storage_metrics: Arc<StorageMetricsValue>,
480
481    /// Lock for a decided upgrade
482    pub upgrade_lock: UpgradeLock<TYPES, V>,
483
484    /// Number of blocks in an epoch, zero means there are no epochs
485    pub epoch_height: u64,
486
487    /// Signature key for light client state
488    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
489
490    /// First view in which epoch version takes effect
491    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
492
493    /// Stake table capacity for light client use
494    pub stake_table_capacity: usize,
495
496    /// DA committees from HotShotConfig, to apply when an upgrade is decided
497    pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
498}
499
500impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
501    /// Create an event dependency.
502    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
503    fn create_event_dependency(
504        &self,
505        dependency_type: VoteDependency,
506        view_number: TYPES::View,
507        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
508        cancel_receiver: Receiver<()>,
509    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
510        let id = self.id;
511        EventDependency::new(
512            event_receiver,
513            cancel_receiver,
514            format!(
515                "VoteDependency::{:?} for view {:?}, my id {:?}",
516                dependency_type, view_number, self.id
517            ),
518            Box::new(move |event| {
519                let event = event.as_ref();
520                let event_view = match dependency_type {
521                    VoteDependency::QuorumProposal => {
522                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
523                            proposal.data.view_number()
524                        } else {
525                            return false;
526                        }
527                    },
528                    VoteDependency::Dac => {
529                        if let HotShotEvent::DaCertificateValidated(cert) = event {
530                            cert.view_number
531                        } else {
532                            return false;
533                        }
534                    },
535                    VoteDependency::Vid => {
536                        if let HotShotEvent::VidShareValidated(disperse) = event {
537                            disperse.data.view_number()
538                        } else {
539                            return false;
540                        }
541                    },
542                };
543                if event_view == view_number {
544                    tracing::debug!(
545                        "Vote dependency {dependency_type:?} completed for view {view_number}, my \
546                         id is {id}"
547                    );
548                    return true;
549                }
550                false
551            }),
552        )
553    }
554
555    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
556    /// given view number if it doesn't exist.
557    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
558    fn create_dependency_task_if_new(
559        &mut self,
560        view_number: TYPES::View,
561        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
562        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
563        event: Arc<HotShotEvent<TYPES>>,
564    ) {
565        tracing::debug!(
566            "Attempting to make dependency task for view {view_number} and event {event:?}"
567        );
568
569        if self.vote_dependencies.contains_key(&view_number) {
570            return;
571        }
572
573        let (cancel_sender, cancel_receiver) = broadcast(1);
574
575        let mut quorum_proposal_dependency = self.create_event_dependency(
576            VoteDependency::QuorumProposal,
577            view_number,
578            event_receiver.clone(),
579            cancel_receiver.clone(),
580        );
581        let dac_dependency = self.create_event_dependency(
582            VoteDependency::Dac,
583            view_number,
584            event_receiver.clone(),
585            cancel_receiver.clone(),
586        );
587        let vid_dependency = self.create_event_dependency(
588            VoteDependency::Vid,
589            view_number,
590            event_receiver.clone(),
591            cancel_receiver.clone(),
592        );
593        // If we have an event provided to us
594        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
595            quorum_proposal_dependency.mark_as_completed(event);
596        }
597
598        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
599
600        let dependency_chain = AndDependency::from_deps(deps);
601
602        let dependency_task = DependencyTask::new(
603            dependency_chain,
604            VoteDependencyHandle::<TYPES, I, V> {
605                public_key: self.public_key.clone(),
606                private_key: self.private_key.clone(),
607                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
608                instance_state: Arc::clone(&self.instance_state),
609                membership_coordinator: self.membership.clone(),
610                storage: self.storage.clone(),
611                storage_metrics: Arc::clone(&self.storage_metrics),
612                view_number,
613                sender: event_sender.clone(),
614                receiver: event_receiver.clone().deactivate(),
615                upgrade_lock: self.upgrade_lock.clone(),
616                id: self.id,
617                epoch_height: self.epoch_height,
618                consensus_metrics: Arc::clone(&self.consensus_metrics),
619                state_private_key: self.state_private_key.clone(),
620                first_epoch: self.first_epoch,
621                stake_table_capacity: self.stake_table_capacity,
622                cancel_receiver,
623            },
624        );
625        self.vote_dependencies.insert(view_number, cancel_sender);
626
627        dependency_task.run();
628    }
629
630    /// Update the latest voted view number.
631    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
632    async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
633        if *self.latest_voted_view < *new_view {
634            tracing::debug!(
635                "Updating next vote view from {} to {} in the quorum vote task",
636                *self.latest_voted_view,
637                *new_view
638            );
639
640            // Cancel the old dependency tasks.
641            for view in *self.latest_voted_view..(*new_view) {
642                let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
643                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
644                    tracing::warn!("Aborting vote dependency task for view {view}");
645                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
646                }
647            }
648
649            // Update the metric for the last voted view
650            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
651                self.consensus_metrics
652                    .last_voted_view
653                    .set(last_voted_view_usize);
654            } else {
655                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
656            }
657
658            self.latest_voted_view = new_view;
659
660            return true;
661        }
662        false
663    }
664
665    /// Handle a vote dependent event received on the event stream
666    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
667    pub async fn handle(
668        &mut self,
669        event: Arc<HotShotEvent<TYPES>>,
670        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
671        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
672    ) -> Result<()> {
673        match event.as_ref() {
674            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
675                tracing::trace!(
676                    "Received Proposal for view {}",
677                    *proposal.data.view_number()
678                );
679
680                // Handle the event before creating the dependency task.
681                if let Err(e) =
682                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
683                {
684                    tracing::debug!(
685                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
686                    );
687                }
688
689                ensure!(
690                    proposal.data.view_number() > self.latest_voted_view,
691                    "We have already voted for this view"
692                );
693
694                self.create_dependency_task_if_new(
695                    proposal.data.view_number(),
696                    event_receiver,
697                    &event_sender,
698                    Arc::clone(&event),
699                );
700            },
701            HotShotEvent::DaCertificateRecv(cert) => {
702                let view = cert.view_number;
703
704                tracing::trace!("Received DAC for view {view}");
705                // Do nothing if the DAC is old
706                ensure!(
707                    view > self.latest_voted_view,
708                    "Received DAC for an older view."
709                );
710
711                let cert_epoch = cert.data.epoch;
712
713                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
714                let membership_da_stake_table = epoch_membership.da_stake_table().await;
715                let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
716
717                // Validate the DAC.
718                cert.is_valid_cert(
719                    &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
720                    membership_da_success_threshold,
721                    &self.upgrade_lock,
722                )
723                .await
724                .context(|e| warn!("Invalid DAC: {e}"))?;
725
726                // Add to the storage.
727                self.consensus
728                    .write()
729                    .await
730                    .update_saved_da_certs(view, cert.clone());
731
732                broadcast_event(
733                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
734                    &event_sender.clone(),
735                )
736                .await;
737                self.create_dependency_task_if_new(
738                    view,
739                    event_receiver,
740                    &event_sender,
741                    Arc::clone(&event),
742                );
743            },
744            HotShotEvent::VidShareRecv(sender, share) => {
745                let view = share.data.view_number();
746                // Do nothing if the VID share is old
747                tracing::trace!("Received VID share for view {view}");
748                ensure!(
749                    view > self.latest_voted_view,
750                    "Received VID share for an older view."
751                );
752
753                // Validate the VID share.
754                let payload_commitment = share.data.payload_commitment_ref();
755
756                // Check that the signature is valid
757                ensure!(
758                    sender.validate(&share.signature, payload_commitment.as_ref()),
759                    warn!(
760                        "VID share signature is invalid, sender: {}, signature: {:?}, \
761                         payload_commitment: {:?}",
762                        sender, share.signature, payload_commitment
763                    )
764                );
765
766                let vid_epoch = share.data.epoch();
767                let target_epoch = share.data.target_epoch();
768                let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
769                // ensure that the VID share was sent by a DA member OR the view leader
770                ensure!(
771                    membership_reader
772                        .da_committee_members(view)
773                        .await
774                        .contains(sender)
775                        || *sender == membership_reader.leader(view).await?,
776                    "VID share was not sent by a DA member or the view leader."
777                );
778
779                let total_weight = vid_total_weight::<TYPES>(
780                    &self
781                        .membership
782                        .membership_for_epoch(target_epoch)
783                        .await?
784                        .stake_table()
785                        .await,
786                    target_epoch,
787                );
788
789                if let Err(()) = share.data.verify_share(total_weight) {
790                    bail!("Failed to verify VID share");
791                }
792
793                self.consensus
794                    .write()
795                    .await
796                    .update_vid_shares(view, share.clone());
797
798                ensure!(
799                    *share.data.recipient_key() == self.public_key,
800                    "Got a Valid VID share but it's not for our key"
801                );
802
803                broadcast_event(
804                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
805                    &event_sender.clone(),
806                )
807                .await;
808                self.create_dependency_task_if_new(
809                    view,
810                    event_receiver,
811                    &event_sender,
812                    Arc::clone(&event),
813                );
814            },
815            HotShotEvent::Timeout(view, ..) => {
816                let view = TYPES::View::new(view.saturating_sub(1));
817                // cancel old tasks
818                let current_tasks = self.vote_dependencies.split_off(&view);
819                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
820                    if !cancel_sender.is_closed() {
821                        tracing::error!("Aborting vote dependency task for view {view}");
822                        let _ = cancel_sender.try_broadcast(());
823                    }
824                }
825                self.vote_dependencies = current_tasks;
826            },
827            HotShotEvent::ViewChange(mut view, _) => {
828                view = TYPES::View::new(view.saturating_sub(1));
829                if !self.update_latest_voted_view(view).await {
830                    tracing::debug!("view not updated");
831                }
832                // cancel old tasks
833                let current_tasks = self.vote_dependencies.split_off(&view);
834                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
835                    if !cancel_sender.is_closed() {
836                        tracing::error!("Aborting vote dependency task for view {view}");
837                        let _ = cancel_sender.try_broadcast(());
838                    }
839                }
840                self.vote_dependencies = current_tasks;
841            },
842            HotShotEvent::SetFirstEpoch(view, epoch) => {
843                self.first_epoch = Some((*view, *epoch));
844            },
845            _ => {},
846        }
847        Ok(())
848    }
849}
850
851#[async_trait]
852impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
853    for QuorumVoteTaskState<TYPES, I, V>
854{
855    type Event = HotShotEvent<TYPES>;
856
857    async fn handle_event(
858        &mut self,
859        event: Arc<Self::Event>,
860        sender: &Sender<Arc<Self::Event>>,
861        receiver: &Receiver<Arc<Self::Event>>,
862    ) -> Result<()> {
863        self.handle(event, receiver.clone(), sender.clone()).await
864    }
865
866    fn cancel_subtasks(&mut self) {
867        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
868            if !cancel_sender.is_closed() {
869                tracing::error!("Aborting vote dependency task for view {view}");
870                let _ = cancel_sender.try_broadcast(());
871            }
872        }
873    }
874}