sequencer/context.rs

use std::{
    fmt::{Debug, Display},
    marker::PhantomData,
    sync::Arc,
    time::Duration,
};

use anyhow::Context;
use async_lock::RwLock;
use derivative::Derivative;
use espresso_types::{
    v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
    NodeState, PubKey, Transaction, ValidatedState,
};
use futures::{
    future::{join_all, Future},
    stream::{Stream, StreamExt},
};
use hotshot::{
    types::{Event, EventType, SystemContextHandle},
    SystemContext,
};
use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
use hotshot_orchestrator::client::OrchestratorClient;
use hotshot_types::{
    consensus::ConsensusMetricsValue,
    data::{Leaf2, ViewNumber},
    epoch_membership::EpochMembershipCoordinator,
    network::NetworkConfig,
    storage_metrics::StorageMetricsValue,
    traits::{metrics::Metrics, network::ConnectedNetwork, node_implementation::Versions},
    PeerConfig, ValidatorConfig,
};
use parking_lot::Mutex;
use request_response::RequestResponseConfig;
use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
use tracing::{Instrument, Level};
use url::Url;

use crate::{
    catchup::ParallelStateCatchup,
    external_event_handler::ExternalEventHandler,
    proposal_fetcher::ProposalFetcherConfig,
    request_response::{
        data_source::{DataSource, Storage as RequestResponseStorage},
        network::Sender as RequestResponseSender,
        recipient_source::RecipientSource,
        RequestResponseProtocol,
    },
    state_signature::StateSigner,
    Node, SeqTypes, SequencerApiVersion,
};

/// The consensus handle
pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;

/// The sequencer context contains a consensus handle and other sequencer-specific information.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
    /// The consensus handle
    #[derivative(Debug = "ignore")]
    handle: Arc<RwLock<Consensus<N, P, V>>>,

    /// The request-response protocol
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,

    /// Context for generating state signatures.
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// An orchestrator to wait for before starting consensus.
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks to shut down when the node is dropped.
    tasks: TaskList,

    /// Event streamer used to stream HotShot events to external clients.
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// Set once consensus has been shut down (or the context detached), so the drop handler
    /// does not shut it down again.
    detached: bool,

    /// The node's instance state.
    node_state: NodeState,

    /// The network configuration.
    network_config: NetworkConfig<SeqTypes>,

    /// This node's validator configuration.
    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}

impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> SequencerContext<N, P, V> {
    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        network_config: NetworkConfig<SeqTypes>,
        validator_config: ValidatorConfig<SeqTypes>,
        coordinator: EpochMembershipCoordinator<SeqTypes>,
        instance_state: NodeState,
        storage: Option<RequestResponseStorage>,
        state_catchup: ParallelStateCatchup,
        persistence: Arc<P>,
        network: Arc<N>,
        state_relay_server: Option<Url>,
        metrics: &dyn Metrics,
        stake_table_capacity: usize,
        event_consumer: impl PersistenceEventConsumer + 'static,
        _: V,
        proposal_fetcher_cfg: ProposalFetcherConfig,
    ) -> anyhow::Result<Self> {
        let config = &network_config.config;
        let pub_key = validator_config.public_key;
        tracing::info!(%pub_key, "initializing consensus");

        // Stick our node ID in `metrics` so it is easily accessible via the status API.
        metrics
            .create_gauge("node_index".into(), None)
            .set(instance_state.node_id as usize);

        // Start the L1 client if it isn't running already.
        instance_state.l1_client.spawn_tasks().await;

        // Load saved consensus state from storage.
        let (initializer, anchor_view) = persistence
            .load_consensus_state::<V>(instance_state.clone())
            .await?;

        tracing::warn!(
            "Starting up sequencer context with initializer:\n\n{:?}",
            initializer
        );

        let stake_table = config.hotshot_stake_table();
        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
        let stake_table_epoch = None;

        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
            stake_table.0,
            0,
        )));

        let handle = SystemContext::init(
            validator_config.public_key,
            validator_config.private_key.clone(),
            validator_config.state_private_key.clone(),
            instance_state.node_id,
            config.clone(),
            coordinator.clone(),
            network.clone(),
            initializer,
            ConsensusMetricsValue::new(metrics),
            Arc::clone(&persistence),
            StorageMetricsValue::new(metrics),
        )
        .await?
        .0;

        let mut state_signer = StateSigner::new(
            validator_config.state_private_key.clone(),
            validator_config.state_public_key.clone(),
            stake_table_commit,
            stake_table_epoch,
            stake_table_capacity,
        );
        if let Some(url) = state_relay_server {
            state_signer = state_signer.with_relay_server(url);
        }

        // Create the channels that connect the request-response protocol to the external event
        // handler: one carries outbound messages, the other carries incoming request-response messages
        let (outbound_message_sender, outbound_message_receiver) = channel(20);
        let (request_response_sender, request_response_receiver) = channel(20);

        // Configure the request-response protocol
        let request_response_config = RequestResponseConfig {
            incoming_request_ttl: Duration::from_secs(40),
            incoming_request_timeout: Duration::from_secs(5),
            incoming_response_timeout: Duration::from_secs(5),
            request_batch_size: 5,
            request_batch_interval: Duration::from_secs(2),
            max_incoming_requests: 10,
            max_incoming_requests_per_key: 1,
            max_incoming_responses: 20,
        };

        // Create the request-response protocol
        let request_response_protocol = RequestResponseProtocol::new(
            request_response_config,
            RequestResponseSender::new(outbound_message_sender),
            request_response_receiver,
            RecipientSource {
                memberships: coordinator,
                consensus: handle.hotshot.clone(),
                public_key: validator_config.public_key,
            },
            DataSource {
                node_state: instance_state.clone(),
                storage,
                persistence: persistence.clone(),
                consensus: handle.hotshot.clone(),
                phantom: PhantomData,
            },
            validator_config.public_key,
            validator_config.private_key.clone(),
        );

        // Add the request-response protocol to the list of state catchup providers. Because the
        // catchup struct uses interior mutability, the protocol is now also used everywhere the
        // original struct was already passed (e.g. in consensus itself).
        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));

        // Create the external event handler
        let mut tasks = TaskList::default();
        let external_event_handler = ExternalEventHandler::<V>::new(
            &mut tasks,
            request_response_sender,
            outbound_message_receiver,
            network,
            pub_key,
        )
        .await
        .with_context(|| "Failed to create external event handler")?;

        Ok(Self::new(
            handle,
            persistence,
            state_signer,
            external_event_handler,
            request_response_protocol,
            event_streamer,
            instance_state,
            network_config,
            validator_config,
            event_consumer,
            anchor_view,
            proposal_fetcher_cfg,
            metrics,
        )
        .with_task_list(tasks))
    }

    /// Construct a new `SequencerContext` and spawn its background tasks (proposal fetching and
    /// event handling).
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: Consensus<N, P, V>,
        persistence: Arc<P>,
        state_signer: StateSigner<SequencerApiVersion>,
        external_event_handler: ExternalEventHandler<V>,
        request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
        event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
        node_state: NodeState,
        network_config: NetworkConfig<SeqTypes>,
        validator_config: ValidatorConfig<SeqTypes>,
        event_consumer: impl PersistenceEventConsumer + 'static,
        anchor_view: Option<ViewNumber>,
        proposal_fetcher_cfg: ProposalFetcherConfig,
        metrics: &dyn Metrics,
    ) -> Self {
        let events = handle.event_stream();

        let node_id = node_state.node_id;
        let mut ctx = Self {
            handle: Arc::new(RwLock::new(handle)),
            state_signer: Arc::new(RwLock::new(state_signer)),
            request_response_protocol,
            tasks: Default::default(),
            detached: false,
            wait_for_orchestrator: None,
            events_streamer: event_streamer.clone(),
            node_state,
            network_config,
            validator_config,
        };

        // Spawn proposal fetching tasks.
        proposal_fetcher_cfg.spawn(
            &mut ctx.tasks,
            ctx.handle.clone(),
            persistence.clone(),
            metrics,
        );

        // Spawn event handling loop.
        ctx.spawn(
            "event handler",
            handle_events(
                ctx.handle.clone(),
                node_id,
                events,
                persistence,
                ctx.state_signer.clone(),
                external_event_handler,
                Some(event_streamer.clone()),
                event_consumer,
                anchor_view,
            ),
        );

        ctx
    }

    /// Wait for a signal from the orchestrator before starting consensus.
    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
        self.wait_for_orchestrator = Some(Arc::new(client));
        self
    }

    /// Add a list of tasks to this context.
    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
        self.tasks.extend(tasks);
        self
    }

    /// Return a reference to the consensus state signer.
    pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
        self.state_signer.clone()
    }

    /// Stream consensus events.
    pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> {
        self.handle.read().await.event_stream()
    }

    /// Submit a transaction to consensus.
    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
        self.handle.read().await.submit_transaction(tx).await?;
        Ok(())
    }

    /// Get the event streamer.
    pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
        self.events_streamer.clone()
    }

    /// Return a reference to the underlying consensus handle.
    pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
        Arc::clone(&self.handle)
    }

    /// Shut down consensus.
    pub async fn shutdown_consensus(&self) {
        self.handle.write().await.shut_down().await
    }

    /// Get the most recently decided leaf.
    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
        self.handle.read().await.decided_leaf().await
    }

    /// Get the validated state for a given view, if it is available.
    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
        self.handle.read().await.state(view).await
    }

    /// Get the most recently decided validated state.
    pub async fn decided_state(&self) -> Arc<ValidatedState> {
        self.handle.read().await.decided_state().await
    }

    /// Get this node's ID.
    pub fn node_id(&self) -> u64 {
        self.node_state.node_id
    }

    /// Get the node's instance state.
    pub fn node_state(&self) -> NodeState {
        self.node_state.clone()
    }

    /// Start participating in consensus.
    pub async fn start_consensus(&self) {
        if let Some(orchestrator_client) = &self.wait_for_orchestrator {
            tracing::warn!("waiting for orchestrated start");
            let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
            orchestrator_client
                .wait_for_all_nodes_ready(peer_config)
                .await;
        } else {
            tracing::error!("Cannot get info from orchestrator client");
        }
        tracing::warn!("starting consensus");
        self.handle.read().await.hotshot.start_consensus().await;
    }

    /// Spawn a background task attached to this context.
    ///
    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
    /// cancelled in the reverse order that they were spawned.
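    ///
    /// A minimal usage sketch (hypothetical task body, shown for illustration only):
    ///
    /// ```ignore
    /// ctx.spawn("heartbeat", async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     "heartbeat sent"
    /// });
    /// ```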
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        self.tasks.spawn(name, task);
    }

    /// Spawn a short-lived background task attached to this context.
    ///
    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
    /// cancelled in the reverse order that they were spawned.
    ///
    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
    /// one is how urgently logging related to the task is treated.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        self.tasks.spawn_short_lived(name, task);
    }

    /// Stop participating in consensus.
    pub async fn shut_down(&mut self) {
        tracing::info!("shutting down SequencerContext");
        self.handle.write().await.shut_down().await;
        self.tasks.shut_down();
        self.node_state.l1_client.shut_down_tasks().await;

        // Since we've already shut down, we can set `detached` so the drop
        // handler doesn't call `shut_down` again.
        self.detached = true;
    }

    /// Wait for consensus to complete.
    ///
    /// Under normal conditions, this function will block forever, which is a convenient way of
    /// keeping the main thread from exiting as long as there are still active background tasks.
    pub async fn join(mut self) {
        self.tasks.join().await;
    }

    /// Allow this node to continue participating in consensus even after it is dropped.
    pub fn detach(&mut self) {
        // Set `detached` so the drop handler doesn't call `shut_down`.
        self.detached = true;
    }

    /// Get the network config.
    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
        self.network_config.clone()
    }
}

impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Drop
    for SequencerContext<N, P, V>
{
    fn drop(&mut self) {
        if !self.detached {
            // Spawn a task to shut down the context
            let handle_clone = self.handle.clone();
            let tasks_clone = self.tasks.clone();
            let node_state_clone = self.node_state.clone();

            spawn(async move {
                tracing::info!("shutting down SequencerContext");
                handle_clone.write().await.shut_down().await;
                tasks_clone.shut_down();
                node_state_clone.l1_client.shut_down_tasks().await;
            });

            // Set `detached` so the drop handler doesn't call `shut_down` again.
            self.detached = true;
        }
    }
}

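/// The main event-handling loop for a sequencer node.
///
/// If an `anchor_view` is given, first re-processes any decided leaves that were persisted but
/// not yet handled before a previous shutdown. Then, for every consensus event: persists it,
/// updates the state signer, dispatches any external message to the external event handler, and
/// forwards the event to the event streaming service (if one is configured).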
#[tracing::instrument(skip_all, fields(node_id))]
#[allow(clippy::too_many_arguments)]
async fn handle_events<N, P, V>(
    consensus: Arc<RwLock<Consensus<N, P, V>>>,
    node_id: u64,
    mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
    persistence: Arc<P>,
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
    external_event_handler: ExternalEventHandler<V>,
    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
    event_consumer: impl PersistenceEventConsumer + 'static,
    anchor_view: Option<ViewNumber>,
) where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
    V: Versions,
{
    if let Some(view) = anchor_view {
        // Process and clean up any leaves that we may have persisted last time we were running but
        // failed to handle due to a shutdown.
        if let Err(err) = persistence
            .append_decided_leaves(view, vec![], None, &event_consumer)
            .await
        {
            tracing::warn!(
                "failed to process decided leaves, chain may not be up to date: {err:#}"
            );
        }
    }

    while let Some(event) = events.next().await {
        tracing::debug!(node_id, ?event, "consensus event");
        // Store latest consensus state.
        persistence.handle_event(&event, &event_consumer).await;

        // Generate state signature.
        state_signer
            .write()
            .await
            .handle_event(&event, consensus.clone())
            .await;

        // Handle external messages
        if let EventType::ExternalMessageReceived { data, .. } = &event.event {
            if let Err(err) = external_event_handler.handle_event(data).await {
                tracing::warn!("Failed to handle external message: {:?}", err);
            }
        }

        // Send the event via the event streaming service
        if let Some(events_streamer) = events_streamer.as_ref() {
            events_streamer.write().await.handle_event(event).await;
        }
    }
}

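/// A list of named background tasks.
///
/// When the list is [`shut_down`](Self::shut_down) or dropped, its tasks are aborted in the
/// reverse order that they were spawned.
///
/// A minimal usage sketch (hypothetical task, for illustration only):
///
/// ```ignore
/// let mut tasks = TaskList::default();
/// tasks.spawn("example", async {
///     tokio::time::sleep(std::time::Duration::from_secs(60)).await;
///     "finished"
/// });
/// // Dropping the list (or calling `shut_down`) aborts the task.
/// drop(tasks);
/// ```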
#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);

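// Spawn `$task` on the tokio runtime inside a tracing span named after `$name`, log the task's
// spawn and exit at level `$lvl`, and record its name and join handle in the `TaskList` `$this`.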
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task:expr) => {
        let name = $name.to_string();
        let task = {
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}

impl TaskList {
    /// Spawn a background task attached to this [`TaskList`].
    ///
    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
    /// be cancelled in the reverse order that they were spawned.
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        spawn_with_log_level!(self, Level::INFO, name, task);
    }

    /// Spawn a short-lived background task attached to this [`TaskList`].
    ///
    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
    /// be cancelled in the reverse order that they were spawned.
    ///
    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
    /// one is how urgently logging related to the task is treated.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        spawn_with_log_level!(self, Level::DEBUG, name, task);
    }

    /// Stop all background tasks.
    pub fn shut_down(&self) {
        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
        for (name, task) in tasks.into_iter().rev() {
            tracing::info!(name, "cancelling background task");
            task.abort();
        }
    }

    /// Wait for all background tasks to complete.
    pub async fn join(&mut self) {
        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
        join_all(tasks.into_iter().map(|(_, task)| task)).await;
    }

    /// Move all tasks from `tasks` into this list.
    pub fn extend(&mut self, tasks: TaskList) {
        self.0.lock().extend(
            tasks
                .0
                .lock()
                .drain(..)
                .collect::<Vec<(String, JoinHandle<()>)>>(),
        );
    }
}

impl Drop for TaskList {
    fn drop(&mut self) {
        self.shut_down()
    }
}