hotshot/lib.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides a generic Rust implementation of the `HotShot` BFT protocol
//!

// Documentation module
#[cfg(feature = "docs")]
pub mod documentation;
use committable::Committable;
use futures::future::{select, Either};
use hotshot_types::{
    drb::{drb_difficulty_selector, DrbResult, INITIAL_DRB_RESULT},
    epoch_membership::EpochMembershipCoordinator,
    message::UpgradeLock,
    simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
    traits::{
        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
        node_implementation::Versions, signature_key::StateSignatureKey, storage::Storage,
    },
    utils::{epoch_from_block_number, is_ge_epoch_root},
};
use rand::Rng;
use vbs::version::StaticVersionType;

/// Contains traits consumed by [`SystemContext`]
pub mod traits;
/// Contains types used by the crate
pub mod types;

/// Contains functions for adding the consensus and network tasks
pub mod tasks;
use hotshot_types::data::QuorumProposalWrapper;

/// Contains helper functions for the crate
pub mod helpers;

use std::{
    collections::{BTreeMap, HashMap},
    num::NonZeroUsize,
    sync::Arc,
    time::Duration,
};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
// Internal
/// Reexport error type
pub use hotshot_types::error::HotShotError;
use hotshot_types::{
    consensus::{
        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
        ViewInner,
    },
    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
    data::Leaf2,
    event::{EventType, LeafInfo},
    message::{DataMessage, Message, MessageKind, Proposal},
    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
    storage_metrics::StorageMetricsValue,
    traits::{
        consensus_api::ConsensusApi,
        network::ConnectedNetwork,
        node_implementation::{ConsensusTime, NodeType},
        signature_key::SignatureKey,
        states::ValidatedState,
    },
    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
    HotShotConfig,
};
/// Reexport rand crate
pub use rand;
use tokio::{spawn, time::sleep};
use tracing::{debug, instrument, trace};

// -- Re-exports
// External
use crate::{
    tasks::{add_consensus_tasks, add_network_tasks},
    traits::NodeImplementation,
    types::{Event, SystemContextHandle},
};

/// Length, in bytes, of a 512 bit hash
pub const H_512: usize = 64;
/// Length, in bytes, of a 256 bit hash
pub const H_256: usize = 32;

/// Holds the state needed to participate in `HotShot` consensus
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The public key of this node
    public_key: TYPES::SignatureKey,

    /// The private key of this node
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The private key to sign the light client state
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration items for this hotshot instance
    pub config: HotShotConfig<TYPES>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// The metrics that the implementor is using.
    metrics: Arc<ConsensusMetricsValue>,

    /// The HotStuff implementation
    consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    instance_state: Arc<TYPES::InstanceState>,

    /// The view to enter when first starting consensus
    start_view: TYPES::View,

    /// The epoch to enter when first starting consensus
    start_epoch: Option<TYPES::Epoch>,

    /// Access to the output event stream.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// External event stream for communication with the application.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Anchored leaf provided by the initializer.
    anchored_leaf: Leaf2<TYPES>,

    /// Access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Unique id for instrumentation
    pub id: u64,

    /// Reference to the internal storage for consensus data.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Shared lock for upgrade information
    pub upgrade_lock: UpgradeLock<TYPES, V>,
}
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
    for SystemContext<TYPES, I, V>
{
    #![allow(deprecated)]
    fn clone(&self) -> Self {
        Self {
            public_key: self.public_key.clone(),
            private_key: self.private_key.clone(),
            state_private_key: self.state_private_key.clone(),
            config: self.config.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            metrics: Arc::clone(&self.metrics),
            consensus: self.consensus.clone(),
            instance_state: Arc::clone(&self.instance_state),
            start_view: self.start_view,
            start_epoch: self.start_epoch,
            output_event_stream: self.output_event_stream.clone(),
            external_event_stream: self.external_event_stream.clone(),
            anchored_leaf: self.anchored_leaf.clone(),
            internal_event_stream: self.internal_event_stream.clone(),
            id: self.id,
            storage: self.storage.clone(),
            storage_metrics: Arc::clone(&self.storage_metrics),
            upgrade_lock: self.upgrade_lock.clone(),
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    #![allow(deprecated)]
    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this instead of `init` if you want to start the tasks manually.
    ///
    /// # Panics
    ///
    /// Panics if storage migration fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Arc<Self> {
        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);

        Self::new_from_channels(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
            internal_chan,
            external_chan,
        )
        .await
    }

    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
    ///
    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
    /// well.
    ///
    /// Use this function if you want to use some preexisting channels and to spin up the tasks
    /// and start consensus manually. Mostly useful for tests.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    pub async fn new_from_channels(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        mut membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
        internal_channel: (
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
    ) -> Arc<Self> {
        debug!("Creating a new hotshot");

        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");

        let consensus_metrics = Arc::new(consensus_metrics);
        let storage_metrics = Arc::new(storage_metrics);
        let anchored_leaf = initializer.anchor_leaf;
        let instance_state = initializer.instance_state;

        let (internal_tx, internal_rx) = internal_channel;
        let (mut external_tx, external_rx) = external_channel;

        let mut internal_rx = internal_rx.new_receiver();

        let mut external_rx = external_rx.new_receiver();

        // Allow overflow on the internal channel as well. We don't want to block consensus if we
        // have a slow receiver
        internal_rx.set_overflow(true);
        // Allow overflow on the external channel, otherwise sending to it may block.
        external_rx.set_overflow(true);

        membership_coordinator
            .set_external_channel(external_rx.clone())
            .await;

        tracing::warn!(
            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
            V::Base::VERSION,
            V::Upgrade::VERSION,
        );
        tracing::warn!(
            "Loading previously decided upgrade certificate from storage: {:?}",
            initializer.decided_upgrade_certificate
        );

        let upgrade_lock =
            UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);

        let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
            cert.data.new_version
        } else {
            V::Base::VERSION
        };

        debug!("Setting DRB difficulty selector in membership");
        let drb_difficulty_selector = drb_difficulty_selector::<_, V>(&config);

        membership_coordinator
            .set_drb_difficulty_selector(drb_difficulty_selector)
            .await;

        for da_committee in &config.da_committees {
            if current_version >= da_committee.start_version {
                membership_coordinator
                    .membership()
                    .write()
                    .await
                    .add_da_committee(da_committee.start_epoch, da_committee.committee.clone());
            }
        }

        // Get the validated state for the anchor leaf from the initializer.
        let validated_state = initializer.anchor_state;

        load_start_epoch_info(
            membership_coordinator.membership(),
            &initializer.start_epoch_info,
            config.epoch_height,
            config.epoch_start_block,
        )
        .await;

        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
            TYPES::Epoch::new(epoch_from_block_number(
                block_number + 1,
                config.epoch_height,
            ))
        });

        // Insert the validated state to state map.
        let mut validated_state_map = BTreeMap::default();
        validated_state_map.insert(
            anchored_leaf.view_number(),
            View {
                view_inner: ViewInner::Leaf {
                    leaf: anchored_leaf.commit(),
                    state: Arc::clone(&validated_state),
                    delta: initializer.anchor_state_delta,
                    epoch,
                },
            },
        );
        for (view_num, inner) in initializer.undecided_state {
            validated_state_map.insert(view_num, inner);
        }

        let mut saved_leaves = HashMap::new();
        let mut saved_payloads = BTreeMap::new();
        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());

        for (_, leaf) in initializer.undecided_leaves {
            saved_leaves.insert(leaf.commit(), leaf.clone());
        }
        if let Some(payload) = anchored_leaf.block_payload() {
            let metadata = anchored_leaf.block_header().metadata().clone();
            saved_payloads.insert(
                anchored_leaf.view_number(),
                Arc::new(PayloadWithMetadata { payload, metadata }),
            );
        }
        let high_qc_block_number = initializer.high_qc.data.block_number;

        let consensus = Consensus::new(
            validated_state_map,
            Some(initializer.saved_vid_shares),
            anchored_leaf.view_number(),
            epoch,
            anchored_leaf.view_number(),
            anchored_leaf.view_number(),
            initializer.last_actioned_view,
            initializer.saved_proposals,
            saved_leaves,
            saved_payloads,
            initializer.high_qc,
            initializer.next_epoch_high_qc,
            Arc::clone(&consensus_metrics),
            config.epoch_height,
            initializer.state_cert,
            config.drb_difficulty,
            config.drb_upgrade_difficulty,
        );

        let consensus = Arc::new(RwLock::new(consensus));

        if let Some(epoch) = epoch {
            tracing::info!(
                "Triggering catchup for epoch {} and next epoch {}",
                epoch,
                epoch + 1
            );
            // trigger catchup for the current and next epoch if needed
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch))
                .await;
            let _ = membership_coordinator
                .membership_for_epoch(Some(epoch + 1))
                .await;
            // If we already have an epoch root, we can trigger catchup for the epoch
            // which that root applies to.
            if let Some(high_qc_block_number) = high_qc_block_number {
                if is_ge_epoch_root(high_qc_block_number, config.epoch_height) {
                    let _ = membership_coordinator
                        .stake_table_for_epoch(Some(epoch + 2))
                        .await;
                }
            }

            if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
                tracing::error!("Writing DRB result for epoch {}", epoch + 1);
                if let Ok(mem) = membership_coordinator
                    .stake_table_for_epoch(Some(epoch + 1))
                    .await
                {
                    mem.add_drb_result(drb_result).await;
                }
            }
        }

        // This makes it so we won't block on broadcasting if there is not a receiver
        // Our own copy of the receiver is inactive so it doesn't count.
        external_tx.set_await_active(false);

        let inner: Arc<SystemContext<TYPES, I, V>> = Arc::new(SystemContext {
            id: nonce,
            consensus: OuterConsensus::new(consensus),
            instance_state: Arc::new(instance_state),
            public_key,
            private_key,
            state_private_key,
            config,
            start_view: initializer.start_view,
            start_epoch: initializer.start_epoch,
            network,
            membership_coordinator,
            metrics: Arc::clone(&consensus_metrics),
            internal_event_stream: (internal_tx, internal_rx.deactivate()),
            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
            external_event_stream: (external_tx, external_rx.deactivate()),
            anchored_leaf: anchored_leaf.clone(),
            storage,
            storage_metrics,
            upgrade_lock,
        });

        inner
    }
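
    // A minimal sketch of driving `new_from_channels` with caller-owned channels, e.g. in a
    // test harness. The concrete type parameters (`TestTypes`, `TestImpl`, `TestVersions`) and
    // the pre-built arguments (keys, `config`, `memberships`, `network`, `initializer`, metrics,
    // `storage`) are assumptions for illustration, not part of this crate:
    //
    //     let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
    //     let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
    //     let hotshot = SystemContext::<TestTypes, TestImpl, TestVersions>::new_from_channels(
    //         public_key, private_key, state_private_key, nonce, config, memberships, network,
    //         initializer, consensus_metrics, storage, storage_metrics,
    //         internal_chan, external_chan,
    //     )
    //     .await;
    //     let handle = hotshot.run_tasks().await;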

    /// "Starts" consensus by sending `Qc2Formed` and `ViewChange` events
    ///
    /// # Panics
    /// Panics if sending genesis fails
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn start_consensus(&self) {
        #[cfg(all(feature = "rewind", not(debug_assertions)))]
        compile_error!("Cannot run rewind in production builds!");

        debug!("Starting Consensus");
        let consensus = self.consensus.read().await;

        let first_epoch = option_epoch_from_block_number::<TYPES>(
            V::Base::VERSION >= V::Epochs::VERSION,
            self.config.epoch_start_block,
            self.config.epoch_height,
        );
        // `start_epoch` comes from the initializer, it might be the last seen epoch before restart
        // `first_epoch` is the first epoch after the transition to the epoch version
        // `initial_view_change_epoch` is the greater of the two, we use it with the initial view change
        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
                self.start_view,
                initial_view_change_epoch,
            )))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
                    self.start_view, initial_view_change_epoch,
                )
            });

        // Clone the event stream that we send the timeout event to
        let event_stream = self.internal_event_stream.0.clone();
        let next_view_timeout = self.config.next_view_timeout;
        let start_view = self.start_view;
        let start_epoch = self.start_epoch;

        // Spawn a task that will sleep for the next view timeout and then send a timeout event
        // if not cancelled
        spawn({
            async move {
                sleep(Duration::from_millis(next_view_timeout)).await;
                broadcast_event(
                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
                    &event_stream,
                )
                .await;
            }
        });
        #[allow(clippy::panic)]
        self.internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
                consensus.high_qc().clone(),
            ))))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
                    consensus.high_qc()
                )
            });

        {
            // Some applications seem to expect a leaf decide event for the genesis leaf,
            // which contains only that leaf and nothing else.
            if self.anchored_leaf.view_number() == TYPES::View::genesis() {
                let (validated_state, state_delta) =
                    TYPES::ValidatedState::genesis(&self.instance_state);

                let qc = QuorumCertificate2::genesis::<V>(
                    &validated_state,
                    self.instance_state.as_ref(),
                )
                .await;

                broadcast_event(
                    Event {
                        view_number: self.anchored_leaf.view_number(),
                        event: EventType::Decide {
                            leaf_chain: Arc::new(vec![LeafInfo::new(
                                self.anchored_leaf.clone(),
                                Arc::new(validated_state),
                                Some(Arc::new(state_delta)),
                                None,
                                None,
                            )]),
                            committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
                            deciding_qc: None,
                            block_size: None,
                        },
                    },
                    &self.external_event_stream.0,
                )
                .await;
            }
        }
    }

    /// Emit an external event
    async fn send_external_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_external_event");
        broadcast_event(event, &self.external_event_stream.0).await;
    }

    /// Publishes a transaction asynchronously to the network.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be serialized or if the membership for the
    /// current epoch cannot be retrieved; failures to deliver the message over the network are
    /// not reported.
    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
    pub async fn publish_transaction_async(
        &self,
        transaction: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        trace!("Adding transaction to our own queue");

        let api = self.clone();

        let consensus_reader = api.consensus.read().await;
        let view_number = consensus_reader.cur_view();
        let epoch = consensus_reader.cur_epoch();
        drop(consensus_reader);

        // Wrap up a message
        let message_kind: DataMessage<TYPES> =
            DataMessage::SubmitTransaction(transaction.clone(), view_number);
        let message = Message {
            sender: api.public_key.clone(),
            kind: MessageKind::from(message_kind),
        };

        let serialized_message = self.upgrade_lock.serialize(&message).await.map_err(|err| {
            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
        })?;

        let membership = match api.membership_coordinator.membership_for_epoch(epoch).await {
            Ok(m) => m,
            Err(e) => return Err(HotShotError::InvalidState(e.message)),
        };

        spawn(async move {
            let memberships_da_committee_members = membership
                .da_committee_members(view_number)
                .await
                .iter()
                .cloned()
                .collect();

            join! {
                // TODO We should have a function that can return a network error if there is one
                // but first we'd need to ensure our network implementations can support that
                // (and not hang instead)

                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
                // and will be updated to be part of SystemContext. I wanted to use associated
                // constants in NodeType, but that seems to be unavailable in the current Rust.
                api
                    .network.da_broadcast_message(
                        serialized_message,
                        memberships_da_committee_members,
                        BroadcastDelay::None,
                    ),
                api
                    .send_external_event(Event {
                        view_number,
                        event: EventType::Transactions {
                            transactions: vec![transaction],
                        },
                    }),
            }
        });
        Ok(())
    }
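
    // A minimal sketch of submitting a transaction from application code. The running context
    // `hotshot` (an `Arc<SystemContext<...>>`) and the transaction value `tx` are assumptions
    // for illustration:
    //
    //     if let Err(err) = hotshot.publish_transaction_async(tx).await {
    //         tracing::warn!("failed to submit transaction: {err}");
    //     }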

    /// Returns a shared handle to the consensus struct
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        Arc::clone(&self.consensus.inner_consensus)
    }

    /// Returns a shared handle to the instance state
    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
        Arc::clone(&self.instance_state)
    }

    /// Returns a copy of the last decided leaf
    /// # Panics
    /// Panics if internal leaf for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.consensus.read().await.decided_leaf()
    }

    /// [Non-blocking] instantly returns a copy of the last decided leaf if
    /// it is available to be read. If not, we return `None`.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[must_use]
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.consensus.try_read().map(|guard| guard.decided_leaf())
    }

    /// Returns the last decided validated state.
    ///
    /// # Panics
    /// Panics if internal state for consensus is inconsistent
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        Arc::clone(&self.consensus.read().await.decided_state())
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.consensus.read().await.state(view).cloned()
    }
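
    // A minimal sketch of inspecting decided data from application code; `hotshot` is an
    // assumed, already-running `Arc<SystemContext<...>>`:
    //
    //     let leaf = hotshot.decided_leaf().await;
    //     let state = hotshot.decided_state().await;
    //     tracing::info!("last decided view: {:?}", leaf.view_number());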

    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
    ///
    /// Assumes networking implementation is already primed.
    ///
    /// The underlying `HotShot` instance starts out paused, and must be unpaused.
    ///
    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
    /// the `HotShot` instance will log the error and shut down.
    ///
    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
    ///
    /// # Errors
    ///
    /// Can return an error if `Self::new` fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        node_id: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> Result<
        (
            SystemContextHandle<TYPES, I, V>,
            Sender<Arc<HotShotEvent<TYPES>>>,
            Receiver<Arc<HotShotEvent<TYPES>>>,
        ),
        HotShotError<TYPES>,
    > {
        let hotshot = Self::new(
            public_key,
            private_key,
            state_private_key,
            node_id,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;
        let handle = Arc::clone(&hotshot).run_tasks().await;
        let (tx, rx) = hotshot.internal_event_stream.clone();

        Ok((handle, tx, rx.activate()))
    }
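
    // A minimal sketch of the typical startup sequence. The concrete arguments (keys, `config`,
    // `memberships`, `network`, `initializer`, metrics, `storage`) and the type parameters
    // `MyTypes`/`MyImpl`/`MyVersions` are assumptions supplied by the embedding application, and
    // the sketch assumes the returned handle exposes the underlying context as `handle.hotshot`:
    //
    //     let (handle, _internal_tx, _internal_rx) =
    //         SystemContext::<MyTypes, MyImpl, MyVersions>::init(
    //             public_key, private_key, state_private_key, node_id, config, memberships,
    //             network, initializer, consensus_metrics, storage, storage_metrics,
    //         )
    //         .await?;
    //     handle.hotshot.start_consensus().await;
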
    /// Returns the view timeout for `self`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.config.next_view_timeout
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<TYPES, I, V> {
    /// Spawn all tasks that operate on [`SystemContextHandle`].
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I, V> {
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = self.external_event_stream.clone();
        let internal_event_stream = self.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: self.clone().into(),
            storage: self.storage.clone(),
            network: Arc::clone(&self.network),
            membership_coordinator: self.membership_coordinator.clone(),
            epoch_height: self.config.epoch_height,
        };

        add_network_tasks::<TYPES, I, V>(&mut handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;

        handle
    }
}

/// An async broadcast channel
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);

#[async_trait]
/// Trait for handling messages for a node with a twin copy of consensus
pub trait TwinsHandlerState<TYPES, I, V>
where
    Self: std::fmt::Debug + Send + Sync,
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
{
    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;

    /// Handle a message from either twin, forwarding it to the network task
    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Fuse two channels into a single channel
    ///
    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
    fn fuse_channels(
        &'static mut self,
        left: Channel<HotShotEvent<TYPES>>,
        right: Channel<HotShotEvent<TYPES>>,
    ) -> Channel<HotShotEvent<TYPES>> {
        let send_state = Arc::new(RwLock::new(self));
        let recv_state = Arc::clone(&send_state);

        let (left_sender, mut left_receiver) = (left.0, left.1);
        let (right_sender, mut right_receiver) = (right.0, right.1);

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
            broadcast(EVENT_CHANNEL_SIZE);

        let _recv_loop_handle = spawn(async move {
            loop {
                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
                };

                let mut state = recv_state.write().await;
                let mut result = state.recv_handler(&msg).await;

                while let Some(event) = result.pop() {
                    let _ = sender_to_network.broadcast(event.into()).await;
                }
            }
        });

        let _send_loop_handle = spawn(async move {
            loop {
                if let Ok(msg) = receiver_from_network.recv().await {
                    let mut state = send_state.write().await;

                    let mut result = state.send_handler(&msg).await;

                    while let Some(event) = result.pop() {
                        match event {
                            Either::Left(msg) => {
                                let _ = left_sender.broadcast(msg.into()).await;
                            },
                            Either::Right(msg) => {
                                let _ = right_sender.broadcast(msg.into()).await;
                            },
                        }
                    }
                }
            }
        });

        (network_task_sender, network_task_receiver)
    }

    #[allow(clippy::too_many_arguments)]
    /// Spawn all tasks for a pair of [`SystemContextHandle`]s running twin copies of consensus.
    ///
    /// For a list of which tasks are being spawned, see this module's documentation.
    async fn spawn_twin_handles(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> (
        SystemContextHandle<TYPES, I, V>,
        SystemContextHandle<TYPES, I, V>,
    ) {
        let epoch_height = config.epoch_height;
        let left_system_context = SystemContext::new(
            public_key.clone(),
            private_key.clone(),
            state_private_key.clone(),
            nonce,
            config.clone(),
            memberships.clone(),
            Arc::clone(&network),
            initializer.clone(),
            consensus_metrics.clone(),
            storage.clone(),
            storage_metrics.clone(),
        )
        .await;
        let right_system_context = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships,
            network,
            initializer,
            consensus_metrics,
            storage,
            storage_metrics,
        )
        .await;

        // create registries for both handles
        let left_consensus_registry = ConsensusTaskRegistry::new();
        let left_network_registry = NetworkTaskRegistry::new();

        let right_consensus_registry = ConsensusTaskRegistry::new();
        let right_network_registry = NetworkTaskRegistry::new();

        // create external channels for both handles
        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let left_external_event_stream =
            (left_external_sender, left_external_receiver.deactivate());

        let (right_external_sender, right_external_receiver) =
            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
        let right_external_event_stream =
            (right_external_sender, right_external_receiver.deactivate());

        // create internal channels for both handles
        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let left_internal_event_stream = (
            left_internal_sender.clone(),
            left_internal_receiver.clone().deactivate(),
        );

        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        let right_internal_event_stream = (
            right_internal_sender.clone(),
            right_internal_receiver.clone().deactivate(),
        );

        // create each handle
        let mut left_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: left_consensus_registry,
            network_registry: left_network_registry,
            output_event_stream: left_external_event_stream.clone(),
            internal_event_stream: left_internal_event_stream.clone(),
            hotshot: Arc::clone(&left_system_context),
            storage: left_system_context.storage.clone(),
            network: Arc::clone(&left_system_context.network),
            membership_coordinator: left_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        let mut right_handle = SystemContextHandle::<_, I, _> {
            consensus_registry: right_consensus_registry,
            network_registry: right_network_registry,
            output_event_stream: right_external_event_stream.clone(),
            internal_event_stream: right_internal_event_stream.clone(),
            hotshot: Arc::clone(&right_system_context),
            storage: right_system_context.storage.clone(),
            network: Arc::clone(&right_system_context.network),
            membership_coordinator: right_system_context.membership_coordinator.clone(),
            epoch_height,
        };

        // add consensus tasks to each handle, using their individual internal event streams
        add_consensus_tasks::<TYPES, I, V>(&mut left_handle).await;
        add_consensus_tasks::<TYPES, I, V>(&mut right_handle).await;

        // fuse the event streams from both handles before initializing the network tasks
        let fused_internal_event_stream = self.fuse_channels(
            (left_internal_sender, left_internal_receiver),
            (right_internal_sender, right_internal_receiver),
        );

        // swap out the event stream on the left handle
        left_handle.internal_event_stream = (
            fused_internal_event_stream.0,
            fused_internal_event_stream.1.deactivate(),
        );

        // add the network tasks to the left handle. note: because the left handle has the fused
        // event stream, the network tasks on the left handle will handle messages from both handles.
        add_network_tasks::<TYPES, I, V>(&mut left_handle).await;

        // revert to the original event stream on the left handle, for any applications that want
        // to listen to it
        left_handle.internal_event_stream = left_internal_event_stream.clone();

        (left_handle, right_handle)
    }
}
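
// A rough usage sketch for the twins machinery, e.g. in a Byzantine test harness. Because
// `spawn_twin_handles` takes `&'static mut self`, this sketch assumes the handler state is
// leaked to obtain a `'static` reference; the remaining arguments are assumptions supplied by
// the test setup:
//
//     let handler: &'static mut RandomTwinsHandler = Box::leak(Box::new(RandomTwinsHandler));
//     let (left_handle, right_handle) = handler
//         .spawn_twin_handles(
//             public_key, private_key, state_private_key, nonce, config, memberships, network,
//             initializer, consensus_metrics, storage, storage_metrics,
//         )
//         .await;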

#[derive(Debug)]
/// A `TwinsHandlerState` that randomly forwards a message to either twin,
/// and returns messages from both.
pub struct RandomTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for RandomTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        let random: bool = rand::thread_rng().gen();

        #[allow(clippy::match_bool)]
        match random {
            true => vec![Either::Left(event.clone())],
            false => vec![Either::Right(event.clone())],
        }
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

#[derive(Debug)]
/// A `TwinsHandlerState` that forwards each message to both twins,
/// and returns messages from each of them.
pub struct DoubleTwinsHandler;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TwinsHandlerState<TYPES, I, V>
    for DoubleTwinsHandler
{
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
        vec![Either::Left(event.clone()), Either::Right(event.clone())]
    }

    async fn recv_handler(
        &mut self,
        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
    ) -> Vec<HotShotEvent<TYPES>> {
        match event {
            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
        }
    }
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusApi<TYPES, I>
    for SystemContextHandle<TYPES, I, V>
{
    fn total_nodes(&self) -> NonZeroUsize {
        self.hotshot.config.num_nodes_with_stake
    }

    fn builder_timeout(&self) -> Duration {
        self.hotshot.config.builder_timeout
    }

    async fn send_event(&self, event: Event<TYPES>) {
        debug!(?event, "send_event");
        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
    }

    fn public_key(&self) -> &TYPES::SignatureKey {
        &self.hotshot.public_key
    }

    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
        &self.hotshot.private_key
    }

    fn state_private_key(
        &self,
    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
        &self.hotshot.state_private_key
    }
}

#[derive(Clone, Debug, PartialEq)]
/// Epoch information provided to the initializer for epochs that were already known at startup
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// The epoch number
    pub epoch: TYPES::Epoch,
    /// The DRB result for this epoch
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// The block header of the epoch root for this epoch, if available
    pub block_header: Option<TYPES::BlockHeader>,
}
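
// A minimal sketch of providing saved epoch information when restarting a node. The values
// `saved_epoch`, `saved_drb_result`, and `saved_root_header`, and the type `MyTypes`, are
// assumptions recovered from the application's storage:
//
//     let start_epoch_info = vec![InitializerEpochInfo::<MyTypes> {
//         epoch: saved_epoch,
//         drb_result: saved_drb_result,
//         block_header: Some(saved_root_header),
//     }];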

#[derive(Clone, Debug)]
/// Initializer struct used to create the starting state for a node
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// The anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// ValidatedState for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// ValidatedState::Delta for the anchor leaf, optional.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number that should be equivalent to the view the node shut down with last.
    pub start_view: TYPES::View,

    /// The view we last performed an action in. An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: TYPES::View,

    /// Starting epoch number that should be equivalent to the epoch the node shut down with last.
    pub start_epoch: Option<TYPES::Epoch>,

    /// Highest QC that was seen; for genesis it's the genesis QC. In the non-genesis case it
    /// should be for a view greater than the anchor leaf's view number, because we must have
    /// seen higher QCs to decide on the leaf.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during epoch transition after restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out to provide to others for catchup
    pub saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on. These allow a restarting node
    /// to vote and propose right away if it didn't miss anything while down.
    pub undecided_leaves: BTreeMap<TYPES::View, Leaf2<TYPES>>,

    /// Not yet decided state
    pub undecided_state: BTreeMap<TYPES::View, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate, if there is any
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Saved epoch information. This must be sorted ascending by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}

impl<TYPES: NodeType> HotShotInitializer<TYPES> {
    /// Initialize from genesis
    /// # Errors
    /// If we are unable to apply the genesis block to the default state
    pub async fn from_genesis<V: Versions>(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
    ) -> Result<Self, HotShotError<TYPES>> {
        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
        let high_qc = QuorumCertificate2::genesis::<V>(&validated_state, &instance_state).await;

        Ok(Self {
            anchor_leaf: Leaf2::genesis::<V>(&validated_state, &instance_state).await,
            anchor_state: Arc::new(validated_state),
            anchor_state_delta: Some(Arc::new(state_delta)),
            start_view: TYPES::View::new(0),
            start_epoch: genesis_epoch_from_version::<V, TYPES>(),
            last_actioned_view: TYPES::View::new(0),
            saved_proposals: BTreeMap::new(),
            high_qc,
            next_epoch_high_qc: None,
            decided_upgrade_certificate: None,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            instance_state,
            saved_vid_shares: BTreeMap::new(),
            epoch_height,
            state_cert: None,
            epoch_start_block,
            start_epoch_info,
        })
    }
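
    // A minimal sketch of initializing a fresh node from genesis; `instance_state`, the epoch
    // parameters, and the type parameters `MyTypes`/`MyVersions` are assumptions supplied by
    // the embedding application:
    //
    //     let initializer = HotShotInitializer::<MyTypes>::from_genesis::<MyVersions>(
    //         instance_state,
    //         epoch_height,
    //         epoch_start_block,
    //         vec![],
    //     )
    //     .await?;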

    /// Use saved proposals to update undecided leaves and state
    #[must_use]
    pub fn update_undecided(self) -> Self {
        let mut undecided_leaves = self.undecided_leaves.clone();
        let mut undecided_state = self.undecided_state.clone();

        for proposal in self.saved_proposals.values() {
            // skip proposals unless they're newer than the anchor leaf
            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
                continue;
            }

            undecided_leaves.insert(
                proposal.data.view_number(),
                Leaf2::from_quorum_proposal(&proposal.data),
            );
        }

        for leaf in undecided_leaves.values() {
            let view_inner = ViewInner::Leaf {
                leaf: leaf.commit(),
                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
                delta: None,
                epoch: leaf.epoch(self.epoch_height),
            };
            let view = View { view_inner };

            undecided_state.insert(leaf.view_number(), view);
        }

        Self {
            undecided_leaves,
            undecided_state,
            ..self
        }
    }

    /// Create a `HotShotInitializer` from the given information.
    ///
    /// This function uses the anchor leaf to set the initial validated state,
    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
    ///
    /// If you are able to or would prefer to set these yourself,
    /// you should use the `HotShotInitializer` constructor directly.
    #[allow(clippy::too_many_arguments)]
    pub fn load(
        instance_state: TYPES::InstanceState,
        epoch_height: u64,
        epoch_start_block: u64,
        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
        anchor_leaf: Leaf2<TYPES>,
        (start_view, start_epoch): (TYPES::View, Option<TYPES::Epoch>),
        (high_qc, next_epoch_high_qc): (
            QuorumCertificate2<TYPES>,
            Option<NextEpochQuorumCertificate2<TYPES>>,
        ),
        last_actioned_view: TYPES::View,
        saved_proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
        saved_vid_shares: VidShares<TYPES>,
        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
    ) -> Self {
        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
            anchor_leaf.block_header(),
        ));
        let anchor_state_delta = None;

        let initializer = Self {
            instance_state,
            epoch_height,
            epoch_start_block,
            anchor_leaf,
            anchor_state,
            anchor_state_delta,
            high_qc,
            start_view,
            start_epoch,
            last_actioned_view,
            saved_proposals,
            saved_vid_shares,
            next_epoch_high_qc,
            decided_upgrade_certificate,
            undecided_leaves: BTreeMap::new(),
            undecided_state: BTreeMap::new(),
            state_cert,
            start_epoch_info,
        };

        initializer.update_undecided()
    }
}

/// Seed the membership with the epoch information that was known before startup.
async fn load_start_epoch_info<TYPES: NodeType>(
    membership: &Arc<RwLock<TYPES::Membership>>,
    start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
    epoch_height: u64,
    epoch_start_block: u64,
) {
    let first_epoch_number =
        TYPES::Epoch::new(epoch_from_block_number(epoch_start_block, epoch_height));

    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
    membership
        .write()
        .await
        .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);

    let mut sorted_epoch_info = start_epoch_info.clone();
    sorted_epoch_info.sort_by_key(|info| info.epoch);
    for epoch_info in sorted_epoch_info {
        if let Some(block_header) = &epoch_info.block_header {
            tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);

            Membership::add_epoch_root(Arc::clone(membership), block_header.clone())
                .await
                .unwrap_or_else(|err| {
                    // REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal
                    tracing::error!(
                        "Failed to add epoch root for epoch {}: {err}",
                        epoch_info.epoch
                    );
                });
        }
    }

    for epoch_info in start_epoch_info {
        tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
        membership
            .write()
            .await
            .add_drb_result(epoch_info.epoch, epoch_info.drb_result);
    }
}