hotshot/
lib.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a generic rust implementation of the `HotShot` BFT protocol
8//!
9
10// Documentation module
11#[cfg(feature = "docs")]
12pub mod documentation;
13
14use committable::Committable;
15use futures::future::{Either, select};
16use hotshot_types::{
17    drb::{DrbResult, INITIAL_DRB_RESULT, drb_difficulty_selector},
18    epoch_membership::EpochMembershipCoordinator,
19    message::UpgradeLock,
20    simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
21    traits::{
22        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
23        signature_key::StateSignatureKey, storage::Storage,
24    },
25    utils::{epoch_from_block_number, is_ge_epoch_root},
26};
27use rand::Rng;
28
29/// Contains traits consumed by [`SystemContext`]
30pub mod traits;
31/// Contains types used by the crate
32pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36use versions::{EPOCH_VERSION, Upgrade};
37
38/// Contains helper functions for the crate
39pub mod helpers;
40
41use std::{
42    collections::{BTreeMap, HashMap},
43    num::NonZeroUsize,
44    sync::Arc,
45    time::Duration,
46};
47
48use alloy::primitives::U256;
49use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
50use async_lock::RwLock;
51use async_trait::async_trait;
52use futures::join;
53use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
54use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
55// Internal
56/// Reexport error type
57pub use hotshot_types::error::HotShotError;
58use hotshot_types::{
59    HotShotConfig,
60    consensus::{
61        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
62        ViewInner,
63    },
64    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
65    data::{EpochNumber, Leaf2, ViewNumber},
66    event::{EventType, LeafInfo},
67    message::{DataMessage, Message, MessageKind, Proposal},
68    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
69    stake_table::HSStakeTable,
70    storage_metrics::StorageMetricsValue,
71    traits::{
72        consensus_api::ConsensusApi, network::ConnectedNetwork, node_implementation::NodeType,
73        signature_key::SignatureKey, states::ValidatedState,
74    },
75    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
76};
77use hotshot_utils::warn;
78/// Reexport rand crate
79pub use rand;
80use tokio::{spawn, time::sleep};
81use tracing::{debug, instrument, trace};
82
// -- Re-exports
84// External
85use crate::{
86    tasks::{add_consensus_tasks, add_network_tasks},
87    traits::NodeImplementation,
88    types::{Event, SystemContextHandle},
89};
90
/// Length, in bytes, of a 512 bit hash
pub const H_512: usize = 64;
/// Length, in bytes, of a 256 bit hash
pub const H_256: usize = 32;
95
/// Holds the state needed to participate in `HotShot` consensus
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// The public key of this node
    public_key: TYPES::SignatureKey,

    /// The private key of this node
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The private key to sign the light client state
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration items for this hotshot instance
    pub config: HotShotConfig<TYPES>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// The metrics that the implementor is using; also shared with the
    /// `Consensus` instance constructed in `new_from_channels`.
    metrics: Arc<ConsensusMetricsValue>,

    /// The hotstuff implementation: a handle onto the shared, lock-protected
    /// consensus state (see [`Self::consensus`]).
    consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    instance_state: Arc<TYPES::InstanceState>,

    /// The view to enter when first starting consensus
    start_view: ViewNumber,

    /// The epoch to enter when first starting consensus
    start_epoch: Option<EpochNumber>,

    /// Access to the output event stream. Backed by the same underlying
    /// channel as `external_event_stream` (both are cloned from one channel
    /// in `new_from_channels`).
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// External event stream for communication with the application.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Anchored leaf provided by the initializer.
    anchored_leaf: Leaf2<TYPES>,

    /// access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// uid for instrumentation; set from the `nonce` constructor argument
    pub id: u64,

    /// Reference to the internal storage for consensus datum.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// shared lock for upgrade information
    pub upgrade_lock: UpgradeLock<TYPES>,
}
159impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Clone for SystemContext<TYPES, I> {
160    #![allow(deprecated)]
161    fn clone(&self) -> Self {
162        Self {
163            public_key: self.public_key.clone(),
164            private_key: self.private_key.clone(),
165            state_private_key: self.state_private_key.clone(),
166            config: self.config.clone(),
167            network: Arc::clone(&self.network),
168            membership_coordinator: self.membership_coordinator.clone(),
169            metrics: Arc::clone(&self.metrics),
170            consensus: self.consensus.clone(),
171            instance_state: Arc::clone(&self.instance_state),
172            start_view: self.start_view,
173            start_epoch: self.start_epoch,
174            output_event_stream: self.output_event_stream.clone(),
175            external_event_stream: self.external_event_stream.clone(),
176            anchored_leaf: self.anchored_leaf.clone(),
177            internal_event_stream: self.internal_event_stream.clone(),
178            id: self.id,
179            storage: self.storage.clone(),
180            storage_metrics: Arc::clone(&self.storage_metrics),
181            upgrade_lock: self.upgrade_lock.clone(),
182        }
183    }
184}
185
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
    #![allow(deprecated)]
    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// Allocates fresh internal and external broadcast channels (sized by
    /// `EVENT_CHANNEL_SIZE` / `EXTERNAL_EVENT_CHANNEL_SIZE`) and delegates the
    /// remaining setup to [`Self::new_from_channels`].
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this instead of `init` if you want to start the tasks manually
    ///
    /// # Panics
    ///
    /// Panics if storage migration fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Arc<Self> {
        // One channel for internal consensus events, one for events surfaced
        // to the application.
        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);

        Self::new_from_channels(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            upgrade,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
            internal_chan,
            external_chan,
        )
        .await
    }

    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this function if you want to use some preexisting channels and to spin up the tasks
    /// and start consensus manually.  Mostly useful for tests
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    pub async fn new_from_channels(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        mut membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
        internal_channel: (
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
    ) -> Arc<Self> {
        debug!("Creating a new hotshot");

        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");

        let consensus_metrics = Arc::new(consensus_metrics);
        let storage_metrics = Arc::new(storage_metrics);
        let anchored_leaf = initializer.anchor_leaf;
        let instance_state = initializer.instance_state;

        let (internal_tx, internal_rx) = internal_channel;
        let (mut external_tx, external_rx) = external_channel;

        // Re-derive our own receivers via `new_receiver()` before tweaking
        // overflow behavior; the receiver halves passed in by the caller are
        // dropped here (shadowed).
        let mut internal_rx = internal_rx.new_receiver();

        let mut external_rx = external_rx.new_receiver();

        // Allow overflow on the internal channel as well. We don't want to block consensus if we
        // have a slow receiver
        internal_rx.set_overflow(true);
        // Allow overflow on the external channel, otherwise sending to it may block.
        external_rx.set_overflow(true);

        membership_coordinator
            .set_external_channel(external_rx.clone())
            .await;

        tracing::warn!(
            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
            upgrade.base,
            upgrade.target
        );
        tracing::warn!(
            "Loading previously decided upgrade certificate from storage: {:?}",
            initializer.decided_upgrade_certificate
        );

        let upgrade_lock = UpgradeLock::<TYPES>::from_certificate(
            upgrade,
            &initializer.decided_upgrade_certificate,
        );

        // If an upgrade was already decided before this (re)start, resume at
        // the upgraded version; otherwise begin at the configured base version.
        let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
            cert.data.new_version
        } else {
            upgrade.base
        };

        debug!("Setting DRB difficulty selector in membership");
        let drb_difficulty_selector = drb_difficulty_selector(&config);

        membership_coordinator
            .set_drb_difficulty_selector(drb_difficulty_selector)
            .await;

        // Register only the DA committees whose activation version has already
        // been reached.
        for da_committee in &config.da_committees {
            if current_version >= da_committee.start_version {
                membership_coordinator
                    .membership()
                    .write()
                    .await
                    .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
            }
        }

        // Get the validated state from the initializer or construct an incomplete one from the
        // block header.
        let validated_state = initializer.anchor_state;

        load_start_epoch_info(
            membership_coordinator.membership(),
            &initializer.start_epoch_info,
            config.epoch_height,
            config.epoch_start_block,
        )
        .await;

        // Epoch of the block *after* the high QC's block, when the high QC
        // carries a block number at all.
        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
            EpochNumber::new(epoch_from_block_number(
                block_number + 1,
                config.epoch_height,
            ))
        });

        // Insert the validated state to state map.
        let mut validated_state_map = BTreeMap::default();
        validated_state_map.insert(
            anchored_leaf.view_number(),
            View {
                view_inner: ViewInner::Leaf {
                    leaf: anchored_leaf.commit(),
                    state: Arc::clone(&validated_state),
                    delta: initializer.anchor_state_delta,
                    epoch,
                },
            },
        );
        for (view_num, inner) in initializer.undecided_state {
            validated_state_map.insert(view_num, inner);
        }

        // Seed the leaf and payload caches with the anchor leaf plus any
        // undecided leaves carried over by the initializer.
        let mut saved_leaves = HashMap::new();
        let mut saved_payloads = BTreeMap::new();
        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());

        for (_, leaf) in initializer.undecided_leaves {
            saved_leaves.insert(leaf.commit(), leaf.clone());
        }
        if let Some(payload) = anchored_leaf.block_payload() {
            let metadata = anchored_leaf.block_header().metadata().clone();
            saved_payloads.insert(
                anchored_leaf.view_number(),
                Arc::new(PayloadWithMetadata { payload, metadata }),
            );
        }
        let high_qc_block_number = initializer.high_qc.data.block_number;
        // Best effort: if the stake table for this epoch is unavailable, fall
        // back to an empty table and a `U256::MAX` success threshold.
        let (stake_table, success_threshold) = if let Ok(epoch_membership) =
            membership_coordinator.stake_table_for_epoch(epoch).await
        {
            (
                epoch_membership.stake_table().await,
                epoch_membership.success_threshold().await,
            )
        } else {
            tracing::warn!(
                "Failed to get stake table for epoch {:?} while creating vote participation",
                epoch
            );
            (HSStakeTable::default(), U256::MAX)
        };

        // Positional arguments follow the `Consensus::new` signature in
        // `hotshot_types::consensus`.
        let consensus = Consensus::new(
            validated_state_map,
            Some(initializer.saved_vid_shares),
            anchored_leaf.view_number(),
            epoch,
            anchored_leaf.view_number(),
            anchored_leaf.view_number(),
            initializer.last_actioned_view,
            initializer.saved_proposals,
            saved_leaves,
            saved_payloads,
            initializer.high_qc,
            initializer.next_epoch_high_qc,
            Arc::clone(&consensus_metrics),
            config.epoch_height,
            initializer.state_cert,
            config.drb_difficulty,
            config.drb_upgrade_difficulty,
            stake_table,
            success_threshold,
        );

        let consensus = Arc::new(RwLock::new(consensus));

        if let Some(epoch) = epoch {
            tracing::info!(
                "Triggering catchup for epoch {} and next epoch {}",
                epoch,
                epoch + 1
            );
            // trigger catchup for the current and next epoch if needed
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch))
                .await;
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch + 1))
                .await;
            // If we already have an epoch root, we can trigger catchup for the epoch
            // which that root applies to.
            if let Some(high_qc_block_number) = high_qc_block_number
                && is_ge_epoch_root(high_qc_block_number, config.epoch_height)
            {
                let _ = membership_coordinator
                    .stake_table_for_epoch(Some(epoch + 2))
                    .await;
            }

            if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
                // NOTE(review): logged at `error!` on a non-error path —
                // presumably leftover from debugging; confirm intended level.
                tracing::error!("Writing DRB result for epoch {}", epoch + 1);
                if let Ok(mem) = membership_coordinator
                    .stake_table_for_epoch(Some(epoch + 1))
                    .await
                {
                    mem.add_drb_result(drb_result).await;
                }
            }
        }

        // This makes it so we won't block on broadcasting if there is not a receiver
        // Our own copy of the receiver is inactive so it doesn't count.
        external_tx.set_await_active(false);

        let inner: Arc<SystemContext<TYPES, I>> = Arc::new(SystemContext {
            id: nonce,
            consensus: OuterConsensus::new(consensus),
            instance_state: Arc::new(instance_state),
            public_key,
            private_key,
            state_private_key,
            config,
            start_view: initializer.start_view,
            start_epoch: initializer.start_epoch,
            network,
            membership_coordinator,
            metrics: Arc::clone(&consensus_metrics),
            internal_event_stream: (internal_tx, internal_rx.deactivate()),
            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
            external_event_stream: (external_tx, external_rx.deactivate()),
            anchored_leaf: anchored_leaf.clone(),
            storage,
            storage_metrics,
            upgrade_lock,
        });

        inner
    }

    /// "Starts" consensus by sending a `Qc2Formed`, `ViewChange` events
    ///
    /// Also spawns a one-shot timeout task for the starting view and, for a
    /// genesis anchor leaf, emits an initial `Decide` event to the application.
    ///
    /// # Panics
    /// Panics if sending genesis fails
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn start_consensus(&self) {
        #[cfg(all(feature = "rewind", not(debug_assertions)))]
        compile_error!("Cannot run rewind in production builds!");

        debug!("Starting Consensus");
        let consensus = self.consensus.read().await;

        let first_epoch = option_epoch_from_block_number(
            self.upgrade_lock.upgrade().base >= EPOCH_VERSION,
            self.config.epoch_start_block,
            self.config.epoch_height,
        );
        // `start_epoch` comes from the initializer, it might be the last seen epoch before restart
        // `first_epoch` is the first epoch after the transition to the epoch version
        // `initial_view_change_epoch` is the greater of the two, we use it with the initial view change
        // (note: for `Option`s, `None` compares less than any `Some`)
        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
                self.start_view,
                initial_view_change_epoch,
            )))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
                    self.start_view, initial_view_change_epoch,
                )
            });

        // Clone the event stream that we send the timeout event to
        let event_stream = self.internal_event_stream.0.clone();
        let next_view_timeout = self.config.next_view_timeout;
        let start_view = self.start_view;
        let start_epoch = self.start_epoch;

        // Spawn a task that will sleep for the next view timeout and then send a timeout event
        // if not cancelled
        spawn({
            async move {
                sleep(Duration::from_millis(next_view_timeout)).await;
                broadcast_event(
                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
                    &event_stream,
                )
                .await;
            }
        });
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
                consensus.high_qc().clone(),
            ))))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
                    consensus.high_qc()
                )
            });

        {
            // Some applications seem to expect a leaf decide event for the genesis leaf,
            // which contains only that leaf and nothing else.
            if self.anchored_leaf.view_number() == ViewNumber::genesis() {
                let (validated_state, state_delta) =
                    TYPES::ValidatedState::genesis(&self.instance_state);

                let qc = QuorumCertificate2::genesis(
                    &validated_state,
                    self.instance_state.as_ref(),
                    self.upgrade_lock.upgrade(),
                )
                .await;

                broadcast_event(
                    Event {
                        view_number: self.anchored_leaf.view_number(),
                        event: EventType::Decide {
                            leaf_chain: Arc::new(vec![LeafInfo::new(
                                self.anchored_leaf.clone(),
                                Arc::new(validated_state),
                                Some(Arc::new(state_delta)),
                                None,
                                None,
                            )]),
                            committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
                            deciding_qc: None,
                            block_size: None,
                        },
                    },
                    &self.external_event_stream.0,
                )
                .await;
            }
        }
    }

    /// Emit an external event
    async fn send_external_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_external_event");
        broadcast_event(event, &self.external_event_stream.0).await;
    }

    /// Publishes a transaction asynchronously to the network.
    ///
    /// The DA broadcast itself happens in a spawned task; its outcome is NOT
    /// reported back to the caller.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction message cannot be serialized, or if
    /// the membership for the current epoch cannot be resolved. Network send
    /// failures are not surfaced (see note above).
    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
    pub async fn publish_transaction_async(
        &self,
        transaction: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        trace!("Adding transaction to our own queue");

        let api = self.clone();

        // Snapshot the current view/epoch, then release the read lock before
        // any further awaits.
        let consensus_reader = api.consensus.read().await;
        let view_number = consensus_reader.cur_view();
        let epoch = consensus_reader.cur_epoch();
        drop(consensus_reader);

        // Wrap up a message
        let message_kind: DataMessage<TYPES> =
            DataMessage::SubmitTransaction(transaction.clone(), view_number);
        let message = Message {
            sender: api.public_key.clone(),
            kind: MessageKind::from(message_kind),
        };

        let serialized_message = self.upgrade_lock.serialize(&message).map_err(|err| {
            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
        })?;

        let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
            Ok(m) => m,
            Err(e) => return Err(HotShotError::InvalidState(e.message)),
        };

        // Fire-and-forget: broadcast to the DA committee and emit a local
        // `Transactions` event concurrently.
        spawn(async move {
            let memberships_da_committee_members = membership
                .da_committee_members(view_number)
                .await
                .iter()
                .cloned()
                .collect();

            join! {
                // TODO We should have a function that can return a network error if there is one
                // but first we'd need to ensure our network implementations can support that
                // (and not hang instead)

                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
                // and will be updated to be part of SystemContext. I wanted to use associated
                // constants in NodeType, but that seems to be unavailable in the current Rust.
                api
                    .network.da_broadcast_message(
                        view_number.u64().into(),
                        serialized_message,
                        memberships_da_committee_members,
                        BroadcastDelay::None,
                    ),
                api
                    .send_external_event(Event {
                        view_number,
                        event: EventType::Transactions {
                            transactions: vec![transaction],
                        },
                    }),
            }
        });
        Ok(())
    }

    /// Returns a copy of the consensus struct
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        Arc::clone(&self.consensus.inner_consensus)
    }

    /// Returns a copy of the instance state
    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
        Arc::clone(&self.instance_state)
    }

    /// Returns a copy of the last decided leaf
    /// # Panics
    /// Panics if internal leaf for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.consensus.read().await.decided_leaf()
    }

    /// [Non-blocking] instantly returns a copy of the last decided leaf if
    /// it is available to be read. If not, we return `None`.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[must_use]
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.consensus.try_read().map(|guard| guard.decided_leaf())
    }

    /// Returns the last decided validated state.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        Arc::clone(&self.consensus.read().await.decided_state())
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn state(&self, view: ViewNumber) -> Option<Arc<TYPES::ValidatedState>> {
        self.consensus.read().await.state(view).cloned()
    }

    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
    ///
    /// Assumes networking implementation is already primed.
    ///
    /// Underlying `HotShot` instance starts out paused, and must be unpaused
    ///
    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
    /// the `HotShot` instance will log the error and shut down.
    ///
    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
    ///
    /// Returns the running handle together with the sender and an activated
    /// receiver for the internal event stream.
    /// # Errors
    ///
    /// Can throw an error if `Self::new` fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        node_id: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Result<
        (
            SystemContextHandle<TYPES, I>,
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        HotShotError<TYPES>,
    > {
        let hotshot = Self::new(
            public_key,
            private_key,
            state_private_key,
            node_id,
            config,
            upgrade,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;
        let handle = Arc::clone(&hotshot).run_tasks().await;
        let (tx, rx) = hotshot.internal_event_stream.clone();

        Ok((handle, tx, rx.activate()))
    }
    /// return the timeout for a view for `self`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.config.next_view_timeout
    }
}
774
775impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
776    /// Spawn all tasks that operate on [`SystemContextHandle`].
777    ///
778    /// For a list of which tasks are being spawned, see this module's documentation.
779    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I> {
780        let consensus_registry = ConsensusTaskRegistry::new();
781        let network_registry = NetworkTaskRegistry::new();
782
783        let output_event_stream = self.external_event_stream.clone();
784        let internal_event_stream = self.internal_event_stream.clone();
785
786        let mut handle = SystemContextHandle {
787            consensus_registry,
788            network_registry,
789            output_event_stream: output_event_stream.clone(),
790            internal_event_stream: internal_event_stream.clone(),
791            hotshot: self.clone().into(),
792            storage: self.storage.clone(),
793            network: Arc::clone(&self.network),
794            membership_coordinator: self.membership_coordinator.clone(),
795            epoch_height: self.config.epoch_height,
796        };
797
798        add_network_tasks::<TYPES, I>(&mut handle).await;
799        add_consensus_tasks::<TYPES, I>(&mut handle).await;
800
801        handle
802    }
803}
804
/// An async broadcast channel: a `(sender, receiver)` pair whose messages are `Arc`-wrapped
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
807
/// Trait for handling messages for a node with a twin copy of consensus
///
/// Implementors decide how events from the (single) network task are routed to
/// the two consensus copies ("left" and "right"), and how events produced by
/// either copy are forwarded back to the network task.
#[async_trait]
pub trait TwinsHandlerState<TYPES, I>
where
    Self: std::fmt::Debug + Send + Sync,
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
{
    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
    ///
    /// Returns the list of events to deliver, each tagged `Left`/`Right` for the target twin.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;

    /// Handle a message from either twin, forwarding it to the network task
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Fuse two channels into a single channel
    ///
    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
    /// The loops therefore run detached for the life of the process.
    fn fuse_channels(
        &'static mut self,
        left: Channel<HotShotEvent<TYPES>>,
        right: Channel<HotShotEvent<TYPES>>,
    ) -> Channel<HotShotEvent<TYPES>> {
        // Both spawned loops need mutable access to `self`; share it behind an
        // `Arc<RwLock<_>>`. The `'static` receiver is what lets `self` move into the tasks.
        let send_state = Arc::new(RwLock::new(self));
        let recv_state = Arc::clone(&send_state);

        let (left_sender, mut left_receiver) = (left.0, left.1);
        let (right_sender, mut right_receiver) = (right.0, right.1);

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
            broadcast(EVENT_CHANNEL_SIZE);

        // Receive loop: merge events produced by either twin and, after passing them
        // through `recv_handler`, broadcast them toward the network task.
        let _recv_loop_handle = spawn(async move {
            loop {
                // NOTE(review): `unwrap` panics this detached task if either twin's
                // channel closes — confirm the senders outlive this loop.
                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
                };

                let mut state = recv_state.write().await;
                let mut result = state.recv_handler(&msg).await;

                // `pop` drains from the back, so events go out in reverse of the
                // order `recv_handler` returned them.
                while let Some(event) = result.pop() {
                    let _ = sender_to_network.broadcast(event.into()).await;
                }
            }
        });

        // Send loop: route events arriving from the network task to one or both
        // twins, as decided by `send_handler`.
        let _send_loop_handle = spawn(async move {
            loop {
                if let Ok(msg) = receiver_from_network.recv().await {
                    let mut state = send_state.write().await;

                    let mut result = state.send_handler(&msg).await;

                    // Same reverse-order drain as the receive loop above.
                    while let Some(event) = result.pop() {
                        match event {
                            Either::Left(msg) => {
                                let _ = left_sender.broadcast(msg.into()).await;
                            },
                            Either::Right(msg) => {
                                let _ = right_sender.broadcast(msg.into()).await;
                            },
                        }
                    }
                }
            }
        });

        (network_task_sender, network_task_receiver)
    }

    #[allow(clippy::too_many_arguments)]
    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
    ///
    /// Builds two complete `SystemContext`s ("left" and "right") over the same
    /// network and initializer, gives each its own consensus tasks, and runs a
    /// single set of network tasks over the fused internal streams of both.
    async fn spawn_twin_handles(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> (SystemContextHandle<TYPES, I>, SystemContextHandle<TYPES, I>) {
        let epoch_height = config.epoch_height;
        // The left context takes clones of every argument; the right consumes the originals.
        let left_system_context = SystemContext::new(
            public_key.clone(),
            private_key.clone(),
            state_private_key.clone(),
            nonce,
            config.clone(),
            upgrade,
            memberships.clone(),
            Arc::clone(&network),
            initializer.clone(),
            consensus_metrics.clone(),
            storage.clone(),
            storage_metrics.clone(),
        )
        .await;
        let right_system_context = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            upgrade,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;

        // create registries for both handles
        let left_consensus_registry = ConsensusTaskRegistry::new();
        let left_network_registry = NetworkTaskRegistry::new();

        let right_consensus_registry = ConsensusTaskRegistry::new();
        let right_network_registry = NetworkTaskRegistry::new();

        // create external channels for both handles
        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let left_external_event_stream =
            (left_external_sender, left_external_receiver.deactivate());

        let (right_external_sender, right_external_receiver) =
            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let right_external_event_stream =
            (right_external_sender, right_external_receiver.deactivate());

        // create internal channels for both handles
        // (the active receivers are kept alive for the fuse below; the handles
        // get deactivated clones so the channel is not drained twice)
        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let left_internal_event_stream = (
            left_internal_sender.clone(),
            left_internal_receiver.clone().deactivate(),
        );

        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let right_internal_event_stream = (
            right_internal_sender.clone(),
            right_internal_receiver.clone().deactivate(),
        );

        // create each handle
        let mut left_handle = SystemContextHandle::<_, I> {
            consensus_registry: left_consensus_registry,
            network_registry: left_network_registry,
            output_event_stream: left_external_event_stream.clone(),
            internal_event_stream: left_internal_event_stream.clone(),
            hotshot: Arc::clone(&left_system_context),
            storage: left_system_context.storage.clone(),
            network: Arc::clone(&left_system_context.network),
            membership_coordinator: left_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        let mut right_handle = SystemContextHandle::<_, I> {
            consensus_registry: right_consensus_registry,
            network_registry: right_network_registry,
            output_event_stream: right_external_event_stream.clone(),
            internal_event_stream: right_internal_event_stream.clone(),
            hotshot: Arc::clone(&right_system_context),
            storage: right_system_context.storage.clone(),
            network: Arc::clone(&right_system_context.network),
            membership_coordinator: right_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        // add consensus tasks to each handle, using their individual internal event streams
        add_consensus_tasks::<TYPES, I>(&mut left_handle).await;
        add_consensus_tasks::<TYPES, I>(&mut right_handle).await;

        // fuse the event streams from both handles before initializing the network tasks
        let fused_internal_event_stream = self.fuse_channels(
            (left_internal_sender, left_internal_receiver),
            (right_internal_sender, right_internal_receiver),
        );

        // swap out the event stream on the left handle
        left_handle.internal_event_stream = (
            fused_internal_event_stream.0,
            fused_internal_event_stream.1.deactivate(),
        );

        // add the network tasks to the left handle. note: because the left handle has the fused event stream, the network tasks on the left handle will handle messages from both handles.
        add_network_tasks::<TYPES, I>(&mut left_handle).await;

        // revert to the original event stream on the left handle, for any applications that want to listen to it
        left_handle.internal_event_stream = left_internal_event_stream.clone();

        (left_handle, right_handle)
    }
}
1019
#[derive(Debug)]
/// A `TwinsHandlerState` that randomly forwards a message to either twin,
/// and returns messages from both.
///
/// Stateless: each incoming event is routed by an independent coin flip.
pub struct RandomTwinsHandler;
1024
1025#[async_trait]
1026impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1027    for RandomTwinsHandler
1028{
1029    async fn send_handler(
1030        &mut self,
1031        event: &HotShotEvent<TYPES>,
1032    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1033        let random: bool = rand::thread_rng().r#gen();
1034
1035        #[allow(clippy::match_bool)]
1036        match random {
1037            true => vec![Either::Left(event.clone())],
1038            false => vec![Either::Right(event.clone())],
1039        }
1040    }
1041
1042    async fn recv_handler(
1043        &mut self,
1044        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1045    ) -> Vec<HotShotEvent<TYPES>> {
1046        match event {
1047            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1048        }
1049    }
1050}
1051
/// A `TwinsHandlerState` that forwards each message to both twins,
/// and returns messages from each of them.
///
/// Stateless: every incoming event is duplicated to the left and right twin.
#[derive(Debug)]
pub struct DoubleTwinsHandler;
1056
1057#[async_trait]
1058impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1059    for DoubleTwinsHandler
1060{
1061    async fn send_handler(
1062        &mut self,
1063        event: &HotShotEvent<TYPES>,
1064    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1065        vec![Either::Left(event.clone()), Either::Right(event.clone())]
1066    }
1067
1068    async fn recv_handler(
1069        &mut self,
1070        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1071    ) -> Vec<HotShotEvent<TYPES>> {
1072        match event {
1073            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1074        }
1075    }
1076}
1077
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusApi<TYPES, I>
    for SystemContextHandle<TYPES, I>
{
    /// Total number of staked nodes, taken from the node's config.
    fn total_nodes(&self) -> NonZeroUsize {
        self.hotshot.config.num_nodes_with_stake
    }

    /// Timeout used when waiting on the block builder, taken from the node's config.
    fn builder_timeout(&self) -> Duration {
        self.hotshot.config.builder_timeout
    }

    /// Broadcast `event` on the external (application-facing) event stream.
    async fn send_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_event");
        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
    }

    /// This node's public signature key.
    fn public_key(&self) -> &TYPES::SignatureKey {
        &self.hotshot.public_key
    }

    /// This node's private signature key.
    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
        &self.hotshot.private_key
    }

    /// This node's private key for light-client state signatures.
    fn state_private_key(
        &self,
    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
        &self.hotshot.state_private_key
    }
}
1109
#[derive(Clone, Debug, PartialEq)]
/// Per-epoch information supplied to the initializer and replayed into the
/// membership on startup (see `load_start_epoch_info`).
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// The epoch this entry describes
    pub epoch: EpochNumber,
    /// DRB result to register for this epoch
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// Epoch root block header, if known; used to add the epoch root to the membership
    pub block_header: Option<TYPES::BlockHeader>,
}
1117
#[derive(Clone, Debug)]
/// initializer struct for creating starting block
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height (number of blocks per epoch)
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// the anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// ValidatedState for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// ValidatedState::Delta for the anchor leaf, optional.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number that should be equivalent to the view the node shut down with last.
    pub start_view: ViewNumber,

    /// The view we last performed an action in.  An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: ViewNumber,

    /// Starting epoch number that should be equivalent to the epoch the node shut down with last.
    pub start_epoch: Option<EpochNumber>,

    /// Highest QC that was seen, for genesis it's the genesis QC.  It should be for a view greater
    /// than `inner`s view number for the non genesis case because we must have seen higher QCs
    /// to decide on the leaf.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during epoch transition after restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out to provide to others for catchup
    pub saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on.  These allow a restarting node
    /// to vote and propose right away if they didn't miss anything while down.
    pub undecided_leaves: BTreeMap<ViewNumber, Leaf2<TYPES>>,

    /// Not yet decided state, keyed by view number
    pub undecided_state: BTreeMap<ViewNumber, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate if there's any
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Saved epoch information. This must be sorted ascending by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}
1179
1180impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1181    /// initialize from genesis
1182    /// # Errors
1183    /// If we are unable to apply the genesis block to the default state
1184    pub async fn from_genesis(
1185        instance_state: TYPES::InstanceState,
1186        epoch_height: u64,
1187        epoch_start_block: u64,
1188        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1189        upgrade: Upgrade,
1190    ) -> Result<Self, HotShotError<TYPES>> {
1191        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1192        let high_qc = QuorumCertificate2::genesis(&validated_state, &instance_state, upgrade).await;
1193
1194        Ok(Self {
1195            anchor_leaf: Leaf2::genesis(&validated_state, &instance_state, upgrade.base).await,
1196            anchor_state: Arc::new(validated_state),
1197            anchor_state_delta: Some(Arc::new(state_delta)),
1198            start_view: ViewNumber::new(0),
1199            start_epoch: genesis_epoch_from_version(upgrade.base),
1200            last_actioned_view: ViewNumber::new(0),
1201            saved_proposals: BTreeMap::new(),
1202            high_qc,
1203            next_epoch_high_qc: None,
1204            decided_upgrade_certificate: None,
1205            undecided_leaves: BTreeMap::new(),
1206            undecided_state: BTreeMap::new(),
1207            instance_state,
1208            saved_vid_shares: BTreeMap::new(),
1209            epoch_height,
1210            state_cert: None,
1211            epoch_start_block,
1212            start_epoch_info,
1213        })
1214    }
1215
1216    /// Use saved proposals to update undecided leaves and state
1217    #[must_use]
1218    pub fn update_undecided(self) -> Self {
1219        let mut undecided_leaves = self.undecided_leaves.clone();
1220        let mut undecided_state = self.undecided_state.clone();
1221
1222        for proposal in self.saved_proposals.values() {
1223            // skip proposals unless they're newer than the anchor leaf
1224            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1225                continue;
1226            }
1227
1228            undecided_leaves.insert(
1229                proposal.data.view_number(),
1230                Leaf2::from_quorum_proposal(&proposal.data),
1231            );
1232        }
1233
1234        for leaf in undecided_leaves.values() {
1235            let view_inner = ViewInner::Leaf {
1236                leaf: leaf.commit(),
1237                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1238                delta: None,
1239                epoch: leaf.epoch(self.epoch_height),
1240            };
1241            let view = View { view_inner };
1242
1243            undecided_state.insert(leaf.view_number(), view);
1244        }
1245
1246        Self {
1247            undecided_leaves,
1248            undecided_state,
1249            ..self
1250        }
1251    }
1252
1253    /// Create a `HotShotInitializer` from the given information.
1254    ///
1255    /// This function uses the anchor leaf to set the initial validated state,
1256    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
1257    ///
1258    /// If you are able to or would prefer to set these yourself,
1259    /// you should use the `HotShotInitializer` constructor directly.
1260    #[allow(clippy::too_many_arguments)]
1261    pub fn load(
1262        instance_state: TYPES::InstanceState,
1263        epoch_height: u64,
1264        epoch_start_block: u64,
1265        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1266        anchor_leaf: Leaf2<TYPES>,
1267        (start_view, start_epoch): (ViewNumber, Option<EpochNumber>),
1268        (high_qc, next_epoch_high_qc): (
1269            QuorumCertificate2<TYPES>,
1270            Option<NextEpochQuorumCertificate2<TYPES>>,
1271        ),
1272        last_actioned_view: ViewNumber,
1273        saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1274        saved_vid_shares: VidShares<TYPES>,
1275        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1276        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1277    ) -> Self {
1278        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1279            anchor_leaf.block_header(),
1280        ));
1281        let anchor_state_delta = None;
1282
1283        let initializer = Self {
1284            instance_state,
1285            epoch_height,
1286            epoch_start_block,
1287            anchor_leaf,
1288            anchor_state,
1289            anchor_state_delta,
1290            high_qc,
1291            start_view,
1292            start_epoch,
1293            last_actioned_view,
1294            saved_proposals,
1295            saved_vid_shares,
1296            next_epoch_high_qc,
1297            decided_upgrade_certificate,
1298            undecided_leaves: BTreeMap::new(),
1299            undecided_state: BTreeMap::new(),
1300            state_cert,
1301            start_epoch_info,
1302        };
1303
1304        initializer.update_undecided()
1305    }
1306}
1307
1308async fn load_start_epoch_info<TYPES: NodeType>(
1309    membership: &Arc<RwLock<TYPES::Membership>>,
1310    start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1311    epoch_height: u64,
1312    epoch_start_block: u64,
1313) {
1314    let first_epoch_number =
1315        EpochNumber::new(epoch_from_block_number(epoch_start_block, epoch_height));
1316
1317    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1318    membership
1319        .write()
1320        .await
1321        .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1322
1323    let mut sorted_epoch_info = start_epoch_info.clone();
1324    sorted_epoch_info.sort_by_key(|info| info.epoch);
1325    for epoch_info in sorted_epoch_info {
1326        if let Some(block_header) = &epoch_info.block_header {
1327            tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1328
1329            Membership::add_epoch_root(Arc::clone(membership), block_header.clone())
1330                .await
1331                .unwrap_or_else(|err| {
1332                    // REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal
1333                    tracing::error!(
1334                        "Failed to add epoch root for epoch {}: {err}",
1335                        epoch_info.epoch
1336                    );
1337                });
1338        }
1339    }
1340
1341    for epoch_info in start_epoch_info {
1342        tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1343        membership
1344            .write()
1345            .await
1346            .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1347    }
1348}