1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13 consensus::OuterConsensus,
14 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare},
15 drb::INITIAL_DRB_RESULT,
16 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_vote::{EpochRootQuorumVote2, LightClientStateUpdateVote2, QuorumData2, QuorumVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 block_contents::BlockHeader,
23 election::Membership,
24 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
25 signature_key::{
26 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
27 },
28 storage::Storage,
29 ValidatedState,
30 },
31 utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32 vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use vbs::version::StaticVersionType;
37
38use super::QuorumVoteTaskState;
39use crate::{
40 events::HotShotEvent,
41 helpers::{
42 broadcast_event, decide_from_proposal, decide_from_proposal_2, derive_signed_state_digest,
43 fetch_proposal, handle_drb_result, LeafChainTraversalOutcome,
44 },
45 quorum_vote::Versions,
46};
47
48async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
50 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
51 decided_leaf: &Leaf2<TYPES>,
52) -> Result<()> {
53 if task_state.epoch_height == 0 {
54 tracing::info!("Epoch height is 0, skipping DRB storage.");
55 return Ok(());
56 }
57
58 let decided_block_number = decided_leaf.block_header().block_number();
59 let current_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
60 decided_block_number,
61 task_state.epoch_height,
62 ));
63 if is_transition_block(decided_block_number, task_state.epoch_height) {
65 if let Some(result) = decided_leaf.next_drb_result {
66 handle_drb_result::<TYPES, I>(
69 task_state.membership.membership(),
70 current_epoch_number + 1,
71 &task_state.storage,
72 result,
73 )
74 .await;
75 } else {
76 bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
77 }
78 }
79 Ok(())
80}
81
82#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
84pub(crate) async fn handle_quorum_proposal_validated<
85 TYPES: NodeType,
86 I: NodeImplementation<TYPES>,
87 V: Versions,
88>(
89 proposal: &QuorumProposalWrapper<TYPES>,
90 task_state: &mut QuorumVoteTaskState<TYPES, I, V>,
91 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
92) -> Result<()> {
93 let version = task_state
94 .upgrade_lock
95 .version(proposal.view_number())
96 .await?;
97
98 let LeafChainTraversalOutcome {
99 new_locked_view_number,
100 new_decided_view_number,
101 committing_qc,
102 deciding_qc,
103 leaf_views,
104 included_txns,
105 decided_upgrade_cert,
106 } = if version >= V::Epochs::VERSION {
107 if !is_last_block(
110 proposal.block_header().block_number(),
111 task_state.epoch_height,
112 ) {
113 decide_from_proposal_2::<TYPES, I>(
114 proposal,
115 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
116 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
117 &task_state.public_key,
118 version >= V::Epochs::VERSION,
119 &task_state.membership,
120 &task_state.storage,
121 )
122 .await
123 } else {
124 LeafChainTraversalOutcome::default()
125 }
126 } else {
127 decide_from_proposal::<TYPES, I>(
128 proposal,
129 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
130 Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate),
131 &task_state.public_key,
132 version >= V::Epochs::VERSION,
133 &task_state.membership,
134 &task_state.storage,
135 task_state.epoch_height,
136 )
137 .await
138 };
139
140 if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
141 let mut decided_certificate_lock = task_state
142 .upgrade_lock
143 .decided_upgrade_certificate
144 .write()
145 .await;
146 *decided_certificate_lock = Some(cert.clone());
147 drop(decided_certificate_lock);
148 if cert.data.new_version >= V::Epochs::VERSION && V::Base::VERSION < V::Epochs::VERSION {
149 let epoch_height = task_state.consensus.read().await.epoch_height;
150 let first_epoch_number = TYPES::Epoch::new(epoch_from_block_number(
151 proposal.block_header().block_number(),
152 epoch_height,
153 ));
154
155 tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
156 task_state
157 .membership
158 .membership()
159 .write()
160 .await
161 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
162
163 broadcast_event(
164 Arc::new(HotShotEvent::SetFirstEpoch(
165 cert.data.new_version_first_view,
166 first_epoch_number,
167 )),
168 event_sender,
169 )
170 .await;
171 }
172
173 for da_committee in &task_state.da_committees {
174 if cert.data.new_version >= da_committee.start_version {
175 task_state
176 .membership
177 .membership()
178 .write()
179 .await
180 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
181 }
182 }
183
184 let _ = task_state
185 .storage
186 .update_decided_upgrade_certificate(Some(cert.clone()))
187 .await;
188 }
189
190 let mut consensus_writer = task_state.consensus.write().await;
191 if let Some(locked_view_number) = new_locked_view_number {
192 let _ = consensus_writer.update_locked_view(locked_view_number);
193 }
194
195 #[allow(clippy::cast_precision_loss)]
196 if let Some(decided_view_number) = new_decided_view_number {
197 let old_decided_view = consensus_writer.last_decided_view();
200 consensus_writer.collect_garbage(old_decided_view, decided_view_number);
201
202 consensus_writer
204 .update_last_decided_view(decided_view_number)
205 .context(|e| {
206 warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
207 })?;
208
209 consensus_writer
210 .metrics
211 .last_decided_time
212 .set(Utc::now().timestamp().try_into().unwrap());
213 consensus_writer.metrics.invalid_qc.set(0);
214 consensus_writer
215 .metrics
216 .last_decided_view
217 .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
218 let cur_number_of_views_per_decide_event =
219 *proposal.view_number() - consensus_writer.last_decided_view().u64();
220 consensus_writer
221 .metrics
222 .number_of_views_per_decide_event
223 .add_point(cur_number_of_views_per_decide_event as f64);
224
225 drop(consensus_writer);
227
228 for leaf_info in &leaf_views {
229 tracing::info!(
230 "Sending decide for view {:?} at height {:?}",
231 leaf_info.leaf.view_number(),
232 leaf_info.leaf.block_header().block_number(),
233 );
234 }
235
236 broadcast_event(
237 Arc::new(HotShotEvent::LeavesDecided(
238 leaf_views
239 .iter()
240 .map(|leaf_info| leaf_info.leaf.clone())
241 .collect(),
242 )),
243 event_sender,
244 )
245 .await;
246
247 let committing_qc = Arc::new(committing_qc.unwrap());
250 broadcast_event(
251 Event {
252 view_number: decided_view_number,
253 event: EventType::Decide {
254 leaf_chain: Arc::new(leaf_views.clone()),
255 committing_qc: committing_qc.clone(),
256 deciding_qc: deciding_qc.map(Arc::new),
257 block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
258 },
259 },
260 &task_state.output_event_stream,
261 )
262 .await;
263
264 tracing::debug!(
265 "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
266 decided_view_number,
267 leaf_views.len(),
268 committing_qc.view_number()
269 );
270
271 if version >= V::Epochs::VERSION {
272 for leaf_view in leaf_views {
273 store_drb_result(task_state, &leaf_view.leaf).await?;
274 }
275 }
276 }
277
278 Ok(())
279}
280
281#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
283#[allow(clippy::too_many_arguments)]
284pub(crate) async fn update_shared_state<TYPES: NodeType, V: Versions>(
285 consensus: OuterConsensus<TYPES>,
286 sender: Sender<Arc<HotShotEvent<TYPES>>>,
287 receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
288 membership: EpochMembershipCoordinator<TYPES>,
289 public_key: TYPES::SignatureKey,
290 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
291 upgrade_lock: UpgradeLock<TYPES, V>,
292 view_number: TYPES::View,
293 instance_state: Arc<TYPES::InstanceState>,
294 proposed_leaf: &Leaf2<TYPES>,
295 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
296 parent_view_number: Option<TYPES::View>,
297 epoch_height: u64,
298) -> Result<()> {
299 let justify_qc = &proposed_leaf.justify_qc();
300
301 let consensus_reader = consensus.read().await;
302 let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
305 consensus_reader
306 .validated_state_map()
307 .get(&view_number)
308 .cloned()
309 });
310
311 let mut maybe_parent = consensus_reader
313 .saved_leaves()
314 .get(&justify_qc.data.leaf_commit)
315 .cloned();
316
317 drop(consensus_reader);
318
319 maybe_parent = match maybe_parent {
320 Some(p) => Some(p),
321 None => {
322 match fetch_proposal(
323 justify_qc,
324 sender.clone(),
325 receiver.activate_cloned(),
326 membership.clone(),
327 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
328 public_key.clone(),
329 private_key.clone(),
330 &upgrade_lock,
331 epoch_height,
332 )
333 .await
334 .ok()
335 {
336 Some((leaf, view)) => {
337 maybe_validated_view = Some(view);
338 Some(leaf)
339 },
340 None => None,
341 }
342 },
343 };
344
345 let parent = maybe_parent.context(info!(
346 "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
347 justify_qc.data.leaf_commit,
348 proposed_leaf.view_number(),
349 ))?;
350
351 let Some(validated_view) = maybe_validated_view else {
352 bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
353 };
354
355 let (Some(parent_state), _) = validated_view.state_and_delta() else {
356 bail!("Parent state not found! Consensus internally inconsistent");
357 };
358
359 let version = upgrade_lock.version(view_number).await?;
360
361 let now = Instant::now();
362 let (validated_state, state_delta) = parent_state
363 .validate_and_apply_header(
364 &instance_state,
365 &parent,
366 &proposed_leaf.block_header().clone(),
367 vid_share.data.payload_byte_len(),
368 version,
369 *view_number,
370 )
371 .await
372 .wrap()
373 .context(warn!("Block header doesn't extend the proposal!"))?;
374 let validation_duration = now.elapsed();
375 tracing::debug!("Validation time: {validation_duration:?}");
376
377 let now = Instant::now();
378 let mut consensus_writer = consensus.write().await;
380
381 if let Err(e) = consensus_writer.update_leaf(
382 proposed_leaf.clone(),
383 Arc::new(validated_state),
384 Some(Arc::new(state_delta)),
385 ) {
386 tracing::trace!("{e:?}");
387 }
388 let update_leaf_duration = now.elapsed();
389
390 consensus_writer
391 .metrics
392 .validate_and_apply_header_duration
393 .add_point(validation_duration.as_secs_f64());
394 consensus_writer
395 .metrics
396 .update_leaf_duration
397 .add_point(update_leaf_duration.as_secs_f64());
398 drop(consensus_writer);
399 tracing::debug!("update_leaf time: {update_leaf_duration:?}");
400
401 Ok(())
402}
403
404#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
406#[allow(clippy::too_many_arguments)]
407pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
408 sender: Sender<Arc<HotShotEvent<TYPES>>>,
409 membership: EpochMembership<TYPES>,
410 public_key: TYPES::SignatureKey,
411 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
412 upgrade_lock: UpgradeLock<TYPES, V>,
413 view_number: TYPES::View,
414 storage: I::Storage,
415 storage_metrics: Arc<StorageMetricsValue>,
416 leaf: Leaf2<TYPES>,
417 vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
418 extended_vote: bool,
419 epoch_root_vote: bool,
420 epoch_height: u64,
421 state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
422 stake_table_capacity: usize,
423) -> Result<()> {
424 let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
425 let committee_member_in_next_epoch = leaf.with_epoch
428 && is_epoch_transition(leaf.height(), epoch_height)
429 && membership
430 .next_epoch_stake_table()
431 .await?
432 .has_stake(&public_key)
433 .await;
434
435 ensure!(
436 committee_member_in_current_epoch || committee_member_in_next_epoch,
437 info!("We were not chosen for quorum committee on {view_number}")
438 );
439
440 let height = if membership.epoch().is_some() {
441 Some(leaf.height())
442 } else {
443 None
444 };
445
446 let vote = QuorumVote2::<TYPES>::create_signed_vote(
448 QuorumData2 {
449 leaf_commit: leaf.commit(),
450 epoch: membership.epoch(),
451 block_number: height,
452 },
453 view_number,
454 &public_key,
455 &private_key,
456 &upgrade_lock,
457 )
458 .await
459 .wrap()
460 .context(error!("Failed to sign vote. This should never happen."))?;
461 let now = Instant::now();
462 storage
464 .append_vid_general(&vid_share)
465 .await
466 .wrap()
467 .context(error!("Failed to store VID share"))?;
468 let append_vid_duration = now.elapsed();
469 storage_metrics
470 .append_vid_duration
471 .add_point(append_vid_duration.as_secs_f64());
472 tracing::debug!("append_vid_general time: {append_vid_duration:?}");
473
474 let epoch_enabled = upgrade_lock.epochs_enabled(view_number).await;
477 if extended_vote && epoch_enabled {
478 tracing::debug!("sending extended vote to everybody",);
479 broadcast_event(
480 Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
481 &sender,
482 )
483 .await;
484 } else if epoch_root_vote && epoch_enabled {
485 tracing::debug!(
486 "sending epoch root vote to next quorum leader {:?}",
487 vote.view_number() + 1
488 );
489 let light_client_state = leaf
490 .block_header()
491 .get_light_client_state(view_number)
492 .wrap()
493 .context(error!("Failed to generate light client state"))?;
494 let next_stake_table = membership
495 .next_epoch_stake_table()
496 .await?
497 .stake_table()
498 .await;
499 let next_stake_table_state = next_stake_table
500 .commitment(stake_table_capacity)
501 .wrap()
502 .context(error!("Failed to compute stake table commitment"))?;
503 let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
505 state_private_key,
506 &light_client_state,
507 &next_stake_table_state,
508 )
509 .wrap()
510 .context(error!("Failed to sign the light client state"))?;
511 let auth_root = leaf
512 .block_header()
513 .auth_root()
514 .wrap()
515 .context(error!(format!(
516 "Failed to get auth root for light client state certificate. view={view_number}"
517 )))?;
518 let signed_state_digest =
519 derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
520 let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
521 state_private_key,
522 signed_state_digest,
523 )
524 .wrap()
525 .context(error!("Failed to sign the light client state"))?;
526 let state_vote = LightClientStateUpdateVote2 {
527 epoch: TYPES::Epoch::new(epoch_from_block_number(leaf.height(), epoch_height)),
528 light_client_state,
529 next_stake_table_state,
530 signature,
531 v2_signature,
532 auth_root,
533 signed_state_digest,
534 };
535 broadcast_event(
536 Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
537 EpochRootQuorumVote2 { vote, state_vote },
538 )),
539 &sender,
540 )
541 .await;
542 } else {
543 tracing::debug!(
544 "sending vote to next quorum leader {:?}",
545 vote.view_number() + 1
546 );
547 broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
548 }
549
550 Ok(())
551}