1use std::{
10 collections::{BTreeMap, HashMap},
11 mem::ManuallyDrop,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
17use committable::{Commitment, Committable};
18use hotshot_utils::anytrace::*;
19use tracing::instrument;
20use vec1::Vec1;
21
22pub use crate::utils::{View, ViewInner};
23use crate::{
24 data::{Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseShare},
25 drb::DrbResults,
26 epoch_membership::EpochMembershipCoordinator,
27 error::HotShotError,
28 event::{HotShotAction, LeafInfo},
29 message::{Proposal, UpgradeLock},
30 simple_certificate::{
31 DaCertificate2, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
32 QuorumCertificate2,
33 },
34 simple_vote::HasEpoch,
35 traits::{
36 block_contents::{BlockHeader, BuilderFee},
37 metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
38 node_implementation::{ConsensusTime, NodeType, Versions},
39 signature_key::SignatureKey,
40 BlockPayload, ValidatedState,
41 },
42 utils::{
43 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_ge_epoch_root,
44 is_last_block, is_transition_block, option_epoch_from_block_number, BuilderCommitment,
45 LeafCommitment, StateAndDelta, Terminator,
46 },
47 vote::{Certificate, HasViewNumber},
48};
49
/// A map from the [`Commitment`] of a value to the value itself.
pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
52
/// VID share proposals, indexed first by view, then by a signature key
/// (`update_vid_shares` uses the share's recipient key), then by an optional
/// epoch (`update_vid_shares` uses the share's target epoch).
pub type VidShares<TYPES> = BTreeMap<
    <TYPES as NodeType>::View,
    HashMap<
        <TYPES as NodeType>::SignatureKey,
        BTreeMap<Option<<TYPES as NodeType>::Epoch>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
    >,
>;
61
/// The consensus state behind a shared (`Arc`) asynchronous read-write lock.
pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
64
/// Thin wrapper around [`LockedConsensusState`] whose lock accessors emit
/// tracing events when a lock is requested and acquired, and return guard
/// wrappers that log when they are dropped.
#[derive(Clone, Debug)]
pub struct OuterConsensus<TYPES: NodeType> {
    /// The wrapped, lock-protected consensus state.
    pub inner_consensus: LockedConsensusState<TYPES>,
}
71
72impl<TYPES: NodeType> OuterConsensus<TYPES> {
73 pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
75 Self {
76 inner_consensus: consensus,
77 }
78 }
79
80 #[instrument(skip_all, target = "OuterConsensus")]
82 pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
83 tracing::trace!("Trying to acquire read lock on consensus");
84 let ret = self.inner_consensus.read().await;
85 tracing::trace!("Acquired read lock on consensus");
86 ConsensusReadLockGuard::new(ret)
87 }
88
89 #[instrument(skip_all, target = "OuterConsensus")]
91 pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
92 tracing::trace!("Trying to acquire write lock on consensus");
93 let ret = self.inner_consensus.write().await;
94 tracing::trace!("Acquired write lock on consensus");
95 ConsensusWriteLockGuard::new(ret)
96 }
97
98 #[instrument(skip_all, target = "OuterConsensus")]
100 pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
101 tracing::trace!("Trying to acquire write lock on consensus");
102 let ret = self.inner_consensus.try_write();
103 if let Some(guard) = ret {
104 tracing::trace!("Acquired write lock on consensus");
105 Some(ConsensusWriteLockGuard::new(guard))
106 } else {
107 tracing::trace!("Failed to acquire write lock");
108 None
109 }
110 }
111
112 #[instrument(skip_all, target = "OuterConsensus")]
114 pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
115 tracing::trace!("Trying to acquire upgradable read lock on consensus");
116 let ret = self.inner_consensus.upgradable_read().await;
117 tracing::trace!("Acquired upgradable read lock on consensus");
118 ConsensusUpgradableReadLockGuard::new(ret)
119 }
120
121 #[instrument(skip_all, target = "OuterConsensus")]
123 pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
124 tracing::trace!("Trying to acquire read lock on consensus");
125 let ret = self.inner_consensus.try_read();
126 if let Some(guard) = ret {
127 tracing::trace!("Acquired read lock on consensus");
128 Some(ConsensusReadLockGuard::new(guard))
129 } else {
130 tracing::trace!("Failed to acquire read lock");
131 None
132 }
133 }
134}
135
/// Read-lock guard over [`Consensus`] that emits a trace event when dropped.
pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
    /// The underlying read guard.
    lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
}

impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
    /// Wraps an already acquired read guard.
    #[must_use]
    pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
        Self { lock_guard }
    }
}

impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
    type Target = Consensus<TYPES>;
    /// Gives shared access to the guarded consensus state.
    fn deref(&self) -> &Self::Target {
        &self.lock_guard
    }
}

impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
    /// Only logs; the wrapped guard is released afterwards when the field
    /// itself is dropped.
    #[instrument(skip_all, target = "ConsensusReadLockGuard")]
    fn drop(&mut self) {
        tracing::trace!("Read lock on consensus dropped");
    }
}
163
/// Write-lock guard over [`Consensus`] that emits a debug event when dropped.
pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
    /// The underlying write guard.
    lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
}

impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
    /// Wraps an already acquired write guard.
    #[must_use]
    pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
        Self { lock_guard }
    }
}

impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
    type Target = Consensus<TYPES>;
    /// Gives shared access to the guarded consensus state.
    fn deref(&self) -> &Self::Target {
        &self.lock_guard
    }
}

impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
    /// Gives exclusive access to the guarded consensus state.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.lock_guard
    }
}

impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
    /// Only logs; the wrapped guard is released afterwards when the field
    /// itself is dropped.
    #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
    fn drop(&mut self) {
        tracing::debug!("Write lock on consensus dropped");
    }
}
197
/// Upgradable read-lock guard over [`Consensus`] that logs when it is upgraded
/// or dropped.
pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
    /// The underlying guard. Held in `ManuallyDrop` so `upgrade` can move it
    /// out even though this type implements `Drop`.
    lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
    /// `true` once `upgrade` has moved `lock_guard` out; `Drop` must then not
    /// touch it.
    taken: bool,
}

impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
    /// Wraps an already acquired upgradable read guard.
    #[must_use]
    pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
        Self {
            lock_guard: ManuallyDrop::new(lock_guard),
            taken: false,
        }
    }

    /// Consumes the guard and waits for it to be upgraded to a write lock.
    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
    pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
        // SAFETY: `guard` is owned by this function and `lock_guard` is not read
        // again after this point; `taken` is set right below so that `Drop` skips
        // `ManuallyDrop::drop` and the inner guard is never dropped twice.
        let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
        guard.taken = true;
        tracing::debug!("Trying to upgrade upgradable read lock on consensus");
        let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
        tracing::debug!("Upgraded upgradable read lock on consensus");
        ConsensusWriteLockGuard::new(ret)
    }
}

impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
    type Target = Consensus<TYPES>;

    /// Gives shared access to the guarded consensus state.
    fn deref(&self) -> &Self::Target {
        &self.lock_guard
    }
}

impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
    /// Releases the inner guard and logs, unless `upgrade` already moved the
    /// inner guard out (in which case nothing is done).
    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
    fn drop(&mut self) {
        if !self.taken {
            // SAFETY: `taken` is false, so `upgrade` has not moved the guard out
            // and this is the only place it is dropped.
            unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
            tracing::debug!("Upgradable read lock on consensus dropped");
        }
    }
}
245
/// The most recent view recorded for each tracked kind of action.
#[derive(Debug, Clone, Copy)]
struct HotShotActionViews<T: ConsensusTime> {
    /// View recorded for `HotShotAction::Propose`.
    proposed: T,
    /// View recorded for `HotShotAction::Vote`.
    voted: T,
    /// View recorded for `HotShotAction::DaPropose`.
    da_proposed: T,
    /// View recorded for `HotShotAction::DaVote`.
    da_vote: T,
}
258
259impl<T: ConsensusTime> Default for HotShotActionViews<T> {
260 fn default() -> Self {
261 let genesis = T::genesis();
262 Self {
263 proposed: genesis,
264 voted: genesis,
265 da_proposed: genesis,
266 da_vote: genesis,
267 }
268 }
269}
impl<T: ConsensusTime> HotShotActionViews<T> {
    /// Creates a record in which every action view is set to `view`.
    fn from_view(view: T) -> Self {
        Self {
            proposed: view,
            voted: view,
            da_proposed: view,
            da_vote: view,
        }
    }
}
/// In-memory consensus state: per-view records, saved leaves, payloads,
/// certificates, and the view/epoch counters that the update methods advance.
#[derive(derive_more::Debug, Clone)]
pub struct Consensus<TYPES: NodeType> {
    /// Per-view record ([`View`]), keyed by view number.
    validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,

    /// VID share proposals; see [`VidShares`] for the key structure.
    vid_shares: VidShares<TYPES>,

    /// DA certificates, keyed by view.
    saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,

    /// The current view.
    cur_view: TYPES::View,

    /// The current epoch, if one is set.
    cur_epoch: Option<TYPES::Epoch>,

    /// Quorum proposals, keyed by view; `update_proposed_view` only accepts a
    /// view newer than the largest key already present.
    last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// The last decided view.
    last_decided_view: TYPES::View,

    /// The locked view.
    locked_view: TYPES::View,

    /// Leaves keyed by their commitment.
    saved_leaves: CommitmentMap<Leaf2<TYPES>>,

