1use std::{
8 collections::{btree_map::Entry, BTreeMap, HashMap},
9 fmt::Debug,
10 future::Future,
11 marker::PhantomData,
12 sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19 epoch_membership::EpochMembership,
20 message::UpgradeLock,
21 simple_certificate::{
22 DaCertificate2, EpochRootQuorumCertificate, NextEpochQuorumCertificate2, QuorumCertificate,
23 QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate, ViewSyncCommitCertificate2,
24 ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
25 },
26 simple_vote::{
27 DaVote2, EpochRootQuorumVote, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
28 UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
29 },
30 traits::node_implementation::{ConsensusTime, NodeType, Versions},
31 utils::EpochTransitionIndicator,
32 vote::{
33 Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
34 },
35};
36use hotshot_utils::anytrace::*;
37
38use crate::{events::HotShotEvent, helpers::broadcast_event};
39
40pub type VoteCollectorsMap<TYPES, VOTE, CERT, V> =
42 BTreeMap<<TYPES as NodeType>::View, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>;
43
44pub struct VoteCollectionTaskState<
46 TYPES: NodeType,
47 VOTE: Vote<TYPES>,
48 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
49 V: Versions,
50> {
51 pub public_key: TYPES::SignatureKey,
53
54 pub membership: EpochMembership<TYPES>,
56
57 pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT, V>>,
59
60 pub view: TYPES::View,
62
63 pub id: u64,
65
66 pub transition_indicator: EpochTransitionIndicator,
68}
69
70pub trait AggregatableVote<
72 TYPES: NodeType,
73 VOTE: Vote<TYPES>,
74 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77 fn leader(
82 &self,
83 membership: &EpochMembership<TYPES>,
84 ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86 fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91 TYPES: NodeType,
92 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94 V: Versions,
95 > VoteCollectionTaskState<TYPES, VOTE, CERT, V>
96{
97 #[allow(clippy::question_mark)]
103 pub async fn accumulate_vote(
104 &mut self,
105 vote: &VOTE,
106 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
107 ) -> Result<Option<CERT>> {
108 ensure!(
110 matches!(
111 self.transition_indicator,
112 EpochTransitionIndicator::InTransition
113 ) || vote.leader(&self.membership).await? == self.public_key,
114 info!("Received vote for a view in which we were not the leader.")
115 );
116
117 ensure!(
118 vote.view_number() == self.view,
119 error!(
120 "Vote view does not match! vote view is {} current view is {}. This vote should \
121 not have been passed to this accumulator.",
122 *vote.view_number(),
123 *self.view
124 )
125 );
126
127 let accumulator = self.accumulator.as_mut().context(warn!(
128 "No accumulator to handle vote with. This shouldn't happen."
129 ))?;
130
131 match accumulator.accumulate(vote, self.membership.clone()).await {
132 None => Ok(None),
133 Some(cert) => {
134 tracing::debug!("Certificate Formed! {cert:?}");
135
136 broadcast_event(
137 Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
138 event_stream,
139 )
140 .await;
141 self.accumulator = None;
142
143 Ok(Some(cert))
144 },
145 }
146 }
147}
148
149#[async_trait]
151pub trait HandleVoteEvent<TYPES, VOTE, CERT>
152where
153 TYPES: NodeType,
154 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
155 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
156{
157 async fn handle_vote_event(
162 &mut self,
163 event: Arc<HotShotEvent<TYPES>>,
164 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
165 ) -> Result<Option<CERT>>;
166
167 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
169}
170
171pub struct AccumulatorInfo<TYPES: NodeType> {
173 pub public_key: TYPES::SignatureKey,
175
176 pub membership: EpochMembership<TYPES>,
178
179 pub view: TYPES::View,
181
182 pub id: u64,
184}
185
186pub async fn create_vote_accumulator<TYPES, VOTE, CERT, V>(
194 info: &AccumulatorInfo<TYPES>,
195 event: Arc<HotShotEvent<TYPES>>,
196 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
197 upgrade_lock: UpgradeLock<TYPES, V>,
198 transition_indicator: EpochTransitionIndicator,
199) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT, V>>
200where
201 TYPES: NodeType,
202 VOTE: Vote<TYPES>
203 + AggregatableVote<TYPES, VOTE, CERT>
204 + std::marker::Send
205 + std::marker::Sync
206 + 'static,
207 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
208 + Debug
209 + std::marker::Send
210 + std::marker::Sync
211 + 'static,
212 V: Versions,
213 VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
214{
215 let new_accumulator = VoteAccumulator {
216 vote_outcomes: HashMap::new(),
217 signers: HashMap::new(),
218 phantom: PhantomData,
219 upgrade_lock,
220 };
221
222 let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT, V> {
223 membership: info.membership.clone(),
224 public_key: info.public_key.clone(),
225 accumulator: Some(new_accumulator),
226 view: info.view,
227 id: info.id,
228 transition_indicator,
229 };
230
231 state.handle_vote_event(Arc::clone(&event), sender).await?;
232
233 Ok(state)
234}
235
236#[allow(clippy::too_many_arguments)]
241pub async fn handle_vote<
242 TYPES: NodeType,
243 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
244 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
245 + Debug
246 + Send
247 + Sync
248 + 'static,
249 V: Versions,
250>(
251 collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT, V>,
252 vote: &VOTE,
253 public_key: TYPES::SignatureKey,
254 membership: &EpochMembership<TYPES>,
255 id: u64,
256 event: &Arc<HotShotEvent<TYPES>>,
257 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
258 upgrade_lock: &UpgradeLock<TYPES, V>,
259 transition_indicator: EpochTransitionIndicator,
260) -> Result<()>
261where
262 VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
263{
264 match collectors.entry(vote.view_number()) {
265 Entry::Vacant(entry) => {
266 tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
267 let info = AccumulatorInfo {
268 public_key,
269 membership: membership.clone(),
270 view: vote.view_number(),
271 id,
272 };
273 let collector = create_vote_accumulator(
274 &info,
275 Arc::clone(event),
276 event_stream,
277 upgrade_lock.clone(),
278 transition_indicator,
279 )
280 .await?;
281
282 entry.insert(collector);
283
284 Ok(())
285 },
286 Entry::Occupied(mut entry) => {
287 if entry
289 .get_mut()
290 .handle_vote_event(Arc::clone(event), event_stream)
291 .await?
292 .is_some()
293 {
294 entry.remove();
295 *collectors = collectors.split_off(&vote.view_number());
296 }
297
298 Ok(())
299 },
300 }
301}
302
303type QuorumVoteState<TYPES, V> =
305 VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>;
306type NextEpochQuorumVoteState<TYPES, V> = VoteCollectionTaskState<
308 TYPES,
309 NextEpochQuorumVote2<TYPES>,
310 NextEpochQuorumCertificate2<TYPES>,
311 V,
312>;
313type DaVoteState<TYPES, V> =
315 VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>;
316type TimeoutVoteState<TYPES, V> =
318 VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>, V>;
319type UpgradeVoteState<TYPES, V> =
321 VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>;
322type ViewSyncPreCommitState<TYPES, V> = VoteCollectionTaskState<
324 TYPES,
325 ViewSyncPreCommitVote2<TYPES>,
326 ViewSyncPreCommitCertificate2<TYPES>,
327 V,
328>;
329type ViewSyncCommitVoteState<TYPES, V> = VoteCollectionTaskState<
331 TYPES,
332 ViewSyncCommitVote2<TYPES>,
333 ViewSyncCommitCertificate2<TYPES>,
334 V,
335>;
336type ViewSyncFinalizeVoteState<TYPES, V> = VoteCollectionTaskState<
338 TYPES,
339 ViewSyncFinalizeVote2<TYPES>,
340 ViewSyncFinalizeCertificate2<TYPES>,
341 V,
342>;
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
345 for QuorumVote<TYPES>
346{
347 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348 membership.leader(self.view_number() + 1).await
349 }
350 fn make_cert_event(
351 certificate: QuorumCertificate<TYPES>,
352 _key: &TYPES::SignatureKey,
353 ) -> HotShotEvent<TYPES> {
354 HotShotEvent::QcFormed(Left(certificate))
355 }
356}
357
358impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
359 for QuorumVote2<TYPES>
360{
361 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
362 membership.leader(self.view_number() + 1).await
363 }
364 fn make_cert_event(
365 certificate: QuorumCertificate2<TYPES>,
366 _key: &TYPES::SignatureKey,
367 ) -> HotShotEvent<TYPES> {
368 HotShotEvent::Qc2Formed(Left(certificate))
369 }
370}
371
372impl<TYPES: NodeType>
373 AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
374 for NextEpochQuorumVote2<TYPES>
375{
376 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
377 let epoch = membership
378 .epoch
379 .map(|e| TYPES::Epoch::new(e.saturating_sub(1)));
380 membership
381 .get_new_epoch(epoch)
382 .await?
383 .leader(self.view_number() + 1)
384 .await
385 }
386 fn make_cert_event(
387 certificate: NextEpochQuorumCertificate2<TYPES>,
388 _key: &TYPES::SignatureKey,
389 ) -> HotShotEvent<TYPES> {
390 HotShotEvent::NextEpochQc2Formed(Left(certificate))
391 }
392}
393
394impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
395 for UpgradeVote<TYPES>
396{
397 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
398 membership.leader(self.view_number()).await
399 }
400 fn make_cert_event(
401 certificate: UpgradeCertificate<TYPES>,
402 _key: &TYPES::SignatureKey,
403 ) -> HotShotEvent<TYPES> {
404 HotShotEvent::UpgradeCertificateFormed(certificate)
405 }
406}
407
408impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
409 for DaVote2<TYPES>
410{
411 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
412 membership.leader(self.view_number()).await
413 }
414 fn make_cert_event(
415 certificate: DaCertificate2<TYPES>,
416 key: &TYPES::SignatureKey,
417 ) -> HotShotEvent<TYPES> {
418 HotShotEvent::DacSend(certificate, key.clone())
419 }
420}
421
422impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
423 for TimeoutVote2<TYPES>
424{
425 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
426 membership.leader(self.view_number() + 1).await
427 }
428 fn make_cert_event(
429 certificate: TimeoutCertificate2<TYPES>,
430 _key: &TYPES::SignatureKey,
431 ) -> HotShotEvent<TYPES> {
432 HotShotEvent::Qc2Formed(Right(certificate))
433 }
434}
435
436impl<TYPES: NodeType>
437 AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
438 for ViewSyncCommitVote2<TYPES>
439{
440 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
441 membership
442 .leader(self.date().round + self.date().relay)
443 .await
444 }
445 fn make_cert_event(
446 certificate: ViewSyncCommitCertificate2<TYPES>,
447 key: &TYPES::SignatureKey,
448 ) -> HotShotEvent<TYPES> {
449 HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
450 }
451}
452
453impl<TYPES: NodeType>
454 AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
455 for ViewSyncPreCommitVote2<TYPES>
456{
457 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
458 membership
459 .leader(self.date().round + self.date().relay)
460 .await
461 }
462 fn make_cert_event(
463 certificate: ViewSyncPreCommitCertificate2<TYPES>,
464 key: &TYPES::SignatureKey,
465 ) -> HotShotEvent<TYPES> {
466 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
467 }
468}
469
470impl<TYPES: NodeType>
471 AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
472 for ViewSyncFinalizeVote2<TYPES>
473{
474 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
475 membership
476 .leader(self.date().round + self.date().relay)
477 .await
478 }
479 fn make_cert_event(
480 certificate: ViewSyncFinalizeCertificate2<TYPES>,
481 key: &TYPES::SignatureKey,
482 ) -> HotShotEvent<TYPES> {
483 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
484 }
485}
486
487#[async_trait]
489impl<TYPES: NodeType, V: Versions>
490 HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
491 for QuorumVoteState<TYPES, V>
492{
493 async fn handle_vote_event(
494 &mut self,
495 event: Arc<HotShotEvent<TYPES>>,
496 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
497 ) -> Result<Option<QuorumCertificate2<TYPES>>> {
498 match event.as_ref() {
499 HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
500 _ => Ok(None),
501 }
502 }
503 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
504 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
505 }
506}
507
508#[async_trait]
510impl<TYPES: NodeType, V: Versions>
511 HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
512 for NextEpochQuorumVoteState<TYPES, V>
513{
514 async fn handle_vote_event(
515 &mut self,
516 event: Arc<HotShotEvent<TYPES>>,
517 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
518 ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
519 match event.as_ref() {
520 HotShotEvent::QuorumVoteRecv(vote) => {
521 self.accumulate_vote(&vote.clone().into(), sender).await
523 },
524 _ => Ok(None),
525 }
526 }
527 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
528 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
529 }
530}
531
532#[async_trait]
534impl<TYPES: NodeType, V: Versions>
535 HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
536 for UpgradeVoteState<TYPES, V>
537{
538 async fn handle_vote_event(
539 &mut self,
540 event: Arc<HotShotEvent<TYPES>>,
541 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
542 ) -> Result<Option<UpgradeCertificate<TYPES>>> {
543 match event.as_ref() {
544 HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
545 _ => Ok(None),
546 }
547 }
548 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
549 matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
550 }
551}
552
553#[async_trait]
554impl<TYPES: NodeType, V: Versions> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
555 for DaVoteState<TYPES, V>
556{
557 async fn handle_vote_event(
558 &mut self,
559 event: Arc<HotShotEvent<TYPES>>,
560 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
561 ) -> Result<Option<DaCertificate2<TYPES>>> {
562 match event.as_ref() {
563 HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
564 _ => Ok(None),
565 }
566 }
567 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
568 matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
569 }
570}
571
572#[async_trait]
573impl<TYPES: NodeType, V: Versions>
574 HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
575 for TimeoutVoteState<TYPES, V>
576{
577 async fn handle_vote_event(
578 &mut self,
579 event: Arc<HotShotEvent<TYPES>>,
580 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
581 ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
582 match event.as_ref() {
583 HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
584 _ => Ok(None),
585 }
586 }
587 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
588 matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
589 }
590}
591
592#[async_trait]
593impl<TYPES: NodeType, V: Versions>
594 HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
595 for ViewSyncPreCommitState<TYPES, V>
596{
597 async fn handle_vote_event(
598 &mut self,
599 event: Arc<HotShotEvent<TYPES>>,
600 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
601 ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
602 match event.as_ref() {
603 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
604 self.accumulate_vote(vote, sender).await
605 },
606 _ => Ok(None),
607 }
608 }
609 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
610 matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
611 }
612}
613
614#[async_trait]
615impl<TYPES: NodeType, V: Versions>
616 HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
617 for ViewSyncCommitVoteState<TYPES, V>
618{
619 async fn handle_vote_event(
620 &mut self,
621 event: Arc<HotShotEvent<TYPES>>,
622 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
623 ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
624 match event.as_ref() {
625 HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
626 _ => Ok(None),
627 }
628 }
629 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
630 matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
631 }
632}
633
634#[async_trait]
635impl<TYPES: NodeType, V: Versions>
636 HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
637 for ViewSyncFinalizeVoteState<TYPES, V>
638{
639 async fn handle_vote_event(
640 &mut self,
641 event: Arc<HotShotEvent<TYPES>>,
642 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
643 ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
644 match event.as_ref() {
645 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
646 self.accumulate_vote(vote, sender).await
647 },
648 _ => Ok(None),
649 }
650 }
651 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
652 matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
653 }
654}
655
656pub type EpochRootVoteCollectorsMap<TYPES, V> =
658 BTreeMap<<TYPES as NodeType>::View, EpochRootVoteCollectionTaskState<TYPES, V>>;
659
660pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType, V: Versions> {
661 pub public_key: TYPES::SignatureKey,
663
664 pub membership: EpochMembership<TYPES>,
666
667 pub accumulator:
669 Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>>,
670
671 pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
673
674 pub view: TYPES::View,
676
677 pub epoch: Option<TYPES::Epoch>,
679
680 pub id: u64,
682}
683
684impl<TYPES: NodeType, V: Versions> EpochRootVoteCollectionTaskState<TYPES, V> {
686 async fn handle_vote_event(
688 &mut self,
689 event: Arc<HotShotEvent<TYPES>>,
690 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
691 ) -> Result<Option<EpochRootQuorumCertificate<TYPES>>> {
692 match event.as_ref() {
693 HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
694 _ => Ok(None),
695 }
696 }
697
698 async fn accumulate_vote(
700 &mut self,
701 vote: &EpochRootQuorumVote<TYPES>,
702 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
703 ) -> Result<Option<EpochRootQuorumCertificate<TYPES>>> {
704 let EpochRootQuorumVote { vote, state_vote } = vote;
705 ensure!(
706 vote.view_number() == self.view,
707 error!(
708 "Vote view does not match! vote view is {} current view is {}. This vote should \
709 not have been passed to this accumulator.",
710 *vote.view_number(),
711 *self.view
712 )
713 );
714
715 let accumulator = self.accumulator.as_mut().context(warn!(
716 "No accumulator to handle extended quorum vote with. This shouldn't happen."
717 ))?;
718
719 let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
720 "No accumulator to handle light client state update vote with. This shouldn't happen."
721 ))?;
722
723 match (
724 accumulator.accumulate(vote, self.membership.clone()).await,
725 state_vote_accumulator
726 .accumulate(&vote.signing_key(), state_vote, &self.membership)
727 .await,
728 ) {
729 (None, None) => Ok(None),
730 (Some(cert), Some(state_cert)) => {
731 let root_qc = EpochRootQuorumCertificate {
732 qc: cert,
733 state_cert,
734 };
735
736 tracing::debug!("Certificate Formed! {root_qc:?}");
737
738 broadcast_event(
739 Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
740 event_stream,
741 )
742 .await;
743 self.accumulator = None;
744
745 Ok(Some(root_qc))
746 },
747 _ => Err(error!(
748 "Only one certificate formed for the epoch root, this should not happen."
749 )),
750 }
751 }
752}
753
754async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType, V: Versions>(
755 info: &AccumulatorInfo<TYPES>,
756 event: Arc<HotShotEvent<TYPES>>,
757 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
758 upgrade_lock: UpgradeLock<TYPES, V>,
759) -> Result<EpochRootVoteCollectionTaskState<TYPES, V>> {
760 let new_accumulator =
761 VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V> {
762 vote_outcomes: HashMap::new(),
763 signers: HashMap::new(),
764 phantom: PhantomData,
765 upgrade_lock,
766 };
767 let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
768 vote_outcomes: HashMap::new(),
769 };
770
771 let mut state = EpochRootVoteCollectionTaskState::<TYPES, V> {
772 membership: info.membership.clone(),
773 public_key: info.public_key.clone(),
774 accumulator: Some(new_accumulator),
775 state_vote_accumulator: Some(state_vote_accumulator),
776 view: info.view,
777 epoch: info.membership.epoch,
778 id: info.id,
779 };
780
781 state.handle_vote_event(Arc::clone(&event), sender).await?;
782
783 Ok(state)
784}
785
786#[allow(clippy::too_many_arguments)]
791pub async fn handle_epoch_root_vote<TYPES: NodeType, V: Versions>(
792 collectors: &mut EpochRootVoteCollectorsMap<TYPES, V>,
793 vote: &EpochRootQuorumVote<TYPES>,
794 public_key: TYPES::SignatureKey,
795 membership: &EpochMembership<TYPES>,
796 id: u64,
797 event: &Arc<HotShotEvent<TYPES>>,
798 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
799 upgrade_lock: &UpgradeLock<TYPES, V>,
800) -> Result<()> {
801 match collectors.entry(vote.view_number()) {
802 Entry::Vacant(entry) => {
803 tracing::debug!(
804 "Starting epoch root quorum vote handle for view {:?}",
805 vote.view_number()
806 );
807 let info = AccumulatorInfo {
808 public_key,
809 membership: membership.clone(),
810 view: vote.view_number(),
811 id,
812 };
813 let collector = create_epoch_root_vote_collection_task_state(
814 &info,
815 Arc::clone(event),
816 event_stream,
817 upgrade_lock.clone(),
818 )
819 .await?;
820
821 entry.insert(collector);
822
823 Ok(())
824 },
825 Entry::Occupied(mut entry) => {
826 if entry
828 .get_mut()
829 .handle_vote_event(Arc::clone(event), event_stream)
830 .await?
831 .is_some()
832 {
833 entry.remove();
834 *collectors = collectors.split_off(&vote.view_number());
835 }
836
837 Ok(())
838 },
839 }
840}