1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::{
14 primitives::{FixedBytes, U256},
15 sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24 consensus::OuterConsensus,
25 data::{
26 EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
27 ViewNumber,
28 },
29 drb::DrbResult,
30 epoch_membership::EpochMembershipCoordinator,
31 event::{Event, EventType, LeafInfo},
32 light_client::{CircuitField, LightClientState, StakeTableState},
33 message::{Proposal, UpgradeLock},
34 request_response::ProposalRequestPayload,
35 simple_certificate::{
36 CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
37 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
38 },
39 simple_vote::HasEpoch,
40 stake_table::StakeTableEntries,
41 traits::{
42 BlockPayload, ValidatedState,
43 block_contents::BlockHeader,
44 election::Membership,
45 node_implementation::{NodeImplementation, NodeType},
46 signature_key::{
47 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
48 },
49 storage::Storage,
50 },
51 utils::{
52 Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
53 is_transition_block, option_epoch_from_block_number,
54 },
55 vote::{Certificate, HasViewNumber},
56};
57use hotshot_utils::anytrace::*;
58use time::OffsetDateTime;
59use tokio::time::timeout;
60use tracing::instrument;
61use versions::EPOCH_VERSION;
62
63use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
64
65#[instrument(skip_all)]
67#[allow(clippy::too_many_arguments)]
68pub(crate) async fn fetch_proposal<TYPES: NodeType>(
69 qc: &QuorumCertificate2<TYPES>,
70 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
71 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
72 membership_coordinator: EpochMembershipCoordinator<TYPES>,
73 consensus: OuterConsensus<TYPES>,
74 sender_public_key: TYPES::SignatureKey,
75 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
76 upgrade_lock: &UpgradeLock<TYPES>,
77 epoch_height: u64,
78) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
79 let view_number = qc.view_number();
80 let leaf_commit = qc.data.leaf_commit;
81 let signed_proposal_request = ProposalRequestPayload {
84 view_number,
85 key: sender_public_key,
86 };
87
88 let signature = TYPES::SignatureKey::sign(
90 &sender_private_key,
91 signed_proposal_request.commit().as_ref(),
92 )
93 .wrap()
94 .context(error!("Failed to sign proposal. This should never happen."))?;
95
96 tracing::info!("Sending proposal request for view {view_number}");
97
98 broadcast_event(
100 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
101 &event_sender,
102 )
103 .await;
104
105 let mut rx = event_receiver.clone();
106 let Ok(Some(proposal)) =
108 timeout(REQUEST_TIMEOUT, async move {
110 while let Ok(event) = rx.recv_direct().await {
112 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
113 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
114 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
115 return Some(quorum_proposal.clone());
116 }
117 }
118 }
119 None
120 })
121 .await
122 else {
123 bail!("Request for proposal failed");
124 };
125
126 let view_number = proposal.data.view_number();
127 let justify_qc = proposal.data.justify_qc().clone();
128
129 let justify_qc_epoch = justify_qc.data.epoch();
130
131 let epoch_membership = membership_coordinator
132 .stake_table_for_epoch(justify_qc_epoch)
133 .await?;
134 let membership_stake_table = epoch_membership.stake_table().await;
135 let membership_success_threshold = epoch_membership.success_threshold().await;
136
137 justify_qc
138 .is_valid_cert(
139 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
140 membership_success_threshold,
141 upgrade_lock,
142 )
143 .await
144 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
145
146 let mut consensus_writer = consensus.write().await;
147 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
148 let state = Arc::new(
149 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
150 );
151
152 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
153 tracing::trace!("{e:?}");
154 }
155 let view = View {
156 view_inner: ViewInner::Leaf {
157 leaf: leaf.commit(),
158 state,
159 delta: None,
160 epoch: leaf.epoch(epoch_height),
161 },
162 };
163 Ok((leaf, view))
164}
165pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
166 membership: &Arc<RwLock<TYPES::Membership>>,
167 epoch: EpochNumber,
168 storage: &I::Storage,
169 drb_result: DrbResult,
170) {
171 tracing::debug!("Calling store_drb_result for epoch {epoch}");
172 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
173 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
174 }
175
176 membership.write().await.add_drb_result(epoch, drb_result)
177}
178
179pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
186 proposal: &QuorumProposalWrapper<TYPES>,
187 validation_info: &ValidationInfo<TYPES, I>,
188) -> Result<()> {
189 if validation_info.epoch_height == 0
191 || !is_epoch_transition(
192 proposal.block_header().block_number(),
193 validation_info.epoch_height,
194 )
195 {
196 tracing::debug!("Skipping DRB result verification");
197 return Ok(());
198 }
199
200 let epoch = option_epoch_from_block_number(
204 validation_info
205 .upgrade_lock
206 .epochs_enabled(proposal.view_number()),
207 proposal.block_header().block_number(),
208 validation_info.epoch_height,
209 );
210
211 let proposal_result = proposal
212 .next_drb_result()
213 .context(info!("Proposal is missing the next epoch's DRB result."))?;
214
215 if let Some(epoch_val) = epoch {
216 let current_epoch_membership = validation_info
217 .membership
218 .coordinator
219 .stake_table_for_epoch(epoch)
220 .await
221 .context(warn!("No stake table for epoch {}", epoch_val))?;
222
223 let has_stake_current_epoch = current_epoch_membership
224 .has_stake(&validation_info.public_key)
225 .await;
226
227 if has_stake_current_epoch {
228 let computed_result = current_epoch_membership
229 .next_epoch()
230 .await
231 .context(warn!("No stake table for epoch {}", epoch_val + 1))?
232 .get_epoch_drb()
233 .await
234 .clone()
235 .context(warn!("DRB result not found"))?;
236
237 ensure!(
238 proposal_result == computed_result,
239 warn!(
240 "Our calculated DRB result is {computed_result:?}, which does not match the \
241 proposed DRB result of {proposal_result:?}"
242 )
243 );
244 }
245
246 Ok(())
247 } else {
248 Err(error!("Epochs are not available"))
249 }
250}
251
252async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
254 decided_leaf: &Leaf2<TYPES>,
255 epoch_height: u64,
256 membership: &EpochMembershipCoordinator<TYPES>,
257 storage: &I::Storage,
258 consensus: &OuterConsensus<TYPES>,
259) {
260 let decided_leaf = decided_leaf.clone();
261 let decided_block_number = decided_leaf.block_header().block_number();
262
263 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
265 let next_epoch_number =
266 EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
267
268 let start = Instant::now();
269 if let Err(e) = storage
270 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
271 .await
272 {
273 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
274 }
275 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
276
277 let membership = membership.clone();
278 let decided_block_header = decided_leaf.block_header().clone();
279 let storage = storage.clone();
280 let consensus = consensus.clone();
281
282 let consensus_reader = consensus.read().await;
283
284 drop(consensus_reader);
285
286 tokio::spawn(async move {
287 let membership_clone = membership.clone();
288
289 {
291 let start = Instant::now();
292 if let Err(e) = Membership::add_epoch_root(
293 Arc::clone(membership_clone.membership()),
294 decided_block_header,
295 )
296 .await
297 {
298 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
299 }
300 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
301 }
302
303 let membership_clone = membership.clone();
304
305 let drb_result = membership_clone
306 .compute_drb_result(next_epoch_number, decided_leaf.clone())
307 .await;
308
309 let drb_result = match drb_result {
310 Ok(result) => result,
311 Err(e) => {
312 tracing::error!("Failed to compute DRB result from decide: {e}");
313 return;
314 },
315 };
316
317 let start = Instant::now();
318 handle_drb_result::<TYPES, I>(
319 membership.membership(),
320 next_epoch_number,
321 &storage,
322 drb_result,
323 )
324 .await;
325 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
326 });
327 }
328}
329
/// Everything learned from walking the leaf chain behind a proposal.
///
/// Produced by `decide_from_proposal` and `decide_from_proposal_2`; any field may be left at
/// its default when the traversal stops early.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The view to lock on, if the traversal determined one.
    pub new_locked_view_number: Option<ViewNumber>,

    /// The view of the newest leaf that became decided, if any.
    pub new_decided_view_number: Option<ViewNumber>,

    /// Certificate pair built with `CertificatePair::for_parent` from the leaf one step above
    /// the decided leaf in the chain.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// Certificate pair built with `CertificatePair::for_parent` from the proposed leaf.
    /// Only `decide_from_proposal_2` sets this field.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// Leaves collected once a decide was reached, pushed newest first while walking back
    /// towards (but not including) the previously decided view.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// Commitments of transactions found in the payloads of the collected leaves.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// An upgrade certificate found on a collected leaf that differs from the one already
    /// decided and whose `decide_by` deadline had not passed.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
