1pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
12
13use async_broadcast::{RecvError, broadcast};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17 StreamExt,
18 future::{BoxFuture, FutureExt},
19 stream,
20};
21use hotshot_task::task::Task;
22#[cfg(feature = "rewind")]
23use hotshot_task_impls::rewind::RewindTaskState;
24use hotshot_task_impls::{
25 da::DaTaskState,
26 events::HotShotEvent,
27 network::{NetworkEventTaskState, NetworkMessageTaskState},
28 request::NetworkRequestState,
29 response::{NetworkResponseState, run_response_task},
30 stats::StatsTaskState,
31 transactions::TransactionTaskState,
32 upgrade::UpgradeTaskState,
33 vid::VidTaskState,
34 view_sync::ViewSyncTaskState,
35};
36use hotshot_types::{
37 consensus::OuterConsensus,
38 constants::EVENT_CHANNEL_SIZE,
39 data::ViewNumber,
40 message::{EXTERNAL_MESSAGE_VERSION, Message, MessageKind, UpgradeLock},
41 storage_metrics::StorageMetricsValue,
42 traits::{
43 network::ConnectedNetwork,
44 node_implementation::{NodeImplementation, NodeType},
45 },
46};
47use tokio::{spawn, time::sleep};
48use vbs::version::Version;
49
50use crate::{
51 ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
52 HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
53 SystemContext, genesis_epoch_from_version, tasks::task_state::CreateTaskState,
54 types::SystemContextHandle,
55};
56
/// Events intended for broadcast on a global, cross-task channel.
#[derive(Clone, Debug)]
pub enum GlobalEvent {
    /// Signal all listening tasks to shut down.
    Shutdown,
    /// Placeholder event with no effect.
    Dummy,
}
65
66pub async fn add_request_network_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
68 handle: &mut SystemContextHandle<TYPES, I>,
69) {
70 let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
71
72 let task = Task::new(
73 state,
74 handle.internal_event_stream.0.clone(),
75 handle.internal_event_stream.1.activate_cloned(),
76 );
77 handle.consensus_registry.run_task(task);
78}
79
80pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
82 handle: &mut SystemContextHandle<TYPES, I>,
83) {
84 let state = NetworkResponseState::<TYPES>::new(
85 handle.hotshot.consensus(),
86 handle.membership_coordinator.clone(),
87 handle.public_key().clone(),
88 handle.private_key().clone(),
89 handle.hotshot.id,
90 handle.hotshot.upgrade_lock.clone(),
91 );
92 handle.network_registry.register(run_response_task::<TYPES>(
93 state,
94 handle.internal_event_stream.1.activate_cloned(),
95 handle.internal_event_stream.0.clone(),
96 ));
97}
98
/// Spawn a background task that periodically samples the internal event
/// channel's queue length and records it in the consensus metrics.
pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    handle: &mut SystemContextHandle<TYPES, I>,
) {
    let consensus = handle.hotshot.consensus();
    // Clone of the (inactive) receiver end, used only to read the queue
    // length — it is never activated, so it does not consume events.
    let rx = handle.internal_event_stream.1.clone();
    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
    let task_handle = spawn(async move {
        futures::pin_mut!(shutdown_signal);
        loop {
            futures::select! {
                // Exit as soon as a shutdown event is observed.
                () = shutdown_signal => {
                    return;
                },
                // Sample the queue length twice per second.
                () = sleep(Duration::from_millis(500)).fuse() => {
                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
                }
            }
        }
    });
    handle.network_registry.register(task_handle);
}
121
/// Spawn the task that receives raw messages from the network `channel`,
/// deserializes them, and hands them to the network-message state for
/// dispatch onto the internal/external event streams.
#[allow(clippy::missing_panics_doc)]
pub fn add_network_message_task<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
>(
    handle: &mut SystemContextHandle<TYPES, I>,
    channel: &Arc<NET>,
) {
    let upgrade_lock = handle.hotshot.upgrade_lock.clone();

    let network_state: NetworkMessageTaskState<TYPES> = NetworkMessageTaskState {
        internal_event_stream: handle.internal_event_stream.0.clone(),
        external_event_stream: handle.output_event_stream.0.clone(),
        public_key: handle.public_key().clone(),
        upgrade_lock: upgrade_lock.clone(),
        id: handle.hotshot.id,
    };

    let network = Arc::clone(channel);
    let mut state = network_state.clone();
    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
    let task_handle = spawn(async move {
        futures::pin_mut!(shutdown_signal);

        loop {
            futures::select! {
                // Stop receiving once a shutdown event has been observed.
                () = shutdown_signal => {
                    tracing::error!("Shutting down network message task");
                    return;
                }

                // Pull the next raw message off the network.
                message = network.recv_message().fuse() => {
                    // A failed receive is skipped rather than being fatal.
                    let Ok(message) = message else {
                        continue;
                    };

                    let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message) {
                        Ok(message) => message,
                        Err(e) => {
                            tracing::error!("Failed to deserialize message: {:?}", e);
                            continue;
                        }
                    };

                    // Messages tagged with the external-message version must
                    // actually carry an external payload; drop anything else.
                    if version == EXTERNAL_MESSAGE_VERSION
                        && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
                    {
                        tracing::warn!("Received a non-external message with version 0.0");
                        continue;
                    }

                    state.handle_message(deserialized_message).await;
                }
            }
        }
    });
    handle.network_registry.register(task_handle);
}
190
191pub fn add_network_event_task<
193 TYPES: NodeType,
194 I: NodeImplementation<TYPES>,
195 NET: ConnectedNetwork<TYPES::SignatureKey>,
196>(
197 handle: &mut SystemContextHandle<TYPES, I>,
198 network: Arc<NET>,
199) {
200 let network_state: NetworkEventTaskState<_, _, _> = NetworkEventTaskState {
201 network,
202 view: ViewNumber::genesis(),
203 epoch: genesis_epoch_from_version(handle.hotshot.upgrade_lock.upgrade().base),
204 membership_coordinator: handle.membership_coordinator.clone(),
205 storage: handle.storage(),
206 storage_metrics: handle.storage_metrics(),
207 consensus: OuterConsensus::new(handle.consensus()),
208 upgrade_lock: handle.hotshot.upgrade_lock.clone(),
209 transmit_tasks: BTreeMap::new(),
210 epoch_height: handle.epoch_height,
211 id: handle.hotshot.id,
212 };
213 let task = Task::new(
214 network_state,
215 handle.internal_event_stream.0.clone(),
216 handle.internal_event_stream.1.activate_cloned(),
217 );
218 handle.consensus_registry.run_task(task);
219}
220
/// Add all consensus-related tasks (view sync, VID, DA, transactions,
/// upgrade, proposal/vote/consensus tasks, and auxiliary metric tasks)
/// to the given handle.
pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    handle: &mut SystemContextHandle<TYPES, I>,
) {
    handle.add_task(ViewSyncTaskState::<TYPES>::create_from(handle).await);
    handle.add_task(VidTaskState::<TYPES, I>::create_from(handle).await);
    handle.add_task(DaTaskState::<TYPES, I>::create_from(handle).await);
    handle.add_task(TransactionTaskState::<TYPES>::create_from(handle).await);

    let upgrade = handle.hotshot.upgrade_lock.upgrade();

    // Drop any stored upgrade certificate whose target version is already
    // reached (or exceeded) by the configured base version.
    handle.hotshot.upgrade_lock.apply(|cert| {
        if cert
            .as_ref()
            .is_some_and(|c| upgrade.base >= c.data.new_version)
        {
            tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
            *cert = None
        }
    });

    // Only spawn the upgrade task when an upgrade is actually configured.
    if upgrade.base < upgrade.target {
        tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
        handle.add_task(UpgradeTaskState::<TYPES>::create_from(handle).await);
    }

    {
        use hotshot_task_impls::{
            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
            quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
        };

        handle.add_task(QuorumProposalTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(QuorumVoteTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(QuorumProposalRecvTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(ConsensusTaskState::<TYPES, I>::create_from(handle).await);
        // Stats task is only compiled in / spawned with the "stats" feature.
        if cfg!(feature = "stats") {
            handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
        }
    }
    add_queue_len_task(handle);
    // Rewind task records event history for debugging; feature-gated.
    #[cfg(feature = "rewind")]
    handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
}
267
268#[must_use]
276pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>>(
277 handle: &SystemContextHandle<TYPES, I>,
278) -> BoxFuture<'static, ()> {
279 let mut event_stream = handle.internal_event_stream.1.activate_cloned();
281
282 async move {
284 loop {
285 match event_stream.recv_direct().await {
286 Ok(event) => {
287 if matches!(event.as_ref(), HotShotEvent::Shutdown) {
288 return;
289 }
290 },
291 Err(RecvError::Closed) => {
292 return;
293 },
294 Err(e) => {
295 tracing::error!("Shutdown event monitor channel recv error: {}", e);
296 },
297 }
298 }
299 }
300 .boxed()
301}
302
/// Trait for intercepting events crossing the boundary between consensus and
/// the network tasks.
///
/// Implementations can drop, delay, duplicate, or rewrite events in either
/// direction via [`Self::recv_handler`] and [`Self::send_handler`]; the default
/// methods wire the transformer in by splicing relay channels between the two
/// sides. Useful for byzantine / fault-injection testing.
#[allow(clippy::too_many_arguments)]
#[async_trait]
pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>>
where
    Self: std::fmt::Debug + Send + Sync + 'static,
{
    /// Called on each event arriving from the network before it is forwarded
    /// to the real internal event stream; returns the (possibly modified)
    /// events to forward.
    async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;

    /// Called on each internally-produced event before it is passed to the
    /// network tasks; returns the (possibly modified) events to send.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
        public_key: &TYPES::SignatureKey,
        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
        upgrade_lock: &UpgradeLock<TYPES>,
        consensus: OuterConsensus<TYPES>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Build a `SystemContext`, spawn all consensus tasks plus the
    /// transformer-intercepted network tasks, and return the resulting handle.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_handle(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> SystemContextHandle<TYPES, I> {
        // Copy out before `config` is moved into `SystemContext::new`.
        let epoch_height = config.epoch_height;

        let hotshot = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            upgrade,
            memberships.clone(),
            network,
            initializer,
            consensus_metrics,
            storage.clone(),
            storage_metrics,
        )
        .await;
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = hotshot.external_event_stream.clone();
        let internal_event_stream = hotshot.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: Arc::clone(&hotshot),
            storage,
            network: Arc::clone(&hotshot.network),
            membership_coordinator: memberships.clone(),
            epoch_height,
        };

        // Consensus tasks attach to the real internal stream; network tasks
        // are attached through the transformer by `add_network_tasks`.
        add_consensus_tasks::<TYPES, I>(&mut handle).await;
        self.add_network_tasks(&mut handle).await;

        handle
    }

    /// Add the network tasks with this transformer spliced between them and
    /// the internal event stream, so every crossing event passes through
    /// `send_handler` / `recv_handler`.
    #[allow(clippy::too_many_lines)]
    async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I>) {
        // Channel from the relay task down to the network tasks.
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // Channel from the network tasks back up to the relay task.
        let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
        // The application's real internal event stream endpoints.
        let (original_sender, original_receiver) = (
            handle.internal_event_stream.0.clone(),
            handle.internal_event_stream.1.activate_cloned(),
        );

        // Temporarily swap the intercept channels into the handle so the
        // network tasks spawned below attach to them instead of the real
        // internal stream.
        let mut internal_event_stream = (
            network_task_sender.clone(),
            network_task_receiver.clone().deactivate(),
        );
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        add_network_message_and_request_receiver_tasks(handle).await;
        self.add_network_event_tasks(handle);

        // Restore the real internal event stream on the handle.
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        // Transformer state shared by both relay directions.
        let state_in = Arc::new(RwLock::new(self));
        let state_out = Arc::clone(&state_in);
        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        let public_key = handle.public_key().clone();
        let private_key = handle.private_key().clone();
        let upgrade_lock = handle.hotshot.upgrade_lock.clone();
        let consensus = OuterConsensus::new(handle.consensus());
        let membership_coordinator = handle.membership_coordinator.clone();
        let network = Arc::clone(&handle.network);
        // Outbound relay: internal stream -> send_handler -> network tasks.
        let send_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            // Wrap the broadcast receiver as a stream, ending on `Closed`.
            let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
                match recv.recv().await {
                    Ok(event) => Some((Ok(event), recv)),
                    Err(async_broadcast::RecvError::Closed) => None,
                    Err(e) => Some((Err(e), recv)),
                }
            })
            .boxed();

            let fused_recv_stream = recv_stream.fuse();
            futures::pin_mut!(fused_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay send task");
                        // Propagate shutdown to the network-side tasks.
                        let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
                        return;
                    }
                    event = fused_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_out.write().await;
                                let mut results = state.send_handler(
                                    &msg,
                                    &public_key,
                                    &private_key,
                                    &upgrade_lock,
                                    consensus.clone(),
                                    membership_coordinator.clone(),
                                    Arc::clone(&network),
                                ).await;
                                // Forward handler output in its original order.
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = sender_to_network.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, send_handle, Event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        // Inbound relay: network tasks -> recv_handler -> internal stream.
        let recv_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            let network_recv_stream =
                stream::unfold(receiver_from_network, |mut recv| async move {
                    match recv.recv().await {
                        Ok(event) => Some((Ok(event), recv)),
                        Err(async_broadcast::RecvError::Closed) => None,
                        Err(e) => Some((Err(e), recv)),
                    }
                });

            let fused_network_recv_stream = network_recv_stream.boxed().fuse();
            futures::pin_mut!(fused_network_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay receive task");
                        return;
                    }
                    event = fused_network_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_in.write().await;
                                let mut results = state.recv_handler(&msg).await;
                                // Forward handler output in its original order.
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = original_sender.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, recv_handle, Network event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        handle.network_registry.register(send_handle);
        handle.network_registry.register(recv_handle);
    }

    /// Add the network event task for the handle's primary network channel.
    fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I>) {
        let network = Arc::clone(&handle.network);

        self.add_network_event_task(handle, network);
    }

    /// Add a network event task for a specific channel; overridable by
    /// implementations that want to intercept outgoing transmissions.
    fn add_network_event_task(
        &self,
        handle: &mut SystemContextHandle<TYPES, I>,
        channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
    ) {
        add_network_event_task(handle, channel);
    }
}
552
/// Add the network tasks for a handle without any event interception:
/// first the receiving-side tasks, then the outbound event tasks.
pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    handle: &mut SystemContextHandle<TYPES, I>,
) {
    // Receivers must be in place before outbound event tasks start.
    add_network_message_and_request_receiver_tasks(handle).await;

    add_network_event_tasks(handle);
}
561
562pub async fn add_network_message_and_request_receiver_tasks<
564 TYPES: NodeType,
565 I: NodeImplementation<TYPES>,
566>(
567 handle: &mut SystemContextHandle<TYPES, I>,
568) {
569 let network = Arc::clone(&handle.network);
570
571 add_network_message_task(handle, &network);
572
573 add_request_network_task(handle).await;
574 add_response_task(handle);
575}
576
577pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
579 handle: &mut SystemContextHandle<TYPES, I>,
580) {
581 add_network_event_task(handle, Arc::clone(&handle.network));
582}