1#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16 drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17 epoch_membership::EpochMembershipCoordinator,
18 message::UpgradeLock,
19 simple_certificate::LightClientStateUpdateCertificateV2,
20 traits::{
21 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22 node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
23 },
24 utils::epoch_from_block_number,
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37pub mod helpers;
39
40use std::{
41 collections::{BTreeMap, HashMap},
42 num::NonZeroUsize,
43 sync::Arc,
44 time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57 consensus::{
58 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59 ViewInner,
60 },
61 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62 data::Leaf2,
63 event::{EventType, LeafInfo},
64 message::{DataMessage, Message, MessageKind, Proposal},
65 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66 storage_metrics::StorageMetricsValue,
67 traits::{
68 consensus_api::ConsensusApi,
69 network::ConnectedNetwork,
70 node_implementation::{ConsensusTime, NodeType},
71 signature_key::SignatureKey,
72 states::ValidatedState,
73 },
74 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75 HotShotConfig,
76};
77pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
82use crate::{
85 tasks::{add_consensus_tasks, add_network_tasks},
86 traits::NodeImplementation,
87 types::{Event, SystemContextHandle},
88};
89
/// Number of bytes in a 512-bit value (presumably a hash length; confirm against callers).
pub const H_512: usize = 512 / 8;
/// Number of bytes in a 256-bit value (presumably a hash length; confirm against callers).
pub const H_256: usize = 256 / 8;
94
95pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
97 public_key: TYPES::SignatureKey,
99
100 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
102
103 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub config: HotShotConfig<TYPES>,
108
109 pub network: Arc<I::Network>,
111
112 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
114
115 metrics: Arc<ConsensusMetricsValue>,
117
118 consensus: OuterConsensus<TYPES>,
120
121 instance_state: Arc<TYPES::InstanceState>,
123
124 start_view: TYPES::View,
126
127 start_epoch: Option<TYPES::Epoch>,
129
130 output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
132
133 pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
135
136 anchored_leaf: Leaf2<TYPES>,
138
139 #[allow(clippy::type_complexity)]
141 internal_event_stream: (
142 Sender<Arc<HotShotEvent<TYPES>>>,
143 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
144 ),
145
146 pub id: u64,
148
149 pub storage: I::Storage,
151
152 pub storage_metrics: Arc<StorageMetricsValue>,
154
155 pub upgrade_lock: UpgradeLock<TYPES, V>,
157}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159 for SystemContext<TYPES, I, V>
160{
161 #![allow(deprecated)]
162 fn clone(&self) -> Self {
163 Self {
164 public_key: self.public_key.clone(),
165 private_key: self.private_key.clone(),
166 state_private_key: self.state_private_key.clone(),
167 config: self.config.clone(),
168 network: Arc::clone(&self.network),
169 membership_coordinator: self.membership_coordinator.clone(),
170 metrics: Arc::clone(&self.metrics),
171 consensus: self.consensus.clone(),
172 instance_state: Arc::clone(&self.instance_state),
173 start_view: self.start_view,
174 start_epoch: self.start_epoch,
175 output_event_stream: self.output_event_stream.clone(),
176 external_event_stream: self.external_event_stream.clone(),
177 anchored_leaf: self.anchored_leaf.clone(),
178 internal_event_stream: self.internal_event_stream.clone(),
179 id: self.id,
180 storage: self.storage.clone(),
181 storage_metrics: Arc::clone(&self.storage_metrics),
182 upgrade_lock: self.upgrade_lock.clone(),
183 }
184 }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188 #![allow(deprecated)]
189 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 public_key: TYPES::SignatureKey,
202 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204 nonce: u64,
205 config: HotShotConfig<TYPES>,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 memberships,
223 network,
224 initializer,
225 consensus_metrics,
226 storage,
227 storage_metrics,
228 internal_chan,
229 external_chan,
230 )
231 .await
232 }
233
234 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242 pub async fn new_from_channels(
243 public_key: TYPES::SignatureKey,
244 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246 nonce: u64,
247 config: HotShotConfig<TYPES>,
248 membership_coordinator: EpochMembershipCoordinator<TYPES>,
249 network: Arc<I::Network>,
250 initializer: HotShotInitializer<TYPES>,
251 consensus_metrics: ConsensusMetricsValue,
252 storage: I::Storage,
253 storage_metrics: StorageMetricsValue,
254 internal_channel: (
255 Sender<Arc<HotShotEvent<TYPES>>>,
256 Receiver<Arc<HotShotEvent<TYPES>>>,
257 ),
258 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259 ) -> Arc<Self> {
260 debug!("Creating a new hotshot");
261
262 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264 let consensus_metrics = Arc::new(consensus_metrics);
265 let storage_metrics = Arc::new(storage_metrics);
266 let anchored_leaf = initializer.anchor_leaf;
267 let instance_state = initializer.instance_state;
268
269 let (internal_tx, mut internal_rx) = internal_channel;
270 let (mut external_tx, mut external_rx) = external_channel;
271
272 tracing::warn!(
273 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
274 V::Base::VERSION,
275 V::Upgrade::VERSION,
276 );
277 tracing::warn!(
278 "Loading previously decided upgrade certificate from storage: {:?}",
279 initializer.decided_upgrade_certificate
280 );
281
282 let upgrade_lock =
283 UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
284
285 let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
286 cert.data.new_version
287 } else {
288 V::Base::VERSION
289 };
290
291 debug!("Setting DRB difficulty selector in membership");
292 let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);
293
294 membership_coordinator
295 .set_drb_difficulty_selector(drb_difficulty_selector)
296 .await;
297
298 for da_committee in &config.da_committees {
299 if current_version >= da_committee.start_version {
300 membership_coordinator
301 .membership()
302 .write()
303 .await
304 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
305 }
306 }
307
308 external_rx.set_overflow(true);
310
311 internal_rx.set_overflow(true);
314
315 let validated_state = initializer.anchor_state;
318
319 load_start_epoch_info(
320 membership_coordinator.membership(),
321 &initializer.start_epoch_info,
322 config.epoch_height,
323 config.epoch_start_block,
324 )
325 .await;
326
327 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
329 TYPES::Epoch::new(epoch_from_block_number(
330 block_number + 1,
331 config.epoch_height,
332 ))
333 });
334
335 let mut validated_state_map = BTreeMap::default();
337 validated_state_map.insert(
338 anchored_leaf.view_number(),
339 View {
340 view_inner: ViewInner::Leaf {
341 leaf: anchored_leaf.commit(),
342 state: Arc::clone(&validated_state),
343 delta: initializer.anchor_state_delta,
344 epoch,
345 },
346 },
347 );
348 for (view_num, inner) in initializer.undecided_state {
349 validated_state_map.insert(view_num, inner);
350 }
351
352 let mut saved_leaves = HashMap::new();
353 let mut saved_payloads = BTreeMap::new();
354 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
355
356 for (_, leaf) in initializer.undecided_leaves {
357 saved_leaves.insert(leaf.commit(), leaf.clone());
358 }
359 if let Some(payload) = anchored_leaf.block_payload() {
360 let metadata = anchored_leaf.block_header().metadata().clone();
361 saved_payloads.insert(
362 anchored_leaf.view_number(),
363 Arc::new(PayloadWithMetadata { payload, metadata }),
364 );
365 }
366
367 let consensus = Consensus::new(
368 validated_state_map,
369 Some(initializer.saved_vid_shares),
370 anchored_leaf.view_number(),
371 epoch,
372 anchored_leaf.view_number(),
373 anchored_leaf.view_number(),
374 initializer.last_actioned_view,
375 initializer.saved_proposals,
376 saved_leaves,
377 saved_payloads,
378 initializer.high_qc,
379 initializer.next_epoch_high_qc,
380 Arc::clone(&consensus_metrics),
381 config.epoch_height,
382 initializer.state_cert,
383 config.drb_difficulty,
384 config.drb_upgrade_difficulty,
385 );
386
387 let consensus = Arc::new(RwLock::new(consensus));
388
389 if let Some(epoch) = epoch {
390 let _ = membership_coordinator
392 .membership_for_epoch(Some(epoch))
393 .await;
394 let _ = membership_coordinator
395 .membership_for_epoch(Some(epoch + 1))
396 .await;
397
398 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
399 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
400 if let Ok(mem) = membership_coordinator
401 .stake_table_for_epoch(Some(epoch + 1))
402 .await
403 {
404 mem.add_drb_result(drb_result).await;
405 }
406 }
407 }
408
409 external_tx.set_await_active(false);
412
413 let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
414 id: nonce,
415 consensus: OuterConsensus::new(consensus),
416 instance_state: Arc::new(instance_state),
417 public_key,
418 private_key,
419 state_private_key,
420 config,
421 start_view: initializer.start_view,
422 start_epoch: initializer.start_epoch,
423 network,
424 membership_coordinator,
425 metrics: Arc::clone(&consensus_metrics),
426 internal_event_stream: (internal_tx, internal_rx.deactivate()),
427 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
428 external_event_stream: (external_tx, external_rx.deactivate()),
429 anchored_leaf: anchored_leaf.clone(),
430 storage,
431 storage_metrics,
432 upgrade_lock,
433 });
434
435 inner
436 }
437
438 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
443 pub async fn start_consensus(&self) {
444 #[cfg(all(feature = "rewind", not(debug_assertions)))]
445 compile_error!("Cannot run rewind in production builds!");
446
447 debug!("Starting Consensus");
448 let consensus = self.consensus.read().await;
449
450 let first_epoch = option_epoch_from_block_number::<TYPES>(
451 V::Base::VERSION >= V::Epochs::VERSION,
452 self.config.epoch_start_block,
453 self.config.epoch_height,
454 );
455 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
459 #[allow(clippy::panic)]
460 self.internal_event_stream
461 .0
462 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
463 self.start_view,
464 initial_view_change_epoch,
465 )))
466 .await
467 .unwrap_or_else(|_| {
468 panic!(
469 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
470 self.start_view, initial_view_change_epoch,
471 )
472 });
473
474 let event_stream = self.internal_event_stream.0.clone();
476 let next_view_timeout = self.config.next_view_timeout;
477 let start_view = self.start_view;
478 let start_epoch = self.start_epoch;
479
480 spawn({
483 async move {
484 sleep(Duration::from_millis(next_view_timeout)).await;
485 broadcast_event(
486 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
487 &event_stream,
488 )
489 .await;
490 }
491 });
492 #[allow(clippy::panic)]
493 self.internal_event_stream
494 .0
495 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
496 consensus.high_qc().clone(),
497 ))))
498 .await
499 .unwrap_or_else(|_| {
500 panic!(
501 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
502 consensus.high_qc()
503 )
504 });
505
506 {
507 if self.anchored_leaf.view_number() == TYPES::View::genesis() {
510 let (validated_state, state_delta) =
511 TYPES::ValidatedState::genesis(&self.instance_state);
512
513 let qc = Arc::new(
514 QuorumCertificate2::genesis::<V>(
515 &validated_state,
516 self.instance_state.as_ref(),
517 )
518 .await,
519 );
520
521 broadcast_event(
522 Event {
523 view_number: self.anchored_leaf.view_number(),
524 event: EventType::Decide {
525 leaf_chain: Arc::new(vec![LeafInfo::new(
526 self.anchored_leaf.clone(),
527 Arc::new(validated_state),
528 Some(Arc::new(state_delta)),
529 None,
530 None,
531 )]),
532 qc,
533 block_size: None,
534 },
535 },
536 &self.external_event_stream.0,
537 )
538 .await;
539 }
540 }
541 }
542
543 async fn send_external_event(&self, event: Event<TYPES>) {
545 debug!(?event, "send_external_event");
546 broadcast_event(event, &self.external_event_stream.0).await;
547 }
548
549 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
555 pub async fn publish_transaction_async(
556 &self,
557 transaction: TYPES::Transaction,
558 ) -> Result<(), HotShotError<TYPES>> {
559 trace!("Adding transaction to our own queue");
560
561 let api = self.clone();
562
563 let consensus_reader = api.consensus.read().await;
564 let view_number = consensus_reader.cur_view();
565 let epoch = consensus_reader.cur_epoch();
566 drop(consensus_reader);
567
568 let message_kind: DataMessage<TYPES> =
570 DataMessage::SubmitTransaction(transaction.clone(), view_number);
571 let message = Message {
572 sender: api.public_key.clone(),
573 kind: MessageKind::from(message_kind),
574 };
575
576 let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
577 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
578 })?;
579
580 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
581 Ok(m) => m,
582 Err(e) => return Err(HotShotError::InvalidState(e.message)),
583 };
584
585 spawn(async move {
586 let memberships_da_committee_members = membership
587 .da_committee_members(view_number)
588 .await
589 .iter()
590 .cloned()
591 .collect();
592
593 join! {
594 api
602 .network.da_broadcast_message(
603 serialized_message,
604 memberships_da_committee_members,
605 BroadcastDelay::None,
606 ),
607 api
608 .send_external_event(Event {
609 view_number,
610 event: EventType::Transactions {
611 transactions: vec![transaction],
612 },
613 }),
614 }
615 });
616 Ok(())
617 }
618
619 #[must_use]
621 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
622 Arc::clone(&self.consensus.inner_consensus)
623 }
624
625 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
627 Arc::clone(&self.instance_state)
628 }
629
630 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
634 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
635 self.consensus.read().await.decided_leaf()
636 }
637
638 #[must_use]
644 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
645 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
646 self.consensus.try_read().map(|guard| guard.decided_leaf())
647 }
648
649 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
654 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
655 Arc::clone(&self.consensus.read().await.decided_state())
656 }
657
658 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
666 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
667 self.consensus.read().await.state(view).cloned()
668 }
669
670 #[allow(clippy::too_many_arguments)]
684 pub async fn init(
685 public_key: TYPES::SignatureKey,
686 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
687 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
688 node_id: u64,
689 config: HotShotConfig<TYPES>,
690 memberships: EpochMembershipCoordinator<TYPES>,
691 network: Arc<I::Network>,
692 initializer: HotShotInitializer<TYPES>,
693 consensus_metrics: ConsensusMetricsValue,
694 storage: I::Storage,
695 storage_metrics: StorageMetricsValue,
696 ) -> Result<
697 (
698 SystemContextHandle<TYPES, I, V>,
699 Sender<Arc<HotShotEvent<TYPES>>>,
700 Receiver<Arc<HotShotEvent<TYPES>>>,
701 ),
702 HotShotError<TYPES>,
703 > {
704 let hotshot = Self::new(
705 public_key,
706 private_key,
707 state_private_key,
708 node_id,
709 config,
710 memberships,
711 network,
712 initializer,
713 consensus_metrics,
714 storage,
715 storage_metrics,
716 )
717 .await;
718 let handle = Arc::clone(&hotshot).run_tasks().await;
719 let (tx, rx) = hotshot.internal_event_stream.clone();
720
721 Ok((handle, tx, rx.activate()))
722 }
723 #[must_use]
725 pub fn next_view_timeout(&self) -> u64 {
726 self.config.next_view_timeout
727 }
728}
729
730impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
731 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
735 let consensus_registry = ConsensusTaskRegistry::new();
736 let network_registry = NetworkTaskRegistry::new();
737
738 let output_event_stream = self.external_event_stream.clone();
739 let internal_event_stream = self.internal_event_stream.clone();
740
741 let mut handle = SystemContextHandle {
742 consensus_registry,
743 network_registry,
744 output_event_stream: output_event_stream.clone(),
745 internal_event_stream: internal_event_stream.clone(),
746 hotshot: self.clone().into(),
747 storage: self.storage.clone(),
748 network: Arc::clone(&self.network),
749 membership_coordinator: self.membership_coordinator.clone(),
750 epoch_height: self.config.epoch_height,
751 };
752
753 add_network_tasks::<TYPES, I, V>(&mut handle).await;
754 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
755
756 handle
757 }
758}
759
760type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
762
763#[async_trait]
764pub trait TwinsHandlerState<TYPES, I, V>
766where
767 Self: std::fmt::Debug + Send + Sync,
768 TYPES: NodeType,
769 I: NodeImplementation<TYPES>,
770 V: Versions,
771{
772 async fn send_handler(
774 &mut self,
775 event: &HotShotEvent<TYPES>,
776 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
777
778 async fn recv_handler(
780 &mut self,
781 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
782 ) -> Vec<HotShotEvent<TYPES>>;
783
784 fn fuse_channels(
788 &'static mut self,
789 left: Channel<HotShotEvent<TYPES>>,
790 right: Channel<HotShotEvent<TYPES>>,
791 ) -> Channel<HotShotEvent<TYPES>> {
792 let send_state = Arc::new(RwLock::new(self));
793 let recv_state = Arc::clone(&send_state);
794
795 let (left_sender, mut left_receiver) = (left.0, left.1);
796 let (right_sender, mut right_receiver) = (right.0, right.1);
797
798 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
800 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
802 broadcast(EVENT_CHANNEL_SIZE);
803
804 let _recv_loop_handle = spawn(async move {
805 loop {
806 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
807 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
808 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
809 };
810
811 let mut state = recv_state.write().await;
812 let mut result = state.recv_handler(&msg).await;
813
814 while let Some(event) = result.pop() {
815 let _ = sender_to_network.broadcast(event.into()).await;
816 }
817 }
818 });
819
820 let _send_loop_handle = spawn(async move {
821 loop {
822 if let Ok(msg) = receiver_from_network.recv().await {
823 let mut state = send_state.write().await;
824
825 let mut result = state.send_handler(&msg).await;
826
827 while let Some(event) = result.pop() {
828 match event {
829 Either::Left(msg) => {
830 let _ = left_sender.broadcast(msg.into()).await;
831 },
832 Either::Right(msg) => {
833 let _ = right_sender.broadcast(msg.into()).await;
834 },
835 }
836 }
837 }
838 }
839 });
840
841 (network_task_sender, network_task_receiver)
842 }
843
844 #[allow(clippy::too_many_arguments)]
845 async fn spawn_twin_handles(
849 &'static mut self,
850 public_key: TYPES::SignatureKey,
851 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
852 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
853 nonce: u64,
854 config: HotShotConfig<TYPES>,
855 memberships: EpochMembershipCoordinator<TYPES>,
856 network: Arc<I::Network>,
857 initializer: HotShotInitializer<TYPES>,
858 consensus_metrics: ConsensusMetricsValue,
859 storage: I::Storage,
860 storage_metrics: StorageMetricsValue,
861 ) -> (
862 SystemContextHandle<TYPES, I, V>,
863 SystemContextHandle<TYPES, I, V>,
864 ) {
865 let epoch_height = config.epoch_height;
866 let left_system_context = SystemContext::new(
867 public_key.clone(),
868 private_key.clone(),
869 state_private_key.clone(),
870 nonce,
871 config.clone(),
872 memberships.clone(),
873 Arc::clone(&network),
874 initializer.clone(),
875 consensus_metrics.clone(),
876 storage.clone(),
877 storage_metrics.clone(),
878 )
879 .await;
880 let right_system_context = SystemContext::new(
881 public_key,
882 private_key,
883 state_private_key,
884 nonce,
885 config,
886 memberships,
887 network,
888 initializer,
889 consensus_metrics,
890 storage,
891 storage_metrics,
892 )
893 .await;
894
895 let left_consensus_registry = ConsensusTaskRegistry::new();
897 let left_network_registry = NetworkTaskRegistry::new();
898
899 let right_consensus_registry = ConsensusTaskRegistry::new();
900 let right_network_registry = NetworkTaskRegistry::new();
901
902 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
904 let left_external_event_stream =
905 (left_external_sender, left_external_receiver.deactivate());
906
907 let (right_external_sender, right_external_receiver) =
908 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
909 let right_external_event_stream =
910 (right_external_sender, right_external_receiver.deactivate());
911
912 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
914 let left_internal_event_stream = (
915 left_internal_sender.clone(),
916 left_internal_receiver.clone().deactivate(),
917 );
918
919 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
920 let right_internal_event_stream = (
921 right_internal_sender.clone(),
922 right_internal_receiver.clone().deactivate(),
923 );
924
925 let mut left_handle = SystemContextHandle::<_, I, _> {
927 consensus_registry: left_consensus_registry,
928 network_registry: left_network_registry,
929 output_event_stream: left_external_event_stream.clone(),
930 internal_event_stream: left_internal_event_stream.clone(),
931 hotshot: Arc::clone(&left_system_context),
932 storage: left_system_context.storage.clone(),
933 network: Arc::clone(&left_system_context.network),
934 membership_coordinator: left_system_context.membership_coordinator.clone(),
935 epoch_height,
936 };
937
938 let mut right_handle = SystemContextHandle::<_, I, _> {
939 consensus_registry: right_consensus_registry,
940 network_registry: right_network_registry,
941 output_event_stream: right_external_event_stream.clone(),
942 internal_event_stream: right_internal_event_stream.clone(),
943 hotshot: Arc::clone(&right_system_context),
944 storage: right_system_context.storage.clone(),
945 network: Arc::clone(&right_system_context.network),
946 membership_coordinator: right_system_context.membership_coordinator.clone(),
947 epoch_height,
948 };
949
950 add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
952 add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
953
954 let fused_internal_event_stream = self.fuse_channels(
956 (left_internal_sender, left_internal_receiver),
957 (right_internal_sender, right_internal_receiver),
958 );
959
960 left_handle.internal_event_stream = (
962 fused_internal_event_stream.0,
963 fused_internal_event_stream.1.deactivate(),
964 );
965
966 add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
968
969 left_handle.internal_event_stream = left_internal_event_stream.clone();
971
972 (left_handle, right_handle)
973 }
974}
975
976#[derive(Debug)]
977pub struct RandomTwinsHandler;
980
981#[async_trait]
982impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
983 for RandomTwinsHandler
984{
985 async fn send_handler(
986 &mut self,
987 event: &HotShotEvent<TYPES>,
988 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
989 let random: bool = rand::thread_rng().gen();
990
991 #[allow(clippy::match_bool)]
992 match random {
993 true => vec![Either::Left(event.clone())],
994 false => vec![Either::Right(event.clone())],
995 }
996 }
997
998 async fn recv_handler(
999 &mut self,
1000 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1001 ) -> Vec<HotShotEvent<TYPES>> {
1002 match event {
1003 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1004 }
1005 }
1006}
1007
1008#[derive(Debug)]
1009pub struct DoubleTwinsHandler;
1012
1013#[async_trait]
1014impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
1015 for DoubleTwinsHandler
1016{
1017 async fn send_handler(
1018 &mut self,
1019 event: &HotShotEvent<TYPES>,
1020 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1021 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1022 }
1023
1024 async fn recv_handler(
1025 &mut self,
1026 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1027 ) -> Vec<HotShotEvent<TYPES>> {
1028 match event {
1029 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1030 }
1031 }
1032}
1033
1034#[async_trait]
1035impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1036 for SystemContextHandle<TYPES, I, V>
1037{
1038 fn total_nodes(&self) -> NonZeroUsize {
1039 self.hotshot.config.num_nodes_with_stake
1040 }
1041
1042 fn builder_timeout(&self) -> Duration {
1043 self.hotshot.config.builder_timeout
1044 }
1045
1046 async fn send_event(&self, event: Event<TYPES>) {
1047 debug!(?event, "send_event");
1048 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1049 }
1050
1051 fn public_key(&self) -> &TYPES::SignatureKey {
1052 &self.hotshot.public_key
1053 }
1054
1055 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1056 &self.hotshot.private_key
1057 }
1058
1059 fn state_private_key(
1060 &self,
1061 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1062 &self.hotshot.state_private_key
1063 }
1064}
1065
1066#[derive(Clone, Debug, PartialEq)]
1067pub struct InitializerEpochInfo<TYPES: NodeType> {
1068 pub epoch: TYPES::Epoch,
1069 pub drb_result: DrbResult,
1070 pub block_header: Option<TYPES::BlockHeader>,
1072}
1073
1074#[derive(Clone, Debug)]
1075pub struct HotShotInitializer<TYPES: NodeType> {
1077 pub instance_state: TYPES::InstanceState,
1079
1080 pub epoch_height: u64,
1082
1083 pub epoch_start_block: u64,
1085
1086 pub anchor_leaf: Leaf2<TYPES>,
1088
1089 pub anchor_state: Arc<TYPES::ValidatedState>,
1091
1092 pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1094
1095 pub start_view: TYPES::View,
1097
1098 pub last_actioned_view: TYPES::View,
1101
1102 pub start_epoch: Option<TYPES::Epoch>,
1104
1105 pub high_qc: QuorumCertificate2<TYPES>,
1109
1110 pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
1112
1113 pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1115
1116 pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1118
1119 pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,
1122
1123 pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,
1125
1126 pub saved_vid_shares: VidShares<TYPES>,
1128
1129 pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1131
1132 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1134}
1135
1136impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1137 pub async fn from_genesis<V: Versions>(
1141 instance_state: TYPES::InstanceState,
1142 epoch_height: u64,
1143 epoch_start_block: u64,
1144 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1145 ) -> Result<Self, HotShotError<TYPES>> {
1146 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1147 let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1148
1149 Ok(Self {
1150 anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1151 anchor_state: Arc::new(validated_state),
1152 anchor_state_delta: Some(Arc::new(state_delta)),
1153 start_view: TYPES::View::new(0),
1154 start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1155 last_actioned_view: TYPES::View::new(0),
1156 saved_proposals: BTreeMap::new(),
1157 high_qc,
1158 next_epoch_high_qc: None,
1159 decided_upgrade_certificate: None,
1160 undecided_leaves: BTreeMap::new(),
1161 undecided_state: BTreeMap::new(),
1162 instance_state,
1163 saved_vid_shares: BTreeMap::new(),
1164 epoch_height,
1165 state_cert: None,
1166 epoch_start_block,
1167 start_epoch_info,
1168 })
1169 }
1170
1171 #[must_use]
1173 pub fn update_undecided(self) -> Self {
1174 let mut undecided_leaves = self.undecided_leaves.clone();
1175 let mut undecided_state = self.undecided_state.clone();
1176
1177 for proposal in self.saved_proposals.values() {
1178 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1180 continue;
1181 }
1182
1183 undecided_leaves.insert(
1184 proposal.data.view_number(),
1185 Leaf2::from_quorum_proposal(&proposal.data),
1186 );
1187 }
1188
1189 for leaf in undecided_leaves.values() {
1190 let view_inner = ViewInner::Leaf {
1191 leaf: leaf.commit(),
1192 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1193 delta: None,
1194 epoch: leaf.epoch(self.epoch_height),
1195 };
1196 let view = View { view_inner };
1197
1198 undecided_state.insert(leaf.view_number(), view);
1199 }
1200
1201 Self {
1202 undecided_leaves,
1203 undecided_state,
1204 ..self
1205 }
1206 }
1207
1208 #[allow(clippy::too_many_arguments)]
1216 pub fn load(
1217 instance_state: TYPES::InstanceState,
1218 epoch_height: u64,
1219 epoch_start_block: u64,
1220 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1221 anchor_leaf: Leaf2<TYPES>,
1222 (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1223 (high_qc, next_epoch_high_qc): (
1224 QuorumCertificate2<TYPES>,
1225 Option<NextEpochQuorumCertificate2<TYPES>>,
1226 ),
1227 last_actioned_view: TYPES::View,
1228 saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1229 saved_vid_shares: VidShares<TYPES>,
1230 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1231 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1232 ) -> Self {
1233 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1234 anchor_leaf.block_header(),
1235 ));
1236 let anchor_state_delta = None;
1237
1238 let initializer = Self {
1239 instance_state,
1240 epoch_height,
1241 epoch_start_block,
1242 anchor_leaf,
1243 anchor_state,
1244 anchor_state_delta,
1245 high_qc,
1246 start_view,
1247 start_epoch,
1248 last_actioned_view,
1249 saved_proposals,
1250 saved_vid_shares,
1251 next_epoch_high_qc,
1252 decided_upgrade_certificate,
1253 undecided_leaves: BTreeMap::new(),
1254 undecided_state: BTreeMap::new(),
1255 state_cert,
1256 start_epoch_info,
1257 };
1258
1259 initializer.update_undecided()
1260 }
1261}
1262
1263async fn load_start_epoch_info<TYPES: NodeType>(
1264 membership: &Arc<RwLock<TYPES::Membership>>,
1265 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1266 epoch_height: u64,
1267 epoch_start_block: u64,
1268) {
1269 let first_epoch_number =
1270 TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1271
1272 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1273 membership
1274 .write()
1275 .await
1276 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1277
1278 let mut sorted_epoch_info = start_epoch_info.clone();
1279 sorted_epoch_info.sort_by_key(|info| info.epoch);
1280 for epoch_info in sorted_epoch_info {
1281 if let Some(block_header) = &epoch_info.block_header {
1282 tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1283
1284 Membership::add_epoch_root(
1285 Arc::clone(membership),
1286 epoch_info.epoch,
1287 block_header.clone(),
1288 )
1289 .await
1290 .unwrap_or_else(|err| {
1291 tracing::error!(
1293 "Failed to add epoch root for epoch {}: {err}",
1294 epoch_info.epoch
1295 );
1296 });
1297 }
1298 }
1299
1300 for epoch_info in start_epoch_info {
1301 tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1302 membership
1303 .write()
1304 .await
1305 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1306 }
1307}