1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::{ConsensusMetricsValue, OuterConsensus},
19 data::{vid_disperse::vid_total_weight, Leaf2},
20 epoch_membership::EpochMembershipCoordinator,
21 event::Event,
22 message::UpgradeLock,
23 simple_vote::HasEpoch,
24 stake_table::StakeTableEntries,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 block_contents::BlockHeader,
28 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29 signature_key::{SignatureKey, StateSignatureKey},
30 storage::Storage,
31 },
32 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33 vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use crate::{
39 events::HotShotEvent,
40 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
41 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
42};
43
44mod handlers;
46
/// Selects which kind of validated event a single vote dependency waits for.
#[derive(Debug, PartialEq)]
enum VoteDependency {
    /// Satisfied by a `HotShotEvent::QuorumProposalValidated` event for the target view.
    QuorumProposal,
    /// Satisfied by a `HotShotEvent::DaCertificateValidated` event for the target view.
    Dac,
    /// Satisfied by a `HotShotEvent::VidShareValidated` event for the target view.
    Vid,
}
57
/// Per-view state handed to the vote dependency task.
///
/// One handle is built for each view in `create_dependency_task_if_new` and is consumed by
/// `handle_dep_result` once the quorum proposal, DA certificate and VID share dependencies
/// for that view have all completed.
pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// This node's public key; used for the stake checks and passed on when voting.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; passed to `update_shared_state` and `submit_vote`.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Handle to the shared consensus state.
    pub consensus: OuterConsensus<TYPES>,

    /// Shared instance state; passed to `update_shared_state`.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Coordinator used to look up stake tables and memberships for an epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Storage the validated proposal is appended to before voting.
    pub storage: I::Storage,

    /// Storage metrics; the proposal append duration is recorded here.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// The view this handle votes in.
    pub view_number: TYPES::View,

    /// Sender side of the internal event stream.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Inactive receiver of the internal event stream; activated only while waiting for the
    /// second VID share of an epoch transition block.
    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

    /// Upgrade lock; consulted to decide whether epochs are enabled for the leaf's view.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Consensus metrics handle.
    // NOTE(review): not read by the handle code visible in this file.
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Node id; recorded in tracing spans and passed to `wait_for_second_vid_share`.
    pub id: u64,

    /// Epoch height passed to the epoch helpers (`is_epoch_transition`, `is_last_block`, ...).
    // NOTE(review): presumably the number of blocks per epoch -- confirm.
    pub epoch_height: u64,

    /// State signing key; passed by reference to `submit_vote`.
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// View and epoch pair forwarded to `broadcast_view_change`, when one has been set.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity; passed to `submit_vote`.
    pub stake_table_capacity: usize,

    /// Receiving end of this view's cancellation channel; a clone is passed to
    /// `wait_for_second_vid_share`.
    pub cancel_receiver: Receiver<()>,
}
113
114impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
115 for VoteDependencyHandle<TYPES, I, V>
116{
117 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
118
119 #[allow(clippy::too_many_lines)]
120 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
121 async fn handle_dep_result(self, res: Self::Output) {
122 let result = self.handle_vote_deps(&res).await;
123 if result.is_err() {
124 log!(result);
125 self.print_vote_events(&res)
126 }
127 }
128}
129
130impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
131 VoteDependencyHandle<TYPES, I, V>
132{
133 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
134 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
135 tracing::warn!("Failed to vote, events: {:?}", events);
136 }
137
138 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
139 let mut payload_commitment = None;
140 let mut next_epoch_payload_commitment = None;
141 let mut leaf = None;
142 let mut vid_share = None;
143 let mut da_cert = None;
144 let mut parent_view_number = None;
145 for event in res.iter() {
146 match event.as_ref() {
147 #[allow(unused_assignments)]
148 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
149 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
150 let parent_commitment = parent_leaf.commit();
151 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
152
153 if let Some(ref comm) = payload_commitment {
154 ensure!(
155 proposal_payload_comm == *comm,
156 error!(
157 "Quorum proposal has inconsistent payload commitment with DAC or \
158 VID."
159 )
160 );
161 } else {
162 payload_commitment = Some(proposal_payload_comm);
163 }
164
165 ensure!(
166 proposed_leaf.parent_commitment() == parent_commitment,
167 warn!(
168 "Proposed leaf parent commitment does not match parent leaf payload \
169 commitment. Aborting vote."
170 )
171 );
172
173 let now = Instant::now();
174 self.storage
177 .append_proposal_wrapper(proposal)
178 .await
179 .map_err(|e| {
180 error!("failed to store proposal, not voting. error = {e:#}")
181 })?;
182 self.storage_metrics
183 .append_quorum_duration
184 .add_point(now.elapsed().as_secs_f64());
185
186 leaf = Some(proposed_leaf);
187 parent_view_number = Some(parent_leaf.view_number());
188 },
189 HotShotEvent::DaCertificateValidated(cert) => {
190 let cert_payload_comm = &cert.data().payload_commit;
191 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
192 if let Some(ref comm) = payload_commitment {
193 ensure!(
194 cert_payload_comm == comm,
195 error!(
196 "DAC has inconsistent payload commitment with quorum proposal or \
197 VID."
198 )
199 );
200 } else {
201 payload_commitment = Some(*cert_payload_comm);
202 }
203 if next_epoch_payload_commitment.is_some()
204 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
205 {
206 bail!(error!(
207 "DAC has inconsistent next epoch payload commitment with VID."
208 ));
209 } else {
210 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
211 }
212 da_cert = Some(cert.clone());
213 },
214 HotShotEvent::VidShareValidated(share) => {
215 let vid_payload_commitment = &share.data.payload_commitment();
216 vid_share = Some(share.clone());
217 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
218 if is_next_epoch_vid {
219 if let Some(ref comm) = next_epoch_payload_commitment {
220 ensure!(
221 vid_payload_commitment == comm,
222 error!(
223 "VID has inconsistent next epoch payload commitment with DAC."
224 )
225 );
226 } else {
227 next_epoch_payload_commitment = Some(*vid_payload_commitment);
228 }
229 } else if let Some(ref comm) = payload_commitment {
230 ensure!(
231 vid_payload_commitment == comm,
232 error!(
233 "VID has inconsistent payload commitment with quorum proposal or \
234 DAC."
235 )
236 );
237 } else {
238 payload_commitment = Some(*vid_payload_commitment);
239 }
240 },
241 _ => {},
242 }
243 }
244
245 let Some(vid_share) = vid_share else {
246 bail!(error!(
247 "We don't have the VID share for this view {}, but we should, because the vote \
248 dependencies have completed.",
249 self.view_number
250 ));
251 };
252
253 let Some(leaf) = leaf else {
254 bail!(error!(
255 "We don't have the leaf for this view {}, but we should, because the vote \
256 dependencies have completed.",
257 self.view_number
258 ));
259 };
260
261 let Some(da_cert) = da_cert else {
262 bail!(error!(
263 "We don't have the DA cert for this view {}, but we should, because the vote \
264 dependencies have completed.",
265 self.view_number
266 ));
267 };
268
269 let mut maybe_current_epoch_vid_share = None;
270 if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
272 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
273 {
274 let current_epoch = option_epoch_from_block_number::<TYPES>(
275 leaf.with_epoch,
276 leaf.block_header().block_number(),
277 self.epoch_height,
278 );
279 let next_epoch = current_epoch.map(|e| e + 1);
280
281 let Ok(current_epoch_membership) = self
282 .membership_coordinator
283 .stake_table_for_epoch(current_epoch)
284 .await
285 else {
286 bail!(warn!(
287 "Couldn't acquire current epoch membership. Do not vote!"
288 ));
289 };
290 let Ok(next_epoch_membership) = self
291 .membership_coordinator
292 .stake_table_for_epoch(next_epoch)
293 .await
294 else {
295 bail!(warn!(
296 "Couldn't acquire next epoch membership. Do not vote!"
297 ));
298 };
299
300 if current_epoch_membership.has_stake(&self.public_key).await
302 && next_epoch_membership.has_stake(&self.public_key).await
303 {
304 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
305 maybe_current_epoch_vid_share = Some(vid_share.clone());
306 next_epoch
307 } else {
308 current_epoch
309 };
310 match wait_for_second_vid_share(
311 other_target_epoch,
312 &vid_share,
313 &da_cert,
314 &self.consensus,
315 &self.receiver.activate_cloned(),
316 self.cancel_receiver.clone(),
317 self.id,
318 )
319 .await
320 {
321 Ok(other_vid_share) => {
322 if maybe_current_epoch_vid_share.is_none() {
323 maybe_current_epoch_vid_share = Some(other_vid_share);
324 }
325 ensure!(
326 leaf.block_header().payload_commitment()
327 == maybe_current_epoch_vid_share
328 .as_ref()
329 .unwrap()
330 .data
331 .payload_commitment(),
332 error!(
333 "We have both epochs vid shares but the leaf's vid commit doesn't \
334 match the old epoch vid share's commit. It should never happen."
335 )
336 );
337 },
338 Err(e) => {
339 bail!(warn!(
340 "This is an epoch transition block, we are in both epochs but we \
341 received only one VID share. Do not vote! Error: {e:?}"
342 ));
343 },
344 }
345 }
346 }
347
348 update_shared_state::<TYPES, V>(
350 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
351 self.sender.clone(),
352 self.receiver.clone(),
353 self.membership_coordinator.clone(),
354 self.public_key.clone(),
355 self.private_key.clone(),
356 self.upgrade_lock.clone(),
357 self.view_number,
358 Arc::clone(&self.instance_state),
359 &leaf,
360 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
361 parent_view_number,
362 self.epoch_height,
363 )
364 .await
365 .context(error!("Failed to update shared consensus state"))?;
366
367 let cur_epoch = option_epoch_from_block_number::<TYPES>(
368 leaf.with_epoch,
369 leaf.height(),
370 self.epoch_height,
371 );
372
373 let now = Instant::now();
374 let epoch_membership = self
378 .membership_coordinator
379 .membership_for_epoch(cur_epoch)
380 .await?;
381
382 let duration = now.elapsed();
383 tracing::info!("membership_for_epoch time: {duration:?}");
384
385 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
386 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
387 if cur_epoch.is_none() || !is_vote_leaf_extended {
388 broadcast_view_change(
392 &self.sender,
393 leaf.view_number() + 1,
394 cur_epoch,
395 self.first_epoch,
396 )
397 .await;
398 }
399
400 let leader = epoch_membership.leader(self.view_number).await;
401 if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
402 self.consensus
403 .write()
404 .await
405 .update_validator_participation(leader_key, cur_epoch, true);
406 }
407
408 submit_vote::<TYPES, I, V>(
409 self.sender.clone(),
410 epoch_membership,
411 self.public_key.clone(),
412 self.private_key.clone(),
413 self.upgrade_lock.clone(),
414 self.view_number,
415 self.storage.clone(),
416 Arc::clone(&self.storage_metrics),
417 leaf,
418 maybe_current_epoch_vid_share.unwrap_or(vid_share),
419 is_vote_leaf_extended,
420 is_vote_epoch_root,
421 self.epoch_height,
422 &self.state_private_key,
423 self.stake_table_capacity,
424 )
425 .await
426 }
427}
428
/// State of the quorum vote task.
///
/// The task reacts to incoming events, validates DA certificates and VID shares, and keeps
/// one dependency task per view that votes once all of that view's dependencies complete.
pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// This node's public key; also compared against the recipient key of received VID shares.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; cloned into every vote dependency handle.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Handle to the shared consensus state; validated DA certificates and VID shares are
    /// saved into it.
    pub consensus: OuterConsensus<TYPES>,

    /// Shared instance state; cloned into every vote dependency handle.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Proposals, DA certificates and VID shares are only accepted for views strictly greater
    /// than this one. Advanced on `ViewChange` events.
    pub latest_voted_view: TYPES::View,

    /// For each view with a running dependency task, the sender used to cancel that task.
    pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// Network handle.
    // NOTE(review): not used by the code visible in this file.
    pub network: Arc<I::Network>,

    /// Coordinator used to look up stake tables and memberships for an epoch.
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// Sender for the external event stream.
    // NOTE(review): not used by the code visible in this file.
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// Node id; recorded in tracing spans and log messages.
    pub id: u64,

    /// Consensus metrics; `last_voted_view` is updated when the latest voted view advances.
    pub consensus_metrics: Arc<ConsensusMetricsValue>,

    /// Storage handle; cloned into every vote dependency handle.
    pub storage: I::Storage,

    /// Storage metrics; cloned into every vote dependency handle.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Upgrade lock; passed to DA certificate validation and cloned into every handle.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Epoch height copied into every vote dependency handle.
    // NOTE(review): presumably the number of blocks per epoch -- confirm.
    pub epoch_height: u64,

    /// State signing key; cloned into every vote dependency handle.
    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// View and epoch pair stored when a `SetFirstEpoch` event is received.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Stake table capacity copied into every vote dependency handle.
    pub stake_table_capacity: usize,
}
487
488impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
489 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
491 fn create_event_dependency(
492 &self,
493 dependency_type: VoteDependency,
494 view_number: TYPES::View,
495 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
496 cancel_receiver: Receiver<()>,
497 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
498 let id = self.id;
499 EventDependency::new(
500 event_receiver,
501 cancel_receiver,
502 format!(
503 "VoteDependency::{:?} for view {:?}, my id {:?}",
504 dependency_type, view_number, self.id
505 ),
506 Box::new(move |event| {
507 let event = event.as_ref();
508 let event_view = match dependency_type {
509 VoteDependency::QuorumProposal => {
510 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
511 proposal.data.view_number()
512 } else {
513 return false;
514 }
515 },
516 VoteDependency::Dac => {
517 if let HotShotEvent::DaCertificateValidated(cert) = event {
518 cert.view_number
519 } else {
520 return false;
521 }
522 },
523 VoteDependency::Vid => {
524 if let HotShotEvent::VidShareValidated(disperse) = event {
525 disperse.data.view_number()
526 } else {
527 return false;
528 }
529 },
530 };
531 if event_view == view_number {
532 tracing::debug!(
533 "Vote dependency {dependency_type:?} completed for view {view_number}, my \
534 id is {id}"
535 );
536 return true;
537 }
538 false
539 }),
540 )
541 }
542
543 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
546 fn create_dependency_task_if_new(
547 &mut self,
548 view_number: TYPES::View,
549 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
550 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
551 event: Arc<HotShotEvent<TYPES>>,
552 ) {
553 tracing::debug!(
554 "Attempting to make dependency task for view {view_number} and event {event:?}"
555 );
556
557 if self.vote_dependencies.contains_key(&view_number) {
558 return;
559 }
560
561 let (cancel_sender, cancel_receiver) = broadcast(1);
562
563 let mut quorum_proposal_dependency = self.create_event_dependency(
564 VoteDependency::QuorumProposal,
565 view_number,
566 event_receiver.clone(),
567 cancel_receiver.clone(),
568 );
569 let dac_dependency = self.create_event_dependency(
570 VoteDependency::Dac,
571 view_number,
572 event_receiver.clone(),
573 cancel_receiver.clone(),
574 );
575 let vid_dependency = self.create_event_dependency(
576 VoteDependency::Vid,
577 view_number,
578 event_receiver.clone(),
579 cancel_receiver.clone(),
580 );
581 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
583 quorum_proposal_dependency.mark_as_completed(event);
584 }
585
586 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
587
588 let dependency_chain = AndDependency::from_deps(deps);
589
590 let dependency_task = DependencyTask::new(
591 dependency_chain,
592 VoteDependencyHandle::<TYPES, I, V> {
593 public_key: self.public_key.clone(),
594 private_key: self.private_key.clone(),
595 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
596 instance_state: Arc::clone(&self.instance_state),
597 membership_coordinator: self.membership.clone(),
598 storage: self.storage.clone(),
599 storage_metrics: Arc::clone(&self.storage_metrics),
600 view_number,
601 sender: event_sender.clone(),
602 receiver: event_receiver.clone().deactivate(),
603 upgrade_lock: self.upgrade_lock.clone(),
604 id: self.id,
605 epoch_height: self.epoch_height,
606 consensus_metrics: Arc::clone(&self.consensus_metrics),
607 state_private_key: self.state_private_key.clone(),
608 first_epoch: self.first_epoch,
609 stake_table_capacity: self.stake_table_capacity,
610 cancel_receiver,
611 },
612 );
613 self.vote_dependencies.insert(view_number, cancel_sender);
614
615 dependency_task.run();
616 }
617
618 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
620 async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
621 if *self.latest_voted_view < *new_view {
622 tracing::debug!(
623 "Updating next vote view from {} to {} in the quorum vote task",
624 *self.latest_voted_view,
625 *new_view
626 );
627
628 for view in *self.latest_voted_view..(*new_view) {
630 let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
631 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
632 tracing::error!("Aborting vote dependency task for view {view}");
633 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
634 }
635 }
636
637 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
639 self.consensus_metrics
640 .last_voted_view
641 .set(last_voted_view_usize);
642 } else {
643 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
644 }
645
646 self.latest_voted_view = new_view;
647
648 return true;
649 }
650 false
651 }
652
653 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
655 pub async fn handle(
656 &mut self,
657 event: Arc<HotShotEvent<TYPES>>,
658 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
659 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
660 ) -> Result<()> {
661 match event.as_ref() {
662 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
663 tracing::trace!(
664 "Received Proposal for view {}",
665 *proposal.data.view_number()
666 );
667
668 if let Err(e) =
670 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
671 {
672 tracing::debug!(
673 "Failed to handle QuorumProposalValidated event; error = {e:#}"
674 );
675 }
676
677 ensure!(
678 proposal.data.view_number() > self.latest_voted_view,
679 "We have already voted for this view"
680 );
681
682 self.create_dependency_task_if_new(
683 proposal.data.view_number(),
684 event_receiver,
685 &event_sender,
686 Arc::clone(&event),
687 );
688 },
689 HotShotEvent::DaCertificateRecv(cert) => {
690 let view = cert.view_number;
691
692 tracing::trace!("Received DAC for view {view}");
693 ensure!(
695 view > self.latest_voted_view,
696 "Received DAC for an older view."
697 );
698
699 let cert_epoch = cert.data.epoch;
700
701 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
702 let membership_da_stake_table = epoch_membership.da_stake_table().await;
703 let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
704
705 cert.is_valid_cert(
707 &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
708 membership_da_success_threshold,
709 &self.upgrade_lock,
710 )
711 .await
712 .context(|e| warn!("Invalid DAC: {e}"))?;
713
714 self.consensus
716 .write()
717 .await
718 .update_saved_da_certs(view, cert.clone());
719
720 broadcast_event(
721 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
722 &event_sender.clone(),
723 )
724 .await;
725 self.create_dependency_task_if_new(
726 view,
727 event_receiver,
728 &event_sender,
729 Arc::clone(&event),
730 );
731 },
732 HotShotEvent::VidShareRecv(sender, share) => {
733 let view = share.data.view_number();
734 tracing::trace!("Received VID share for view {view}");
736 ensure!(
737 view > self.latest_voted_view,
738 "Received VID share for an older view."
739 );
740
741 let payload_commitment = share.data.payload_commitment_ref();
743
744 ensure!(
746 sender.validate(&share.signature, payload_commitment.as_ref()),
747 "VID share signature is invalid, sender: {}, signature: {:?}, \
748 payload_commitment: {:?}",
749 sender,
750 share.signature,
751 payload_commitment
752 );
753
754 let vid_epoch = share.data.epoch();
755 let target_epoch = share.data.target_epoch();
756 let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
757 ensure!(
759 membership_reader
760 .da_committee_members(view)
761 .await
762 .contains(sender)
763 || *sender == membership_reader.leader(view).await?,
764 "VID share was not sent by a DA member or the view leader."
765 );
766
767 let total_weight = vid_total_weight::<TYPES>(
768 &self
769 .membership
770 .membership_for_epoch(target_epoch)
771 .await?
772 .stake_table()
773 .await,
774 target_epoch,
775 );
776
777 if let Err(()) = share.data.verify_share(total_weight) {
778 bail!("Failed to verify VID share");
779 }
780
781 self.consensus
782 .write()
783 .await
784 .update_vid_shares(view, share.clone());
785
786 ensure!(
787 *share.data.recipient_key() == self.public_key,
788 "Got a Valid VID share but it's not for our key"
789 );
790
791 broadcast_event(
792 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
793 &event_sender.clone(),
794 )
795 .await;
796 self.create_dependency_task_if_new(
797 view,
798 event_receiver,
799 &event_sender,
800 Arc::clone(&event),
801 );
802 },
803 HotShotEvent::Timeout(view, ..) => {
804 let view = TYPES::View::new(view.saturating_sub(1));
805 let current_tasks = self.vote_dependencies.split_off(&view);
807 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
808 if !cancel_sender.is_closed() {
809 tracing::error!("Aborting vote dependency task for view {view}");
810 let _ = cancel_sender.try_broadcast(());
811 }
812 }
813 self.vote_dependencies = current_tasks;
814 },
815 HotShotEvent::ViewChange(mut view, _) => {
816 view = TYPES::View::new(view.saturating_sub(1));
817 if !self.update_latest_voted_view(view).await {
818 tracing::debug!("view not updated");
819 }
820 let current_tasks = self.vote_dependencies.split_off(&view);
822 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
823 if !cancel_sender.is_closed() {
824 tracing::error!("Aborting vote dependency task for view {view}");
825 let _ = cancel_sender.try_broadcast(());
826 }
827 }
828 self.vote_dependencies = current_tasks;
829 },
830 HotShotEvent::SetFirstEpoch(view, epoch) => {
831 self.first_epoch = Some((*view, *epoch));
832 },
833 _ => {},
834 }
835 Ok(())
836 }
837}
838
839#[async_trait]
840impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
841 for QuorumVoteTaskState<TYPES, I, V>
842{
843 type Event = HotShotEvent<TYPES>;
844
845 async fn handle_event(
846 &mut self,
847 event: Arc<Self::Event>,
848 sender: &Sender<Arc<Self::Event>>,
849 receiver: &Receiver<Arc<Self::Event>>,
850 ) -> Result<()> {
851 self.handle(event, receiver.clone(), sender.clone()).await
852 }
853
854 fn cancel_subtasks(&mut self) {
855 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
856 if !cancel_sender.is_closed() {
857 tracing::error!("Aborting vote dependency task for view {view}");
858 let _ = cancel_sender.try_broadcast(());
859 }
860 }
861 }
862}