hotshot_task_impls/quorum_vote/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13    consensus::OuterConsensus,
14    data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
15    drb::INITIAL_DRB_RESULT,
16    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_vote::{EpochRootQuorumVote2, LightClientStateUpdateVote2, QuorumData2, QuorumVote2},
20    storage_metrics::StorageMetricsValue,
21    traits::{
22        ValidatedState,
23        block_contents::BlockHeader,
24        election::Membership,
25        node_implementation::{NodeImplementation, NodeType},
26        signature_key::{
27            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
28        },
29        storage::Storage,
30    },
31    utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use versions::EPOCH_VERSION;
37
38use super::QuorumVoteTaskState;
39use crate::{
40    events::HotShotEvent,
41    helpers::{
42        LeafChainTraversalOutcome, broadcast_event, decide_from_proposal, decide_from_proposal_2,
43        derive_signed_state_digest, fetch_proposal, handle_drb_result,
44    },
45};
46
47/// Store the DRB result for the next epoch if we received it in a decided leaf.
48async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
49    task_state: &mut QuorumVoteTaskState<TYPES, I>,
50    decided_leaf: &Leaf2<TYPES>,
51) -> Result<()> {
52    if task_state.epoch_height == 0 {
53        tracing::info!("Epoch height is 0, skipping DRB storage.");
54        return Ok(());
55    }
56
57    let decided_block_number = decided_leaf.block_header().block_number();
58    let current_epoch_number = EpochNumber::new(epoch_from_block_number(
59        decided_block_number,
60        task_state.epoch_height,
61    ));
62    // Skip storing the received result if this is not the transition block.
63    if is_transition_block(decided_block_number, task_state.epoch_height) {
64        if let Some(result) = decided_leaf.next_drb_result {
65            // We don't need to check value existence and consistency because it should be
66            // impossible to decide on a block with different DRB results.
67            handle_drb_result::<TYPES, I>(
68                task_state.membership.membership(),
69                current_epoch_number + 1,
70                &task_state.storage,
71                result,
72            )
73            .await;
74        } else {
75            bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
76        }
77    }
78    Ok(())
79}
80
/// Handles the `QuorumProposalValidated` event.
///
/// Runs the (version-dependent) decide rule against the validated proposal,
/// applies a newly decided upgrade certificate if one was found, updates the
/// locked/decided views in shared consensus state, garbage-collects state
/// older than the new decide, and broadcasts both the internal
/// `LeavesDecided` event and the external `Decide` event. Finally, for
/// epoch-enabled versions, stores any DRB result carried by the decided
/// leaves.
///
/// # Errors
/// Returns an error if the version lookup fails, if updating the last decided
/// view fails, or if storing a decided DRB result fails.
#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
pub(crate) async fn handle_quorum_proposal_validated<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    task_state: &mut QuorumVoteTaskState<TYPES, I>,
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    // Protocol version in effect for this proposal's view; selects the decide rule.
    let version = task_state.upgrade_lock.version(proposal.view_number())?;

    let LeafChainTraversalOutcome {
        new_locked_view_number,
        new_decided_view_number,
        committing_qc,
        deciding_qc,
        leaf_views,
        included_txns,
        decided_upgrade_cert,
    } = if version >= EPOCH_VERSION {
        // Skip the decide rule for the last block of the epoch.  This is so
        // that we do not decide the block with epoch_height -2 before we enter the new epoch
        if !is_last_block(
            proposal.block_header().block_number(),
            task_state.epoch_height,
        ) {
            decide_from_proposal_2::<TYPES, I>(
                proposal,
                OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
                &task_state.upgrade_lock,
                &task_state.public_key,
                version >= EPOCH_VERSION,
                &task_state.membership,
                &task_state.storage,
            )
            .await
        } else {
            // Last block of the epoch: report nothing decided this round.
            LeafChainTraversalOutcome::default()
        }
    } else {
        // Pre-epoch versions use the original decide rule.
        decide_from_proposal::<TYPES, I>(
            proposal,
            OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
            &task_state.upgrade_lock,
            &task_state.public_key,
            version >= EPOCH_VERSION,
            &task_state.membership,
            &task_state.storage,
            task_state.epoch_height,
        )
        .await
    };

    // An upgrade certificate only takes effect once a new view has actually been decided.
    if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
        task_state
            .upgrade_lock
            .set_decided_upgrade_cert(cert.clone());
        // If this upgrade crosses into epoch-enabled versions for the first time,
        // initialize the first epoch in the membership and announce it.
        if cert.data.new_version >= EPOCH_VERSION
            && task_state.upgrade_lock.upgrade().base < EPOCH_VERSION
        {
            let epoch_height = task_state.consensus.read().await.epoch_height;
            let first_epoch_number = EpochNumber::new(epoch_from_block_number(
                proposal.block_header().block_number(),
                epoch_height,
            ));

            tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
            task_state
                .membership
                .membership()
                .write()
                .await
                .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

            broadcast_event(
                Arc::new(HotShotEvent::SetFirstEpoch(
                    cert.data.new_version_first_view,
                    first_epoch_number,
                )),
                event_sender,
            )
            .await;
        }

        // Register any DA committees whose start version is now reached by the upgrade.
        for da_committee in &task_state.da_committees {
            if cert.data.new_version >= da_committee.start_version {
                task_state
                    .membership
                    .membership()
                    .write()
                    .await
                    .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
            }
        }

        // Persist the decided certificate; best-effort, failures are ignored here.
        let _ = task_state
            .storage
            .update_decided_upgrade_certificate(Some(cert.clone()))
            .await;
    }

    let mut consensus_writer = task_state.consensus.write().await;
    if let Some(locked_view_number) = new_locked_view_number {
        let _ = consensus_writer.update_locked_view(locked_view_number);
    }

    #[allow(clippy::cast_precision_loss)]
    if let Some(decided_view_number) = new_decided_view_number {
        // Bring in the cleanup crew. When a new decide is indeed valid, we need to clear out old memory.

        let old_decided_view = consensus_writer.last_decided_view();
        consensus_writer.collect_garbage(old_decided_view, decided_view_number);

        // Set the new decided view.
        consensus_writer
            .update_last_decided_view(decided_view_number)
            .context(|e| {
                warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
            })?;

        // Refresh decide-related metrics under the same write lock.
        consensus_writer
            .metrics
            .last_decided_time
            .set(Utc::now().timestamp().try_into().unwrap());
        consensus_writer.metrics.invalid_qc.set(0);
        consensus_writer
            .metrics
            .last_decided_view
            .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
        let cur_number_of_views_per_decide_event =
            *proposal.view_number() - consensus_writer.last_decided_view().u64();
        consensus_writer
            .metrics
            .number_of_views_per_decide_event
            .add_point(cur_number_of_views_per_decide_event as f64);
        // Track which nodes' votes made it into the decided QCs; failures are non-fatal.
        for leaf in &leaf_views {
            if let Err(e) = consensus_writer.update_vote_participation(leaf.leaf.justify_qc()) {
                tracing::warn!("Failed to update vote participation: {e}");
            }
        }

        // We don't need to hold this while we broadcast
        drop(consensus_writer);

        for leaf_info in &leaf_views {
            tracing::info!(
                "Sending decide for view {:?} at height {:?}",
                leaf_info.leaf.view_number(),
                leaf_info.leaf.block_header().block_number(),
            );
        }

        // Internal notification for other tasks in this node.
        broadcast_event(
            Arc::new(HotShotEvent::LeavesDecided(
                leaf_views
                    .iter()
                    .map(|leaf_info| leaf_info.leaf.clone())
                    .collect(),
            )),
            event_sender,
        )
        .await;

        // Send an update to everyone saying that we've reached a decide. The committing QC is never
        // none if we've reached a new decide, so this is safe to unwrap.
        let committing_qc = Arc::new(committing_qc.unwrap());
        broadcast_event(
            Event {
                view_number: decided_view_number,
                event: EventType::Decide {
                    leaf_chain: Arc::new(leaf_views.clone()),
                    committing_qc: committing_qc.clone(),
                    deciding_qc: deciding_qc.map(Arc::new),
                    block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
                },
            },
            &task_state.output_event_stream,
        )
        .await;

        tracing::debug!(
            "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
            decided_view_number,
            leaf_views.len(),
            committing_qc.view_number()
        );

        // With epochs enabled, persist any DRB results carried by the decided leaves.
        if version >= EPOCH_VERSION {
            for leaf_view in leaf_views {
                store_drb_result(task_state, &leaf_view.leaf).await?;
            }
        }
    }

    Ok(())
}
278
/// Updates the shared consensus state with the new voting data.
///
/// Locates the proposal's parent leaf and its validated view — first in the
/// local consensus state, and if missing, by fetching the proposal over the
/// network — then validates and applies the proposed block header on top of
/// the parent state, and records the resulting leaf, validated state, and
/// state delta in shared consensus.
///
/// # Errors
/// Fails if the parent leaf or its validated view/state cannot be found, if
/// the version lookup fails, or if the block header does not validly extend
/// the parent state.
#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn update_shared_state<TYPES: NodeType>(
    consensus: OuterConsensus<TYPES>,
    sender: Sender<Arc<HotShotEvent<TYPES>>>,
    receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: UpgradeLock<TYPES>,
    view_number: ViewNumber,
    instance_state: Arc<TYPES::InstanceState>,
    proposed_leaf: &Leaf2<TYPES>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    parent_view_number: Option<ViewNumber>,
    epoch_height: u64,
) -> Result<()> {
    let justify_qc = &proposed_leaf.justify_qc();

    let consensus_reader = consensus.read().await;
    // Try to find the validated view within the validated state map. This will be present
    // if we have the saved leaf, but if not we'll get it when we fetch_proposal.
    let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
        consensus_reader
            .validated_state_map()
            .get(&view_number)
            .cloned()
    });

    // Justify qc's leaf commitment should be the same as the parent's leaf commitment.
    let mut maybe_parent = consensus_reader
        .saved_leaves()
        .get(&justify_qc.data.leaf_commit)
        .cloned();

    // Release the read lock before the potential network fetch below.
    drop(consensus_reader);

    maybe_parent = match maybe_parent {
        Some(p) => Some(p),
        None => {
            // Parent leaf not in local storage: try to fetch the proposal from peers.
            match fetch_proposal(
                justify_qc,
                sender.clone(),
                receiver.activate_cloned(),
                membership.clone(),
                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
                public_key.clone(),
                private_key.clone(),
                &upgrade_lock,
                epoch_height,
            )
            .await
            .ok()
            {
                Some((leaf, view)) => {
                    // A successful fetch also supplies the parent's validated view.
                    maybe_validated_view = Some(view);
                    Some(leaf)
                },
                None => None,
            }
        },
    };

    let parent = maybe_parent.context(info!(
        "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
        justify_qc.data.leaf_commit,
        proposed_leaf.view_number(),
    ))?;

    let Some(validated_view) = maybe_validated_view else {
        bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
    };

    let (Some(parent_state), _) = validated_view.state_and_delta() else {
        bail!("Parent state not found! Consensus internally inconsistent");
    };

    let version = upgrade_lock.version(view_number)?;

    let now = Instant::now();
    // Apply the proposed header to the parent state, producing the new validated state.
    let (validated_state, state_delta) = parent_state
        .validate_and_apply_header(
            &instance_state,
            &parent,
            // NOTE(review): the `.clone()` here looks redundant since the clone is
            // immediately borrowed — confirm `block_header()`'s return type and simplify.
            &proposed_leaf.block_header().clone(),
            vid_share.data.payload_byte_len(),
            version,
            *view_number,
        )
        .await
        .wrap()
        .context(warn!("Block header doesn't extend the proposal!"))?;
    let validation_duration = now.elapsed();
    tracing::debug!("Validation time: {validation_duration:?}");

    let now = Instant::now();
    // Now that we've rounded everyone up, we need to update the shared state
    let mut consensus_writer = consensus.write().await;

    // A failed leaf update is logged at trace level and otherwise ignored.
    if let Err(e) = consensus_writer.update_leaf(
        proposed_leaf.clone(),
        Arc::new(validated_state),
        Some(Arc::new(state_delta)),
    ) {
        tracing::trace!("{e:?}");
    }
    let update_leaf_duration = now.elapsed();

    // Record timing metrics for validation and the leaf update.
    consensus_writer
        .metrics
        .validate_and_apply_header_duration
        .add_point(validation_duration.as_secs_f64());
    consensus_writer
        .metrics
        .update_leaf_duration
        .add_point(update_leaf_duration.as_secs_f64());
    drop(consensus_writer);
    tracing::debug!("update_leaf time: {update_leaf_duration:?}");

    Ok(())
}
401
/// Submits the `QuorumVoteSend` event if all the dependencies are met.
///
/// Checks quorum membership (in the current epoch, and additionally in the
/// next epoch during an epoch transition for the double quorum), persists the
/// VID share, signs the quorum vote, and then broadcasts exactly one of:
/// an extended vote to everybody, an epoch-root vote bundled with light
/// client state signatures, or a regular quorum vote to the next leader.
///
/// # Errors
/// Fails if the node has no stake in either relevant epoch, if signing the
/// vote or light client state fails, if the stake table commitment cannot be
/// computed, or if storing the VID share fails.
#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    sender: Sender<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembership<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: UpgradeLock<TYPES>,
    view_number: ViewNumber,
    storage: I::Storage,
    storage_metrics: Arc<StorageMetricsValue>,
    leaf: Leaf2<TYPES>,
    vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
    extended_vote: bool,
    epoch_root_vote: bool,
    epoch_height: u64,
    state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
    stake_table_capacity: usize,
) -> Result<()> {
    let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
    // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee
    // in the next epoch, the node should vote to achieve the double quorum.
    let committee_member_in_next_epoch = leaf.with_epoch
        && is_epoch_transition(leaf.height(), epoch_height)
        && membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&public_key)
            .await;

    // A node with no stake in either relevant epoch must not vote.
    ensure!(
        committee_member_in_current_epoch || committee_member_in_next_epoch,
        info!("We were not chosen for quorum committee on {view_number}")
    );

    // Only include the block height in the vote when epochs are active.
    let height = if membership.epoch().is_some() {
        Some(leaf.height())
    } else {
        None
    };

    // Create and send the vote.
    let vote = QuorumVote2::<TYPES>::create_signed_vote(
        QuorumData2 {
            leaf_commit: leaf.commit(),
            epoch: membership.epoch(),
            block_number: height,
        },
        view_number,
        &public_key,
        &private_key,
        &upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign vote. This should never happen."))?;
    let now = Instant::now();
    // Add to the storage.
    storage
        .append_vid(&vid_share)
        .await
        .wrap()
        .context(error!("Failed to store VID share"))?;
    let append_vid_duration = now.elapsed();
    storage_metrics
        .append_vid_duration
        .add_point(append_vid_duration.as_secs_f64());
    tracing::debug!("append_vid_general time: {append_vid_duration:?}");

    // Make epoch root vote

    let epoch_enabled = upgrade_lock.epochs_enabled(view_number);
    if extended_vote && epoch_enabled {
        // Extended votes are broadcast to all nodes rather than just the next leader.
        tracing::debug!("sending extended vote to everybody",);
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
            &sender,
        )
        .await;
    } else if epoch_root_vote && epoch_enabled {
        tracing::debug!(
            "sending epoch root vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        // Derive the light client state for this view from the block header.
        let light_client_state = leaf
            .block_header()
            .get_light_client_state(view_number)
            .wrap()
            .context(error!("Failed to generate light client state"))?;
        // Commit to the next epoch's stake table for the light client update.
        let next_stake_table = membership
            .next_epoch_stake_table()
            .await?
            .stake_table()
            .await;
        let next_stake_table_state = next_stake_table
            .commitment(stake_table_capacity)
            .wrap()
            .context(error!("Failed to compute stake table commitment"))?;
        // We are still providing LCV2 state signatures for backward compatibility
        let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
            state_private_key,
            &light_client_state,
            &next_stake_table_state,
        )
        .wrap()
        .context(error!("Failed to sign the light client state"))?;
        let auth_root = leaf
            .block_header()
            .auth_root()
            .wrap()
            .context(error!(format!(
                "Failed to get auth root for light client state certificate. view={view_number}"
            )))?;
        // LCV3 signs a digest derived from state, stake table commitment, and auth root.
        let signed_state_digest =
            derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
        let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
            state_private_key,
            signed_state_digest,
        )
        .wrap()
        .context(error!("Failed to sign the light client state"))?;
        let state_vote = LightClientStateUpdateVote2 {
            epoch: EpochNumber::new(epoch_from_block_number(leaf.height(), epoch_height)),
            light_client_state,
            next_stake_table_state,
            signature,
            v2_signature,
            auth_root,
            signed_state_digest,
        };
        // Bundle the quorum vote with the light client state vote for the epoch root.
        broadcast_event(
            Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
                EpochRootQuorumVote2 { vote, state_vote },
            )),
            &sender,
        )
        .await;
    } else {
        // Default path: a regular quorum vote addressed to the next leader.
        tracing::debug!(
            "sending vote to next quorum leader {:?}",
            vote.view_number() + 1
        );
        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
    }

    Ok(())
}