1pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
12
13use async_broadcast::{broadcast, RecvError};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17 future::{BoxFuture, FutureExt},
18 stream, StreamExt,
19};
20use hotshot_task::task::Task;
21#[cfg(feature = "rewind")]
22use hotshot_task_impls::rewind::RewindTaskState;
23use hotshot_task_impls::{
24 da::DaTaskState,
25 events::HotShotEvent,
26 network::{NetworkEventTaskState, NetworkMessageTaskState},
27 request::NetworkRequestState,
28 response::{run_response_task, NetworkResponseState},
29 transactions::TransactionTaskState,
30 upgrade::UpgradeTaskState,
31 vid::VidTaskState,
32 view_sync::ViewSyncTaskState,
33};
34use hotshot_types::{
35 consensus::{Consensus, OuterConsensus},
36 constants::EVENT_CHANNEL_SIZE,
37 message::{Message, UpgradeLock},
38 storage_metrics::StorageMetricsValue,
39 traits::{
40 network::ConnectedNetwork,
41 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
42 },
43};
44use tokio::{spawn, time::sleep};
45use vbs::version::StaticVersionType;
46
47use crate::{
48 genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
49 ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
50 HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
51 SystemContext, Versions,
52};
53
/// Events intended for a process-wide (global) event stream.
///
/// NOTE(review): nothing in this file constructs or matches on this enum; confirm its
/// consumers before extending it.
#[derive(Debug, Clone)]
pub enum GlobalEvent {
    /// Shutdown signal (presumably: stop everything listening on the global stream).
    Shutdown,
    /// Placeholder variant; no behavior is attached to it in this file.
    Dummy,
}
62
63pub async fn add_request_network_task<
65 TYPES: NodeType,
66 I: NodeImplementation<TYPES>,
67 V: Versions,
68>(
69 handle: &mut SystemContextHandle<TYPES, I, V>,
70) {
71 let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
72
73 let task = Task::new(
74 state,
75 handle.internal_event_stream.0.clone(),
76 handle.internal_event_stream.1.activate_cloned(),
77 );
78 handle.consensus_registry.run_task(task);
79}
80
81pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
83 handle: &mut SystemContextHandle<TYPES, I, V>,
84) {
85 let state = NetworkResponseState::<TYPES, V>::new(
86 handle.hotshot.consensus(),
87 handle.membership_coordinator.clone(),
88 handle.public_key().clone(),
89 handle.private_key().clone(),
90 handle.hotshot.id,
91 handle.hotshot.upgrade_lock.clone(),
92 );
93 handle
94 .network_registry
95 .register(run_response_task::<TYPES, V>(
96 state,
97 handle.internal_event_stream.1.activate_cloned(),
98 handle.internal_event_stream.0.clone(),
99 ));
100}
101
102pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
104 handle: &mut SystemContextHandle<TYPES, I, V>,
105) {
106 let consensus = handle.hotshot.consensus();
107 let rx = handle.internal_event_stream.1.clone();
108 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
109 let task_handle = spawn(async move {
110 futures::pin_mut!(shutdown_signal);
111 loop {
112 futures::select! {
113 () = shutdown_signal => {
114 return;
115 },
116 () = sleep(Duration::from_millis(500)).fuse() => {
117 consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
118 }
119 }
120 }
121 });
122 handle.network_registry.register(task_handle);
123}
124
125#[allow(clippy::missing_panics_doc)]
127pub fn add_network_message_task<
128 TYPES: NodeType,
129 I: NodeImplementation<TYPES>,
130 NET: ConnectedNetwork<TYPES::SignatureKey>,
131 V: Versions,
132>(
133 handle: &mut SystemContextHandle<TYPES, I, V>,
134 channel: &Arc<NET>,
135) {
136 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
137
138 let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
139 internal_event_stream: handle.internal_event_stream.0.clone(),
140 external_event_stream: handle.output_event_stream.0.clone(),
141 public_key: handle.public_key().clone(),
142 transactions_cache: lru::LruCache::new(NonZeroUsize::new(100_000).unwrap()),
143 upgrade_lock: upgrade_lock.clone(),
144 id: handle.hotshot.id,
145 };
146
147 let network = Arc::clone(channel);
148 let mut state = network_state.clone();
149 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
150 let task_handle = spawn(async move {
151 futures::pin_mut!(shutdown_signal);
152
153 loop {
154 futures::select! {
156 () = shutdown_signal => {
158 tracing::error!("Shutting down network message task");
159 return;
160 }
161
162 message = network.recv_message().fuse() => {
164 let Ok(message) = message else {
166 continue;
167 };
168
169 let deserialized_message: Message<TYPES> = match upgrade_lock.deserialize(&message).await {
171 Ok(message) => message,
172 Err(e) => {
173 tracing::error!("Failed to deserialize message: {:?}", e);
174 continue;
175 }
176 };
177
178 state.handle_message(deserialized_message).await;
180 }
181 }
182 }
183 });
184 handle.network_registry.register(task_handle);
185}
186
187pub fn add_network_event_task<
189 TYPES: NodeType,
190 I: NodeImplementation<TYPES>,
191 V: Versions,
192 NET: ConnectedNetwork<TYPES::SignatureKey>,
193>(
194 handle: &mut SystemContextHandle<TYPES, I, V>,
195 network: Arc<NET>,
196) {
197 let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
198 network,
199 view: TYPES::View::genesis(),
200 epoch: genesis_epoch_from_version::<V, TYPES>(),
201 membership_coordinator: handle.membership_coordinator.clone(),
202 storage: handle.storage(),
203 storage_metrics: handle.storage_metrics(),
204 consensus: OuterConsensus::new(handle.consensus()),
205 upgrade_lock: handle.hotshot.upgrade_lock.clone(),
206 transmit_tasks: BTreeMap::new(),
207 epoch_height: handle.epoch_height,
208 id: handle.hotshot.id,
209 };
210 let task = Task::new(
211 network_state,
212 handle.internal_event_stream.0.clone(),
213 handle.internal_event_stream.1.activate_cloned(),
214 );
215 handle.consensus_registry.run_task(task);
216}
217
218pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
220 handle: &mut SystemContextHandle<TYPES, I, V>,
221) {
222 handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
223 handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
224 handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
225 handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);
226
227 {
228 let mut upgrade_certificate_lock = handle
229 .hotshot
230 .upgrade_lock
231 .decided_upgrade_certificate
232 .write()
233 .await;
234
235 if upgrade_certificate_lock
237 .as_ref()
238 .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
239 {
240 tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
241 *upgrade_certificate_lock = None;
242 }
243 }
244
245 if V::Base::VERSION < V::Upgrade::VERSION {
247 tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
248 handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
249 }
250
251 {
252 use hotshot_task_impls::{
253 consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
254 quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
255 };
256
257 handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
258 handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
259 handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
260 handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
261 }
262 add_queue_len_task(handle);
263 #[cfg(feature = "rewind")]
264 handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
265}
266
267#[must_use]
275pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
276 handle: &SystemContextHandle<TYPES, I, V>,
277) -> BoxFuture<'static, ()> {
278 let mut event_stream = handle.internal_event_stream.1.activate_cloned();
280
281 async move {
283 loop {
284 match event_stream.recv_direct().await {
285 Ok(event) => {
286 if matches!(event.as_ref(), HotShotEvent::Shutdown) {
287 return;
288 }
289 },
290 Err(RecvError::Closed) => {
291 return;
292 },
293 Err(e) => {
294 tracing::error!("Shutdown event monitor channel recv error: {}", e);
295 },
296 }
297 }
298 }
299 .boxed()
300}
301
302#[async_trait]
303pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
307where
308 Self: std::fmt::Debug + Send + Sync + 'static,
309{
310 async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;
312
313 async fn send_handler(
315 &mut self,
316 event: &HotShotEvent<TYPES>,
317 public_key: &TYPES::SignatureKey,
318 private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
319 upgrade_lock: &UpgradeLock<TYPES, V>,
320 consensus: Arc<RwLock<Consensus<TYPES>>>,
321 ) -> Vec<HotShotEvent<TYPES>>;
322
323 #[allow(clippy::too_many_arguments)]
324 async fn spawn_handle(
326 &'static mut self,
327 public_key: TYPES::SignatureKey,
328 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
329 state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
330 nonce: u64,
331 config: HotShotConfig<TYPES>,
332 memberships: EpochMembershipCoordinator<TYPES>,
333 network: Arc<I::Network>,
334 initializer: HotShotInitializer<TYPES>,
335 consensus_metrics: ConsensusMetricsValue,
336 storage: I::Storage,
337 storage_metrics: StorageMetricsValue,
338 ) -> SystemContextHandle<TYPES, I, V> {
339 let epoch_height = config.epoch_height;
340
341 let hotshot = SystemContext::new(
342 public_key,
343 private_key,
344 state_private_key,
345 nonce,
346 config,
347 memberships.clone(),
348 network,
349 initializer,
350 consensus_metrics,
351 storage.clone(),
352 storage_metrics,
353 )
354 .await;
355 let consensus_registry = ConsensusTaskRegistry::new();
356 let network_registry = NetworkTaskRegistry::new();
357
358 let output_event_stream = hotshot.external_event_stream.clone();
359 let internal_event_stream = hotshot.internal_event_stream.clone();
360
361 let mut handle = SystemContextHandle {
362 consensus_registry,
363 network_registry,
364 output_event_stream: output_event_stream.clone(),
365 internal_event_stream: internal_event_stream.clone(),
366 hotshot: Arc::clone(&hotshot),
367 storage,
368 network: Arc::clone(&hotshot.network),
369 membership_coordinator: memberships.clone(),
370 epoch_height,
371 };
372
373 add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
374 self.add_network_tasks(&mut handle).await;
375
376 handle
377 }
378
379 #[allow(clippy::too_many_lines)]
381 async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
382 let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
387 let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
389 let (original_sender, original_receiver) = (
391 handle.internal_event_stream.0.clone(),
392 handle.internal_event_stream.1.activate_cloned(),
393 );
394
395 let mut internal_event_stream = (
398 network_task_sender.clone(),
399 network_task_receiver.clone().deactivate(),
400 );
401 std::mem::swap(
402 &mut internal_event_stream,
403 &mut handle.internal_event_stream,
404 );
405
406 add_network_message_and_request_receiver_tasks(handle).await;
408 self.add_network_event_tasks(handle);
409
410 std::mem::swap(
411 &mut internal_event_stream,
412 &mut handle.internal_event_stream,
413 );
414
415 let state_in = Arc::new(RwLock::new(self));
416 let state_out = Arc::clone(&state_in);
417 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
420 let public_key = handle.public_key().clone();
421 let private_key = handle.private_key().clone();
422 let upgrade_lock = handle.hotshot.upgrade_lock.clone();
423 let consensus = Arc::clone(&handle.hotshot.consensus());
424 let send_handle = spawn(async move {
425 futures::pin_mut!(shutdown_signal);
426
427 let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
428 match recv.recv().await {
429 Ok(event) => Some((Ok(event), recv)),
430 Err(async_broadcast::RecvError::Closed) => None,
431 Err(e) => Some((Err(e), recv)),
432 }
433 })
434 .boxed();
435
436 let fused_recv_stream = recv_stream.fuse();
437 futures::pin_mut!(fused_recv_stream);
438
439 loop {
440 futures::select! {
441 () = shutdown_signal => {
442 tracing::error!("Shutting down relay send task");
443 let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
444 return;
445 }
446 event = fused_recv_stream.next() => {
447 match event {
448 Some(Ok(msg)) => {
449 let mut state = state_out.write().await;
450 let mut results = state.send_handler(
451 &msg,
452 &public_key,
453 &private_key,
454 &upgrade_lock,
455 Arc::clone(&consensus)
456 ).await;
457 results.reverse();
458 while let Some(event) = results.pop() {
459 let _ = sender_to_network.broadcast(event.into()).await;
460 }
461 }
462 Some(Err(e)) => {
463 tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
464 }
465 None => {
466 tracing::info!("Relay Task, send_handle, Event stream closed");
467 return;
468 }
469 }
470 }
471 }
472 }
473 });
474
475 let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
478 let recv_handle = spawn(async move {
479 futures::pin_mut!(shutdown_signal);
480
481 let network_recv_stream =
482 stream::unfold(receiver_from_network, |mut recv| async move {
483 match recv.recv().await {
484 Ok(event) => Some((Ok(event), recv)),
485 Err(async_broadcast::RecvError::Closed) => None,
486 Err(e) => Some((Err(e), recv)),
487 }
488 });
489
490 let fused_network_recv_stream = network_recv_stream.boxed().fuse();
491 futures::pin_mut!(fused_network_recv_stream);
492
493 loop {
494 futures::select! {
495 () = shutdown_signal => {
496 tracing::error!("Shutting down relay receive task");
497 return;
498 }
499 event = fused_network_recv_stream.next() => {
500 match event {
501 Some(Ok(msg)) => {
502 let mut state = state_in.write().await;
503 let mut results = state.recv_handler(&msg).await;
504 results.reverse();
505 while let Some(event) = results.pop() {
506 let _ = original_sender.broadcast(event.into()).await;
507 }
508 }
509 Some(Err(e)) => {
510 tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
511 }
512 None => {
513 tracing::info!("Relay Task, recv_handle, Network event stream closed");
514 return;
515 }
516 }
517 }
518 }
519 }
520 });
521
522 handle.network_registry.register(send_handle);
523 handle.network_registry.register(recv_handle);
524 }
525
526 fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
528 let network = Arc::clone(&handle.network);
529
530 self.add_network_event_task(handle, network);
531 }
532
533 fn add_network_event_task(
535 &self,
536 handle: &mut SystemContextHandle<TYPES, I, V>,
537 channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
538 ) {
539 add_network_event_task(handle, channel);
540 }
541}
542
543pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
545 handle: &mut SystemContextHandle<TYPES, I, V>,
546) {
547 add_network_message_and_request_receiver_tasks(handle).await;
548
549 add_network_event_tasks(handle);
550}
551
552pub async fn add_network_message_and_request_receiver_tasks<
554 TYPES: NodeType,
555 I: NodeImplementation<TYPES>,
556 V: Versions,
557>(
558 handle: &mut SystemContextHandle<TYPES, I, V>,
559) {
560 let network = Arc::clone(&handle.network);
561
562 add_network_message_task(handle, &network);
563
564 add_request_network_task(handle).await;
565 add_response_task(handle);
566}
567
568pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
570 handle: &mut SystemContextHandle<TYPES, I, V>,
571) {
572 add_network_event_task(handle, Arc::clone(&handle.network));
573}