1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 VersionedDaCommittee,
19 consensus::{ConsensusMetricsValue, OuterConsensus},
20 data::{EpochNumber, Leaf2, ViewNumber, vid_disperse::vid_total_weight},
21 epoch_membership::EpochMembershipCoordinator,
22 event::Event,
23 message::UpgradeLock,
24 simple_vote::HasEpoch,
25 stake_table::StakeTableEntries,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 block_contents::BlockHeader,
29 node_implementation::{NodeImplementation, NodeType},
30 signature_key::{SignatureKey, StateSignatureKey},
31 storage::Storage,
32 },
33 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
34 vote::{Certificate, HasViewNumber},
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45mod handlers;
47
/// The kinds of event a quorum vote waits on before it can be cast.
///
/// Each variant selects which validated event an `EventDependency` built by
/// `QuorumVoteTaskState::create_event_dependency` will accept.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VoteDependency {
    /// Satisfied by a `HotShotEvent::QuorumProposalValidated` event.
    QuorumProposal,
    /// Satisfied by a `HotShotEvent::DaCertificateValidated` event.
    Dac,
    /// Satisfied by a `HotShotEvent::VidShareValidated` event.
    Vid,
}
58
/// State carried by a single vote dependency task.
///
/// One handle is built per view by `QuorumVoteTaskState::create_dependency_task_if_new`
/// and consumed by `handle_dep_result` once all vote dependencies have completed.
pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// This node's public key; checked for stake during epoch transitions and
    /// forwarded to `update_shared_state` and `submit_vote`.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; forwarded to `update_shared_state` and `submit_vote`.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Handle on the shared consensus state.
    pub consensus: OuterConsensus<TYPES>,

    /// Instance state; forwarded to `update_shared_state`.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Coordinator used to look up stake tables and memberships per epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Storage the validated proposal is appended to before voting.
    pub storage: I::Storage,

    /// Storage metrics; the proposal append duration is recorded here.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// The view this handle votes in.
    pub view_number: ViewNumber,

    /// Sender side of the event stream.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Inactive receiver on the event stream; a clone is activated when
    /// `handle_vote_deps` has to wait for a second VID share.
    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

    /// Upgrade lock; consulted to decide whether epochs are enabled for a view.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Consensus metrics. NOTE(review): not read by the code visible in this file.
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Node id, used in tracing spans and forwarded to `wait_for_second_vid_share`.
    pub id: u64,

    /// Epoch height, forwarded to the epoch helpers (`is_epoch_transition`,
    /// `is_last_block`, ...) — presumably the number of blocks per epoch; TODO confirm.
    pub epoch_height: u64,

    /// State private key; forwarded by reference to `submit_vote`.
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First epoch information, forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Stake table capacity; forwarded to `submit_vote`.
    pub stake_table_capacity: usize,

    /// Receives the cancellation signal for this task; `handle_dep_result`
    /// stops without voting as soon as `recv` on it resolves.
    pub cancel_receiver: Receiver<()>,
}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> HandleDepOutput
116 for VoteDependencyHandle<TYPES, I>
117{
118 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120 #[allow(clippy::too_many_lines)]
121 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122 async fn handle_dep_result(self, res: Self::Output) {
123 let mut cancel_receiver = self.cancel_receiver.clone();
124 let result = tokio::select! { result = self.handle_vote_deps(&res) => {
125 result
126 }
127 _ = cancel_receiver.recv() => {
128 tracing::warn!("Vote dependency task cancelled");
129 return;
130 }
131 };
132 if result.is_err() {
133 log!(result);
134 self.print_vote_events(&res)
135 }
136 }
137}
138
139impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> VoteDependencyHandle<TYPES, I> {
140 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
141 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
142 tracing::warn!("Failed to vote, events: {:?}", events);
143 }
144
145 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
146 let mut payload_commitment = None;
147 let mut next_epoch_payload_commitment = None;
148 let mut leaf = None;
149 let mut vid_share = None;
150 let mut da_cert = None;
151 let mut parent_view_number = None;
152 for event in res.iter() {
153 match event.as_ref() {
154 #[allow(unused_assignments)]
155 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
156 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
157 let parent_commitment = parent_leaf.commit();
158 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
159
160 if let Some(ref comm) = payload_commitment {
161 ensure!(
162 proposal_payload_comm == *comm,
163 error!(
164 "Quorum proposal has inconsistent payload commitment with DAC or \
165 VID."
166 )
167 );
168 } else {
169 payload_commitment = Some(proposal_payload_comm);
170 }
171
172 ensure!(
173 proposed_leaf.parent_commitment() == parent_commitment,
174 warn!(
175 "Proposed leaf parent commitment does not match parent leaf payload \
176 commitment. Aborting vote."
177 )
178 );
179
180 let now = Instant::now();
181 self.storage
184 .append_proposal_wrapper(proposal)
185 .await
186 .map_err(|e| {
187 error!("failed to store proposal, not voting. error = {e:#}")
188 })?;
189 self.storage_metrics
190 .append_quorum_duration
191 .add_point(now.elapsed().as_secs_f64());
192
193 leaf = Some(proposed_leaf);
194 parent_view_number = Some(parent_leaf.view_number());
195 },
196 HotShotEvent::DaCertificateValidated(cert) => {
197 let cert_payload_comm = &cert.data().payload_commit;
198 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
199 if let Some(ref comm) = payload_commitment {
200 ensure!(
201 cert_payload_comm == comm,
202 error!(
203 "DAC has inconsistent payload commitment with quorum proposal or \
204 VID."
205 )
206 );
207 } else {
208 payload_commitment = Some(*cert_payload_comm);
209 }
210 if next_epoch_payload_commitment.is_some()
211 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
212 {
213 bail!(error!(
214 "DAC has inconsistent next epoch payload commitment with VID."
215 ));
216 } else {
217 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
218 }
219 da_cert = Some(cert.clone());
220 },
221 HotShotEvent::VidShareValidated(share) => {
222 let vid_payload_commitment = &share.data.payload_commitment();
223 vid_share = Some(share.clone());
224 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
225 if is_next_epoch_vid {
226 if let Some(ref comm) = next_epoch_payload_commitment {
227 ensure!(
228 vid_payload_commitment == comm,
229 error!(
230 "VID has inconsistent next epoch payload commitment with DAC."
231 )
232 );
233 } else {
234 next_epoch_payload_commitment = Some(*vid_payload_commitment);
235 }
236 } else if let Some(ref comm) = payload_commitment {
237 ensure!(
238 vid_payload_commitment == comm,
239 error!(
240 "VID has inconsistent payload commitment with quorum proposal or \
241 DAC."
242 )
243 );
244 } else {
245 payload_commitment = Some(*vid_payload_commitment);
246 }
247 },
248 _ => {},
249 }
250 }
251
252 let Some(vid_share) = vid_share else {
253 bail!(error!(
254 "We don't have the VID share for this view {}, but we should, because the vote \
255 dependencies have completed.",
256 self.view_number
257 ));
258 };
259
260 let Some(leaf) = leaf else {
261 bail!(error!(
262 "We don't have the leaf for this view {}, but we should, because the vote \
263 dependencies have completed.",
264 self.view_number
265 ));
266 };
267
268 let Some(da_cert) = da_cert else {
269 bail!(error!(
270 "We don't have the DA cert for this view {}, but we should, because the vote \
271 dependencies have completed.",
272 self.view_number
273 ));
274 };
275
276 let mut maybe_current_epoch_vid_share = None;
277 if self.upgrade_lock.epochs_enabled(leaf.view_number())
279 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
280 {
281 let current_epoch = option_epoch_from_block_number(
282 leaf.with_epoch,
283 leaf.block_header().block_number(),
284 self.epoch_height,
285 );
286 let next_epoch = current_epoch.map(|e| e + 1);
287
288 let Ok(current_epoch_membership) = self
289 .membership_coordinator
290 .stake_table_for_epoch(current_epoch)
291 .await
292 else {
293 bail!(warn!(
294 "Couldn't acquire current epoch membership. Do not vote!"
295 ));
296 };
297 let Ok(next_epoch_membership) = self
298 .membership_coordinator
299 .stake_table_for_epoch(next_epoch)
300 .await
301 else {
302 bail!(warn!(
303 "Couldn't acquire next epoch membership. Do not vote!"
304 ));
305 };
306
307 if current_epoch_membership.has_stake(&self.public_key).await
309 && next_epoch_membership.has_stake(&self.public_key).await
310 {
311 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
312 maybe_current_epoch_vid_share = Some(vid_share.clone());
313 next_epoch
314 } else {
315 current_epoch
316 };
317 match wait_for_second_vid_share(
318 other_target_epoch,
319 &vid_share,
320 &da_cert,
321 &self.consensus,
322 &self.receiver.activate_cloned(),
323 self.cancel_receiver.clone(),
324 self.id,
325 )
326 .await
327 {
328 Ok(other_vid_share) => {
329 if maybe_current_epoch_vid_share.is_none() {
330 maybe_current_epoch_vid_share = Some(other_vid_share);
331 }
332 ensure!(
333 leaf.block_header().payload_commitment()
334 == maybe_current_epoch_vid_share
335 .as_ref()
336 .unwrap()
337 .data
338 .payload_commitment(),
339 error!(
340 "We have both epochs vid shares but the leaf's vid commit doesn't \
341 match the old epoch vid share's commit. It should never happen."
342 )
343 );
344 },
345 Err(e) => {
346 bail!(warn!(
347 "This is an epoch transition block, we are in both epochs but we \
348 received only one VID share. Do not vote! Error: {e:?}"
349 ));
350 },
351 }
352 }
353 }
354
355 update_shared_state::<TYPES>(
357 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
358 self.sender.clone(),
359 self.receiver.clone(),
360 self.membership_coordinator.clone(),
361 self.public_key.clone(),
362 self.private_key.clone(),
363 self.upgrade_lock.clone(),
364 self.view_number,
365 Arc::clone(&self.instance_state),
366 &leaf,
367 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
368 parent_view_number,
369 self.epoch_height,
370 )
371 .await
372 .context(error!("Failed to update shared consensus state"))?;
373
374 let cur_epoch =
375 option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);
376
377 let now = Instant::now();
378 let epoch_membership = self
382 .membership_coordinator
383 .membership_for_epoch(cur_epoch)
384 .await?;
385
386 let duration = now.elapsed();
387 tracing::info!("membership_for_epoch time: {duration:?}");
388
389 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
390 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
391 if cur_epoch.is_none() || !is_vote_leaf_extended {
392 broadcast_view_change(
396 &self.sender,
397 leaf.view_number() + 1,
398 cur_epoch,
399 self.first_epoch,
400 )
401 .await;
402 }
403
404 let leader = epoch_membership.leader(self.view_number).await;
405 if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
406 self.consensus
407 .write()
408 .await
409 .update_validator_participation(leader_key, cur_epoch, true);
410 }
411
412 submit_vote::<TYPES, I>(
413 self.sender.clone(),
414 epoch_membership,
415 self.public_key.clone(),
416 self.private_key.clone(),
417 self.upgrade_lock.clone(),
418 self.view_number,
419 self.storage.clone(),
420 Arc::clone(&self.storage_metrics),
421 leaf,
422 maybe_current_epoch_vid_share.unwrap_or(vid_share),
423 is_vote_leaf_extended,
424 is_vote_epoch_root,
425 self.epoch_height,
426 &self.state_private_key,
427 self.stake_table_capacity,
428 )
429 .await
430 }
431}
432
/// State of the quorum vote task.
///
/// Reacts to consensus events in `handle` and spawns one vote dependency task
/// per view, tracking each task's cancel sender in `vote_dependencies`.
pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// This node's public key; VID shares addressed to another key are not voted on.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; copied into every vote dependency handle.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Handle on the shared consensus state; validated DA certificates and VID
    /// shares are saved through it.
    pub consensus: OuterConsensus<TYPES>,

    /// Instance state; shared with every vote dependency handle.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Events for views at or below this view are rejected by `handle`; it is
    /// advanced by `update_latest_voted_view`.
    pub latest_voted_view: ViewNumber,

    /// Cancel senders of the spawned vote dependency tasks, keyed by view.
    pub vote_dependencies: BTreeMap<ViewNumber, Sender<()>>,

    /// Network handle. NOTE(review): not used by the code visible in this file.
    pub network: Arc<I::Network>,

    /// Coordinator used to look up stake tables and memberships per epoch.
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// External event stream. NOTE(review): not used by the code visible in this file.
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// Node id, used in tracing spans and log messages.
    pub id: u64,

    /// Consensus metrics; `last_voted_view` is updated here.
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Storage handle; cloned into every vote dependency handle.
    pub storage: I::Storage,

    /// Storage metrics; shared with every vote dependency handle.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Upgrade lock; used when validating DA certificates.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Epoch height — presumably the number of blocks per epoch; TODO confirm.
    pub epoch_height: u64,

    /// State private key; cloned into every vote dependency handle.
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// First epoch information, set when a `SetFirstEpoch` event is handled.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Stake table capacity; copied into every vote dependency handle.
    pub stake_table_capacity: usize,

    /// DA committees. NOTE(review): not used by the code visible in this file.
    pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
}
494
495impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumVoteTaskState<TYPES, I> {
496 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
498 fn create_event_dependency(
499 &self,
500 dependency_type: VoteDependency,
501 view_number: ViewNumber,
502 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
503 cancel_receiver: Receiver<()>,
504 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
505 let id = self.id;
506 EventDependency::new(
507 event_receiver,
508 cancel_receiver,
509 format!(
510 "VoteDependency::{:?} for view {:?}, my id {:?}",
511 dependency_type, view_number, self.id
512 ),
513 Box::new(move |event| {
514 let event = event.as_ref();
515 let event_view = match dependency_type {
516 VoteDependency::QuorumProposal => {
517 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
518 proposal.data.view_number()
519 } else {
520 return false;
521 }
522 },
523 VoteDependency::Dac => {
524 if let HotShotEvent::DaCertificateValidated(cert) = event {
525 cert.view_number
526 } else {
527 return false;
528 }
529 },
530 VoteDependency::Vid => {
531 if let HotShotEvent::VidShareValidated(disperse) = event {
532 disperse.data.view_number()
533 } else {
534 return false;
535 }
536 },
537 };
538 if event_view == view_number {
539 tracing::debug!(
540 "Vote dependency {dependency_type:?} completed for view {view_number}, my \
541 id is {id}"
542 );
543 return true;
544 }
545 false
546 }),
547 )
548 }
549
550 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
553 fn create_dependency_task_if_new(
554 &mut self,
555 view_number: ViewNumber,
556 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
557 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
558 event: Arc<HotShotEvent<TYPES>>,
559 ) {
560 tracing::debug!(
561 "Attempting to make dependency task for view {view_number} and event {event:?}"
562 );
563
564 if self.vote_dependencies.contains_key(&view_number) {
565 return;
566 }
567
568 let (cancel_sender, cancel_receiver) = broadcast(1);
569
570 let mut quorum_proposal_dependency = self.create_event_dependency(
571 VoteDependency::QuorumProposal,
572 view_number,
573 event_receiver.clone(),
574 cancel_receiver.clone(),
575 );
576 let dac_dependency = self.create_event_dependency(
577 VoteDependency::Dac,
578 view_number,
579 event_receiver.clone(),
580 cancel_receiver.clone(),
581 );
582 let vid_dependency = self.create_event_dependency(
583 VoteDependency::Vid,
584 view_number,
585 event_receiver.clone(),
586 cancel_receiver.clone(),
587 );
588 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
590 quorum_proposal_dependency.mark_as_completed(event);
591 }
592
593 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
594
595 let dependency_chain = AndDependency::from_deps(deps);
596
597 let dependency_task = DependencyTask::new(
598 dependency_chain,
599 VoteDependencyHandle::<TYPES, I> {
600 public_key: self.public_key.clone(),
601 private_key: self.private_key.clone(),
602 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
603 instance_state: Arc::clone(&self.instance_state),
604 membership_coordinator: self.membership.clone(),
605 storage: self.storage.clone(),
606 storage_metrics: Arc::clone(&self.storage_metrics),
607 view_number,
608 sender: event_sender.clone(),
609 receiver: event_receiver.clone().deactivate(),
610 upgrade_lock: self.upgrade_lock.clone(),
611 id: self.id,
612 epoch_height: self.epoch_height,
613 consensus_metrics: Arc::clone(&self.consensus_metrics),
614 state_private_key: self.state_private_key.clone(),
615 first_epoch: self.first_epoch,
616 stake_table_capacity: self.stake_table_capacity,
617 cancel_receiver,
618 },
619 );
620 self.vote_dependencies.insert(view_number, cancel_sender);
621
622 dependency_task.run();
623 }
624
625 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
627 async fn update_latest_voted_view(&mut self, new_view: ViewNumber) -> bool {
628 if *self.latest_voted_view < *new_view {
629 tracing::debug!(
630 "Updating next vote view from {} to {} in the quorum vote task",
631 *self.latest_voted_view,
632 *new_view
633 );
634
635 for view in *self.latest_voted_view..(*new_view) {
637 let maybe_cancel_sender = self.vote_dependencies.remove(&ViewNumber::new(view));
638 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
639 tracing::warn!("Aborting vote dependency task for view {view}");
640 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
641 }
642 }
643
644 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
646 self.consensus_metrics
647 .last_voted_view
648 .set(last_voted_view_usize);
649 } else {
650 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
651 }
652
653 self.latest_voted_view = new_view;
654
655 return true;
656 }
657 false
658 }
659
660 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
662 pub async fn handle(
663 &mut self,
664 event: Arc<HotShotEvent<TYPES>>,
665 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
666 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
667 ) -> Result<()> {
668 match event.as_ref() {
669 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
670 tracing::trace!(
671 "Received Proposal for view {}",
672 *proposal.data.view_number()
673 );
674
675 if let Err(e) =
677 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
678 {
679 tracing::debug!(
680 "Failed to handle QuorumProposalValidated event; error = {e:#}"
681 );
682 }
683
684 ensure!(
685 proposal.data.view_number() > self.latest_voted_view,
686 "We have already voted for this view"
687 );
688
689 self.create_dependency_task_if_new(
690 proposal.data.view_number(),
691 event_receiver,
692 &event_sender,
693 Arc::clone(&event),
694 );
695 },
696 HotShotEvent::DaCertificateRecv(cert) => {
697 let view = cert.view_number;
698
699 tracing::trace!("Received DAC for view {view}");
700 ensure!(
702 view > self.latest_voted_view,
703 "Received DAC for an older view."
704 );
705
706 let cert_epoch = cert.data.epoch;
707
708 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
709 let membership_da_stake_table = epoch_membership.da_stake_table().await;
710 let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
711
712 cert.is_valid_cert(
714 &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
715 membership_da_success_threshold,
716 &self.upgrade_lock,
717 )
718 .await
719 .context(|e| warn!("Invalid DAC: {e}"))?;
720
721 self.consensus
723 .write()
724 .await
725 .update_saved_da_certs(view, cert.clone());
726
727 broadcast_event(
728 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
729 &event_sender.clone(),
730 )
731 .await;
732 self.create_dependency_task_if_new(
733 view,
734 event_receiver,
735 &event_sender,
736 Arc::clone(&event),
737 );
738 },
739 HotShotEvent::VidShareRecv(sender, share) => {
740 let view = share.data.view_number();
741 tracing::trace!("Received VID share for view {view}");
743 ensure!(
744 view > self.latest_voted_view,
745 "Received VID share for an older view."
746 );
747
748 let payload_commitment = share.data.payload_commitment_ref();
750
751 ensure!(
753 sender.validate(&share.signature, payload_commitment.as_ref()),
754 warn!(
755 "VID share signature is invalid, sender: {}, signature: {:?}, \
756 payload_commitment: {:?}",
757 sender, share.signature, payload_commitment
758 )
759 );
760
761 let vid_epoch = share.data.epoch();
762 let target_epoch = share.data.target_epoch();
763 let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
764 ensure!(
766 membership_reader
767 .da_committee_members(view)
768 .await
769 .contains(sender)
770 || *sender == membership_reader.leader(view).await?,
771 "VID share was not sent by a DA member or the view leader."
772 );
773
774 let total_weight = vid_total_weight::<TYPES>(
775 &self
776 .membership
777 .membership_for_epoch(target_epoch)
778 .await?
779 .stake_table()
780 .await,
781 target_epoch,
782 );
783
784 if !share.data.verify(total_weight) {
785 bail!("Failed to verify VID share");
786 }
787
788 self.consensus
789 .write()
790 .await
791 .update_vid_shares(view, share.clone());
792
793 ensure!(
794 *share.data.recipient_key() == self.public_key,
795 "Got a Valid VID share but it's not for our key"
796 );
797
798 broadcast_event(
799 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
800 &event_sender.clone(),
801 )
802 .await;
803 self.create_dependency_task_if_new(
804 view,
805 event_receiver,
806 &event_sender,
807 Arc::clone(&event),
808 );
809 },
810 HotShotEvent::Timeout(view, ..) => {
811 let view = ViewNumber::new(view.saturating_sub(1));
812 let current_tasks = self.vote_dependencies.split_off(&view);
814 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
815 if !cancel_sender.is_closed() {
816 tracing::error!("Aborting vote dependency task for view {view}");
817 let _ = cancel_sender.try_broadcast(());
818 }
819 }
820 self.vote_dependencies = current_tasks;
821 },
822 &HotShotEvent::ViewChange(mut view, _) => {
823 view = ViewNumber::new(view.saturating_sub(1));
824 if !self.update_latest_voted_view(view).await {
825 tracing::debug!("view not updated");
826 }
827 let current_tasks = self.vote_dependencies.split_off(&view);
829 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
830 if !cancel_sender.is_closed() {
831 tracing::error!("Aborting vote dependency task for view {view}");
832 let _ = cancel_sender.try_broadcast(());
833 }
834 }
835 self.vote_dependencies = current_tasks;
836 },
837 HotShotEvent::SetFirstEpoch(view, epoch) => {
838 self.first_epoch = Some((*view, *epoch));
839 },
840 _ => {},
841 }
842 Ok(())
843 }
844}
845
846#[async_trait]
847impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for QuorumVoteTaskState<TYPES, I> {
848 type Event = HotShotEvent<TYPES>;
849
850 async fn handle_event(
851 &mut self,
852 event: Arc<Self::Event>,
853 sender: &Sender<Arc<Self::Event>>,
854 receiver: &Receiver<Arc<Self::Event>>,
855 ) -> Result<()> {
856 self.handle(event, receiver.clone(), sender.clone()).await
857 }
858
859 fn cancel_subtasks(&mut self) {
860 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
861 if !cancel_sender.is_closed() {
862 tracing::error!("Aborting vote dependency task for view {view}");
863 let _ = cancel_sender.try_broadcast(());
864 }
865 }
866 }
867}