1use std::{
10 collections::{BTreeMap, HashMap},
11 mem::ManuallyDrop,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
17use committable::{Commitment, Committable};
18use hotshot_utils::anytrace::*;
19use tracing::instrument;
20use vec1::Vec1;
21
22pub use crate::utils::{View, ViewInner};
23use crate::{
24 data::{
25 Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseAndDuration,
26 VidDisperseShare,
27 },
28 drb::DrbResults,
29 epoch_membership::EpochMembershipCoordinator,
30 error::HotShotError,
31 event::{HotShotAction, LeafInfo},
32 message::{Proposal, UpgradeLock},
33 simple_certificate::{
34 DaCertificate2, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
35 QuorumCertificate2,
36 },
37 simple_vote::HasEpoch,
38 traits::{
39 block_contents::{BlockHeader, BuilderFee},
40 metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
41 node_implementation::{ConsensusTime, NodeType, Versions},
42 signature_key::SignatureKey,
43 BlockPayload, ValidatedState,
44 },
45 utils::{
46 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_ge_epoch_root,
47 is_last_block, is_transition_block, option_epoch_from_block_number, BuilderCommitment,
48 LeafCommitment, StateAndDelta, Terminator,
49 },
50 vote::{Certificate, HasViewNumber},
51};
52
53pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
55
56pub type VidShares<TYPES> = BTreeMap<
58 <TYPES as NodeType>::View,
59 HashMap<
60 <TYPES as NodeType>::SignatureKey,
61 BTreeMap<Option<<TYPES as NodeType>::Epoch>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
62 >,
63>;
64
65pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
67
68#[derive(Clone, Debug)]
70pub struct OuterConsensus<TYPES: NodeType> {
71 pub inner_consensus: LockedConsensusState<TYPES>,
73}
74
75impl<TYPES: NodeType> OuterConsensus<TYPES> {
76 pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
78 Self {
79 inner_consensus: consensus,
80 }
81 }
82
83 #[instrument(skip_all, target = "OuterConsensus")]
85 pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
86 tracing::trace!("Trying to acquire read lock on consensus");
87 let ret = self.inner_consensus.read().await;
88 tracing::trace!("Acquired read lock on consensus");
89 ConsensusReadLockGuard::new(ret)
90 }
91
92 #[instrument(skip_all, target = "OuterConsensus")]
94 pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
95 tracing::trace!("Trying to acquire write lock on consensus");
96 let ret = self.inner_consensus.write().await;
97 tracing::trace!("Acquired write lock on consensus");
98 ConsensusWriteLockGuard::new(ret)
99 }
100
101 #[instrument(skip_all, target = "OuterConsensus")]
103 pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
104 tracing::trace!("Trying to acquire write lock on consensus");
105 let ret = self.inner_consensus.try_write();
106 if let Some(guard) = ret {
107 tracing::trace!("Acquired write lock on consensus");
108 Some(ConsensusWriteLockGuard::new(guard))
109 } else {
110 tracing::trace!("Failed to acquire write lock");
111 None
112 }
113 }
114
115 #[instrument(skip_all, target = "OuterConsensus")]
117 pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
118 tracing::trace!("Trying to acquire upgradable read lock on consensus");
119 let ret = self.inner_consensus.upgradable_read().await;
120 tracing::trace!("Acquired upgradable read lock on consensus");
121 ConsensusUpgradableReadLockGuard::new(ret)
122 }
123
124 #[instrument(skip_all, target = "OuterConsensus")]
126 pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
127 tracing::trace!("Trying to acquire read lock on consensus");
128 let ret = self.inner_consensus.try_read();
129 if let Some(guard) = ret {
130 tracing::trace!("Acquired read lock on consensus");
131 Some(ConsensusReadLockGuard::new(guard))
132 } else {
133 tracing::trace!("Failed to acquire read lock");
134 None
135 }
136 }
137}
138
139pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
141 lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
143}
144
145impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
146 #[must_use]
148 pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
149 Self { lock_guard }
150 }
151}
152
153impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
154 type Target = Consensus<TYPES>;
155 fn deref(&self) -> &Self::Target {
156 &self.lock_guard
157 }
158}
159
160impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
161 #[instrument(skip_all, target = "ConsensusReadLockGuard")]
162 fn drop(&mut self) {
163 tracing::trace!("Read lock on consensus dropped");
164 }
165}
166
167pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
169 lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
171}
172
173impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
174 #[must_use]
176 pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
177 Self { lock_guard }
178 }
179}
180
181impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
182 type Target = Consensus<TYPES>;
183 fn deref(&self) -> &Self::Target {
184 &self.lock_guard
185 }
186}
187
188impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
189 fn deref_mut(&mut self) -> &mut Self::Target {
190 &mut self.lock_guard
191 }
192}
193
194impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
195 #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
196 fn drop(&mut self) {
197 tracing::debug!("Write lock on consensus dropped");
198 }
199}
200
201pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
203 lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
205 taken: bool,
207}
208
209impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
210 #[must_use]
212 pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
213 Self {
214 lock_guard: ManuallyDrop::new(lock_guard),
215 taken: false,
216 }
217 }
218
219 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
221 pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
222 let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
223 guard.taken = true;
224 tracing::debug!("Trying to upgrade upgradable read lock on consensus");
225 let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
226 tracing::debug!("Upgraded upgradable read lock on consensus");
227 ConsensusWriteLockGuard::new(ret)
228 }
229}
230
231impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
232 type Target = Consensus<TYPES>;
233
234 fn deref(&self) -> &Self::Target {
235 &self.lock_guard
236 }
237}
238
239impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
240 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
241 fn drop(&mut self) {
242 if !self.taken {
243 unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
244 tracing::debug!("Upgradable read lock on consensus dropped");
245 }
246 }
247}
248
249#[derive(Debug, Clone, Copy)]
251struct HotShotActionViews<T: ConsensusTime> {
252 proposed: T,
254 voted: T,
256 da_proposed: T,
258 da_vote: T,
260}
261
262impl<T: ConsensusTime> Default for HotShotActionViews<T> {
263 fn default() -> Self {
264 let genesis = T::genesis();
265 Self {
266 proposed: genesis,
267 voted: genesis,
268 da_proposed: genesis,
269 da_vote: genesis,
270 }
271 }
272}
273impl<T: ConsensusTime> HotShotActionViews<T> {
274 fn from_view(view: T) -> Self {
276 Self {
277 proposed: view,
278 voted: view,
279 da_proposed: view,
280 da_vote: view,
281 }
282 }
283}
284#[derive(derive_more::Debug, Clone)]
288pub struct Consensus<TYPES: NodeType> {
289 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
291
292 vid_shares: VidShares<TYPES>,
294
295 saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,
298
299 cur_view: TYPES::View,
301
302 cur_epoch: Option<TYPES::Epoch>,
304
305 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
308
309 last_decided_view: TYPES::View,
311
312 locked_view: TYPES::View,
314
315 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
319
320 last_actions: HotShotActionViews<TYPES::View>,
324
325 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
329
330 high_qc: QuorumCertificate2<TYPES>,
332
333 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
335
336 pub metrics: Arc<ConsensusMetricsValue>,
338
339 pub epoch_height: u64,
341
342 pub drb_difficulty: u64,
344
345 pub drb_upgrade_difficulty: u64,
347
348 pub drb_results: DrbResults<TYPES>,
350
351 transition_qc: Option<(
353 QuorumCertificate2<TYPES>,
354 NextEpochQuorumCertificate2<TYPES>,
355 )>,
356
357 pub highest_block: u64,
359 pub state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
361}
362
363#[derive(Debug, Clone, Hash, Eq, PartialEq)]
365pub struct PayloadWithMetadata<TYPES: NodeType> {
366 pub payload: TYPES::BlockPayload,
367 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
368}
369
370#[derive(Clone, Debug)]
372pub struct ConsensusMetricsValue {
373 pub last_synced_block_height: Box<dyn Gauge>,
375 pub last_decided_view: Box<dyn Gauge>,
377 pub last_voted_view: Box<dyn Gauge>,
379 pub last_decided_time: Box<dyn Gauge>,
381 pub current_view: Box<dyn Gauge>,
383 pub number_of_views_since_last_decide: Box<dyn Gauge>,
385 pub number_of_views_per_decide_event: Box<dyn Histogram>,
387 pub view_duration_as_leader: Box<dyn Histogram>,
389 pub invalid_qc: Box<dyn Gauge>,
391 pub outstanding_transactions: Box<dyn Gauge>,
393 pub outstanding_transactions_memory_size: Box<dyn Gauge>,
395 pub number_of_timeouts: Box<dyn Counter>,
397 pub number_of_timeouts_as_leader: Box<dyn Counter>,
399 pub number_of_empty_blocks_proposed: Box<dyn Counter>,
401 pub internal_event_queue_len: Box<dyn Gauge>,
403 pub proposal_to_decide_time: Box<dyn Histogram>,
405 pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
407 pub finalized_bytes: Box<dyn Histogram>,
409 pub validate_and_apply_header_duration: Box<dyn Histogram>,
411 pub update_leaf_duration: Box<dyn Histogram>,
413 pub vid_disperse_duration: Box<dyn Histogram>,
415}
416
417impl ConsensusMetricsValue {
418 #[must_use]
420 pub fn new(metrics: &dyn Metrics) -> Self {
421 Self {
422 last_synced_block_height: metrics
423 .create_gauge(String::from("last_synced_block_height"), None),
424 last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
425 last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
426 last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
427 current_view: metrics.create_gauge(String::from("current_view"), None),
428 number_of_views_since_last_decide: metrics
429 .create_gauge(String::from("number_of_views_since_last_decide"), None),
430 number_of_views_per_decide_event: metrics
431 .create_histogram(String::from("number_of_views_per_decide_event"), None),
432 view_duration_as_leader: metrics
433 .create_histogram(String::from("view_duration_as_leader"), None),
434 invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
435 outstanding_transactions: metrics
436 .create_gauge(String::from("outstanding_transactions"), None),
437 outstanding_transactions_memory_size: metrics
438 .create_gauge(String::from("outstanding_transactions_memory_size"), None),
439 number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
440 number_of_timeouts_as_leader: metrics
441 .create_counter(String::from("number_of_timeouts_as_leader"), None),
442 number_of_empty_blocks_proposed: metrics
443 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
444 internal_event_queue_len: metrics
445 .create_gauge(String::from("internal_event_queue_len"), None),
446 proposal_to_decide_time: metrics
447 .create_histogram(String::from("proposal_to_decide_time"), None),
448 previous_proposal_to_proposal_time: metrics
449 .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
450 finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
451 validate_and_apply_header_duration: metrics.create_histogram(
452 String::from("validate_and_apply_header_duration"),
453 Some("seconds".to_string()),
454 ),
455 update_leaf_duration: metrics.create_histogram(
456 String::from("update_leaf_duration"),
457 Some("seconds".to_string()),
458 ),
459 vid_disperse_duration: metrics.create_histogram(
460 String::from("vid_disperse_duration"),
461 Some("seconds".to_string()),
462 ),
463 }
464 }
465}
466
467impl Default for ConsensusMetricsValue {
468 fn default() -> Self {
469 Self::new(&*NoMetrics::boxed())
470 }
471}
472
473impl<TYPES: NodeType> Consensus<TYPES> {
474 #[allow(clippy::too_many_arguments)]
476 pub fn new(
477 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
478 vid_shares: Option<VidShares<TYPES>>,
479 cur_view: TYPES::View,
480 cur_epoch: Option<TYPES::Epoch>,
481 locked_view: TYPES::View,
482 last_decided_view: TYPES::View,
483 last_actioned_view: TYPES::View,
484 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
485 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
486 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
487 high_qc: QuorumCertificate2<TYPES>,
488 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
489 metrics: Arc<ConsensusMetricsValue>,
490 epoch_height: u64,
491 state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
492 drb_difficulty: u64,
493 drb_upgrade_difficulty: u64,
494 ) -> Self {
495 let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
496 if high_qc
497 .data
498 .block_number
499 .is_some_and(|bn| is_transition_block(bn, epoch_height))
500 {
501 if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
502 Some((high_qc.clone(), next_epoch_high_qc.clone()))
503 } else {
504 tracing::error!("Next epoch high QC has different leaf commit to high QC");
505 None
506 }
507 } else {
508 None
509 }
510 } else {
511 None
512 };
513 Consensus {
514 validated_state_map,
515 vid_shares: vid_shares.unwrap_or_default(),
516 saved_da_certs: HashMap::new(),
517 cur_view,
518 cur_epoch,
519 last_decided_view,
520 last_proposals,
521 last_actions: HotShotActionViews::from_view(last_actioned_view),
522 locked_view,
523 saved_leaves,
524 saved_payloads,
525 high_qc,
526 next_epoch_high_qc,
527 metrics,
528 epoch_height,
529 drb_results: DrbResults::new(),
530 transition_qc,
531 highest_block: 0,
532 state_cert,
533 drb_difficulty,
534 drb_upgrade_difficulty,
535 }
536 }
537
538 pub fn cur_view(&self) -> TYPES::View {
540 self.cur_view
541 }
542
543 pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
545 self.cur_epoch
546 }
547
548 pub fn last_decided_view(&self) -> TYPES::View {
550 self.last_decided_view
551 }
552
553 pub fn locked_view(&self) -> TYPES::View {
555 self.locked_view
556 }
557
558 pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
560 &self.high_qc
561 }
562
563 pub fn transition_qc(
565 &self,
566 ) -> Option<&(
567 QuorumCertificate2<TYPES>,
568 NextEpochQuorumCertificate2<TYPES>,
569 )> {
570 self.transition_qc.as_ref()
571 }
572
573 pub fn update_highest_block(&mut self, block_number: u64) {
575 if block_number > self.highest_block {
576 self.highest_block = block_number;
577 return;
578 }
579
580 if is_epoch_transition(block_number, self.epoch_height) {
581 let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
582 let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
583 if new_epoch >= high_epoch {
584 self.highest_block = block_number;
585 }
586 }
587 }
588
589 pub fn update_transition_qc(
591 &mut self,
592 qc: QuorumCertificate2<TYPES>,
593 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
594 ) {
595 if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
596 tracing::error!(
597 "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
598 qc.view_number(),
599 next_epoch_qc.data.leaf_commit,
600 qc.data().leaf_commit
601 );
602 return;
603 }
604 if let Some((transition_qc, _)) = &self.transition_qc {
605 if transition_qc.view_number() >= qc.view_number() {
606 return;
607 }
608 }
609 self.transition_qc = Some((qc, next_epoch_qc));
610 }
611
612 pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificate<TYPES>> {
614 self.state_cert.as_ref()
615 }
616
617 pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
619 self.next_epoch_high_qc.as_ref()
620 }
621
622 pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
624 &self.validated_state_map
625 }
626
627 pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
629 &self.saved_leaves
630 }
631
632 pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>> {
634 &self.saved_payloads
635 }
636
637 pub fn vid_shares(&self) -> &VidShares<TYPES> {
639 &self.vid_shares
640 }
641
642 pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
644 &self.saved_da_certs
645 }
646
647 pub fn last_proposals(
649 &self,
650 ) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
651 &self.last_proposals
652 }
653
654 pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
658 ensure!(
659 view_number > self.cur_view,
660 debug!("New view isn't newer than the current view.")
661 );
662 self.cur_view = view_number;
663 Ok(())
664 }
665
666 pub async fn parent_leaf_info(
669 &self,
670 leaf: &Leaf2<TYPES>,
671 public_key: &TYPES::SignatureKey,
672 ) -> Option<LeafInfo<TYPES>> {
673 let parent_view_number = leaf.justify_qc().view_number();
674 let parent_epoch = leaf.justify_qc().epoch();
675 let parent_leaf = self
676 .saved_leaves
677 .get(&leaf.justify_qc().data().leaf_commit)?;
678 let parent_state_and_delta = self.state_and_delta(parent_view_number);
679 let (Some(state), delta) = parent_state_and_delta else {
680 return None;
681 };
682
683 let parent_vid = self
684 .vid_shares()
685 .get(&parent_view_number)
686 .and_then(|key_map| key_map.get(public_key))
687 .and_then(|epoch_map| epoch_map.get(&parent_epoch))
688 .map(|prop| prop.data.clone());
689
690 let state_cert = if parent_leaf.with_epoch
691 && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
692 {
693 match self.state_cert() {
694 Some(state_cert)
696 if state_cert.light_client_state.view_number == parent_view_number.u64() =>
697 {
698 Some(state_cert.clone())
699 },
700 _ => None,
701 }
702 } else {
703 None
704 };
705
706 Some(LeafInfo {
707 leaf: parent_leaf.clone(),
708 state,
709 delta,
710 vid_share: parent_vid,
711 state_cert,
712 })
713 }
714
715 pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
719 ensure!(
720 self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
721 debug!("New epoch isn't newer than the current epoch.")
722 );
723 tracing::trace!(
724 "Updating epoch from {:?} to {}",
725 self.cur_epoch,
726 epoch_number
727 );
728 self.cur_epoch = Some(epoch_number);
729 Ok(())
730 }
731
732 pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
736 let old_view = match action {
737 HotShotAction::Vote => &mut self.last_actions.voted,
738 HotShotAction::Propose => &mut self.last_actions.proposed,
739 HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
740 HotShotAction::DaVote => {
741 if view > self.last_actions.da_vote {
742 self.last_actions.da_vote = view;
743 }
744 return true;
749 },
750 _ => return true,
751 };
752 if view > *old_view {
753 *old_view = view;
754 return true;
755 }
756 false
757 }
758
759 pub fn reset_actions(&mut self) {
761 self.last_actions = HotShotActionViews::default();
762 }
763
764 pub fn update_proposed_view(
769 &mut self,
770 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
771 ) -> Result<()> {
772 ensure!(
773 proposal.data.view_number()
774 > self
775 .last_proposals
776 .last_key_value()
777 .map_or(TYPES::View::genesis(), |(k, _)| { *k }),
778 debug!("New view isn't newer than the previously proposed view.")
779 );
780 self.last_proposals
781 .insert(proposal.data.view_number(), proposal);
782 Ok(())
783 }
784
785 pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
790 ensure!(
791 view_number > self.last_decided_view,
792 debug!("New view isn't newer than the previously decided view.")
793 );
794 self.last_decided_view = view_number;
795 Ok(())
796 }
797
798 pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
803 ensure!(
804 view_number > self.locked_view,
805 debug!("New view isn't newer than the previously locked view.")
806 );
807 self.locked_view = view_number;
808 Ok(())
809 }
810
811 pub fn update_da_view(
817 &mut self,
818 view_number: TYPES::View,
819 epoch: Option<TYPES::Epoch>,
820 payload_commitment: VidCommitment,
821 ) -> Result<()> {
822 let view = View {
823 view_inner: ViewInner::Da {
824 payload_commitment,
825 epoch,
826 },
827 };
828 self.update_validated_state_map(view_number, view)
829 }
830
831 pub fn update_leaf(
837 &mut self,
838 leaf: Leaf2<TYPES>,
839 state: Arc<TYPES::ValidatedState>,
840 delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
841 ) -> Result<()> {
842 let view_number = leaf.view_number();
843 let epoch = option_epoch_from_block_number::<TYPES>(
844 leaf.with_epoch,
845 leaf.height(),
846 self.epoch_height,
847 );
848 let view = View {
849 view_inner: ViewInner::Leaf {
850 leaf: leaf.commit(),
851 state,
852 delta,
853 epoch,
854 },
855 };
856 self.update_validated_state_map(view_number, view)?;
857 self.update_saved_leaves(leaf);
858 Ok(())
859 }
860
861 fn update_validated_state_map(
867 &mut self,
868 view_number: TYPES::View,
869 new_view: View<TYPES>,
870 ) -> Result<()> {
871 if let Some(existing_view) = self.validated_state_map().get(&view_number) {
872 if let ViewInner::Leaf {
873 delta: ref existing_delta,
874 ..
875 } = existing_view.view_inner
876 {
877 if let ViewInner::Leaf {
878 delta: ref new_delta,
879 ..
880 } = new_view.view_inner
881 {
882 ensure!(
883 new_delta.is_some() || existing_delta.is_none(),
884 debug!("Skipping the state update to not override a `Leaf` view with `Some` state delta.")
885 );
886 } else {
887 bail!("Skipping the state update to not override a `Leaf` view with a non-`Leaf` view.");
888 }
889 }
890 }
891 self.validated_state_map.insert(view_number, new_view);
892 Ok(())
893 }
894
895 fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
897 self.saved_leaves.insert(leaf.commit(), leaf);
898 }
899
900 pub fn update_saved_payloads(
905 &mut self,
906 view_number: TYPES::View,
907 payload: Arc<PayloadWithMetadata<TYPES>>,
908 ) -> Result<()> {
909 ensure!(
910 !self.saved_payloads.contains_key(&view_number),
911 "Payload with the same view already exists."
912 );
913 self.saved_payloads.insert(view_number, payload);
914 Ok(())
915 }
916
917 pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
921 if self.high_qc == high_qc {
922 return Ok(());
923 }
924 ensure!(
926 high_qc.view_number > self.high_qc.view_number,
927 debug!("High QC with an equal or higher view exists.")
928 );
929 tracing::debug!("Updating high QC");
930 self.high_qc = high_qc;
931
932 Ok(())
933 }
934
935 pub fn update_next_epoch_high_qc(
941 &mut self,
942 high_qc: NextEpochQuorumCertificate2<TYPES>,
943 ) -> Result<()> {
944 if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
945 return Ok(());
946 }
947 if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
948 ensure!(
949 high_qc.view_number > next_epoch_high_qc.view_number,
950 debug!("Next epoch high QC with an equal or higher view exists.")
951 );
952 }
953 tracing::debug!("Updating next epoch high QC");
954 self.next_epoch_high_qc = Some(high_qc);
955
956 Ok(())
957 }
958
959 pub fn reset_high_qc(
963 &mut self,
964 high_qc: QuorumCertificate2<TYPES>,
965 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
966 ) -> Result<()> {
967 ensure!(
968 high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
969 error!("High QC's and next epoch QC's leaf commits do not match.")
970 );
971 if self.high_qc == high_qc {
972 return Ok(());
973 }
974 let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
975 let current_qc = self.high_qc();
976 let Some(high_bn) = current_qc.data.block_number else {
977 return false;
978 };
979 epoch_from_block_number(bn + 1, self.epoch_height)
980 == epoch_from_block_number(high_bn + 1, self.epoch_height)
981 });
982 ensure!(
983 high_qc
984 .data
985 .block_number
986 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
987 && same_epoch,
988 error!("Provided QC is not a transition QC.")
989 );
990 tracing::debug!("Resetting high QC and next epoch high QC");
991 self.high_qc = high_qc;
992 self.next_epoch_high_qc = Some(next_epoch_qc);
993
994 Ok(())
995 }
996
997 pub fn update_state_cert(
1001 &mut self,
1002 state_cert: LightClientStateUpdateCertificate<TYPES>,
1003 ) -> Result<()> {
1004 if let Some(existing_state_cert) = &self.state_cert {
1005 ensure!(
1006 state_cert.epoch > existing_state_cert.epoch,
1007 debug!(
1008 "Light client state update certification with an equal or higher epoch exists."
1009 )
1010 );
1011 }
1012 tracing::debug!("Updating light client state update certification");
1013 self.state_cert = Some(state_cert);
1014
1015 Ok(())
1016 }
1017
1018 pub fn update_vid_shares(
1020 &mut self,
1021 view_number: TYPES::View,
1022 disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1023 ) {
1024 self.vid_shares
1025 .entry(view_number)
1026 .or_default()
1027 .entry(disperse.data.recipient_key().clone())
1028 .or_default()
1029 .insert(disperse.data.target_epoch(), disperse);
1030 }
1031
1032 pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
1034 self.saved_da_certs.insert(view_number, cert);
1035 }
1036
1037 pub fn visit_leaf_ancestors<F>(
1041 &self,
1042 start_from: TYPES::View,
1043 terminator: Terminator<TYPES::View>,
1044 ok_when_finished: bool,
1045 mut f: F,
1046 ) -> std::result::Result<(), HotShotError<TYPES>>
1047 where
1048 F: FnMut(
1049 &Leaf2<TYPES>,
1050 Arc<<TYPES as NodeType>::ValidatedState>,
1051 Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1052 ) -> bool,
1053 {
1054 let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1055 view.leaf_commitment().ok_or_else(|| {
1056 HotShotError::InvalidState(format!(
1057 "Visited failed view {start_from} leaf. Expected successful leaf"
1058 ))
1059 })?
1060 } else {
1061 return Err(HotShotError::InvalidState(format!(
1062 "View {start_from} leaf does not exist in state map "
1063 )));
1064 };
1065
1066 while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1067 let view = leaf.view_number();
1068 if let (Some(state), delta) = self.state_and_delta(view) {
1069 if let Terminator::Exclusive(stop_before) = terminator {
1070 if stop_before == view {
1071 if ok_when_finished {
1072 return Ok(());
1073 }
1074 break;
1075 }
1076 }
1077 next_leaf = leaf.parent_commitment();
1078 if !f(leaf, state, delta) {
1079 return Ok(());
1080 }
1081 if let Terminator::Inclusive(stop_after) = terminator {
1082 if stop_after == view {
1083 if ok_when_finished {
1084 return Ok(());
1085 }
1086 break;
1087 }
1088 }
1089 } else {
1090 return Err(HotShotError::InvalidState(format!(
1091 "View {view} state does not exist in state map"
1092 )));
1093 }
1094 }
1095 Err(HotShotError::MissingLeaf(next_leaf))
1096 }
1097
1098 pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
1103 if new_anchor_view <= old_anchor_view {
1105 return;
1106 }
1107 let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
1108 let anchor_entry = self
1110 .validated_state_map
1111 .iter()
1112 .next()
1113 .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1114 if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1115 tracing::info!(
1116 "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1117 );
1118 }
1119 self.saved_da_certs
1121 .retain(|view_number, _| *view_number >= old_anchor_view);
1122 self.validated_state_map
1123 .range(..gc_view)
1124 .filter_map(|(_view_number, view)| view.leaf_commitment())
1125 .for_each(|leaf| {
1126 self.saved_leaves.remove(&leaf);
1127 });
1128 self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1129 self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1130 self.vid_shares = self.vid_shares.split_off(&gc_view);
1131 self.last_proposals = self.last_proposals.split_off(&gc_view);
1132 }
1133
1134 #[must_use]
1140 pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1141 let decided_view_num = self.last_decided_view;
1142 let view = self.validated_state_map.get(&decided_view_num).unwrap();
1143 let leaf = view
1144 .leaf_commitment()
1145 .expect("Decided leaf not found! Consensus internally inconsistent");
1146 self.saved_leaves.get(&leaf).unwrap().clone()
1147 }
1148
1149 pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1150 self.saved_leaves.values().cloned().collect::<Vec<_>>()
1151 }
1152
1153 #[must_use]
1155 pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
1156 match self.validated_state_map.get(&view_number) {
1157 Some(view) => view.state(),
1158 None => None,
1159 }
1160 }
1161
1162 #[must_use]
1164 pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
1165 match self.validated_state_map.get(&view_number) {
1166 Some(view) => view.state_and_delta(),
1167 None => (None, None),
1168 }
1169 }
1170
1171 #[must_use]
1177 pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1178 let decided_view_num = self.last_decided_view;
1179 self.state_and_delta(decided_view_num)
1180 .0
1181 .expect("Decided state not found! Consensus internally inconsistent")
1182 }
1183
1184 #[instrument(skip_all, target = "Consensus", fields(view = *view))]
1190 pub async fn calculate_and_update_vid<V: Versions>(
1191 consensus: OuterConsensus<TYPES>,
1192 view: <TYPES as NodeType>::View,
1193 target_epoch: Option<<TYPES as NodeType>::Epoch>,
1194 membership_coordinator: EpochMembershipCoordinator<TYPES>,
1195 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
1196 upgrade_lock: &UpgradeLock<TYPES, V>,
1197 ) -> Option<()> {
1198 let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
1199 let epoch = consensus
1200 .read()
1201 .await
1202 .validated_state_map()
1203 .get(&view)?
1204 .view_inner
1205 .epoch()?;
1206
1207 let VidDisperseAndDuration {
1208 disperse: vid,
1209 duration: disperse_duration,
1210 } = VidDisperse::calculate_vid_disperse::<V>(
1211 &payload_with_metadata.payload,
1212 &membership_coordinator,
1213 view,
1214 target_epoch,
1215 epoch,
1216 &payload_with_metadata.metadata,
1217 upgrade_lock,
1218 )
1219 .await
1220 .ok()?;
1221
1222 let shares = VidDisperseShare::from_vid_disperse(vid);
1223 let mut consensus_writer = consensus.write().await;
1224 consensus_writer
1225 .metrics
1226 .vid_disperse_duration
1227 .add_point(disperse_duration.as_secs_f64());
1228 for share in shares {
1229 if let Some(prop) = share.to_proposal(private_key) {
1230 consensus_writer.update_vid_shares(view, prop);
1231 }
1232 }
1233
1234 Some(())
1235 }
1236 pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1238 let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1239 tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1240 return false;
1241 };
1242 let block_height = leaf.height();
1243 is_epoch_transition(block_height, self.epoch_height)
1244 }
1245
1246 pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1248 let Some(block_height) = self.high_qc().data.block_number else {
1249 return false;
1250 };
1251 is_epoch_transition(block_height, self.epoch_height)
1252 }
1253
1254 pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1256 if parent_leaf.view_number() == TYPES::View::genesis() {
1257 return true;
1258 }
1259 let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1260 let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1261
1262 new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1263 }
1264
1265 pub fn is_high_qc_ge_root_block(&self) -> bool {
1267 let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else {
1268 tracing::trace!("We don't have a leaf corresponding to the high QC");
1269 return false;
1270 };
1271 let block_height = leaf.height();
1272 is_ge_epoch_root(block_height, self.epoch_height)
1273 }
1274}
1275
1276#[derive(Eq, PartialEq, Debug, Clone)]
1279pub struct CommitmentAndMetadata<TYPES: NodeType> {
1280 pub commitment: VidCommitment,
1282 pub builder_commitment: BuilderCommitment,
1284 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
1286 pub fees: Vec1<BuilderFee<TYPES>>,
1288 pub block_view: TYPES::View,
1290}