1use std::{
2 fmt::{Debug, Display},
3 marker::PhantomData,
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::Context;
9use async_lock::RwLock;
10use derivative::Derivative;
11use espresso_types::{
12 v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
13 NodeState, PubKey, Transaction, ValidatedState,
14};
15use futures::{
16 future::{join_all, Future},
17 stream::{Stream, StreamExt},
18};
19use hotshot::{
20 types::{Event, EventType, SystemContextHandle},
21 SystemContext,
22};
23use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26 consensus::ConsensusMetricsValue,
27 data::{Leaf2, ViewNumber},
28 epoch_membership::EpochMembershipCoordinator,
29 network::NetworkConfig,
30 storage_metrics::StorageMetricsValue,
31 traits::{metrics::Metrics, network::ConnectedNetwork, node_implementation::Versions},
32 PeerConfig, ValidatorConfig,
33};
34use parking_lot::Mutex;
35use request_response::RequestResponseConfig;
36use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
37use tracing::{Instrument, Level};
38use url::Url;
39
40use crate::{
41 catchup::ParallelStateCatchup,
42 external_event_handler::ExternalEventHandler,
43 proposal_fetcher::ProposalFetcherConfig,
44 request_response::{
45 data_source::{DataSource, Storage as RequestResponseStorage},
46 network::Sender as RequestResponseSender,
47 recipient_source::RecipientSource,
48 RequestResponseProtocol,
49 },
50 state_signature::StateSigner,
51 Node, SeqTypes, SequencerApiVersion,
52};
53
/// The consensus handle: a HotShot `SystemContextHandle` specialized to the sequencer's
/// `SeqTypes` and to the node implementation `Node<N, P>`.
///
/// `N` is the network type, `P` the persistence type and `V` the protocol versions type.
pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;
56
/// A consensus handle bundled with the sequencer-specific services and configuration that
/// accompany it.
///
/// The consensus handle, state signer, events streamer and task list are all reference-counted,
/// so a clone of the context refers to the same underlying instances. Unless the context has
/// been detached or explicitly shut down, dropping it spawns a background shutdown (see the
/// `Drop` impl).
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
    /// The consensus handle, behind an async read/write lock.
    #[derivative(Debug = "ignore")]
    handle: Arc<RwLock<Consensus<N, P, V>>>,

    /// The request-response protocol instance built in `init`.
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,

    /// The state signer; it is fed every consensus event by the event handler task.
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// Orchestrator client to wait on before starting consensus, if one was configured.
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks owned by this context; cancelled on shutdown.
    tasks: TaskList,

    /// Events streamer that receives every consensus event from the event handler task.
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// When `true`, the `Drop` impl does not spawn a shutdown. Set by `detach`, by `shut_down`,
    /// and by `drop` itself.
    detached: bool,

    /// The node's instance state (node ID, L1 client, ...).
    node_state: NodeState,

    /// The network configuration this node was initialized with.
    network_config: NetworkConfig<SeqTypes>,

    /// The validator configuration (keys) this node was initialized with.
    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}
