1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Sender};
10use chrono::Utc;
11use committable::Committable;
12use hotshot_types::{
13 consensus::OuterConsensus,
14 data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
15 drb::INITIAL_DRB_RESULT,
16 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_vote::{EpochRootQuorumVote2, LightClientStateUpdateVote2, QuorumData2, QuorumVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 ValidatedState,
23 block_contents::BlockHeader,
24 election::Membership,
25 node_implementation::{NodeImplementation, NodeType},
26 signature_key::{
27 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
28 },
29 storage::Storage,
30 },
31 utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
32 vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36use versions::EPOCH_VERSION;
37
38use super::QuorumVoteTaskState;
39use crate::{
40 events::HotShotEvent,
41 helpers::{
42 LeafChainTraversalOutcome, broadcast_event, decide_from_proposal, decide_from_proposal_2,
43 derive_signed_state_digest, fetch_proposal, handle_drb_result,
44 },
45};
46
47async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
49 task_state: &mut QuorumVoteTaskState<TYPES, I>,
50 decided_leaf: &Leaf2<TYPES>,
51) -> Result<()> {
52 if task_state.epoch_height == 0 {
53 tracing::info!("Epoch height is 0, skipping DRB storage.");
54 return Ok(());
55 }
56
57 let decided_block_number = decided_leaf.block_header().block_number();
58 let current_epoch_number = EpochNumber::new(epoch_from_block_number(
59 decided_block_number,
60 task_state.epoch_height,
61 ));
62 if is_transition_block(decided_block_number, task_state.epoch_height) {
64 if let Some(result) = decided_leaf.next_drb_result {
65 handle_drb_result::<TYPES, I>(
68 task_state.membership.membership(),
69 current_epoch_number + 1,
70 &task_state.storage,
71 result,
72 )
73 .await;
74 } else {
75 bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
76 }
77 }
78 Ok(())
79}
80
81#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
83pub(crate) async fn handle_quorum_proposal_validated<
84 TYPES: NodeType,
85 I: NodeImplementation<TYPES>,
86>(
87 proposal: &QuorumProposalWrapper<TYPES>,
88 task_state: &mut QuorumVoteTaskState<TYPES, I>,
89 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
90) -> Result<()> {
91 let version = task_state.upgrade_lock.version(proposal.view_number())?;
92
93 let LeafChainTraversalOutcome {
94 new_locked_view_number,
95 new_decided_view_number,
96 committing_qc,
97 deciding_qc,
98 leaf_views,
99 included_txns,
100 decided_upgrade_cert,
101 } = if version >= EPOCH_VERSION {
102 if !is_last_block(
105 proposal.block_header().block_number(),
106 task_state.epoch_height,
107 ) {
108 decide_from_proposal_2::<TYPES, I>(
109 proposal,
110 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
111 &task_state.upgrade_lock,
112 &task_state.public_key,
113 version >= EPOCH_VERSION,
114 &task_state.membership,
115 &task_state.storage,
116 )
117 .await
118 } else {
119 LeafChainTraversalOutcome::default()
120 }
121 } else {
122 decide_from_proposal::<TYPES, I>(
123 proposal,
124 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
125 &task_state.upgrade_lock,
126 &task_state.public_key,
127 version >= EPOCH_VERSION,
128 &task_state.membership,
129 &task_state.storage,
130 task_state.epoch_height,
131 )
132 .await
133 };
134
135 if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
136 task_state
137 .upgrade_lock
138 .set_decided_upgrade_cert(cert.clone());
139 if cert.data.new_version >= EPOCH_VERSION
140 && task_state.upgrade_lock.upgrade().base < EPOCH_VERSION
141 {
142 let epoch_height = task_state.consensus.read().await.epoch_height;
143 let first_epoch_number = EpochNumber::new(epoch_from_block_number(
144 proposal.block_header().block_number(),
145 epoch_height,
146 ));
147
148 tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
149 task_state
150 .membership
151 .membership()
152 .write()
153 .await
154 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
155
156 broadcast_event(
157 Arc::new(HotShotEvent::SetFirstEpoch(
158 cert.data.new_version_first_view,
159 first_epoch_number,
160 )),
161 event_sender,
162 )
163 .await;
164 }
165
166 for da_committee in &task_state.da_committees {
167 if cert.data.new_version >= da_committee.start_version {
168 task_state
169 .membership
170 .membership()
171 .write()
172 .await
173 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
174 }
175 }
176
177 let _ = task_state
178 .storage
179 .update_decided_upgrade_certificate(Some(cert.clone()))
180 .await;
181 }
182
183 let mut consensus_writer = task_state.consensus.write().await;
184 if let Some(locked_view_number) = new_locked_view_number {
185 let _ = consensus_writer.update_locked_view(locked_view_number);
186 }
187
188 #[allow(clippy::cast_precision_loss)]
189 if let Some(decided_view_number) = new_decided_view_number {
190 let old_decided_view = consensus_writer.last_decided_view();
193 consensus_writer.collect_garbage(old_decided_view, decided_view_number);
194
195 consensus_writer
197 .update_last_decided_view(decided_view_number)
198 .context(|e| {
199 warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
200 })?;
201
202 consensus_writer
203 .metrics
204 .last_decided_time
205 .set(Utc::now().timestamp().try_into().unwrap());
206 consensus_writer.metrics.invalid_qc.set(0);
207 consensus_writer
208 .metrics
209 .last_decided_view
210 .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
211 let cur_number_of_views_per_decide_event =
212 *proposal.view_number() - consensus_writer.last_decided_view().u64();
213 consensus_writer
214 .metrics
215 .number_of_views_per_decide_event
216 .add_point(cur_number_of_views_per_decide_event as f64);
217 for leaf in &leaf_views {
218 if let Err(e) = consensus_writer.update_vote_participation(leaf.leaf.justify_qc()) {
219 tracing::warn!("Failed to update vote participation: {e}");
220 }
221 }
222
223 drop(consensus_writer);
225
226 for leaf_info in &leaf_views {
227 tracing::info!(
228 "Sending decide for view {:?} at height {:?}",
229 leaf_info.leaf.view_number(),
230 leaf_info.leaf.block_header().block_number(),
231 );
232 }
233
234 broadcast_event(
235 Arc::new(HotShotEvent::LeavesDecided(
236 leaf_views
237 .iter()
238 .map(|leaf_info| leaf_info.leaf.clone())
239 .collect(),
240 )),
241 event_sender,
242 )
243 .await;
244
245 let committing_qc = Arc::new(committing_qc.unwrap());
248 broadcast_event(
249 Event {
250 view_number: decided_view_number,
251 event: EventType::Decide {
252 leaf_chain: Arc::new(leaf_views.clone()),
253 committing_qc: committing_qc.clone(),
254 deciding_qc: deciding_qc.map(Arc::new),
255 block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
256 },
257 },
258 &task_state.output_event_stream,
259 )
260 .await;
261
262 tracing::debug!(
263 "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
264 decided_view_number,
265 leaf_views.len(),
266 committing_qc.view_number()
267 );
268
269 if version >= EPOCH_VERSION {
270 for leaf_view in leaf_views {
271 store_drb_result(task_state, &leaf_view.leaf).await?;
272 }
273 }
274 }
275
276 Ok(())
277}
278
279#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
281#[allow(clippy::too_many_arguments)]
282pub(crate) async fn update_shared_state<TYPES: NodeType>(
283 consensus: OuterConsensus<TYPES>,
284 sender: Sender<Arc<HotShotEvent<TYPES>>>,
285 receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
286 membership: EpochMembershipCoordinator<TYPES>,
287 public_key: TYPES::SignatureKey,
288 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
289 upgrade_lock: UpgradeLock<TYPES>,
290 view_number: ViewNumber,
291 instance_state: Arc<TYPES::InstanceState>,
292 proposed_leaf: &Leaf2<TYPES>,
293 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
294 parent_view_number: Option<ViewNumber>,
295 epoch_height: u64,
296) -> Result<()> {
297 let justify_qc = &proposed_leaf.justify_qc();
298
299 let consensus_reader = consensus.read().await;
300 let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
303 consensus_reader
304 .validated_state_map()
305 .get(&view_number)
306 .cloned()
307 });
308
309 let mut maybe_parent = consensus_reader
311 .saved_leaves()
312 .get(&justify_qc.data.leaf_commit)
313 .cloned();
314
315 drop(consensus_reader);
316
317 maybe_parent = match maybe_parent {
318 Some(p) => Some(p),
319 None => {
320 match fetch_proposal(
321 justify_qc,
322 sender.clone(),
323 receiver.activate_cloned(),
324 membership.clone(),
325 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
326 public_key.clone(),
327 private_key.clone(),
328 &upgrade_lock,
329 epoch_height,
330 )
331 .await
332 .ok()
333 {
334 Some((leaf, view)) => {
335 maybe_validated_view = Some(view);
336 Some(leaf)
337 },
338 None => None,
339 }
340 },
341 };
342
343 let parent = maybe_parent.context(info!(
344 "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
345 justify_qc.data.leaf_commit,
346 proposed_leaf.view_number(),
347 ))?;
348
349 let Some(validated_view) = maybe_validated_view else {
350 bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
351 };
352
353 let (Some(parent_state), _) = validated_view.state_and_delta() else {
354 bail!("Parent state not found! Consensus internally inconsistent");
355 };
356
357 let version = upgrade_lock.version(view_number)?;
358
359 let now = Instant::now();
360 let (validated_state, state_delta) = parent_state
361 .validate_and_apply_header(
362 &instance_state,
363 &parent,
364 &proposed_leaf.block_header().clone(),
365 vid_share.data.payload_byte_len(),
366 version,
367 *view_number,
368 )
369 .await
370 .wrap()
371 .context(warn!("Block header doesn't extend the proposal!"))?;
372 let validation_duration = now.elapsed();
373 tracing::debug!("Validation time: {validation_duration:?}");
374
375 let now = Instant::now();
376 let mut consensus_writer = consensus.write().await;
378
379 if let Err(e) = consensus_writer.update_leaf(
380 proposed_leaf.clone(),
381 Arc::new(validated_state),
382 Some(Arc::new(state_delta)),
383 ) {
384 tracing::trace!("{e:?}");
385 }
386 let update_leaf_duration = now.elapsed();
387
388 consensus_writer
389 .metrics
390 .validate_and_apply_header_duration
391 .add_point(validation_duration.as_secs_f64());
392 consensus_writer
393 .metrics
394 .update_leaf_duration
395 .add_point(update_leaf_duration.as_secs_f64());
396 drop(consensus_writer);
397 tracing::debug!("update_leaf time: {update_leaf_duration:?}");
398
399 Ok(())
400}
401
402#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
404#[allow(clippy::too_many_arguments)]
405pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>>(
406 sender: Sender<Arc<HotShotEvent<TYPES>>>,
407 membership: EpochMembership<TYPES>,
408 public_key: TYPES::SignatureKey,
409 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
410 upgrade_lock: UpgradeLock<TYPES>,
411 view_number: ViewNumber,
412 storage: I::Storage,
413 storage_metrics: Arc<StorageMetricsValue>,
414 leaf: Leaf2<TYPES>,
415 vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
416 extended_vote: bool,
417 epoch_root_vote: bool,
418 epoch_height: u64,
419 state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
420 stake_table_capacity: usize,
421) -> Result<()> {
422 let committee_member_in_current_epoch = membership.has_stake(&public_key).await;
423 let committee_member_in_next_epoch = leaf.with_epoch
426 && is_epoch_transition(leaf.height(), epoch_height)
427 && membership
428 .next_epoch_stake_table()
429 .await?
430 .has_stake(&public_key)
431 .await;
432
433 ensure!(
434 committee_member_in_current_epoch || committee_member_in_next_epoch,
435 info!("We were not chosen for quorum committee on {view_number}")
436 );
437
438 let height = if membership.epoch().is_some() {
439 Some(leaf.height())
440 } else {
441 None
442 };
443
444 let vote = QuorumVote2::<TYPES>::create_signed_vote(
446 QuorumData2 {
447 leaf_commit: leaf.commit(),
448 epoch: membership.epoch(),
449 block_number: height,
450 },
451 view_number,
452 &public_key,
453 &private_key,
454 &upgrade_lock,
455 )
456 .await
457 .wrap()
458 .context(error!("Failed to sign vote. This should never happen."))?;
459 let now = Instant::now();
460 storage
462 .append_vid(&vid_share)
463 .await
464 .wrap()
465 .context(error!("Failed to store VID share"))?;
466 let append_vid_duration = now.elapsed();
467 storage_metrics
468 .append_vid_duration
469 .add_point(append_vid_duration.as_secs_f64());
470 tracing::debug!("append_vid_general time: {append_vid_duration:?}");
471
472 let epoch_enabled = upgrade_lock.epochs_enabled(view_number);
475 if extended_vote && epoch_enabled {
476 tracing::debug!("sending extended vote to everybody",);
477 broadcast_event(
478 Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
479 &sender,
480 )
481 .await;
482 } else if epoch_root_vote && epoch_enabled {
483 tracing::debug!(
484 "sending epoch root vote to next quorum leader {:?}",
485 vote.view_number() + 1
486 );
487 let light_client_state = leaf
488 .block_header()
489 .get_light_client_state(view_number)
490 .wrap()
491 .context(error!("Failed to generate light client state"))?;
492 let next_stake_table = membership
493 .next_epoch_stake_table()
494 .await?
495 .stake_table()
496 .await;
497 let next_stake_table_state = next_stake_table
498 .commitment(stake_table_capacity)
499 .wrap()
500 .context(error!("Failed to compute stake table commitment"))?;
501 let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
503 state_private_key,
504 &light_client_state,
505 &next_stake_table_state,
506 )
507 .wrap()
508 .context(error!("Failed to sign the light client state"))?;
509 let auth_root = leaf
510 .block_header()
511 .auth_root()
512 .wrap()
513 .context(error!(format!(
514 "Failed to get auth root for light client state certificate. view={view_number}"
515 )))?;
516 let signed_state_digest =
517 derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
518 let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
519 state_private_key,
520 signed_state_digest,
521 )
522 .wrap()
523 .context(error!("Failed to sign the light client state"))?;
524 let state_vote = LightClientStateUpdateVote2 {
525 epoch: EpochNumber::new(epoch_from_block_number(leaf.height(), epoch_height)),
526 light_client_state,
527 next_stake_table_state,
528 signature,
529 v2_signature,
530 auth_root,
531 signed_state_digest,
532 };
533 broadcast_event(
534 Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
535 EpochRootQuorumVote2 { vote, state_vote },
536 )),
537 &sender,
538 )
539 .await;
540 } else {
541 tracing::debug!(
542 "sending vote to next quorum leader {:?}",
543 vote.view_number() + 1
544 );
545 broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
546 }
547
548 Ok(())
549}