sequencer/
context.rs

1use std::{
2    fmt::{Debug, Display},
3    marker::PhantomData,
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::Context;
9use async_lock::RwLock;
10use derivative::Derivative;
11use espresso_types::{
12    v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
13    NodeState, PubKey, Transaction, ValidatedState,
14};
15use futures::{
16    future::{join_all, Future},
17    stream::{Stream, StreamExt},
18};
19use hotshot::{
20    types::{Event, EventType, SystemContextHandle},
21    SystemContext,
22};
23use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26    consensus::ConsensusMetricsValue,
27    data::{Leaf2, ViewNumber},
28    epoch_membership::EpochMembershipCoordinator,
29    network::NetworkConfig,
30    storage_metrics::StorageMetricsValue,
31    traits::{metrics::Metrics, network::ConnectedNetwork, node_implementation::Versions},
32    PeerConfig, ValidatorConfig,
33};
34use parking_lot::Mutex;
35use request_response::RequestResponseConfig;
36use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
37use tracing::{Instrument, Level};
38use url::Url;
39
40use crate::{
41    catchup::ParallelStateCatchup,
42    external_event_handler::ExternalEventHandler,
43    proposal_fetcher::ProposalFetcherConfig,
44    request_response::{
45        data_source::{DataSource, Storage as RequestResponseStorage},
46        network::Sender as RequestResponseSender,
47        recipient_source::RecipientSource,
48        RequestResponseProtocol,
49    },
50    state_signature::StateSigner,
51    Node, SeqTypes, SequencerApiVersion,
52};
53
54/// The consensus handle
55pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;
56
57/// The sequencer context contains a consensus handle and other sequencer specific information.
58#[derive(Derivative, Clone)]
59#[derivative(Debug(bound = ""))]
60pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
61    /// The consensus handle
62    #[derivative(Debug = "ignore")]
63    handle: Arc<RwLock<Consensus<N, P, V>>>,
64
65    /// The request-response protocol
66    #[derivative(Debug = "ignore")]
67    #[allow(dead_code)]
68    pub request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
69
70    /// Context for generating state signatures.
71    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
72
73    /// An orchestrator to wait for before starting consensus.
74    #[derivative(Debug = "ignore")]
75    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,
76
77    /// Background tasks to shut down when the node is dropped.
78    tasks: TaskList,
79
80    /// events streamer to stream hotshot events to external clients
81    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
82
83    detached: bool,
84
85    node_state: NodeState,
86
87    network_config: NetworkConfig<SeqTypes>,
88
89    #[derivative(Debug = "ignore")]
90    validator_config: ValidatorConfig<SeqTypes>,
91}
92
93impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> SequencerContext<N, P, V> {
94    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
95    #[allow(clippy::too_many_arguments)]
96    pub async fn init(
97        network_config: NetworkConfig<SeqTypes>,
98        validator_config: ValidatorConfig<SeqTypes>,
99        coordinator: EpochMembershipCoordinator<SeqTypes>,
100        instance_state: NodeState,
101        storage: Option<RequestResponseStorage>,
102        state_catchup: ParallelStateCatchup,
103        persistence: Arc<P>,
104        network: Arc<N>,
105        state_relay_server: Option<Url>,
106        metrics: &dyn Metrics,
107        stake_table_capacity: usize,
108        event_consumer: impl PersistenceEventConsumer + 'static,
109        _: V,
110        proposal_fetcher_cfg: ProposalFetcherConfig,
111    ) -> anyhow::Result<Self> {
112        let config = &network_config.config;
113        let pub_key = validator_config.public_key;
114        tracing::info!(%pub_key, "initializing consensus");
115
116        // Stick our node ID in `metrics` so it is easily accessible via the status API.
117        metrics
118            .create_gauge("node_index".into(), None)
119            .set(instance_state.node_id as usize);
120
121        // Start L1 client if it isn't already.
122        instance_state.l1_client.spawn_tasks().await;
123
124        // Load saved consensus state from storage.
125        let (initializer, anchor_view) = persistence
126            .load_consensus_state::<V>(instance_state.clone())
127            .await?;
128
129        let stake_table = config.hotshot_stake_table();
130        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
131        let stake_table_epoch = None;
132
133        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
134            stake_table.0,
135            0,
136        )));
137
138        let handle = SystemContext::init(
139            validator_config.public_key,
140            validator_config.private_key.clone(),
141            validator_config.state_private_key.clone(),
142            instance_state.node_id,
143            config.clone(),
144            coordinator.clone(),
145            network.clone(),
146            initializer,
147            ConsensusMetricsValue::new(metrics),
148            Arc::clone(&persistence),
149            StorageMetricsValue::new(metrics),
150        )
151        .await?
152        .0;
153
154        let mut state_signer = StateSigner::new(
155            validator_config.state_private_key.clone(),
156            validator_config.state_public_key.clone(),
157            stake_table_commit,
158            stake_table_epoch,
159            stake_table_capacity,
160        );
161        if let Some(url) = state_relay_server {
162            state_signer = state_signer.with_relay_server(url);
163        }
164
165        // Create the channel for sending outbound messages from the external event handler
166        let (outbound_message_sender, outbound_message_receiver) = channel(20);
167        let (request_response_sender, request_response_receiver) = channel(20);
168
169        // Configure the request-response protocol
170        let request_response_config = RequestResponseConfig {
171            incoming_request_ttl: Duration::from_secs(40),
172            incoming_request_timeout: Duration::from_secs(5),
173            incoming_response_timeout: Duration::from_secs(5),
174            request_batch_size: 5,
175            request_batch_interval: Duration::from_secs(2),
176            max_incoming_requests: 10,
177            max_incoming_requests_per_key: 1,
178            max_incoming_responses: 20,
179        };
180
181        // Create the request-response protocol
182        let request_response_protocol = RequestResponseProtocol::new(
183            request_response_config,
184            RequestResponseSender::new(outbound_message_sender),
185            request_response_receiver,
186            RecipientSource {
187                memberships: coordinator,
188                consensus: handle.hotshot.clone(),
189                public_key: validator_config.public_key,
190            },
191            DataSource {
192                node_state: instance_state.clone(),
193                storage,
194                consensus: handle.hotshot.clone(),
195                phantom: PhantomData,
196            },
197            validator_config.public_key,
198            validator_config.private_key.clone(),
199        );
200
201        // Add the request-response protocol to the list of providers for state catchup. Since the interior is mutable,
202        // the request-response protocol will now retroactively be used anywhere we passed in the original struct (e.g. in consensus
203        // itself)
204        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));
205
206        // Create the external event handler
207        let mut tasks = TaskList::default();
208        let external_event_handler = ExternalEventHandler::<V>::new(
209            &mut tasks,
210            request_response_sender,
211            outbound_message_receiver,
212            network,
213            pub_key,
214        )
215        .await
216        .with_context(|| "Failed to create external event handler")?;
217
218        Ok(Self::new(
219            handle,
220            persistence,
221            state_signer,
222            external_event_handler,
223            request_response_protocol,
224            event_streamer,
225            instance_state,
226            network_config,
227            validator_config,
228            event_consumer,
229            anchor_view,
230            proposal_fetcher_cfg,
231            metrics,
232        )
233        .with_task_list(tasks))
234    }
235
236    /// Constructor
237    #[allow(clippy::too_many_arguments)]
238    fn new(
239        handle: Consensus<N, P, V>,
240        persistence: Arc<P>,
241        state_signer: StateSigner<SequencerApiVersion>,
242        external_event_handler: ExternalEventHandler<V>,
243        request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
244        event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
245        node_state: NodeState,
246        network_config: NetworkConfig<SeqTypes>,
247        validator_config: ValidatorConfig<SeqTypes>,
248        event_consumer: impl PersistenceEventConsumer + 'static,
249        anchor_view: Option<ViewNumber>,
250        proposal_fetcher_cfg: ProposalFetcherConfig,
251        metrics: &dyn Metrics,
252    ) -> Self {
253        let events = handle.event_stream();
254
255        let node_id = node_state.node_id;
256        let mut ctx = Self {
257            handle: Arc::new(RwLock::new(handle)),
258            state_signer: Arc::new(RwLock::new(state_signer)),
259            request_response_protocol,
260            tasks: Default::default(),
261            detached: false,
262            wait_for_orchestrator: None,
263            events_streamer: event_streamer.clone(),
264            node_state,
265            network_config,
266            validator_config,
267        };
268
269        // Spawn proposal fetching tasks.
270        proposal_fetcher_cfg.spawn(
271            &mut ctx.tasks,
272            ctx.handle.clone(),
273            persistence.clone(),
274            metrics,
275        );
276
277        // Spawn event handling loop.
278        ctx.spawn(
279            "event handler",
280            handle_events(
281                ctx.handle.clone(),
282                node_id,
283                events,
284                persistence,
285                ctx.state_signer.clone(),
286                external_event_handler,
287                Some(event_streamer.clone()),
288                event_consumer,
289                anchor_view,
290            ),
291        );
292
293        ctx
294    }
295
296    /// Wait for a signal from the orchestrator before starting consensus.
297    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
298        self.wait_for_orchestrator = Some(Arc::new(client));
299        self
300    }
301
302    /// Add a list of tasks to the given context.
303    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
304        self.tasks.extend(tasks);
305        self
306    }
307
308    /// Return a reference to the consensus state signer.
309    pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
310        self.state_signer.clone()
311    }
312
313    /// Stream consensus events.
314    pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> {
315        self.handle.read().await.event_stream()
316    }
317
318    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
319        self.handle.read().await.submit_transaction(tx).await?;
320        Ok(())
321    }
322
323    /// get event streamer
324    pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
325        self.events_streamer.clone()
326    }
327
328    /// Return a reference to the underlying consensus handle.
329    pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
330        Arc::clone(&self.handle)
331    }
332
333    pub async fn shutdown_consensus(&self) {
334        self.handle.write().await.shut_down().await
335    }
336
337    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
338        self.handle.read().await.decided_leaf().await
339    }
340
341    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
342        self.handle.read().await.state(view).await
343    }
344
345    pub async fn decided_state(&self) -> Arc<ValidatedState> {
346        self.handle.read().await.decided_state().await
347    }
348
349    pub fn node_id(&self) -> u64 {
350        self.node_state.node_id
351    }
352
353    pub fn node_state(&self) -> NodeState {
354        self.node_state.clone()
355    }
356
357    /// Start participating in consensus.
358    pub async fn start_consensus(&self) {
359        if let Some(orchestrator_client) = &self.wait_for_orchestrator {
360            tracing::warn!("waiting for orchestrated start");
361            let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
362            orchestrator_client
363                .wait_for_all_nodes_ready(peer_config)
364                .await;
365        } else {
366            tracing::error!("Cannot get info from orchestrator client");
367        }
368        tracing::warn!("starting consensus");
369        self.handle.read().await.hotshot.start_consensus().await;
370    }
371
372    /// Spawn a background task attached to this context.
373    ///
374    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
375    /// cancelled in the reverse order that they were spawned.
376    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
377        self.tasks.spawn(name, task);
378    }
379
380    /// Spawn a short-lived background task attached to this context.
381    ///
382    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
383    /// cancelled in the reverse order that they were spawned.
384    ///
385    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
386    /// one is how urgently logging related to the task is treated.
387    pub fn spawn_short_lived(
388        &mut self,
389        name: impl Display,
390        task: impl Future<Output: Debug> + Send + 'static,
391    ) {
392        self.tasks.spawn_short_lived(name, task);
393    }
394
395    /// Stop participating in consensus.
396    pub async fn shut_down(&mut self) {
397        tracing::info!("shutting down SequencerContext");
398        self.handle.write().await.shut_down().await;
399        self.tasks.shut_down();
400        self.node_state.l1_client.shut_down_tasks().await;
401
402        // Since we've already shut down, we can set `detached` so the drop
403        // handler doesn't call `shut_down` again.
404        self.detached = true;
405    }
406
407    /// Wait for consensus to complete.
408    ///
409    /// Under normal conditions, this function will block forever, which is a convenient way of
410    /// keeping the main thread from exiting as long as there are still active background tasks.
411    pub async fn join(mut self) {
412        self.tasks.join().await;
413    }
414
415    /// Allow this node to continue participating in consensus even after it is dropped.
416    pub fn detach(&mut self) {
417        // Set `detached` so the drop handler doesn't call `shut_down`.
418        self.detached = true;
419    }
420
421    /// Get the network config
422    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
423        self.network_config.clone()
424    }
425}
426
427impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Drop
428    for SequencerContext<N, P, V>
429{
430    fn drop(&mut self) {
431        if !self.detached {
432            // Spawn a task to shut down the context
433            let handle_clone = self.handle.clone();
434            let tasks_clone = self.tasks.clone();
435            let node_state_clone = self.node_state.clone();
436
437            spawn(async move {
438                tracing::info!("shutting down SequencerContext");
439                handle_clone.write().await.shut_down().await;
440                tasks_clone.shut_down();
441                node_state_clone.l1_client.shut_down_tasks().await;
442            });
443
444            // Set `detached` so the drop handler doesn't call `shut_down` again.
445            self.detached = true;
446        }
447    }
448}
449
450#[tracing::instrument(skip_all, fields(node_id))]
451#[allow(clippy::too_many_arguments)]
452async fn handle_events<N, P, V>(
453    consensus: Arc<RwLock<Consensus<N, P, V>>>,
454    node_id: u64,
455    mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
456    persistence: Arc<P>,
457    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
458    external_event_handler: ExternalEventHandler<V>,
459    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
460    event_consumer: impl PersistenceEventConsumer + 'static,
461    anchor_view: Option<ViewNumber>,
462) where
463    N: ConnectedNetwork<PubKey>,
464    P: SequencerPersistence,
465    V: Versions,
466{
467    if let Some(view) = anchor_view {
468        // Process and clean up any leaves that we may have persisted last time we were running but
469        // failed to handle due to a shutdown.
470        if let Err(err) = persistence
471            .append_decided_leaves(view, vec![], &event_consumer)
472            .await
473        {
474            tracing::warn!(
475                "failed to process decided leaves, chain may not be up to date: {err:#}"
476            );
477        }
478    }
479
480    while let Some(event) = events.next().await {
481        tracing::debug!(node_id, ?event, "consensus event");
482        // Store latest consensus state.
483        persistence.handle_event(&event, &event_consumer).await;
484
485        // Generate state signature.
486        state_signer
487            .write()
488            .await
489            .handle_event(&event, consensus.clone())
490            .await;
491
492        // Handle external messages
493        if let EventType::ExternalMessageReceived { data, .. } = &event.event {
494            if let Err(err) = external_event_handler.handle_event(data).await {
495                tracing::warn!("Failed to handle external message: {:?}", err);
496            };
497        }
498
499        // Send the event via the event streaming service
500        if let Some(events_streamer) = events_streamer.as_ref() {
501            events_streamer.write().await.handle_event(event).await;
502        }
503    }
504}
505
506#[derive(Debug, Default, Clone)]
507#[allow(clippy::type_complexity)]
508pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
509
/// Spawn `$task` on the runtime and register its handle under `$name` in the task list `$this`.
///
/// The task runs inside a "background task" span carrying the task name, and its start and exit
/// (including the task's result) are logged at level `$lvl`.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let handle = {
            // The span gets its own copy of the name; the original is stored with the handle.
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            let wrapped = async move {
                tracing::event!($lvl, "spawning background task");
                let res = $task.await;
                tracing::event!($lvl, ?res, "background task exited");
            };
            spawn(wrapped.instrument(span))
        };
        $this.0.lock().push((name, handle));
    };
}
528
529impl TaskList {
530    /// Spawn a background task attached to this [`TaskList`].
531    ///
532    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
533    /// be cancelled in the reverse order that they were spawned.
534    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
535        spawn_with_log_level!(self, Level::INFO, name, task);
536    }
537
538    /// Spawn a short-lived background task attached to this [`TaskList`].
539    ///
540    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
541    /// be cancelled in the reverse order that they were spawned.
542    ///
543    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
544    /// one is how urgently logging related to the task is treated.
545    pub fn spawn_short_lived(
546        &mut self,
547        name: impl Display,
548        task: impl Future<Output: Debug> + Send + 'static,
549    ) {
550        spawn_with_log_level!(self, Level::DEBUG, name, task);
551    }
552
553    /// Stop all background tasks.
554    pub fn shut_down(&self) {
555        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
556        for (name, task) in tasks.into_iter().rev() {
557            tracing::info!(name, "cancelling background task");
558            task.abort();
559        }
560    }
561
562    /// Wait for all background tasks to complete.
563    pub async fn join(&mut self) {
564        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
565        join_all(tasks.into_iter().map(|(_, task)| task)).await;
566    }
567
568    pub fn extend(&mut self, tasks: TaskList) {
569        self.0.lock().extend(
570            tasks
571                .0
572                .lock()
573                .drain(..)
574                .collect::<Vec<(String, JoinHandle<()>)>>(),
575        );
576    }
577}
578
579impl Drop for TaskList {
580    fn drop(&mut self) {
581        self.shut_down()
582    }
583}