hotshot_task_impls/consensus/handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Sender;
10use chrono::Utc;
11use hotshot_types::{
12    data::{EpochNumber, ViewNumber},
13    event::{Event, EventType},
14    simple_certificate::EpochRootQuorumCertificateV2,
15    simple_vote::{EpochRootQuorumVote2, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
16    traits::node_implementation::{NodeImplementation, NodeType},
17    utils::{EpochTransitionIndicator, is_epoch_root, is_epoch_transition, is_last_block},
18    vote::{HasViewNumber, Vote},
19};
20use hotshot_utils::anytrace::*;
21use tokio::{spawn, time::sleep};
22use tracing::instrument;
23use versions::EPOCH_VERSION;
24
25use super::ConsensusTaskState;
26use crate::{
27    events::HotShotEvent,
28    helpers::{broadcast_event, check_qc_state_cert_correspondence},
29    vote_collection::{handle_epoch_root_vote, handle_vote},
30};
31
32/// Handle a `QuorumVoteRecv` event.
33pub(crate) async fn handle_quorum_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
34    vote: &QuorumVote2<TYPES>,
35    event: Arc<HotShotEvent<TYPES>>,
36    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
37    task_state: &mut ConsensusTaskState<TYPES, I>,
38) -> Result<()> {
39    let in_transition = task_state
40        .consensus
41        .read()
42        .await
43        .is_high_qc_for_epoch_transition();
44    let epoch_membership = task_state
45        .membership_coordinator
46        .membership_for_epoch(vote.data.epoch)
47        .await
48        .context(warn!("No stake table for epoch"))?;
49
50    let we_are_leader =
51        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
52    ensure!(
53        in_transition || we_are_leader,
54        info!(
55            "We are not the leader for view {} and we are not in the epoch transition",
56            vote.view_number() + 1
57        )
58    );
59
60    let transition_indicator = if in_transition {
61        EpochTransitionIndicator::InTransition
62    } else {
63        EpochTransitionIndicator::NotInTransition
64    };
65    handle_vote(
66        &mut task_state.vote_collectors,
67        vote,
68        task_state.public_key.clone(),
69        &epoch_membership,
70        task_state.id,
71        &event,
72        sender,
73        &task_state.upgrade_lock,
74        transition_indicator.clone(),
75    )
76    .await?;
77
78    if vote.epoch().is_some()
79        && vote
80            .data
81            .block_number
82            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
83    {
84        // If the vote sender belongs to the next epoch, collect it separately to form the second QC
85        let has_stake = epoch_membership
86            .next_epoch_stake_table()
87            .await?
88            .has_stake(&vote.signing_key())
89            .await;
90        if has_stake {
91            handle_vote(
92                &mut task_state.next_epoch_vote_collectors,
93                &vote.clone().into(),
94                task_state.public_key.clone(),
95                // We eventually verify in `handle_vote` that we are the leader before assembling the certificate here,
96                // so we must request the full randomized stake table.
97                //
98                // I'm not sure this is really necessary, but I've opted not to modify the logic.
99                &epoch_membership.next_epoch().await?.clone(),
100                task_state.id,
101                &event,
102                sender,
103                &task_state.upgrade_lock,
104                transition_indicator,
105            )
106            .await?;
107        }
108    }
109
110    Ok(())
111}
112
113/// Handle a `QuorumVoteRecv` event.
114pub(crate) async fn handle_epoch_root_quorum_vote_recv<
115    TYPES: NodeType,
116    I: NodeImplementation<TYPES>,
117>(
118    vote: &EpochRootQuorumVote2<TYPES>,
119    event: Arc<HotShotEvent<TYPES>>,
120    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
121    task_state: &mut ConsensusTaskState<TYPES, I>,
122) -> Result<()> {
123    ensure!(
124        vote.vote
125            .data
126            .block_number
127            .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
128        error!("Received epoch root quorum vote for non epoch root block.")
129    );
130
131    let epoch_membership = task_state
132        .membership_coordinator
133        .membership_for_epoch(vote.vote.data.epoch)
134        .await
135        .context(warn!("No stake table for epoch"))?;
136
137    let we_are_leader =
138        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
139    ensure!(
140        we_are_leader,
141        info!("We are not the leader for view {}", vote.view_number() + 1)
142    );
143
144    handle_epoch_root_vote(
145        &mut task_state.epoch_root_vote_collectors,
146        vote,
147        task_state.public_key.clone(),
148        &epoch_membership,
149        task_state.id,
150        &event,
151        sender,
152        &task_state.upgrade_lock,
153    )
154    .await?;
155
156    Ok(())
157}
158
/// Handle a `TimeoutVoteRecv` event.
///
/// Accumulates a timeout vote into the timeout vote collector, but only if this
/// node is the leader for the view after the timed-out view.
pub(crate) async fn handle_timeout_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    vote: &TimeoutVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?;
    // Are we the leader for this view?
    ensure!(
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

    // NOTE(review): the leader check above uses `task_state.cur_epoch`, while the
    // collector below uses the epoch carried inside the vote (`vote.data.epoch`).
    // These can differ around an epoch boundary — confirm the asymmetry is intentional.
    handle_vote(
        &mut task_state.timeout_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &task_state
            .membership_coordinator
            .membership_for_epoch(vote.data.epoch)
            .await?,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        // Timeout votes are never collected as part of an epoch transition.
        EpochTransitionIndicator::NotInTransition,
    )
    .await?;

    Ok(())
}
195
/// Send an event to the next leader containing the highest QC we have
/// This is a necessary part of HotStuff 2 but not the original HotStuff
///
/// # Errors
/// Returns an error if we can't get the version or the version doesn't
/// yet support HS 2
pub async fn send_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    new_view_number: ViewNumber,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    let version = task_state.upgrade_lock.version(new_view_number)?;
    ensure!(
        version >= EPOCH_VERSION,
        debug!("HotStuff 2 upgrade not yet in effect")
    );

    // Snapshot what we need under a short-lived read lock, then release it before
    // any further awaits/broadcasts.
    let consensus_reader = task_state.consensus.read().await;
    let high_qc = consensus_reader.high_qc().clone();
    // The high QC is an "extended" QC (eQC) if it certifies the last block of an epoch.
    let is_eqc = high_qc
        .data
        .block_number
        .is_some_and(|b| is_last_block(b, task_state.epoch_height));
    let is_epoch_root = high_qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
    // The state cert is only relevant when the high QC is for the epoch root block.
    let state_cert = if is_epoch_root {
        consensus_reader.state_cert().cloned()
    } else {
        None
    };
    drop(consensus_reader);

    if is_eqc {
        // An eQC must be paired with the matching next-epoch QC before broadcasting.
        let maybe_next_epoch_high_qc = task_state
            .consensus
            .read()
            .await
            .next_epoch_high_qc()
            .cloned();
        ensure!(
            maybe_next_epoch_high_qc
                .as_ref()
                .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
            "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
        );

        tracing::debug!(
            "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
            high_qc.view_number(),
            high_qc.epoch(),
            task_state.id
        );
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQcSend(
                high_qc,
                maybe_next_epoch_high_qc.unwrap(),
                task_state.public_key.clone(),
            )),
            sender,
        )
        .await;
    } else {
        let leader = task_state
            .membership_coordinator
            .membership_for_epoch(task_state.cur_epoch)
            .await?
            .leader(new_view_number)
            .await?;

        // If the high QC certifies an epoch-transition block, substitute the stored
        // transition QC pair (current-epoch QC + next-epoch QC) for it.
        let (high_qc, maybe_next_epoch_qc) = if high_qc
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
        {
            let Some((qc, next_epoch_qc)) =
                task_state.consensus.read().await.transition_qc().cloned()
            else {
                bail!("We don't have a transition QC");
            };
            ensure!(
                next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
                "Transition QC is invalid because leaf commits are not equal."
            );
            (qc, Some(next_epoch_qc))
        } else {
            (high_qc, None)
        };

        if is_epoch_root {
            // For epoch root QC, we are sending high QC and state cert
            let Some(state_cert) = state_cert else {
                bail!(
                    "We are sending an epoch root QC but we don't have the corresponding state \
                     cert."
                );
            };
            ensure!(
                check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
                "We are sending an epoch root QC but we don't have the corresponding state cert."
            );

            tracing::trace!(
                "Sending epoch root QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::EpochRootQcSend(
                    EpochRootQuorumCertificateV2 {
                        qc: high_qc,
                        state_cert,
                    },
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        } else {
            tracing::trace!(
                "Sending high QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::HighQcSend(
                    high_qc,
                    maybe_next_epoch_qc,
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        }
    }
    Ok(())
}
336
/// Handle a `ViewChange` event.
///
/// Advances the task to `new_view_number` (and to `epoch_number` if it is newer),
/// sends our high QC to the next leader (HotStuff 2), restarts the timeout task,
/// updates view metrics, and finally emits a `ViewFinished` event for the old view.
#[instrument(skip_all)]
pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    new_view_number: ViewNumber,
    epoch_number: Option<EpochNumber>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    // `Option` ordering makes any `Some(_)` compare greater than `None`, so this
    // also covers the first transition into epoch-based operation.
    if epoch_number > task_state.cur_epoch {
        task_state.cur_epoch = epoch_number;
        if let Some(new_epoch) = epoch_number {
            let _ = task_state.consensus.write().await.update_epoch(new_epoch);
            tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
        }
    }

    ensure!(
        new_view_number > task_state.cur_view,
        "New view is not larger than the current view"
    );

    let old_view_number = task_state.cur_view;
    tracing::debug!("Updating view from {old_view_number} to {new_view_number}");

    // Log a progress marker once per 100 views.
    if *old_view_number / 100 != *new_view_number / 100 {
        tracing::info!("Progress: entered view {:>6}", *new_view_number);
    }

    // Send our high qc to the next leader immediately upon finishing a view.
    // Part of HotStuff 2
    // Best-effort: failures are logged but do not abort the view change.
    let _ = send_high_qc(new_view_number, sender, task_state)
        .await
        .inspect_err(|e| {
            tracing::debug!("High QC sending failed with error: {e:?}");
        });

    // Move this node to the next view
    task_state.cur_view = new_view_number;
    task_state
        .consensus
        .write()
        .await
        .update_view(new_view_number)?;

    // If we have a decided upgrade certificate, the protocol version may also have been upgraded.
    // NOTE(review): `error!` level appears to be used here for operator visibility
    // rather than to signal a failure — confirm.
    if let Some(cert) = task_state.upgrade_lock.decided_upgrade_cert()
        && new_view_number == cert.data.new_version_first_view
    {
        tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
    }

    // Spawn a timeout task if we did actually update view
    let timeout = task_state.timeout;
    let new_timeout_task = spawn({
        let stream = sender.clone();
        let view_number = new_view_number;
        async move {
            // Fire a `Timeout` event for this view unless the task is aborted first.
            sleep(Duration::from_millis(timeout)).await;
            broadcast_event(
                Arc::new(HotShotEvent::Timeout(
                    ViewNumber::new(*view_number),
                    epoch_number,
                )),
                &stream,
            )
            .await;
        }
    });

    // Cancel the old timeout task
    std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();

    let old_view_leader_key = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(old_view_number)
        .await?;

    // Metrics are read-locked for the remainder of the function.
    let consensus_reader = task_state.consensus.read().await;
    consensus_reader
        .metrics
        .current_view
        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
    let cur_view_time = Utc::now().timestamp();
    // Only record leader view duration if we led the view we just finished.
    if old_view_leader_key == task_state.public_key {
        #[allow(clippy::cast_precision_loss)]
        consensus_reader
            .metrics
            .view_duration_as_leader
            .add_point((cur_view_time - task_state.cur_view_time) as f64);
    }
    task_state.cur_view_time = cur_view_time;

    // Do the comparison before the subtraction to avoid potential overflow, since
    // `last_decided_view` may be greater than `cur_view` if the node is catching up.
    if usize::try_from(task_state.cur_view.u64()).unwrap()
        > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
    {
        consensus_reader
            .metrics
            .number_of_views_since_last_decide
            .set(
                usize::try_from(task_state.cur_view.u64()).unwrap()
                    - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
            );
    }

    // Notify external subscribers that the old view has finished.
    broadcast_event(
        Event {
            view_number: old_view_number,
            event: EventType::ViewFinished {
                view_number: old_view_number,
            },
        },
        &task_state.output_event_stream,
    )
    .await;
    Ok(())
}
458
459/// Handle a `Timeout` event.
460#[instrument(skip_all)]
461pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>>(
462    view_number: ViewNumber,
463    epoch: Option<EpochNumber>,
464    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
465    task_state: &mut ConsensusTaskState<TYPES, I>,
466) -> Result<()> {
467    ensure!(
468        task_state.cur_view <= view_number,
469        "Timeout event is for an old view"
470    );
471
472    ensure!(
473        task_state
474            .membership_coordinator
475            .stake_table_for_epoch(epoch)
476            .await
477            .context(warn!("No stake table for epoch"))?
478            .has_stake(&task_state.public_key)
479            .await,
480        debug!("We were not chosen for the consensus committee for view {view_number}",)
481    );
482
483    let vote = TimeoutVote2::create_signed_vote(
484        TimeoutData2 {
485            view: view_number,
486            epoch,
487        },
488        view_number,
489        &task_state.public_key,
490        &task_state.private_key,
491        &task_state.upgrade_lock,
492    )
493    .await
494    .wrap()
495    .context(error!("Failed to sign TimeoutData"))?;
496
497    broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
498    broadcast_event(
499        Event {
500            view_number,
501            event: EventType::ViewTimeout { view_number },
502        },
503        &task_state.output_event_stream,
504    )
505    .await;
506
507    tracing::error!(
508        "We did not receive evidence for view {view_number} in time, sending timeout vote for \
509         that view!"
510    );
511
512    broadcast_event(
513        Event {
514            view_number,
515            event: EventType::ReplicaViewTimeout { view_number },
516        },
517        &task_state.output_event_stream,
518    )
519    .await;
520
521    let leader = task_state
522        .membership_coordinator
523        .membership_for_epoch(task_state.cur_epoch)
524        .await
525        .context(warn!("No stake table for epoch"))?
526        .leader(view_number)
527        .await;
528
529    let consensus_reader = task_state.consensus.read().await;
530    consensus_reader.metrics.number_of_timeouts.add(1);
531    if leader.as_ref().is_ok_and(|l| *l == task_state.public_key) {
532        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
533    }
534    drop(consensus_reader);
535    task_state
536        .consensus
537        .write()
538        .await
539        .update_validator_participation(
540            leader?,
541            task_state.cur_epoch.ok_or(debug!("No epoch"))?,
542            false,
543        );
544
545    Ok(())
546}