1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::{ConsensusMetricsValue, OuterConsensus},
19 data::{vid_disperse::vid_total_weight, Leaf2},
20 epoch_membership::EpochMembershipCoordinator,
21 event::Event,
22 message::UpgradeLock,
23 simple_vote::HasEpoch,
24 stake_table::StakeTableEntries,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 block_contents::BlockHeader,
28 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29 signature_key::{SignatureKey, StateSignatureKey},
30 storage::Storage,
31 },
32 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33 vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use crate::{
39 events::HotShotEvent,
40 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
41 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
42};
43
44mod handlers;
46
47#[derive(Debug, PartialEq)]
49enum VoteDependency {
50 QuorumProposal,
52 Dac,
54 Vid,
56}
57
58pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
60 pub public_key: TYPES::SignatureKey,
62
63 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
65
66 pub consensus: OuterConsensus<TYPES>,
68
69 pub instance_state: Arc<TYPES::InstanceState>,
71
72 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
74
75 pub storage: I::Storage,
77
78 pub storage_metrics: Arc<StorageMetricsValue>,
80
81 pub view_number: TYPES::View,
83
84 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87 pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
89
90 pub upgrade_lock: UpgradeLock<TYPES, V>,
92
93 pub consensus_metrics: Arc<ConsensusMetricsValue>,
95
96 pub id: u64,
98
99 pub epoch_height: u64,
101
102 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
104
105 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
107
108 pub stake_table_capacity: usize,
110
111 pub cancel_receiver: Receiver<()>,
112}
113
114impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
115 for VoteDependencyHandle<TYPES, I, V>
116{
117 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
118
119 #[allow(clippy::too_many_lines)]
120 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
121 async fn handle_dep_result(self, res: Self::Output) {
122 let result = self.handle_vote_deps(&res).await;
123 if result.is_err() {
124 log!(result);
125 self.print_vote_events(&res)
126 }
127 }
128}
129
130impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
131 VoteDependencyHandle<TYPES, I, V>
132{
133 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
134 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
135 tracing::warn!("Failed to vote, events: {:#?}", events);
136 }
137
138 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
139 let mut payload_commitment = None;
140 let mut next_epoch_payload_commitment = None;
141 let mut leaf = None;
142 let mut vid_share = None;
143 let mut da_cert = None;
144 let mut parent_view_number = None;
145 for event in res.iter() {
146 match event.as_ref() {
147 #[allow(unused_assignments)]
148 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
149 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
150 let parent_commitment = parent_leaf.commit();
151 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
152
153 if let Some(ref comm) = payload_commitment {
154 ensure!(proposal_payload_comm == *comm,
155 error!("Quorum proposal has inconsistent payload commitment with DAC or VID."));
156 } else {
157 payload_commitment = Some(proposal_payload_comm);
158 }
159
160 ensure!(proposed_leaf.parent_commitment() == parent_commitment,
161 warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote."));
162
163 let now = Instant::now();
164 self.storage
167 .append_proposal_wrapper(proposal)
168 .await
169 .map_err(|e| {
170 error!("failed to store proposal, not voting. error = {e:#}")
171 })?;
172 self.storage_metrics
173 .append_quorum_duration
174 .add_point(now.elapsed().as_secs_f64());
175
176 leaf = Some(proposed_leaf);
177 parent_view_number = Some(parent_leaf.view_number());
178 },
179 HotShotEvent::DaCertificateValidated(cert) => {
180 let cert_payload_comm = &cert.data().payload_commit;
181 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
182 if let Some(ref comm) = payload_commitment {
183 ensure!(cert_payload_comm == comm, error!("DAC has inconsistent payload commitment with quorum proposal or VID."));
184 } else {
185 payload_commitment = Some(*cert_payload_comm);
186 }
187 if next_epoch_payload_commitment.is_some()
188 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
189 {
190 bail!(error!(
191 "DAC has inconsistent next epoch payload commitment with VID."
192 ));
193 } else {
194 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
195 }
196 da_cert = Some(cert.clone());
197 },
198 HotShotEvent::VidShareValidated(share) => {
199 let vid_payload_commitment = &share.data.payload_commitment();
200 vid_share = Some(share.clone());
201 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
202 if is_next_epoch_vid {
203 if let Some(ref comm) = next_epoch_payload_commitment {
204 ensure!(
205 vid_payload_commitment == comm,
206 error!(
207 "VID has inconsistent next epoch payload commitment with DAC."
208 )
209 );
210 } else {
211 next_epoch_payload_commitment = Some(*vid_payload_commitment);
212 }
213 } else if let Some(ref comm) = payload_commitment {
214 ensure!(vid_payload_commitment == comm, error!("VID has inconsistent payload commitment with quorum proposal or DAC."));
215 } else {
216 payload_commitment = Some(*vid_payload_commitment);
217 }
218 },
219 _ => {},
220 }
221 }
222
223 let Some(vid_share) = vid_share else {
224 bail!(error!(
225 "We don't have the VID share for this view {}, but we should, because the vote dependencies have completed.",
226 self.view_number
227 ));
228 };
229
230 let Some(leaf) = leaf else {
231 bail!(error!(
232 "We don't have the leaf for this view {}, but we should, because the vote dependencies have completed.",
233 self.view_number
234 ));
235 };
236
237 let Some(da_cert) = da_cert else {
238 bail!(error!(
239 "We don't have the DA cert for this view {}, but we should, because the vote dependencies have completed.",
240 self.view_number
241 ));
242 };
243
244 let mut maybe_current_epoch_vid_share = None;
245 if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
247 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
248 {
249 let current_epoch = option_epoch_from_block_number::<TYPES>(
250 leaf.with_epoch,
251 leaf.block_header().block_number(),
252 self.epoch_height,
253 );
254 let next_epoch = current_epoch.map(|e| e + 1);
255
256 let Ok(current_epoch_membership) = self
257 .membership_coordinator
258 .stake_table_for_epoch(current_epoch)
259 .await
260 else {
261 bail!(warn!(
262 "Couldn't acquire current epoch membership. Do not vote!"
263 ));
264 };
265 let Ok(next_epoch_membership) = self
266 .membership_coordinator
267 .stake_table_for_epoch(next_epoch)
268 .await
269 else {
270 bail!(warn!(
271 "Couldn't acquire next epoch membership. Do not vote!"
272 ));
273 };
274
275 if current_epoch_membership.has_stake(&self.public_key).await
277 && next_epoch_membership.has_stake(&self.public_key).await
278 {
279 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
280 maybe_current_epoch_vid_share = Some(vid_share.clone());
281 next_epoch
282 } else {
283 current_epoch
284 };
285 match wait_for_second_vid_share(
286 other_target_epoch,
287 &vid_share,
288 &da_cert,
289 &self.consensus,
290 &self.receiver.activate_cloned(),
291 self.cancel_receiver.clone(),
292 self.id,
293 )
294 .await
295 {
296 Ok(other_vid_share) => {
297 if maybe_current_epoch_vid_share.is_none() {
298 maybe_current_epoch_vid_share = Some(other_vid_share);
299 }
300 ensure!(
301 leaf.block_header().payload_commitment()
302 == maybe_current_epoch_vid_share
303 .as_ref()
304 .unwrap()
305 .data
306 .payload_commitment(),
307 error!(
308 "We have both epochs vid shares but the leaf's vid commit doesn't \
309 match the old epoch vid share's commit. It should never happen."
310 )
311 );
312 },
313 Err(e) => {
314 bail!(warn!(
315 "This is an epoch transition block, we are in both epochs \
316 but we received only one VID share. Do not vote! Error: {e:?}"
317 ));
318 },
319 }
320 }
321 }
322
323 update_shared_state::<TYPES, I, V>(
325 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
326 self.sender.clone(),
327 self.receiver.clone(),
328 self.membership_coordinator.clone(),
329 self.public_key.clone(),
330 self.private_key.clone(),
331 self.upgrade_lock.clone(),
332 self.view_number,
333 Arc::clone(&self.instance_state),
334 &leaf,
335 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
336 parent_view_number,
337 self.epoch_height,
338 )
339 .await
340 .context(error!("Failed to update shared consensus state"))?;
341
342 let cur_epoch = option_epoch_from_block_number::<TYPES>(
343 leaf.with_epoch,
344 leaf.height(),
345 self.epoch_height,
346 );
347
348 let now = Instant::now();
349 let epoch_membership = self
353 .membership_coordinator
354 .membership_for_epoch(cur_epoch)
355 .await?;
356
357 let duration = now.elapsed();
358 tracing::info!("membership_for_epoch time: {duration:?}");
359
360 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
361 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
362 if cur_epoch.is_none() || !is_vote_leaf_extended {
363 broadcast_view_change(
367 &self.sender,
368 leaf.view_number() + 1,
369 cur_epoch,
370 self.first_epoch,
371 )
372 .await;
373 }
374
375 submit_vote::<TYPES, I, V>(
376 self.sender.clone(),
377 epoch_membership,
378 self.public_key.clone(),
379 self.private_key.clone(),
380 self.upgrade_lock.clone(),
381 self.view_number,
382 self.storage.clone(),
383 Arc::clone(&self.storage_metrics),
384 leaf,
385 maybe_current_epoch_vid_share.unwrap_or(vid_share),
386 is_vote_leaf_extended,
387 is_vote_epoch_root,
388 self.epoch_height,
389 &self.state_private_key,
390 self.stake_table_capacity,
391 )
392 .await
393 }
394}
395
396pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
400 pub public_key: TYPES::SignatureKey,
402
403 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
405
406 pub consensus: OuterConsensus<TYPES>,
408
409 pub instance_state: Arc<TYPES::InstanceState>,
411
412 pub latest_voted_view: TYPES::View,
414
415 pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,
417
418 pub network: Arc<I::Network>,
420
421 pub membership: EpochMembershipCoordinator<TYPES>,
423
424 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
426
427 pub id: u64,
429
430 pub consensus_metrics: Arc<ConsensusMetricsValue>,
432
433 pub storage: I::Storage,
435
436 pub storage_metrics: Arc<StorageMetricsValue>,
438
439 pub upgrade_lock: UpgradeLock<TYPES, V>,
441
442 pub epoch_height: u64,
444
445 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
447
448 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
450
451 pub stake_table_capacity: usize,
453}
454
455impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
456 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
458 fn create_event_dependency(
459 &self,
460 dependency_type: VoteDependency,
461 view_number: TYPES::View,
462 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
463 cancel_receiver: Receiver<()>,
464 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
465 let id = self.id;
466 EventDependency::new(
467 event_receiver,
468 cancel_receiver,
469 format!(
470 "VoteDependency::{:?} for view {:?}, my id {:?}",
471 dependency_type, view_number, self.id
472 ),
473 Box::new(move |event| {
474 let event = event.as_ref();
475 let event_view = match dependency_type {
476 VoteDependency::QuorumProposal => {
477 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
478 proposal.data.view_number()
479 } else {
480 return false;
481 }
482 },
483 VoteDependency::Dac => {
484 if let HotShotEvent::DaCertificateValidated(cert) = event {
485 cert.view_number
486 } else {
487 return false;
488 }
489 },
490 VoteDependency::Vid => {
491 if let HotShotEvent::VidShareValidated(disperse) = event {
492 disperse.data.view_number()
493 } else {
494 return false;
495 }
496 },
497 };
498 if event_view == view_number {
499 tracing::debug!(
500 "Vote dependency {dependency_type:?} completed for view {view_number}, my id is {id}");
501 return true;
502 }
503 false
504 }),
505 )
506 }
507
508 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote crete dependency task if new", level = "error")]
511 fn create_dependency_task_if_new(
512 &mut self,
513 view_number: TYPES::View,
514 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
515 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
516 event: Arc<HotShotEvent<TYPES>>,
517 ) {
518 tracing::debug!(
519 "Attempting to make dependency task for view {view_number} and event {event:?}"
520 );
521
522 if self.vote_dependencies.contains_key(&view_number) {
523 return;
524 }
525
526 let (cancel_sender, cancel_receiver) = broadcast(1);
527
528 let mut quorum_proposal_dependency = self.create_event_dependency(
529 VoteDependency::QuorumProposal,
530 view_number,
531 event_receiver.clone(),
532 cancel_receiver.clone(),
533 );
534 let dac_dependency = self.create_event_dependency(
535 VoteDependency::Dac,
536 view_number,
537 event_receiver.clone(),
538 cancel_receiver.clone(),
539 );
540 let vid_dependency = self.create_event_dependency(
541 VoteDependency::Vid,
542 view_number,
543 event_receiver.clone(),
544 cancel_receiver.clone(),
545 );
546 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
548 quorum_proposal_dependency.mark_as_completed(event);
549 }
550
551 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
552
553 let dependency_chain = AndDependency::from_deps(deps);
554
555 let dependency_task = DependencyTask::new(
556 dependency_chain,
557 VoteDependencyHandle::<TYPES, I, V> {
558 public_key: self.public_key.clone(),
559 private_key: self.private_key.clone(),
560 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
561 instance_state: Arc::clone(&self.instance_state),
562 membership_coordinator: self.membership.clone(),
563 storage: self.storage.clone(),
564 storage_metrics: Arc::clone(&self.storage_metrics),
565 view_number,
566 sender: event_sender.clone(),
567 receiver: event_receiver.clone().deactivate(),
568 upgrade_lock: self.upgrade_lock.clone(),
569 id: self.id,
570 epoch_height: self.epoch_height,
571 consensus_metrics: Arc::clone(&self.consensus_metrics),
572 state_private_key: self.state_private_key.clone(),
573 first_epoch: self.first_epoch,
574 stake_table_capacity: self.stake_table_capacity,
575 cancel_receiver,
576 },
577 );
578 self.vote_dependencies.insert(view_number, cancel_sender);
579
580 dependency_task.run();
581 }
582
583 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
585 async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
586 if *self.latest_voted_view < *new_view {
587 tracing::debug!(
588 "Updating next vote view from {} to {} in the quorum vote task",
589 *self.latest_voted_view,
590 *new_view
591 );
592
593 for view in *self.latest_voted_view..(*new_view) {
595 let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
596 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
597 tracing::error!("Aborting vote dependency task for view {view}");
598 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
599 }
600 }
601
602 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
604 self.consensus_metrics
605 .last_voted_view
606 .set(last_voted_view_usize);
607 } else {
608 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
609 }
610
611 self.latest_voted_view = new_view;
612
613 return true;
614 }
615 false
616 }
617
618 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
620 pub async fn handle(
621 &mut self,
622 event: Arc<HotShotEvent<TYPES>>,
623 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
624 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
625 ) -> Result<()> {
626 match event.as_ref() {
627 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
628 tracing::trace!(
629 "Received Proposal for view {}",
630 *proposal.data.view_number()
631 );
632
633 if let Err(e) =
635 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
636 {
637 tracing::debug!(
638 "Failed to handle QuorumProposalValidated event; error = {e:#}"
639 );
640 }
641
642 ensure!(
643 proposal.data.view_number() > self.latest_voted_view,
644 "We have already voted for this view"
645 );
646
647 self.create_dependency_task_if_new(
648 proposal.data.view_number(),
649 event_receiver,
650 &event_sender,
651 Arc::clone(&event),
652 );
653 },
654 HotShotEvent::DaCertificateRecv(cert) => {
655 let view = cert.view_number;
656
657 tracing::trace!("Received DAC for view {view}");
658 ensure!(
660 view > self.latest_voted_view,
661 "Received DAC for an older view."
662 );
663
664 let cert_epoch = cert.data.epoch;
665
666 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
667 let membership_da_stake_table = epoch_membership.da_stake_table().await;
668 let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
669
670 cert.is_valid_cert(
672 &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
673 membership_da_success_threshold,
674 &self.upgrade_lock,
675 )
676 .await
677 .context(|e| warn!("Invalid DAC: {e}"))?;
678
679 self.consensus
681 .write()
682 .await
683 .update_saved_da_certs(view, cert.clone());
684
685 broadcast_event(
686 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
687 &event_sender.clone(),
688 )
689 .await;
690 self.create_dependency_task_if_new(
691 view,
692 event_receiver,
693 &event_sender,
694 Arc::clone(&event),
695 );
696 },
697 HotShotEvent::VidShareRecv(sender, share) => {
698 let view = share.data.view_number();
699 tracing::trace!("Received VID share for view {view}");
701 ensure!(
702 view > self.latest_voted_view,
703 "Received VID share for an older view."
704 );
705
706 let payload_commitment = share.data.payload_commitment_ref();
708
709 ensure!(
711 sender.validate(&share.signature, payload_commitment.as_ref()),
712 "VID share signature is invalid, sender: {}, signature: {:?}, payload_commitment: {:?}",
713 sender,
714 share.signature,
715 payload_commitment
716 );
717
718 let vid_epoch = share.data.epoch();
719 let target_epoch = share.data.target_epoch();
720 let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
721 ensure!(
723 membership_reader
724 .da_committee_members(view)
725 .await
726 .contains(sender)
727 || *sender == membership_reader.leader(view).await?,
728 "VID share was not sent by a DA member or the view leader."
729 );
730
731 let total_weight = vid_total_weight::<TYPES>(
732 &self
733 .membership
734 .membership_for_epoch(target_epoch)
735 .await?
736 .stake_table()
737 .await,
738 target_epoch,
739 );
740
741 if let Err(()) = share.data.verify_share(total_weight) {
742 bail!("Failed to verify VID share");
743 }
744
745 self.consensus
746 .write()
747 .await
748 .update_vid_shares(view, share.clone());
749
750 ensure!(
751 *share.data.recipient_key() == self.public_key,
752 "Got a Valid VID share but it's not for our key"
753 );
754
755 broadcast_event(
756 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
757 &event_sender.clone(),
758 )
759 .await;
760 self.create_dependency_task_if_new(
761 view,
762 event_receiver,
763 &event_sender,
764 Arc::clone(&event),
765 );
766 },
767 HotShotEvent::Timeout(view, ..) => {
768 let view = TYPES::View::new(view.saturating_sub(1));
769 let current_tasks = self.vote_dependencies.split_off(&view);
771 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
772 if !cancel_sender.is_closed() {
773 tracing::error!("Aborting vote dependency task for view {view}");
774 let _ = cancel_sender.try_broadcast(());
775 }
776 }
777 self.vote_dependencies = current_tasks;
778 },
779 HotShotEvent::ViewChange(mut view, _) => {
780 view = TYPES::View::new(view.saturating_sub(1));
781 if !self.update_latest_voted_view(view).await {
782 tracing::debug!("view not updated");
783 }
784 let current_tasks = self.vote_dependencies.split_off(&view);
786 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
787 if !cancel_sender.is_closed() {
788 tracing::error!("Aborting vote dependency task for view {view}");
789 let _ = cancel_sender.try_broadcast(());
790 }
791 }
792 self.vote_dependencies = current_tasks;
793 },
794 HotShotEvent::SetFirstEpoch(view, epoch) => {
795 self.first_epoch = Some((*view, *epoch));
796 },
797 _ => {},
798 }
799 Ok(())
800 }
801}
802
803#[async_trait]
804impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
805 for QuorumVoteTaskState<TYPES, I, V>
806{
807 type Event = HotShotEvent<TYPES>;
808
809 async fn handle_event(
810 &mut self,
811 event: Arc<Self::Event>,
812 sender: &Sender<Arc<Self::Event>>,
813 receiver: &Receiver<Arc<Self::Event>>,
814 ) -> Result<()> {
815 self.handle(event, receiver.clone(), sender.clone()).await
816 }
817
818 fn cancel_subtasks(&mut self) {
819 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
820 if !cancel_sender.is_closed() {
821 tracing::error!("Aborting vote dependency task for view {view}");
822 let _ = cancel_sender.try_broadcast(());
823 }
824 }
825 }
826}