1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13 consensus::OuterConsensus,
14 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
15 drb::INITIAL_DRB_RESULT,
16 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_vote::{EpochRootQuorumVote, LightClientStateUpdateVote, QuorumData2, QuorumVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 block_contents::BlockHeader,
23 election::Membership,
24 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
25 signature_key::{
26 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
27 },
28 storage::Storage,
29 ValidatedState,
30 },
31 utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32 vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use vbs::version::StaticVersionType;
37
38use super::QuorumVoteTaskState;
39use crate::{
40 events::HotShotEvent,
41 helpers::{
42 broadcast_event, decide_from_proposal, decide_from_proposal_2, derive_signed_state_digest,
43 fetch_proposal, handle_drb_result, LeafChainTraversalOutcome,
44 },
45 quorum_vote::Versions,
46};
47
48async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
50 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
51 decided_leaf: &Leaf2<TYPES>,
52) -> Result<()> {
53 if task_state.epoch_height == 0 {
54 tracing::info!("Epoch height is 0, skipping DRB storage.");
55 return Ok(());
56 }
57
58 let decided_block_number = decided_leaf.block_header().block_number();
59 let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
60 decided_block_number,
61 task_state.epoch_height,
62 ));
63 if is_transition_block(decided_block_number, task_state.epoch_height) {
65 if let Some(result) = decided_leaf.next_drb_result {
66 handle_drb_result::<TYPES, I>(
69 task_state.membership.membership(),
70 current_epoch_number + 1,
71 &task_state.storage,
72 result,
73 )
74 .await;
75 } else {
76 bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
77 }
78 }
79 Ok(())
80}
81
82#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
84pub(crate) async fn handle_quorum_proposal_validated<
85 TYPES: NodeType,
86 I: NodeImplementation<TYPES>,
87 V: Versions,
88>(
89 proposal: &QuorumProposalWrapper<TYPES>,
90 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
91 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
92) -> Result<()> {
93 let version = task_state
94 .upgrade_lock
95 .version(proposal.view_number())
96 .await?;
97
98 let LeafChainTraversalOutcome {
99 new_locked_view_number,
100 new_decided_view_number,
101 new_decide_qc,
102 leaf_views,
103 included_txns,
104 decided_upgrade_cert,
105 } = if version >= V::Epochs::VERSION {
106 if !is_last_block(
109 proposal.block_header().block_number(),
110 task_state.epoch_height,
111 ) {
112 decide_from_proposal_2::<TYPES, I>(
113 proposal,
114 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
115 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
116 &task_state.public_key,
117 version >= V::Epochs::VERSION,
118 &task_state.membership,
119 &task_state.storage,
120 )
121 .await
122 } else {
123 LeafChainTraversalOutcome::default()
124 }
125 } else {
126 decide_from_proposal::<TYPES, I>(
127 proposal,
128 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
129 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
130 &task_state.public_key,
131 version >= V::Epochs::VERSION,
132 &task_state.membership,
133 &task_state.storage,
134 task_state.epoch_height,
135 )
136 .await
137 };
138
139 if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
140 let mut decided_certificate_lock = task_state
141 .upgrade_lock
142 .decided_upgrade_certificate
143 .write()
144 .await;
145 *decided_certificate_lock = Some(cert.clone());
146 drop(decided_certificate_lock);
147 if cert.data.new_version >= V::Epochs::VERSION && V::Base::VERSION < V::Epochs::VERSION {
148 let epoch_height = task_state.consensus.read().await.epoch_height;
149 let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
150 proposal.block_header().block_number(),
151 epoch_height,
152 ));
153
154 tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
155 task_state
156 .membership
157 .membership()
158 .write()
159 .await
160 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
161
162 broadcast_event(
163 Arc::new(HotShotEvent::SetFirstEpoch(
164 cert.data.new_version_first_view,
165 first_epoch_number,
166 )),
167 event_sender,
168 )
169 .await;
170 }
171
172 let _ = task_state
173 .storage
174 .update_decided_upgrade_certificate(Some(cert.clone()))
175 .await;
176 }
177
178 let mut consensus_writer = task_state.consensus.write().await;
179 if let Some(locked_view_number) = new_locked_view_number {
180 let _ = consensus_writer.update_locked_view(locked_view_number);
181 }
182
183 #[allow(clippy::cast_precision_loss)]
184 if let Some(decided_view_number) = new_decided_view_number {
185 let old_decided_view = consensus_writer.last_decided_view();
188 consensus_writer.collect_garbage(old_decided_view, decided_view_number);
189
190 consensus_writer
192 .update_last_decided_view(decided_view_number)
193 .context(|e| {
194 warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
195 })?;
196
197 consensus_writer
198 .metrics
199 .last_decided_time
200 .set(Utc::now().timestamp().try_into().unwrap());
201 consensus_writer.metrics.invalid_qc.set(0);
202 consensus_writer
203 .metrics
204 .last_decided_view
205 .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
206 let cur_number_of_views_per_decide_event =
207 *proposal.view_number() - consensus_writer.last_decided_view().u64();
208 consensus_writer
209 .metrics
210 .number_of_views_per_decide_event
211 .add_point(cur_number_of_views_per_decide_event as f64);
212
213 drop(consensus_writer);
215
216 for leaf_info in &leaf_views {
217 tracing::info!(
218 "Sending decide for view {:?} at height {:?}",
219 leaf_info.leaf.view_number(),
220 leaf_info.leaf.block_header().block_number(),
221 );
222 }
223
224 broadcast_event(
225 Arc::new(HotShotEvent::LeavesDecided(
226 leaf_views
227 .iter()
228 .map(|leaf_info| leaf_info.leaf.clone())
229 .collect(),
230 )),
231 event_sender,
232 )
233 .await;
234
235 broadcast_event(
237 Event {
238 view_number: decided_view_number,
239 event: EventType::Decide {
240 leaf_chain: Arc::new(leaf_views.clone()),
241 qc: Arc::new(new_decide_qc.clone().unwrap()),
243 block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
244 },
245 },
246 &task_state.output_event_stream,
247 )
248 .await;
249
250 tracing::debug!(
251 "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
252 decided_view_number,
253 leaf_views.len(),
254 new_decide_qc.as_ref().unwrap().view_number()
255 );
256
257 if version >= V::Epochs::VERSION {
258 for leaf_view in leaf_views {
259 store_drb_result(task_state, &leaf_view.leaf).await?;
260 }
261 }
262 }
263
264 Ok(())
265}
266
267#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
269#[allow(clippy::too_many_arguments)]
270pub(crate) async fn update_shared_state<TYPES: NodeType, V: Versions>(
271 consensus: OuterConsensus<TYPES>,
272 sender: Sender<Arc<HotShotEvent<TYPES>>>,
273 receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
274 membership: EpochMembershipCoordinator<TYPES>,
275 public_key: TYPES::SignatureKey,
276 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
277 upgrade_lock: UpgradeLock<TYPES, V>,
278 view_number: TYPES::View,
279 instance_state: Arc<TYPES::InstanceState>,
280 proposed_leaf: &Leaf2<TYPES>,
281 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
282 parent_view_number: Option<TYPES::View>,
283 epoch_height: u64,
284) -> Result<()> {
285 let justify_qc = &proposed_leaf.justify_qc();
286
287 let consensus_reader = consensus.read().await;
288 let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
291 consensus_reader
292 .validated_state_map()
293 .get(&view_number)
294 .cloned()
295 });
296
297 let mut maybe_parent = consensus_reader
299 .saved_leaves()
300 .get(&justify_qc.data.leaf_commit)
301 .cloned();
302
303 drop(consensus_reader);
304
305 maybe_parent = match maybe_parent {
306 Some(p) => Some(p),
307 None => {
308 match fetch_proposal(
309 justify_qc,
310 sender.clone(),
311 receiver.activate_cloned(),
312 membership.clone(),
313 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
314 public_key.clone(),
315 private_key.clone(),
316 &upgrade_lock,
317 epoch_height,
318 )
319 .await
320 .ok()
321 {
322 Some((leaf, view)) => {
323 maybe_validated_view = Some(view);
324 Some(leaf)
325 },
326 None => None,
327 }
328 },
329 };
330
331 let parent = maybe_parent.context(info!(
332 "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
333 justify_qc.data.leaf_commit,
334 proposed_leaf.view_number(),
335 ))?;
336
337 let Some(validated_view) = maybe_validated_view else {
338 bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
339 };
340
341 let (Some(parent_state), _) = validated_view.state_and_delta() else {
342 bail!("Parent state not found! Consensus internally inconsistent");
343 };
344
345 let version = upgrade_lock.version(view_number).await?;
346
347 let now = Instant::now();
348 let (validated_state, state_delta) = parent_state
349 .validate_and_apply_header(
350 &instance_state,
351 &parent,
352 &proposed_leaf.block_header().clone(),
353 vid_share.data.payload_byte_len(),
354 version,
355 *view_number,
356 )
357 .await
358 .wrap()
359 .context(warn!("Block header doesn't extend the proposal!"))?;
360 let validation_duration = now.elapsed();
361 tracing::debug!("Validation time: {validation_duration:?}");
362
363 let now = Instant::now();
364 let mut consensus_writer = consensus.write().await;
366
367 if let Err(e) = consensus_writer.update_leaf(
368 proposed_leaf.clone(),
369 Arc::new(validated_state),
370 Some(Arc::new(state_delta)),
371 ) {
372 tracing::trace!("{e:?}");
373 }
374 let update_leaf_duration = now.elapsed();
375
376 consensus_writer
377 .metrics
378 .validate_and_apply_header_duration
379 .add_point(validation_duration.as_secs_f64());
380 consensus_writer
381 .metrics
382 .update_leaf_duration
383 .add_point(update_leaf_duration.as_secs_f64());
384 drop(consensus_writer);
385 tracing::debug!("update_leaf time: {update_leaf_duration:?}");
386
387 Ok(())
388}
389
390#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
392#[allow(clippy::too_many_arguments)]
393pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
394 sender: Sender<Arc<HotShotEvent<TYPES>>>,
395 membership: EpochMembership<TYPES>,
396 public_key: TYPES::SignatureKey,
397 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
398 upgrade_lock: UpgradeLock<TYPES, V>,
399 view_number: TYPES::View,
400 storage: I::Storage,
401 storage_metrics: Arc<StorageMetricsValue>,
402 leaf: Leaf2<TYPES>,
403 vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
404 extended_vote: bool,
405 epoch_root_vote: bool,
406 epoch_height: u64,
407 state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
408 stake_table_capacity: usize,
409) -> Result<()> {
410 let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
411 let committee_member_in_next_epoch = leaf.with_epoch
414 && is_epoch_transition(leaf.height(), epoch_height)
415 && membership
416 .next_epoch_stake_table()
417 .await?
418 .has_stake(&public_key)
419 .await;
420
421 ensure!(
422 committee_member_in_current_epoch || committee_member_in_next_epoch,
423 info!("We were not chosen for quorum committee on {view_number}")
424 );
425
426 let height = if membership.epoch().is_some() {
427 Some(leaf.height())
428 } else {
429 None
430 };
431
432 let vote = QuorumVote2::<TYPES>::create_signed_vote(
434 QuorumData2 {
435 leaf_commit: leaf.commit(),
436 epoch: membership.epoch(),
437 block_number: height,
438 },
439 view_number,
440 &public_key,
441 &private_key,
442 &upgrade_lock,
443 )
444 .await
445 .wrap()
446 .context(error!("Failed to sign vote. This should never happen."))?;
447 let now = Instant::now();
448 storage
450 .append_vid_general(&vid_share)
451 .await
452 .wrap()
453 .context(error!("Failed to store VID share"))?;
454 let append_vid_duration = now.elapsed();
455 storage_metrics
456 .append_vid_duration
457 .add_point(append_vid_duration.as_secs_f64());
458 tracing::debug!("append_vid_general time: {append_vid_duration:?}");
459
460 let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
463 if extended_vote && epoch_enabled {
464 tracing::debug!("sending extended vote to everybody",);
465 broadcast_event(
466 Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
467 &sender,
468 )
469 .await;
470 } else if epoch_root_vote && epoch_enabled {
471 tracing::debug!(
472 "sending epoch root vote to next quorum leader {:?}",
473 vote.view_number() + 1
474 );
475 let light_client_state = leaf
476 .block_header()
477 .get_light_client_state(view_number)
478 .wrap()
479 .context(error!("Failed to generate light client state"))?;
480 let next_stake_table = membership
481 .next_epoch_stake_table()
482 .await?
483 .stake_table()
484 .await;
485 let next_stake_table_state = next_stake_table
486 .commitment(stake_table_capacity)
487 .wrap()
488 .context(error!("Failed to compute stake table commitment"))?;
489 let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
491 state_private_key,
492 &light_client_state,
493 &next_stake_table_state,
494 )
495 .wrap()
496 .context(error!("Failed to sign the light client state"))?;
497 let auth_root = leaf
498 .block_header()
499 .auth_root()
500 .wrap()
501 .context(error!(format!(
502 "Failed to get auth root for light client state certificate. view={view_number}"
503 )))?;
504 let signed_state_digest =
505 derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
506 let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
507 state_private_key,
508 signed_state_digest,
509 )
510 .wrap()
511 .context(error!("Failed to sign the light client state"))?;
512 let state_vote = LightClientStateUpdateVote {
513 epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
514 light_client_state,
515 next_stake_table_state,
516 signature,
517 v2_signature,
518 auth_root,
519 signed_state_digest,
520 };
521 broadcast_event(
522 Arc::new(HotShotEvent::EpochRootQuorumVoteSend(EpochRootQuorumVote {
523 vote,
524 state_vote,
525 })),
526 &sender,
527 )
528 .await;
529 } else {
530 tracing::debug!(
531 "sending vote to next quorum leader {:?}",
532 vote.view_number() + 1
533 );
534 broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
535 }
536
537 Ok(())
538}