hotshot/tasks/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a number of tasks that run continuously
8
9/// Provides trait to create task states from a `SystemContextHandle`
10pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
12
13use async_broadcast::{RecvError, broadcast};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17    StreamExt,
18    future::{BoxFuture, FutureExt},
19    stream,
20};
21use hotshot_task::task::Task;
22#[cfg(feature = "rewind")]
23use hotshot_task_impls::rewind::RewindTaskState;
24use hotshot_task_impls::{
25    da::DaTaskState,
26    events::HotShotEvent,
27    network::{NetworkEventTaskState, NetworkMessageTaskState},
28    request::NetworkRequestState,
29    response::{NetworkResponseState, run_response_task},
30    stats::StatsTaskState,
31    transactions::TransactionTaskState,
32    upgrade::UpgradeTaskState,
33    vid::VidTaskState,
34    view_sync::ViewSyncTaskState,
35};
36use hotshot_types::{
37    consensus::OuterConsensus,
38    constants::EVENT_CHANNEL_SIZE,
39    data::ViewNumber,
40    message::{EXTERNAL_MESSAGE_VERSION, Message, MessageKind, UpgradeLock},
41    storage_metrics::StorageMetricsValue,
42    traits::{
43        network::ConnectedNetwork,
44        node_implementation::{NodeImplementation, NodeType},
45    },
46};
47use tokio::{spawn, time::sleep};
48use vbs::version::Version;
49
50use crate::{
51    ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
52    HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
53    SystemContext, genesis_epoch_from_version, tasks::task_state::CreateTaskState,
54    types::SystemContextHandle,
55};
56
/// Event type for the global (cross-task) event stream.
#[derive(Clone, Debug)]
pub enum GlobalEvent {
    /// shut everything down
    Shutdown,
    /// dummy placeholder variant (TODO delete later)
    Dummy,
}
65
66/// Add tasks for network requests and responses
67pub async fn add_request_network_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
68    handle: &mut SystemContextHandle<TYPES, I>,
69) {
70    let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
71
72    let task = Task::new(
73        state,
74        handle.internal_event_stream.0.clone(),
75        handle.internal_event_stream.1.activate_cloned(),
76    );
77    handle.consensus_registry.run_task(task);
78}
79
80/// Add a task which responds to requests on the network.
81pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
82    handle: &mut SystemContextHandle<TYPES, I>,
83) {
84    let state = NetworkResponseState::<TYPES>::new(
85        handle.hotshot.consensus(),
86        handle.membership_coordinator.clone(),
87        handle.public_key().clone(),
88        handle.private_key().clone(),
89        handle.hotshot.id,
90        handle.hotshot.upgrade_lock.clone(),
91    );
92    handle.network_registry.register(run_response_task::<TYPES>(
93        state,
94        handle.internal_event_stream.1.activate_cloned(),
95        handle.internal_event_stream.0.clone(),
96    ));
97}
98
99/// Add a task which updates our queue length metric at a set interval
100pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
101    handle: &mut SystemContextHandle<TYPES, I>,
102) {
103    let consensus = handle.hotshot.consensus();
104    let rx = handle.internal_event_stream.1.clone();
105    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
106    let task_handle = spawn(async move {
107        futures::pin_mut!(shutdown_signal);
108        loop {
109            futures::select! {
110                () = shutdown_signal => {
111                    return;
112                },
113                () = sleep(Duration::from_millis(500)).fuse() => {
114                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
115                }
116            }
117        }
118    });
119    handle.network_registry.register(task_handle);
120}
121
/// Add the network task to handle messages and publish events.
///
/// Spawns a loop that receives raw messages from `channel`, deserializes them
/// via the upgrade lock, filters malformed external-version messages, and
/// hands valid messages to a `NetworkMessageTaskState` for dispatch onto the
/// internal and external event streams. The loop exits on shutdown.
#[allow(clippy::missing_panics_doc)]
pub fn add_network_message_task<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
>(
    handle: &mut SystemContextHandle<TYPES, I>,
    channel: &Arc<NET>,
) {
    let upgrade_lock = handle.hotshot.upgrade_lock.clone();

    let network_state: NetworkMessageTaskState<TYPES> = NetworkMessageTaskState {
        internal_event_stream: handle.internal_event_stream.0.clone(),
        external_event_stream: handle.output_event_stream.0.clone(),
        public_key: handle.public_key().clone(),
        upgrade_lock: upgrade_lock.clone(),
        id: handle.hotshot.id,
    };

    let network = Arc::clone(channel);
    let mut state = network_state.clone();
    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
    let task_handle = spawn(async move {
        futures::pin_mut!(shutdown_signal);

        loop {
            // Wait for one of the following to resolve:
            futures::select! {
                // Wait for a shutdown signal
                () = shutdown_signal => {
                    tracing::error!("Shutting down network message task");
                    return;
                }

                // Wait for a message from the network
                message = network.recv_message().fuse() => {
                    // Make sure the message did not fail; receive errors are
                    // dropped silently and the loop keeps polling.
                    let Ok(message) = message else {
                        continue;
                    };

                    // Deserialize the message and get the version
                    let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message) {
                        Ok(message) => message,
                        Err(e) => {
                            tracing::error!("Failed to deserialize message: {:?}", e);
                            continue;
                        }
                    };

                    // Special case: external messages (version 0.0). We want to make sure it is an external message
                    // and warn and continue otherwise.
                    if version == EXTERNAL_MESSAGE_VERSION
                        && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
                    {
                        tracing::warn!("Received a non-external message with version 0.0");
                        continue;
                    }

                    // Handle the message
                    state.handle_message(deserialized_message).await;
                }
            }
        }
    });
    handle.network_registry.register(task_handle);
}
190
191/// Add the network task to handle events and send messages.
192pub fn add_network_event_task<
193    TYPES: NodeType,
194    I: NodeImplementation<TYPES>,
195    NET: ConnectedNetwork<TYPES::SignatureKey>,
196>(
197    handle: &mut SystemContextHandle<TYPES, I>,
198    network: Arc<NET>,
199) {
200    let network_state: NetworkEventTaskState<_, _, _> = NetworkEventTaskState {
201        network,
202        view: ViewNumber::genesis(),
203        epoch: genesis_epoch_from_version(handle.hotshot.upgrade_lock.upgrade().base),
204        membership_coordinator: handle.membership_coordinator.clone(),
205        storage: handle.storage(),
206        storage_metrics: handle.storage_metrics(),
207        consensus: OuterConsensus::new(handle.consensus()),
208        upgrade_lock: handle.hotshot.upgrade_lock.clone(),
209        transmit_tasks: BTreeMap::new(),
210        epoch_height: handle.epoch_height,
211        id: handle.hotshot.id,
212    };
213    let task = Task::new(
214        network_state,
215        handle.internal_event_stream.0.clone(),
216        handle.internal_event_stream.1.activate_cloned(),
217    );
218    handle.consensus_registry.run_task(task);
219}
220
/// Adds consensus-related tasks to a `SystemContextHandle`.
///
/// Spawns the view-sync, VID, DA, transaction, quorum, and consensus tasks,
/// plus (conditionally) the upgrade, stats, rewind, and queue-length tasks.
pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    handle: &mut SystemContextHandle<TYPES, I>,
) {
    // Core dissemination / data-availability tasks.
    handle.add_task(ViewSyncTaskState::<TYPES>::create_from(handle).await);
    handle.add_task(VidTaskState::<TYPES, I>::create_from(handle).await);
    handle.add_task(DaTaskState::<TYPES, I>::create_from(handle).await);
    handle.add_task(TransactionTaskState::<TYPES>::create_from(handle).await);

    // Snapshot of the configured upgrade (base -> target version).
    let upgrade = handle.hotshot.upgrade_lock.upgrade();

    // clear the loaded certificate if it's now outdated
    handle.hotshot.upgrade_lock.apply(|cert| {
        if cert
            .as_ref()
            .is_some_and(|c| upgrade.base >= c.data.new_version)
        {
            tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
            *cert = None
        }
    });

    // only spawn the upgrade task if we are actually configured to perform an upgrade.
    if upgrade.base < upgrade.target {
        tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
        handle.add_task(UpgradeTaskState::<TYPES>::create_from(handle).await);
    }

    {
        use hotshot_task_impls::{
            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
            quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
        };

        handle.add_task(QuorumProposalTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(QuorumVoteTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(QuorumProposalRecvTaskState::<TYPES, I>::create_from(handle).await);
        handle.add_task(ConsensusTaskState::<TYPES, I>::create_from(handle).await);
        // Stats task is compiled in unconditionally but only spawned when the
        // `stats` feature is enabled.
        if cfg!(feature = "stats") {
            handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
        }
    }
    add_queue_len_task(handle);
    #[cfg(feature = "rewind")]
    handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
}
267
268/// Creates a monitor for shutdown events.
269///
270/// # Returns
271/// A `BoxFuture<'static, ()>` that resolves when a `HotShotEvent::Shutdown` is detected.
272///
273/// # Usage
274/// Use in `select!` macros or similar constructs for graceful shutdowns:
275#[must_use]
276pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>>(
277    handle: &SystemContextHandle<TYPES, I>,
278) -> BoxFuture<'static, ()> {
279    // Activate the cloned internal event stream
280    let mut event_stream = handle.internal_event_stream.1.activate_cloned();
281
282    // Create a future that completes when the `HotShotEvent::Shutdown` is received
283    async move {
284        loop {
285            match event_stream.recv_direct().await {
286                Ok(event) => {
287                    if matches!(event.as_ref(), HotShotEvent::Shutdown) {
288                        return;
289                    }
290                },
291                Err(RecvError::Closed) => {
292                    return;
293                },
294                Err(e) => {
295                    tracing::error!("Shutdown event monitor channel recv error: {}", e);
296                },
297            }
298        }
299    }
300    .boxed()
301}
302
/// Trait for intercepting and modifying messages between the network and consensus layers.
///
/// Consensus <-> [Byzantine logic layer] <-> Network
///
/// Implementors sit between the two layers and may drop, duplicate, or rewrite
/// events in either direction (used to simulate Byzantine behavior).
#[allow(clippy::too_many_arguments)]
#[async_trait]
pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>>
where
    Self: std::fmt::Debug + Send + Sync + 'static,
{
    /// modify incoming messages from the network
    ///
    /// Returns the events to forward to consensus in place of `event`
    /// (may be empty, one event, or several).
    async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;

    /// modify outgoing messages from the network
    ///
    /// Returns the events to forward to the network in place of `event`.
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
        public_key: &TYPES::SignatureKey,
        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
        upgrade_lock: &UpgradeLock<TYPES>,
        consensus: OuterConsensus<TYPES>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
    ) -> Vec<HotShotEvent<TYPES>>;

    /// Creates a `SystemContextHandle` with the given event transformer
    ///
    /// Builds a `SystemContext`, wires up the consensus tasks, and installs
    /// this transformer between consensus and the network tasks.
    #[allow(clippy::too_many_arguments)]
    async fn spawn_handle(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        upgrade: versions::Upgrade,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> SystemContextHandle<TYPES, I> {
        // Captured before `config` is moved into `SystemContext::new`.
        let epoch_height = config.epoch_height;

        let hotshot = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            upgrade,
            memberships.clone(),
            network,
            initializer,
            consensus_metrics,
            storage.clone(),
            storage_metrics,
        )
        .await;
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = hotshot.external_event_stream.clone();
        let internal_event_stream = hotshot.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: Arc::clone(&hotshot),
            storage,
            network: Arc::clone(&hotshot.network),
            membership_coordinator: memberships.clone(),
            epoch_height,
        };

        // Consensus tasks run on the real internal stream; the network tasks
        // are attached behind this transformer by `add_network_tasks`.
        add_consensus_tasks::<TYPES, I>(&mut handle).await;
        self.add_network_tasks(&mut handle).await;

        handle
    }

    /// Add byzantine network tasks with the trait
    ///
    /// Interposes two relay tasks between consensus and the network by giving
    /// the network tasks a private event channel, then forwarding
    /// (transformed) events between that channel and the real internal stream.
    #[allow(clippy::too_many_lines)]
    async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I>) {
        // channels between the task spawned in this function and the network tasks.
        // with this, we can control exactly what events the network tasks see.

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
        // create a copy of the original receiver
        let (original_sender, original_receiver) = (
            handle.internal_event_stream.0.clone(),
            handle.internal_event_stream.1.activate_cloned(),
        );

        // replace the internal event stream with the one we just created,
        // so that the network tasks are spawned with our channel.
        let mut internal_event_stream = (
            network_task_sender.clone(),
            network_task_receiver.clone().deactivate(),
        );
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        // spawn the network tasks with our newly-created channel
        add_network_message_and_request_receiver_tasks(handle).await;
        self.add_network_event_tasks(handle);

        // swap the original stream back in so anything spawned after this
        // point uses the real internal event stream again
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        // shared transformer state: both relay directions mutate `self`
        let state_in = Arc::new(RwLock::new(self));
        let state_out = Arc::clone(&state_in);
        // spawn a task to listen on the (original) internal event stream,
        // and broadcast the transformed events to the replacement event stream we just created.
        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        let public_key = handle.public_key().clone();
        let private_key = handle.private_key().clone();
        let upgrade_lock = handle.hotshot.upgrade_lock.clone();
        let consensus = OuterConsensus::new(handle.consensus());
        let membership_coordinator = handle.membership_coordinator.clone();
        let network = Arc::clone(&handle.network);
        let send_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            // Adapt the broadcast receiver into a stream, ending on `Closed`.
            let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
                match recv.recv().await {
                    Ok(event) => Some((Ok(event), recv)),
                    Err(async_broadcast::RecvError::Closed) => None,
                    Err(e) => Some((Err(e), recv)),
                }
            })
            .boxed();

            let fused_recv_stream = recv_stream.fuse();
            futures::pin_mut!(fused_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay send task");
                        let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
                        return;
                    }
                    event = fused_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_out.write().await;
                                let mut results = state.send_handler(
                                    &msg,
                                    &public_key,
                                    &private_key,
                                    &upgrade_lock,
                                    consensus.clone(),
                                    membership_coordinator.clone(),
                                    Arc::clone(&network),
                                ).await;
                                // forward transformed events in their original order
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = sender_to_network.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, send_handle, Event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        // spawn a task to listen on the newly created event stream,
        // and broadcast the transformed events to the original internal event stream
        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        let recv_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            let network_recv_stream =
                stream::unfold(receiver_from_network, |mut recv| async move {
                    match recv.recv().await {
                        Ok(event) => Some((Ok(event), recv)),
                        Err(async_broadcast::RecvError::Closed) => None,
                        Err(e) => Some((Err(e), recv)),
                    }
                });

            let fused_network_recv_stream = network_recv_stream.boxed().fuse();
            futures::pin_mut!(fused_network_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay receive task");
                        return;
                    }
                    event = fused_network_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_in.write().await;
                                let mut results = state.recv_handler(&msg).await;
                                // forward transformed events in their original order
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = original_sender.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, recv_handle, Network event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        handle.network_registry.register(send_handle);
        handle.network_registry.register(recv_handle);
    }

    /// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
    fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I>) {
        let network = Arc::clone(&handle.network);

        self.add_network_event_task(handle, network);
    }

    /// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
    fn add_network_event_task(
        &self,
        handle: &mut SystemContextHandle<TYPES, I>,
        channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
    ) {
        add_network_event_task(handle, channel);
    }
}
552
553/// adds tasks for sending/receiving messages to/from the network.
554pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
555    handle: &mut SystemContextHandle<TYPES, I>,
556) {
557    add_network_message_and_request_receiver_tasks(handle).await;
558
559    add_network_event_tasks(handle);
560}
561
562/// Adds the `NetworkMessageTaskState` tasks and the request / receiver tasks.
563pub async fn add_network_message_and_request_receiver_tasks<
564    TYPES: NodeType,
565    I: NodeImplementation<TYPES>,
566>(
567    handle: &mut SystemContextHandle<TYPES, I>,
568) {
569    let network = Arc::clone(&handle.network);
570
571    add_network_message_task(handle, &network);
572
573    add_request_network_task(handle).await;
574    add_response_task(handle);
575}
576
577/// Adds the `NetworkEventTaskState` tasks.
578pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>>(
579    handle: &mut SystemContextHandle<TYPES, I>,
580) {
581    add_network_event_task(handle, Arc::clone(&handle.network));
582}