1use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
15 message::UpgradeLock,
16 simple_certificate::{
17 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
18 },
19 simple_vote::{
20 HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
21 ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
22 },
23 stake_table::StakeTableEntries,
24 traits::{
25 node_implementation::{ConsensusTime, NodeType, Versions},
26 signature_key::SignatureKey,
27 },
28 utils::EpochTransitionIndicator,
29 vote::{Certificate, HasViewNumber, Vote},
30};
31use hotshot_utils::anytrace::*;
32use tokio::{spawn, task::JoinHandle, time::sleep};
33use tracing::instrument;
34
35use crate::{
36 events::{HotShotEvent, HotShotTaskCompleted},
37 helpers::{broadcast_event, broadcast_view_change},
38 vote_collection::{
39 create_vote_accumulator, AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState,
40 },
41};
42
/// Phase of the view sync protocol, recorded as the kind of the last certificate a replica
/// has seen when one of its timeouts fires.
///
/// Variants are declared in protocol order, so the derived ordering is
/// `None < PreCommit < Commit < Finalize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// No certificate seen yet; used for the timeout armed by a `ViewSyncTrigger`.
    None,
    /// A pre-commit certificate was the last one handled.
    PreCommit,
    /// A commit certificate was the last one handled.
    Commit,
    /// A finalize certificate was the last one handled.
    Finalize,
}
55
/// Two-level ordered map: epoch (or `None`) -> view -> `VAL`.
type TaskMap<TYPES, VAL> =
    BTreeMap<Option<<TYPES as NodeType>::Epoch>, BTreeMap<<TYPES as NodeType>::View, VAL>>;

/// `TaskMap` whose values are vote collection tasks keyed by relay index (`u64`).
type RelayMap<TYPES, VOTE, CERT, V> =
    TaskMap<TYPES, BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>>;

/// `TaskMap` whose values are the per-view replica task states.
type ReplicaTaskMap<TYPES, V> = TaskMap<TYPES, ViewSyncReplicaTaskState<TYPES, V>>;
64
/// State of the main view sync task.
///
/// It tracks the node's current view and epoch, counts timeouts, and owns the per-view
/// replica tasks plus the per-phase relay vote collectors.
pub struct ViewSyncTaskState<TYPES: NodeType, V: Versions> {
    /// View this task currently considers the node to be in. Advanced on `ViewChange`
    /// events and after the first tracked timeout.
    pub cur_view: TYPES::View,

    /// Reset to `cur_view` whenever `cur_view` advances on a `ViewChange` event.
    pub next_view: TYPES::View,

    /// Highest epoch seen in a `ViewChange` event; it is only ever moved forward.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Used to look up the membership (and thereby the leader) of an epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; compared against the relay leader of incoming votes and
    /// cloned into replica tasks and vote accumulators.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; cloned into replica tasks, which sign votes with it.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Node id, recorded in tracing spans and passed to vote accumulators.
    pub id: u64,

    /// Number of `Timeout` events handled since `cur_view` last advanced on a `ViewChange`.
    /// View sync is triggered once this reaches 2.
    pub num_timeouts_tracked: u64,

    /// Replica tasks, keyed by epoch and then view.
    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES, V>>,

    /// Collectors for pre-commit votes, keyed by epoch, view and relay.
    pub pre_commit_relay_map: RwLock<
        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>, V>,
    >,

    /// Collectors for commit votes, keyed by epoch, view and relay.
    pub commit_relay_map:
        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>, V>>,

    /// Collectors for finalize votes, keyed by epoch, view and relay.
    pub finalize_relay_map: RwLock<
        RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>, V>,
    >,

    /// Timeout copied into every replica task created by this task.
    pub view_sync_timeout: Duration,

    /// NOTE(review): not read or written in this section of the file; presumably the last
    /// view for which tasks were garbage collected — confirm against the rest of the crate.
    pub last_garbage_collected_view: TYPES::View,

    /// Upgrade lock handed to vote accumulators and replica tasks.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Set from `SetFirstEpoch` events and forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,

    /// Highest `(epoch, view)` pair known to be finalized; updated when a replica task
    /// completes and when leaves are decided. It bounds what garbage collection removes.
    pub highest_finalized_epoch_view: (Option<TYPES::Epoch>, TYPES::View),

