hotshot_task_impls/quorum_vote/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13    consensus::OuterConsensus,
14    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
15    drb::INITIAL_DRB_RESULT,
16    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_vote::{EpochRootQuorumVote, LightClientStateUpdateVote, QuorumData2, QuorumVote2},
20    storage_metrics::StorageMetricsValue,
21    traits::{
22        block_contents::BlockHeader,
23        election::Membership,
24        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
25        signature_key::{
26            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
27        },
28        storage::Storage,
29        ValidatedState,
30    },
31    utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use vbs::version::StaticVersionType;
37
38use super::QuorumVoteTaskState;
39use crate::{
40    events::HotShotEvent,
41    helpers::{
42        broadcast_event, decide_from_proposal, decide_from_proposal_2, derive_signed_state_digest,
43        fetch_proposal, handle_drb_result, LeafChainTraversalOutcome,
44    },
45    quorum_vote::Versions,
46};
47
48/// Store the DRB result for the next epoch if we received it in a decided leaf.
49async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
50    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
51    decided_leaf: &Leaf2<TYPES>,
52) -> Result<()> {
53    if task_state.epoch_height == 0 {
54        tracing::info!("Epoch height is 0, skipping DRB storage.");
55        return Ok(());
56    }
57
58    let decided_block_number = decided_leaf.block_header().block_number();
59    let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
60        decided_block_number,
61        task_state.epoch_height,
62    ));
63    // Skip storing the received result if this is not the transition block.
64    if is_transition_block(decided_block_number, task_state.epoch_height) {
65        if let Some(result) = decided_leaf.next_drb_result {
66            // We don't need to check value existence and consistency because it should be
67            // impossible to decide on a block with different DRB results.
68            handle_drb_result::<TYPES, I>(
69                task_state.membership.membership(),
70                current_epoch_number + 1,
71                &task_state.storage,
72                result,
73            )
74            .await;
75        } else {
76            bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
77        }
78    }
79    Ok(())
80}
81
/// Handles the `QuorumProposalValidated` event.
///
/// Runs the decide rule against the validated `proposal` and, when a new decide
/// is reached: records any decided upgrade certificate (and, on the upgrade into
/// the epochs version, initializes the first epoch), garbage-collects stale
/// consensus state, updates decide-related metrics, broadcasts `LeavesDecided`
/// internally and a `Decide` event externally, and — for epoch-enabled versions —
/// stores DRB results carried by the decided leaves.
///
/// # Errors
/// Returns an error if the protocol version for the view cannot be resolved,
/// if `update_last_decided_view` fails, or if `store_drb_result` fails.
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
pub(crate) async fn handle_quorum_proposal_validated<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    // The version in effect at this view selects which decide rule applies.
    let version = task_state
        .upgrade_lock
        .version(proposal.view_number())
        .await?;

    let LeafChainTraversalOutcome {
        new_locked_view_number,
        new_decided_view_number,
        new_decide_qc,
        leaf_views,
        included_txns,
        decided_upgrade_cert,
    } = if version >= V::Epochs::VERSION {
        // Skip the decide rule for the last block of the epoch.  This is so
        // that we do not decide the block with epoch_height -2 before we enter the new epoch
        if !is_last_block(
            proposal.block_header().block_number(),
            task_state.epoch_height,
        ) {
            decide_from_proposal_2::<TYPES, I>(
                proposal,
                OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
                Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
                &task_state.public_key,
                version >= V::Epochs::VERSION,
                &task_state.membership,
                &task_state.storage,
            )
            .await
        } else {
            // Nothing is decided this round; every field defaults to "no change".
            LeafChainTraversalOutcome::default()
        }
    } else {
        // Pre-epochs decide rule.
        decide_from_proposal::<TYPES, I>(
            proposal,
            OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
            Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
            &task_state.public_key,
            version >= V::Epochs::VERSION,
            &task_state.membership,
            &task_state.storage,
            task_state.epoch_height,
        )
        .await
    };

    // A decided upgrade certificate only takes effect alongside a new decide.
    if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
        let mut decided_certificate_lock = task_state
            .upgrade_lock
            .decided_upgrade_certificate
            .write()
            .await;
        *decided_certificate_lock = Some(cert.clone());
        // Release the write lock before doing any further awaits below.
        drop(decided_certificate_lock);
        // If this certificate upgrades us from a pre-epochs base version into
        // the epochs version, bootstrap the first epoch in the membership.
        if cert.data.new_version >= V::Epochs::VERSION && V::Base::VERSION < V::Epochs::VERSION {
            let epoch_height = task_state.consensus.read().await.epoch_height;
            let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
                proposal.block_header().block_number(),
                epoch_height,
            ));

            tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
            task_state
                .membership
                .membership()
                .write()
                .await
                .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

            // Let other tasks know when (which view) the first epoch begins.
            broadcast_event(
                Arc::new(HotShotEvent::SetFirstEpoch(
                    cert.data.new_version_first_view,
                    first_epoch_number,
                )),
                event_sender,
            )
            .await;
        }

        // Best-effort persistence of the decided certificate; errors are ignored.
        let _ = task_state
            .storage
            .update_decided_upgrade_certificate(Some(cert.clone()))
            .await;
    }

    let mut consensus_writer = task_state.consensus.write().await;
    if let Some(locked_view_number) = new_locked_view_number {
        // Best-effort: a stale locked-view update is not fatal.
        let _ = consensus_writer.update_locked_view(locked_view_number);
    }

    #[allow(clippy::cast_precision_loss)]
    if let Some(decided_view_number) = new_decided_view_number {
        // Bring in the cleanup crew. When a new decide is indeed valid, we need to clear out old memory.

        let old_decided_view = consensus_writer.last_decided_view();
        consensus_writer.collect_garbage(old_decided_view, decided_view_number);

        // Set the new decided view.
        consensus_writer
            .update_last_decided_view(decided_view_number)
            .context(|e| {
                warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
            })?;

        consensus_writer
            .metrics
            .last_decided_time
            .set(Utc::now().timestamp().try_into().unwrap());
        consensus_writer.metrics.invalid_qc.set(0);
        consensus_writer
            .metrics
            .last_decided_view
            .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
        // Views elapsed since the (just-updated) last decided view.
        let cur_number_of_views_per_decide_event =
            *proposal.view_number() - consensus_writer.last_decided_view().u64();
        consensus_writer
            .metrics
            .number_of_views_per_decide_event
            .add_point(cur_number_of_views_per_decide_event as f64);

        // We don't need to hold this while we broadcast
        drop(consensus_writer);

        for leaf_info in &leaf_views {
            tracing::info!(
                "Sending decide for view {:?} at height {:?}",
                leaf_info.leaf.view_number(),
                leaf_info.leaf.block_header().block_number(),
            );
        }

        // Internal notification of the newly decided leaves.
        broadcast_event(
            Arc::new(HotShotEvent::LeavesDecided(
                leaf_views
                    .iter()
                    .map(|leaf_info| leaf_info.leaf.clone())
                    .collect(),
            )),
            event_sender,
        )
        .await;

        // Send an update to everyone saying that we've reached a decide
        broadcast_event(
            Event {
                view_number: decided_view_number,
                event: EventType::Decide {
                    leaf_chain: Arc::new(leaf_views.clone()),
                    // This is never none if we've reached a new decide, so this is safe to unwrap.
                    qc: Arc::new(new_decide_qc.clone().unwrap()),
                    block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
                },
            },
            &task_state.output_event_stream,
        )
        .await;

        tracing::debug!(
            "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
            decided_view_number,
            leaf_views.len(),
            new_decide_qc.as_ref().unwrap().view_number()
        );

        // With epochs enabled, decided leaves may carry DRB results to persist.
        if version >= V::Epochs::VERSION {
            for leaf_view in leaf_views {
                store_drb_result(task_state, &leaf_view.leaf).await?;
            }
        }
    }

    Ok(())
}
266
267/// Updates the shared consensus state with the new voting data.
268#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
269#[allow(clippy::too_many_arguments)]
270pub(crate) async fn update_shared_state<TYPES: NodeType, V: Versions>(
271    consensus: OuterConsensus<TYPES>,
272    sender: Sender<Arc<HotShotEvent<TYPES>>>,
273    receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
274    membership: EpochMembershipCoordinator<TYPES>,
275    public_key: TYPES::SignatureKey,
276    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
277    upgrade_lock: UpgradeLock<TYPES, V>,
278    view_number: TYPES::View,
279    instance_state: Arc<TYPES::InstanceState>,
280    proposed_leaf: &Leaf2<TYPES>,
281    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
282    parent_view_number: Option<TYPES::View>,
283    epoch_height: u64,
284) -> Result<()> {
285    let justify_qc = &proposed_leaf.justify_qc();
286
287    let consensus_reader = consensus.read().await;
288    // Try to find the validated view within the validated state map. This will be present
289    // if we have the saved leaf, but if not we'll get it when we fetch_proposal.
290    let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
291        consensus_reader
292            .validated_state_map()
293            .get(&view_number)
294            .cloned()
295    });
296
297    // Justify qc's leaf commitment should be the same as the parent's leaf commitment.
298    let mut maybe_parent = consensus_reader
299        .saved_leaves()
300        .get(&justify_qc.data.leaf_commit)
301        .cloned();
302
303    drop(consensus_reader);
304
305    maybe_parent = match maybe_parent {
306        Some(p) => Some(p),
307        None => {
308            match fetch_proposal(
309                justify_qc,
310                sender.clone(),
311                receiver.activate_cloned(),
312                membership.clone(),
313                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
314                public_key.clone(),
315                private_key.clone(),
316                &upgrade_lock,
317                epoch_height,
318            )
319            .await
320            .ok()
321            {
322                Some((leaf, view)) => {
323                    maybe_validated_view = Some(view);
324                    Some(leaf)
325                },
326                None => None,
327            }
328        },
329    };
330
331    let parent = maybe_parent.context(info!(
332        "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
333        justify_qc.data.leaf_commit,
334        proposed_leaf.view_number(),
335    ))?;
336
337    let Some(validated_view) = maybe_validated_view else {
338        bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
339    };
340
341    let (Some(parent_state), _) = validated_view.state_and_delta() else {
342        bail!("Parent state not found! Consensus internally inconsistent");
343    };
344
345    let version = upgrade_lock.version(view_number).await?;
346
347    let now = Instant::now();
348    let (validated_state, state_delta) = parent_state
349        .validate_and_apply_header(
350            &instance_state,
351            &parent,
352            &proposed_leaf.block_header().clone(),
353            vid_share.data.payload_byte_len(),
354            version,
355            *view_number,
356        )
357        .await
358        .wrap()
359        .context(warn!("Block header doesn't extend the proposal!"))?;
360    let validation_duration = now.elapsed();
361    tracing::debug!("Validation time: {validation_duration:?}");
362
363    let now = Instant::now();
364    // Now that we've rounded everyone up, we need to update the shared state
365    let mut consensus_writer = consensus.write().await;
366
367    if let Err(e) = consensus_writer.update_leaf(
368        proposed_leaf.clone(),
369        Arc::new(validated_state),
370        Some(Arc::new(state_delta)),
371    ) {
372        tracing::trace!("{e:?}");
373    }
374    let update_leaf_duration = now.elapsed();
375
376    consensus_writer
377        .metrics
378        .validate_and_apply_header_duration
379        .add_point(validation_duration.as_secs_f64());
380    consensus_writer
381        .metrics
382        .update_leaf_duration
383        .add_point(update_leaf_duration.as_secs_f64());
384    drop(consensus_writer);
385    tracing::debug!("update_leaf time: {update_leaf_duration:?}");
386
387    Ok(())
388}
389
/// Submits the `QuorumVoteSend` event if all the dependencies are met.
///
/// Checks committee membership (current epoch, or next epoch during an epoch
/// transition for double quorum), signs a `QuorumVote2`, persists the VID share,
/// and then broadcasts one of: an extended vote, an epoch-root vote bundled with
/// light-client state signatures, or a plain quorum vote.
///
/// # Errors
/// Returns an error if the node has no stake for this view, signing fails,
/// the VID share cannot be stored, or light-client state material cannot be
/// produced for an epoch-root vote.
#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    sender: Sender<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembership<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: UpgradeLock<TYPES, V>,
    view_number: TYPES::View,
    storage: I::Storage,
    storage_metrics: Arc<StorageMetricsValue>,
    leaf: Leaf2<TYPES>,
    vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
    extended_vote: bool,
    epoch_root_vote: bool,
    epoch_height: u64,
    state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
    stake_table_capacity: usize,
) -> Result<()> {
    let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
    // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee
    // in the next epoch, the node should vote to achieve the double quorum.
    let committee_member_in_next_epoch = leaf.with_epoch
        && is_epoch_transition(leaf.height(), epoch_height)
        && membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&public_key)
            .await;

    ensure!(
        committee_member_in_current_epoch || committee_member_in_next_epoch,
        info!("We were not chosen for quorum committee on {view_number}")
    );

    // Only include the block height in the vote data when epochs are in use.
    let height = if membership.epoch().is_some() {
        Some(leaf.height())
    } else {
        None
    };

    // Create and send the vote.
    let vote = QuorumVote2::<TYPES>::create_signed_vote(
        QuorumData2 {
            leaf_commit: leaf.commit(),
            epoch: membership.epoch(),
            block_number: height,
        },
        view_number,
        &public_key,
        &private_key,
        &upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign vote. This should never happen."))?;
    let now = Instant::now();
    // Add to the storage.
    storage
        .append_vid_general(&vid_share)
        .await
        .wrap()
        .context(error!("Failed to store VID share"))?;
    let append_vid_duration = now.elapsed();
    storage_metrics
        .append_vid_duration
        .add_point(append_vid_duration.as_secs_f64());
    tracing::debug!("append_vid_general time: {append_vid_duration:?}");

    // Make epoch root vote

    let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
    if extended_vote && epoch_enabled {
        // Extended votes go to every node, not just the next leader.
        tracing::debug!("sending extended vote to everybody",);
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
            &sender,
        )
        .await;
    } else if epoch_root_vote && epoch_enabled {
        tracing::debug!(
            "sending epoch root vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        // Derive the light client state from the block header for this view.
        let light_client_state = leaf
            .block_header()
            .get_light_client_state(view_number)
            .wrap()
            .context(error!("Failed to generate light client state"))?;
        let next_stake_table = membership
            .next_epoch_stake_table()
            .await?
            .stake_table()
            .await;
        let next_stake_table_state = next_stake_table
            .commitment(stake_table_capacity)
            .wrap()
            .context(error!("Failed to compute stake table commitment"))?;
        // We are still providing LCV2 state signatures for backward compatibility
        let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
            state_private_key,
            &light_client_state,
            &next_stake_table_state,
        )
        .wrap()
        .context(error!("Failed to sign the light client state"))?;
        let auth_root = leaf
            .block_header()
            .auth_root()
            .wrap()
            .context(error!(format!(
                "Failed to get auth root for light client state certificate. view={view_number}"
            )))?;
        // LCV3 signs a digest binding the state, next stake table, and auth root.
        let signed_state_digest =
            derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
        let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
            state_private_key,
            signed_state_digest,
        )
        .wrap()
        .context(error!("Failed to sign the light client state"))?;
        let state_vote = LightClientStateUpdateVote {
            epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
            light_client_state,
            next_stake_table_state,
            signature,
            v2_signature,
            auth_root,
            signed_state_digest,
        };
        // Bundle the quorum vote with the light client state vote.
        broadcast_event(
            Arc::new(HotShotEvent::EpochRootQuorumVoteSend(EpochRootQuorumVote {
                vote,
                state_vote,
            })),
            &sender,
        )
        .await;
    } else {
        tracing::debug!(
            "sending vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
    }

    Ok(())
}