    /// Latest view recorded for each kind of action; see `update_action`.
    last_actions: HotShotActionViews<TYPES::View>,

    /// Block payloads with their metadata, keyed by view.
    saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,

    /// The current high quorum certificate.
    high_qc: QuorumCertificate2<TYPES>,

    /// The current next-epoch high quorum certificate, if any.
    next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Metric handles for consensus.
    pub metrics: Arc<ConsensusMetricsValue>,

    /// Epoch height passed to the epoch helper functions
    /// (presumably the number of blocks per epoch; confirm against `utils`).
    pub epoch_height: u64,

    /// DRB results; starts out as `DrbResults::new()`.
    pub drb_results: DrbResults<TYPES>,

    /// A quorum certificate paired with the next-epoch certificate that has
    /// the same leaf commitment; see `update_transition_qc`.
    transition_qc: Option<(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    )>,

    /// Block number tracked by `update_highest_block`; starts at 0.
    pub highest_block: u64,
    /// The stored light client state update certificate, if any.
    pub state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
}
353
/// A block payload together with its metadata.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct PayloadWithMetadata<TYPES: NodeType> {
    /// The block payload.
    pub payload: TYPES::BlockPayload,
    /// The metadata type associated with the block payload.
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
360
/// Metric handles used by consensus. `ConsensusMetricsValue::new` registers
/// each handle under a name equal to its field name.
#[derive(Clone, Debug)]
pub struct ConsensusMetricsValue {
    /// Gauge registered as `last_synced_block_height`.
    pub last_synced_block_height: Box<dyn Gauge>,
    /// Gauge registered as `last_decided_view`.
    pub last_decided_view: Box<dyn Gauge>,
    /// Gauge registered as `last_voted_view`.
    pub last_voted_view: Box<dyn Gauge>,
    /// Gauge registered as `last_decided_time`.
    pub last_decided_time: Box<dyn Gauge>,
    /// Gauge registered as `current_view`.
    pub current_view: Box<dyn Gauge>,
    /// Gauge registered as `number_of_views_since_last_decide`.
    pub number_of_views_since_last_decide: Box<dyn Gauge>,
    /// Histogram registered as `number_of_views_per_decide_event`.
    pub number_of_views_per_decide_event: Box<dyn Histogram>,
    /// Histogram registered as `view_duration_as_leader`.
    pub view_duration_as_leader: Box<dyn Histogram>,
    /// Gauge registered as `invalid_qc`.
    pub invalid_qc: Box<dyn Gauge>,
    /// Gauge registered as `outstanding_transactions`.
    pub outstanding_transactions: Box<dyn Gauge>,
    /// Gauge registered as `outstanding_transactions_memory_size`.
    pub outstanding_transactions_memory_size: Box<dyn Gauge>,
    /// Counter registered as `number_of_timeouts`.
    pub number_of_timeouts: Box<dyn Counter>,
    /// Counter registered as `number_of_timeouts_as_leader`.
    pub number_of_timeouts_as_leader: Box<dyn Counter>,
    /// Counter registered as `number_of_empty_blocks_proposed`.
    pub number_of_empty_blocks_proposed: Box<dyn Counter>,
    /// Gauge registered as `internal_event_queue_len`.
    pub internal_event_queue_len: Box<dyn Gauge>,
}
395
396impl ConsensusMetricsValue {
397 #[must_use]
399 pub fn new(metrics: &dyn Metrics) -> Self {
400 Self {
401 last_synced_block_height: metrics
402 .create_gauge(String::from("last_synced_block_height"), None),
403 last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
404 last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
405 last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
406 current_view: metrics.create_gauge(String::from("current_view"), None),
407 number_of_views_since_last_decide: metrics
408 .create_gauge(String::from("number_of_views_since_last_decide"), None),
409 number_of_views_per_decide_event: metrics
410 .create_histogram(String::from("number_of_views_per_decide_event"), None),
411 view_duration_as_leader: metrics
412 .create_histogram(String::from("view_duration_as_leader"), None),
413 invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
414 outstanding_transactions: metrics
415 .create_gauge(String::from("outstanding_transactions"), None),
416 outstanding_transactions_memory_size: metrics
417 .create_gauge(String::from("outstanding_transactions_memory_size"), None),
418 number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
419 number_of_timeouts_as_leader: metrics
420 .create_counter(String::from("number_of_timeouts_as_leader"), None),
421 number_of_empty_blocks_proposed: metrics
422 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
423 internal_event_queue_len: metrics
424 .create_gauge(String::from("internal_event_queue_len"), None),
425 }
426 }
427}
428
impl Default for ConsensusMetricsValue {
    /// Builds the metric handles on top of a `NoMetrics` backend.
    fn default() -> Self {
        Self::new(&*NoMetrics::boxed())
    }
}
434
435impl<TYPES: NodeType> Consensus<TYPES> {
436 #[allow(clippy::too_many_arguments)]
438 pub fn new(
439 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
440 vid_shares: Option<VidShares<TYPES>>,
441 cur_view: TYPES::View,
442 cur_epoch: Option<TYPES::Epoch>,
443 locked_view: TYPES::View,
444 last_decided_view: TYPES::View,
445 last_actioned_view: TYPES::View,
446 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
447 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
448 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
449 high_qc: QuorumCertificate2<TYPES>,
450 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
451 metrics: Arc<ConsensusMetricsValue>,
452 epoch_height: u64,
453 state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
454 ) -> Self {
455 let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
456 if high_qc
457 .data
458 .block_number
459 .is_some_and(|bn| is_transition_block(bn, epoch_height))
460 {
461 if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
462 Some((high_qc.clone(), next_epoch_high_qc.clone()))
463 } else {
464 tracing::error!("Next epoch high QC has different leaf commit to high QC");
465 None
466 }
467 } else {
468 None
469 }
470 } else {
471 None
472 };
473 Consensus {
474 validated_state_map,
475 vid_shares: vid_shares.unwrap_or_default(),
476 saved_da_certs: HashMap::new(),
477 cur_view,
478 cur_epoch,
479 last_decided_view,
480 last_proposals,
481 last_actions: HotShotActionViews::from_view(last_actioned_view),
482 locked_view,
483 saved_leaves,
484 saved_payloads,
485 high_qc,
486 next_epoch_high_qc,
487 metrics,
488 epoch_height,
489 drb_results: DrbResults::new(),
490 transition_qc,
491 highest_block: 0,
492 state_cert,
493 }
494 }
495
    /// Returns the current view.
    pub fn cur_view(&self) -> TYPES::View {
        self.cur_view
    }
