1use std::{
8 collections::{BTreeMap, HashMap, btree_map::Entry},
9 fmt::Debug,
10 future::Future,
11 marker::PhantomData,
12 sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19 data::{EpochNumber, ViewNumber},
20 epoch_membership::EpochMembership,
21 message::UpgradeLock,
22 simple_certificate::{
23 DaCertificate2, EpochRootQuorumCertificateV2, NextEpochQuorumCertificate2,
24 QuorumCertificate, QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate,
25 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
26 },
27 simple_vote::{
28 DaVote2, EpochRootQuorumVote2, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
29 UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
30 },
31 traits::node_implementation::NodeType,
32 utils::EpochTransitionIndicator,
33 vote::{
34 Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
35 },
36};
37use hotshot_utils::anytrace::*;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
/// Ordered map from a view number to the vote collector responsible for that view.
pub type VoteCollectorsMap<TYPES, VOTE, CERT> =
    BTreeMap<ViewNumber, VoteCollectionTaskState<TYPES, VOTE, CERT>>;
44
45pub struct VoteCollectionTaskState<
47 TYPES: NodeType,
48 VOTE: Vote<TYPES>,
49 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
50> {
51 pub public_key: TYPES::SignatureKey,
53
54 pub membership: EpochMembership<TYPES>,
56
57 pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT>>,
59
60 pub view: ViewNumber,
62
63 pub id: u64,
65
66 pub transition_indicator: EpochTransitionIndicator,
68}
69
70pub trait AggregatableVote<
72 TYPES: NodeType,
73 VOTE: Vote<TYPES>,
74 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77 fn leader(
82 &self,
83 membership: &EpochMembership<TYPES>,
84 ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86 fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91 TYPES: NodeType,
92 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94> VoteCollectionTaskState<TYPES, VOTE, CERT>
95{
96 #[allow(clippy::question_mark)]
102 pub async fn accumulate_vote(
103 &mut self,
104 vote: &VOTE,
105 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
106 ) -> Result<Option<CERT>> {
107 ensure!(
109 matches!(
110 self.transition_indicator,
111 EpochTransitionIndicator::InTransition
112 ) || vote.leader(&self.membership).await? == self.public_key,
113 info!("Received vote for a view in which we were not the leader.")
114 );
115
116 ensure!(
117 vote.view_number() == self.view,
118 error!(
119 "Vote view does not match! vote view is {} current view is {}. This vote should \
120 not have been passed to this accumulator.",
121 *vote.view_number(),
122 *self.view
123 )
124 );
125
126 let accumulator = self.accumulator.as_mut().context(warn!(
127 "No accumulator to handle vote with. This shouldn't happen."
128 ))?;
129
130 match accumulator.accumulate(vote, self.membership.clone()).await {
131 None => Ok(None),
132 Some(cert) => {
133 tracing::debug!("Certificate Formed! {cert:?}");
134
135 broadcast_event(
136 Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
137 event_stream,
138 )
139 .await;
140 self.accumulator = None;
141
142 Ok(Some(cert))
143 },
144 }
145 }
146}
147
148#[async_trait]
150pub trait HandleVoteEvent<TYPES, VOTE, CERT>
151where
152 TYPES: NodeType,
153 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
154 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
155{
156 async fn handle_vote_event(
161 &mut self,
162 event: Arc<HotShotEvent<TYPES>>,
163 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
164 ) -> Result<Option<CERT>>;
165
166 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
168}
169
170pub struct AccumulatorInfo<TYPES: NodeType> {
172 pub public_key: TYPES::SignatureKey,
174
175 pub membership: EpochMembership<TYPES>,
177
178 pub view: ViewNumber,
180
181 pub id: u64,
183}
184
185pub async fn create_vote_accumulator<TYPES, VOTE, CERT>(
193 info: &AccumulatorInfo<TYPES>,
194 event: Arc<HotShotEvent<TYPES>>,
195 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
196 upgrade_lock: UpgradeLock<TYPES>,
197 transition_indicator: EpochTransitionIndicator,
198) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT>>
199where
200 TYPES: NodeType,
201 VOTE: Vote<TYPES>
202 + AggregatableVote<TYPES, VOTE, CERT>
203 + std::marker::Send
204 + std::marker::Sync
205 + 'static,
206 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
207 + Debug
208 + std::marker::Send
209 + std::marker::Sync
210 + 'static,
211 VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
212{
213 let new_accumulator = VoteAccumulator {
214 vote_outcomes: HashMap::new(),
215 signers: HashMap::new(),
216 phantom: PhantomData,
217 upgrade_lock,
218 };
219
220 let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT> {
221 membership: info.membership.clone(),
222 public_key: info.public_key.clone(),
223 accumulator: Some(new_accumulator),
224 view: info.view,
225 id: info.id,
226 transition_indicator,
227 };
228
229 state.handle_vote_event(Arc::clone(&event), sender).await?;
230
231 Ok(state)
232}
233
234#[allow(clippy::too_many_arguments)]
239pub async fn handle_vote<
240 TYPES: NodeType,
241 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
242 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
243 + Debug
244 + Send
245 + Sync
246 + 'static,
247>(
248 collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT>,
249 vote: &VOTE,
250 public_key: TYPES::SignatureKey,
251 membership: &EpochMembership<TYPES>,
252 id: u64,
253 event: &Arc<HotShotEvent<TYPES>>,
254 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255 upgrade_lock: &UpgradeLock<TYPES>,
256 transition_indicator: EpochTransitionIndicator,
257) -> Result<()>
258where
259 VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
260{
261 match collectors.entry(vote.view_number()) {
262 Entry::Vacant(entry) => {
263 tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
264 let info = AccumulatorInfo {
265 public_key,
266 membership: membership.clone(),
267 view: vote.view_number(),
268 id,
269 };
270 let collector = create_vote_accumulator(
271 &info,
272 Arc::clone(event),
273 event_stream,
274 upgrade_lock.clone(),
275 transition_indicator,
276 )
277 .await?;
278
279 entry.insert(collector);
280
281 Ok(())
282 },
283 Entry::Occupied(mut entry) => {
284 if entry
286 .get_mut()
287 .handle_vote_event(Arc::clone(event), event_stream)
288 .await?
289 .is_some()
290 {
291 entry.remove();
292 *collectors = collectors.split_off(&vote.view_number());
293 }
294
295 Ok(())
296 },
297 }
298}
299
/// Collector state turning `QuorumVote2`s into a `QuorumCertificate2`.
type QuorumVoteState<TYPES> =
    VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>;
/// Collector state turning `NextEpochQuorumVote2`s into a `NextEpochQuorumCertificate2`.
type NextEpochQuorumVoteState<TYPES> =
    VoteCollectionTaskState<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>;
/// Collector state turning `DaVote2`s into a `DaCertificate2`.
type DaVoteState<TYPES> = VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>;
/// Collector state turning `TimeoutVote2`s into a `TimeoutCertificate2`.
type TimeoutVoteState<TYPES> =
    VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>;
/// Collector state turning `UpgradeVote`s into an `UpgradeCertificate`.
type UpgradeVoteState<TYPES> =
    VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>;
/// Collector state turning `ViewSyncPreCommitVote2`s into a `ViewSyncPreCommitCertificate2`.
type ViewSyncPreCommitState<TYPES> = VoteCollectionTaskState<
    TYPES,
    ViewSyncPreCommitVote2<TYPES>,
    ViewSyncPreCommitCertificate2<TYPES>,
>;
/// Collector state turning `ViewSyncCommitVote2`s into a `ViewSyncCommitCertificate2`.
type ViewSyncCommitVoteState<TYPES> =
    VoteCollectionTaskState<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>;
/// Collector state turning `ViewSyncFinalizeVote2`s into a `ViewSyncFinalizeCertificate2`.
type ViewSyncFinalizeVoteState<TYPES> = VoteCollectionTaskState<
    TYPES,
    ViewSyncFinalizeVote2<TYPES>,
    ViewSyncFinalizeCertificate2<TYPES>,
>;
329
330impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
331 for QuorumVote<TYPES>
332{
333 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
334 membership.leader(self.view_number() + 1).await
335 }
336 fn make_cert_event(
337 certificate: QuorumCertificate<TYPES>,
338 _key: &TYPES::SignatureKey,
339 ) -> HotShotEvent<TYPES> {
340 HotShotEvent::QcFormed(Left(certificate))
341 }
342}
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
345 for QuorumVote2<TYPES>
346{
347 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348 membership.leader(self.view_number() + 1).await
349 }
350 fn make_cert_event(
351 certificate: QuorumCertificate2<TYPES>,
352 _key: &TYPES::SignatureKey,
353 ) -> HotShotEvent<TYPES> {
354 HotShotEvent::Qc2Formed(Left(certificate))
355 }
356}
357
358impl<TYPES: NodeType>
359 AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
360 for NextEpochQuorumVote2<TYPES>
361{
362 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
363 let epoch = membership
364 .epoch
365 .map(|e| EpochNumber::new(e.saturating_sub(1)));
366 membership
367 .get_new_epoch(epoch)
368 .await?
369 .leader(self.view_number() + 1)
370 .await
371 }
372 fn make_cert_event(
373 certificate: NextEpochQuorumCertificate2<TYPES>,
374 _key: &TYPES::SignatureKey,
375 ) -> HotShotEvent<TYPES> {
376 HotShotEvent::NextEpochQc2Formed(Left(certificate))
377 }
378}
379
380impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
381 for UpgradeVote<TYPES>
382{
383 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
384 membership.leader(self.view_number()).await
385 }
386 fn make_cert_event(
387 certificate: UpgradeCertificate<TYPES>,
388 _key: &TYPES::SignatureKey,
389 ) -> HotShotEvent<TYPES> {
390 HotShotEvent::UpgradeCertificateFormed(certificate)
391 }
392}
393
394impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
395 for DaVote2<TYPES>
396{
397 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
398 membership.leader(self.view_number()).await
399 }
400 fn make_cert_event(
401 certificate: DaCertificate2<TYPES>,
402 key: &TYPES::SignatureKey,
403 ) -> HotShotEvent<TYPES> {
404 HotShotEvent::DacSend(certificate, key.clone())
405 }
406}
407
408impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
409 for TimeoutVote2<TYPES>
410{
411 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
412 membership.leader(self.view_number() + 1).await
413 }
414 fn make_cert_event(
415 certificate: TimeoutCertificate2<TYPES>,
416 _key: &TYPES::SignatureKey,
417 ) -> HotShotEvent<TYPES> {
418 HotShotEvent::Qc2Formed(Right(certificate))
419 }
420}
421
422impl<TYPES: NodeType>
423 AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
424 for ViewSyncCommitVote2<TYPES>
425{
426 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
427 membership
428 .leader(self.date().round + self.date().relay)
429 .await
430 }
431 fn make_cert_event(
432 certificate: ViewSyncCommitCertificate2<TYPES>,
433 key: &TYPES::SignatureKey,
434 ) -> HotShotEvent<TYPES> {
435 HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
436 }
437}
438
439impl<TYPES: NodeType>
440 AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
441 for ViewSyncPreCommitVote2<TYPES>
442{
443 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
444 membership
445 .leader(self.date().round + self.date().relay)
446 .await
447 }
448 fn make_cert_event(
449 certificate: ViewSyncPreCommitCertificate2<TYPES>,
450 key: &TYPES::SignatureKey,
451 ) -> HotShotEvent<TYPES> {
452 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
453 }
454}
455
456impl<TYPES: NodeType>
457 AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
458 for ViewSyncFinalizeVote2<TYPES>
459{
460 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
461 membership
462 .leader(self.date().round + self.date().relay)
463 .await
464 }
465 fn make_cert_event(
466 certificate: ViewSyncFinalizeCertificate2<TYPES>,
467 key: &TYPES::SignatureKey,
468 ) -> HotShotEvent<TYPES> {
469 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
470 }
471}
472
473#[async_trait]
475impl<TYPES: NodeType> HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
476 for QuorumVoteState<TYPES>
477{
478 async fn handle_vote_event(
479 &mut self,
480 event: Arc<HotShotEvent<TYPES>>,
481 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
482 ) -> Result<Option<QuorumCertificate2<TYPES>>> {
483 match event.as_ref() {
484 HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
485 _ => Ok(None),
486 }
487 }
488 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
489 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
490 }
491}
492
493#[async_trait]
495impl<TYPES: NodeType>
496 HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
497 for NextEpochQuorumVoteState<TYPES>
498{
499 async fn handle_vote_event(
500 &mut self,
501 event: Arc<HotShotEvent<TYPES>>,
502 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
503 ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
504 match event.as_ref() {
505 HotShotEvent::QuorumVoteRecv(vote) => {
506 self.accumulate_vote(&vote.clone().into(), sender).await
508 },
509 _ => Ok(None),
510 }
511 }
512 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
513 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
514 }
515}
516
517#[async_trait]
519impl<TYPES: NodeType> HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
520 for UpgradeVoteState<TYPES>
521{
522 async fn handle_vote_event(
523 &mut self,
524 event: Arc<HotShotEvent<TYPES>>,
525 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
526 ) -> Result<Option<UpgradeCertificate<TYPES>>> {
527 match event.as_ref() {
528 HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
529 _ => Ok(None),
530 }
531 }
532 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
533 matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
534 }
535}
536
537#[async_trait]
538impl<TYPES: NodeType> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
539 for DaVoteState<TYPES>
540{
541 async fn handle_vote_event(
542 &mut self,
543 event: Arc<HotShotEvent<TYPES>>,
544 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
545 ) -> Result<Option<DaCertificate2<TYPES>>> {
546 match event.as_ref() {
547 HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
548 _ => Ok(None),
549 }
550 }
551 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
552 matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
553 }
554}
555
556#[async_trait]
557impl<TYPES: NodeType> HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
558 for TimeoutVoteState<TYPES>
559{
560 async fn handle_vote_event(
561 &mut self,
562 event: Arc<HotShotEvent<TYPES>>,
563 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
564 ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
565 match event.as_ref() {
566 HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
567 _ => Ok(None),
568 }
569 }
570 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
571 matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
572 }
573}
574
575#[async_trait]
576impl<TYPES: NodeType>
577 HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
578 for ViewSyncPreCommitState<TYPES>
579{
580 async fn handle_vote_event(
581 &mut self,
582 event: Arc<HotShotEvent<TYPES>>,
583 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
584 ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
585 match event.as_ref() {
586 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
587 self.accumulate_vote(vote, sender).await
588 },
589 _ => Ok(None),
590 }
591 }
592 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
593 matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
594 }
595}
596
597#[async_trait]
598impl<TYPES: NodeType>
599 HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
600 for ViewSyncCommitVoteState<TYPES>
601{
602 async fn handle_vote_event(
603 &mut self,
604 event: Arc<HotShotEvent<TYPES>>,
605 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
606 ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
607 match event.as_ref() {
608 HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
609 _ => Ok(None),
610 }
611 }
612 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
613 matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
614 }
615}
616
617#[async_trait]
618impl<TYPES: NodeType>
619 HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
620 for ViewSyncFinalizeVoteState<TYPES>
621{
622 async fn handle_vote_event(
623 &mut self,
624 event: Arc<HotShotEvent<TYPES>>,
625 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
626 ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
627 match event.as_ref() {
628 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
629 self.accumulate_vote(vote, sender).await
630 },
631 _ => Ok(None),
632 }
633 }
634 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
635 matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
636 }
637}
638
/// Ordered map from a view number to the epoch root vote collector responsible for that view.
pub type EpochRootVoteCollectorsMap<TYPES> =
    BTreeMap<ViewNumber, EpochRootVoteCollectionTaskState<TYPES>>;
