1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::{
14 primitives::{FixedBytes, U256},
15 sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24 consensus::OuterConsensus,
25 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
26 drb::DrbResult,
27 epoch_membership::EpochMembershipCoordinator,
28 event::{Event, EventType, LeafInfo},
29 light_client::{CircuitField, LightClientState, StakeTableState},
30 message::{Proposal, UpgradeLock},
31 request_response::ProposalRequestPayload,
32 simple_certificate::{
33 DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34 QuorumCertificate2, UpgradeCertificate,
35 },
36 simple_vote::HasEpoch,
37 stake_table::StakeTableEntries,
38 traits::{
39 block_contents::BlockHeader,
40 election::Membership,
41 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
42 signature_key::{
43 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
44 },
45 storage::Storage,
46 BlockPayload, ValidatedState,
47 },
48 utils::{
49 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
50 option_epoch_from_block_number, Terminator, View, ViewInner,
51 },
52 vote::{Certificate, HasViewNumber},
53};
54use hotshot_utils::anytrace::*;
55use time::OffsetDateTime;
56use tokio::time::timeout;
57use tracing::instrument;
58use vbs::version::StaticVersionType;
59
60use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
61
62#[instrument(skip_all)]
64#[allow(clippy::too_many_arguments)]
65pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
66 qc: &QuorumCertificate2<TYPES>,
67 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
68 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
69 membership_coordinator: EpochMembershipCoordinator<TYPES>,
70 consensus: OuterConsensus<TYPES>,
71 sender_public_key: TYPES::SignatureKey,
72 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
73 upgrade_lock: &UpgradeLock<TYPES, V>,
74 epoch_height: u64,
75) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
76 let view_number = qc.view_number();
77 let leaf_commit = qc.data.leaf_commit;
78 let signed_proposal_request = ProposalRequestPayload {
81 view_number,
82 key: sender_public_key,
83 };
84
85 let signature = TYPES::SignatureKey::sign(
87 &sender_private_key,
88 signed_proposal_request.commit().as_ref(),
89 )
90 .wrap()
91 .context(error!("Failed to sign proposal. This should never happen."))?;
92
93 tracing::info!("Sending proposal request for view {view_number}");
94
95 broadcast_event(
97 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
98 &event_sender,
99 )
100 .await;
101
102 let mut rx = event_receiver.clone();
103 let Ok(Some(proposal)) =
105 timeout(REQUEST_TIMEOUT, async move {
107 while let Ok(event) = rx.recv_direct().await {
109 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
110 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
111 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
112 return Some(quorum_proposal.clone());
113 }
114 }
115 }
116 None
117 })
118 .await
119 else {
120 bail!("Request for proposal failed");
121 };
122
123 let view_number = proposal.data.view_number();
124 let justify_qc = proposal.data.justify_qc().clone();
125
126 let justify_qc_epoch = justify_qc.data.epoch();
127
128 let epoch_membership = membership_coordinator
129 .stake_table_for_epoch(justify_qc_epoch)
130 .await?;
131 let membership_stake_table = epoch_membership.stake_table().await;
132 let membership_success_threshold = epoch_membership.success_threshold().await;
133
134 justify_qc
135 .is_valid_cert(
136 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
137 membership_success_threshold,
138 upgrade_lock,
139 )
140 .await
141 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
142
143 let mut consensus_writer = consensus.write().await;
144 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
145 let state = Arc::new(
146 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
147 );
148
149 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
150 tracing::trace!("{e:?}");
151 }
152 let view = View {
153 view_inner: ViewInner::Leaf {
154 leaf: leaf.commit(),
155 state,
156 delta: None,
157 epoch: leaf.epoch(epoch_height),
158 },
159 };
160 Ok((leaf, view))
161}
162pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
163 membership: &Arc<RwLock<TYPES::Membership>>,
164 epoch: TYPES::Epoch,
165 storage: &I::Storage,
166 drb_result: DrbResult,
167) {
168 tracing::debug!("Calling store_drb_result for epoch {epoch}");
169 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
170 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
171 }
172
173 membership.write().await.add_drb_result(epoch, drb_result)
174}
175
176pub(crate) async fn verify_drb_result<
183 TYPES: NodeType,
184 I: NodeImplementation<TYPES>,
185 V: Versions,
186>(
187 proposal: &QuorumProposalWrapper<TYPES>,
188 validation_info: &ValidationInfo<TYPES, I, V>,
189) -> Result<()> {
190 if validation_info.epoch_height == 0
192 || !is_epoch_transition(
193 proposal.block_header().block_number(),
194 validation_info.epoch_height,
195 )
196 {
197 tracing::debug!("Skipping DRB result verification");
198 return Ok(());
199 }
200
201 let epoch = option_epoch_from_block_number::<TYPES>(
205 validation_info
206 .upgrade_lock
207 .epochs_enabled(proposal.view_number())
208 .await,
209 proposal.block_header().block_number(),
210 validation_info.epoch_height,
211 );
212
213 let proposal_result = proposal
214 .next_drb_result()
215 .context(info!("Proposal is missing the next epoch's DRB result."))?;
216
217 if let Some(epoch_val) = epoch {
218 let current_epoch_membership = validation_info
219 .membership
220 .coordinator
221 .stake_table_for_epoch(epoch)
222 .await
223 .context(warn!("No stake table for epoch {}", epoch_val))?;
224
225 let has_stake_current_epoch = current_epoch_membership
226 .has_stake(&validation_info.public_key)
227 .await;
228
229 if has_stake_current_epoch {
230 let computed_result = current_epoch_membership
231 .next_epoch()
232 .await
233 .context(warn!("No stake table for epoch {}", epoch_val + 1))?
234 .get_epoch_drb()
235 .await
236 .clone()
237 .context(warn!("DRB result not found"))?;
238
239 ensure!(
240 proposal_result == computed_result,
241 warn!(
242 "Our calculated DRB result is {computed_result:?}, which does not match the \
243 proposed DRB result of {proposal_result:?}"
244 )
245 );
246 }
247
248 Ok(())
249 } else {
250 Err(error!("Epochs are not available"))
251 }
252}
253
254async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
256 decided_leaf: &Leaf2<TYPES>,
257 epoch_height: u64,
258 membership: &EpochMembershipCoordinator<TYPES>,
259 storage: &I::Storage,
260 consensus: &OuterConsensus<TYPES>,
261) {
262 let decided_leaf = decided_leaf.clone();
263 let decided_block_number = decided_leaf.block_header().block_number();
264
265 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
267 let next_epoch_number =
268 TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
269
270 let start = Instant::now();
271 if let Err(e) = storage
272 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
273 .await
274 {
275 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
276 }
277 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
278
279 let membership = membership.clone();
280 let decided_block_header = decided_leaf.block_header().clone();
281 let storage = storage.clone();
282 let consensus = consensus.clone();
283
284 let consensus_reader = consensus.read().await;
285
286 drop(consensus_reader);
287
288 tokio::spawn(async move {
289 let membership_clone = membership.clone();
290
291 {
293 let start = Instant::now();
294 if let Err(e) = Membership::add_epoch_root(
295 Arc::clone(membership_clone.membership()),
296 next_epoch_number,
297 decided_block_header,
298 )
299 .await
300 {
301 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
302 }
303 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
304 }
305
306 let membership_clone = membership.clone();
307
308 let drb_result = membership_clone
309 .compute_drb_result(next_epoch_number, decided_leaf.clone())
310 .await;
311
312 let drb_result = match drb_result {
313 Ok(result) => result,
314 Err(e) => {
315 tracing::error!("Failed to compute DRB result from decide: {e}");
316 return;
317 },
318 };
319
320 let start = Instant::now();
321 handle_drb_result::<TYPES, I>(
322 membership.membership(),
323 next_epoch_number,
324 &storage,
325 drb_result,
326 )
327 .await;
328 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
329 });
330 }
331}
332
/// Named results of walking the leaf chain from a proposal, as produced by
/// `decide_from_proposal` and `decide_from_proposal_2`.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The view that becomes locked as a result of the proposal, if any.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The view that becomes decided as a result of the proposal, if any.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The QC that commits the decided chain. Both traversal functions fill this with the
    /// justify QC of a leaf one step above the decided leaf.
    pub committing_qc: Option<QuorumCertificate2<TYPES>>,

    /// The QC that triggered the decide. `decide_from_proposal_2` sets it to the proposed
    /// leaf's justify QC; `decide_from_proposal` leaves it as `None`.
    pub deciding_qc: Option<QuorumCertificate2<TYPES>>,

    /// Leaf information for the decided leaves, in the order they were visited (newest first).
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// Commitments of transactions found in the payloads of decided leaves, if any payload
    /// was available.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// An upgrade certificate found on a decided leaf that differs from the already-known one
    /// and was decided before its `decide_by` view.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
