1use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 data::{EpochNumber, ViewNumber},
15 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
16 message::UpgradeLock,
17 simple_certificate::{
18 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
19 },
20 simple_vote::{
21 HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
22 ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
23 },
24 stake_table::StakeTableEntries,
25 traits::{node_implementation::NodeType, signature_key::SignatureKey},
26 utils::EpochTransitionIndicator,
27 vote::{Certificate, HasViewNumber, Vote},
28};
29use hotshot_utils::anytrace::*;
30use tokio::{spawn, task::JoinHandle, time::sleep};
31use tracing::instrument;
32
33use crate::{
34 events::{HotShotEvent, HotShotTaskCompleted},
35 helpers::{broadcast_event, broadcast_view_change},
36 vote_collection::{
37 AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState, create_vote_accumulator,
38 },
39};
40
/// Phase of the view sync protocol, used to record which kind of certificate
/// was last seen when a round timeout is scheduled.
///
/// The derived ordering follows declaration order:
/// `None < PreCommit < Commit < Finalize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// No view sync certificate has been seen yet.
    None,
    /// A pre-commit certificate was the last one seen.
    PreCommit,
    /// A commit certificate was the last one seen.
    Commit,
    /// A finalize certificate was the last one seen.
    Finalize,
}
53
54type TaskMap<VAL> = BTreeMap<Option<EpochNumber>, BTreeMap<ViewNumber, VAL>>;
55
56type RelayMap<TYPES, VOTE, CERT> =
58 TaskMap<BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;
59
60type ReplicaTaskMap<TYPES> = TaskMap<ViewSyncReplicaTaskState<TYPES>>;
61
62pub struct ViewSyncTaskState<TYPES: NodeType> {
64 pub cur_view: ViewNumber,
66
67 pub next_view: ViewNumber,
69
70 pub cur_epoch: Option<EpochNumber>,
72
73 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76 pub public_key: TYPES::SignatureKey,
78
79 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
81
82 pub id: u64,
84
85 pub num_timeouts_tracked: u64,
87
88 pub replica_task_map: RwLock<ReplicaTaskMap<TYPES>>,
90
91 pub pre_commit_relay_map: RwLock<
93 RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>,
94 >,
95
96 pub commit_relay_map:
98 RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>>,
99
100 pub finalize_relay_map:
102 RwLock<RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>>,
103
104 pub view_sync_timeout: Duration,
106
107 pub last_garbage_collected_view: ViewNumber,
109
110 pub upgrade_lock: UpgradeLock<TYPES>,
112
113 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
115
116 pub highest_finalized_epoch_view: (Option<EpochNumber>, ViewNumber),
118
119 pub epoch_height: u64,
120}
121
122#[async_trait]
123impl<TYPES: NodeType> TaskState for ViewSyncTaskState<TYPES> {
124 type Event = HotShotEvent<TYPES>;
125
126 async fn handle_event(
127 &mut self,
128 event: Arc<Self::Event>,
129 sender: &Sender<Arc<Self::Event>>,
130 _receiver: &Receiver<Arc<Self::Event>>,
131 ) -> Result<()> {
132 self.handle(event, sender.clone()).await
133 }
134
135 fn cancel_subtasks(&mut self) {}
136}
137
138pub struct ViewSyncReplicaTaskState<TYPES: NodeType> {
140 pub view_sync_timeout: Duration,
142
143 pub cur_view: ViewNumber,
145
146 pub next_view: ViewNumber,
148
149 pub relay: u64,
151
152 pub finalized: bool,
154
155 pub sent_view_change_event: bool,
157
158 pub timeout_task: Option<JoinHandle<()>>,
160
161 pub id: u64,
163
164 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
166
167 pub public_key: TYPES::SignatureKey,
169
170 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
172
173 pub upgrade_lock: UpgradeLock<TYPES>,
175
176 pub cur_epoch: Option<EpochNumber>,
178
179 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
181}
182
183#[async_trait]
184impl<TYPES: NodeType> TaskState for ViewSyncReplicaTaskState<TYPES> {
185 type Event = HotShotEvent<TYPES>;
186
187 async fn handle_event(
188 &mut self,
189 event: Arc<Self::Event>,
190 sender: &Sender<Arc<Self::Event>>,
191 _receiver: &Receiver<Arc<Self::Event>>,
192 ) -> Result<()> {
193 self.handle(event, sender.clone()).await;
194
195 Ok(())
196 }
197
198 fn cancel_subtasks(&mut self) {}
199}
200
201impl<TYPES: NodeType> ViewSyncTaskState<TYPES> {
202 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
203 #[allow(clippy::type_complexity)]
204 pub async fn send_to_or_create_replica(
206 &mut self,
207 event: Arc<HotShotEvent<TYPES>>,
208 view: ViewNumber,
209 epoch: Option<EpochNumber>,
210 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
211 ) {
212 let mut task_map = self.replica_task_map.write().await;
213
214 if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
215 tracing::debug!("Forwarding message");
217 let result = replica_task
218 .handle(Arc::clone(&event), sender.clone())
219 .await;
220
221 if result == Some(HotShotTaskCompleted) {
222 if epoch >= self.highest_finalized_epoch_view.0
224 && view > self.highest_finalized_epoch_view.1
225 {
226 self.highest_finalized_epoch_view = (epoch, view);
227 } else if view > self.highest_finalized_epoch_view.1 {
228 tracing::error!(
229 "We finalized a higher view but the epoch is lower. This should never \
230 happen. Current highest finalized epoch view: {:?}, new highest \
231 finalized epoch view: {:?}",
232 self.highest_finalized_epoch_view,
233 (epoch, view)
234 );
235 }
236 task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
237 task_map.retain(|_, x| !x.is_empty());
238 drop(task_map);
239
240 self.garbage_collect_tasks().await;
242 return;
243 }
244
245 return;
246 }
247
248 let mut replica_state: ViewSyncReplicaTaskState<TYPES> = ViewSyncReplicaTaskState {
250 cur_view: view,
251 next_view: view,
252 relay: 0,
253 finalized: false,
254 sent_view_change_event: false,
255 timeout_task: None,
256 membership_coordinator: self.membership_coordinator.clone(),
257 public_key: self.public_key.clone(),
258 private_key: self.private_key.clone(),
259 view_sync_timeout: self.view_sync_timeout,
260 id: self.id,
261 upgrade_lock: self.upgrade_lock.clone(),
262 cur_epoch: self.cur_epoch,
263 first_epoch: self.first_epoch,
264 };
265
266 let result = replica_state
267 .handle(Arc::clone(&event), sender.clone())
268 .await;
269
270 if result == Some(HotShotTaskCompleted) {
271 return;
273 }
274
275 task_map
276 .entry(epoch)
277 .or_default()
278 .insert(view, replica_state);
279 }
280
281 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
282 #[allow(clippy::type_complexity)]
283 pub async fn handle(
285 &mut self,
286 event: Arc<HotShotEvent<TYPES>>,
287 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
288 ) -> Result<()> {
289 match event.as_ref() {
290 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
291 tracing::debug!("Received view sync cert for phase {certificate:?}");
292 let view = certificate.view_number();
293 self.send_to_or_create_replica(
294 Arc::clone(&event),
295 view,
296 certificate.epoch(),
297 &event_stream,
298 )
299 .await;
300 },
301 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
302 tracing::debug!("Received view sync cert for phase {certificate:?}");
303 let view = certificate.view_number();
304 self.send_to_or_create_replica(
305 Arc::clone(&event),
306 view,
307 certificate.epoch(),
308 &event_stream,
309 )
310 .await;
311 },
312 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
313 tracing::debug!("Received view sync cert for phase {certificate:?}");
314 let view = certificate.view_number();
315 self.send_to_or_create_replica(
316 Arc::clone(&event),
317 view,
318 certificate.epoch(),
319 &event_stream,
320 )
321 .await;
322 },
323 HotShotEvent::ViewSyncTimeout(view, ..) => {
324 tracing::debug!("view sync timeout in main task {view:?}");
325 let view = *view;
326 self.send_to_or_create_replica(
327 Arc::clone(&event),
328 view,
329 self.cur_epoch,
330 &event_stream,
331 )
332 .await;
333 },
334
335 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
336 let mut map = self.pre_commit_relay_map.write().await;
337 let vote_view = vote.view_number();
338 let relay = vote.date().relay;
339 let relay_map = map
340 .entry(vote.date().epoch)
341 .or_insert(BTreeMap::new())
342 .entry(vote_view)
343 .or_insert(BTreeMap::new());
344 if let Some(relay_task) = relay_map.get_mut(&relay) {
345 tracing::debug!("Forwarding message");
346
347 if relay_task
349 .handle_vote_event(Arc::clone(&event), &event_stream)
350 .await?
351 .is_some()
352 {
353 map.get_mut(&vote.date().epoch)
354 .and_then(|x| x.remove(&vote_view));
355 map.retain(|_, x| !x.is_empty());
356 }
357
358 return Ok(());
359 }
360
361 let epoch_mem = self
362 .membership_coordinator
363 .membership_for_epoch(vote.date().epoch)
364 .await?;
365 ensure!(
367 epoch_mem.leader(vote_view + relay).await? == self.public_key,
368 "View sync vote sent to wrong leader"
369 );
370
371 let info = AccumulatorInfo {
372 public_key: self.public_key.clone(),
373 membership: epoch_mem,
374 view: vote_view,
375 id: self.id,
376 };
377 let vote_collector = create_vote_accumulator(
378 &info,
379 event,
380 &event_stream,
381 self.upgrade_lock.clone(),
382 EpochTransitionIndicator::NotInTransition,
383 )
384 .await?;
385
386 relay_map.insert(relay, vote_collector);
387 },
388
389 HotShotEvent::ViewSyncCommitVoteRecv(vote) => {
390 let mut map = self.commit_relay_map.write().await;
391 let vote_view = vote.view_number();
392 let relay = vote.date().relay;
393 let relay_map = map
394 .entry(vote.date().epoch)
395 .or_insert(BTreeMap::new())
396 .entry(vote_view)
397 .or_insert(BTreeMap::new());
398 if let Some(relay_task) = relay_map.get_mut(&relay) {
399 tracing::debug!("Forwarding message");
400
401 if relay_task
403 .handle_vote_event(Arc::clone(&event), &event_stream)
404 .await?
405 .is_some()
406 {
407 map.get_mut(&vote.date().epoch)
408 .and_then(|x| x.remove(&vote_view));
409 map.retain(|_, x| !x.is_empty());
410 }
411
412 return Ok(());
413 }
414
415 let epoch_mem = self
417 .membership_coordinator
418 .membership_for_epoch(vote.date().epoch)
419 .await?;
420 ensure!(
421 epoch_mem.leader(vote_view + relay).await? == self.public_key,
422 debug!("View sync vote sent to wrong leader")
423 );
424
425 let info = AccumulatorInfo {
426 public_key: self.public_key.clone(),
427 membership: epoch_mem,
428 view: vote_view,
429 id: self.id,
430 };
431
432 let vote_collector = create_vote_accumulator(
433 &info,
434 event,
435 &event_stream,
436 self.upgrade_lock.clone(),
437 EpochTransitionIndicator::NotInTransition,
438 )
439 .await?;
440 relay_map.insert(relay, vote_collector);
441 },
442
443 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
444 let mut map = self.finalize_relay_map.write().await;
445 let vote_view = vote.view_number();
446 let relay = vote.date().relay;
447 let relay_map = map
448 .entry(vote.date().epoch)
449 .or_insert(BTreeMap::new())
450 .entry(vote_view)
451 .or_insert(BTreeMap::new());
452 if let Some(relay_task) = relay_map.get_mut(&relay) {
453 tracing::debug!("Forwarding message");
454
455 if relay_task
457 .handle_vote_event(Arc::clone(&event), &event_stream)
458 .await?
459 .is_some()
460 {
461 map.get_mut(&vote.date().epoch)
462 .and_then(|x| x.remove(&vote_view));
463 map.retain(|_, x| !x.is_empty());
464 }
465
466 return Ok(());
467 }
468
469 let epoch_mem = self
470 .membership_coordinator
471 .membership_for_epoch(vote.date().epoch)
472 .await?;
473 ensure!(
475 epoch_mem.leader(vote_view + relay).await? == self.public_key,
476 debug!("View sync vote sent to wrong leader")
477 );
478
479 let info = AccumulatorInfo {
480 public_key: self.public_key.clone(),
481 membership: epoch_mem,
482 view: vote_view,
483 id: self.id,
484 };
485 let vote_collector = create_vote_accumulator(
486 &info,
487 event,
488 &event_stream,
489 self.upgrade_lock.clone(),
490 EpochTransitionIndicator::NotInTransition,
491 )
492 .await;
493 if let Ok(vote_task) = vote_collector {
494 relay_map.insert(relay, vote_task);
495 }
496 },
497
498 &HotShotEvent::ViewChange(new_view, epoch) => {
499 if epoch > self.cur_epoch {
500 self.cur_epoch = epoch;
501 }
502 let new_view = ViewNumber::new(*new_view);
503 if self.cur_view < new_view {
504 tracing::debug!(
505 "Change from view {} to view {} in view sync task",
506 *self.cur_view,
507 *new_view
508 );
509
510 self.cur_view = new_view;
511 self.next_view = self.cur_view;
512 self.num_timeouts_tracked = 0;
513 }
514
515 self.garbage_collect_tasks().await;
516 },
517 HotShotEvent::LeavesDecided(leaves) => {
518 let finalized_epoch = self.highest_finalized_epoch_view.0.max(
519 leaves
520 .iter()
521 .map(|leaf| leaf.epoch(self.epoch_height))
522 .max()
523 .unwrap_or(None),
524 );
525 let finalized_view = self.highest_finalized_epoch_view.1.max(
526 leaves
527 .iter()
528 .map(|leaf| leaf.view_number())
529 .max()
530 .unwrap_or(ViewNumber::new(0)),
531 );
532
533 self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
534
535 self.garbage_collect_tasks().await;
536 },
537 &HotShotEvent::Timeout(view_number, ..) => {
538 ensure!(
540 view_number >= self.cur_view,
541 debug!("Discarding old timeout vote.")
542 );
543
544 self.num_timeouts_tracked += 1;
545
546 if self.num_timeouts_tracked >= 3 {
547 tracing::error!("Too many consecutive timeouts! This shouldn't happen");
548 }
549
550 if self.num_timeouts_tracked >= 2 {
551 tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
552
553 self.send_to_or_create_replica(
554 Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
555 view_number + 1,
556 self.cur_epoch,
557 &event_stream,
558 )
559 .await;
560 } else {
561 self.cur_view = view_number + 1;
563 broadcast_view_change(
564 &event_stream,
565 self.cur_view,
566 self.cur_epoch,
567 self.first_epoch,
568 )
569 .await;
570 }
571 let leader = self
572 .membership_coordinator
573 .membership_for_epoch(self.cur_epoch)
574 .await?
575 .leader(view_number)
576 .await?;
577 tracing::warn!(
578 %leader,
579 leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
580 view_number = *view_number,
581 num_timeouts_tracked = self.num_timeouts_tracked,
582 "view timed out",
583 );
584 },
585 HotShotEvent::SetFirstEpoch(view, epoch) => {
586 self.first_epoch = Some((*view, *epoch));
587 },
588
589 _ => {},
590 }
591 Ok(())
592 }
593
594 async fn garbage_collect_tasks(&self) {
598 let previous_epoch = self
599 .cur_epoch
600 .map(|e| e.saturating_sub(1))
601 .map(EpochNumber::new);
602 let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
603 Self::garbage_collect_tasks_helper(
604 &self.replica_task_map,
605 &gc_epoch,
606 &self.highest_finalized_epoch_view.1,
607 )
608 .await;
609 Self::garbage_collect_tasks_helper(
610 &self.pre_commit_relay_map,
611 &gc_epoch,
612 &self.highest_finalized_epoch_view.1,
613 )
614 .await;
615 Self::garbage_collect_tasks_helper(
616 &self.commit_relay_map,
617 &gc_epoch,
618 &self.highest_finalized_epoch_view.1,
619 )
620 .await;
621 Self::garbage_collect_tasks_helper(
622 &self.finalize_relay_map,
623 &gc_epoch,
624 &self.highest_finalized_epoch_view.1,
625 )
626 .await;
627 }
628
629 async fn garbage_collect_tasks_helper<VAL>(
630 map: &RwLock<TaskMap<VAL>>,
631 gc_epoch: &Option<EpochNumber>,
632 gc_view: &ViewNumber,
633 ) {
634 let mut task_map = map.write().await;
635 task_map.retain(|e, _| e >= gc_epoch);
636 if let Some(view_map) = task_map.get_mut(gc_epoch) {
637 view_map.retain(|v, _| v > gc_view)
638 };
639 task_map.retain(|_, view_map| !view_map.is_empty());
640 }
641}
642
643impl<TYPES: NodeType> ViewSyncReplicaTaskState<TYPES> {
644 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
645 pub async fn handle(
647 &mut self,
648 event: Arc<HotShotEvent<TYPES>>,
649 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
650 ) -> Option<HotShotTaskCompleted> {
651 match event.as_ref() {
652 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
653 let last_seen_certificate = ViewSyncPhase::PreCommit;
654
655 if certificate.view_number() < self.next_view {
657 tracing::warn!("We're already in a higher round");
658
659 return None;
660 }
661
662 let membership = self.membership_for_epoch(certificate.epoch()).await?;
663 let membership_stake_table = membership.stake_table().await;
664 let membership_failure_threshold = membership.failure_threshold().await;
665
666 if let Err(e) = certificate
668 .is_valid_cert(
669 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
670 membership_failure_threshold,
671 &self.upgrade_lock,
672 )
673 .await
674 {
675 tracing::error!(
676 "Not valid view sync cert! data: {:?}, error: {}",
677 certificate.data(),
678 e
679 );
680
681 return None;
682 }
683
684 if certificate.view_number() > self.next_view {
687 return Some(HotShotTaskCompleted);
688 }
689
690 if certificate.data().relay > self.relay {
691 self.relay = certificate.data().relay;
692 }
693
694 let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
695 ViewSyncCommitData2 {
696 relay: certificate.data().relay,
697 round: self.next_view,
698 epoch: certificate.data().epoch,
699 },
700 self.next_view,
701 &self.public_key,
702 &self.private_key,
703 &self.upgrade_lock,
704 )
705 .await
706 else {
707 tracing::error!("Failed to sign ViewSyncCommitData!");
708 return None;
709 };
710
711 broadcast_event(
712 Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
713 &event_stream,
714 )
715 .await;
716
717 if let Some(timeout_task) = self.timeout_task.take() {
718 timeout_task.abort();
719 }
720
721 self.timeout_task = Some(spawn({
722 let stream = event_stream.clone();
723 let phase = last_seen_certificate;
724 let relay = self.relay;
725 let next_view = self.next_view;
726 let timeout = self.view_sync_timeout;
727 async move {
728 sleep(timeout).await;
729 tracing::warn!(
730 "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
731 {relay}"
732 );
733
734 broadcast_event(
735 Arc::new(HotShotEvent::ViewSyncTimeout(
736 ViewNumber::new(*next_view),
737 relay,
738 phase,
739 )),
740 &stream,
741 )
742 .await;
743 }
744 }));
745 },
746
747 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
748 let last_seen_certificate = ViewSyncPhase::Commit;
749
750 if certificate.view_number() < self.next_view {
752 tracing::warn!("We're already in a higher round");
753
754 return None;
755 }
756
757 let membership = self.membership_for_epoch(certificate.epoch()).await?;
758 let membership_stake_table = membership.stake_table().await;
759 let membership_success_threshold = membership.success_threshold().await;
760
761 if let Err(e) = certificate
763 .is_valid_cert(
764 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
765 membership_success_threshold,
766 &self.upgrade_lock,
767 )
768 .await
769 {
770 tracing::error!(
771 "Not valid view sync cert! data: {:?}, error: {}",
772 certificate.data(),
773 e
774 );
775
776 return None;
777 }
778
779 if certificate.view_number() > self.next_view {
782 return Some(HotShotTaskCompleted);
783 }
784
785 if certificate.data().relay > self.relay {
786 self.relay = certificate.data().relay;
787 }
788
789 let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
790 ViewSyncFinalizeData2 {
791 relay: certificate.data().relay,
792 round: self.next_view,
793 epoch: certificate.data().epoch,
794 },
795 self.next_view,
796 &self.public_key,
797 &self.private_key,
798 &self.upgrade_lock,
799 )
800 .await
801 else {
802 tracing::error!("Failed to sign view sync finalized vote!");
803 return None;
804 };
805
806 broadcast_event(
807 Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
808 &event_stream,
809 )
810 .await;
811
812 tracing::info!(
813 "View sync protocol has received view sync evidence to update the view to {}",
814 *self.next_view
815 );
816
817 broadcast_view_change(
818 &event_stream,
819 self.next_view,
820 certificate.epoch(),
821 self.first_epoch,
822 )
823 .await;
824
825 if let Some(timeout_task) = self.timeout_task.take() {
826 timeout_task.abort();
827 }
828 self.timeout_task = Some(spawn({
829 let stream = event_stream.clone();
830 let phase = last_seen_certificate;
831 let relay = self.relay;
832 let next_view = self.next_view;
833 let timeout = self.view_sync_timeout;
834 async move {
835 sleep(timeout).await;
836 tracing::warn!(
837 "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
838 {relay}"
839 );
840 broadcast_event(
841 Arc::new(HotShotEvent::ViewSyncTimeout(
842 ViewNumber::new(*next_view),
843 relay,
844 phase,
845 )),
846 &stream,
847 )
848 .await;
849 }
850 }));
851 },
852
853 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
854 if certificate.view_number() < self.next_view {
856 tracing::warn!("We're already in a higher round");
857
858 return None;
859 }
860
861 let membership = self.membership_for_epoch(certificate.epoch()).await?;
862 let membership_stake_table = membership.stake_table().await;
863 let membership_success_threshold = membership.success_threshold().await;
864
865 if let Err(e) = certificate
867 .is_valid_cert(
868 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
869 membership_success_threshold,
870 &self.upgrade_lock,
871 )
872 .await
873 {
874 tracing::error!(
875 "Not valid view sync cert! data: {:?}, error: {}",
876 certificate.data(),
877 e
878 );
879
880 return None;
881 }
882
883 if certificate.view_number() > self.next_view {
886 return Some(HotShotTaskCompleted);
887 }
888
889 if certificate.data().relay > self.relay {
890 self.relay = certificate.data().relay;
891 }
892
893 if let Some(timeout_task) = self.timeout_task.take() {
894 timeout_task.abort();
895 }
896
897 broadcast_view_change(
898 &event_stream,
899 self.next_view,
900 certificate.epoch(),
901 self.first_epoch,
902 )
903 .await;
904 return Some(HotShotTaskCompleted);
905 },
906
907 HotShotEvent::ViewSyncTrigger(view_number) => {
908 let view_number = *view_number;
909 if self.next_view != ViewNumber::new(*view_number) {
910 tracing::error!("Unexpected view number to trigger view sync");
911 return None;
912 }
913
914 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
915 ViewSyncPreCommitData2 {
916 relay: 0,
917 round: view_number,
918 epoch: self.cur_epoch,
919 },
920 view_number,
921 &self.public_key,
922 &self.private_key,
923 &self.upgrade_lock,
924 )
925 .await
926 else {
927 tracing::error!("Failed to sign pre commit vote!");
928 return None;
929 };
930
931 broadcast_event(
932 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
933 &event_stream,
934 )
935 .await;
936
937 self.timeout_task = Some(spawn({
938 let stream = event_stream.clone();
939 let relay = self.relay;
940 let next_view = self.next_view;
941 let timeout = self.view_sync_timeout;
942 async move {
943 sleep(timeout).await;
944 tracing::warn!("Vote sending timed out in ViewSyncTrigger");
945 broadcast_event(
946 Arc::new(HotShotEvent::ViewSyncTimeout(
947 ViewNumber::new(*next_view),
948 relay,
949 ViewSyncPhase::None,
950 )),
951 &stream,
952 )
953 .await;
954 }
955 }));
956
957 return None;
958 },
959
960 HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
961 let round = *round;
962 if ViewNumber::new(*round) == self.next_view && *relay == self.relay {
964 if let Some(timeout_task) = self.timeout_task.take() {
965 timeout_task.abort();
966 }
967 self.relay += 1;
968 match last_seen_certificate {
969 ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
970 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
971 ViewSyncPreCommitData2 {
972 relay: self.relay,
973 round: self.next_view,
974 epoch: self.cur_epoch,
975 },
976 self.next_view,
977 &self.public_key,
978 &self.private_key,
979 &self.upgrade_lock,
980 )
981 .await
982 else {
983 tracing::error!("Failed to sign ViewSyncPreCommitData!");
984 return None;
985 };
986
987 broadcast_event(
988 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
989 &event_stream,
990 )
991 .await;
992 },
993 ViewSyncPhase::Finalize => {
994 unimplemented!()
996 },
997 }
998
999 self.timeout_task = Some(spawn({
1000 let stream = event_stream.clone();
1001 let relay = self.relay;
1002 let next_view = self.next_view;
1003 let timeout = self.view_sync_timeout;
1004 let last_cert = last_seen_certificate.clone();
1005 async move {
1006 sleep(timeout).await;
1007 tracing::warn!(
1008 "Vote sending timed out in ViewSyncTimeout relay = {relay}"
1009 );
1010 broadcast_event(
1011 Arc::new(HotShotEvent::ViewSyncTimeout(
1012 ViewNumber::new(*next_view),
1013 relay,
1014 last_cert,
1015 )),
1016 &stream,
1017 )
1018 .await;
1019 }
1020 }));
1021
1022 return None;
1023 }
1024 },
1025 _ => return None,
1026 }
1027 None
1028 }
1029
1030 pub async fn membership_for_epoch(
1031 &self,
1032 epoch: Option<EpochNumber>,
1033 ) -> Option<EpochMembership<TYPES>> {
1034 match self
1035 .membership_coordinator
1036 .membership_for_epoch(epoch)
1037 .await
1038 {
1039 Ok(m) => Some(m),
1040 Err(e) => {
1041 tracing::warn!(e.message);
1042 None
1043 },
1044 }
1045 }
1046}