1#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16 drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17 epoch_membership::EpochMembershipCoordinator,
18 message::UpgradeLock,
19 simple_certificate::LightClientStateUpdateCertificate,
20 traits::{
21 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22 node_implementation::Versions, signature_key::StateSignatureKey,
23 },
24 utils::epoch_from_block_number,
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37pub mod helpers;
39
40use std::{
41 collections::{BTreeMap, HashMap},
42 num::NonZeroUsize,
43 sync::Arc,
44 time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57 consensus::{
58 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59 ViewInner,
60 },
61 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62 data::Leaf2,
63 event::{EventType, LeafInfo},
64 message::{DataMessage, Message, MessageKind, Proposal},
65 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66 storage_metrics::StorageMetricsValue,
67 traits::{
68 consensus_api::ConsensusApi,
69 network::ConnectedNetwork,
70 node_implementation::{ConsensusTime, NodeType},
71 signature_key::SignatureKey,
72 states::ValidatedState,
73 },
74 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75 HotShotConfig,
76};
77pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
82use crate::{
85 tasks::{add_consensus_tasks, add_network_tasks},
86 traits::NodeImplementation,
87 types::{Event, SystemContextHandle},
88};
89
90pub const H_512: usize = 64;
92pub const H_256: usize = 32;
94
95pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
97 public_key: TYPES::SignatureKey,
99
100 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
102
103 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub config: HotShotConfig<TYPES>,
108
109 pub network: Arc<I::Network>,
111
112 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
114
115 metrics: Arc<ConsensusMetricsValue>,
117
118 consensus: OuterConsensus<TYPES>,
120
121 instance_state: Arc<TYPES::InstanceState>,
123
124 start_view: TYPES::View,
126
127 start_epoch: Option<TYPES::Epoch>,
129
130 output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
132
133 pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
135
136 anchored_leaf: Leaf2<TYPES>,
138
139 #[allow(clippy::type_complexity)]
141 internal_event_stream: (
142 Sender<Arc<HotShotEvent<TYPES>>>,
143 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
144 ),
145
146 pub id: u64,
148
149 pub storage: I::Storage,
151
152 pub storage_metrics: Arc<StorageMetricsValue>,
154
155 pub upgrade_lock: UpgradeLock<TYPES, V>,
157}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159 for SystemContext<TYPES, I, V>
160{
161 #![allow(deprecated)]
162 fn clone(&self) -> Self {
163 Self {
164 public_key: self.public_key.clone(),
165 private_key: self.private_key.clone(),
166 state_private_key: self.state_private_key.clone(),
167 config: self.config.clone(),
168 network: Arc::clone(&self.network),
169 membership_coordinator: self.membership_coordinator.clone(),
170 metrics: Arc::clone(&self.metrics),
171 consensus: self.consensus.clone(),
172 instance_state: Arc::clone(&self.instance_state),
173 start_view: self.start_view,
174 start_epoch: self.start_epoch,
175 output_event_stream: self.output_event_stream.clone(),
176 external_event_stream: self.external_event_stream.clone(),
177 anchored_leaf: self.anchored_leaf.clone(),
178 internal_event_stream: self.internal_event_stream.clone(),
179 id: self.id,
180 storage: self.storage.clone(),
181 storage_metrics: Arc::clone(&self.storage_metrics),
182 upgrade_lock: self.upgrade_lock.clone(),
183 }
184 }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188 #![allow(deprecated)]
189 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 public_key: TYPES::SignatureKey,
202 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204 nonce: u64,
205 config: HotShotConfig<TYPES>,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 memberships,
223 network,
224 initializer,
225 consensus_metrics,
226 storage,
227 storage_metrics,
228 internal_chan,
229 external_chan,
230 )
231 .await
232 }
233
234 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242 pub async fn new_from_channels(
243 public_key: TYPES::SignatureKey,
244 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246 nonce: u64,
247 config: HotShotConfig<TYPES>,
248 membership_coordinator: EpochMembershipCoordinator<TYPES>,
249 network: Arc<I::Network>,
250 initializer: HotShotInitializer<TYPES>,
251 consensus_metrics: ConsensusMetricsValue,
252 storage: I::Storage,
253 storage_metrics: StorageMetricsValue,
254 internal_channel: (
255 Sender<Arc<HotShotEvent<TYPES>>>,
256 Receiver<Arc<HotShotEvent<TYPES>>>,
257 ),
258 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259 ) -> Arc<Self> {
260 debug!("Creating a new hotshot");
261
262 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264 let consensus_metrics = Arc::new(consensus_metrics);
265 let storage_metrics = Arc::new(storage_metrics);
266 let anchored_leaf = initializer.anchor_leaf;
267 let instance_state = initializer.instance_state;
268
269 let (internal_tx, mut internal_rx) = internal_channel;
270 let (mut external_tx, mut external_rx) = external_channel;
271
272 tracing::warn!(
273 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
274 V::Base::VERSION,
275 V::Upgrade::VERSION,
276 );
277 tracing::warn!(
278 "Loading previously decided upgrade certificate from storage: {:?}",
279 initializer.decided_upgrade_certificate
280 );
281
282 let upgrade_lock =
283 UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
284
285 debug!("Setting DRB difficulty selector in membership");
286 let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);
287
288 membership_coordinator
289 .set_drb_difficulty_selector(drb_difficulty_selector)
290 .await;
291
292 external_rx.set_overflow(true);
294
295 internal_rx.set_overflow(true);
298
299 let validated_state = initializer.anchor_state;
302
303 load_start_epoch_info(
304 membership_coordinator.membership(),
305 &initializer.start_epoch_info,
306 config.epoch_height,
307 config.epoch_start_block,
308 )
309 .await;
310
311 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
313 TYPES::Epoch::new(epoch_from_block_number(
314 block_number + 1,
315 config.epoch_height,
316 ))
317 });
318
319 let mut validated_state_map = BTreeMap::default();
321 validated_state_map.insert(
322 anchored_leaf.view_number(),
323 View {
324 view_inner: ViewInner::Leaf {
325 leaf: anchored_leaf.commit(),
326 state: Arc::clone(&validated_state),
327 delta: initializer.anchor_state_delta,
328 epoch,
329 },
330 },
331 );
332 for (view_num, inner) in initializer.undecided_state {
333 validated_state_map.insert(view_num, inner);
334 }
335
336 let mut saved_leaves = HashMap::new();
337 let mut saved_payloads = BTreeMap::new();
338 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
339
340 for (_, leaf) in initializer.undecided_leaves {
341 saved_leaves.insert(leaf.commit(), leaf.clone());
342 }
343 if let Some(payload) = anchored_leaf.block_payload() {
344 let metadata = anchored_leaf.block_header().metadata().clone();
345 saved_payloads.insert(
346 anchored_leaf.view_number(),
347 Arc::new(PayloadWithMetadata { payload, metadata }),
348 );
349 }
350
351 let consensus = Consensus::new(
352 validated_state_map,
353 Some(initializer.saved_vid_shares),
354 anchored_leaf.view_number(),
355 epoch,
356 anchored_leaf.view_number(),
357 anchored_leaf.view_number(),
358 initializer.last_actioned_view,
359 initializer.saved_proposals,
360 saved_leaves,
361 saved_payloads,
362 initializer.high_qc,
363 initializer.next_epoch_high_qc,
364 Arc::clone(&consensus_metrics),
365 config.epoch_height,
366 initializer.state_cert,
367 config.drb_difficulty,
368 config.drb_upgrade_difficulty,
369 );
370
371 let consensus = Arc::new(RwLock::new(consensus));
372
373 external_tx.set_await_active(false);
376
377 let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
378 id: nonce,
379 consensus: OuterConsensus::new(consensus),
380 instance_state: Arc::new(instance_state),
381 public_key,
382 private_key,
383 state_private_key,
384 config,
385 start_view: initializer.start_view,
386 start_epoch: initializer.start_epoch,
387 network,
388 membership_coordinator,
389 metrics: Arc::clone(&consensus_metrics),
390 internal_event_stream: (internal_tx, internal_rx.deactivate()),
391 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
392 external_event_stream: (external_tx, external_rx.deactivate()),
393 anchored_leaf: anchored_leaf.clone(),
394 storage,
395 storage_metrics,
396 upgrade_lock,
397 });
398
399 inner
400 }
401
402 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
407 pub async fn start_consensus(&self) {
408 #[cfg(all(feature = "rewind", not(debug_assertions)))]
409 compile_error!("Cannot run rewind in production builds!");
410
411 debug!("Starting Consensus");
412 let consensus = self.consensus.read().await;
413
414 let first_epoch = option_epoch_from_block_number::<TYPES>(
415 V::Base::VERSION >= V::Epochs::VERSION,
416 self.config.epoch_start_block,
417 self.config.epoch_height,
418 );
419 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
423 #[allow(clippy::panic)]
424 self.internal_event_stream
425 .0
426 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
427 self.start_view,
428 initial_view_change_epoch,
429 )))
430 .await
431 .unwrap_or_else(|_| {
432 panic!(
433 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
434 self.start_view, initial_view_change_epoch,
435 )
436 });
437
438 let event_stream = self.internal_event_stream.0.clone();
440 let next_view_timeout = self.config.next_view_timeout;
441 let start_view = self.start_view;
442 let start_epoch = self.start_epoch;
443
444 spawn({
447 async move {
448 sleep(Duration::from_millis(next_view_timeout)).await;
449 broadcast_event(
450 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
451 &event_stream,
452 )
453 .await;
454 }
455 });
456 #[allow(clippy::panic)]
457 self.internal_event_stream
458 .0
459 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
460 consensus.high_qc().clone(),
461 ))))
462 .await
463 .unwrap_or_else(|_| {
464 panic!(
465 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
466 consensus.high_qc()
467 )
468 });
469
470 {
471 if self.anchored_leaf.view_number() == TYPES::View::genesis() {
474 let (validated_state, state_delta) =
475 TYPES::ValidatedState::genesis(&self.instance_state);
476
477 let qc = Arc::new(
478 QuorumCertificate2::genesis::<V>(
479 &validated_state,
480 self.instance_state.as_ref(),
481 )
482 .await,
483 );
484
485 broadcast_event(
486 Event {
487 view_number: self.anchored_leaf.view_number(),
488 event: EventType::Decide {
489 leaf_chain: Arc::new(vec![LeafInfo::new(
490 self.anchored_leaf.clone(),
491 Arc::new(validated_state),
492 Some(Arc::new(state_delta)),
493 None,
494 None,
495 )]),
496 qc,
497 block_size: None,
498 },
499 },
500 &self.external_event_stream.0,
501 )
502 .await;
503 }
504 }
505 }
506
507 async fn send_external_event(&self, event: Event<TYPES>) {
509 debug!(?event, "send_external_event");
510 broadcast_event(event, &self.external_event_stream.0).await;
511 }
512
513 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
519 pub async fn publish_transaction_async(
520 &self,
521 transaction: TYPES::Transaction,
522 ) -> Result<(), HotShotError<TYPES>> {
523 trace!("Adding transaction to our own queue");
524
525 let api = self.clone();
526
527 let consensus_reader = api.consensus.read().await;
528 let view_number = consensus_reader.cur_view();
529 let epoch = consensus_reader.cur_epoch();
530 drop(consensus_reader);
531
532 let message_kind: DataMessage<TYPES> =
534 DataMessage::SubmitTransaction(transaction.clone(), view_number);
535 let message = Message {
536 sender: api.public_key.clone(),
537 kind: MessageKind::from(message_kind),
538 };
539
540 let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
541 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
542 })?;
543
544 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
545 Ok(m) => m,
546 Err(e) => return Err(HotShotError::InvalidState(e.message)),
547 };
548
549 spawn(async move {
550 let memberships_da_committee_members = membership
551 .da_committee_members(view_number)
552 .await
553 .iter()
554 .cloned()
555 .collect();
556
557 join! {
558 api
566 .network.da_broadcast_message(
567 serialized_message,
568 memberships_da_committee_members,
569 BroadcastDelay::None,
570 ),
571 api
572 .send_external_event(Event {
573 view_number,
574 event: EventType::Transactions {
575 transactions: vec![transaction],
576 },
577 }),
578 }
579 });
580 Ok(())
581 }
582
583 #[must_use]
585 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
586 Arc::clone(&self.consensus.inner_consensus)
587 }
588
589 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
591 Arc::clone(&self.instance_state)
592 }
593
594 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
598 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
599 self.consensus.read().await.decided_leaf()
600 }
601
602 #[must_use]
608 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
609 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
610 self.consensus.try_read().map(|guard| guard.decided_leaf())
611 }
612
613 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
618 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
619 Arc::clone(&self.consensus.read().await.decided_state())
620 }
621
622 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
630 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
631 self.consensus.read().await.state(view).cloned()
632 }
633
634 #[allow(clippy::too_many_arguments)]
648 pub async fn init(
649 public_key: TYPES::SignatureKey,
650 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
651 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
652 node_id: u64,
653 config: HotShotConfig<TYPES>,
654 memberships: EpochMembershipCoordinator<TYPES>,
655 network: Arc<I::Network>,
656 initializer: HotShotInitializer<TYPES>,
657 consensus_metrics: ConsensusMetricsValue,
658 storage: I::Storage,
659 storage_metrics: StorageMetricsValue,
660 ) -> Result<
661 (
662 SystemContextHandle<TYPES, I, V>,
663 Sender<Arc<HotShotEvent<TYPES>>>,
664 Receiver<Arc<HotShotEvent<TYPES>>>,
665 ),
666 HotShotError<TYPES>,
667 > {
668 let hotshot = Self::new(
669 public_key,
670 private_key,
671 state_private_key,
672 node_id,
673 config,
674 memberships,
675 network,
676 initializer,
677 consensus_metrics,
678 storage,
679 storage_metrics,
680 )
681 .await;
682 let handle = Arc::clone(&hotshot).run_tasks().await;
683 let (tx, rx) = hotshot.internal_event_stream.clone();
684
685 Ok((handle, tx, rx.activate()))
686 }
687 #[must_use]
689 pub fn next_view_timeout(&self) -> u64 {
690 self.config.next_view_timeout
691 }
692}
693
694impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
695 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
699 let consensus_registry = ConsensusTaskRegistry::new();
700 let network_registry = NetworkTaskRegistry::new();
701
702 let output_event_stream = self.external_event_stream.clone();
703 let internal_event_stream = self.internal_event_stream.clone();
704
705 let mut handle = SystemContextHandle {
706 consensus_registry,
707 network_registry,
708 output_event_stream: output_event_stream.clone(),
709 internal_event_stream: internal_event_stream.clone(),
710 hotshot: self.clone().into(),
711 storage: self.storage.clone(),
712 network: Arc::clone(&self.network),
713 membership_coordinator: self.membership_coordinator.clone(),
714 epoch_height: self.config.epoch_height,
715 };
716
717 add_network_tasks::<TYPES, I, V>(&mut handle).await;
718 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
719
720 handle
721 }
722}
723
724type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
726
727#[async_trait]
728pub trait TwinsHandlerState<TYPES, I, V>
730where
731 Self: std::fmt::Debug + Send + Sync,
732 TYPES: NodeType,
733 I: NodeImplementation<TYPES>,
734 V: Versions,
735{
736 async fn send_handler(
738 &mut self,
739 event: &HotShotEvent<TYPES>,
740 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
741
742 async fn recv_handler(
744 &mut self,
745 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
746 ) -> Vec<HotShotEvent<TYPES>>;
747
748 fn fuse_channels(
752 &'static mut self,
753 left: Channel<HotShotEvent<TYPES>>,
754 right: Channel<HotShotEvent<TYPES>>,
755 ) -> Channel<HotShotEvent<TYPES>> {
756 let send_state = Arc::new(RwLock::new(self));
757 let recv_state = Arc::clone(&send_state);
758
759 let (left_sender, mut left_receiver) = (left.0, left.1);
760 let (right_sender, mut right_receiver) = (right.0, right.1);
761
762 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
764 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
766 broadcast(EVENT_CHANNEL_SIZE);
767
768 let _recv_loop_handle = spawn(async move {
769 loop {
770 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
771 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
772 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
773 };
774
775 let mut state = recv_state.write().await;
776 let mut result = state.recv_handler(&msg).await;
777
778 while let Some(event) = result.pop() {
779 let _ = sender_to_network.broadcast(event.into()).await;
780 }
781 }
782 });
783
784 let _send_loop_handle = spawn(async move {
785 loop {
786 if let Ok(msg) = receiver_from_network.recv().await {
787 let mut state = send_state.write().await;
788
789 let mut result = state.send_handler(&msg).await;
790
791 while let Some(event) = result.pop() {
792 match event {
793 Either::Left(msg) => {
794 let _ = left_sender.broadcast(msg.into()).await;
795 },
796 Either::Right(msg) => {
797 let _ = right_sender.broadcast(msg.into()).await;
798 },
799 }
800 }
801 }
802 }
803 });
804
805 (network_task_sender, network_task_receiver)
806 }
807
808 #[allow(clippy::too_many_arguments)]
809 async fn spawn_twin_handles(
813 &'static mut self,
814 public_key: TYPES::SignatureKey,
815 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
816 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
817 nonce: u64,
818 config: HotShotConfig<TYPES>,
819 memberships: EpochMembershipCoordinator<TYPES>,
820 network: Arc<I::Network>,
821 initializer: HotShotInitializer<TYPES>,
822 consensus_metrics: ConsensusMetricsValue,
823 storage: I::Storage,
824 storage_metrics: StorageMetricsValue,
825 ) -> (
826 SystemContextHandle<TYPES, I, V>,
827 SystemContextHandle<TYPES, I, V>,
828 ) {
829 let epoch_height = config.epoch_height;
830 let left_system_context = SystemContext::new(
831 public_key.clone(),
832 private_key.clone(),
833 state_private_key.clone(),
834 nonce,
835 config.clone(),
836 memberships.clone(),
837 Arc::clone(&network),
838 initializer.clone(),
839 consensus_metrics.clone(),
840 storage.clone(),
841 storage_metrics.clone(),
842 )
843 .await;
844 let right_system_context = SystemContext::new(
845 public_key,
846 private_key,
847 state_private_key,
848 nonce,
849 config,
850 memberships,
851 network,
852 initializer,
853 consensus_metrics,
854 storage,
855 storage_metrics,
856 )
857 .await;
858
859 let left_consensus_registry = ConsensusTaskRegistry::new();
861 let left_network_registry = NetworkTaskRegistry::new();
862
863 let right_consensus_registry = ConsensusTaskRegistry::new();
864 let right_network_registry = NetworkTaskRegistry::new();
865
866 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
868 let left_external_event_stream =
869 (left_external_sender, left_external_receiver.deactivate());
870
871 let (right_external_sender, right_external_receiver) =
872 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
873 let right_external_event_stream =
874 (right_external_sender, right_external_receiver.deactivate());
875
876 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
878 let left_internal_event_stream = (
879 left_internal_sender.clone(),
880 left_internal_receiver.clone().deactivate(),
881 );
882
883 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
884 let right_internal_event_stream = (
885 right_internal_sender.clone(),
886 right_internal_receiver.clone().deactivate(),
887 );
888
889 let mut left_handle = SystemContextHandle::<_, I, _> {
891 consensus_registry: left_consensus_registry,
892 network_registry: left_network_registry,
893 output_event_stream: left_external_event_stream.clone(),
894 internal_event_stream: left_internal_event_stream.clone(),
895 hotshot: Arc::clone(&left_system_context),
896 storage: left_system_context.storage.clone(),
897 network: Arc::clone(&left_system_context.network),
898 membership_coordinator: left_system_context.membership_coordinator.clone(),
899 epoch_height,
900 };
901
902 let mut right_handle = SystemContextHandle::<_, I, _> {
903 consensus_registry: right_consensus_registry,
904 network_registry: right_network_registry,
905 output_event_stream: right_external_event_stream.clone(),
906 internal_event_stream: right_internal_event_stream.clone(),
907 hotshot: Arc::clone(&right_system_context),
908 storage: right_system_context.storage.clone(),
909 network: Arc::clone(&right_system_context.network),
910 membership_coordinator: right_system_context.membership_coordinator.clone(),
911 epoch_height,
912 };
913
914 add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
916 add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
917
918 let fused_internal_event_stream = self.fuse_channels(
920 (left_internal_sender, left_internal_receiver),
921 (right_internal_sender, right_internal_receiver),
922 );
923
924 left_handle.internal_event_stream = (
926 fused_internal_event_stream.0,
927 fused_internal_event_stream.1.deactivate(),
928 );
929
930 add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
932
933 left_handle.internal_event_stream = left_internal_event_stream.clone();
935
936 (left_handle, right_handle)
937 }
938}
939
940#[derive(Debug)]
941pub struct RandomTwinsHandler;
944
945#[async_trait]
946impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
947 for RandomTwinsHandler
948{
949 async fn send_handler(
950 &mut self,
951 event: &HotShotEvent<TYPES>,
952 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
953 let random: bool = rand::thread_rng().gen();
954
955 #[allow(clippy::match_bool)]
956 match random {
957 true => vec![Either::Left(event.clone())],
958 false => vec![Either::Right(event.clone())],
959 }
960 }
961
962 async fn recv_handler(
963 &mut self,
964 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
965 ) -> Vec<HotShotEvent<TYPES>> {
966 match event {
967 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
968 }
969 }
970}
971
972#[derive(Debug)]
973pub struct DoubleTwinsHandler;
976
977#[async_trait]
978impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
979 for DoubleTwinsHandler
980{
981 async fn send_handler(
982 &mut self,
983 event: &HotShotEvent<TYPES>,
984 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
985 vec![Either::Left(event.clone()), Either::Right(event.clone())]
986 }
987
988 async fn recv_handler(
989 &mut self,
990 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
991 ) -> Vec<HotShotEvent<TYPES>> {
992 match event {
993 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
994 }
995 }
996}
997
998#[async_trait]
999impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1000 for SystemContextHandle<TYPES, I, V>
1001{
1002 fn total_nodes(&self) -> NonZeroUsize {
1003 self.hotshot.config.num_nodes_with_stake
1004 }
1005
1006 fn builder_timeout(&self) -> Duration {
1007 self.hotshot.config.builder_timeout
1008 }
1009
1010 async fn send_event(&self, event: Event<TYPES>) {
1011 debug!(?event, "send_event");
1012 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1013 }
1014
1015 fn public_key(&self) -> &TYPES::SignatureKey {
1016 &self.hotshot.public_key
1017 }
1018
1019 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1020 &self.hotshot.private_key
1021 }
1022
1023 fn state_private_key(
1024 &self,
1025 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1026 &self.hotshot.state_private_key
1027 }
1028}
1029
1030#[derive(Clone, Debug, PartialEq)]
1031pub struct InitializerEpochInfo<TYPES: NodeType> {
1032 pub epoch: TYPES::Epoch,
1033 pub drb_result: DrbResult,
1034 pub block_header: Option<TYPES::BlockHeader>,
1036}
1037
1038#[derive(Clone)]
1039pub struct HotShotInitializer<TYPES: NodeType> {
1041 pub instance_state: TYPES::InstanceState,
1043
1044 pub epoch_height: u64,
1046
1047 pub epoch_start_block: u64,
1049
1050 pub anchor_leaf: Leaf2<TYPES>,
1052
1053 pub anchor_state: Arc<TYPES::ValidatedState>,
1055
1056 pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1058
1059 pub start_view: TYPES::View,
1061
1062 pub last_actioned_view: TYPES::View,
1065
1066 pub start_epoch: Option<TYPES::Epoch>,
1068
1069 pub high_qc: QuorumCertificate2<TYPES>,
1073
1074 pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
1076
1077 pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1079
1080 pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1082
1083 pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,
1086
1087 pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
1089
1090 pub saved_vid_shares: VidShares<TYPES>,
1092
1093 pub state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
1095
1096 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1098}
1099
1100impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1101 pub async fn from_genesis<V: Versions>(
1105 instance_state: TYPES::InstanceState,
1106 epoch_height: u64,
1107 epoch_start_block: u64,
1108 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1109 ) -> Result<Self, HotShotError<TYPES>> {
1110 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1111 let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1112
1113 Ok(Self {
1114 anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1115 anchor_state: Arc::new(validated_state),
1116 anchor_state_delta: Some(Arc::new(state_delta)),
1117 start_view: TYPES::View::new(0),
1118 start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1119 last_actioned_view: TYPES::View::new(0),
1120 saved_proposals: BTreeMap::new(),
1121 high_qc,
1122 next_epoch_high_qc: None,
1123 decided_upgrade_certificate: None,
1124 undecided_leaves: BTreeMap::new(),
1125 undecided_state: BTreeMap::new(),
1126 instance_state,
1127 saved_vid_shares: BTreeMap::new(),
1128 epoch_height,
1129 state_cert: None,
1130 epoch_start_block,
1131 start_epoch_info,
1132 })
1133 }
1134
1135 #[must_use]
1137 pub fn update_undecided(self) -> Self {
1138 let mut undecided_leaves = self.undecided_leaves.clone();
1139 let mut undecided_state = self.undecided_state.clone();
1140
1141 for proposal in self.saved_proposals.values() {
1142 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1144 continue;
1145 }
1146
1147 undecided_leaves.insert(
1148 proposal.data.view_number(),
1149 Leaf2::from_quorum_proposal(&proposal.data),
1150 );
1151 }
1152
1153 for leaf in undecided_leaves.values() {
1154 let view_inner = ViewInner::Leaf {
1155 leaf: leaf.commit(),
1156 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1157 delta: None,
1158 epoch: leaf.epoch(self.epoch_height),
1159 };
1160 let view = View { view_inner };
1161
1162 undecided_state.insert(leaf.view_number(), view);
1163 }
1164
1165 Self {
1166 undecided_leaves,
1167 undecided_state,
1168 ..self
1169 }
1170 }
1171
1172 #[allow(clippy::too_many_arguments)]
1180 pub fn load(
1181 instance_state: TYPES::InstanceState,
1182 epoch_height: u64,
1183 epoch_start_block: u64,
1184 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1185 anchor_leaf: Leaf2<TYPES>,
1186 (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1187 (high_qc, next_epoch_high_qc): (
1188 QuorumCertificate2<TYPES>,
1189 Option<NextEpochQuorumCertificate2<TYPES>>,
1190 ),
1191 last_actioned_view: TYPES::View,
1192 saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1193 saved_vid_shares: VidShares<TYPES>,
1194 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1195 state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
1196 ) -> Self {
1197 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1198 anchor_leaf.block_header(),
1199 ));
1200 let anchor_state_delta = None;
1201
1202 let initializer = Self {
1203 instance_state,
1204 epoch_height,
1205 epoch_start_block,
1206 anchor_leaf,
1207 anchor_state,
1208 anchor_state_delta,
1209 high_qc,
1210 start_view,
1211 start_epoch,
1212 last_actioned_view,
1213 saved_proposals,
1214 saved_vid_shares,
1215 next_epoch_high_qc,
1216 decided_upgrade_certificate,
1217 undecided_leaves: BTreeMap::new(),
1218 undecided_state: BTreeMap::new(),
1219 state_cert,
1220 start_epoch_info,
1221 };
1222
1223 initializer.update_undecided()
1224 }
1225}
1226
1227async fn load_start_epoch_info<TYPES: NodeType>(
1228 membership: &Arc<RwLock<TYPES::Membership>>,
1229 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1230 epoch_height: u64,
1231 epoch_start_block: u64,
1232) {
1233 let first_epoch_number =
1234 TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1235
1236 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1237 membership
1238 .write()
1239 .await
1240 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1241
1242 for epoch_info in start_epoch_info {
1243 if let Some(block_header) = &epoch_info.block_header {
1244 tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1245
1246 Membership::add_epoch_root(
1247 Arc::clone(membership),
1248 epoch_info.epoch,
1249 block_header.clone(),
1250 )
1251 .await
1252 .unwrap_or_else(|err| {
1253 tracing::error!(
1255 "Failed to add epoch root for epoch {}: {err}",
1256 epoch_info.epoch
1257 );
1258 });
1259 }
1260 }
1261
1262 for epoch_info in start_epoch_info {
1263 tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1264 membership
1265 .write()
1266 .await
1267 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1268 }
1269}