360
361impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
365 fn default() -> Self {
367 Self {
368 new_locked_view_number: None,
369 new_decided_view_number: None,
370 committing_qc: None,
371 deciding_qc: None,
372 leaf_views: Vec::new(),
373 included_txns: None,
374 decided_upgrade_cert: None,
375 }
376 }
377}
378
379async fn update_metrics<TYPES: NodeType>(
380 consensus: &OuterConsensus<TYPES>,
381 leaf_views: &[LeafInfo<TYPES>],
382) {
383 let consensus_reader = consensus.read().await;
384 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
385
386 for leaf_view in leaf_views {
387 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
388
389 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
390 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
391 continue;
392 };
393 consensus_reader
394 .metrics
395 .proposal_to_decide_time
396 .add_point(proposal_to_decide_time as f64);
397 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
398 consensus_reader
399 .metrics
400 .finalized_bytes
401 .add_point(txn_bytes as f64);
402 }
403 }
404}
405
406#[allow(clippy::too_many_arguments)]
412pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
413 proposal: &QuorumProposalWrapper<TYPES>,
414 consensus: OuterConsensus<TYPES>,
415 existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
416 public_key: &TYPES::SignatureKey,
417 with_epochs: bool,
418 membership: &EpochMembershipCoordinator<TYPES>,
419 storage: &I::Storage,
420) -> LeafChainTraversalOutcome<TYPES> {
421 let mut res = LeafChainTraversalOutcome::default();
422 let consensus_reader = consensus.read().await;
423 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
424 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
425
426 let Some(parent_info) = consensus_reader
428 .parent_leaf_info(&proposed_leaf, public_key)
429 .await
430 else {
431 return res;
432 };
433 let Some(grand_parent_info) = consensus_reader
436 .parent_leaf_info(&parent_info.leaf, public_key)
437 .await
438 else {
439 return res;
440 };
441 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
442 return res;
443 }
444 res.committing_qc = Some(parent_info.leaf.justify_qc().clone());
445 res.deciding_qc = Some(proposed_leaf.justify_qc().clone());
446 let decided_view_number = grand_parent_info.leaf.view_number();
447 res.new_decided_view_number = Some(decided_view_number);
448 let old_anchor_view = consensus_reader.last_decided_view();
450 let mut current_leaf_info = Some(grand_parent_info);
451 let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
452 let mut txns = HashSet::new();
453 while current_leaf_info
454 .as_ref()
455 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
456 {
457 let info = &mut current_leaf_info.unwrap();
459 if let Some(cert) = info.leaf.upgrade_certificate() {
461 if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
462 if cert.data.decide_by < decided_view_number {
463 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
464 } else {
465 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
466 res.decided_upgrade_cert = Some(cert.clone());
467 }
468 }
469 }
470
471 if let Some(payload) = consensus_reader
474 .saved_payloads()
475 .get(&info.leaf.view_number())
476 {
477 info.leaf
478 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
479 }
480
481 if let Some(ref payload) = info.leaf.block_payload() {
482 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
483 txns.insert(txn);
484 }
485 }
486
487 current_leaf_info = consensus_reader
488 .parent_leaf_info(&info.leaf, public_key)
489 .await;
490 res.leaf_views.push(info.clone());
491 }
492
493 if !txns.is_empty() {
494 res.included_txns = Some(txns);
495 }
496
497 if with_epochs && res.new_decided_view_number.is_some() {
498 let Some(first_leaf) = res.leaf_views.first() else {
499 return res;
500 };
501 let epoch_height = consensus_reader.epoch_height;
502 consensus_reader
503 .metrics
504 .last_synced_block_height
505 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
506 drop(consensus_reader);
507
508 for decided_leaf_info in &res.leaf_views {
509 decide_epoch_root::<TYPES, I>(
510 &decided_leaf_info.leaf,
511 epoch_height,
512 membership,
513 storage,
514 &consensus,
515 )
516 .await;
517 }
518 update_metrics(&consensus, &res.leaf_views).await;
519 }
520
521 res
522}
523
/// Determines what a proposal locks and decides by walking its ancestors.
///
/// Starting at the view of the proposal's justify QC, ancestors are visited back to (but not
/// including) the last decided view. While views are consecutive the chain length grows: at
/// length 2 the visited leaf's view becomes the new locked view and its justify QC the
/// committing QC; at length 3 the visited leaf's view becomes the new decided view. From the
/// decided leaf onwards, every visited leaf is recorded in `leaf_views` along with its saved
/// payload, this node's VID share and (for epoch root blocks) a matching state certificate.
///
/// When `with_epochs` is set and a decide was reached, each recorded leaf is passed to
/// `decide_epoch_root`.
///
/// Unlike `decide_from_proposal_2`, this function never sets `deciding_qc`.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    // Traversal state shared with the visitor closure below.
    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // Until a decide is reached, keep extending the chain of consecutive views.
            if res.new_decided_view_number.is_none() {
                // The leaf extends the chain only if it is the direct predecessor (by view
                // number) of the last leaf we visited.
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    current_chain_length += 1;

                    // A chain of length 2 yields a new locked view.
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf, if the chain continues, is the decided one, so this
                        // leaf's justify QC is the one committing it.
                        res.committing_qc = Some(leaf.justify_qc().clone());
                    } else if current_chain_length == 3 {
                        // A chain of length 3 yields a new decided view.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // The chain is broken; returning false tells the traversal to stop.
                    return false;
                }
            }

            // Once a decide has been reached, record every leaf visited from here on.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // Work on a copy so the payload can be filled in.
                let mut leaf = leaf.clone();

                // Only the newly decided leaf itself updates the synced-height metric.
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Report an upgrade certificate that differs from the known one, unless its
                // `decide_by` view is already behind the proposal's view.
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                // Attach the saved payload for this view, if we have one.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Look up our own VID share for this leaf's view and epoch, if any.
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Epoch root leaves carry the stored state certificate, but only when that
                // certificate was produced for this very view.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                // NOTE(review): this assignment replaces `included_txns` on every leaf that has
                // a payload, so the final value only reflects the last such leaf visited.
                // `decide_from_proposal_2` accumulates across leaves instead; confirm which
                // behavior is intended.
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // From here on the epoch height held by consensus is used, shadowing the parameter.
    let epoch_height = consensus_reader.epoch_height;
    // Release the consensus lock before `decide_epoch_root` takes it again.
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
708
709#[instrument(skip_all)]
711#[allow(clippy::too_many_arguments)]
712pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
713 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
714 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
715 membership: EpochMembershipCoordinator<TYPES>,
716 public_key: TYPES::SignatureKey,
717 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
718 consensus: OuterConsensus<TYPES>,
719 upgrade_lock: &UpgradeLock<TYPES, V>,
720 parent_qc: &QuorumCertificate2<TYPES>,
721 epoch_height: u64,
722) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
723 let consensus_reader = consensus.read().await;
724 let vsm_contains_parent_view = consensus_reader
725 .validated_state_map()
726 .contains_key(&parent_qc.view_number());
727 drop(consensus_reader);
728
729 if !vsm_contains_parent_view {
730 let _ = fetch_proposal(
731 parent_qc,
732 event_sender.clone(),
733 event_receiver.clone(),
734 membership,
735 consensus.clone(),
736 public_key.clone(),
737 private_key.clone(),
738 upgrade_lock,
739 epoch_height,
740 )
741 .await
742 .context(info!("Failed to fetch proposal"))?;
743 }
744
745 let consensus_reader = consensus.read().await;
746 let parent_view = consensus_reader
747 .validated_state_map()
748 .get(&parent_qc.view_number())
749 .context(debug!(
750 "Couldn't find parent view in state map, waiting for replica to see proposal; \
751 parent_view_number: {}",
752 *parent_qc.view_number()
753 ))?;
754
755 let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
756 "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
757 parent_view {:?}",
758 *parent_qc.view_number(),
759 parent_view
760 ))?;
761
762 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
763 tracing::debug!(
765 "They don't equal: {:?} {:?}",
766 leaf_commitment,
767 consensus_reader.high_qc().data().leaf_commit
768 );
769 }
770
771 let leaf = consensus_reader
772 .saved_leaves()
773 .get(&leaf_commitment)
774 .context(info!("Failed to find high QC of parent"))?;
775
776 Ok((leaf.clone(), Arc::clone(state)))
777}
778
779pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
780 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
781 validation_info: &ValidationInfo<TYPES, I, V>,
782) -> Result<()> {
783 let in_transition_epoch = proposal
784 .data
785 .justify_qc()
786 .data
787 .block_number
788 .is_some_and(|bn| {
789 !is_transition_block(bn, validation_info.epoch_height)
790 && is_epoch_transition(bn, validation_info.epoch_height)
791 && bn % validation_info.epoch_height != 0
792 });
793 let justify_qc = proposal.data.justify_qc();
794 let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
795 if !in_transition_epoch {
796 tracing::debug!(
797 "Storing high QC for view {:?} and height {:?}",
798 justify_qc.view_number(),
799 justify_qc.data.block_number
800 );
801 if let Err(e) = validation_info
802 .storage
803 .update_high_qc2(justify_qc.clone())
804 .await
805 {
806 bail!("Failed to store High QC, not voting; error = {e:?}");
807 }
808 if justify_qc
809 .data
810 .block_number
811 .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
812 {
813 let Some(state_cert) = proposal.data.state_cert() else {
814 bail!("Epoch root QC has no state cert, not voting!");
815 };
816 if let Err(e) = validation_info
817 .storage
818 .update_state_cert(state_cert.clone())
819 .await
820 {
821 bail!(
822 "Failed to store the light client state update certificate, not voting; error \
823 = {:?}",
824 e
825 );
826 }
827 validation_info
828 .consensus
829 .write()
830 .await
831 .update_state_cert(state_cert.clone())?;
832 }
833 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
834 if let Err(e) = validation_info
835 .storage
836 .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
837 .await
838 {
839 bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
840 }
841 }
842 }
843 let mut consensus_writer = validation_info.consensus.write().await;
844 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
845 if justify_qc
846 .data
847 .block_number
848 .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
849 {
850 consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
851 consensus_writer
852 .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
853 return Ok(());
854 }
855 consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
856 }
857 consensus_writer.update_high_qc(justify_qc.clone())?;
858
859 Ok(())
860}
861
862async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
863 validation_info: &ValidationInfo<TYPES, I, V>,
864) -> Option<(
865 QuorumCertificate2<TYPES>,
866 NextEpochQuorumCertificate2<TYPES>,
867)> {
868 validation_info
869 .consensus
870 .read()
871 .await
872 .transition_qc()
873 .cloned()
874}
875
876pub(crate) async fn validate_epoch_transition_qc<
877 TYPES: NodeType,
878 I: NodeImplementation<TYPES>,
879 V: Versions,
880>(
881 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
882 validation_info: &ValidationInfo<TYPES, I, V>,
883) -> Result<()> {
884 let proposed_qc = proposal.data.justify_qc();
885 let Some(qc_block_number) = proposed_qc.data().block_number else {
886 bail!("Justify QC has no block number");
887 };
888 if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
889 || qc_block_number % validation_info.epoch_height == 0
890 {
891 return Ok(());
892 }
893 let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
894 bail!("Next epoch justify QC is not present");
895 };
896 ensure!(
897 next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
898 "Next epoch QC has different leaf commit to justify QC"
899 );
900
901 if is_transition_block(qc_block_number, validation_info.epoch_height) {
902 ensure!(
904 transition_qc(validation_info)
905 .await
906 .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
907 "Proposed transition qc must have view number greater than or equal to previous \
908 transition QC"
909 );
910
911 validation_info
912 .consensus
913 .write()
914 .await
915 .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
916 update_high_qc(proposal, validation_info).await?;
918 } else {
919 ensure!(
921 transition_qc(validation_info)
922 .await
923 .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
924 "Transition block must have view number greater than previous transition QC"
925 );
926 ensure!(
927 proposal.data.view_change_evidence().is_none(),
928 "Second to last block and last block of epoch must directly extend previous block, Qc \
929 Block number: {qc_block_number}, Proposal Block number: {}",
930 proposal.data.block_header().block_number()
931 );
932 ensure!(
933 proposed_qc.view_number() + 1 == proposal.data.view_number()
934 || transition_qc(validation_info)
935 .await
936 .is_some_and(|(qc, _)| &qc == proposed_qc),
937 "Transition proposals must extend the previous view directly, or extend the previous \
938 transition block"
939 );
940 }
941 Ok(())
942}
943
/// Validates a proposal against its parent leaf and the safety and liveness rules, and
/// announces it when it passes.
///
/// The steps, in order:
/// 1. If the justify QC's view is at or past the epochs version and its block is in an epoch
///    transition, run `validate_epoch_transition_qc`.
/// 2. Check that the proposed leaf's parent commitment equals `parent_leaf`'s commitment.
/// 3. Record the proposed leaf and proposal in consensus (failures are only logged).
/// 4. Validate the upgrade certificate and that the leaf correctly extends the parent's
///    upgrade state.
/// 5. Check epoch safety, the presence of a next-epoch QC during epoch transitions, and
///    finally that either the safety or the liveness condition holds.
///
/// On success a `QuorumProposal` event is sent on the output event stream and a
/// `QuorumProposalValidated` event on `event_stream`.
///
/// # Errors
/// Fails if any of the checks above does not hold. When the safety/liveness check fails and the
/// ancestor traversal returned an error, that error is also emitted on the output event
/// stream.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // Set once the justify QC has passed epoch transition validation; this later counts as
    // satisfying the liveness condition.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .await
        .is_ok_and(|v| v >= V::Epochs::VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(view_number)
            .await,
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // Scope the write lock so it is released before the awaits that follow.
    {
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // A failure to record the proposal is logged and otherwise ignored.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Check the proposed leaf's upgrade certificate against the parent's and the decided one.
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;

    let justify_qc = proposal.data.justify_qc().clone();
    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety: the proposed block and its parent must share an epoch, unless
        // `check_eqc` accepts the pair.
        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
            validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await,
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // A proposal on top of an epoch transition block must carry the next epoch's QC.
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness: the justify QC is newer than our locked view, or the epoch transition QC
        // was validated above.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety: walk ancestors from the justify QC's view down to the locked view. The
        // visitor returns false, ending the walk, once the locked view is reached.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Passing either check is enough to accept the proposal.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // The proposal is accepted: report it on the output event stream.
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify the other tasks listening on the internal event stream.
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1136
1137pub(crate) async fn validate_proposal_view_and_certs<
1144 TYPES: NodeType,
1145 I: NodeImplementation<TYPES>,
1146 V: Versions,
1147>(
1148 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1149 validation_info: &ValidationInfo<TYPES, I, V>,
1150) -> Result<()> {
1151 let view_number = proposal.data.view_number();
1152 ensure!(
1153 view_number >= validation_info.consensus.read().await.cur_view(),
1154 "Proposal is from an older view {:?}",
1155 proposal.data
1156 );
1157
1158 let mut membership = validation_info.membership.clone();
1160 proposal.validate_signature(&membership).await?;
1161
1162 if proposal.data.justify_qc().view_number() != view_number - 1 {
1164 let received_proposal_cert =
1165 proposal
1166 .data
1167 .view_change_evidence()
1168 .clone()
1169 .context(debug!(
1170 "Quorum proposal for view {view_number} needed a timeout or view sync \
1171 certificate, but did not have one",
1172 ))?;
1173
1174 match received_proposal_cert {
1175 ViewChangeEvidence2::Timeout(timeout_cert) => {
1176 ensure!(
1177 timeout_cert.data().view == view_number - 1,
1178 "Timeout certificate for view {view_number} was not for the immediately \
1179 preceding view"
1180 );
1181 let timeout_cert_epoch = timeout_cert.data().epoch();
1182 membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1183
1184 let membership_stake_table = membership.stake_table().await;
1185 let membership_success_threshold = membership.success_threshold().await;
1186
1187 timeout_cert
1188 .is_valid_cert(
1189 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1190 membership_success_threshold,
1191 &validation_info.upgrade_lock,
1192 )
1193 .await
1194 .context(|e| {
1195 warn!("Timeout certificate for view {view_number} was invalid: {e}")
1196 })?;
1197 },
1198 ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1199 ensure!(
1200 view_sync_cert.view_number == view_number,
1201 "View sync cert view number {:?} does not match proposal view number {:?}",
1202 view_sync_cert.view_number,
1203 view_number
1204 );
1205
1206 let view_sync_cert_epoch = view_sync_cert.data().epoch();
1207 membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1208
1209 let membership_stake_table = membership.stake_table().await;
1210 let membership_success_threshold = membership.success_threshold().await;
1211
1212 view_sync_cert
1214 .is_valid_cert(
1215 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1216 membership_success_threshold,
1217 &validation_info.upgrade_lock,
1218 )
1219 .await
1220 .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1221 },
1222 }
1223 }
1224
1225 {
1228 let epoch = option_epoch_from_block_number::<TYPES>(
1229 proposal.data.epoch().is_some(),
1230 proposal.data.block_header().block_number(),
1231 validation_info.epoch_height,
1232 );
1233 UpgradeCertificate::validate(
1234 proposal.data.upgrade_certificate(),
1235 &validation_info.membership,
1236 epoch,
1237 &validation_info.upgrade_lock,
1238 )
1239 .await?;
1240 }
1241
1242 Ok(())
1243}
1244
1245pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1247 match sender.broadcast_direct(event).await {
1248 Ok(None) => (),
1249 Ok(Some(overflowed)) => {
1250 tracing::error!(
1251 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1252 );
1253 },
1254 Err(SendError(e)) => {
1255 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1256 },
1257 }
1258}
1259
1260pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
1263 qc: &QuorumCertificate2<TYPES>,
1264 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1265 consensus: &OuterConsensus<TYPES>,
1266 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1267 upgrade_lock: &UpgradeLock<TYPES, V>,
1268 epoch_height: u64,
1269) -> Result<()> {
1270 let mut epoch_membership = membership_coordinator
1271 .stake_table_for_epoch(qc.data.epoch)
1272 .await?;
1273
1274 let membership_stake_table = epoch_membership.stake_table().await;
1275 let membership_success_threshold = epoch_membership.success_threshold().await;
1276
1277 if let Err(e) = qc
1278 .is_valid_cert(
1279 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1280 membership_success_threshold,
1281 upgrade_lock,
1282 )
1283 .await
1284 {
1285 consensus.read().await.metrics.invalid_qc.update(1);
1286 return Err(warn!("Invalid certificate: {e}"));
1287 }
1288
1289 if upgrade_lock.epochs_enabled(qc.view_number()).await {
1290 ensure!(
1291 qc.data.block_number.is_some(),
1292 "QC for epoch {:?} has no block number",
1293 qc.data.epoch
1294 );
1295 }
1296
1297 if qc
1298 .data
1299 .block_number
1300 .is_some_and(|b| is_epoch_transition(b, epoch_height))
1301 {
1302 ensure!(
1303 maybe_next_epoch_qc.is_some(),
1304 error!("Received High QC for the transition block but not the next epoch QC")
1305 );
1306 }
1307
1308 if let Some(next_epoch_qc) = maybe_next_epoch_qc {
1309 if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
1311 bail!("Next epoch qc exists but it's not equal with qc.");
1312 }
1313 epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1314 let membership_next_stake_table = epoch_membership.stake_table().await;
1315 let membership_next_success_threshold = epoch_membership.success_threshold().await;
1316
1317 next_epoch_qc
1319 .is_valid_cert(
1320 &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1321 membership_next_success_threshold,
1322 upgrade_lock,
1323 )
1324 .await
1325 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1326 }
1327 Ok(())
1328}
1329
1330pub async fn validate_light_client_state_update_certificate<TYPES: NodeType, V: Versions>(
1332 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1333 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1334 upgrade_lock: &UpgradeLock<TYPES, V>,
1335) -> Result<()> {
1336 tracing::debug!("Validating light client state update certificate");
1337
1338 let epoch_membership = membership_coordinator
1339 .membership_for_epoch(state_cert.epoch())
1340 .await?;
1341
1342 let membership_stake_table = epoch_membership.stake_table().await;
1343 let membership_success_threshold = epoch_membership.success_threshold().await;
1344
1345 let mut state_key_map = HashMap::new();
1346 membership_stake_table.into_iter().for_each(|config| {
1347 state_key_map.insert(
1348 config.state_ver_key.clone(),
1349 config.stake_table_entry.stake(),
1350 );
1351 });
1352
1353 let mut accumulated_stake = U256::from(0);
1354 let signed_state_digest = derive_signed_state_digest(
1355 &state_cert.light_client_state,
1356 &state_cert.next_stake_table_state,
1357 &state_cert.auth_root,
1358 );
1359 for (key, sig, sig_v2) in state_cert.signatures.iter() {
1360 if let Some(stake) = state_key_map.get(key) {
1361 accumulated_stake += *stake;
1362 #[allow(clippy::collapsible_else_if)]
1363 if !upgrade_lock
1365 .proposal2_version(TYPES::View::new(state_cert.light_client_state.view_number))
1366 .await
1367 {
1368 if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1369 key,
1370 sig_v2,
1371 &state_cert.light_client_state,
1372 &state_cert.next_stake_table_state,
1373 ) {
1374 bail!("Invalid light client state update certificate signature");
1375 }
1376 } else {
1377 if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1378 key,
1379 sig,
1380 signed_state_digest,
1381 ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1382 key,
1383 sig_v2,
1384 &state_cert.light_client_state,
1385 &state_cert.next_stake_table_state,
1386 ) {
1387 bail!("Invalid light client state update certificate signature");
1388 }
1389 }
1390 } else {
1391 bail!("Invalid light client state update certificate signature");
1392 }
1393 }
1394 if accumulated_stake < membership_success_threshold {
1395 bail!("Light client state update certificate does not meet the success threshold");
1396 }
1397
1398 Ok(())
1399}
1400
1401pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1402 qc: &QuorumCertificate2<TYPES>,
1403 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1404 epoch_height: u64,
1405) -> bool {
1406 qc.data
1407 .block_number
1408 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1409 && Some(state_cert.epoch) == qc.data.epoch()
1410 && qc.view_number().u64() == state_cert.light_client_state.view_number
1411}
1412
1413pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1417 target_epoch: Option<TYPES::Epoch>,
1418 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1419 da_cert: &DaCertificate2<TYPES>,
1420 consensus: &OuterConsensus<TYPES>,
1421 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1422 cancel_receiver: Receiver<()>,
1423 id: u64,
1424) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1425 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1426 let maybe_second_vid_share = consensus
1427 .read()
1428 .await
1429 .vid_shares()
1430 .get(&vid_share.data.view_number())
1431 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1432 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1433 .cloned();
1434 if let Some(second_vid_share) = maybe_second_vid_share {
1435 if (target_epoch == da_cert.epoch()
1436 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1437 || (target_epoch != da_cert.epoch()
1438 && Some(second_vid_share.data.payload_commitment())
1439 == da_cert.data().next_epoch_payload_commit)
1440 {
1441 return Ok(second_vid_share);
1442 }
1443 }
1444
1445 let receiver = receiver.clone();
1446 let da_cert_clone = da_cert.clone();
1447 let Some(event) = EventDependency::new(
1448 receiver,
1449 cancel_receiver,
1450 format!(
1451 "VoteDependency Second VID share for view {:?}, my id {:?}",
1452 vid_share.data.view_number(),
1453 id
1454 ),
1455 Box::new(move |event| {
1456 let event = event.as_ref();
1457 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1458 if target_epoch == da_cert_clone.epoch() {
1459 second_vid_share.data.payload_commitment()
1460 == da_cert_clone.data().payload_commit
1461 } else {
1462 Some(second_vid_share.data.payload_commitment())
1463 == da_cert_clone.data().next_epoch_payload_commit
1464 }
1465 } else {
1466 false
1467 }
1468 }),
1469 )
1470 .completed()
1471 .await
1472 else {
1473 return Err(warn!("Error while waiting for the second VID share."));
1474 };
1475 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1476 return Err(warn!(
1478 "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1479 possible."
1480 ));
1481 };
1482 Ok(second_vid_share.clone())
1483}
1484
1485pub async fn broadcast_view_change<TYPES: NodeType>(
1486 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1487 new_view_number: TYPES::View,
1488 epoch: Option<TYPES::Epoch>,
1489 first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1490) {
1491 let mut broadcast_epoch = epoch;
1492 if let Some((first_epoch_view, first_epoch)) = first_epoch {
1493 if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1494 broadcast_epoch = Some(first_epoch);
1495 }
1496 }
1497 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1498 broadcast_event(
1499 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1500 sender,
1501 )
1502 .await
1503}
1504
1505pub fn derive_signed_state_digest(
1506 lc_state: &LightClientState,
1507 next_stake_state: &StakeTableState,
1508 auth_root: &FixedBytes<32>,
1509) -> CircuitField {
1510 let lc_state_sol: LightClientStateSol = (*lc_state).into();
1511 let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1512
1513 let res = alloy::primitives::keccak256(
1514 (
1515 lc_state_sol.abi_encode(),
1516 stake_st_sol.abi_encode(),
1517 auth_root.abi_encode(),
1518 )
1519 .abi_encode_packed(),
1520 );
1521 CircuitField::from_be_bytes_mod_order(res.as_ref())
1522}