1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::{
14 primitives::{FixedBytes, U256},
15 sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24 consensus::OuterConsensus,
25 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
26 drb::DrbResult,
27 epoch_membership::EpochMembershipCoordinator,
28 event::{Event, EventType, LeafInfo},
29 light_client::{CircuitField, LightClientState, StakeTableState},
30 message::{Proposal, UpgradeLock},
31 request_response::ProposalRequestPayload,
32 simple_certificate::{
33 DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34 QuorumCertificate2, UpgradeCertificate,
35 },
36 simple_vote::HasEpoch,
37 stake_table::StakeTableEntries,
38 traits::{
39 block_contents::BlockHeader,
40 election::Membership,
41 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
42 signature_key::{
43 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
44 },
45 storage::Storage,
46 BlockPayload, ValidatedState,
47 },
48 utils::{
49 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
50 option_epoch_from_block_number, Terminator, View, ViewInner,
51 },
52 vote::{Certificate, HasViewNumber},
53};
54use hotshot_utils::anytrace::*;
55use time::OffsetDateTime;
56use tokio::time::timeout;
57use tracing::instrument;
58use vbs::version::StaticVersionType;
59
60use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
61
/// Fetches from the network the proposal matching the given QC.
///
/// Broadcasts a signed `QuorumProposalRequestSend` event, then waits up to
/// `REQUEST_TIMEOUT` for a `QuorumProposalResponseRecv` whose leaf matches the
/// QC's view number and leaf commitment. The returned proposal's justify QC is
/// validated against the stake table for its epoch, after which the fetched
/// leaf and its freshly derived validated state are recorded in consensus.
///
/// # Errors
/// Fails if signing the request fails, the request times out, the epoch
/// membership cannot be fetched, or the proposal's justify QC is invalid.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // Sign the request so responders can verify who is asking.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    // Wait for a response matching both the requested view and leaf commit;
    // any other proposal seen on the stream in the meantime is ignored.
    let mut rx = event_receiver.clone();
    let Ok(Some(proposal)) =
        timeout(REQUEST_TIMEOUT, async move {
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    // Validate the fetched proposal's justify QC against the stake table for
    // the QC's own epoch.
    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    // Record the fetched leaf and its validated state in consensus state.
    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // An update failure here is non-fatal (e.g. leaf already known).
    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
162pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
163 membership: &Arc<RwLock<TYPES::Membership>>,
164 epoch: TYPES::Epoch,
165 storage: &I::Storage,
166 drb_result: DrbResult,
167) {
168 tracing::debug!("Calling store_drb_result for epoch {epoch}");
169 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
170 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
171 }
172
173 membership.write().await.add_drb_result(epoch, drb_result)
174}
175
/// Verifies the DRB result attached to an epoch-transition proposal.
///
/// Verification is skipped entirely when epochs are disabled
/// (`epoch_height == 0`) or the proposal's block is not in the
/// epoch-transition window. Otherwise, if this node has stake in the
/// proposal's epoch, the locally held DRB result for the next epoch must
/// match the proposed one.
///
/// # Errors
/// Fails if the proposal lacks a DRB result, the epoch cannot be determined,
/// a stake table for the epoch (or the next one) is unavailable, the local
/// DRB result is missing, or the results do not match.
pub(crate) async fn verify_drb_result<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    // Nothing to verify outside the epoch-transition window (or with epochs off).
    if validation_info.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            validation_info.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    let epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(proposal.view_number())
            .await,
        proposal.block_header().block_number(),
        validation_info.epoch_height,
    );

    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the next epoch's DRB result."))?;

    if let Some(epoch_val) = epoch {
        let current_epoch_membership = validation_info
            .membership
            .coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {}", epoch_val))?;

        let has_stake_current_epoch = current_epoch_membership
            .has_stake(&validation_info.public_key)
            .await;

        // Only nodes with stake in the current epoch are expected to hold a
        // locally computed DRB result to compare against.
        if has_stake_current_epoch {
            let computed_result = current_epoch_membership
                .next_epoch()
                .await
                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
                .get_epoch_drb()
                .await
                .clone()
                .context(warn!("DRB result not found"))?;

            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}
253
254async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
256 decided_leaf: &Leaf2<TYPES>,
257 epoch_height: u64,
258 membership: &EpochMembershipCoordinator<TYPES>,
259 storage: &I::Storage,
260 consensus: &OuterConsensus<TYPES>,
261) {
262 let decided_leaf = decided_leaf.clone();
263 let decided_block_number = decided_leaf.block_header().block_number();
264
265 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
267 let next_epoch_number =
268 TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
269
270 let start = Instant::now();
271 if let Err(e) = storage
272 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
273 .await
274 {
275 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
276 }
277 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
278
279 let membership = membership.clone();
280 let decided_block_header = decided_leaf.block_header().clone();
281 let storage = storage.clone();
282 let consensus = consensus.clone();
283
284 let consensus_reader = consensus.read().await;
285
286 drop(consensus_reader);
287
288 tokio::spawn(async move {
289 let membership_clone = membership.clone();
290 let epoch_root_future = tokio::spawn(async move {
291 let start = Instant::now();
292 if let Err(e) = Membership::add_epoch_root(
293 Arc::clone(membership_clone.membership()),
294 next_epoch_number,
295 decided_block_header,
296 )
297 .await
298 {
299 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
300 }
301 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
302 });
303
304 let membership_clone = membership.clone();
305
306 let drb_result_future = tokio::spawn(async move {
307 membership_clone
308 .compute_drb_result(next_epoch_number, decided_leaf.clone())
309 .await
310 });
311
312 let (_, drb_result) = tokio::join!(epoch_root_future, drb_result_future);
313
314 let drb_result = match drb_result {
315 Ok(Ok(result)) => result,
316 Err(e) => {
317 tracing::error!("Failed to compute DRB result from decide: {e}");
318 return;
319 },
320 Ok(Err(e)) => {
321 tracing::error!("Failed to compute DRB result from decide: {e}");
322 return;
323 },
324 };
325
326 let start = Instant::now();
327 handle_drb_result::<TYPES, I>(
328 membership.membership(),
329 next_epoch_number,
330 &storage,
331 drb_result,
332 )
333 .await;
334 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
335 });
336 }
337}
338
/// The outcome of traversing the leaf chain while processing a decide.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The view newly locked by a 2-chain, if one was found.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The view newly decided by a 3-chain, if one was found.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The justify QC associated with the newly locked view, if any.
    pub new_decide_qc: Option<QuorumCertificate2<TYPES>>,

    /// The decided leaves (with their state and auxiliary info) collected
    /// during the traversal, in visit order.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// Commitments of the transactions included in the decided payloads,
    /// if any payloads were available.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// An upgrade certificate that reached decide during this traversal,
    /// if one did.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