92
93impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> SequencerContext<N, P, V> {
94 #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
95 #[allow(clippy::too_many_arguments)]
96 pub async fn init(
97 network_config: NetworkConfig<SeqTypes>,
98 validator_config: ValidatorConfig<SeqTypes>,
99 coordinator: EpochMembershipCoordinator<SeqTypes>,
100 instance_state: NodeState,
101 storage: Option<RequestResponseStorage>,
102 state_catchup: ParallelStateCatchup,
103 persistence: Arc<P>,
104 network: Arc<N>,
105 state_relay_server: Option<Url>,
106 metrics: &dyn Metrics,
107 stake_table_capacity: usize,
108 event_consumer: impl PersistenceEventConsumer + 'static,
109 _: V,
110 proposal_fetcher_cfg: ProposalFetcherConfig,
111 ) -> anyhow::Result<Self> {
112 let config = &network_config.config;
113 let pub_key = validator_config.public_key;
114 tracing::info!(%pub_key, "initializing consensus");
115
116 metrics
118 .create_gauge("node_index".into(), None)
119 .set(instance_state.node_id as usize);
120
121 instance_state.l1_client.spawn_tasks().await;
123
124 let (initializer, anchor_view) = persistence
126 .load_consensus_state::<V>(instance_state.clone())
127 .await?;
128
129 tracing::warn!(
130 "Starting up sequencer context with initializer:\n\n{:?}",
131 initializer
132 );
133
134 let stake_table = config.hotshot_stake_table();
135 let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
136 let stake_table_epoch = None;
137
138 let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
139 stake_table.0,
140 0,
141 )));
142
143 let handle = SystemContext::init(
144 validator_config.public_key,
145 validator_config.private_key.clone(),
146 validator_config.state_private_key.clone(),
147 instance_state.node_id,
148 config.clone(),
149 coordinator.clone(),
150 network.clone(),
151 initializer,
152 ConsensusMetricsValue::new(metrics),
153 Arc::clone(&persistence),
154 StorageMetricsValue::new(metrics),
155 )
156 .await?
157 .0;
158
159 let mut state_signer = StateSigner::new(
160 validator_config.state_private_key.clone(),
161 validator_config.state_public_key.clone(),
162 stake_table_commit,
163 stake_table_epoch,
164 stake_table_capacity,
165 );
166 if let Some(url) = state_relay_server {
167 state_signer = state_signer.with_relay_server(url);
168 }
169
170 let (outbound_message_sender, outbound_message_receiver) = channel(20);
172 let (request_response_sender, request_response_receiver) = channel(20);
173
174 let request_response_config = RequestResponseConfig {
176 incoming_request_ttl: Duration::from_secs(40),
177 incoming_request_timeout: Duration::from_secs(5),
178 incoming_response_timeout: Duration::from_secs(5),
179 request_batch_size: 5,
180 request_batch_interval: Duration::from_secs(2),
181 max_incoming_requests: 10,
182 max_incoming_requests_per_key: 1,
183 max_incoming_responses: 20,
184 };
185
186 let request_response_protocol = RequestResponseProtocol::new(
188 request_response_config,
189 RequestResponseSender::new(outbound_message_sender),
190 request_response_receiver,
191 RecipientSource {
192 memberships: coordinator,
193 consensus: handle.hotshot.clone(),
194 public_key: validator_config.public_key,
195 },
196 DataSource {
197 node_state: instance_state.clone(),
198 storage,
199 persistence: persistence.clone(),
200 consensus: handle.hotshot.clone(),
201 phantom: PhantomData,
202 },
203 validator_config.public_key,
204 validator_config.private_key.clone(),
205 );
206
207 state_catchup.add_provider(Arc::new(request_response_protocol.clone()));
211
212 let mut tasks = TaskList::default();
214 let external_event_handler = ExternalEventHandler::<V>::new(
215 &mut tasks,
216 request_response_sender,
217 outbound_message_receiver,
218 network,
219 pub_key,
220 )
221 .await
222 .with_context(|| "Failed to create external event handler")?;
223
224 Ok(Self::new(
225 handle,
226 persistence,
227 state_signer,
228 external_event_handler,
229 request_response_protocol,
230 event_streamer,
231 instance_state,
232 network_config,
233 validator_config,
234 event_consumer,
235 anchor_view,
236 proposal_fetcher_cfg,
237 metrics,
238 )
239 .with_task_list(tasks))
240 }
241
242 #[allow(clippy::too_many_arguments)]
244 fn new(
245 handle: Consensus<N, P, V>,
246 persistence: Arc<P>,
247 state_signer: StateSigner<SequencerApiVersion>,
248 external_event_handler: ExternalEventHandler<V>,
249 request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
250 event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
251 node_state: NodeState,
252 network_config: NetworkConfig<SeqTypes>,
253 validator_config: ValidatorConfig<SeqTypes>,
254 event_consumer: impl PersistenceEventConsumer + 'static,
255 anchor_view: Option<ViewNumber>,
256 proposal_fetcher_cfg: ProposalFetcherConfig,
257 metrics: &dyn Metrics,
258 ) -> Self {
259 let events = handle.event_stream();
260
261 let node_id = node_state.node_id;
262 let mut ctx = Self {
263 handle: Arc::new(RwLock::new(handle)),
264 state_signer: Arc::new(RwLock::new(state_signer)),
265 request_response_protocol,
266 tasks: Default::default(),
267 detached: false,
268 wait_for_orchestrator: None,
269 events_streamer: event_streamer.clone(),
270 node_state,
271 network_config,
272 validator_config,
273 };
274
275 proposal_fetcher_cfg.spawn(
277 &mut ctx.tasks,
278 ctx.handle.clone(),
279 persistence.clone(),
280 metrics,
281 );
282
283 ctx.spawn(
285 "event handler",
286 handle_events(
287 ctx.handle.clone(),
288 node_id,
289 events,
290 persistence,
291 ctx.state_signer.clone(),
292 external_event_handler,
293 Some(event_streamer.clone()),
294 event_consumer,
295 anchor_view,
296 ),
297 );
298
299 ctx
300 }
301
302 pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
304 self.wait_for_orchestrator = Some(Arc::new(client));
305 self
306 }
307
308 pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
310 self.tasks.extend(tasks);
311 self
312 }
313
314 pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
316 self.state_signer.clone()
317 }
318
319 pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> {
321 self.handle.read().await.event_stream()
322 }
323
324 pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
325 self.handle.read().await.submit_transaction(tx).await?;
326 Ok(())
327 }
328
329 pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
331 self.events_streamer.clone()
332 }
333
334 pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
336 Arc::clone(&self.handle)
337 }
338
339 pub async fn shutdown_consensus(&self) {
340 self.handle.write().await.shut_down().await
341 }
342
343 pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
344 self.handle.read().await.decided_leaf().await
345 }
346
347 pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
348 self.handle.read().await.state(view).await
349 }
350
351 pub async fn decided_state(&self) -> Arc<ValidatedState> {
352 self.handle.read().await.decided_state().await
353 }
354
355 pub fn node_id(&self) -> u64 {
356 self.node_state.node_id
357 }
358
359 pub fn node_state(&self) -> NodeState {
360 self.node_state.clone()
361 }
362
363 pub async fn start_consensus(&self) {
365 if let Some(orchestrator_client) = &self.wait_for_orchestrator {
366 tracing::warn!("waiting for orchestrated start");
367 let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
368 orchestrator_client
369 .wait_for_all_nodes_ready(peer_config)
370 .await;
371 } else {
372 tracing::error!("Cannot get info from orchestrator client");
373 }
374 tracing::warn!("starting consensus");
375 self.handle.read().await.hotshot.start_consensus().await;
376 }
377
378 pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
383 self.tasks.spawn(name, task);
384 }
385
386 pub fn spawn_short_lived(
394 &mut self,
395 name: impl Display,
396 task: impl Future<Output: Debug> + Send + 'static,
397 ) {
398 self.tasks.spawn_short_lived(name, task);
399 }
400
401 pub async fn shut_down(&mut self) {
403 tracing::info!("shutting down SequencerContext");
404 self.handle.write().await.shut_down().await;
405 self.tasks.shut_down();
406 self.node_state.l1_client.shut_down_tasks().await;
407
408 self.detached = true;
411 }
412
413 pub async fn join(mut self) {
418 self.tasks.join().await;
419 }
420
    /// Mark this context as detached, so that dropping it does not spawn a shutdown of the
    /// consensus handle and the L1 client.
    ///
    /// NOTE(review): the task list field is still dropped with the context, and dropping a
    /// `TaskList` cancels its tasks — confirm whether detached nodes rely on those tasks.
    pub fn detach(&mut self) {
        // Set `detached` so the drop handler skips its shutdown.
        self.detached = true;
    }
