1#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16 drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17 epoch_membership::EpochMembershipCoordinator,
18 message::UpgradeLock,
19 simple_certificate::LightClientStateUpdateCertificateV2,
20 traits::{
21 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22 node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
23 },
24 utils::epoch_from_block_number,
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37pub mod helpers;
39
40use std::{
41 collections::{BTreeMap, HashMap},
42 num::NonZeroUsize,
43 sync::Arc,
44 time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57 consensus::{
58 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59 ViewInner,
60 },
61 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62 data::Leaf2,
63 event::{EventType, LeafInfo},
64 message::{DataMessage, Message, MessageKind, Proposal},
65 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66 storage_metrics::StorageMetricsValue,
67 traits::{
68 consensus_api::ConsensusApi,
69 network::ConnectedNetwork,
70 node_implementation::{ConsensusTime, NodeType},
71 signature_key::SignatureKey,
72 states::ValidatedState,
73 },
74 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75 HotShotConfig,
76};
77pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
82use crate::{
85 tasks::{add_consensus_tasks, add_network_tasks},
86 traits::NodeImplementation,
87 types::{Event, SystemContextHandle},
88};
89
90pub const H_512: usize = 64;
92pub const H_256: usize = 32;
94
95pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
97 public_key: TYPES::SignatureKey,
99
100 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
102
103 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub config: HotShotConfig<TYPES>,
108
109 pub network: Arc<I::Network>,
111
112 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
114
115 metrics: Arc<ConsensusMetricsValue>,
117
118 consensus: OuterConsensus<TYPES>,
120
121 instance_state: Arc<TYPES::InstanceState>,
123
124 start_view: TYPES::View,
126
127 start_epoch: Option<TYPES::Epoch>,
129
130 output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
132
133 pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
135
136 anchored_leaf: Leaf2<TYPES>,
138
139 #[allow(clippy::type_complexity)]
141 internal_event_stream: (
142 Sender<Arc<HotShotEvent<TYPES>>>,
143 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
144 ),
145
146 pub id: u64,
148
149 pub storage: I::Storage,
151
152 pub storage_metrics: Arc<StorageMetricsValue>,
154
155 pub upgrade_lock: UpgradeLock<TYPES, V>,
157}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159 for SystemContext<TYPES, I, V>
160{
161 #![allow(deprecated)]
162 fn clone(&self) -> Self {
163 Self {
164 public_key: self.public_key.clone(),
165 private_key: self.private_key.clone(),
166 state_private_key: self.state_private_key.clone(),
167 config: self.config.clone(),
168 network: Arc::clone(&self.network),
169 membership_coordinator: self.membership_coordinator.clone(),
170 metrics: Arc::clone(&self.metrics),
171 consensus: self.consensus.clone(),
172 instance_state: Arc::clone(&self.instance_state),
173 start_view: self.start_view,
174 start_epoch: self.start_epoch,
175 output_event_stream: self.output_event_stream.clone(),
176 external_event_stream: self.external_event_stream.clone(),
177 anchored_leaf: self.anchored_leaf.clone(),
178 internal_event_stream: self.internal_event_stream.clone(),
179 id: self.id,
180 storage: self.storage.clone(),
181 storage_metrics: Arc::clone(&self.storage_metrics),
182 upgrade_lock: self.upgrade_lock.clone(),
183 }
184 }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188 #![allow(deprecated)]
189 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 public_key: TYPES::SignatureKey,
202 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204 nonce: u64,
205 config: HotShotConfig<TYPES>,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 memberships,
223 network,
224 initializer,
225 consensus_metrics,
226 storage,
227 storage_metrics,
228 internal_chan,
229 external_chan,
230 )
231 .await
232 }
233
234 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242 pub async fn new_from_channels(
243 public_key: TYPES::SignatureKey,
244 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246 nonce: u64,
247 config: HotShotConfig<TYPES>,
248 membership_coordinator: EpochMembershipCoordinator<TYPES>,
249 network: Arc<I::Network>,
250 initializer: HotShotInitializer<TYPES>,
251 consensus_metrics: ConsensusMetricsValue,
252 storage: I::Storage,
253 storage_metrics: StorageMetricsValue,
254 internal_channel: (
255 Sender<Arc<HotShotEvent<TYPES>>>,
256 Receiver<Arc<HotShotEvent<TYPES>>>,
257 ),
258 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259 ) -> Arc<Self> {
260 debug!("Creating a new hotshot");
261
262 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264 let consensus_metrics = Arc::new(consensus_metrics);
265 let storage_metrics = Arc::new(storage_metrics);
266 let anchored_leaf = initializer.anchor_leaf;
267 let instance_state = initializer.instance_state;
268
269 let (internal_tx, mut internal_rx) = internal_channel;
270 let (mut external_tx, mut external_rx) = external_channel;
271
272 tracing::warn!(
273 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
274 V::Base::VERSION,
275 V::Upgrade::VERSION,
276 );
277 tracing::warn!(
278 "Loading previously decided upgrade certificate from storage: {:?}",
279 initializer.decided_upgrade_certificate
280 );
281
282 let upgrade_lock =
283 UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
284
285 debug!("Setting DRB difficulty selector in membership");
286 let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);
287
288 membership_coordinator
289 .set_drb_difficulty_selector(drb_difficulty_selector)
290 .await;
291
292 external_rx.set_overflow(true);
294
295 internal_rx.set_overflow(true);
298
299 let validated_state = initializer.anchor_state;
302
303 load_start_epoch_info(
304 membership_coordinator.membership(),
305 &initializer.start_epoch_info,
306 config.epoch_height,
307 config.epoch_start_block,
308 )
309 .await;
310
311 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
313 TYPES::Epoch::new(epoch_from_block_number(
314 block_number + 1,
315 config.epoch_height,
316 ))
317 });
318
319 let mut validated_state_map = BTreeMap::default();
321 validated_state_map.insert(
322 anchored_leaf.view_number(),
323 View {
324 view_inner: ViewInner::Leaf {
325 leaf: anchored_leaf.commit(),
326 state: Arc::clone(&validated_state),
327 delta: initializer.anchor_state_delta,
328 epoch,
329 },
330 },
331 );
332 for (view_num, inner) in initializer.undecided_state {
333 validated_state_map.insert(view_num, inner);
334 }
335
336 let mut saved_leaves = HashMap::new();
337 let mut saved_payloads = BTreeMap::new();
338 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
339
340 for (_, leaf) in initializer.undecided_leaves {
341 saved_leaves.insert(leaf.commit(), leaf.clone());
342 }
343 if let Some(payload) = anchored_leaf.block_payload() {
344 let metadata = anchored_leaf.block_header().metadata().clone();
345 saved_payloads.insert(
346 anchored_leaf.view_number(),
347 Arc::new(PayloadWithMetadata { payload, metadata }),
348 );
349 }
350
351 let consensus = Consensus::new(
352 validated_state_map,
353 Some(initializer.saved_vid_shares),
354 anchored_leaf.view_number(),
355 epoch,
356 anchored_leaf.view_number(),
357 anchored_leaf.view_number(),
358 initializer.last_actioned_view,
359 initializer.saved_proposals,
360 saved_leaves,
361 saved_payloads,
362 initializer.high_qc,
363 initializer.next_epoch_high_qc,
364 Arc::clone(&consensus_metrics),
365 config.epoch_height,
366 initializer.state_cert,
367 config.drb_difficulty,
368 config.drb_upgrade_difficulty,
369 );
370
371 let consensus = Arc::new(RwLock::new(consensus));
372
373 if let Some(epoch) = epoch {
374 let _ = membership_coordinator
376 .membership_for_epoch(Some(epoch))
377 .await;
378 let _ = membership_coordinator
379 .membership_for_epoch(Some(epoch + 1))
380 .await;
381
382 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
383 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
384 if let Ok(mem) = membership_coordinator
385 .stake_table_for_epoch(Some(epoch + 1))
386 .await
387 {
388 mem.add_drb_result(drb_result).await;
389 }
390 }
391 }
392
393 external_tx.set_await_active(false);
396
397 let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
398 id: nonce,
399 consensus: OuterConsensus::new(consensus),
400 instance_state: Arc::new(instance_state),
401 public_key,
402 private_key,
403 state_private_key,
404 config,
405 start_view: initializer.start_view,
406 start_epoch: initializer.start_epoch,
407 network,
408 membership_coordinator,
409 metrics: Arc::clone(&consensus_metrics),
410 internal_event_stream: (internal_tx, internal_rx.deactivate()),
411 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
412 external_event_stream: (external_tx, external_rx.deactivate()),
413 anchored_leaf: anchored_leaf.clone(),
414 storage,
415 storage_metrics,
416 upgrade_lock,
417 });
418
419 inner
420 }
421
422 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
427 pub async fn start_consensus(&self) {
428 #[cfg(all(feature = "rewind", not(debug_assertions)))]
429 compile_error!("Cannot run rewind in production builds!");
430
431 debug!("Starting Consensus");
432 let consensus = self.consensus.read().await;
433
434 let first_epoch = option_epoch_from_block_number::<TYPES>(
435 V::Base::VERSION >= V::Epochs::VERSION,
436 self.config.epoch_start_block,
437 self.config.epoch_height,
438 );
439 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
443 #[allow(clippy::panic)]
444 self.internal_event_stream
445 .0
446 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
447 self.start_view,
448 initial_view_change_epoch,
449 )))
450 .await
451 .unwrap_or_else(|_| {
452 panic!(
453 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
454 self.start_view, initial_view_change_epoch,
455 )
456 });
457
458 let event_stream = self.internal_event_stream.0.clone();
460 let next_view_timeout = self.config.next_view_timeout;
461 let start_view = self.start_view;
462 let start_epoch = self.start_epoch;
463
464 spawn({
467 async move {
468 sleep(Duration::from_millis(next_view_timeout)).await;
469 broadcast_event(
470 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
471 &event_stream,
472 )
473 .await;
474 }
475 });
476 #[allow(clippy::panic)]
477 self.internal_event_stream
478 .0
479 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
480 consensus.high_qc().clone(),
481 ))))
482 .await
483 .unwrap_or_else(|_| {
484 panic!(
485 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
486 consensus.high_qc()
487 )
488 });
489
490 {
491 if self.anchored_leaf.view_number() == TYPES::View::genesis() {
494 let (validated_state, state_delta) =
495 TYPES::ValidatedState::genesis(&self.instance_state);
496
497 let qc = Arc::new(
498 QuorumCertificate2::genesis::<V>(
499 &validated_state,
500 self.instance_state.as_ref(),
501 )
502 .await,
503 );
504
505 broadcast_event(
506 Event {
507 view_number: self.anchored_leaf.view_number(),
508 event: EventType::Decide {
509 leaf_chain: Arc::new(vec![LeafInfo::new(
510 self.anchored_leaf.clone(),
511 Arc::new(validated_state),
512 Some(Arc::new(state_delta)),
513 None,
514 None,
515 )]),
516 qc,
517 block_size: None,
518 },
519 },
520 &self.external_event_stream.0,
521 )
522 .await;
523 }
524 }
525 }
526
527 async fn send_external_event(&self, event: Event<TYPES>) {
529 debug!(?event, "send_external_event");
530 broadcast_event(event, &self.external_event_stream.0).await;
531 }
532
533 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
539 pub async fn publish_transaction_async(
540 &self,
541 transaction: TYPES::Transaction,
542 ) -> Result<(), HotShotError<TYPES>> {
543 trace!("Adding transaction to our own queue");
544
545 let api = self.clone();
546
547 let consensus_reader = api.consensus.read().await;
548 let view_number = consensus_reader.cur_view();
549 let epoch = consensus_reader.cur_epoch();
550 drop(consensus_reader);
551
552 let message_kind: DataMessage<TYPES> =
554 DataMessage::SubmitTransaction(transaction.clone(), view_number);
555 let message = Message {
556 sender: api.public_key.clone(),
557 kind: MessageKind::from(message_kind),
558 };
559
560 let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
561 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
562 })?;
563
564 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
565 Ok(m) => m,
566 Err(e) => return Err(HotShotError::InvalidState(e.message)),
567 };
568
569 spawn(async move {
570 let memberships_da_committee_members = membership
571 .da_committee_members(view_number)
572 .await
573 .iter()
574 .cloned()
575 .collect();
576
577 join! {
578 api
586 .network.da_broadcast_message(
587 serialized_message,
588 memberships_da_committee_members,
589 BroadcastDelay::None,
590 ),
591 api
592 .send_external_event(Event {
593 view_number,
594 event: EventType::Transactions {
595 transactions: vec![transaction],
596 },
597 }),
598 }
599 });
600 Ok(())
601 }
602
603 #[must_use]
605 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
606 Arc::clone(&self.consensus.inner_consensus)
607 }
608
609 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
611 Arc::clone(&self.instance_state)
612 }
613
614 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
618 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
619 self.consensus.read().await.decided_leaf()
620 }
621
622 #[must_use]
628 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
629 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
630 self.consensus.try_read().map(|guard| guard.decided_leaf())
631 }
632
633 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
638 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
639 Arc::clone(&self.consensus.read().await.decided_state())
640 }
641
642 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
650 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
651 self.consensus.read().await.state(view).cloned()
652 }
653
654 #[allow(clippy::too_many_arguments)]
668 pub async fn init(
669 public_key: TYPES::SignatureKey,
670 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
671 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
672 node_id: u64,
673 config: HotShotConfig<TYPES>,
674 memberships: EpochMembershipCoordinator<TYPES>,
675 network: Arc<I::Network>,
676 initializer: HotShotInitializer<TYPES>,
677 consensus_metrics: ConsensusMetricsValue,
678 storage: I::Storage,
679 storage_metrics: StorageMetricsValue,
680 ) -> Result<
681 (
682 SystemContextHandle<TYPES, I, V>,
683 Sender<Arc<HotShotEvent<TYPES>>>,
684 Receiver<Arc<HotShotEvent<TYPES>>>,
685 ),
686 HotShotError<TYPES>,
687 > {
688 let hotshot = Self::new(
689 public_key,
690 private_key,
691 state_private_key,
692 node_id,
693 config,
694 memberships,
695 network,
696 initializer,
697 consensus_metrics,
698 storage,
699 storage_metrics,
700 )
701 .await;
702 let handle = Arc::clone(&hotshot).run_tasks().await;
703 let (tx, rx) = hotshot.internal_event_stream.clone();
704
705 Ok((handle, tx, rx.activate()))
706 }
707 #[must_use]
709 pub fn next_view_timeout(&self) -> u64 {
710 self.config.next_view_timeout
711 }
712}
713
714impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
715 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
719 let consensus_registry = ConsensusTaskRegistry::new();
720 let network_registry = NetworkTaskRegistry::new();
721
722 let output_event_stream = self.external_event_stream.clone();
723 let internal_event_stream = self.internal_event_stream.clone();
724
725 let mut handle = SystemContextHandle {
726 consensus_registry,
727 network_registry,
728 output_event_stream: output_event_stream.clone(),
729 internal_event_stream: internal_event_stream.clone(),
730 hotshot: self.clone().into(),
731 storage: self.storage.clone(),
732 network: Arc::clone(&self.network),
733 membership_coordinator: self.membership_coordinator.clone(),
734 epoch_height: self.config.epoch_height,
735 };
736
737 add_network_tasks::<TYPES, I, V>(&mut handle).await;
738 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
739
740 handle
741 }
742}
743
744type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
746
747#[async_trait]
748pub trait TwinsHandlerState<TYPES, I, V>
750where
751 Self: std::fmt::Debug + Send + Sync,
752 TYPES: NodeType,
753 I: NodeImplementation<TYPES>,
754 V: Versions,
755{
756 async fn send_handler(
758 &mut self,
759 event: &HotShotEvent<TYPES>,
760 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
761
762 async fn recv_handler(
764 &mut self,
765 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
766 ) -> Vec<HotShotEvent<TYPES>>;
767
768 fn fuse_channels(
772 &'static mut self,
773 left: Channel<HotShotEvent<TYPES>>,
774 right: Channel<HotShotEvent<TYPES>>,
775 ) -> Channel<HotShotEvent<TYPES>> {
776 let send_state = Arc::new(RwLock::new(self));
777 let recv_state = Arc::clone(&send_state);
778
779 let (left_sender, mut left_receiver) = (left.0, left.1);
780 let (right_sender, mut right_receiver) = (right.0, right.1);
781
782 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
784 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
786 broadcast(EVENT_CHANNEL_SIZE);
787
788 let _recv_loop_handle = spawn(async move {
789 loop {
790 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
791 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
792 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
793 };
794
795 let mut state = recv_state.write().await;
796 let mut result = state.recv_handler(&msg).await;
797
798 while let Some(event) = result.pop() {
799 let _ = sender_to_network.broadcast(event.into()).await;
800 }
801 }
802 });
803
804 let _send_loop_handle = spawn(async move {
805 loop {
806 if let Ok(msg) = receiver_from_network.recv().await {
807 let mut state = send_state.write().await;
808
809 let mut result = state.send_handler(&msg).await;
810
811 while let Some(event) = result.pop() {
812 match event {
813 Either::Left(msg) => {
814 let _ = left_sender.broadcast(msg.into()).await;
815 },
816 Either::Right(msg) => {
817 let _ = right_sender.broadcast(msg.into()).await;
818 },
819 }
820 }
821 }
822 }
823 });
824
825 (network_task_sender, network_task_receiver)
826 }
827
828 #[allow(clippy::too_many_arguments)]
829 async fn spawn_twin_handles(
833 &'static mut self,
834 public_key: TYPES::SignatureKey,
835 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
836 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
837 nonce: u64,
838 config: HotShotConfig<TYPES>,
839 memberships: EpochMembershipCoordinator<TYPES>,
840 network: Arc<I::Network>,
841 initializer: HotShotInitializer<TYPES>,
842 consensus_metrics: ConsensusMetricsValue,
843 storage: I::Storage,
844 storage_metrics: StorageMetricsValue,
845 ) -> (
846 SystemContextHandle<TYPES, I, V>,
847 SystemContextHandle<TYPES, I, V>,
848 ) {
849 let epoch_height = config.epoch_height;
850 let left_system_context = SystemContext::new(
851 public_key.clone(),
852 private_key.clone(),
853 state_private_key.clone(),
854 nonce,
855 config.clone(),
856 memberships.clone(),
857 Arc::clone(&network),
858 initializer.clone(),
859 consensus_metrics.clone(),
860 storage.clone(),
861 storage_metrics.clone(),
862 )
863 .await;
864 let right_system_context = SystemContext::new(
865 public_key,
866 private_key,
867 state_private_key,
868 nonce,
869 config,
870 memberships,
871 network,
872 initializer,
873 consensus_metrics,
874 storage,
875 storage_metrics,
876 )
877 .await;
878
879 let left_consensus_registry = ConsensusTaskRegistry::new();
881 let left_network_registry = NetworkTaskRegistry::new();
882
883 let right_consensus_registry = ConsensusTaskRegistry::new();
884 let right_network_registry = NetworkTaskRegistry::new();
885
886 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
888 let left_external_event_stream =
889 (left_external_sender, left_external_receiver.deactivate());
890
891 let (right_external_sender, right_external_receiver) =
892 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
893 let right_external_event_stream =
894 (right_external_sender, right_external_receiver.deactivate());
895
896 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
898 let left_internal_event_stream = (
899 left_internal_sender.clone(),
900 left_internal_receiver.clone().deactivate(),
901 );
902
903 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
904 let right_internal_event_stream = (
905 right_internal_sender.clone(),
906 right_internal_receiver.clone().deactivate(),
907 );
908
909 let mut left_handle = SystemContextHandle::<_, I, _> {
911 consensus_registry: left_consensus_registry,
912 network_registry: left_network_registry,
913 output_event_stream: left_external_event_stream.clone(),
914 internal_event_stream: left_internal_event_stream.clone(),
915 hotshot: Arc::clone(&left_system_context),
916 storage: left_system_context.storage.clone(),
917 network: Arc::clone(&left_system_context.network),
918 membership_coordinator: left_system_context.membership_coordinator.clone(),
919 epoch_height,
920 };
921
922 let mut right_handle = SystemContextHandle::<_, I, _> {
923 consensus_registry: right_consensus_registry,
924 network_registry: right_network_registry,
925 output_event_stream: right_external_event_stream.clone(),
926 internal_event_stream: right_internal_event_stream.clone(),
927 hotshot: Arc::clone(&right_system_context),
928 storage: right_system_context.storage.clone(),
929 network: Arc::clone(&right_system_context.network),
930 membership_coordinator: right_system_context.membership_coordinator.clone(),
931 epoch_height,
932 };
933
934 add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
936 add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
937
938 let fused_internal_event_stream = self.fuse_channels(
940 (left_internal_sender, left_internal_receiver),
941 (right_internal_sender, right_internal_receiver),
942 );
943
944 left_handle.internal_event_stream = (
946 fused_internal_event_stream.0,
947 fused_internal_event_stream.1.deactivate(),
948 );
949
950 add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
952
953 left_handle.internal_event_stream = left_internal_event_stream.clone();
955
956 (left_handle, right_handle)
957 }
958}
959
960#[derive(Debug)]
961pub struct RandomTwinsHandler;
964
965#[async_trait]
966impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
967 for RandomTwinsHandler
968{
969 async fn send_handler(
970 &mut self,
971 event: &HotShotEvent<TYPES>,
972 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
973 let random: bool = rand::thread_rng().gen();
974
975 #[allow(clippy::match_bool)]
976 match random {
977 true => vec![Either::Left(event.clone())],
978 false => vec![Either::Right(event.clone())],
979 }
980 }
981
982 async fn recv_handler(
983 &mut self,
984 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
985 ) -> Vec<HotShotEvent<TYPES>> {
986 match event {
987 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
988 }
989 }
990}
991
992#[derive(Debug)]
993pub struct DoubleTwinsHandler;
996
997#[async_trait]
998impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
999 for DoubleTwinsHandler
1000{
1001 async fn send_handler(
1002 &mut self,
1003 event: &HotShotEvent<TYPES>,
1004 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1005 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1006 }
1007
1008 async fn recv_handler(
1009 &mut self,
1010 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1011 ) -> Vec<HotShotEvent<TYPES>> {
1012 match event {
1013 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1014 }
1015 }
1016}
1017
1018#[async_trait]
1019impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1020 for SystemContextHandle<TYPES, I, V>
1021{
1022 fn total_nodes(&self) -> NonZeroUsize {
1023 self.hotshot.config.num_nodes_with_stake
1024 }
1025
1026 fn builder_timeout(&self) -> Duration {
1027 self.hotshot.config.builder_timeout
1028 }
1029
1030 async fn send_event(&self, event: Event<TYPES>) {
1031 debug!(?event, "send_event");
1032 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1033 }
1034
1035 fn public_key(&self) -> &TYPES::SignatureKey {
1036 &self.hotshot.public_key
1037 }
1038
1039 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1040 &self.hotshot.private_key
1041 }
1042
1043 fn state_private_key(
1044 &self,
1045 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1046 &self.hotshot.state_private_key
1047 }
1048}
1049
1050#[derive(Clone, Debug, PartialEq)]
1051pub struct InitializerEpochInfo<TYPES: NodeType> {
1052 pub epoch: TYPES::Epoch,
1053 pub drb_result: DrbResult,
1054 pub block_header: Option<TYPES::BlockHeader>,
1056}
1057
1058#[derive(Clone, Debug)]
1059pub struct HotShotInitializer<TYPES: NodeType> {
1061 pub instance_state: TYPES::InstanceState,
1063
1064 pub epoch_height: u64,
1066
1067 pub epoch_start_block: u64,
1069
1070 pub anchor_leaf: Leaf2<TYPES>,
1072
1073 pub anchor_state: Arc<TYPES::ValidatedState>,
1075
1076 pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1078
1079 pub start_view: TYPES::View,
1081
1082 pub last_actioned_view: TYPES::View,
1085
1086 pub start_epoch: Option<TYPES::Epoch>,
1088
1089 pub high_qc: QuorumCertificate2<TYPES>,
1093
1094 pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
1096
1097 pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1099
1100 pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1102
1103 pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,
1106
1107 pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
1109
1110 pub saved_vid_shares: VidShares<TYPES>,
1112
1113 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1115
1116 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1118}
1119
1120impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1121 pub async fn from_genesis<V: Versions>(
1125 instance_state: TYPES::InstanceState,
1126 epoch_height: u64,
1127 epoch_start_block: u64,
1128 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1129 ) -> Result<Self, HotShotError<TYPES>> {
1130 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1131 let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1132
1133 Ok(Self {
1134 anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1135 anchor_state: Arc::new(validated_state),
1136 anchor_state_delta: Some(Arc::new(state_delta)),
1137 start_view: TYPES::View::new(0),
1138 start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1139 last_actioned_view: TYPES::View::new(0),
1140 saved_proposals: BTreeMap::new(),
1141 high_qc,
1142 next_epoch_high_qc: None,
1143 decided_upgrade_certificate: None,
1144 undecided_leaves: BTreeMap::new(),
1145 undecided_state: BTreeMap::new(),
1146 instance_state,
1147 saved_vid_shares: BTreeMap::new(),
1148 epoch_height,
1149 state_cert: None,
1150 epoch_start_block,
1151 start_epoch_info,
1152 })
1153 }
1154
1155 #[must_use]
1157 pub fn update_undecided(self) -> Self {
1158 let mut undecided_leaves = self.undecided_leaves.clone();
1159 let mut undecided_state = self.undecided_state.clone();
1160
1161 for proposal in self.saved_proposals.values() {
1162 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1164 continue;
1165 }
1166
1167 undecided_leaves.insert(
1168 proposal.data.view_number(),
1169 Leaf2::from_quorum_proposal(&proposal.data),
1170 );
1171 }
1172
1173 for leaf in undecided_leaves.values() {
1174 let view_inner = ViewInner::Leaf {
1175 leaf: leaf.commit(),
1176 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1177 delta: None,
1178 epoch: leaf.epoch(self.epoch_height),
1179 };
1180 let view = View { view_inner };
1181
1182 undecided_state.insert(leaf.view_number(), view);
1183 }
1184
1185 Self {
1186 undecided_leaves,
1187 undecided_state,
1188 ..self
1189 }
1190 }
1191
1192 #[allow(clippy::too_many_arguments)]
1200 pub fn load(
1201 instance_state: TYPES::InstanceState,
1202 epoch_height: u64,
1203 epoch_start_block: u64,
1204 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1205 anchor_leaf: Leaf2<TYPES>,
1206 (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1207 (high_qc, next_epoch_high_qc): (
1208 QuorumCertificate2<TYPES>,
1209 Option<NextEpochQuorumCertificate2<TYPES>>,
1210 ),
1211 last_actioned_view: TYPES::View,
1212 saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1213 saved_vid_shares: VidShares<TYPES>,
1214 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1215 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1216 ) -> Self {
1217 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1218 anchor_leaf.block_header(),
1219 ));
1220 let anchor_state_delta = None;
1221
1222 let initializer = Self {
1223 instance_state,
1224 epoch_height,
1225 epoch_start_block,
1226 anchor_leaf,
1227 anchor_state,
1228 anchor_state_delta,
1229 high_qc,
1230 start_view,
1231 start_epoch,
1232 last_actioned_view,
1233 saved_proposals,
1234 saved_vid_shares,
1235 next_epoch_high_qc,
1236 decided_upgrade_certificate,
1237 undecided_leaves: BTreeMap::new(),
1238 undecided_state: BTreeMap::new(),
1239 state_cert,
1240 start_epoch_info,
1241 };
1242
1243 initializer.update_undecided()
1244 }
1245}
1246
1247async fn load_start_epoch_info<TYPES: NodeType>(
1248 membership: &Arc<RwLock<TYPES::Membership>>,
1249 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1250 epoch_height: u64,
1251 epoch_start_block: u64,
1252) {
1253 let first_epoch_number =
1254 TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1255
1256 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1257 membership
1258 .write()
1259 .await
1260 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1261
1262 let mut sorted_epoch_info = start_epoch_info.clone();
1263 sorted_epoch_info.sort_by_key(|info| info.epoch);
1264 for epoch_info in sorted_epoch_info {
1265 if let Some(block_header) = &epoch_info.block_header {
1266 tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1267
1268 Membership::add_epoch_root(
1269 Arc::clone(membership),
1270 epoch_info.epoch,
1271 block_header.clone(),
1272 )
1273 .await
1274 .unwrap_or_else(|err| {
1275 tracing::error!(
1277 "Failed to add epoch root for epoch {}: {err}",
1278 epoch_info.epoch
1279 );
1280 });
1281 }
1282 }
1283
1284 for epoch_info in start_epoch_info {
1285 tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1286 membership
1287 .write()
1288 .await
1289 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1290 }
1291}