1pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
12
13use async_broadcast::{broadcast, RecvError};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17 future::{BoxFuture, FutureExt},
18 stream, StreamExt,
19};
20use hotshot_task::task::Task;
21#[cfg(feature = "rewind")]
22use hotshot_task_impls::rewind::RewindTaskState;
23use hotshot_task_impls::{
24 da::DaTaskState,
25 events::HotShotEvent,
26 network::{NetworkEventTaskState, NetworkMessageTaskState},
27 request::NetworkRequestState,
28 response::{run_response_task, NetworkResponseState},
29 stats::StatsTaskState,
30 transactions::TransactionTaskState,
31 upgrade::UpgradeTaskState,
32 vid::VidTaskState,
33 view_sync::ViewSyncTaskState,
34};
35use hotshot_types::{
36 consensus::OuterConsensus,
37 constants::EVENT_CHANNEL_SIZE,
38 message::{Message, MessageKind, UpgradeLock, EXTERNAL_MESSAGE_VERSION},
39 storage_metrics::StorageMetricsValue,
40 traits::{
41 network::ConnectedNetwork,
42 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
43 },
44};
45use tokio::{spawn, time::sleep};
46use vbs::version::{StaticVersionType, Version};
47
48use crate::{
49 genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
50 ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
51 HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
52 SystemContext, Versions,
53};
54
/// Global control events.
///
/// Nothing in this module constructs these variants; their consumers live elsewhere.
#[derive(Debug, Clone)]
pub enum GlobalEvent {
    /// Shutdown request (presumably observed by every listener of this event — confirm at
    /// the use sites).
    Shutdown,
    /// Placeholder variant that carries no data.
    Dummy,
}
63
64pub async fn add_request_network_task<
66 TYPES: NodeType,
67 I: NodeImplementation<TYPES>,
68 V: Versions,
69>(
70 handle: &mut SystemContextHandle<TYPES, I, V>,
71) {
72 let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
73
74 let task = Task::new(
75 state,
76 handle.internal_event_stream.0.clone(),
77 handle.internal_event_stream.1.activate_cloned(),
78 );
79 handle.consensus_registry.run_task(task);
80}
81
82pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
84 handle: &mut SystemContextHandle<TYPES, I, V>,
85) {
86 let state = NetworkResponseState::<TYPES, V>::new(
87 handle.hotshot.consensus(),
88 handle.membership_coordinator.clone(),
89 handle.public_key().clone(),
90 handle.private_key().clone(),
91 handle.hotshot.id,
92 handle.hotshot.upgrade_lock.clone(),
93 );
94 handle
95 .network_registry
96 .register(run_response_task::<TYPES, V>(
97 state,
98 handle.internal_event_stream.1.activate_cloned(),
99 handle.internal_event_stream.0.clone(),
100 ));
101}
102
103pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
105 handle: &mut SystemContextHandle<TYPES, I, V>,
106) {
107 let consensus = handle.hotshot.consensus();
108 let rx = handle.internal_event_stream.1.clone();
109 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
110 let task_handle = spawn(async move {
111 futures::pin_mut!(shutdown_signal);
112 loop {
113 futures::select! {
114 () = shutdown_signal => {
115 return;
116 },
117 () = sleep(Duration::from_millis(500)).fuse() => {
118 consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
119 }
120 }
121 }
122 });
123 handle.network_registry.register(task_handle);
124}
125
126#[allow(clippy::missing_panics_doc)]
128pub fn add_network_message_task<
129 TYPES: NodeType,
130 I: NodeImplementation<TYPES>,
131 NET: ConnectedNetwork<TYPES::SignatureKey>,
132 V: Versions,
133>(
134 handle: &mut SystemContextHandle<TYPES, I, V>,
135 channel: &Arc<NET>,
136) {
137 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
138
139 let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
140 internal_event_stream: handle.internal_event_stream.0.clone(),
141 external_event_stream: handle.output_event_stream.0.clone(),
142 public_key: handle.public_key().clone(),
143 upgrade_lock: upgrade_lock.clone(),
144 id: handle.hotshot.id,
145 };
146
147 let network = Arc::clone(channel);
148 let mut state = network_state.clone();
149 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
150 let task_handle = spawn(async move {
151 futures::pin_mut!(shutdown_signal);
152
153 loop {
154 futures::select! {
156 () = shutdown_signal => {
158 tracing::error!("Shutting down network message task");
159 return;
160 }
161
162 message = network.recv_message().fuse() => {
164 let Ok(message) = message else {
166 continue;
167 };
168
169 let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message).await {
171 Ok(message) => message,
172 Err(e) => {
173 tracing::error!("Failed to deserialize message: {:?}", e);
174 continue;
175 }
176 };
177
178 if version == EXTERNAL_MESSAGE_VERSION
181 && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
182 {
183 tracing::warn!("Received a non-external message with version 0.0");
184 continue;
185 }
186
187 state.handle_message(deserialized_message).await;
189 }
190 }
191 }
192 });
193 handle.network_registry.register(task_handle);
194}
195
196pub fn add_network_event_task<
198 TYPES: NodeType,
199 I: NodeImplementation<TYPES>,
200 V: Versions,
201 NET: ConnectedNetwork<TYPES::SignatureKey>,
202>(
203 handle: &mut SystemContextHandle<TYPES, I, V>,
204 network: Arc<NET>,
205) {
206 let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
207 network,
208 view: TYPES::View::genesis(),
209 epoch: genesis_epoch_from_version::<V, TYPES>(),
210 membership_coordinator: handle.membership_coordinator.clone(),
211 storage: handle.storage(),
212 storage_metrics: handle.storage_metrics(),
213 consensus: OuterConsensus::new(handle.consensus()),
214 upgrade_lock: handle.hotshot.upgrade_lock.clone(),
215 transmit_tasks: BTreeMap::new(),
216 epoch_height: handle.epoch_height,
217 id: handle.hotshot.id,
218 };
219 let task = Task::new(
220 network_state,
221 handle.internal_event_stream.0.clone(),
222 handle.internal_event_stream.1.activate_cloned(),
223 );
224 handle.consensus_registry.run_task(task);
225}
226
227pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
229 handle: &mut SystemContextHandle<TYPES, I, V>,
230) {
231 handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
232 handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
233 handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
234 handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);
235
236 {
237 let mut upgrade_certificate_lock = handle
238 .hotshot
239 .upgrade_lock
240 .decided_upgrade_certificate
241 .write()
242 .await;
243
244 if upgrade_certificate_lock
246 .as_ref()
247 .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
248 {
249 tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
250 *upgrade_certificate_lock = None;
251 }
252 }
253
254 if V::Base::VERSION < V::Upgrade::VERSION {
256 tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
257 handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
258 }
259
260 {
261 use hotshot_task_impls::{
262 consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
263 quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
264 };
265
266 handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
267 handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
268 handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
269 handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
270 if cfg!(feature = "stats") {
271 handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
272 }
273 }
274 add_queue_len_task(handle);
275 #[cfg(feature = "rewind")]
276 handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
277}
278
279#[must_use]
287pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
288 handle: &SystemContextHandle<TYPES, I, V>,
289) -> BoxFuture<'static, ()> {
290 let mut event_stream = handle.internal_event_stream.1.activate_cloned();
292
293 async move {
295 loop {
296 match event_stream.recv_direct().await {
297 Ok(event) => {
298 if matches!(event.as_ref(), HotShotEvent::Shutdown) {
299 return;
300 }
301 },
302 Err(RecvError::Closed) => {
303 return;
304 },
305 Err(e) => {
306 tracing::error!("Shutdown event monitor channel recv error: {}", e);
307 },
308 }
309 }
310 }
311 .boxed()
312}
313
314#[allow(clippy::too_many_arguments)]
315#[async_trait]
316pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
320where
321 Self: std::fmt::Debug + Send + Sync + 'static,
322{
323 async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;
325
326 async fn send_handler(
328 &mut self,
329 event: &HotShotEvent<TYPES>,
330 public_key: &TYPES::SignatureKey,
331 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
332 upgrade_lock: &UpgradeLock<TYPES, V>,
333 consensus: OuterConsensus<TYPES>,
334 membership_coordinator: EpochMembershipCoordinator<TYPES>,
335 network: Arc<I::Network>,
336 ) -> Vec<HotShotEvent<TYPES>>;
337
338 #[allow(clippy::too_many_arguments)]
339 async fn spawn_handle(
341 &'static mut self,
342 public_key: TYPES::SignatureKey,
343 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
344 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
345 nonce: u64,
346 config: HotShotConfig<TYPES>,
347 memberships: EpochMembershipCoordinator<TYPES>,
348 network: Arc<I::Network>,
349 initializer: HotShotInitializer<TYPES>,
350 consensus_metrics: ConsensusMetricsValue,
351 storage: I::Storage,
352 storage_metrics: StorageMetricsValue,
353 ) -> SystemContextHandle<TYPES, I, V> {
354 let epoch_height = config.epoch_height;
355
356 let hotshot = SystemContext::new(
357 public_key,
358 private_key,
359 state_private_key,
360 nonce,
361 config,
362 memberships.clone(),
363 network,
364 initializer,
365 consensus_metrics,
366 storage.clone(),
367 storage_metrics,
368 )
369 .await;
370 let consensus_registry = ConsensusTaskRegistry::new();
371 let network_registry = NetworkTaskRegistry::new();
372
373 let output_event_stream = hotshot.external_event_stream.clone();
374 let internal_event_stream = hotshot.internal_event_stream.clone();
375
376 let mut handle = SystemContextHandle {
377 consensus_registry,
378 network_registry,
379 output_event_stream: output_event_stream.clone(),
380 internal_event_stream: internal_event_stream.clone(),
381 hotshot: Arc::clone(&hotshot),
382 storage,
383 network: Arc::clone(&hotshot.network),
384 membership_coordinator: memberships.clone(),
385 epoch_height,
386 };
387
388 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
389 self.add_network_tasks(&mut handle).await;
390
391 handle
392 }
393
394 #[allow(clippy::too_many_lines)]
396 async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
397 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
402 let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
404 let (original_sender, original_receiver) = (
406 handle.internal_event_stream.0.clone(),
407 handle.internal_event_stream.1.activate_cloned(),
408 );
409
410 let mut internal_event_stream = (
413 network_task_sender.clone(),
414 network_task_receiver.clone().deactivate(),
415 );
416 std::mem::swap(
417 &mut internal_event_stream,
418 &mut handle.internal_event_stream,
419 );
420
421 add_network_message_and_request_receiver_tasks(handle).await;
423 self.add_network_event_tasks(handle);
424
425 std::mem::swap(
426 &mut internal_event_stream,
427 &mut handle.internal_event_stream,
428 );
429
430 let state_in = Arc::new(RwLock::new(self));
431 let state_out = Arc::clone(&state_in);
432 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
435 let public_key = handle.public_key().clone();
436 let private_key = handle.private_key().clone();
437 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
438 let consensus = OuterConsensus::new(handle.consensus());
439 let membership_coordinator = handle.membership_coordinator.clone();
440 let network = Arc::clone(&handle.network);
441 let send_handle = spawn(async move {
442 futures::pin_mut!(shutdown_signal);
443
444 let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
445 match recv.recv().await {
446 Ok(event) => Some((Ok(event), recv)),
447 Err(async_broadcast::RecvError::Closed) => None,
448 Err(e) => Some((Err(e), recv)),
449 }
450 })
451 .boxed();
452
453 let fused_recv_stream = recv_stream.fuse();
454 futures::pin_mut!(fused_recv_stream);
455
456 loop {
457 futures::select! {
458 () = shutdown_signal => {
459 tracing::error!("Shutting down relay send task");
460 let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
461 return;
462 }
463 event = fused_recv_stream.next() => {
464 match event {
465 Some(Ok(msg)) => {
466 let mut state = state_out.write().await;
467 let mut results = state.send_handler(
468 &msg,
469 &public_key,
470 &private_key,
471 &upgrade_lock,
472 consensus.clone(),
473 membership_coordinator.clone(),
474 Arc::clone(&network),
475 ).await;
476 results.reverse();
477 while let Some(event) = results.pop() {
478 let _ = sender_to_network.broadcast(event.into()).await;
479 }
480 }
481 Some(Err(e)) => {
482 tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
483 }
484 None => {
485 tracing::info!("Relay Task, send_handle, Event stream closed");
486 return;
487 }
488 }
489 }
490 }
491 }
492 });
493
494 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
497 let recv_handle = spawn(async move {
498 futures::pin_mut!(shutdown_signal);
499
500 let network_recv_stream =
501 stream::unfold(receiver_from_network, |mut recv| async move {
502 match recv.recv().await {
503 Ok(event) => Some((Ok(event), recv)),
504 Err(async_broadcast::RecvError::Closed) => None,
505 Err(e) => Some((Err(e), recv)),
506 }
507 });
508
509 let fused_network_recv_stream = network_recv_stream.boxed().fuse();
510 futures::pin_mut!(fused_network_recv_stream);
511
512 loop {
513 futures::select! {
514 () = shutdown_signal => {
515 tracing::error!("Shutting down relay receive task");
516 return;
517 }
518 event = fused_network_recv_stream.next() => {
519 match event {
520 Some(Ok(msg)) => {
521 let mut state = state_in.write().await;
522 let mut results = state.recv_handler(&msg).await;
523 results.reverse();
524 while let Some(event) = results.pop() {
525 let _ = original_sender.broadcast(event.into()).await;
526 }
527 }
528 Some(Err(e)) => {
529 tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
530 }
531 None => {
532 tracing::info!("Relay Task, recv_handle, Network event stream closed");
533 return;
534 }
535 }
536 }
537 }
538 }
539 });
540
541 handle.network_registry.register(send_handle);
542 handle.network_registry.register(recv_handle);
543 }
544
545 fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
547 let network = Arc::clone(&handle.network);
548
549 self.add_network_event_task(handle, network);
550 }
551
552 fn add_network_event_task(
554 &self,
555 handle: &mut SystemContextHandle<TYPES, I, V>,
556 channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
557 ) {
558 add_network_event_task(handle, channel);
559 }
560}
561
562pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
564 handle: &mut SystemContextHandle<TYPES, I, V>,
565) {
566 add_network_message_and_request_receiver_tasks(handle).await;
567
568 add_network_event_tasks(handle);
569}
570
571pub async fn add_network_message_and_request_receiver_tasks<
573 TYPES: NodeType,
574 I: NodeImplementation<TYPES>,
575 V: Versions,
576>(
577 handle: &mut SystemContextHandle<TYPES, I, V>,
578) {
579 let network = Arc::clone(&handle.network);
580
581 add_network_message_task(handle, &network);
582
583 add_request_network_task(handle).await;
584 add_response_task(handle);
585}
586
587pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
589 handle: &mut SystemContextHandle<TYPES, I, V>,
590) {
591 add_network_event_task(handle, Arc::clone(&handle.network));
592}