hotshot_task_impls/
view_sync.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
15    message::UpgradeLock,
16    simple_certificate::{
17        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
18    },
19    simple_vote::{
20        HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
21        ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
22    },
23    stake_table::StakeTableEntries,
24    traits::{
25        node_implementation::{ConsensusTime, NodeType, Versions},
26        signature_key::SignatureKey,
27    },
28    utils::EpochTransitionIndicator,
29    vote::{Certificate, HasViewNumber, Vote},
30};
31use hotshot_utils::anytrace::*;
32use tokio::{spawn, task::JoinHandle, time::sleep};
33use tracing::instrument;
34
35use crate::{
36    events::{HotShotEvent, HotShotTaskCompleted},
37    helpers::{broadcast_event, broadcast_view_change},
38    vote_collection::{
39        create_vote_accumulator, AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState,
40    },
41};
42
43#[derive(PartialEq, PartialOrd, Clone, Debug, Eq, Hash)]
44/// Phases of the view sync protocol.
///
/// The variants are declared in the order the protocol moves through them. The derived
/// `PartialOrd` compares variants in declaration order, so reordering them changes comparisons.
45pub enum ViewSyncPhase {
46    /// No phase; before the protocol has begun
47    None,
48    /// PreCommit phase: a pre-commit certificate was the last certificate processed
49    PreCommit,
50    /// Commit phase: a commit certificate was the last certificate processed
51    Commit,
52    /// Finalize phase
53    Finalize,
54}
55
/// Two-level ordered map: (optional) epoch -> view -> `VAL`.
56type TaskMap<TYPES, VAL> =
57    BTreeMap<Option<<TYPES as NodeType>::Epoch>, BTreeMap<<TYPES as NodeType>::View, VAL>>;
58
59/// Type alias for a map from epoch to view number to relay index (`u64`) to vote collection task
60type RelayMap<TYPES, VOTE, CERT, V> =
61    TaskMap<TYPES, BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT, V>>>;
62
/// Type alias for a map from epoch to view number to the replica task state for that view
63type ReplicaTaskMap<TYPES, V> = TaskMap<TYPES, ViewSyncReplicaTaskState<TYPES, V>>;
64
65/// Main view sync task state.
///
/// Owns the per-(epoch, view) replica tasks and the per-(epoch, view, relay) vote accumulators,
/// and tracks consecutive timeouts to decide when the view sync protocol has to be started.
66pub struct ViewSyncTaskState<TYPES: NodeType, V: Versions> {
67    /// View HotShot is currently in; advanced on `ViewChange` and on the first tracked timeout
68    pub cur_view: TYPES::View,
69
70    /// View HotShot wishes to be in; reset to `cur_view` whenever a `ViewChange` advances the view
71    pub next_view: TYPES::View,
72
73    /// Epoch HotShot is currently in; only ever raised, by `ViewChange` events
74    pub cur_epoch: Option<TYPES::Epoch>,
75
76    /// Membership for the quorum
77    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
78
79    /// This node's public key
80    pub public_key: TYPES::SignatureKey,
81
82    /// Our private key
83    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
84
85    /// Our node id; for logging
86    pub id: u64,
87
88    /// How many timeouts we've seen in a row; is reset upon a successful view change.
    /// The view sync protocol is started once this reaches 2.
89    pub num_timeouts_tracked: u64,
90
91    /// Map of running replica tasks, keyed by epoch and then view
92    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES, V>>,
93
94    /// Map of pre-commit vote accumulators for the views in which we act as relay
95    pub pre_commit_relay_map: RwLock<
96        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>, V>,
97    >,
98
99    /// Map of commit vote accumulators for the views in which we act as relay
100    pub commit_relay_map:
101        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>, V>>,
102
103    /// Map of finalize vote accumulators for the views in which we act as relay
104    pub finalize_relay_map: RwLock<
105        RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>, V>,
106    >,
107
108    /// Timeout duration for view sync rounds; copied into every replica task that is created
109    pub view_sync_timeout: Duration,
110
111    /// Last view in which we garbage collected old tasks.
    /// NOTE(review): not read or written by the handlers visible in this file — confirm it is still used.
112    pub last_garbage_collected_view: TYPES::View,
113
114    /// Lock for a decided upgrade
115    pub upgrade_lock: UpgradeLock<TYPES, V>,
116
117    /// First view in which epoch version takes effect; set by the `SetFirstEpoch` event
118    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
119
120    /// Keeps track of the highest finalized epoch and view (in that order), used for garbage collection
121    pub highest_finalized_epoch_view: (Option<TYPES::Epoch>, TYPES::View),
122
    /// Epoch height passed to `leaf.epoch(..)` to derive the epoch of decided leaves
    /// (presumably the number of blocks per epoch — TODO confirm).
123    pub epoch_height: u64,
124}
125
126#[async_trait]
127impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncTaskState<TYPES, V> {
128    type Event = HotShotEvent<TYPES>;
129
130    async fn handle_event(
131        &mut self,
132        event: Arc<Self::Event>,
133        sender: &Sender<Arc<Self::Event>>,
134        _receiver: &Receiver<Arc<Self::Event>>,
135    ) -> Result<()> {
136        self.handle(event, sender.clone()).await
137    }
138
139    fn cancel_subtasks(&mut self) {}
140}
141
142/// State of a view sync replica task.
///
/// One replica task exists per (epoch, view) that the main view sync task is trying to sync to.
143pub struct ViewSyncReplicaTaskState<TYPES: NodeType, V: Versions> {
144    /// Timeout for view sync rounds; how long the timeout task sleeps before firing
145    pub view_sync_timeout: Duration,
146
147    /// Current round HotShot is in (initialised to the view this task was created for)
148    pub cur_view: TYPES::View,
149
150    /// Round HotShot wishes to be in (initialised to the view this task was created for)
151    pub next_view: TYPES::View,
152
153    /// The relay index we are currently on; raised when a certificate carries a higher relay
154    pub relay: u64,
155
156    /// Whether we have seen a finalized certificate
157    pub finalized: bool,
158
159    /// Whether we have already sent a view change event for `next_view`
160    pub sent_view_change_event: bool,
161
162    /// Timeout task handle, when it expires we try the next relay.
    /// The previous handle is aborted before a new timeout task is spawned.
163    pub timeout_task: Option<JoinHandle<()>>,
164
165    /// Our node id; for logging
166    pub id: u64,
167
168    /// Membership for the quorum
169    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
170
171    /// This node's public key
172    pub public_key: TYPES::SignatureKey,
173
174    /// Our private key
175    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
176
177    /// Lock for a decided upgrade
178    pub upgrade_lock: UpgradeLock<TYPES, V>,
179
180    /// Epoch HotShot was in when this task was created
181    pub cur_epoch: Option<TYPES::Epoch>,
182
183    /// First view in which epoch version takes effect
184    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
185}
186
187#[async_trait]
188impl<TYPES: NodeType, V: Versions> TaskState for ViewSyncReplicaTaskState<TYPES, V> {
189    type Event = HotShotEvent<TYPES>;
190
191    async fn handle_event(
192        &mut self,
193        event: Arc<Self::Event>,
194        sender: &Sender<Arc<Self::Event>>,
195        _receiver: &Receiver<Arc<Self::Event>>,
196    ) -> Result<()> {
197        self.handle(event, sender.clone()).await;
198
199        Ok(())
200    }
201
202    fn cancel_subtasks(&mut self) {}
203}
204
205impl<TYPES: NodeType, V: Versions> ViewSyncTaskState<TYPES, V> {
206    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
207    #[allow(clippy::type_complexity)]
208    /// Handles incoming events for the main view sync task
209    pub async fn send_to_or_create_replica(
210        &mut self,
211        event: Arc<HotShotEvent<TYPES>>,
212        view: TYPES::View,
213        epoch: Option<TYPES::Epoch>,
214        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
215    ) {
216        let mut task_map = self.replica_task_map.write().await;
217
218        if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
219            // Forward event then return
220            tracing::debug!("Forwarding message");
221            let result = replica_task
222                .handle(Arc::clone(&event), sender.clone())
223                .await;
224
225            if result == Some(HotShotTaskCompleted) {
226                // The protocol has finished
227                if epoch >= self.highest_finalized_epoch_view.0
228                    && view > self.highest_finalized_epoch_view.1
229                {
230                    self.highest_finalized_epoch_view = (epoch, view);
231                } else if view > self.highest_finalized_epoch_view.1 {
232                    tracing::error!(
233                        "We finalized a higher view but the epoch is lower. This should never \
234                         happen. Current highest finalized epoch view: {:?}, new highest \
235                         finalized epoch view: {:?}",
236                        self.highest_finalized_epoch_view,
237                        (epoch, view)
238                    );
239                }
240                task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
241                task_map.retain(|_, x| !x.is_empty());
242                drop(task_map);
243
244                // Garbage collect old tasks
245                self.garbage_collect_tasks().await;
246                return;
247            }
248
249            return;
250        }
251
252        // We do not have a replica task already running, so start one
253        let mut replica_state: ViewSyncReplicaTaskState<TYPES, V> = ViewSyncReplicaTaskState {
254            cur_view: view,
255            next_view: view,
256            relay: 0,
257            finalized: false,
258            sent_view_change_event: false,
259            timeout_task: None,
260            membership_coordinator: self.membership_coordinator.clone(),
261            public_key: self.public_key.clone(),
262            private_key: self.private_key.clone(),
263            view_sync_timeout: self.view_sync_timeout,
264            id: self.id,
265            upgrade_lock: self.upgrade_lock.clone(),
266            cur_epoch: self.cur_epoch,
267            first_epoch: self.first_epoch,
268        };
269
270        let result = replica_state
271            .handle(Arc::clone(&event), sender.clone())
272            .await;
273
274        if result == Some(HotShotTaskCompleted) {
275            // The protocol has finished
276            return;
277        }
278
279        task_map
280            .entry(epoch)
281            .or_default()
282            .insert(view, replica_state);
283    }
284
285    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
286    #[allow(clippy::type_complexity)]
287    /// Handles incoming events for the main view sync task
288    pub async fn handle(
289        &mut self,
290        event: Arc<HotShotEvent<TYPES>>,
291        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
292    ) -> Result<()> {
293        match event.as_ref() {
294            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
295                tracing::debug!("Received view sync cert for phase {certificate:?}");
296                let view = certificate.view_number();
297                self.send_to_or_create_replica(
298                    Arc::clone(&event),
299                    view,
300                    certificate.epoch(),
301                    &event_stream,
302                )
303                .await;
304            },
305            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
306                tracing::debug!("Received view sync cert for phase {certificate:?}");
307                let view = certificate.view_number();
308                self.send_to_or_create_replica(
309                    Arc::clone(&event),
310                    view,
311                    certificate.epoch(),
312                    &event_stream,
313                )
314                .await;
315            },
316            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
317                tracing::debug!("Received view sync cert for phase {certificate:?}");
318                let view = certificate.view_number();
319                self.send_to_or_create_replica(
320                    Arc::clone(&event),
321                    view,
322                    certificate.epoch(),
323                    &event_stream,
324                )
325                .await;
326            },
327            HotShotEvent::ViewSyncTimeout(view, ..) => {
328                tracing::debug!("view sync timeout in main task {view:?}");
329                let view = *view;
330                self.send_to_or_create_replica(
331                    Arc::clone(&event),
332                    view,
333                    self.cur_epoch,
334                    &event_stream,
335                )
336                .await;
337            },
338
339            HotShotEvent::ViewSyncPreCommitVoteRecv(ref vote) => {
340                let mut map = self.pre_commit_relay_map.write().await;
341                let vote_view = vote.view_number();
342                let relay = vote.date().relay;
343                let relay_map = map
344                    .entry(vote.date().epoch)
345                    .or_insert(BTreeMap::new())
346                    .entry(vote_view)
347                    .or_insert(BTreeMap::new());
348                if let Some(relay_task) = relay_map.get_mut(&relay) {
349                    tracing::debug!("Forwarding message");
350
351                    // Handle the vote and check if the accumulator has returned successfully
352                    if relay_task
353                        .handle_vote_event(Arc::clone(&event), &event_stream)
354                        .await?
355                        .is_some()
356                    {
357                        map.get_mut(&vote.date().epoch)
358                            .and_then(|x| x.remove(&vote_view));
359                        map.retain(|_, x| !x.is_empty());
360                    }
361
362                    return Ok(());
363                }
364
365                let epoch_mem = self
366                    .membership_coordinator
367                    .membership_for_epoch(vote.date().epoch)
368                    .await?;
369                // We do not have a relay task already running, so start one
370                ensure!(
371                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
372                    "View sync vote sent to wrong leader"
373                );
374
375                let info = AccumulatorInfo {
376                    public_key: self.public_key.clone(),
377                    membership: epoch_mem,
378                    view: vote_view,
379                    id: self.id,
380                };
381                let vote_collector = create_vote_accumulator(
382                    &info,
383                    event,
384                    &event_stream,
385                    self.upgrade_lock.clone(),
386                    EpochTransitionIndicator::NotInTransition,
387                )
388                .await?;
389
390                relay_map.insert(relay, vote_collector);
391            },
392
393            HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
394                let mut map = self.commit_relay_map.write().await;
395                let vote_view = vote.view_number();
396                let relay = vote.date().relay;
397                let relay_map = map
398                    .entry(vote.date().epoch)
399                    .or_insert(BTreeMap::new())
400                    .entry(vote_view)
401                    .or_insert(BTreeMap::new());
402                if let Some(relay_task) = relay_map.get_mut(&relay) {
403                    tracing::debug!("Forwarding message");
404
405                    // Handle the vote and check if the accumulator has returned successfully
406                    if relay_task
407                        .handle_vote_event(Arc::clone(&event), &event_stream)
408                        .await?
409                        .is_some()
410                    {
411                        map.get_mut(&vote.date().epoch)
412                            .and_then(|x| x.remove(&vote_view));
413                        map.retain(|_, x| !x.is_empty());
414                    }
415
416                    return Ok(());
417                }
418
419                // We do not have a relay task already running, so start one
420                let epoch_mem = self
421                    .membership_coordinator
422                    .membership_for_epoch(vote.date().epoch)
423                    .await?;
424                ensure!(
425                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
426                    debug!("View sync vote sent to wrong leader")
427                );
428
429                let info = AccumulatorInfo {
430                    public_key: self.public_key.clone(),
431                    membership: epoch_mem,
432                    view: vote_view,
433                    id: self.id,
434                };
435
436                let vote_collector = create_vote_accumulator(
437                    &info,
438                    event,
439                    &event_stream,
440                    self.upgrade_lock.clone(),
441                    EpochTransitionIndicator::NotInTransition,
442                )
443                .await?;
444                relay_map.insert(relay, vote_collector);
445            },
446
447            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
448                let mut map = self.finalize_relay_map.write().await;
449                let vote_view = vote.view_number();
450                let relay = vote.date().relay;
451                let relay_map = map
452                    .entry(vote.date().epoch)
453                    .or_insert(BTreeMap::new())
454                    .entry(vote_view)
455                    .or_insert(BTreeMap::new());
456                if let Some(relay_task) = relay_map.get_mut(&relay) {
457                    tracing::debug!("Forwarding message");
458
459                    // Handle the vote and check if the accumulator has returned successfully
460                    if relay_task
461                        .handle_vote_event(Arc::clone(&event), &event_stream)
462                        .await?
463                        .is_some()
464                    {
465                        map.get_mut(&vote.date().epoch)
466                            .and_then(|x| x.remove(&vote_view));
467                        map.retain(|_, x| !x.is_empty());
468                    }
469
470                    return Ok(());
471                }
472
473                let epoch_mem = self
474                    .membership_coordinator
475                    .membership_for_epoch(vote.date().epoch)
476                    .await?;
477                // We do not have a relay task already running, so start one
478                ensure!(
479                    epoch_mem.leader(vote_view + relay).await? == self.public_key,
480                    debug!("View sync vote sent to wrong leader")
481                );
482
483                let info = AccumulatorInfo {
484                    public_key: self.public_key.clone(),
485                    membership: epoch_mem,
486                    view: vote_view,
487                    id: self.id,
488                };
489                let vote_collector = create_vote_accumulator(
490                    &info,
491                    event,
492                    &event_stream,
493                    self.upgrade_lock.clone(),
494                    EpochTransitionIndicator::NotInTransition,
495                )
496                .await;
497                if let Ok(vote_task) = vote_collector {
498                    relay_map.insert(relay, vote_task);
499                }
500            },
501
502            &HotShotEvent::ViewChange(new_view, epoch) => {
503                if epoch > self.cur_epoch {
504                    self.cur_epoch = epoch;
505                }
506                let new_view = TYPES::View::new(*new_view);
507                if self.cur_view < new_view {
508                    tracing::debug!(
509                        "Change from view {} to view {} in view sync task",
510                        *self.cur_view,
511                        *new_view
512                    );
513
514                    self.cur_view = new_view;
515                    self.next_view = self.cur_view;
516                    self.num_timeouts_tracked = 0;
517                }
518
519                self.garbage_collect_tasks().await;
520            },
521            HotShotEvent::LeavesDecided(leaves) => {
522                let finalized_epoch = self.highest_finalized_epoch_view.0.max(
523                    leaves
524                        .iter()
525                        .map(|leaf| leaf.epoch(self.epoch_height))
526                        .max()
527                        .unwrap_or(None),
528                );
529                let finalized_view = self.highest_finalized_epoch_view.1.max(
530                    leaves
531                        .iter()
532                        .map(|leaf| leaf.view_number())
533                        .max()
534                        .unwrap_or(TYPES::View::new(0)),
535                );
536
537                self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
538
539                self.garbage_collect_tasks().await;
540            },
541            &HotShotEvent::Timeout(view_number, ..) => {
542                // This is an old timeout and we can ignore it
543                ensure!(
544                    view_number >= self.cur_view,
545                    debug!("Discarding old timeout vote.")
546                );
547
548                self.num_timeouts_tracked += 1;
549                let leader = self
550                    .membership_coordinator
551                    .membership_for_epoch(self.cur_epoch)
552                    .await?
553                    .leader(view_number)
554                    .await?;
555                tracing::warn!(
556                    %leader,
557                    leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
558                    view_number = *view_number,
559                    num_timeouts_tracked = self.num_timeouts_tracked,
560                    "view timed out",
561                );
562
563                if self.num_timeouts_tracked >= 3 {
564                    tracing::error!("Too many consecutive timeouts!  This shouldn't happen");
565                }
566
567                if self.num_timeouts_tracked >= 2 {
568                    tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
569
570                    self.send_to_or_create_replica(
571                        Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
572                        view_number + 1,
573                        self.cur_epoch,
574                        &event_stream,
575                    )
576                    .await;
577                } else {
578                    // If this is the first timeout we've seen advance to the next view
579                    self.cur_view = view_number + 1;
580                    broadcast_view_change(
581                        &event_stream,
582                        self.cur_view,
583                        self.cur_epoch,
584                        self.first_epoch,
585                    )
586                    .await;
587                }
588            },
589            HotShotEvent::SetFirstEpoch(view, epoch) => {
590                self.first_epoch = Some((*view, *epoch));
591            },
592
593            _ => {},
594        }
595        Ok(())
596    }
597
598    /// Garbage collect tasks for epochs older than the highest finalized epoch
599    /// or older than the previous epoch, whichever is greater.
600    /// Garbage collect views older than the highest finalized view including the highest finalized view.
601    async fn garbage_collect_tasks(&self) {
602        let previous_epoch = self
603            .cur_epoch
604            .map(|e| e.saturating_sub(1))
605            .map(TYPES::Epoch::new);
606        let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
607        Self::garbage_collect_tasks_helper(
608            &self.replica_task_map,
609            &gc_epoch,
610            &self.highest_finalized_epoch_view.1,
611        )
612        .await;
613        Self::garbage_collect_tasks_helper(
614            &self.pre_commit_relay_map,
615            &gc_epoch,
616            &self.highest_finalized_epoch_view.1,
617        )
618        .await;
619        Self::garbage_collect_tasks_helper(
620            &self.commit_relay_map,
621            &gc_epoch,
622            &self.highest_finalized_epoch_view.1,
623        )
624        .await;
625        Self::garbage_collect_tasks_helper(
626            &self.finalize_relay_map,
627            &gc_epoch,
628            &self.highest_finalized_epoch_view.1,
629        )
630        .await;
631    }
632
633    async fn garbage_collect_tasks_helper<VAL>(
634        map: &RwLock<TaskMap<TYPES, VAL>>,
635        gc_epoch: &Option<TYPES::Epoch>,
636        gc_view: &TYPES::View,
637    ) {
638        let mut task_map = map.write().await;
639        task_map.retain(|e, _| e >= gc_epoch);
640        if let Some(view_map) = task_map.get_mut(gc_epoch) {
641            view_map.retain(|v, _| v > gc_view)
642        };
643        task_map.retain(|_, view_map| !view_map.is_empty());
644    }
645}
646
647impl<TYPES: NodeType, V: Versions> ViewSyncReplicaTaskState<TYPES, V> {
648    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
649    /// Handle incoming events for the view sync replica task
650    pub async fn handle(
651        &mut self,
652        event: Arc<HotShotEvent<TYPES>>,
653        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
654    ) -> Option<HotShotTaskCompleted> {
655        match event.as_ref() {
656            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
657                let last_seen_certificate = ViewSyncPhase::PreCommit;
658
659                // Ignore certificate if it is for an older round
660                if certificate.view_number() < self.next_view {
661                    tracing::warn!("We're already in a higher round");
662
663                    return None;
664                }
665
666                let membership = self.membership_for_epoch(certificate.epoch()).await?;
667                let membership_stake_table = membership.stake_table().await;
668                let membership_failure_threshold = membership.failure_threshold().await;
669
670                // If certificate is not valid, return current state
671                if let Err(e) = certificate
672                    .is_valid_cert(
673                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
674                        membership_failure_threshold,
675                        &self.upgrade_lock,
676                    )
677                    .await
678                {
679                    tracing::error!(
680                        "Not valid view sync cert! data: {:?}, error: {}",
681                        certificate.data(),
682                        e
683                    );
684
685                    return None;
686                }
687
688                // If certificate is for a higher round shutdown this task
689                // since another task should have been started for the higher round
690                if certificate.view_number() > self.next_view {
691                    return Some(HotShotTaskCompleted);
692                }
693
694                if certificate.data().relay > self.relay {
695                    self.relay = certificate.data().relay;
696                }
697
698                let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
699                    ViewSyncCommitData2 {
700                        relay: certificate.data().relay,
701                        round: self.next_view,
702                        epoch: certificate.data().epoch,
703                    },
704                    self.next_view,
705                    &self.public_key,
706                    &self.private_key,
707                    &self.upgrade_lock,
708                )
709                .await
710                else {
711                    tracing::error!("Failed to sign ViewSyncCommitData!");
712                    return None;
713                };
714
715                broadcast_event(
716                    Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
717                    &event_stream,
718                )
719                .await;
720
721                if let Some(timeout_task) = self.timeout_task.take() {
722                    timeout_task.abort();
723                }
724
725                self.timeout_task = Some(spawn({
726                    let stream = event_stream.clone();
727                    let phase = last_seen_certificate;
728                    let relay = self.relay;
729                    let next_view = self.next_view;
730                    let timeout = self.view_sync_timeout;
731                    async move {
732                        sleep(timeout).await;
733                        tracing::warn!(
734                            "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
735                             {relay}"
736                        );
737
738                        broadcast_event(
739                            Arc::new(HotShotEvent::ViewSyncTimeout(
740                                TYPES::View::new(*next_view),
741                                relay,
742                                phase,
743                            )),
744                            &stream,
745                        )
746                        .await;
747                    }
748                }));
749            },
750
751            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
752                let last_seen_certificate = ViewSyncPhase::Commit;
753
754                // Ignore certificate if it is for an older round
755                if certificate.view_number() < self.next_view {
756                    tracing::warn!("We're already in a higher round");
757
758                    return None;
759                }
760
761                let membership = self.membership_for_epoch(certificate.epoch()).await?;
762                let membership_stake_table = membership.stake_table().await;
763                let membership_success_threshold = membership.success_threshold().await;
764
765                // If certificate is not valid, return current state
766                if let Err(e) = certificate
767                    .is_valid_cert(
768                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
769                        membership_success_threshold,
770                        &self.upgrade_lock,
771                    )
772                    .await
773                {
774                    tracing::error!(
775                        "Not valid view sync cert! data: {:?}, error: {}",
776                        certificate.data(),
777                        e
778                    );
779
780                    return None;
781                }
782
783                // If certificate is for a higher round shutdown this task
784                // since another task should have been started for the higher round
785                if certificate.view_number() > self.next_view {
786                    return Some(HotShotTaskCompleted);
787                }
788
789                if certificate.data().relay > self.relay {
790                    self.relay = certificate.data().relay;
791                }
792
793                let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
794                    ViewSyncFinalizeData2 {
795                        relay: certificate.data().relay,
796                        round: self.next_view,
797                        epoch: certificate.data().epoch,
798                    },
799                    self.next_view,
800                    &self.public_key,
801                    &self.private_key,
802                    &self.upgrade_lock,
803                )
804                .await
805                else {
806                    tracing::error!("Failed to sign view sync finalized vote!");
807                    return None;
808                };
809
810                broadcast_event(
811                    Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
812                    &event_stream,
813                )
814                .await;
815
816                tracing::info!(
817                    "View sync protocol has received view sync evidence to update the view to {}",
818                    *self.next_view
819                );
820
821                broadcast_view_change(
822                    &event_stream,
823                    self.next_view,
824                    certificate.epoch(),
825                    self.first_epoch,
826                )
827                .await;
828
829                if let Some(timeout_task) = self.timeout_task.take() {
830                    timeout_task.abort();
831                }
832                self.timeout_task = Some(spawn({
833                    let stream = event_stream.clone();
834                    let phase = last_seen_certificate;
835                    let relay = self.relay;
836                    let next_view = self.next_view;
837                    let timeout = self.view_sync_timeout;
838                    async move {
839                        sleep(timeout).await;
840                        tracing::warn!(
841                            "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
842                             {relay}"
843                        );
844                        broadcast_event(
845                            Arc::new(HotShotEvent::ViewSyncTimeout(
846                                TYPES::View::new(*next_view),
847                                relay,
848                                phase,
849                            )),
850                            &stream,
851                        )
852                        .await;
853                    }
854                }));
855            },
856
857            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
858                // Ignore certificate if it is for an older round
859                if certificate.view_number() < self.next_view {
860                    tracing::warn!("We're already in a higher round");
861
862                    return None;
863                }
864
865                let membership = self.membership_for_epoch(certificate.epoch()).await?;
866                let membership_stake_table = membership.stake_table().await;
867                let membership_success_threshold = membership.success_threshold().await;
868
869                // If certificate is not valid, return current state
870                if let Err(e) = certificate
871                    .is_valid_cert(
872                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
873                        membership_success_threshold,
874                        &self.upgrade_lock,
875                    )
876                    .await
877                {
878                    tracing::error!(
879                        "Not valid view sync cert! data: {:?}, error: {}",
880                        certificate.data(),
881                        e
882                    );
883
884                    return None;
885                }
886
887                // If certificate is for a higher round shutdown this task
888                // since another task should have been started for the higher round
889                if certificate.view_number() > self.next_view {
890                    return Some(HotShotTaskCompleted);
891                }
892
893                if certificate.data().relay > self.relay {
894                    self.relay = certificate.data().relay;
895                }
896
897                if let Some(timeout_task) = self.timeout_task.take() {
898                    timeout_task.abort();
899                }
900
901                broadcast_view_change(
902                    &event_stream,
903                    self.next_view,
904                    certificate.epoch(),
905                    self.first_epoch,
906                )
907                .await;
908                return Some(HotShotTaskCompleted);
909            },
910
911            HotShotEvent::ViewSyncTrigger(view_number) => {
912                let view_number = *view_number;
913                if self.next_view != TYPES::View::new(*view_number) {
914                    tracing::error!("Unexpected view number to trigger view sync");
915                    return None;
916                }
917
918                let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
919                    ViewSyncPreCommitData2 {
920                        relay: 0,
921                        round: view_number,
922                        epoch: self.cur_epoch,
923                    },
924                    view_number,
925                    &self.public_key,
926                    &self.private_key,
927                    &self.upgrade_lock,
928                )
929                .await
930                else {
931                    tracing::error!("Failed to sign pre commit vote!");
932                    return None;
933                };
934
935                broadcast_event(
936                    Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
937                    &event_stream,
938                )
939                .await;
940
941                self.timeout_task = Some(spawn({
942                    let stream = event_stream.clone();
943                    let relay = self.relay;
944                    let next_view = self.next_view;
945                    let timeout = self.view_sync_timeout;
946                    async move {
947                        sleep(timeout).await;
948                        tracing::warn!("Vote sending timed out in ViewSyncTrigger");
949                        broadcast_event(
950                            Arc::new(HotShotEvent::ViewSyncTimeout(
951                                TYPES::View::new(*next_view),
952                                relay,
953                                ViewSyncPhase::None,
954                            )),
955                            &stream,
956                        )
957                        .await;
958                    }
959                }));
960
961                return None;
962            },
963
964            HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
965                let round = *round;
966                // Shouldn't ever receive a timeout for a relay higher than ours
967                if TYPES::View::new(*round) == self.next_view && *relay == self.relay {
968                    if let Some(timeout_task) = self.timeout_task.take() {
969                        timeout_task.abort();
970                    }
971                    self.relay += 1;
972                    match last_seen_certificate {
973                        ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
974                            let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
975                                ViewSyncPreCommitData2 {
976                                    relay: self.relay,
977                                    round: self.next_view,
978                                    epoch: self.cur_epoch,
979                                },
980                                self.next_view,
981                                &self.public_key,
982                                &self.private_key,
983                                &self.upgrade_lock,
984                            )
985                            .await
986                            else {
987                                tracing::error!("Failed to sign ViewSyncPreCommitData!");
988                                return None;
989                            };
990
991                            broadcast_event(
992                                Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
993                                &event_stream,
994                            )
995                            .await;
996                        },
997                        ViewSyncPhase::Finalize => {
998                            // This should never occur
999                            unimplemented!()
1000                        },
1001                    }
1002
1003                    self.timeout_task = Some(spawn({
1004                        let stream = event_stream.clone();
1005                        let relay = self.relay;
1006                        let next_view = self.next_view;
1007                        let timeout = self.view_sync_timeout;
1008                        let last_cert = last_seen_certificate.clone();
1009                        async move {
1010                            sleep(timeout).await;
1011                            tracing::warn!(
1012                                "Vote sending timed out in ViewSyncTimeout relay = {relay}"
1013                            );
1014                            broadcast_event(
1015                                Arc::new(HotShotEvent::ViewSyncTimeout(
1016                                    TYPES::View::new(*next_view),
1017                                    relay,
1018                                    last_cert,
1019                                )),
1020                                &stream,
1021                            )
1022                            .await;
1023                        }
1024                    }));
1025
1026                    return None;
1027                }
1028            },
1029            _ => return None,
1030        }
1031        None
1032    }
1033
1034    pub async fn membership_for_epoch(
1035        &self,
1036        epoch: Option<TYPES::Epoch>,
1037    ) -> Option<EpochMembership<TYPES>> {
1038        match self
1039            .membership_coordinator
1040            .membership_for_epoch(epoch)
1041            .await
1042        {
1043            Ok(m) => Some(m),
1044            Err(e) => {
1045                tracing::warn!(e.message);
1046                None
1047            },
1048        }
1049    }
1050}