hotshot_task_impls/quorum_vote/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency},
14    dependency_task::{DependencyTask, HandleDepOutput},
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::{ConsensusMetricsValue, OuterConsensus},
19    data::{vid_disperse::vid_total_weight, Leaf2},
20    epoch_membership::EpochMembershipCoordinator,
21    event::Event,
22    message::UpgradeLock,
23    simple_vote::HasEpoch,
24    stake_table::StakeTableEntries,
25    storage_metrics::StorageMetricsValue,
26    traits::{
27        block_contents::BlockHeader,
28        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29        signature_key::{SignatureKey, StateSignatureKey},
30        storage::Storage,
31    },
32    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33    vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use crate::{
39    events::HotShotEvent,
40    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
41    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
42};
43
44/// Event handlers for `QuorumProposalValidated`.
45mod handlers;
46
47/// Vote dependency types.
48#[derive(Debug, PartialEq)]
49enum VoteDependency {
50    /// For the `QuorumProposalValidated` event after validating `QuorumProposalRecv`.
51    QuorumProposal,
52    /// For the `DaCertificateRecv` event.
53    Dac,
54    /// For the `VidShareRecv` event.
55    Vid,
56}
57
58/// Handler for the vote dependency.
59pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
60    /// Public key.
61    pub public_key: TYPES::SignatureKey,
62
63    /// Private Key.
64    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
65
66    /// Reference to consensus. The replica will require a write lock on this.
67    pub consensus: OuterConsensus<TYPES>,
68
69    /// Immutable instance state
70    pub instance_state: Arc<TYPES::InstanceState>,
71
72    /// Membership for Quorum certs/votes.
73    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
74
75    /// Reference to the storage.
76    pub storage: I::Storage,
77
78    /// Storage metrics
79    pub storage_metrics: Arc<StorageMetricsValue>,
80
81    /// View number to vote on.
82    pub view_number: TYPES::View,
83
84    /// Event sender.
85    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87    /// Event receiver.
88    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
89
90    /// Lock for a decided upgrade
91    pub upgrade_lock: UpgradeLock<TYPES, V>,
92
93    /// The consensus metrics
94    pub consensus_metrics: Arc<ConsensusMetricsValue>,
95
96    /// The node's id
97    pub id: u64,
98
99    /// Number of blocks in an epoch, zero means there are no epochs
100    pub epoch_height: u64,
101
102    /// Signature key for light client state
103    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
104
105    /// First view in which epoch version takes effect
106    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
107
108    /// Stake table capacity for light client use
109    pub stake_table_capacity: usize,
110
111    pub cancel_receiver: Receiver<()>,
112}
113
114impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
115    for VoteDependencyHandle<TYPES, I, V>
116{
117    type Output = Vec<Arc<HotShotEvent<TYPES>>>;
118
119    #[allow(clippy::too_many_lines)]
120    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
121    async fn handle_dep_result(self, res: Self::Output) {
122        let result = self.handle_vote_deps(&res).await;
123        if result.is_err() {
124            log!(result);
125            self.print_vote_events(&res)
126        }
127    }
128}
129
130impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
131    VoteDependencyHandle<TYPES, I, V>
132{
133    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
134        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
135        tracing::warn!("Failed to vote, events: {:?}", events);
136    }
137
138    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
139        let mut payload_commitment = None;
140        let mut next_epoch_payload_commitment = None;
141        let mut leaf = None;
142        let mut vid_share = None;
143        let mut da_cert = None;
144        let mut parent_view_number = None;
145        for event in res.iter() {
146            match event.as_ref() {
147                #[allow(unused_assignments)]
148                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
149                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
150                    let parent_commitment = parent_leaf.commit();
151                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
152
153                    if let Some(ref comm) = payload_commitment {
154                        ensure!(
155                            proposal_payload_comm == *comm,
156                            error!(
157                                "Quorum proposal has inconsistent payload commitment with DAC or \
158                                 VID."
159                            )
160                        );
161                    } else {
162                        payload_commitment = Some(proposal_payload_comm);
163                    }
164
165                    ensure!(
166                        proposed_leaf.parent_commitment() == parent_commitment,
167                        warn!(
168                            "Proposed leaf parent commitment does not match parent leaf payload \
169                             commitment. Aborting vote."
170                        )
171                    );
172
173                    let now = Instant::now();
174                    // Update our persistent storage of the proposal. If we cannot store the proposal return
175                    // and error so we don't vote
176                    self.storage
177                        .append_proposal_wrapper(proposal)
178                        .await
179                        .map_err(|e| {
180                            error!("failed to store proposal, not voting.  error = {e:#}")
181                        })?;
182                    self.storage_metrics
183                        .append_quorum_duration
184                        .add_point(now.elapsed().as_secs_f64());
185
186                    leaf = Some(proposed_leaf);
187                    parent_view_number = Some(parent_leaf.view_number());
188                },
189                HotShotEvent::DaCertificateValidated(cert) => {
190                    let cert_payload_comm = &cert.data().payload_commit;
191                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
192                    if let Some(ref comm) = payload_commitment {
193                        ensure!(
194                            cert_payload_comm == comm,
195                            error!(
196                                "DAC has inconsistent payload commitment with quorum proposal or \
197                                 VID."
198                            )
199                        );
200                    } else {
201                        payload_commitment = Some(*cert_payload_comm);
202                    }
203                    if next_epoch_payload_commitment.is_some()
204                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
205                    {
206                        bail!(error!(
207                            "DAC has inconsistent next epoch payload commitment with VID."
208                        ));
209                    } else {
210                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
211                    }
212                    da_cert = Some(cert.clone());
213                },
214                HotShotEvent::VidShareValidated(share) => {
215                    let vid_payload_commitment = &share.data.payload_commitment();
216                    vid_share = Some(share.clone());
217                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
218                    if is_next_epoch_vid {
219                        if let Some(ref comm) = next_epoch_payload_commitment {
220                            ensure!(
221                                vid_payload_commitment == comm,
222                                error!(
223                                    "VID has inconsistent next epoch payload commitment with DAC."
224                                )
225                            );
226                        } else {
227                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
228                        }
229                    } else if let Some(ref comm) = payload_commitment {
230                        ensure!(
231                            vid_payload_commitment == comm,
232                            error!(
233                                "VID has inconsistent payload commitment with quorum proposal or \
234                                 DAC."
235                            )
236                        );
237                    } else {
238                        payload_commitment = Some(*vid_payload_commitment);
239                    }
240                },
241                _ => {},
242            }
243        }
244
245        let Some(vid_share) = vid_share else {
246            bail!(error!(
247                "We don't have the VID share for this view {}, but we should, because the vote \
248                 dependencies have completed.",
249                self.view_number
250            ));
251        };
252
253        let Some(leaf) = leaf else {
254            bail!(error!(
255                "We don't have the leaf for this view {}, but we should, because the vote \
256                 dependencies have completed.",
257                self.view_number
258            ));
259        };
260
261        let Some(da_cert) = da_cert else {
262            bail!(error!(
263                "We don't have the DA cert for this view {}, but we should, because the vote \
264                 dependencies have completed.",
265                self.view_number
266            ));
267        };
268
269        let mut maybe_current_epoch_vid_share = None;
270        // If this is an epoch transition block, we might need two VID shares.
271        if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
272            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
273        {
274            let current_epoch = option_epoch_from_block_number::<TYPES>(
275                leaf.with_epoch,
276                leaf.block_header().block_number(),
277                self.epoch_height,
278            );
279            let next_epoch = current_epoch.map(|e| e + 1);
280
281            let Ok(current_epoch_membership) = self
282                .membership_coordinator
283                .stake_table_for_epoch(current_epoch)
284                .await
285            else {
286                bail!(warn!(
287                    "Couldn't acquire current epoch membership. Do not vote!"
288                ));
289            };
290            let Ok(next_epoch_membership) = self
291                .membership_coordinator
292                .stake_table_for_epoch(next_epoch)
293                .await
294            else {
295                bail!(warn!(
296                    "Couldn't acquire next epoch membership. Do not vote!"
297                ));
298            };
299
300            // If we belong to both epochs, we require VID shares from both epochs.
301            if current_epoch_membership.has_stake(&self.public_key).await
302                && next_epoch_membership.has_stake(&self.public_key).await
303            {
304                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
305                    maybe_current_epoch_vid_share = Some(vid_share.clone());
306                    next_epoch
307                } else {
308                    current_epoch
309                };
310                match wait_for_second_vid_share(
311                    other_target_epoch,
312                    &vid_share,
313                    &da_cert,
314                    &self.consensus,
315                    &self.receiver.activate_cloned(),
316                    self.cancel_receiver.clone(),
317                    self.id,
318                )
319                .await
320                {
321                    Ok(other_vid_share) => {
322                        if maybe_current_epoch_vid_share.is_none() {
323                            maybe_current_epoch_vid_share = Some(other_vid_share);
324                        }
325                        ensure!(
326                            leaf.block_header().payload_commitment()
327                                == maybe_current_epoch_vid_share
328                                    .as_ref()
329                                    .unwrap()
330                                    .data
331                                    .payload_commitment(),
332                            error!(
333                                "We have both epochs vid shares but the leaf's vid commit doesn't \
334                                 match the old epoch vid share's commit. It should never happen."
335                            )
336                        );
337                    },
338                    Err(e) => {
339                        bail!(warn!(
340                            "This is an epoch transition block, we are in both epochs but we \
341                             received only one VID share. Do not vote! Error: {e:?}"
342                        ));
343                    },
344                }
345            }
346        }
347
348        // Update internal state
349        update_shared_state::<TYPES, V>(
350            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
351            self.sender.clone(),
352            self.receiver.clone(),
353            self.membership_coordinator.clone(),
354            self.public_key.clone(),
355            self.private_key.clone(),
356            self.upgrade_lock.clone(),
357            self.view_number,
358            Arc::clone(&self.instance_state),
359            &leaf,
360            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
361            parent_view_number,
362            self.epoch_height,
363        )
364        .await
365        .context(error!("Failed to update shared consensus state"))?;
366
367        let cur_epoch = option_epoch_from_block_number::<TYPES>(
368            leaf.with_epoch,
369            leaf.height(),
370            self.epoch_height,
371        );
372
373        let now = Instant::now();
374        // We use this `epoch_membership` to vote,
375        // meaning that we must know the leader for the current view in the current epoch
376        // and must therefore perform the full DRB catchup.
377        let epoch_membership = self
378            .membership_coordinator
379            .membership_for_epoch(cur_epoch)
380            .await?;
381
382        let duration = now.elapsed();
383        tracing::info!("membership_for_epoch time: {duration:?}");
384
385        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
386        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
387        if cur_epoch.is_none() || !is_vote_leaf_extended {
388            // We're voting for the proposal that will probably form the eQC. We don't want to change
389            // the view here because we will probably change it when we form the eQC.
390            // The main reason is to handle view change event only once in the transaction task.
391            broadcast_view_change(
392                &self.sender,
393                leaf.view_number() + 1,
394                cur_epoch,
395                self.first_epoch,
396            )
397            .await;
398        }
399
400        let leader = epoch_membership.leader(self.view_number).await;
401        if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
402            self.consensus
403                .write()
404                .await
405                .update_validator_participation(leader_key, cur_epoch, true);
406        }
407
408        submit_vote::<TYPES, I, V>(
409            self.sender.clone(),
410            epoch_membership,
411            self.public_key.clone(),
412            self.private_key.clone(),
413            self.upgrade_lock.clone(),
414            self.view_number,
415            self.storage.clone(),
416            Arc::clone(&self.storage_metrics),
417            leaf,
418            maybe_current_epoch_vid_share.unwrap_or(vid_share),
419            is_vote_leaf_extended,
420            is_vote_epoch_root,
421            self.epoch_height,
422            &self.state_private_key,
423            self.stake_table_capacity,
424        )
425        .await
426    }
427}
428
429/// The state for the quorum vote task.
430///
431/// Contains all of the information for the quorum vote.
432pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
433    /// Public key.
434    pub public_key: TYPES::SignatureKey,
435
436    /// Private Key.
437    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
438
439    /// Reference to consensus. The replica will require a write lock on this.
440    pub consensus: OuterConsensus<TYPES>,
441
442    /// Immutable instance state
443    pub instance_state: Arc<TYPES::InstanceState>,
444
445    /// Latest view number that has been voted for.
446    pub latest_voted_view: TYPES::View,
447
448    /// Table for the in-progress dependency tasks.
449    pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,
450
451    /// The underlying network
452    pub network: Arc<I::Network>,
453
454    /// Membership for Quorum certs/votes and DA committee certs/votes.
455    pub membership: EpochMembershipCoordinator<TYPES>,
456
457    /// Output events to application
458    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
459
460    /// The node's id
461    pub id: u64,
462
463    /// The consensus metrics
464    pub consensus_metrics: Arc<ConsensusMetricsValue>,
465
466    /// Reference to the storage.
467    pub storage: I::Storage,
468
469    /// Storage metrics
470    pub storage_metrics: Arc<StorageMetricsValue>,
471
472    /// Lock for a decided upgrade
473    pub upgrade_lock: UpgradeLock<TYPES, V>,
474
475    /// Number of blocks in an epoch, zero means there are no epochs
476    pub epoch_height: u64,
477
478    /// Signature key for light client state
479    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
480
481    /// First view in which epoch version takes effect
482    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
483
484    /// Stake table capacity for light client use
485    pub stake_table_capacity: usize,
486}
487
488impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
489    /// Create an event dependency.
490    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
491    fn create_event_dependency(
492        &self,
493        dependency_type: VoteDependency,
494        view_number: TYPES::View,
495        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
496        cancel_receiver: Receiver<()>,
497    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
498        let id = self.id;
499        EventDependency::new(
500            event_receiver,
501            cancel_receiver,
502            format!(
503                "VoteDependency::{:?} for view {:?}, my id {:?}",
504                dependency_type, view_number, self.id
505            ),
506            Box::new(move |event| {
507                let event = event.as_ref();
508                let event_view = match dependency_type {
509                    VoteDependency::QuorumProposal => {
510                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
511                            proposal.data.view_number()
512                        } else {
513                            return false;
514                        }
515                    },
516                    VoteDependency::Dac => {
517                        if let HotShotEvent::DaCertificateValidated(cert) = event {
518                            cert.view_number
519                        } else {
520                            return false;
521                        }
522                    },
523                    VoteDependency::Vid => {
524                        if let HotShotEvent::VidShareValidated(disperse) = event {
525                            disperse.data.view_number()
526                        } else {
527                            return false;
528                        }
529                    },
530                };
531                if event_view == view_number {
532                    tracing::debug!(
533                        "Vote dependency {dependency_type:?} completed for view {view_number}, my \
534                         id is {id}"
535                    );
536                    return true;
537                }
538                false
539            }),
540        )
541    }
542
543    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
544    /// given view number if it doesn't exist.
545    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
546    fn create_dependency_task_if_new(
547        &mut self,
548        view_number: TYPES::View,
549        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
550        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
551        event: Arc<HotShotEvent<TYPES>>,
552    ) {
553        tracing::debug!(
554            "Attempting to make dependency task for view {view_number} and event {event:?}"
555        );
556
557        if self.vote_dependencies.contains_key(&view_number) {
558            return;
559        }
560
561        let (cancel_sender, cancel_receiver) = broadcast(1);
562
563        let mut quorum_proposal_dependency = self.create_event_dependency(
564            VoteDependency::QuorumProposal,
565            view_number,
566            event_receiver.clone(),
567            cancel_receiver.clone(),
568        );
569        let dac_dependency = self.create_event_dependency(
570            VoteDependency::Dac,
571            view_number,
572            event_receiver.clone(),
573            cancel_receiver.clone(),
574        );
575        let vid_dependency = self.create_event_dependency(
576            VoteDependency::Vid,
577            view_number,
578            event_receiver.clone(),
579            cancel_receiver.clone(),
580        );
581        // If we have an event provided to us
582        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
583            quorum_proposal_dependency.mark_as_completed(event);
584        }
585
586        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
587
588        let dependency_chain = AndDependency::from_deps(deps);
589
590        let dependency_task = DependencyTask::new(
591            dependency_chain,
592            VoteDependencyHandle::<TYPES, I, V> {
593                public_key: self.public_key.clone(),
594                private_key: self.private_key.clone(),
595                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
596                instance_state: Arc::clone(&self.instance_state),
597                membership_coordinator: self.membership.clone(),
598                storage: self.storage.clone(),
599                storage_metrics: Arc::clone(&self.storage_metrics),
600                view_number,
601                sender: event_sender.clone(),
602                receiver: event_receiver.clone().deactivate(),
603                upgrade_lock: self.upgrade_lock.clone(),
604                id: self.id,
605                epoch_height: self.epoch_height,
606                consensus_metrics: Arc::clone(&self.consensus_metrics),
607                state_private_key: self.state_private_key.clone(),
608                first_epoch: self.first_epoch,
609                stake_table_capacity: self.stake_table_capacity,
610                cancel_receiver,
611            },
612        );
613        self.vote_dependencies.insert(view_number, cancel_sender);
614
615        dependency_task.run();
616    }
617
618    /// Update the latest voted view number.
619    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
620    async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
621        if *self.latest_voted_view < *new_view {
622            tracing::debug!(
623                "Updating next vote view from {} to {} in the quorum vote task",
624                *self.latest_voted_view,
625                *new_view
626            );
627
628            // Cancel the old dependency tasks.
629            for view in *self.latest_voted_view..(*new_view) {
630                let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
631                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
632                    tracing::error!("Aborting vote dependency task for view {view}");
633                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
634                }
635            }
636
637            // Update the metric for the last voted view
638            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
639                self.consensus_metrics
640                    .last_voted_view
641                    .set(last_voted_view_usize);
642            } else {
643                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
644            }
645
646            self.latest_voted_view = new_view;
647
648            return true;
649        }
650        false
651    }
652
653    /// Handle a vote dependent event received on the event stream
654    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
655    pub async fn handle(
656        &mut self,
657        event: Arc<HotShotEvent<TYPES>>,
658        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
659        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
660    ) -> Result<()> {
661        match event.as_ref() {
662            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
663                tracing::trace!(
664                    "Received Proposal for view {}",
665                    *proposal.data.view_number()
666                );
667
668                // Handle the event before creating the dependency task.
669                if let Err(e) =
670                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
671                {
672                    tracing::debug!(
673                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
674                    );
675                }
676
677                ensure!(
678                    proposal.data.view_number() > self.latest_voted_view,
679                    "We have already voted for this view"
680                );
681
682                self.create_dependency_task_if_new(
683                    proposal.data.view_number(),
684                    event_receiver,
685                    &event_sender,
686                    Arc::clone(&event),
687                );
688            },
689            HotShotEvent::DaCertificateRecv(cert) => {
690                let view = cert.view_number;
691
692                tracing::trace!("Received DAC for view {view}");
693                // Do nothing if the DAC is old
694                ensure!(
695                    view > self.latest_voted_view,
696                    "Received DAC for an older view."
697                );
698
699                let cert_epoch = cert.data.epoch;
700
701                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
702                let membership_da_stake_table = epoch_membership.da_stake_table().await;
703                let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
704
705                // Validate the DAC.
706                cert.is_valid_cert(
707                    &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
708                    membership_da_success_threshold,
709                    &self.upgrade_lock,
710                )
711                .await
712                .context(|e| warn!("Invalid DAC: {e}"))?;
713
714                // Add to the storage.
715                self.consensus
716                    .write()
717                    .await
718                    .update_saved_da_certs(view, cert.clone());
719
720                broadcast_event(
721                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
722                    &event_sender.clone(),
723                )
724                .await;
725                self.create_dependency_task_if_new(
726                    view,
727                    event_receiver,
728                    &event_sender,
729                    Arc::clone(&event),
730                );
731            },
732            HotShotEvent::VidShareRecv(sender, share) => {
733                let view = share.data.view_number();
734                // Do nothing if the VID share is old
735                tracing::trace!("Received VID share for view {view}");
736                ensure!(
737                    view > self.latest_voted_view,
738                    "Received VID share for an older view."
739                );
740
741                // Validate the VID share.
742                let payload_commitment = share.data.payload_commitment_ref();
743
744                // Check that the signature is valid
745                ensure!(
746                    sender.validate(&share.signature, payload_commitment.as_ref()),
747                    "VID share signature is invalid, sender: {}, signature: {:?}, \
748                     payload_commitment: {:?}",
749                    sender,
750                    share.signature,
751                    payload_commitment
752                );
753
754                let vid_epoch = share.data.epoch();
755                let target_epoch = share.data.target_epoch();
756                let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
757                // ensure that the VID share was sent by a DA member OR the view leader
758                ensure!(
759                    membership_reader
760                        .da_committee_members(view)
761                        .await
762                        .contains(sender)
763                        || *sender == membership_reader.leader(view).await?,
764                    "VID share was not sent by a DA member or the view leader."
765                );
766
767                let total_weight = vid_total_weight::<TYPES>(
768                    &self
769                        .membership
770                        .membership_for_epoch(target_epoch)
771                        .await?
772                        .stake_table()
773                        .await,
774                    target_epoch,
775                );
776
777                if let Err(()) = share.data.verify_share(total_weight) {
778                    bail!("Failed to verify VID share");
779                }
780
781                self.consensus
782                    .write()
783                    .await
784                    .update_vid_shares(view, share.clone());
785
786                ensure!(
787                    *share.data.recipient_key() == self.public_key,
788                    "Got a Valid VID share but it's not for our key"
789                );
790
791                broadcast_event(
792                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
793                    &event_sender.clone(),
794                )
795                .await;
796                self.create_dependency_task_if_new(
797                    view,
798                    event_receiver,
799                    &event_sender,
800                    Arc::clone(&event),
801                );
802            },
803            HotShotEvent::Timeout(view, ..) => {
804                let view = TYPES::View::new(view.saturating_sub(1));
805                // cancel old tasks
806                let current_tasks = self.vote_dependencies.split_off(&view);
807                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
808                    if !cancel_sender.is_closed() {
809                        tracing::error!("Aborting vote dependency task for view {view}");
810                        let _ = cancel_sender.try_broadcast(());
811                    }
812                }
813                self.vote_dependencies = current_tasks;
814            },
815            HotShotEvent::ViewChange(mut view, _) => {
816                view = TYPES::View::new(view.saturating_sub(1));
817                if !self.update_latest_voted_view(view).await {
818                    tracing::debug!("view not updated");
819                }
820                // cancel old tasks
821                let current_tasks = self.vote_dependencies.split_off(&view);
822                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
823                    if !cancel_sender.is_closed() {
824                        tracing::error!("Aborting vote dependency task for view {view}");
825                        let _ = cancel_sender.try_broadcast(());
826                    }
827                }
828                self.vote_dependencies = current_tasks;
829            },
830            HotShotEvent::SetFirstEpoch(view, epoch) => {
831                self.first_epoch = Some((*view, *epoch));
832            },
833            _ => {},
834        }
835        Ok(())
836    }
837}
838
839#[async_trait]
840impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
841    for QuorumVoteTaskState<TYPES, I, V>
842{
843    type Event = HotShotEvent<TYPES>;
844
845    async fn handle_event(
846        &mut self,
847        event: Arc<Self::Event>,
848        sender: &Sender<Arc<Self::Event>>,
849        receiver: &Receiver<Arc<Self::Event>>,
850    ) -> Result<()> {
851        self.handle(event, receiver.clone(), sender.clone()).await
852    }
853
854    fn cancel_subtasks(&mut self) {
855        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
856            if !cancel_sender.is_closed() {
857                tracing::error!("Aborting vote dependency task for view {view}");
858                let _ = cancel_sender.try_broadcast(());
859            }
860        }
861    }
862}