500
    /// Returns the current epoch, or `None` if none is set.
    pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
        self.cur_epoch
    }
505
    /// Returns the last decided view.
    pub fn last_decided_view(&self) -> TYPES::View {
        self.last_decided_view
    }
510
    /// Returns the locked view.
    pub fn locked_view(&self) -> TYPES::View {
        self.locked_view
    }
515
    /// Returns a reference to the current high quorum certificate.
    pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
        &self.high_qc
    }
520
    /// Returns the stored transition QC pair (quorum certificate and matching
    /// next-epoch certificate), if any.
    pub fn transition_qc(
        &self,
    ) -> Option<&(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    )> {
        self.transition_qc.as_ref()
    }
530
531 pub fn update_highest_block(&mut self, block_number: u64) {
533 if block_number > self.highest_block {
534 self.highest_block = block_number;
535 return;
536 }
537
538 if is_epoch_transition(block_number, self.epoch_height) {
539 let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
540 let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
541 if new_epoch >= high_epoch {
542 self.highest_block = block_number;
543 }
544 }
545 }
546
547 pub fn update_transition_qc(
549 &mut self,
550 qc: QuorumCertificate2<TYPES>,
551 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
552 ) {
553 if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
554 tracing::error!(
555 "Next epoch QC for view {:?} has different leaf commit {:?} to {:?}",
556 qc.view_number(),
557 next_epoch_qc.data.leaf_commit,
558 qc.data().leaf_commit
559 );
560 return;
561 }
562 if let Some((transition_qc, _)) = &self.transition_qc {
563 if transition_qc.view_number() >= qc.view_number() {
564 return;
565 }
566 }
567 self.transition_qc = Some((qc, next_epoch_qc));
568 }
569
    /// Returns the stored light client state update certificate, if any.
    pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificate<TYPES>> {
        self.state_cert.as_ref()
    }
574
    /// Returns the stored next-epoch high quorum certificate, if any.
    pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
        self.next_epoch_high_qc.as_ref()
    }
579
    /// Returns the per-view record map.
    pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
        &self.validated_state_map
    }
584
    /// Returns the saved leaves, keyed by leaf commitment.
    pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
        &self.saved_leaves
    }
589
    /// Returns the saved payloads, keyed by view.
    pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>> {
        &self.saved_payloads
    }
594
    /// Returns the stored VID share proposals.
    pub fn vid_shares(&self) -> &VidShares<TYPES> {
        &self.vid_shares
    }
599
    /// Returns the saved DA certificates, keyed by view.
    pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
        &self.saved_da_certs
    }
604
    /// Returns the stored quorum proposals, keyed by view.
    pub fn last_proposals(
        &self,
    ) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
        &self.last_proposals
    }
611
    /// Advances the current view to `view_number`.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when `view_number` is not
    /// strictly greater than the current view.
    pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
        ensure!(
            view_number > self.cur_view,
            debug!("New view isn't newer than the current view.")
        );
        self.cur_view = view_number;
        Ok(())
    }
