hotshot_task_impls/
view_sync.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    data::{EpochNumber, ViewNumber},
15    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
16    message::UpgradeLock,
17    simple_certificate::{
18        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
19    },
20    simple_vote::{
21        HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
22        ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
23    },
24    stake_table::StakeTableEntries,
25    traits::{node_implementation::NodeType, signature_key::SignatureKey},
26    utils::EpochTransitionIndicator,
27    vote::{Certificate, HasViewNumber, Vote},
28};
29use hotshot_utils::anytrace::*;
30use tokio::{spawn, task::JoinHandle, time::sleep};
31use tracing::instrument;
32
33use crate::{
34    events::{HotShotEvent, HotShotTaskCompleted},
35    helpers::{broadcast_event, broadcast_view_change},
36    vote_collection::{
37        AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState, create_vote_accumulator,
38    },
39};
40
/// The phases a view sync round moves through, listed in protocol order.
///
/// Because `PartialOrd` is derived, variants compare by declaration order:
/// `None < PreCommit < Commit < Finalize`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// The protocol has not started yet.
    None,
    /// Pre-commit phase (a pre-commit certificate has been processed).
    PreCommit,
    /// Commit phase (a commit certificate has been processed).
    Commit,
    /// Finalize phase (a finalize certificate has been processed).
    Finalize,
}
53
54type TaskMap<VAL> = BTreeMap<Option<EpochNumber>, BTreeMap<ViewNumber, VAL>>;
55
56/// Type alias for a map from View Number to Relay to Vote Task
57type RelayMap<TYPES, VOTE, CERT> =
58    TaskMap<BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;
59
60type ReplicaTaskMap<TYPES> = TaskMap<ViewSyncReplicaTaskState<TYPES>>;
61
62/// Main view sync task state
63pub struct ViewSyncTaskState<TYPES: NodeType> {
64    /// View HotShot is currently in
65    pub cur_view: ViewNumber,
66
67    /// View HotShot wishes to be in
68    pub next_view: ViewNumber,
69
70    /// Epoch HotShot is currently in
71    pub cur_epoch: Option<EpochNumber>,
72
73    /// Membership for the quorum
74    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76    /// This Nodes Public Key
77    pub public_key: TYPES::SignatureKey,
78
79    /// Our Private Key
80    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
81
82    /// Our node id; for logging
83    pub id: u64,
84
85    /// How many timeouts we've seen in a row; is reset upon a successful view change
86    pub num_timeouts_tracked: u64,
87
88    /// Map of running replica tasks
89    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES>>,
90
91    /// Map of pre-commit vote accumulates for the relay
92    pub pre_commit_relay_map: RwLock<
93        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>,
94    >,
95
96    /// Map of commit vote accumulates for the relay
97    pub commit_relay_map:
98        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>>,
99
100    /// Map of finalize vote accumulates for the relay
101    pub finalize_relay_map:
102        RwLock<RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>>,
103
104    /// Timeout duration for view sync rounds
105    pub view_sync_timeout: Duration,
106
107    /// Last view we garbage collected old tasks
108    pub last_garbage_collected_view: ViewNumber,
109
110    /// Lock for a decided upgrade
111    pub upgrade_lock: UpgradeLock<TYPES>,
112
113    /// First view in which epoch version takes effect
114    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
115
116    /// Keeps track of the highest finalized view and epoch, used for garbage collection
117    pub highest_finalized_epoch_view: (Option<EpochNumber>, ViewNumber),
118
119    pub epoch_height: u64,
120}
121
122#[async_trait]
123impl<TYPES: NodeType> TaskState for ViewSyncTaskState<TYPES> {
124    type Event = HotShotEvent<TYPES>;
125
126    async fn handle_event(
127        &mut self,
128        event: Arc<Self::Event>,
129        sender: &Sender<Arc<Self::Event>>,
130        _receiver: &Receiver<Arc<Self::Event>>,
131    ) -> Result<()> {
132        self.handle(event, sender.clone()).await
133    }
134
135    fn cancel_subtasks(&mut self) {}
136}
137
138/// State of a view sync replica task
139pub struct ViewSyncReplicaTaskState<TYPES: NodeType> {
140    /// Timeout for view sync rounds
141    pub view_sync_timeout: Duration,
142
143    /// Current round HotShot is in
144    pub cur_view: ViewNumber,
145
146    /// Round HotShot wishes to be in
147    pub next_view: ViewNumber,
148
149    /// The relay index we are currently on
150    pub relay: u64,
151
152    /// Whether we have seen a finalized certificate
153    pub finalized: bool,
154
155    /// Whether we have already sent a view change event for `next_view`
156    pub sent_view_change_event: bool,
157
158    /// Timeout task handle, when it expires we try the next relay
159    pub timeout_task: Option<JoinHandle<()>>,
160
161    /// Our node id; for logging
162    pub id: u64,
163
164    /// Membership for the quorum
165    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
166
167    /// This Nodes Public Key
168    pub public_key: TYPES::SignatureKey,
169
170    /// Our Private Key
171    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
172
173    /// Lock for a decided upgrade
174    pub upgrade_lock: UpgradeLock<TYPES>,
175
176    /// Epoch HotShot was in when this task was created
177    pub cur_epoch: Option<EpochNumber>,
178
179    /// First view in which epoch version takes effect
180    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
181}
182
183#[async_trait]
184impl<TYPES: NodeType> TaskState for ViewSyncReplicaTaskState<TYPES> {
185    type Event = HotShotEvent<TYPES>;
186
187    async fn handle_event(
188        &mut self,
189        event: Arc<Self::Event>,
190        sender: &Sender<Arc<Self::Event>>,
191        _receiver: &Receiver<Arc<Self::Event>>,
192    ) -> Result<()> {
193        self.handle(event, sender.clone()).await;
194
195        Ok(())
196    }
197
198    fn cancel_subtasks(&mut self) {}
199}
200
201impl<TYPES: NodeType> ViewSyncTaskState<TYPES> {
202    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
203    #[allow(clippy::type_complexity)]
204    /// Handles incoming events for the main view sync task
205    pub async fn send_to_or_create_replica(
206        &mut self,
207        event: Arc<HotShotEvent<TYPES>>,
208        view: ViewNumber,
209        epoch: Option<EpochNumber>,
210        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
211    ) {
212        let mut task_map = self.replica_task_map.write().await;
213
214        if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
215            // Forward event then return
216            tracing::debug!("Forwarding message");
217            let result = replica_task
218                .handle(Arc::clone(&event), sender.clone())
219                .await;
220
221            if result == Some(HotShotTaskCompleted) {
222                // The protocol has finished
223                if epoch >= self.highest_finalized_epoch_view.0
224                    && view > self.highest_finalized_epoch_view.1
225                {
226                    self.highest_finalized_epoch_view = (epoch, view);
227                } else if view > self.highest_finalized_epoch_view.1 {
228                    tracing::error!(
229                        "We finalized a higher view but the epoch is lower. This should never \
230                         happen. Current highest finalized epoch view: {:?}, new highest \
231                         finalized epoch view: {:?}",
232                        self.highest_finalized_epoch_view,
233                        (epoch, view)
234                    );
235                }
236                task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
237                task_map.retain(|_, x| !x.is_empty());
238                drop(task_map);
239
240                // Garbage collect old tasks
241                self.garbage_collect_tasks().await;
242                return;
243            }
244
245            return;
246        }
247
248        // We do not have a replica task already running, so start one
249        let mut replica_state: ViewSyncReplicaTaskState<TYPES> = ViewSyncReplicaTaskState {
250            cur_view: view,
251            next_view: view,
252            relay: 0,
253            finalized: false,
254            sent_view_change_event: false,
255            timeout_task: None,
256            membership_coordinator: self.membership_coordinator.clone(),
257            public_key: self.public_key.clone(),
258            private_key: self.private_key.clone(),
259            view_sync_timeout: self.view_sync_timeout,
260            id: self.id,
261            upgrade_lock: self.upgrade_lock.clone(),
262            cur_epoch: self.cur_epoch,
263            first_epoch: self.first_epoch,
264        };
265
266        let result = replica_state
267            .handle(Arc::clone(&event), sender.clone())
268            .await;
269
270        if result == Some(HotShotTaskCompleted) {
271            // The protocol has finished
272            return;
273        }
274
275        task_map
276            .entry(epoch)
277            .or_default()
278            .insert(view, replica_state);
279    }
280
281    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
282    #[allow(clippy::type_complexity)]
283    /// Handles incoming events for the main view sync task
284    pub async fn handle(
285        &mut self,
286        event: Arc<HotShotEvent<TYPES>>,
287        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
288    ) -> Result<()> {
289        match event.as_ref() {
290            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
291                tracing::debug!("Received view sync cert for phase {certificate:?}");
292                let view = certificate.view_number();
293                self.send_to_or_create_replica(
294                    Arc::clone(&event),
295                    view,
296                    certificate.epoch(),
297                    &event_stream,
298                )
299                .await;
300            },
301            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
302                tracing::debug!("Received view sync cert for phase {certificate:?}");
303                let view = certificate.view_number();
304                self.send_to_or_create_replica(
305                    Arc::clone(&event),
306                    view,
307                    certificate.epoch(),
308                    &event_stream,
309                )
310                .await;
311            },
312            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
313                tracing::debug!("Received view sync cert for phase {certificate:?}");
314                let view = certificate.view_number();
315                self.send_to_or_create_replica(
316                    Arc::clone(&event),
317                    view,
318                    certificate.epoch(),
319                    &event_stream,
320                )
321                .await;
322            },
323            HotShotEvent::ViewSyncTimeout(view, ..) => {
324                tracing::debug!("view sync timeout in main task {view:?}");
325                let view = *view;
326                self.send_to_or_create_replica(
327                    Arc::clone(&event),
328                    view,
329                    self.cur_epoch,
330                    &event_stream,
331                )
332                .await;
333            },
334
335            HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
336                let mut map = self.pre_commit_relay_map.write().await;
337                let vote_view = vote.view_number();
338                let relay = vote.date().relay;
339                let relay_map = map
340                    .entry(vote.date().epoch)
341                    .or_insert(BTreeMap::new())
342                    .entry(vote_view)
343                    .or_insert(BTreeMap::new());
344                if let Some(relay_task) = relay_map.get_mut(&relay) {
345                    tracing::debug!("Forwarding message");
346
347                    // Handle the vote and check if the accumulator has returned successfully
348                    if relay_task
349                        .handle_vote_event(Arc::clone(&event), &event_stream)
350                        .await?
351                        .is_some()
352                    {
353                        map.get_mut(&vote.date().epoch)
354                            .and_then(|x| x.remove(&vote_view));
355                        map.retain(|_, x| !x.is_empty());
356                    }
357
358                    return Ok(());
359                }
360
361                let epoch_mem = self
362                    .membership_coordinator
363                    .membership_for_epoch(vote.date().epoch)
364                    .await?;
365                // We do not have a relay task already running, so start one
366                ensure!(
367                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
368                    "View sync vote sent to wrong leader"
369                );
370
371                let info = AccumulatorInfo {
372                    public_key: self.public_key.clone(),
373                    membership: epoch_mem,
374                    view: vote_view,
375                    id: self.id,
376                };
377                let vote_collector = create_vote_accumulator(
378                    &info,
379                    event,
380                    &event_stream,
381                    self.upgrade_lock.clone(),
382                    EpochTransitionIndicator::NotInTransition,
383                )
384                .await?;
385
386                relay_map.insert(relay, vote_collector);
387            },
388
389            HotShotEvent::ViewSyncCommitVoteRecv(vote) => {
390                let mut map = self.commit_relay_map.write().await;
391                let vote_view = vote.view_number();
392                let relay = vote.date().relay;
393                let relay_map = map
394                    .entry(vote.date().epoch)
395                    .or_insert(BTreeMap::new())
396                    .entry(vote_view)
397                    .or_insert(BTreeMap::new());
398                if let Some(relay_task) = relay_map.get_mut(&relay) {
399                    tracing::debug!("Forwarding message");
400
401                    // Handle the vote and check if the accumulator has returned successfully
402                    if relay_task
403                        .handle_vote_event(Arc::clone(&event), &event_stream)
404                        .await?
405                        .is_some()
406                    {
407                        map.get_mut(&vote.date().epoch)
408                            .and_then(|x| x.remove(&vote_view));
409                        map.retain(|_, x| !x.is_empty());
410                    }
411
412                    return Ok(());
413                }
414
415                // We do not have a relay task already running, so start one
416                let epoch_mem = self
417                    .membership_coordinator
418                    .membership_for_epoch(vote.date().epoch)
419                    .await?;
420                ensure!(
421                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
422                    debug!("View sync vote sent to wrong leader")
423                );
424
425                let info = AccumulatorInfo {
426                    public_key: self.public_key.clone(),
427                    membership: epoch_mem,
428                    view: vote_view,
429                    id: self.id,
430                };
431
432                let vote_collector = create_vote_accumulator(
433                    &info,
434                    event,
435                    &event_stream,
436                    self.upgrade_lock.clone(),
437                    EpochTransitionIndicator::NotInTransition,
438                )
439                .await?;
440                relay_map.insert(relay, vote_collector);
441            },
442
443            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
444                let mut map = self.finalize_relay_map.write().await;
445                let vote_view = vote.view_number();
446                let relay = vote.date().relay;
447                let relay_map = map
448                    .entry(vote.date().epoch)
449                    .or_insert(BTreeMap::new())
450                    .entry(vote_view)
451                    .or_insert(BTreeMap::new());
452                if let Some(relay_task) = relay_map.get_mut(&relay) {
453                    tracing::debug!("Forwarding message");
454
455                    // Handle the vote and check if the accumulator has returned successfully
456                    if relay_task
457                        .handle_vote_event(Arc::clone(&event), &event_stream)
458                        .await?
459                        .is_some()
460                    {
461                        map.get_mut(&vote.date().epoch)
462                            .and_then(|x| x.remove(&vote_view));
463                        map.retain(|_, x| !x.is_empty());
464                    }
465
466                    return Ok(());
467                }
468
469                let epoch_mem = self
470                    .membership_coordinator
471                    .membership_for_epoch(vote.date().epoch)
472                    .await?;
473                // We do not have a relay task already running, so start one
474                ensure!(
475                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
476                    debug!("View sync vote sent to wrong leader")
477                );
478
479                let info = AccumulatorInfo {
480                    public_key: self.public_key.clone(),
481                    membership: epoch_mem,
482                    view: vote_view,
483                    id: self.id,
484                };
485                let vote_collector = create_vote_accumulator(
486                    &info,
487                    event,
488                    &event_stream,
489                    self.upgrade_lock.clone(),
490                    EpochTransitionIndicator::NotInTransition,
491                )
492                .await;
493                if let Ok(vote_task) = vote_collector {
494                    relay_map.insert(relay, vote_task);
495                }
496            },
497
498            &HotShotEvent::ViewChange(new_view, epoch) => {
499                if epoch > self.cur_epoch {
500                    self.cur_epoch = epoch;
501                }
502                let new_view = ViewNumber::new(*new_view);
503                if self.cur_view < new_view {
504                    tracing::debug!(
505                        "Change from view {} to view {} in view sync task",
506                        *self.cur_view,
507                        *new_view
508                    );
509
510                    self.cur_view = new_view;
511                    self.next_view = self.cur_view;
512                    self.num_timeouts_tracked = 0;
513                }
514
515                self.garbage_collect_tasks().await;
516            },
517            HotShotEvent::LeavesDecided(leaves) => {
518                let finalized_epoch = self.highest_finalized_epoch_view.0.max(
519                    leaves
520                        .iter()
521                        .map(|leaf| leaf.epoch(self.epoch_height))
522                        .max()
523                        .unwrap_or(None),
524                );
525                let finalized_view = self.highest_finalized_epoch_view.1.max(
526                    leaves
527                        .iter()
528                        .map(|leaf| leaf.view_number())
529                        .max()
530                        .unwrap_or(ViewNumber::new(0)),
531                );
532
533                self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
534
535                self.garbage_collect_tasks().await;
536            },
537            &HotShotEvent::Timeout(view_number, ..) => {
538                // This is an old timeout and we can ignore it
539                ensure!(
540                    view_number >= self.cur_view,
541                    debug!("Discarding old timeout vote.")
542                );
543
544                self.num_timeouts_tracked += 1;
545
546                if self.num_timeouts_tracked >= 3 {
547                    tracing::error!("Too many consecutive timeouts!  This shouldn't happen");
548                }
549
550                if self.num_timeouts_tracked >= 2 {
551                    tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
552
553                    self.send_to_or_create_replica(
554                        Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
555                        view_number + 1,
556                        self.cur_epoch,
557                        &event_stream,
558                    )
559                    .await;
560                } else {
561                    // If this is the first timeout we've seen advance to the next view
562                    self.cur_view = view_number + 1;
563                    broadcast_view_change(
564                        &event_stream,
565                        self.cur_view,
566                        self.cur_epoch,
567                        self.first_epoch,
568                    )
569                    .await;
570                }
571                let leader = self
572                    .membership_coordinator
573                    .membership_for_epoch(self.cur_epoch)
574                    .await?
575                    .leader(view_number)
576                    .await?;
577                tracing::warn!(
578                    %leader,
579                    leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
580                    view_number = *view_number,
581                    num_timeouts_tracked = self.num_timeouts_tracked,
582                    "view timed out",
583                );
584            },
585            HotShotEvent::SetFirstEpoch(view, epoch) => {
586                self.first_epoch = Some((*view, *epoch));
587            },
588
589            _ => {},
590        }
591        Ok(())
592    }
593
594    /// Garbage collect tasks for epochs older than the highest finalized epoch
595    /// or older than the previous epoch, whichever is greater.
596    /// Garbage collect views older than the highest finalized view including the highest finalized view.
597    async fn garbage_collect_tasks(&self) {
598        let previous_epoch = self
599            .cur_epoch
600            .map(|e| e.saturating_sub(1))
601            .map(EpochNumber::new);
602        let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
603        Self::garbage_collect_tasks_helper(
604            &self.replica_task_map,
605            &gc_epoch,
606            &self.highest_finalized_epoch_view.1,
607        )
608        .await;
609        Self::garbage_collect_tasks_helper(
610            &self.pre_commit_relay_map,
611            &gc_epoch,
612            &self.highest_finalized_epoch_view.1,
613        )
614        .await;
615        Self::garbage_collect_tasks_helper(
616            &self.commit_relay_map,
617            &gc_epoch,
618            &self.highest_finalized_epoch_view.1,
619        )
620        .await;
621        Self::garbage_collect_tasks_helper(
622            &self.finalize_relay_map,
623            &gc_epoch,
624            &self.highest_finalized_epoch_view.1,
625        )
626        .await;
627    }
628
629    async fn garbage_collect_tasks_helper<VAL>(
630        map: &RwLock<TaskMap<VAL>>,
631        gc_epoch: &Option<EpochNumber>,
632        gc_view: &ViewNumber,
633    ) {
634        let mut task_map = map.write().await;
635        task_map.retain(|e, _| e >= gc_epoch);
636        if let Some(view_map) = task_map.get_mut(gc_epoch) {
637            view_map.retain(|v, _| v > gc_view)
638        };
639        task_map.retain(|_, view_map| !view_map.is_empty());
640    }
641}
642
643impl<TYPES: NodeType> ViewSyncReplicaTaskState<TYPES> {
644    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
645    /// Handle incoming events for the view sync replica task
646    pub async fn handle(
647        &mut self,
648        event: Arc<HotShotEvent<TYPES>>,
649        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
650    ) -> Option<HotShotTaskCompleted> {
651        match event.as_ref() {
652            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
653                let last_seen_certificate = ViewSyncPhase::PreCommit;
654
655                // Ignore certificate if it is for an older round
656                if certificate.view_number() < self.next_view {
657                    tracing::warn!("We're already in a higher round");
658
659                    return None;
660                }
661
662                let membership = self.membership_for_epoch(certificate.epoch()).await?;
663                let membership_stake_table = membership.stake_table().await;
664                let membership_failure_threshold = membership.failure_threshold().await;
665
666                // If certificate is not valid, return current state
667                if let Err(e) = certificate
668                    .is_valid_cert(
669                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
670                        membership_failure_threshold,
671                        &self.upgrade_lock,
672                    )
673                    .await
674                {
675                    tracing::error!(
676                        "Not valid view sync cert! data: {:?}, error: {}",
677                        certificate.data(),
678                        e
679                    );
680
681                    return None;
682                }
683
684                // If certificate is for a higher round shutdown this task
685                // since another task should have been started for the higher round
686                if certificate.view_number() > self.next_view {
687                    return Some(HotShotTaskCompleted);
688                }
689
690                if certificate.data().relay > self.relay {
691                    self.relay = certificate.data().relay;
692                }
693
694                let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
695                    ViewSyncCommitData2 {
696                        relay: certificate.data().relay,
697                        round: self.next_view,
698                        epoch: certificate.data().epoch,
699                    },
700                    self.next_view,
701                    &self.public_key,
702                    &self.private_key,
703                    &self.upgrade_lock,
704                )
705                .await
706                else {
707                    tracing::error!("Failed to sign ViewSyncCommitData!");
708                    return None;
709                };
710
711                broadcast_event(
712                    Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
713                    &event_stream,
714                )
715                .await;
716
717                if let Some(timeout_task) = self.timeout_task.take() {
718                    timeout_task.abort();
719                }
720
721                self.timeout_task = Some(spawn({
722                    let stream = event_stream.clone();
723                    let phase = last_seen_certificate;
724                    let relay = self.relay;
725                    let next_view = self.next_view;
726                    let timeout = self.view_sync_timeout;
727                    async move {
728                        sleep(timeout).await;
729                        tracing::warn!(
730                            "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
731                             {relay}"
732                        );
733
734                        broadcast_event(
735                            Arc::new(HotShotEvent::ViewSyncTimeout(
736                                ViewNumber::new(*next_view),
737                                relay,
738                                phase,
739                            )),
740                            &stream,
741                        )
742                        .await;
743                    }
744                }));
745            },
746
747            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
748                let last_seen_certificate = ViewSyncPhase::Commit;
749
750                // Ignore certificate if it is for an older round
751                if certificate.view_number() < self.next_view {
752                    tracing::warn!("We're already in a higher round");
753
754                    return None;
755                }
756
757                let membership = self.membership_for_epoch(certificate.epoch()).await?;
758                let membership_stake_table = membership.stake_table().await;
759                let membership_success_threshold = membership.success_threshold().await;
760
761                // If certificate is not valid, return current state
762                if let Err(e) = certificate
763                    .is_valid_cert(
764                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
765                        membership_success_threshold,
766                        &self.upgrade_lock,
767                    )
768                    .await
769                {
770                    tracing::error!(
771                        "Not valid view sync cert! data: {:?}, error: {}",
772                        certificate.data(),
773                        e
774                    );
775
776                    return None;
777                }
778
779                // If certificate is for a higher round shutdown this task
780                // since another task should have been started for the higher round
781                if certificate.view_number() > self.next_view {
782                    return Some(HotShotTaskCompleted);
783                }
784
785                if certificate.data().relay > self.relay {
786                    self.relay = certificate.data().relay;
787                }
788
789                let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
790                    ViewSyncFinalizeData2 {
791                        relay: certificate.data().relay,
792                        round: self.next_view,
793                        epoch: certificate.data().epoch,
794                    },
795                    self.next_view,
796                    &self.public_key,
797                    &self.private_key,
798                    &self.upgrade_lock,
799                )
800                .await
801                else {
802                    tracing::error!("Failed to sign view sync finalized vote!");
803                    return None;
804                };
805
806                broadcast_event(
807                    Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
808                    &event_stream,
809                )
810                .await;
811
812                tracing::info!(
813                    "View sync protocol has received view sync evidence to update the view to {}",
814                    *self.next_view
815                );
816
817                broadcast_view_change(
818                    &event_stream,
819                    self.next_view,
820                    certificate.epoch(),
821                    self.first_epoch,
822                )
823                .await;
824
825                if let Some(timeout_task) = self.timeout_task.take() {
826                    timeout_task.abort();
827                }
828                self.timeout_task = Some(spawn({
829                    let stream = event_stream.clone();
830                    let phase = last_seen_certificate;
831                    let relay = self.relay;
832                    let next_view = self.next_view;
833                    let timeout = self.view_sync_timeout;
834                    async move {
835                        sleep(timeout).await;
836                        tracing::warn!(
837                            "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
838                             {relay}"
839                        );
840                        broadcast_event(
841                            Arc::new(HotShotEvent::ViewSyncTimeout(
842                                ViewNumber::new(*next_view),
843                                relay,
844                                phase,
845                            )),
846                            &stream,
847                        )
848                        .await;
849                    }
850                }));
851            },
852
853            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
854                // Ignore certificate if it is for an older round
855                if certificate.view_number() < self.next_view {
856                    tracing::warn!("We're already in a higher round");
857
858                    return None;
859                }
860
861                let membership = self.membership_for_epoch(certificate.epoch()).await?;
862                let membership_stake_table = membership.stake_table().await;
863                let membership_success_threshold = membership.success_threshold().await;
864
865                // If certificate is not valid, return current state
866                if let Err(e) = certificate
867                    .is_valid_cert(
868                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
869                        membership_success_threshold,
870                        &self.upgrade_lock,
871                    )
872                    .await
873                {
874                    tracing::error!(
875                        "Not valid view sync cert! data: {:?}, error: {}",
876                        certificate.data(),
877                        e
878                    );
879
880                    return None;
881                }
882
883                // If certificate is for a higher round shutdown this task
884                // since another task should have been started for the higher round
885                if certificate.view_number() > self.next_view {
886                    return Some(HotShotTaskCompleted);
887                }
888
889                if certificate.data().relay > self.relay {
890                    self.relay = certificate.data().relay;
891                }
892
893                if let Some(timeout_task) = self.timeout_task.take() {
894                    timeout_task.abort();
895                }
896
897                broadcast_view_change(
898                    &event_stream,
899                    self.next_view,
900                    certificate.epoch(),
901                    self.first_epoch,
902                )
903                .await;
904                return Some(HotShotTaskCompleted);
905            },
906
907            HotShotEvent::ViewSyncTrigger(view_number) => {
908                let view_number = *view_number;
909                if self.next_view != ViewNumber::new(*view_number) {
910                    tracing::error!("Unexpected view number to trigger view sync");
911                    return None;
912                }
913
914                let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
915                    ViewSyncPreCommitData2 {
916                        relay: 0,
917                        round: view_number,
918                        epoch: self.cur_epoch,
919                    },
920                    view_number,
921                    &self.public_key,
922                    &self.private_key,
923                    &self.upgrade_lock,
924                )
925                .await
926                else {
927                    tracing::error!("Failed to sign pre commit vote!");
928                    return None;
929                };
930
931                broadcast_event(
932                    Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
933                    &event_stream,
934                )
935                .await;
936
937                self.timeout_task = Some(spawn({
938                    let stream = event_stream.clone();
939                    let relay = self.relay;
940                    let next_view = self.next_view;
941                    let timeout = self.view_sync_timeout;
942                    async move {
943                        sleep(timeout).await;
944                        tracing::warn!("Vote sending timed out in ViewSyncTrigger");
945                        broadcast_event(
946                            Arc::new(HotShotEvent::ViewSyncTimeout(
947                                ViewNumber::new(*next_view),
948                                relay,
949                                ViewSyncPhase::None,
950                            )),
951                            &stream,
952                        )
953                        .await;
954                    }
955                }));
956
957                return None;
958            },
959
960            HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
961                let round = *round;
962                // Shouldn't ever receive a timeout for a relay higher than ours
963                if ViewNumber::new(*round) == self.next_view && *relay == self.relay {
964                    if let Some(timeout_task) = self.timeout_task.take() {
965                        timeout_task.abort();
966                    }
967                    self.relay += 1;
968                    match last_seen_certificate {
969                        ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
970                            let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
971                                ViewSyncPreCommitData2 {
972                                    relay: self.relay,
973                                    round: self.next_view,
974                                    epoch: self.cur_epoch,
975                                },
976                                self.next_view,
977                                &self.public_key,
978                                &self.private_key,
979                                &self.upgrade_lock,
980                            )
981                            .await
982                            else {
983                                tracing::error!("Failed to sign ViewSyncPreCommitData!");
984                                return None;
985                            };
986
987                            broadcast_event(
988                                Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
989                                &event_stream,
990                            )
991                            .await;
992                        },
993                        ViewSyncPhase::Finalize => {
994                            // This should never occur
995                            unimplemented!()
996                        },
997                    }
998
999                    self.timeout_task = Some(spawn({
1000                        let stream = event_stream.clone();
1001                        let relay = self.relay;
1002                        let next_view = self.next_view;
1003                        let timeout = self.view_sync_timeout;
1004                        let last_cert = last_seen_certificate.clone();
1005                        async move {
1006                            sleep(timeout).await;
1007                            tracing::warn!(
1008                                "Vote sending timed out in ViewSyncTimeout relay = {relay}"
1009                            );
1010                            broadcast_event(
1011                                Arc::new(HotShotEvent::ViewSyncTimeout(
1012                                    ViewNumber::new(*next_view),
1013                                    relay,
1014                                    last_cert,
1015                                )),
1016                                &stream,
1017                            )
1018                            .await;
1019                        }
1020                    }));
1021
1022                    return None;
1023                }
1024            },
1025            _ => return None,
1026        }
1027        None
1028    }
1029
1030    pub async fn membership_for_epoch(
1031        &self,
1032        epoch: Option<EpochNumber>,
1033    ) -> Option<EpochMembership<TYPES>> {
1034        match self
1035            .membership_coordinator
1036            .membership_for_epoch(epoch)
1037            .await
1038        {
1039            Ok(m) => Some(m),
1040            Err(e) => {
1041                tracing::warn!(e.message);
1042                None
1043            },
1044        }
1045    }
1046}