hotshot/lib.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides a generic Rust implementation of the `HotShot` BFT protocol
//!

// Documentation module
#[cfg(feature = "docs")]
pub mod documentation;
use committable::Committable;
use futures::future::{select, Either};
use hotshot_types::{
    drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
    epoch_membership::EpochMembershipCoordinator,
    message::UpgradeLock,
    simple_certificate::LightClientStateUpdateCertificateV2,
    traits::{
        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
        node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
    },
    utils::epoch_from_block_number,
};
use rand::Rng;
use vbs::version::StaticVersionType;

/// Contains traits consumed by [`SystemContext`]
pub mod traits;
/// Contains types used by the crate
pub mod types;

/// Contains functions for adding the consensus and network tasks
pub mod tasks;
use hotshot_types::data::QuorumProposalWrapper;

/// Contains helper functions for the crate
pub mod helpers;

use std::{
    collections::{BTreeMap, HashMap},
    num::NonZeroUsize,
    sync::Arc,
    time::Duration,
};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
// Internal
/// Reexport error type
pub use hotshot_types::error::HotShotError;
use hotshot_types::{
    consensus::{
        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
        ViewInner,
    },
    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
    data::Leaf2,
    event::{EventType, LeafInfo},
    message::{DataMessage, Message, MessageKind, Proposal},
    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
    storage_metrics::StorageMetricsValue,
    traits::{
        consensus_api::ConsensusApi,
        network::ConnectedNetwork,
        node_implementation::{ConsensusTime, NodeType},
        signature_key::SignatureKey,
        states::ValidatedState,
    },
    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
    HotShotConfig,
};
/// Reexport rand crate
pub use rand;
use tokio::{spawn, time::sleep};
use tracing::{debug, instrument, trace};

// -- Re-exports
// External
use crate::{
    tasks::{add_consensus_tasks, add_network_tasks},
    traits::NodeImplementation,
    types::{Event, SystemContextHandle},
};

/// Length, in bytes, of a 512 bit hash
pub const H_512: usize = 64;
/// Length, in bytes, of a 256 bit hash
pub const H_256: usize = 32;

/// Holds the state needed to participate in `HotShot` consensus
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The public key of this node
    public_key: TYPES::SignatureKey,

    /// The private key of this node
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The private key to sign the light client state
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration items for this `HotShot` instance
    pub config: HotShotConfig<TYPES>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// The metrics that the implementor is using.
    metrics: Arc<ConsensusMetricsValue>,

    /// The HotStuff implementation
    consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    instance_state: Arc<TYPES::InstanceState>,

    /// The view to enter when first starting consensus
    start_view: TYPES::View,

    /// The epoch to enter when first starting consensus
    start_epoch: Option<TYPES::Epoch>,

    /// Access to the output event stream.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// External event stream for communication with the application.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Anchored leaf provided by the initializer.
    anchored_leaf: Leaf2<TYPES>,

    /// Access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Unique id for instrumentation
    pub id: u64,

    /// Reference to the internal storage for consensus data.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Shared lock for upgrade information
    pub upgrade_lock: UpgradeLock<TYPES, V>,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
    for SystemContext<TYPES, I, V>
{
    #![allow(deprecated)]
    fn clone(&self) -> Self {
        Self {
            public_key: self.public_key.clone(),
            private_key: self.private_key.clone(),
            state_private_key: self.state_private_key.clone(),
            config: self.config.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            metrics: Arc::clone(&self.metrics),
            consensus: self.consensus.clone(),
            instance_state: Arc::clone(&self.instance_state),
            start_view: self.start_view,
            start_epoch: self.start_epoch,
            output_event_stream: self.output_event_stream.clone(),
            external_event_stream: self.external_event_stream.clone(),
            anchored_leaf: self.anchored_leaf.clone(),
            internal_event_stream: self.internal_event_stream.clone(),
            id: self.id,
            storage: self.storage.clone(),
            storage_metrics: Arc::clone(&self.storage_metrics),
            upgrade_lock: self.upgrade_lock.clone(),
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    #![allow(deprecated)]
    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this instead of `init` if you want to start the tasks manually.
    ///
    /// # Panics
    ///
    /// Panics if storage migration fails.
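    ///
    /// A minimal usage sketch (not compiled as a doctest; `TestTypes`, `TestImpl`, and
    /// `TestVersions` are hypothetical application-provided implementations, and the keys,
    /// config, membership, network, initializer, storage, and metrics are assumed to be
    /// already constructed):
    ///
    /// ```ignore
    /// let hotshot = SystemContext::<TestTypes, TestImpl, TestVersions>::new(
    ///     public_key,
    ///     private_key,
    ///     state_private_key,
    ///     node_id,
    ///     config,
    ///     memberships,
    ///     network,
    ///     initializer,
    ///     consensus_metrics,
    ///     storage,
    ///     storage_metrics,
    /// )
    /// .await;
    ///
    /// // Tasks are not spawned automatically; do that manually, then start consensus.
    /// let handle = hotshot.run_tasks().await;
    /// hotshot.start_consensus().await;
    /// ```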
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Arc<Self> {
        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);

        Self::new_from_channels(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
            internal_chan,
            external_chan,
        )
        .await
    }

    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this function if you want to use some preexisting channels and to spin up the tasks
    /// and start consensus manually. Mostly useful for tests.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    pub async fn new_from_channels(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
        internal_channel: (
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
    ) -> Arc<Self> {
        debug!("Creating a new hotshot");

        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");

        let consensus_metrics = Arc::new(consensus_metrics);
        let storage_metrics = Arc::new(storage_metrics);
        let anchored_leaf = initializer.anchor_leaf;
        let instance_state = initializer.instance_state;

        let (internal_tx, mut internal_rx) = internal_channel;
        let (mut external_tx, mut external_rx) = external_channel;

        tracing::warn!(
            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
            V::Base::VERSION,
            V::Upgrade::VERSION,
        );
        tracing::warn!(
            "Loading previously decided upgrade certificate from storage: {:?}",
            initializer.decided_upgrade_certificate
        );

        let upgrade_lock =
            UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);

        let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
            cert.data.new_version
        } else {
            V::Base::VERSION
        };

        debug!("Setting DRB difficulty selector in membership");
        let drb_difficulty_selector = drb_difficulty_selector(upgrade_lock.clone(), &config);

        membership_coordinator
            .set_drb_difficulty_selector(drb_difficulty_selector)
            .await;

        for da_committee in &config.da_committees {
            if current_version >= da_committee.start_version {
                membership_coordinator
                    .membership()
                    .write()
                    .await
                    .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
            }
        }

        // Allow overflow on the external channel, otherwise sending to it may block.
        external_rx.set_overflow(true);

        // Allow overflow on the internal channel as well. We don't want to block consensus if we
        // have a slow receiver
        internal_rx.set_overflow(true);

        // Get the anchor validated state from the initializer.
        let validated_state = initializer.anchor_state;

        load_start_epoch_info(
            membership_coordinator.membership(),
            &initializer.start_epoch_info,
            config.epoch_height,
            config.epoch_start_block,
        )
        .await;

        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
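        // The epoch is computed from the block *after* the high QC's block (`block_number + 1`),
        // i.e. the epoch in which consensus is expected to resume.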
        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
            TYPES::Epoch::new(epoch_from_block_number(
                block_number + 1,
                config.epoch_height,
            ))
        });

        // Insert the validated state into the state map.
        let mut validated_state_map = BTreeMap::default();
        validated_state_map.insert(
            anchored_leaf.view_number(),
            View {
                view_inner: ViewInner::Leaf {
                    leaf: anchored_leaf.commit(),
                    state: Arc::clone(&validated_state),
                    delta: initializer.anchor_state_delta,
                    epoch,
                },
            },
        );
        for (view_num, inner) in initializer.undecided_state {
            validated_state_map.insert(view_num, inner);
        }

        let mut saved_leaves = HashMap::new();
        let mut saved_payloads = BTreeMap::new();
        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());

        for (_, leaf) in initializer.undecided_leaves {
            saved_leaves.insert(leaf.commit(), leaf.clone());
        }
        if let Some(payload) = anchored_leaf.block_payload() {
            let metadata = anchored_leaf.block_header().metadata().clone();
            saved_payloads.insert(
                anchored_leaf.view_number(),
                Arc::new(PayloadWithMetadata { payload, metadata }),
            );
        }

        let consensus = Consensus::new(
            validated_state_map,
            Some(initializer.saved_vid_shares),
            anchored_leaf.view_number(),
            epoch,
            anchored_leaf.view_number(),
            anchored_leaf.view_number(),
            initializer.last_actioned_view,
            initializer.saved_proposals,
            saved_leaves,
            saved_payloads,
            initializer.high_qc,
            initializer.next_epoch_high_qc,
            Arc::clone(&consensus_metrics),
            config.epoch_height,
            initializer.state_cert,
            config.drb_difficulty,
            config.drb_upgrade_difficulty,
        );

        let consensus = Arc::new(RwLock::new(consensus));

        if let Some(epoch) = epoch {
            // trigger catchup for the current and next epoch if needed
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch))
                .await;
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch + 1))
                .await;

            if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
                tracing::info!("Writing DRB result for epoch {}", epoch + 1);
                if let Ok(mem) = membership_coordinator
                    .stake_table_for_epoch(Some(epoch + 1))
                    .await
                {
                    mem.add_drb_result(drb_result).await;
                }
            }
        }

        // This makes it so we won't block on broadcasting if there is no receiver.
        // Our own copy of the receiver is inactive, so it doesn't count.
        external_tx.set_await_active(false);

        let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
            id: nonce,
            consensus: OuterConsensus::new(consensus),
            instance_state: Arc::new(instance_state),
            public_key,
            private_key,
            state_private_key,
            config,
            start_view: initializer.start_view,
            start_epoch: initializer.start_epoch,
            network,
            membership_coordinator,
            metrics: Arc::clone(&consensus_metrics),
            internal_event_stream: (internal_tx, internal_rx.deactivate()),
            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
            external_event_stream: (external_tx, external_rx.deactivate()),
            anchored_leaf: anchored_leaf.clone(),
            storage,
            storage_metrics,
            upgrade_lock,
        });

        inner
    }

438    /// "Starts" consensus by sending a `Qc2Formed`, `ViewChange` events
439    ///
440    /// # Panics
441    /// Panics if sending genesis fails
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn start_consensus(&self) {
        #[cfg(all(feature = "rewind", not(debug_assertions)))]
        compile_error!("Cannot run rewind in production builds!");

        debug!("Starting Consensus");
        let consensus = self.consensus.read().await;

        let first_epoch = option_epoch_from_block_number::<TYPES>(
            V::Base::VERSION >= V::Epochs::VERSION,
            self.config.epoch_start_block,
            self.config.epoch_height,
        );
        // `start_epoch` comes from the initializer; it might be the last seen epoch before restart
        // `first_epoch` is the first epoch after the transition to the epoch version
        // `initial_view_change_epoch` is the greater of the two; we use it for the initial view change
        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
                self.start_view,
                initial_view_change_epoch,
            )))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
                    self.start_view, initial_view_change_epoch,
                )
            });

        // Clone the event stream that we send the timeout event to
        let event_stream = self.internal_event_stream.0.clone();
        let next_view_timeout = self.config.next_view_timeout;
        let start_view = self.start_view;
        let start_epoch = self.start_epoch;

        // Spawn a task that will sleep for the next view timeout and then send a timeout event
        // if not cancelled
        spawn({
            async move {
                sleep(Duration::from_millis(next_view_timeout)).await;
                broadcast_event(
                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
                    &event_stream,
                )
                .await;
            }
        });
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
                consensus.high_qc().clone(),
            ))))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
                    consensus.high_qc()
                )
            });

        {
            // Some applications seem to expect a leaf decide event for the genesis leaf,
            // which contains only that leaf and nothing else.
            if self.anchored_leaf.view_number() == TYPES::View::genesis() {
                let (validated_state, state_delta) =
                    TYPES::ValidatedState::genesis(&self.instance_state);

                let qc = Arc::new(
                    QuorumCertificate2::genesis::<V>(
                        &validated_state,
                        self.instance_state.as_ref(),
                    )
                    .await,
                );

                broadcast_event(
                    Event {
                        view_number: self.anchored_leaf.view_number(),
                        event: EventType::Decide {
                            leaf_chain: Arc::new(vec![LeafInfo::new(
                                self.anchored_leaf.clone(),
                                Arc::new(validated_state),
                                Some(Arc::new(state_delta)),
                                None,
                                None,
                            )]),
                            qc,
                            block_size: None,
                        },
                    },
                    &self.external_event_stream.0,
                )
                .await;
            }
        }
    }

    /// Emit an external event
    async fn send_external_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_external_event");
        broadcast_event(event, &self.external_event_stream.0).await;
    }

    /// Publishes a transaction asynchronously to the network.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be serialized or if the membership for the
    /// current epoch cannot be retrieved; a failure to deliver the transaction over the network
    /// does not produce an error.
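    ///
    /// A usage sketch (hypothetical `transaction` value; not compiled as a doctest):
    ///
    /// ```ignore
    /// hotshot.publish_transaction_async(transaction).await?;
    /// ```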
    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
    pub async fn publish_transaction_async(
        &self,
        transaction: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        trace!("Adding transaction to our own queue");

        let api = self.clone();

        let consensus_reader = api.consensus.read().await;
        let view_number = consensus_reader.cur_view();
        let epoch = consensus_reader.cur_epoch();
        drop(consensus_reader);

        // Wrap up a message
        let message_kind: DataMessage<TYPES> =
            DataMessage::SubmitTransaction(transaction.clone(), view_number);
        let message = Message {
            sender: api.public_key.clone(),
            kind: MessageKind::from(message_kind),
        };

        let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
        })?;

        let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
            Ok(m) => m,
            Err(e) => return Err(HotShotError::InvalidState(e.message)),
        };

        spawn(async move {
            let memberships_da_committee_members = membership
                .da_committee_members(view_number)
                .await
                .iter()
                .cloned()
                .collect();

            join! {
                // TODO We should have a function that can return a network error if there is one
                // but first we'd need to ensure our network implementations can support that
                // (and not hang instead)

                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
                // and will be updated to be part of SystemContext. I wanted to use associated
                // constants in NodeType, but that seems to be unavailable in the current Rust.
                api
                    .network.da_broadcast_message(
                        serialized_message,
                        memberships_da_committee_members,
                        BroadcastDelay::None,
                    ),
                api
                    .send_external_event(Event {
                        view_number,
                        event: EventType::Transactions {
                            transactions: vec![transaction],
                        },
                    }),
            }
        });
        Ok(())
    }

    /// Returns a copy of the consensus struct
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        Arc::clone(&self.consensus.inner_consensus)
    }

    /// Returns a copy of the instance state
    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
        Arc::clone(&self.instance_state)
    }

    /// Returns a copy of the last decided leaf
    /// # Panics
    /// Panics if internal leaf for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.consensus.read().await.decided_leaf()
    }

    /// [Non-blocking] instantly returns a copy of the last decided leaf if
    /// it is available to be read. If not, we return `None`.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[must_use]
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.consensus.try_read().map(|guard| guard.decided_leaf())
    }

    /// Returns the last decided validated state.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        Arc::clone(&self.consensus.read().await.decided_state())
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.consensus.read().await.state(view).cloned()
    }

    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
    ///
    /// Assumes networking implementation is already primed.
    ///
    /// Underlying `HotShot` instance starts out paused, and must be unpaused by calling
    /// [`start_consensus`](Self::start_consensus).
    ///
    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
    /// the `HotShot` instance will log the error and shut down.
    ///
    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
    ///
    /// # Errors
    ///
    /// Can return an error if `Self::new` fails.
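    ///
    /// A startup sketch (not compiled as a doctest; types and arguments are hypothetical and
    /// assumed to be constructed as for [`Self::new`]):
    ///
    /// ```ignore
    /// let (handle, _internal_tx, _internal_rx) =
    ///     SystemContext::<TestTypes, TestImpl, TestVersions>::init(
    ///         public_key,
    ///         private_key,
    ///         state_private_key,
    ///         node_id,
    ///         config,
    ///         memberships,
    ///         network,
    ///         initializer,
    ///         consensus_metrics,
    ///         storage,
    ///         storage_metrics,
    ///     )
    ///     .await?;
    ///
    /// // The instance starts out paused; unpause it by starting consensus.
    /// handle.hotshot.start_consensus().await;
    /// ```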
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        node_id: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Result<
        (
            SystemContextHandle<TYPES, I, V>,
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        HotShotError<TYPES>,
    > {
        let hotshot = Self::new(
            public_key,
            private_key,
            state_private_key,
            node_id,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;
        let handle = Arc::clone(&hotshot).run_tasks().await;
        let (tx, rx) = hotshot.internal_event_stream.clone();

        Ok((handle, tx, rx.activate()))
    }

    /// Return the timeout for a view for `self`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.config.next_view_timeout
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = self.external_event_stream.clone();
        let internal_event_stream = self.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: self.clone().into(),
            storage: self.storage.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            epoch_height: self.config.epoch_height,
        };

        add_network_tasks::<TYPES, I, V>(&mut handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;

        handle
    }
}

/// An async broadcast channel
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);

/// Trait for handling messages for a node with a twin copy of consensus
#[async_trait]
pub trait TwinsHandlerState<TYPES, I, V>
where
    Self: std::fmt::Debug + Send + Sync,
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
{
    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;

    /// Handle a message from either twin, forwarding it to the network task
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Fuse two channels into a single channel
    ///
    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
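    ///
    /// A usage sketch (hypothetical; the `&'static mut self` receiver can be obtained by
    /// leaking a boxed handler, and `left`/`right` are the internal event channels of the
    /// two twins):
    ///
    /// ```ignore
    /// let handler = Box::leak(Box::new(RandomTwinsHandler));
    /// let (network_task_sender, network_task_receiver) = handler.fuse_channels(left, right);
    /// ```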
    fn fuse_channels(
        &'static mut self,
        left: Channel<HotShotEvent<TYPES>>,
        right: Channel<HotShotEvent<TYPES>>,
    ) -> Channel<HotShotEvent<TYPES>> {
        let send_state = Arc::new(RwLock::new(self));
        let recv_state = Arc::clone(&send_state);

        let (left_sender, mut left_receiver) = (left.0, left.1);
        let (right_sender, mut right_receiver) = (right.0, right.1);

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
            broadcast(EVENT_CHANNEL_SIZE);

        let _recv_loop_handle = spawn(async move {
            loop {
                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
                };

                let mut state = recv_state.write().await;
                let mut result = state.recv_handler(&msg).await;

                while let Some(event) = result.pop() {
                    let _ = sender_to_network.broadcast(event.into()).await;
                }
            }
        });

        let _send_loop_handle = spawn(async move {
            loop {
                if let Ok(msg) = receiver_from_network.recv().await {
                    let mut state = send_state.write().await;

                    let mut result = state.send_handler(&msg).await;

                    while let Some(event) = result.pop() {
                        match event {
                            Either::Left(msg) => {
                                let _ = left_sender.broadcast(msg.into()).await;
                            },
                            Either::Right(msg) => {
                                let _ = right_sender.broadcast(msg.into()).await;
                            },
                        }
                    }
                }
            }
        });

        (network_task_sender, network_task_receiver)
    }

    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_twin_handles(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> (
        SystemContextHandle<TYPES, I, V>,
        SystemContextHandle<TYPES, I, V>,
    ) {
        let epoch_height = config.epoch_height;
        let left_system_context = SystemContext::new(
            public_key.clone(),
            private_key.clone(),
            state_private_key.clone(),
            nonce,
            config.clone(),
            memberships.clone(),
            Arc::clone(&network),
            initializer.clone(),
            consensus_metrics.clone(),
            storage.clone(),
            storage_metrics.clone(),
        )
        .await;
        let right_system_context = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;

        // create registries for both handles
        let left_consensus_registry = ConsensusTaskRegistry::new();
        let left_network_registry = NetworkTaskRegistry::new();

        let right_consensus_registry = ConsensusTaskRegistry::new();
        let right_network_registry = NetworkTaskRegistry::new();

        // create external channels for both handles
        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let left_external_event_stream =
            (left_external_sender, left_external_receiver.deactivate());

        let (right_external_sender, right_external_receiver) =
            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let right_external_event_stream =
            (right_external_sender, right_external_receiver.deactivate());

        // create internal channels for both handles
        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let left_internal_event_stream = (
            left_internal_sender.clone(),
            left_internal_receiver.clone().deactivate(),
        );

        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let right_internal_event_stream = (
            right_internal_sender.clone(),
            right_internal_receiver.clone().deactivate(),
        );

        // create each handle
        let mut left_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: left_consensus_registry,
            network_registry: left_network_registry,
            output_event_stream: left_external_event_stream.clone(),
            internal_event_stream: left_internal_event_stream.clone(),
            hotshot: Arc::clone(&left_system_context),
            storage: left_system_context.storage.clone(),
            network: Arc::clone(&left_system_context.network),
            membership_coordinator: left_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        let mut right_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: right_consensus_registry,
            network_registry: right_network_registry,
            output_event_stream: right_external_event_stream.clone(),
            internal_event_stream: right_internal_event_stream.clone(),
            hotshot: Arc::clone(&right_system_context),
            storage: right_system_context.storage.clone(),
            network: Arc::clone(&right_system_context.network),
            membership_coordinator: right_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        // add consensus tasks to each handle, using their individual internal event streams
        add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;

        // fuse the event streams from both handles before initializing the network tasks
        let fused_internal_event_stream = self.fuse_channels(
            (left_internal_sender, left_internal_receiver),
            (right_internal_sender, right_internal_receiver),
        );

        // swap out the event stream on the left handle
        left_handle.internal_event_stream = (
            fused_internal_event_stream.0,
            fused_internal_event_stream.1.deactivate(),
        );

        // add the network tasks to the left handle. note: because the left handle has the fused
        // event stream, the network tasks on the left handle will handle messages from both handles.
        add_network_tasks::<TYPES, I, V>(&mut left_handle).await;

        // revert to the original event stream on the left handle, for any applications that want
        // to listen to it
        left_handle.internal_event_stream = left_internal_event_stream.clone();

        (left_handle, right_handle)
    }
}

/// A `TwinsHandlerState` that randomly forwards a message to either twin,
/// and returns messages from both.
#[derive(Debug)]
pub struct RandomTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for RandomTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        let random: bool = rand::thread_rng().gen();

        #[allow(clippy::match_bool)]
        match random {
            true => vec![Either::Left(event.clone())],
            false => vec![Either::Right(event.clone())],
        }
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

/// A `TwinsHandlerState` that forwards each message to both twins,
/// and returns messages from each of them.
#[derive(Debug)]
pub struct DoubleTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for DoubleTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        vec![Either::Left(event.clone()), Either::Right(event.clone())]
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
    for SystemContextHandle<TYPES, I, V>
{
    fn total_nodes(&self) -> NonZeroUsize {
        self.hotshot.config.num_nodes_with_stake
    }

    fn builder_timeout(&self) -> Duration {
        self.hotshot.config.builder_timeout
    }

    async fn send_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_event");
        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
    }

    fn public_key(&self) -> &TYPES::SignatureKey {
        &self.hotshot.public_key
    }

    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
        &self.hotshot.private_key
    }

    fn state_private_key(
        &self,
    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
        &self.hotshot.state_private_key
    }
}

/// Epoch information saved by the initializer, used to seed the membership on startup
#[derive(Clone, Debug, PartialEq)]
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// The epoch number
    pub epoch: TYPES::Epoch,
    /// The DRB result for this epoch
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// The epoch root block header, if available
    pub block_header: Option<TYPES::BlockHeader>,
}

/// Initializer struct for creating the starting block
#[derive(Clone, Debug)]
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// The anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// ValidatedState for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// ValidatedState::Delta for the anchor leaf, optional.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number, which should be the view the node last shut down in.
    pub start_view: TYPES::View,

    /// The view we last performed an action in. An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: TYPES::View,

    /// Starting epoch number, which should be the epoch the node last shut down in.
    pub start_epoch: Option<TYPES::Epoch>,

    /// Highest QC that was seen; for genesis it's the genesis QC. In the non-genesis case it
    /// should be for a view greater than the anchor leaf's view number, because we must have
    /// seen higher QCs to decide on the leaf.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during epoch transition after restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out to provide to others for catchup
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on. These allow a restarting node
    /// to vote and propose right away if it didn't miss anything while down.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// Not yet decided state
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate, if there is any
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Saved epoch information. This will be sorted ascending by epoch before use.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}

impl<TYPES: NodeType> HotShotInitializer<TYPES> {
    /// Initialize from genesis
    ///
    /// # Errors
    /// Returns an error if we are unable to apply the genesis block to the default state
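    ///
    /// A sketch of genesis initialization (not compiled as a doctest; `TestTypes` and
    /// `TestVersions` are hypothetical):
    ///
    /// ```ignore
    /// let initializer = HotShotInitializer::<TestTypes>::from_genesis::<TestVersions>(
    ///     instance_state,
    ///     epoch_height,
    ///     epoch_start_block,
    ///     vec![], // no previously saved epoch info at genesis
    /// )
    /// .await?;
    /// ```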
    pub async fn from_genesis<V: Versions>(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    ) -> Result<Self, HotShotError<TYPES>> {
        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
        let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;

        Ok(Self {
            anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
            anchor_state: Arc::new(validated_state),
            anchor_state_delta: Some(Arc::new(state_delta)),
            start_view: TYPES::View::new(0),
            start_epoch: genesis_epoch_from_version::<V, TYPES>(),
            last_actioned_view: TYPES::View::new(0),
            saved_proposals: BTreeMap::new(),
            high_qc,
            next_epoch_high_qc: None,
            decided_upgrade_certificate: None,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            instance_state,
            saved_vid_shares: BTreeMap::new(),
            epoch_height,
            state_cert: None,
            epoch_start_block,
            start_epoch_info,
        })
    }

    /// Use saved proposals to update undecided leaves and state
    #[must_use]
    pub fn update_undecided(self) -> Self {
        let mut undecided_leaves = self.undecided_leaves.clone();
        let mut undecided_state = self.undecided_state.clone();

        for proposal in self.saved_proposals.values() {
            // skip proposals unless they're newer than the anchor leaf
            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
                continue;
            }

            undecided_leaves.insert(
                proposal.data.view_number(),
                Leaf2::from_quorum_proposal(&proposal.data),
            );
        }

        for leaf in undecided_leaves.values() {
            let view_inner = ViewInner::Leaf {
                leaf: leaf.commit(),
                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
                delta: None,
                epoch: leaf.epoch(self.epoch_height),
            };
            let view = View { view_inner };

            undecided_state.insert(leaf.view_number(), view);
        }

        Self {
            undecided_leaves,
            undecided_state,
            ..self
        }
    }

    /// Create a `HotShotInitializer` from the given information.
    ///
    /// This function uses the anchor leaf to set the initial validated state,
    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
    ///
    /// If you are able to or would prefer to set these yourself,
    /// you should use the `HotShotInitializer` constructor directly.
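    ///
    /// A restart sketch (not compiled as a doctest; all arguments are hypothetical values
    /// assumed to have been read back from storage):
    ///
    /// ```ignore
    /// let initializer = HotShotInitializer::load(
    ///     instance_state,
    ///     epoch_height,
    ///     epoch_start_block,
    ///     start_epoch_info,
    ///     anchor_leaf,
    ///     (start_view, start_epoch),
    ///     (high_qc, next_epoch_high_qc),
    ///     last_actioned_view,
    ///     saved_proposals,
    ///     saved_vid_shares,
    ///     decided_upgrade_certificate,
    ///     state_cert,
    /// );
    /// ```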
    #[allow(clippy::too_many_arguments)]
    pub fn load(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
        anchor_leaf: Leaf2<TYPES>,
        (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
        (high_qc, next_epoch_high_qc): (
            QuorumCertificate2<TYPES>,
            Option<NextEpochQuorumCertificate2<TYPES>>,
        ),
        last_actioned_view: TYPES::View,
        saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
        saved_vid_shares: VidShares<TYPES>,
        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
    ) -> Self {
        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
            anchor_leaf.block_header(),
        ));
        let anchor_state_delta = None;

        let initializer = Self {
            instance_state,
            epoch_height,
            epoch_start_block,
            anchor_leaf,
            anchor_state,
            anchor_state_delta,
            high_qc,
            start_view,
            start_epoch,
            last_actioned_view,
            saved_proposals,
            saved_vid_shares,
            next_epoch_high_qc,
            decided_upgrade_certificate,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            state_cert,
            start_epoch_info,
        };

        initializer.update_undecided()
    }
}

/// Load the saved epoch info into the membership: set the first epoch, then replay any saved
/// epoch roots and DRB results.
async fn load_start_epoch_info<TYPES: NodeType>(
    membership: &Arc<RwLock<TYPES::Membership>>,
    start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
    epoch_height: u64,
    epoch_start_block: u64,
) {
    let first_epoch_number =
        TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));

    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
    membership
        .write()
        .await
        .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

    let mut sorted_epoch_info = start_epoch_info.clone();
    sorted_epoch_info.sort_by_key(|info| info.epoch);
    for epoch_info in sorted_epoch_info {
        if let Some(block_header) = &epoch_info.block_header {
            tracing::info!("Calling add_epoch_root for epoch {}", epoch_info.epoch);

            Membership::add_epoch_root(
                Arc::clone(membership),
                epoch_info.epoch,
                block_header.clone(),
            )
            .await
            .unwrap_or_else(|err| {
                // REVIEW NOTE: Should we panic here? A failure here seems like it should be fatal
                tracing::error!(
                    "Failed to add epoch root for epoch {}: {err}",
                    epoch_info.epoch
                );
            });
        }
    }

    for epoch_info in start_epoch_info {
        tracing::info!("Calling add_drb_result for epoch {}", epoch_info.epoch);
        membership
            .write()
            .await
            .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
    }
}
1307}