1#[cfg(feature = "docs")]
12pub mod documentation;
13
14use committable::Committable;
15use futures::future::{Either, select};
16use hotshot_types::{
17 drb::{DrbResult, INITIAL_DRB_RESULT, drb_difficulty_selector},
18 epoch_membership::EpochMembershipCoordinator,
19 message::UpgradeLock,
20 simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
21 traits::{
22 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
23 signature_key::StateSignatureKey, storage::Storage,
24 },
25 utils::{epoch_from_block_number, is_ge_epoch_root},
26};
27use rand::Rng;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36use versions::{EPOCH_VERSION, Upgrade};
37
38pub mod helpers;
40
41use std::{
42 collections::{BTreeMap, HashMap},
43 num::NonZeroUsize,
44 sync::Arc,
45 time::Duration,
46};
47
48use alloy::primitives::U256;
49use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
50use async_lock::RwLock;
51use async_trait::async_trait;
52use futures::join;
53use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
54use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
55pub use hotshot_types::error::HotShotError;
58use hotshot_types::{
59 HotShotConfig,
60 consensus::{
61 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
62 ViewInner,
63 },
64 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
65 data::{EpochNumber, Leaf2, ViewNumber},
66 event::{EventType, LeafInfo},
67 message::{DataMessage, Message, MessageKind, Proposal},
68 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
69 stake_table::HSStakeTable,
70 storage_metrics::StorageMetricsValue,
71 traits::{
72 consensus_api::ConsensusApi, network::ConnectedNetwork, node_implementation::NodeType,
73 signature_key::SignatureKey, states::ValidatedState,
74 },
75 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
76};
77use hotshot_utils::warn;
78pub use rand;
80use tokio::{spawn, time::sleep};
81use tracing::{debug, instrument, trace};
82
83use crate::{
86 tasks::{add_consensus_tasks, add_network_tasks},
87 traits::NodeImplementation,
88 types::{Event, SystemContextHandle},
89};
90
91pub const H_512: usize = 64;
93pub const H_256: usize = 32;
95
96pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>> {
98 public_key: TYPES::SignatureKey,
100
101 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
103
104 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
106
107 pub config: HotShotConfig<TYPES>,
109
110 pub network: Arc<I::Network>,
112
113 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
115
116 metrics: Arc<ConsensusMetricsValue>,
118
119 consensus: OuterConsensus<TYPES>,
121
122 instance_state: Arc<TYPES::InstanceState>,
124
125 start_view: ViewNumber,
127
128 start_epoch: Option<EpochNumber>,
130
131 output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
133
134 pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
136
137 anchored_leaf: Leaf2<TYPES>,
139
140 #[allow(clippy::type_complexity)]
142 internal_event_stream: (
143 Sender<Arc<HotShotEvent<TYPES>>>,
144 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
145 ),
146
147 pub id: u64,
149
150 pub storage: I::Storage,
152
153 pub storage_metrics: Arc<StorageMetricsValue>,
155
156 pub upgrade_lock: UpgradeLock<TYPES>,
158}
159impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Clone for SystemContext<TYPES, I> {
160 #![allow(deprecated)]
161 fn clone(&self) -> Self {
162 Self {
163 public_key: self.public_key.clone(),
164 private_key: self.private_key.clone(),
165 state_private_key: self.state_private_key.clone(),
166 config: self.config.clone(),
167 network: Arc::clone(&self.network),
168 membership_coordinator: self.membership_coordinator.clone(),
169 metrics: Arc::clone(&self.metrics),
170 consensus: self.consensus.clone(),
171 instance_state: Arc::clone(&self.instance_state),
172 start_view: self.start_view,
173 start_epoch: self.start_epoch,
174 output_event_stream: self.output_event_stream.clone(),
175 external_event_stream: self.external_event_stream.clone(),
176 anchored_leaf: self.anchored_leaf.clone(),
177 internal_event_stream: self.internal_event_stream.clone(),
178 id: self.id,
179 storage: self.storage.clone(),
180 storage_metrics: Arc::clone(&self.storage_metrics),
181 upgrade_lock: self.upgrade_lock.clone(),
182 }
183 }
184}
185
186impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
187 #![allow(deprecated)]
188 #[allow(clippy::too_many_arguments)]
199 pub async fn new(
200 public_key: TYPES::SignatureKey,
201 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
202 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
203 nonce: u64,
204 config: HotShotConfig<TYPES>,
205 upgrade: versions::Upgrade,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 upgrade,
223 memberships,
224 network,
225 initializer,
226 consensus_metrics,
227 storage,
228 storage_metrics,
229 internal_chan,
230 external_chan,
231 )
232 .await
233 }
234
235 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
243 pub async fn new_from_channels(
244 public_key: TYPES::SignatureKey,
245 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
246 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
247 nonce: u64,
248 config: HotShotConfig<TYPES>,
249 upgrade: versions::Upgrade,
250 mut membership_coordinator: EpochMembershipCoordinator<TYPES>,
251 network: Arc<I::Network>,
252 initializer: HotShotInitializer<TYPES>,
253 consensus_metrics: ConsensusMetricsValue,
254 storage: I::Storage,
255 storage_metrics: StorageMetricsValue,
256 internal_channel: (
257 Sender<Arc<HotShotEvent<TYPES>>>,
258 Receiver<Arc<HotShotEvent<TYPES>>>,
259 ),
260 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
261 ) -> Arc<Self> {
262 debug!("Creating a new hotshot");
263
264 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
265
266 let consensus_metrics = Arc::new(consensus_metrics);
267 let storage_metrics = Arc::new(storage_metrics);
268 let anchored_leaf = initializer.anchor_leaf;
269 let instance_state = initializer.instance_state;
270
271 let (internal_tx, internal_rx) = internal_channel;
272 let (mut external_tx, external_rx) = external_channel;
273
274 let mut internal_rx = internal_rx.new_receiver();
275
276 let mut external_rx = external_rx.new_receiver();
277
278 internal_rx.set_overflow(true);
281 external_rx.set_overflow(true);
283
284 membership_coordinator
285 .set_external_channel(external_rx.clone())
286 .await;
287
288 tracing::warn!(
289 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
290 upgrade.base,
291 upgrade.target
292 );
293 tracing::warn!(
294 "Loading previously decided upgrade certificate from storage: {:?}",
295 initializer.decided_upgrade_certificate
296 );
297
298 let upgrade_lock = UpgradeLock::<TYPES>::from_certificate(
299 upgrade,
300 &initializer.decided_upgrade_certificate,
301 );
302
303 let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
304 cert.data.new_version
305 } else {
306 upgrade.base
307 };
308
309 debug!("Setting DRB difficulty selector in membership");
310 let drb_difficulty_selector = drb_difficulty_selector(&config);
311
312 membership_coordinator
313 .set_drb_difficulty_selector(drb_difficulty_selector)
314 .await;
315
316 for da_committee in &config.da_committees {
317 if current_version >= da_committee.start_version {
318 membership_coordinator
319 .membership()
320 .write()
321 .await
322 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
323 }
324 }
325
326 let validated_state = initializer.anchor_state;
329
330 load_start_epoch_info(
331 membership_coordinator.membership(),
332 &initializer.start_epoch_info,
333 config.epoch_height,
334 config.epoch_start_block,
335 )
336 .await;
337
338 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
340 EpochNumber::new(epoch_from_block_number(
341 block_number + 1,
342 config.epoch_height,
343 ))
344 });
345
346 let mut validated_state_map = BTreeMap::default();
348 validated_state_map.insert(
349 anchored_leaf.view_number(),
350 View {
351 view_inner: ViewInner::Leaf {
352 leaf: anchored_leaf.commit(),
353 state: Arc::clone(&validated_state),
354 delta: initializer.anchor_state_delta,
355 epoch,
356 },
357 },
358 );
359 for (view_num, inner) in initializer.undecided_state {
360 validated_state_map.insert(view_num, inner);
361 }
362
363 let mut saved_leaves = HashMap::new();
364 let mut saved_payloads = BTreeMap::new();
365 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
366
367 for (_, leaf) in initializer.undecided_leaves {
368 saved_leaves.insert(leaf.commit(), leaf.clone());
369 }
370 if let Some(payload) = anchored_leaf.block_payload() {
371 let metadata = anchored_leaf.block_header().metadata().clone();
372 saved_payloads.insert(
373 anchored_leaf.view_number(),
374 Arc::new(PayloadWithMetadata { payload, metadata }),
375 );
376 }
377 let high_qc_block_number = initializer.high_qc.data.block_number;
378 let (stake_table, success_threshold) = if let Ok(epoch_membership) =
379 membership_coordinator.stake_table_for_epoch(epoch).await
380 {
381 (
382 epoch_membership.stake_table().await,
383 epoch_membership.success_threshold().await,
384 )
385 } else {
386 tracing::warn!(
387 "Failed to get stake table for epoch {:?} while creating vote participation",
388 epoch
389 );
390 (HSStakeTable::default(), U256::MAX)
391 };
392
393 let consensus = Consensus::new(
394 validated_state_map,
395 Some(initializer.saved_vid_shares),
396 anchored_leaf.view_number(),
397 epoch,
398 anchored_leaf.view_number(),
399 anchored_leaf.view_number(),
400 initializer.last_actioned_view,
401 initializer.saved_proposals,
402 saved_leaves,
403 saved_payloads,
404 initializer.high_qc,
405 initializer.next_epoch_high_qc,
406 Arc::clone(&consensus_metrics),
407 config.epoch_height,
408 initializer.state_cert,
409 config.drb_difficulty,
410 config.drb_upgrade_difficulty,
411 stake_table,
412 success_threshold,
413 );
414
415 let consensus = Arc::new(RwLock::new(consensus));
416
417 if let Some(epoch) = epoch {
418 tracing::info!(
419 "Triggering catchup for epoch {} and next epoch {}",
420 epoch,
421 epoch + 1
422 );
423 let _ = membership_coordinator
425 .membership_for_epoch(Some(epoch))
426 .await;
427 let _ = membership_coordinator
428 .membership_for_epoch(Some(epoch + 1))
429 .await;
430 if let Some(high_qc_block_number) = high_qc_block_number
433 && is_ge_epoch_root(high_qc_block_number, config.epoch_height)
434 {
435 let _ = membership_coordinator
436 .stake_table_for_epoch(Some(epoch + 2))
437 .await;
438 }
439
440 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
441 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
442 if let Ok(mem) = membership_coordinator
443 .stake_table_for_epoch(Some(epoch + 1))
444 .await
445 {
446 mem.add_drb_result(drb_result).await;
447 }
448 }
449 }
450
451 external_tx.set_await_active(false);
454
455 let inner: Arc<SystemContext<TYPES, I>> = Arc::new(SystemContext {
456 id: nonce,
457 consensus: OuterConsensus::new(consensus),
458 instance_state: Arc::new(instance_state),
459 public_key,
460 private_key,
461 state_private_key,
462 config,
463 start_view: initializer.start_view,
464 start_epoch: initializer.start_epoch,
465 network,
466 membership_coordinator,
467 metrics: Arc::clone(&consensus_metrics),
468 internal_event_stream: (internal_tx, internal_rx.deactivate()),
469 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
470 external_event_stream: (external_tx, external_rx.deactivate()),
471 anchored_leaf: anchored_leaf.clone(),
472 storage,
473 storage_metrics,
474 upgrade_lock,
475 });
476
477 inner
478 }
479
480 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
485 pub async fn start_consensus(&self) {
486 #[cfg(all(feature = "rewind", not(debug_assertions)))]
487 compile_error!("Cannot run rewind in production builds!");
488
489 debug!("Starting Consensus");
490 let consensus = self.consensus.read().await;
491
492 let first_epoch = option_epoch_from_block_number(
493 self.upgrade_lock.upgrade().base >= EPOCH_VERSION,
494 self.config.epoch_start_block,
495 self.config.epoch_height,
496 );
497 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
501 #[allow(clippy::panic)]
502 self.internal_event_stream
503 .0
504 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
505 self.start_view,
506 initial_view_change_epoch,
507 )))
508 .await
509 .unwrap_or_else(|_| {
510 panic!(
511 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
512 self.start_view, initial_view_change_epoch,
513 )
514 });
515
516 let event_stream = self.internal_event_stream.0.clone();
518 let next_view_timeout = self.config.next_view_timeout;
519 let start_view = self.start_view;
520 let start_epoch = self.start_epoch;
521
522 spawn({
525 async move {
526 sleep(Duration::from_millis(next_view_timeout)).await;
527 broadcast_event(
528 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
529 &event_stream,
530 )
531 .await;
532 }
533 });
534 #[allow(clippy::panic)]
535 self.internal_event_stream
536 .0
537 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
538 consensus.high_qc().clone(),
539 ))))
540 .await
541 .unwrap_or_else(|_| {
542 panic!(
543 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
544 consensus.high_qc()
545 )
546 });
547
548 {
549 if self.anchored_leaf.view_number() == ViewNumber::genesis() {
552 let (validated_state, state_delta) =
553 TYPES::ValidatedState::genesis(&self.instance_state);
554
555 let qc = QuorumCertificate2::genesis(
556 &validated_state,
557 self.instance_state.as_ref(),
558 self.upgrade_lock.upgrade(),
559 )
560 .await;
561
562 broadcast_event(
563 Event {
564 view_number: self.anchored_leaf.view_number(),
565 event: EventType::Decide {
566 leaf_chain: Arc::new(vec![LeafInfo::new(
567 self.anchored_leaf.clone(),
568 Arc::new(validated_state),
569 Some(Arc::new(state_delta)),
570 None,
571 None,
572 )]),
573 committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
574 deciding_qc: None,
575 block_size: None,
576 },
577 },
578 &self.external_event_stream.0,
579 )
580 .await;
581 }
582 }
583 }
584
585 async fn send_external_event(&self, event: Event<TYPES>) {
587 debug!(?event, "send_external_event");
588 broadcast_event(event, &self.external_event_stream.0).await;
589 }
590
591 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
597 pub async fn publish_transaction_async(
598 &self,
599 transaction: TYPES::Transaction,
600 ) -> Result<(), HotShotError<TYPES>> {
601 trace!("Adding transaction to our own queue");
602
603 let api = self.clone();
604
605 let consensus_reader = api.consensus.read().await;
606 let view_number = consensus_reader.cur_view();
607 let epoch = consensus_reader.cur_epoch();
608 drop(consensus_reader);
609
610 let message_kind: DataMessage<TYPES> =
612 DataMessage::SubmitTransaction(transaction.clone(), view_number);
613 let message = Message {
614 sender: api.public_key.clone(),
615 kind: MessageKind::from(message_kind),
616 };
617
618 let serialized_message = self.upgrade_lock.serialize(&message).map_err(|err| {
619 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
620 })?;
621
622 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
623 Ok(m) => m,
624 Err(e) => return Err(HotShotError::InvalidState(e.message)),
625 };
626
627 spawn(async move {
628 let memberships_da_committee_members = membership
629 .da_committee_members(view_number)
630 .await
631 .iter()
632 .cloned()
633 .collect();
634
635 join! {
636 api
644 .network.da_broadcast_message(
645 view_number.u64().into(),
646 serialized_message,
647 memberships_da_committee_members,
648 BroadcastDelay::None,
649 ),
650 api
651 .send_external_event(Event {
652 view_number,
653 event: EventType::Transactions {
654 transactions: vec![transaction],
655 },
656 }),
657 }
658 });
659 Ok(())
660 }
661
662 #[must_use]
664 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
665 Arc::clone(&self.consensus.inner_consensus)
666 }
667
668 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
670 Arc::clone(&self.instance_state)
671 }
672
673 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
677 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
678 self.consensus.read().await.decided_leaf()
679 }
680
681 #[must_use]
687 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
688 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
689 self.consensus.try_read().map(|guard| guard.decided_leaf())
690 }
691
692 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
697 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
698 Arc::clone(&self.consensus.read().await.decided_state())
699 }
700
701 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
709 pub async fn state(&self, view: ViewNumber) -> Option<Arc<TYPES::ValidatedState>> {
710 self.consensus.read().await.state(view).cloned()
711 }
712
713 #[allow(clippy::too_many_arguments)]
727 pub async fn init(
728 public_key: TYPES::SignatureKey,
729 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
730 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
731 node_id: u64,
732 config: HotShotConfig<TYPES>,
733 upgrade: versions::Upgrade,
734 memberships: EpochMembershipCoordinator<TYPES>,
735 network: Arc<I::Network>,
736 initializer: HotShotInitializer<TYPES>,
737 consensus_metrics: ConsensusMetricsValue,
738 storage: I::Storage,
739 storage_metrics: StorageMetricsValue,
740 ) -> Result<
741 (
742 SystemContextHandle<TYPES, I>,
743 Sender<Arc<HotShotEvent<TYPES>>>,
744 Receiver<Arc<HotShotEvent<TYPES>>>,
745 ),
746 HotShotError<TYPES>,
747 > {
748 let hotshot = Self::new(
749 public_key,
750 private_key,
751 state_private_key,
752 node_id,
753 config,
754 upgrade,
755 memberships,
756 network,
757 initializer,
758 consensus_metrics,
759 storage,
760 storage_metrics,
761 )
762 .await;
763 let handle = Arc::clone(&hotshot).run_tasks().await;
764 let (tx, rx) = hotshot.internal_event_stream.clone();
765
766 Ok((handle, tx, rx.activate()))
767 }
768 #[must_use]
770 pub fn next_view_timeout(&self) -> u64 {
771 self.config.next_view_timeout
772 }
773}
774
775impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
776 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I> {
780 let consensus_registry = ConsensusTaskRegistry::new();
781 let network_registry = NetworkTaskRegistry::new();
782
783 let output_event_stream = self.external_event_stream.clone();
784 let internal_event_stream = self.internal_event_stream.clone();
785
786 let mut handle = SystemContextHandle {
787 consensus_registry,
788 network_registry,
789 output_event_stream: output_event_stream.clone(),
790 internal_event_stream: internal_event_stream.clone(),
791 hotshot: self.clone().into(),
792 storage: self.storage.clone(),
793 network: Arc::clone(&self.network),
794 membership_coordinator: self.membership_coordinator.clone(),
795 epoch_height: self.config.epoch_height,
796 };
797
798 add_network_tasks::<TYPES, I>(&mut handle).await;
799 add_consensus_tasks::<TYPES, I>(&mut handle).await;
800
801 handle
802 }
803}
804
805type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
807
808#[async_trait]
810pub trait TwinsHandlerState<TYPES, I>
811where
812 Self: std::fmt::Debug + Send + Sync,
813 TYPES: NodeType,
814 I: NodeImplementation<TYPES>,
815{
816 async fn send_handler(
818 &mut self,
819 event: &HotShotEvent<TYPES>,
820 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
821
822 async fn recv_handler(
824 &mut self,
825 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
826 ) -> Vec<HotShotEvent<TYPES>>;
827
828 fn fuse_channels(
832 &'static mut self,
833 left: Channel<HotShotEvent<TYPES>>,
834 right: Channel<HotShotEvent<TYPES>>,
835 ) -> Channel<HotShotEvent<TYPES>> {
836 let send_state = Arc::new(RwLock::new(self));
837 let recv_state = Arc::clone(&send_state);
838
839 let (left_sender, mut left_receiver) = (left.0, left.1);
840 let (right_sender, mut right_receiver) = (right.0, right.1);
841
842 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
844 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
846 broadcast(EVENT_CHANNEL_SIZE);
847
848 let _recv_loop_handle = spawn(async move {
849 loop {
850 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
851 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
852 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
853 };
854
855 let mut state = recv_state.write().await;
856 let mut result = state.recv_handler(&msg).await;
857
858 while let Some(event) = result.pop() {
859 let _ = sender_to_network.broadcast(event.into()).await;
860 }
861 }
862 });
863
864 let _send_loop_handle = spawn(async move {
865 loop {
866 if let Ok(msg) = receiver_from_network.recv().await {
867 let mut state = send_state.write().await;
868
869 let mut result = state.send_handler(&msg).await;
870
871 while let Some(event) = result.pop() {
872 match event {
873 Either::Left(msg) => {
874 let _ = left_sender.broadcast(msg.into()).await;
875 },
876 Either::Right(msg) => {
877 let _ = right_sender.broadcast(msg.into()).await;
878 },
879 }
880 }
881 }
882 }
883 });
884
885 (network_task_sender, network_task_receiver)
886 }
887
888 #[allow(clippy::too_many_arguments)]
889 async fn spawn_twin_handles(
893 &'static mut self,
894 public_key: TYPES::SignatureKey,
895 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
896 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
897 nonce: u64,
898 config: HotShotConfig<TYPES>,
899 upgrade: versions::Upgrade,
900 memberships: EpochMembershipCoordinator<TYPES>,
901 network: Arc<I::Network>,
902 initializer: HotShotInitializer<TYPES>,
903 consensus_metrics: ConsensusMetricsValue,
904 storage: I::Storage,
905 storage_metrics: StorageMetricsValue,
906 ) -> (SystemContextHandle<TYPES, I>, SystemContextHandle<TYPES, I>) {
907 let epoch_height = config.epoch_height;
908 let left_system_context = SystemContext::new(
909 public_key.clone(),
910 private_key.clone(),
911 state_private_key.clone(),
912 nonce,
913 config.clone(),
914 upgrade,
915 memberships.clone(),
916 Arc::clone(&network),
917 initializer.clone(),
918 consensus_metrics.clone(),
919 storage.clone(),
920 storage_metrics.clone(),
921 )
922 .await;
923 let right_system_context = SystemContext::new(
924 public_key,
925 private_key,
926 state_private_key,
927 nonce,
928 config,
929 upgrade,
930 memberships,
931 network,
932 initializer,
933 consensus_metrics,
934 storage,
935 storage_metrics,
936 )
937 .await;
938
939 let left_consensus_registry = ConsensusTaskRegistry::new();
941 let left_network_registry = NetworkTaskRegistry::new();
942
943 let right_consensus_registry = ConsensusTaskRegistry::new();
944 let right_network_registry = NetworkTaskRegistry::new();
945
946 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
948 let left_external_event_stream =
949 (left_external_sender, left_external_receiver.deactivate());
950
951 let (right_external_sender, right_external_receiver) =
952 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
953 let right_external_event_stream =
954 (right_external_sender, right_external_receiver.deactivate());
955
956 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
958 let left_internal_event_stream = (
959 left_internal_sender.clone(),
960 left_internal_receiver.clone().deactivate(),
961 );
962
963 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
964 let right_internal_event_stream = (
965 right_internal_sender.clone(),
966 right_internal_receiver.clone().deactivate(),
967 );
968
969 let mut left_handle = SystemContextHandle::<_, I> {
971 consensus_registry: left_consensus_registry,
972 network_registry: left_network_registry,
973 output_event_stream: left_external_event_stream.clone(),
974 internal_event_stream: left_internal_event_stream.clone(),
975 hotshot: Arc::clone(&left_system_context),
976 storage: left_system_context.storage.clone(),
977 network: Arc::clone(&left_system_context.network),
978 membership_coordinator: left_system_context.membership_coordinator.clone(),
979 epoch_height,
980 };
981
982 let mut right_handle = SystemContextHandle::<_, I> {
983 consensus_registry: right_consensus_registry,
984 network_registry: right_network_registry,
985 output_event_stream: right_external_event_stream.clone(),
986 internal_event_stream: right_internal_event_stream.clone(),
987 hotshot: Arc::clone(&right_system_context),
988 storage: right_system_context.storage.clone(),
989 network: Arc::clone(&right_system_context.network),
990 membership_coordinator: right_system_context.membership_coordinator.clone(),
991 epoch_height,
992 };
993
994 add_consensus_tasks::<TYPES, I>(&mut left_handle).await;
996 add_consensus_tasks::<TYPES, I>(&mut right_handle).await;
997
998 let fused_internal_event_stream = self.fuse_channels(
1000 (left_internal_sender, left_internal_receiver),
1001 (right_internal_sender, right_internal_receiver),
1002 );
1003
1004 left_handle.internal_event_stream = (
1006 fused_internal_event_stream.0,
1007 fused_internal_event_stream.1.deactivate(),
1008 );
1009
1010 add_network_tasks::<TYPES, I>(&mut left_handle).await;
1012
1013 left_handle.internal_event_stream = left_internal_event_stream.clone();
1015
1016 (left_handle, right_handle)
1017 }
1018}
1019
1020#[derive(Debug)]
1021pub struct RandomTwinsHandler;
1024
1025#[async_trait]
1026impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1027 for RandomTwinsHandler
1028{
1029 async fn send_handler(
1030 &mut self,
1031 event: &HotShotEvent<TYPES>,
1032 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1033 let random: bool = rand::thread_rng().r#gen();
1034
1035 #[allow(clippy::match_bool)]
1036 match random {
1037 true => vec![Either::Left(event.clone())],
1038 false => vec![Either::Right(event.clone())],
1039 }
1040 }
1041
1042 async fn recv_handler(
1043 &mut self,
1044 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1045 ) -> Vec<HotShotEvent<TYPES>> {
1046 match event {
1047 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1048 }
1049 }
1050}
1051
1052#[derive(Debug)]
1055pub struct DoubleTwinsHandler;
1056
1057#[async_trait]
1058impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1059 for DoubleTwinsHandler
1060{
1061 async fn send_handler(
1062 &mut self,
1063 event: &HotShotEvent<TYPES>,
1064 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1065 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1066 }
1067
1068 async fn recv_handler(
1069 &mut self,
1070 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1071 ) -> Vec<HotShotEvent<TYPES>> {
1072 match event {
1073 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1074 }
1075 }
1076}
1077
1078#[async_trait]
1079impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusApi<TYPES, I>
1080 for SystemContextHandle<TYPES, I>
1081{
1082 fn total_nodes(&self) -> NonZeroUsize {
1083 self.hotshot.config.num_nodes_with_stake
1084 }
1085
1086 fn builder_timeout(&self) -> Duration {
1087 self.hotshot.config.builder_timeout
1088 }
1089
1090 async fn send_event(&self, event: Event<TYPES>) {
1091 debug!(?event, "send_event");
1092 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1093 }
1094
1095 fn public_key(&self) -> &TYPES::SignatureKey {
1096 &self.hotshot.public_key
1097 }
1098
1099 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1100 &self.hotshot.private_key
1101 }
1102
1103 fn state_private_key(
1104 &self,
1105 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1106 &self.hotshot.state_private_key
1107 }
1108}
1109
1110#[derive(Clone, Debug, PartialEq)]
1111pub struct InitializerEpochInfo<TYPES: NodeType> {
1112 pub epoch: EpochNumber,
1113 pub drb_result: DrbResult,
1114 pub block_header: Option<TYPES::BlockHeader>,
1116}
1117
1118#[derive(Clone, Debug)]
1119pub struct HotShotInitializer<TYPES: NodeType> {
1121 pub instance_state: TYPES::InstanceState,
1123
1124 pub epoch_height: u64,
1126
1127 pub epoch_start_block: u64,
1129
1130 pub anchor_leaf: Leaf2<TYPES>,
1132
1133 pub anchor_state: Arc<TYPES::ValidatedState>,
1135
1136 pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1138
1139 pub start_view: ViewNumber,
1141
1142 pub last_actioned_view: ViewNumber,
1145
1146 pub start_epoch: Option<EpochNumber>,
1148
1149 pub high_qc: QuorumCertificate2<TYPES>,
1153
1154 pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
1156
1157 pub saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1159
1160 pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1162
1163 pub undecided_leaves: BTreeMap<ViewNumber, Leaf2<TYPES>>,
1166
1167 pub undecided_state: BTreeMap<ViewNumber, View<TYPES>>,
1169
1170 pub saved_vid_shares: VidShares<TYPES>,
1172
1173 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1175
1176 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1178}
1179
1180impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1181 pub async fn from_genesis(
1185 instance_state: TYPES::InstanceState,
1186 epoch_height: u64,
1187 epoch_start_block: u64,
1188 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1189 upgrade: Upgrade,
1190 ) -> Result<Self, HotShotError<TYPES>> {
1191 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1192 let high_qc = QuorumCertificate2::genesis(&validated_state, &instance_state, upgrade).await;
1193
1194 Ok(Self {
1195 anchor_leaf: Leaf2::genesis(&validated_state, &instance_state, upgrade.base).await,
1196 anchor_state: Arc::new(validated_state),
1197 anchor_state_delta: Some(Arc::new(state_delta)),
1198 start_view: ViewNumber::new(0),
1199 start_epoch: genesis_epoch_from_version(upgrade.base),
1200 last_actioned_view: ViewNumber::new(0),
1201 saved_proposals: BTreeMap::new(),
1202 high_qc,
1203 next_epoch_high_qc: None,
1204 decided_upgrade_certificate: None,
1205 undecided_leaves: BTreeMap::new(),
1206 undecided_state: BTreeMap::new(),
1207 instance_state,
1208 saved_vid_shares: BTreeMap::new(),
1209 epoch_height,
1210 state_cert: None,
1211 epoch_start_block,
1212 start_epoch_info,
1213 })
1214 }
1215
1216 #[must_use]
1218 pub fn update_undecided(self) -> Self {
1219 let mut undecided_leaves = self.undecided_leaves.clone();
1220 let mut undecided_state = self.undecided_state.clone();
1221
1222 for proposal in self.saved_proposals.values() {
1223 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1225 continue;
1226 }
1227
1228 undecided_leaves.insert(
1229 proposal.data.view_number(),
1230 Leaf2::from_quorum_proposal(&proposal.data),
1231 );
1232 }
1233
1234 for leaf in undecided_leaves.values() {
1235 let view_inner = ViewInner::Leaf {
1236 leaf: leaf.commit(),
1237 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1238 delta: None,
1239 epoch: leaf.epoch(self.epoch_height),
1240 };
1241 let view = View { view_inner };
1242
1243 undecided_state.insert(leaf.view_number(), view);
1244 }
1245
1246 Self {
1247 undecided_leaves,
1248 undecided_state,
1249 ..self
1250 }
1251 }
1252
1253 #[allow(clippy::too_many_arguments)]
1261 pub fn load(
1262 instance_state: TYPES::InstanceState,
1263 epoch_height: u64,
1264 epoch_start_block: u64,
1265 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1266 anchor_leaf: Leaf2<TYPES>,
1267 (start_view, start_epoch): (ViewNumber, Option<EpochNumber>),
1268 (high_qc, next_epoch_high_qc): (
1269 QuorumCertificate2<TYPES>,
1270 Option<NextEpochQuorumCertificate2<TYPES>>,
1271 ),
1272 last_actioned_view: ViewNumber,
1273 saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1274 saved_vid_shares: VidShares<TYPES>,
1275 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1276 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1277 ) -> Self {
1278 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1279 anchor_leaf.block_header(),
1280 ));
1281 let anchor_state_delta = None;
1282
1283 let initializer = Self {
1284 instance_state,
1285 epoch_height,
1286 epoch_start_block,
1287 anchor_leaf,
1288 anchor_state,
1289 anchor_state_delta,
1290 high_qc,
1291 start_view,
1292 start_epoch,
1293 last_actioned_view,
1294 saved_proposals,
1295 saved_vid_shares,
1296 next_epoch_high_qc,
1297 decided_upgrade_certificate,
1298 undecided_leaves: BTreeMap::new(),
1299 undecided_state: BTreeMap::new(),
1300 state_cert,
1301 start_epoch_info,
1302 };
1303
1304 initializer.update_undecided()
1305 }
1306}
1307
1308async fn load_start_epoch_info<TYPES: NodeType>(
1309 membership: &Arc<RwLock<TYPES::Membership>>,
1310 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1311 epoch_height: u64,
1312 epoch_start_block: u64,
1313) {
1314 let first_epoch_number =
1315 EpochNumber::new(epoch_from_block_number(epoch_start_block, epoch_height));
1316
1317 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1318 membership
1319 .write()
1320 .await
1321 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1322
1323 let mut sorted_epoch_info = start_epoch_info.clone();
1324 sorted_epoch_info.sort_by_key(|info| info.epoch);
1325 for epoch_info in sorted_epoch_info {
1326 if let Some(block_header) = &epoch_info.block_header {
1327 tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1328
1329 Membership::add_epoch_root(Arc::clone(membership), block_header.clone())
1330 .await
1331 .unwrap_or_else(|err| {
1332 tracing::error!(
1334 "Failed to add epoch root for epoch {}: {err}",
1335 epoch_info.epoch
1336 );
1337 });
1338 }
1339 }
1340
1341 for epoch_info in start_epoch_info {
1342 tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1343 membership
1344 .write()
1345 .await
1346 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1347 }
1348}