623
624 pub async fn parent_leaf_info(
627 &self,
628 leaf: &Leaf2<TYPES>,
629 public_key: &TYPES::SignatureKey,
630 ) -> Option<LeafInfo<TYPES>> {
631 let parent_view_number = leaf.justify_qc().view_number();
632 let parent_epoch = leaf.justify_qc().epoch();
633 let parent_leaf = self
634 .saved_leaves
635 .get(&leaf.justify_qc().data().leaf_commit)?;
636 let parent_state_and_delta = self.state_and_delta(parent_view_number);
637 let (Some(state), delta) = parent_state_and_delta else {
638 return None;
639 };
640
641 let parent_vid = self
642 .vid_shares()
643 .get(&parent_view_number)
644 .and_then(|key_map| key_map.get(public_key).cloned())
645 .and_then(|epoch_map| epoch_map.get(&parent_epoch).cloned())
646 .map(|prop| prop.data);
647
648 let state_cert = if parent_leaf.with_epoch
649 && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
650 {
651 match self.state_cert() {
652 Some(state_cert)
654 if state_cert.light_client_state.view_number == parent_view_number.u64() =>
655 {
656 Some(state_cert.clone())
657 },
658 _ => None,
659 }
660 } else {
661 None
662 };
663
664 Some(LeafInfo {
665 leaf: parent_leaf.clone(),
666 state,
667 delta,
668 vid_share: parent_vid,
669 state_cert,
670 })
671 }
672
    /// Advances the current epoch to `epoch_number`.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when a current epoch is
    /// set and `epoch_number` is not strictly greater than it.
    pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
        ensure!(
            self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
            debug!("New epoch isn't newer than the current epoch.")
        );
        tracing::trace!(
            "Updating epoch from {:?} to {}",
            self.cur_epoch,
            epoch_number
        );
        self.cur_epoch = Some(epoch_number);
        Ok(())
    }
689
690 pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
694 let old_view = match action {
695 HotShotAction::Vote => &mut self.last_actions.voted,
696 HotShotAction::Propose => &mut self.last_actions.proposed,
697 HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
698 HotShotAction::DaVote => {
699 if view > self.last_actions.da_vote {
700 self.last_actions.da_vote = view;
701 }
702 return true;
707 },
708 _ => return true,
709 };
710 if view > *old_view {
711 *old_view = view;
712 return true;
713 }
714 false
715 }
716
    /// Resets every recorded action view to its default (the genesis view).
    pub fn reset_actions(&mut self) {
        self.last_actions = HotShotActionViews::default();
    }
721
722 pub fn update_proposed_view(
727 &mut self,
728 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
729 ) -> Result<()> {
730 ensure!(
731 proposal.data.view_number()
732 > self
733 .last_proposals
734 .last_key_value()
735 .map_or(TYPES::View::genesis(), |(k, _)| { *k }),
736 debug!("New view isn't newer than the previously proposed view.")
737 );
738 self.last_proposals
739 .insert(proposal.data.view_number(), proposal);
740 Ok(())
741 }
742
    /// Advances the last decided view to `view_number`.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when `view_number` is not
    /// strictly greater than the last decided view.
    pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
        ensure!(
            view_number > self.last_decided_view,
            debug!("New view isn't newer than the previously decided view.")
        );
        self.last_decided_view = view_number;
        Ok(())
    }
755
    /// Advances the locked view to `view_number`.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when `view_number` is not
    /// strictly greater than the locked view.
    pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
        ensure!(
            view_number > self.locked_view,
            debug!("New view isn't newer than the previously locked view.")
        );
        self.locked_view = view_number;
        Ok(())
    }
768
769 pub fn update_da_view(
775 &mut self,
776 view_number: TYPES::View,
777 epoch: Option<TYPES::Epoch>,
778 payload_commitment: VidCommitment,
779 ) -> Result<()> {
780 let view = View {
781 view_inner: ViewInner::Da {
782 payload_commitment,
783 epoch,
784 },
785 };
786 self.update_validated_state_map(view_number, view)
787 }
788
789 pub fn update_leaf(
795 &mut self,
796 leaf: Leaf2<TYPES>,
797 state: Arc<TYPES::ValidatedState>,
798 delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
799 ) -> Result<()> {
800 let view_number = leaf.view_number();
801 let epoch = option_epoch_from_block_number::<TYPES>(
802 leaf.with_epoch,
803 leaf.height(),
804 self.epoch_height,
805 );
806 let view = View {
807 view_inner: ViewInner::Leaf {
808 leaf: leaf.commit(),
809 state,
810 delta,
811 epoch,
812 },
813 };
814 self.update_validated_state_map(view_number, view)?;
815 self.update_saved_leaves(leaf);
816 Ok(())
817 }
818
819 fn update_validated_state_map(
825 &mut self,
826 view_number: TYPES::View,
827 new_view: View<TYPES>,
828 ) -> Result<()> {
829 if let Some(existing_view) = self.validated_state_map().get(&view_number) {
830 if let ViewInner::Leaf {
831 delta: ref existing_delta,
832 ..
833 } = existing_view.view_inner
834 {
835 if let ViewInner::Leaf {
836 delta: ref new_delta,
837 ..
838 } = new_view.view_inner
839 {
840 ensure!(
841 new_delta.is_some() || existing_delta.is_none(),
842 debug!("Skipping the state update to not override a `Leaf` view with `Some` state delta.")
843 );
844 } else {
845 bail!("Skipping the state update to not override a `Leaf` view with a non-`Leaf` view.");
846 }
847 }
848 }
849 self.validated_state_map.insert(view_number, new_view);
850 Ok(())
851 }
852
    /// Saves `leaf` under its commitment, replacing any leaf already stored
    /// under the same commitment.
    fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
        self.saved_leaves.insert(leaf.commit(), leaf);
    }
