1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::{ConsensusMetricsValue, OuterConsensus},
19 data::{vid_disperse::vid_total_weight, Leaf2},
20 epoch_membership::EpochMembershipCoordinator,
21 event::Event,
22 message::UpgradeLock,
23 simple_vote::HasEpoch,
24 stake_table::StakeTableEntries,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 block_contents::BlockHeader,
28 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29 signature_key::{SignatureKey, StateSignatureKey},
30 storage::Storage,
31 },
32 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33 vote::{Certificate, HasViewNumber},
34 VersionedDaCommittee,
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45mod handlers;
47
/// The kinds of event a quorum vote for a view waits on before it can be cast.
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// Completed by a `QuorumProposalValidated` event for the view.
    QuorumProposal,

    /// Completed by a `DaCertificateValidated` event for the view.
    Dac,

    /// Completed by a `VidShareValidated` event for the view.
    Vid,
}
58
59pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
61 pub public_key: TYPES::SignatureKey,
63
64 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
66
67 pub consensus: OuterConsensus<TYPES>,
69
70 pub instance_state: Arc<TYPES::InstanceState>,
72
73 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76 pub storage: I::Storage,
78
79 pub storage_metrics: Arc<StorageMetricsValue>,
81
82 pub view_number: TYPES::View,
84
85 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
87
88 pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
90
91 pub upgrade_lock: UpgradeLock<TYPES, V>,
93
94 pub consensus_metrics: Arc<ConsensusMetricsValue>,
96
97 pub id: u64,
99
100 pub epoch_height: u64,
102
103 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
108
109 pub stake_table_capacity: usize,
111
112 pub cancel_receiver: Receiver<()>,
113}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
116 for VoteDependencyHandle<TYPES, I, V>
117{
118 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120 #[allow(clippy::too_many_lines)]
121 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122 async fn handle_dep_result(self, res: Self::Output) {
123 let result = self.handle_vote_deps(&res).await;
124 if result.is_err() {
125 log!(result);
126 self.print_vote_events(&res)
127 }
128 }
129}
130
131impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
132 VoteDependencyHandle<TYPES, I, V>
133{
134 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
135 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
136 tracing::warn!("Failed to vote, events: {:?}", events);
137 }
138
139 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
140 let mut payload_commitment = None;
141 let mut next_epoch_payload_commitment = None;
142 let mut leaf = None;
143 let mut vid_share = None;
144 let mut da_cert = None;
145 let mut parent_view_number = None;
146 for event in res.iter() {
147 match event.as_ref() {
148 #[allow(unused_assignments)]
149 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
150 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
151 let parent_commitment = parent_leaf.commit();
152 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
153
154 if let Some(ref comm) = payload_commitment {
155 ensure!(
156 proposal_payload_comm == *comm,
157 error!(
158 "Quorum proposal has inconsistent payload commitment with DAC or \
159 VID."
160 )
161 );
162 } else {
163 payload_commitment = Some(proposal_payload_comm);
164 }
165
166 ensure!(
167 proposed_leaf.parent_commitment() == parent_commitment,
168 warn!(
169 "Proposed leaf parent commitment does not match parent leaf payload \
170 commitment. Aborting vote."
171 )
172 );
173
174 let now = Instant::now();
175 self.storage
178 .append_proposal_wrapper(proposal)
179 .await
180 .map_err(|e| {
181 error!("failed to store proposal, not voting. error = {e:#}")
182 })?;
183 self.storage_metrics
184 .append_quorum_duration
185 .add_point(now.elapsed().as_secs_f64());
186
187 leaf = Some(proposed_leaf);
188 parent_view_number = Some(parent_leaf.view_number());
189 },
190 HotShotEvent::DaCertificateValidated(cert) => {
191 let cert_payload_comm = &cert.data().payload_commit;
192 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
193 if let Some(ref comm) = payload_commitment {
194 ensure!(
195 cert_payload_comm == comm,
196 error!(
197 "DAC has inconsistent payload commitment with quorum proposal or \
198 VID."
199 )
200 );
201 } else {
202 payload_commitment = Some(*cert_payload_comm);
203 }
204 if next_epoch_payload_commitment.is_some()
205 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
206 {
207 bail!(error!(
208 "DAC has inconsistent next epoch payload commitment with VID."
209 ));
210 } else {
211 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
212 }
213 da_cert = Some(cert.clone());
214 },
215 HotShotEvent::VidShareValidated(share) => {
216 let vid_payload_commitment = &share.data.payload_commitment();
217 vid_share = Some(share.clone());
218 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
219 if is_next_epoch_vid {
220 if let Some(ref comm) = next_epoch_payload_commitment {
221 ensure!(
222 vid_payload_commitment == comm,
223 error!(
224 "VID has inconsistent next epoch payload commitment with DAC."
225 )
226 );
227 } else {
228 next_epoch_payload_commitment = Some(*vid_payload_commitment);
229 }
230 } else if let Some(ref comm) = payload_commitment {
231 ensure!(
232 vid_payload_commitment == comm,
233 error!(
234 "VID has inconsistent payload commitment with quorum proposal or \
235 DAC."
236 )
237 );
238 } else {
239 payload_commitment = Some(*vid_payload_commitment);
240 }
241 },
242 _ => {},
243 }
244 }
245
246 let Some(vid_share) = vid_share else {
247 bail!(error!(
248 "We don't have the VID share for this view {}, but we should, because the vote \
249 dependencies have completed.",
250 self.view_number
251 ));
252 };
253
254 let Some(leaf) = leaf else {
255 bail!(error!(
256 "We don't have the leaf for this view {}, but we should, because the vote \
257 dependencies have completed.",
258 self.view_number
259 ));
260 };
261
262 let Some(da_cert) = da_cert else {
263 bail!(error!(
264 "We don't have the DA cert for this view {}, but we should, because the vote \
265 dependencies have completed.",
266 self.view_number
267 ));
268 };
269
270 let mut maybe_current_epoch_vid_share = None;
271 if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
273 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
274 {
275 let current_epoch = option_epoch_from_block_number::<TYPES>(
276 leaf.with_epoch,
277 leaf.block_header().block_number(),
278 self.epoch_height,
279 );
280 let next_epoch = current_epoch.map(|e| e + 1);
281
282 let Ok(current_epoch_membership) = self
283 .membership_coordinator
284 .stake_table_for_epoch(current_epoch)
285 .await
286 else {
287 bail!(warn!(
288 "Couldn't acquire current epoch membership. Do not vote!"
289 ));
290 };
291 let Ok(next_epoch_membership) = self
292 .membership_coordinator
293 .stake_table_for_epoch(next_epoch)
294 .await
295 else {
296 bail!(warn!(
297 "Couldn't acquire next epoch membership. Do not vote!"
298 ));
299 };
300
301 if current_epoch_membership.has_stake(&self.public_key).await
303 && next_epoch_membership.has_stake(&self.public_key).await
304 {
305 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
306 maybe_current_epoch_vid_share = Some(vid_share.clone());
307 next_epoch
308 } else {
309 current_epoch
310 };
311 match wait_for_second_vid_share(
312 other_target_epoch,
313 &vid_share,
314 &da_cert,
315 &self.consensus,
316 &self.receiver.activate_cloned(),
317 self.cancel_receiver.clone(),
318 self.id,
319 )
320 .await
321 {
322 Ok(other_vid_share) => {
323 if maybe_current_epoch_vid_share.is_none() {
324 maybe_current_epoch_vid_share = Some(other_vid_share);
325 }
326 ensure!(
327 leaf.block_header().payload_commitment()
328 == maybe_current_epoch_vid_share
329 .as_ref()
330 .unwrap()
331 .data
332 .payload_commitment(),
333 error!(
334 "We have both epochs vid shares but the leaf's vid commit doesn't \
335 match the old epoch vid share's commit. It should never happen."
336 )
337 );
338 },
339 Err(e) => {
340 bail!(warn!(
341 "This is an epoch transition block, we are in both epochs but we \
342 received only one VID share. Do not vote! Error: {e:?}"
343 ));
344 },
345 }
346 }
347 }
348
349 update_shared_state::<TYPES, V>(
351 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
352 self.sender.clone(),
353 self.receiver.clone(),
354 self.membership_coordinator.clone(),
355 self.public_key.clone(),
356 self.private_key.clone(),
357 self.upgrade_lock.clone(),
358 self.view_number,
359 Arc::clone(&self.instance_state),
360 &leaf,
361 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
362 parent_view_number,
363 self.epoch_height,
364 )
365 .await
366 .context(error!("Failed to update shared consensus state"))?;
367
368 let cur_epoch = option_epoch_from_block_number::<TYPES>(
369 leaf.with_epoch,
370 leaf.height(),
371 self.epoch_height,
372 );
373
374 let now = Instant::now();
375 let epoch_membership = self
379 .membership_coordinator
380 .membership_for_epoch(cur_epoch)
381 .await?;
382
383 let duration = now.elapsed();
384 tracing::info!("membership_for_epoch time: {duration:?}");
385
386 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
387 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
388 if cur_epoch.is_none() || !is_vote_leaf_extended {
389 broadcast_view_change(
393 &self.sender,
394 leaf.view_number() + 1,
395 cur_epoch,
396 self.first_epoch,
397 )
398 .await;
399 }
400
401 let leader = epoch_membership.leader(self.view_number).await;
402 if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
403 self.consensus
404 .write()
405 .await
406 .update_validator_participation(leader_key, cur_epoch, true);
407 }
408
409 submit_vote::<TYPES, I, V>(
410 self.sender.clone(),
411 epoch_membership,
412 self.public_key.clone(),
413 self.private_key.clone(),
414 self.upgrade_lock.clone(),
415 self.view_number,
416 self.storage.clone(),
417 Arc::clone(&self.storage_metrics),
418 leaf,
419 maybe_current_epoch_vid_share.unwrap_or(vid_share),
420 is_vote_leaf_extended,
421 is_vote_epoch_root,
422 self.epoch_height,
423 &self.state_private_key,
424 self.stake_table_capacity,
425 )
426 .await
427 }
428}
429
430pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
434 pub public_key: TYPES::SignatureKey,
436
437 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
439
440 pub consensus: OuterConsensus<TYPES>,
442
443 pub instance_state: Arc<TYPES::InstanceState>,
445
446 pub latest_voted_view: TYPES::View,
448
449 pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,
451
452 pub network: Arc<I::Network>,
454
455 pub membership: EpochMembershipCoordinator<TYPES>,
457
458 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
460
461 pub id: u64,
463
464 pub consensus_metrics: Arc<ConsensusMetricsValue>,
466
467 pub storage: I::Storage,
469
470 pub storage_metrics: Arc<StorageMetricsValue>,
472
473 pub upgrade_lock: UpgradeLock<TYPES, V>,
475
476 pub epoch_height: u64,
478
479 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
481
482 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
484
485 pub stake_table_capacity: usize,
487
488 pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
490}
491
492impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
493 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
495 fn create_event_dependency(
496 &self,
497 dependency_type: VoteDependency,
498 view_number: TYPES::View,
499 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
500 cancel_receiver: Receiver<()>,
501 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
502 let id = self.id;
503 EventDependency::new(
504 event_receiver,
505 cancel_receiver,
506 format!(
507 "VoteDependency::{:?} for view {:?}, my id {:?}",
508 dependency_type, view_number, self.id
509 ),
510 Box::new(move |event| {
511 let event = event.as_ref();
512 let event_view = match dependency_type {
513 VoteDependency::QuorumProposal => {
514 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
515 proposal.data.view_number()
516 } else {
517 return false;
518 }
519 },
520 VoteDependency::Dac => {
521 if let HotShotEvent::DaCertificateValidated(cert) = event {
522 cert.view_number
523 } else {
524 return false;
525 }
526 },
527 VoteDependency::Vid => {
528 if let HotShotEvent::VidShareValidated(disperse) = event {
529 disperse.data.view_number()
530 } else {
531 return false;
532 }
533 },
534 };
535 if event_view == view_number {
536 tracing::debug!(
537 "Vote dependency {dependency_type:?} completed for view {view_number}, my \
538 id is {id}"
539 );
540 return true;
541 }
542 false
543 }),
544 )
545 }
546
547 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
550 fn create_dependency_task_if_new(
551 &mut self,
552 view_number: TYPES::View,
553 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
554 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
555 event: Arc<HotShotEvent<TYPES>>,
556 ) {
557 tracing::debug!(
558 "Attempting to make dependency task for view {view_number} and event {event:?}"
559 );
560
561 if self.vote_dependencies.contains_key(&view_number) {
562 return;
563 }
564
565 let (cancel_sender, cancel_receiver) = broadcast(1);
566
567 let mut quorum_proposal_dependency = self.create_event_dependency(
568 VoteDependency::QuorumProposal,
569 view_number,
570 event_receiver.clone(),
571 cancel_receiver.clone(),
572 );
573 let dac_dependency = self.create_event_dependency(
574 VoteDependency::Dac,
575 view_number,
576 event_receiver.clone(),
577 cancel_receiver.clone(),
578 );
579 let vid_dependency = self.create_event_dependency(
580 VoteDependency::Vid,
581 view_number,
582 event_receiver.clone(),
583 cancel_receiver.clone(),
584 );
585 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
587 quorum_proposal_dependency.mark_as_completed(event);
588 }
589
590 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
591
592 let dependency_chain = AndDependency::from_deps(deps);
593
594 let dependency_task = DependencyTask::new(
595 dependency_chain,
596 VoteDependencyHandle::<TYPES, I, V> {
597 public_key: self.public_key.clone(),
598 private_key: self.private_key.clone(),
599 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
600 instance_state: Arc::clone(&self.instance_state),
601 membership_coordinator: self.membership.clone(),
602 storage: self.storage.clone(),
603 storage_metrics: Arc::clone(&self.storage_metrics),
604 view_number,
605 sender: event_sender.clone(),
606 receiver: event_receiver.clone().deactivate(),
607 upgrade_lock: self.upgrade_lock.clone(),
608 id: self.id,
609 epoch_height: self.epoch_height,
610 consensus_metrics: Arc::clone(&self.consensus_metrics),
611 state_private_key: self.state_private_key.clone(),
612 first_epoch: self.first_epoch,
613 stake_table_capacity: self.stake_table_capacity,
614 cancel_receiver,
615 },
616 );
617 self.vote_dependencies.insert(view_number, cancel_sender);
618
619 dependency_task.run();
620 }
621
622 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
624 async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
625 if *self.latest_voted_view < *new_view {
626 tracing::debug!(
627 "Updating next vote view from {} to {} in the quorum vote task",
628 *self.latest_voted_view,
629 *new_view
630 );
631
632 for view in *self.latest_voted_view..(*new_view) {
634 let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
635 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
636 tracing::error!("Aborting vote dependency task for view {view}");
637 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
638 }
639 }
640
641 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
643 self.consensus_metrics
644 .last_voted_view
645 .set(last_voted_view_usize);
646 } else {
647 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
648 }
649
650 self.latest_voted_view = new_view;
651
652 return true;
653 }
654 false
655 }
656
657 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
659 pub async fn handle(
660 &mut self,
661 event: Arc<HotShotEvent<TYPES>>,
662 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
663 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
664 ) -> Result<()> {
665 match event.as_ref() {
666 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
667 tracing::trace!(
668 "Received Proposal for view {}",
669 *proposal.data.view_number()
670 );
671
672 if let Err(e) =
674 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
675 {
676 tracing::debug!(
677 "Failed to handle QuorumProposalValidated event; error = {e:#}"
678 );
679 }
680
681 ensure!(
682 proposal.data.view_number() > self.latest_voted_view,
683 "We have already voted for this view"
684 );
685
686 self.create_dependency_task_if_new(
687 proposal.data.view_number(),
688 event_receiver,
689 &event_sender,
690 Arc::clone(&event),
691 );
692 },
693 HotShotEvent::DaCertificateRecv(cert) => {
694 let view = cert.view_number;
695
696 tracing::trace!("Received DAC for view {view}");
697 ensure!(
699 view > self.latest_voted_view,
700 "Received DAC for an older view."
701 );
702
703 let cert_epoch = cert.data.epoch;
704
705 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
706 let membership_da_stake_table = epoch_membership.da_stake_table().await;
707 let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
708
709 cert.is_valid_cert(
711 &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
712 membership_da_success_threshold,
713 &self.upgrade_lock,
714 )
715 .await
716 .context(|e| warn!("Invalid DAC: {e}"))?;
717
718 self.consensus
720 .write()
721 .await
722 .update_saved_da_certs(view, cert.clone());
723
724 broadcast_event(
725 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
726 &event_sender.clone(),
727 )
728 .await;
729 self.create_dependency_task_if_new(
730 view,
731 event_receiver,
732 &event_sender,
733 Arc::clone(&event),
734 );
735 },
736 HotShotEvent::VidShareRecv(sender, share) => {
737 let view = share.data.view_number();
738 tracing::trace!("Received VID share for view {view}");
740 ensure!(
741 view > self.latest_voted_view,
742 "Received VID share for an older view."
743 );
744
745 let payload_commitment = share.data.payload_commitment_ref();
747
748 ensure!(
750 sender.validate(&share.signature, payload_commitment.as_ref()),
751 error!(
752 "VID share signature is invalid, sender: {}, signature: {:?}, \
753 payload_commitment: {:?}",
754 sender, share.signature, payload_commitment
755 )
756 );
757
758 let vid_epoch = share.data.epoch();
759 let target_epoch = share.data.target_epoch();
760 let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
761 ensure!(
763 membership_reader
764 .da_committee_members(view)
765 .await
766 .contains(sender)
767 || *sender == membership_reader.leader(view).await?,
768 "VID share was not sent by a DA member or the view leader."
769 );
770
771 let total_weight = vid_total_weight::<TYPES>(
772 &self
773 .membership
774 .membership_for_epoch(target_epoch)
775 .await?
776 .stake_table()
777 .await,
778 target_epoch,
779 );
780
781 if let Err(()) = share.data.verify_share(total_weight) {
782 bail!("Failed to verify VID share");
783 }
784
785 self.consensus
786 .write()
787 .await
788 .update_vid_shares(view, share.clone());
789
790 ensure!(
791 *share.data.recipient_key() == self.public_key,
792 "Got a Valid VID share but it's not for our key"
793 );
794
795 broadcast_event(
796 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
797 &event_sender.clone(),
798 )
799 .await;
800 self.create_dependency_task_if_new(
801 view,
802 event_receiver,
803 &event_sender,
804 Arc::clone(&event),
805 );
806 },
807 HotShotEvent::Timeout(view, ..) => {
808 let view = TYPES::View::new(view.saturating_sub(1));
809 let current_tasks = self.vote_dependencies.split_off(&view);
811 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
812 if !cancel_sender.is_closed() {
813 tracing::error!("Aborting vote dependency task for view {view}");
814 let _ = cancel_sender.try_broadcast(());
815 }
816 }
817 self.vote_dependencies = current_tasks;
818 },
819 HotShotEvent::ViewChange(mut view, _) => {
820 view = TYPES::View::new(view.saturating_sub(1));
821 if !self.update_latest_voted_view(view).await {
822 tracing::debug!("view not updated");
823 }
824 let current_tasks = self.vote_dependencies.split_off(&view);
826 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
827 if !cancel_sender.is_closed() {
828 tracing::error!("Aborting vote dependency task for view {view}");
829 let _ = cancel_sender.try_broadcast(());
830 }
831 }
832 self.vote_dependencies = current_tasks;
833 },
834 HotShotEvent::SetFirstEpoch(view, epoch) => {
835 self.first_epoch = Some((*view, *epoch));
836 },
837 _ => {},
838 }
839 Ok(())
840 }
841}
842
843#[async_trait]
844impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
845 for QuorumVoteTaskState<TYPES, I, V>
846{
847 type Event = HotShotEvent<TYPES>;
848
849 async fn handle_event(
850 &mut self,
851 event: Arc<Self::Event>,
852 sender: &Sender<Arc<Self::Event>>,
853 receiver: &Receiver<Arc<Self::Event>>,
854 ) -> Result<()> {
855 self.handle(event, receiver.clone(), sender.clone()).await
856 }
857
858 fn cancel_subtasks(&mut self) {
859 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
860 if !cancel_sender.is_closed() {
861 tracing::error!("Aborting vote dependency task for view {view}");
862 let _ = cancel_sender.try_broadcast(());
863 }
864 }
865 }
866}