1use std::{
8 collections::{BTreeMap, HashMap},
9 fmt::Debug,
10 sync::Arc,
11 time::Duration,
12};
13
14use async_broadcast::{Receiver, Sender};
15use async_lock::RwLock;
16use async_trait::async_trait;
17use hotshot_task::task::TaskState;
18use hotshot_types::{
19 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
20 message::UpgradeLock,
21 simple_certificate::{
22 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
23 },
24 simple_vote::{
25 HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
26 ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
27 },
28 stake_table::StakeTableEntries,
29 traits::{
30 node_implementation::{ConsensusTime, NodeType, Versions},
31 signature_key::SignatureKey,
32 },
33 utils::EpochTransitionIndicator,
34 vote::{Certificate, HasViewNumber, Vote},
35};
36use hotshot_utils::anytrace::*;
37use tokio::{spawn, task::JoinHandle, time::sleep};
38use tracing::instrument;
39
40use crate::{
41 events::{HotShotEvent, HotShotTaskCompleted},
42 helpers::{broadcast_event, broadcast_view_change},
43 vote_collection::{
44 create_vote_accumulator, AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState,
45 },
46};
47
48#[derive(PartialEq, PartialOrd, Clone, Debug, Eq, Hash)]
49pub enum ViewSyncPhase {
51 None,
53 PreCommit,
55 Commit,
57 Finalize,
59}
60
61type RelayMap<TYPES, VOTE, CERT, V> = HashMap<
63 (
64 <TYPES as NodeType>::View,
65 Option<<TYPES as NodeType>::Epoch>,
66 ),
67 BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>,
68>;
69
70type ReplicaTaskMap<TYPES, V> = HashMap<
71 (
72 <TYPES as NodeType>::View,
73 Option<<TYPES as NodeType>::Epoch>,
74 ),
75 ViewSyncReplicaTaskState<TYPES, V>,
76>;
77
78pub struct ViewSyncTaskState<TYPES: NodeType, V: Versions> {
80 pub cur_view: TYPES::View,
82
83 pub next_view: TYPES::View,
85
86 pub cur_epoch: Option<TYPES::Epoch>,
88
89 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
91
92 pub public_key: TYPES::SignatureKey,
94
95 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
97
98 pub id: u64,
100
101 pub num_timeouts_tracked: u64,
103
104 pub replica_task_map: RwLock<ReplicaTaskMap<TYPES, V>>,
106
107 pub pre_commit_relay_map: RwLock<
109 RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>, V>,
110 >,
111
112 pub commit_relay_map:
114 RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>, V>>,
115
116 pub finalize_relay_map: RwLock<
118 RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>, V>,
119 >,
120
121 pub view_sync_timeout: Duration,
123
124 pub last_garbage_collected_view: TYPES::View,
126
127 pub upgrade_lock: UpgradeLock<TYPES, V>,
129
130 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
132}
133
134#[async_trait]
135impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncTaskState<TYPES, V> {
136 type Event = HotShotEvent<TYPES>;
137
138 async fn handle_event(
139 &mut self,
140 event: Arc<Self::Event>,
141 sender: &Sender<Arc<Self::Event>>,
142 _receiver: &Receiver<Arc<Self::Event>>,
143 ) -> Result<()> {
144 self.handle(event, sender.clone()).await
145 }
146
147 fn cancel_subtasks(&mut self) {}
148}
149
150pub struct ViewSyncReplicaTaskState<TYPES: NodeType, V: Versions> {
152 pub view_sync_timeout: Duration,
154
155 pub cur_view: TYPES::View,
157
158 pub next_view: TYPES::View,
160
161 pub relay: u64,
163
164 pub finalized: bool,
166
167 pub sent_view_change_event: bool,
169
170 pub timeout_task: Option<JoinHandle<()>>,
172
173 pub id: u64,
175
176 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
178
179 pub public_key: TYPES::SignatureKey,
181
182 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
184
185 pub upgrade_lock: UpgradeLock<TYPES, V>,
187
188 pub cur_epoch: Option<TYPES::Epoch>,
190
191 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
193}
194
195#[async_trait]
196impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncReplicaTaskState<TYPES, V> {
197 type Event = HotShotEvent<TYPES>;
198
199 async fn handle_event(
200 &mut self,
201 event: Arc<Self::Event>,
202 sender: &Sender<Arc<Self::Event>>,
203 _receiver: &Receiver<Arc<Self::Event>>,
204 ) -> Result<()> {
205 self.handle(event, sender.clone()).await;
206
207 Ok(())
208 }
209
210 fn cancel_subtasks(&mut self) {}
211}
212
213impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
214 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
215 #[allow(clippy::type_complexity)]
216 pub async fn send_to_or_create_replica(
218 &mut self,
219 event: Arc<HotShotEvent<TYPES>>,
220 view: TYPES::View,
221 epoch: Option<TYPES::Epoch>,
222 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
223 ) {
224 if self.cur_view > view {
227 tracing::debug!("Already in a higher view than the view sync message");
228 return;
229 }
230
231 let mut task_map = self.replica_task_map.write().await;
232
233 if let Some(replica_task) = task_map.get_mut(&(view, epoch)) {
234 tracing::debug!("Forwarding message");
236 let result = replica_task
237 .handle(Arc::clone(&event), sender.clone())
238 .await;
239
240 if result == Some(HotShotTaskCompleted) {
241 task_map.remove(&(view, epoch));
243 return;
244 }
245
246 return;
247 }
248
249 let mut replica_state: ViewSyncReplicaTaskState<TYPES, V> = ViewSyncReplicaTaskState {
251 cur_view: view,
252 next_view: view,
253 relay: 0,
254 finalized: false,
255 sent_view_change_event: false,
256 timeout_task: None,
257 membership_coordinator: self.membership_coordinator.clone(),
258 public_key: self.public_key.clone(),
259 private_key: self.private_key.clone(),
260 view_sync_timeout: self.view_sync_timeout,
261 id: self.id,
262 upgrade_lock: self.upgrade_lock.clone(),
263 cur_epoch: self.cur_epoch,
264 first_epoch: self.first_epoch,
265 };
266
267 let result = replica_state
268 .handle(Arc::clone(&event), sender.clone())
269 .await;
270
271 if result == Some(HotShotTaskCompleted) {
272 return;
274 }
275
276 task_map.insert((view, epoch), replica_state);
277 }
278
279 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
280 #[allow(clippy::type_complexity)]
281 pub async fn handle(
283 &mut self,
284 event: Arc<HotShotEvent<TYPES>>,
285 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
286 ) -> Result<()> {
287 match event.as_ref() {
288 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
289 tracing::debug!("Received view sync cert for phase {certificate:?}");
290 let view = certificate.view_number();
291 self.send_to_or_create_replica(
292 Arc::clone(&event),
293 view,
294 certificate.epoch(),
295 &event_stream,
296 )
297 .await;
298 },
299 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
300 tracing::debug!("Received view sync cert for phase {certificate:?}");
301 let view = certificate.view_number();
302 self.send_to_or_create_replica(
303 Arc::clone(&event),
304 view,
305 certificate.epoch(),
306 &event_stream,
307 )
308 .await;
309 },
310 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
311 tracing::debug!("Received view sync cert for phase {certificate:?}");
312 let view = certificate.view_number();
313 self.send_to_or_create_replica(
314 Arc::clone(&event),
315 view,
316 certificate.epoch(),
317 &event_stream,
318 )
319 .await;
320 },
321 HotShotEvent::ViewSyncTimeout(view, ..) => {
322 tracing::debug!("view sync timeout in main task {view:?}");
323 let view = *view;
324 self.send_to_or_create_replica(
325 Arc::clone(&event),
326 view,
327 self.cur_epoch,
328 &event_stream,
329 )
330 .await;
331 },
332
333 HotShotEvent::ViewSyncPreCommitVoteRecv(ref vote) => {
334 let mut map = self.pre_commit_relay_map.write().await;
335 let vote_view = vote.view_number();
336 let relay = vote.date().relay;
337 let relay_map = map
338 .entry((vote_view, vote.date().epoch))
339 .or_insert(BTreeMap::new());
340 if let Some(relay_task) = relay_map.get_mut(&relay) {
341 tracing::debug!("Forwarding message");
342
343 if relay_task
345 .handle_vote_event(Arc::clone(&event), &event_stream)
346 .await?
347 .is_some()
348 {
349 map.remove(&(vote_view, vote.date().epoch));
350 }
351
352 return Ok(());
353 }
354
355 let epoch_mem = self
356 .membership_coordinator
357 .membership_for_epoch(vote.date().epoch)
358 .await?;
359 ensure!(
361 epoch_mem.leader(vote_view + relay).await? == self.public_key,
362 "View sync vote sent to wrong leader"
363 );
364
365 let info = AccumulatorInfo {
366 public_key: self.public_key.clone(),
367 membership: epoch_mem,
368 view: vote_view,
369 id: self.id,
370 };
371 let vote_collector = create_vote_accumulator(
372 &info,
373 event,
374 &event_stream,
375 self.upgrade_lock.clone(),
376 EpochTransitionIndicator::NotInTransition,
377 )
378 .await?;
379
380 relay_map.insert(relay, vote_collector);
381 },
382
383 HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
384 let mut map = self.commit_relay_map.write().await;
385 let vote_view = vote.view_number();
386 let relay = vote.date().relay;
387 let relay_map = map
388 .entry((vote_view, vote.date().epoch))
389 .or_insert(BTreeMap::new());
390 if let Some(relay_task) = relay_map.get_mut(&relay) {
391 tracing::debug!("Forwarding message");
392
393 if relay_task
395 .handle_vote_event(Arc::clone(&event), &event_stream)
396 .await?
397 .is_some()
398 {
399 map.remove(&(vote_view, vote.date().epoch));
400 }
401
402 return Ok(());
403 }
404
405 let epoch_mem = self
407 .membership_coordinator
408 .membership_for_epoch(vote.date().epoch)
409 .await?;
410 ensure!(
411 epoch_mem.leader(vote_view + relay).await? == self.public_key,
412 debug!("View sync vote sent to wrong leader")
413 );
414
415 let info = AccumulatorInfo {
416 public_key: self.public_key.clone(),
417 membership: epoch_mem,
418 view: vote_view,
419 id: self.id,
420 };
421
422 let vote_collector = create_vote_accumulator(
423 &info,
424 event,
425 &event_stream,
426 self.upgrade_lock.clone(),
427 EpochTransitionIndicator::NotInTransition,
428 )
429 .await?;
430 relay_map.insert(relay, vote_collector);
431 },
432
433 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
434 let mut map = self.finalize_relay_map.write().await;
435 let vote_view = vote.view_number();
436 let relay = vote.date().relay;
437 let relay_map = map
438 .entry((vote_view, vote.date().epoch))
439 .or_insert(BTreeMap::new());
440 if let Some(relay_task) = relay_map.get_mut(&relay) {
441 tracing::debug!("Forwarding message");
442
443 if relay_task
445 .handle_vote_event(Arc::clone(&event), &event_stream)
446 .await?
447 .is_some()
448 {
449 map.remove(&(vote_view, vote.date().epoch));
450 }
451
452 return Ok(());
453 }
454
455 let epoch_mem = self
456 .membership_coordinator
457 .membership_for_epoch(vote.date().epoch)
458 .await?;
459 ensure!(
461 epoch_mem.leader(vote_view + relay).await? == self.public_key,
462 debug!("View sync vote sent to wrong leader")
463 );
464
465 let info = AccumulatorInfo {
466 public_key: self.public_key.clone(),
467 membership: epoch_mem,
468 view: vote_view,
469 id: self.id,
470 };
471 let vote_collector = create_vote_accumulator(
472 &info,
473 event,
474 &event_stream,
475 self.upgrade_lock.clone(),
476 EpochTransitionIndicator::NotInTransition,
477 )
478 .await;
479 if let Ok(vote_task) = vote_collector {
480 relay_map.insert(relay, vote_task);
481 }
482 },
483
484 &HotShotEvent::ViewChange(new_view, epoch) => {
485 if epoch > self.cur_epoch {
486 self.cur_epoch = epoch;
487 }
488 let new_view = TYPES::View::new(*new_view);
489 if self.cur_view < new_view {
490 tracing::debug!(
491 "Change from view {} to view {} in view sync task",
492 *self.cur_view,
493 *new_view
494 );
495
496 self.cur_view = new_view;
497 self.next_view = self.cur_view;
498 self.num_timeouts_tracked = 0;
499
500 for i in *self.last_garbage_collected_view..*self.cur_view {
504 self.replica_task_map
505 .write()
506 .await
507 .remove_entry(&(TYPES::View::new(i), epoch));
508 self.pre_commit_relay_map
509 .write()
510 .await
511 .remove_entry(&(TYPES::View::new(i), epoch));
512 self.commit_relay_map
513 .write()
514 .await
515 .remove_entry(&(TYPES::View::new(i), epoch));
516 self.finalize_relay_map
517 .write()
518 .await
519 .remove_entry(&(TYPES::View::new(i), epoch));
520 }
521
522 self.last_garbage_collected_view = self.cur_view - 1;
523 }
524 },
525 &HotShotEvent::Timeout(view_number, ..) => {
526 ensure!(
528 view_number >= self.cur_view,
529 debug!("Discarding old timeout vote.")
530 );
531
532 self.num_timeouts_tracked += 1;
533 let leader = self
534 .membership_coordinator
535 .membership_for_epoch(self.cur_epoch)
536 .await?
537 .leader(view_number)
538 .await?;
539 tracing::warn!(
540 %leader,
541 leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
542 view_number = *view_number,
543 num_timeouts_tracked = self.num_timeouts_tracked,
544 "view timed out",
545 );
546
547 if self.num_timeouts_tracked >= 3 {
548 tracing::error!("Too many consecutive timeouts! This shouldn't happen");
549 }
550
551 if self.num_timeouts_tracked >= 2 {
552 tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
553
554 self.send_to_or_create_replica(
555 Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
556 view_number + 1,
557 self.cur_epoch,
558 &event_stream,
559 )
560 .await;
561 } else {
562 self.cur_view = view_number + 1;
564 broadcast_view_change(
565 &event_stream,
566 self.cur_view,
567 self.cur_epoch,
568 self.first_epoch,
569 )
570 .await;
571 }
572 },
573 HotShotEvent::SetFirstEpoch(view, epoch) => {
574 self.first_epoch = Some((*view, *epoch));
575 },
576
577 _ => {},
578 }
579 Ok(())
580 }
581}
582
583impl<TYPES: NodeType, V: Versions> ViewSyncReplicaTaskState<TYPES, V> {
584 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
585 pub async fn handle(
587 &mut self,
588 event: Arc<HotShotEvent<TYPES>>,
589 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
590 ) -> Option<HotShotTaskCompleted> {
591 match event.as_ref() {
592 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
593 let last_seen_certificate = ViewSyncPhase::PreCommit;
594
595 if certificate.view_number() < self.next_view {
597 tracing::warn!("We're already in a higher round");
598
599 return None;
600 }
601
602 let membership = self.membership_for_epoch(certificate.epoch()).await?;
603 let membership_stake_table = membership.stake_table().await;
604 let membership_failure_threshold = membership.failure_threshold().await;
605
606 if let Err(e) = certificate
608 .is_valid_cert(
609 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
610 membership_failure_threshold,
611 &self.upgrade_lock,
612 )
613 .await
614 {
615 tracing::error!(
616 "Not valid view sync cert! data: {:?}, error: {}",
617 certificate.data(),
618 e
619 );
620
621 return None;
622 }
623
624 if certificate.view_number() > self.next_view {
627 return Some(HotShotTaskCompleted);
628 }
629
630 if certificate.data().relay > self.relay {
631 self.relay = certificate.data().relay;
632 }
633
634 let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
635 ViewSyncCommitData2 {
636 relay: certificate.data().relay,
637 round: self.next_view,
638 epoch: certificate.data().epoch,
639 },
640 self.next_view,
641 &self.public_key,
642 &self.private_key,
643 &self.upgrade_lock,
644 )
645 .await
646 else {
647 tracing::error!("Failed to sign ViewSyncCommitData!");
648 return None;
649 };
650
651 broadcast_event(
652 Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
653 &event_stream,
654 )
655 .await;
656
657 if let Some(timeout_task) = self.timeout_task.take() {
658 timeout_task.abort();
659 }
660
661 self.timeout_task = Some(spawn({
662 let stream = event_stream.clone();
663 let phase = last_seen_certificate;
664 let relay = self.relay;
665 let next_view = self.next_view;
666 let timeout = self.view_sync_timeout;
667 async move {
668 sleep(timeout).await;
669 tracing::warn!("Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = {relay}");
670
671 broadcast_event(
672 Arc::new(HotShotEvent::ViewSyncTimeout(
673 TYPES::View::new(*next_view),
674 relay,
675 phase,
676 )),
677 &stream,
678 )
679 .await;
680 }
681 }));
682 },
683
684 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
685 let last_seen_certificate = ViewSyncPhase::Commit;
686
687 if certificate.view_number() < self.next_view {
689 tracing::warn!("We're already in a higher round");
690
691 return None;
692 }
693
694 let membership = self.membership_for_epoch(certificate.epoch()).await?;
695 let membership_stake_table = membership.stake_table().await;
696 let membership_success_threshold = membership.success_threshold().await;
697
698 if let Err(e) = certificate
700 .is_valid_cert(
701 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
702 membership_success_threshold,
703 &self.upgrade_lock,
704 )
705 .await
706 {
707 tracing::error!(
708 "Not valid view sync cert! data: {:?}, error: {}",
709 certificate.data(),
710 e
711 );
712
713 return None;
714 }
715
716 if certificate.view_number() > self.next_view {
719 return Some(HotShotTaskCompleted);
720 }
721
722 if certificate.data().relay > self.relay {
723 self.relay = certificate.data().relay;
724 }
725
726 let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
727 ViewSyncFinalizeData2 {
728 relay: certificate.data().relay,
729 round: self.next_view,
730 epoch: certificate.data().epoch,
731 },
732 self.next_view,
733 &self.public_key,
734 &self.private_key,
735 &self.upgrade_lock,
736 )
737 .await
738 else {
739 tracing::error!("Failed to sign view sync finalized vote!");
740 return None;
741 };
742
743 broadcast_event(
744 Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
745 &event_stream,
746 )
747 .await;
748
749 tracing::info!(
750 "View sync protocol has received view sync evidence to update the view to {}",
751 *self.next_view
752 );
753
754 broadcast_view_change(
755 &event_stream,
756 self.next_view,
757 certificate.epoch(),
758 self.first_epoch,
759 )
760 .await;
761
762 if let Some(timeout_task) = self.timeout_task.take() {
763 timeout_task.abort();
764 }
765 self.timeout_task = Some(spawn({
766 let stream = event_stream.clone();
767 let phase = last_seen_certificate;
768 let relay = self.relay;
769 let next_view = self.next_view;
770 let timeout = self.view_sync_timeout;
771 async move {
772 sleep(timeout).await;
773 tracing::warn!("Vote sending timed out in ViewSyncCommitCertificateRecv, relay = {relay}");
774 broadcast_event(
775 Arc::new(HotShotEvent::ViewSyncTimeout(
776 TYPES::View::new(*next_view),
777 relay,
778 phase,
779 )),
780 &stream,
781 )
782 .await;
783 }
784 }));
785 },
786
787 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
788 if certificate.view_number() < self.next_view {
790 tracing::warn!("We're already in a higher round");
791
792 return None;
793 }
794
795 let membership = self.membership_for_epoch(certificate.epoch()).await?;
796 let membership_stake_table = membership.stake_table().await;
797 let membership_success_threshold = membership.success_threshold().await;
798
799 if let Err(e) = certificate
801 .is_valid_cert(
802 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
803 membership_success_threshold,
804 &self.upgrade_lock,
805 )
806 .await
807 {
808 tracing::error!(
809 "Not valid view sync cert! data: {:?}, error: {}",
810 certificate.data(),
811 e
812 );
813
814 return None;
815 }
816
817 if certificate.view_number() > self.next_view {
820 return Some(HotShotTaskCompleted);
821 }
822
823 if certificate.data().relay > self.relay {
824 self.relay = certificate.data().relay;
825 }
826
827 if let Some(timeout_task) = self.timeout_task.take() {
828 timeout_task.abort();
829 }
830
831 broadcast_view_change(
832 &event_stream,
833 self.next_view,
834 certificate.epoch(),
835 self.first_epoch,
836 )
837 .await;
838 return Some(HotShotTaskCompleted);
839 },
840
841 HotShotEvent::ViewSyncTrigger(view_number) => {
842 let view_number = *view_number;
843 if self.next_view != TYPES::View::new(*view_number) {
844 tracing::error!("Unexpected view number to trigger view sync");
845 return None;
846 }
847
848 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
849 ViewSyncPreCommitData2 {
850 relay: 0,
851 round: view_number,
852 epoch: self.cur_epoch,
853 },
854 view_number,
855 &self.public_key,
856 &self.private_key,
857 &self.upgrade_lock,
858 )
859 .await
860 else {
861 tracing::error!("Failed to sign pre commit vote!");
862 return None;
863 };
864
865 broadcast_event(
866 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
867 &event_stream,
868 )
869 .await;
870
871 self.timeout_task = Some(spawn({
872 let stream = event_stream.clone();
873 let relay = self.relay;
874 let next_view = self.next_view;
875 let timeout = self.view_sync_timeout;
876 async move {
877 sleep(timeout).await;
878 tracing::warn!("Vote sending timed out in ViewSyncTrigger");
879 broadcast_event(
880 Arc::new(HotShotEvent::ViewSyncTimeout(
881 TYPES::View::new(*next_view),
882 relay,
883 ViewSyncPhase::None,
884 )),
885 &stream,
886 )
887 .await;
888 }
889 }));
890
891 return None;
892 },
893
894 HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
895 let round = *round;
896 if TYPES::View::new(*round) == self.next_view && *relay == self.relay {
898 if let Some(timeout_task) = self.timeout_task.take() {
899 timeout_task.abort();
900 }
901 self.relay += 1;
902 match last_seen_certificate {
903 ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
904 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
905 ViewSyncPreCommitData2 {
906 relay: self.relay,
907 round: self.next_view,
908 epoch: self.cur_epoch,
909 },
910 self.next_view,
911 &self.public_key,
912 &self.private_key,
913 &self.upgrade_lock,
914 )
915 .await
916 else {
917 tracing::error!("Failed to sign ViewSyncPreCommitData!");
918 return None;
919 };
920
921 broadcast_event(
922 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
923 &event_stream,
924 )
925 .await;
926 },
927 ViewSyncPhase::Finalize => {
928 unimplemented!()
930 },
931 }
932
933 self.timeout_task = Some(spawn({
934 let stream = event_stream.clone();
935 let relay = self.relay;
936 let next_view = self.next_view;
937 let timeout = self.view_sync_timeout;
938 let last_cert = last_seen_certificate.clone();
939 async move {
940 sleep(timeout).await;
941 tracing::warn!(
942 "Vote sending timed out in ViewSyncTimeout relay = {relay}"
943 );
944 broadcast_event(
945 Arc::new(HotShotEvent::ViewSyncTimeout(
946 TYPES::View::new(*next_view),
947 relay,
948 last_cert,
949 )),
950 &stream,
951 )
952 .await;
953 }
954 }));
955
956 return None;
957 }
958 },
959 _ => return None,
960 }
961 None
962 }
963
964 pub async fn membership_for_epoch(
965 &self,
966 epoch: Option<TYPES::Epoch>,
967 ) -> Option<EpochMembership<TYPES>> {
968 match self
969 .membership_coordinator
970 .membership_for_epoch(epoch)
971 .await
972 {
973 Ok(m) => Some(m),
974 Err(e) => {
975 tracing::warn!(e.message);
976 None
977 },
978 }
979 }
980}