hotshot_task_impls/quorum_vote/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13    consensus::OuterConsensus,
14    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
15    drb::INITIAL_DRB_RESULT,
16    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_vote::{EpochRootQuorumVote2, LightClientStateUpdateVote2, QuorumData2, QuorumVote2},
20    storage_metrics::StorageMetricsValue,
21    traits::{
22        block_contents::BlockHeader,
23        election::Membership,
24        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
25        signature_key::{
26            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
27        },
28        storage::Storage,
29        ValidatedState,
30    },
31    utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use vbs::version::StaticVersionType;
37
38use super::QuorumVoteTaskState;
39use crate::{
40    events::HotShotEvent,
41    helpers::{
42        broadcast_event, decide_from_proposal, decide_from_proposal_2, derive_signed_state_digest,
43        fetch_proposal, handle_drb_result, LeafChainTraversalOutcome,
44    },
45    quorum_vote::Versions,
46};
47
48/// Store the DRB result for the next epoch if we received it in a decided leaf.
49async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
50    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
51    decided_leaf: &Leaf2<TYPES>,
52) -> Result<()> {
53    if task_state.epoch_height == 0 {
54        tracing::info!("Epoch height is 0, skipping DRB storage.");
55        return Ok(());
56    }
57
58    let decided_block_number = decided_leaf.block_header().block_number();
59    let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
60        decided_block_number,
61        task_state.epoch_height,
62    ));
63    // Skip storing the received result if this is not the transition block.
64    if is_transition_block(decided_block_number, task_state.epoch_height) {
65        if let Some(result) = decided_leaf.next_drb_result {
66            // We don't need to check value existence and consistency because it should be
67            // impossible to decide on a block with different DRB results.
68            handle_drb_result::<TYPES, I>(
69                task_state.membership.membership(),
70                current_epoch_number + 1,
71                &task_state.storage,
72                result,
73            )
74            .await;
75        } else {
76            bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
77        }
78    }
79    Ok(())
80}
81
/// Handles the `QuorumProposalValidated` event.
///
/// Runs the decide rule against the newly validated proposal, then — if a new
/// decide was reached — persists any decided upgrade certificate, updates the
/// locked/decided views and metrics, garbage-collects stale consensus state,
/// and broadcasts the decide events (internal `LeavesDecided` plus the
/// external `Decide` application event).
///
/// # Errors
/// Returns an error if the version lookup fails, if updating the last decided
/// view fails, or if storing a DRB result from a decided leaf fails.
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
pub(crate) async fn handle_quorum_proposal_validated<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    // Protocol version in effect for this proposal's view; selects which
    // decide rule is applied below.
    let version = task_state
        .upgrade_lock
        .version(proposal.view_number())
        .await?;

    // Run the decide rule to determine what (if anything) this proposal newly
    // locks or decides.
    let LeafChainTraversalOutcome {
        new_locked_view_number,
        new_decided_view_number,
        committing_qc,
        deciding_qc,
        leaf_views,
        included_txns,
        decided_upgrade_cert,
    } = if version >= V::Epochs::VERSION {
        // Skip the decide rule for the last block of the epoch.  This is so
        // that we do not decide the block with epoch_height -2 before we enter the new epoch
        if !is_last_block(
            proposal.block_header().block_number(),
            task_state.epoch_height,
        ) {
            decide_from_proposal_2::<TYPES, I>(
                proposal,
                OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
                Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
                &task_state.public_key,
                version >= V::Epochs::VERSION,
                &task_state.membership,
                &task_state.storage,
            )
            .await
        } else {
            LeafChainTraversalOutcome::default()
        }
    } else {
        // Pre-epoch versions use the original decide rule.
        decide_from_proposal::<TYPES, I>(
            proposal,
            OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
            Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
            &task_state.public_key,
            version >= V::Epochs::VERSION,
            &task_state.membership,
            &task_state.storage,
            task_state.epoch_height,
        )
        .await
    };

    // A decided upgrade certificate: record it, apply any version-transition
    // side effects (first-epoch setup, DA committees), and persist it.
    if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
        let mut decided_certificate_lock = task_state
            .upgrade_lock
            .decided_upgrade_certificate
            .write()
            .await;
        *decided_certificate_lock = Some(cert.clone());
        // Release the write lock before awaiting on anything below.
        drop(decided_certificate_lock);
        // Upgrading across the epoch boundary: initialize epoch bookkeeping.
        if cert.data.new_version >= V::Epochs::VERSION && V::Base::VERSION < V::Epochs::VERSION {
            let epoch_height = task_state.consensus.read().await.epoch_height;
            let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
                proposal.block_header().block_number(),
                epoch_height,
            ));

            tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
            task_state
                .membership
                .membership()
                .write()
                .await
                .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

            broadcast_event(
                Arc::new(HotShotEvent::SetFirstEpoch(
                    cert.data.new_version_first_view,
                    first_epoch_number,
                )),
                event_sender,
            )
            .await;
        }

        // Register every configured DA committee whose start version is now
        // reached by the upgrade.
        for da_committee in &task_state.da_committees {
            if cert.data.new_version >= da_committee.start_version {
                task_state
                    .membership
                    .membership()
                    .write()
                    .await
                    .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
            }
        }

        // Best-effort persistence; a storage failure here is deliberately ignored.
        let _ = task_state
            .storage
            .update_decided_upgrade_certificate(Some(cert.clone()))
            .await;
    }

    let mut consensus_writer = task_state.consensus.write().await;
    if let Some(locked_view_number) = new_locked_view_number {
        let _ = consensus_writer.update_locked_view(locked_view_number);
    }

    #[allow(clippy::cast_precision_loss)]
    if let Some(decided_view_number) = new_decided_view_number {
        // Bring in the cleanup crew. When a new decide is indeed valid, we need to clear out old memory.

        let old_decided_view = consensus_writer.last_decided_view();
        consensus_writer.collect_garbage(old_decided_view, decided_view_number);

        // Set the new decided view.
        consensus_writer
            .update_last_decided_view(decided_view_number)
            .context(|e| {
                warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
            })?;

        // Record decide-related metrics.
        consensus_writer
            .metrics
            .last_decided_time
            .set(Utc::now().timestamp().try_into().unwrap());
        consensus_writer.metrics.invalid_qc.set(0);
        consensus_writer
            .metrics
            .last_decided_view
            .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
        // NOTE: computed after `update_last_decided_view`, so this measures the
        // gap between the proposal's view and the view just decided.
        let cur_number_of_views_per_decide_event =
            *proposal.view_number() - consensus_writer.last_decided_view().u64();
        consensus_writer
            .metrics
            .number_of_views_per_decide_event
            .add_point(cur_number_of_views_per_decide_event as f64);

        // We don't need to hold this while we broadcast
        drop(consensus_writer);

        for leaf_info in &leaf_views {
            tracing::info!(
                "Sending decide for view {:?} at height {:?}",
                leaf_info.leaf.view_number(),
                leaf_info.leaf.block_header().block_number(),
            );
        }

        // Internal notification of the newly decided leaves.
        broadcast_event(
            Arc::new(HotShotEvent::LeavesDecided(
                leaf_views
                    .iter()
                    .map(|leaf_info| leaf_info.leaf.clone())
                    .collect(),
            )),
            event_sender,
        )
        .await;

        // Send an update to everyone saying that we've reached a decide. The committing QC is never
        // none if we've reached a new decide, so this is safe to unwrap.
        let committing_qc = Arc::new(committing_qc.unwrap());
        broadcast_event(
            Event {
                view_number: decided_view_number,
                event: EventType::Decide {
                    leaf_chain: Arc::new(leaf_views.clone()),
                    committing_qc: committing_qc.clone(),
                    deciding_qc: deciding_qc.map(Arc::new),
                    block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
                },
            },
            &task_state.output_event_stream,
        )
        .await;

        tracing::debug!(
            "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
            decided_view_number,
            leaf_views.len(),
            committing_qc.view_number()
        );

        // With epochs enabled, persist any DRB results carried by decided leaves.
        if version >= V::Epochs::VERSION {
            for leaf_view in leaf_views {
                store_drb_result(task_state, &leaf_view.leaf).await?;
            }
        }
    }

    Ok(())
}
280
281/// Updates the shared consensus state with the new voting data.
282#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
283#[allow(clippy::too_many_arguments)]
284pub(crate) async fn update_shared_state<TYPES: NodeType, V: Versions>(
285    consensus: OuterConsensus<TYPES>,
286    sender: Sender<Arc<HotShotEvent<TYPES>>>,
287    receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
288    membership: EpochMembershipCoordinator<TYPES>,
289    public_key: TYPES::SignatureKey,
290    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
291    upgrade_lock: UpgradeLock<TYPES, V>,
292    view_number: TYPES::View,
293    instance_state: Arc<TYPES::InstanceState>,
294    proposed_leaf: &Leaf2<TYPES>,
295    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
296    parent_view_number: Option<TYPES::View>,
297    epoch_height: u64,
298) -> Result<()> {
299    let justify_qc = &proposed_leaf.justify_qc();
300
301    let consensus_reader = consensus.read().await;
302    // Try to find the validated view within the validated state map. This will be present
303    // if we have the saved leaf, but if not we'll get it when we fetch_proposal.
304    let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
305        consensus_reader
306            .validated_state_map()
307            .get(&view_number)
308            .cloned()
309    });
310
311    // Justify qc's leaf commitment should be the same as the parent's leaf commitment.
312    let mut maybe_parent = consensus_reader
313        .saved_leaves()
314        .get(&justify_qc.data.leaf_commit)
315        .cloned();
316
317    drop(consensus_reader);
318
319    maybe_parent = match maybe_parent {
320        Some(p) => Some(p),
321        None => {
322            match fetch_proposal(
323                justify_qc,
324                sender.clone(),
325                receiver.activate_cloned(),
326                membership.clone(),
327                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
328                public_key.clone(),
329                private_key.clone(),
330                &upgrade_lock,
331                epoch_height,
332            )
333            .await
334            .ok()
335            {
336                Some((leaf, view)) => {
337                    maybe_validated_view = Some(view);
338                    Some(leaf)
339                },
340                None => None,
341            }
342        },
343    };
344
345    let parent = maybe_parent.context(info!(
346        "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
347        justify_qc.data.leaf_commit,
348        proposed_leaf.view_number(),
349    ))?;
350
351    let Some(validated_view) = maybe_validated_view else {
352        bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
353    };
354
355    let (Some(parent_state), _) = validated_view.state_and_delta() else {
356        bail!("Parent state not found! Consensus internally inconsistent");
357    };
358
359    let version = upgrade_lock.version(view_number).await?;
360
361    let now = Instant::now();
362    let (validated_state, state_delta) = parent_state
363        .validate_and_apply_header(
364            &instance_state,
365            &parent,
366            &proposed_leaf.block_header().clone(),
367            vid_share.data.payload_byte_len(),
368            version,
369            *view_number,
370        )
371        .await
372        .wrap()
373        .context(warn!("Block header doesn't extend the proposal!"))?;
374    let validation_duration = now.elapsed();
375    tracing::debug!("Validation time: {validation_duration:?}");
376
377    let now = Instant::now();
378    // Now that we've rounded everyone up, we need to update the shared state
379    let mut consensus_writer = consensus.write().await;
380
381    if let Err(e) = consensus_writer.update_leaf(
382        proposed_leaf.clone(),
383        Arc::new(validated_state),
384        Some(Arc::new(state_delta)),
385    ) {
386        tracing::trace!("{e:?}");
387    }
388    let update_leaf_duration = now.elapsed();
389
390    consensus_writer
391        .metrics
392        .validate_and_apply_header_duration
393        .add_point(validation_duration.as_secs_f64());
394    consensus_writer
395        .metrics
396        .update_leaf_duration
397        .add_point(update_leaf_duration.as_secs_f64());
398    drop(consensus_writer);
399    tracing::debug!("update_leaf time: {update_leaf_duration:?}");
400
401    Ok(())
402}
403
404/// Submits the `QuorumVoteSend` event if all the dependencies are met.
405#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
406#[allow(clippy::too_many_arguments)]
407pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
408    sender: Sender<Arc<HotShotEvent<TYPES>>>,
409    membership: EpochMembership<TYPES>,
410    public_key: TYPES::SignatureKey,
411    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
412    upgrade_lock: UpgradeLock<TYPES, V>,
413    view_number: TYPES::View,
414    storage: I::Storage,
415    storage_metrics: Arc<StorageMetricsValue>,
416    leaf: Leaf2<TYPES>,
417    vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
418    extended_vote: bool,
419    epoch_root_vote: bool,
420    epoch_height: u64,
421    state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
422    stake_table_capacity: usize,
423) -> Result<()> {
424    let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
425    // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee
426    // in the next epoch, the node should vote to achieve the double quorum.
427    let committee_member_in_next_epoch = leaf.with_epoch
428        && is_epoch_transition(leaf.height(), epoch_height)
429        && membership
430            .next_epoch_stake_table()
431            .await?
432            .has_stake(&public_key)
433            .await;
434
435    ensure!(
436        committee_member_in_current_epoch || committee_member_in_next_epoch,
437        info!("We were not chosen for quorum committee on {view_number}")
438    );
439
440    let height = if membership.epoch().is_some() {
441        Some(leaf.height())
442    } else {
443        None
444    };
445
446    // Create and send the vote.
447    let vote = QuorumVote2::<TYPES>::create_signed_vote(
448        QuorumData2 {
449            leaf_commit: leaf.commit(),
450            epoch: membership.epoch(),
451            block_number: height,
452        },
453        view_number,
454        &public_key,
455        &private_key,
456        &upgrade_lock,
457    )
458    .await
459    .wrap()
460    .context(error!("Failed to sign vote. This should never happen."))?;
461    let now = Instant::now();
462    // Add to the storage.
463    storage
464        .append_vid_general(&vid_share)
465        .await
466        .wrap()
467        .context(error!("Failed to store VID share"))?;
468    let append_vid_duration = now.elapsed();
469    storage_metrics
470        .append_vid_duration
471        .add_point(append_vid_duration.as_secs_f64());
472    tracing::debug!("append_vid_general time: {append_vid_duration:?}");
473
474    // Make epoch root vote
475
476    let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
477    if extended_vote && epoch_enabled {
478        tracing::debug!("sending extended vote to everybody",);
479        broadcast_event(
480            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
481            &sender,
482        )
483        .await;
484    } else if epoch_root_vote && epoch_enabled {
485        tracing::debug!(
486            "sending epoch root vote to next quorum leader {:?}",
487            vote.view_number() + 1
488        );
489        let light_client_state = leaf
490            .block_header()
491            .get_light_client_state(view_number)
492            .wrap()
493            .context(error!("Failed to generate light client state"))?;
494        let next_stake_table = membership
495            .next_epoch_stake_table()
496            .await?
497            .stake_table()
498            .await;
499        let next_stake_table_state = next_stake_table
500            .commitment(stake_table_capacity)
501            .wrap()
502            .context(error!("Failed to compute stake table commitment"))?;
503        // We are still providing LCV2 state signatures for backward compatibility
504        let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
505            state_private_key,
506            &light_client_state,
507            &next_stake_table_state,
508        )
509        .wrap()
510        .context(error!("Failed to sign the light client state"))?;
511        let auth_root = leaf
512            .block_header()
513            .auth_root()
514            .wrap()
515            .context(error!(format!(
516                "Failed to get auth root for light client state certificate. view={view_number}"
517            )))?;
518        let signed_state_digest =
519            derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
520        let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
521            state_private_key,
522            signed_state_digest,
523        )
524        .wrap()
525        .context(error!("Failed to sign the light client state"))?;
526        let state_vote = LightClientStateUpdateVote2 {
527            epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
528            light_client_state,
529            next_stake_table_state,
530            signature,
531            v2_signature,
532            auth_root,
533            signed_state_digest,
534        };
535        broadcast_event(
536            Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
537                EpochRootQuorumVote2 { vote, state_vote },
538            )),
539            &sender,
540        )
541        .await;
542    } else {
543        tracing::debug!(
544            "sending vote to next quorum leader {:?}",
545            vote.view_number() + 1
546        );
547        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
548    }
549
550    Ok(())
551}