hotshot_task_impls/quorum_vote/mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{collections::BTreeMap, sync::Arc, time::Instant};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_trait::async_trait;
use committable::Committable;
use hotshot_task::{
    dependency::{AndDependency, EventDependency},
    dependency_task::{DependencyTask, HandleDepOutput},
    task::TaskState,
};
use hotshot_types::{
    consensus::{ConsensusMetricsValue, OuterConsensus},
    data::{vid_disperse::vid_total_weight, Leaf2},
    epoch_membership::EpochMembershipCoordinator,
    event::Event,
    message::UpgradeLock,
    simple_vote::HasEpoch,
    stake_table::StakeTableEntries,
    storage_metrics::StorageMetricsValue,
    traits::{
        block_contents::BlockHeader,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
        signature_key::{SignatureKey, StateSignatureKey},
        storage::Storage,
    },
    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
    vote::{Certificate, HasViewNumber},
    VersionedDaCommittee,
};
use hotshot_utils::anytrace::*;
use tracing::instrument;

use crate::{
    events::HotShotEvent,
    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
};

/// Event handlers for `QuorumProposalValidated`.
mod handlers;

/// Vote dependency types.
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// For the `QuorumProposalValidated` event after validating `QuorumProposalRecv`.
    QuorumProposal,
    /// For the `DaCertificateRecv` event.
    Dac,
    /// For the `VidShareRecv` event.
    Vid,
}

/// Handler for the vote dependency.
pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum certs/votes.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// View number to vote on.
    pub view_number: TYPES::View,

    /// Event sender.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Event receiver.
    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// The node's id
    pub id: u64,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,

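    /// Receiver for the signal that cancels this vote dependency task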
    pub cancel_receiver: Receiver<()>,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
    for VoteDependencyHandle<TYPES, I, V>
{
    type Output = Vec<Arc<HotShotEvent<TYPES>>>;

    #[allow(clippy::too_many_lines)]
    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
    async fn handle_dep_result(self, res: Self::Output) {
        let result = self.handle_vote_deps(&res).await;
        if result.is_err() {
            log!(result);
            self.print_vote_events(&res)
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    VoteDependencyHandle<TYPES, I, V>
{
    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
        tracing::warn!("Failed to vote, events: {:?}", events);
    }

    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
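        // Gather the validated proposal, DAC, and VID share from the completed dependencies,
        // cross-checking that they all agree on the payload commitment.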
        let mut payload_commitment = None;
        let mut next_epoch_payload_commitment = None;
        let mut leaf = None;
        let mut vid_share = None;
        let mut da_cert = None;
        let mut parent_view_number = None;
        for event in res.iter() {
            match event.as_ref() {
                #[allow(unused_assignments)]
                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
                    let parent_commitment = parent_leaf.commit();
                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);

                    if let Some(ref comm) = payload_commitment {
                        ensure!(
                            proposal_payload_comm == *comm,
                            error!(
                                "Quorum proposal has inconsistent payload commitment with DAC or \
                                 VID."
                            )
                        );
                    } else {
                        payload_commitment = Some(proposal_payload_comm);
                    }

                    ensure!(
                        proposed_leaf.parent_commitment() == parent_commitment,
                        warn!(
                            "Proposed leaf parent commitment does not match parent leaf \
                             commitment. Aborting vote."
                        )
                    );

                    let now = Instant::now();
                    // Update our persistent storage with the proposal. If we cannot store the
                    // proposal, return an error so we don't vote.
                    self.storage
                        .append_proposal_wrapper(proposal)
                        .await
                        .map_err(|e| {
                            error!("failed to store proposal, not voting.  error = {e:#}")
                        })?;
                    self.storage_metrics
                        .append_quorum_duration
                        .add_point(now.elapsed().as_secs_f64());

                    leaf = Some(proposed_leaf);
                    parent_view_number = Some(parent_leaf.view_number());
                },
                HotShotEvent::DaCertificateValidated(cert) => {
                    let cert_payload_comm = &cert.data().payload_commit;
                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
                    if let Some(ref comm) = payload_commitment {
                        ensure!(
                            cert_payload_comm == comm,
                            error!(
                                "DAC has inconsistent payload commitment with quorum proposal or \
                                 VID."
                            )
                        );
                    } else {
                        payload_commitment = Some(*cert_payload_comm);
                    }
                    if next_epoch_payload_commitment.is_some()
                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
                    {
                        bail!(error!(
                            "DAC has inconsistent next epoch payload commitment with VID."
                        ));
                    } else {
                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
                    }
                    da_cert = Some(cert.clone());
                },
                HotShotEvent::VidShareValidated(share) => {
                    let vid_payload_commitment = &share.data.payload_commitment();
                    vid_share = Some(share.clone());
                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
                    if is_next_epoch_vid {
                        if let Some(ref comm) = next_epoch_payload_commitment {
                            ensure!(
                                vid_payload_commitment == comm,
                                error!(
                                    "VID has inconsistent next epoch payload commitment with DAC."
                                )
                            );
                        } else {
                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
                        }
                    } else if let Some(ref comm) = payload_commitment {
                        ensure!(
                            vid_payload_commitment == comm,
                            error!(
                                "VID has inconsistent payload commitment with quorum proposal or \
                                 DAC."
                            )
                        );
                    } else {
                        payload_commitment = Some(*vid_payload_commitment);
                    }
                },
                _ => {},
            }
        }

        let Some(vid_share) = vid_share else {
            bail!(error!(
                "We don't have the VID share for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let Some(leaf) = leaf else {
            bail!(error!(
                "We don't have the leaf for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let Some(da_cert) = da_cert else {
            bail!(error!(
                "We don't have the DA cert for this view {}, but we should, because the vote \
                 dependencies have completed.",
                self.view_number
            ));
        };

        let mut maybe_current_epoch_vid_share = None;
        // If this is an epoch transition block, we might need two VID shares.
        if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
        {
            let current_epoch = option_epoch_from_block_number::<TYPES>(
                leaf.with_epoch,
                leaf.block_header().block_number(),
                self.epoch_height,
            );
            let next_epoch = current_epoch.map(|e| e + 1);

            let Ok(current_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(current_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire current epoch membership. Do not vote!"
                ));
            };
            let Ok(next_epoch_membership) = self
                .membership_coordinator
                .stake_table_for_epoch(next_epoch)
                .await
            else {
                bail!(warn!(
                    "Couldn't acquire next epoch membership. Do not vote!"
                ));
            };

            // If we belong to both epochs, we require VID shares from both epochs.
            if current_epoch_membership.has_stake(&self.public_key).await
                && next_epoch_membership.has_stake(&self.public_key).await
            {
                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
                    maybe_current_epoch_vid_share = Some(vid_share.clone());
                    next_epoch
                } else {
                    current_epoch
                };
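                // Wait for the VID share targeting the other epoch; without it, we don't vote.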
                match wait_for_second_vid_share(
                    other_target_epoch,
                    &vid_share,
                    &da_cert,
                    &self.consensus,
                    &self.receiver.activate_cloned(),
                    self.cancel_receiver.clone(),
                    self.id,
                )
                .await
                {
                    Ok(other_vid_share) => {
                        if maybe_current_epoch_vid_share.is_none() {
                            maybe_current_epoch_vid_share = Some(other_vid_share);
                        }
                        ensure!(
                            leaf.block_header().payload_commitment()
                                == maybe_current_epoch_vid_share
                                    .as_ref()
                                    .unwrap()
                                    .data
                                    .payload_commitment(),
                            error!(
                                "We have both epochs' VID shares but the leaf's VID commit doesn't \
                                 match the old epoch VID share's commit. This should never happen."
                            )
                        );
                    },
                    Err(e) => {
                        bail!(warn!(
                            "This is an epoch transition block, we are in both epochs but we \
                             received only one VID share. Do not vote! Error: {e:?}"
                        ));
                    },
                }
            }
        }

        // Update internal state
        update_shared_state::<TYPES, V>(
            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
            self.sender.clone(),
            self.receiver.clone(),
            self.membership_coordinator.clone(),
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            Arc::clone(&self.instance_state),
            &leaf,
            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
            parent_view_number,
            self.epoch_height,
        )
        .await
        .context(error!("Failed to update shared consensus state"))?;

        let cur_epoch = option_epoch_from_block_number::<TYPES>(
            leaf.with_epoch,
            leaf.height(),
            self.epoch_height,
        );

        let now = Instant::now();
        // We use this `epoch_membership` to vote,
        // meaning that we must know the leader for the current view in the current epoch
        // and must therefore perform the full DRB catchup.
        let epoch_membership = self
            .membership_coordinator
            .membership_for_epoch(cur_epoch)
            .await?;

        let duration = now.elapsed();
        tracing::info!("membership_for_epoch time: {duration:?}");

        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
        if cur_epoch.is_none() || !is_vote_leaf_extended {
            // If we're voting for the proposal that will probably form the eQC, we don't want to
            // change the view here because we will probably change it when we form the eQC.
            // The main reason is to handle the view change event only once in the transaction task.
            broadcast_view_change(
                &self.sender,
                leaf.view_number() + 1,
                cur_epoch,
                self.first_epoch,
            )
            .await;
        }

        let leader = epoch_membership.leader(self.view_number).await;
        if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
            self.consensus
                .write()
                .await
                .update_validator_participation(leader_key, cur_epoch, true);
        }

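        // Finally, submit our quorum vote for this view and leaf.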
        submit_vote::<TYPES, I, V>(
            self.sender.clone(),
            epoch_membership,
            self.public_key.clone(),
            self.private_key.clone(),
            self.upgrade_lock.clone(),
            self.view_number,
            self.storage.clone(),
            Arc::clone(&self.storage_metrics),
            leaf,
            maybe_current_epoch_vid_share.unwrap_or(vid_share),
            is_vote_leaf_extended,
            is_vote_epoch_root,
            self.epoch_height,
            &self.state_private_key,
            self.stake_table_capacity,
        )
        .await
    }
}

/// The state for the quorum vote task.
///
/// Contains all of the information for the quorum vote.
pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Public key.
    pub public_key: TYPES::SignatureKey,

    /// Private Key.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Latest view number that has been voted for.
    pub latest_voted_view: TYPES::View,

    /// Table for the in-progress dependency tasks.
    pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Membership for Quorum certs/votes and DA committee certs/votes.
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// The node's id
    pub id: u64,

    /// The consensus metrics
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Reference to the storage.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Signature key for light client state
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity for light client use
    pub stake_table_capacity: usize,

    /// DA committees from HotShotConfig, to apply when an upgrade is decided
    pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
    /// Create an event dependency.
    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
    fn create_event_dependency(
        &self,
        dependency_type: VoteDependency,
        view_number: TYPES::View,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        cancel_receiver: Receiver<()>,
    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
        let id = self.id;
        EventDependency::new(
            event_receiver,
            cancel_receiver,
            format!(
                "VoteDependency::{:?} for view {:?}, my id {:?}",
                dependency_type, view_number, self.id
            ),
            Box::new(move |event| {
                let event = event.as_ref();
                let event_view = match dependency_type {
                    VoteDependency::QuorumProposal => {
                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
                            proposal.data.view_number()
                        } else {
                            return false;
                        }
                    },
                    VoteDependency::Dac => {
                        if let HotShotEvent::DaCertificateValidated(cert) = event {
                            cert.view_number
                        } else {
                            return false;
                        }
                    },
                    VoteDependency::Vid => {
                        if let HotShotEvent::VidShareValidated(disperse) = event {
                            disperse.data.view_number()
                        } else {
                            return false;
                        }
                    },
                };
                if event_view == view_number {
                    tracing::debug!(
                        "Vote dependency {dependency_type:?} completed for view {view_number}, my \
                         id is {id}"
                    );
                    return true;
                }
                false
            }),
        )
    }

    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
    /// given view number if it doesn't exist.
    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
    fn create_dependency_task_if_new(
        &mut self,
        view_number: TYPES::View,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        event: Arc<HotShotEvent<TYPES>>,
    ) {
        tracing::debug!(
            "Attempting to make dependency task for view {view_number} and event {event:?}"
        );

        if self.vote_dependencies.contains_key(&view_number) {
            return;
        }

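        // Create the cancellation channel; the sender is kept in `vote_dependencies` so the
        // task can be aborted when the view is no longer relevant.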
        let (cancel_sender, cancel_receiver) = broadcast(1);

        let mut quorum_proposal_dependency = self.create_event_dependency(
            VoteDependency::QuorumProposal,
            view_number,
            event_receiver.clone(),
            cancel_receiver.clone(),
        );
        let dac_dependency = self.create_event_dependency(
            VoteDependency::Dac,
            view_number,
            event_receiver.clone(),
            cancel_receiver.clone(),
        );
        let vid_dependency = self.create_event_dependency(
            VoteDependency::Vid,
            view_number,
            event_receiver.clone(),
            cancel_receiver.clone(),
        );
        // If the event provided to us is the validated quorum proposal, mark that dependency as
        // already completed.
        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
            quorum_proposal_dependency.mark_as_completed(event);
        }

        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];

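        // The vote handler runs only once all three dependencies (proposal, DAC, VID) complete.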
        let dependency_chain = AndDependency::from_deps(deps);

        let dependency_task = DependencyTask::new(
            dependency_chain,
            VoteDependencyHandle::<TYPES, I, V> {
                public_key: self.public_key.clone(),
                private_key: self.private_key.clone(),
                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
                instance_state: Arc::clone(&self.instance_state),
                membership_coordinator: self.membership.clone(),
                storage: self.storage.clone(),
                storage_metrics: Arc::clone(&self.storage_metrics),
                view_number,
                sender: event_sender.clone(),
                receiver: event_receiver.clone().deactivate(),
                upgrade_lock: self.upgrade_lock.clone(),
                id: self.id,
                epoch_height: self.epoch_height,
                consensus_metrics: Arc::clone(&self.consensus_metrics),
                state_private_key: self.state_private_key.clone(),
                first_epoch: self.first_epoch,
                stake_table_capacity: self.stake_table_capacity,
                cancel_receiver,
            },
        );
        self.vote_dependencies.insert(view_number, cancel_sender);

        dependency_task.run();
    }

    /// Update the latest voted view number.
    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
    async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
        if *self.latest_voted_view < *new_view {
            tracing::debug!(
                "Updating next vote view from {} to {} in the quorum vote task",
                *self.latest_voted_view,
                *new_view
            );

            // Cancel the old dependency tasks.
            for view in *self.latest_voted_view..(*new_view) {
                let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
                    tracing::error!("Aborting vote dependency task for view {view}");
                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
                }
            }

            // Update the metric for the last voted view
            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
                self.consensus_metrics
                    .last_voted_view
                    .set(last_voted_view_usize);
            } else {
                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
            }

            self.latest_voted_view = new_view;

            return true;
        }
        false
    }

    /// Handle a vote dependent event received on the event stream
    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
                tracing::trace!(
                    "Received Proposal for view {}",
                    *proposal.data.view_number()
                );

                // Handle the event before creating the dependency task.
                if let Err(e) =
                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
                {
                    tracing::debug!(
                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
                    );
                }

                ensure!(
                    proposal.data.view_number() > self.latest_voted_view,
                    "We have already voted for this view"
                );

                self.create_dependency_task_if_new(
                    proposal.data.view_number(),
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::DaCertificateRecv(cert) => {
                let view = cert.view_number;

                tracing::trace!("Received DAC for view {view}");
                // Do nothing if the DAC is old
                ensure!(
                    view > self.latest_voted_view,
                    "Received DAC for an older view."
                );

                let cert_epoch = cert.data.epoch;

                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
                let membership_da_stake_table = epoch_membership.da_stake_table().await;
                let membership_da_success_threshold = epoch_membership.da_success_threshold().await;

                // Validate the DAC.
                cert.is_valid_cert(
                    &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
                    membership_da_success_threshold,
                    &self.upgrade_lock,
                )
                .await
                .context(|e| warn!("Invalid DAC: {e}"))?;

                // Save the validated DAC in our consensus state.
                self.consensus
                    .write()
                    .await
                    .update_saved_da_certs(view, cert.clone());

                broadcast_event(
                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
                    &event_sender.clone(),
                )
                .await;
                self.create_dependency_task_if_new(
                    view,
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::VidShareRecv(sender, share) => {
                let view = share.data.view_number();
                // Do nothing if the VID share is old
                tracing::trace!("Received VID share for view {view}");
                ensure!(
                    view > self.latest_voted_view,
                    "Received VID share for an older view."
                );

                // Validate the VID share.
                let payload_commitment = share.data.payload_commitment_ref();

                // Check that the signature is valid
                ensure!(
                    sender.validate(&share.signature, payload_commitment.as_ref()),
                    error!(
                        "VID share signature is invalid, sender: {}, signature: {:?}, \
                         payload_commitment: {:?}",
                        sender, share.signature, payload_commitment
                    )
                );

                let vid_epoch = share.data.epoch();
                let target_epoch = share.data.target_epoch();
                let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
                // ensure that the VID share was sent by a DA member OR the view leader
                ensure!(
                    membership_reader
                        .da_committee_members(view)
                        .await
                        .contains(sender)
                        || *sender == membership_reader.leader(view).await?,
                    "VID share was not sent by a DA member or the view leader."
                );

                let total_weight = vid_total_weight::<TYPES>(
                    &self
                        .membership
                        .membership_for_epoch(target_epoch)
                        .await?
                        .stake_table()
                        .await,
                    target_epoch,
                );

                if let Err(()) = share.data.verify_share(total_weight) {
                    bail!("Failed to verify VID share");
                }

                self.consensus
                    .write()
                    .await
                    .update_vid_shares(view, share.clone());

                ensure!(
                    *share.data.recipient_key() == self.public_key,
                    "Got a Valid VID share but it's not for our key"
                );

                broadcast_event(
                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
                    &event_sender.clone(),
                )
                .await;
                self.create_dependency_task_if_new(
                    view,
                    event_receiver,
                    &event_sender,
                    Arc::clone(&event),
                );
            },
            HotShotEvent::Timeout(view, ..) => {
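                // On a timeout, keep the dependency tasks for the preceding view and newer;
                // cancel everything older.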
                let view = TYPES::View::new(view.saturating_sub(1));
                // cancel old tasks
                let current_tasks = self.vote_dependencies.split_off(&view);
                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
                    if !cancel_sender.is_closed() {
                        tracing::error!("Aborting vote dependency task for view {view}");
                        let _ = cancel_sender.try_broadcast(());
                    }
                }
                self.vote_dependencies = current_tasks;
            },
            HotShotEvent::ViewChange(mut view, _) => {
                view = TYPES::View::new(view.saturating_sub(1));
                if !self.update_latest_voted_view(view).await {
                    tracing::debug!("view not updated");
                }
                // cancel old tasks
                let current_tasks = self.vote_dependencies.split_off(&view);
                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
                    if !cancel_sender.is_closed() {
                        tracing::error!("Aborting vote dependency task for view {view}");
                        let _ = cancel_sender.try_broadcast(());
                    }
                }
                self.vote_dependencies = current_tasks;
            },
            HotShotEvent::SetFirstEpoch(view, epoch) => {
                self.first_epoch = Some((*view, *epoch));
            },
            _ => {},
        }
        Ok(())
    }
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
    for QuorumVoteTaskState<TYPES, I, V>
{
    type Event = HotShotEvent<TYPES>;

    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        self.handle(event, receiver.clone(), sender.clone()).await
    }

    fn cancel_subtasks(&mut self) {
        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
            if !cancel_sender.is_closed() {
                tracing::error!("Aborting vote dependency task for view {view}");
                let _ = cancel_sender.try_broadcast(());
            }
        }
    }
}