hotshot/
lib.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides a generic Rust implementation of the `HotShot` BFT protocol

// Documentation module
#[cfg(feature = "docs")]
pub mod documentation;
use committable::Committable;
use futures::future::{select, Either};
use hotshot_types::{
    drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
    epoch_membership::EpochMembershipCoordinator,
    message::UpgradeLock,
    simple_certificate::LightClientStateUpdateCertificate,
    traits::{
        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
        node_implementation::Versions, signature_key::StateSignatureKey,
    },
    utils::epoch_from_block_number,
};
use rand::Rng;
use vbs::version::StaticVersionType;

/// Contains traits consumed by [`SystemContext`]
pub mod traits;
/// Contains types used by the crate
pub mod types;

/// Contains the task-spawning logic for consensus and networking
pub mod tasks;
use hotshot_types::data::QuorumProposalWrapper;

/// Contains helper functions for the crate
pub mod helpers;

use std::{
    collections::{BTreeMap, HashMap},
    num::NonZeroUsize,
    sync::Arc,
    time::Duration,
};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
// Internal
/// Reexport error type
pub use hotshot_types::error::HotShotError;
use hotshot_types::{
    consensus::{
        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
        ViewInner,
    },
    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
    data::Leaf2,
    event::{EventType, LeafInfo},
    message::{DataMessage, Message, MessageKind, Proposal},
    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
    storage_metrics::StorageMetricsValue,
    traits::{
        consensus_api::ConsensusApi,
        network::ConnectedNetwork,
        node_implementation::{ConsensusTime, NodeType},
        signature_key::SignatureKey,
        states::ValidatedState,
    },
    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
    HotShotConfig,
};
/// Reexport rand crate
pub use rand;
use tokio::{spawn, time::sleep};
use tracing::{debug, instrument, trace};

// -- Re-exports
// External
use crate::{
    tasks::{add_consensus_tasks, add_network_tasks},
    traits::NodeImplementation,
    types::{Event, SystemContextHandle},
};

/// Length, in bytes, of a 512-bit hash
pub const H_512: usize = 64;
/// Length, in bytes, of a 256-bit hash
pub const H_256: usize = 32;

/// Holds the state needed to participate in `HotShot` consensus
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The public key of this node
    public_key: TYPES::SignatureKey,

    /// The private key of this node
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The private key used to sign the light client state
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration items for this hotshot instance
    pub config: HotShotConfig<TYPES>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// The metrics that the implementor is using.
    metrics: Arc<ConsensusMetricsValue>,

    /// The HotStuff implementation
    consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    instance_state: Arc<TYPES::InstanceState>,

    /// The view to enter when first starting consensus
    start_view: TYPES::View,

    /// The epoch to enter when first starting consensus
    start_epoch: Option<TYPES::Epoch>,

    /// Access to the output event stream.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// External event stream for communication with the application.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Anchored leaf provided by the initializer.
    anchored_leaf: Leaf2<TYPES>,

    /// Access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Uid for instrumentation
    pub id: u64,

    /// Reference to the internal storage for consensus data.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Shared lock for upgrade information
    pub upgrade_lock: UpgradeLock<TYPES, V>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
    for SystemContext<TYPES, I, V>
{
    #![allow(deprecated)]
    fn clone(&self) -> Self {
        Self {
            public_key: self.public_key.clone(),
            private_key: self.private_key.clone(),
            state_private_key: self.state_private_key.clone(),
            config: self.config.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            metrics: Arc::clone(&self.metrics),
            consensus: self.consensus.clone(),
            instance_state: Arc::clone(&self.instance_state),
            start_view: self.start_view,
            start_epoch: self.start_epoch,
            output_event_stream: self.output_event_stream.clone(),
            external_event_stream: self.external_event_stream.clone(),
            anchored_leaf: self.anchored_leaf.clone(),
            internal_event_stream: self.internal_event_stream.clone(),
            id: self.id,
            storage: self.storage.clone(),
            storage_metrics: Arc::clone(&self.storage_metrics),
            upgrade_lock: self.upgrade_lock.clone(),
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    #![allow(deprecated)]
    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this instead of `init` if you want to start the tasks manually.
    ///
    /// # Panics
    ///
    /// Panics if storage migration fails.
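    ///
    /// # Example
    ///
    /// A minimal sketch of manual startup. `MyTypes`, `MyImpl` and `MyVersions` are
    /// hypothetical application-provided implementations of `NodeType`,
    /// `NodeImplementation` and `Versions`, and the arguments are assumed to have been
    /// constructed elsewhere:
    ///
    /// ```ignore
    /// let hotshot = SystemContext::<MyTypes, MyImpl, MyVersions>::new(
    ///     public_key,
    ///     private_key,
    ///     state_private_key,
    ///     node_id,
    ///     config,
    ///     memberships,
    ///     network,
    ///     initializer,
    ///     ConsensusMetricsValue::default(),
    ///     storage,
    ///     StorageMetricsValue::default(),
    /// )
    /// .await;
    ///
    /// // No background tasks are running yet; spawn them and start consensus by hand.
    /// let handle = hotshot.run_tasks().await;
    /// hotshot.start_consensus().await;
    /// ```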
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Arc<Self> {
        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);

        Self::new_from_channels(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
            internal_chan,
            external_chan,
        )
        .await
    }

    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this function if you want to use some preexisting channels and to spin up the tasks
    /// and start consensus manually. Mostly useful for tests.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    pub async fn new_from_channels(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
        internal_channel: (
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
    ) -> Arc<Self> {
        debug!("Creating a new hotshot");

        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");

        let consensus_metrics = Arc::new(consensus_metrics);
        let storage_metrics = Arc::new(storage_metrics);
        let anchored_leaf = initializer.anchor_leaf;
        let instance_state = initializer.instance_state;

        let (internal_tx, mut internal_rx) = internal_channel;
        let (mut external_tx, mut external_rx) = external_channel;

        tracing::warn!(
            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
            V::Base::VERSION,
            V::Upgrade::VERSION,
        );
        tracing::warn!(
            "Loading previously decided upgrade certificate from storage: {:?}",
            initializer.decided_upgrade_certificate
        );

        let upgrade_lock =
            UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);

        debug!("Setting DRB difficulty selector in membership");
        let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);

        membership_coordinator
            .set_drb_difficulty_selector(drb_difficulty_selector)
            .await;

        // Allow overflow on the external channel, otherwise sending to it may block.
        external_rx.set_overflow(true);

        // Allow overflow on the internal channel as well. We don't want to block consensus if we
        // have a slow receiver.
        internal_rx.set_overflow(true);

        // Get the validated state for the anchor leaf from the initializer.
        let validated_state = initializer.anchor_state;

        load_start_epoch_info(
            membership_coordinator.membership(),
            &initializer.start_epoch_info,
            config.epoch_height,
            config.epoch_start_block,
        )
        .await;

        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
            TYPES::Epoch::new(epoch_from_block_number(
                block_number + 1,
                config.epoch_height,
            ))
        });

        // Insert the validated state into the state map.
        let mut validated_state_map = BTreeMap::default();
        validated_state_map.insert(
            anchored_leaf.view_number(),
            View {
                view_inner: ViewInner::Leaf {
                    leaf: anchored_leaf.commit(),
                    state: Arc::clone(&validated_state),
                    delta: initializer.anchor_state_delta,
                    epoch,
                },
            },
        );
        for (view_num, inner) in initializer.undecided_state {
            validated_state_map.insert(view_num, inner);
        }

        let mut saved_leaves = HashMap::new();
        let mut saved_payloads = BTreeMap::new();
        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());

        for (_, leaf) in initializer.undecided_leaves {
            saved_leaves.insert(leaf.commit(), leaf.clone());
        }
        if let Some(payload) = anchored_leaf.block_payload() {
            let metadata = anchored_leaf.block_header().metadata().clone();
            saved_payloads.insert(
                anchored_leaf.view_number(),
                Arc::new(PayloadWithMetadata { payload, metadata }),
            );
        }

        let consensus = Consensus::new(
            validated_state_map,
            Some(initializer.saved_vid_shares),
            anchored_leaf.view_number(),
            epoch,
            anchored_leaf.view_number(),
            anchored_leaf.view_number(),
            initializer.last_actioned_view,
            initializer.saved_proposals,
            saved_leaves,
            saved_payloads,
            initializer.high_qc,
            initializer.next_epoch_high_qc,
            Arc::clone(&consensus_metrics),
            config.epoch_height,
            initializer.state_cert,
            config.drb_difficulty,
            config.drb_upgrade_difficulty,
        );

        let consensus = Arc::new(RwLock::new(consensus));

        // This makes it so we won't block on broadcasting if there is not a receiver.
        // Our own copy of the receiver is inactive so it doesn't count.
        external_tx.set_await_active(false);

        let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
            id: nonce,
            consensus: OuterConsensus::new(consensus),
            instance_state: Arc::new(instance_state),
            public_key,
            private_key,
            state_private_key,
            config,
            start_view: initializer.start_view,
            start_epoch: initializer.start_epoch,
            network,
            membership_coordinator,
            metrics: Arc::clone(&consensus_metrics),
            internal_event_stream: (internal_tx, internal_rx.deactivate()),
            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
            external_event_stream: (external_tx, external_rx.deactivate()),
            anchored_leaf: anchored_leaf.clone(),
            storage,
            storage_metrics,
            upgrade_lock,
        });

        inner
    }

    /// "Starts" consensus by sending `Qc2Formed` and `ViewChange` events
    ///
    /// # Panics
    /// Panics if sending genesis fails
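    ///
    /// # Example
    ///
    /// A minimal sketch; `hotshot` is assumed to be an `Arc<SystemContext>` returned by
    /// [`SystemContext::new`]:
    ///
    /// ```ignore
    /// // Spawn the background tasks first, then kick off consensus.
    /// let handle = hotshot.run_tasks().await;
    /// hotshot.start_consensus().await;
    /// ```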
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn start_consensus(&self) {
        #[cfg(all(feature = "rewind", not(debug_assertions)))]
        compile_error!("Cannot run rewind in production builds!");

        debug!("Starting Consensus");
        let consensus = self.consensus.read().await;

        let first_epoch = option_epoch_from_block_number::<TYPES>(
            V::Base::VERSION >= V::Epochs::VERSION,
            self.config.epoch_start_block,
            self.config.epoch_height,
        );
        // `start_epoch` comes from the initializer, it might be the last seen epoch before restart
        // `first_epoch` is the first epoch after the transition to the epoch version
        // `initial_view_change_epoch` is the greater of the two, we use it with the initial view change
        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
                self.start_view,
                initial_view_change_epoch,
            )))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
                    self.start_view, initial_view_change_epoch,
                )
            });

        // Clone the event stream that we send the timeout event to
        let event_stream = self.internal_event_stream.0.clone();
        let next_view_timeout = self.config.next_view_timeout;
        let start_view = self.start_view;
        let start_epoch = self.start_epoch;

        // Spawn a task that will sleep for the next view timeout and then send a timeout event
        // if not cancelled
        spawn({
            async move {
                sleep(Duration::from_millis(next_view_timeout)).await;
                broadcast_event(
                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
                    &event_stream,
                )
                .await;
            }
        });
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
                consensus.high_qc().clone(),
            ))))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
                    consensus.high_qc()
                )
            });

        {
            // Some applications seem to expect a leaf decide event for the genesis leaf,
            // which contains only that leaf and nothing else.
            if self.anchored_leaf.view_number() == TYPES::View::genesis() {
                let (validated_state, state_delta) =
                    TYPES::ValidatedState::genesis(&self.instance_state);

                let qc = Arc::new(
                    QuorumCertificate2::genesis::<V>(
                        &validated_state,
                        self.instance_state.as_ref(),
                    )
                    .await,
                );

                broadcast_event(
                    Event {
                        view_number: self.anchored_leaf.view_number(),
                        event: EventType::Decide {
                            leaf_chain: Arc::new(vec![LeafInfo::new(
                                self.anchored_leaf.clone(),
                                Arc::new(validated_state),
                                Some(Arc::new(state_delta)),
                                None,
                                None,
                            )]),
                            qc,
                            block_size: None,
                        },
                    },
                    &self.external_event_stream.0,
                )
                .await;
            }
        }
    }

    /// Emit an external event
    async fn send_external_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_external_event");
        broadcast_event(event, &self.external_event_stream.0).await;
    }

    /// Publishes a transaction asynchronously to the network.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be serialized or if the membership for the
    /// current epoch cannot be retrieved. Failure to actually publish the message to the
    /// network is not reported as an error.
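    ///
    /// # Example
    ///
    /// A minimal sketch; `hotshot` is assumed to be a running [`SystemContext`] and `tx`
    /// an application-defined transaction:
    ///
    /// ```ignore
    /// hotshot
    ///     .publish_transaction_async(tx)
    ///     .await
    ///     .expect("transaction could not be serialized");
    /// ```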
    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
    pub async fn publish_transaction_async(
        &self,
        transaction: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        trace!("Adding transaction to our own queue");

        let api = self.clone();

        let consensus_reader = api.consensus.read().await;
        let view_number = consensus_reader.cur_view();
        let epoch = consensus_reader.cur_epoch();
        drop(consensus_reader);

        // Wrap up a message
        let message_kind: DataMessage<TYPES> =
            DataMessage::SubmitTransaction(transaction.clone(), view_number);
        let message = Message {
            sender: api.public_key.clone(),
            kind: MessageKind::from(message_kind),
        };

        let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
        })?;

        let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
            Ok(m) => m,
            Err(e) => return Err(HotShotError::InvalidState(e.message)),
        };

        spawn(async move {
            let memberships_da_committee_members = membership
                .da_committee_members(view_number)
                .await
                .iter()
                .cloned()
                .collect();

            join! {
                // TODO We should have a function that can return a network error if there is one
                // but first we'd need to ensure our network implementations can support that
                // (and not hang instead)

                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
                // and will be updated to be part of SystemContext. I wanted to use associated
                // constants in NodeType, but that seems to be unavailable in the current Rust.
                api
                    .network.da_broadcast_message(
                        serialized_message,
                        memberships_da_committee_members,
                        BroadcastDelay::None,
                    ),
                api
                    .send_external_event(Event {
                        view_number,
                        event: EventType::Transactions {
                            transactions: vec![transaction],
                        },
                    }),
            }
        });
        Ok(())
    }

    /// Returns a copy of the consensus struct
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        Arc::clone(&self.consensus.inner_consensus)
    }

    /// Returns a copy of the instance state
    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
        Arc::clone(&self.instance_state)
    }

    /// Returns a copy of the last decided leaf
    /// # Panics
    /// Panics if the internal leaf for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.consensus.read().await.decided_leaf()
    }

    /// [Non-blocking] instantly returns a copy of the last decided leaf if
    /// it is available to be read. If not, we return `None`.
    ///
    /// # Panics
    /// Panics if the internal state for consensus is inconsistent
    #[must_use]
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.consensus.try_read().map(|guard| guard.decided_leaf())
    }

    /// Returns the last decided validated state.
    ///
    /// # Panics
    /// Panics if the internal state for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        Arc::clone(&self.consensus.read().await.decided_state())
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
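    ///
    /// # Example
    ///
    /// A minimal sketch; `hotshot` is assumed to be a running [`SystemContext`]:
    ///
    /// ```ignore
    /// if let Some(state) = hotshot.state(view_number).await {
    ///     // The view is still tracked; inspect its validated state.
    /// } else {
    ///     // The view was already decided, or can never be decided.
    /// }
    /// ```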
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.consensus.read().await.state(view).cloned()
    }

    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
    ///
    /// Assumes the networking implementation is already primed.
    ///
    /// The underlying `HotShot` instance starts out paused, and must be unpaused.
    ///
    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
    /// the `HotShot` instance will log the error and shut down.
    ///
    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
    ///
    /// # Errors
    ///
    /// Returns an error if `Self::new` fails.
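    ///
    /// # Example
    ///
    /// A minimal sketch of a full initialization, using the same hypothetical
    /// `MyTypes`/`MyImpl`/`MyVersions` and pre-built arguments as in [`Self::new`]:
    ///
    /// ```ignore
    /// let (handle, _internal_tx, _internal_rx) =
    ///     SystemContext::<MyTypes, MyImpl, MyVersions>::init(
    ///         public_key,
    ///         private_key,
    ///         state_private_key,
    ///         node_id,
    ///         config,
    ///         memberships,
    ///         network,
    ///         initializer,
    ///         ConsensusMetricsValue::default(),
    ///         storage,
    ///         StorageMetricsValue::default(),
    ///     )
    ///     .await?;
    ///
    /// // Tasks are already running; start consensus when ready.
    /// handle.hotshot.start_consensus().await;
    /// ```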
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        node_id: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Result<
        (
            SystemContextHandle<TYPES, I, V>,
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        HotShotError<TYPES>,
    > {
        let hotshot = Self::new(
            public_key,
            private_key,
            state_private_key,
            node_id,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;
        let handle = Arc::clone(&hotshot).run_tasks().await;
        let (tx, rx) = hotshot.internal_event_stream.clone();

        Ok((handle, tx, rx.activate()))
    }

    /// Return the timeout for a view of `self`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.config.next_view_timeout
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
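    ///
    /// # Example
    ///
    /// A minimal sketch; `hotshot` is assumed to be an `Arc<SystemContext>` created with
    /// [`SystemContext::new`]:
    ///
    /// ```ignore
    /// // Spawn the network and consensus tasks, obtaining a handle for
    /// // interacting with the running instance.
    /// let handle = hotshot.run_tasks().await;
    /// ```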
    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = self.external_event_stream.clone();
        let internal_event_stream = self.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: self.clone().into(),
            storage: self.storage.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            epoch_height: self.config.epoch_height,
        };

        add_network_tasks::<TYPES, I, V>(&mut handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;

        handle
    }
}

/// An async broadcast channel
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);

#[async_trait]
/// Trait for handling messages for a node with a twin copy of consensus
pub trait TwinsHandlerState<TYPES, I, V>
where
    Self: std::fmt::Debug + Send + Sync,
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
{
    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;

    /// Handle a message from either twin, forwarding it to the network task
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Fuse two channels into a single channel
    ///
    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
    fn fuse_channels(
        &'static mut self,
        left: Channel<HotShotEvent<TYPES>>,
        right: Channel<HotShotEvent<TYPES>>,
    ) -> Channel<HotShotEvent<TYPES>> {
        let send_state = Arc::new(RwLock::new(self));
        let recv_state = Arc::clone(&send_state);

        let (left_sender, mut left_receiver) = (left.0, left.1);
        let (right_sender, mut right_receiver) = (right.0, right.1);

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
            broadcast(EVENT_CHANNEL_SIZE);

        let _recv_loop_handle = spawn(async move {
            loop {
                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
                };

                let mut state = recv_state.write().await;
                let mut result = state.recv_handler(&msg).await;

                while let Some(event) = result.pop() {
                    let _ = sender_to_network.broadcast(event.into()).await;
                }
            }
        });

        let _send_loop_handle = spawn(async move {
            loop {
                if let Ok(msg) = receiver_from_network.recv().await {
                    let mut state = send_state.write().await;

                    let mut result = state.send_handler(&msg).await;

                    while let Some(event) = result.pop() {
                        match event {
                            Either::Left(msg) => {
                                let _ = left_sender.broadcast(msg.into()).await;
                            },
                            Either::Right(msg) => {
                                let _ = right_sender.broadcast(msg.into()).await;
                            },
                        }
                    }
                }
            }
        });

        (network_task_sender, network_task_receiver)
    }

    #[allow(clippy::too_many_arguments)]
    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
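    ///
    /// # Example
    ///
    /// A minimal sketch of a twins test setup, reusing the hypothetical arguments from
    /// [`SystemContext::new`]. The `&'static mut self` receiver is obtained here by
    /// leaking the handler, which is acceptable in test code:
    ///
    /// ```ignore
    /// let handler = Box::leak(Box::new(RandomTwinsHandler));
    /// let (left_handle, right_handle) = handler
    ///     .spawn_twin_handles(
    ///         public_key,
    ///         private_key,
    ///         state_private_key,
    ///         nonce,
    ///         config,
    ///         memberships,
    ///         network,
    ///         initializer,
    ///         ConsensusMetricsValue::default(),
    ///         storage,
    ///         storage_metrics,
    ///     )
    ///     .await;
    /// ```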
    async fn spawn_twin_handles(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> (
        SystemContextHandle<TYPES, I, V>,
        SystemContextHandle<TYPES, I, V>,
    ) {
        let epoch_height = config.epoch_height;
        let left_system_context = SystemContext::new(
            public_key.clone(),
            private_key.clone(),
            state_private_key.clone(),
            nonce,
            config.clone(),
            memberships.clone(),
            Arc::clone(&network),
            initializer.clone(),
            consensus_metrics.clone(),
            storage.clone(),
            storage_metrics.clone(),
        )
        .await;
        let right_system_context = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;

        // create registries for both handles
        let left_consensus_registry = ConsensusTaskRegistry::new();
        let left_network_registry = NetworkTaskRegistry::new();

        let right_consensus_registry = ConsensusTaskRegistry::new();
        let right_network_registry = NetworkTaskRegistry::new();

        // create external channels for both handles
        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let left_external_event_stream =
            (left_external_sender, left_external_receiver.deactivate());

        let (right_external_sender, right_external_receiver) =
            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let right_external_event_stream =
            (right_external_sender, right_external_receiver.deactivate());

        // create internal channels for both handles
        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let left_internal_event_stream = (
            left_internal_sender.clone(),
            left_internal_receiver.clone().deactivate(),
        );

        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let right_internal_event_stream = (
            right_internal_sender.clone(),
            right_internal_receiver.clone().deactivate(),
        );

        // create each handle
        let mut left_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: left_consensus_registry,
            network_registry: left_network_registry,
            output_event_stream: left_external_event_stream.clone(),
            internal_event_stream: left_internal_event_stream.clone(),
            hotshot: Arc::clone(&left_system_context),
            storage: left_system_context.storage.clone(),
            network: Arc::clone(&left_system_context.network),
            membership_coordinator: left_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        let mut right_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: right_consensus_registry,
            network_registry: right_network_registry,
            output_event_stream: right_external_event_stream.clone(),
            internal_event_stream: right_internal_event_stream.clone(),
            hotshot: Arc::clone(&right_system_context),
            storage: right_system_context.storage.clone(),
            network: Arc::clone(&right_system_context.network),
            membership_coordinator: right_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        // add consensus tasks to each handle, using their individual internal event streams
        add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;

        // fuse the event streams from both handles before initializing the network tasks
        let fused_internal_event_stream = self.fuse_channels(
            (left_internal_sender, left_internal_receiver),
            (right_internal_sender, right_internal_receiver),
        );

        // swap out the event stream on the left handle
        left_handle.internal_event_stream = (
            fused_internal_event_stream.0,
            fused_internal_event_stream.1.deactivate(),
        );

        // add the network tasks to the left handle. note: because the left handle has the fused
        // event stream, the network tasks on the left handle will handle messages from both handles.
        add_network_tasks::<TYPES, I, V>(&mut left_handle).await;

        // revert to the original event stream on the left handle, for any applications that want
        // to listen to it
        left_handle.internal_event_stream = left_internal_event_stream.clone();

        (left_handle, right_handle)
    }
}

#[derive(Debug)]
/// A `TwinsHandlerState` that randomly forwards a message to either twin,
/// and returns messages from both.
pub struct RandomTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for RandomTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        let random: bool = rand::thread_rng().gen();

        #[allow(clippy::match_bool)]
        match random {
            true => vec![Either::Left(event.clone())],
            false => vec![Either::Right(event.clone())],
        }
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

#[derive(Debug)]
/// A `TwinsHandlerState` that forwards each message to both twins,
/// and returns messages from each of them.
pub struct DoubleTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for DoubleTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        vec![Either::Left(event.clone()), Either::Right(event.clone())]
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
    for SystemContextHandle<TYPES, I, V>
{
    fn total_nodes(&self) -> NonZeroUsize {
        self.hotshot.config.num_nodes_with_stake
    }

    fn builder_timeout(&self) -> Duration {
        self.hotshot.config.builder_timeout
    }

    async fn send_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_event");
        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
    }

    fn public_key(&self) -> &TYPES::SignatureKey {
        &self.hotshot.public_key
    }

    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
        &self.hotshot.private_key
    }

    fn state_private_key(
        &self,
    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
        &self.hotshot.state_private_key
    }
}

#[derive(Clone, Debug, PartialEq)]
/// Epoch information used when initializing a (possibly restarting) node
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// The epoch number
    pub epoch: TYPES::Epoch,
    /// The DRB result for this epoch
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// The block header of this epoch's root block, if available
    pub block_header: Option<TYPES::BlockHeader>,
}

#[derive(Clone)]
/// Initializer struct for creating the starting block
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// The anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// `ValidatedState` for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// `ValidatedState::Delta` for the anchor leaf, optional.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number, which should be equivalent to the view the node last shut down with.
    pub start_view: TYPES::View,

    /// The view we last performed an action in. An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: TYPES::View,

    /// Starting epoch number, which should be equivalent to the epoch the node last shut down with.
    pub start_epoch: Option<TYPES::Epoch>,

    /// Highest QC that was seen; for genesis, it's the genesis QC. In the non-genesis case it
    /// should be for a view greater than the anchor leaf's view number, because we must have
    /// seen higher QCs to decide on the leaf.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during the epoch transition after a restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out, to provide to others for catchup
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on. These allow a restarting node
    /// to vote and propose right away if it didn't miss anything while down.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// Not-yet-decided state
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate, if there is one
    pub state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,

    /// Saved epoch information. This must be sorted in ascending order by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}

impl<TYPES: NodeType> HotShotInitializer<TYPES> {
    /// Initialize from genesis
    /// # Errors
    /// Returns an error if we are unable to apply the genesis block to the default state
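    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an application-provided instance state and epoch
    /// configuration taken from the node's `HotShotConfig`:
    ///
    /// ```ignore
    /// let initializer = HotShotInitializer::<MyTypes>::from_genesis::<MyVersions>(
    ///     instance_state,
    ///     config.epoch_height,
    ///     config.epoch_start_block,
    ///     vec![], // no saved epoch info when starting fresh from genesis
    /// )
    /// .await?;
    /// ```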
    pub async fn from_genesis<V: Versions>(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    ) -> Result<Self, HotShotError<TYPES>> {
        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
        let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;

        Ok(Self {
            anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
            anchor_state: Arc::new(validated_state),
            anchor_state_delta: Some(Arc::new(state_delta)),
            start_view: TYPES::View::new(0),
            start_epoch: genesis_epoch_from_version::<V, TYPES>(),
            last_actioned_view: TYPES::View::new(0),
            saved_proposals: BTreeMap::new(),
            high_qc,
            next_epoch_high_qc: None,
            decided_upgrade_certificate: None,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            instance_state,
            saved_vid_shares: BTreeMap::new(),
            epoch_height,
            state_cert: None,
            epoch_start_block,
            start_epoch_info,
        })
    }

    /// Use saved proposals to update undecided leaves and state
    #[must_use]
    pub fn update_undecided(self) -> Self {
        let mut undecided_leaves = self.undecided_leaves.clone();
        let mut undecided_state = self.undecided_state.clone();

        for proposal in self.saved_proposals.values() {
            // skip proposals unless they're newer than the anchor leaf
            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
                continue;
            }

            undecided_leaves.insert(
                proposal.data.view_number(),
                Leaf2::from_quorum_proposal(&proposal.data),
            );
        }

        for leaf in undecided_leaves.values() {
            let view_inner = ViewInner::Leaf {
                leaf: leaf.commit(),
                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
                delta: None,
                epoch: leaf.epoch(self.epoch_height),
            };
            let view = View { view_inner };

            undecided_state.insert(leaf.view_number(), view);
        }

        Self {
            undecided_leaves,
            undecided_state,
            ..self
        }
    }

    /// Create a `HotShotInitializer` from the given information.
    ///
    /// This function uses the anchor leaf to set the initial validated state,
    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
    ///
    /// If you are able to or would prefer to set these yourself,
    /// you should use the `HotShotInitializer` constructor directly.
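    ///
    /// # Example
    ///
    /// A minimal sketch of restoring after a restart, with hypothetical values read back
    /// from an application-defined persistence layer:
    ///
    /// ```ignore
    /// let initializer = HotShotInitializer::<MyTypes>::load(
    ///     instance_state,
    ///     config.epoch_height,
    ///     config.epoch_start_block,
    ///     saved_epoch_info,
    ///     anchor_leaf,
    ///     (start_view, start_epoch),
    ///     (high_qc, next_epoch_high_qc),
    ///     last_actioned_view,
    ///     saved_proposals,
    ///     saved_vid_shares,
    ///     decided_upgrade_certificate,
    ///     state_cert,
    /// );
    /// ```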
    #[allow(clippy::too_many_arguments)]
    pub fn load(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
        anchor_leaf: Leaf2<TYPES>,
        (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
        (high_qc, next_epoch_high_qc): (
            QuorumCertificate2<TYPES>,
            Option<NextEpochQuorumCertificate2<TYPES>>,
        ),
        last_actioned_view: TYPES::View,
        saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
        saved_vid_shares: VidShares<TYPES>,
        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
        state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
    ) -> Self {
        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
            anchor_leaf.block_header(),
        ));
        let anchor_state_delta = None;

        let initializer = Self {
            instance_state,
            epoch_height,
            epoch_start_block,
            anchor_leaf,
            anchor_state,
            anchor_state_delta,
            high_qc,
            start_view,
            start_epoch,
            last_actioned_view,
            saved_proposals,
            saved_vid_shares,
            next_epoch_high_qc,
            decided_upgrade_certificate,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            state_cert,
            start_epoch_info,
        };

        initializer.update_undecided()
    }
}

/// Load the epoch information saved by the initializer into the membership
async fn load_start_epoch_info<TYPES: NodeType>(
    membership: &Arc<RwLock<TYPES::Membership>>,
    start_epoch_info: &[InitializerEpochInfo<TYPES>],
    epoch_height: u64,
    epoch_start_block: u64,
) {
    let first_epoch_number =
        TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));

    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
    membership
        .write()
        .await
        .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

    for epoch_info in start_epoch_info {
        if let Some(block_header) = &epoch_info.block_header {
            tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);

            Membership::add_epoch_root(
                Arc::clone(membership),
                epoch_info.epoch,
                block_header.clone(),
            )
            .await
            .unwrap_or_else(|err| {
                // REVIEW NOTE: Should we panic here? A failure here seems like it should be fatal
                tracing::error!(
                    "Failed to add epoch root for epoch {}: {err}",
                    epoch_info.epoch
                );
            });
        }
    }

    for epoch_info in start_epoch_info {
        tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
        membership
            .write()
            .await
            .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
    }
}