1#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16 drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17 epoch_membership::EpochMembershipCoordinator,
18 message::UpgradeLock,
19 simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
20 traits::{
21 block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22 node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
23 },
24 utils::{epoch_from_block_number, is_ge_epoch_root},
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29pub mod traits;
31pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37pub mod helpers;
39
40use std::{
41 collections::{BTreeMap, HashMap},
42 num::NonZeroUsize,
43 sync::Arc,
44 time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57 consensus::{
58 Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59 ViewInner,
60 },
61 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62 data::Leaf2,
63 event::{EventType, LeafInfo},
64 message::{DataMessage, Message, MessageKind, Proposal},
65 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66 storage_metrics::StorageMetricsValue,
67 traits::{
68 consensus_api::ConsensusApi,
69 network::ConnectedNetwork,
70 node_implementation::{ConsensusTime, NodeType},
71 signature_key::SignatureKey,
72 states::ValidatedState,
73 },
74 utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75 HotShotConfig,
76};
77pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
82use crate::{
85 tasks::{add_consensus_tasks, add_network_tasks},
86 traits::NodeImplementation,
87 types::{Event, SystemContextHandle},
88};
89
/// Number of bytes in 512 bits (`64 * 8 == 512`).
///
/// NOTE(review): the name suggests this is the byte length of a 512-bit hash; no use site is
/// visible in this file, so confirm before relying on that meaning.
pub const H_512: usize = 64;
/// Number of bytes in 256 bits (`32 * 8 == 256`).
///
/// NOTE(review): the name suggests this is the byte length of a 256-bit hash; no use site is
/// visible in this file, so confirm before relying on that meaning.
pub const H_256: usize = 32;
94
/// Per-node state bundle: keys, configuration, network handle, consensus state, event channels
/// and storage.
///
/// Instances are built by [`SystemContext::new`] / [`SystemContext::new_from_channels`], which
/// return the value wrapped in an `Arc`.
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Public signature key supplied at construction; used as the `sender` of transaction
    /// messages built by `publish_transaction_async`.
    public_key: TYPES::SignatureKey,

    /// Private signature key supplied at construction.
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Private state-signature key supplied at construction.
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration this instance was created with.
    pub config: HotShotConfig<TYPES>,

    /// Shared handle to the network implementation.
    pub network: Arc<I::Network>,

    /// Coordinator used to look up membership for a given epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Consensus metrics; the same `Arc` is also handed to `Consensus::new` at construction.
    metrics: Arc<ConsensusMetricsValue>,

    /// Consensus state, created at construction from the initializer's data.
    consensus: OuterConsensus<TYPES>,

    /// Instance state taken from the initializer.
    instance_state: Arc<TYPES::InstanceState>,

    /// View announced by the initial `ViewChange` event in `start_consensus`.
    start_view: TYPES::View,

    /// Epoch taken from the initializer; combined with the configured first epoch in
    /// `start_consensus`.
    start_epoch: Option<TYPES::Epoch>,

    /// Sender / inactive receiver pair built from the external channel. Constructed from the
    /// same channel halves as `external_event_stream`.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Sender / inactive receiver pair of the external channel; `send_external_event` and
    /// `ConsensusApi::send_event` broadcast on its sender.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Leaf taken from the initializer's `anchor_leaf`.
    anchored_leaf: Leaf2<TYPES>,

    /// Sender / inactive receiver pair of the internal `HotShotEvent` channel.
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Node identifier (the `nonce` / `node_id` constructor argument); recorded as the `id`
    /// field of this type's tracing spans.
    pub id: u64,

    /// Storage implementation supplied at construction.
    pub storage: I::Storage,

    /// Storage metrics supplied at construction.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Upgrade lock derived from the initializer's decided upgrade certificate.
    pub upgrade_lock: UpgradeLock<TYPES, V>,
}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159 for SystemContext<TYPES, I, V>
160{
161 #![allow(deprecated)]
162 fn clone(&self) -> Self {
163 Self {
164 public_key: self.public_key.clone(),
165 private_key: self.private_key.clone(),
166 state_private_key: self.state_private_key.clone(),
167 config: self.config.clone(),
168 network: Arc::clone(&self.network),
169 membership_coordinator: self.membership_coordinator.clone(),
170 metrics: Arc::clone(&self.metrics),
171 consensus: self.consensus.clone(),
172 instance_state: Arc::clone(&self.instance_state),
173 start_view: self.start_view,
174 start_epoch: self.start_epoch,
175 output_event_stream: self.output_event_stream.clone(),
176 external_event_stream: self.external_event_stream.clone(),
177 anchored_leaf: self.anchored_leaf.clone(),
178 internal_event_stream: self.internal_event_stream.clone(),
179 id: self.id,
180 storage: self.storage.clone(),
181 storage_metrics: Arc::clone(&self.storage_metrics),
182 upgrade_lock: self.upgrade_lock.clone(),
183 }
184 }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188 #![allow(deprecated)]
189 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 public_key: TYPES::SignatureKey,
202 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204 nonce: u64,
205 config: HotShotConfig<TYPES>,
206 memberships: EpochMembershipCoordinator<TYPES>,
207 network: Arc<I::Network>,
208 initializer: HotShotInitializer<TYPES>,
209 consensus_metrics: ConsensusMetricsValue,
210 storage: I::Storage,
211 storage_metrics: StorageMetricsValue,
212 ) -> Arc<Self> {
213 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214 let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216 Self::new_from_channels(
217 public_key,
218 private_key,
219 state_private_key,
220 nonce,
221 config,
222 memberships,
223 network,
224 initializer,
225 consensus_metrics,
226 storage,
227 storage_metrics,
228 internal_chan,
229 external_chan,
230 )
231 .await
232 }
233
234 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242 pub async fn new_from_channels(
243 public_key: TYPES::SignatureKey,
244 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246 nonce: u64,
247 config: HotShotConfig<TYPES>,
248 mut membership_coordinator: EpochMembershipCoordinator<TYPES>,
249 network: Arc<I::Network>,
250 initializer: HotShotInitializer<TYPES>,
251 consensus_metrics: ConsensusMetricsValue,
252 storage: I::Storage,
253 storage_metrics: StorageMetricsValue,
254 internal_channel: (
255 Sender<Arc<HotShotEvent<TYPES>>>,
256 Receiver<Arc<HotShotEvent<TYPES>>>,
257 ),
258 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259 ) -> Arc<Self> {
260 debug!("Creating a new hotshot");
261
262 tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264 let consensus_metrics = Arc::new(consensus_metrics);
265 let storage_metrics = Arc::new(storage_metrics);
266 let anchored_leaf = initializer.anchor_leaf;
267 let instance_state = initializer.instance_state;
268
269 let (internal_tx, internal_rx) = internal_channel;
270 let (mut external_tx, external_rx) = external_channel;
271
272 let mut internal_rx = internal_rx.new_receiver();
273
274 let mut external_rx = external_rx.new_receiver();
275
276 internal_rx.set_overflow(true);
279 external_rx.set_overflow(true);
281
282 membership_coordinator
283 .set_external_channel(external_rx.clone())
284 .await;
285
286 tracing::warn!(
287 "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
288 V::Base::VERSION,
289 V::Upgrade::VERSION,
290 );
291 tracing::warn!(
292 "Loading previously decided upgrade certificate from storage: {:?}",
293 initializer.decided_upgrade_certificate
294 );
295
296 let upgrade_lock =
297 UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
298
299 let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
300 cert.data.new_version
301 } else {
302 V::Base::VERSION
303 };
304
305 debug!("Setting DRB difficulty selector in membership");
306 let drb_difficulty_selector = drb_difficulty_selector::<_, V>(&config);
307
308 membership_coordinator
309 .set_drb_difficulty_selector(drb_difficulty_selector)
310 .await;
311
312 for da_committee in &config.da_committees {
313 if current_version >= da_committee.start_version {
314 membership_coordinator
315 .membership()
316 .write()
317 .await
318 .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
319 }
320 }
321
322 let validated_state = initializer.anchor_state;
325
326 load_start_epoch_info(
327 membership_coordinator.membership(),
328 &initializer.start_epoch_info,
329 config.epoch_height,
330 config.epoch_start_block,
331 )
332 .await;
333
334 let epoch = initializer.high_qc.data.block_number.map(|block_number| {
336 TYPES::Epoch::new(epoch_from_block_number(
337 block_number + 1,
338 config.epoch_height,
339 ))
340 });
341
342 let mut validated_state_map = BTreeMap::default();
344 validated_state_map.insert(
345 anchored_leaf.view_number(),
346 View {
347 view_inner: ViewInner::Leaf {
348 leaf: anchored_leaf.commit(),
349 state: Arc::clone(&validated_state),
350 delta: initializer.anchor_state_delta,
351 epoch,
352 },
353 },
354 );
355 for (view_num, inner) in initializer.undecided_state {
356 validated_state_map.insert(view_num, inner);
357 }
358
359 let mut saved_leaves = HashMap::new();
360 let mut saved_payloads = BTreeMap::new();
361 saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
362
363 for (_, leaf) in initializer.undecided_leaves {
364 saved_leaves.insert(leaf.commit(), leaf.clone());
365 }
366 if let Some(payload) = anchored_leaf.block_payload() {
367 let metadata = anchored_leaf.block_header().metadata().clone();
368 saved_payloads.insert(
369 anchored_leaf.view_number(),
370 Arc::new(PayloadWithMetadata { payload, metadata }),
371 );
372 }
373 let high_qc_block_number = initializer.high_qc.data.block_number;
374
375 let consensus = Consensus::new(
376 validated_state_map,
377 Some(initializer.saved_vid_shares),
378 anchored_leaf.view_number(),
379 epoch,
380 anchored_leaf.view_number(),
381 anchored_leaf.view_number(),
382 initializer.last_actioned_view,
383 initializer.saved_proposals,
384 saved_leaves,
385 saved_payloads,
386 initializer.high_qc,
387 initializer.next_epoch_high_qc,
388 Arc::clone(&consensus_metrics),
389 config.epoch_height,
390 initializer.state_cert,
391 config.drb_difficulty,
392 config.drb_upgrade_difficulty,
393 );
394
395 let consensus = Arc::new(RwLock::new(consensus));
396
397 if let Some(epoch) = epoch {
398 tracing::info!(
399 "Triggering catchup for epoch {} and next epoch {}",
400 epoch,
401 epoch + 1
402 );
403 let _ = membership_coordinator
405 .membership_for_epoch(Some(epoch))
406 .await;
407 let _ = membership_coordinator
408 .membership_for_epoch(Some(epoch + 1))
409 .await;
410 if let Some(high_qc_block_number) = high_qc_block_number {
413 if is_ge_epoch_root(high_qc_block_number, config.epoch_height) {
414 let _ = membership_coordinator
415 .stake_table_for_epoch(Some(epoch + 2))
416 .await;
417 }
418 }
419
420 if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
421 tracing::error!("Writing DRB result for epoch {}", epoch + 1);
422 if let Ok(mem) = membership_coordinator
423 .stake_table_for_epoch(Some(epoch + 1))
424 .await
425 {
426 mem.add_drb_result(drb_result).await;
427 }
428 }
429 }
430
431 external_tx.set_await_active(false);
434
435 let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
436 id: nonce,
437 consensus: OuterConsensus::new(consensus),
438 instance_state: Arc::new(instance_state),
439 public_key,
440 private_key,
441 state_private_key,
442 config,
443 start_view: initializer.start_view,
444 start_epoch: initializer.start_epoch,
445 network,
446 membership_coordinator,
447 metrics: Arc::clone(&consensus_metrics),
448 internal_event_stream: (internal_tx, internal_rx.deactivate()),
449 output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
450 external_event_stream: (external_tx, external_rx.deactivate()),
451 anchored_leaf: anchored_leaf.clone(),
452 storage,
453 storage_metrics,
454 upgrade_lock,
455 });
456
457 inner
458 }
459
460 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
465 pub async fn start_consensus(&self) {
466 #[cfg(all(feature = "rewind", not(debug_assertions)))]
467 compile_error!("Cannot run rewind in production builds!");
468
469 debug!("Starting Consensus");
470 let consensus = self.consensus.read().await;
471
472 let first_epoch = option_epoch_from_block_number::<TYPES>(
473 V::Base::VERSION >= V::Epochs::VERSION,
474 self.config.epoch_start_block,
475 self.config.epoch_height,
476 );
477 let initial_view_change_epoch = self.start_epoch.max(first_epoch);
481 #[allow(clippy::panic)]
482 self.internal_event_stream
483 .0
484 .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
485 self.start_view,
486 initial_view_change_epoch,
487 )))
488 .await
489 .unwrap_or_else(|_| {
490 panic!(
491 "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
492 self.start_view, initial_view_change_epoch,
493 )
494 });
495
496 let event_stream = self.internal_event_stream.0.clone();
498 let next_view_timeout = self.config.next_view_timeout;
499 let start_view = self.start_view;
500 let start_epoch = self.start_epoch;
501
502 spawn({
505 async move {
506 sleep(Duration::from_millis(next_view_timeout)).await;
507 broadcast_event(
508 Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
509 &event_stream,
510 )
511 .await;
512 }
513 });
514 #[allow(clippy::panic)]
515 self.internal_event_stream
516 .0
517 .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
518 consensus.high_qc().clone(),
519 ))))
520 .await
521 .unwrap_or_else(|_| {
522 panic!(
523 "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
524 consensus.high_qc()
525 )
526 });
527
528 {
529 if self.anchored_leaf.view_number() == TYPES::View::genesis() {
532 let (validated_state, state_delta) =
533 TYPES::ValidatedState::genesis(&self.instance_state);
534
535 let qc = QuorumCertificate2::genesis::<V>(
536 &validated_state,
537 self.instance_state.as_ref(),
538 )
539 .await;
540
541 broadcast_event(
542 Event {
543 view_number: self.anchored_leaf.view_number(),
544 event: EventType::Decide {
545 leaf_chain: Arc::new(vec![LeafInfo::new(
546 self.anchored_leaf.clone(),
547 Arc::new(validated_state),
548 Some(Arc::new(state_delta)),
549 None,
550 None,
551 )]),
552 committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
553 deciding_qc: None,
554 block_size: None,
555 },
556 },
557 &self.external_event_stream.0,
558 )
559 .await;
560 }
561 }
562 }
563
564 async fn send_external_event(&self, event: Event<TYPES>) {
566 debug!(?event, "send_external_event");
567 broadcast_event(event, &self.external_event_stream.0).await;
568 }
569
570 #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
576 pub async fn publish_transaction_async(
577 &self,
578 transaction: TYPES::Transaction,
579 ) -> Result<(), HotShotError<TYPES>> {
580 trace!("Adding transaction to our own queue");
581
582 let api = self.clone();
583
584 let consensus_reader = api.consensus.read().await;
585 let view_number = consensus_reader.cur_view();
586 let epoch = consensus_reader.cur_epoch();
587 drop(consensus_reader);
588
589 let message_kind: DataMessage<TYPES> =
591 DataMessage::SubmitTransaction(transaction.clone(), view_number);
592 let message = Message {
593 sender: api.public_key.clone(),
594 kind: MessageKind::from(message_kind),
595 };
596
597 let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
598 HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
599 })?;
600
601 let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
602 Ok(m) => m,
603 Err(e) => return Err(HotShotError::InvalidState(e.message)),
604 };
605
606 spawn(async move {
607 let memberships_da_committee_members = membership
608 .da_committee_members(view_number)
609 .await
610 .iter()
611 .cloned()
612 .collect();
613
614 join! {
615 api
623 .network.da_broadcast_message(
624 serialized_message,
625 memberships_da_committee_members,
626 BroadcastDelay::None,
627 ),
628 api
629 .send_external_event(Event {
630 view_number,
631 event: EventType::Transactions {
632 transactions: vec![transaction],
633 },
634 }),
635 }
636 });
637 Ok(())
638 }
639
640 #[must_use]
642 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
643 Arc::clone(&self.consensus.inner_consensus)
644 }
645
646 pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
648 Arc::clone(&self.instance_state)
649 }
650
651 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
655 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
656 self.consensus.read().await.decided_leaf()
657 }
658
659 #[must_use]
665 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
666 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
667 self.consensus.try_read().map(|guard| guard.decided_leaf())
668 }
669
670 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
675 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
676 Arc::clone(&self.consensus.read().await.decided_state())
677 }
678
679 #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
687 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
688 self.consensus.read().await.state(view).cloned()
689 }
690
691 #[allow(clippy::too_many_arguments)]
705 pub async fn init(
706 public_key: TYPES::SignatureKey,
707 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
708 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
709 node_id: u64,
710 config: HotShotConfig<TYPES>,
711 memberships: EpochMembershipCoordinator<TYPES>,
712 network: Arc<I::Network>,
713 initializer: HotShotInitializer<TYPES>,
714 consensus_metrics: ConsensusMetricsValue,
715 storage: I::Storage,
716 storage_metrics: StorageMetricsValue,
717 ) -> Result<
718 (
719 SystemContextHandle<TYPES, I, V>,
720 Sender<Arc<HotShotEvent<TYPES>>>,
721 Receiver<Arc<HotShotEvent<TYPES>>>,
722 ),
723 HotShotError<TYPES>,
724 > {
725 let hotshot = Self::new(
726 public_key,
727 private_key,
728 state_private_key,
729 node_id,
730 config,
731 memberships,
732 network,
733 initializer,
734 consensus_metrics,
735 storage,
736 storage_metrics,
737 )
738 .await;
739 let handle = Arc::clone(&hotshot).run_tasks().await;
740 let (tx, rx) = hotshot.internal_event_stream.clone();
741
742 Ok((handle, tx, rx.activate()))
743 }
744 #[must_use]
746 pub fn next_view_timeout(&self) -> u64 {
747 self.config.next_view_timeout
748 }
749}
750
751impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
752 pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
756 let consensus_registry = ConsensusTaskRegistry::new();
757 let network_registry = NetworkTaskRegistry::new();
758
759 let output_event_stream = self.external_event_stream.clone();
760 let internal_event_stream = self.internal_event_stream.clone();
761
762 let mut handle = SystemContextHandle {
763 consensus_registry,
764 network_registry,
765 output_event_stream: output_event_stream.clone(),
766 internal_event_stream: internal_event_stream.clone(),
767 hotshot: self.clone().into(),
768 storage: self.storage.clone(),
769 network: Arc::clone(&self.network),
770 membership_coordinator: self.membership_coordinator.clone(),
771 epoch_height: self.config.epoch_height,
772 };
773
774 add_network_tasks::<TYPES, I, V>(&mut handle).await;
775 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
776
777 handle
778 }
779}
780
/// Sender / receiver pair of a broadcast channel carrying `Arc<S>` messages.
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
783
784#[async_trait]
785pub trait TwinsHandlerState<TYPES, I, V>
787where
788 Self: std::fmt::Debug + Send + Sync,
789 TYPES: NodeType,
790 I: NodeImplementation<TYPES>,
791 V: Versions,
792{
    /// Decides how an event coming from the fused (network-side) channel is routed.
    ///
    /// `fuse_channels` forwards every `Either::Left` result to the left sender and every
    /// `Either::Right` result to the right sender.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
798
    /// Decides what is forwarded to the fused (network-side) channel for an event received
    /// from one of the two inner channels.
    ///
    /// `event` is `Either::Left` when it came from the left receiver and `Either::Right` when
    /// it came from the right receiver.
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;
804
805 fn fuse_channels(
809 &'static mut self,
810 left: Channel<HotShotEvent<TYPES>>,
811 right: Channel<HotShotEvent<TYPES>>,
812 ) -> Channel<HotShotEvent<TYPES>> {
813 let send_state = Arc::new(RwLock::new(self));
814 let recv_state = Arc::clone(&send_state);
815
816 let (left_sender, mut left_receiver) = (left.0, left.1);
817 let (right_sender, mut right_receiver) = (right.0, right.1);
818
819 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
821 let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
823 broadcast(EVENT_CHANNEL_SIZE);
824
825 let _recv_loop_handle = spawn(async move {
826 loop {
827 let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
828 Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
829 Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
830 };
831
832 let mut state = recv_state.write().await;
833 let mut result = state.recv_handler(&msg).await;
834
835 while let Some(event) = result.pop() {
836 let _ = sender_to_network.broadcast(event.into()).await;
837 }
838 }
839 });
840
841 let _send_loop_handle = spawn(async move {
842 loop {
843 if let Ok(msg) = receiver_from_network.recv().await {
844 let mut state = send_state.write().await;
845
846 let mut result = state.send_handler(&msg).await;
847
848 while let Some(event) = result.pop() {
849 match event {
850 Either::Left(msg) => {
851 let _ = left_sender.broadcast(msg.into()).await;
852 },
853 Either::Right(msg) => {
854 let _ = right_sender.broadcast(msg.into()).await;
855 },
856 }
857 }
858 }
859 }
860 });
861
862 (network_task_sender, network_task_receiver)
863 }
864
    /// Creates two `SystemContext`s from the same arguments and returns a handle for each.
    ///
    /// Both handles get their own external and internal channels and their own consensus
    /// tasks. Network tasks are registered only on the left handle, and only while its
    /// internal event stream is temporarily replaced by the channel produced by
    /// [`Self::fuse_channels`]; the left handle's own internal stream is restored afterwards.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_twin_handles(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> (
        SystemContextHandle<TYPES, I, V>,
        SystemContextHandle<TYPES, I, V>,
    ) {
        // Read before `config` is moved into the right-hand context.
        let epoch_height = config.epoch_height;
        // The left context receives clones; the right context consumes the originals.
        let left_system_context = SystemContext::new(
            public_key.clone(),
            private_key.clone(),
            state_private_key.clone(),
            nonce,
            config.clone(),
            memberships.clone(),
            Arc::clone(&network),
            initializer.clone(),
            consensus_metrics.clone(),
            storage.clone(),
            storage_metrics.clone(),
        )
        .await;
        let right_system_context = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;

        // Task registries for the left handle.
        let left_consensus_registry = ConsensusTaskRegistry::new();
        let left_network_registry = NetworkTaskRegistry::new();

        // Task registries for the right handle.
        let right_consensus_registry = ConsensusTaskRegistry::new();
        let right_network_registry = NetworkTaskRegistry::new();

        // Fresh external channels for the handles (not the ones created inside
        // `SystemContext::new`).
        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let left_external_event_stream =
            (left_external_sender, left_external_receiver.deactivate());

        let (right_external_sender, right_external_receiver) =
            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let right_external_event_stream =
            (right_external_sender, right_external_receiver.deactivate());

        // Fresh internal channels; the active halves are kept for `fuse_channels` below.
        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let left_internal_event_stream = (
            left_internal_sender.clone(),
            left_internal_receiver.clone().deactivate(),
        );

        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let right_internal_event_stream = (
            right_internal_sender.clone(),
            right_internal_receiver.clone().deactivate(),
        );

        let mut left_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: left_consensus_registry,
            network_registry: left_network_registry,
            output_event_stream: left_external_event_stream.clone(),
            internal_event_stream: left_internal_event_stream.clone(),
            hotshot: Arc::clone(&left_system_context),
            storage: left_system_context.storage.clone(),
            network: Arc::clone(&left_system_context.network),
            membership_coordinator: left_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        let mut right_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: right_consensus_registry,
            network_registry: right_network_registry,
            output_event_stream: right_external_event_stream.clone(),
            internal_event_stream: right_internal_event_stream.clone(),
            hotshot: Arc::clone(&right_system_context),
            storage: right_system_context.storage.clone(),
            network: Arc::clone(&right_system_context.network),
            membership_coordinator: right_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        // Consensus tasks are registered while each handle still has its own internal stream.
        add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;

        // Combine both internal channels into a single channel.
        let fused_internal_event_stream = self.fuse_channels(
            (left_internal_sender, left_internal_receiver),
            (right_internal_sender, right_internal_receiver),
        );

        // Temporarily point the left handle at the fused channel so the network tasks
        // registered next are attached to it.
        left_handle.internal_event_stream = (
            fused_internal_event_stream.0,
            fused_internal_event_stream.1.deactivate(),
        );

        add_network_tasks::<TYPES, I, V>(&mut left_handle).await;

        // Restore the left handle's own internal stream.
        left_handle.internal_event_stream = left_internal_event_stream.clone();

        (left_handle, right_handle)
    }
