hotshot_task_impls/
vote_collection.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{btree_map::Entry, BTreeMap, HashMap},
9    fmt::Debug,
10    future::Future,
11    marker::PhantomData,
12    sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19    epoch_membership::EpochMembership,
20    message::UpgradeLock,
21    simple_certificate::{
22        DaCertificate2, EpochRootQuorumCertificate, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate, ViewSyncCommitCertificate2,
24        ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
25    },
26    simple_vote::{
27        DaVote2, EpochRootQuorumVote, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
28        UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
29    },
30    traits::node_implementation::{ConsensusTime, NodeType, Versions},
31    utils::EpochTransitionIndicator,
32    vote::{
33        Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
34    },
35};
36use hotshot_utils::anytrace::*;
37
38use crate::{events::HotShotEvent, helpers::broadcast_event};
39
40/// Alias for a map of Vote Collectors
41pub type VoteCollectorsMap<TYPES, VOTE, CERT, V> =
42    BTreeMap<<TYPES as NodeType>::View, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>;
43
44/// Task state for collecting votes of one type and emitting a certificate
45pub struct VoteCollectionTaskState<
46    TYPES: NodeType,
47    VOTE: Vote<TYPES>,
48    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
49    V: Versions,
50> {
51    /// Public key for this node.
52    pub public_key: TYPES::SignatureKey,
53
54    /// Membership for voting
55    pub membership: EpochMembership<TYPES>,
56
57    /// accumulator handles aggregating the votes
58    pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT, V>>,
59
60    /// The view which we are collecting votes for
61    pub view: TYPES::View,
62
63    /// Node id
64    pub id: u64,
65
66    /// Whether we should check if we are the leader when handling a vote
67    pub transition_indicator: EpochTransitionIndicator,
68}
69
70/// Describes the functions a vote must implement for it to be aggregatable by the generic vote collection task
71pub trait AggregatableVote<
72    TYPES: NodeType,
73    VOTE: Vote<TYPES>,
74    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77    /// return the leader for this votes
78    ///
79    /// # Errors
80    /// if the leader cannot be calculated
81    fn leader(
82        &self,
83        membership: &EpochMembership<TYPES>,
84    ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86    /// return the Hotshot event for the completion of this CERT
87    fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91        TYPES: NodeType,
92        VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93        CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94        V: Versions,
95    > VoteCollectionTaskState<TYPES, VOTE, CERT, V>
96{
97    /// Take one vote and accumulate it. Returns either the cert or the updated state
98    /// after the vote is accumulated
99    ///
100    /// # Errors
101    /// If are unable to accumulate the vote
102    #[allow(clippy::question_mark)]
103    pub async fn accumulate_vote(
104        &mut self,
105        vote: &VOTE,
106        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
107    ) -> Result<Option<CERT>> {
108        // TODO create this only once
109        ensure!(
110            matches!(
111                self.transition_indicator,
112                EpochTransitionIndicator::InTransition
113            ) || vote.leader(&self.membership).await? == self.public_key,
114            info!("Received vote for a view in which we were not the leader.")
115        );
116
117        ensure!(
118            vote.view_number() == self.view,
119            error!(
120                "Vote view does not match! vote view is {} current view is {}. This vote should \
121                 not have been passed to this accumulator.",
122                *vote.view_number(),
123                *self.view
124            )
125        );
126
127        let accumulator = self.accumulator.as_mut().context(warn!(
128            "No accumulator to handle vote with. This shouldn't happen."
129        ))?;
130
131        match accumulator.accumulate(vote, self.membership.clone()).await {
132            None => Ok(None),
133            Some(cert) => {
134                tracing::debug!("Certificate Formed! {cert:?}");
135
136                broadcast_event(
137                    Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
138                    event_stream,
139                )
140                .await;
141                self.accumulator = None;
142
143                Ok(Some(cert))
144            },
145        }
146    }
147}
148
149/// Trait for types which will handle a vote event.
150#[async_trait]
151pub trait HandleVoteEvent<TYPES, VOTE, CERT>
152where
153    TYPES: NodeType,
154    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
155    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
156{
157    /// Handle a vote event
158    ///
159    /// # Errors
160    /// Returns an error if we fail to handle the vote
161    async fn handle_vote_event(
162        &mut self,
163        event: Arc<HotShotEvent<TYPES>>,
164        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
165    ) -> Result<Option<CERT>>;
166
167    /// Event filter to use for this event
168    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
169}
170
171/// Info needed to create a vote accumulator task
172pub struct AccumulatorInfo<TYPES: NodeType> {
173    /// This nodes Pub Key
174    pub public_key: TYPES::SignatureKey,
175
176    /// Membership we are accumulation votes for
177    pub membership: EpochMembership<TYPES>,
178
179    /// View of the votes we are collecting
180    pub view: TYPES::View,
181
182    /// This nodes id
183    pub id: u64,
184}
185
186/// Generic function for spawning a vote task.  Returns the event stream id of the spawned task if created
187///
188/// # Errors
189/// If we failed to create the accumulator
190///
191/// # Panics
192/// Calls unwrap but should never panic.
193pub async fn create_vote_accumulator<TYPES, VOTE, CERT, V>(
194    info: &AccumulatorInfo<TYPES>,
195    event: Arc<HotShotEvent<TYPES>>,
196    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
197    upgrade_lock: UpgradeLock<TYPES, V>,
198    transition_indicator: EpochTransitionIndicator,
199) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT, V>>
200where
201    TYPES: NodeType,
202    VOTE: Vote<TYPES>
203        + AggregatableVote<TYPES, VOTE, CERT>
204        + std::marker::Send
205        + std::marker::Sync
206        + 'static,
207    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
208        + Debug
209        + std::marker::Send
210        + std::marker::Sync
211        + 'static,
212    V: Versions,
213    VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
214{
215    let new_accumulator = VoteAccumulator {
216        vote_outcomes: HashMap::new(),
217        signers: HashMap::new(),
218        phantom: PhantomData,
219        upgrade_lock,
220    };
221
222    let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT, V> {
223        membership: info.membership.clone(),
224        public_key: info.public_key.clone(),
225        accumulator: Some(new_accumulator),
226        view: info.view,
227        id: info.id,
228        transition_indicator,
229    };
230
231    state.handle_vote_event(Arc::clone(&event), sender).await?;
232
233    Ok(state)
234}
235
236/// A helper function that handles a vote regardless whether it's the first vote in the view or not.
237///
238/// # Errors
239/// If we fail to handle the vote
240#[allow(clippy::too_many_arguments)]
241pub async fn handle_vote<
242    TYPES: NodeType,
243    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
244    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
245        + Debug
246        + Send
247        + Sync
248        + 'static,
249    V: Versions,
250>(
251    collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT, V>,
252    vote: &VOTE,
253    public_key: TYPES::SignatureKey,
254    membership: &EpochMembership<TYPES>,
255    id: u64,
256    event: &Arc<HotShotEvent<TYPES>>,
257    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
258    upgrade_lock: &UpgradeLock<TYPES, V>,
259    transition_indicator: EpochTransitionIndicator,
260) -> Result<()>
261where
262    VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
263{
264    match collectors.entry(vote.view_number()) {
265        Entry::Vacant(entry) => {
266            tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
267            let info = AccumulatorInfo {
268                public_key,
269                membership: membership.clone(),
270                view: vote.view_number(),
271                id,
272            };
273            let collector = create_vote_accumulator(
274                &info,
275                Arc::clone(event),
276                event_stream,
277                upgrade_lock.clone(),
278                transition_indicator,
279            )
280            .await?;
281
282            entry.insert(collector);
283
284            Ok(())
285        },
286        Entry::Occupied(mut entry) => {
287            // handle the vote, and garbage collect if the vote collector is finished
288            if entry
289                .get_mut()
290                .handle_vote_event(Arc::clone(event), event_stream)
291                .await?
292                .is_some()
293            {
294                entry.remove();
295                *collectors = collectors.split_off(&vote.view_number());
296            }
297
298            Ok(())
299        },
300    }
301}
302
303/// Alias for Quorum vote accumulator
304type QuorumVoteState<TYPES, V> =
305    VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>;
306/// Alias for Quorum vote accumulator
307type NextEpochQuorumVoteState<TYPES, V> = VoteCollectionTaskState<
308    TYPES,
309    NextEpochQuorumVote2<TYPES>,
310    NextEpochQuorumCertificate2<TYPES>,
311    V,
312>;
313/// Alias for DA vote accumulator
314type DaVoteState<TYPES, V> =
315    VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>;
316/// Alias for Timeout vote accumulator
317type TimeoutVoteState<TYPES, V> =
318    VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>, V>;
319/// Alias for upgrade vote accumulator
320type UpgradeVoteState<TYPES, V> =
321    VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>;
322/// Alias for View Sync Pre Commit vote accumulator
323type ViewSyncPreCommitState<TYPES, V> = VoteCollectionTaskState<
324    TYPES,
325    ViewSyncPreCommitVote2<TYPES>,
326    ViewSyncPreCommitCertificate2<TYPES>,
327    V,
328>;
329/// Alias for View Sync Commit vote accumulator
330type ViewSyncCommitVoteState<TYPES, V> = VoteCollectionTaskState<
331    TYPES,
332    ViewSyncCommitVote2<TYPES>,
333    ViewSyncCommitCertificate2<TYPES>,
334    V,
335>;
336/// Alias for View Sync Finalize vote accumulator
337type ViewSyncFinalizeVoteState<TYPES, V> = VoteCollectionTaskState<
338    TYPES,
339    ViewSyncFinalizeVote2<TYPES>,
340    ViewSyncFinalizeCertificate2<TYPES>,
341    V,
342>;
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
345    for QuorumVote<TYPES>
346{
347    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348        membership.leader(self.view_number() + 1).await
349    }
350    fn make_cert_event(
351        certificate: QuorumCertificate<TYPES>,
352        _key: &TYPES::SignatureKey,
353    ) -> HotShotEvent<TYPES> {
354        HotShotEvent::QcFormed(Left(certificate))
355    }
356}
357
358impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
359    for QuorumVote2<TYPES>
360{
361    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
362        membership.leader(self.view_number() + 1).await
363    }
364    fn make_cert_event(
365        certificate: QuorumCertificate2<TYPES>,
366        _key: &TYPES::SignatureKey,
367    ) -> HotShotEvent<TYPES> {
368        HotShotEvent::Qc2Formed(Left(certificate))
369    }
370}
371
372impl<TYPES: NodeType>
373    AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
374    for NextEpochQuorumVote2<TYPES>
375{
376    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
377        let epoch = membership
378            .epoch
379            .map(|e| TYPES::Epoch::new(e.saturating_sub(1)));
380        membership
381            .get_new_epoch(epoch)
382            .await?
383            .leader(self.view_number() + 1)
384            .await
385    }
386    fn make_cert_event(
387        certificate: NextEpochQuorumCertificate2<TYPES>,
388        _key: &TYPES::SignatureKey,
389    ) -> HotShotEvent<TYPES> {
390        HotShotEvent::NextEpochQc2Formed(Left(certificate))
391    }
392}
393
394impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
395    for UpgradeVote<TYPES>
396{
397    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
398        membership.leader(self.view_number()).await
399    }
400    fn make_cert_event(
401        certificate: UpgradeCertificate<TYPES>,
402        _key: &TYPES::SignatureKey,
403    ) -> HotShotEvent<TYPES> {
404        HotShotEvent::UpgradeCertificateFormed(certificate)
405    }
406}
407
408impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
409    for DaVote2<TYPES>
410{
411    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
412        membership.leader(self.view_number()).await
413    }
414    fn make_cert_event(
415        certificate: DaCertificate2<TYPES>,
416        key: &TYPES::SignatureKey,
417    ) -> HotShotEvent<TYPES> {
418        HotShotEvent::DacSend(certificate, key.clone())
419    }
420}
421
422impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
423    for TimeoutVote2<TYPES>
424{
425    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
426        membership.leader(self.view_number() + 1).await
427    }
428    fn make_cert_event(
429        certificate: TimeoutCertificate2<TYPES>,
430        _key: &TYPES::SignatureKey,
431    ) -> HotShotEvent<TYPES> {
432        HotShotEvent::Qc2Formed(Right(certificate))
433    }
434}
435
436impl<TYPES: NodeType>
437    AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
438    for ViewSyncCommitVote2<TYPES>
439{
440    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
441        membership
442            .leader(self.date().round + self.date().relay)
443            .await
444    }
445    fn make_cert_event(
446        certificate: ViewSyncCommitCertificate2<TYPES>,
447        key: &TYPES::SignatureKey,
448    ) -> HotShotEvent<TYPES> {
449        HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
450    }
451}
452
453impl<TYPES: NodeType>
454    AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
455    for ViewSyncPreCommitVote2<TYPES>
456{
457    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
458        membership
459            .leader(self.date().round + self.date().relay)
460            .await
461    }
462    fn make_cert_event(
463        certificate: ViewSyncPreCommitCertificate2<TYPES>,
464        key: &TYPES::SignatureKey,
465    ) -> HotShotEvent<TYPES> {
466        HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
467    }
468}
469
470impl<TYPES: NodeType>
471    AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
472    for ViewSyncFinalizeVote2<TYPES>
473{
474    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
475        membership
476            .leader(self.date().round + self.date().relay)
477            .await
478    }
479    fn make_cert_event(
480        certificate: ViewSyncFinalizeCertificate2<TYPES>,
481        key: &TYPES::SignatureKey,
482    ) -> HotShotEvent<TYPES> {
483        HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
484    }
485}
486
487// Handlers for all vote accumulators
488#[async_trait]
489impl<TYPES: NodeType, V: Versions>
490    HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
491    for QuorumVoteState<TYPES, V>
492{
493    async fn handle_vote_event(
494        &mut self,
495        event: Arc<HotShotEvent<TYPES>>,
496        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
497    ) -> Result<Option<QuorumCertificate2<TYPES>>> {
498        match event.as_ref() {
499            HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
500            _ => Ok(None),
501        }
502    }
503    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
504        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
505    }
506}
507
508// Handlers for all vote accumulators
509#[async_trait]
510impl<TYPES: NodeType, V: Versions>
511    HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
512    for NextEpochQuorumVoteState<TYPES, V>
513{
514    async fn handle_vote_event(
515        &mut self,
516        event: Arc<HotShotEvent<TYPES>>,
517        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
518    ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
519        match event.as_ref() {
520            HotShotEvent::QuorumVoteRecv(vote) => {
521                // #3967 REVIEW NOTE: Should we error if self.epoch is None?
522                self.accumulate_vote(&vote.clone().into(), sender).await
523            },
524            _ => Ok(None),
525        }
526    }
527    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
528        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
529    }
530}
531
532// Handlers for all vote accumulators
533#[async_trait]
534impl<TYPES: NodeType, V: Versions>
535    HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
536    for UpgradeVoteState<TYPES, V>
537{
538    async fn handle_vote_event(
539        &mut self,
540        event: Arc<HotShotEvent<TYPES>>,
541        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
542    ) -> Result<Option<UpgradeCertificate<TYPES>>> {
543        match event.as_ref() {
544            HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
545            _ => Ok(None),
546        }
547    }
548    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
549        matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
550    }
551}
552
553#[async_trait]
554impl<TYPES: NodeType, V: Versions> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
555    for DaVoteState<TYPES, V>
556{
557    async fn handle_vote_event(
558        &mut self,
559        event: Arc<HotShotEvent<TYPES>>,
560        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
561    ) -> Result<Option<DaCertificate2<TYPES>>> {
562        match event.as_ref() {
563            HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
564            _ => Ok(None),
565        }
566    }
567    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
568        matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
569    }
570}
571
572#[async_trait]
573impl<TYPES: NodeType, V: Versions>
574    HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
575    for TimeoutVoteState<TYPES, V>
576{
577    async fn handle_vote_event(
578        &mut self,
579        event: Arc<HotShotEvent<TYPES>>,
580        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
581    ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
582        match event.as_ref() {
583            HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
584            _ => Ok(None),
585        }
586    }
587    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
588        matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
589    }
590}
591
592#[async_trait]
593impl<TYPES: NodeType, V: Versions>
594    HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
595    for ViewSyncPreCommitState<TYPES, V>
596{
597    async fn handle_vote_event(
598        &mut self,
599        event: Arc<HotShotEvent<TYPES>>,
600        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
601    ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
602        match event.as_ref() {
603            HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
604                self.accumulate_vote(vote, sender).await
605            },
606            _ => Ok(None),
607        }
608    }
609    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
610        matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
611    }
612}
613
614#[async_trait]
615impl<TYPES: NodeType, V: Versions>
616    HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
617    for ViewSyncCommitVoteState<TYPES, V>
618{
619    async fn handle_vote_event(
620        &mut self,
621        event: Arc<HotShotEvent<TYPES>>,
622        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
623    ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
624        match event.as_ref() {
625            HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
626            _ => Ok(None),
627        }
628    }
629    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
630        matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
631    }
632}
633
634#[async_trait]
635impl<TYPES: NodeType, V: Versions>
636    HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
637    for ViewSyncFinalizeVoteState<TYPES, V>
638{
639    async fn handle_vote_event(
640        &mut self,
641        event: Arc<HotShotEvent<TYPES>>,
642        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
643    ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
644        match event.as_ref() {
645            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
646                self.accumulate_vote(vote, sender).await
647            },
648            _ => Ok(None),
649        }
650    }
651    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
652        matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
653    }
654}
655
656/// A map for extended quorum vote collectors
657pub type EpochRootVoteCollectorsMap<TYPES, V> =
658    BTreeMap<<TYPES as NodeType>::View, EpochRootVoteCollectionTaskState<TYPES, V>>;
659
660pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType, V: Versions> {
661    /// Public key for this node.
662    pub public_key: TYPES::SignatureKey,
663
664    /// Membership for voting
665    pub membership: EpochMembership<TYPES>,
666
667    /// accumulator for quorum votes
668    pub accumulator:
669        Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>>,
670
671    /// accumulator for light client state update votes
672    pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
673
674    /// The view which we are collecting votes for
675    pub view: TYPES::View,
676
677    /// The epoch which we are collecting votes for
678    pub epoch: Option<TYPES::Epoch>,
679
680    /// Node id
681    pub id: u64,
682}
683
684// Handlers for extended quorum vote accumulators
685impl<TYPES: NodeType, V: Versions> EpochRootVoteCollectionTaskState<TYPES, V> {
686    /// Take one vote and accumulate it. Returns the certs once formed.
687    async fn handle_vote_event(
688        &mut self,
689        event: Arc<HotShotEvent<TYPES>>,
690        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
691    ) -> Result<Option<EpochRootQuorumCertificate<TYPES>>> {
692        match event.as_ref() {
693            HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
694            _ => Ok(None),
695        }
696    }
697
698    /// Accumulate a vote and return the certificates if formed
699    async fn accumulate_vote(
700        &mut self,
701        vote: &EpochRootQuorumVote<TYPES>,
702        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
703    ) -> Result<Option<EpochRootQuorumCertificate<TYPES>>> {
704        let EpochRootQuorumVote { vote, state_vote } = vote;
705        ensure!(
706            vote.view_number() == self.view,
707            error!(
708                "Vote view does not match! vote view is {} current view is {}. This vote should \
709                 not have been passed to this accumulator.",
710                *vote.view_number(),
711                *self.view
712            )
713        );
714
715        let accumulator = self.accumulator.as_mut().context(warn!(
716            "No accumulator to handle extended quorum vote with. This shouldn't happen."
717        ))?;
718
719        let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
720            "No accumulator to handle light client state update vote with. This shouldn't happen."
721        ))?;
722
723        match (
724            accumulator.accumulate(vote, self.membership.clone()).await,
725            state_vote_accumulator
726                .accumulate(&vote.signing_key(), state_vote, &self.membership)
727                .await,
728        ) {
729            (None, None) => Ok(None),
730            (Some(cert), Some(state_cert)) => {
731                let root_qc = EpochRootQuorumCertificate {
732                    qc: cert,
733                    state_cert,
734                };
735
736                tracing::debug!("Certificate Formed! {root_qc:?}");
737
738                broadcast_event(
739                    Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
740                    event_stream,
741                )
742                .await;
743                self.accumulator = None;
744
745                Ok(Some(root_qc))
746            },
747            _ => Err(error!(
748                "Only one certificate formed for the epoch root, this should not happen."
749            )),
750        }
751    }
752}
753
754async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType, V: Versions>(
755    info: &AccumulatorInfo<TYPES>,
756    event: Arc<HotShotEvent<TYPES>>,
757    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
758    upgrade_lock: UpgradeLock<TYPES, V>,
759) -> Result<EpochRootVoteCollectionTaskState<TYPES, V>> {
760    let new_accumulator =
761        VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V> {
762            vote_outcomes: HashMap::new(),
763            signers: HashMap::new(),
764            phantom: PhantomData,
765            upgrade_lock,
766        };
767    let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
768        vote_outcomes: HashMap::new(),
769    };
770
771    let mut state = EpochRootVoteCollectionTaskState::<TYPES, V> {
772        membership: info.membership.clone(),
773        public_key: info.public_key.clone(),
774        accumulator: Some(new_accumulator),
775        state_vote_accumulator: Some(state_vote_accumulator),
776        view: info.view,
777        epoch: info.membership.epoch,
778        id: info.id,
779    };
780
781    state.handle_vote_event(Arc::clone(&event), sender).await?;
782
783    Ok(state)
784}
785
786/// A helper function that handles quorum vote collection for epoch root
787///
788/// # Errors
789/// If we fail to handle the vote
790#[allow(clippy::too_many_arguments)]
791pub async fn handle_epoch_root_vote<TYPES: NodeType, V: Versions>(
792    collectors: &mut EpochRootVoteCollectorsMap<TYPES, V>,
793    vote: &EpochRootQuorumVote<TYPES>,
794    public_key: TYPES::SignatureKey,
795    membership: &EpochMembership<TYPES>,
796    id: u64,
797    event: &Arc<HotShotEvent<TYPES>>,
798    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
799    upgrade_lock: &UpgradeLock<TYPES, V>,
800) -> Result<()> {
801    match collectors.entry(vote.view_number()) {
802        Entry::Vacant(entry) => {
803            tracing::debug!(
804                "Starting epoch root quorum vote handle for view {:?}",
805                vote.view_number()
806            );
807            let info = AccumulatorInfo {
808                public_key,
809                membership: membership.clone(),
810                view: vote.view_number(),
811                id,
812            };
813            let collector = create_epoch_root_vote_collection_task_state(
814                &info,
815                Arc::clone(event),
816                event_stream,
817                upgrade_lock.clone(),
818            )
819            .await?;
820
821            entry.insert(collector);
822
823            Ok(())
824        },
825        Entry::Occupied(mut entry) => {
826            // handle the vote, and garbage collect if the vote collector is finished
827            if entry
828                .get_mut()
829                .handle_vote_event(Arc::clone(event), event_stream)
830                .await?
831                .is_some()
832            {
833                entry.remove();
834                *collectors = collectors.split_off(&vote.view_number());
835            }
836
837            Ok(())
838        },
839    }
840}