    /// Passed to `leaf.epoch(..)` when computing the epoch of decided leaves.
    pub epoch_height: u64,
}
125
126#[async_trait]
127impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncTaskState<TYPES, V> {
128 type Event = HotShotEvent<TYPES>;
129
130 async fn handle_event(
131 &mut self,
132 event: Arc<Self::Event>,
133 sender: &Sender<Arc<Self::Event>>,
134 _receiver: &Receiver<Arc<Self::Event>>,
135 ) -> Result<()> {
136 self.handle(event, sender.clone()).await
137 }
138
139 fn cancel_subtasks(&mut self) {}
140}
141
/// State of a single replica task, i.e. this node's participation in view sync for one
/// target view.
pub struct ViewSyncReplicaTaskState<TYPES: NodeType, V: Versions> {
    /// How long a spawned timeout task sleeps before broadcasting `ViewSyncTimeout`.
    pub view_sync_timeout: Duration,

    /// Initialized to the target view on creation; in this section it is only recorded in
    /// the tracing span.
    pub cur_view: TYPES::View,

    /// View this replica is trying to sync to. Certificates for lower views are ignored and
    /// certificates for higher views complete the task.
    pub next_view: TYPES::View,

    /// Current relay index; incremented on a matching timeout and raised when a valid
    /// certificate carries a higher relay.
    pub relay: u64,

    /// Initialized to `false`; not otherwise touched in this section of the file.
    pub finalized: bool,

    /// Initialized to `false`; not otherwise touched in this section of the file.
    pub sent_view_change_event: bool,

    /// Handle of the pending timeout task, if one has been spawned.
    pub timeout_task: Option<JoinHandle<()>>,

    /// Node id, recorded in the tracing span.
    pub id: u64,

    /// Used to look up the membership against which certificates are validated.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Public key passed to vote signing.
    pub public_key: TYPES::SignatureKey,

