hotshot_task_impls/
view_sync.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    fmt::Debug,
10    sync::Arc,
11    time::Duration,
12};
13
14use async_broadcast::{Receiver, Sender};
15use async_lock::RwLock;
16use async_trait::async_trait;
17use hotshot_task::task::TaskState;
18use hotshot_types::{
19    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
20    message::UpgradeLock,
21    simple_certificate::{
22        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
23    },
24    simple_vote::{
25        HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
26        ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
27    },
28    stake_table::StakeTableEntries,
29    traits::{
30        node_implementation::{ConsensusTime, NodeType, Versions},
31        signature_key::SignatureKey,
32    },
33    utils::EpochTransitionIndicator,
34    vote::{Certificate, HasViewNumber, Vote},
35};
36use hotshot_utils::anytrace::*;
37use tokio::{spawn, task::JoinHandle, time::sleep};
38use tracing::instrument;
39
40use crate::{
41    events::{HotShotEvent, HotShotTaskCompleted},
42    helpers::{broadcast_event, broadcast_view_change},
43    vote_collection::{
44        create_vote_accumulator, AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState,
45    },
46};
47
/// Phases of view sync.
///
/// Variants are declared in protocol order, so the derived ordering is
/// `None < PreCommit < Commit < Finalize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// No phase yet: the protocol has not started.
    None,
    /// The pre-commit phase.
    PreCommit,
    /// The commit phase.
    Commit,
    /// The finalize phase.
    Finalize,
}
60
61/// Type alias for a map from View Number to Relay to Vote Task
62type RelayMap<TYPES, VOTE, CERT, V> = HashMap<
63    (
64        <TYPES as NodeType>::View,
65        Option<<TYPES as NodeType>::Epoch>,
66    ),
67    BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>,
68>;
69
70type ReplicaTaskMap<TYPES, V> = HashMap<
71    (
72        <TYPES as NodeType>::View,
73        Option<<TYPES as NodeType>::Epoch>,
74    ),
75    ViewSyncReplicaTaskState<TYPES, V>,
76>;
77
78/// Main view sync task state
79pub struct ViewSyncTaskState<TYPES: NodeType, V: Versions> {
80    /// View HotShot is currently in
81    pub cur_view: TYPES::View,
82
83    /// View HotShot wishes to be in
84    pub next_view: TYPES::View,
85
86    /// Epoch HotShot is currently in
87    pub cur_epoch: Option<TYPES::Epoch>,
88
89    /// Membership for the quorum
90    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
91
92    /// This Nodes Public Key
93    pub public_key: TYPES::SignatureKey,
94
95    /// Our Private Key
96    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
97
98    /// Our node id; for logging
99    pub id: u64,
100
101    /// How many timeouts we've seen in a row; is reset upon a successful view change
102    pub num_timeouts_tracked: u64,
103
104    /// Map of running replica tasks
105    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES, V>>,
106
107    /// Map of pre-commit vote accumulates for the relay
108    pub pre_commit_relay_map: RwLock<
109        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>, V>,
110    >,
111
112    /// Map of commit vote accumulates for the relay
113    pub commit_relay_map:
114        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>, V>>,
115
116    /// Map of finalize vote accumulates for the relay
117    pub finalize_relay_map: RwLock<
118        RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>, V>,
119    >,
120
121    /// Timeout duration for view sync rounds
122    pub view_sync_timeout: Duration,
123
124    /// Last view we garbage collected old tasks
125    pub last_garbage_collected_view: TYPES::View,
126
127    /// Lock for a decided upgrade
128    pub upgrade_lock: UpgradeLock<TYPES, V>,
129
130    /// First view in which epoch version takes effect
131    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
132}
133
134#[async_trait]
135impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncTaskState<TYPES, V> {
136    type Event = HotShotEvent<TYPES>;
137
138    async fn handle_event(
139        &mut self,
140        event: Arc<Self::Event>,
141        sender: &Sender<Arc<Self::Event>>,
142        _receiver: &Receiver<Arc<Self::Event>>,
143    ) -> Result<()> {
144        self.handle(event, sender.clone()).await
145    }
146
147    fn cancel_subtasks(&mut self) {}
148}
149
150/// State of a view sync replica task
151pub struct ViewSyncReplicaTaskState<TYPES: NodeType, V: Versions> {
152    /// Timeout for view sync rounds
153    pub view_sync_timeout: Duration,
154
155    /// Current round HotShot is in
156    pub cur_view: TYPES::View,
157
158    /// Round HotShot wishes to be in
159    pub next_view: TYPES::View,
160
161    /// The relay index we are currently on
162    pub relay: u64,
163
164    /// Whether we have seen a finalized certificate
165    pub finalized: bool,
166
167    /// Whether we have already sent a view change event for `next_view`
168    pub sent_view_change_event: bool,
169
170    /// Timeout task handle, when it expires we try the next relay
171    pub timeout_task: Option<JoinHandle<()>>,
172
173    /// Our node id; for logging
174    pub id: u64,
175
176    /// Membership for the quorum
177    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
178
179    /// This Nodes Public Key
180    pub public_key: TYPES::SignatureKey,
181
182    /// Our Private Key
183    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
184
185    /// Lock for a decided upgrade
186    pub upgrade_lock: UpgradeLock<TYPES, V>,
187
188    /// Epoch HotShot was in when this task was created
189    pub cur_epoch: Option<TYPES::Epoch>,
190
191    /// First view in which epoch version takes effect
192    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
193}
194
195#[async_trait]
196impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncReplicaTaskState<TYPES, V> {
197    type Event = HotShotEvent<TYPES>;
198
199    async fn handle_event(
200        &mut self,
201        event: Arc<Self::Event>,
202        sender: &Sender<Arc<Self::Event>>,
203        _receiver: &Receiver<Arc<Self::Event>>,
204    ) -> Result<()> {
205        self.handle(event, sender.clone()).await;
206
207        Ok(())
208    }
209
210    fn cancel_subtasks(&mut self) {}
211}
212
213impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
214    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
215    #[allow(clippy::type_complexity)]
216    /// Handles incoming events for the main view sync task
217    pub async fn send_to_or_create_replica(
218        &mut self,
219        event: Arc<HotShotEvent<TYPES>>,
220        view: TYPES::View,
221        epoch: Option<TYPES::Epoch>,
222        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
223    ) {
224        // This certificate is old, we can throw it away
225        // If next view = cert round, then that means we should already have a task running for it
226        if self.cur_view > view {
227            tracing::debug!("Already in a higher view than the view sync message");
228            return;
229        }
230
231        let mut task_map = self.replica_task_map.write().await;
232
233        if let Some(replica_task) = task_map.get_mut(&(view, epoch)) {
234            // Forward event then return
235            tracing::debug!("Forwarding message");
236            let result = replica_task
237                .handle(Arc::clone(&event), sender.clone())
238                .await;
239
240            if result == Some(HotShotTaskCompleted) {
241                // The protocol has finished
242                task_map.remove(&(view, epoch));
243                return;
244            }
245
246            return;
247        }
248
249        // We do not have a replica task already running, so start one
250        let mut replica_state: ViewSyncReplicaTaskState<TYPES, V> = ViewSyncReplicaTaskState {
251            cur_view: view,
252            next_view: view,
253            relay: 0,
254            finalized: false,
255            sent_view_change_event: false,
256            timeout_task: None,
257            membership_coordinator: self.membership_coordinator.clone(),
258            public_key: self.public_key.clone(),
259            private_key: self.private_key.clone(),
260            view_sync_timeout: self.view_sync_timeout,
261            id: self.id,
262            upgrade_lock: self.upgrade_lock.clone(),
263            cur_epoch: self.cur_epoch,
264            first_epoch: self.first_epoch,
265        };
266
267        let result = replica_state
268            .handle(Arc::clone(&event), sender.clone())
269            .await;
270
271        if result == Some(HotShotTaskCompleted) {
272            // The protocol has finished
273            return;
274        }
275
276        task_map.insert((view, epoch), replica_state);
277    }
278
279    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
280    #[allow(clippy::type_complexity)]
281    /// Handles incoming events for the main view sync task
282    pub async fn handle(
283        &mut self,
284        event: Arc<HotShotEvent<TYPES>>,
285        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
286    ) -> Result<()> {
287        match event.as_ref() {
288            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
289                tracing::debug!("Received view sync cert for phase {certificate:?}");
290                let view = certificate.view_number();
291                self.send_to_or_create_replica(
292                    Arc::clone(&event),
293                    view,
294                    certificate.epoch(),
295                    &event_stream,
296                )
297                .await;
298            },
299            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
300                tracing::debug!("Received view sync cert for phase {certificate:?}");
301                let view = certificate.view_number();
302                self.send_to_or_create_replica(
303                    Arc::clone(&event),
304                    view,
305                    certificate.epoch(),
306                    &event_stream,
307                )
308                .await;
309            },
310            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
311                tracing::debug!("Received view sync cert for phase {certificate:?}");
312                let view = certificate.view_number();
313                self.send_to_or_create_replica(
314                    Arc::clone(&event),
315                    view,
316                    certificate.epoch(),
317                    &event_stream,
318                )
319                .await;
320            },
321            HotShotEvent::ViewSyncTimeout(view, ..) => {
322                tracing::debug!("view sync timeout in main task {view:?}");
323                let view = *view;
324                self.send_to_or_create_replica(
325                    Arc::clone(&event),
326                    view,
327                    self.cur_epoch,
328                    &event_stream,
329                )
330                .await;
331            },
332
333            HotShotEvent::ViewSyncPreCommitVoteRecv(ref vote) => {
334                let mut map = self.pre_commit_relay_map.write().await;
335                let vote_view = vote.view_number();
336                let relay = vote.date().relay;
337                let relay_map = map
338                    .entry((vote_view, vote.date().epoch))
339                    .or_insert(BTreeMap::new());
340                if let Some(relay_task) = relay_map.get_mut(&relay) {
341                    tracing::debug!("Forwarding message");
342
343                    // Handle the vote and check if the accumulator has returned successfully
344                    if relay_task
345                        .handle_vote_event(Arc::clone(&event), &event_stream)
346                        .await?
347                        .is_some()
348                    {
349                        map.remove(&(vote_view, vote.date().epoch));
350                    }
351
352                    return Ok(());
353                }
354
355                let epoch_mem = self
356                    .membership_coordinator
357                    .membership_for_epoch(vote.date().epoch)
358                    .await?;
359                // We do not have a relay task already running, so start one
360                ensure!(
361                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
362                    "View sync vote sent to wrong leader"
363                );
364
365                let info = AccumulatorInfo {
366                    public_key: self.public_key.clone(),
367                    membership: epoch_mem,
368                    view: vote_view,
369                    id: self.id,
370                };
371                let vote_collector = create_vote_accumulator(
372                    &info,
373                    event,
374                    &event_stream,
375                    self.upgrade_lock.clone(),
376                    EpochTransitionIndicator::NotInTransition,
377                )
378                .await?;
379
380                relay_map.insert(relay, vote_collector);
381            },
382
383            HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
384                let mut map = self.commit_relay_map.write().await;
385                let vote_view = vote.view_number();
386                let relay = vote.date().relay;
387                let relay_map = map
388                    .entry((vote_view, vote.date().epoch))
389                    .or_insert(BTreeMap::new());
390                if let Some(relay_task) = relay_map.get_mut(&relay) {
391                    tracing::debug!("Forwarding message");
392
393                    // Handle the vote and check if the accumulator has returned successfully
394                    if relay_task
395                        .handle_vote_event(Arc::clone(&event), &event_stream)
396                        .await?
397                        .is_some()
398                    {
399                        map.remove(&(vote_view, vote.date().epoch));
400                    }
401
402                    return Ok(());
403                }
404
405                // We do not have a relay task already running, so start one
406                let epoch_mem = self
407                    .membership_coordinator
408                    .membership_for_epoch(vote.date().epoch)
409                    .await?;
410                ensure!(
411                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
412                    debug!("View sync vote sent to wrong leader")
413                );
414
415                let info = AccumulatorInfo {
416                    public_key: self.public_key.clone(),
417                    membership: epoch_mem,
418                    view: vote_view,
419                    id: self.id,
420                };
421
422                let vote_collector = create_vote_accumulator(
423                    &info,
424                    event,
425                    &event_stream,
426                    self.upgrade_lock.clone(),
427                    EpochTransitionIndicator::NotInTransition,
428                )
429                .await?;
430                relay_map.insert(relay, vote_collector);
431            },
432
433            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
434                let mut map = self.finalize_relay_map.write().await;
435                let vote_view = vote.view_number();
436                let relay = vote.date().relay;
437                let relay_map = map
438                    .entry((vote_view, vote.date().epoch))
439                    .or_insert(BTreeMap::new());
440                if let Some(relay_task) = relay_map.get_mut(&relay) {
441                    tracing::debug!("Forwarding message");
442
443                    // Handle the vote and check if the accumulator has returned successfully
444                    if relay_task
445                        .handle_vote_event(Arc::clone(&event), &event_stream)
446                        .await?
447                        .is_some()
448                    {
449                        map.remove(&(vote_view, vote.date().epoch));
450                    }
451
452                    return Ok(());
453                }
454
455                let epoch_mem = self
456                    .membership_coordinator
457                    .membership_for_epoch(vote.date().epoch)
458                    .await?;
459                // We do not have a relay task already running, so start one
460                ensure!(
461                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
462                    debug!("View sync vote sent to wrong leader")
463                );
464
465                let info = AccumulatorInfo {
466                    public_key: self.public_key.clone(),
467                    membership: epoch_mem,
468                    view: vote_view,
469                    id: self.id,
470                };
471                let vote_collector = create_vote_accumulator(
472                    &info,
473                    event,
474                    &event_stream,
475                    self.upgrade_lock.clone(),
476                    EpochTransitionIndicator::NotInTransition,
477                )
478                .await;
479                if let Ok(vote_task) = vote_collector {
480                    relay_map.insert(relay, vote_task);
481                }
482            },
483
484            &HotShotEvent::ViewChange(new_view, epoch) => {
485                if epoch > self.cur_epoch {
486                    self.cur_epoch = epoch;
487                }
488                let new_view = TYPES::View::new(*new_view);
489                if self.cur_view < new_view {
490                    tracing::debug!(
491                        "Change from view {} to view {} in view sync task",
492                        *self.cur_view,
493                        *new_view
494                    );
495
496                    self.cur_view = new_view;
497                    self.next_view = self.cur_view;
498                    self.num_timeouts_tracked = 0;
499
500                    // Garbage collect old tasks
501                    // We could put this into a separate async task, but that would require making several fields on ViewSyncTaskState thread-safe and harm readability.  In the common case this will have zero tasks to clean up.
502                    // run GC
503                    for i in *self.last_garbage_collected_view..*self.cur_view {
504                        self.replica_task_map
505                            .write()
506                            .await
507                            .remove_entry(&(TYPES::View::new(i), epoch));
508                        self.pre_commit_relay_map
509                            .write()
510                            .await
511                            .remove_entry(&(TYPES::View::new(i), epoch));
512                        self.commit_relay_map
513                            .write()
514                            .await
515                            .remove_entry(&(TYPES::View::new(i), epoch));
516                        self.finalize_relay_map
517                            .write()
518                            .await
519                            .remove_entry(&(TYPES::View::new(i), epoch));
520                    }
521
522                    self.last_garbage_collected_view = self.cur_view - 1;
523                }
524            },
525            &HotShotEvent::Timeout(view_number, ..) => {
526                // This is an old timeout and we can ignore it
527                ensure!(
528                    view_number >= self.cur_view,
529                    debug!("Discarding old timeout vote.")
530                );
531
532                self.num_timeouts_tracked += 1;
533                let leader = self
534                    .membership_coordinator
535                    .membership_for_epoch(self.cur_epoch)
536                    .await?
537                    .leader(view_number)
538                    .await?;
539                tracing::warn!(
540                    %leader,
541                    leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
542                    view_number = *view_number,
543                    num_timeouts_tracked = self.num_timeouts_tracked,
544                    "view timed out",
545                );
546
547                if self.num_timeouts_tracked >= 3 {
548                    tracing::error!("Too many consecutive timeouts!  This shouldn't happen");
549                }
550
551                if self.num_timeouts_tracked >= 2 {
552                    tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
553
554                    self.send_to_or_create_replica(
555                        Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
556                        view_number + 1,
557                        self.cur_epoch,
558                        &event_stream,
559                    )
560                    .await;
561                } else {
562                    // If this is the first timeout we've seen advance to the next view
563                    self.cur_view = view_number + 1;
564                    broadcast_view_change(
565                        &event_stream,
566                        self.cur_view,
567                        self.cur_epoch,
568                        self.first_epoch,
569                    )
570                    .await;
571                }
572            },
573            HotShotEvent::SetFirstEpoch(view, epoch) => {
574                self.first_epoch = Some((*view, *epoch));
575            },
576
577            _ => {},
578        }
579        Ok(())
580    }
581}
582
583impl<TYPES: NodeType, V: Versions> ViewSyncReplicaTaskState<TYPES, V> {
584    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
585    /// Handle incoming events for the view sync replica task
586    pub async fn handle(
587        &mut self,
588        event: Arc<HotShotEvent<TYPES>>,
589        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
590    ) -> Option<HotShotTaskCompleted> {
591        match event.as_ref() {
592            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
593                let last_seen_certificate = ViewSyncPhase::PreCommit;
594
595                // Ignore certificate if it is for an older round
596                if certificate.view_number() < self.next_view {
597                    tracing::warn!("We're already in a higher round");
598
599                    return None;
600                }
601
602                let membership = self.membership_for_epoch(certificate.epoch()).await?;
603                let membership_stake_table = membership.stake_table().await;
604                let membership_failure_threshold = membership.failure_threshold().await;
605
606                // If certificate is not valid, return current state
607                if let Err(e) = certificate
608                    .is_valid_cert(
609                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
610                        membership_failure_threshold,
611                        &self.upgrade_lock,
612                    )
613                    .await
614                {
615                    tracing::error!(
616                        "Not valid view sync cert! data: {:?}, error: {}",
617                        certificate.data(),
618                        e
619                    );
620
621                    return None;
622                }
623
624                // If certificate is for a higher round shutdown this task
625                // since another task should have been started for the higher round
626                if certificate.view_number() > self.next_view {
627                    return Some(HotShotTaskCompleted);
628                }
629
630                if certificate.data().relay > self.relay {
631                    self.relay = certificate.data().relay;
632                }
633
634                let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
635                    ViewSyncCommitData2 {
636                        relay: certificate.data().relay,
637                        round: self.next_view,
638                        epoch: certificate.data().epoch,
639                    },
640                    self.next_view,
641                    &self.public_key,
642                    &self.private_key,
643                    &self.upgrade_lock,
644                )
645                .await
646                else {
647                    tracing::error!("Failed to sign ViewSyncCommitData!");
648                    return None;
649                };
650
651                broadcast_event(
652                    Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
653                    &event_stream,
654                )
655                .await;
656
657                if let Some(timeout_task) = self.timeout_task.take() {
658                    timeout_task.abort();
659                }
660
661                self.timeout_task = Some(spawn({
662                    let stream = event_stream.clone();
663                    let phase = last_seen_certificate;
664                    let relay = self.relay;
665                    let next_view = self.next_view;
666                    let timeout = self.view_sync_timeout;
667                    async move {
668                        sleep(timeout).await;
669                        tracing::warn!("Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = {relay}");
670
671                        broadcast_event(
672                            Arc::new(HotShotEvent::ViewSyncTimeout(
673                                TYPES::View::new(*next_view),
674                                relay,
675                                phase,
676                            )),
677                            &stream,
678                        )
679                        .await;
680                    }
681                }));
682            },
683
684            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
685                let last_seen_certificate = ViewSyncPhase::Commit;
686
687                // Ignore certificate if it is for an older round
688                if certificate.view_number() < self.next_view {
689                    tracing::warn!("We're already in a higher round");
690
691                    return None;
692                }
693
694                let membership = self.membership_for_epoch(certificate.epoch()).await?;
695                let membership_stake_table = membership.stake_table().await;
696                let membership_success_threshold = membership.success_threshold().await;
697
698                // If certificate is not valid, return current state
699                if let Err(e) = certificate
700                    .is_valid_cert(
701                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
702                        membership_success_threshold,
703                        &self.upgrade_lock,
704                    )
705                    .await
706                {
707                    tracing::error!(
708                        "Not valid view sync cert! data: {:?}, error: {}",
709                        certificate.data(),
710                        e
711                    );
712
713                    return None;
714                }
715
716                // If certificate is for a higher round shutdown this task
717                // since another task should have been started for the higher round
718                if certificate.view_number() > self.next_view {
719                    return Some(HotShotTaskCompleted);
720                }
721
722                if certificate.data().relay > self.relay {
723                    self.relay = certificate.data().relay;
724                }
725
726                let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
727                    ViewSyncFinalizeData2 {
728                        relay: certificate.data().relay,
729                        round: self.next_view,
730                        epoch: certificate.data().epoch,
731                    },
732                    self.next_view,
733                    &self.public_key,
734                    &self.private_key,
735                    &self.upgrade_lock,
736                )
737                .await
738                else {
739                    tracing::error!("Failed to sign view sync finalized vote!");
740                    return None;
741                };
742
743                broadcast_event(
744                    Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
745                    &event_stream,
746                )
747                .await;
748
749                tracing::info!(
750                    "View sync protocol has received view sync evidence to update the view to {}",
751                    *self.next_view
752                );
753
754                broadcast_view_change(
755                    &event_stream,
756                    self.next_view,
757                    certificate.epoch(),
758                    self.first_epoch,
759                )
760                .await;
761
762                if let Some(timeout_task) = self.timeout_task.take() {
763                    timeout_task.abort();
764                }
765                self.timeout_task = Some(spawn({
766                    let stream = event_stream.clone();
767                    let phase = last_seen_certificate;
768                    let relay = self.relay;
769                    let next_view = self.next_view;
770                    let timeout = self.view_sync_timeout;
771                    async move {
772                        sleep(timeout).await;
773                        tracing::warn!("Vote sending timed out in ViewSyncCommitCertificateRecv, relay = {relay}");
774                        broadcast_event(
775                            Arc::new(HotShotEvent::ViewSyncTimeout(
776                                TYPES::View::new(*next_view),
777                                relay,
778                                phase,
779                            )),
780                            &stream,
781                        )
782                        .await;
783                    }
784                }));
785            },
786
787            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
788                // Ignore certificate if it is for an older round
789                if certificate.view_number() < self.next_view {
790                    tracing::warn!("We're already in a higher round");
791
792                    return None;
793                }
794
795                let membership = self.membership_for_epoch(certificate.epoch()).await?;
796                let membership_stake_table = membership.stake_table().await;
797                let membership_success_threshold = membership.success_threshold().await;
798
799                // If certificate is not valid, return current state
800                if let Err(e) = certificate
801                    .is_valid_cert(
802                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
803                        membership_success_threshold,
804                        &self.upgrade_lock,
805                    )
806                    .await
807                {
808                    tracing::error!(
809                        "Not valid view sync cert! data: {:?}, error: {}",
810                        certificate.data(),
811                        e
812                    );
813
814                    return None;
815                }
816
817                // If certificate is for a higher round shutdown this task
818                // since another task should have been started for the higher round
819                if certificate.view_number() > self.next_view {
820                    return Some(HotShotTaskCompleted);
821                }
822
823                if certificate.data().relay > self.relay {
824                    self.relay = certificate.data().relay;
825                }
826
827                if let Some(timeout_task) = self.timeout_task.take() {
828                    timeout_task.abort();
829                }
830
831                broadcast_view_change(
832                    &event_stream,
833                    self.next_view,
834                    certificate.epoch(),
835                    self.first_epoch,
836                )
837                .await;
838                return Some(HotShotTaskCompleted);
839            },
840
841            HotShotEvent::ViewSyncTrigger(view_number) => {
842                let view_number = *view_number;
843                if self.next_view != TYPES::View::new(*view_number) {
844                    tracing::error!("Unexpected view number to trigger view sync");
845                    return None;
846                }
847
848                let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
849                    ViewSyncPreCommitData2 {
850                        relay: 0,
851                        round: view_number,
852                        epoch: self.cur_epoch,
853                    },
854                    view_number,
855                    &self.public_key,
856                    &self.private_key,
857                    &self.upgrade_lock,
858                )
859                .await
860                else {
861                    tracing::error!("Failed to sign pre commit vote!");
862                    return None;
863                };
864
865                broadcast_event(
866                    Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
867                    &event_stream,
868                )
869                .await;
870
871                self.timeout_task = Some(spawn({
872                    let stream = event_stream.clone();
873                    let relay = self.relay;
874                    let next_view = self.next_view;
875                    let timeout = self.view_sync_timeout;
876                    async move {
877                        sleep(timeout).await;
878                        tracing::warn!("Vote sending timed out in ViewSyncTrigger");
879                        broadcast_event(
880                            Arc::new(HotShotEvent::ViewSyncTimeout(
881                                TYPES::View::new(*next_view),
882                                relay,
883                                ViewSyncPhase::None,
884                            )),
885                            &stream,
886                        )
887                        .await;
888                    }
889                }));
890
891                return None;
892            },
893
894            HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
895                let round = *round;
896                // Shouldn't ever receive a timeout for a relay higher than ours
897                if TYPES::View::new(*round) == self.next_view && *relay == self.relay {
898                    if let Some(timeout_task) = self.timeout_task.take() {
899                        timeout_task.abort();
900                    }
901                    self.relay += 1;
902                    match last_seen_certificate {
903                        ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
904                            let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
905                                ViewSyncPreCommitData2 {
906                                    relay: self.relay,
907                                    round: self.next_view,
908                                    epoch: self.cur_epoch,
909                                },
910                                self.next_view,
911                                &self.public_key,
912                                &self.private_key,
913                                &self.upgrade_lock,
914                            )
915                            .await
916                            else {
917                                tracing::error!("Failed to sign ViewSyncPreCommitData!");
918                                return None;
919                            };
920
921                            broadcast_event(
922                                Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
923                                &event_stream,
924                            )
925                            .await;
926                        },
927                        ViewSyncPhase::Finalize => {
928                            // This should never occur
929                            unimplemented!()
930                        },
931                    }
932
933                    self.timeout_task = Some(spawn({
934                        let stream = event_stream.clone();
935                        let relay = self.relay;
936                        let next_view = self.next_view;
937                        let timeout = self.view_sync_timeout;
938                        let last_cert = last_seen_certificate.clone();
939                        async move {
940                            sleep(timeout).await;
941                            tracing::warn!(
942                                "Vote sending timed out in ViewSyncTimeout relay = {relay}"
943                            );
944                            broadcast_event(
945                                Arc::new(HotShotEvent::ViewSyncTimeout(
946                                    TYPES::View::new(*next_view),
947                                    relay,
948                                    last_cert,
949                                )),
950                                &stream,
951                            )
952                            .await;
953                        }
954                    }));
955
956                    return None;
957                }
958            },
959            _ => return None,
960        }
961        None
962    }
963
964    pub async fn membership_for_epoch(
965        &self,
966        epoch: Option<TYPES::Epoch>,
967    ) -> Option<EpochMembership<TYPES>> {
968        match self
969            .membership_coordinator
970            .membership_for_epoch(epoch)
971            .await
972        {
973            Ok(m) => Some(m),
974            Err(e) => {
975                tracing::warn!(e.message);
976                None
977            },
978        }
979    }
980}