1#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16 drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17 epoch_membership::EpochMembershipCoordinator,
18 message::UpgradeLock,
19 simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
20 traits::{
21 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22 node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
23 },
24 utils::{epoch_from_block_number, is_ge_epoch_root},
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37pub mod helpers;
39
40use std::{
41 collections::{BTreeMap, HashMap},
42 num::NonZeroUsize,
43 sync::Arc,
44 time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57 consensus::{
58 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59 ViewInner,
60 },
61 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62 data::Leaf2,
63 event::{EventType, LeafInfo},
64 message::{DataMessage, Message, MessageKind, Proposal},
65 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66 storage_metrics::StorageMetricsValue,
67 traits::{
68 consensus_api::ConsensusApi,
69 network::ConnectedNetwork,
70 node_implementation::{ConsensusTime, NodeType},
71 signature_key::SignatureKey,
72 states::ValidatedState,
73 },
74 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75 HotShotConfig,
76};
77pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
82use crate::{
85 tasks::{add_consensus_tasks, add_network_tasks},
86 traits::NodeImplementation,
87 types::{Event, SystemContextHandle},
88};
89
/// Number of bytes in 512 bits (64 * 8 = 512).
// NOTE(review): presumably the length of a 512-bit hash digest — confirm against users.
pub const H_512: usize = 64;
/// Number of bytes in 256 bits (32 * 8 = 256).
// NOTE(review): presumably the length of a 256-bit hash digest — confirm against users.
pub const H_256: usize = 32;
94
/// Shared state of a single consensus node: keys, configuration, network handle, consensus
/// state, storage, and the event channels used by the tasks started in `run_tasks`.
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// This node's public signature key; used as the `sender` of outgoing messages.
    public_key: TYPES::SignatureKey,

    /// This node's private signature key.
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// This node's private key for the state signature scheme (`TYPES::StateSignatureKey`).
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration this instance was created with.
    pub config: HotShotConfig<TYPES>,

    /// Shared handle to the network implementation.
    pub network: Arc<I::Network>,

    /// Coordinator used to look up per-epoch membership and stake tables.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Consensus metrics; the same `Arc` is handed to `Consensus::new`.
    metrics: Arc<ConsensusMetricsValue>,

    /// The consensus state, wrapped in `OuterConsensus`.
    consensus: OuterConsensus<TYPES>,

    /// Instance state supplied by the initializer.
    instance_state: Arc<TYPES::InstanceState>,

    /// View announced by the initial `ViewChange` event in `start_consensus`.
    start_view: TYPES::View,

    /// Epoch supplied by the initializer; `start_consensus` combines it with the configured
    /// first epoch for the initial `ViewChange`.
    start_epoch: Option<TYPES::Epoch>,

    /// Sender / inactive receiver pair for application-facing events. `new_from_channels`
    /// builds it from the same channel as `external_event_stream`.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Sender / inactive receiver pair on which external `Event`s (e.g. `Decide`,
    /// `Transactions`) are broadcast.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Leaf consensus was initialised from (the initializer's `anchor_leaf`).
    anchored_leaf: Leaf2<TYPES>,

    /// Sender / inactive receiver pair carrying internal `HotShotEvent`s between tasks.
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Numeric identifier of this node (the `nonce` passed to the constructor); recorded as a
    /// tracing field by the instrumented methods.
    pub id: u64,

    /// Storage implementation; read for a saved DRB result during construction.
    pub storage: I::Storage,

    /// Storage metrics.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Upgrade lock built from the decided upgrade certificate; used to serialize messages.
    pub upgrade_lock: UpgradeLock<TYPES, V>,
}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159 for SystemContext<TYPES, I, V>
160{
161 #![allow(deprecated)]
162 fn clone(&self) -> Self {
163 Self {
164 public_key: self.public_key.clone(),
165 private_key: self.private_key.clone(),
166 state_private_key: self.state_private_key.clone(),
167 config: self.config.clone(),
168 network: Arc::clone(&self.network),
169 membership_coordinator: self.membership_coordinator.clone(),
170 metrics: Arc::clone(&self.metrics),
171 consensus: self.consensus.clone(),
172 instance_state: Arc::clone(&self.instance_state),
173 start_view: self.start_view,
174 start_epoch: self.start_epoch,
175 output_event_stream: self.output_event_stream.clone(),
176 external_event_stream: self.external_event_stream.clone(),
177 anchored_leaf: self.anchored_leaf.clone(),
178 internal_event_stream: self.internal_event_stream.clone(),
179 id: self.id,
180 storage: self.storage.clone(),
181 storage_metrics: Arc::clone(&self.storage_metrics),
182 upgrade_lock: self.upgrade_lock.clone(),
183 }
184 }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188 #![allow(deprecated)]
189 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 public_key: TYPES::SignatureKey,
202 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204 nonce: u64,
205 config: HotShotConfig<TYPES>,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 memberships,
223 network,
224 initializer,
225 consensus_metrics,
226 storage,
227 storage_metrics,
228 internal_chan,
229 external_chan,
230 )
231 .await
232 }
233
234 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242 pub async fn new_from_channels(
243 public_key: TYPES::SignatureKey,
244 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246 nonce: u64,
247 config: HotShotConfig<TYPES>,
248 mut membership_coordinator: EpochMembershipCoordinator<TYPES>,
249 network: Arc<I::Network>,
250 initializer: HotShotInitializer<TYPES>,
251 consensus_metrics: ConsensusMetricsValue,
252 storage: I::Storage,
253 storage_metrics: StorageMetricsValue,
254 internal_channel: (
255 Sender<Arc<HotShotEvent<TYPES>>>,
256 Receiver<Arc<HotShotEvent<TYPES>>>,
257 ),
258 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259 ) -> Arc<Self> {
260 debug!("Creating a new hotshot");
261
262 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264 let consensus_metrics = Arc::new(consensus_metrics);
265 let storage_metrics = Arc::new(storage_metrics);
266 let anchored_leaf = initializer.anchor_leaf;
267 let instance_state = initializer.instance_state;
268
269 let (internal_tx, internal_rx) = internal_channel;
270 let (mut external_tx, external_rx) = external_channel;
271
272 let mut internal_rx = internal_rx.new_receiver();
273
274 let mut external_rx = external_rx.new_receiver();
275
276 internal_rx.set_overflow(true);
279 external_rx.set_overflow(true);
281
282 membership_coordinator
283 .set_external_channel(external_rx.clone())
284 .await;
285
286 tracing::warn!(
287 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
288 V::Base::VERSION,
289 V::Upgrade::VERSION,
290 );
291 tracing::warn!(
292 "Loading previously decided upgrade certificate from storage: {:?}",
293 initializer.decided_upgrade_certificate
294 );
295
296 let upgrade_lock =
297 UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
298
299 let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
300 cert.data.new_version
301 } else {
302 V::Base::VERSION
303 };
304
305 debug!("Setting DRB difficulty selector in membership");
306 let drb_difficulty_selector = drb_difficulty_selector::<_, V>(&config);
307
308 membership_coordinator
309 .set_drb_difficulty_selector(drb_difficulty_selector)
310 .await;
311
312 for da_committee in &config.da_committees {
313 if current_version >= da_committee.start_version {
314 membership_coordinator
315 .membership()
316 .write()
317 .await
318 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
319 }
320 }
321
322 let validated_state = initializer.anchor_state;
325
326 load_start_epoch_info(
327 membership_coordinator.membership(),
328 &initializer.start_epoch_info,
329 config.epoch_height,
330 config.epoch_start_block,
331 )
332 .await;
333
334 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
336 TYPES::Epoch::new(epoch_from_block_number(
337 block_number + 1,
338 config.epoch_height,
339 ))
340 });
341
342 let mut validated_state_map = BTreeMap::default();
344 validated_state_map.insert(
345 anchored_leaf.view_number(),
346 View {
347 view_inner: ViewInner::Leaf {
348 leaf: anchored_leaf.commit(),
349 state: Arc::clone(&validated_state),
350 delta: initializer.anchor_state_delta,
351 epoch,
352 },
353 },
354 );
355 for (view_num, inner) in initializer.undecided_state {
356 validated_state_map.insert(view_num, inner);
357 }
358
359 let mut saved_leaves = HashMap::new();
360 let mut saved_payloads = BTreeMap::new();
361 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
362
363 for (_, leaf) in initializer.undecided_leaves {
364 saved_leaves.insert(leaf.commit(), leaf.clone());
365 }
366 if let Some(payload) = anchored_leaf.block_payload() {
367 let metadata = anchored_leaf.block_header().metadata().clone();
368 saved_payloads.insert(
369 anchored_leaf.view_number(),
370 Arc::new(PayloadWithMetadata { payload, metadata }),
371 );
372 }
373 let high_qc_block_number = initializer.high_qc.data.block_number;
374
375 let consensus = Consensus::new(
376 validated_state_map,
377 Some(initializer.saved_vid_shares),
378 anchored_leaf.view_number(),
379 epoch,
380 anchored_leaf.view_number(),
381 anchored_leaf.view_number(),
382 initializer.last_actioned_view,
383 initializer.saved_proposals,
384 saved_leaves,
385 saved_payloads,
386 initializer.high_qc,
387 initializer.next_epoch_high_qc,
388 Arc::clone(&consensus_metrics),
389 config.epoch_height,
390 initializer.state_cert,
391 config.drb_difficulty,
392 config.drb_upgrade_difficulty,
393 );
394
395 let consensus = Arc::new(RwLock::new(consensus));
396
397 if let Some(epoch) = epoch {
398 tracing::info!(
399 "Triggering catchup for epoch {} and next epoch {}",
400 epoch,
401 epoch + 1
402 );
403 let _ = membership_coordinator
405 .membership_for_epoch(Some(epoch))
406 .await;
407 let _ = membership_coordinator
408 .membership_for_epoch(Some(epoch + 1))
409 .await;
410 if let Some(high_qc_block_number) = high_qc_block_number {
413 if is_ge_epoch_root(high_qc_block_number, config.epoch_height) {
414 let _ = membership_coordinator
415 .stake_table_for_epoch(Some(epoch + 2))
416 .await;
417 }
418 }
419
420 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
421 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
422 if let Ok(mem) = membership_coordinator
423 .stake_table_for_epoch(Some(epoch + 1))
424 .await
425 {
426 mem.add_drb_result(drb_result).await;
427 }
428 }
429 }
430
431 external_tx.set_await_active(false);
434
435 let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
436 id: nonce,
437 consensus: OuterConsensus::new(consensus),
438 instance_state: Arc::new(instance_state),
439 public_key,
440 private_key,
441 state_private_key,
442 config,
443 start_view: initializer.start_view,
444 start_epoch: initializer.start_epoch,
445 network,
446 membership_coordinator,
447 metrics: Arc::clone(&consensus_metrics),
448 internal_event_stream: (internal_tx, internal_rx.deactivate()),
449 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
450 external_event_stream: (external_tx, external_rx.deactivate()),
451 anchored_leaf: anchored_leaf.clone(),
452 storage,
453 storage_metrics,
454 upgrade_lock,
455 });
456
457 inner
458 }
459
460 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
465 pub async fn start_consensus(&self) {
466 #[cfg(all(feature = "rewind", not(debug_assertions)))]
467 compile_error!("Cannot run rewind in production builds!");
468
469 debug!("Starting Consensus");
470 let consensus = self.consensus.read().await;
471
472 let first_epoch = option_epoch_from_block_number::<TYPES>(
473 V::Base::VERSION >= V::Epochs::VERSION,
474 self.config.epoch_start_block,
475 self.config.epoch_height,
476 );
477 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
481 #[allow(clippy::panic)]
482 self.internal_event_stream
483 .0
484 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
485 self.start_view,
486 initial_view_change_epoch,
487 )))
488 .await
489 .unwrap_or_else(|_| {
490 panic!(
491 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
492 self.start_view, initial_view_change_epoch,
493 )
494 });
495
496 let event_stream = self.internal_event_stream.0.clone();
498 let next_view_timeout = self.config.next_view_timeout;
499 let start_view = self.start_view;
500 let start_epoch = self.start_epoch;
501
502 spawn({
505 async move {
506 sleep(Duration::from_millis(next_view_timeout)).await;
507 broadcast_event(
508 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
509 &event_stream,
510 )
511 .await;
512 }
513 });
514 #[allow(clippy::panic)]
515 self.internal_event_stream
516 .0
517 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
518 consensus.high_qc().clone(),
519 ))))
520 .await
521 .unwrap_or_else(|_| {
522 panic!(
523 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
524 consensus.high_qc()
525 )
526 });
527
528 {
529 if self.anchored_leaf.view_number() == TYPES::View::genesis() {
532 let (validated_state, state_delta) =
533 TYPES::ValidatedState::genesis(&self.instance_state);
534
535 let qc = QuorumCertificate2::genesis::<V>(
536 &validated_state,
537 self.instance_state.as_ref(),
538 )
539 .await;
540
541 broadcast_event(
542 Event {
543 view_number: self.anchored_leaf.view_number(),
544 event: EventType::Decide {
545 leaf_chain: Arc::new(vec![LeafInfo::new(
546 self.anchored_leaf.clone(),
547 Arc::new(validated_state),
548 Some(Arc::new(state_delta)),
549 None,
550 None,
551 )]),
552 committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
553 deciding_qc: None,
554 block_size: None,
555 },
556 },
557 &self.external_event_stream.0,
558 )
559 .await;
560 }
561 }
562 }
563
564 async fn send_external_event(&self, event: Event<TYPES>) {
566 debug!(?event, "send_external_event");
567 broadcast_event(event, &self.external_event_stream.0).await;
568 }
569
570 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
576 pub async fn publish_transaction_async(
577 &self,
578 transaction: TYPES::Transaction,
579 ) -> Result<(), HotShotError<TYPES>> {
580 trace!("Adding transaction to our own queue");
581
582 let api = self.clone();
583
584 let consensus_reader = api.consensus.read().await;
585 let view_number = consensus_reader.cur_view();
586 let epoch = consensus_reader.cur_epoch();
587 drop(consensus_reader);
588
589 let message_kind: DataMessage<TYPES> =
591 DataMessage::SubmitTransaction(transaction.clone(), view_number);
592 let message = Message {
593 sender: api.public_key.clone(),
594 kind: MessageKind::from(message_kind),
595 };
596
597 let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
598 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
599 })?;
600
601 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
602 Ok(m) => m,
603 Err(e) => return Err(HotShotError::InvalidState(e.message)),
604 };
605
606 spawn(async move {
607 let memberships_da_committee_members = membership
608 .da_committee_members(view_number)
609 .await
610 .iter()
611 .cloned()
612 .collect();
613
614 join! {
615 api
623 .network.da_broadcast_message(
624 view_number.u64().into(),
625 serialized_message,
626 memberships_da_committee_members,
627 BroadcastDelay::None,
628 ),
629 api
630 .send_external_event(Event {
631 view_number,
632 event: EventType::Transactions {
633 transactions: vec![transaction],
634 },
635 }),
636 }
637 });
638 Ok(())
639 }
640
641 #[must_use]
643 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
644 Arc::clone(&self.consensus.inner_consensus)
645 }
646
647 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
649 Arc::clone(&self.instance_state)
650 }
651
652 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
656 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
657 self.consensus.read().await.decided_leaf()
658 }
659
660 #[must_use]
666 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
667 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
668 self.consensus.try_read().map(|guard| guard.decided_leaf())
669 }
670
671 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
676 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
677 Arc::clone(&self.consensus.read().await.decided_state())
678 }
679
680 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
688 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
689 self.consensus.read().await.state(view).cloned()
690 }
691
692 #[allow(clippy::too_many_arguments)]
706 pub async fn init(
707 public_key: TYPES::SignatureKey,
708 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
709 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
710 node_id: u64,
711 config: HotShotConfig<TYPES>,
712 memberships: EpochMembershipCoordinator<TYPES>,
713 network: Arc<I::Network>,
714 initializer: HotShotInitializer<TYPES>,
715 consensus_metrics: ConsensusMetricsValue,
716 storage: I::Storage,
717 storage_metrics: StorageMetricsValue,
718 ) -> Result<
719 (
720 SystemContextHandle<TYPES, I, V>,
721 Sender<Arc<HotShotEvent<TYPES>>>,
722 Receiver<Arc<HotShotEvent<TYPES>>>,
723 ),
724 HotShotError<TYPES>,
725 > {
726 let hotshot = Self::new(
727 public_key,
728 private_key,
729 state_private_key,
730 node_id,
731 config,
732 memberships,
733 network,
734 initializer,
735 consensus_metrics,
736 storage,
737 storage_metrics,
738 )
739 .await;
740 let handle = Arc::clone(&hotshot).run_tasks().await;
741 let (tx, rx) = hotshot.internal_event_stream.clone();
742
743 Ok((handle, tx, rx.activate()))
744 }
745 #[must_use]
747 pub fn next_view_timeout(&self) -> u64 {
748 self.config.next_view_timeout
749 }
750}
751
752impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
753 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
757 let consensus_registry = ConsensusTaskRegistry::new();
758 let network_registry = NetworkTaskRegistry::new();
759
760 let output_event_stream = self.external_event_stream.clone();
761 let internal_event_stream = self.internal_event_stream.clone();
762
763 let mut handle = SystemContextHandle {
764 consensus_registry,
765 network_registry,
766 output_event_stream: output_event_stream.clone(),
767 internal_event_stream: internal_event_stream.clone(),
768 hotshot: self.clone().into(),
769 storage: self.storage.clone(),
770 network: Arc::clone(&self.network),
771 membership_coordinator: self.membership_coordinator.clone(),
772 epoch_height: self.config.epoch_height,
773 };
774
775 add_network_tasks::<TYPES, I, V>(&mut handle).await;
776 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
777
778 handle
779 }
780}
781
/// Both ends of an `async_broadcast` channel whose messages are `Arc<S>`: `(sender, receiver)`.
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
784
785#[async_trait]
786pub trait TwinsHandlerState<TYPES, I, V>
788where
789 Self: std::fmt::Debug + Send + Sync,
790 TYPES: NodeType,
791 I: NodeImplementation<TYPES>,
792 V: Versions,
793{
794 async fn send_handler(
796 &mut self,
797 event: &HotShotEvent<TYPES>,
798 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
799
800 async fn recv_handler(
802 &mut self,
803 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
804 ) -> Vec<HotShotEvent<TYPES>>;
805
806 fn fuse_channels(
810 &'static mut self,
811 left: Channel<HotShotEvent<TYPES>>,
812 right: Channel<HotShotEvent<TYPES>>,
813 ) -> Channel<HotShotEvent<TYPES>> {
814 let send_state = Arc::new(RwLock::new(self));
815 let recv_state = Arc::clone(&send_state);
816
817 let (left_sender, mut left_receiver) = (left.0, left.1);
818 let (right_sender, mut right_receiver) = (right.0, right.1);
819
820 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
822 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
824 broadcast(EVENT_CHANNEL_SIZE);
825
826 let _recv_loop_handle = spawn(async move {
827 loop {
828 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
829 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
830 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
831 };
832
833 let mut state = recv_state.write().await;
834 let mut result = state.recv_handler(&msg).await;
835
836 while let Some(event) = result.pop() {
837 let _ = sender_to_network.broadcast(event.into()).await;
838 }
839 }
840 });
841
842 let _send_loop_handle = spawn(async move {
843 loop {
844 if let Ok(msg) = receiver_from_network.recv().await {
845 let mut state = send_state.write().await;
846
847 let mut result = state.send_handler(&msg).await;
848
849 while let Some(event) = result.pop() {
850 match event {
851 Either::Left(msg) => {
852 let _ = left_sender.broadcast(msg.into()).await;
853 },
854 Either::Right(msg) => {
855 let _ = right_sender.broadcast(msg.into()).await;
856 },
857 }
858 }
859 }
860 }
861 });
862
863 (network_task_sender, network_task_receiver)
864 }
865
866 #[allow(clippy::too_many_arguments)]
867 async fn spawn_twin_handles(
871 &'static mut self,
872 public_key: TYPES::SignatureKey,
873 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
874 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
875 nonce: u64,
876 config: HotShotConfig<TYPES>,
877 memberships: EpochMembershipCoordinator<TYPES>,
878 network: Arc<I::Network>,
879 initializer: HotShotInitializer<TYPES>,
880 consensus_metrics: ConsensusMetricsValue,
881 storage: I::Storage,
882 storage_metrics: StorageMetricsValue,
883 ) -> (
884 SystemContextHandle<TYPES, I, V>,
885 SystemContextHandle<TYPES, I, V>,
886 ) {
887 let epoch_height = config.epoch_height;
888 let left_system_context = SystemContext::new(
889 public_key.clone(),
890 private_key.clone(),
891 state_private_key.clone(),
892 nonce,
893 config.clone(),
894 memberships.clone(),
895 Arc::clone(&network),
896 initializer.clone(),
897 consensus_metrics.clone(),
898 storage.clone(),
899 storage_metrics.clone(),
900 )
901 .await;
902 let right_system_context = SystemContext::new(
903 public_key,
904 private_key,
905 state_private_key,
906 nonce,
907 config,
908 memberships,
909 network,
910 initializer,
911 consensus_metrics,
912 storage,
913 storage_metrics,
914 )
915 .await;
916
917 let left_consensus_registry = ConsensusTaskRegistry::new();
919 let left_network_registry = NetworkTaskRegistry::new();
920
921 let right_consensus_registry = ConsensusTaskRegistry::new();
922 let right_network_registry = NetworkTaskRegistry::new();
923
924 let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
926 let left_external_event_stream =
927 (left_external_sender, left_external_receiver.deactivate());
928
929 let (right_external_sender, right_external_receiver) =
930 broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
931 let right_external_event_stream =
932 (right_external_sender, right_external_receiver.deactivate());
933
934 let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
936 let left_internal_event_stream = (
937 left_internal_sender.clone(),
938 left_internal_receiver.clone().deactivate(),
939 );
940
941 let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
942 let right_internal_event_stream = (
943 right_internal_sender.clone(),
944 right_internal_receiver.clone().deactivate(),
945 );
946
947 let mut left_handle = SystemContextHandle::<_, I, _> {
949 consensus_registry: left_consensus_registry,
950 network_registry: left_network_registry,
951 output_event_stream: left_external_event_stream.clone(),
952 internal_event_stream: left_internal_event_stream.clone(),
953 hotshot: Arc::clone(&left_system_context),
954 storage: left_system_context.storage.clone(),
955 network: Arc::clone(&left_system_context.network),
956 membership_coordinator: left_system_context.membership_coordinator.clone(),
957 epoch_height,
958 };
959
960 let mut right_handle = SystemContextHandle::<_, I, _> {
961 consensus_registry: right_consensus_registry,
962 network_registry: right_network_registry,
963 output_event_stream: right_external_event_stream.clone(),
964 internal_event_stream: right_internal_event_stream.clone(),
965 hotshot: Arc::clone(&right_system_context),
966 storage: right_system_context.storage.clone(),
967 network: Arc::clone(&right_system_context.network),
968 membership_coordinator: right_system_context.membership_coordinator.clone(),
969 epoch_height,
970 };
971
972 add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
974 add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
975
976 let fused_internal_event_stream = self.fuse_channels(
978 (left_internal_sender, left_internal_receiver),
979 (right_internal_sender, right_internal_receiver),
980 );
981
982 left_handle.internal_event_stream = (
984 fused_internal_event_stream.0,
985 fused_internal_event_stream.1.deactivate(),
986 );
987
988 add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
990
991 left_handle.internal_event_stream = left_internal_event_stream.clone();
993
994 (left_handle, right_handle)
995 }
996}
997
/// Stateless twins handler whose `TwinsHandlerState` implementation forwards every event from
/// the network side to exactly one twin, picked at random, and forwards every event from either
/// twin to the network side.
#[derive(Debug)]
pub struct RandomTwinsHandler;
1002
1003#[async_trait]
1004impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
1005 for RandomTwinsHandler
1006{
1007 async fn send_handler(
1008 &mut self,
1009 event: &HotShotEvent<TYPES>,
1010 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1011 let random: bool = rand::thread_rng().gen();
1012
1013 #[allow(clippy::match_bool)]
1014 match random {
1015 true => vec![Either::Left(event.clone())],
1016 false => vec![Either::Right(event.clone())],
1017 }
1018 }
1019
1020 async fn recv_handler(
1021 &mut self,
1022 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1023 ) -> Vec<HotShotEvent<TYPES>> {
1024 match event {
1025 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1026 }
1027 }
1028}
1029
/// Stateless twins handler whose `TwinsHandlerState` implementation forwards every event from
/// the network side to both twins, and forwards every event from either twin to the network
/// side.
#[derive(Debug)]
pub struct DoubleTwinsHandler;
1034
1035#[async_trait]
1036impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
1037 for DoubleTwinsHandler
1038{
1039 async fn send_handler(
1040 &mut self,
1041 event: &HotShotEvent<TYPES>,
1042 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1043 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1044 }
1045
1046 async fn recv_handler(
1047 &mut self,
1048 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1049 ) -> Vec<HotShotEvent<TYPES>> {
1050 match event {
1051 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1052 }
1053 }
1054}
1055
1056#[async_trait]
1057impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1058 for SystemContextHandle<TYPES, I, V>
1059{
1060 fn total_nodes(&self) -> NonZeroUsize {
1061 self.hotshot.config.num_nodes_with_stake
1062 }
1063
1064 fn builder_timeout(&self) -> Duration {
1065 self.hotshot.config.builder_timeout
1066 }
1067
1068 async fn send_event(&self, event: Event<TYPES>) {
1069 debug!(?event, "send_event");
1070 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1071 }
1072
1073 fn public_key(&self) -> &TYPES::SignatureKey {
1074 &self.hotshot.public_key
1075 }
1076
1077 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1078 &self.hotshot.private_key
1079 }
1080
1081 fn state_private_key(
1082 &self,
1083 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1084 &self.hotshot.state_private_key
1085 }
1086}
1087
/// Per-epoch start-up information consumed by `load_start_epoch_info`.
#[derive(Clone, Debug, PartialEq)]
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// Epoch this entry describes.
    pub epoch: TYPES::Epoch,
    /// DRB result registered with the membership for `epoch`.
    pub drb_result: DrbResult,
    /// Block header passed to `Membership::add_epoch_root` when present; entries without a
    /// header only contribute their DRB result.
    pub block_header: Option<TYPES::BlockHeader>,
}
1095
/// Everything needed to initialise a `SystemContext`, either from genesis (`from_genesis`) or
/// from previously persisted data (`load`).
#[derive(Clone, Debug)]
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance state; stored in the context behind an `Arc`.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height; used by `update_undecided` to compute each undecided leaf's epoch.
    pub epoch_height: u64,

    /// Block number at which epochs start.
    // NOTE(review): within this file the value is only stored here; `new_from_channels` reads
    // `epoch_start_block` from `HotShotConfig` instead — confirm which one is authoritative.
    pub epoch_start_block: u64,

    /// Leaf consensus is initialised from.
    pub anchor_leaf: Leaf2<TYPES>,

    /// Validated state recorded for the anchor leaf's view.
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// Optional state delta recorded alongside `anchor_state`.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// View consensus starts in.
    pub start_view: TYPES::View,

    /// Last actioned view handed to `Consensus::new`.
    pub last_actioned_view: TYPES::View,

    /// Epoch consensus starts in, if any.
    pub start_epoch: Option<TYPES::Epoch>,

    /// Highest known quorum certificate; its block number (when present) determines the epoch
    /// used during start-up.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next-epoch counterpart of `high_qc`, if any.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Saved quorum proposals keyed by view; `update_undecided` turns those newer than the
    /// anchor leaf into undecided leaves.
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate, if any; determines the upgrade lock and the
    /// current version at start-up.
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves keyed by view; added to the saved leaves at start-up.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// Undecided view state keyed by view; added to the validated state map at start-up.
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares handed to `Consensus::new`.
    pub saved_vid_shares: VidShares<TYPES>,

    /// Saved light client state update certificate, if any.
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Per-epoch information loaded into the membership at start-up.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}
1157
1158impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1159 pub async fn from_genesis<V: Versions>(
1163 instance_state: TYPES::InstanceState,
1164 epoch_height: u64,
1165 epoch_start_block: u64,
1166 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1167 ) -> Result<Self, HotShotError<TYPES>> {
1168 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1169 let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1170
1171 Ok(Self {
1172 anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1173 anchor_state: Arc::new(validated_state),
1174 anchor_state_delta: Some(Arc::new(state_delta)),
1175 start_view: TYPES::View::new(0),
1176 start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1177 last_actioned_view: TYPES::View::new(0),
1178 saved_proposals: BTreeMap::new(),
1179 high_qc,
1180 next_epoch_high_qc: None,
1181 decided_upgrade_certificate: None,
1182 undecided_leaves: BTreeMap::new(),
1183 undecided_state: BTreeMap::new(),
1184 instance_state,
1185 saved_vid_shares: BTreeMap::new(),
1186 epoch_height,
1187 state_cert: None,
1188 epoch_start_block,
1189 start_epoch_info,
1190 })
1191 }
1192
1193 #[must_use]
1195 pub fn update_undecided(self) -> Self {
1196 let mut undecided_leaves = self.undecided_leaves.clone();
1197 let mut undecided_state = self.undecided_state.clone();
1198
1199 for proposal in self.saved_proposals.values() {
1200 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1202 continue;
1203 }
1204
1205 undecided_leaves.insert(
1206 proposal.data.view_number(),
1207 Leaf2::from_quorum_proposal(&proposal.data),
1208 );
1209 }
1210
1211 for leaf in undecided_leaves.values() {
1212 let view_inner = ViewInner::Leaf {
1213 leaf: leaf.commit(),
1214 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1215 delta: None,
1216 epoch: leaf.epoch(self.epoch_height),
1217 };
1218 let view = View { view_inner };
1219
1220 undecided_state.insert(leaf.view_number(), view);
1221 }
1222
1223 Self {
1224 undecided_leaves,
1225 undecided_state,
1226 ..self
1227 }
1228 }
1229
1230 #[allow(clippy::too_many_arguments)]
1238 pub fn load(
1239 instance_state: TYPES::InstanceState,
1240 epoch_height: u64,
1241 epoch_start_block: u64,
1242 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1243 anchor_leaf: Leaf2<TYPES>,
1244 (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1245 (high_qc, next_epoch_high_qc): (
1246 QuorumCertificate2<TYPES>,
1247 Option<NextEpochQuorumCertificate2<TYPES>>,
1248 ),
1249 last_actioned_view: TYPES::View,
1250 saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1251 saved_vid_shares: VidShares<TYPES>,
1252 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1253 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1254 ) -> Self {
1255 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1256 anchor_leaf.block_header(),
1257 ));
1258 let anchor_state_delta = None;
1259
1260 let initializer = Self {
1261 instance_state,
1262 epoch_height,
1263 epoch_start_block,
1264 anchor_leaf,
1265 anchor_state,
1266 anchor_state_delta,
1267 high_qc,
1268 start_view,
1269 start_epoch,
1270 last_actioned_view,
1271 saved_proposals,
1272 saved_vid_shares,
1273 next_epoch_high_qc,
1274 decided_upgrade_certificate,
1275 undecided_leaves: BTreeMap::new(),
1276 undecided_state: BTreeMap::new(),
1277 state_cert,
1278 start_epoch_info,
1279 };
1280
1281 initializer.update_undecided()
1282 }
1283}
1284
1285async fn load_start_epoch_info<TYPES: NodeType>(
1286 membership: &Arc<RwLock<TYPES::Membership>>,
1287 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1288 epoch_height: u64,
1289 epoch_start_block: u64,
1290) {
1291 let first_epoch_number =
1292 TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1293
1294 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1295 membership
1296 .write()
1297 .await
1298 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1299
1300 let mut sorted_epoch_info = start_epoch_info.clone();
1301 sorted_epoch_info.sort_by_key(|info| info.epoch);
1302 for epoch_info in sorted_epoch_info {
1303 if let Some(block_header) = &epoch_info.block_header {
1304 tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1305
1306 Membership::add_epoch_root(Arc::clone(membership), block_header.clone())
1307 .await
1308 .unwrap_or_else(|err| {
1309 tracing::error!(
1311 "Failed to add epoch root for epoch {}: {err}",
1312 epoch_info.epoch
1313 );
1314 });
1315 }
1316 }
1317
1318 for epoch_info in start_epoch_info {
1319 tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1320 membership
1321 .write()
1322 .await
1323 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1324 }
1325}