426
427 pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
429 self.network_config.clone()
430 }
431}
432
433impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Drop
434 for SequencerContext<N, P, V>
435{
436 fn drop(&mut self) {
437 if !self.detached {
438 let handle_clone = self.handle.clone();
440 let tasks_clone = self.tasks.clone();
441 let node_state_clone = self.node_state.clone();
442
443 spawn(async move {
444 tracing::info!("shutting down SequencerContext");
445 handle_clone.write().await.shut_down().await;
446 tasks_clone.shut_down();
447 node_state_clone.l1_client.shut_down_tasks().await;
448 });
449
450 self.detached = true;
452 }
453 }
454}
455
456#[tracing::instrument(skip_all, fields(node_id))]
457#[allow(clippy::too_many_arguments)]
458async fn handle_events<N, P, V>(
459 consensus: Arc<RwLock<Consensus<N, P, V>>>,
460 node_id: u64,
461 mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
462 persistence: Arc<P>,
463 state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
464 external_event_handler: ExternalEventHandler<V>,
465 events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
466 event_consumer: impl PersistenceEventConsumer + 'static,
467 anchor_view: Option<ViewNumber>,
468) where
469 N: ConnectedNetwork<PubKey>,
470 P: SequencerPersistence,
471 V: Versions,
472{
473 if let Some(view) = anchor_view {
474 if let Err(err) = persistence
477 .append_decided_leaves(view, vec![], None, &event_consumer)
478 .await
479 {
480 tracing::warn!(
481 "failed to process decided leaves, chain may not be up to date: {err:#}"
482 );
483 }
484 }
485
486 while let Some(event) = events.next().await {
487 tracing::debug!(node_id, ?event, "consensus event");
488 persistence.handle_event(&event, &event_consumer).await;
490
491 state_signer
493 .write()
494 .await
495 .handle_event(&event, consensus.clone())
496 .await;
497
498 if let EventType::ExternalMessageReceived { data, .. } = &event.event {
500 if let Err(err) = external_event_handler.handle_event(data).await {
501 tracing::warn!("Failed to handle external message: {:?}", err);
502 };
503 }
504
505 if let Some(events_streamer) = events_streamer.as_ref() {
507 events_streamer.write().await.handle_event(event).await;
508 }
509 }
510}
511
/// A shared list of named background tasks.
///
/// The list lives behind an `Arc<Mutex<_>>`, so every clone refers to the same underlying list.
/// Dropping any clone calls [`TaskList::shut_down`], which cancels every task that is in the
/// shared list at that moment.
#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
515
// Spawn `$task` as a background task and record it in the task list `$this`.
//
// - `$this`: a `TaskList` (anything whose field `0` is the locked task vector).
// - `$lvl`:  the tracing level used for the task's span and for its start/exit events.
// - `$name`: the task's name; converted with `to_string()`.
// - `$task`: the future to run; its output is logged with `Debug` when it finishes.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            // One copy of the name is recorded on the span; the other is stored in the list.
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                // Run the whole task inside the span so its events carry the task name.
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}
534
535impl TaskList {
    /// Spawn a background task and add it to this list under `name`.
    ///
    /// The task runs inside an `INFO`-level span, and its start and exit (including its output)
    /// are logged at `INFO` level. It is cancelled when the list is shut down or dropped.
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        spawn_with_log_level!(self, Level::INFO, name, task);
    }
543
    /// Spawn a background task and add it to this list under `name`, logging at `DEBUG` level.
    ///
    /// Identical to [`TaskList::spawn`] except that the span and the start/exit events use
    /// `DEBUG` instead of `INFO`.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        spawn_with_log_level!(self, Level::DEBUG, name, task);
    }
558
559 pub fn shut_down(&self) {
561 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
562 for (name, task) in tasks.into_iter().rev() {
563 tracing::info!(name, "cancelling background task");
564 task.abort();
565 }
566 }
567
568 pub async fn join(&mut self) {
570 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
571 join_all(tasks.into_iter().map(|(_, task)| task)).await;
572 }
573
574 pub fn extend(&mut self, tasks: TaskList) {
575 self.0.lock().extend(
576 tasks
577 .0
578 .lock()
579 .drain(..)
580 .collect::<Vec<(String, JoinHandle<()>)>>(),
581 );
582 }
583}
584
585impl Drop for TaskList {
586 fn drop(&mut self) {
587 self.shut_down()
588 }
589}