hotshot/
lib.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a generic rust implementation of the `HotShot` BFT protocol
8//!
9
10// Documentation module
11#[cfg(feature = "docs")]
12pub mod documentation;
13use committable::Committable;
14use futures::future::{select, Either};
15use hotshot_types::{
16    drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
17    epoch_membership::EpochMembershipCoordinator,
18    message::UpgradeLock,
19    simple_certificate::LightClientStateUpdateCertificateV2,
20    traits::{
21        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
22        node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
23    },
24    utils::epoch_from_block_number,
25};
26use rand::Rng;
27use vbs::version::StaticVersionType;
28
29/// Contains traits consumed by [`SystemContext`]
30pub mod traits;
31/// Contains types used by the crate
32pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36
37/// Contains helper functions for the crate
38pub mod helpers;
39
40use std::{
41    collections::{BTreeMap, HashMap},
42    num::NonZeroUsize,
43    sync::Arc,
44    time::Duration,
45};
46
47use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
48use async_lock::RwLock;
49use async_trait::async_trait;
50use futures::join;
51use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
52use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
53// Internal
54/// Reexport error type
55pub use hotshot_types::error::HotShotError;
56use hotshot_types::{
57    consensus::{
58        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
59        ViewInner,
60    },
61    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
62    data::Leaf2,
63    event::{EventType, LeafInfo},
64    message::{DataMessage, Message, MessageKind, Proposal},
65    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
66    storage_metrics::StorageMetricsValue,
67    traits::{
68        consensus_api::ConsensusApi,
69        network::ConnectedNetwork,
70        node_implementation::{ConsensusTime, NodeType},
71        signature_key::SignatureKey,
72        states::ValidatedState,
73    },
74    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
75    HotShotConfig,
76};
77/// Reexport rand crate
78pub use rand;
79use tokio::{spawn, time::sleep};
80use tracing::{debug, instrument, trace};
81
// -- Re-exports
83// External
84use crate::{
85    tasks::{add_consensus_tasks, add_network_tasks},
86    traits::NodeImplementation,
87    types::{Event, SystemContextHandle},
88};
89
/// Number of bytes in a 512-bit hash
pub const H_512: usize = 512 / 8;
/// Number of bytes in a 256-bit hash
pub const H_256: usize = 256 / 8;
94
95/// Holds the state needed to participate in `HotShot` consensus
96pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
97    /// The public key of this node
98    public_key: TYPES::SignatureKey,
99
100    /// The private key of this node
101    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
102
103    /// The private key to sign the light client state
104    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106    /// Configuration items for this hotshot instance
107    pub config: HotShotConfig<TYPES>,
108
109    /// The underlying network
110    pub network: Arc<I::Network>,
111
112    /// Memberships used by consensus
113    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
114
115    /// the metrics that the implementor is using.
116    metrics: Arc<ConsensusMetricsValue>,
117
118    /// The hotstuff implementation
119    consensus: OuterConsensus<TYPES>,
120
121    /// Immutable instance state
122    instance_state: Arc<TYPES::InstanceState>,
123
124    /// The view to enter when first starting consensus
125    start_view: TYPES::View,
126
127    /// The epoch to enter when first starting consensus
128    start_epoch: Option<TYPES::Epoch>,
129
130    /// Access to the output event stream.
131    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
132
133    /// External event stream for communication with the application.
134    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
135
136    /// Anchored leaf provided by the initializer.
137    anchored_leaf: Leaf2<TYPES>,
138
139    /// access to the internal event stream, in case we need to, say, shut something down
140    #[allow(clippy::type_complexity)]
141    internal_event_stream: (
142        Sender<Arc<HotShotEvent<TYPES>>>,
143        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
144    ),
145
146    /// uid for instrumentation
147    pub id: u64,
148
149    /// Reference to the internal storage for consensus datum.
150    pub storage: I::Storage,
151
152    /// Storage metrics
153    pub storage_metrics: Arc<StorageMetricsValue>,
154
155    /// shared lock for upgrade information
156    pub upgrade_lock: UpgradeLock<TYPES, V>,
157}
158impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
159    for SystemContext<TYPES, I, V>
160{
161    #![allow(deprecated)]
162    fn clone(&self) -> Self {
163        Self {
164            public_key: self.public_key.clone(),
165            private_key: self.private_key.clone(),
166            state_private_key: self.state_private_key.clone(),
167            config: self.config.clone(),
168            network: Arc::clone(&self.network),
169            membership_coordinator: self.membership_coordinator.clone(),
170            metrics: Arc::clone(&self.metrics),
171            consensus: self.consensus.clone(),
172            instance_state: Arc::clone(&self.instance_state),
173            start_view: self.start_view,
174            start_epoch: self.start_epoch,
175            output_event_stream: self.output_event_stream.clone(),
176            external_event_stream: self.external_event_stream.clone(),
177            anchored_leaf: self.anchored_leaf.clone(),
178            internal_event_stream: self.internal_event_stream.clone(),
179            id: self.id,
180            storage: self.storage.clone(),
181            storage_metrics: Arc::clone(&self.storage_metrics),
182            upgrade_lock: self.upgrade_lock.clone(),
183        }
184    }
185}
186
187impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
188    #![allow(deprecated)]
189    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
190    ///
191    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
192    /// well.
193    ///
194    /// Use this instead of `init` if you want to start the tasks manually
195    ///
196    /// # Panics
197    ///
198    /// Panics if storage migration fails.
199    #[allow(clippy::too_many_arguments)]
200    pub async fn new(
201        public_key: TYPES::SignatureKey,
202        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
203        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
204        nonce: u64,
205        config: HotShotConfig<TYPES>,
206        memberships: EpochMembershipCoordinator<TYPES>,
207        network: Arc<I::Network>,
208        initializer: HotShotInitializer<TYPES>,
209        consensus_metrics: ConsensusMetricsValue,
210        storage: I::Storage,
211        storage_metrics: StorageMetricsValue,
212    ) -> Arc<Self> {
213        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216        Self::new_from_channels(
217            public_key,
218            private_key,
219            state_private_key,
220            nonce,
221            config,
222            memberships,
223            network,
224            initializer,
225            consensus_metrics,
226            storage,
227            storage_metrics,
228            internal_chan,
229            external_chan,
230        )
231        .await
232    }
233
234    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
235    ///
236    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
237    /// well.
238    ///
239    /// Use this function if you want to use some preexisting channels and to spin up the tasks
240    /// and start consensus manually.  Mostly useful for tests
241    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
242    pub async fn new_from_channels(
243        public_key: TYPES::SignatureKey,
244        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
245        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
246        nonce: u64,
247        config: HotShotConfig<TYPES>,
248        membership_coordinator: EpochMembershipCoordinator<TYPES>,
249        network: Arc<I::Network>,
250        initializer: HotShotInitializer<TYPES>,
251        consensus_metrics: ConsensusMetricsValue,
252        storage: I::Storage,
253        storage_metrics: StorageMetricsValue,
254        internal_channel: (
255            Sender<Arc<HotShotEvent<TYPES>>>,
256            Receiver<Arc<HotShotEvent<TYPES>>>,
257        ),
258        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
259    ) -> Arc<Self> {
260        debug!("Creating a new hotshot");
261
262        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
263
264        let consensus_metrics = Arc::new(consensus_metrics);
265        let storage_metrics = Arc::new(storage_metrics);
266        let anchored_leaf = initializer.anchor_leaf;
267        let instance_state = initializer.instance_state;
268
269        let (internal_tx, mut internal_rx) = internal_channel;
270        let (mut external_tx, mut external_rx) = external_channel;
271
272        tracing::warn!(
273            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
274            V::Base::VERSION,
275            V::Upgrade::VERSION,
276        );
277        tracing::warn!(
278            "Loading previously decided upgrade certificate from storage: {:?}",
279            initializer.decided_upgrade_certificate
280        );
281
282        let upgrade_lock =
283            UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);
284
285        debug!("Setting DRB difficulty selector in membership");
286        let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);
287
288        membership_coordinator
289            .set_drb_difficulty_selector(drb_difficulty_selector)
290            .await;
291
292        // Allow overflow on the external channel, otherwise sending to it may block.
293        external_rx.set_overflow(true);
294
295        // Allow overflow on the internal channel as well. We don't want to block consensus if we
296        // have a slow receiver
297        internal_rx.set_overflow(true);
298
299        // Get the validated state from the initializer or construct an incomplete one from the
300        // block header.
301        let validated_state = initializer.anchor_state;
302
303        load_start_epoch_info(
304            membership_coordinator.membership(),
305            &initializer.start_epoch_info,
306            config.epoch_height,
307            config.epoch_start_block,
308        )
309        .await;
310
311        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
312        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
313            TYPES::Epoch::new(epoch_from_block_number(
314                block_number + 1,
315                config.epoch_height,
316            ))
317        });
318
319        // Insert the validated state to state map.
320        let mut validated_state_map = BTreeMap::default();
321        validated_state_map.insert(
322            anchored_leaf.view_number(),
323            View {
324                view_inner: ViewInner::Leaf {
325                    leaf: anchored_leaf.commit(),
326                    state: Arc::clone(&validated_state),
327                    delta: initializer.anchor_state_delta,
328                    epoch,
329                },
330            },
331        );
332        for (view_num, inner) in initializer.undecided_state {
333            validated_state_map.insert(view_num, inner);
334        }
335
336        let mut saved_leaves = HashMap::new();
337        let mut saved_payloads = BTreeMap::new();
338        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
339
340        for (_, leaf) in initializer.undecided_leaves {
341            saved_leaves.insert(leaf.commit(), leaf.clone());
342        }
343        if let Some(payload) = anchored_leaf.block_payload() {
344            let metadata = anchored_leaf.block_header().metadata().clone();
345            saved_payloads.insert(
346                anchored_leaf.view_number(),
347                Arc::new(PayloadWithMetadata { payload, metadata }),
348            );
349        }
350
351        let consensus = Consensus::new(
352            validated_state_map,
353            Some(initializer.saved_vid_shares),
354            anchored_leaf.view_number(),
355            epoch,
356            anchored_leaf.view_number(),
357            anchored_leaf.view_number(),
358            initializer.last_actioned_view,
359            initializer.saved_proposals,
360            saved_leaves,
361            saved_payloads,
362            initializer.high_qc,
363            initializer.next_epoch_high_qc,
364            Arc::clone(&consensus_metrics),
365            config.epoch_height,
366            initializer.state_cert,
367            config.drb_difficulty,
368            config.drb_upgrade_difficulty,
369        );
370
371        let consensus = Arc::new(RwLock::new(consensus));
372
373        if let Some(epoch) = epoch {
374            // trigger catchup for the current and next epoch if needed
375            let _ = membership_coordinator
376                .membership_for_epoch(Some(epoch))
377                .await;
378            let _ = membership_coordinator
379                .membership_for_epoch(Some(epoch + 1))
380                .await;
381
382            if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
383                tracing::error!("Writing DRB result for epoch {}", epoch + 1);
384                if let Ok(mem) = membership_coordinator
385                    .stake_table_for_epoch(Some(epoch + 1))
386                    .await
387                {
388                    mem.add_drb_result(drb_result).await;
389                }
390            }
391        }
392
393        // This makes it so we won't block on broadcasting if there is not a receiver
394        // Our own copy of the receiver is inactive so it doesn't count.
395        external_tx.set_await_active(false);
396
397        let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
398            id: nonce,
399            consensus: OuterConsensus::new(consensus),
400            instance_state: Arc::new(instance_state),
401            public_key,
402            private_key,
403            state_private_key,
404            config,
405            start_view: initializer.start_view,
406            start_epoch: initializer.start_epoch,
407            network,
408            membership_coordinator,
409            metrics: Arc::clone(&consensus_metrics),
410            internal_event_stream: (internal_tx, internal_rx.deactivate()),
411            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
412            external_event_stream: (external_tx, external_rx.deactivate()),
413            anchored_leaf: anchored_leaf.clone(),
414            storage,
415            storage_metrics,
416            upgrade_lock,
417        });
418
419        inner
420    }
421
422    /// "Starts" consensus by sending a `Qc2Formed`, `ViewChange` events
423    ///
424    /// # Panics
425    /// Panics if sending genesis fails
426    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
427    pub async fn start_consensus(&self) {
428        #[cfg(all(feature = "rewind", not(debug_assertions)))]
429        compile_error!("Cannot run rewind in production builds!");
430
431        debug!("Starting Consensus");
432        let consensus = self.consensus.read().await;
433
434        let first_epoch = option_epoch_from_block_number::<TYPES>(
435            V::Base::VERSION >= V::Epochs::VERSION,
436            self.config.epoch_start_block,
437            self.config.epoch_height,
438        );
439        // `start_epoch` comes from the initializer, it might be the last seen epoch before restart
440        // `first_epoch` is the first epoch after the transition to the epoch version
441        // `initial_view_change_epoch` is the greater of the two, we use it with the initial view change
442        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
443        #[allow(clippy::panic)]
444        self.internal_event_stream
445            .0
446            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
447                self.start_view,
448                initial_view_change_epoch,
449            )))
450            .await
451            .unwrap_or_else(|_| {
452                panic!(
453                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
454                    self.start_view, initial_view_change_epoch,
455                )
456            });
457
458        // Clone the event stream that we send the timeout event to
459        let event_stream = self.internal_event_stream.0.clone();
460        let next_view_timeout = self.config.next_view_timeout;
461        let start_view = self.start_view;
462        let start_epoch = self.start_epoch;
463
464        // Spawn a task that will sleep for the next view timeout and then send a timeout event
465        // if not cancelled
466        spawn({
467            async move {
468                sleep(Duration::from_millis(next_view_timeout)).await;
469                broadcast_event(
470                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
471                    &event_stream,
472                )
473                .await;
474            }
475        });
476        #[allow(clippy::panic)]
477        self.internal_event_stream
478            .0
479            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
480                consensus.high_qc().clone(),
481            ))))
482            .await
483            .unwrap_or_else(|_| {
484                panic!(
485                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
486                    consensus.high_qc()
487                )
488            });
489
490        {
491            // Some applications seem to expect a leaf decide event for the genesis leaf,
492            // which contains only that leaf and nothing else.
493            if self.anchored_leaf.view_number() == TYPES::View::genesis() {
494                let (validated_state, state_delta) =
495                    TYPES::ValidatedState::genesis(&self.instance_state);
496
497                let qc = Arc::new(
498                    QuorumCertificate2::genesis::<V>(
499                        &validated_state,
500                        self.instance_state.as_ref(),
501                    )
502                    .await,
503                );
504
505                broadcast_event(
506                    Event {
507                        view_number: self.anchored_leaf.view_number(),
508                        event: EventType::Decide {
509                            leaf_chain: Arc::new(vec![LeafInfo::new(
510                                self.anchored_leaf.clone(),
511                                Arc::new(validated_state),
512                                Some(Arc::new(state_delta)),
513                                None,
514                                None,
515                            )]),
516                            qc,
517                            block_size: None,
518                        },
519                    },
520                    &self.external_event_stream.0,
521                )
522                .await;
523            }
524        }
525    }
526
527    /// Emit an external event
528    async fn send_external_event(&self, event: Event<TYPES>) {
529        debug!(?event, "send_external_event");
530        broadcast_event(event, &self.external_event_stream.0).await;
531    }
532
533    /// Publishes a transaction asynchronously to the network.
534    ///
535    /// # Errors
536    ///
537    /// Always returns Ok; does not return an error if the transaction couldn't be published to the network
538    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
539    pub async fn publish_transaction_async(
540        &self,
541        transaction: TYPES::Transaction,
542    ) -> Result<(), HotShotError<TYPES>> {
543        trace!("Adding transaction to our own queue");
544
545        let api = self.clone();
546
547        let consensus_reader = api.consensus.read().await;
548        let view_number = consensus_reader.cur_view();
549        let epoch = consensus_reader.cur_epoch();
550        drop(consensus_reader);
551
552        // Wrap up a message
553        let message_kind: DataMessage<TYPES> =
554            DataMessage::SubmitTransaction(transaction.clone(), view_number);
555        let message = Message {
556            sender: api.public_key.clone(),
557            kind: MessageKind::from(message_kind),
558        };
559
560        let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
561            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
562        })?;
563
564        let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
565            Ok(m) => m,
566            Err(e) => return Err(HotShotError::InvalidState(e.message)),
567        };
568
569        spawn(async move {
570            let memberships_da_committee_members = membership
571                .da_committee_members(view_number)
572                .await
573                .iter()
574                .cloned()
575                .collect();
576
577            join! {
578                // TODO We should have a function that can return a network error if there is one
579                // but first we'd need to ensure our network implementations can support that
580                // (and not hang instead)
581
582                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
583                // and will be updated to be part of SystemContext. I wanted to use associated
584                // constants in NodeType, but that seems to be unavailable in the current Rust.
585                api
586                    .network.da_broadcast_message(
587                        serialized_message,
588                        memberships_da_committee_members,
589                        BroadcastDelay::None,
590                    ),
591                api
592                    .send_external_event(Event {
593                        view_number,
594                        event: EventType::Transactions {
595                            transactions: vec![transaction],
596                        },
597                    }),
598            }
599        });
600        Ok(())
601    }
602
603    /// Returns a copy of the consensus struct
604    #[must_use]
605    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
606        Arc::clone(&self.consensus.inner_consensus)
607    }
608
609    /// Returns a copy of the instance state
610    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
611        Arc::clone(&self.instance_state)
612    }
613
614    /// Returns a copy of the last decided leaf
615    /// # Panics
616    /// Panics if internal leaf for consensus is inconsistent
617    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
618    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
619        self.consensus.read().await.decided_leaf()
620    }
621
622    /// [Non-blocking] instantly returns a copy of the last decided leaf if
623    /// it is available to be read. If not, we return `None`.
624    ///
625    /// # Panics
626    /// Panics if internal state for consensus is inconsistent
627    #[must_use]
628    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
629    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
630        self.consensus.try_read().map(|guard| guard.decided_leaf())
631    }
632
633    /// Returns the last decided validated state.
634    ///
635    /// # Panics
636    /// Panics if internal state for consensus is inconsistent
637    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
638    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
639        Arc::clone(&self.consensus.read().await.decided_state())
640    }
641
642    /// Get the validated state from a given `view`.
643    ///
644    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
645    /// tracks views that have not yet been decided but could be in the future. This function may
646    /// return [`None`] if the requested view has already been decided (but see
647    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
648    /// view to ever be decided.
649    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
650    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
651        self.consensus.read().await.state(view).cloned()
652    }
653
654    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
655    ///
656    /// Assumes networking implementation is already primed.
657    ///
658    /// Underlying `HotShot` instance starts out paused, and must be unpaused
659    ///
660    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
661    /// the `HotShot` instance will log the error and shut down.
662    ///
663    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
664    /// # Errors
665    ///
666    /// Can throw an error if `Self::new` fails.
667    #[allow(clippy::too_many_arguments)]
668    pub async fn init(
669        public_key: TYPES::SignatureKey,
670        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
671        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
672        node_id: u64,
673        config: HotShotConfig<TYPES>,
674        memberships: EpochMembershipCoordinator<TYPES>,
675        network: Arc<I::Network>,
676        initializer: HotShotInitializer<TYPES>,
677        consensus_metrics: ConsensusMetricsValue,
678        storage: I::Storage,
679        storage_metrics: StorageMetricsValue,
680    ) -> Result<
681        (
682            SystemContextHandle<TYPES, I, V>,
683            Sender<Arc<HotShotEvent<TYPES>>>,
684            Receiver<Arc<HotShotEvent<TYPES>>>,
685        ),
686        HotShotError<TYPES>,
687    > {
688        let hotshot = Self::new(
689            public_key,
690            private_key,
691            state_private_key,
692            node_id,
693            config,
694            memberships,
695            network,
696            initializer,
697            consensus_metrics,
698            storage,
699            storage_metrics,
700        )
701        .await;
702        let handle = Arc::clone(&hotshot).run_tasks().await;
703        let (tx, rx) = hotshot.internal_event_stream.clone();
704
705        Ok((handle, tx, rx.activate()))
706    }
707    /// return the timeout for a view for `self`
708    #[must_use]
709    pub fn next_view_timeout(&self) -> u64 {
710        self.config.next_view_timeout
711    }
712}
713
714impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
715    /// Spawn all tasks that operate on [`SystemContextHandle`].
716    ///
717    /// For a list of which tasks are being spawned, see this module's documentation.
718    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
719        let consensus_registry = ConsensusTaskRegistry::new();
720        let network_registry = NetworkTaskRegistry::new();
721
722        let output_event_stream = self.external_event_stream.clone();
723        let internal_event_stream = self.internal_event_stream.clone();
724
725        let mut handle = SystemContextHandle {
726            consensus_registry,
727            network_registry,
728            output_event_stream: output_event_stream.clone(),
729            internal_event_stream: internal_event_stream.clone(),
730            hotshot: self.clone().into(),
731            storage: self.storage.clone(),
732            network: Arc::clone(&self.network),
733            membership_coordinator: self.membership_coordinator.clone(),
734            epoch_height: self.config.epoch_height,
735        };
736
737        add_network_tasks::<TYPES, I, V>(&mut handle).await;
738        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
739
740        handle
741    }
742}
743
/// An async broadcast channel, represented as its `(sender, receiver)` pair.
///
/// Messages of type `S` travel through the channel wrapped in an [`Arc`].
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
746
747#[async_trait]
748/// Trait for handling messages for a node with a twin copy of consensus
749pub trait TwinsHandlerState<TYPES, I, V>
750where
751    Self: std::fmt::Debug + Send + Sync,
752    TYPES: NodeType,
753    I: NodeImplementation<TYPES>,
754    V: Versions,
755{
    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
    ///
    /// Every returned event is tagged with its destination: `fuse_channels` broadcasts
    /// `Either::Left` events on the left twin's channel and `Either::Right` events on the right
    /// twin's channel. Returning an empty vector forwards nothing.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
761
    /// Handle a message from either twin, forwarding it to the network task
    ///
    /// `event` is `Either::Left` when it was received from the left twin's channel and
    /// `Either::Right` when it came from the right twin's channel. `fuse_channels` broadcasts
    /// every returned event to the network task; returning an empty vector forwards nothing.
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;
767
768    /// Fuse two channels into a single channel
769    ///
770    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
771    fn fuse_channels(
772        &'static mut self,
773        left: Channel<HotShotEvent<TYPES>>,
774        right: Channel<HotShotEvent<TYPES>>,
775    ) -> Channel<HotShotEvent<TYPES>> {
776        let send_state = Arc::new(RwLock::new(self));
777        let recv_state = Arc::clone(&send_state);
778
779        let (left_sender, mut left_receiver) = (left.0, left.1);
780        let (right_sender, mut right_receiver) = (right.0, right.1);
781
782        // channel to the network task
783        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
784        // channel from the network task
785        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
786            broadcast(EVENT_CHANNEL_SIZE);
787
788        let _recv_loop_handle = spawn(async move {
789            loop {
790                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
791                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
792                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
793                };
794
795                let mut state = recv_state.write().await;
796                let mut result = state.recv_handler(&msg).await;
797
798                while let Some(event) = result.pop() {
799                    let _ = sender_to_network.broadcast(event.into()).await;
800                }
801            }
802        });
803
804        let _send_loop_handle = spawn(async move {
805            loop {
806                if let Ok(msg) = receiver_from_network.recv().await {
807                    let mut state = send_state.write().await;
808
809                    let mut result = state.send_handler(&msg).await;
810
811                    while let Some(event) = result.pop() {
812                        match event {
813                            Either::Left(msg) => {
814                                let _ = left_sender.broadcast(msg.into()).await;
815                            },
816                            Either::Right(msg) => {
817                                let _ = right_sender.broadcast(msg.into()).await;
818                            },
819                        }
820                    }
821                }
822            }
823        });
824
825        (network_task_sender, network_task_receiver)
826    }
827
828    #[allow(clippy::too_many_arguments)]
829    /// Spawn all tasks that operate on [`SystemContextHandle`].
830    ///
831    /// For a list of which tasks are being spawned, see this module's documentation.
832    async fn spawn_twin_handles(
833        &'static mut self,
834        public_key: TYPES::SignatureKey,
835        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
836        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
837        nonce: u64,
838        config: HotShotConfig<TYPES>,
839        memberships: EpochMembershipCoordinator<TYPES>,
840        network: Arc<I::Network>,
841        initializer: HotShotInitializer<TYPES>,
842        consensus_metrics: ConsensusMetricsValue,
843        storage: I::Storage,
844        storage_metrics: StorageMetricsValue,
845    ) -> (
846        SystemContextHandle<TYPES, I, V>,
847        SystemContextHandle<TYPES, I, V>,
848    ) {
849        let epoch_height = config.epoch_height;
850        let left_system_context = SystemContext::new(
851            public_key.clone(),
852            private_key.clone(),
853            state_private_key.clone(),
854            nonce,
855            config.clone(),
856            memberships.clone(),
857            Arc::clone(&network),
858            initializer.clone(),
859            consensus_metrics.clone(),
860            storage.clone(),
861            storage_metrics.clone(),
862        )
863        .await;
864        let right_system_context = SystemContext::new(
865            public_key,
866            private_key,
867            state_private_key,
868            nonce,
869            config,
870            memberships,
871            network,
872            initializer,
873            consensus_metrics,
874            storage,
875            storage_metrics,
876        )
877        .await;
878
879        // create registries for both handles
880        let left_consensus_registry = ConsensusTaskRegistry::new();
881        let left_network_registry = NetworkTaskRegistry::new();
882
883        let right_consensus_registry = ConsensusTaskRegistry::new();
884        let right_network_registry = NetworkTaskRegistry::new();
885
886        // create external channels for both handles
887        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
888        let left_external_event_stream =
889            (left_external_sender, left_external_receiver.deactivate());
890
891        let (right_external_sender, right_external_receiver) =
892            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
893        let right_external_event_stream =
894            (right_external_sender, right_external_receiver.deactivate());
895
896        // create internal channels for both handles
897        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
898        let left_internal_event_stream = (
899            left_internal_sender.clone(),
900            left_internal_receiver.clone().deactivate(),
901        );
902
903        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
904        let right_internal_event_stream = (
905            right_internal_sender.clone(),
906            right_internal_receiver.clone().deactivate(),
907        );
908
909        // create each handle
910        let mut left_handle = SystemContextHandle::<_, I, _> {
911            consensus_registry: left_consensus_registry,
912            network_registry: left_network_registry,
913            output_event_stream: left_external_event_stream.clone(),
914            internal_event_stream: left_internal_event_stream.clone(),
915            hotshot: Arc::clone(&left_system_context),
916            storage: left_system_context.storage.clone(),
917            network: Arc::clone(&left_system_context.network),
918            membership_coordinator: left_system_context.membership_coordinator.clone(),
919            epoch_height,
920        };
921
922        let mut right_handle = SystemContextHandle::<_, I, _> {
923            consensus_registry: right_consensus_registry,
924            network_registry: right_network_registry,
925            output_event_stream: right_external_event_stream.clone(),
926            internal_event_stream: right_internal_event_stream.clone(),
927            hotshot: Arc::clone(&right_system_context),
928            storage: right_system_context.storage.clone(),
929            network: Arc::clone(&right_system_context.network),
930            membership_coordinator: right_system_context.membership_coordinator.clone(),
931            epoch_height,
932        };
933
934        // add consensus tasks to each handle, using their individual internal event streams
935        add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
936        add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;
937
938        // fuse the event streams from both handles before initializing the network tasks
939        let fused_internal_event_stream = self.fuse_channels(
940            (left_internal_sender, left_internal_receiver),
941            (right_internal_sender, right_internal_receiver),
942        );
943
944        // swap out the event stream on the left handle
945        left_handle.internal_event_stream = (
946            fused_internal_event_stream.0,
947            fused_internal_event_stream.1.deactivate(),
948        );
949
950        // add the network tasks to the left handle. note: because the left handle has the fused event stream, the network tasks on the left handle will handle messages from both handles.
951        add_network_tasks::<TYPES, I, V>(&mut left_handle).await;
952
953        // revert to the original event stream on the left handle, for any applications that want to listen to it
954        left_handle.internal_event_stream = left_internal_event_stream.clone();
955
956        (left_handle, right_handle)
957    }
958}
959
960#[derive(Debug)]
961/// A `TwinsHandlerState` that randomly forwards a message to either twin,
962/// and returns messages from both.
963pub struct RandomTwinsHandler;
964
965#[async_trait]
966impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
967    for RandomTwinsHandler
968{
969    async fn send_handler(
970        &mut self,
971        event: &HotShotEvent<TYPES>,
972    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
973        let random: bool = rand::thread_rng().gen();
974
975        #[allow(clippy::match_bool)]
976        match random {
977            true => vec![Either::Left(event.clone())],
978            false => vec![Either::Right(event.clone())],
979        }
980    }
981
982    async fn recv_handler(
983        &mut self,
984        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
985    ) -> Vec<HotShotEvent<TYPES>> {
986        match event {
987            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
988        }
989    }
990}
991
992#[derive(Debug)]
993/// A `TwinsHandlerState` that forwards each message to both twins,
994/// and returns messages from each of them.
995pub struct DoubleTwinsHandler;
996
997#[async_trait]
998impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
999    for DoubleTwinsHandler
1000{
1001    async fn send_handler(
1002        &mut self,
1003        event: &HotShotEvent<TYPES>,
1004    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1005        vec![Either::Left(event.clone()), Either::Right(event.clone())]
1006    }
1007
1008    async fn recv_handler(
1009        &mut self,
1010        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1011    ) -> Vec<HotShotEvent<TYPES>> {
1012        match event {
1013            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1014        }
1015    }
1016}
1017
1018#[async_trait]
1019impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
1020    for SystemContextHandle<TYPES, I, V>
1021{
1022    fn total_nodes(&self) -> NonZeroUsize {
1023        self.hotshot.config.num_nodes_with_stake
1024    }
1025
1026    fn builder_timeout(&self) -> Duration {
1027        self.hotshot.config.builder_timeout
1028    }
1029
1030    async fn send_event(&self, event: Event<TYPES>) {
1031        debug!(?event, "send_event");
1032        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1033    }
1034
1035    fn public_key(&self) -> &TYPES::SignatureKey {
1036        &self.hotshot.public_key
1037    }
1038
1039    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1040        &self.hotshot.private_key
1041    }
1042
1043    fn state_private_key(
1044        &self,
1045    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1046        &self.hotshot.state_private_key
1047    }
1048}
1049
#[derive(Clone, Debug, PartialEq)]
/// Per-epoch data handed to the node at start-up.
///
/// `load_start_epoch_info` consumes a list of these: it registers `block_header`
/// (when present) through `add_epoch_root` and `drb_result` through `add_drb_result`,
/// both keyed by `epoch`.
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// The epoch this entry describes.
    pub epoch: TYPES::Epoch,
    /// DRB result recorded for `epoch`.
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// Block header registered as the epoch root for `epoch`; entries with `None`
    /// are skipped when epoch roots are added.
    pub block_header: Option<TYPES::BlockHeader>,
}
1057
#[derive(Clone, Debug)]
/// Initializer struct describing the state a node starts from.
///
/// Built either by `from_genesis` (fresh start) or by `load` (from previously
/// saved data); all fields are public, so it can also be constructed directly.
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height; used to derive the epoch of each undecided leaf in `update_undecided`.
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// the anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// ValidatedState for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// ValidatedState::Delta for the anchor leaf, optional.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number that should be equivalent to the view the node shut down with last.
    pub start_view: TYPES::View,

    /// The view we last performed an action in.  An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: TYPES::View,

    /// Starting epoch number that should be equivalent to the epoch the node shut down with last.
    pub start_epoch: Option<TYPES::Epoch>,

    /// Highest QC that was seen, for genesis it's the genesis QC.  It should be for a view greater
    /// than the decided leaf's view number for the non genesis case because we must have seen
    /// higher QCs to decide on the leaf.
    /// NOTE(review): an earlier wording called the leaf `inner`; presumably this is
    /// `anchor_leaf` -- confirm.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during epoch transition after restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out to provide to others for catchup
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on.  These allow a restarting node
    /// to vote and propose right away if they didn't miss anything while down.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// Not yet decided state, keyed by view.
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate if there's any
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Saved epoch information. This must be sorted ascending by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}
1119
1120impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1121    /// initialize from genesis
1122    /// # Errors
1123    /// If we are unable to apply the genesis block to the default state
1124    pub async fn from_genesis<V: Versions>(
1125        instance_state: TYPES::InstanceState,
1126        epoch_height: u64,
1127        epoch_start_block: u64,
1128        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1129    ) -> Result<Self, HotShotError<TYPES>> {
1130        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1131        let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;
1132
1133        Ok(Self {
1134            anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
1135            anchor_state: Arc::new(validated_state),
1136            anchor_state_delta: Some(Arc::new(state_delta)),
1137            start_view: TYPES::View::new(0),
1138            start_epoch: genesis_epoch_from_version::<V, TYPES>(),
1139            last_actioned_view: TYPES::View::new(0),
1140            saved_proposals: BTreeMap::new(),
1141            high_qc,
1142            next_epoch_high_qc: None,
1143            decided_upgrade_certificate: None,
1144            undecided_leaves: BTreeMap::new(),
1145            undecided_state: BTreeMap::new(),
1146            instance_state,
1147            saved_vid_shares: BTreeMap::new(),
1148            epoch_height,
1149            state_cert: None,
1150            epoch_start_block,
1151            start_epoch_info,
1152        })
1153    }
1154
1155    /// Use saved proposals to update undecided leaves and state
1156    #[must_use]
1157    pub fn update_undecided(self) -> Self {
1158        let mut undecided_leaves = self.undecided_leaves.clone();
1159        let mut undecided_state = self.undecided_state.clone();
1160
1161        for proposal in self.saved_proposals.values() {
1162            // skip proposals unless they're newer than the anchor leaf
1163            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1164                continue;
1165            }
1166
1167            undecided_leaves.insert(
1168                proposal.data.view_number(),
1169                Leaf2::from_quorum_proposal(&proposal.data),
1170            );
1171        }
1172
1173        for leaf in undecided_leaves.values() {
1174            let view_inner = ViewInner::Leaf {
1175                leaf: leaf.commit(),
1176                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1177                delta: None,
1178                epoch: leaf.epoch(self.epoch_height),
1179            };
1180            let view = View { view_inner };
1181
1182            undecided_state.insert(leaf.view_number(), view);
1183        }
1184
1185        Self {
1186            undecided_leaves,
1187            undecided_state,
1188            ..self
1189        }
1190    }
1191
1192    /// Create a `HotShotInitializer` from the given information.
1193    ///
1194    /// This function uses the anchor leaf to set the initial validated state,
1195    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
1196    ///
1197    /// If you are able to or would prefer to set these yourself,
1198    /// you should use the `HotShotInitializer` constructor directly.
1199    #[allow(clippy::too_many_arguments)]
1200    pub fn load(
1201        instance_state: TYPES::InstanceState,
1202        epoch_height: u64,
1203        epoch_start_block: u64,
1204        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1205        anchor_leaf: Leaf2<TYPES>,
1206        (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
1207        (high_qc, next_epoch_high_qc): (
1208            QuorumCertificate2<TYPES>,
1209            Option<NextEpochQuorumCertificate2<TYPES>>,
1210        ),
1211        last_actioned_view: TYPES::View,
1212        saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1213        saved_vid_shares: VidShares<TYPES>,
1214        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1215        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1216    ) -> Self {
1217        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1218            anchor_leaf.block_header(),
1219        ));
1220        let anchor_state_delta = None;
1221
1222        let initializer = Self {
1223            instance_state,
1224            epoch_height,
1225            epoch_start_block,
1226            anchor_leaf,
1227            anchor_state,
1228            anchor_state_delta,
1229            high_qc,
1230            start_view,
1231            start_epoch,
1232            last_actioned_view,
1233            saved_proposals,
1234            saved_vid_shares,
1235            next_epoch_high_qc,
1236            decided_upgrade_certificate,
1237            undecided_leaves: BTreeMap::new(),
1238            undecided_state: BTreeMap::new(),
1239            state_cert,
1240            start_epoch_info,
1241        };
1242
1243        initializer.update_undecided()
1244    }
1245}
1246
1247async fn load_start_epoch_info<TYPES: NodeType>(
1248    membership: &Arc<RwLock<TYPES::Membership>>,
1249    start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1250    epoch_height: u64,
1251    epoch_start_block: u64,
1252) {
1253    let first_epoch_number =
1254        TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));
1255
1256    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1257    membership
1258        .write()
1259        .await
1260        .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1261
1262    let mut sorted_epoch_info = start_epoch_info.clone();
1263    sorted_epoch_info.sort_by_key(|info| info.epoch);
1264    for epoch_info in sorted_epoch_info {
1265        if let Some(block_header) = &epoch_info.block_header {
1266            tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1267
1268            Membership::add_epoch_root(
1269                Arc::clone(membership),
1270                epoch_info.epoch,
1271                block_header.clone(),
1272            )
1273            .await
1274            .unwrap_or_else(|err| {
1275                // REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal
1276                tracing::error!(
1277                    "Failed to add epoch root for epoch {}: {err}",
1278                    epoch_info.epoch
1279                );
1280            });
1281        }
1282    }
1283
1284    for epoch_info in start_epoch_info {
1285        tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1286        membership
1287            .write()
1288            .await
1289            .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1290    }
1291}