360
361impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
365 fn default() -> Self {
367 Self {
368 new_locked_view_number: None,
369 new_decided_view_number: None,
370 new_decide_qc: None,
371 leaf_views: Vec::new(),
372 included_txns: None,
373 decided_upgrade_cert: None,
374 }
375 }
376}
377
378async fn update_metrics<TYPES: NodeType>(
379 consensus: &OuterConsensus<TYPES>,
380 leaf_views: &[LeafInfo<TYPES>],
381) {
382 let consensus_reader = consensus.read().await;
383 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
384
385 for leaf_view in leaf_views {
386 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
387
388 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
389 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
390 continue;
391 };
392 consensus_reader
393 .metrics
394 .proposal_to_decide_time
395 .add_point(proposal_to_decide_time as f64);
396 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
397 consensus_reader
398 .metrics
399 .finalized_bytes
400 .add_point(txn_bytes as f64);
401 }
402 }
403}
404
/// Computes the decide outcome for `proposal` using direct parent lookups
/// (the two-step variant of the decide rule).
///
/// The proposal's justify QC view becomes the new locked view. A decide
/// happens when the proposal's parent and grandparent sit in consecutive
/// views; the grandparent's view is then decided, and every leaf newer than
/// the previous anchor is collected into `leaf_views` along with included
/// transaction commitments and any newly decided upgrade certificate. When
/// `with_epochs` is set and something was decided, epoch roots are stored and
/// decide metrics updated afterwards.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // Without a known parent and grandparent there is nothing to decide yet.
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    // Decide requires parent and grandparent in consecutive views.
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone());
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    let old_anchor_view = consensus_reader.last_decided_view();
    // Walk from the grandparent back towards the previous anchor view,
    // collecting every newly decided leaf on the way.
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe: the loop condition just checked `is_some_and`.
        let info = &mut current_leaf_info.unwrap();
        // Apply a newly decided upgrade certificate, unless it expired
        // before the decided view.
        if let Some(cert) = info.leaf.upgrade_certificate() {
            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                if cert.data.decide_by < decided_view_number {
                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
                } else {
                    tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                    res.decided_upgrade_cert = Some(cert.clone());
                }
            }
        }

        // Attach the saved payload (if present) so transaction commitments
        // can be extracted below.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        // Release the read lock before the epoch-root handling below takes
        // its own locks.
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}
521
/// Computes the decide outcome for `proposal` via leaf-chain ascension
/// (the consecutive-view chain rule).
///
/// Walks leaf ancestors from the proposal's justify QC back to (exclusive)
/// the old anchor view. While views remain consecutive the chain grows: a
/// 2-chain sets the new locked view and decide QC, a 3-chain sets the new
/// decided view. Once decided, every further visited leaf is collected into
/// `leaf_views` with its payload, VID share, and — for epoch-root leaves —
/// the matching light client state certificate. When `with_epochs` is set and
/// something was decided, epoch roots are stored afterwards.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // Phase 1: grow the consecutive-view chain until a decide is found.
            if res.new_decided_view_number.is_none() {
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    current_chain_length += 1;

                    if current_chain_length == 2 {
                        // 2-chain: lock this view and keep its justify QC.
                        res.new_locked_view_number = Some(leaf.view_number());
                        res.new_decide_qc = Some(leaf.justify_qc().clone());
                    } else if current_chain_length == 3 {
                        // 3-chain: this view is decided.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // Gap between views: chain broken, stop ascending.
                    return false;
                }
            }

            // Phase 2: after a decide is found, collect info for each leaf.
            if let Some(new_decided_view) = res.new_decided_view_number {
                let mut leaf = leaf.clone();

                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Apply a newly decided upgrade certificate, unless it
                // expired before the current view.
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                // Attach the saved payload so transactions can be extracted.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Our VID share for this leaf's view and epoch, if we hold one.
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Epoch-root leaves carry the light client state certificate
                // whose view matches theirs, when we hold one.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
706
/// Returns the parent leaf and its validated state for extending `parent_qc`.
///
/// If the parent view is not in the validated state map, first tries to fetch
/// the corresponding proposal from the network via [`fetch_proposal`], then
/// looks the parent view, leaf commitment, and saved leaf up from consensus.
///
/// # Errors
/// Fails if the parent view cannot be found or fetched, the view holds no
/// leaf/state, or the leaf itself is missing from the saved leaves.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    // Release the read lock before potentially fetching over the network.
    drop(consensus_reader);

    if !vsm_contains_parent_view {
        // fetch_proposal inserts the fetched leaf/state into consensus as a
        // side effect; we re-read the state map below.
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader
        .validated_state_map()
        .get(&parent_qc.view_number())
        .context(debug!(
            "Couldn't find parent view in state map, waiting for replica to see proposal; \
             parent_view_number: {}",
            *parent_qc.view_number()
        ))?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
         parent_view {:?}",
        *parent_qc.view_number(),
        parent_view
    ))?;

    // Mismatch with the high QC is unexpected but not fatal; log for debugging.
    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        tracing::debug!(
            "They don't equal: {:?} {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}
776
/// Persists and applies the high QC carried by `proposal`.
///
/// Outside the mid-transition window, the justify QC is stored first (plus
/// the next-epoch justify QC and — for epoch-root QCs — the light client
/// state certificate); any storage failure aborts so this node does not vote.
/// The in-memory consensus state is then updated: at the transition block the
/// high QC is reset and the transition QC recorded, otherwise the (next-epoch)
/// high QC is advanced normally.
///
/// # Errors
/// Fails on storage errors, a missing state certificate for an epoch-root QC,
/// or rejected consensus-state updates.
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    // In the middle of an epoch transition — i.e. inside the transition
    // window but not at the transition block itself nor at an exact epoch
    // boundary — persistence is skipped.
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        // Epoch-root QCs must carry a light client state certificate; store
        // it and mirror it into the consensus state.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
            if let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
            {
                bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
            }
        }
    }
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            // At the transition block: reset the high-QC pair and record the
            // transition QC; the normal high-QC update below is skipped.
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}
859
860async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
861 validation_info: &ValidationInfo<TYPES, I, V>,
862) -> Option<(
863 QuorumCertificate2<TYPES>,
864 NextEpochQuorumCertificate2<TYPES>,
865)> {
866 validation_info
867 .consensus
868 .read()
869 .await
870 .transition_qc()
871 .cloned()
872}
873
/// Validates the justify-QC rules specific to the epoch-transition window.
///
/// A no-op unless the justify QC's block is inside the transition window (and
/// not an exact epoch boundary). Requires a next-epoch justify QC with the
/// same leaf commit. At the transition block itself the transition QC is
/// recorded and the high QC updated; later transition proposals must carry no
/// view-change evidence and must extend either the previous view directly or
/// the recorded transition block.
///
/// # Errors
/// Fails when a required QC is missing or any of the above rules is violated.
pub(crate) async fn validate_epoch_transition_qc<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    // Outside the transition window (or at an epoch boundary) there is
    // nothing extra to validate.
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Transition block: record the (newer) transition QC and advance the
        // high QC.
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition qc must have view number greater than or equal to previous \
             transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        update_high_qc(proposal, validation_info).await?;
    } else {
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        // The last two blocks of an epoch may not be reached via timeout or
        // view sync evidence.
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, Qc \
             Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}
