1use std::{
10 collections::{BTreeMap, HashMap},
11 mem::ManuallyDrop,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
17use committable::{Commitment, Committable};
18use hotshot_utils::anytrace::*;
19use tracing::instrument;
20use vec1::Vec1;
21
22pub use crate::utils::{View, ViewInner};
23use crate::{
24 data::{
25 Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseAndDuration,
26 VidDisperseShare,
27 },
28 epoch_membership::EpochMembershipCoordinator,
29 error::HotShotError,
30 event::{HotShotAction, LeafInfo},
31 message::{Proposal, UpgradeLock},
32 simple_certificate::{
33 DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34 QuorumCertificate2,
35 },
36 simple_vote::HasEpoch,
37 traits::{
38 block_contents::{BlockHeader, BuilderFee},
39 metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
40 node_implementation::{ConsensusTime, NodeType, Versions},
41 signature_key::SignatureKey,
42 BlockPayload, ValidatedState,
43 },
44 utils::{
45 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_ge_epoch_root,
46 is_last_block, is_transition_block, option_epoch_from_block_number, BuilderCommitment,
47 LeafCommitment, StateAndDelta, Terminator,
48 },
49 vote::{Certificate, HasViewNumber},
50};
51
52pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
54
55pub type VidShares<TYPES> = BTreeMap<
57 <TYPES as NodeType>::View,
58 HashMap<
59 <TYPES as NodeType>::SignatureKey,
60 BTreeMap<Option<<TYPES as NodeType>::Epoch>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
61 >,
62>;
63
64pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
66
67#[derive(Clone, Debug)]
69pub struct OuterConsensus<TYPES: NodeType> {
70 pub inner_consensus: LockedConsensusState<TYPES>,
72}
73
74impl<TYPES: NodeType> OuterConsensus<TYPES> {
75 pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
77 Self {
78 inner_consensus: consensus,
79 }
80 }
81
82 #[instrument(skip_all, target = "OuterConsensus")]
84 pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
85 tracing::trace!("Trying to acquire read lock on consensus");
86 let ret = self.inner_consensus.read().await;
87 tracing::trace!("Acquired read lock on consensus");
88 ConsensusReadLockGuard::new(ret)
89 }
90
91 #[instrument(skip_all, target = "OuterConsensus")]
93 pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
94 tracing::trace!("Trying to acquire write lock on consensus");
95 let ret = self.inner_consensus.write().await;
96 tracing::trace!("Acquired write lock on consensus");
97 ConsensusWriteLockGuard::new(ret)
98 }
99
100 #[instrument(skip_all, target = "OuterConsensus")]
102 pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
103 tracing::trace!("Trying to acquire write lock on consensus");
104 let ret = self.inner_consensus.try_write();
105 if let Some(guard) = ret {
106 tracing::trace!("Acquired write lock on consensus");
107 Some(ConsensusWriteLockGuard::new(guard))
108 } else {
109 tracing::trace!("Failed to acquire write lock");
110 None
111 }
112 }
113
114 #[instrument(skip_all, target = "OuterConsensus")]
116 pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
117 tracing::trace!("Trying to acquire upgradable read lock on consensus");
118 let ret = self.inner_consensus.upgradable_read().await;
119 tracing::trace!("Acquired upgradable read lock on consensus");
120 ConsensusUpgradableReadLockGuard::new(ret)
121 }
122
123 #[instrument(skip_all, target = "OuterConsensus")]
125 pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
126 tracing::trace!("Trying to acquire read lock on consensus");
127 let ret = self.inner_consensus.try_read();
128 if let Some(guard) = ret {
129 tracing::trace!("Acquired read lock on consensus");
130 Some(ConsensusReadLockGuard::new(guard))
131 } else {
132 tracing::trace!("Failed to acquire read lock");
133 None
134 }
135 }
136}
137
138pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
140 lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
142}
143
144impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
145 #[must_use]
147 pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
148 Self { lock_guard }
149 }
150}
151
152impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
153 type Target = Consensus<TYPES>;
154 fn deref(&self) -> &Self::Target {
155 &self.lock_guard
156 }
157}
158
159impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
160 #[instrument(skip_all, target = "ConsensusReadLockGuard")]
161 fn drop(&mut self) {
162 tracing::trace!("Read lock on consensus dropped");
163 }
164}
165
166pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
168 lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
170}
171
172impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
173 #[must_use]
175 pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
176 Self { lock_guard }
177 }
178}
179
180impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
181 type Target = Consensus<TYPES>;
182 fn deref(&self) -> &Self::Target {
183 &self.lock_guard
184 }
185}
186
187impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
188 fn deref_mut(&mut self) -> &mut Self::Target {
189 &mut self.lock_guard
190 }
191}
192
193impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
194 #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
195 fn drop(&mut self) {
196 tracing::debug!("Write lock on consensus dropped");
197 }
198}
199
200pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
202 lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
204 taken: bool,
206}
207
208impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
209 #[must_use]
211 pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
212 Self {
213 lock_guard: ManuallyDrop::new(lock_guard),
214 taken: false,
215 }
216 }
217
218 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
220 pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
221 let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
222 guard.taken = true;
223 tracing::debug!("Trying to upgrade upgradable read lock on consensus");
224 let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
225 tracing::debug!("Upgraded upgradable read lock on consensus");
226 ConsensusWriteLockGuard::new(ret)
227 }
228}
229
230impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
231 type Target = Consensus<TYPES>;
232
233 fn deref(&self) -> &Self::Target {
234 &self.lock_guard
235 }
236}
237
238impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
239 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
240 fn drop(&mut self) {
241 if !self.taken {
242 unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
243 tracing::debug!("Upgradable read lock on consensus dropped");
244 }
245 }
246}
247
248#[derive(Debug, Clone, Copy)]
250struct HotShotActionViews<T: ConsensusTime> {
251 proposed: T,
253 voted: T,
255 da_proposed: T,
257 da_vote: T,
259}
260
261impl<T: ConsensusTime> Default for HotShotActionViews<T> {
262 fn default() -> Self {
263 let genesis = T::genesis();
264 Self {
265 proposed: genesis,
266 voted: genesis,
267 da_proposed: genesis,
268 da_vote: genesis,
269 }
270 }
271}
272impl<T: ConsensusTime> HotShotActionViews<T> {
273 fn from_view(view: T) -> Self {
275 Self {
276 proposed: view,
277 voted: view,
278 da_proposed: view,
279 da_vote: view,
280 }
281 }
282}
283
284#[derive(Debug, Clone)]
285struct ValidatorParticipation<TYPES: NodeType> {
286 epoch: TYPES::Epoch,
287 current_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
289
290 last_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
292}
293
294impl<TYPES: NodeType> ValidatorParticipation<TYPES> {
295 fn new() -> Self {
296 Self {
297 epoch: TYPES::Epoch::genesis(),
298 current_epoch_participation: HashMap::new(),
299 last_epoch_participation: HashMap::new(),
300 }
301 }
302
303 fn update_participation(
304 &mut self,
305 key: TYPES::SignatureKey,
306 epoch: TYPES::Epoch,
307 proposed: bool,
308 ) {
309 if epoch != self.epoch {
310 return;
311 }
312 let entry = self
313 .current_epoch_participation
314 .entry(key)
315 .or_insert((0, 0));
316 if proposed {
317 entry.1 += 1;
318 }
319 entry.0 += 1;
320 }
321
322 fn update_participation_epoch(&mut self, epoch: TYPES::Epoch) {
323 if epoch <= self.epoch {
324 return;
325 }
326 self.epoch = epoch;
327 self.last_epoch_participation = self.current_epoch_participation.clone();
328 self.current_epoch_participation = HashMap::new();
329 }
330
331 fn get_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
332 let current_epoch_participation = self
333 .current_epoch_participation
334 .get(&key)
335 .unwrap_or(&(0, 0));
336 let num_leader = current_epoch_participation.0;
337 let num_proposed = current_epoch_participation.1;
338
339 let current_epoch_participation_ratio = if num_leader == 0 {
340 0.0
341 } else {
342 num_proposed as f64 / num_leader as f64
343 };
344 let last_epoch_participation = self.last_epoch_participation.get(&key);
345 let last_epoch_participation_ratio = last_epoch_participation.map(|(leader, proposed)| {
346 if *leader == 0 {
347 0.0
348 } else {
349 *proposed as f64 / *leader as f64
350 }
351 });
352 (
353 current_epoch_participation_ratio,
354 last_epoch_participation_ratio,
355 )
356 }
357
358 fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
359 self.current_epoch_participation
360 .iter()
361 .map(|(key, (leader, proposed))| {
362 (
363 key.clone(),
364 if *leader == 0 {
365 0.0
366 } else {
367 *proposed as f64 / *leader as f64
368 },
369 )
370 })
371 .collect()
372 }
373 fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
374 self.last_epoch_participation
375 .iter()
376 .map(|(key, (leader, proposed))| {
377 (
378 key.clone(),
379 if *leader == 0 {
380 0.0
381 } else {
382 *proposed as f64 / *leader as f64
383 },
384 )
385 })
386 .collect()
387 }
388}
389
390#[derive(derive_more::Debug, Clone)]
394pub struct Consensus<TYPES: NodeType> {
395 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
397
398 vid_shares: VidShares<TYPES>,
400
401 saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,
404
405 cur_view: TYPES::View,
407
408 cur_epoch: Option<TYPES::Epoch>,
410
411 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
414
415 last_decided_view: TYPES::View,
417
418 locked_view: TYPES::View,
420
421 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
425
426 last_actions: HotShotActionViews<TYPES::View>,
430
431 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
435
436 high_qc: QuorumCertificate2<TYPES>,
438
439 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
441
442 validator_participation: ValidatorParticipation<TYPES>,
444
445 pub metrics: Arc<ConsensusMetricsValue>,
447
448 pub epoch_height: u64,
450
451 pub drb_difficulty: u64,
453
454 pub drb_upgrade_difficulty: u64,
456
457 transition_qc: Option<(
459 QuorumCertificate2<TYPES>,
460 NextEpochQuorumCertificate2<TYPES>,
461 )>,
462
463 pub highest_block: u64,
465 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
467}
468
469#[derive(Debug, Clone, Hash, Eq, PartialEq)]
471pub struct PayloadWithMetadata<TYPES: NodeType> {
472 pub payload: TYPES::BlockPayload,
473 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
474}
475
476#[derive(Clone, Debug)]
478pub struct ConsensusMetricsValue {
479 pub last_synced_block_height: Box<dyn Gauge>,
481 pub last_decided_view: Box<dyn Gauge>,
483 pub last_voted_view: Box<dyn Gauge>,
485 pub last_decided_time: Box<dyn Gauge>,
487 pub current_view: Box<dyn Gauge>,
489 pub number_of_views_since_last_decide: Box<dyn Gauge>,
491 pub number_of_views_per_decide_event: Box<dyn Histogram>,
493 pub view_duration_as_leader: Box<dyn Histogram>,
495 pub invalid_qc: Box<dyn Gauge>,
497 pub outstanding_transactions: Box<dyn Gauge>,
499 pub outstanding_transactions_memory_size: Box<dyn Gauge>,
501 pub number_of_timeouts: Box<dyn Counter>,
503 pub number_of_timeouts_as_leader: Box<dyn Counter>,
505 pub number_of_empty_blocks_proposed: Box<dyn Counter>,
507 pub internal_event_queue_len: Box<dyn Gauge>,
509 pub proposal_to_decide_time: Box<dyn Histogram>,
511 pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
513 pub finalized_bytes: Box<dyn Histogram>,
515 pub validate_and_apply_header_duration: Box<dyn Histogram>,
517 pub update_leaf_duration: Box<dyn Histogram>,
519 pub vid_disperse_duration: Box<dyn Histogram>,
521}
522
523impl ConsensusMetricsValue {
524 #[must_use]
526 pub fn new(metrics: &dyn Metrics) -> Self {
527 Self {
528 last_synced_block_height: metrics
529 .create_gauge(String::from("last_synced_block_height"), None),
530 last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
531 last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
532 last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
533 current_view: metrics.create_gauge(String::from("current_view"), None),
534 number_of_views_since_last_decide: metrics
535 .create_gauge(String::from("number_of_views_since_last_decide"), None),
536 number_of_views_per_decide_event: metrics
537 .create_histogram(String::from("number_of_views_per_decide_event"), None),
538 view_duration_as_leader: metrics
539 .create_histogram(String::from("view_duration_as_leader"), None),
540 invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
541 outstanding_transactions: metrics
542 .create_gauge(String::from("outstanding_transactions"), None),
543 outstanding_transactions_memory_size: metrics
544 .create_gauge(String::from("outstanding_transactions_memory_size"), None),
545 number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
546 number_of_timeouts_as_leader: metrics
547 .create_counter(String::from("number_of_timeouts_as_leader"), None),
548 number_of_empty_blocks_proposed: metrics
549 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
550 internal_event_queue_len: metrics
551 .create_gauge(String::from("internal_event_queue_len"), None),
552 proposal_to_decide_time: metrics
553 .create_histogram(String::from("proposal_to_decide_time"), None),
554 previous_proposal_to_proposal_time: metrics
555 .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
556 finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
557 validate_and_apply_header_duration: metrics.create_histogram(
558 String::from("validate_and_apply_header_duration"),
559 Some("seconds".to_string()),
560 ),
561 update_leaf_duration: metrics.create_histogram(
562 String::from("update_leaf_duration"),
563 Some("seconds".to_string()),
564 ),
565 vid_disperse_duration: metrics.create_histogram(
566 String::from("vid_disperse_duration"),
567 Some("seconds".to_string()),
568 ),
569 }
570 }
571}
572
573impl Default for ConsensusMetricsValue {
574 fn default() -> Self {
575 Self::new(&*NoMetrics::boxed())
576 }
577}
578
579impl<TYPES: NodeType> Consensus<TYPES> {
580 #[allow(clippy::too_many_arguments)]
582 pub fn new(
583 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
584 vid_shares: Option<VidShares<TYPES>>,
585 cur_view: TYPES::View,
586 cur_epoch: Option<TYPES::Epoch>,
587 locked_view: TYPES::View,
588 last_decided_view: TYPES::View,
589 last_actioned_view: TYPES::View,
590 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
591 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
592 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
593 high_qc: QuorumCertificate2<TYPES>,
594 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
595 metrics: Arc<ConsensusMetricsValue>,
596 epoch_height: u64,
597 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
598 drb_difficulty: u64,
599 drb_upgrade_difficulty: u64,
600 ) -> Self {
601 let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
602 if high_qc
603 .data
604 .block_number
605 .is_some_and(|bn| is_transition_block(bn, epoch_height))
606 {
607 if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
608 Some((high_qc.clone(), next_epoch_high_qc.clone()))
609 } else {
610 tracing::error!("Next epoch high QC has different leaf commit to high QC");
611 None
612 }
613 } else {
614 None
615 }
616 } else {
617 None
618 };
619 Consensus {
620 validated_state_map,
621 vid_shares: vid_shares.unwrap_or_default(),
622 saved_da_certs: HashMap::new(),
623 cur_view,
624 cur_epoch,
625 last_decided_view,
626 last_proposals,
627 last_actions: HotShotActionViews::from_view(last_actioned_view),
628 locked_view,
629 saved_leaves,
630 saved_payloads,
631 high_qc,
632 next_epoch_high_qc,
633 metrics,
634 epoch_height,
635 transition_qc,
636 highest_block: 0,
637 state_cert,
638 drb_difficulty,
639 validator_participation: ValidatorParticipation::new(),
640 drb_upgrade_difficulty,
641 }
642 }
643
644 pub fn cur_view(&self) -> TYPES::View {
646 self.cur_view
647 }
648
649 pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
651 self.cur_epoch
652 }
653
654 pub fn last_decided_view(&self) -> TYPES::View {
656 self.last_decided_view
657 }
658
659 pub fn locked_view(&self) -> TYPES::View {
661 self.locked_view
662 }
663
664 pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
666 &self.high_qc
667 }
668
669 pub fn transition_qc(
671 &self,
672 ) -> Option<&(
673 QuorumCertificate2<TYPES>,
674 NextEpochQuorumCertificate2<TYPES>,
675 )> {
676 self.transition_qc.as_ref()
677 }
678
679 pub fn update_highest_block(&mut self, block_number: u64) {
681 if block_number > self.highest_block {
682 self.highest_block = block_number;
683 return;
684 }
685
686 if is_epoch_transition(block_number, self.epoch_height) {
687 let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
688 let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
689 if new_epoch >= high_epoch {
690 self.highest_block = block_number;
691 }
692 }
693 }
694
695 pub fn update_transition_qc(
697 &mut self,
698 qc: QuorumCertificate2<TYPES>,
699 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
700 ) {
701 if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
702 tracing::error!(
703 "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
704 qc.view_number(),
705 next_epoch_qc.data.leaf_commit,
706 qc.data().leaf_commit
707 );
708 return;
709 }
710 if let Some((transition_qc, _)) = &self.transition_qc {
711 if transition_qc.view_number() >= qc.view_number() {
712 return;
713 }
714 }
715 self.transition_qc = Some((qc, next_epoch_qc));
716 }
717
718 pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificateV2<TYPES>> {
720 self.state_cert.as_ref()
721 }
722
723 pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
725 self.next_epoch_high_qc.as_ref()
726 }
727
728 pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
730 &self.validated_state_map
731 }
732
733 pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
735 &self.saved_leaves
736 }
737
738 pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>> {
740 &self.saved_payloads
741 }
742
743 pub fn vid_shares(&self) -> &VidShares<TYPES> {
745 &self.vid_shares
746 }
747
748 pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
750 &self.saved_da_certs
751 }
752
753 pub fn last_proposals(
755 &self,
756 ) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
757 &self.last_proposals
758 }
759
760 pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
764 ensure!(
765 view_number > self.cur_view,
766 debug!("New view isn't newer than the current view.")
767 );
768 self.cur_view = view_number;
769 Ok(())
770 }
771
772 pub fn update_validator_participation(
774 &mut self,
775 key: TYPES::SignatureKey,
776 epoch: TYPES::Epoch,
777 proposed: bool,
778 ) {
779 self.validator_participation
780 .update_participation(key, epoch, proposed);
781 }
782
783 pub fn update_validator_participation_epoch(&mut self, epoch: TYPES::Epoch) {
785 self.validator_participation
786 .update_participation_epoch(epoch);
787 }
788
789 pub fn get_validator_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
791 self.validator_participation.get_participation(key)
792 }
793
794 pub fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
796 self.validator_participation
797 .current_proposal_participation()
798 }
799
800 pub fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
802 self.validator_participation
803 .previous_proposal_participation()
804 }
805
806 pub async fn parent_leaf_info(
809 &self,
810 leaf: &Leaf2<TYPES>,
811 public_key: &TYPES::SignatureKey,
812 ) -> Option<LeafInfo<TYPES>> {
813 let parent_view_number = leaf.justify_qc().view_number();
814 let parent_epoch = leaf.justify_qc().epoch();
815 let parent_leaf = self
816 .saved_leaves
817 .get(&leaf.justify_qc().data().leaf_commit)?;
818 let parent_state_and_delta = self.state_and_delta(parent_view_number);
819 let (Some(state), delta) = parent_state_and_delta else {
820 return None;
821 };
822
823 let parent_vid = self
824 .vid_shares()
825 .get(&parent_view_number)
826 .and_then(|key_map| key_map.get(public_key))
827 .and_then(|epoch_map| epoch_map.get(&parent_epoch))
828 .map(|prop| prop.data.clone());
829
830 let state_cert = if parent_leaf.with_epoch
831 && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
832 {
833 match self.state_cert() {
834 Some(state_cert)
836 if state_cert.light_client_state.view_number == parent_view_number.u64() =>
837 {
838 Some(state_cert.clone())
839 },
840 _ => None,
841 }
842 } else {
843 None
844 };
845
846 Some(LeafInfo {
847 leaf: parent_leaf.clone(),
848 state,
849 delta,
850 vid_share: parent_vid,
851 state_cert,
852 })
853 }
854
855 pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
859 ensure!(
860 self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
861 debug!("New epoch isn't newer than the current epoch.")
862 );
863 tracing::trace!(
864 "Updating epoch from {:?} to {}",
865 self.cur_epoch,
866 epoch_number
867 );
868 self.cur_epoch = Some(epoch_number);
869 Ok(())
870 }
871
872 pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
876 let old_view = match action {
877 HotShotAction::Vote => &mut self.last_actions.voted,
878 HotShotAction::Propose => &mut self.last_actions.proposed,
879 HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
880 HotShotAction::DaVote => {
881 if view > self.last_actions.da_vote {
882 self.last_actions.da_vote = view;
883 }
884 return true;
889 },
890 _ => return true,
891 };
892 if view > *old_view {
893 *old_view = view;
894 return true;
895 }
896 false
897 }
898
899 pub fn reset_actions(&mut self) {
901 self.last_actions = HotShotActionViews::default();
902 }
903
904 pub fn update_proposed_view(
909 &mut self,
910 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
911 ) -> Result<()> {
912 ensure!(
913 proposal.data.view_number()
914 > self
915 .last_proposals
916 .last_key_value()
917 .map_or(TYPES::View::genesis(), |(k, _)| { *k }),
918 debug!("New view isn't newer than the previously proposed view.")
919 );
920 self.last_proposals
921 .insert(proposal.data.view_number(), proposal);
922 Ok(())
923 }
924
925 pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
930 ensure!(
931 view_number > self.last_decided_view,
932 debug!("New view isn't newer than the previously decided view.")
933 );
934 self.last_decided_view = view_number;
935 Ok(())
936 }
937
938 pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
943 ensure!(
944 view_number > self.locked_view,
945 debug!("New view isn't newer than the previously locked view.")
946 );
947 self.locked_view = view_number;
948 Ok(())
949 }
950
951 pub fn update_da_view(
957 &mut self,
958 view_number: TYPES::View,
959 epoch: Option<TYPES::Epoch>,
960 payload_commitment: VidCommitment,
961 ) -> Result<()> {
962 let view = View {
963 view_inner: ViewInner::Da {
964 payload_commitment,
965 epoch,
966 },
967 };
968 self.update_validated_state_map(view_number, view)
969 }
970
971 pub fn update_leaf(
977 &mut self,
978 leaf: Leaf2<TYPES>,
979 state: Arc<TYPES::ValidatedState>,
980 delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
981 ) -> Result<()> {
982 let view_number = leaf.view_number();
983 let epoch = option_epoch_from_block_number::<TYPES>(
984 leaf.with_epoch,
985 leaf.height(),
986 self.epoch_height,
987 );
988 let view = View {
989 view_inner: ViewInner::Leaf {
990 leaf: leaf.commit(),
991 state,
992 delta,
993 epoch,
994 },
995 };
996 self.update_validated_state_map(view_number, view)?;
997 self.update_saved_leaves(leaf);
998 Ok(())
999 }
1000
1001 fn update_validated_state_map(
1007 &mut self,
1008 view_number: TYPES::View,
1009 new_view: View<TYPES>,
1010 ) -> Result<()> {
1011 if let Some(existing_view) = self.validated_state_map().get(&view_number) {
1012 if let ViewInner::Leaf {
1013 delta: ref existing_delta,
1014 ..
1015 } = existing_view.view_inner
1016 {
1017 if let ViewInner::Leaf {
1018 delta: ref new_delta,
1019 ..
1020 } = new_view.view_inner
1021 {
1022 ensure!(
1023 new_delta.is_some() || existing_delta.is_none(),
1024 debug!(
1025 "Skipping the state update to not override a `Leaf` view with `Some` \
1026 state delta."
1027 )
1028 );
1029 } else {
1030 bail!(
1031 "Skipping the state update to not override a `Leaf` view with a \
1032 non-`Leaf` view."
1033 );
1034 }
1035 }
1036 }
1037 self.validated_state_map.insert(view_number, new_view);
1038 Ok(())
1039 }
1040
1041 fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
1043 self.saved_leaves.insert(leaf.commit(), leaf);
1044 }
1045
1046 pub fn update_saved_payloads(
1051 &mut self,
1052 view_number: TYPES::View,
1053 payload: Arc<PayloadWithMetadata<TYPES>>,
1054 ) -> Result<()> {
1055 ensure!(
1056 !self.saved_payloads.contains_key(&view_number),
1057 "Payload with the same view already exists."
1058 );
1059 self.saved_payloads.insert(view_number, payload);
1060 Ok(())
1061 }
1062
1063 pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
1067 if self.high_qc == high_qc {
1068 return Ok(());
1069 }
1070 ensure!(
1072 high_qc.view_number > self.high_qc.view_number,
1073 debug!("High QC with an equal or higher view exists.")
1074 );
1075 tracing::debug!("Updating high QC");
1076 self.high_qc = high_qc;
1077
1078 Ok(())
1079 }
1080
1081 pub fn update_next_epoch_high_qc(
1087 &mut self,
1088 high_qc: NextEpochQuorumCertificate2<TYPES>,
1089 ) -> Result<()> {
1090 if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
1091 return Ok(());
1092 }
1093 if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
1094 ensure!(
1095 high_qc.view_number > next_epoch_high_qc.view_number,
1096 debug!("Next epoch high QC with an equal or higher view exists.")
1097 );
1098 }
1099 tracing::debug!("Updating next epoch high QC");
1100 self.next_epoch_high_qc = Some(high_qc);
1101
1102 Ok(())
1103 }
1104
1105 pub fn reset_high_qc(
1109 &mut self,
1110 high_qc: QuorumCertificate2<TYPES>,
1111 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
1112 ) -> Result<()> {
1113 ensure!(
1114 high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
1115 error!("High QC's and next epoch QC's leaf commits do not match.")
1116 );
1117 if self.high_qc == high_qc {
1118 return Ok(());
1119 }
1120 let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
1121 let current_qc = self.high_qc();
1122 let Some(high_bn) = current_qc.data.block_number else {
1123 return false;
1124 };
1125 epoch_from_block_number(bn + 1, self.epoch_height)
1126 == epoch_from_block_number(high_bn + 1, self.epoch_height)
1127 });
1128 ensure!(
1129 high_qc
1130 .data
1131 .block_number
1132 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
1133 && same_epoch,
1134 error!("Provided QC is not a transition QC.")
1135 );
1136 tracing::debug!("Resetting high QC and next epoch high QC");
1137 self.high_qc = high_qc;
1138 self.next_epoch_high_qc = Some(next_epoch_qc);
1139
1140 Ok(())
1141 }
1142
1143 pub fn update_state_cert(
1147 &mut self,
1148 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
1149 ) -> Result<()> {
1150 if let Some(existing_state_cert) = &self.state_cert {
1151 ensure!(
1152 state_cert.epoch > existing_state_cert.epoch,
1153 debug!(
1154 "Light client state update certification with an equal or higher epoch exists."
1155 )
1156 );
1157 }
1158 tracing::debug!("Updating light client state update certification");
1159 self.state_cert = Some(state_cert);
1160
1161 Ok(())
1162 }
1163
1164 pub fn update_vid_shares(
1166 &mut self,
1167 view_number: TYPES::View,
1168 disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1169 ) {
1170 self.vid_shares
1171 .entry(view_number)
1172 .or_default()
1173 .entry(disperse.data.recipient_key().clone())
1174 .or_default()
1175 .insert(disperse.data.target_epoch(), disperse);
1176 }
1177
1178 pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
1180 self.saved_da_certs.insert(view_number, cert);
1181 }
1182
1183 pub fn visit_leaf_ancestors<F>(
1187 &self,
1188 start_from: TYPES::View,
1189 terminator: Terminator<TYPES::View>,
1190 ok_when_finished: bool,
1191 mut f: F,
1192 ) -> std::result::Result<(), HotShotError<TYPES>>
1193 where
1194 F: FnMut(
1195 &Leaf2<TYPES>,
1196 Arc<<TYPES as NodeType>::ValidatedState>,
1197 Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1198 ) -> bool,
1199 {
1200 let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1201 view.leaf_commitment().ok_or_else(|| {
1202 HotShotError::InvalidState(format!(
1203 "Visited failed view {start_from} leaf. Expected successful leaf"
1204 ))
1205 })?
1206 } else {
1207 return Err(HotShotError::InvalidState(format!(
1208 "View {start_from} leaf does not exist in state map "
1209 )));
1210 };
1211
1212 while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1213 let view = leaf.view_number();
1214 if let (Some(state), delta) = self.state_and_delta(view) {
1215 if let Terminator::Exclusive(stop_before) = terminator {
1216 if stop_before == view {
1217 if ok_when_finished {
1218 return Ok(());
1219 }
1220 break;
1221 }
1222 }
1223 next_leaf = leaf.parent_commitment();
1224 if !f(leaf, state, delta) {
1225 return Ok(());
1226 }
1227 if let Terminator::Inclusive(stop_after) = terminator {
1228 if stop_after == view {
1229 if ok_when_finished {
1230 return Ok(());
1231 }
1232 break;
1233 }
1234 }
1235 } else {
1236 return Err(HotShotError::InvalidState(format!(
1237 "View {view} state does not exist in state map"
1238 )));
1239 }
1240 }
1241 Err(HotShotError::MissingLeaf(next_leaf))
1242 }
1243
1244 pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
1249 if new_anchor_view <= old_anchor_view {
1251 return;
1252 }
1253 let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
1254 let anchor_entry = self
1256 .validated_state_map
1257 .iter()
1258 .next()
1259 .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1260 if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1261 tracing::info!(
1262 "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1263 );
1264 }
1265 self.saved_da_certs
1267 .retain(|view_number, _| *view_number >= old_anchor_view);
1268 self.validated_state_map
1269 .range(..gc_view)
1270 .filter_map(|(_view_number, view)| view.leaf_commitment())
1271 .for_each(|leaf| {
1272 self.saved_leaves.remove(&leaf);
1273 });
1274 self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1275 self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1276 self.vid_shares = self.vid_shares.split_off(&gc_view);
1277 self.last_proposals = self.last_proposals.split_off(&gc_view);
1278 }
1279
1280 #[must_use]
1286 pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1287 let decided_view_num = self.last_decided_view;
1288 let view = self.validated_state_map.get(&decided_view_num).unwrap();
1289 let leaf = view
1290 .leaf_commitment()
1291 .expect("Decided leaf not found! Consensus internally inconsistent");
1292 self.saved_leaves.get(&leaf).unwrap().clone()
1293 }
1294
1295 pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1296 self.saved_leaves.values().cloned().collect::<Vec<_>>()
1297 }
1298
1299 #[must_use]
1301 pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
1302 match self.validated_state_map.get(&view_number) {
1303 Some(view) => view.state(),
1304 None => None,
1305 }
1306 }
1307
1308 #[must_use]
1310 pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
1311 match self.validated_state_map.get(&view_number) {
1312 Some(view) => view.state_and_delta(),
1313 None => (None, None),
1314 }
1315 }
1316
1317 #[must_use]
1323 pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1324 let decided_view_num = self.last_decided_view;
1325 self.state_and_delta(decided_view_num)
1326 .0
1327 .expect("Decided state not found! Consensus internally inconsistent")
1328 }
1329
1330 #[instrument(skip_all, target = "Consensus", fields(view = *view))]
1336 pub async fn calculate_and_update_vid<V: Versions>(
1337 consensus: OuterConsensus<TYPES>,
1338 view: <TYPES as NodeType>::View,
1339 target_epoch: Option<<TYPES as NodeType>::Epoch>,
1340 membership_coordinator: EpochMembershipCoordinator<TYPES>,
1341 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
1342 upgrade_lock: &UpgradeLock<TYPES, V>,
1343 ) -> Option<()> {
1344 let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
1345 let epoch = consensus
1346 .read()
1347 .await
1348 .validated_state_map()
1349 .get(&view)?
1350 .view_inner
1351 .epoch()?;
1352
1353 let VidDisperseAndDuration {
1354 disperse: vid,
1355 duration: disperse_duration,
1356 } = VidDisperse::calculate_vid_disperse::<V>(
1357 &payload_with_metadata.payload,
1358 &membership_coordinator,
1359 view,
1360 target_epoch,
1361 epoch,
1362 &payload_with_metadata.metadata,
1363 upgrade_lock,
1364 )
1365 .await
1366 .ok()?;
1367
1368 let shares = VidDisperseShare::from_vid_disperse(vid);
1369 let mut consensus_writer = consensus.write().await;
1370 consensus_writer
1371 .metrics
1372 .vid_disperse_duration
1373 .add_point(disperse_duration.as_secs_f64());
1374 for share in shares {
1375 if let Some(prop) = share.to_proposal(private_key) {
1376 consensus_writer.update_vid_shares(view, prop);
1377 }
1378 }
1379
1380 Some(())
1381 }
1382 pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1384 let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1385 tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1386 return false;
1387 };
1388 let block_height = leaf.height();
1389 is_epoch_transition(block_height, self.epoch_height)
1390 }
1391
1392 pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1394 let Some(block_height) = self.high_qc().data.block_number else {
1395 return false;
1396 };
1397 is_epoch_transition(block_height, self.epoch_height)
1398 }
1399
1400 pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1402 if parent_leaf.view_number() == TYPES::View::genesis() {
1403 return true;
1404 }
1405 let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1406 let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1407
1408 new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1409 }
1410
1411 pub fn is_high_qc_ge_root_block(&self) -> bool {
1413 let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else {
1414 tracing::trace!("We don't have a leaf corresponding to the high QC");
1415 return false;
1416 };
1417 let block_height = leaf.height();
1418 is_ge_epoch_root(block_height, self.epoch_height)
1419 }
1420
1421 pub fn is_high_qc_last_block(&self) -> bool {
1422 let Some(block_height) = self.high_qc().data.block_number else {
1423 tracing::warn!("We don't have a block number for the high QC");
1424 return false;
1425 };
1426 is_last_block(block_height, self.epoch_height)
1427 }
1428}
1429
1430#[derive(Eq, PartialEq, Debug, Clone)]
1433pub struct CommitmentAndMetadata<TYPES: NodeType> {
1434 pub commitment: VidCommitment,
1436 pub builder_commitment: BuilderCommitment,
1438 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
1440 pub fees: Vec1<BuilderFee<TYPES>>,
1442 pub block_view: TYPES::View,
1444}