hotshot_task_impls/quorum_vote/handlers.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{sync::Arc, time::Instant};

use async_broadcast::{InactiveReceiver, Sender};
use chrono::Utc;
use committable::Committable;
use hotshot_types::{
    consensus::OuterConsensus,
    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
    drb::{DrbResult, INITIAL_DRB_RESULT},
    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
    event::{Event, EventType},
    message::{Proposal, UpgradeLock},
    simple_vote::{EpochRootQuorumVote, LightClientStateUpdateVote, QuorumData2, QuorumVote2},
    storage_metrics::StorageMetricsValue,
    traits::{
        block_contents::BlockHeader,
        election::Membership,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
        signature_key::{SignatureKey, StateSignatureKey},
        storage::Storage,
        ValidatedState,
    },
    utils::{
        epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block,
        option_epoch_from_block_number,
    },
    vote::HasViewNumber,
};
use hotshot_utils::anytrace::*;
use tracing::instrument;
use vbs::version::StaticVersionType;

use super::QuorumVoteTaskState;
use crate::{
    events::HotShotEvent,
    helpers::{
        broadcast_event, decide_from_proposal, decide_from_proposal_2, fetch_proposal,
        handle_drb_result, LeafChainTraversalOutcome,
    },
    quorum_vote::Versions,
};
/// Fetch the DRB result from the computation task, stored in the shared `results` table.
///
/// Returns the result if it exists.
async fn get_computed_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    epoch_number: TYPES::Epoch,
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) -> Option<DrbResult> {
    // Return the result if it's already in the table.
    task_state
        .consensus
        .read()
        .await
        .drb_results
        .results
        .get(&epoch_number)
        .cloned()
}

/// Verify the DRB result from the proposal for the next epoch if this proposal is part
/// of the epoch transition at the end of the current epoch.
///
/// Uses the result from `start_drb_task`.
///
/// Returns an error if we should not vote.
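///
/// Usage sketch, taken from the call site in `handle_quorum_proposal_validated` below
/// (illustrative, not compiled on its own):
///
/// ```ignore
/// if version >= V::Epochs::VERSION {
///     // Don't vote if the DRB result verification fails.
///     verify_drb_result(proposal, task_state).await?;
/// }
/// ```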
async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &QuorumProposalWrapper<TYPES>,
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
) -> Result<()> {
    // Skip if this is not the expected block.
    if task_state.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            task_state.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs.
    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
    // that epochs are definitely happening by virtue of getting here?
    let epoch = option_epoch_from_block_number::<TYPES>(
        task_state
            .upgrade_lock
            .epochs_enabled(proposal.view_number())
            .await,
        proposal.block_header().block_number(),
        task_state.epoch_height,
    );
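
    // Note (illustrative; assumes `epoch_from_block_number` rounds up, so with
    // `epoch_height = 100`, block 100 is still in epoch 1 and block 101 starts epoch 2):
    // the `epoch_val + 1` lookup below names the epoch whose DRB result the proposal
    // must carry.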

    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the DRB result."))?;

    if let Some(epoch_val) = epoch {
        let has_stake_current_epoch = task_state
            .membership
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {epoch_val}"))?
            .has_stake(&task_state.public_key)
            .await;

        if has_stake_current_epoch {
            let computed_result = get_computed_drb_result(epoch_val + 1, task_state)
                .await
                .context(warn!("DRB result not found"))?;

            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}

/// Store the DRB result for the next epoch if we received it in a decided leaf.
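///
/// Sketch of the intended effect (illustrative; epoch numbering assumes
/// `epoch_from_block_number` rounds up): when the transition block near the end of
/// epoch 1 is decided and carries `next_drb_result`, that result is filed in the shared
/// table under epoch 2 via `handle_drb_result`.
///
/// ```ignore
/// // As called from the decide loop in `handle_quorum_proposal_validated`:
/// store_drb_result(task_state, &leaf_view.leaf).await?;
/// ```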
async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
    decided_leaf: &Leaf2<TYPES>,
) -> Result<()> {
    if task_state.epoch_height == 0 {
        tracing::info!("Epoch height is 0, skipping DRB storage.");
        return Ok(());
    }

    let decided_block_number = decided_leaf.block_header().block_number();
    let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
        decided_block_number,
        task_state.epoch_height,
    ));
    // Skip storing the received result if this is not the transition block.
    if is_transition_block(decided_block_number, task_state.epoch_height) {
        if let Some(result) = decided_leaf.next_drb_result {
            // We don't need to check value existence and consistency because it should be
            // impossible to decide on a block with different DRB results.
            handle_drb_result::<TYPES, I>(
                task_state.membership.membership(),
                current_epoch_number + 1,
                &task_state.storage,
                &task_state.consensus,
                result,
            )
            .await;
        } else {
            bail!("The epoch transition block is decided but doesn't contain a DRB result.");
        }
    }
    Ok(())
}

/// Handles the `QuorumProposalValidated` event.
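///
/// Applies the decide rule to the validated proposal, updates the locked and last
/// decided views, garbage-collects state older than the new decide, and broadcasts a
/// `Decide` event to external subscribers.
///
/// Dispatch sketch (illustrative, not compiled; the event-loop shape is an assumption):
///
/// ```ignore
/// if let HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) = event.as_ref() {
///     handle_quorum_proposal_validated(&proposal.data, &mut task_state, &event_sender)
///         .await?;
/// }
/// ```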
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
pub(crate) async fn handle_quorum_proposal_validated<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    let version = task_state
        .upgrade_lock
        .version(proposal.view_number())
        .await?;

    if version >= V::Epochs::VERSION {
        // Don't vote if the DRB result verification fails.
        verify_drb_result(proposal, task_state).await?;
    }

    let LeafChainTraversalOutcome {
        new_locked_view_number,
        new_decided_view_number,
        new_decide_qc,
        leaf_views,
        included_txns,
        decided_upgrade_cert,
    } = if version >= V::Epochs::VERSION {
        // Skip the decide rule for the last block of the epoch, so that we do not decide
        // the block at height `epoch_height - 2` before we enter the new epoch.
        if !is_last_block(
            proposal.block_header().block_number(),
            task_state.epoch_height,
        ) {
            decide_from_proposal_2::<TYPES, I, V>(
                proposal,
                OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
                Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
                &task_state.public_key,
                version >= V::Epochs::VERSION,
                &task_state.membership,
                &task_state.storage,
                &task_state.upgrade_lock,
            )
            .await
        } else {
            LeafChainTraversalOutcome::default()
        }
    } else {
        decide_from_proposal::<TYPES, I, V>(
            proposal,
            OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
            Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
            &task_state.public_key,
            version >= V::Epochs::VERSION,
            task_state.membership.membership(),
            &task_state.storage,
            task_state.epoch_height,
            &task_state.upgrade_lock,
        )
        .await
    };

    if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
        let mut decided_certificate_lock = task_state
            .upgrade_lock
            .decided_upgrade_certificate
            .write()
            .await;
        *decided_certificate_lock = Some(cert.clone());
        drop(decided_certificate_lock);

        if cert.data.new_version == V::Epochs::VERSION {
            let epoch_height = task_state.consensus.read().await.epoch_height;
            let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
                proposal.block_header().block_number(),
                epoch_height,
            ));

            tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
            task_state
                .membership
                .membership()
                .write()
                .await
                .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

            broadcast_event(
                Arc::new(HotShotEvent::SetFirstEpoch(
                    cert.data.new_version_first_view,
                    first_epoch_number,
                )),
                event_sender,
            )
            .await;
        }

        let _ = task_state
            .storage
            .update_decided_upgrade_certificate(Some(cert.clone()))
            .await;
    }

    let mut consensus_writer = task_state.consensus.write().await;
    if let Some(locked_view_number) = new_locked_view_number {
        let _ = consensus_writer.update_locked_view(locked_view_number);
    }

    #[allow(clippy::cast_precision_loss)]
    if let Some(decided_view_number) = new_decided_view_number {
        // Bring in the cleanup crew. When a new decide is indeed valid, we need to clear out old memory.

        let old_decided_view = consensus_writer.last_decided_view();
        consensus_writer.collect_garbage(old_decided_view, decided_view_number);

        // Set the new decided view.
        consensus_writer
            .update_last_decided_view(decided_view_number)
            .context(|e| {
                warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
            })?;

        consensus_writer
            .metrics
            .last_decided_time
            .set(Utc::now().timestamp().try_into().unwrap());
        consensus_writer.metrics.invalid_qc.set(0);
        consensus_writer
            .metrics
            .last_decided_view
            .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
        let cur_number_of_views_per_decide_event =
            *proposal.view_number() - consensus_writer.last_decided_view().u64();
        consensus_writer
            .metrics
            .number_of_views_per_decide_event
            .add_point(cur_number_of_views_per_decide_event as f64);

        // We don't need to hold this while we broadcast
        drop(consensus_writer);

        for leaf_info in &leaf_views {
            tracing::info!(
                "Sending decide for view {:?} at height {:?}",
                leaf_info.leaf.view_number(),
                leaf_info.leaf.block_header().block_number(),
            );
        }

        // Send an update to everyone saying that we've reached a decide
        broadcast_event(
            Event {
                view_number: decided_view_number,
                event: EventType::Decide {
                    leaf_chain: Arc::new(leaf_views.clone()),
                    // This is never `None` if we've reached a new decide, so it is safe to unwrap.
                    qc: Arc::new(new_decide_qc.clone().unwrap()),
                    block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
                },
            },
            &task_state.output_event_stream,
        )
        .await;

        tracing::debug!(
            "Successfully sent decide event, decided view: {:?}, number of leaf views: {:?}, qc view: {:?}",
            decided_view_number,
            leaf_views.len(),
            new_decide_qc.as_ref().unwrap().view_number()
        );

        if version >= V::Epochs::VERSION {
            for leaf_view in leaf_views {
                store_drb_result(task_state, &leaf_view.leaf).await?;
            }
        }
    }

    Ok(())
}

/// Updates the shared consensus state with the new voting data.
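///
/// Fetches the parent leaf (from the local store, or from peers via `fetch_proposal` if
/// it is missing), validates the proposed block header against the parent state, and
/// writes the resulting validated state back into the consensus map.
///
/// Call-shape sketch (illustrative, not compiled; each argument stands in for a value
/// the vote-dependency handler already holds):
///
/// ```ignore
/// update_shared_state::<TYPES, I, V>(
///     OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
///     event_sender.clone(),
///     event_receiver.clone(),
///     membership_coordinator.clone(),
///     public_key.clone(),
///     private_key.clone(),
///     upgrade_lock.clone(),
///     view_number,
///     Arc::clone(&instance_state),
///     &proposed_leaf,
///     &vid_share,
///     parent_view_number,
///     epoch_height,
/// )
/// .await?;
/// ```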
#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn update_shared_state<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    consensus: OuterConsensus<TYPES>,
    sender: Sender<Arc<HotShotEvent<TYPES>>>,
    receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: UpgradeLock<TYPES, V>,
    view_number: TYPES::View,
    instance_state: Arc<TYPES::InstanceState>,
    proposed_leaf: &Leaf2<TYPES>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    parent_view_number: Option<TYPES::View>,
    epoch_height: u64,
) -> Result<()> {
    let justify_qc = &proposed_leaf.justify_qc();

    let consensus_reader = consensus.read().await;
    // Try to find the validated view within the validated state map. This will be present
    // if we have the saved leaf, but if not we'll get it when we fetch_proposal.
    let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
        consensus_reader
            .validated_state_map()
            .get(&view_number)
            .cloned()
    });

    // Justify qc's leaf commitment should be the same as the parent's leaf commitment.
    let mut maybe_parent = consensus_reader
        .saved_leaves()
        .get(&justify_qc.data.leaf_commit)
        .cloned();

    drop(consensus_reader);

    maybe_parent = match maybe_parent {
        Some(p) => Some(p),
        None => {
            match fetch_proposal(
                justify_qc,
                sender.clone(),
                receiver.activate_cloned(),
                membership.clone(),
                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
                public_key.clone(),
                private_key.clone(),
                &upgrade_lock,
                epoch_height,
            )
            .await
            .ok()
            {
                Some((leaf, view)) => {
                    maybe_validated_view = Some(view);
                    Some(leaf)
                },
                None => None,
            }
        },
    };

    let parent = maybe_parent.context(info!(
        "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
        justify_qc.data.leaf_commit,
        proposed_leaf.view_number(),
    ))?;

    let Some(validated_view) = maybe_validated_view else {
        bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
    };

    let (Some(parent_state), _) = validated_view.state_and_delta() else {
        bail!("Parent state not found! Consensus internally inconsistent");
    };

    let version = upgrade_lock.version(view_number).await?;

    let now = Instant::now();
    let (validated_state, state_delta) = parent_state
        .validate_and_apply_header(
            &instance_state,
            &parent,
            proposed_leaf.block_header(),
            vid_share.data.payload_byte_len(),
            version,
            *view_number,
        )
        .await
        .wrap()
        .context(warn!("Proposed block header doesn't extend the parent!"))?;
    let validation_duration = now.elapsed();
    tracing::debug!("Validation time: {validation_duration:?}");

    let now = Instant::now();
    // Now that we've rounded everyone up, we need to update the shared state
    let mut consensus_writer = consensus.write().await;

    if let Err(e) = consensus_writer.update_leaf(
        proposed_leaf.clone(),
        Arc::new(validated_state),
        Some(Arc::new(state_delta)),
    ) {
        tracing::trace!("{e:?}");
    }
    let update_leaf_duration = now.elapsed();

    consensus_writer
        .metrics
        .validate_and_apply_header_duration
        .add_point(validation_duration.as_secs_f64());
    consensus_writer
        .metrics
        .update_leaf_duration
        .add_point(update_leaf_duration.as_secs_f64());
    drop(consensus_writer);
    tracing::debug!("update_leaf time: {update_leaf_duration:?}");

    Ok(())
}

/// Submits the `QuorumVoteSend` event if all the dependencies are met.
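///
/// Checks that this node has stake in the current epoch (or, across an epoch
/// transition, in the next epoch), persists the VID share, and then broadcasts one of
/// three vote variants: an extended vote, an epoch root vote, or a regular quorum vote.
///
/// Call-shape sketch (illustrative, not compiled):
///
/// ```ignore
/// submit_vote::<TYPES, I, V>(
///     event_sender.clone(),
///     epoch_membership,
///     public_key.clone(),
///     private_key.clone(),
///     upgrade_lock.clone(),
///     view_number,
///     storage.clone(),
///     Arc::clone(&storage_metrics),
///     proposed_leaf,
///     vid_share,
///     is_extended_vote,
///     is_epoch_root_vote,
///     epoch_height,
///     &state_private_key,
///     stake_table_capacity,
/// )
/// .await?;
/// ```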
#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    sender: Sender<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembership<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: UpgradeLock<TYPES, V>,
    view_number: TYPES::View,
    storage: I::Storage,
    storage_metrics: Arc<StorageMetricsValue>,
    leaf: Leaf2<TYPES>,
    vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
    extended_vote: bool,
    epoch_root_vote: bool,
    epoch_height: u64,
    state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
    stake_table_capacity: usize,
) -> Result<()> {
    let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
    // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee
    // in the next epoch, the node should vote to achieve the double quorum.
    let committee_member_in_next_epoch = leaf.with_epoch
        && is_epoch_transition(leaf.height(), epoch_height)
        && membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&public_key)
            .await;
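
    // Illustrative: across the epoch transition, a node that has stake only in the
    // *next* epoch still votes, so that quorums can form in both epochs over the same
    // transition blocks (the "double quorum").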

    ensure!(
        committee_member_in_current_epoch || committee_member_in_next_epoch,
        info!("We were not chosen for quorum committee on {view_number}")
    );

    let height = if membership.epoch().is_some() {
        Some(leaf.height())
    } else {
        None
    };

    // Create and send the vote.
    let vote = QuorumVote2::<TYPES>::create_signed_vote(
        QuorumData2 {
            leaf_commit: leaf.commit(),
            epoch: membership.epoch(),
            block_number: height,
        },
        view_number,
        &public_key,
        &private_key,
        &upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign vote. This should never happen."))?;
    let now = Instant::now();
    // Add to the storage.
    storage
        .append_vid_general(&vid_share)
        .await
        .wrap()
        .context(error!("Failed to store VID share"))?;
    let append_vid_duration = now.elapsed();
    storage_metrics
        .append_vid_duration
        .add_point(append_vid_duration.as_secs_f64());
    tracing::debug!("append_vid_general time: {append_vid_duration:?}");

    // Decide which vote to send: an extended vote, an epoch root vote, or a regular
    // quorum vote.
    let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
    if extended_vote && epoch_enabled {
        tracing::debug!("sending extended vote to everybody");
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
            &sender,
        )
        .await;
    } else if epoch_root_vote && epoch_enabled {
        tracing::debug!(
            "sending epoch root vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        let light_client_state = leaf
            .block_header()
            .get_light_client_state(view_number)
            .wrap()
            .context(error!("Failed to generate light client state"))?;
        let next_stake_table = membership
            .next_epoch_stake_table()
            .await?
            .stake_table()
            .await;
        let next_stake_table_state = next_stake_table
            .commitment(stake_table_capacity)
            .wrap()
            .context(error!("Failed to compute stake table commitment"))?;
        let signature = <TYPES::StateSignatureKey as StateSignatureKey>::sign_state(
            state_private_key,
            &light_client_state,
            &next_stake_table_state,
        )
        .wrap()
        .context(error!("Failed to sign the light client state"))?;
        let state_vote = LightClientStateUpdateVote {
            epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
            light_client_state,
            next_stake_table_state,
            signature,
        };
        broadcast_event(
            Arc::new(HotShotEvent::EpochRootQuorumVoteSend(EpochRootQuorumVote {
                vote,
                state_vote,
            })),
            &sender,
        )
        .await;
    } else {
        tracing::debug!(
            "sending vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
    }

    Ok(())
}