642
643pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType> {
644 pub public_key: TYPES::SignatureKey,
646
647 pub membership: EpochMembership<TYPES>,
649
650 pub accumulator: Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>>,
652
653 pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
655
656 pub view: ViewNumber,
658
659 pub epoch: Option<EpochNumber>,
661
662 pub id: u64,
664}
665
666impl<TYPES: NodeType> EpochRootVoteCollectionTaskState<TYPES> {
668 async fn handle_vote_event(
670 &mut self,
671 event: Arc<HotShotEvent<TYPES>>,
672 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
673 ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
674 match event.as_ref() {
675 HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
676 _ => Ok(None),
677 }
678 }
679
680 async fn accumulate_vote(
682 &mut self,
683 vote: &EpochRootQuorumVote2<TYPES>,
684 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
685 ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
686 let EpochRootQuorumVote2 { vote, state_vote } = vote;
687 ensure!(
688 vote.view_number() == self.view,
689 error!(
690 "Vote view does not match! vote view is {} current view is {}. This vote should \
691 not have been passed to this accumulator.",
692 *vote.view_number(),
693 *self.view
694 )
695 );
696
697 let accumulator = self.accumulator.as_mut().context(warn!(
698 "No accumulator to handle extended quorum vote with. This shouldn't happen."
699 ))?;
700
701 let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
702 "No accumulator to handle light client state update vote with. This shouldn't happen."
703 ))?;
704
705 match (
706 accumulator.accumulate(vote, self.membership.clone()).await,
707 state_vote_accumulator
708 .accumulate(&vote.signing_key(), state_vote, &self.membership)
709 .await,
710 ) {
711 (None, None) => Ok(None),
712 (Some(cert), Some(state_cert)) => {
713 let root_qc = EpochRootQuorumCertificateV2 {
714 qc: cert,
715 state_cert,
716 };
717
718 tracing::debug!("Certificate Formed! {root_qc:?}");
719
720 broadcast_event(
721 Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
722 event_stream,
723 )
724 .await;
725 self.accumulator = None;
726
727 Ok(Some(root_qc))
728 },
729 _ => Err(error!(
730 "Only one certificate formed for the epoch root, this should not happen."
731 )),
732 }
733 }
734}
735
736async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType>(
737 info: &AccumulatorInfo<TYPES>,
738 event: Arc<HotShotEvent<TYPES>>,
739 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
740 upgrade_lock: UpgradeLock<TYPES>,
741) -> Result<EpochRootVoteCollectionTaskState<TYPES>> {
742 let new_accumulator = VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>> {
743 vote_outcomes: HashMap::new(),
744 signers: HashMap::new(),
745 phantom: PhantomData,
746 upgrade_lock: upgrade_lock.clone(),
747 };
748 let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
749 vote_outcomes: HashMap::new(),
750 upgrade_lock,
751 };
752
753 let mut state = EpochRootVoteCollectionTaskState::<TYPES> {
754 membership: info.membership.clone(),
755 public_key: info.public_key.clone(),
756 accumulator: Some(new_accumulator),
757 state_vote_accumulator: Some(state_vote_accumulator),
758 view: info.view,
759 epoch: info.membership.epoch,
760 id: info.id,
761 };
762
763 state.handle_vote_event(Arc::clone(&event), sender).await?;
764
765 Ok(state)
766}
767
768#[allow(clippy::too_many_arguments)]
773pub async fn handle_epoch_root_vote<TYPES: NodeType>(
774 collectors: &mut EpochRootVoteCollectorsMap<TYPES>,
775 vote: &EpochRootQuorumVote2<TYPES>,
776 public_key: TYPES::SignatureKey,
777 membership: &EpochMembership<TYPES>,
778 id: u64,
779 event: &Arc<HotShotEvent<TYPES>>,
780 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
781 upgrade_lock: &UpgradeLock<TYPES>,
782) -> Result<()> {
783 match collectors.entry(vote.view_number()) {
784 Entry::Vacant(entry) => {
785 tracing::debug!(
786 "Starting epoch root quorum vote handle for view {:?}",
787 vote.view_number()
788 );
789 let info = AccumulatorInfo {
790 public_key,
791 membership: membership.clone(),
792 view: vote.view_number(),
793 id,
794 };
795 let collector = create_epoch_root_vote_collection_task_state(
796 &info,
797 Arc::clone(event),
798 event_stream,
799 upgrade_lock.clone(),
800 )
801 .await?;
802
803 entry.insert(collector);
804
805 Ok(())
806 },
807 Entry::Occupied(mut entry) => {
808 if entry
810 .get_mut()
811 .handle_vote_event(Arc::clone(event), event_stream)
812 .await?
813 .is_some()
814 {
815 entry.remove();
816 *collectors = collectors.split_off(&vote.view_number());
817 }
818
819 Ok(())
820 },
821 }
822}