hotshot_task_impls/consensus/handlers.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{sync::Arc, time::Duration};

use async_broadcast::Sender;
use chrono::Utc;
use hotshot_types::{
    event::{Event, EventType},
    simple_certificate::EpochRootQuorumCertificate,
    simple_vote::{EpochRootQuorumVote, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
    traits::node_implementation::{ConsensusTime, NodeImplementation, NodeType},
    utils::{is_epoch_root, is_epoch_transition, is_last_block, EpochTransitionIndicator},
    vote::{HasViewNumber, Vote},
};
use hotshot_utils::anytrace::*;
use tokio::{spawn, time::sleep};
use tracing::instrument;
use vbs::version::StaticVersionType;

use super::ConsensusTaskState;
use crate::{
    consensus::Versions,
    events::HotShotEvent,
    helpers::{broadcast_event, check_qc_state_cert_correspondence},
    vote_collection::{handle_epoch_root_vote, handle_vote},
};

/// Handle a `QuorumVoteRecv` event.
pub(crate) async fn handle_quorum_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &QuorumVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
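    // Quorum votes are normally collected only by the leader of the next view, but while our
    // high QC is for an epoch transition block we also collect them even if we are not that leader.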
    let in_transition = task_state
        .consensus
        .read()
        .await
        .is_high_qc_for_epoch_transition();
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        in_transition || we_are_leader,
        info!(
            "We are not the leader for view {} and we are not in the epoch transition",
            vote.view_number() + 1
        )
    );

    let transition_indicator = if in_transition {
        EpochTransitionIndicator::InTransition
    } else {
        EpochTransitionIndicator::NotInTransition
    };
    handle_vote(
        &mut task_state.vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        transition_indicator.clone(),
    )
    .await?;

    if vote.epoch().is_some()
        && vote
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
    {
        // If the vote sender belongs to the next epoch, collect it separately to form the second QC
        let has_stake = epoch_membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&vote.signing_key())
            .await;
        if has_stake {
            handle_vote(
                &mut task_state.next_epoch_vote_collectors,
                &vote.clone().into(),
                task_state.public_key.clone(),
                // We eventually verify in `handle_vote` that we are the leader before assembling the certificate here,
                // so we must request the full randomized stake table.
                //
                // I'm not sure this is really necessary, but I've opted not to modify the logic.
                &epoch_membership.next_epoch().await?.clone(),
                task_state.id,
                &event,
                sender,
                &task_state.upgrade_lock,
                transition_indicator,
            )
            .await?;
        }
    }

    Ok(())
}

/// Handle a received `EpochRootQuorumVote`.
pub(crate) async fn handle_epoch_root_quorum_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &EpochRootQuorumVote<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    ensure!(
        vote.vote
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
        error!("Received epoch root quorum vote for non epoch root block.")
    );

    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        we_are_leader,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

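    // Collect the epoch root vote; these votes are aggregated into an `EpochRootQuorumCertificate`,
    // which pairs the quorum certificate with its state certificate.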
    handle_epoch_root_vote(
        &mut task_state.epoch_root_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
    )
    .await?;

    Ok(())
}

/// Handle a `TimeoutVoteRecv` event.
pub(crate) async fn handle_timeout_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &TimeoutVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?;
    // Are we the leader for the next view?
    ensure!(
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

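    // Collect the timeout vote toward a timeout certificate for this view.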
    handle_vote(
        &mut task_state.timeout_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &task_state
            .membership_coordinator
            .membership_for_epoch(vote.data.epoch)
            .await?,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        EpochTransitionIndicator::NotInTransition,
    )
    .await?;

    Ok(())
}

/// Send an event to the next leader containing the highest QC we have.
/// This is a necessary part of HotStuff 2 but not of the original HotStuff.
///
/// # Errors
/// Returns an error if we can't get the version, or if the version doesn't
/// yet support HotStuff 2.
pub async fn send_high_qc<TYPES: NodeType, V: Versions, I: NodeImplementation<TYPES>>(
    new_view_number: TYPES::View,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    let version = task_state.upgrade_lock.version(new_view_number).await?;
    ensure!(
        version >= V::Epochs::VERSION,
        debug!("HotStuff 2 upgrade not yet in effect")
    );

    let consensus_reader = task_state.consensus.read().await;
    let high_qc = consensus_reader.high_qc().clone();
    let is_eqc = high_qc
        .data
        .block_number
        .is_some_and(|b| is_last_block(b, task_state.epoch_height));
    let is_epoch_root = high_qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
    let state_cert = if is_epoch_root {
        consensus_reader.state_cert().cloned()
    } else {
        None
    };
    drop(consensus_reader);

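    // If the high QC is an extended QC (for the last block of the epoch), broadcast it together
    // with the matching next-epoch QC; otherwise send the QC to the leader of the new view.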
    if is_eqc {
        let maybe_next_epoch_high_qc = task_state
            .consensus
            .read()
            .await
            .next_epoch_high_qc()
            .cloned();
        ensure!(
            maybe_next_epoch_high_qc
                .as_ref()
                .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
            "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
        );

        tracing::debug!(
            "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
            high_qc.view_number(),
            high_qc.epoch(),
            task_state.id
        );
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQcSend(
                high_qc,
                maybe_next_epoch_high_qc.unwrap(),
                task_state.public_key.clone(),
            )),
            sender,
        )
        .await;
    } else {
        let leader = task_state
            .membership_coordinator
            .membership_for_epoch(task_state.cur_epoch)
            .await?
            .leader(new_view_number)
            .await?;

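        // If the high QC is for an epoch transition block, replace it with the stored
        // transition QC and include its next-epoch counterpart.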
        let (high_qc, maybe_next_epoch_qc) = if high_qc
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
        {
            let Some((qc, next_epoch_qc)) =
                task_state.consensus.read().await.transition_qc().cloned()
            else {
                bail!("We don't have a transition QC");
            };
            ensure!(
                next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
                "Transition QC is invalid because leaf commits are not equal."
            );
            (qc, Some(next_epoch_qc))
        } else {
            (high_qc, None)
        };

        if is_epoch_root {
            // For epoch root QC, we are sending high QC and state cert
            let Some(state_cert) = state_cert else {
                bail!(
                    "We are sending an epoch root QC but we don't have the corresponding state \
                     cert."
                );
            };
            ensure!(
                check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
                "We are sending an epoch root QC but the state cert does not correspond to it."
            );

            tracing::trace!(
                "Sending epoch root QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::EpochRootQcSend(
                    EpochRootQuorumCertificate {
                        qc: high_qc,
                        state_cert,
                    },
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        } else {
            tracing::trace!(
                "Sending high QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::HighQcSend(
                    high_qc,
                    maybe_next_epoch_qc,
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        }
    }
    Ok(())
}

/// Handle a `ViewChange` event.
#[instrument(skip_all)]
pub(crate) async fn handle_view_change<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    new_view_number: TYPES::View,
    epoch_number: Option<TYPES::Epoch>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
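    // Advance our epoch first if the view change also moves us into a newer epoch.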
    if epoch_number > task_state.cur_epoch {
        task_state.cur_epoch = epoch_number;
        if let Some(new_epoch) = epoch_number {
            let _ = task_state.consensus.write().await.update_epoch(new_epoch);
            tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
        }
    }

    ensure!(
        new_view_number > task_state.cur_view,
        "New view is not larger than the current view"
    );

    let old_view_number = task_state.cur_view;
    tracing::debug!("Updating view from {old_view_number} to {new_view_number}");

    if *old_view_number / 100 != *new_view_number / 100 {
        tracing::info!("Progress: entered view {:>6}", *new_view_number);
    }

    // Send our high qc to the next leader immediately upon finishing a view.
    // Part of HotStuff 2
    let _ = send_high_qc(new_view_number, sender, task_state)
        .await
        .inspect_err(|e| {
            tracing::debug!("High QC sending failed with error: {e:?}");
        });

    // Move this node to the next view
    task_state.cur_view = new_view_number;
    task_state
        .consensus
        .write()
        .await
        .update_view(new_view_number)?;

    // If we have a decided upgrade certificate, the protocol version may also have been upgraded.
    let decided_upgrade_certificate_read = task_state
        .upgrade_lock
        .decided_upgrade_certificate
        .read()
        .await
        .clone();
    if let Some(cert) = decided_upgrade_certificate_read {
        if new_view_number == cert.data.new_version_first_view {
            tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
        }
    }

    // Spawn a timeout task if we did actually update view
    let timeout = task_state.timeout;
    let new_timeout_task = spawn({
        let stream = sender.clone();
        let view_number = new_view_number;
        async move {
            sleep(Duration::from_millis(timeout)).await;
            broadcast_event(
                Arc::new(HotShotEvent::Timeout(
                    TYPES::View::new(*view_number),
                    epoch_number,
                )),
                &stream,
            )
            .await;
        }
    });

    // Cancel the old timeout task
    std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();

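    // Look up the leader of the view we are leaving so leader-specific view metrics can be recorded.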
    let old_view_leader_key = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(old_view_number)
        .await?;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader
        .metrics
        .current_view
        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
    let cur_view_time = Utc::now().timestamp();
    if old_view_leader_key == task_state.public_key {
        #[allow(clippy::cast_precision_loss)]
        consensus_reader
            .metrics
            .view_duration_as_leader
            .add_point((cur_view_time - task_state.cur_view_time) as f64);
    }
    task_state.cur_view_time = cur_view_time;

    // Do the comparison before the subtraction to avoid potential overflow, since
    // `last_decided_view` may be greater than `cur_view` if the node is catching up.
    if usize::try_from(task_state.cur_view.u64()).unwrap()
        > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
    {
        consensus_reader
            .metrics
            .number_of_views_since_last_decide
            .set(
                usize::try_from(task_state.cur_view.u64()).unwrap()
                    - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
            );
    }

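    // Emit an external event marking the end of the old view.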
    broadcast_event(
        Event {
            view_number: old_view_number,
            event: EventType::ViewFinished {
                view_number: old_view_number,
            },
        },
        &task_state.output_event_stream,
    )
    .await;
    Ok(())
}

/// Handle a `Timeout` event.
#[instrument(skip_all)]
pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    view_number: TYPES::View,
    epoch: Option<TYPES::Epoch>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    ensure!(
        task_state.cur_view <= view_number,
        "Timeout event is for an old view"
    );

    ensure!(
        task_state
            .membership_coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch"))?
            .has_stake(&task_state.public_key)
            .await,
        debug!("We were not chosen for the consensus committee for view {view_number}",)
    );

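    // Sign a timeout vote for this view; it is broadcast below along with external timeout events.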
    let vote = TimeoutVote2::create_signed_vote(
        TimeoutData2::<TYPES> {
            view: view_number,
            epoch,
        },
        view_number,
        &task_state.public_key,
        &task_state.private_key,
        &task_state.upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign TimeoutData"))?;

    broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
    broadcast_event(
        Event {
            view_number,
            event: EventType::ViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;

    tracing::error!(
        "We did not receive evidence for view {view_number} in time, sending timeout vote for \
         that view!"
    );

    broadcast_event(
        Event {
            view_number,
            event: EventType::ReplicaViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;

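    // Record timeout metrics, counting separately when we were the leader for the timed-out view.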
    let leader = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(view_number)
        .await;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader.metrics.number_of_timeouts.add(1);
    if leader? == task_state.public_key {
        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
    }

    Ok(())
}