995}
996
997#[derive(Debug)]
998pub struct RandomTwinsHandler;
1001
1002#[async_trait]
1003impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
1004 for RandomTwinsHandler
1005{
1006 async fn send_handler(
1007 &mut self,
1008 event: &HotShotEvent<TYPES>,
1009 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1010 let random: bool = rand::thread_rng().gen();
1011
1012 #[allow(clippy::match_bool)]
1013 match random {
1014 true => vec![Either::Left(event.clone())],
1015 false => vec![Either::Right(event.clone())],
1016 }
1017 }
1018
1019 async fn recv_handler(
1020 &mut self,
1021 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1022 ) -> Vec<HotShotEvent<TYPES>> {
1023 match event {
1024 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1025 }
1026 }
1027}
1028
1029#[derive(Debug)]
1030pub struct DoubleTwinsHandler;
1033
1034#[async_trait]
1035impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
1036 for DoubleTwinsHandler
1037{
1038 async fn send_handler(
1039 &mut self,
1040 event: &HotShotEvent<TYPES>,
1041 ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1042 vec![Either::Left(event.clone()), Either::Right(event.clone())]
1043 }
1044
1045 async fn recv_handler(
1046 &mut self,
1047 event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1048 ) -> Vec<HotShotEvent<TYPES>> {
1049 match event {
1050 Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1051 }
1052 }
1053}
1054
1055#[async_trait]
1056impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1057 for SystemContextHandle<TYPES, I, V>
1058{
1059 fn total_nodes(&self) -> NonZeroUsize {
1060 self.hotshot.config.num_nodes_with_stake
1061 }
1062
1063 fn builder_timeout(&self) -> Duration {
1064 self.hotshot.config.builder_timeout
1065 }
1066
1067 async fn send_event(&self, event: Event<TYPES>) {
1068 debug!(?event, "send_event");
1069 broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1070 }
1071
1072 fn public_key(&self) -> &TYPES::SignatureKey {
1073 &self.hotshot.public_key
1074 }
1075
1076 fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1077 &self.hotshot.private_key
1078 }
1079
1080 fn state_private_key(
1081 &self,
1082 ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1083 &self.hotshot.state_private_key
1084 }
1085}
1086
/// Per-epoch data handed to the membership at start-up by `load_start_epoch_info`.
#[derive(Clone, Debug, PartialEq)]
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// Epoch this entry belongs to.
    pub epoch: TYPES::Epoch,
    /// DRB result registered for `epoch` via `add_drb_result`.
    pub drb_result: DrbResult,
    /// Block header passed to `Membership::add_epoch_root` when present; entries without one
    /// skip that step.
    pub block_header: Option<TYPES::BlockHeader>,
}
1094
/// Everything `SystemContext::new_from_channels` needs to seed a node's consensus state.
///
/// Build one with [`HotShotInitializer::from_genesis`] or [`HotShotInitializer::load`].
#[derive(Clone, Debug)]
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance state handed to the `SystemContext`.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height; passed to `Leaf2::epoch` when `update_undecided` rebuilds view state.
    pub epoch_height: u64,

    /// Epoch start block, stored as supplied to the constructor.
    pub epoch_start_block: u64,

    /// Leaf used as the anchor; becomes the context's anchored leaf.
    pub anchor_leaf: Leaf2<TYPES>,

    /// Validated state associated with `anchor_leaf`.
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// State delta associated with `anchor_leaf`; `None` when built by `load`.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// View the context announces in its initial `ViewChange` event.
    pub start_view: TYPES::View,

    /// Last actioned view, forwarded to `Consensus::new`.
    pub last_actioned_view: TYPES::View,

    /// Epoch the context starts from, if any.
    pub start_epoch: Option<TYPES::Epoch>,

    /// High QC, forwarded to `Consensus::new`; its block number (if any) determines the epoch
    /// used at start-up.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next-epoch high QC, forwarded to `Consensus::new`.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Saved proposals keyed by view; `update_undecided` derives undecided leaves from the
    /// ones above the anchor view.
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; determines the upgrade lock and the current
    /// version at start-up.
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves keyed by view.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// View state for the undecided leaves, keyed by view.
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares, forwarded to `Consensus::new`.
    pub saved_vid_shares: VidShares<TYPES>,

    /// Light client state update certificate, forwarded to `Consensus::new`.
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Per-epoch information loaded into the membership at start-up.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}
1156
1157impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1158 pub async fn from_genesis<V: Versions>(
1162 instance_state: TYPES::InstanceState,
1163 epoch_height: u64,
1164 epoch_start_block: u64,
1165 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1166 ) -> Result<Self, HotShotError<TYPES>> {
1167 let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1168 let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1169
1170 Ok(Self {
1171 anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1172 anchor_state: Arc::new(validated_state),
1173 anchor_state_delta: Some(Arc::new(state_delta)),
1174 start_view: TYPES::View::new(0),
1175 start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1176 last_actioned_view: TYPES::View::new(0),
1177 saved_proposals: BTreeMap::new(),
1178 high_qc,
1179 next_epoch_high_qc: None,
1180 decided_upgrade_certificate: None,
1181 undecided_leaves: BTreeMap::new(),
1182 undecided_state: BTreeMap::new(),
1183 instance_state,
1184 saved_vid_shares: BTreeMap::new(),
1185 epoch_height,
1186 state_cert: None,
1187 epoch_start_block,
1188 start_epoch_info,
1189 })
1190 }
1191
1192 #[must_use]
1194 pub fn update_undecided(self) -> Self {
1195 let mut undecided_leaves = self.undecided_leaves.clone();
1196 let mut undecided_state = self.undecided_state.clone();
1197
1198 for proposal in self.saved_proposals.values() {
1199 if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1201 continue;
1202 }
1203
1204 undecided_leaves.insert(
1205 proposal.data.view_number(),
1206 Leaf2::from_quorum_proposal(&proposal.data),
1207 );
1208 }
1209
1210 for leaf in undecided_leaves.values() {
1211 let view_inner = ViewInner::Leaf {
1212 leaf: leaf.commit(),
1213 state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1214 delta: None,
1215 epoch: leaf.epoch(self.epoch_height),
1216 };
1217 let view = View { view_inner };
1218
1219 undecided_state.insert(leaf.view_number(), view);
1220 }
1221
1222 Self {
1223 undecided_leaves,
1224 undecided_state,
1225 ..self
1226 }
1227 }
1228
1229 #[allow(clippy::too_many_arguments)]
1237 pub fn load(
1238 instance_state: TYPES::InstanceState,
1239 epoch_height: u64,
1240 epoch_start_block: u64,
1241 start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1242 anchor_leaf: Leaf2<TYPES>,
1243 (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1244 (high_qc, next_epoch_high_qc): (
1245 QuorumCertificate2<TYPES>,
1246 Option<NextEpochQuorumCertificate2<TYPES>>,
1247 ),
1248 last_actioned_view: TYPES::View,
1249 saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1250 saved_vid_shares: VidShares<TYPES>,
1251 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1252 state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1253 ) -> Self {
1254 let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1255 anchor_leaf.block_header(),
1256 ));
1257 let anchor_state_delta = None;
1258
1259 let initializer = Self {
1260 instance_state,
1261 epoch_height,
1262 epoch_start_block,
1263 anchor_leaf,
1264 anchor_state,
1265 anchor_state_delta,
1266 high_qc,
1267 start_view,
1268 start_epoch,
1269 last_actioned_view,
1270 saved_proposals,
1271 saved_vid_shares,
1272 next_epoch_high_qc,
1273 decided_upgrade_certificate,
1274 undecided_leaves: BTreeMap::new(),
1275 undecided_state: BTreeMap::new(),
1276 state_cert,
1277 start_epoch_info,
1278 };
1279
1280 initializer.update_undecided()
1281 }
1282}
1283
1284async fn load_start_epoch_info<TYPES: NodeType>(
1285 membership: &Arc<RwLock<TYPES::Membership>>,
1286 start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1287 epoch_height: u64,
1288 epoch_start_block: u64,
1289) {
1290 let first_epoch_number =
1291 TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1292
1293 tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1294 membership
1295 .write()
1296 .await
1297 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1298
1299 let mut sorted_epoch_info = start_epoch_info.clone();
1300 sorted_epoch_info.sort_by_key(|info| info.epoch);
1301 for epoch_info in sorted_epoch_info {
1302 if let Some(block_header) = &epoch_info.block_header {
1303 tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1304
1305 Membership::add_epoch_root(Arc::clone(membership), block_header.clone())
1306 .await
1307 .unwrap_or_else(|err| {
1308 tracing::error!(
1310 "Failed to add epoch root for epoch {}: {err}",
1311 epoch_info.epoch
1312 );
1313 });
1314 }
1315 }
1316
1317 for epoch_info in start_epoch_info {
1318 tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1319 membership
1320 .write()
1321 .await
1322 .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1323 }
1324}