1use std::{
10 collections::{BTreeMap, HashMap},
11 mem::ManuallyDrop,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
17use committable::{Commitment, Committable};
18use hotshot_utils::anytrace::*;
19use tracing::instrument;
20use vec1::Vec1;
21
22pub use crate::utils::{View, ViewInner};
23use crate::{
24 data::{
25 Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseAndDuration,
26 VidDisperseShare,
27 },
28 epoch_membership::EpochMembershipCoordinator,
29 error::HotShotError,
30 event::{HotShotAction, LeafInfo},
31 message::{Proposal, UpgradeLock},
32 simple_certificate::{
33 DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34 QuorumCertificate2,
35 },
36 simple_vote::HasEpoch,
37 traits::{
38 block_contents::{BlockHeader, BuilderFee},
39 metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
40 node_implementation::{ConsensusTime, NodeType, Versions},
41 signature_key::SignatureKey,
42 BlockPayload, ValidatedState,
43 },
44 utils::{
45 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
46 is_transition_block, option_epoch_from_block_number, BuilderCommitment, LeafCommitment,
47 StateAndDelta, Terminator,
48 },
49 vote::{Certificate, HasViewNumber},
50};
51
52pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
54
55pub type VidShares<TYPES> = BTreeMap<
57 <TYPES as NodeType>::View,
58 HashMap<
59 <TYPES as NodeType>::SignatureKey,
60 BTreeMap<Option<<TYPES as NodeType>::Epoch>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
61 >,
62>;
63
64pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
66
67#[derive(Clone, Debug)]
69pub struct OuterConsensus<TYPES: NodeType> {
70 pub inner_consensus: LockedConsensusState<TYPES>,
72}
73
74impl<TYPES: NodeType> OuterConsensus<TYPES> {
75 pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
77 Self {
78 inner_consensus: consensus,
79 }
80 }
81
82 #[instrument(skip_all, target = "OuterConsensus")]
84 pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
85 tracing::trace!("Trying to acquire read lock on consensus");
86 let ret = self.inner_consensus.read().await;
87 tracing::trace!("Acquired read lock on consensus");
88 ConsensusReadLockGuard::new(ret)
89 }
90
91 #[instrument(skip_all, target = "OuterConsensus")]
93 pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
94 tracing::trace!("Trying to acquire write lock on consensus");
95 let ret = self.inner_consensus.write().await;
96 tracing::trace!("Acquired write lock on consensus");
97 ConsensusWriteLockGuard::new(ret)
98 }
99
100 #[instrument(skip_all, target = "OuterConsensus")]
102 pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
103 tracing::trace!("Trying to acquire write lock on consensus");
104 let ret = self.inner_consensus.try_write();
105 if let Some(guard) = ret {
106 tracing::trace!("Acquired write lock on consensus");
107 Some(ConsensusWriteLockGuard::new(guard))
108 } else {
109 tracing::trace!("Failed to acquire write lock");
110 None
111 }
112 }
113
114 #[instrument(skip_all, target = "OuterConsensus")]
116 pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
117 tracing::trace!("Trying to acquire upgradable read lock on consensus");
118 let ret = self.inner_consensus.upgradable_read().await;
119 tracing::trace!("Acquired upgradable read lock on consensus");
120 ConsensusUpgradableReadLockGuard::new(ret)
121 }
122
123 #[instrument(skip_all, target = "OuterConsensus")]
125 pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
126 tracing::trace!("Trying to acquire read lock on consensus");
127 let ret = self.inner_consensus.try_read();
128 if let Some(guard) = ret {
129 tracing::trace!("Acquired read lock on consensus");
130 Some(ConsensusReadLockGuard::new(guard))
131 } else {
132 tracing::trace!("Failed to acquire read lock");
133 None
134 }
135 }
136}
137
138pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
140 lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
142}
143
144impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
145 #[must_use]
147 pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
148 Self { lock_guard }
149 }
150}
151
152impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
153 type Target = Consensus<TYPES>;
154 fn deref(&self) -> &Self::Target {
155 &self.lock_guard
156 }
157}
158
159impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
160 #[instrument(skip_all, target = "ConsensusReadLockGuard")]
161 fn drop(&mut self) {
162 tracing::trace!("Read lock on consensus dropped");
163 }
164}
165
166pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
168 lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
170}
171
172impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
173 #[must_use]
175 pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
176 Self { lock_guard }
177 }
178}
179
180impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
181 type Target = Consensus<TYPES>;
182 fn deref(&self) -> &Self::Target {
183 &self.lock_guard
184 }
185}
186
187impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
188 fn deref_mut(&mut self) -> &mut Self::Target {
189 &mut self.lock_guard
190 }
191}
192
193impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
194 #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
195 fn drop(&mut self) {
196 tracing::debug!("Write lock on consensus dropped");
197 }
198}
199
200pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
202 lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
204 taken: bool,
206}
207
208impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
209 #[must_use]
211 pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
212 Self {
213 lock_guard: ManuallyDrop::new(lock_guard),
214 taken: false,
215 }
216 }
217
218 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
220 #[allow(unused_assignments)] pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
222 let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
223 guard.taken = true;
224 tracing::debug!("Trying to upgrade upgradable read lock on consensus");
225 let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
226 tracing::debug!("Upgraded upgradable read lock on consensus");
227 ConsensusWriteLockGuard::new(ret)
228 }
229}
230
231impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
232 type Target = Consensus<TYPES>;
233
234 fn deref(&self) -> &Self::Target {
235 &self.lock_guard
236 }
237}
238
239impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
240 #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
241 fn drop(&mut self) {
242 if !self.taken {
243 unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
244 tracing::debug!("Upgradable read lock on consensus dropped");
245 }
246 }
247}
248
249#[derive(Debug, Clone, Copy)]
251struct HotShotActionViews<T: ConsensusTime> {
252 proposed: T,
254 voted: T,
256 da_proposed: T,
258 da_vote: T,
260}
261
262impl<T: ConsensusTime> Default for HotShotActionViews<T> {
263 fn default() -> Self {
264 let genesis = T::genesis();
265 Self {
266 proposed: genesis,
267 voted: genesis,
268 da_proposed: genesis,
269 da_vote: genesis,
270 }
271 }
272}
273impl<T: ConsensusTime> HotShotActionViews<T> {
274 fn from_view(view: T) -> Self {
276 Self {
277 proposed: view,
278 voted: view,
279 da_proposed: view,
280 da_vote: view,
281 }
282 }
283}
284
285#[derive(Debug, Clone)]
286struct ValidatorParticipation<TYPES: NodeType> {
287 epoch: TYPES::Epoch,
288 current_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
290
291 last_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
293}
294
295impl<TYPES: NodeType> ValidatorParticipation<TYPES> {
296 fn new() -> Self {
297 Self {
298 epoch: TYPES::Epoch::genesis(),
299 current_epoch_participation: HashMap::new(),
300 last_epoch_participation: HashMap::new(),
301 }
302 }
303
304 fn update_participation(
305 &mut self,
306 key: TYPES::SignatureKey,
307 epoch: TYPES::Epoch,
308 proposed: bool,
309 ) {
310 if epoch != self.epoch {
311 return;
312 }
313 let entry = self
314 .current_epoch_participation
315 .entry(key)
316 .or_insert((0, 0));
317 if proposed {
318 entry.1 += 1;
319 }
320 entry.0 += 1;
321 }
322
323 fn update_participation_epoch(&mut self, epoch: TYPES::Epoch) {
324 if epoch <= self.epoch {
325 return;
326 }
327 self.epoch = epoch;
328 self.last_epoch_participation = self.current_epoch_participation.clone();
329 self.current_epoch_participation = HashMap::new();
330 }
331
332 fn get_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
333 let current_epoch_participation = self
334 .current_epoch_participation
335 .get(&key)
336 .unwrap_or(&(0, 0));
337 let num_leader = current_epoch_participation.0;
338 let num_proposed = current_epoch_participation.1;
339
340 let current_epoch_participation_ratio = if num_leader == 0 {
341 0.0
342 } else {
343 num_proposed as f64 / num_leader as f64
344 };
345 let last_epoch_participation = self.last_epoch_participation.get(&key);
346 let last_epoch_participation_ratio = last_epoch_participation.map(|(leader, proposed)| {
347 if *leader == 0 {
348 0.0
349 } else {
350 *proposed as f64 / *leader as f64
351 }
352 });
353 (
354 current_epoch_participation_ratio,
355 last_epoch_participation_ratio,
356 )
357 }
358
359 fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
360 self.current_epoch_participation
361 .iter()
362 .map(|(key, (leader, proposed))| {
363 (
364 key.clone(),
365 if *leader == 0 {
366 0.0
367 } else {
368 *proposed as f64 / *leader as f64
369 },
370 )
371 })
372 .collect()
373 }
374 fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
375 self.last_epoch_participation
376 .iter()
377 .map(|(key, (leader, proposed))| {
378 (
379 key.clone(),
380 if *leader == 0 {
381 0.0
382 } else {
383 *proposed as f64 / *leader as f64
384 },
385 )
386 })
387 .collect()
388 }
389}
390
391#[derive(derive_more::Debug, Clone)]
395pub struct Consensus<TYPES: NodeType> {
396 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
398
399 vid_shares: VidShares<TYPES>,
401
402 saved_da_certs: HashMap<TYPES::View, DaCertificate2<TYPES>>,
405
406 cur_view: TYPES::View,
408
409 cur_epoch: Option<TYPES::Epoch>,
411
412 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
415
416 last_decided_view: TYPES::View,
418
419 locked_view: TYPES::View,
421
422 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
426
427 last_actions: HotShotActionViews<TYPES::View>,
431
432 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
436
437 high_qc: QuorumCertificate2<TYPES>,
439
440 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
442
443 validator_participation: ValidatorParticipation<TYPES>,
445
446 pub metrics: Arc<ConsensusMetricsValue>,
448
449 pub epoch_height: u64,
451
452 pub drb_difficulty: u64,
454
455 pub drb_upgrade_difficulty: u64,
457
458 transition_qc: Option<(
460 QuorumCertificate2<TYPES>,
461 NextEpochQuorumCertificate2<TYPES>,
462 )>,
463
464 pub highest_block: u64,
466 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
468}
469
470#[derive(Debug, Clone, Hash, Eq, PartialEq)]
472pub struct PayloadWithMetadata<TYPES: NodeType> {
473 pub payload: TYPES::BlockPayload,
474 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
475}
476
477#[derive(Clone, Debug)]
479pub struct ConsensusMetricsValue {
480 pub last_synced_block_height: Box<dyn Gauge>,
482 pub last_decided_view: Box<dyn Gauge>,
484 pub last_voted_view: Box<dyn Gauge>,
486 pub last_decided_time: Box<dyn Gauge>,
488 pub current_view: Box<dyn Gauge>,
490 pub number_of_views_since_last_decide: Box<dyn Gauge>,
492 pub number_of_views_per_decide_event: Box<dyn Histogram>,
494 pub view_duration_as_leader: Box<dyn Histogram>,
496 pub invalid_qc: Box<dyn Gauge>,
498 pub outstanding_transactions: Box<dyn Gauge>,
500 pub outstanding_transactions_memory_size: Box<dyn Gauge>,
502 pub number_of_timeouts: Box<dyn Counter>,
504 pub number_of_timeouts_as_leader: Box<dyn Counter>,
506 pub number_of_empty_blocks_proposed: Box<dyn Counter>,
508 pub internal_event_queue_len: Box<dyn Gauge>,
510 pub proposal_to_decide_time: Box<dyn Histogram>,
512 pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
514 pub finalized_bytes: Box<dyn Histogram>,
516 pub validate_and_apply_header_duration: Box<dyn Histogram>,
518 pub update_leaf_duration: Box<dyn Histogram>,
520 pub vid_disperse_duration: Box<dyn Histogram>,
522}
523
524impl ConsensusMetricsValue {
525 #[must_use]
527 pub fn new(metrics: &dyn Metrics) -> Self {
528 Self {
529 last_synced_block_height: metrics
530 .create_gauge(String::from("last_synced_block_height"), None),
531 last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
532 last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
533 last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
534 current_view: metrics.create_gauge(String::from("current_view"), None),
535 number_of_views_since_last_decide: metrics
536 .create_gauge(String::from("number_of_views_since_last_decide"), None),
537 number_of_views_per_decide_event: metrics
538 .create_histogram(String::from("number_of_views_per_decide_event"), None),
539 view_duration_as_leader: metrics
540 .create_histogram(String::from("view_duration_as_leader"), None),
541 invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
542 outstanding_transactions: metrics
543 .create_gauge(String::from("outstanding_transactions"), None),
544 outstanding_transactions_memory_size: metrics
545 .create_gauge(String::from("outstanding_transactions_memory_size"), None),
546 number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
547 number_of_timeouts_as_leader: metrics
548 .create_counter(String::from("number_of_timeouts_as_leader"), None),
549 number_of_empty_blocks_proposed: metrics
550 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
551 internal_event_queue_len: metrics
552 .create_gauge(String::from("internal_event_queue_len"), None),
553 proposal_to_decide_time: metrics
554 .create_histogram(String::from("proposal_to_decide_time"), None),
555 previous_proposal_to_proposal_time: metrics
556 .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
557 finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
558 validate_and_apply_header_duration: metrics.create_histogram(
559 String::from("validate_and_apply_header_duration"),
560 Some("seconds".to_string()),
561 ),
562 update_leaf_duration: metrics.create_histogram(
563 String::from("update_leaf_duration"),
564 Some("seconds".to_string()),
565 ),
566 vid_disperse_duration: metrics.create_histogram(
567 String::from("vid_disperse_duration"),
568 Some("seconds".to_string()),
569 ),
570 }
571 }
572}
573
574impl Default for ConsensusMetricsValue {
575 fn default() -> Self {
576 Self::new(&*NoMetrics::boxed())
577 }
578}
579
580impl<TYPES: NodeType> Consensus<TYPES> {
581 #[allow(clippy::too_many_arguments)]
583 pub fn new(
584 validated_state_map: BTreeMap<TYPES::View, View<TYPES>>,
585 vid_shares: Option<VidShares<TYPES>>,
586 cur_view: TYPES::View,
587 cur_epoch: Option<TYPES::Epoch>,
588 locked_view: TYPES::View,
589 last_decided_view: TYPES::View,
590 last_actioned_view: TYPES::View,
591 last_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
592 saved_leaves: CommitmentMap<Leaf2<TYPES>>,
593 saved_payloads: BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>>,
594 high_qc: QuorumCertificate2<TYPES>,
595 next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
596 metrics: Arc<ConsensusMetricsValue>,
597 epoch_height: u64,
598 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
599 drb_difficulty: u64,
600 drb_upgrade_difficulty: u64,
601 ) -> Self {
602 let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
603 if high_qc
604 .data
605 .block_number
606 .is_some_and(|bn| is_transition_block(bn, epoch_height))
607 {
608 if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
609 Some((high_qc.clone(), next_epoch_high_qc.clone()))
610 } else {
611 tracing::error!("Next epoch high QC has different leaf commit to high QC");
612 None
613 }
614 } else {
615 None
616 }
617 } else {
618 None
619 };
620 Consensus {
621 validated_state_map,
622 vid_shares: vid_shares.unwrap_or_default(),
623 saved_da_certs: HashMap::new(),
624 cur_view,
625 cur_epoch,
626 last_decided_view,
627 last_proposals,
628 last_actions: HotShotActionViews::from_view(last_actioned_view),
629 locked_view,
630 saved_leaves,
631 saved_payloads,
632 high_qc,
633 next_epoch_high_qc,
634 metrics,
635 epoch_height,
636 transition_qc,
637 highest_block: 0,
638 state_cert,
639 drb_difficulty,
640 validator_participation: ValidatorParticipation::new(),
641 drb_upgrade_difficulty,
642 }
643 }
644
645 pub fn cur_view(&self) -> TYPES::View {
647 self.cur_view
648 }
649
650 pub fn cur_epoch(&self) -> Option<TYPES::Epoch> {
652 self.cur_epoch
653 }
654
655 pub fn last_decided_view(&self) -> TYPES::View {
657 self.last_decided_view
658 }
659
660 pub fn locked_view(&self) -> TYPES::View {
662 self.locked_view
663 }
664
665 pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
667 &self.high_qc
668 }
669
670 pub fn transition_qc(
672 &self,
673 ) -> Option<&(
674 QuorumCertificate2<TYPES>,
675 NextEpochQuorumCertificate2<TYPES>,
676 )> {
677 self.transition_qc.as_ref()
678 }
679
680 pub fn update_highest_block(&mut self, block_number: u64) {
682 if block_number > self.highest_block {
683 self.highest_block = block_number;
684 return;
685 }
686
687 if is_epoch_transition(block_number, self.epoch_height) {
688 let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
689 let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
690 if new_epoch >= high_epoch {
691 self.highest_block = block_number;
692 }
693 }
694 }
695
696 pub fn update_transition_qc(
698 &mut self,
699 qc: QuorumCertificate2<TYPES>,
700 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
701 ) {
702 if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
703 tracing::error!(
704 "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
705 qc.view_number(),
706 next_epoch_qc.data.leaf_commit,
707 qc.data().leaf_commit
708 );
709 return;
710 }
711 if let Some((transition_qc, _)) = &self.transition_qc {
712 if transition_qc.view_number() >= qc.view_number() {
713 return;
714 }
715 }
716 self.transition_qc = Some((qc, next_epoch_qc));
717 }
718
719 pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificateV2<TYPES>> {
721 self.state_cert.as_ref()
722 }
723
724 pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
726 self.next_epoch_high_qc.as_ref()
727 }
728
729 pub fn validated_state_map(&self) -> &BTreeMap<TYPES::View, View<TYPES>> {
731 &self.validated_state_map
732 }
733
734 pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
736 &self.saved_leaves
737 }
738
739 pub fn saved_payloads(&self) -> &BTreeMap<TYPES::View, Arc<PayloadWithMetadata<TYPES>>> {
741 &self.saved_payloads
742 }
743
744 pub fn vid_shares(&self) -> &VidShares<TYPES> {
746 &self.vid_shares
747 }
748
749 pub fn saved_da_certs(&self) -> &HashMap<TYPES::View, DaCertificate2<TYPES>> {
751 &self.saved_da_certs
752 }
753
754 pub fn last_proposals(
756 &self,
757 ) -> &BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
758 &self.last_proposals
759 }
760
761 pub fn update_view(&mut self, view_number: TYPES::View) -> Result<()> {
765 ensure!(
766 view_number > self.cur_view,
767 debug!("New view isn't newer than the current view.")
768 );
769 self.cur_view = view_number;
770 Ok(())
771 }
772
773 pub fn update_validator_participation(
775 &mut self,
776 key: TYPES::SignatureKey,
777 epoch: TYPES::Epoch,
778 proposed: bool,
779 ) {
780 self.validator_participation
781 .update_participation(key, epoch, proposed);
782 }
783
784 pub fn update_validator_participation_epoch(&mut self, epoch: TYPES::Epoch) {
786 self.validator_participation
787 .update_participation_epoch(epoch);
788 }
789
790 pub fn get_validator_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
792 self.validator_participation.get_participation(key)
793 }
794
795 pub fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
797 self.validator_participation
798 .current_proposal_participation()
799 }
800
801 pub fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
803 self.validator_participation
804 .previous_proposal_participation()
805 }
806
807 pub async fn parent_leaf_info(
810 &self,
811 leaf: &Leaf2<TYPES>,
812 public_key: &TYPES::SignatureKey,
813 ) -> Option<LeafInfo<TYPES>> {
814 let parent_view_number = leaf.justify_qc().view_number();
815 let parent_epoch = leaf.justify_qc().epoch();
816 let parent_leaf = self
817 .saved_leaves
818 .get(&leaf.justify_qc().data().leaf_commit)?;
819 let parent_state_and_delta = self.state_and_delta(parent_view_number);
820 let (Some(state), delta) = parent_state_and_delta else {
821 return None;
822 };
823
824 let parent_vid = self
825 .vid_shares()
826 .get(&parent_view_number)
827 .and_then(|key_map| key_map.get(public_key))
828 .and_then(|epoch_map| epoch_map.get(&parent_epoch))
829 .map(|prop| prop.data.clone());
830
831 let state_cert = if parent_leaf.with_epoch
832 && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
833 {
834 match self.state_cert() {
835 Some(state_cert)
837 if state_cert.light_client_state.view_number == parent_view_number.u64() =>
838 {
839 Some(state_cert.clone())
840 },
841 _ => None,
842 }
843 } else {
844 None
845 };
846
847 Some(LeafInfo {
848 leaf: parent_leaf.clone(),
849 state,
850 delta,
851 vid_share: parent_vid,
852 state_cert,
853 })
854 }
855
856 pub fn update_epoch(&mut self, epoch_number: TYPES::Epoch) -> Result<()> {
860 ensure!(
861 self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
862 debug!("New epoch isn't newer than the current epoch.")
863 );
864 tracing::trace!(
865 "Updating epoch from {:?} to {}",
866 self.cur_epoch,
867 epoch_number
868 );
869 self.cur_epoch = Some(epoch_number);
870 Ok(())
871 }
872
873 pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool {
877 let old_view = match action {
878 HotShotAction::Vote => &mut self.last_actions.voted,
879 HotShotAction::Propose => &mut self.last_actions.proposed,
880 HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
881 HotShotAction::DaVote => {
882 if view > self.last_actions.da_vote {
883 self.last_actions.da_vote = view;
884 }
885 return true;
890 },
891 _ => return true,
892 };
893 if view > *old_view {
894 *old_view = view;
895 return true;
896 }
897 false
898 }
899
900 pub fn reset_actions(&mut self) {
902 self.last_actions = HotShotActionViews::default();
903 }
904
905 pub fn update_proposed_view(
910 &mut self,
911 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
912 ) -> Result<()> {
913 ensure!(
914 proposal.data.view_number()
915 > self
916 .last_proposals
917 .last_key_value()
918 .map_or(TYPES::View::genesis(), |(k, _)| { *k }),
919 debug!("New view isn't newer than the previously proposed view.")
920 );
921 self.last_proposals
922 .insert(proposal.data.view_number(), proposal);
923 Ok(())
924 }
925
926 pub fn update_last_decided_view(&mut self, view_number: TYPES::View) -> Result<()> {
931 ensure!(
932 view_number > self.last_decided_view,
933 debug!("New view isn't newer than the previously decided view.")
934 );
935 self.last_decided_view = view_number;
936 Ok(())
937 }
938
939 pub fn update_locked_view(&mut self, view_number: TYPES::View) -> Result<()> {
944 ensure!(
945 view_number > self.locked_view,
946 debug!("New view isn't newer than the previously locked view.")
947 );
948 self.locked_view = view_number;
949 Ok(())
950 }
951
952 pub fn update_da_view(
958 &mut self,
959 view_number: TYPES::View,
960 epoch: Option<TYPES::Epoch>,
961 payload_commitment: VidCommitment,
962 ) -> Result<()> {
963 let view = View {
964 view_inner: ViewInner::Da {
965 payload_commitment,
966 epoch,
967 },
968 };
969 self.update_validated_state_map(view_number, view)
970 }
971
972 pub fn update_leaf(
978 &mut self,
979 leaf: Leaf2<TYPES>,
980 state: Arc<TYPES::ValidatedState>,
981 delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
982 ) -> Result<()> {
983 let view_number = leaf.view_number();
984 let epoch = option_epoch_from_block_number::<TYPES>(
985 leaf.with_epoch,
986 leaf.height(),
987 self.epoch_height,
988 );
989 let view = View {
990 view_inner: ViewInner::Leaf {
991 leaf: leaf.commit(),
992 state,
993 delta,
994 epoch,
995 },
996 };
997 self.update_validated_state_map(view_number, view)?;
998 self.update_saved_leaves(leaf);
999 Ok(())
1000 }
1001
1002 fn update_validated_state_map(
1008 &mut self,
1009 view_number: TYPES::View,
1010 new_view: View<TYPES>,
1011 ) -> Result<()> {
1012 if let Some(existing_view) = self.validated_state_map().get(&view_number) {
1013 if let ViewInner::Leaf {
1014 delta: ref existing_delta,
1015 ..
1016 } = existing_view.view_inner
1017 {
1018 if let ViewInner::Leaf {
1019 delta: ref new_delta,
1020 ..
1021 } = new_view.view_inner
1022 {
1023 ensure!(
1024 new_delta.is_some() || existing_delta.is_none(),
1025 debug!(
1026 "Skipping the state update to not override a `Leaf` view with `Some` \
1027 state delta."
1028 )
1029 );
1030 } else {
1031 bail!(
1032 "Skipping the state update to not override a `Leaf` view with a \
1033 non-`Leaf` view."
1034 );
1035 }
1036 }
1037 }
1038 self.validated_state_map.insert(view_number, new_view);
1039 Ok(())
1040 }
1041
1042 fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
1044 self.saved_leaves.insert(leaf.commit(), leaf);
1045 }
1046
1047 pub fn update_saved_payloads(
1052 &mut self,
1053 view_number: TYPES::View,
1054 payload: Arc<PayloadWithMetadata<TYPES>>,
1055 ) -> Result<()> {
1056 ensure!(
1057 !self.saved_payloads.contains_key(&view_number),
1058 "Payload with the same view already exists."
1059 );
1060 self.saved_payloads.insert(view_number, payload);
1061 Ok(())
1062 }
1063
1064 pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
1068 if self.high_qc == high_qc {
1069 return Ok(());
1070 }
1071 ensure!(
1073 high_qc.view_number > self.high_qc.view_number,
1074 debug!("High QC with an equal or higher view exists.")
1075 );
1076 tracing::debug!("Updating high QC");
1077 self.high_qc = high_qc;
1078
1079 Ok(())
1080 }
1081
1082 pub fn update_next_epoch_high_qc(
1088 &mut self,
1089 high_qc: NextEpochQuorumCertificate2<TYPES>,
1090 ) -> Result<()> {
1091 if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
1092 return Ok(());
1093 }
1094 if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
1095 ensure!(
1096 high_qc.view_number > next_epoch_high_qc.view_number,
1097 debug!("Next epoch high QC with an equal or higher view exists.")
1098 );
1099 }
1100 tracing::debug!("Updating next epoch high QC");
1101 self.next_epoch_high_qc = Some(high_qc);
1102
1103 Ok(())
1104 }
1105
1106 pub fn reset_high_qc(
1110 &mut self,
1111 high_qc: QuorumCertificate2<TYPES>,
1112 next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
1113 ) -> Result<()> {
1114 ensure!(
1115 high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
1116 error!("High QC's and next epoch QC's leaf commits do not match.")
1117 );
1118 if self.high_qc == high_qc {
1119 return Ok(());
1120 }
1121 let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
1122 let current_qc = self.high_qc();
1123 let Some(high_bn) = current_qc.data.block_number else {
1124 return false;
1125 };
1126 epoch_from_block_number(bn + 1, self.epoch_height)
1127 == epoch_from_block_number(high_bn + 1, self.epoch_height)
1128 });
1129 ensure!(
1130 high_qc
1131 .data
1132 .block_number
1133 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
1134 && same_epoch,
1135 error!("Provided QC is not a transition QC.")
1136 );
1137 tracing::debug!("Resetting high QC and next epoch high QC");
1138 self.high_qc = high_qc;
1139 self.next_epoch_high_qc = Some(next_epoch_qc);
1140
1141 Ok(())
1142 }
1143
1144 pub fn update_state_cert(
1148 &mut self,
1149 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
1150 ) -> Result<()> {
1151 if let Some(existing_state_cert) = &self.state_cert {
1152 ensure!(
1153 state_cert.epoch > existing_state_cert.epoch,
1154 debug!(
1155 "Light client state update certification with an equal or higher epoch exists."
1156 )
1157 );
1158 }
1159 tracing::debug!("Updating light client state update certification");
1160 self.state_cert = Some(state_cert);
1161
1162 Ok(())
1163 }
1164
1165 pub fn update_vid_shares(
1167 &mut self,
1168 view_number: TYPES::View,
1169 disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1170 ) {
1171 self.vid_shares
1172 .entry(view_number)
1173 .or_default()
1174 .entry(disperse.data.recipient_key().clone())
1175 .or_default()
1176 .insert(disperse.data.target_epoch(), disperse);
1177 }
1178
1179 pub fn update_saved_da_certs(&mut self, view_number: TYPES::View, cert: DaCertificate2<TYPES>) {
1181 self.saved_da_certs.insert(view_number, cert);
1182 }
1183
1184 pub fn visit_leaf_ancestors<F>(
1188 &self,
1189 start_from: TYPES::View,
1190 terminator: Terminator<TYPES::View>,
1191 ok_when_finished: bool,
1192 mut f: F,
1193 ) -> std::result::Result<(), HotShotError<TYPES>>
1194 where
1195 F: FnMut(
1196 &Leaf2<TYPES>,
1197 Arc<<TYPES as NodeType>::ValidatedState>,
1198 Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1199 ) -> bool,
1200 {
1201 let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1202 view.leaf_commitment().ok_or_else(|| {
1203 HotShotError::InvalidState(format!(
1204 "Visited failed view {start_from} leaf. Expected successful leaf"
1205 ))
1206 })?
1207 } else {
1208 return Err(HotShotError::InvalidState(format!(
1209 "View {start_from} leaf does not exist in state map "
1210 )));
1211 };
1212
1213 while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1214 let view = leaf.view_number();
1215 if let (Some(state), delta) = self.state_and_delta(view) {
1216 if let Terminator::Exclusive(stop_before) = terminator {
1217 if stop_before == view {
1218 if ok_when_finished {
1219 return Ok(());
1220 }
1221 break;
1222 }
1223 }
1224 next_leaf = leaf.parent_commitment();
1225 if !f(leaf, state, delta) {
1226 return Ok(());
1227 }
1228 if let Terminator::Inclusive(stop_after) = terminator {
1229 if stop_after == view {
1230 if ok_when_finished {
1231 return Ok(());
1232 }
1233 break;
1234 }
1235 }
1236 } else {
1237 return Err(HotShotError::InvalidState(format!(
1238 "View {view} state does not exist in state map"
1239 )));
1240 }
1241 }
1242 Err(HotShotError::MissingLeaf(next_leaf))
1243 }
1244
1245 pub fn collect_garbage(&mut self, old_anchor_view: TYPES::View, new_anchor_view: TYPES::View) {
1250 if new_anchor_view <= old_anchor_view {
1252 return;
1253 }
1254 let gc_view = TYPES::View::new(new_anchor_view.saturating_sub(1));
1255 let anchor_entry = self
1257 .validated_state_map
1258 .iter()
1259 .next()
1260 .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1261 if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1262 tracing::info!(
1263 "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1264 );
1265 }
1266 self.saved_da_certs
1268 .retain(|view_number, _| *view_number >= old_anchor_view);
1269 self.validated_state_map
1270 .range(..gc_view)
1271 .filter_map(|(_view_number, view)| view.leaf_commitment())
1272 .for_each(|leaf| {
1273 self.saved_leaves.remove(&leaf);
1274 });
1275 self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1276 self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1277 self.vid_shares = self.vid_shares.split_off(&gc_view);
1278 self.last_proposals = self.last_proposals.split_off(&gc_view);
1279 }
1280
1281 #[must_use]
1287 pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1288 let decided_view_num = self.last_decided_view;
1289 let view = self.validated_state_map.get(&decided_view_num).unwrap();
1290 let leaf = view
1291 .leaf_commitment()
1292 .expect("Decided leaf not found! Consensus internally inconsistent");
1293 self.saved_leaves.get(&leaf).unwrap().clone()
1294 }
1295
1296 pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1297 self.saved_leaves.values().cloned().collect::<Vec<_>>()
1298 }
1299
1300 #[must_use]
1302 pub fn state(&self, view_number: TYPES::View) -> Option<&Arc<TYPES::ValidatedState>> {
1303 match self.validated_state_map.get(&view_number) {
1304 Some(view) => view.state(),
1305 None => None,
1306 }
1307 }
1308
1309 #[must_use]
1311 pub fn state_and_delta(&self, view_number: TYPES::View) -> StateAndDelta<TYPES> {
1312 match self.validated_state_map.get(&view_number) {
1313 Some(view) => view.state_and_delta(),
1314 None => (None, None),
1315 }
1316 }
1317
1318 #[must_use]
1324 pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1325 let decided_view_num = self.last_decided_view;
1326 self.state_and_delta(decided_view_num)
1327 .0
1328 .expect("Decided state not found! Consensus internally inconsistent")
1329 }
1330
1331 #[instrument(skip_all, target = "Consensus", fields(view = *view))]
1337 pub async fn calculate_and_update_vid<V: Versions>(
1338 consensus: OuterConsensus<TYPES>,
1339 view: <TYPES as NodeType>::View,
1340 target_epoch: Option<<TYPES as NodeType>::Epoch>,
1341 membership_coordinator: EpochMembershipCoordinator<TYPES>,
1342 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
1343 upgrade_lock: &UpgradeLock<TYPES, V>,
1344 ) -> Option<()> {
1345 let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
1346 let epoch = consensus
1347 .read()
1348 .await
1349 .validated_state_map()
1350 .get(&view)?
1351 .view_inner
1352 .epoch()?;
1353
1354 let VidDisperseAndDuration {
1355 disperse: vid,
1356 duration: disperse_duration,
1357 } = VidDisperse::calculate_vid_disperse::<V>(
1358 &payload_with_metadata.payload,
1359 &membership_coordinator,
1360 view,
1361 target_epoch,
1362 epoch,
1363 &payload_with_metadata.metadata,
1364 upgrade_lock,
1365 )
1366 .await
1367 .ok()?;
1368
1369 let mut consensus_writer = consensus.write().await;
1370 consensus_writer
1371 .metrics
1372 .vid_disperse_duration
1373 .add_point(disperse_duration.as_secs_f64());
1374 for share in vid.to_shares() {
1375 if let Some(prop) = share.to_proposal(private_key) {
1376 consensus_writer.update_vid_shares(view, prop);
1377 }
1378 }
1379
1380 Some(())
1381 }
1382 pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1384 let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1385 tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1386 return false;
1387 };
1388 let block_height = leaf.height();
1389 is_epoch_transition(block_height, self.epoch_height)
1390 }
1391
1392 pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1394 let Some(block_height) = self.high_qc().data.block_number else {
1395 return false;
1396 };
1397 is_epoch_transition(block_height, self.epoch_height)
1398 }
1399
1400 pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1402 if parent_leaf.view_number() == TYPES::View::genesis() {
1403 return true;
1404 }
1405 let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1406 let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1407
1408 new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1409 }
1410}
1411
1412#[derive(Eq, PartialEq, Debug, Clone)]
1415pub struct CommitmentAndMetadata<TYPES: NodeType> {
1416 pub commitment: VidCommitment,
1418 pub builder_commitment: BuilderCommitment,
1420 pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
1422 pub fees: Vec1<BuilderFee<TYPES>>,
1424 pub block_view: TYPES::View,
1426}