941
/// Validates a quorum proposal's safety and liveness before this node votes.
///
/// Performs, in order: epoch-transition QC validation (when the justify QC is
/// from a post-epoch-upgrade version), parent-extension check, leaf/proposal
/// bookkeeping in consensus state, upgrade-certificate validation and upgrade
/// extension rules, epoch safety between proposal and justify QC, presence of
/// a next-epoch justify QC at epoch transitions, and the safety/liveness rule
/// against the locked view. On success the validated proposal is broadcast to
/// both the external output stream and the internal event stream.
///
/// # Errors
/// Fails if any check above fails; a failed safety check is also reported as
/// an error event on the external output stream.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // Epoch-transition QCs get extra validation once the epoch upgrade is in
    // effect; a valid transition also relaxes the liveness check below.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .await
        .is_ok_and(|v| v >= V::Epochs::VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(view_number)
            .await,
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        // Record the leaf and the proposed view; failures here are non-fatal.
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // The proposed leaf must be consistent with any decided upgrade.
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;

    let justify_qc = proposal.data.justify_qc().clone();
    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety: the proposal must stay in the justify QC's epoch
        // unless the leaves form an eQC chain.
        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
            validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await,
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // Epoch-transition proposals must carry the next-epoch justify QC.
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness: the justify QC must be newer than the locked view (or the
        // proposal must be a valid epoch transition).
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety: ascending from the justify QC must reach the locked view.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // Notify external subscribers of the validated proposal.
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify internal tasks that validation succeeded.
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1134
/// Validates a proposal's view number, signature, and attached certificates.
///
/// Ensures the proposal is not from an older view and verifies the proposer's
/// signature. When the proposal does not directly extend the previous view,
/// the attached timeout or view sync certificate is validated against the
/// stake table for its epoch. Finally, any attached upgrade certificate is
/// validated.
///
/// # Errors
/// Fails if the view is stale, the signature is invalid, the required
/// view-change evidence is missing or invalid, or the upgrade certificate is
/// invalid.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    ensure!(
        view_number >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // A proposal skipping a view must justify the skip with view-change
    // evidence (a timeout or view sync certificate).
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    {
        // Validate any attached upgrade certificate for the proposal's epoch.
        let epoch = option_epoch_from_block_number::<TYPES>(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}
1242
1243pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1245 match sender.broadcast_direct(event).await {
1246 Ok(None) => (),
1247 Ok(Some(overflowed)) => {
1248 tracing::error!(
1249 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1250 );
1251 },
1252 Err(SendError(e)) => {
1253 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1254 },
1255 }
1256}
1257
/// Validate a quorum certificate together with an optional next-epoch QC.
///
/// - Verifies `qc` against the stake table and success threshold of its epoch;
///   an invalid certificate bumps the `invalid_qc` metric before erroring.
/// - When epochs are enabled for the QC's view, the QC must carry a block number.
/// - A QC for an epoch-transition block must be accompanied by a next-epoch QC.
/// - A supplied next-epoch QC must have the same view number and vote data as
///   `qc`, and is verified against the next epoch's stake table.
///
/// # Errors
/// Returns an error if any certificate is invalid, or a required companion
/// certificate is missing or inconsistent with `qc`.
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<()> {
    // Stake table for the epoch the QC was formed in.
    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(qc.data.epoch)
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    if let Err(e) = qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        // Record the invalid QC in metrics before surfacing the error.
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    // With epochs enabled, every QC must reference a block number.
    if upgrade_lock.epochs_enabled(qc.view_number()).await {
        ensure!(
            qc.data.block_number.is_some(),
            "QC for epoch {:?} has no block number",
            qc.data.epoch
        );
    }

    // A QC on an epoch-transition block is only acceptable together with the
    // next epoch's QC for the same data.
    if qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_transition(b, epoch_height))
    {
        ensure!(
            maybe_next_epoch_qc.is_some(),
            error!("Received High QC for the transition block but not the next epoch QC")
        );
    }

    if let Some(next_epoch_qc) = maybe_next_epoch_qc {
        // The next-epoch QC must certify exactly the same view and vote data.
        if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
            bail!("Next epoch qc exists but it's not equal with qc.");
        }
        // Re-validate against the next epoch's stake table and threshold.
        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
        let membership_next_stake_table = epoch_membership.stake_table().await;
        let membership_next_success_threshold = epoch_membership.success_threshold().await;

        next_epoch_qc
            .is_valid_cert(
                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                membership_next_success_threshold,
                upgrade_lock,
            )
            .await
            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
    }
    Ok(())
}
1327
1328pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
1330 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1331 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1332) -> Result<()> {
1333 tracing::debug!("Validating light client state update certificate");
1334
1335 let epoch_membership = membership_coordinator
1336 .membership_for_epoch(state_cert.epoch())
1337 .await?;
1338
1339 let membership_stake_table = epoch_membership.stake_table().await;
1340 let membership_success_threshold = epoch_membership.success_threshold().await;
1341
1342 let mut state_key_map = HashMap::new();
1343 membership_stake_table.into_iter().for_each(|config| {
1344 state_key_map.insert(
1345 config.state_ver_key.clone(),
1346 config.stake_table_entry.stake(),
1347 );
1348 });
1349
1350 let mut accumulated_stake = U256::from(0);
1351 let signed_state_digest = derive_signed_state_digest(
1352 &state_cert.light_client_state,
1353 &state_cert.next_stake_table_state,
1354 &state_cert.auth_root,
1355 );
1356 for (key, sig, sig_v2) in state_cert.signatures.iter() {
1357 if let Some(stake) = state_key_map.get(key) {
1358 accumulated_stake += *stake;
1359 if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1360 key,
1361 sig,
1362 signed_state_digest,
1363 ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1364 key,
1365 sig_v2,
1366 &state_cert.light_client_state,
1367 &state_cert.next_stake_table_state,
1368 ) {
1369 bail!("Invalid light client state update certificate signature");
1370 }
1371 } else {
1372 bail!("Invalid light client state update certificate signature");
1373 }
1374 }
1375 if accumulated_stake < membership_success_threshold {
1376 bail!("Light client state update certificate does not meet the success threshold");
1377 }
1378
1379 Ok(())
1380}
1381
1382pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1383 qc: &QuorumCertificate2<TYPES>,
1384 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1385 epoch_height: u64,
1386) -> bool {
1387 qc.data
1388 .block_number
1389 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1390 && Some(state_cert.epoch) == qc.data.epoch()
1391 && qc.view_number().u64() == state_cert.light_client_state.view_number
1392}
1393
/// Obtain the second VID share (for `target_epoch`) that matches the given DA
/// certificate, either from the consensus cache or by waiting on the event
/// stream.
///
/// A cached share at the same view / recipient / target epoch is returned
/// immediately when its payload commitment matches the DA cert: the
/// current-epoch `payload_commit` when `target_epoch` equals the cert's epoch,
/// otherwise the cert's `next_epoch_payload_commit`. On a cache miss, waits
/// for a `VidShareValidated` event satisfying the same matching rule.
///
/// # Errors
/// Returns an error if the wait is cancelled (via `cancel_receiver`) or the
/// event dependency completes without a share.
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<TYPES::Epoch>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    // Fast path: look up a cached share for this view, our key, and the target epoch.
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    if let Some(second_vid_share) = maybe_second_vid_share {
        // Accept the cached share only if its payload commitment matches the
        // DA cert for the appropriate epoch (current vs. next).
        if (target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit)
        {
            return Ok(second_vid_share);
        }
    }

    // Slow path: wait for a matching VidShareValidated event on the stream.
    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        // Same matching rule as the cache check above.
        Box::new(move |event| {
            let event = event.as_ref();
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        return Err(warn!("Error while waiting for the second VID share."));
    };
    // The dependency predicate only accepts VidShareValidated, so this should
    // never fail.
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        return Err(warn!(
            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
             possible."
        ));
    };
    Ok(second_vid_share.clone())
}
1465
1466pub async fn broadcast_view_change<TYPES: NodeType>(
1467 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1468 new_view_number: TYPES::View,
1469 epoch: Option<TYPES::Epoch>,
1470 first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1471) {
1472 let mut broadcast_epoch = epoch;
1473 if let Some((first_epoch_view, first_epoch)) = first_epoch {
1474 if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1475 broadcast_epoch = Some(first_epoch);
1476 }
1477 }
1478 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1479 broadcast_event(
1480 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1481 sender,
1482 )
1483 .await
1484}
1485
1486pub fn derive_signed_state_digest(
1487 lc_state: &LightClientState,
1488 next_stake_state: &StakeTableState,
1489 auth_root: &FixedBytes<32>,
1490) -> CircuitField {
1491 let lc_state_sol: LightClientStateSol = (*lc_state).into();
1492 let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1493
1494 let res = alloy::primitives::keccak256(
1495 (
1496 lc_state_sol.abi_encode(),
1497 stake_st_sol.abi_encode(),
1498 auth_root.abi_encode(),
1499 )
1500 .abi_encode_packed(),
1501 );
1502 CircuitField::from_be_bytes_mod_order(res.as_ref())
1503}