1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13 consensus::OuterConsensus,
14 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
15 drb::{DrbResult, INITIAL_DRB_RESULT},
16 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_vote::{EpochRootQuorumVote, LightClientStateUpdateVote, QuorumData2, QuorumVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 block_contents::BlockHeader,
23 election::Membership,
24 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
25 signature_key::{SignatureKey, StateSignatureKey},
26 storage::Storage,
27 ValidatedState,
28 },
29 utils::{
30 epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block,
31 option_epoch_from_block_number,
32 },
33 vote::HasViewNumber,
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37use vbs::version::StaticVersionType;
38
39use super::QuorumVoteTaskState;
40use crate::{
41 events::HotShotEvent,
42 helpers::{
43 broadcast_event, decide_from_proposal, decide_from_proposal_2, fetch_proposal,
44 handle_drb_result, LeafChainTraversalOutcome,
45 },
46 quorum_vote::Versions,
47};
48
49async fn get_computed_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
53 epoch_number: TYPES::Epoch,
54 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
55) -> Option<DrbResult> {
56 task_state
58 .consensus
59 .read()
60 .await
61 .drb_results
62 .results
63 .get(&epoch_number)
64 .cloned()
65}
66
67async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
74 proposal: &QuorumProposalWrapper<TYPES>,
75 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
76) -> Result<()> {
77 if task_state.epoch_height == 0
79 || !is_epoch_transition(
80 proposal.block_header().block_number(),
81 task_state.epoch_height,
82 )
83 {
84 tracing::debug!("Skipping DRB result verification");
85 return Ok(());
86 }
87
88 let epoch = option_epoch_from_block_number::<TYPES>(
92 task_state
93 .upgrade_lock
94 .epochs_enabled(proposal.view_number())
95 .await,
96 proposal.block_header().block_number(),
97 task_state.epoch_height,
98 );
99
100 let proposal_result = proposal
101 .next_drb_result()
102 .context(info!("Proposal is missing the DRB result."))?;
103
104 if let Some(epoch_val) = epoch {
105 let has_stake_current_epoch = task_state
106 .membership
107 .stake_table_for_epoch(epoch)
108 .await
109 .context(warn!("No stake table for epoch {epoch_val}"))?
110 .has_stake(&task_state.public_key)
111 .await;
112
113 if has_stake_current_epoch {
114 let computed_result = get_computed_drb_result(epoch_val + 1, task_state)
115 .await
116 .context(warn!("DRB result not found"))?;
117
118 ensure!(
119 proposal_result == computed_result,
120 warn!(
121 "Our calculated DRB result is {computed_result:?}, which does not match the \
122 proposed DRB result of {proposal_result:?}"
123 )
124 );
125 }
126
127 Ok(())
128 } else {
129 Err(error!("Epochs are not available"))
130 }
131}
132
133async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
135 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
136 decided_leaf: &Leaf2<TYPES>,
137) -> Result<()> {
138 if task_state.epoch_height == 0 {
139 tracing::info!("Epoch height is 0, skipping DRB storage.");
140 return Ok(());
141 }
142
143 let decided_block_number = decided_leaf.block_header().block_number();
144 let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
145 decided_block_number,
146 task_state.epoch_height,
147 ));
148 if is_transition_block(decided_block_number, task_state.epoch_height) {
150 if let Some(result) = decided_leaf.next_drb_result {
151 handle_drb_result::<TYPES, I>(
154 task_state.membership.membership(),
155 current_epoch_number + 1,
156 &task_state.storage,
157 &task_state.consensus,
158 result,
159 )
160 .await;
161 } else {
162 bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
163 }
164 }
165 Ok(())
166}
167
168#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
170pub(crate) async fn handle_quorum_proposal_validated<
171 TYPES: NodeType,
172 I: NodeImplementation<TYPES>,
173 V: Versions,
174>(
175 proposal: &QuorumProposalWrapper<TYPES>,
176 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
177 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
178) -> Result<()> {
179 let version = task_state
180 .upgrade_lock
181 .version(proposal.view_number())
182 .await?;
183
184 if version >= V::Epochs::VERSION {
185 verify_drb_result(proposal, task_state).await?;
187 }
188
189 let LeafChainTraversalOutcome {
190 new_locked_view_number,
191 new_decided_view_number,
192 new_decide_qc,
193 leaf_views,
194 included_txns,
195 decided_upgrade_cert,
196 } = if version >= V::Epochs::VERSION {
197 if !is_last_block(
200 proposal.block_header().block_number(),
201 task_state.epoch_height,
202 ) {
203 decide_from_proposal_2::<TYPES, I, V>(
204 proposal,
205 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
206 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
207 &task_state.public_key,
208 version >= V::Epochs::VERSION,
209 &task_state.membership,
210 &task_state.storage,
211 &task_state.upgrade_lock,
212 )
213 .await
214 } else {
215 LeafChainTraversalOutcome::default()
216 }
217 } else {
218 decide_from_proposal::<TYPES, I, V>(
219 proposal,
220 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
221 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
222 &task_state.public_key,
223 version >= V::Epochs::VERSION,
224 task_state.membership.membership(),
225 &task_state.storage,
226 task_state.epoch_height,
227 &task_state.upgrade_lock,
228 )
229 .await
230 };
231
232 if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
233 let mut decided_certificate_lock = task_state
234 .upgrade_lock
235 .decided_upgrade_certificate
236 .write()
237 .await;
238 *decided_certificate_lock = Some(cert.clone());
239 drop(decided_certificate_lock);
240
241 if cert.data.new_version == V::Epochs::VERSION {
242 let epoch_height = task_state.consensus.read().await.epoch_height;
243 let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
244 proposal.block_header().block_number(),
245 epoch_height,
246 ));
247
248 tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
249 task_state
250 .membership
251 .membership()
252 .write()
253 .await
254 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
255
256 broadcast_event(
257 Arc::new(HotShotEvent::SetFirstEpoch(
258 cert.data.new_version_first_view,
259 first_epoch_number,
260 )),
261 event_sender,
262 )
263 .await;
264 }
265
266 let _ = task_state
267 .storage
268 .update_decided_upgrade_certificate(Some(cert.clone()))
269 .await;
270 }
271
272 let mut consensus_writer = task_state.consensus.write().await;
273 if let Some(locked_view_number) = new_locked_view_number {
274 let _ = consensus_writer.update_locked_view(locked_view_number);
275 }
276
277 #[allow(clippy::cast_precision_loss)]
278 if let Some(decided_view_number) = new_decided_view_number {
279 let old_decided_view = consensus_writer.last_decided_view();
282 consensus_writer.collect_garbage(old_decided_view, decided_view_number);
283
284 consensus_writer
286 .update_last_decided_view(decided_view_number)
287 .context(|e| {
288 warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
289 })?;
290
291 consensus_writer
292 .metrics
293 .last_decided_time
294 .set(Utc::now().timestamp().try_into().unwrap());
295 consensus_writer.metrics.invalid_qc.set(0);
296 consensus_writer
297 .metrics
298 .last_decided_view
299 .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
300 let cur_number_of_views_per_decide_event =
301 *proposal.view_number() - consensus_writer.last_decided_view().u64();
302 consensus_writer
303 .metrics
304 .number_of_views_per_decide_event
305 .add_point(cur_number_of_views_per_decide_event as f64);
306
307 drop(consensus_writer);
309
310 for leaf_info in &leaf_views {
311 tracing::info!(
312 "Sending decide for view {:?} at height {:?}",
313 leaf_info.leaf.view_number(),
314 leaf_info.leaf.block_header().block_number(),
315 );
316 }
317
318 broadcast_event(
320 Event {
321 view_number: decided_view_number,
322 event: EventType::Decide {
323 leaf_chain: Arc::new(leaf_views.clone()),
324 qc: Arc::new(new_decide_qc.clone().unwrap()),
326 block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
327 },
328 },
329 &task_state.output_event_stream,
330 )
331 .await;
332
333 tracing::debug!(
334 "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
335 decided_view_number,
336 leaf_views.len(),
337 new_decide_qc.as_ref().unwrap().view_number()
338 );
339
340 if version >= V::Epochs::VERSION {
341 for leaf_view in leaf_views {
342 store_drb_result(task_state, &leaf_view.leaf).await?;
343 }
344 }
345 }
346
347 Ok(())
348}
349
350#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
352#[allow(clippy::too_many_arguments)]
353pub(crate) async fn update_shared_state<
354 TYPES: NodeType,
355 I: NodeImplementation<TYPES>,
356 V: Versions,
357>(
358 consensus: OuterConsensus<TYPES>,
359 sender: Sender<Arc<HotShotEvent<TYPES>>>,
360 receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
361 membership: EpochMembershipCoordinator<TYPES>,
362 public_key: TYPES::SignatureKey,
363 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
364 upgrade_lock: UpgradeLock<TYPES, V>,
365 view_number: TYPES::View,
366 instance_state: Arc<TYPES::InstanceState>,
367 proposed_leaf: &Leaf2<TYPES>,
368 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
369 parent_view_number: Option<TYPES::View>,
370 epoch_height: u64,
371) -> Result<()> {
372 let justify_qc = &proposed_leaf.justify_qc();
373
374 let consensus_reader = consensus.read().await;
375 let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
378 consensus_reader
379 .validated_state_map()
380 .get(&view_number)
381 .cloned()
382 });
383
384 let mut maybe_parent = consensus_reader
386 .saved_leaves()
387 .get(&justify_qc.data.leaf_commit)
388 .cloned();
389
390 drop(consensus_reader);
391
392 maybe_parent = match maybe_parent {
393 Some(p) => Some(p),
394 None => {
395 match fetch_proposal(
396 justify_qc,
397 sender.clone(),
398 receiver.activate_cloned(),
399 membership.clone(),
400 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
401 public_key.clone(),
402 private_key.clone(),
403 &upgrade_lock,
404 epoch_height,
405 )
406 .await
407 .ok()
408 {
409 Some((leaf, view)) => {
410 maybe_validated_view = Some(view);
411 Some(leaf)
412 },
413 None => None,
414 }
415 },
416 };
417
418 let parent = maybe_parent.context(info!(
419 "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
420 justify_qc.data.leaf_commit,
421 proposed_leaf.view_number(),
422 ))?;
423
424 let Some(validated_view) = maybe_validated_view else {
425 bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
426 };
427
428 let (Some(parent_state), _) = validated_view.state_and_delta() else {
429 bail!("Parent state not found! Consensus internally inconsistent");
430 };
431
432 let version = upgrade_lock.version(view_number).await?;
433
434 let now = Instant::now();
435 let (validated_state, state_delta) = parent_state
436 .validate_and_apply_header(
437 &instance_state,
438 &parent,
439 &proposed_leaf.block_header().clone(),
440 vid_share.data.payload_byte_len(),
441 version,
442 *view_number,
443 )
444 .await
445 .wrap()
446 .context(warn!("Block header doesn't extend the proposal!"))?;
447 let validation_duration = now.elapsed();
448 tracing::debug!("Validation time: {validation_duration:?}");
449
450 let now = Instant::now();
451 let mut consensus_writer = consensus.write().await;
453
454 if let Err(e) = consensus_writer.update_leaf(
455 proposed_leaf.clone(),
456 Arc::new(validated_state),
457 Some(Arc::new(state_delta)),
458 ) {
459 tracing::trace!("{e:?}");
460 }
461 let update_leaf_duration = now.elapsed();
462
463 consensus_writer
464 .metrics
465 .validate_and_apply_header_duration
466 .add_point(validation_duration.as_secs_f64());
467 consensus_writer
468 .metrics
469 .update_leaf_duration
470 .add_point(update_leaf_duration.as_secs_f64());
471 drop(consensus_writer);
472 tracing::debug!("update_leaf time: {update_leaf_duration:?}");
473
474 Ok(())
475}
476
477#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
479#[allow(clippy::too_many_arguments)]
480pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
481 sender: Sender<Arc<HotShotEvent<TYPES>>>,
482 membership: EpochMembership<TYPES>,
483 public_key: TYPES::SignatureKey,
484 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
485 upgrade_lock: UpgradeLock<TYPES, V>,
486 view_number: TYPES::View,
487 storage: I::Storage,
488 storage_metrics: Arc<StorageMetricsValue>,
489 leaf: Leaf2<TYPES>,
490 vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
491 extended_vote: bool,
492 epoch_root_vote: bool,
493 epoch_height: u64,
494 state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
495 stake_table_capacity: usize,
496) -> Result<()> {
497 let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
498 let committee_member_in_next_epoch = leaf.with_epoch
501 && is_epoch_transition(leaf.height(), epoch_height)
502 && membership
503 .next_epoch_stake_table()
504 .await?
505 .has_stake(&public_key)
506 .await;
507
508 ensure!(
509 committee_member_in_current_epoch || committee_member_in_next_epoch,
510 info!("We were not chosen for quorum committee on {view_number}")
511 );
512
513 let height = if membership.epoch().is_some() {
514 Some(leaf.height())
515 } else {
516 None
517 };
518
519 let vote = QuorumVote2::<TYPES>::create_signed_vote(
521 QuorumData2 {
522 leaf_commit: leaf.commit(),
523 epoch: membership.epoch(),
524 block_number: height,
525 },
526 view_number,
527 &public_key,
528 &private_key,
529 &upgrade_lock,
530 )
531 .await
532 .wrap()
533 .context(error!("Failed to sign vote. This should never happen."))?;
534 let now = Instant::now();
535 storage
537 .append_vid_general(&vid_share)
538 .await
539 .wrap()
540 .context(error!("Failed to store VID share"))?;
541 let append_vid_duration = now.elapsed();
542 storage_metrics
543 .append_vid_duration
544 .add_point(append_vid_duration.as_secs_f64());
545 tracing::debug!("append_vid_general time: {append_vid_duration:?}");
546
547 let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
550 if extended_vote && epoch_enabled {
551 tracing::debug!("sending extended vote to everybody",);
552 broadcast_event(
553 Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
554 &sender,
555 )
556 .await;
557 } else if epoch_root_vote && epoch_enabled {
558 tracing::debug!(
559 "sending epoch root vote to next quorum leader {:?}",
560 vote.view_number() + 1
561 );
562 let light_client_state = leaf
563 .block_header()
564 .get_light_client_state(view_number)
565 .wrap()
566 .context(error!("Failed to generate light client state"))?;
567 let next_stake_table = membership
568 .next_epoch_stake_table()
569 .await?
570 .stake_table()
571 .await;
572 let next_stake_table_state = next_stake_table
573 .commitment(stake_table_capacity)
574 .wrap()
575 .context(error!("Failed to compute stake table commitment"))?;
576 let signature = <TYPES::StateSignatureKey as StateSignatureKey>::sign_state(
577 state_private_key,
578 &light_client_state,
579 &next_stake_table_state,
580 )
581 .wrap()
582 .context(error!("Failed to sign the light client state"))?;
583 let state_vote = LightClientStateUpdateVote {
584 epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
585 light_client_state,
586 next_stake_table_state,
587 signature,
588 };
589 broadcast_event(
590 Arc::new(HotShotEvent::EpochRootQuorumVoteSend(EpochRootQuorumVote {
591 vote,
592 state_vote,
593 })),
594 &sender,
595 )
596 .await;
597 } else {
598 tracing::debug!(
599 "sending vote to next quorum leader {:?}",
600 vote.view_number() + 1
601 );
602 broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
603 }
604
605 Ok(())
606}