hotshot/tasks/mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides a number of tasks that run continuously

/// Provides a trait to create task states from a `SystemContextHandle`
pub mod task_state;
use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};

use async_broadcast::{broadcast, RecvError};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::{
    future::{BoxFuture, FutureExt},
    stream, StreamExt,
};
use hotshot_task::task::Task;
#[cfg(feature = "rewind")]
use hotshot_task_impls::rewind::RewindTaskState;
use hotshot_task_impls::{
    da::DaTaskState,
    events::HotShotEvent,
    network::{NetworkEventTaskState, NetworkMessageTaskState},
    request::NetworkRequestState,
    response::{run_response_task, NetworkResponseState},
    stats::StatsTaskState,
    transactions::TransactionTaskState,
    upgrade::UpgradeTaskState,
    vid::VidTaskState,
    view_sync::ViewSyncTaskState,
};
use hotshot_types::{
    consensus::OuterConsensus,
    constants::EVENT_CHANNEL_SIZE,
    message::{Message, MessageKind, UpgradeLock, EXTERNAL_MESSAGE_VERSION},
    storage_metrics::StorageMetricsValue,
    traits::{
        network::ConnectedNetwork,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
    },
};
use tokio::{spawn, time::sleep};
use vbs::version::{StaticVersionType, Version};

use crate::{
    genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
    ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
    HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
    SystemContext, Versions,
};

/// Event for the global event stream
#[derive(Clone, Debug)]
pub enum GlobalEvent {
    /// shut everything down
    Shutdown,
    /// dummy (TODO delete later)
    Dummy,
}

/// Add tasks for network requests and responses
pub async fn add_request_network_task<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;

    let task = Task::new(
        state,
        handle.internal_event_stream.0.clone(),
        handle.internal_event_stream.1.activate_cloned(),
    );
    handle.consensus_registry.run_task(task);
}

/// Add a task which responds to requests on the network.
pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    let state = NetworkResponseState::<TYPES, V>::new(
        handle.hotshot.consensus(),
        handle.membership_coordinator.clone(),
        handle.public_key().clone(),
        handle.private_key().clone(),
        handle.hotshot.id,
        handle.hotshot.upgrade_lock.clone(),
    );
    handle
        .network_registry
        .register(run_response_task::<TYPES, V>(
            state,
            handle.internal_event_stream.1.activate_cloned(),
            handle.internal_event_stream.0.clone(),
        ));
}

/// Add a task which updates our queue length metric at a set interval
pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    let consensus = handle.hotshot.consensus();
    let rx = handle.internal_event_stream.1.clone();
    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
    let task_handle = spawn(async move {
        futures::pin_mut!(shutdown_signal);
        loop {
            futures::select! {
                () = shutdown_signal => {
                    return;
                },
                () = sleep(Duration::from_millis(500)).fuse() => {
                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
                }
            }
        }
    });
    handle.network_registry.register(task_handle);
}

/// Add the network task to handle messages and publish events.
#[allow(clippy::missing_panics_doc)]
pub fn add_network_message_task<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
    V: Versions,
>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
    channel: &Arc<NET>,
) {
    let upgrade_lock = handle.hotshot.upgrade_lock.clone();

    let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
        internal_event_stream: handle.internal_event_stream.0.clone(),
        external_event_stream: handle.output_event_stream.0.clone(),
        public_key: handle.public_key().clone(),
        upgrade_lock: upgrade_lock.clone(),
        id: handle.hotshot.id,
    };

    let network = Arc::clone(channel);
    let mut state = network_state.clone();
    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
    let task_handle = spawn(async move {
        futures::pin_mut!(shutdown_signal);

        loop {
            // Wait for one of the following to resolve:
            futures::select! {
                // Wait for a shutdown signal
                () = shutdown_signal => {
                    tracing::error!("Shutting down network message task");
                    return;
                }

                // Wait for a message from the network
                message = network.recv_message().fuse() => {
                    // Make sure the message did not fail
                    let Ok(message) = message else {
                        continue;
                    };

                    // Deserialize the message and get the version
                    let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message).await {
                        Ok(message) => message,
                        Err(e) => {
                            tracing::error!("Failed to deserialize message: {:?}", e);
                            continue;
                        }
                    };

                    // Special case: external messages (version 0.0). We want to make sure it is an external message
                    // and warn and continue otherwise.
                    if version == EXTERNAL_MESSAGE_VERSION
                        && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
                    {
                        tracing::warn!("Received a non-external message with version 0.0");
                        continue;
                    }

                    // Handle the message
                    state.handle_message(deserialized_message).await;
                }
            }
        }
    });
    handle.network_registry.register(task_handle);
}

/// Add the network task to handle events and send messages.
pub fn add_network_event_task<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
    network: Arc<NET>,
) {
    let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
        network,
        view: TYPES::View::genesis(),
        epoch: genesis_epoch_from_version::<V, TYPES>(),
        membership_coordinator: handle.membership_coordinator.clone(),
        storage: handle.storage(),
        storage_metrics: handle.storage_metrics(),
        consensus: OuterConsensus::new(handle.consensus()),
        upgrade_lock: handle.hotshot.upgrade_lock.clone(),
        transmit_tasks: BTreeMap::new(),
        epoch_height: handle.epoch_height,
        id: handle.hotshot.id,
    };
    let task = Task::new(
        network_state,
        handle.internal_event_stream.0.clone(),
        handle.internal_event_stream.1.activate_cloned(),
    );
    handle.consensus_registry.run_task(task);
}

/// Adds consensus-related tasks to a `SystemContextHandle`.
pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
    handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
    handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
    handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);

    {
        let mut upgrade_certificate_lock = handle
            .hotshot
            .upgrade_lock
            .decided_upgrade_certificate
            .write()
            .await;

        // clear the loaded certificate if it's now outdated
        if upgrade_certificate_lock
            .as_ref()
            .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
        {
            tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
            *upgrade_certificate_lock = None;
        }
    }

    // only spawn the upgrade task if we are actually configured to perform an upgrade.
    if V::Base::VERSION < V::Upgrade::VERSION {
        tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
        handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
    }

    {
        use hotshot_task_impls::{
            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
            quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
        };

        handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
        handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
        handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
        handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
        if cfg!(feature = "stats") {
            handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
        }
    }
    add_queue_len_task(handle);
    #[cfg(feature = "rewind")]
    handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
}

/// Creates a monitor for shutdown events.
///
/// # Returns
/// A `BoxFuture<'static, ()>` that resolves when a `HotShotEvent::Shutdown` is detected.
///
/// # Usage
/// Use in `select!` macros or similar constructs for graceful shutdowns:
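///
/// The sketch below mirrors how the monitor is used by the tasks in this module
/// (illustrative only; `handle` stands for any borrow of a `SystemContextHandle`):
///
/// ```ignore
/// let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
/// futures::pin_mut!(shutdown_signal);
/// loop {
///     futures::select! {
///         // exit as soon as a `HotShotEvent::Shutdown` is observed
///         () = shutdown_signal => return,
///         // otherwise, do this task's periodic work
///         () = sleep(Duration::from_millis(500)).fuse() => { /* ... */ },
///     }
/// }
/// ```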
#[must_use]
pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &SystemContextHandle<TYPES, I, V>,
) -> BoxFuture<'static, ()> {
    // Activate the cloned internal event stream
    let mut event_stream = handle.internal_event_stream.1.activate_cloned();

    // Create a future that completes when the `HotShotEvent::Shutdown` is received
    async move {
        loop {
            match event_stream.recv_direct().await {
                Ok(event) => {
                    if matches!(event.as_ref(), HotShotEvent::Shutdown) {
                        return;
                    }
                },
                Err(RecvError::Closed) => {
                    return;
                },
                Err(e) => {
                    tracing::error!("Shutdown event monitor channel recv error: {}", e);
                },
            }
        }
    }
    .boxed()
}

#[allow(clippy::too_many_arguments)]
#[async_trait]
/// Trait for intercepting and modifying messages between the network and consensus layers.
///
/// Consensus <-> [Byzantine logic layer] <-> Network
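///
/// A minimal pass-through implementation (an illustrative sketch only; the
/// hypothetical `PassThrough` type below is not part of this crate) simply
/// forwards every event unchanged in both directions:
///
/// ```ignore
/// #[derive(Debug)]
/// struct PassThrough;
///
/// #[async_trait]
/// impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
///     EventTransformerState<TYPES, I, V> for PassThrough
/// {
///     async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>> {
///         vec![event.clone()]
///     }
///
///     async fn send_handler(
///         &mut self,
///         event: &HotShotEvent<TYPES>,
///         _public_key: &TYPES::SignatureKey,
///         _private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
///         _upgrade_lock: &UpgradeLock<TYPES, V>,
///         _consensus: OuterConsensus<TYPES>,
///         _membership_coordinator: EpochMembershipCoordinator<TYPES>,
///         _network: Arc<I::Network>,
///     ) -> Vec<HotShotEvent<TYPES>> {
///         vec![event.clone()]
///     }
/// }
/// ```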
pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
where
    Self: std::fmt::Debug + Send + Sync + 'static,
{
    /// modify incoming messages from the network
    async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;

    /// modify outgoing messages from the network
    async fn send_handler(
        &mut self,
        event: &HotShotEvent<TYPES>,
        public_key: &TYPES::SignatureKey,
        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
        upgrade_lock: &UpgradeLock<TYPES, V>,
        consensus: OuterConsensus<TYPES>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
    ) -> Vec<HotShotEvent<TYPES>>;

    #[allow(clippy::too_many_arguments)]
    /// Creates a `SystemContextHandle` with the given event transformer
    async fn spawn_handle(
        &'static mut self,
        public_key: TYPES::SignatureKey,
        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
        nonce: u64,
        config: HotShotConfig<TYPES>,
        memberships: EpochMembershipCoordinator<TYPES>,
        network: Arc<I::Network>,
        initializer: HotShotInitializer<TYPES>,
        consensus_metrics: ConsensusMetricsValue,
        storage: I::Storage,
        storage_metrics: StorageMetricsValue,
    ) -> SystemContextHandle<TYPES, I, V> {
        let epoch_height = config.epoch_height;

        let hotshot = SystemContext::new(
            public_key,
            private_key,
            state_private_key,
            nonce,
            config,
            memberships.clone(),
            network,
            initializer,
            consensus_metrics,
            storage.clone(),
            storage_metrics,
        )
        .await;
        let consensus_registry = ConsensusTaskRegistry::new();
        let network_registry = NetworkTaskRegistry::new();

        let output_event_stream = hotshot.external_event_stream.clone();
        let internal_event_stream = hotshot.internal_event_stream.clone();

        let mut handle = SystemContextHandle {
            consensus_registry,
            network_registry,
            output_event_stream: output_event_stream.clone(),
            internal_event_stream: internal_event_stream.clone(),
            hotshot: Arc::clone(&hotshot),
            storage,
            network: Arc::clone(&hotshot.network),
            membership_coordinator: memberships.clone(),
            epoch_height,
        };

        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
        self.add_network_tasks(&mut handle).await;

        handle
    }

    /// Add byzantine network tasks with the trait
    #[allow(clippy::too_many_lines)]
    async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
        // channels between the task spawned in this function and the network tasks.
        // with this, we can control exactly what events the network tasks see.

        // channel to the network task
        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
        // channel from the network task
        let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
        // create a copy of the original receiver
        let (original_sender, original_receiver) = (
            handle.internal_event_stream.0.clone(),
            handle.internal_event_stream.1.activate_cloned(),
        );

        // replace the internal event stream with the one we just created,
        // so that the network tasks are spawned with our channel.
        let mut internal_event_stream = (
            network_task_sender.clone(),
            network_task_receiver.clone().deactivate(),
        );
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        // spawn the network tasks with our newly-created channel
        add_network_message_and_request_receiver_tasks(handle).await;
        self.add_network_event_tasks(handle);

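        // swap the original event stream back into the handle so that any tasks
        // added after this point use the real (original) channels again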
        std::mem::swap(
            &mut internal_event_stream,
            &mut handle.internal_event_stream,
        );

        let state_in = Arc::new(RwLock::new(self));
        let state_out = Arc::clone(&state_in);
        // spawn a task to listen on the (original) internal event stream,
        // and broadcast the transformed events to the replacement event stream we just created.
        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        let public_key = handle.public_key().clone();
        let private_key = handle.private_key().clone();
        let upgrade_lock = handle.hotshot.upgrade_lock.clone();
        let consensus = OuterConsensus::new(handle.consensus());
        let membership_coordinator = handle.membership_coordinator.clone();
        let network = Arc::clone(&handle.network);
        let send_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
                match recv.recv().await {
                    Ok(event) => Some((Ok(event), recv)),
                    Err(async_broadcast::RecvError::Closed) => None,
                    Err(e) => Some((Err(e), recv)),
                }
            })
            .boxed();

            let fused_recv_stream = recv_stream.fuse();
            futures::pin_mut!(fused_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay send task");
                        let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
                        return;
                    }
                    event = fused_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_out.write().await;
                                let mut results = state.send_handler(
                                    &msg,
                                    &public_key,
                                    &private_key,
                                    &upgrade_lock,
                                    consensus.clone(),
                                    membership_coordinator.clone(),
                                    Arc::clone(&network),
                                ).await;
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = sender_to_network.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, send_handle, Event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        // spawn a task to listen on the newly created event stream,
        // and broadcast the transformed events to the original internal event stream
        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
        let recv_handle = spawn(async move {
            futures::pin_mut!(shutdown_signal);

            let network_recv_stream =
                stream::unfold(receiver_from_network, |mut recv| async move {
                    match recv.recv().await {
                        Ok(event) => Some((Ok(event), recv)),
                        Err(async_broadcast::RecvError::Closed) => None,
                        Err(e) => Some((Err(e), recv)),
                    }
                });

            let fused_network_recv_stream = network_recv_stream.boxed().fuse();
            futures::pin_mut!(fused_network_recv_stream);

            loop {
                futures::select! {
                    () = shutdown_signal => {
                        tracing::error!("Shutting down relay receive task");
                        return;
                    }
                    event = fused_network_recv_stream.next() => {
                        match event {
                            Some(Ok(msg)) => {
                                let mut state = state_in.write().await;
                                let mut results = state.recv_handler(&msg).await;
                                results.reverse();
                                while let Some(event) = results.pop() {
                                    let _ = original_sender.broadcast(event.into()).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
                            }
                            None => {
                                tracing::info!("Relay Task, recv_handle, Network event stream closed");
                                return;
                            }
                        }
                    }
                }
            }
        });

        handle.network_registry.register(send_handle);
        handle.network_registry.register(recv_handle);
    }

    /// Adds the `NetworkEventTaskState` tasks, possibly modifying them as well.
    fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
        let network = Arc::clone(&handle.network);

        self.add_network_event_task(handle, network);
    }

    /// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
    fn add_network_event_task(
        &self,
        handle: &mut SystemContextHandle<TYPES, I, V>,
        channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
    ) {
        add_network_event_task(handle, channel);
    }
}

/// Adds tasks for sending/receiving messages to/from the network.
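///
/// A typical call site wires up the consensus tasks and then these network tasks
/// on a freshly built handle, analogous to the wiring in `spawn_handle` above
/// (illustrative sketch; `handle` is assumed to be an owned
/// `SystemContextHandle<TYPES, I, V>`):
///
/// ```ignore
/// add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
/// add_network_tasks::<TYPES, I, V>(&mut handle).await;
/// ```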
pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    add_network_message_and_request_receiver_tasks(handle).await;

    add_network_event_tasks(handle);
}

/// Adds the `NetworkMessageTaskState` tasks and the request / receiver tasks.
pub async fn add_network_message_and_request_receiver_tasks<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    let network = Arc::clone(&handle.network);

    add_network_message_task(handle, &network);

    add_request_network_task(handle).await;
    add_response_task(handle);
}

/// Adds the `NetworkEventTaskState` tasks.
pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    handle: &mut SystemContextHandle<TYPES, I, V>,
) {
    add_network_event_task(handle, Arc::clone(&handle.network));
}