857
    /// Saves `payload` for `view_number`.
    ///
    /// # Errors
    /// Returns an error, keeping the stored payload, when a payload already
    /// exists for `view_number`.
    pub fn update_saved_payloads(
        &mut self,
        view_number: TYPES::View,
        payload: Arc<PayloadWithMetadata<TYPES>>,
    ) -> Result<()> {
        ensure!(
            !self.saved_payloads.contains_key(&view_number),
            "Payload with the same view already exists."
        );
        self.saved_payloads.insert(view_number, payload);
        Ok(())
    }
874
    /// Replaces the high QC with `high_qc`.
    ///
    /// Passing a certificate equal to the stored one is a no-op that returns
    /// `Ok(())`.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when `high_qc` differs
    /// from the stored certificate but its view is not strictly greater.
    pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
        if self.high_qc == high_qc {
            return Ok(());
        }
        ensure!(
            high_qc.view_number > self.high_qc.view_number,
            debug!("High QC with an equal or higher view exists.")
        );
        tracing::debug!("Updating high QC");
        self.high_qc = high_qc;

        Ok(())
    }
892
893 pub fn update_next_epoch_high_qc(
899 &mut self,
900 high_qc: NextEpochQuorumCertificate2<TYPES>,
901 ) -> Result<()> {
902 if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
903 return Ok(());
904 }
905 if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
906 ensure!(
907 high_qc.view_number > next_epoch_high_qc.view_number,
908 debug!("Next epoch high QC with an equal or higher view exists.")
909 );
910 }
911 tracing::debug!("Updating next epoch high QC");
912 self.next_epoch_high_qc = Some(high_qc);
913
914 Ok(())
915 }
916
917 pub fn reset_high_qc(
921 &mut self,
922 high_qc: QuorumCertificate2<TYPES>,
923 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
924 ) -> Result<()> {
925 ensure!(
926 high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
927 error!("High QC's and next epoch QC's leaf commits do not match.")
928 );
929 if self.high_qc == high_qc {
930 return Ok(());
931 }
932 let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
933 let current_qc = self.high_qc();
934 let Some(high_bn) = current_qc.data.block_number else {
935 return false;
936 };
937 epoch_from_block_number(bn + 1, self.epoch_height)
938 == epoch_from_block_number(high_bn + 1, self.epoch_height)
939 });
940 ensure!(
941 high_qc
942 .data
943 .block_number
944 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
945 && same_epoch,
946 error!("Provided QC is not a transition QC.")
947 );
948 tracing::debug!("Resetting high QC and next epoch high QC");
949 self.high_qc = high_qc;
950 self.next_epoch_high_qc = Some(next_epoch_qc);
951
952 Ok(())
953 }
954
    /// Replaces the stored light client state update certificate.
    ///
    /// When nothing is stored yet, `state_cert` is accepted unconditionally.
    ///
    /// # Errors
    /// Returns an error, leaving the state unchanged, when a certificate is
    /// stored and the epoch of `state_cert` is not strictly greater than the
    /// stored certificate's epoch.
    pub fn update_state_cert(
        &mut self,
        state_cert: LightClientStateUpdateCertificate<TYPES>,
    ) -> Result<()> {
        if let Some(existing_state_cert) = &self.state_cert {
            ensure!(
                state_cert.epoch > existing_state_cert.epoch,
                debug!(
                    "Light client state update certification with an equal or higher epoch exists."
                )
            );
        }
        tracing::debug!("Updating light client state update certification");
        self.state_cert = Some(state_cert);

        Ok(())
    }
975
    /// Stores the VID share proposal `disperse` under `view_number`, the
    /// share's recipient key and the share's target epoch, creating the
    /// intermediate maps as needed and replacing any share already stored
    /// under the same three keys.
    pub fn update_vid_shares(
        &mut self,
        view_number: TYPES::View,
        disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
    ) {
        self.vid_shares
            .entry(view_number)
            .or_default()
            .entry(disperse.data.recipient_key().clone())
            .or_default()
            .insert(disperse.data.target_epoch(), disperse);
    }
