hotshot_task_impls/
vote_collection.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap, btree_map::Entry},
9    fmt::Debug,
10    future::Future,
11    marker::PhantomData,
12    sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19    data::{EpochNumber, ViewNumber},
20    epoch_membership::EpochMembership,
21    message::UpgradeLock,
22    simple_certificate::{
23        DaCertificate2, EpochRootQuorumCertificateV2, NextEpochQuorumCertificate2,
24        QuorumCertificate, QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate,
25        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
26    },
27    simple_vote::{
28        DaVote2, EpochRootQuorumVote2, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
29        UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
30    },
31    traits::node_implementation::NodeType,
32    utils::EpochTransitionIndicator,
33    vote::{
34        Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
35    },
36};
37use hotshot_utils::anytrace::*;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
41/// Alias for a map of Vote Collectors
42pub type VoteCollectorsMap<TYPES, VOTE, CERT> =
43    BTreeMap<ViewNumber, VoteCollectionTaskState<TYPES, VOTE, CERT>>;
44
45/// Task state for collecting votes of one type and emitting a certificate
46pub struct VoteCollectionTaskState<
47    TYPES: NodeType,
48    VOTE: Vote<TYPES>,
49    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
50> {
51    /// Public key for this node.
52    pub public_key: TYPES::SignatureKey,
53
54    /// Membership for voting
55    pub membership: EpochMembership<TYPES>,
56
57    /// accumulator handles aggregating the votes
58    pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT>>,
59
60    /// The view which we are collecting votes for
61    pub view: ViewNumber,
62
63    /// Node id
64    pub id: u64,
65
66    /// Whether we should check if we are the leader when handling a vote
67    pub transition_indicator: EpochTransitionIndicator,
68}
69
70/// Describes the functions a vote must implement for it to be aggregatable by the generic vote collection task
71pub trait AggregatableVote<
72    TYPES: NodeType,
73    VOTE: Vote<TYPES>,
74    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77    /// return the leader for this votes
78    ///
79    /// # Errors
80    /// if the leader cannot be calculated
81    fn leader(
82        &self,
83        membership: &EpochMembership<TYPES>,
84    ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86    /// return the Hotshot event for the completion of this CERT
87    fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91    TYPES: NodeType,
92    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94> VoteCollectionTaskState<TYPES, VOTE, CERT>
95{
96    /// Take one vote and accumulate it. Returns either the cert or the updated state
97    /// after the vote is accumulated
98    ///
99    /// # Errors
100    /// If are unable to accumulate the vote
101    #[allow(clippy::question_mark)]
102    pub async fn accumulate_vote(
103        &mut self,
104        vote: &VOTE,
105        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
106    ) -> Result<Option<CERT>> {
107        // TODO create this only once
108        ensure!(
109            matches!(
110                self.transition_indicator,
111                EpochTransitionIndicator::InTransition
112            ) || vote.leader(&self.membership).await? == self.public_key,
113            info!("Received vote for a view in which we were not the leader.")
114        );
115
116        ensure!(
117            vote.view_number() == self.view,
118            error!(
119                "Vote view does not match! vote view is {} current view is {}. This vote should \
120                 not have been passed to this accumulator.",
121                *vote.view_number(),
122                *self.view
123            )
124        );
125
126        let accumulator = self.accumulator.as_mut().context(warn!(
127            "No accumulator to handle vote with. This shouldn't happen."
128        ))?;
129
130        match accumulator.accumulate(vote, self.membership.clone()).await {
131            None => Ok(None),
132            Some(cert) => {
133                tracing::debug!("Certificate Formed! {cert:?}");
134
135                broadcast_event(
136                    Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
137                    event_stream,
138                )
139                .await;
140                self.accumulator = None;
141
142                Ok(Some(cert))
143            },
144        }
145    }
146}
147
148/// Trait for types which will handle a vote event.
149#[async_trait]
150pub trait HandleVoteEvent<TYPES, VOTE, CERT>
151where
152    TYPES: NodeType,
153    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
154    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
155{
156    /// Handle a vote event
157    ///
158    /// # Errors
159    /// Returns an error if we fail to handle the vote
160    async fn handle_vote_event(
161        &mut self,
162        event: Arc<HotShotEvent<TYPES>>,
163        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
164    ) -> Result<Option<CERT>>;
165
166    /// Event filter to use for this event
167    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
168}
169
170/// Info needed to create a vote accumulator task
171pub struct AccumulatorInfo<TYPES: NodeType> {
172    /// This nodes Pub Key
173    pub public_key: TYPES::SignatureKey,
174
175    /// Membership we are accumulation votes for
176    pub membership: EpochMembership<TYPES>,
177
178    /// View of the votes we are collecting
179    pub view: ViewNumber,
180
181    /// This nodes id
182    pub id: u64,
183}
184
185/// Generic function for spawning a vote task.  Returns the event stream id of the spawned task if created
186///
187/// # Errors
188/// If we failed to create the accumulator
189///
190/// # Panics
191/// Calls unwrap but should never panic.
192pub async fn create_vote_accumulator<TYPES, VOTE, CERT>(
193    info: &AccumulatorInfo<TYPES>,
194    event: Arc<HotShotEvent<TYPES>>,
195    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
196    upgrade_lock: UpgradeLock<TYPES>,
197    transition_indicator: EpochTransitionIndicator,
198) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT>>
199where
200    TYPES: NodeType,
201    VOTE: Vote<TYPES>
202        + AggregatableVote<TYPES, VOTE, CERT>
203        + std::marker::Send
204        + std::marker::Sync
205        + 'static,
206    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
207        + Debug
208        + std::marker::Send
209        + std::marker::Sync
210        + 'static,
211    VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
212{
213    let new_accumulator = VoteAccumulator {
214        vote_outcomes: HashMap::new(),
215        signers: HashMap::new(),
216        phantom: PhantomData,
217        upgrade_lock,
218    };
219
220    let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT> {
221        membership: info.membership.clone(),
222        public_key: info.public_key.clone(),
223        accumulator: Some(new_accumulator),
224        view: info.view,
225        id: info.id,
226        transition_indicator,
227    };
228
229    state.handle_vote_event(Arc::clone(&event), sender).await?;
230
231    Ok(state)
232}
233
234/// A helper function that handles a vote regardless whether it's the first vote in the view or not.
235///
236/// # Errors
237/// If we fail to handle the vote
238#[allow(clippy::too_many_arguments)]
239pub async fn handle_vote<
240    TYPES: NodeType,
241    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
242    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
243        + Debug
244        + Send
245        + Sync
246        + 'static,
247>(
248    collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT>,
249    vote: &VOTE,
250    public_key: TYPES::SignatureKey,
251    membership: &EpochMembership<TYPES>,
252    id: u64,
253    event: &Arc<HotShotEvent<TYPES>>,
254    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255    upgrade_lock: &UpgradeLock<TYPES>,
256    transition_indicator: EpochTransitionIndicator,
257) -> Result<()>
258where
259    VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
260{
261    match collectors.entry(vote.view_number()) {
262        Entry::Vacant(entry) => {
263            tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
264            let info = AccumulatorInfo {
265                public_key,
266                membership: membership.clone(),
267                view: vote.view_number(),
268                id,
269            };
270            let collector = create_vote_accumulator(
271                &info,
272                Arc::clone(event),
273                event_stream,
274                upgrade_lock.clone(),
275                transition_indicator,
276            )
277            .await?;
278
279            entry.insert(collector);
280
281            Ok(())
282        },
283        Entry::Occupied(mut entry) => {
284            // handle the vote, and garbage collect if the vote collector is finished
285            if entry
286                .get_mut()
287                .handle_vote_event(Arc::clone(event), event_stream)
288                .await?
289                .is_some()
290            {
291                entry.remove();
292                *collectors = collectors.split_off(&vote.view_number());
293            }
294
295            Ok(())
296        },
297    }
298}
299
300/// Alias for Quorum vote accumulator
301type QuorumVoteState<TYPES> =
302    VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>;
303/// Alias for Quorum vote accumulator
304type NextEpochQuorumVoteState<TYPES> =
305    VoteCollectionTaskState<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>;
306/// Alias for DA vote accumulator
307type DaVoteState<TYPES> = VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>;
308/// Alias for Timeout vote accumulator
309type TimeoutVoteState<TYPES> =
310    VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>;
311/// Alias for upgrade vote accumulator
312type UpgradeVoteState<TYPES> =
313    VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>;
314/// Alias for View Sync Pre Commit vote accumulator
315type ViewSyncPreCommitState<TYPES> = VoteCollectionTaskState<
316    TYPES,
317    ViewSyncPreCommitVote2<TYPES>,
318    ViewSyncPreCommitCertificate2<TYPES>,
319>;
320/// Alias for View Sync Commit vote accumulator
321type ViewSyncCommitVoteState<TYPES> =
322    VoteCollectionTaskState<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>;
323/// Alias for View Sync Finalize vote accumulator
324type ViewSyncFinalizeVoteState<TYPES> = VoteCollectionTaskState<
325    TYPES,
326    ViewSyncFinalizeVote2<TYPES>,
327    ViewSyncFinalizeCertificate2<TYPES>,
328>;
329
330impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
331    for QuorumVote<TYPES>
332{
333    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
334        membership.leader(self.view_number() + 1).await
335    }
336    fn make_cert_event(
337        certificate: QuorumCertificate<TYPES>,
338        _key: &TYPES::SignatureKey,
339    ) -> HotShotEvent<TYPES> {
340        HotShotEvent::QcFormed(Left(certificate))
341    }
342}
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
345    for QuorumVote2<TYPES>
346{
347    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348        membership.leader(self.view_number() + 1).await
349    }
350    fn make_cert_event(
351        certificate: QuorumCertificate2<TYPES>,
352        _key: &TYPES::SignatureKey,
353    ) -> HotShotEvent<TYPES> {
354        HotShotEvent::Qc2Formed(Left(certificate))
355    }
356}
357
358impl<TYPES: NodeType>
359    AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
360    for NextEpochQuorumVote2<TYPES>
361{
362    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
363        let epoch = membership
364            .epoch
365            .map(|e| EpochNumber::new(e.saturating_sub(1)));
366        membership
367            .get_new_epoch(epoch)
368            .await?
369            .leader(self.view_number() + 1)
370            .await
371    }
372    fn make_cert_event(
373        certificate: NextEpochQuorumCertificate2<TYPES>,
374        _key: &TYPES::SignatureKey,
375    ) -> HotShotEvent<TYPES> {
376        HotShotEvent::NextEpochQc2Formed(Left(certificate))
377    }
378}
379
380impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
381    for UpgradeVote<TYPES>
382{
383    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
384        membership.leader(self.view_number()).await
385    }
386    fn make_cert_event(
387        certificate: UpgradeCertificate<TYPES>,
388        _key: &TYPES::SignatureKey,
389    ) -> HotShotEvent<TYPES> {
390        HotShotEvent::UpgradeCertificateFormed(certificate)
391    }
392}
393
394impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
395    for DaVote2<TYPES>
396{
397    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
398        membership.leader(self.view_number()).await
399    }
400    fn make_cert_event(
401        certificate: DaCertificate2<TYPES>,
402        key: &TYPES::SignatureKey,
403    ) -> HotShotEvent<TYPES> {
404        HotShotEvent::DacSend(certificate, key.clone())
405    }
406}
407
408impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
409    for TimeoutVote2<TYPES>
410{
411    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
412        membership.leader(self.view_number() + 1).await
413    }
414    fn make_cert_event(
415        certificate: TimeoutCertificate2<TYPES>,
416        _key: &TYPES::SignatureKey,
417    ) -> HotShotEvent<TYPES> {
418        HotShotEvent::Qc2Formed(Right(certificate))
419    }
420}
421
422impl<TYPES: NodeType>
423    AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
424    for ViewSyncCommitVote2<TYPES>
425{
426    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
427        membership
428            .leader(self.date().round + self.date().relay)
429            .await
430    }
431    fn make_cert_event(
432        certificate: ViewSyncCommitCertificate2<TYPES>,
433        key: &TYPES::SignatureKey,
434    ) -> HotShotEvent<TYPES> {
435        HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
436    }
437}
438
439impl<TYPES: NodeType>
440    AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
441    for ViewSyncPreCommitVote2<TYPES>
442{
443    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
444        membership
445            .leader(self.date().round + self.date().relay)
446            .await
447    }
448    fn make_cert_event(
449        certificate: ViewSyncPreCommitCertificate2<TYPES>,
450        key: &TYPES::SignatureKey,
451    ) -> HotShotEvent<TYPES> {
452        HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
453    }
454}
455
456impl<TYPES: NodeType>
457    AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
458    for ViewSyncFinalizeVote2<TYPES>
459{
460    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
461        membership
462            .leader(self.date().round + self.date().relay)
463            .await
464    }
465    fn make_cert_event(
466        certificate: ViewSyncFinalizeCertificate2<TYPES>,
467        key: &TYPES::SignatureKey,
468    ) -> HotShotEvent<TYPES> {
469        HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
470    }
471}
472
473// Handlers for all vote accumulators
474#[async_trait]
475impl<TYPES: NodeType> HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
476    for QuorumVoteState<TYPES>
477{
478    async fn handle_vote_event(
479        &mut self,
480        event: Arc<HotShotEvent<TYPES>>,
481        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
482    ) -> Result<Option<QuorumCertificate2<TYPES>>> {
483        match event.as_ref() {
484            HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
485            _ => Ok(None),
486        }
487    }
488    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
489        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
490    }
491}
492
493// Handlers for all vote accumulators
494#[async_trait]
495impl<TYPES: NodeType>
496    HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
497    for NextEpochQuorumVoteState<TYPES>
498{
499    async fn handle_vote_event(
500        &mut self,
501        event: Arc<HotShotEvent<TYPES>>,
502        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
503    ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
504        match event.as_ref() {
505            HotShotEvent::QuorumVoteRecv(vote) => {
506                // #3967 REVIEW NOTE: Should we error if self.epoch is None?
507                self.accumulate_vote(&vote.clone().into(), sender).await
508            },
509            _ => Ok(None),
510        }
511    }
512    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
513        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
514    }
515}
516
517// Handlers for all vote accumulators
518#[async_trait]
519impl<TYPES: NodeType> HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
520    for UpgradeVoteState<TYPES>
521{
522    async fn handle_vote_event(
523        &mut self,
524        event: Arc<HotShotEvent<TYPES>>,
525        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
526    ) -> Result<Option<UpgradeCertificate<TYPES>>> {
527        match event.as_ref() {
528            HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
529            _ => Ok(None),
530        }
531    }
532    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
533        matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
534    }
535}
536
537#[async_trait]
538impl<TYPES: NodeType> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
539    for DaVoteState<TYPES>
540{
541    async fn handle_vote_event(
542        &mut self,
543        event: Arc<HotShotEvent<TYPES>>,
544        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
545    ) -> Result<Option<DaCertificate2<TYPES>>> {
546        match event.as_ref() {
547            HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
548            _ => Ok(None),
549        }
550    }
551    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
552        matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
553    }
554}
555
556#[async_trait]
557impl<TYPES: NodeType> HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
558    for TimeoutVoteState<TYPES>
559{
560    async fn handle_vote_event(
561        &mut self,
562        event: Arc<HotShotEvent<TYPES>>,
563        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
564    ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
565        match event.as_ref() {
566            HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
567            _ => Ok(None),
568        }
569    }
570    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
571        matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
572    }
573}
574
575#[async_trait]
576impl<TYPES: NodeType>
577    HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
578    for ViewSyncPreCommitState<TYPES>
579{
580    async fn handle_vote_event(
581        &mut self,
582        event: Arc<HotShotEvent<TYPES>>,
583        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
584    ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
585        match event.as_ref() {
586            HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
587                self.accumulate_vote(vote, sender).await
588            },
589            _ => Ok(None),
590        }
591    }
592    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
593        matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
594    }
595}
596
597#[async_trait]
598impl<TYPES: NodeType>
599    HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
600    for ViewSyncCommitVoteState<TYPES>
601{
602    async fn handle_vote_event(
603        &mut self,
604        event: Arc<HotShotEvent<TYPES>>,
605        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
606    ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
607        match event.as_ref() {
608            HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
609            _ => Ok(None),
610        }
611    }
612    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
613        matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
614    }
615}
616
617#[async_trait]
618impl<TYPES: NodeType>
619    HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
620    for ViewSyncFinalizeVoteState<TYPES>
621{
622    async fn handle_vote_event(
623        &mut self,
624        event: Arc<HotShotEvent<TYPES>>,
625        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
626    ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
627        match event.as_ref() {
628            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
629                self.accumulate_vote(vote, sender).await
630            },
631            _ => Ok(None),
632        }
633    }
634    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
635        matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
636    }
637}
638
639/// A map for extended quorum vote collectors
640pub type EpochRootVoteCollectorsMap<TYPES> =
641    BTreeMap<ViewNumber, EpochRootVoteCollectionTaskState<TYPES>>;
642
643pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType> {
644    /// Public key for this node.
645    pub public_key: TYPES::SignatureKey,
646
647    /// Membership for voting
648    pub membership: EpochMembership<TYPES>,
649
650    /// accumulator for quorum votes
651    pub accumulator: Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>>,
652
653    /// accumulator for light client state update votes
654    pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
655
656    /// The view which we are collecting votes for
657    pub view: ViewNumber,
658
659    /// The epoch which we are collecting votes for
660    pub epoch: Option<EpochNumber>,
661
662    /// Node id
663    pub id: u64,
664}
665
666// Handlers for extended quorum vote accumulators
667impl<TYPES: NodeType> EpochRootVoteCollectionTaskState<TYPES> {
668    /// Take one vote and accumulate it. Returns the certs once formed.
669    async fn handle_vote_event(
670        &mut self,
671        event: Arc<HotShotEvent<TYPES>>,
672        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
673    ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
674        match event.as_ref() {
675            HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
676            _ => Ok(None),
677        }
678    }
679
680    /// Accumulate a vote and return the certificates if formed
681    async fn accumulate_vote(
682        &mut self,
683        vote: &EpochRootQuorumVote2<TYPES>,
684        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
685    ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
686        let EpochRootQuorumVote2 { vote, state_vote } = vote;
687        ensure!(
688            vote.view_number() == self.view,
689            error!(
690                "Vote view does not match! vote view is {} current view is {}. This vote should \
691                 not have been passed to this accumulator.",
692                *vote.view_number(),
693                *self.view
694            )
695        );
696
697        let accumulator = self.accumulator.as_mut().context(warn!(
698            "No accumulator to handle extended quorum vote with. This shouldn't happen."
699        ))?;
700
701        let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
702            "No accumulator to handle light client state update vote with. This shouldn't happen."
703        ))?;
704
705        match (
706            accumulator.accumulate(vote, self.membership.clone()).await,
707            state_vote_accumulator
708                .accumulate(&vote.signing_key(), state_vote, &self.membership)
709                .await,
710        ) {
711            (None, None) => Ok(None),
712            (Some(cert), Some(state_cert)) => {
713                let root_qc = EpochRootQuorumCertificateV2 {
714                    qc: cert,
715                    state_cert,
716                };
717
718                tracing::debug!("Certificate Formed! {root_qc:?}");
719
720                broadcast_event(
721                    Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
722                    event_stream,
723                )
724                .await;
725                self.accumulator = None;
726
727                Ok(Some(root_qc))
728            },
729            _ => Err(error!(
730                "Only one certificate formed for the epoch root, this should not happen."
731            )),
732        }
733    }
734}
735
736async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType>(
737    info: &AccumulatorInfo<TYPES>,
738    event: Arc<HotShotEvent<TYPES>>,
739    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
740    upgrade_lock: UpgradeLock<TYPES>,
741) -> Result<EpochRootVoteCollectionTaskState<TYPES>> {
742    let new_accumulator = VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>> {
743        vote_outcomes: HashMap::new(),
744        signers: HashMap::new(),
745        phantom: PhantomData,
746        upgrade_lock: upgrade_lock.clone(),
747    };
748    let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
749        vote_outcomes: HashMap::new(),
750        upgrade_lock,
751    };
752
753    let mut state = EpochRootVoteCollectionTaskState::<TYPES> {
754        membership: info.membership.clone(),
755        public_key: info.public_key.clone(),
756        accumulator: Some(new_accumulator),
757        state_vote_accumulator: Some(state_vote_accumulator),
758        view: info.view,
759        epoch: info.membership.epoch,
760        id: info.id,
761    };
762
763    state.handle_vote_event(Arc::clone(&event), sender).await?;
764
765    Ok(state)
766}
767
768/// A helper function that handles quorum vote collection for epoch root
769///
770/// # Errors
771/// If we fail to handle the vote
772#[allow(clippy::too_many_arguments)]
773pub async fn handle_epoch_root_vote<TYPES: NodeType>(
774    collectors: &mut EpochRootVoteCollectorsMap<TYPES>,
775    vote: &EpochRootQuorumVote2<TYPES>,
776    public_key: TYPES::SignatureKey,
777    membership: &EpochMembership<TYPES>,
778    id: u64,
779    event: &Arc<HotShotEvent<TYPES>>,
780    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
781    upgrade_lock: &UpgradeLock<TYPES>,
782) -> Result<()> {
783    match collectors.entry(vote.view_number()) {
784        Entry::Vacant(entry) => {
785            tracing::debug!(
786                "Starting epoch root quorum vote handle for view {:?}",
787                vote.view_number()
788            );
789            let info = AccumulatorInfo {
790                public_key,
791                membership: membership.clone(),
792                view: vote.view_number(),
793                id,
794            };
795            let collector = create_epoch_root_vote_collection_task_state(
796                &info,
797                Arc::clone(event),
798                event_stream,
799                upgrade_lock.clone(),
800            )
801            .await?;
802
803            entry.insert(collector);
804
805            Ok(())
806        },
807        Entry::Occupied(mut entry) => {
808            // handle the vote, and garbage collect if the vote collector is finished
809            if entry
810                .get_mut()
811                .handle_vote_event(Arc::clone(event), event_stream)
812                .await?
813                .is_some()
814            {
815                entry.remove();
816                *collectors = collectors.split_off(&vote.view_number());
817            }
818
819            Ok(())
820        },
821    }
822}