    /// Private key passed to vote signing.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Upgrade lock passed to certificate validation and vote signing.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Epoch written into the pre-commit votes this replica creates.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
186
187#[async_trait]
188impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncReplicaTaskState<TYPES, V> {
189 type Event = HotShotEvent<TYPES>;
190
191 async fn handle_event(
192 &mut self,
193 event: Arc<Self::Event>,
194 sender: &Sender<Arc<Self::Event>>,
195 _receiver: &Receiver<Arc<Self::Event>>,
196 ) -> Result<()> {
197 self.handle(event, sender.clone()).await;
198
199 Ok(())
200 }
201
202 fn cancel_subtasks(&mut self) {}
203}
204
205impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
206 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
207 #[allow(clippy::type_complexity)]
208 pub async fn send_to_or_create_replica(
210 &mut self,
211 event: Arc<HotShotEvent<TYPES>>,
212 view: TYPES::View,
213 epoch: Option<TYPES::Epoch>,
214 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
215 ) {
216 let mut task_map = self.replica_task_map.write().await;
217
218 if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
219 tracing::debug!("Forwarding message");
221 let result = replica_task
222 .handle(Arc::clone(&event), sender.clone())
223 .await;
224
225 if result == Some(HotShotTaskCompleted) {
226 if epoch >= self.highest_finalized_epoch_view.0
228 && view > self.highest_finalized_epoch_view.1
229 {
230 self.highest_finalized_epoch_view = (epoch, view);
231 } else if view > self.highest_finalized_epoch_view.1 {
232 tracing::error!(
233 "We finalized a higher view but the epoch is lower. This should never \
234 happen. Current highest finalized epoch view: {:?}, new highest \
235 finalized epoch view: {:?}",
236 self.highest_finalized_epoch_view,
237 (epoch, view)
238 );
239 }
240 task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
241 task_map.retain(|_, x| !x.is_empty());
242 drop(task_map);
243
244 self.garbage_collect_tasks().await;
246 return;
247 }
248
249 return;
250 }
251
252 let mut replica_state: ViewSyncReplicaTaskState<TYPES, V> = ViewSyncReplicaTaskState {
254 cur_view: view,
255 next_view: view,
256 relay: 0,
257 finalized: false,
258 sent_view_change_event: false,
259 timeout_task: None,
260 membership_coordinator: self.membership_coordinator.clone(),
261 public_key: self.public_key.clone(),
262 private_key: self.private_key.clone(),
263 view_sync_timeout: self.view_sync_timeout,
264 id: self.id,
265 upgrade_lock: self.upgrade_lock.clone(),
266 cur_epoch: self.cur_epoch,
267 first_epoch: self.first_epoch,
268 };
269
270 let result = replica_state
271 .handle(Arc::clone(&event), sender.clone())
272 .await;
273
274 if result == Some(HotShotTaskCompleted) {
275 return;
277 }
278
279 task_map
280 .entry(epoch)
281 .or_default()
282 .insert(view, replica_state);
283 }
284
285 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
286 #[allow(clippy::type_complexity)]
287 pub async fn handle(
289 &mut self,
290 event: Arc<HotShotEvent<TYPES>>,
291 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
292 ) -> Result<()> {
293 match event.as_ref() {
294 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
295 tracing::debug!("Received view sync cert for phase {certificate:?}");
296 let view = certificate.view_number();
297 self.send_to_or_create_replica(
298 Arc::clone(&event),
299 view,
300 certificate.epoch(),
301 &event_stream,
302 )
303 .await;
304 },
305 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
306 tracing::debug!("Received view sync cert for phase {certificate:?}");
307 let view = certificate.view_number();
308 self.send_to_or_create_replica(
309 Arc::clone(&event),
310 view,
311 certificate.epoch(),
312 &event_stream,
313 )
314 .await;
315 },
316 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
317 tracing::debug!("Received view sync cert for phase {certificate:?}");
318 let view = certificate.view_number();
319 self.send_to_or_create_replica(
320 Arc::clone(&event),
321 view,
322 certificate.epoch(),
323 &event_stream,
324 )
325 .await;
326 },
327 HotShotEvent::ViewSyncTimeout(view, ..) => {
328 tracing::debug!("view sync timeout in main task {view:?}");
329 let view = *view;
330 self.send_to_or_create_replica(
331 Arc::clone(&event),
332 view,
333 self.cur_epoch,
334 &event_stream,
335 )
336 .await;
337 },
338
339 HotShotEvent::ViewSyncPreCommitVoteRecv(ref vote) => {
340 let mut map = self.pre_commit_relay_map.write().await;
341 let vote_view = vote.view_number();
342 let relay = vote.date().relay;
343 let relay_map = map
344 .entry(vote.date().epoch)
345 .or_insert(BTreeMap::new())
346 .entry(vote_view)
347 .or_insert(BTreeMap::new());
348 if let Some(relay_task) = relay_map.get_mut(&relay) {
349 tracing::debug!("Forwarding message");
350
351 if relay_task
353 .handle_vote_event(Arc::clone(&event), &event_stream)
354 .await?
355 .is_some()
356 {
357 map.get_mut(&vote.date().epoch)
358 .and_then(|x| x.remove(&vote_view));
359 map.retain(|_, x| !x.is_empty());
360 }
361
362 return Ok(());
363 }
364
365 let epoch_mem = self
366 .membership_coordinator
367 .membership_for_epoch(vote.date().epoch)
368 .await?;
369 ensure!(
371 epoch_mem.leader(vote_view + relay).await? == self.public_key,
372 "View sync vote sent to wrong leader"
373 );
374
375 let info = AccumulatorInfo {
376 public_key: self.public_key.clone(),
377 membership: epoch_mem,
378 view: vote_view,
379 id: self.id,
380 };
381 let vote_collector = create_vote_accumulator(
382 &info,
383 event,
384 &event_stream,
385 self.upgrade_lock.clone(),
386 EpochTransitionIndicator::NotInTransition,
387 )
388 .await?;
389
390 relay_map.insert(relay, vote_collector);
391 },
392
393 HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
394 let mut map = self.commit_relay_map.write().await;
395 let vote_view = vote.view_number();
396 let relay = vote.date().relay;
397 let relay_map = map
398 .entry(vote.date().epoch)
399 .or_insert(BTreeMap::new())
400 .entry(vote_view)
401 .or_insert(BTreeMap::new());
402 if let Some(relay_task) = relay_map.get_mut(&relay) {
403 tracing::debug!("Forwarding message");
404
405 if relay_task
407 .handle_vote_event(Arc::clone(&event), &event_stream)
408 .await?
409 .is_some()
410 {
411 map.get_mut(&vote.date().epoch)
412 .and_then(|x| x.remove(&vote_view));
413 map.retain(|_, x| !x.is_empty());
414 }
415
416 return Ok(());
417 }
418
419 let epoch_mem = self
421 .membership_coordinator
422 .membership_for_epoch(vote.date().epoch)
423 .await?;
424 ensure!(
425 epoch_mem.leader(vote_view + relay).await? == self.public_key,
426 debug!("View sync vote sent to wrong leader")
427 );
428
429 let info = AccumulatorInfo {
430 public_key: self.public_key.clone(),
431 membership: epoch_mem,
432 view: vote_view,
433 id: self.id,
434 };
435
436 let vote_collector = create_vote_accumulator(
437 &info,
438 event,
439 &event_stream,
440 self.upgrade_lock.clone(),
441 EpochTransitionIndicator::NotInTransition,
442 )
443 .await?;
444 relay_map.insert(relay, vote_collector);
445 },
446
447 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
448 let mut map = self.finalize_relay_map.write().await;
449 let vote_view = vote.view_number();
450 let relay = vote.date().relay;
451 let relay_map = map
452 .entry(vote.date().epoch)
453 .or_insert(BTreeMap::new())
454 .entry(vote_view)
455 .or_insert(BTreeMap::new());
456 if let Some(relay_task) = relay_map.get_mut(&relay) {
457 tracing::debug!("Forwarding message");
458
459 if relay_task
461 .handle_vote_event(Arc::clone(&event), &event_stream)
462 .await?
463 .is_some()
464 {
465 map.get_mut(&vote.date().epoch)
466 .and_then(|x| x.remove(&vote_view));
467 map.retain(|_, x| !x.is_empty());
468 }
469
470 return Ok(());
471 }
472
473 let epoch_mem = self
474 .membership_coordinator
475 .membership_for_epoch(vote.date().epoch)
476 .await?;
477 ensure!(
479 epoch_mem.leader(vote_view + relay).await? == self.public_key,
480 debug!("View sync vote sent to wrong leader")
481 );
482
483 let info = AccumulatorInfo {
484 public_key: self.public_key.clone(),
485 membership: epoch_mem,
486 view: vote_view,
487 id: self.id,
488 };
489 let vote_collector = create_vote_accumulator(
490 &info,
491 event,
492 &event_stream,
493 self.upgrade_lock.clone(),
494 EpochTransitionIndicator::NotInTransition,
495 )
496 .await;
497 if let Ok(vote_task) = vote_collector {
498 relay_map.insert(relay, vote_task);
499 }
500 },
501
502 &HotShotEvent::ViewChange(new_view, epoch) => {
503 if epoch > self.cur_epoch {
504 self.cur_epoch = epoch;
505 }
506 let new_view = TYPES::View::new(*new_view);
507 if self.cur_view < new_view {
508 tracing::debug!(
509 "Change from view {} to view {} in view sync task",
510 *self.cur_view,
511 *new_view
512 );
513
514 self.cur_view = new_view;
515 self.next_view = self.cur_view;
516 self.num_timeouts_tracked = 0;
517 }
518
519 self.garbage_collect_tasks().await;
520 },
521 HotShotEvent::LeavesDecided(leaves) => {
522 let finalized_epoch = self.highest_finalized_epoch_view.0.max(
523 leaves
524 .iter()
525 .map(|leaf| leaf.epoch(self.epoch_height))
526 .max()
527 .unwrap_or(None),
528 );
529 let finalized_view = self.highest_finalized_epoch_view.1.max(
530 leaves
531 .iter()
532 .map(|leaf| leaf.view_number())
533 .max()
534 .unwrap_or(TYPES::View::new(0)),
535 );
536
537 self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
538
539 self.garbage_collect_tasks().await;
540 },
541 &HotShotEvent::Timeout(view_number, ..) => {
542 ensure!(
544 view_number >= self.cur_view,
545 debug!("Discarding old timeout vote.")
546 );
547
548 self.num_timeouts_tracked += 1;
549 let leader = self
550 .membership_coordinator
551 .membership_for_epoch(self.cur_epoch)
552 .await?
553 .leader(view_number)
554 .await?;
555 tracing::warn!(
556 %leader,
557 leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
558 view_number = *view_number,
559 num_timeouts_tracked = self.num_timeouts_tracked,
560 "view timed out",
561 );
562
563 if self.num_timeouts_tracked >= 3 {
564 tracing::error!("Too many consecutive timeouts! This shouldn't happen");
565 }
566
567 if self.num_timeouts_tracked >= 2 {
568 tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
569
570 self.send_to_or_create_replica(
571 Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
572 view_number + 1,
573 self.cur_epoch,
574 &event_stream,
575 )
576 .await;
577 } else {
578 self.cur_view = view_number + 1;
580 broadcast_view_change(
581 &event_stream,
582 self.cur_view,
583 self.cur_epoch,
584 self.first_epoch,
585 )
586 .await;
587 }
588 },
589 HotShotEvent::SetFirstEpoch(view, epoch) => {
590 self.first_epoch = Some((*view, *epoch));
591 },
592
593 _ => {},
594 }
595 Ok(())
596 }
597
598 async fn garbage_collect_tasks(&self) {
602 let previous_epoch = self
603 .cur_epoch
604 .map(|e| e.saturating_sub(1))
605 .map(TYPES::Epoch::new);
606 let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
607 Self::garbage_collect_tasks_helper(
608 &self.replica_task_map,
609 &gc_epoch,
610 &self.highest_finalized_epoch_view.1,
611 )
612 .await;
613 Self::garbage_collect_tasks_helper(
614 &self.pre_commit_relay_map,
615 &gc_epoch,
616 &self.highest_finalized_epoch_view.1,
617 )
618 .await;
619 Self::garbage_collect_tasks_helper(
620 &self.commit_relay_map,
621 &gc_epoch,
622 &self.highest_finalized_epoch_view.1,
623 )
624 .await;
625 Self::garbage_collect_tasks_helper(
626 &self.finalize_relay_map,
627 &gc_epoch,
628 &self.highest_finalized_epoch_view.1,
629 )
630 .await;
631 }
632
633 async fn garbage_collect_tasks_helper<VAL>(
634 map: &RwLock<TaskMap<TYPES, VAL>>,
635 gc_epoch: &Option<TYPES::Epoch>,
636 gc_view: &TYPES::View,
637 ) {
638 let mut task_map = map.write().await;
639 task_map.retain(|e, _| e >= gc_epoch);
640 if let Some(view_map) = task_map.get_mut(gc_epoch) {
641 view_map.retain(|v, _| v > gc_view)
642 };
643 task_map.retain(|_, view_map| !view_map.is_empty());
644 }
645}
646
647impl<TYPES: NodeType, V: Versions> ViewSyncReplicaTaskState<TYPES, V> {
648 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
649 pub async fn handle(
651 &mut self,
652 event: Arc<HotShotEvent<TYPES>>,
653 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
654 ) -> Option<HotShotTaskCompleted> {
655 match event.as_ref() {
656 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
657 let last_seen_certificate = ViewSyncPhase::PreCommit;
658
659 if certificate.view_number() < self.next_view {
661 tracing::warn!("We're already in a higher round");
662
663 return None;
664 }
665
666 let membership = self.membership_for_epoch(certificate.epoch()).await?;
667 let membership_stake_table = membership.stake_table().await;
668 let membership_failure_threshold = membership.failure_threshold().await;
669
670 if let Err(e) = certificate
672 .is_valid_cert(
673 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
674 membership_failure_threshold,
675 &self.upgrade_lock,
676 )
677 .await
678 {
679 tracing::error!(
680 "Not valid view sync cert! data: {:?}, error: {}",
681 certificate.data(),
682 e
683 );
684
685 return None;
686 }
687
688 if certificate.view_number() > self.next_view {
691 return Some(HotShotTaskCompleted);
692 }
693
694 if certificate.data().relay > self.relay {
695 self.relay = certificate.data().relay;
696 }
697
698 let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
699 ViewSyncCommitData2 {
700 relay: certificate.data().relay,
701 round: self.next_view,
702 epoch: certificate.data().epoch,
703 },
704 self.next_view,
705 &self.public_key,
706 &self.private_key,
707 &self.upgrade_lock,
708 )
709 .await
710 else {
711 tracing::error!("Failed to sign ViewSyncCommitData!");
712 return None;
713 };
714
715 broadcast_event(
716 Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
717 &event_stream,
718 )
719 .await;
720
721 if let Some(timeout_task) = self.timeout_task.take() {
722 timeout_task.abort();
723 }
724
725 self.timeout_task = Some(spawn({
726 let stream = event_stream.clone();
727 let phase = last_seen_certificate;
728 let relay = self.relay;
729 let next_view = self.next_view;
730 let timeout = self.view_sync_timeout;
731 async move {
732 sleep(timeout).await;
733 tracing::warn!(
734 "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
735 {relay}"
736 );
737
738 broadcast_event(
739 Arc::new(HotShotEvent::ViewSyncTimeout(
740 TYPES::View::new(*next_view),
741 relay,
742 phase,
743 )),
744 &stream,
745 )
746 .await;
747 }
748 }));
749 },
750
751 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
752 let last_seen_certificate = ViewSyncPhase::Commit;
753
754 if certificate.view_number() < self.next_view {
756 tracing::warn!("We're already in a higher round");
757
758 return None;
759 }
760
761 let membership = self.membership_for_epoch(certificate.epoch()).await?;
762 let membership_stake_table = membership.stake_table().await;
763 let membership_success_threshold = membership.success_threshold().await;
764
765 if let Err(e) = certificate
767 .is_valid_cert(
768 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
769 membership_success_threshold,
770 &self.upgrade_lock,
771 )
772 .await
773 {
774 tracing::error!(
775 "Not valid view sync cert! data: {:?}, error: {}",
776 certificate.data(),
777 e
778 );
779
780 return None;
781 }
782
783 if certificate.view_number() > self.next_view {
786 return Some(HotShotTaskCompleted);
787 }
788
789 if certificate.data().relay > self.relay {
790 self.relay = certificate.data().relay;
791 }
792
793 let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
794 ViewSyncFinalizeData2 {
795 relay: certificate.data().relay,
796 round: self.next_view,
797 epoch: certificate.data().epoch,
798 },
799 self.next_view,
800 &self.public_key,
801 &self.private_key,
802 &self.upgrade_lock,
803 )
804 .await
805 else {
806 tracing::error!("Failed to sign view sync finalized vote!");
807 return None;
808 };
809
810 broadcast_event(
811 Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
812 &event_stream,
813 )
814 .await;
815
816 tracing::info!(
817 "View sync protocol has received view sync evidence to update the view to {}",
818 *self.next_view
819 );
820
821 broadcast_view_change(
822 &event_stream,
823 self.next_view,
824 certificate.epoch(),
825 self.first_epoch,
826 )
827 .await;
828
829 if let Some(timeout_task) = self.timeout_task.take() {
830 timeout_task.abort();
831 }
832 self.timeout_task = Some(spawn({
833 let stream = event_stream.clone();
834 let phase = last_seen_certificate;
835 let relay = self.relay;
836 let next_view = self.next_view;
837 let timeout = self.view_sync_timeout;
838 async move {
839 sleep(timeout).await;
840 tracing::warn!(
841 "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
842 {relay}"
843 );
844 broadcast_event(
845 Arc::new(HotShotEvent::ViewSyncTimeout(
846 TYPES::View::new(*next_view),
847 relay,
848 phase,
849 )),
850 &stream,
851 )
852 .await;
853 }
854 }));
855 },
856
857 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
858 if certificate.view_number() < self.next_view {
860 tracing::warn!("We're already in a higher round");
861
862 return None;
863 }
864
865 let membership = self.membership_for_epoch(certificate.epoch()).await?;
866 let membership_stake_table = membership.stake_table().await;
867 let membership_success_threshold = membership.success_threshold().await;
868
869 if let Err(e) = certificate
871 .is_valid_cert(
872 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
873 membership_success_threshold,
874 &self.upgrade_lock,
875 )
876 .await
877 {
878 tracing::error!(
879 "Not valid view sync cert! data: {:?}, error: {}",
880 certificate.data(),
881 e
882 );
883
884 return None;
885 }
886
887 if certificate.view_number() > self.next_view {
890 return Some(HotShotTaskCompleted);
891 }
892
893 if certificate.data().relay > self.relay {
894 self.relay = certificate.data().relay;
895 }
896
897 if let Some(timeout_task) = self.timeout_task.take() {
898 timeout_task.abort();
899 }
900
901 broadcast_view_change(
902 &event_stream,
903 self.next_view,
904 certificate.epoch(),
905 self.first_epoch,
906 )
907 .await;
908 return Some(HotShotTaskCompleted);
909 },
910
911 HotShotEvent::ViewSyncTrigger(view_number) => {
912 let view_number = *view_number;
913 if self.next_view != TYPES::View::new(*view_number) {
914 tracing::error!("Unexpected view number to trigger view sync");
915 return None;
916 }
917
918 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
919 ViewSyncPreCommitData2 {
920 relay: 0,
921 round: view_number,
922 epoch: self.cur_epoch,
923 },
924 view_number,
925 &self.public_key,
926 &self.private_key,
927 &self.upgrade_lock,
928 )
929 .await
930 else {
931 tracing::error!("Failed to sign pre commit vote!");
932 return None;
933 };
934
935 broadcast_event(
936 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
937 &event_stream,
938 )
939 .await;
940
941 self.timeout_task = Some(spawn({
942 let stream = event_stream.clone();
943 let relay = self.relay;
944 let next_view = self.next_view;
945 let timeout = self.view_sync_timeout;
946 async move {
947 sleep(timeout).await;
948 tracing::warn!("Vote sending timed out in ViewSyncTrigger");
949 broadcast_event(
950 Arc::new(HotShotEvent::ViewSyncTimeout(
951 TYPES::View::new(*next_view),
952 relay,
953 ViewSyncPhase::None,
954 )),
955 &stream,
956 )
957 .await;
958 }
959 }));
960
961 return None;
962 },
963
964 HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
965 let round = *round;
966 if TYPES::View::new(*round) == self.next_view && *relay == self.relay {
968 if let Some(timeout_task) = self.timeout_task.take() {
969 timeout_task.abort();
970 }
971 self.relay += 1;
972 match last_seen_certificate {
973 ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
974 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
975 ViewSyncPreCommitData2 {
976 relay: self.relay,
977 round: self.next_view,
978 epoch: self.cur_epoch,
979 },
980 self.next_view,
981 &self.public_key,
982 &self.private_key,
983 &self.upgrade_lock,
984 )
985 .await
986 else {
987 tracing::error!("Failed to sign ViewSyncPreCommitData!");
988 return None;
989 };
990
991 broadcast_event(
992 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
993 &event_stream,
994 )
995 .await;
996 },
997 ViewSyncPhase::Finalize => {
998 unimplemented!()
1000 },
1001 }
1002
1003 self.timeout_task = Some(spawn({
1004 let stream = event_stream.clone();
1005 let relay = self.relay;
1006 let next_view = self.next_view;
1007 let timeout = self.view_sync_timeout;
1008 let last_cert = last_seen_certificate.clone();
1009 async move {
1010 sleep(timeout).await;
1011 tracing::warn!(
1012 "Vote sending timed out in ViewSyncTimeout relay = {relay}"
1013 );
1014 broadcast_event(
1015 Arc::new(HotShotEvent::ViewSyncTimeout(
1016 TYPES::View::new(*next_view),
1017 relay,
1018 last_cert,
1019 )),
1020 &stream,
1021 )
1022 .await;
1023 }
1024 }));
1025
1026 return None;
1027 }
1028 },
1029 _ => return None,
1030 }
1031 None
1032 }
1033
1034 pub async fn membership_for_epoch(
1035 &self,
1036 epoch: Option<TYPES::Epoch>,
1037 ) -> Option<EpochMembership<TYPES>> {
1038 match self
1039 .membership_coordinator
1040 .membership_for_epoch(epoch)
1041 .await
1042 {
1043 Ok(m) => Some(m),
1044 Err(e) => {
1045 tracing::warn!(e.message);
1046 None
1047 },
1048 }
1049 }
1050}