1use std::{
2 fmt::{Debug, Display},
3 marker::PhantomData,
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::Context;
9use async_lock::RwLock;
10use derivative::Derivative;
11use espresso_types::{
12 v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
13 NodeState, PubKey, Transaction, ValidatedState,
14};
15use futures::{
16 future::{join_all, Future},
17 stream::{Stream, StreamExt},
18};
19use hotshot::{
20 types::{Event, EventType, SystemContextHandle},
21 SystemContext,
22};
23use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26 consensus::ConsensusMetricsValue,
27 data::{Leaf2, ViewNumber},
28 epoch_membership::EpochMembershipCoordinator,
29 network::NetworkConfig,
30 storage_metrics::StorageMetricsValue,
31 traits::{metrics::Metrics, network::ConnectedNetwork, node_implementation::Versions},
32 PeerConfig, ValidatorConfig,
33};
34use parking_lot::Mutex;
35use request_response::RequestResponseConfig;
36use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
37use tracing::{Instrument, Level};
38use url::Url;
39
40use crate::{
41 catchup::ParallelStateCatchup,
42 external_event_handler::ExternalEventHandler,
43 proposal_fetcher::ProposalFetcherConfig,
44 request_response::{
45 data_source::{DataSource, Storage as RequestResponseStorage},
46 network::Sender as RequestResponseSender,
47 recipient_source::RecipientSource,
48 RequestResponseProtocol,
49 },
50 state_signature::StateSigner,
51 Node, SeqTypes, SequencerApiVersion,
52};
53
54pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;
56
57#[derive(Derivative, Clone)]
59#[derivative(Debug(bound = ""))]
60pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
61 #[derivative(Debug = "ignore")]
63 handle: Arc<RwLock<Consensus<N, P, V>>>,
64
65 #[derivative(Debug = "ignore")]
67 #[allow(dead_code)]
68 pub request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
69
70 state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
72
73 #[derivative(Debug = "ignore")]
75 wait_for_orchestrator: Option<Arc<OrchestratorClient>>,
76
77 tasks: TaskList,
79
80 events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
82
83 detached: bool,
84
85 node_state: NodeState,
86
87 network_config: NetworkConfig<SeqTypes>,
88
89 #[derivative(Debug = "ignore")]
90 validator_config: ValidatorConfig<SeqTypes>,
91}
92
93impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> SequencerContext<N, P, V> {
94 #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
95 #[allow(clippy::too_many_arguments)]
96 pub async fn init(
97 network_config: NetworkConfig<SeqTypes>,
98 validator_config: ValidatorConfig<SeqTypes>,
99 coordinator: EpochMembershipCoordinator<SeqTypes>,
100 instance_state: NodeState,
101 storage: Option<RequestResponseStorage>,
102 state_catchup: ParallelStateCatchup,
103 persistence: Arc<P>,
104 network: Arc<N>,
105 state_relay_server: Option<Url>,
106 metrics: &dyn Metrics,
107 stake_table_capacity: usize,
108 event_consumer: impl PersistenceEventConsumer + 'static,
109 _: V,
110 proposal_fetcher_cfg: ProposalFetcherConfig,
111 ) -> anyhow::Result<Self> {
112 let config = &network_config.config;
113 let pub_key = validator_config.public_key;
114 tracing::info!(%pub_key, "initializing consensus");
115
116 metrics
118 .create_gauge("node_index".into(), None)
119 .set(instance_state.node_id as usize);
120
121 instance_state.l1_client.spawn_tasks().await;
123
124 let (initializer, anchor_view) = persistence
126 .load_consensus_state::<V>(instance_state.clone())
127 .await?;
128
129 let stake_table = config.hotshot_stake_table();
130 let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
131 let stake_table_epoch = None;
132
133 let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
134 stake_table.0,
135 0,
136 )));
137
138 let handle = SystemContext::init(
139 validator_config.public_key,
140 validator_config.private_key.clone(),
141 validator_config.state_private_key.clone(),
142 instance_state.node_id,
143 config.clone(),
144 coordinator.clone(),
145 network.clone(),
146 initializer,
147 ConsensusMetricsValue::new(metrics),
148 Arc::clone(&persistence),
149 StorageMetricsValue::new(metrics),
150 )
151 .await?
152 .0;
153
154 let mut state_signer = StateSigner::new(
155 validator_config.state_private_key.clone(),
156 validator_config.state_public_key.clone(),
157 stake_table_commit,
158 stake_table_epoch,
159 stake_table_capacity,
160 );
161 if let Some(url) = state_relay_server {
162 state_signer = state_signer.with_relay_server(url);
163 }
164
165 let (outbound_message_sender, outbound_message_receiver) = channel(20);
167 let (request_response_sender, request_response_receiver) = channel(20);
168
169 let request_response_config = RequestResponseConfig {
171 incoming_request_ttl: Duration::from_secs(40),
172 incoming_request_timeout: Duration::from_secs(5),
173 incoming_response_timeout: Duration::from_secs(5),
174 request_batch_size: 5,
175 request_batch_interval: Duration::from_secs(2),
176 max_incoming_requests: 10,
177 max_incoming_requests_per_key: 1,
178 max_incoming_responses: 20,
179 };
180
181 let request_response_protocol = RequestResponseProtocol::new(
183 request_response_config,
184 RequestResponseSender::new(outbound_message_sender),
185 request_response_receiver,
186 RecipientSource {
187 memberships: coordinator,
188 consensus: handle.hotshot.clone(),
189 public_key: validator_config.public_key,
190 },
191 DataSource {
192 node_state: instance_state.clone(),
193 storage,
194 consensus: handle.hotshot.clone(),
195 phantom: PhantomData,
196 },
197 validator_config.public_key,
198 validator_config.private_key.clone(),
199 );
200
201 state_catchup.add_provider(Arc::new(request_response_protocol.clone()));
205
206 let mut tasks = TaskList::default();
208 let external_event_handler = ExternalEventHandler::<V>::new(
209 &mut tasks,
210 request_response_sender,
211 outbound_message_receiver,
212 network,
213 pub_key,
214 )
215 .await
216 .with_context(|| "Failed to create external event handler")?;
217
218 Ok(Self::new(
219 handle,
220 persistence,
221 state_signer,
222 external_event_handler,
223 request_response_protocol,
224 event_streamer,
225 instance_state,
226 network_config,
227 validator_config,
228 event_consumer,
229 anchor_view,
230 proposal_fetcher_cfg,
231 metrics,
232 )
233 .with_task_list(tasks))
234 }
235
236 #[allow(clippy::too_many_arguments)]
238 fn new(
239 handle: Consensus<N, P, V>,
240 persistence: Arc<P>,
241 state_signer: StateSigner<SequencerApiVersion>,
242 external_event_handler: ExternalEventHandler<V>,
243 request_response_protocol: RequestResponseProtocol<Node<N, P>, V, N, P>,
244 event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
245 node_state: NodeState,
246 network_config: NetworkConfig<SeqTypes>,
247 validator_config: ValidatorConfig<SeqTypes>,
248 event_consumer: impl PersistenceEventConsumer + 'static,
249 anchor_view: Option<ViewNumber>,
250 proposal_fetcher_cfg: ProposalFetcherConfig,
251 metrics: &dyn Metrics,
252 ) -> Self {
253 let events = handle.event_stream();
254
255 let node_id = node_state.node_id;
256 let mut ctx = Self {
257 handle: Arc::new(RwLock::new(handle)),
258 state_signer: Arc::new(RwLock::new(state_signer)),
259 request_response_protocol,
260 tasks: Default::default(),
261 detached: false,
262 wait_for_orchestrator: None,
263 events_streamer: event_streamer.clone(),
264 node_state,
265 network_config,
266 validator_config,
267 };
268
269 proposal_fetcher_cfg.spawn(
271 &mut ctx.tasks,
272 ctx.handle.clone(),
273 persistence.clone(),
274 metrics,
275 );
276
277 ctx.spawn(
279 "event handler",
280 handle_events(
281 ctx.handle.clone(),
282 node_id,
283 events,
284 persistence,
285 ctx.state_signer.clone(),
286 external_event_handler,
287 Some(event_streamer.clone()),
288 event_consumer,
289 anchor_view,
290 ),
291 );
292
293 ctx
294 }
295
296 pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
298 self.wait_for_orchestrator = Some(Arc::new(client));
299 self
300 }
301
302 pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
304 self.tasks.extend(tasks);
305 self
306 }
307
308 pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
310 self.state_signer.clone()
311 }
312
313 pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> {
315 self.handle.read().await.event_stream()
316 }
317
318 pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
319 self.handle.read().await.submit_transaction(tx).await?;
320 Ok(())
321 }
322
323 pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
325 self.events_streamer.clone()
326 }
327
328 pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
330 Arc::clone(&self.handle)
331 }
332
333 pub async fn shutdown_consensus(&self) {
334 self.handle.write().await.shut_down().await
335 }
336
337 pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
338 self.handle.read().await.decided_leaf().await
339 }
340
341 pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
342 self.handle.read().await.state(view).await
343 }
344
345 pub async fn decided_state(&self) -> Arc<ValidatedState> {
346 self.handle.read().await.decided_state().await
347 }
348
349 pub fn node_id(&self) -> u64 {
350 self.node_state.node_id
351 }
352
353 pub fn node_state(&self) -> NodeState {
354 self.node_state.clone()
355 }
356
357 pub async fn start_consensus(&self) {
359 if let Some(orchestrator_client) = &self.wait_for_orchestrator {
360 tracing::warn!("waiting for orchestrated start");
361 let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
362 orchestrator_client
363 .wait_for_all_nodes_ready(peer_config)
364 .await;
365 } else {
366 tracing::error!("Cannot get info from orchestrator client");
367 }
368 tracing::warn!("starting consensus");
369 self.handle.read().await.hotshot.start_consensus().await;
370 }
371
372 pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
377 self.tasks.spawn(name, task);
378 }
379
380 pub fn spawn_short_lived(
388 &mut self,
389 name: impl Display,
390 task: impl Future<Output: Debug> + Send + 'static,
391 ) {
392 self.tasks.spawn_short_lived(name, task);
393 }
394
395 pub async fn shut_down(&mut self) {
397 tracing::info!("shutting down SequencerContext");
398 self.handle.write().await.shut_down().await;
399 self.tasks.shut_down();
400 self.node_state.l1_client.shut_down_tasks().await;
401
402 self.detached = true;
405 }
406
407 pub async fn join(mut self) {
412 self.tasks.join().await;
413 }
414
415 pub fn detach(&mut self) {
417 self.detached = true;
419 }
420
421 pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
423 self.network_config.clone()
424 }
425}
426
427impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Drop
428 for SequencerContext<N, P, V>
429{
430 fn drop(&mut self) {
431 if !self.detached {
432 let handle_clone = self.handle.clone();
434 let tasks_clone = self.tasks.clone();
435 let node_state_clone = self.node_state.clone();
436
437 spawn(async move {
438 tracing::info!("shutting down SequencerContext");
439 handle_clone.write().await.shut_down().await;
440 tasks_clone.shut_down();
441 node_state_clone.l1_client.shut_down_tasks().await;
442 });
443
444 self.detached = true;
446 }
447 }
448}
449
450#[tracing::instrument(skip_all, fields(node_id))]
451#[allow(clippy::too_many_arguments)]
452async fn handle_events<N, P, V>(
453 consensus: Arc<RwLock<Consensus<N, P, V>>>,
454 node_id: u64,
455 mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
456 persistence: Arc<P>,
457 state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
458 external_event_handler: ExternalEventHandler<V>,
459 events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
460 event_consumer: impl PersistenceEventConsumer + 'static,
461 anchor_view: Option<ViewNumber>,
462) where
463 N: ConnectedNetwork<PubKey>,
464 P: SequencerPersistence,
465 V: Versions,
466{
467 if let Some(view) = anchor_view {
468 if let Err(err) = persistence
471 .append_decided_leaves(view, vec![], &event_consumer)
472 .await
473 {
474 tracing::warn!(
475 "failed to process decided leaves, chain may not be up to date: {err:#}"
476 );
477 }
478 }
479
480 while let Some(event) = events.next().await {
481 tracing::debug!(node_id, ?event, "consensus event");
482 persistence.handle_event(&event, &event_consumer).await;
484
485 state_signer
487 .write()
488 .await
489 .handle_event(&event, consensus.clone())
490 .await;
491
492 if let EventType::ExternalMessageReceived { data, .. } = &event.event {
494 if let Err(err) = external_event_handler.handle_event(data).await {
495 tracing::warn!("Failed to handle external message: {:?}", err);
496 };
497 }
498
499 if let Some(events_streamer) = events_streamer.as_ref() {
501 events_streamer.write().await.handle_event(event).await;
502 }
503 }
504}
505
506#[derive(Debug, Default, Clone)]
507#[allow(clippy::type_complexity)]
508pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
509
// Spawn `$task` on the Tokio runtime and record its `JoinHandle` in the `TaskList` `$this` under
// the name `$name`.
//
// The task runs inside a `background task` span that records the name. It logs one event at
// level `$lvl` when it starts and another, carrying the task's `Debug` output, when it exits.
// `$lvl` goes straight into `tracing::span!`/`tracing::event!`; pass a `Level::...` constant, as
// the callers in this file do.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            // Separate copy of the name for the span; the outer `name` is stored in the list.
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}
528
529impl TaskList {
530 pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
535 spawn_with_log_level!(self, Level::INFO, name, task);
536 }
537
538 pub fn spawn_short_lived(
546 &mut self,
547 name: impl Display,
548 task: impl Future<Output: Debug> + Send + 'static,
549 ) {
550 spawn_with_log_level!(self, Level::DEBUG, name, task);
551 }
552
553 pub fn shut_down(&self) {
555 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
556 for (name, task) in tasks.into_iter().rev() {
557 tracing::info!(name, "cancelling background task");
558 task.abort();
559 }
560 }
561
562 pub async fn join(&mut self) {
564 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
565 join_all(tasks.into_iter().map(|(_, task)| task)).await;
566 }
567
568 pub fn extend(&mut self, tasks: TaskList) {
569 self.0.lock().extend(
570 tasks
571 .0
572 .lock()
573 .drain(..)
574 .collect::<Vec<(String, JoinHandle<()>)>>(),
575 );
576 }
577}
578
579impl Drop for TaskList {
580 fn drop(&mut self) {
581 self.shut_down()
582 }
583}