989
    /// Stores `cert` for `view_number`, replacing any certificate already
    /// stored for that view.
    pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
        self.saved_da_certs.insert(view_number, cert);
    }
994
    /// Walks the chain of saved leaves backwards from the leaf recorded for
    /// view `start_from`, following parent commitments, and calls `f` with
    /// each leaf, its state and its optional delta.
    ///
    /// The walk stops with `Ok(())` as soon as `f` returns `false`. With
    /// `Terminator::Exclusive(v)` the leaf at view `v` is not passed to `f`;
    /// with `Terminator::Inclusive(v)` it is passed to `f` and the walk stops
    /// afterwards. Reaching the terminator returns `Ok(())` only when
    /// `ok_when_finished` is `true`.
    ///
    /// # Errors
    /// - `InvalidState` when `start_from` is missing from the state map or its
    ///   entry has no leaf commitment, or when a visited leaf's view has no
    ///   state in the state map.
    /// - `MissingLeaf` when the next leaf is not in the saved leaves, and also
    ///   when the terminator is reached while `ok_when_finished` is `false`.
    pub fn visit_leaf_ancestors<F>(
        &self,
        start_from: TYPES::View,
        terminator: Terminator<TYPES::View>,
        ok_when_finished: bool,
        mut f: F,
    ) -> std::result::Result<(), HotShotError<TYPES>>
    where
        F: FnMut(
            &Leaf2<TYPES>,
            Arc<<TYPES as NodeType>::ValidatedState>,
            Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
        ) -> bool,
    {
        // Resolve the commitment of the leaf to start from.
        let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
            view.leaf_commitment().ok_or_else(|| {
                HotShotError::InvalidState(format!(
                    "Visited failed view {start_from:?} leaf. Expected successful leaf"
                ))
            })?
        } else {
            return Err(HotShotError::InvalidState(format!(
                "View {start_from:?} leaf does not exist in state map "
            )));
        };

        while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
            let view = leaf.view_number();
            if let (Some(state), delta) = self.state_and_delta(view) {
                // Exclusive terminator: stop before visiting this leaf.
                if let Terminator::Exclusive(stop_before) = terminator {
                    if stop_before == view {
                        if ok_when_finished {
                            return Ok(());
                        }
                        break;
                    }
                }
                // Advance before calling `f`, so that the `MissingLeaf` error
                // below reports the parent of the last visited leaf.
                next_leaf = leaf.parent_commitment();
                if !f(leaf, state, delta) {
                    return Ok(());
                }
                // Inclusive terminator: stop after visiting this leaf.
                if let Terminator::Inclusive(stop_after) = terminator {
                    if stop_after == view {
                        if ok_when_finished {
                            return Ok(());
                        }
                        break;
                    }
                }
            } else {
                return Err(HotShotError::InvalidState(format!(
                    "View {view:?} state does not exist in state map"
                )));
            }
        }
        // Reached when the chain ran out of saved leaves or a `break` above fired.
        Err(HotShotError::MissingLeaf(next_leaf))
    }
1055
    /// Prunes data for views older than the new anchor view.
    ///
    /// Does nothing when `new_anchor_view <= old_anchor_view`. Otherwise, with
    /// `gc_view = new_anchor_view - 1` (saturating):
    /// - DA certificates for views below `old_anchor_view` are dropped;
    /// - saved leaves referenced by state map entries for views in
    ///   `old_anchor_view..gc_view` are removed;
    /// - the state map, saved payloads, VID shares and stored proposals keep
    ///   only entries for views `>= gc_view`.
    ///
    /// # Panics
    /// Panics if the validated state map is empty.
    pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
        if new_anchor_view <= old_anchor_view {
            return;
        }
        let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
        // Sanity check only: the oldest state map entry is expected to be the
        // view right before the old anchor. A mismatch is logged, not fatal.
        let anchor_entry = self
            .validated_state_map
            .iter()
            .next()
            .expect("INCONSISTENT STATE: anchor leaf not in state map!");
        if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
            tracing::info!(
                "Something about GC has failed. Older leaf exists than the previous anchor leaf."
            );
        }
        // NOTE(review): DA certs are pruned against `old_anchor_view`, while the
        // maps below are pruned against `gc_view` — confirm this is intended.
        self.saved_da_certs
            .retain(|view_number, _| *view_number >= old_anchor_view);
        // Leaves must be removed before the state map is truncated, since the
        // state map is what links views to leaf commitments.
        self.validated_state_map
            .range(old_anchor_view..gc_view)
            .filter_map(|(_view_number, view)| view.leaf_commitment())
            .for_each(|leaf| {
                self.saved_leaves.remove(&leaf);
            });
        // `split_off` returns the entries with keys >= `gc_view`; keep those.
        self.validated_state_map = self.validated_state_map.split_off(&gc_view);
        self.saved_payloads = self.saved_payloads.split_off(&gc_view);
        self.vid_shares = self.vid_shares.split_off(&gc_view);
        self.last_proposals = self.last_proposals.split_off(&gc_view);
    }
