1use std::{
10 collections::{BTreeMap, HashMap},
11 mem::ManuallyDrop,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
17use committable::{Commitment, Committable};
18use hotshot_utils::anytrace::*;
19use tracing::instrument;
20use vec1::Vec1;
21
22pub use crate::utils::{View, ViewInner};
23use crate::{
24 data::{
25 Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseAndDuration,
26 VidDisperseShare,
27 },
28 epoch_membership::EpochMembershipCoordinator,
29 error::HotShotError,
30 event::{HotShotAction, LeafInfo},
31 message::{Proposal, UpgradeLock},
32 simple_certificate::{
33 DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34 QuorumCertificate2,
35 },
36 simple_vote::HasEpoch,
37 traits::{
38 block_contents::{BlockHeader, BuilderFee},
39 metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
40 node_implementation::{ConsensusTime, NodeType, Versions},
41 signature_key::SignatureKey,
42 BlockPayload, ValidatedState,
43 },
44 utils::{
45 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_ge_epoch_root,
46 is_last_block, is_transition_block, option_epoch_from_block_number, BuilderCommitment,
47 LeafCommitment, StateAndDelta, Terminator,
48 },
49 vote::{Certificate, HasViewNumber},
50};
51
52pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
54
55pub type VidShares<TYPES> = BTreeMap<
57 <TYPES as NodeType>::View,
58 HashMap<
59 <TYPES as NodeType>::SignatureKey,
60 BTreeMap<Option<<TYPES as NodeType>::Epoch>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
61 >,
62>;
63
64pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
66
67#[derive(Clone, Debug)]
69pub struct OuterConsensus<TYPES: NodeType> {
70 pub inner_consensus: LockedConsensusState<TYPES>,
72}
73
74impl<TYPES: NodeType> OuterConsensus<TYPES> {
75 pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
77 Self {
78 inner_consensus: consensus,
79 }
80 }
81
82 #[instrument(skip_all, target = "OuterConsensus")]
84 pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
85 tracing::trace!("Trying to acquire read lock on consensus");
86 let ret = self.inner_consensus.read().await;
87 tracing::trace!("Acquired read lock on consensus");
88 ConsensusReadLockGuard::new(ret)
89 }
90
91 #[instrument(skip_all, target = "OuterConsensus")]
93 pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
94 tracing::trace!("Trying to acquire write lock on consensus");
95 let ret = self.inner_consensus.write().await;
96 tracing::trace!("Acquired write lock on consensus");
97 ConsensusWriteLockGuard::new(ret)
98 }
99
100 #[instrument(skip_all, target = "OuterConsensus")]
102 pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
103 tracing::trace!("Trying to acquire write lock on consensus");
104 let ret = self.inner_consensus.try_write();
105 if let Some(guard) = ret {
106 tracing::trace!("Acquired write lock on consensus");
107 Some(ConsensusWriteLockGuard::new(guard))
108 } else {
109 tracing::trace!("Failed to acquire write lock");
110 None
111 }
112 }
113
114 #[instrument(skip_all, target = "OuterConsensus")]
116 pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
117 tracing::trace!("Trying to acquire upgradable read lock on consensus");
118 let ret = self.inner_consensus.upgradable_read().await;
119 tracing::trace!("Acquired upgradable read lock on consensus");
120 ConsensusUpgradableReadLockGuard::new(ret)
121 }
122
123 #[instrument(skip_all, target = "OuterConsensus")]
125 pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
126 tracing::trace!("Trying to acquire read lock on consensus");
127 let ret = self.inner_consensus.try_read();
128 if let Some(guard) = ret {
129 tracing::trace!("Acquired read lock on consensus");
130 Some(ConsensusReadLockGuard::new(guard))
131 } else {
132 tracing::trace!("Failed to acquire read lock");
133 None
134 }
135 }
136}
137
138pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
140 lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
142}
143
144impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
145 #[must_use]
147 pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
148 Self { lock_guard }
149 }
150}
151
152impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
153 type Target = Consensus<TYPES>;
154 fn deref(&self) -> &Self::Target {
155 &self.lock_guard
156 }
157}
158
159impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
160 #[instrument(skip_all, target = "ConsensusReadLockGuard")]
161 fn drop(&mut self) {
162 tracing::trace!("Read lock on consensus dropped");
163 }
164}
165
166pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
168 lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
170}
171
172impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
173 #[must_use]
175 pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
176 Self { lock_guard }
177 }
178}
179
180impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
181 type Target = Consensus<TYPES>;
182 fn deref(&self) -> &Self::Target {
183 &self.lock_guard
184 }
185}
186
187impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
188 fn deref_mut(&mut self) -> &mut Self::Target {
189 &mut self.lock_guard
190 }
191}
192
193impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
194 #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
195 fn drop(&mut self) {
196 tracing::debug!("Write lock on consensus dropped");
197 }
198}
199
200pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
202 lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
204 taken: bool,
206}
207
208impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
209 #[must_use]
211 pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
212 Self {
213 lock_guard: ManuallyDrop::new(lock_guard),
214 taken: false,
215 }
216 }
217
218 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
220 pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
221 let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
222 guard.taken = true;
223 tracing::debug!("Trying to upgrade upgradable read lock on consensus");
224 let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
225 tracing::debug!("Upgraded upgradable read lock on consensus");
226 ConsensusWriteLockGuard::new(ret)
227 }
228}
229
230impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
231 type Target = Consensus<TYPES>;
232
233 fn deref(&self) -> &Self::Target {
234 &self.lock_guard
235 }
236}
237
238impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
239 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
240 fn drop(&mut self) {
241 if !self.taken {
242 unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
243 tracing::debug!("Upgradable read lock on consensus dropped");
244 }
245 }
246}
247
248#[derive(Debug, Clone, Copy)]
250struct HotShotActionViews<T: ConsensusTime> {
251 proposed: T,
253 voted: T,
255 da_proposed: T,
257 da_vote: T,
259}
260
261impl<T: ConsensusTime> Default for HotShotActionViews<T> {
262 fn default() -> Self {
263 let genesis = T::genesis();
264 Self {
265 proposed: genesis,
266 voted: genesis,
267 da_proposed: genesis,
268 da_vote: genesis,
269 }
270 }
271}
272impl<T: ConsensusTime> HotShotActionViews<T> {
273 fn from_view(view: T) -> Self {
275 Self {
276 proposed: view,
277 voted: view,
278 da_proposed: view,
279 da_vote: view,
280 }
281 }
282}
283
284#[derive(Debug, Clone)]
285struct ValidatorParticipation<TYPES: NodeType> {
286 epoch: TYPES::Epoch,
287 current_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
289
290 last_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
292}
293
294impl<TYPES: NodeType> ValidatorParticipation<TYPES> {
295 fn new() -> Self {
296 Self {
297 epoch: TYPES::Epoch::genesis(),
298 current_epoch_participation: HashMap::new(),
299 last_epoch_participation: HashMap::new(),
300 }
301 }
302
303 fn update_participation(
304 &mut self,
305 key: TYPES::SignatureKey,
306 epoch: TYPES::Epoch,
307 proposed: bool,
308 ) {
309 if epoch != self.epoch {
310 return;
311 }
312 let entry = self
313 .current_epoch_participation
314 .entry(key)
315 .or_insert((0, 0));
316 if proposed {
317 entry.1 += 1;
318 }
319 entry.0 += 1;
320 }
321
322 fn update_participation_epoch(&mut self, epoch: TYPES::Epoch) {
323 if epoch <= self.epoch {
324 return;
325 }
326 self.epoch = epoch;
327 self.last_epoch_participation = self.current_epoch_participation.clone();
328 self.current_epoch_participation = HashMap::new();
329 }
330
331 fn get_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
332 let current_epoch_participation = self
333 .current_epoch_participation
334 .get(&key)
335 .unwrap_or(&(0, 0));
336 let num_leader = current_epoch_participation.0;
337 let num_proposed = current_epoch_participation.1;
338
339 let current_epoch_participation_ratio = if num_leader == 0 {
340 0.0
341 } else {
342 num_proposed as f64 / num_leader as f64
343 };
344 let last_epoch_participation = self.last_epoch_participation.get(&key);
345 let last_epoch_participation_ratio =
346 last_epoch_participation.map(|(leader, proposed)| *leader as f64 / *proposed as f64);
347 (
348 current_epoch_participation_ratio,
349 last_epoch_participation_ratio,
350 )
351 }
352
353 fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
354 self.current_epoch_participation
355 .iter()
356 .map(|(key, (leader, proposed))| (key.clone(), *leader as f64 / *proposed as f64))
357 .collect()
358 }
359 fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
360 self.last_epoch_participation
361 .iter()
362 .map(|(key, (leader, proposed))| (key.clone(), *leader as f64 / *proposed as f64))
363 .collect()
364 }
365}
366
367#[derive(derive_more::Debug, Clone)]
371pub struct Consensus<TYPES: NodeType> {
372 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
374
375 vid_shares: VidShares<TYPES>,
377
378 saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,
381
382 cur_view: TYPES::View,
384
385 cur_epoch: Option<TYPES::Epoch>,
387
388 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
391
392 last_decided_view: TYPES::View,
394
395 locked_view: TYPES::View,
397
398 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
402
403 last_actions: HotShotActionViews<TYPES::View>,
407
408 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
412
413 high_qc: QuorumCertificate2<TYPES>,
415
416 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
418
419 validator_participation: ValidatorParticipation<TYPES>,
421
422 pub metrics: Arc<ConsensusMetricsValue>,
424
425 pub epoch_height: u64,
427
428 pub drb_difficulty: u64,
430
431 pub drb_upgrade_difficulty: u64,
433
434 transition_qc: Option<(
436 QuorumCertificate2<TYPES>,
437 NextEpochQuorumCertificate2<TYPES>,
438 )>,
439
440 pub highest_block: u64,
442 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
444}
445
446#[derive(Debug, Clone, Hash, Eq, PartialEq)]
448pub struct PayloadWithMetadata<TYPES: NodeType> {
449 pub payload: TYPES::BlockPayload,
450 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
451}
452
453#[derive(Clone, Debug)]
455pub struct ConsensusMetricsValue {
456 pub last_synced_block_height: Box<dyn Gauge>,
458 pub last_decided_view: Box<dyn Gauge>,
460 pub last_voted_view: Box<dyn Gauge>,
462 pub last_decided_time: Box<dyn Gauge>,
464 pub current_view: Box<dyn Gauge>,
466 pub number_of_views_since_last_decide: Box<dyn Gauge>,
468 pub number_of_views_per_decide_event: Box<dyn Histogram>,
470 pub view_duration_as_leader: Box<dyn Histogram>,
472 pub invalid_qc: Box<dyn Gauge>,
474 pub outstanding_transactions: Box<dyn Gauge>,
476 pub outstanding_transactions_memory_size: Box<dyn Gauge>,
478 pub number_of_timeouts: Box<dyn Counter>,
480 pub number_of_timeouts_as_leader: Box<dyn Counter>,
482 pub number_of_empty_blocks_proposed: Box<dyn Counter>,
484 pub internal_event_queue_len: Box<dyn Gauge>,
486 pub proposal_to_decide_time: Box<dyn Histogram>,
488 pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
490 pub finalized_bytes: Box<dyn Histogram>,
492 pub validate_and_apply_header_duration: Box<dyn Histogram>,
494 pub update_leaf_duration: Box<dyn Histogram>,
496 pub vid_disperse_duration: Box<dyn Histogram>,
498}
499
500impl ConsensusMetricsValue {
501 #[must_use]
503 pub fn new(metrics: &dyn Metrics) -> Self {
504 Self {
505 last_synced_block_height: metrics
506 .create_gauge(String::from("last_synced_block_height"), None),
507 last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
508 last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
509 last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
510 current_view: metrics.create_gauge(String::from("current_view"), None),
511 number_of_views_since_last_decide: metrics
512 .create_gauge(String::from("number_of_views_since_last_decide"), None),
513 number_of_views_per_decide_event: metrics
514 .create_histogram(String::from("number_of_views_per_decide_event"), None),
515 view_duration_as_leader: metrics
516 .create_histogram(String::from("view_duration_as_leader"), None),
517 invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
518 outstanding_transactions: metrics
519 .create_gauge(String::from("outstanding_transactions"), None),
520 outstanding_transactions_memory_size: metrics
521 .create_gauge(String::from("outstanding_transactions_memory_size"), None),
522 number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
523 number_of_timeouts_as_leader: metrics
524 .create_counter(String::from("number_of_timeouts_as_leader"), None),
525 number_of_empty_blocks_proposed: metrics
526 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
527 internal_event_queue_len: metrics
528 .create_gauge(String::from("internal_event_queue_len"), None),
529 proposal_to_decide_time: metrics
530 .create_histogram(String::from("proposal_to_decide_time"), None),
531 previous_proposal_to_proposal_time: metrics
532 .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
533 finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
534 validate_and_apply_header_duration: metrics.create_histogram(
535 String::from("validate_and_apply_header_duration"),
536 Some("seconds".to_string()),
537 ),
538 update_leaf_duration: metrics.create_histogram(
539 String::from("update_leaf_duration"),
540 Some("seconds".to_string()),
541 ),
542 vid_disperse_duration: metrics.create_histogram(
543 String::from("vid_disperse_duration"),
544 Some("seconds".to_string()),
545 ),
546 }
547 }
548}
549
550impl Default for ConsensusMetricsValue {
551 fn default() -> Self {
552 Self::new(&*NoMetrics::boxed())
553 }
554}
555
556impl<TYPES: NodeType> Consensus<TYPES> {
557 #[allow(clippy::too_many_arguments)]
559 pub fn new(
560 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
561 vid_shares: Option<VidShares<TYPES>>,
562 cur_view: TYPES::View,
563 cur_epoch: Option<TYPES::Epoch>,
564 locked_view: TYPES::View,
565 last_decided_view: TYPES::View,
566 last_actioned_view: TYPES::View,
567 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
568 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
569 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
570 high_qc: QuorumCertificate2<TYPES>,
571 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
572 metrics: Arc<ConsensusMetricsValue>,
573 epoch_height: u64,
574 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
575 drb_difficulty: u64,
576 drb_upgrade_difficulty: u64,
577 ) -> Self {
578 let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
579 if high_qc
580 .data
581 .block_number
582 .is_some_and(|bn| is_transition_block(bn, epoch_height))
583 {
584 if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
585 Some((high_qc.clone(), next_epoch_high_qc.clone()))
586 } else {
587 tracing::error!("Next epoch high QC has different leaf commit to high QC");
588 None
589 }
590 } else {
591 None
592 }
593 } else {
594 None
595 };
596 Consensus {
597 validated_state_map,
598 vid_shares: vid_shares.unwrap_or_default(),
599 saved_da_certs: HashMap::new(),
600 cur_view,
601 cur_epoch,
602 last_decided_view,
603 last_proposals,
604 last_actions: HotShotActionViews::from_view(last_actioned_view),
605 locked_view,
606 saved_leaves,
607 saved_payloads,
608 high_qc,
609 next_epoch_high_qc,
610 metrics,
611 epoch_height,
612 transition_qc,
613 highest_block: 0,
614 state_cert,
615 drb_difficulty,
616 validator_participation: ValidatorParticipation::new(),
617 drb_upgrade_difficulty,
618 }
619 }
620
621 pub fn cur_view(&self) -> TYPES::View {
623 self.cur_view
624 }
625
626 pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
628 self.cur_epoch
629 }
630
631 pub fn last_decided_view(&self) -> TYPES::View {
633 self.last_decided_view
634 }
635
636 pub fn locked_view(&self) -> TYPES::View {
638 self.locked_view
639 }
640
641 pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
643 &self.high_qc
644 }
645
646 pub fn transition_qc(
648 &self,
649 ) -> Option<&(
650 QuorumCertificate2<TYPES>,
651 NextEpochQuorumCertificate2<TYPES>,
652 )> {
653 self.transition_qc.as_ref()
654 }
655
656 pub fn update_highest_block(&mut self, block_number: u64) {
658 if block_number > self.highest_block {
659 self.highest_block = block_number;
660 return;
661 }
662
663 if is_epoch_transition(block_number, self.epoch_height) {
664 let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
665 let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
666 if new_epoch >= high_epoch {
667 self.highest_block = block_number;
668 }
669 }
670 }
671
672 pub fn update_transition_qc(
674 &mut self,
675 qc: QuorumCertificate2<TYPES>,
676 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
677 ) {
678 if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
679 tracing::error!(
680 "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
681 qc.view_number(),
682 next_epoch_qc.data.leaf_commit,
683 qc.data().leaf_commit
684 );
685 return;
686 }
687 if let Some((transition_qc, _)) = &self.transition_qc {
688 if transition_qc.view_number() >= qc.view_number() {
689 return;
690 }
691 }
692 self.transition_qc = Some((qc, next_epoch_qc));
693 }
694
695 pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificateV2<TYPES>> {
697 self.state_cert.as_ref()
698 }
699
700 pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
702 self.next_epoch_high_qc.as_ref()
703 }
704
705 pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
707 &self.validated_state_map
708 }
709
710 pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
712 &self.saved_leaves
713 }
714
715 pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>> {
717 &self.saved_payloads
718 }
719
720 pub fn vid_shares(&self) -> &VidShares<TYPES> {
722 &self.vid_shares
723 }
724
725 pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
727 &self.saved_da_certs
728 }
729
730 pub fn last_proposals(
732 &self,
733 ) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
734 &self.last_proposals
735 }
736
737 pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
741 ensure!(
742 view_number > self.cur_view,
743 debug!("New view isn't newer than the current view.")
744 );
745 self.cur_view = view_number;
746 Ok(())
747 }
748
749 pub fn update_validator_participation(
751 &mut self,
752 key: TYPES::SignatureKey,
753 epoch: TYPES::Epoch,
754 proposed: bool,
755 ) {
756 self.validator_participation
757 .update_participation(key, epoch, proposed);
758 }
759
760 pub fn update_validator_participation_epoch(&mut self, epoch: TYPES::Epoch) {
762 self.validator_participation
763 .update_participation_epoch(epoch);
764 }
765
766 pub fn get_validator_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
768 self.validator_participation.get_participation(key)
769 }
770
771 pub fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
773 self.validator_participation
774 .current_proposal_participation()
775 }
776
777 pub fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
779 self.validator_participation
780 .previous_proposal_participation()
781 }
782
783 pub async fn parent_leaf_info(
786 &self,
787 leaf: &Leaf2<TYPES>,
788 public_key: &TYPES::SignatureKey,
789 ) -> Option<LeafInfo<TYPES>> {
790 let parent_view_number = leaf.justify_qc().view_number();
791 let parent_epoch = leaf.justify_qc().epoch();
792 let parent_leaf = self
793 .saved_leaves
794 .get(&leaf.justify_qc().data().leaf_commit)?;
795 let parent_state_and_delta = self.state_and_delta(parent_view_number);
796 let (Some(state), delta) = parent_state_and_delta else {
797 return None;
798 };
799
800 let parent_vid = self
801 .vid_shares()
802 .get(&parent_view_number)
803 .and_then(|key_map| key_map.get(public_key))
804 .and_then(|epoch_map| epoch_map.get(&parent_epoch))
805 .map(|prop| prop.data.clone());
806
807 let state_cert = if parent_leaf.with_epoch
808 && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
809 {
810 match self.state_cert() {
811 Some(state_cert)
813 if state_cert.light_client_state.view_number == parent_view_number.u64() =>
814 {
815 Some(state_cert.clone())
816 },
817 _ => None,
818 }
819 } else {
820 None
821 };
822
823 Some(LeafInfo {
824 leaf: parent_leaf.clone(),
825 state,
826 delta,
827 vid_share: parent_vid,
828 state_cert,
829 })
830 }
831
832 pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
836 ensure!(
837 self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
838 debug!("New epoch isn't newer than the current epoch.")
839 );
840 tracing::trace!(
841 "Updating epoch from {:?} to {}",
842 self.cur_epoch,
843 epoch_number
844 );
845 self.cur_epoch = Some(epoch_number);
846 Ok(())
847 }
848
849 pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
853 let old_view = match action {
854 HotShotAction::Vote => &mut self.last_actions.voted,
855 HotShotAction::Propose => &mut self.last_actions.proposed,
856 HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
857 HotShotAction::DaVote => {
858 if view > self.last_actions.da_vote {
859 self.last_actions.da_vote = view;
860 }
861 return true;
866 },
867 _ => return true,
868 };
869 if view > *old_view {
870 *old_view = view;
871 return true;
872 }
873 false
874 }
875
876 pub fn reset_actions(&mut self) {
878 self.last_actions = HotShotActionViews::default();
879 }
880
881 pub fn update_proposed_view(
886 &mut self,
887 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
888 ) -> Result<()> {
889 ensure!(
890 proposal.data.view_number()
891 > self
892 .last_proposals
893 .last_key_value()
894 .map_or(TYPES::View::genesis(), |(k, _)| { *k }),
895 debug!("New view isn't newer than the previously proposed view.")
896 );
897 self.last_proposals
898 .insert(proposal.data.view_number(), proposal);
899 Ok(())
900 }
901
902 pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
907 ensure!(
908 view_number > self.last_decided_view,
909 debug!("New view isn't newer than the previously decided view.")
910 );
911 self.last_decided_view = view_number;
912 Ok(())
913 }
914
915 pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
920 ensure!(
921 view_number > self.locked_view,
922 debug!("New view isn't newer than the previously locked view.")
923 );
924 self.locked_view = view_number;
925 Ok(())
926 }
927
928 pub fn update_da_view(
934 &mut self,
935 view_number: TYPES::View,
936 epoch: Option<TYPES::Epoch>,
937 payload_commitment: VidCommitment,
938 ) -> Result<()> {
939 let view = View {
940 view_inner: ViewInner::Da {
941 payload_commitment,
942 epoch,
943 },
944 };
945 self.update_validated_state_map(view_number, view)
946 }
947
948 pub fn update_leaf(
954 &mut self,
955 leaf: Leaf2<TYPES>,
956 state: Arc<TYPES::ValidatedState>,
957 delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
958 ) -> Result<()> {
959 let view_number = leaf.view_number();
960 let epoch = option_epoch_from_block_number::<TYPES>(
961 leaf.with_epoch,
962 leaf.height(),
963 self.epoch_height,
964 );
965 let view = View {
966 view_inner: ViewInner::Leaf {
967 leaf: leaf.commit(),
968 state,
969 delta,
970 epoch,
971 },
972 };
973 self.update_validated_state_map(view_number, view)?;
974 self.update_saved_leaves(leaf);
975 Ok(())
976 }
977
978 fn update_validated_state_map(
984 &mut self,
985 view_number: TYPES::View,
986 new_view: View<TYPES>,
987 ) -> Result<()> {
988 if let Some(existing_view) = self.validated_state_map().get(&view_number) {
989 if let ViewInner::Leaf {
990 delta: ref existing_delta,
991 ..
992 } = existing_view.view_inner
993 {
994 if let ViewInner::Leaf {
995 delta: ref new_delta,
996 ..
997 } = new_view.view_inner
998 {
999 ensure!(
1000 new_delta.is_some() || existing_delta.is_none(),
1001 debug!(
1002 "Skipping the state update to not override a `Leaf` view with `Some` \
1003 state delta."
1004 )
1005 );
1006 } else {
1007 bail!(
1008 "Skipping the state update to not override a `Leaf` view with a \
1009 non-`Leaf` view."
1010 );
1011 }
1012 }
1013 }
1014 self.validated_state_map.insert(view_number, new_view);
1015 Ok(())
1016 }
1017
1018 fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
1020 self.saved_leaves.insert(leaf.commit(), leaf);
1021 }
1022
1023 pub fn update_saved_payloads(
1028 &mut self,
1029 view_number: TYPES::View,
1030 payload: Arc<PayloadWithMetadata<TYPES>>,
1031 ) -> Result<()> {
1032 ensure!(
1033 !self.saved_payloads.contains_key(&view_number),
1034 "Payload with the same view already exists."
1035 );
1036 self.saved_payloads.insert(view_number, payload);
1037 Ok(())
1038 }
1039
1040 pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
1044 if self.high_qc == high_qc {
1045 return Ok(());
1046 }
1047 ensure!(
1049 high_qc.view_number > self.high_qc.view_number,
1050 debug!("High QC with an equal or higher view exists.")
1051 );
1052 tracing::debug!("Updating high QC");
1053 self.high_qc = high_qc;
1054
1055 Ok(())
1056 }
1057
1058 pub fn update_next_epoch_high_qc(
1064 &mut self,
1065 high_qc: NextEpochQuorumCertificate2<TYPES>,
1066 ) -> Result<()> {
1067 if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
1068 return Ok(());
1069 }
1070 if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
1071 ensure!(
1072 high_qc.view_number > next_epoch_high_qc.view_number,
1073 debug!("Next epoch high QC with an equal or higher view exists.")
1074 );
1075 }
1076 tracing::debug!("Updating next epoch high QC");
1077 self.next_epoch_high_qc = Some(high_qc);
1078
1079 Ok(())
1080 }
1081
1082 pub fn reset_high_qc(
1086 &mut self,
1087 high_qc: QuorumCertificate2<TYPES>,
1088 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
1089 ) -> Result<()> {
1090 ensure!(
1091 high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
1092 error!("High QC's and next epoch QC's leaf commits do not match.")
1093 );
1094 if self.high_qc == high_qc {
1095 return Ok(());
1096 }
1097 let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
1098 let current_qc = self.high_qc();
1099 let Some(high_bn) = current_qc.data.block_number else {
1100 return false;
1101 };
1102 epoch_from_block_number(bn + 1, self.epoch_height)
1103 == epoch_from_block_number(high_bn + 1, self.epoch_height)
1104 });
1105 ensure!(
1106 high_qc
1107 .data
1108 .block_number
1109 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
1110 && same_epoch,
1111 error!("Provided QC is not a transition QC.")
1112 );
1113 tracing::debug!("Resetting high QC and next epoch high QC");
1114 self.high_qc = high_qc;
1115 self.next_epoch_high_qc = Some(next_epoch_qc);
1116
1117 Ok(())
1118 }
1119
1120 pub fn update_state_cert(
1124 &mut self,
1125 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
1126 ) -> Result<()> {
1127 if let Some(existing_state_cert) = &self.state_cert {
1128 ensure!(
1129 state_cert.epoch > existing_state_cert.epoch,
1130 debug!(
1131 "Light client state update certification with an equal or higher epoch exists."
1132 )
1133 );
1134 }
1135 tracing::debug!("Updating light client state update certification");
1136 self.state_cert = Some(state_cert);
1137
1138 Ok(())
1139 }
1140
1141 pub fn update_vid_shares(
1143 &mut self,
1144 view_number: TYPES::View,
1145 disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1146 ) {
1147 self.vid_shares
1148 .entry(view_number)
1149 .or_default()
1150 .entry(disperse.data.recipient_key().clone())
1151 .or_default()
1152 .insert(disperse.data.target_epoch(), disperse);
1153 }
1154
1155 pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
1157 self.saved_da_certs.insert(view_number, cert);
1158 }
1159
1160 pub fn visit_leaf_ancestors<F>(
1164 &self,
1165 start_from: TYPES::View,
1166 terminator: Terminator<TYPES::View>,
1167 ok_when_finished: bool,
1168 mut f: F,
1169 ) -> std::result::Result<(), HotShotError<TYPES>>
1170 where
1171 F: FnMut(
1172 &Leaf2<TYPES>,
1173 Arc<<TYPES as NodeType>::ValidatedState>,
1174 Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1175 ) -> bool,
1176 {
1177 let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1178 view.leaf_commitment().ok_or_else(|| {
1179 HotShotError::InvalidState(format!(
1180 "Visited failed view {start_from} leaf. Expected successful leaf"
1181 ))
1182 })?
1183 } else {
1184 return Err(HotShotError::InvalidState(format!(
1185 "View {start_from} leaf does not exist in state map "
1186 )));
1187 };
1188
1189 while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1190 let view = leaf.view_number();
1191 if let (Some(state), delta) = self.state_and_delta(view) {
1192 if let Terminator::Exclusive(stop_before) = terminator {
1193 if stop_before == view {
1194 if ok_when_finished {
1195 return Ok(());
1196 }
1197 break;
1198 }
1199 }
1200 next_leaf = leaf.parent_commitment();
1201 if !f(leaf, state, delta) {
1202 return Ok(());
1203 }
1204 if let Terminator::Inclusive(stop_after) = terminator {
1205 if stop_after == view {
1206 if ok_when_finished {
1207 return Ok(());
1208 }
1209 break;
1210 }
1211 }
1212 } else {
1213 return Err(HotShotError::InvalidState(format!(
1214 "View {view} state does not exist in state map"
1215 )));
1216 }
1217 }
1218 Err(HotShotError::MissingLeaf(next_leaf))
1219 }
1220
1221 pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
1226 if new_anchor_view <= old_anchor_view {
1228 return;
1229 }
1230 let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
1231 let anchor_entry = self
1233 .validated_state_map
1234 .iter()
1235 .next()
1236 .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1237 if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1238 tracing::info!(
1239 "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1240 );
1241 }
1242 self.saved_da_certs
1244 .retain(|view_number, _| *view_number >= old_anchor_view);
1245 self.validated_state_map
1246 .range(..gc_view)
1247 .filter_map(|(_view_number, view)| view.leaf_commitment())
1248 .for_each(|leaf| {
1249 self.saved_leaves.remove(&leaf);
1250 });
1251 self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1252 self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1253 self.vid_shares = self.vid_shares.split_off(&gc_view);
1254 self.last_proposals = self.last_proposals.split_off(&gc_view);
1255 }
1256
1257 #[must_use]
1263 pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1264 let decided_view_num = self.last_decided_view;
1265 let view = self.validated_state_map.get(&decided_view_num).unwrap();
1266 let leaf = view
1267 .leaf_commitment()
1268 .expect("Decided leaf not found! Consensus internally inconsistent");
1269 self.saved_leaves.get(&leaf).unwrap().clone()
1270 }
1271
1272 pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1273 self.saved_leaves.values().cloned().collect::<Vec<_>>()
1274 }
1275
1276 #[must_use]
1278 pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
1279 match self.validated_state_map.get(&view_number) {
1280 Some(view) => view.state(),
1281 None => None,
1282 }
1283 }
1284
1285 #[must_use]
1287 pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
1288 match self.validated_state_map.get(&view_number) {
1289 Some(view) => view.state_and_delta(),
1290 None => (None, None),
1291 }
1292 }
1293
1294 #[must_use]
1300 pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1301 let decided_view_num = self.last_decided_view;
1302 self.state_and_delta(decided_view_num)
1303 .0
1304 .expect("Decided state not found! Consensus internally inconsistent")
1305 }
1306
1307 #[instrument(skip_all, target = "Consensus", fields(view = *view))]
1313 pub async fn calculate_and_update_vid<V: Versions>(
1314 consensus: OuterConsensus<TYPES>,
1315 view: <TYPES as NodeType>::View,
1316 target_epoch: Option<<TYPES as NodeType>::Epoch>,
1317 membership_coordinator: EpochMembershipCoordinator<TYPES>,
1318 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
1319 upgrade_lock: &UpgradeLock<TYPES, V>,
1320 ) -> Option<()> {
1321 let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
1322 let epoch = consensus
1323 .read()
1324 .await
1325 .validated_state_map()
1326 .get(&view)?
1327 .view_inner
1328 .epoch()?;
1329
1330 let VidDisperseAndDuration {
1331 disperse: vid,
1332 duration: disperse_duration,
1333 } = VidDisperse::calculate_vid_disperse::<V>(
1334 &payload_with_metadata.payload,
1335 &membership_coordinator,
1336 view,
1337 target_epoch,
1338 epoch,
1339 &payload_with_metadata.metadata,
1340 upgrade_lock,
1341 )
1342 .await
1343 .ok()?;
1344
1345 let shares = VidDisperseShare::from_vid_disperse(vid);
1346 let mut consensus_writer = consensus.write().await;
1347 consensus_writer
1348 .metrics
1349 .vid_disperse_duration
1350 .add_point(disperse_duration.as_secs_f64());
1351 for share in shares {
1352 if let Some(prop) = share.to_proposal(private_key) {
1353 consensus_writer.update_vid_shares(view, prop);
1354 }
1355 }
1356
1357 Some(())
1358 }
1359 pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1361 let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1362 tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1363 return false;
1364 };
1365 let block_height = leaf.height();
1366 is_epoch_transition(block_height, self.epoch_height)
1367 }
1368
1369 pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1371 let Some(block_height) = self.high_qc().data.block_number else {
1372 return false;
1373 };
1374 is_epoch_transition(block_height, self.epoch_height)
1375 }
1376
1377 pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1379 if parent_leaf.view_number() == TYPES::View::genesis() {
1380 return true;
1381 }
1382 let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1383 let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1384
1385 new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1386 }
1387
1388 pub fn is_high_qc_ge_root_block(&self) -> bool {
1390 let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else {
1391 tracing::trace!("We don't have a leaf corresponding to the high QC");
1392 return false;
1393 };
1394 let block_height = leaf.height();
1395 is_ge_epoch_root(block_height, self.epoch_height)
1396 }
1397}
1398
1399#[derive(Eq, PartialEq, Debug, Clone)]
1402pub struct CommitmentAndMetadata<TYPES: NodeType> {
1403 pub commitment: VidCommitment,
1405 pub builder_commitment: BuilderCommitment,
1407 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
1409 pub fees: Vec1<BuilderFee<TYPES>>,
1411 pub block_view: TYPES::View,
1413}