1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::primitives::U256;
14use async_broadcast::{Receiver, SendError, Sender};
15use async_lock::RwLock;
16use committable::{Commitment, Committable};
17use hotshot_task::dependency::{Dependency, EventDependency};
18use hotshot_types::{
19 consensus::OuterConsensus,
20 data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
21 drb::{DrbInput, DrbResult},
22 epoch_membership::EpochMembershipCoordinator,
23 event::{Event, EventType, LeafInfo},
24 message::{Proposal, UpgradeLock},
25 request_response::ProposalRequestPayload,
26 simple_certificate::{
27 DaCertificate2, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
28 QuorumCertificate2, UpgradeCertificate,
29 },
30 simple_vote::HasEpoch,
31 stake_table::StakeTableEntries,
32 traits::{
33 block_contents::BlockHeader,
34 election::Membership,
35 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
36 signature_key::{SignatureKey, StakeTableEntryType, StateSignatureKey},
37 storage::{load_drb_progress_fn, store_drb_progress_fn, Storage},
38 BlockPayload, ValidatedState,
39 },
40 utils::{
41 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
42 option_epoch_from_block_number, Terminator, View, ViewInner,
43 },
44 vote::{Certificate, HasViewNumber},
45};
46use hotshot_utils::anytrace::*;
47use time::OffsetDateTime;
48use tokio::time::timeout;
49use tracing::instrument;
50use vbs::version::StaticVersionType;
51
52use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
53
54#[instrument(skip_all)]
56#[allow(clippy::too_many_arguments)]
57pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
58 qc: &QuorumCertificate2<TYPES>,
59 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
60 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
61 membership_coordinator: EpochMembershipCoordinator<TYPES>,
62 consensus: OuterConsensus<TYPES>,
63 sender_public_key: TYPES::SignatureKey,
64 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
65 upgrade_lock: &UpgradeLock<TYPES, V>,
66 epoch_height: u64,
67) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
68 let view_number = qc.view_number();
69 let leaf_commit = qc.data.leaf_commit;
70 let signed_proposal_request = ProposalRequestPayload {
73 view_number,
74 key: sender_public_key,
75 };
76
77 let signature = TYPES::SignatureKey::sign(
79 &sender_private_key,
80 signed_proposal_request.commit().as_ref(),
81 )
82 .wrap()
83 .context(error!("Failed to sign proposal. This should never happen."))?;
84
85 tracing::info!("Sending proposal request for view {view_number}");
86
87 broadcast_event(
89 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
90 &event_sender,
91 )
92 .await;
93
94 let mut rx = event_receiver.clone();
95 let Ok(Some(proposal)) =
97 timeout(REQUEST_TIMEOUT, async move {
99 while let Ok(event) = rx.recv_direct().await {
101 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
102 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
103 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
104 return Some(quorum_proposal.clone());
105 }
106 }
107 }
108 None
109 })
110 .await
111 else {
112 bail!("Request for proposal failed");
113 };
114
115 let view_number = proposal.data.view_number();
116 let justify_qc = proposal.data.justify_qc().clone();
117
118 let justify_qc_epoch = justify_qc.data.epoch();
119
120 let epoch_membership = membership_coordinator
121 .stake_table_for_epoch(justify_qc_epoch)
122 .await?;
123 let membership_stake_table = epoch_membership.stake_table().await;
124 let membership_success_threshold = epoch_membership.success_threshold().await;
125
126 justify_qc
127 .is_valid_cert(
128 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
129 membership_success_threshold,
130 upgrade_lock,
131 )
132 .await
133 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
134
135 let mut consensus_writer = consensus.write().await;
136 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
137 let state = Arc::new(
138 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
139 );
140
141 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
142 tracing::trace!("{e:?}");
143 }
144 let view = View {
145 view_inner: ViewInner::Leaf {
146 leaf: leaf.commit(),
147 state,
148 delta: None,
149 epoch: leaf.epoch(epoch_height),
150 },
151 };
152 Ok((leaf, view))
153}
154pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
155 membership: &Arc<RwLock<TYPES::Membership>>,
156 epoch: TYPES::Epoch,
157 storage: &I::Storage,
158 consensus: &OuterConsensus<TYPES>,
159 drb_result: DrbResult,
160) {
161 let mut consensus_writer = consensus.write().await;
162 consensus_writer.drb_results.store_result(epoch, drb_result);
163 drop(consensus_writer);
164 tracing::debug!("Calling store_drb_result for epoch {epoch}");
165 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
166 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
167 }
168
169 membership.write().await.add_drb_result(epoch, drb_result)
170}
171
172async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
174 decided_leaf: &Leaf2<TYPES>,
175 epoch_height: u64,
176 membership: &Arc<RwLock<TYPES::Membership>>,
177 storage: &I::Storage,
178 consensus: &OuterConsensus<TYPES>,
179 upgrade_lock: &UpgradeLock<TYPES, V>,
180) {
181 let decided_block_number = decided_leaf.block_header().block_number();
182 let view_number = decided_leaf.view_number();
183
184 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
186 let next_epoch_number =
187 TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
188
189 let start = Instant::now();
190 if let Err(e) = storage
191 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
192 .await
193 {
194 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
195 }
196 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
197
198 let Ok(drb_seed_input_vec) = bincode::serialize(&decided_leaf.justify_qc().signatures)
199 else {
200 tracing::error!("Failed to serialize the QC signature.");
201 return;
202 };
203
204 let membership = membership.clone();
205 let decided_block_header = decided_leaf.block_header().clone();
206 let storage = storage.clone();
207 let store_drb_progress_fn = store_drb_progress_fn(storage.clone());
208 let load_drb_progress_fn = load_drb_progress_fn(storage.clone());
209 let consensus = consensus.clone();
210
211 let consensus_reader = consensus.read().await;
212 let difficulty_level = if upgrade_lock.upgraded_drb_and_header(view_number).await {
213 consensus_reader.drb_upgrade_difficulty
214 } else {
215 consensus_reader.drb_difficulty
216 };
217
218 drop(consensus_reader);
219
220 tokio::spawn(async move {
221 let membership_clone = membership.clone();
222 let epoch_root_future = tokio::spawn(async move {
223 let start = Instant::now();
224 if let Err(e) = Membership::add_epoch_root(
225 Arc::clone(&membership_clone),
226 next_epoch_number,
227 decided_block_header,
228 )
229 .await
230 {
231 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
232 }
233 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
234 });
235
236 let mut consensus_writer = consensus.write().await;
237 consensus_writer
238 .drb_results
239 .garbage_collect(next_epoch_number);
240 drop(consensus_writer);
241
242 let drb_result_future = tokio::spawn(async move {
243 let start = Instant::now();
244 let mut drb_seed_input = [0u8; 32];
245 let len = drb_seed_input_vec.len().min(32);
246 drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);
247
248 let drb_input = DrbInput {
249 epoch: *next_epoch_number,
250 iteration: 0,
251 value: drb_seed_input,
252 difficulty_level,
253 };
254
255 let drb_result = hotshot_types::drb::compute_drb_result(
256 drb_input,
257 store_drb_progress_fn,
258 load_drb_progress_fn,
259 )
260 .await;
261
262 tracing::info!("Time taken to calculate drb result: {:?}", start.elapsed());
263
264 drb_result
265 });
266
267 let (_, drb_result) = tokio::join!(epoch_root_future, drb_result_future);
268
269 let drb_result = match drb_result {
270 Ok(result) => result,
271 Err(e) => {
272 tracing::error!("Failed to compute DRB result: {e}");
273 return;
274 },
275 };
276
277 let start = Instant::now();
278 handle_drb_result::<TYPES, I>(
279 &membership,
280 next_epoch_number,
281 &storage,
282 &consensus,
283 drb_result,
284 )
285 .await;
286 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
287 });
288 }
289}
290
/// Result of walking the leaf chain behind a proposal, as produced by the
/// `decide_from_proposal*` functions in this file. Fields that the walk did not establish are
/// left as `None` (or empty, for `leaf_views`).
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view, if the walk established one.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The new decided view, if the walk reached a decide.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The quorum certificate associated with the newly decided chain.
    pub new_decide_qc: Option<QuorumCertificate2<TYPES>>,

    /// Information about each newly decided leaf, in the order the leaves were visited.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// Commitments of transactions found in the block payloads of decided leaves.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// An upgrade certificate carried by a decided leaf that was decided in time.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