1091
1092 #[must_use]
1098 pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1099 let decided_view_num = self.last_decided_view;
1100 let view = self.validated_state_map.get(&decided_view_num).unwrap();
1101 let leaf = view
1102 .leaf_commitment()
1103 .expect("Decided leaf not found! Consensus internally inconsistent");
1104 self.saved_leaves.get(&leaf).unwrap().clone()
1105 }
1106
    /// Returns clones of all leaves currently held in the saved leaves map.
    pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
        self.saved_leaves.values().cloned().collect::<Vec<_>>()
    }
1110
1111 #[must_use]
1113 pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
1114 match self.validated_state_map.get(&view_number) {
1115 Some(view) => view.state(),
1116 None => None,
1117 }
1118 }
1119
1120 #[must_use]
1122 pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
1123 match self.validated_state_map.get(&view_number) {
1124 Some(view) => view.state_and_delta(),
1125 None => (None, None),
1126 }
1127 }
1128
    /// Returns the state recorded for the last decided view.
    ///
    /// # Panics
    /// Panics if no state is recorded for the last decided view.
    #[must_use]
    pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        let decided_view_num = self.last_decided_view;
        self.state_and_delta(decided_view_num)
            .0
            .expect("Decided state not found! Consensus internally inconsistent")
    }
1141
    /// Computes the VID disperse for the payload saved at `view` and stores
    /// the resulting shares, as signed proposals, in the consensus state.
    ///
    /// Returns `None` when no payload is saved for `view`, when the state map
    /// has no entry or no epoch for `view`, or when the VID computation fails.
    /// Shares that cannot be turned into a proposal with `private_key` are
    /// skipped.
    #[instrument(skip_all, target = "Consensus", fields(view = *view))]
    pub async fn calculate_and_update_vid<V: Versions>(
        consensus: OuterConsensus<TYPES>,
        view: <TYPES as NodeType>::View,
        target_epoch: Option<<TYPES as NodeType>::Epoch>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
        upgrade_lock: &UpgradeLock<TYPES, V>,
    ) -> Option<()> {
        // Each read guard is a temporary that is released at the end of its
        // statement, so no lock is held while the VID computation is awaited.
        let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
        let epoch = consensus
            .read()
            .await
            .validated_state_map()
            .get(&view)?
            .view_inner
            .epoch()?;

        let vid = VidDisperse::calculate_vid_disperse::<V>(
            &payload_with_metadata.payload,
            &membership_coordinator,
            view,
            target_epoch,
            epoch,
            &payload_with_metadata.metadata,
            upgrade_lock,
        )
        .await
        .ok()?;

        let shares = VidDisperseShare::from_vid_disperse(vid);
        // Take the write lock once and store all shares under it.
        let mut consensus_writer = consensus.write().await;
        for share in shares {
            if let Some(prop) = share.to_proposal(private_key) {
                consensus_writer.update_vid_shares(view, prop);
            }
        }

        Some(())
    }
1187 pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1189 let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1190 tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1191 return false;
1192 };
1193 let block_height = leaf.height();
1194 is_epoch_transition(block_height, self.epoch_height)
1195 }
1196
1197 pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1199 let Some(block_height) = self.high_qc().data.block_number else {
1200 return false;
1201 };
1202 is_epoch_transition(block_height, self.epoch_height)
1203 }
1204
1205 pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1207 if parent_leaf.view_number() == TYPES::View::genesis() {
1208 return true;
1209 }
1210 let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1211 let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1212
1213 new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1214 }
1215
1216 pub fn is_high_qc_ge_root_block(&self) -> bool {
1218 let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else {
1219 tracing::trace!("We don't have a leaf corresponding to the high QC");
1220 return false;
1221 };
1222 let block_height = leaf.height();
1223 is_ge_epoch_root(block_height, self.epoch_height)
1224 }
1225}
1226
/// A VID commitment for a block, bundled with the builder commitment,
/// payload metadata, builder fees and related per-block information.
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct CommitmentAndMetadata<TYPES: NodeType> {
    /// The VID commitment.
    pub commitment: VidCommitment,
    /// The builder commitment.
    pub builder_commitment: BuilderCommitment,
    /// Metadata of the block payload.
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
    /// Builder fees; `Vec1` guarantees at least one entry.
    pub fees: Vec1<BuilderFee<TYPES>>,
    /// The view this block belongs to.
    pub block_view: TYPES::View,
    /// Optional auction result.
    pub auction_result: Option<TYPES::AuctionResult>,
}