359
360impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
364 fn default() -> Self {
366 Self {
367 new_locked_view_number: None,
368 new_decided_view_number: None,
369 committing_qc: None,
370 deciding_qc: None,
371 leaf_views: Vec::new(),
372 included_txns: None,
373 decided_upgrade_cert: None,
374 }
375 }
376}
377
378async fn update_metrics<TYPES: NodeType>(
379 consensus: &OuterConsensus<TYPES>,
380 leaf_views: &[LeafInfo<TYPES>],
381) {
382 let consensus_reader = consensus.read().await;
383 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
384
385 for leaf_view in leaf_views {
386 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
387
388 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
389 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
390 continue;
391 };
392 consensus_reader
393 .metrics
394 .proposal_to_decide_time
395 .add_point(proposal_to_decide_time as f64);
396 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
397 consensus_reader
398 .metrics
399 .finalized_bytes
400 .add_point(txn_bytes as f64);
401 }
402 }
403}
404
405#[allow(clippy::too_many_arguments)]
411pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
412 proposal: &QuorumProposalWrapper<TYPES>,
413 consensus: OuterConsensus<TYPES>,
414 upgrade_lock: &UpgradeLock<TYPES>,
415 public_key: &TYPES::SignatureKey,
416 with_epochs: bool,
417 membership: &EpochMembershipCoordinator<TYPES>,
418 storage: &I::Storage,
419) -> LeafChainTraversalOutcome<TYPES> {
420 let mut res = LeafChainTraversalOutcome::default();
421 let consensus_reader = consensus.read().await;
422 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
423 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
424
425 let Some(parent_info) = consensus_reader
427 .parent_leaf_info(&proposed_leaf, public_key)
428 .await
429 else {
430 return res;
431 };
432 let Some(grand_parent_info) = consensus_reader
435 .parent_leaf_info(&parent_info.leaf, public_key)
436 .await
437 else {
438 return res;
439 };
440 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
441 return res;
442 }
443 res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
444 res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
445 let decided_view_number = grand_parent_info.leaf.view_number();
446 res.new_decided_view_number = Some(decided_view_number);
447 let old_anchor_view = consensus_reader.last_decided_view();
449 let mut current_leaf_info = Some(grand_parent_info);
450 let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
451 let mut txns = HashSet::new();
452 while current_leaf_info
453 .as_ref()
454 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
455 {
456 let info = &mut current_leaf_info.unwrap();
458 if let Some(cert) = info.leaf.upgrade_certificate()
460 && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
461 {
462 if cert.data.decide_by < decided_view_number {
463 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
464 } else {
465 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
466 res.decided_upgrade_cert = Some(cert.clone());
467 }
468 }
469
470 if let Some(payload) = consensus_reader
473 .saved_payloads()
474 .get(&info.leaf.view_number())
475 {
476 info.leaf
477 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
478 }
479
480 if let Some(ref payload) = info.leaf.block_payload() {
481 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
482 txns.insert(txn);
483 }
484 }
485
486 current_leaf_info = consensus_reader
487 .parent_leaf_info(&info.leaf, public_key)
488 .await;
489 res.leaf_views.push(info.clone());
490 }
491
492 if !txns.is_empty() {
493 res.included_txns = Some(txns);
494 }
495
496 if with_epochs && res.new_decided_view_number.is_some() {
497 let Some(first_leaf) = res.leaf_views.first() else {
498 return res;
499 };
500 let epoch_height = consensus_reader.epoch_height;
501 consensus_reader
502 .metrics
503 .last_synced_block_height
504 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
505 drop(consensus_reader);
506
507 for decided_leaf_info in &res.leaf_views {
508 decide_epoch_root::<TYPES, I>(
509 &decided_leaf_info.leaf,
510 epoch_height,
511 membership,
512 storage,
513 &consensus,
514 )
515 .await;
516 }
517 update_metrics(&consensus, &res.leaf_views).await;
518 }
519
520 res
521}
522
/// Three-chain decide rule: walks the ancestors of `proposal` to find what it locks and decides.
///
/// Starting at the view of the proposal's justify QC, ancestors are visited down to the last
/// decided view (passed as an exclusive terminator). While the visited leaves sit in directly
/// consecutive views the chain grows: the second leaf of the chain becomes the new locked view
/// and the third becomes the new decided view. From the decided leaf onwards every visited
/// leaf is collected into `leaf_views` together with its VID share and, for epoch roots, the
/// matching state certificate.
///
/// When `with_epochs` is set and a decide was reached, each collected leaf is passed to
/// `decide_epoch_root`. An error from the traversal is only logged at debug level; whatever was
/// gathered up to that point is still returned.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    // Chain-tracking state mutated by the visitor closure below.
    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        // Presumably returning `false` stops the walk and `true` continues it - confirm
        // against `visit_leaf_ancestors`.
        |leaf, state, delta| {
            // Phase 1: until a decide is found, track the run of consecutive views.
            if res.new_decided_view_number.is_none() {
                // The chain only grows if this leaf sits exactly one view below the last one.
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    current_chain_length += 1;

                    if current_chain_length == 2 {
                        // Second link of the chain: lock on this view.
                        res.new_locked_view_number = Some(leaf.view_number());
                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
                    } else if current_chain_length == 3 {
                        // Third link of the chain: this view is decided.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // Gap in the views: no further chain extension is possible.
                    return false;
                }
            }

            // Phase 2: once decided, collect this leaf and every older one that is visited.
            if let Some(new_decided_view) = res.new_decided_view_number {
                let mut leaf = leaf.clone();

                // Only the newly decided leaf itself updates the synced-height metric.
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Report an upgrade certificate that differs from the one already decided,
                // unless its `decide_by` view is earlier than the proposal's view.
                if let Some(cert) = leaf.upgrade_certificate()
                    && leaf.upgrade_certificate() != existing_upgrade_cert_reader
                {
                    if cert.data.decide_by < view_number {
                        tracing::warn!(
                            "Failed to decide an upgrade certificate in time. Ignoring."
                        );
                    } else {
                        tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                        res.decided_upgrade_cert = Some(cert.clone());
                    }
                }
                // Attach the saved payload for this view, if there is one.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Look up this node's VID share for the leaf's view and epoch.
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Epoch-root leaves carry the state certificate, but only if the stored
                // certificate refers to exactly this leaf's view.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                // NOTE(review): this overwrites `included_txns` for every collected leaf, so
                // only the transactions of the last visited leaf with a payload survive,
                // whereas `decide_from_proposal_2` accumulates across leaves - confirm intent.
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // Shadows the `epoch_height` parameter with the value held by the consensus state; the
    // read lock is released before the epoch-root handling below.
    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
707
708#[instrument(skip_all)]
710#[allow(clippy::too_many_arguments)]
711pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
712 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
713 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
714 membership: EpochMembershipCoordinator<TYPES>,
715 public_key: TYPES::SignatureKey,
716 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
717 consensus: OuterConsensus<TYPES>,
718 upgrade_lock: &UpgradeLock<TYPES>,
719 parent_qc: &QuorumCertificate2<TYPES>,
720 epoch_height: u64,
721) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
722 let consensus_reader = consensus.read().await;
723 let vsm_contains_parent_view = consensus_reader
724 .validated_state_map()
725 .contains_key(&parent_qc.view_number());
726 drop(consensus_reader);
727
728 if !vsm_contains_parent_view {
729 let _ = fetch_proposal(
730 parent_qc,
731 event_sender.clone(),
732 event_receiver.clone(),
733 membership,
734 consensus.clone(),
735 public_key.clone(),
736 private_key.clone(),
737 upgrade_lock,
738 epoch_height,
739 )
740 .await
741 .context(info!("Failed to fetch proposal"))?;
742 }
743
744 let consensus_reader = consensus.read().await;
745 let parent_view = consensus_reader
746 .validated_state_map()
747 .get(&parent_qc.view_number())
748 .context(debug!(
749 "Couldn't find parent view in state map, waiting for replica to see proposal; \
750 parent_view_number: {}",
751 *parent_qc.view_number()
752 ))?;
753
754 let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
755 "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
756 parent_view {:?}",
757 *parent_qc.view_number(),
758 parent_view
759 ))?;
760
761 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
762 tracing::debug!(
764 "They don't equal: {:?} {:?}",
765 leaf_commitment,
766 consensus_reader.high_qc().data().leaf_commit
767 );
768 }
769
770 let leaf = consensus_reader
771 .saved_leaves()
772 .get(&leaf_commitment)
773 .context(info!("Failed to find high QC of parent"))?;
774
775 Ok((leaf.clone(), Arc::clone(state)))
776}
777
778pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
779 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
780 validation_info: &ValidationInfo<TYPES, I>,
781) -> Result<()> {
782 let in_transition_epoch = proposal
783 .data
784 .justify_qc()
785 .data
786 .block_number
787 .is_some_and(|bn| {
788 !is_transition_block(bn, validation_info.epoch_height)
789 && is_epoch_transition(bn, validation_info.epoch_height)
790 && bn % validation_info.epoch_height != 0
791 });
792 let justify_qc = proposal.data.justify_qc();
793 let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
794 if !in_transition_epoch {
795 tracing::debug!(
796 "Storing high QC for view {:?} and height {:?}",
797 justify_qc.view_number(),
798 justify_qc.data.block_number
799 );
800 if let Err(e) = validation_info
801 .storage
802 .update_high_qc2(justify_qc.clone())
803 .await
804 {
805 bail!("Failed to store High QC, not voting; error = {e:?}");
806 }
807 if justify_qc
808 .data
809 .block_number
810 .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
811 {
812 let Some(state_cert) = proposal.data.state_cert() else {
813 bail!("Epoch root QC has no state cert, not voting!");
814 };
815 if let Err(e) = validation_info
816 .storage
817 .update_state_cert(state_cert.clone())
818 .await
819 {
820 bail!(
821 "Failed to store the light client state update certificate, not voting; error \
822 = {:?}",
823 e
824 );
825 }
826 validation_info
827 .consensus
828 .write()
829 .await
830 .update_state_cert(state_cert.clone())?;
831 }
832 if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
833 && let Err(e) = validation_info
834 .storage
835 .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
836 .await
837 {
838 bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
839 }
840 }
841 let mut consensus_writer = validation_info.consensus.write().await;
842 if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
843 if justify_qc
844 .data
845 .block_number
846 .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
847 {
848 consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
849 consensus_writer
850 .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
851 return Ok(());
852 }
853 consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
854 }
855 consensus_writer.update_high_qc(justify_qc.clone())?;
856
857 Ok(())
858}
859
860async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
861 validation_info: &ValidationInfo<TYPES, I>,
862) -> Option<(
863 QuorumCertificate2<TYPES>,
864 NextEpochQuorumCertificate2<TYPES>,
865)> {
866 validation_info
867 .consensus
868 .read()
869 .await
870 .transition_qc()
871 .cloned()
872}
873
874pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
875 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
876 validation_info: &ValidationInfo<TYPES, I>,
877) -> Result<()> {
878 let proposed_qc = proposal.data.justify_qc();
879 let Some(qc_block_number) = proposed_qc.data().block_number else {
880 bail!("Justify QC has no block number");
881 };
882 if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
883 || qc_block_number % validation_info.epoch_height == 0
884 {
885 return Ok(());
886 }
887 let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
888 bail!("Next epoch justify QC is not present");
889 };
890 ensure!(
891 next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
892 "Next epoch QC has different leaf commit to justify QC"
893 );
894
895 if is_transition_block(qc_block_number, validation_info.epoch_height) {
896 ensure!(
898 transition_qc(validation_info)
899 .await
900 .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
901 "Proposed transition qc must have view number greater than or equal to previous \
902 transition QC"
903 );
904
905 validation_info
906 .consensus
907 .write()
908 .await
909 .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
910 update_high_qc(proposal, validation_info).await?;
912 } else {
913 ensure!(
915 transition_qc(validation_info)
916 .await
917 .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
918 "Transition block must have view number greater than previous transition QC"
919 );
920 ensure!(
921 proposal.data.view_change_evidence().is_none(),
922 "Second to last block and last block of epoch must directly extend previous block, Qc \
923 Block number: {qc_block_number}, Proposal Block number: {}",
924 proposal.data.block_header().block_number()
925 );
926 ensure!(
927 proposed_qc.view_number() + 1 == proposal.data.view_number()
928 || transition_qc(validation_info)
929 .await
930 .is_some_and(|(qc, _)| &qc == proposed_qc),
931 "Transition proposals must extend the previous view directly, or extend the previous \
932 transition block"
933 );
934 }
935 Ok(())
936}
937
/// Validates `proposal` against its `parent_leaf` and announces it if it is acceptable.
///
/// The checks are, in order: the epoch-transition QC rules (once the justify QC's view runs at
/// `EPOCH_VERSION` or later and its block is in an epoch transition), that the proposed leaf
/// extends `parent_leaf`, the upgrade certificate, the epoch safety rule, the presence of a
/// next-epoch justify QC during epoch transitions, and finally that the safety or the liveness
/// rule holds. The proposed leaf and proposal are written to the consensus state before the
/// later checks run.
///
/// On success a `QuorumProposal` event is sent on the output event stream and a
/// `QuorumProposalValidated` event on `event_stream`.
///
/// # Errors
/// Fails if any of the checks above fails. When both safety and liveness fail and the ancestor
/// walk returned an error, that error is also broadcast as an `EventType::Error` event.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // Set only when the justify QC passed the epoch-transition rules; it then also satisfies
    // the liveness rule further down.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .is_ok_and(|v| v >= EPOCH_VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number(
        validation_info.upgrade_lock.epochs_enabled(view_number),
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        // Record the leaf and the proposal; both updates are best effort and only logged on
        // failure. The write lock is released at the end of this scope.
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Checks the proposed leaf against the parent leaf with respect to upgrades.
    proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;

    let justify_qc = proposal.data.justify_qc().clone();
    {
        // All remaining checks run under one read lock on the consensus state.
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch of the parent leaf, i.e. of the block the justify QC certifies.
        let justify_qc_epoch = option_epoch_from_block_number(
            validation_info.upgrade_lock.epochs_enabled(view_number),
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        // Epoch safety: same epoch as the parent, or `check_eqc` accepts the pair.
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // During an epoch transition the proposal must carry the next epoch's justify QC.
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info.upgrade_lock.epochs_enabled(view_number)
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness: the justify QC is newer than the locked view, or the epoch-transition QC
        // was validated above.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety: walk the ancestors from the justify QC's view down to the locked view
        // (inclusive); the check passes when the walk returns `Ok`.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // Yields `false` once the locked view is reached - presumably this ends the
                // walk; confirm against `visit_leaf_ancestors`.
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Either rule is sufficient. The block below only runs when both failed.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // Announce the proposal on the output event stream.
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Announce the validated proposal, together with its parent leaf, on `event_stream`.
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1114
1115pub(crate) async fn validate_proposal_view_and_certs<
1122 TYPES: NodeType,
1123 I: NodeImplementation<TYPES>,
1124>(
1125 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1126 validation_info: &ValidationInfo<TYPES, I>,
1127) -> Result<()> {
1128 let view_number = proposal.data.view_number();
1129 ensure!(
1130 view_number + 1 >= validation_info.consensus.read().await.cur_view(),
1131 "Proposal is from an older view {:?}",
1132 proposal.data
1133 );
1134
1135 let mut membership = validation_info.membership.clone();
1137 proposal.validate_signature(&membership).await?;
1138
1139 if proposal.data.justify_qc().view_number() != view_number - 1 {
1141 let received_proposal_cert =
1142 proposal
1143 .data
1144 .view_change_evidence()
1145 .clone()
1146 .context(debug!(
1147 "Quorum proposal for view {view_number} needed a timeout or view sync \
1148 certificate, but did not have one",
1149 ))?;
1150
1151 match received_proposal_cert {
1152 ViewChangeEvidence2::Timeout(timeout_cert) => {
1153 ensure!(
1154 timeout_cert.data().view == view_number - 1,
1155 "Timeout certificate for view {view_number} was not for the immediately \
1156 preceding view"
1157 );
1158 let timeout_cert_epoch = timeout_cert.data().epoch();
1159 membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1160
1161 let membership_stake_table = membership.stake_table().await;
1162 let membership_success_threshold = membership.success_threshold().await;
1163
1164 timeout_cert
1165 .is_valid_cert(
1166 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1167 membership_success_threshold,
1168 &validation_info.upgrade_lock,
1169 )
1170 .await
1171 .context(|e| {
1172 warn!("Timeout certificate for view {view_number} was invalid: {e}")
1173 })?;
1174 },
1175 ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1176 ensure!(
1177 view_sync_cert.view_number == view_number,
1178 "View sync cert view number {:?} does not match proposal view number {:?}",
1179 view_sync_cert.view_number,
1180 view_number
1181 );
1182
1183 let view_sync_cert_epoch = view_sync_cert.data().epoch();
1184 membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1185
1186 let membership_stake_table = membership.stake_table().await;
1187 let membership_success_threshold = membership.success_threshold().await;
1188
1189 view_sync_cert
1191 .is_valid_cert(
1192 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1193 membership_success_threshold,
1194 &validation_info.upgrade_lock,
1195 )
1196 .await
1197 .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1198 },
1199 }
1200 }
1201
1202 {
1205 let epoch = option_epoch_from_block_number(
1206 proposal.data.epoch().is_some(),
1207 proposal.data.block_header().block_number(),
1208 validation_info.epoch_height,
1209 );
1210 UpgradeCertificate::validate(
1211 proposal.data.upgrade_certificate(),
1212 &validation_info.membership,
1213 epoch,
1214 &validation_info.upgrade_lock,
1215 )
1216 .await?;
1217 }
1218
1219 Ok(())
1220}
1221
1222pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1224 match sender.broadcast_direct(event).await {
1225 Ok(None) => (),
1226 Ok(Some(overflowed)) => {
1227 tracing::error!(
1228 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1229 );
1230 },
1231 Err(SendError(e)) => {
1232 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1233 },
1234 }
1235}
1236
1237pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
1240 qc: &QuorumCertificate2<TYPES>,
1241 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1242 consensus: &OuterConsensus<TYPES>,
1243 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1244 upgrade_lock: &UpgradeLock<TYPES>,
1245 epoch_height: u64,
1246) -> Result<()> {
1247 let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());
1248
1249 let mut epoch_membership = membership_coordinator
1250 .stake_table_for_epoch(cert.epoch())
1251 .await?;
1252
1253 let membership_stake_table = epoch_membership.stake_table().await;
1254 let membership_success_threshold = epoch_membership.success_threshold().await;
1255
1256 if let Err(e) = cert
1257 .qc()
1258 .is_valid_cert(
1259 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1260 membership_success_threshold,
1261 upgrade_lock,
1262 )
1263 .await
1264 {
1265 consensus.read().await.metrics.invalid_qc.update(1);
1266 return Err(warn!("Invalid certificate: {e}"));
1267 }
1268
1269 if upgrade_lock.epochs_enabled(cert.view_number())
1271 && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
1272 {
1273 epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1274 let membership_next_stake_table = epoch_membership.stake_table().await;
1275 let membership_next_success_threshold = epoch_membership.success_threshold().await;
1276 next_epoch_qc
1277 .is_valid_cert(
1278 &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1279 membership_next_success_threshold,
1280 upgrade_lock,
1281 )
1282 .await
1283 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1284 }
1285
1286 Ok(())
1287}
1288
1289pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
1291 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1292 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1293 upgrade_lock: &UpgradeLock<TYPES>,
1294) -> Result<()> {
1295 tracing::debug!("Validating light client state update certificate");
1296
1297 let epoch_membership = membership_coordinator
1298 .membership_for_epoch(state_cert.epoch())
1299 .await?;
1300
1301 let membership_stake_table = epoch_membership.stake_table().await;
1302 let membership_success_threshold = epoch_membership.success_threshold().await;
1303
1304 let mut state_key_map = HashMap::new();
1305 membership_stake_table.into_iter().for_each(|config| {
1306 state_key_map.insert(
1307 config.state_ver_key.clone(),
1308 config.stake_table_entry.stake(),
1309 );
1310 });
1311
1312 let mut accumulated_stake = U256::from(0);
1313 let signed_state_digest = derive_signed_state_digest(
1314 &state_cert.light_client_state,
1315 &state_cert.next_stake_table_state,
1316 &state_cert.auth_root,
1317 );
1318 for (key, sig, sig_v2) in state_cert.signatures.iter() {
1319 if let Some(stake) = state_key_map.get(key) {
1320 accumulated_stake += *stake;
1321 #[allow(clippy::collapsible_else_if)]
1322 if !upgrade_lock
1324 .proposal2_version(ViewNumber::new(state_cert.light_client_state.view_number))
1325 {
1326 if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1327 key,
1328 sig_v2,
1329 &state_cert.light_client_state,
1330 &state_cert.next_stake_table_state,
1331 ) {
1332 bail!("Invalid light client state update certificate signature");
1333 }
1334 } else {
1335 if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1336 key,
1337 sig,
1338 signed_state_digest,
1339 ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1340 key,
1341 sig_v2,
1342 &state_cert.light_client_state,
1343 &state_cert.next_stake_table_state,
1344 ) {
1345 bail!("Invalid light client state update certificate signature");
1346 }
1347 }
1348 } else {
1349 bail!("Invalid light client state update certificate signature");
1350 }
1351 }
1352 if accumulated_stake < membership_success_threshold {
1353 bail!("Light client state update certificate does not meet the success threshold");
1354 }
1355
1356 Ok(())
1357}
1358
1359pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1360 qc: &QuorumCertificate2<TYPES>,
1361 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1362 epoch_height: u64,
1363) -> bool {
1364 qc.data
1365 .block_number
1366 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1367 && Some(state_cert.epoch) == qc.data.epoch()
1368 && qc.view_number().u64() == state_cert.light_client_state.view_number
1369}
1370
1371pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1375 target_epoch: Option<EpochNumber>,
1376 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1377 da_cert: &DaCertificate2<TYPES>,
1378 consensus: &OuterConsensus<TYPES>,
1379 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1380 cancel_receiver: Receiver<()>,
1381 id: u64,
1382) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1383 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1384 let maybe_second_vid_share = consensus
1385 .read()
1386 .await
1387 .vid_shares()
1388 .get(&vid_share.data.view_number())
1389 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1390 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1391 .cloned();
1392 if let Some(second_vid_share) = maybe_second_vid_share
1393 && ((target_epoch == da_cert.epoch()
1394 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1395 || (target_epoch != da_cert.epoch()
1396 && Some(second_vid_share.data.payload_commitment())
1397 == da_cert.data().next_epoch_payload_commit))
1398 {
1399 return Ok(second_vid_share);
1400 }
1401
1402 let receiver = receiver.clone();
1403 let da_cert_clone = da_cert.clone();
1404 let Some(event) = EventDependency::new(
1405 receiver,
1406 cancel_receiver,
1407 format!(
1408 "VoteDependency Second VID share for view {:?}, my id {:?}",
1409 vid_share.data.view_number(),
1410 id
1411 ),
1412 Box::new(move |event| {
1413 let event = event.as_ref();
1414 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1415 if target_epoch == da_cert_clone.epoch() {
1416 second_vid_share.data.payload_commitment()
1417 == da_cert_clone.data().payload_commit
1418 } else {
1419 Some(second_vid_share.data.payload_commitment())
1420 == da_cert_clone.data().next_epoch_payload_commit
1421 }
1422 } else {
1423 false
1424 }
1425 }),
1426 )
1427 .completed()
1428 .await
1429 else {
1430 return Err(warn!("Error while waiting for the second VID share."));
1431 };
1432 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1433 return Err(warn!(
1435 "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1436 possible."
1437 ));
1438 };
1439 Ok(second_vid_share.clone())
1440}
1441
1442pub async fn broadcast_view_change<TYPES: NodeType>(
1443 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1444 new_view_number: ViewNumber,
1445 epoch: Option<EpochNumber>,
1446 first_epoch: Option<(ViewNumber, EpochNumber)>,
1447) {
1448 let mut broadcast_epoch = epoch;
1449 if let Some((first_epoch_view, first_epoch)) = first_epoch
1450 && new_view_number == first_epoch_view
1451 && broadcast_epoch != Some(first_epoch)
1452 {
1453 broadcast_epoch = Some(first_epoch);
1454 }
1455 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1456 broadcast_event(
1457 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1458 sender,
1459 )
1460 .await
1461}
1462
1463pub fn derive_signed_state_digest(
1464 lc_state: &LightClientState,
1465 next_stake_state: &StakeTableState,
1466 auth_root: &FixedBytes<32>,
1467) -> CircuitField {
1468 let lc_state_sol: LightClientStateSol = (*lc_state).into();
1469 let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1470
1471 let res = alloy::primitives::keccak256(
1472 (
1473 lc_state_sol.abi_encode(),
1474 stake_st_sol.abi_encode(),
1475 auth_root.abi_encode(),
1476 )
1477 .abi_encode_packed(),
1478 );
1479 CircuitField::from_be_bytes_mod_order(res.as_ref())
1480}