1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::{
14 primitives::{FixedBytes, U256},
15 sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24 consensus::OuterConsensus,
25 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
26 drb::DrbResult,
27 epoch_membership::EpochMembershipCoordinator,
28 event::{Event, EventType, LeafInfo},
29 light_client::{CircuitField, LightClientState, StakeTableState},
30 message::{Proposal, UpgradeLock},
31 request_response::ProposalRequestPayload,
32 simple_certificate::{
33 CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
34 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
35 },
36 simple_vote::HasEpoch,
37 stake_table::StakeTableEntries,
38 traits::{
39 block_contents::BlockHeader,
40 election::Membership,
41 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
42 signature_key::{
43 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
44 },
45 storage::Storage,
46 BlockPayload, ValidatedState,
47 },
48 utils::{
49 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
50 option_epoch_from_block_number, Terminator, View, ViewInner,
51 },
52 vote::{Certificate, HasViewNumber},
53};
54use hotshot_utils::anytrace::*;
55use time::OffsetDateTime;
56use tokio::time::timeout;
57use tracing::instrument;
58use vbs::version::StaticVersionType;
59
60use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
61
62#[instrument(skip_all)]
64#[allow(clippy::too_many_arguments)]
65pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
66 qc: &QuorumCertificate2<TYPES>,
67 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
68 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
69 membership_coordinator: EpochMembershipCoordinator<TYPES>,
70 consensus: OuterConsensus<TYPES>,
71 sender_public_key: TYPES::SignatureKey,
72 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
73 upgrade_lock: &UpgradeLock<TYPES, V>,
74 epoch_height: u64,
75) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
76 let view_number = qc.view_number();
77 let leaf_commit = qc.data.leaf_commit;
78 let signed_proposal_request = ProposalRequestPayload {
81 view_number,
82 key: sender_public_key,
83 };
84
85 let signature = TYPES::SignatureKey::sign(
87 &sender_private_key,
88 signed_proposal_request.commit().as_ref(),
89 )
90 .wrap()
91 .context(error!("Failed to sign proposal. This should never happen."))?;
92
93 tracing::info!("Sending proposal request for view {view_number}");
94
95 broadcast_event(
97 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
98 &event_sender,
99 )
100 .await;
101
102 let mut rx = event_receiver.clone();
103 let Ok(Some(proposal)) =
105 timeout(REQUEST_TIMEOUT, async move {
107 while let Ok(event) = rx.recv_direct().await {
109 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
110 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
111 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
112 return Some(quorum_proposal.clone());
113 }
114 }
115 }
116 None
117 })
118 .await
119 else {
120 bail!("Request for proposal failed");
121 };
122
123 let view_number = proposal.data.view_number();
124 let justify_qc = proposal.data.justify_qc().clone();
125
126 let justify_qc_epoch = justify_qc.data.epoch();
127
128 let epoch_membership = membership_coordinator
129 .stake_table_for_epoch(justify_qc_epoch)
130 .await?;
131 let membership_stake_table = epoch_membership.stake_table().await;
132 let membership_success_threshold = epoch_membership.success_threshold().await;
133
134 justify_qc
135 .is_valid_cert(
136 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
137 membership_success_threshold,
138 upgrade_lock,
139 )
140 .await
141 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
142
143 let mut consensus_writer = consensus.write().await;
144 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
145 let state = Arc::new(
146 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
147 );
148
149 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
150 tracing::trace!("{e:?}");
151 }
152 let view = View {
153 view_inner: ViewInner::Leaf {
154 leaf: leaf.commit(),
155 state,
156 delta: None,
157 epoch: leaf.epoch(epoch_height),
158 },
159 };
160 Ok((leaf, view))
161}
162pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
163 membership: &Arc<RwLock<TYPES::Membership>>,
164 epoch: TYPES::Epoch,
165 storage: &I::Storage,
166 drb_result: DrbResult,
167) {
168 tracing::debug!("Calling store_drb_result for epoch {epoch}");
169 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
170 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
171 }
172
173 membership.write().await.add_drb_result(epoch, drb_result)
174}
175
176pub(crate) async fn verify_drb_result<
183 TYPES: NodeType,
184 I: NodeImplementation<TYPES>,
185 V: Versions,
186>(
187 proposal: &QuorumProposalWrapper<TYPES>,
188 validation_info: &ValidationInfo<TYPES, I, V>,
189) -> Result<()> {
190 if validation_info.epoch_height == 0
192 || !is_epoch_transition(
193 proposal.block_header().block_number(),
194 validation_info.epoch_height,
195 )
196 {
197 tracing::debug!("Skipping DRB result verification");
198 return Ok(());
199 }
200
201 let epoch = option_epoch_from_block_number::<TYPES>(
205 validation_info
206 .upgrade_lock
207 .epochs_enabled(proposal.view_number())
208 .await,
209 proposal.block_header().block_number(),
210 validation_info.epoch_height,
211 );
212
213 let proposal_result = proposal
214 .next_drb_result()
215 .context(info!("Proposal is missing the next epoch's DRB result."))?;
216
217 if let Some(epoch_val) = epoch {
218 let current_epoch_membership = validation_info
219 .membership
220 .coordinator
221 .stake_table_for_epoch(epoch)
222 .await
223 .context(warn!("No stake table for epoch {}", epoch_val))?;
224
225 let has_stake_current_epoch = current_epoch_membership
226 .has_stake(&validation_info.public_key)
227 .await;
228
229 if has_stake_current_epoch {
230 let computed_result = current_epoch_membership
231 .next_epoch()
232 .await
233 .context(warn!("No stake table for epoch {}", epoch_val + 1))?
234 .get_epoch_drb()
235 .await
236 .clone()
237 .context(warn!("DRB result not found"))?;
238
239 ensure!(
240 proposal_result == computed_result,
241 warn!(
242 "Our calculated DRB result is {computed_result:?}, which does not match the \
243 proposed DRB result of {proposal_result:?}"
244 )
245 );
246 }
247
248 Ok(())
249 } else {
250 Err(error!("Epochs are not available"))
251 }
252}
253
254async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
256 decided_leaf: &Leaf2<TYPES>,
257 epoch_height: u64,
258 membership: &EpochMembershipCoordinator<TYPES>,
259 storage: &I::Storage,
260 consensus: &OuterConsensus<TYPES>,
261) {
262 let decided_leaf = decided_leaf.clone();
263 let decided_block_number = decided_leaf.block_header().block_number();
264
265 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
267 let next_epoch_number =
268 TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
269
270 let start = Instant::now();
271 if let Err(e) = storage
272 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
273 .await
274 {
275 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
276 }
277 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
278
279 let membership = membership.clone();
280 let decided_block_header = decided_leaf.block_header().clone();
281 let storage = storage.clone();
282 let consensus = consensus.clone();
283
284 let consensus_reader = consensus.read().await;
285
286 drop(consensus_reader);
287
288 tokio::spawn(async move {
289 let membership_clone = membership.clone();
290
291 {
293 let start = Instant::now();
294 if let Err(e) = Membership::add_epoch_root(
295 Arc::clone(membership_clone.membership()),
296 decided_block_header,
297 )
298 .await
299 {
300 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
301 }
302 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
303 }
304
305 let membership_clone = membership.clone();
306
307 let drb_result = membership_clone
308 .compute_drb_result(next_epoch_number, decided_leaf.clone())
309 .await;
310
311 let drb_result = match drb_result {
312 Ok(result) => result,
313 Err(e) => {
314 tracing::error!("Failed to compute DRB result from decide: {e}");
315 return;
316 },
317 };
318
319 let start = Instant::now();
320 handle_drb_result::<TYPES, I>(
321 membership.membership(),
322 next_epoch_number,
323 &storage,
324 drb_result,
325 )
326 .await;
327 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
328 });
329 }
330}
331
332#[derive(Debug)]
334pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
335 pub new_locked_view_number: Option<TYPES::View>,
337
338 pub new_decided_view_number: Option<TYPES::View>,
340
341 pub committing_qc: Option<CertificatePair<TYPES>>,
345
346 pub deciding_qc: Option<CertificatePair<TYPES>>,
351
352 pub leaf_views: Vec<LeafInfo<TYPES>>,
354
355 pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,
357
358 pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
360}
361
362impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
366 fn default() -> Self {
368 Self {
369 new_locked_view_number: None,
370 new_decided_view_number: None,
371 committing_qc: None,
372 deciding_qc: None,
373 leaf_views: Vec::new(),
374 included_txns: None,
375 decided_upgrade_cert: None,
376 }
377 }
378}
379
380async fn update_metrics<TYPES: NodeType>(
381 consensus: &OuterConsensus<TYPES>,
382 leaf_views: &[LeafInfo<TYPES>],
383) {
384 let consensus_reader = consensus.read().await;
385 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
386
387 for leaf_view in leaf_views {
388 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
389
390 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
391 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
392 continue;
393 };
394 consensus_reader
395 .metrics
396 .proposal_to_decide_time
397 .add_point(proposal_to_decide_time as f64);
398 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
399 consensus_reader
400 .metrics
401 .finalized_bytes
402 .add_point(txn_bytes as f64);
403 }
404 }
405}
406
407#[allow(clippy::too_many_arguments)]
413pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
414 proposal: &QuorumProposalWrapper<TYPES>,
415 consensus: OuterConsensus<TYPES>,
416 existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
417 public_key: &TYPES::SignatureKey,
418 with_epochs: bool,
419 membership: &EpochMembershipCoordinator<TYPES>,
420 storage: &I::Storage,
421) -> LeafChainTraversalOutcome<TYPES> {
422 let mut res = LeafChainTraversalOutcome::default();
423 let consensus_reader = consensus.read().await;
424 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
425 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
426
427 let Some(parent_info) = consensus_reader
429 .parent_leaf_info(&proposed_leaf, public_key)
430 .await
431 else {
432 return res;
433 };
434 let Some(grand_parent_info) = consensus_reader
437 .parent_leaf_info(&parent_info.leaf, public_key)
438 .await
439 else {
440 return res;
441 };
442 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
443 return res;
444 }
445 res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
446 res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
447 let decided_view_number = grand_parent_info.leaf.view_number();
448 res.new_decided_view_number = Some(decided_view_number);
449 let old_anchor_view = consensus_reader.last_decided_view();
451 let mut current_leaf_info = Some(grand_parent_info);
452 let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
453 let mut txns = HashSet::new();
454 while current_leaf_info
455 .as_ref()
456 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
457 {
458 let info = &mut current_leaf_info.unwrap();
460 if let Some(cert) = info.leaf.upgrade_certificate() {
462 if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
463 if cert.data.decide_by < decided_view_number {
464 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
465 } else {
466 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
467 res.decided_upgrade_cert = Some(cert.clone());
468 }
469 }
470 }
471
472 if let Some(payload) = consensus_reader
475 .saved_payloads()
476 .get(&info.leaf.view_number())
477 {
478 info.leaf
479 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
480 }
481
482 if let Some(ref payload) = info.leaf.block_payload() {
483 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
484 txns.insert(txn);
485 }
486 }
487
488 current_leaf_info = consensus_reader
489 .parent_leaf_info(&info.leaf, public_key)
490 .await;
491 res.leaf_views.push(info.clone());
492 }
493
494 if !txns.is_empty() {
495 res.included_txns = Some(txns);
496 }
497
498 if with_epochs && res.new_decided_view_number.is_some() {
499 let Some(first_leaf) = res.leaf_views.first() else {
500 return res;
501 };
502 let epoch_height = consensus_reader.epoch_height;
503 consensus_reader
504 .metrics
505 .last_synced_block_height
506 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
507 drop(consensus_reader);
508
509 for decided_leaf_info in &res.leaf_views {
510 decide_epoch_root::<TYPES, I>(
511 &decided_leaf_info.leaf,
512 epoch_height,
513 membership,
514 storage,
515 &consensus,
516 )
517 .await;
518 }
519 update_metrics(&consensus, &res.leaf_views).await;
520 }
521
522 res
523}
524
525#[allow(clippy::too_many_arguments)]
557pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
558 proposal: &QuorumProposalWrapper<TYPES>,
559 consensus: OuterConsensus<TYPES>,
560 existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
561 public_key: &TYPES::SignatureKey,
562 with_epochs: bool,
563 membership: &EpochMembershipCoordinator<TYPES>,
564 storage: &I::Storage,
565 epoch_height: u64,
566) -> LeafChainTraversalOutcome<TYPES> {
567 let consensus_reader = consensus.read().await;
568 let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
569 let view_number = proposal.view_number();
570 let parent_view_number = proposal.justify_qc().view_number();
571 let old_anchor_view = consensus_reader.last_decided_view();
572
573 let mut last_view_number_visited = view_number;
574 let mut current_chain_length = 0usize;
575 let mut res = LeafChainTraversalOutcome::default();
576
577 if let Err(e) = consensus_reader.visit_leaf_ancestors(
578 parent_view_number,
579 Terminator::Exclusive(old_anchor_view),
580 true,
581 |leaf, state, delta| {
582 if res.new_decided_view_number.is_none() {
584 if last_view_number_visited == leaf.view_number() + 1 {
586 last_view_number_visited = leaf.view_number();
587
588 current_chain_length += 1;
590
591 if current_chain_length == 2 {
593 res.new_locked_view_number = Some(leaf.view_number());
594 res.committing_qc = Some(CertificatePair::for_parent(leaf));
597 } else if current_chain_length == 3 {
598 res.new_decided_view_number = Some(leaf.view_number());
600 }
601 } else {
602 return false;
605 }
606 }
607
608 if let Some(new_decided_view) = res.new_decided_view_number {
610 let mut leaf = leaf.clone();
612
613 if leaf.view_number() == new_decided_view {
615 consensus_reader
616 .metrics
617 .last_synced_block_height
618 .set(usize::try_from(leaf.height()).unwrap_or(0));
619 }
620
621 if let Some(cert) = leaf.upgrade_certificate() {
623 if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
624 if cert.data.decide_by < view_number {
625 tracing::warn!(
626 "Failed to decide an upgrade certificate in time. Ignoring."
627 );
628 } else {
629 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
630 res.decided_upgrade_cert = Some(cert.clone());
631 }
632 }
633 }
634 if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
637 leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
638 }
639
640 let vid_share = consensus_reader
643 .vid_shares()
644 .get(&leaf.view_number())
645 .and_then(|key_map| key_map.get(public_key))
646 .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
647 .map(|prop| prop.data.clone());
648
649 let state_cert = if leaf.with_epoch
650 && is_epoch_root(
651 leaf.block_header().block_number(),
652 consensus_reader.epoch_height,
653 ) {
654 match consensus_reader.state_cert() {
655 Some(state_cert)
657 if state_cert.light_client_state.view_number
658 == leaf.view_number().u64() =>
659 {
660 Some(state_cert.clone())
661 },
662 _ => None,
663 }
664 } else {
665 None
666 };
667
668 res.leaf_views.push(LeafInfo::new(
670 leaf.clone(),
671 Arc::clone(&state),
672 delta.clone(),
673 vid_share,
674 state_cert,
675 ));
676 if let Some(ref payload) = leaf.block_payload() {
677 res.included_txns = Some(
678 payload
679 .transaction_commitments(leaf.block_header().metadata())
680 .into_iter()
681 .collect::<HashSet<_>>(),
682 );
683 }
684 }
685 true
686 },
687 ) {
688 tracing::debug!("Leaf ascension failed; error={e}");
689 }
690
691 let epoch_height = consensus_reader.epoch_height;
692 drop(consensus_reader);
693
694 if with_epochs && res.new_decided_view_number.is_some() {
695 for decided_leaf_info in &res.leaf_views {
696 decide_epoch_root::<TYPES, I>(
697 &decided_leaf_info.leaf,
698 epoch_height,
699 membership,
700 storage,
701 &consensus,
702 )
703 .await;
704 }
705 }
706
707 res
708}
709
710#[instrument(skip_all)]
712#[allow(clippy::too_many_arguments)]
713pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
714 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
715 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
716 membership: EpochMembershipCoordinator<TYPES>,
717 public_key: TYPES::SignatureKey,
718 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
719 consensus: OuterConsensus<TYPES>,
720 upgrade_lock: &UpgradeLock<TYPES, V>,
721 parent_qc: &QuorumCertificate2<TYPES>,
722 epoch_height: u64,
723) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
724 let consensus_reader = consensus.read().await;
725 let vsm_contains_parent_view = consensus_reader
726 .validated_state_map()
727 .contains_key(&parent_qc.view_number());
728 drop(consensus_reader);
729
730 if !vsm_contains_parent_view {
731 let _ = fetch_proposal(
732 parent_qc,
733 event_sender.clone(),
734 event_receiver.clone(),
735 membership,
736 consensus.clone(),
737 public_key.clone(),
738 private_key.clone(),
739 upgrade_lock,
740 epoch_height,
741 )
742 .await
743 .context(info!("Failed to fetch proposal"))?;
744 }
745
746 let consensus_reader = consensus.read().await;
747 let parent_view = consensus_reader
748 .validated_state_map()
749 .get(&parent_qc.view_number())
750 .context(debug!(
751 "Couldn't find parent view in state map, waiting for replica to see proposal; \
752 parent_view_number: {}",
753 *parent_qc.view_number()
754 ))?;
755
756 let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
757 "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
758 parent_view {:?}",
759 *parent_qc.view_number(),
760 parent_view
761 ))?;
762
763 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
764 tracing::debug!(
766 "They don't equal: {:?} {:?}",
767 leaf_commitment,
768 consensus_reader.high_qc().data().leaf_commit
769 );
770 }
771
772 let leaf = consensus_reader
773 .saved_leaves()
774 .get(&leaf_commitment)
775 .context(info!("Failed to find high QC of parent"))?;
776
777 Ok((leaf.clone(), Arc::clone(state)))
778}
779
780pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
781 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
782 validation_info: &ValidationInfo<TYPES, I, V>,
783) -> Result<()> {
784 let in_transition_epoch = proposal
785 .data
786 .justify_qc()
787 .data
788 .block_number
789 .is_some_and(|bn| {
790 !is_transition_block(bn, validation_info.epoch_height)
791 && is_epoch_transition(bn, validation_info.epoch_height)
792 && bn % validation_info.epoch_height != 0
793 });
794 let justify_qc = proposal.data.justify_qc();
795 let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
796 if !in_transition_epoch {
797 tracing::debug!(
798 "Storing high QC for view {:?} and height {:?}",
799 justify_qc.view_number(),
800 justify_qc.data.block_number
801 );
802 if let Err(e) = validation_info
803 .storage
804 .update_high_qc2(justify_qc.clone())
805 .await
806 {
807 bail!("Failed to store High QC, not voting; error = {e:?}");
808 }
809 if justify_qc
810 .data
811 .block_number
812 .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
813 {
814 let Some(state_cert) = proposal.data.state_cert() else {
815 bail!("Epoch root QC has no state cert, not voting!");
816 };
817 if let Err(e) = validation_info
818 .storage
819 .update_state_cert(state_cert.clone())
820 .await
821 {
822 bail!(
823 "Failed to store the light client state update certificate, not voting; error \
824 = {:?}",
825 e
826 );
827 }
828 validation_info
829 .consensus
830 .write()
831 .await
832 .update_state_cert(state_cert.clone())?;
833 }
834 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
835 if let Err(e) = validation_info
836 .storage
837 .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
838 .await
839 {
840 bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
841 }
842 }
843 }
844 let mut consensus_writer = validation_info.consensus.write().await;
845 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
846 if justify_qc
847 .data
848 .block_number
849 .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
850 {
851 consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
852 consensus_writer
853 .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
854 return Ok(());
855 }
856 consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
857 }
858 consensus_writer.update_high_qc(justify_qc.clone())?;
859
860 Ok(())
861}
862
863async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
864 validation_info: &ValidationInfo<TYPES, I, V>,
865) -> Option<(
866 QuorumCertificate2<TYPES>,
867 NextEpochQuorumCertificate2<TYPES>,
868)> {
869 validation_info
870 .consensus
871 .read()
872 .await
873 .transition_qc()
874 .cloned()
875}
876
877pub(crate) async fn validate_epoch_transition_qc<
878 TYPES: NodeType,
879 I: NodeImplementation<TYPES>,
880 V: Versions,
881>(
882 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
883 validation_info: &ValidationInfo<TYPES, I, V>,
884) -> Result<()> {
885 let proposed_qc = proposal.data.justify_qc();
886 let Some(qc_block_number) = proposed_qc.data().block_number else {
887 bail!("Justify QC has no block number");
888 };
889 if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
890 || qc_block_number % validation_info.epoch_height == 0
891 {
892 return Ok(());
893 }
894 let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
895 bail!("Next epoch justify QC is not present");
896 };
897 ensure!(
898 next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
899 "Next epoch QC has different leaf commit to justify QC"
900 );
901
902 if is_transition_block(qc_block_number, validation_info.epoch_height) {
903 ensure!(
905 transition_qc(validation_info)
906 .await
907 .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
908 "Proposed transition qc must have view number greater than or equal to previous \
909 transition QC"
910 );
911
912 validation_info
913 .consensus
914 .write()
915 .await
916 .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
917 update_high_qc(proposal, validation_info).await?;
919 } else {
920 ensure!(
922 transition_qc(validation_info)
923 .await
924 .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
925 "Transition block must have view number greater than previous transition QC"
926 );
927 ensure!(
928 proposal.data.view_change_evidence().is_none(),
929 "Second to last block and last block of epoch must directly extend previous block, Qc \
930 Block number: {qc_block_number}, Proposal Block number: {}",
931 proposal.data.block_header().block_number()
932 );
933 ensure!(
934 proposed_qc.view_number() + 1 == proposal.data.view_number()
935 || transition_qc(validation_info)
936 .await
937 .is_some_and(|(qc, _)| &qc == proposed_qc),
938 "Transition proposals must extend the previous view directly, or extend the previous \
939 transition block"
940 );
941 }
942 Ok(())
943}
944
945#[allow(clippy::too_many_lines)]
952#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
953pub(crate) async fn validate_proposal_safety_and_liveness<
954 TYPES: NodeType,
955 I: NodeImplementation<TYPES>,
956 V: Versions,
957>(
958 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
959 parent_leaf: Leaf2<TYPES>,
960 validation_info: &ValidationInfo<TYPES, I, V>,
961 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
962 sender: TYPES::SignatureKey,
963) -> Result<()> {
964 let view_number = proposal.data.view_number();
965
966 let mut valid_epoch_transition = false;
967 if validation_info
968 .upgrade_lock
969 .version(proposal.data.justify_qc().view_number())
970 .await
971 .is_ok_and(|v| v >= V::Epochs::VERSION)
972 {
973 let Some(block_number) = proposal.data.justify_qc().data.block_number else {
974 bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
975 };
976 if is_epoch_transition(block_number, validation_info.epoch_height) {
977 validate_epoch_transition_qc(&proposal, validation_info).await?;
978 valid_epoch_transition = true;
979 }
980 }
981
982 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
983 ensure!(
984 proposed_leaf.parent_commitment() == parent_leaf.commit(),
985 "Proposed leaf does not extend the parent leaf."
986 );
987 let proposal_epoch = option_epoch_from_block_number::<TYPES>(
988 validation_info
989 .upgrade_lock
990 .epochs_enabled(view_number)
991 .await,
992 proposed_leaf.height(),
993 validation_info.epoch_height,
994 );
995
996 let state = Arc::new(
997 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
998 );
999
1000 {
1001 let mut consensus_writer = validation_info.consensus.write().await;
1002 if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
1003 tracing::trace!("{e:?}");
1004 }
1005
1006 if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
1009 tracing::debug!("Internal proposal update failed; error = {e:#}");
1010 };
1011 }
1012
1013 UpgradeCertificate::validate(
1014 proposal.data.upgrade_certificate(),
1015 &validation_info.membership,
1016 proposal_epoch,
1017 &validation_info.upgrade_lock,
1018 )
1019 .await?;
1020
1021 proposed_leaf
1023 .extends_upgrade(
1024 &parent_leaf,
1025 &validation_info.upgrade_lock.decided_upgrade_certificate,
1026 )
1027 .await?;
1028
1029 let justify_qc = proposal.data.justify_qc().clone();
1030 {
1034 let consensus_reader = validation_info.consensus.read().await;
1035 let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
1040 validation_info
1041 .upgrade_lock
1042 .epochs_enabled(view_number)
1043 .await,
1044 parent_leaf.height(),
1045 validation_info.epoch_height,
1046 );
1047 ensure!(
1048 proposal_epoch == justify_qc_epoch
1049 || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
1050 {
1051 error!(
1052 "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
1053 QC leaf is {parent_leaf:?}"
1054 )
1055 }
1056 );
1057
1058 if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
1060 && validation_info
1061 .upgrade_lock
1062 .epochs_enabled(view_number)
1063 .await
1064 {
1065 ensure!(
1066 proposal.data.next_epoch_justify_qc().is_some(),
1067 "Epoch transition proposal does not include the next epoch justify QC. Do not \
1068 vote!"
1069 );
1070 }
1071
1072 let liveness_check =
1074 justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;
1075
1076 let outcome = consensus_reader.visit_leaf_ancestors(
1079 justify_qc.view_number(),
1080 Terminator::Inclusive(consensus_reader.locked_view()),
1081 false,
1082 |leaf, _, _| {
1083 leaf.view_number() != consensus_reader.locked_view()
1086 },
1087 );
1088 let safety_check = outcome.is_ok();
1089
1090 ensure!(safety_check || liveness_check, {
1091 if let Err(e) = outcome {
1092 broadcast_event(
1093 Event {
1094 view_number,
1095 event: EventType::Error { error: Arc::new(e) },
1096 },
1097 &validation_info.output_event_stream,
1098 )
1099 .await;
1100 }
1101
1102 error!(
1103 "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
1104 view is {:?}",
1105 consensus_reader.high_qc(),
1106 proposal.data,
1107 consensus_reader.locked_view()
1108 )
1109 });
1110 }
1111
1112 broadcast_event(
1114 Event {
1115 view_number,
1116 event: EventType::QuorumProposal {
1117 proposal: proposal.clone(),
1118 sender,
1119 },
1120 },
1121 &validation_info.output_event_stream,
1122 )
1123 .await;
1124
1125 broadcast_event(
1127 Arc::new(HotShotEvent::QuorumProposalValidated(
1128 proposal.clone(),
1129 parent_leaf,
1130 )),
1131 &event_stream,
1132 )
1133 .await;
1134
1135 Ok(())
1136}
1137
1138pub(crate) async fn validate_proposal_view_and_certs<
1145 TYPES: NodeType,
1146 I: NodeImplementation<TYPES>,
1147 V: Versions,
1148>(
1149 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1150 validation_info: &ValidationInfo<TYPES, I, V>,
1151) -> Result<()> {
1152 let view_number = proposal.data.view_number();
1153 ensure!(
1154 view_number >= validation_info.consensus.read().await.cur_view(),
1155 "Proposal is from an older view {:?}",
1156 proposal.data
1157 );
1158
1159 let mut membership = validation_info.membership.clone();
1161 proposal.validate_signature(&membership).await?;
1162
1163 if proposal.data.justify_qc().view_number() != view_number - 1 {
1165 let received_proposal_cert =
1166 proposal
1167 .data
1168 .view_change_evidence()
1169 .clone()
1170 .context(debug!(
1171 "Quorum proposal for view {view_number} needed a timeout or view sync \
1172 certificate, but did not have one",
1173 ))?;
1174
1175 match received_proposal_cert {
1176 ViewChangeEvidence2::Timeout(timeout_cert) => {
1177 ensure!(
1178 timeout_cert.data().view == view_number - 1,
1179 "Timeout certificate for view {view_number} was not for the immediately \
1180 preceding view"
1181 );
1182 let timeout_cert_epoch = timeout_cert.data().epoch();
1183 membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1184
1185 let membership_stake_table = membership.stake_table().await;
1186 let membership_success_threshold = membership.success_threshold().await;
1187
1188 timeout_cert
1189 .is_valid_cert(
1190 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1191 membership_success_threshold,
1192 &validation_info.upgrade_lock,
1193 )
1194 .await
1195 .context(|e| {
1196 warn!("Timeout certificate for view {view_number} was invalid: {e}")
1197 })?;
1198 },
1199 ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1200 ensure!(
1201 view_sync_cert.view_number == view_number,
1202 "View sync cert view number {:?} does not match proposal view number {:?}",
1203 view_sync_cert.view_number,
1204 view_number
1205 );
1206
1207 let view_sync_cert_epoch = view_sync_cert.data().epoch();
1208 membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1209
1210 let membership_stake_table = membership.stake_table().await;
1211 let membership_success_threshold = membership.success_threshold().await;
1212
1213 view_sync_cert
1215 .is_valid_cert(
1216 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1217 membership_success_threshold,
1218 &validation_info.upgrade_lock,
1219 )
1220 .await
1221 .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1222 },
1223 }
1224 }
1225
1226 {
1229 let epoch = option_epoch_from_block_number::<TYPES>(
1230 proposal.data.epoch().is_some(),
1231 proposal.data.block_header().block_number(),
1232 validation_info.epoch_height,
1233 );
1234 UpgradeCertificate::validate(
1235 proposal.data.upgrade_certificate(),
1236 &validation_info.membership,
1237 epoch,
1238 &validation_info.upgrade_lock,
1239 )
1240 .await?;
1241 }
1242
1243 Ok(())
1244}
1245
1246pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1248 match sender.broadcast_direct(event).await {
1249 Ok(None) => (),
1250 Ok(Some(overflowed)) => {
1251 tracing::error!(
1252 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1253 );
1254 },
1255 Err(SendError(e)) => {
1256 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1257 },
1258 }
1259}
1260
1261pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
1264 qc: &QuorumCertificate2<TYPES>,
1265 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1266 consensus: &OuterConsensus<TYPES>,
1267 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1268 upgrade_lock: &UpgradeLock<TYPES, V>,
1269 epoch_height: u64,
1270) -> Result<()> {
1271 let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());
1272
1273 let mut epoch_membership = membership_coordinator
1274 .stake_table_for_epoch(cert.epoch())
1275 .await?;
1276
1277 let membership_stake_table = epoch_membership.stake_table().await;
1278 let membership_success_threshold = epoch_membership.success_threshold().await;
1279
1280 if let Err(e) = cert
1281 .qc()
1282 .is_valid_cert(
1283 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1284 membership_success_threshold,
1285 upgrade_lock,
1286 )
1287 .await
1288 {
1289 consensus.read().await.metrics.invalid_qc.update(1);
1290 return Err(warn!("Invalid certificate: {e}"));
1291 }
1292
1293 if upgrade_lock.epochs_enabled(cert.view_number()).await {
1295 if let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)? {
1296 epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1297 let membership_next_stake_table = epoch_membership.stake_table().await;
1298 let membership_next_success_threshold = epoch_membership.success_threshold().await;
1299 next_epoch_qc
1300 .is_valid_cert(
1301 &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1302 membership_next_success_threshold,
1303 upgrade_lock,
1304 )
1305 .await
1306 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1307 }
1308 }
1309
1310 Ok(())
1311}
1312
1313pub async fn validate_light_client_state_update_certificate<TYPES: NodeType, V: Versions>(
1315 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1316 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1317 upgrade_lock: &UpgradeLock<TYPES, V>,
1318) -> Result<()> {
1319 tracing::debug!("Validating light client state update certificate");
1320
1321 let epoch_membership = membership_coordinator
1322 .membership_for_epoch(state_cert.epoch())
1323 .await?;
1324
1325 let membership_stake_table = epoch_membership.stake_table().await;
1326 let membership_success_threshold = epoch_membership.success_threshold().await;
1327
1328 let mut state_key_map = HashMap::new();
1329 membership_stake_table.into_iter().for_each(|config| {
1330 state_key_map.insert(
1331 config.state_ver_key.clone(),
1332 config.stake_table_entry.stake(),
1333 );
1334 });
1335
1336 let mut accumulated_stake = U256::from(0);
1337 let signed_state_digest = derive_signed_state_digest(
1338 &state_cert.light_client_state,
1339 &state_cert.next_stake_table_state,
1340 &state_cert.auth_root,
1341 );
1342 for (key, sig, sig_v2) in state_cert.signatures.iter() {
1343 if let Some(stake) = state_key_map.get(key) {
1344 accumulated_stake += *stake;
1345 #[allow(clippy::collapsible_else_if)]
1346 if !upgrade_lock
1348 .proposal2_version(TYPES::View::new(state_cert.light_client_state.view_number))
1349 .await
1350 {
1351 if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1352 key,
1353 sig_v2,
1354 &state_cert.light_client_state,
1355 &state_cert.next_stake_table_state,
1356 ) {
1357 bail!("Invalid light client state update certificate signature");
1358 }
1359 } else {
1360 if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1361 key,
1362 sig,
1363 signed_state_digest,
1364 ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1365 key,
1366 sig_v2,
1367 &state_cert.light_client_state,
1368 &state_cert.next_stake_table_state,
1369 ) {
1370 bail!("Invalid light client state update certificate signature");
1371 }
1372 }
1373 } else {
1374 bail!("Invalid light client state update certificate signature");
1375 }
1376 }
1377 if accumulated_stake < membership_success_threshold {
1378 bail!("Light client state update certificate does not meet the success threshold");
1379 }
1380
1381 Ok(())
1382}
1383
1384pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1385 qc: &QuorumCertificate2<TYPES>,
1386 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1387 epoch_height: u64,
1388) -> bool {
1389 qc.data
1390 .block_number
1391 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1392 && Some(state_cert.epoch) == qc.data.epoch()
1393 && qc.view_number().u64() == state_cert.light_client_state.view_number
1394}
1395
1396pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1400 target_epoch: Option<TYPES::Epoch>,
1401 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1402 da_cert: &DaCertificate2<TYPES>,
1403 consensus: &OuterConsensus<TYPES>,
1404 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1405 cancel_receiver: Receiver<()>,
1406 id: u64,
1407) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1408 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1409 let maybe_second_vid_share = consensus
1410 .read()
1411 .await
1412 .vid_shares()
1413 .get(&vid_share.data.view_number())
1414 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1415 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1416 .cloned();
1417 if let Some(second_vid_share) = maybe_second_vid_share {
1418 if (target_epoch == da_cert.epoch()
1419 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1420 || (target_epoch != da_cert.epoch()
1421 && Some(second_vid_share.data.payload_commitment())
1422 == da_cert.data().next_epoch_payload_commit)
1423 {
1424 return Ok(second_vid_share);
1425 }
1426 }
1427
1428 let receiver = receiver.clone();
1429 let da_cert_clone = da_cert.clone();
1430 let Some(event) = EventDependency::new(
1431 receiver,
1432 cancel_receiver,
1433 format!(
1434 "VoteDependency Second VID share for view {:?}, my id {:?}",
1435 vid_share.data.view_number(),
1436 id
1437 ),
1438 Box::new(move |event| {
1439 let event = event.as_ref();
1440 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1441 if target_epoch == da_cert_clone.epoch() {
1442 second_vid_share.data.payload_commitment()
1443 == da_cert_clone.data().payload_commit
1444 } else {
1445 Some(second_vid_share.data.payload_commitment())
1446 == da_cert_clone.data().next_epoch_payload_commit
1447 }
1448 } else {
1449 false
1450 }
1451 }),
1452 )
1453 .completed()
1454 .await
1455 else {
1456 return Err(warn!("Error while waiting for the second VID share."));
1457 };
1458 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1459 return Err(warn!(
1461 "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1462 possible."
1463 ));
1464 };
1465 Ok(second_vid_share.clone())
1466}
1467
1468pub async fn broadcast_view_change<TYPES: NodeType>(
1469 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1470 new_view_number: TYPES::View,
1471 epoch: Option<TYPES::Epoch>,
1472 first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1473) {
1474 let mut broadcast_epoch = epoch;
1475 if let Some((first_epoch_view, first_epoch)) = first_epoch {
1476 if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1477 broadcast_epoch = Some(first_epoch);
1478 }
1479 }
1480 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1481 broadcast_event(
1482 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1483 sender,
1484 )
1485 .await
1486}
1487
1488pub fn derive_signed_state_digest(
1489 lc_state: &LightClientState,
1490 next_stake_state: &StakeTableState,
1491 auth_root: &FixedBytes<32>,
1492) -> CircuitField {
1493 let lc_state_sol: LightClientStateSol = (*lc_state).into();
1494 let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1495
1496 let res = alloy::primitives::keccak256(
1497 (
1498 lc_state_sol.abi_encode(),
1499 stake_st_sol.abi_encode(),
1500 auth_root.abi_encode(),
1501 )
1502 .abi_encode_packed(),
1503 );
1504 CircuitField::from_be_bytes_mod_order(res.as_ref())
1505}