312
313impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
317 fn default() -> Self {
319 Self {
320 new_locked_view_number: None,
321 new_decided_view_number: None,
322 new_decide_qc: None,
323 leaf_views: Vec::new(),
324 included_txns: None,
325 decided_upgrade_cert: None,
326 }
327 }
328}
329
330async fn update_metrics<TYPES: NodeType>(
331 consensus: &OuterConsensus<TYPES>,
332 leaf_views: &[LeafInfo<TYPES>],
333) {
334 let consensus_reader = consensus.read().await;
335 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
336
337 for leaf_view in leaf_views {
338 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
339
340 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
341 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
342 continue;
343 };
344 consensus_reader
345 .metrics
346 .proposal_to_decide_time
347 .add_point(proposal_to_decide_time as f64);
348 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
349 consensus_reader
350 .metrics
351 .finalized_bytes
352 .add_point(txn_bytes as f64);
353 }
354 }
355}
356
357#[allow(clippy::too_many_arguments)]
363pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
364 proposal: &QuorumProposalWrapper<TYPES>,
365 consensus: OuterConsensus<TYPES>,
366 existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
367 public_key: &TYPES::SignatureKey,
368 with_epochs: bool,
369 membership: &EpochMembershipCoordinator<TYPES>,
370 storage: &I::Storage,
371 upgrade_lock: &UpgradeLock<TYPES, V>,
372) -> LeafChainTraversalOutcome<TYPES> {
373 let mut res = LeafChainTraversalOutcome::default();
374 let consensus_reader = consensus.read().await;
375 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
376 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
377
378 let Some(parent_info) = consensus_reader
380 .parent_leaf_info(&proposed_leaf, public_key)
381 .await
382 else {
383 return res;
384 };
385 let Some(grand_parent_info) = consensus_reader
388 .parent_leaf_info(&parent_info.leaf, public_key)
389 .await
390 else {
391 return res;
392 };
393 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
394 return res;
395 }
396 res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone());
397 let decided_view_number = grand_parent_info.leaf.view_number();
398 res.new_decided_view_number = Some(decided_view_number);
399 let old_anchor_view = consensus_reader.last_decided_view();
401 let mut current_leaf_info = Some(grand_parent_info);
402 let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
403 let mut txns = HashSet::new();
404 while current_leaf_info
405 .as_ref()
406 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
407 {
408 let info = &mut current_leaf_info.unwrap();
410 if let Some(cert) = info.leaf.upgrade_certificate() {
412 if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
413 if cert.data.decide_by < decided_view_number {
414 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
415 } else {
416 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
417 res.decided_upgrade_cert = Some(cert.clone());
418 }
419 }
420 }
421
422 if let Some(payload) = consensus_reader
425 .saved_payloads()
426 .get(&info.leaf.view_number())
427 {
428 info.leaf
429 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
430 }
431
432 if let Some(ref payload) = info.leaf.block_payload() {
433 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
434 txns.insert(txn);
435 }
436 }
437
438 current_leaf_info = consensus_reader
439 .parent_leaf_info(&info.leaf, public_key)
440 .await;
441 res.leaf_views.push(info.clone());
442 }
443
444 if !txns.is_empty() {
445 res.included_txns = Some(txns);
446 }
447
448 if with_epochs && res.new_decided_view_number.is_some() {
449 let Some(first_leaf) = res.leaf_views.first() else {
450 return res;
451 };
452 let epoch_height = consensus_reader.epoch_height;
453 consensus_reader
454 .metrics
455 .last_synced_block_height
456 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
457 drop(consensus_reader);
458
459 for decided_leaf_info in &res.leaf_views {
460 decide_epoch_root::<TYPES, I, V>(
461 &decided_leaf_info.leaf,
462 epoch_height,
463 membership.membership(),
464 storage,
465 &consensus,
466 upgrade_lock,
467 )
468 .await;
469 }
470 update_metrics(&consensus, &res.leaf_views).await;
471 }
472
473 res
474}
475
476#[allow(clippy::too_many_arguments)]
508pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
509 proposal: &QuorumProposalWrapper<TYPES>,
510 consensus: OuterConsensus<TYPES>,
511 existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
512 public_key: &TYPES::SignatureKey,
513 with_epochs: bool,
514 membership: &Arc<RwLock<TYPES::Membership>>,
515 storage: &I::Storage,
516 epoch_height: u64,
517 upgrade_lock: &UpgradeLock<TYPES, V>,
518) -> LeafChainTraversalOutcome<TYPES> {
519 let consensus_reader = consensus.read().await;
520 let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
521 let view_number = proposal.view_number();
522 let parent_view_number = proposal.justify_qc().view_number();
523 let old_anchor_view = consensus_reader.last_decided_view();
524
525 let mut last_view_number_visited = view_number;
526 let mut current_chain_length = 0usize;
527 let mut res = LeafChainTraversalOutcome::default();
528
529 if let Err(e) = consensus_reader.visit_leaf_ancestors(
530 parent_view_number,
531 Terminator::Exclusive(old_anchor_view),
532 true,
533 |leaf, state, delta| {
534 if res.new_decided_view_number.is_none() {
536 if last_view_number_visited == leaf.view_number() + 1 {
538 last_view_number_visited = leaf.view_number();
539
540 current_chain_length += 1;
542
543 if current_chain_length == 2 {
545 res.new_locked_view_number = Some(leaf.view_number());
546 res.new_decide_qc = Some(leaf.justify_qc().clone());
549 } else if current_chain_length == 3 {
550 res.new_decided_view_number = Some(leaf.view_number());
552 }
553 } else {
554 return false;
557 }
558 }
559
560 if let Some(new_decided_view) = res.new_decided_view_number {
562 let mut leaf = leaf.clone();
564
565 if leaf.view_number() == new_decided_view {
567 consensus_reader
568 .metrics
569 .last_synced_block_height
570 .set(usize::try_from(leaf.height()).unwrap_or(0));
571 }
572
573 if let Some(cert) = leaf.upgrade_certificate() {
575 if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
576 if cert.data.decide_by < view_number {
577 tracing::warn!(
578 "Failed to decide an upgrade certificate in time. Ignoring."
579 );
580 } else {
581 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
582 res.decided_upgrade_cert = Some(cert.clone());
583 }
584 }
585 }
586 if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
589 leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
590 }
591
592 let vid_share = consensus_reader
595 .vid_shares()
596 .get(&leaf.view_number())
597 .and_then(|key_map| key_map.get(public_key))
598 .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
599 .map(|prop| prop.data.clone());
600
601 let state_cert = if leaf.with_epoch
602 && is_epoch_root(
603 leaf.block_header().block_number(),
604 consensus_reader.epoch_height,
605 ) {
606 match consensus_reader.state_cert() {
607 Some(state_cert)
609 if state_cert.light_client_state.view_number
610 == leaf.view_number().u64() =>
611 {
612 Some(state_cert.clone())
613 },
614 _ => None,
615 }
616 } else {
617 None
618 };
619
620 res.leaf_views.push(LeafInfo::new(
622 leaf.clone(),
623 Arc::clone(&state),
624 delta.clone(),
625 vid_share,
626 state_cert,
627 ));
628 if let Some(ref payload) = leaf.block_payload() {
629 res.included_txns = Some(
630 payload
631 .transaction_commitments(leaf.block_header().metadata())
632 .into_iter()
633 .collect::<HashSet<_>>(),
634 );
635 }
636 }
637 true
638 },
639 ) {
640 tracing::debug!("Leaf ascension failed; error={e}");
641 }
642
643 let epoch_height = consensus_reader.epoch_height;
644 drop(consensus_reader);
645
646 if with_epochs && res.new_decided_view_number.is_some() {
647 for decided_leaf_info in &res.leaf_views {
648 decide_epoch_root::<TYPES, I, V>(
649 &decided_leaf_info.leaf,
650 epoch_height,
651 membership,
652 storage,
653 &consensus,
654 upgrade_lock,
655 )
656 .await;
657 }
658 }
659
660 res
661}
662
663#[instrument(skip_all)]
665#[allow(clippy::too_many_arguments)]
666pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
667 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
668 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
669 membership: EpochMembershipCoordinator<TYPES>,
670 public_key: TYPES::SignatureKey,
671 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
672 consensus: OuterConsensus<TYPES>,
673 upgrade_lock: &UpgradeLock<TYPES, V>,
674 parent_qc: &QuorumCertificate2<TYPES>,
675 epoch_height: u64,
676) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
677 let consensus_reader = consensus.read().await;
678 let vsm_contains_parent_view = consensus_reader
679 .validated_state_map()
680 .contains_key(&parent_qc.view_number());
681 drop(consensus_reader);
682
683 if !vsm_contains_parent_view {
684 let _ = fetch_proposal(
685 parent_qc,
686 event_sender.clone(),
687 event_receiver.clone(),
688 membership,
689 consensus.clone(),
690 public_key.clone(),
691 private_key.clone(),
692 upgrade_lock,
693 epoch_height,
694 )
695 .await
696 .context(info!("Failed to fetch proposal"))?;
697 }
698
699 let consensus_reader = consensus.read().await;
700 let parent_view = consensus_reader.validated_state_map().get(&parent_qc.view_number()).context(
701 debug!("Couldn't find parent view in state map, waiting for replica to see proposal; parent_view_number: {}", *parent_qc.view_number())
702 )?;
703
704 let (leaf_commitment, state) = parent_view.leaf_and_state().context(
705 info!("Parent of high QC points to a view without a proposal; parent_view_number: {}, parent_view {:?}", *parent_qc.view_number(), parent_view)
706 )?;
707
708 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
709 tracing::debug!(
711 "They don't equal: {:?} {:?}",
712 leaf_commitment,
713 consensus_reader.high_qc().data().leaf_commit
714 );
715 }
716
717 let leaf = consensus_reader
718 .saved_leaves()
719 .get(&leaf_commitment)
720 .context(info!("Failed to find high QC of parent"))?;
721
722 Ok((leaf.clone(), Arc::clone(state)))
723}
724
725pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
726 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
727 validation_info: &ValidationInfo<TYPES, I, V>,
728) -> Result<()> {
729 let in_transition_epoch = proposal
730 .data
731 .justify_qc()
732 .data
733 .block_number
734 .is_some_and(|bn| {
735 !is_transition_block(bn, validation_info.epoch_height)
736 && is_epoch_transition(bn, validation_info.epoch_height)
737 && bn % validation_info.epoch_height != 0
738 });
739 let justify_qc = proposal.data.justify_qc();
740 let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
741 if !in_transition_epoch {
742 tracing::debug!(
743 "Storing high QC for view {:?} and height {:?}",
744 justify_qc.view_number(),
745 justify_qc.data.block_number
746 );
747 if let Err(e) = validation_info
748 .storage
749 .update_high_qc2(justify_qc.clone())
750 .await
751 {
752 bail!("Failed to store High QC, not voting; error = {e:?}");
753 }
754 if justify_qc
755 .data
756 .block_number
757 .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
758 {
759 let Some(state_cert) = proposal.data.state_cert() else {
760 bail!("Epoch root QC has no state cert, not voting!");
761 };
762 if let Err(e) = validation_info
763 .storage
764 .update_state_cert(state_cert.clone())
765 .await
766 {
767 bail!("Failed to store the light client state update certificate, not voting; error = {:?}", e);
768 }
769 validation_info
770 .consensus
771 .write()
772 .await
773 .update_state_cert(state_cert.clone())?;
774 }
775 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
776 if let Err(e) = validation_info
777 .storage
778 .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
779 .await
780 {
781 bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
782 }
783 }
784 }
785 let mut consensus_writer = validation_info.consensus.write().await;
786 if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
787 if justify_qc
788 .data
789 .block_number
790 .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
791 {
792 consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
793 consensus_writer
794 .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
795 return Ok(());
796 }
797 consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
798 }
799 consensus_writer.update_high_qc(justify_qc.clone())?;
800
801 Ok(())
802}
803
804async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
805 validation_info: &ValidationInfo<TYPES, I, V>,
806) -> Option<(
807 QuorumCertificate2<TYPES>,
808 NextEpochQuorumCertificate2<TYPES>,
809)> {
810 validation_info
811 .consensus
812 .read()
813 .await
814 .transition_qc()
815 .cloned()
816}
817
818pub(crate) async fn validate_epoch_transition_qc<
819 TYPES: NodeType,
820 I: NodeImplementation<TYPES>,
821 V: Versions,
822>(
823 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
824 validation_info: &ValidationInfo<TYPES, I, V>,
825) -> Result<()> {
826 let proposed_qc = proposal.data.justify_qc();
827 let Some(qc_block_number) = proposed_qc.data().block_number else {
828 bail!("Justify QC has no block number");
829 };
830 if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
831 || qc_block_number % validation_info.epoch_height == 0
832 {
833 return Ok(());
834 }
835 let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
836 bail!("Next epoch justify QC is not present");
837 };
838 ensure!(
839 next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
840 "Next epoch QC has different leaf commit to justify QC"
841 );
842
843 if is_transition_block(qc_block_number, validation_info.epoch_height) {
844 ensure!(
846 transition_qc(validation_info).await.is_none_or(
847 |(qc, _)| qc.view_number() <= proposed_qc.view_number()
848 ),
849 "Proposed transition qc must have view number greater than or equal to previous transition QC"
850 );
851
852 validation_info
853 .consensus
854 .write()
855 .await
856 .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
857 update_high_qc(proposal, validation_info).await?;
859 } else {
860 ensure!(
862 transition_qc(validation_info)
863 .await
864 .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
865 "Transition block must have view number greater than previous transition QC"
866 );
867 ensure!(
868 proposal.data.view_change_evidence().is_none(),
869 "Second to last block and last block of epoch must directly extend previous block, Qc Block number: {qc_block_number}, Proposal Block number: {}",
870 proposal.data.block_header().block_number()
871 );
872 ensure!(
873 proposed_qc.view_number() + 1 == proposal.data.view_number()
874 || transition_qc(validation_info).await.is_some_and(|(qc, _)| &qc == proposed_qc),
875 "Transition proposals must extend the previous view directly, or extend the previous transition block"
876 );
877 }
878 Ok(())
879}
880
881#[allow(clippy::too_many_lines)]
888#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
889pub async fn validate_proposal_safety_and_liveness<
890 TYPES: NodeType,
891 I: NodeImplementation<TYPES>,
892 V: Versions,
893>(
894 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
895 parent_leaf: Leaf2<TYPES>,
896 validation_info: &ValidationInfo<TYPES, I, V>,
897 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
898 sender: TYPES::SignatureKey,
899) -> Result<()> {
900 let view_number = proposal.data.view_number();
901
902 let mut valid_epoch_transition = false;
903 if validation_info
904 .upgrade_lock
905 .version(proposal.data.justify_qc().view_number())
906 .await
907 .is_ok_and(|v| v >= V::Epochs::VERSION)
908 {
909 let Some(block_number) = proposal.data.justify_qc().data.block_number else {
910 bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
911 };
912 if is_epoch_transition(block_number, validation_info.epoch_height) {
913 validate_epoch_transition_qc(&proposal, validation_info).await?;
914 valid_epoch_transition = true;
915 }
916 }
917
918 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
919 ensure!(
920 proposed_leaf.parent_commitment() == parent_leaf.commit(),
921 "Proposed leaf does not extend the parent leaf."
922 );
923 let proposal_epoch = option_epoch_from_block_number::<TYPES>(
924 validation_info
925 .upgrade_lock
926 .epochs_enabled(view_number)
927 .await,
928 proposed_leaf.height(),
929 validation_info.epoch_height,
930 );
931
932 let state = Arc::new(
933 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
934 );
935
936 {
937 let mut consensus_writer = validation_info.consensus.write().await;
938 if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
939 tracing::trace!("{e:?}");
940 }
941
942 if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
945 tracing::debug!("Internal proposal update failed; error = {e:#}");
946 };
947 }
948
949 UpgradeCertificate::validate(
950 proposal.data.upgrade_certificate(),
951 &validation_info.membership,
952 proposal_epoch,
953 &validation_info.upgrade_lock,
954 )
955 .await?;
956
957 proposed_leaf
959 .extends_upgrade(
960 &parent_leaf,
961 &validation_info.upgrade_lock.decided_upgrade_certificate,
962 )
963 .await?;
964
965 let justify_qc = proposal.data.justify_qc().clone();
966 {
970 let consensus_reader = validation_info.consensus.read().await;
971 let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
976 validation_info
977 .upgrade_lock
978 .epochs_enabled(view_number)
979 .await,
980 parent_leaf.height(),
981 validation_info.epoch_height,
982 );
983 ensure!(
984 proposal_epoch == justify_qc_epoch
985 || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
986 {
987 error!("Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify QC leaf is {parent_leaf:?}")
988 }
989 );
990
991 if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
993 && validation_info
994 .upgrade_lock
995 .epochs_enabled(view_number)
996 .await
997 {
998 ensure!(proposal.data.next_epoch_justify_qc().is_some(),
999 "Epoch transition proposal does not include the next epoch justify QC. Do not vote!");
1000 }
1001
1002 let liveness_check =
1004 justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;
1005
1006 let outcome = consensus_reader.visit_leaf_ancestors(
1009 justify_qc.view_number(),
1010 Terminator::Inclusive(consensus_reader.locked_view()),
1011 false,
1012 |leaf, _, _| {
1013 leaf.view_number() != consensus_reader.locked_view()
1016 },
1017 );
1018 let safety_check = outcome.is_ok();
1019
1020 ensure!(safety_check || liveness_check, {
1021 if let Err(e) = outcome {
1022 broadcast_event(
1023 Event {
1024 view_number,
1025 event: EventType::Error { error: Arc::new(e) },
1026 },
1027 &validation_info.output_event_stream,
1028 )
1029 .await;
1030 }
1031
1032 error!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", consensus_reader.high_qc(), proposal.data, consensus_reader.locked_view())
1033 });
1034 }
1035
1036 broadcast_event(
1038 Event {
1039 view_number,
1040 event: EventType::QuorumProposal {
1041 proposal: proposal.clone(),
1042 sender,
1043 },
1044 },
1045 &validation_info.output_event_stream,
1046 )
1047 .await;
1048
1049 broadcast_event(
1051 Arc::new(HotShotEvent::QuorumProposalValidated(
1052 proposal.clone(),
1053 parent_leaf,
1054 )),
1055 &event_stream,
1056 )
1057 .await;
1058
1059 Ok(())
1060}
1061
1062pub(crate) async fn validate_proposal_view_and_certs<
1069 TYPES: NodeType,
1070 I: NodeImplementation<TYPES>,
1071 V: Versions,
1072>(
1073 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1074 validation_info: &ValidationInfo<TYPES, I, V>,
1075) -> Result<()> {
1076 let view_number = proposal.data.view_number();
1077 ensure!(
1078 view_number >= validation_info.consensus.read().await.cur_view(),
1079 "Proposal is from an older view {:?}",
1080 proposal.data
1081 );
1082
1083 let mut membership = validation_info.membership.clone();
1085 proposal.validate_signature(&membership).await?;
1086
1087 if proposal.data.justify_qc().view_number() != view_number - 1 {
1089 let received_proposal_cert =
1090 proposal.data.view_change_evidence().clone().context(debug!(
1091 "Quorum proposal for view {view_number} needed a timeout or view sync certificate, but did not have one",
1092 ))?;
1093
1094 match received_proposal_cert {
1095 ViewChangeEvidence2::Timeout(timeout_cert) => {
1096 ensure!(
1097 timeout_cert.data().view == view_number - 1,
1098 "Timeout certificate for view {view_number} was not for the immediately preceding view"
1099 );
1100 let timeout_cert_epoch = timeout_cert.data().epoch();
1101 membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1102
1103 let membership_stake_table = membership.stake_table().await;
1104 let membership_success_threshold = membership.success_threshold().await;
1105
1106 timeout_cert
1107 .is_valid_cert(
1108 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1109 membership_success_threshold,
1110 &validation_info.upgrade_lock,
1111 )
1112 .await
1113 .context(|e| {
1114 warn!("Timeout certificate for view {view_number} was invalid: {e}")
1115 })?;
1116 },
1117 ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1118 ensure!(
1119 view_sync_cert.view_number == view_number,
1120 "View sync cert view number {:?} does not match proposal view number {:?}",
1121 view_sync_cert.view_number,
1122 view_number
1123 );
1124
1125 let view_sync_cert_epoch = view_sync_cert.data().epoch();
1126 membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1127
1128 let membership_stake_table = membership.stake_table().await;
1129 let membership_success_threshold = membership.success_threshold().await;
1130
1131 view_sync_cert
1133 .is_valid_cert(
1134 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1135 membership_success_threshold,
1136 &validation_info.upgrade_lock,
1137 )
1138 .await
1139 .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1140 },
1141 }
1142 }
1143
1144 {
1147 let epoch = option_epoch_from_block_number::<TYPES>(
1148 proposal.data.epoch().is_some(),
1149 proposal.data.block_header().block_number(),
1150 validation_info.epoch_height,
1151 );
1152 UpgradeCertificate::validate(
1153 proposal.data.upgrade_certificate(),
1154 &validation_info.membership,
1155 epoch,
1156 &validation_info.upgrade_lock,
1157 )
1158 .await?;
1159 }
1160
1161 Ok(())
1162}
1163
1164pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1166 match sender.broadcast_direct(event).await {
1167 Ok(None) => (),
1168 Ok(Some(overflowed)) => {
1169 tracing::error!(
1170 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1171 );
1172 },
1173 Err(SendError(e)) => {
1174 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1175 },
1176 }
1177}
1178
1179pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
1182 qc: &QuorumCertificate2<TYPES>,
1183 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1184 consensus: &OuterConsensus<TYPES>,
1185 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1186 upgrade_lock: &UpgradeLock<TYPES, V>,
1187 epoch_height: u64,
1188) -> Result<()> {
1189 let mut epoch_membership = membership_coordinator
1190 .stake_table_for_epoch(qc.data.epoch)
1191 .await?;
1192
1193 let membership_stake_table = epoch_membership.stake_table().await;
1194 let membership_success_threshold = epoch_membership.success_threshold().await;
1195
1196 if let Err(e) = qc
1197 .is_valid_cert(
1198 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1199 membership_success_threshold,
1200 upgrade_lock,
1201 )
1202 .await
1203 {
1204 consensus.read().await.metrics.invalid_qc.update(1);
1205 return Err(warn!("Invalid certificate: {e}"));
1206 }
1207
1208 if upgrade_lock.epochs_enabled(qc.view_number()).await {
1209 ensure!(
1210 qc.data.block_number.is_some(),
1211 "QC for epoch {:?} has no block number",
1212 qc.data.epoch
1213 );
1214 }
1215
1216 if qc
1217 .data
1218 .block_number
1219 .is_some_and(|b| is_epoch_transition(b, epoch_height))
1220 {
1221 ensure!(
1222 maybe_next_epoch_qc.is_some(),
1223 error!("Received High QC for the transition block but not the next epoch QC")
1224 );
1225 }
1226
1227 if let Some(next_epoch_qc) = maybe_next_epoch_qc {
1228 if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
1230 bail!("Next epoch qc exists but it's not equal with qc.");
1231 }
1232 epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1233 let membership_next_stake_table = epoch_membership.stake_table().await;
1234 let membership_next_success_threshold = epoch_membership.success_threshold().await;
1235
1236 next_epoch_qc
1238 .is_valid_cert(
1239 &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1240 membership_next_success_threshold,
1241 upgrade_lock,
1242 )
1243 .await
1244 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1245 }
1246 Ok(())
1247}
1248
1249pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
1251 state_cert: &LightClientStateUpdateCertificate<TYPES>,
1252 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1253) -> Result<()> {
1254 tracing::debug!("Validating light client state update certificate");
1255
1256 let epoch_membership = membership_coordinator
1257 .membership_for_epoch(state_cert.epoch())
1258 .await?;
1259
1260 let membership_stake_table = epoch_membership.stake_table().await;
1261 let membership_success_threshold = epoch_membership.success_threshold().await;
1262
1263 let mut state_key_map = HashMap::new();
1264 membership_stake_table.into_iter().for_each(|config| {
1265 state_key_map.insert(
1266 config.state_ver_key.clone(),
1267 config.stake_table_entry.stake(),
1268 );
1269 });
1270
1271 let mut accumulated_stake = U256::from(0);
1272 for (key, sig) in state_cert.signatures.iter() {
1273 if let Some(stake) = state_key_map.get(key) {
1274 accumulated_stake += *stake;
1275 if !key.verify_state_sig(
1276 sig,
1277 &state_cert.light_client_state,
1278 &state_cert.next_stake_table_state,
1279 ) {
1280 bail!("Invalid light client state update certificate signature");
1281 }
1282 } else {
1283 bail!("Invalid light client state update certificate signature");
1284 }
1285 }
1286 if accumulated_stake < membership_success_threshold {
1287 bail!("Light client state update certificate does not meet the success threshold");
1288 }
1289
1290 Ok(())
1291}
1292
1293pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1294 qc: &QuorumCertificate2<TYPES>,
1295 state_cert: &LightClientStateUpdateCertificate<TYPES>,
1296 epoch_height: u64,
1297) -> bool {
1298 qc.data
1299 .block_number
1300 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1301 && Some(state_cert.epoch) == qc.data.epoch()
1302 && qc.view_number().u64() == state_cert.light_client_state.view_number
1303}
1304
1305pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1309 target_epoch: Option<TYPES::Epoch>,
1310 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1311 da_cert: &DaCertificate2<TYPES>,
1312 consensus: &OuterConsensus<TYPES>,
1313 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1314 cancel_receiver: Receiver<()>,
1315 id: u64,
1316) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1317 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1318 let maybe_second_vid_share = consensus
1319 .read()
1320 .await
1321 .vid_shares()
1322 .get(&vid_share.data.view_number())
1323 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1324 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1325 .cloned();
1326 if let Some(second_vid_share) = maybe_second_vid_share {
1327 if (target_epoch == da_cert.epoch()
1328 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1329 || (target_epoch != da_cert.epoch()
1330 && Some(second_vid_share.data.payload_commitment())
1331 == da_cert.data().next_epoch_payload_commit)
1332 {
1333 return Ok(second_vid_share);
1334 }
1335 }
1336
1337 let receiver = receiver.clone();
1338 let da_cert_clone = da_cert.clone();
1339 let Some(event) = EventDependency::new(
1340 receiver,
1341 cancel_receiver,
1342 format!(
1343 "VoteDependency Second VID share for view {:?}, my id {:?}",
1344 vid_share.data.view_number(),
1345 id
1346 ),
1347 Box::new(move |event| {
1348 let event = event.as_ref();
1349 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1350 if target_epoch == da_cert_clone.epoch() {
1351 second_vid_share.data.payload_commitment()
1352 == da_cert_clone.data().payload_commit
1353 } else {
1354 Some(second_vid_share.data.payload_commitment())
1355 == da_cert_clone.data().next_epoch_payload_commit
1356 }
1357 } else {
1358 false
1359 }
1360 }),
1361 )
1362 .completed()
1363 .await
1364 else {
1365 return Err(warn!("Error while waiting for the second VID share."));
1366 };
1367 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1368 return Err(warn!("Received event is not VidShareValidated but we checked it earlier. Shouldn't be possible."));
1370 };
1371 Ok(second_vid_share.clone())
1372}
1373
1374pub async fn broadcast_view_change<TYPES: NodeType>(
1375 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1376 new_view_number: TYPES::View,
1377 epoch: Option<TYPES::Epoch>,
1378 first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1379) {
1380 let mut broadcast_epoch = epoch;
1381 if let Some((first_epoch_view, first_epoch)) = first_epoch {
1382 if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1383 broadcast_epoch = Some(first_epoch);
1384 }
1385 }
1386 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1387 broadcast_event(
1388 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1389 sender,
1390 )
1391 .await
1392}