1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::{ConsensusMetricsValue, OuterConsensus},
19 data::{vid_disperse::vid_total_weight, Leaf2},
20 epoch_membership::EpochMembershipCoordinator,
21 event::Event,
22 message::UpgradeLock,
23 simple_vote::HasEpoch,
24 stake_table::StakeTableEntries,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 block_contents::BlockHeader,
28 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
29 signature_key::{SignatureKey, StateSignatureKey},
30 storage::Storage,
31 },
32 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
33 vote::{Certificate, HasViewNumber},
34 VersionedDaCommittee,
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45mod handlers;
47
48#[derive(Debug, PartialEq)]
50enum VoteDependency {
51 QuorumProposal,
53 Dac,
55 Vid,
57}
58
59pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
61 pub public_key: TYPES::SignatureKey,
63
64 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
66
67 pub consensus: OuterConsensus<TYPES>,
69
70 pub instance_state: Arc<TYPES::InstanceState>,
72
73 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76 pub storage: I::Storage,
78
79 pub storage_metrics: Arc<StorageMetricsValue>,
81
82 pub view_number: TYPES::View,
84
85 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
87
88 pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
90
91 pub upgrade_lock: UpgradeLock<TYPES, V>,
93
94 pub consensus_metrics: Arc<ConsensusMetricsValue>,
96
97 pub id: u64,
99
100 pub epoch_height: u64,
102
103 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
108
109 pub stake_table_capacity: usize,
111
112 pub cancel_receiver: Receiver<()>,
113}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> HandleDepOutput
116 for VoteDependencyHandle<TYPES, I, V>
117{
118 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120 #[allow(clippy::too_many_lines)]
121 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122 async fn handle_dep_result(self, res: Self::Output) {
123 let mut cancel_receiver = self.cancel_receiver.clone();
124 let result = tokio::select! { result = self.handle_vote_deps(&res) => {
125 result
126 }
127 _ = cancel_receiver.recv() => {
128 tracing::warn!("Vote dependency task cancelled");
129 return;
130 }
131 };
132 if result.is_err() {
133 log!(result);
134 self.print_vote_events(&res)
135 }
136 }
137}
138
139impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
140 VoteDependencyHandle<TYPES, I, V>
141{
142 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
143 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
144 tracing::warn!("Failed to vote, events: {:?}", events);
145 }
146
147 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
148 let mut payload_commitment = None;
149 let mut next_epoch_payload_commitment = None;
150 let mut leaf = None;
151 let mut vid_share = None;
152 let mut da_cert = None;
153 let mut parent_view_number = None;
154 for event in res.iter() {
155 match event.as_ref() {
156 #[allow(unused_assignments)]
157 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
158 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
159 let parent_commitment = parent_leaf.commit();
160 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
161
162 if let Some(ref comm) = payload_commitment {
163 ensure!(
164 proposal_payload_comm == *comm,
165 error!(
166 "Quorum proposal has inconsistent payload commitment with DAC or \
167 VID."
168 )
169 );
170 } else {
171 payload_commitment = Some(proposal_payload_comm);
172 }
173
174 ensure!(
175 proposed_leaf.parent_commitment() == parent_commitment,
176 warn!(
177 "Proposed leaf parent commitment does not match parent leaf payload \
178 commitment. Aborting vote."
179 )
180 );
181
182 let now = Instant::now();
183 self.storage
186 .append_proposal_wrapper(proposal)
187 .await
188 .map_err(|e| {
189 error!("failed to store proposal, not voting. error = {e:#}")
190 })?;
191 self.storage_metrics
192 .append_quorum_duration
193 .add_point(now.elapsed().as_secs_f64());
194
195 leaf = Some(proposed_leaf);
196 parent_view_number = Some(parent_leaf.view_number());
197 },
198 HotShotEvent::DaCertificateValidated(cert) => {
199 let cert_payload_comm = &cert.data().payload_commit;
200 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
201 if let Some(ref comm) = payload_commitment {
202 ensure!(
203 cert_payload_comm == comm,
204 error!(
205 "DAC has inconsistent payload commitment with quorum proposal or \
206 VID."
207 )
208 );
209 } else {
210 payload_commitment = Some(*cert_payload_comm);
211 }
212 if next_epoch_payload_commitment.is_some()
213 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
214 {
215 bail!(error!(
216 "DAC has inconsistent next epoch payload commitment with VID."
217 ));
218 } else {
219 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
220 }
221 da_cert = Some(cert.clone());
222 },
223 HotShotEvent::VidShareValidated(share) => {
224 let vid_payload_commitment = &share.data.payload_commitment();
225 vid_share = Some(share.clone());
226 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
227 if is_next_epoch_vid {
228 if let Some(ref comm) = next_epoch_payload_commitment {
229 ensure!(
230 vid_payload_commitment == comm,
231 error!(
232 "VID has inconsistent next epoch payload commitment with DAC."
233 )
234 );
235 } else {
236 next_epoch_payload_commitment = Some(*vid_payload_commitment);
237 }
238 } else if let Some(ref comm) = payload_commitment {
239 ensure!(
240 vid_payload_commitment == comm,
241 error!(
242 "VID has inconsistent payload commitment with quorum proposal or \
243 DAC."
244 )
245 );
246 } else {
247 payload_commitment = Some(*vid_payload_commitment);
248 }
249 },
250 _ => {},
251 }
252 }
253
254 let Some(vid_share) = vid_share else {
255 bail!(error!(
256 "We don't have the VID share for this view {}, but we should, because the vote \
257 dependencies have completed.",
258 self.view_number
259 ));
260 };
261
262 let Some(leaf) = leaf else {
263 bail!(error!(
264 "We don't have the leaf for this view {}, but we should, because the vote \
265 dependencies have completed.",
266 self.view_number
267 ));
268 };
269
270 let Some(da_cert) = da_cert else {
271 bail!(error!(
272 "We don't have the DA cert for this view {}, but we should, because the vote \
273 dependencies have completed.",
274 self.view_number
275 ));
276 };
277
278 let mut maybe_current_epoch_vid_share = None;
279 if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
281 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
282 {
283 let current_epoch = option_epoch_from_block_number::<TYPES>(
284 leaf.with_epoch,
285 leaf.block_header().block_number(),
286 self.epoch_height,
287 );
288 let next_epoch = current_epoch.map(|e| e + 1);
289
290 let Ok(current_epoch_membership) = self
291 .membership_coordinator
292 .stake_table_for_epoch(current_epoch)
293 .await
294 else {
295 bail!(warn!(
296 "Couldn't acquire current epoch membership. Do not vote!"
297 ));
298 };
299 let Ok(next_epoch_membership) = self
300 .membership_coordinator
301 .stake_table_for_epoch(next_epoch)
302 .await
303 else {
304 bail!(warn!(
305 "Couldn't acquire next epoch membership. Do not vote!"
306 ));
307 };
308
309 if current_epoch_membership.has_stake(&self.public_key).await
311 && next_epoch_membership.has_stake(&self.public_key).await
312 {
313 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
314 maybe_current_epoch_vid_share = Some(vid_share.clone());
315 next_epoch
316 } else {
317 current_epoch
318 };
319 match wait_for_second_vid_share(
320 other_target_epoch,
321 &vid_share,
322 &da_cert,
323 &self.consensus,
324 &self.receiver.activate_cloned(),
325 self.cancel_receiver.clone(),
326 self.id,
327 )
328 .await
329 {
330 Ok(other_vid_share) => {
331 if maybe_current_epoch_vid_share.is_none() {
332 maybe_current_epoch_vid_share = Some(other_vid_share);
333 }
334 ensure!(
335 leaf.block_header().payload_commitment()
336 == maybe_current_epoch_vid_share
337 .as_ref()
338 .unwrap()
339 .data
340 .payload_commitment(),
341 error!(
342 "We have both epochs vid shares but the leaf's vid commit doesn't \
343 match the old epoch vid share's commit. It should never happen."
344 )
345 );
346 },
347 Err(e) => {
348 bail!(warn!(
349 "This is an epoch transition block, we are in both epochs but we \
350 received only one VID share. Do not vote! Error: {e:?}"
351 ));
352 },
353 }
354 }
355 }
356
357 update_shared_state::<TYPES, V>(
359 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
360 self.sender.clone(),
361 self.receiver.clone(),
362 self.membership_coordinator.clone(),
363 self.public_key.clone(),
364 self.private_key.clone(),
365 self.upgrade_lock.clone(),
366 self.view_number,
367 Arc::clone(&self.instance_state),
368 &leaf,
369 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
370 parent_view_number,
371 self.epoch_height,
372 )
373 .await
374 .context(error!("Failed to update shared consensus state"))?;
375
376 let cur_epoch = option_epoch_from_block_number::<TYPES>(
377 leaf.with_epoch,
378 leaf.height(),
379 self.epoch_height,
380 );
381
382 let now = Instant::now();
383 let epoch_membership = self
387 .membership_coordinator
388 .membership_for_epoch(cur_epoch)
389 .await?;
390
391 let duration = now.elapsed();
392 tracing::info!("membership_for_epoch time: {duration:?}");
393
394 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
395 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
396 if cur_epoch.is_none() || !is_vote_leaf_extended {
397 broadcast_view_change(
401 &self.sender,
402 leaf.view_number() + 1,
403 cur_epoch,
404 self.first_epoch,
405 )
406 .await;
407 }
408
409 let leader = epoch_membership.leader(self.view_number).await;
410 if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
411 self.consensus
412 .write()
413 .await
414 .update_validator_participation(leader_key, cur_epoch, true);
415 }
416
417 submit_vote::<TYPES, I, V>(
418 self.sender.clone(),
419 epoch_membership,
420 self.public_key.clone(),
421 self.private_key.clone(),
422 self.upgrade_lock.clone(),
423 self.view_number,
424 self.storage.clone(),
425 Arc::clone(&self.storage_metrics),
426 leaf,
427 maybe_current_epoch_vid_share.unwrap_or(vid_share),
428 is_vote_leaf_extended,
429 is_vote_epoch_root,
430 self.epoch_height,
431 &self.state_private_key,
432 self.stake_table_capacity,
433 )
434 .await
435 }
436}
437
438pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
442 pub public_key: TYPES::SignatureKey,
444
445 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
447
448 pub consensus: OuterConsensus<TYPES>,
450
451 pub instance_state: Arc<TYPES::InstanceState>,
453
454 pub latest_voted_view: TYPES::View,
456
457 pub vote_dependencies: BTreeMap<TYPES::View, Sender<()>>,
459
460 pub network: Arc<I::Network>,
462
463 pub membership: EpochMembershipCoordinator<TYPES>,
465
466 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
468
469 pub id: u64,
471
472 pub consensus_metrics: Arc<ConsensusMetricsValue>,
474
475 pub storage: I::Storage,
477
478 pub storage_metrics: Arc<StorageMetricsValue>,
480
481 pub upgrade_lock: UpgradeLock<TYPES, V>,
483
484 pub epoch_height: u64,
486
487 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
489
490 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
492
493 pub stake_table_capacity: usize,
495
496 pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
498}
499
500impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskState<TYPES, I, V> {
501 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
503 fn create_event_dependency(
504 &self,
505 dependency_type: VoteDependency,
506 view_number: TYPES::View,
507 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
508 cancel_receiver: Receiver<()>,
509 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
510 let id = self.id;
511 EventDependency::new(
512 event_receiver,
513 cancel_receiver,
514 format!(
515 "VoteDependency::{:?} for view {:?}, my id {:?}",
516 dependency_type, view_number, self.id
517 ),
518 Box::new(move |event| {
519 let event = event.as_ref();
520 let event_view = match dependency_type {
521 VoteDependency::QuorumProposal => {
522 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
523 proposal.data.view_number()
524 } else {
525 return false;
526 }
527 },
528 VoteDependency::Dac => {
529 if let HotShotEvent::DaCertificateValidated(cert) = event {
530 cert.view_number
531 } else {
532 return false;
533 }
534 },
535 VoteDependency::Vid => {
536 if let HotShotEvent::VidShareValidated(disperse) = event {
537 disperse.data.view_number()
538 } else {
539 return false;
540 }
541 },
542 };
543 if event_view == view_number {
544 tracing::debug!(
545 "Vote dependency {dependency_type:?} completed for view {view_number}, my \
546 id is {id}"
547 );
548 return true;
549 }
550 false
551 }),
552 )
553 }
554
555 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
558 fn create_dependency_task_if_new(
559 &mut self,
560 view_number: TYPES::View,
561 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
562 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
563 event: Arc<HotShotEvent<TYPES>>,
564 ) {
565 tracing::debug!(
566 "Attempting to make dependency task for view {view_number} and event {event:?}"
567 );
568
569 if self.vote_dependencies.contains_key(&view_number) {
570 return;
571 }
572
573 let (cancel_sender, cancel_receiver) = broadcast(1);
574
575 let mut quorum_proposal_dependency = self.create_event_dependency(
576 VoteDependency::QuorumProposal,
577 view_number,
578 event_receiver.clone(),
579 cancel_receiver.clone(),
580 );
581 let dac_dependency = self.create_event_dependency(
582 VoteDependency::Dac,
583 view_number,
584 event_receiver.clone(),
585 cancel_receiver.clone(),
586 );
587 let vid_dependency = self.create_event_dependency(
588 VoteDependency::Vid,
589 view_number,
590 event_receiver.clone(),
591 cancel_receiver.clone(),
592 );
593 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
595 quorum_proposal_dependency.mark_as_completed(event);
596 }
597
598 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
599
600 let dependency_chain = AndDependency::from_deps(deps);
601
602 let dependency_task = DependencyTask::new(
603 dependency_chain,
604 VoteDependencyHandle::<TYPES, I, V> {
605 public_key: self.public_key.clone(),
606 private_key: self.private_key.clone(),
607 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
608 instance_state: Arc::clone(&self.instance_state),
609 membership_coordinator: self.membership.clone(),
610 storage: self.storage.clone(),
611 storage_metrics: Arc::clone(&self.storage_metrics),
612 view_number,
613 sender: event_sender.clone(),
614 receiver: event_receiver.clone().deactivate(),
615 upgrade_lock: self.upgrade_lock.clone(),
616 id: self.id,
617 epoch_height: self.epoch_height,
618 consensus_metrics: Arc::clone(&self.consensus_metrics),
619 state_private_key: self.state_private_key.clone(),
620 first_epoch: self.first_epoch,
621 stake_table_capacity: self.stake_table_capacity,
622 cancel_receiver,
623 },
624 );
625 self.vote_dependencies.insert(view_number, cancel_sender);
626
627 dependency_task.run();
628 }
629
630 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
632 async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
633 if *self.latest_voted_view < *new_view {
634 tracing::debug!(
635 "Updating next vote view from {} to {} in the quorum vote task",
636 *self.latest_voted_view,
637 *new_view
638 );
639
640 for view in *self.latest_voted_view..(*new_view) {
642 let maybe_cancel_sender = self.vote_dependencies.remove(&TYPES::View::new(view));
643 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
644 tracing::warn!("Aborting vote dependency task for view {view}");
645 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
646 }
647 }
648
649 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
651 self.consensus_metrics
652 .last_voted_view
653 .set(last_voted_view_usize);
654 } else {
655 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
656 }
657
658 self.latest_voted_view = new_view;
659
660 return true;
661 }
662 false
663 }
664
665 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
667 pub async fn handle(
668 &mut self,
669 event: Arc<HotShotEvent<TYPES>>,
670 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
671 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
672 ) -> Result<()> {
673 match event.as_ref() {
674 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
675 tracing::trace!(
676 "Received Proposal for view {}",
677 *proposal.data.view_number()
678 );
679
680 if let Err(e) =
682 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
683 {
684 tracing::debug!(
685 "Failed to handle QuorumProposalValidated event; error = {e:#}"
686 );
687 }
688
689 ensure!(
690 proposal.data.view_number() > self.latest_voted_view,
691 "We have already voted for this view"
692 );
693
694 self.create_dependency_task_if_new(
695 proposal.data.view_number(),
696 event_receiver,
697 &event_sender,
698 Arc::clone(&event),
699 );
700 },
701 HotShotEvent::DaCertificateRecv(cert) => {
702 let view = cert.view_number;
703
704 tracing::trace!("Received DAC for view {view}");
705 ensure!(
707 view > self.latest_voted_view,
708 "Received DAC for an older view."
709 );
710
711 let cert_epoch = cert.data.epoch;
712
713 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch).await?;
714 let membership_da_stake_table = epoch_membership.da_stake_table().await;
715 let membership_da_success_threshold = epoch_membership.da_success_threshold().await;
716
717 cert.is_valid_cert(
719 &StakeTableEntries::<TYPES>::from(membership_da_stake_table).0,
720 membership_da_success_threshold,
721 &self.upgrade_lock,
722 )
723 .await
724 .context(|e| warn!("Invalid DAC: {e}"))?;
725
726 self.consensus
728 .write()
729 .await
730 .update_saved_da_certs(view, cert.clone());
731
732 broadcast_event(
733 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
734 &event_sender.clone(),
735 )
736 .await;
737 self.create_dependency_task_if_new(
738 view,
739 event_receiver,
740 &event_sender,
741 Arc::clone(&event),
742 );
743 },
744 HotShotEvent::VidShareRecv(sender, share) => {
745 let view = share.data.view_number();
746 tracing::trace!("Received VID share for view {view}");
748 ensure!(
749 view > self.latest_voted_view,
750 "Received VID share for an older view."
751 );
752
753 let payload_commitment = share.data.payload_commitment_ref();
755
756 ensure!(
758 sender.validate(&share.signature, payload_commitment.as_ref()),
759 warn!(
760 "VID share signature is invalid, sender: {}, signature: {:?}, \
761 payload_commitment: {:?}",
762 sender, share.signature, payload_commitment
763 )
764 );
765
766 let vid_epoch = share.data.epoch();
767 let target_epoch = share.data.target_epoch();
768 let membership_reader = self.membership.membership_for_epoch(vid_epoch).await?;
769 ensure!(
771 membership_reader
772 .da_committee_members(view)
773 .await
774 .contains(sender)
775 || *sender == membership_reader.leader(view).await?,
776 "VID share was not sent by a DA member or the view leader."
777 );
778
779 let total_weight = vid_total_weight::<TYPES>(
780 &self
781 .membership
782 .membership_for_epoch(target_epoch)
783 .await?
784 .stake_table()
785 .await,
786 target_epoch,
787 );
788
789 if let Err(()) = share.data.verify_share(total_weight) {
790 bail!("Failed to verify VID share");
791 }
792
793 self.consensus
794 .write()
795 .await
796 .update_vid_shares(view, share.clone());
797
798 ensure!(
799 *share.data.recipient_key() == self.public_key,
800 "Got a Valid VID share but it's not for our key"
801 );
802
803 broadcast_event(
804 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
805 &event_sender.clone(),
806 )
807 .await;
808 self.create_dependency_task_if_new(
809 view,
810 event_receiver,
811 &event_sender,
812 Arc::clone(&event),
813 );
814 },
815 HotShotEvent::Timeout(view, ..) => {
816 let view = TYPES::View::new(view.saturating_sub(1));
817 let current_tasks = self.vote_dependencies.split_off(&view);
819 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
820 if !cancel_sender.is_closed() {
821 tracing::error!("Aborting vote dependency task for view {view}");
822 let _ = cancel_sender.try_broadcast(());
823 }
824 }
825 self.vote_dependencies = current_tasks;
826 },
827 HotShotEvent::ViewChange(mut view, _) => {
828 view = TYPES::View::new(view.saturating_sub(1));
829 if !self.update_latest_voted_view(view).await {
830 tracing::debug!("view not updated");
831 }
832 let current_tasks = self.vote_dependencies.split_off(&view);
834 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
835 if !cancel_sender.is_closed() {
836 tracing::error!("Aborting vote dependency task for view {view}");
837 let _ = cancel_sender.try_broadcast(());
838 }
839 }
840 self.vote_dependencies = current_tasks;
841 },
842 HotShotEvent::SetFirstEpoch(view, epoch) => {
843 self.first_epoch = Some((*view, *epoch));
844 },
845 _ => {},
846 }
847 Ok(())
848 }
849}
850
851#[async_trait]
852impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
853 for QuorumVoteTaskState<TYPES, I, V>
854{
855 type Event = HotShotEvent<TYPES>;
856
857 async fn handle_event(
858 &mut self,
859 event: Arc<Self::Event>,
860 sender: &Sender<Arc<Self::Event>>,
861 receiver: &Receiver<Arc<Self::Event>>,
862 ) -> Result<()> {
863 self.handle(event, receiver.clone(), sender.clone()).await
864 }
865
866 fn cancel_subtasks(&mut self) {
867 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
868 if !cancel_sender.is_closed() {
869 tracing::error!("Aborting vote dependency task for view {view}");
870 let _ = cancel_sender.try_broadcast(());
871 }
872 }
873 }
874}