1pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
12
13use async_broadcast::{broadcast, RecvError};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17 future::{BoxFuture, FutureExt},
18 stream, StreamExt,
19};
20use hotshot_task::task::Task;
21#[cfg(feature = "rewind")]
22use hotshot_task_impls::rewind::RewindTaskState;
23use hotshot_task_impls::{
24 da::DaTaskState,
25 events::HotShotEvent,
26 network::{NetworkEventTaskState, NetworkMessageTaskState},
27 request::NetworkRequestState,
28 response::{run_response_task, NetworkResponseState},
29 stats::StatsTaskState,
30 transactions::TransactionTaskState,
31 upgrade::UpgradeTaskState,
32 vid::VidTaskState,
33 view_sync::ViewSyncTaskState,
34};
35use hotshot_types::{
36 consensus::OuterConsensus,
37 constants::EVENT_CHANNEL_SIZE,
38 message::{Message, MessageKind, UpgradeLock, EXTERNAL_MESSAGE_VERSION},
39 storage_metrics::StorageMetricsValue,
40 traits::{
41 network::ConnectedNetwork,
42 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
43 },
44};
45use tokio::{spawn, time::sleep};
46use vbs::version::{StaticVersionType, Version};
47
48use crate::{
49 genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
50 ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
51 HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
52 SystemContext, Versions,
53};
54
/// Event type for a global event stream.
// NOTE(review): no use of this enum is visible in this part of the file; the variant
// descriptions below are inferred from the variant names only — confirm against callers.
#[derive(Clone, Debug)]
pub enum GlobalEvent {
    /// Presumably signals that everything should shut down.
    Shutdown,
    /// Placeholder variant; presumably carries no meaning of its own — TODO confirm.
    Dummy,
}
63
64pub async fn add_request_network_task<
66 TYPES: NodeType,
67 I: NodeImplementation<TYPES>,
68 V: Versions,
69>(
70 handle: &mut SystemContextHandle<TYPES, I, V>,
71) {
72 let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
73
74 let task = Task::new(
75 state,
76 handle.internal_event_stream.0.clone(),
77 handle.internal_event_stream.1.activate_cloned(),
78 );
79 handle.consensus_registry.run_task(task);
80}
81
82pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
84 handle: &mut SystemContextHandle<TYPES, I, V>,
85) {
86 let state = NetworkResponseState::<TYPES, V>::new(
87 handle.hotshot.consensus(),
88 handle.membership_coordinator.clone(),
89 handle.public_key().clone(),
90 handle.private_key().clone(),
91 handle.hotshot.id,
92 handle.hotshot.upgrade_lock.clone(),
93 );
94 handle
95 .network_registry
96 .register(run_response_task::<TYPES, V>(
97 state,
98 handle.internal_event_stream.1.activate_cloned(),
99 handle.internal_event_stream.0.clone(),
100 ));
101}
102
103pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
105 handle: &mut SystemContextHandle<TYPES, I, V>,
106) {
107 let consensus = handle.hotshot.consensus();
108 let rx = handle.internal_event_stream.1.clone();
109 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
110 let task_handle = spawn(async move {
111 futures::pin_mut!(shutdown_signal);
112 loop {
113 futures::select! {
114 () = shutdown_signal => {
115 return;
116 },
117 () = sleep(Duration::from_millis(500)).fuse() => {
118 consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
119 }
120 }
121 }
122 });
123 handle.network_registry.register(task_handle);
124}
125
126#[allow(clippy::missing_panics_doc)]
128pub fn add_network_message_task<
129 TYPES: NodeType,
130 I: NodeImplementation<TYPES>,
131 NET: ConnectedNetwork<TYPES::SignatureKey>,
132 V: Versions,
133>(
134 handle: &mut SystemContextHandle<TYPES, I, V>,
135 channel: &Arc<NET>,
136) {
137 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
138
139 let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
140 internal_event_stream: handle.internal_event_stream.0.clone(),
141 external_event_stream: handle.output_event_stream.0.clone(),
142 public_key: handle.public_key().clone(),
143 transactions_cache: lru::LruCache::new(NonZeroUsize::new(100_000).unwrap()),
144 upgrade_lock: upgrade_lock.clone(),
145 id: handle.hotshot.id,
146 };
147
148 let network = Arc::clone(channel);
149 let mut state = network_state.clone();
150 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
151 let task_handle = spawn(async move {
152 futures::pin_mut!(shutdown_signal);
153
154 loop {
155 futures::select! {
157 () = shutdown_signal => {
159 tracing::error!("Shutting down network message task");
160 return;
161 }
162
163 message = network.recv_message().fuse() => {
165 let Ok(message) = message else {
167 continue;
168 };
169
170 let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message).await {
172 Ok(message) => message,
173 Err(e) => {
174 tracing::error!("Failed to deserialize message: {:?}", e);
175 continue;
176 }
177 };
178
179 if version == EXTERNAL_MESSAGE_VERSION
182 && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
183 {
184 tracing::warn!("Received a non-external message with version 0.0");
185 continue;
186 }
187
188 state.handle_message(deserialized_message).await;
190 }
191 }
192 }
193 });
194 handle.network_registry.register(task_handle);
195}
196
197pub fn add_network_event_task<
199 TYPES: NodeType,
200 I: NodeImplementation<TYPES>,
201 V: Versions,
202 NET: ConnectedNetwork<TYPES::SignatureKey>,
203>(
204 handle: &mut SystemContextHandle<TYPES, I, V>,
205 network: Arc<NET>,
206) {
207 let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
208 network,
209 view: TYPES::View::genesis(),
210 epoch: genesis_epoch_from_version::<V, TYPES>(),
211 membership_coordinator: handle.membership_coordinator.clone(),
212 storage: handle.storage(),
213 storage_metrics: handle.storage_metrics(),
214 consensus: OuterConsensus::new(handle.consensus()),
215 upgrade_lock: handle.hotshot.upgrade_lock.clone(),
216 transmit_tasks: BTreeMap::new(),
217 epoch_height: handle.epoch_height,
218 id: handle.hotshot.id,
219 };
220 let task = Task::new(
221 network_state,
222 handle.internal_event_stream.0.clone(),
223 handle.internal_event_stream.1.activate_cloned(),
224 );
225 handle.consensus_registry.run_task(task);
226}
227
228pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
230 handle: &mut SystemContextHandle<TYPES, I, V>,
231) {
232 handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
233 handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
234 handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
235 handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);
236
237 {
238 let mut upgrade_certificate_lock = handle
239 .hotshot
240 .upgrade_lock
241 .decided_upgrade_certificate
242 .write()
243 .await;
244
245 if upgrade_certificate_lock
247 .as_ref()
248 .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
249 {
250 tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
251 *upgrade_certificate_lock = None;
252 }
253 }
254
255 if V::Base::VERSION < V::Upgrade::VERSION {
257 tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
258 handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
259 }
260
261 {
262 use hotshot_task_impls::{
263 consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
264 quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
265 };
266
267 handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
268 handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
269 handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
270 handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
271 handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
272 }
273 add_queue_len_task(handle);
274 #[cfg(feature = "rewind")]
275 handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
276}
277
278#[must_use]
286pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
287 handle: &SystemContextHandle<TYPES, I, V>,
288) -> BoxFuture<'static, ()> {
289 let mut event_stream = handle.internal_event_stream.1.activate_cloned();
291
292 async move {
294 loop {
295 match event_stream.recv_direct().await {
296 Ok(event) => {
297 if matches!(event.as_ref(), HotShotEvent::Shutdown) {
298 return;
299 }
300 },
301 Err(RecvError::Closed) => {
302 return;
303 },
304 Err(e) => {
305 tracing::error!("Shutdown event monitor channel recv error: {}", e);
306 },
307 }
308 }
309 }
310 .boxed()
311}
312
313#[allow(clippy::too_many_arguments)]
314#[async_trait]
315pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
319where
320 Self: std::fmt::Debug + Send + Sync + 'static,
321{
322 async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;
324
325 async fn send_handler(
327 &mut self,
328 event: &HotShotEvent<TYPES>,
329 public_key: &TYPES::SignatureKey,
330 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
331 upgrade_lock: &UpgradeLock<TYPES, V>,
332 consensus: OuterConsensus<TYPES>,
333 membership_coordinator: EpochMembershipCoordinator<TYPES>,
334 network: Arc<I::Network>,
335 ) -> Vec<HotShotEvent<TYPES>>;
336
337 #[allow(clippy::too_many_arguments)]
338 async fn spawn_handle(
340 &'static mut self,
341 public_key: TYPES::SignatureKey,
342 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
343 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
344 nonce: u64,
345 config: HotShotConfig<TYPES>,
346 memberships: EpochMembershipCoordinator<TYPES>,
347 network: Arc<I::Network>,
348 initializer: HotShotInitializer<TYPES>,
349 consensus_metrics: ConsensusMetricsValue,
350 storage: I::Storage,
351 storage_metrics: StorageMetricsValue,
352 ) -> SystemContextHandle<TYPES, I, V> {
353 let epoch_height = config.epoch_height;
354
355 let hotshot = SystemContext::new(
356 public_key,
357 private_key,
358 state_private_key,
359 nonce,
360 config,
361 memberships.clone(),
362 network,
363 initializer,
364 consensus_metrics,
365 storage.clone(),
366 storage_metrics,
367 )
368 .await;
369 let consensus_registry = ConsensusTaskRegistry::new();
370 let network_registry = NetworkTaskRegistry::new();
371
372 let output_event_stream = hotshot.external_event_stream.clone();
373 let internal_event_stream = hotshot.internal_event_stream.clone();
374
375 let mut handle = SystemContextHandle {
376 consensus_registry,
377 network_registry,
378 output_event_stream: output_event_stream.clone(),
379 internal_event_stream: internal_event_stream.clone(),
380 hotshot: Arc::clone(&hotshot),
381 storage,
382 network: Arc::clone(&hotshot.network),
383 membership_coordinator: memberships.clone(),
384 epoch_height,
385 };
386
387 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
388 self.add_network_tasks(&mut handle).await;
389
390 handle
391 }
392
393 #[allow(clippy::too_many_lines)]
395 async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
396 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
401 let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
403 let (original_sender, original_receiver) = (
405 handle.internal_event_stream.0.clone(),
406 handle.internal_event_stream.1.activate_cloned(),
407 );
408
409 let mut internal_event_stream = (
412 network_task_sender.clone(),
413 network_task_receiver.clone().deactivate(),
414 );
415 std::mem::swap(
416 &mut internal_event_stream,
417 &mut handle.internal_event_stream,
418 );
419
420 add_network_message_and_request_receiver_tasks(handle).await;
422 self.add_network_event_tasks(handle);
423
424 std::mem::swap(
425 &mut internal_event_stream,
426 &mut handle.internal_event_stream,
427 );
428
429 let state_in = Arc::new(RwLock::new(self));
430 let state_out = Arc::clone(&state_in);
431 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
434 let public_key = handle.public_key().clone();
435 let private_key = handle.private_key().clone();
436 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
437 let consensus = OuterConsensus::new(handle.consensus());
438 let membership_coordinator = handle.membership_coordinator.clone();
439 let network = Arc::clone(&handle.network);
440 let send_handle = spawn(async move {
441 futures::pin_mut!(shutdown_signal);
442
443 let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
444 match recv.recv().await {
445 Ok(event) => Some((Ok(event), recv)),
446 Err(async_broadcast::RecvError::Closed) => None,
447 Err(e) => Some((Err(e), recv)),
448 }
449 })
450 .boxed();
451
452 let fused_recv_stream = recv_stream.fuse();
453 futures::pin_mut!(fused_recv_stream);
454
455 loop {
456 futures::select! {
457 () = shutdown_signal => {
458 tracing::error!("Shutting down relay send task");
459 let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
460 return;
461 }
462 event = fused_recv_stream.next() => {
463 match event {
464 Some(Ok(msg)) => {
465 let mut state = state_out.write().await;
466 let mut results = state.send_handler(
467 &msg,
468 &public_key,
469 &private_key,
470 &upgrade_lock,
471 consensus.clone(),
472 membership_coordinator.clone(),
473 Arc::clone(&network),
474 ).await;
475 results.reverse();
476 while let Some(event) = results.pop() {
477 let _ = sender_to_network.broadcast(event.into()).await;
478 }
479 }
480 Some(Err(e)) => {
481 tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
482 }
483 None => {
484 tracing::info!("Relay Task, send_handle, Event stream closed");
485 return;
486 }
487 }
488 }
489 }
490 }
491 });
492
493 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
496 let recv_handle = spawn(async move {
497 futures::pin_mut!(shutdown_signal);
498
499 let network_recv_stream =
500 stream::unfold(receiver_from_network, |mut recv| async move {
501 match recv.recv().await {
502 Ok(event) => Some((Ok(event), recv)),
503 Err(async_broadcast::RecvError::Closed) => None,
504 Err(e) => Some((Err(e), recv)),
505 }
506 });
507
508 let fused_network_recv_stream = network_recv_stream.boxed().fuse();
509 futures::pin_mut!(fused_network_recv_stream);
510
511 loop {
512 futures::select! {
513 () = shutdown_signal => {
514 tracing::error!("Shutting down relay receive task");
515 return;
516 }
517 event = fused_network_recv_stream.next() => {
518 match event {
519 Some(Ok(msg)) => {
520 let mut state = state_in.write().await;
521 let mut results = state.recv_handler(&msg).await;
522 results.reverse();
523 while let Some(event) = results.pop() {
524 let _ = original_sender.broadcast(event.into()).await;
525 }
526 }
527 Some(Err(e)) => {
528 tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
529 }
530 None => {
531 tracing::info!("Relay Task, recv_handle, Network event stream closed");
532 return;
533 }
534 }
535 }
536 }
537 }
538 });
539
540 handle.network_registry.register(send_handle);
541 handle.network_registry.register(recv_handle);
542 }
543
544 fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
546 let network = Arc::clone(&handle.network);
547
548 self.add_network_event_task(handle, network);
549 }
550
551 fn add_network_event_task(
553 &self,
554 handle: &mut SystemContextHandle<TYPES, I, V>,
555 channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
556 ) {
557 add_network_event_task(handle, channel);
558 }
559}
560
561pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
563 handle: &mut SystemContextHandle<TYPES, I, V>,
564) {
565 add_network_message_and_request_receiver_tasks(handle).await;
566
567 add_network_event_tasks(handle);
568}
569
570pub async fn add_network_message_and_request_receiver_tasks<
572 TYPES: NodeType,
573 I: NodeImplementation<TYPES>,
574 V: Versions,
575>(
576 handle: &mut SystemContextHandle<TYPES, I, V>,
577) {
578 let network = Arc::clone(&handle.network);
579
580 add_network_message_task(handle, &network);
581
582 add_request_network_task(handle).await;
583 add_response_task(handle);
584}
585
586pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
588 handle: &mut SystemContextHandle<TYPES, I, V>,
589) {
590 add_network_event_task(handle, Arc::clone(&handle.network));
591}