hotshot/tasks/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a number of tasks that run continuously
8
9/// Provides trait to create task states from a `SystemContextHandle`
10pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
12
13use async_broadcast::{broadcast, RecvError};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17    future::{BoxFuture, FutureExt},
18    stream, StreamExt,
19};
20use hotshot_task::task::Task;
21#[cfg(feature = "rewind")]
22use hotshot_task_impls::rewind::RewindTaskState;
23use hotshot_task_impls::{
24    da::DaTaskState,
25    events::HotShotEvent,
26    network::{NetworkEventTaskState, NetworkMessageTaskState},
27    request::NetworkRequestState,
28    response::{run_response_task, NetworkResponseState},
29    stats::StatsTaskState,
30    transactions::TransactionTaskState,
31    upgrade::UpgradeTaskState,
32    vid::VidTaskState,
33    view_sync::ViewSyncTaskState,
34};
35use hotshot_types::{
36    consensus::OuterConsensus,
37    constants::EVENT_CHANNEL_SIZE,
38    message::{Message, MessageKind, UpgradeLock, EXTERNAL_MESSAGE_VERSION},
39    storage_metrics::StorageMetricsValue,
40    traits::{
41        network::ConnectedNetwork,
42        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
43    },
44};
45use tokio::{spawn, time::sleep};
46use vbs::version::{StaticVersionType, Version};
47
48use crate::{
49    genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
50    ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
51    HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
52    SystemContext, Versions,
53};
54
/// Events carried on the global event stream.
#[derive(Debug, Clone)]
pub enum GlobalEvent {
    /// Ask everything to shut down.
    Shutdown,
    /// Placeholder variant (TODO: delete later).
    Dummy,
}
63
64/// Add tasks for network requests and responses
65pub async fn add_request_network_task<
66    TYPES: NodeType,
67    I: NodeImplementation<TYPES>,
68    V: Versions,
69>(
70    handle: &mut SystemContextHandle<TYPES, I, V>,
71) {
72    let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
73
74    let task = Task::new(
75        state,
76        handle.internal_event_stream.0.clone(),
77        handle.internal_event_stream.1.activate_cloned(),
78    );
79    handle.consensus_registry.run_task(task);
80}
81
82/// Add a task which responds to requests on the network.
83pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
84    handle: &mut SystemContextHandle<TYPES, I, V>,
85) {
86    let state = NetworkResponseState::<TYPES, V>::new(
87        handle.hotshot.consensus(),
88        handle.membership_coordinator.clone(),
89        handle.public_key().clone(),
90        handle.private_key().clone(),
91        handle.hotshot.id,
92        handle.hotshot.upgrade_lock.clone(),
93    );
94    handle
95        .network_registry
96        .register(run_response_task::<TYPES, V>(
97            state,
98            handle.internal_event_stream.1.activate_cloned(),
99            handle.internal_event_stream.0.clone(),
100        ));
101}
102
103/// Add a task which updates our queue length metric at a set interval
104pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
105    handle: &mut SystemContextHandle<TYPES, I, V>,
106) {
107    let consensus = handle.hotshot.consensus();
108    let rx = handle.internal_event_stream.1.clone();
109    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
110    let task_handle = spawn(async move {
111        futures::pin_mut!(shutdown_signal);
112        loop {
113            futures::select! {
114                () = shutdown_signal => {
115                    return;
116                },
117                () = sleep(Duration::from_millis(500)).fuse() => {
118                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
119                }
120            }
121        }
122    });
123    handle.network_registry.register(task_handle);
124}
125
126/// Add the network task to handle messages and publish events.
127#[allow(clippy::missing_panics_doc)]
128pub fn add_network_message_task<
129    TYPES: NodeType,
130    I: NodeImplementation<TYPES>,
131    NET: ConnectedNetwork<TYPES::SignatureKey>,
132    V: Versions,
133>(
134    handle: &mut SystemContextHandle<TYPES, I, V>,
135    channel: &Arc<NET>,
136) {
137    let upgrade_lock = handle.hotshot.upgrade_lock.clone();
138
139    let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
140        internal_event_stream: handle.internal_event_stream.0.clone(),
141        external_event_stream: handle.output_event_stream.0.clone(),
142        public_key: handle.public_key().clone(),
143        transactions_cache: lru::LruCache::new(NonZeroUsize::new(100_000).unwrap()),
144        upgrade_lock: upgrade_lock.clone(),
145        id: handle.hotshot.id,
146    };
147
148    let network = Arc::clone(channel);
149    let mut state = network_state.clone();
150    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
151    let task_handle = spawn(async move {
152        futures::pin_mut!(shutdown_signal);
153
154        loop {
155            // Wait for one of the following to resolve:
156            futures::select! {
157                // Wait for a shutdown signal
158                () = shutdown_signal => {
159                    tracing::error!("Shutting down network message task");
160                    return;
161                }
162
163                // Wait for a message from the network
164                message = network.recv_message().fuse() => {
165                    // Make sure the message did not fail
166                    let Ok(message) = message else {
167                        continue;
168                    };
169
170                    // Deserialize the message and get the version
171                    let (deserialized_message, version): (Message<TYPES>, Version) = match upgrade_lock.deserialize(&message).await {
172                        Ok(message) => message,
173                        Err(e) => {
174                            tracing::error!("Failed to deserialize message: {:?}", e);
175                            continue;
176                        }
177                    };
178
179                    // Special case: external messages (version 0.0). We want to make sure it is an external message
180                    // and warn and continue otherwise.
181                    if version == EXTERNAL_MESSAGE_VERSION
182                        && !matches!(deserialized_message.kind, MessageKind::<TYPES>::External(_))
183                    {
184                        tracing::warn!("Received a non-external message with version 0.0");
185                        continue;
186                    }
187
188                    // Handle the message
189                    state.handle_message(deserialized_message).await;
190                }
191            }
192        }
193    });
194    handle.network_registry.register(task_handle);
195}
196
197/// Add the network task to handle events and send messages.
198pub fn add_network_event_task<
199    TYPES: NodeType,
200    I: NodeImplementation<TYPES>,
201    V: Versions,
202    NET: ConnectedNetwork<TYPES::SignatureKey>,
203>(
204    handle: &mut SystemContextHandle<TYPES, I, V>,
205    network: Arc<NET>,
206) {
207    let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
208        network,
209        view: TYPES::View::genesis(),
210        epoch: genesis_epoch_from_version::<V, TYPES>(),
211        membership_coordinator: handle.membership_coordinator.clone(),
212        storage: handle.storage(),
213        storage_metrics: handle.storage_metrics(),
214        consensus: OuterConsensus::new(handle.consensus()),
215        upgrade_lock: handle.hotshot.upgrade_lock.clone(),
216        transmit_tasks: BTreeMap::new(),
217        epoch_height: handle.epoch_height,
218        id: handle.hotshot.id,
219    };
220    let task = Task::new(
221        network_state,
222        handle.internal_event_stream.0.clone(),
223        handle.internal_event_stream.1.activate_cloned(),
224    );
225    handle.consensus_registry.run_task(task);
226}
227
228/// Adds consensus-related tasks to a `SystemContextHandle`.
229pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
230    handle: &mut SystemContextHandle<TYPES, I, V>,
231) {
232    handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
233    handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
234    handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
235    handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);
236
237    {
238        let mut upgrade_certificate_lock = handle
239            .hotshot
240            .upgrade_lock
241            .decided_upgrade_certificate
242            .write()
243            .await;
244
245        // clear the loaded certificate if it's now outdated
246        if upgrade_certificate_lock
247            .as_ref()
248            .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
249        {
250            tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
251            *upgrade_certificate_lock = None;
252        }
253    }
254
255    // only spawn the upgrade task if we are actually configured to perform an upgrade.
256    if V::Base::VERSION < V::Upgrade::VERSION {
257        tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
258        handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
259    }
260
261    {
262        use hotshot_task_impls::{
263            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
264            quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
265        };
266
267        handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
268        handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
269        handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
270        handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
271        handle.add_task(StatsTaskState::<TYPES>::create_from(handle).await);
272    }
273    add_queue_len_task(handle);
274    #[cfg(feature = "rewind")]
275    handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
276}
277
278/// Creates a monitor for shutdown events.
279///
280/// # Returns
281/// A `BoxFuture<'static, ()>` that resolves when a `HotShotEvent::Shutdown` is detected.
282///
283/// # Usage
284/// Use in `select!` macros or similar constructs for graceful shutdowns:
285#[must_use]
286pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
287    handle: &SystemContextHandle<TYPES, I, V>,
288) -> BoxFuture<'static, ()> {
289    // Activate the cloned internal event stream
290    let mut event_stream = handle.internal_event_stream.1.activate_cloned();
291
292    // Create a future that completes when the `HotShotEvent::Shutdown` is received
293    async move {
294        loop {
295            match event_stream.recv_direct().await {
296                Ok(event) => {
297                    if matches!(event.as_ref(), HotShotEvent::Shutdown) {
298                        return;
299                    }
300                },
301                Err(RecvError::Closed) => {
302                    return;
303                },
304                Err(e) => {
305                    tracing::error!("Shutdown event monitor channel recv error: {}", e);
306                },
307            }
308        }
309    }
310    .boxed()
311}
312
313#[allow(clippy::too_many_arguments)]
314#[async_trait]
315/// Trait for intercepting and modifying messages between the network and consensus layers.
316///
317/// Consensus <-> [Byzantine logic layer] <-> Network
318pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
319where
320    Self: std::fmt::Debug + Send + Sync + 'static,
321{
322    /// modify incoming messages from the network
323    async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;
324
325    /// modify outgoing messages from the network
326    async fn send_handler(
327        &mut self,
328        event: &HotShotEvent<TYPES>,
329        public_key: &TYPES::SignatureKey,
330        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
331        upgrade_lock: &UpgradeLock<TYPES, V>,
332        consensus: OuterConsensus<TYPES>,
333        membership_coordinator: EpochMembershipCoordinator<TYPES>,
334        network: Arc<I::Network>,
335    ) -> Vec<HotShotEvent<TYPES>>;
336
337    #[allow(clippy::too_many_arguments)]
338    /// Creates a `SystemContextHandle` with the given even transformer
339    async fn spawn_handle(
340        &'static mut self,
341        public_key: TYPES::SignatureKey,
342        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
343        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
344        nonce: u64,
345        config: HotShotConfig<TYPES>,
346        memberships: EpochMembershipCoordinator<TYPES>,
347        network: Arc<I::Network>,
348        initializer: HotShotInitializer<TYPES>,
349        consensus_metrics: ConsensusMetricsValue,
350        storage: I::Storage,
351        storage_metrics: StorageMetricsValue,
352    ) -> SystemContextHandle<TYPES, I, V> {
353        let epoch_height = config.epoch_height;
354
355        let hotshot = SystemContext::new(
356            public_key,
357            private_key,
358            state_private_key,
359            nonce,
360            config,
361            memberships.clone(),
362            network,
363            initializer,
364            consensus_metrics,
365            storage.clone(),
366            storage_metrics,
367        )
368        .await;
369        let consensus_registry = ConsensusTaskRegistry::new();
370        let network_registry = NetworkTaskRegistry::new();
371
372        let output_event_stream = hotshot.external_event_stream.clone();
373        let internal_event_stream = hotshot.internal_event_stream.clone();
374
375        let mut handle = SystemContextHandle {
376            consensus_registry,
377            network_registry,
378            output_event_stream: output_event_stream.clone(),
379            internal_event_stream: internal_event_stream.clone(),
380            hotshot: Arc::clone(&hotshot),
381            storage,
382            network: Arc::clone(&hotshot.network),
383            membership_coordinator: memberships.clone(),
384            epoch_height,
385        };
386
387        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
388        self.add_network_tasks(&mut handle).await;
389
390        handle
391    }
392
393    /// Add byzantine network tasks with the trait
394    #[allow(clippy::too_many_lines)]
395    async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
396        // channels between the task spawned in this function and the network tasks.
397        // with this, we can control exactly what events the network tasks see.
398
399        // channel to the network task
400        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
401        // channel from the network task
402        let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
403        // create a copy of the original receiver
404        let (original_sender, original_receiver) = (
405            handle.internal_event_stream.0.clone(),
406            handle.internal_event_stream.1.activate_cloned(),
407        );
408
409        // replace the internal event stream with the one we just created,
410        // so that the network tasks are spawned with our channel.
411        let mut internal_event_stream = (
412            network_task_sender.clone(),
413            network_task_receiver.clone().deactivate(),
414        );
415        std::mem::swap(
416            &mut internal_event_stream,
417            &mut handle.internal_event_stream,
418        );
419
420        // spawn the network tasks with our newly-created channel
421        add_network_message_and_request_receiver_tasks(handle).await;
422        self.add_network_event_tasks(handle);
423
424        std::mem::swap(
425            &mut internal_event_stream,
426            &mut handle.internal_event_stream,
427        );
428
429        let state_in = Arc::new(RwLock::new(self));
430        let state_out = Arc::clone(&state_in);
431        // spawn a task to listen on the (original) internal event stream,
432        // and broadcast the transformed events to the replacement event stream we just created.
433        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
434        let public_key = handle.public_key().clone();
435        let private_key = handle.private_key().clone();
436        let upgrade_lock = handle.hotshot.upgrade_lock.clone();
437        let consensus = OuterConsensus::new(handle.consensus());
438        let membership_coordinator = handle.membership_coordinator.clone();
439        let network = Arc::clone(&handle.network);
440        let send_handle = spawn(async move {
441            futures::pin_mut!(shutdown_signal);
442
443            let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
444                match recv.recv().await {
445                    Ok(event) => Some((Ok(event), recv)),
446                    Err(async_broadcast::RecvError::Closed) => None,
447                    Err(e) => Some((Err(e), recv)),
448                }
449            })
450            .boxed();
451
452            let fused_recv_stream = recv_stream.fuse();
453            futures::pin_mut!(fused_recv_stream);
454
455            loop {
456                futures::select! {
457                    () = shutdown_signal => {
458                        tracing::error!("Shutting down relay send task");
459                        let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
460                        return;
461                    }
462                    event = fused_recv_stream.next() => {
463                        match event {
464                            Some(Ok(msg)) => {
465                                let mut state = state_out.write().await;
466                                let mut results = state.send_handler(
467                                    &msg,
468                                    &public_key,
469                                    &private_key,
470                                    &upgrade_lock,
471                                    consensus.clone(),
472                                    membership_coordinator.clone(),
473                                    Arc::clone(&network),
474                                ).await;
475                                results.reverse();
476                                while let Some(event) = results.pop() {
477                                    let _ = sender_to_network.broadcast(event.into()).await;
478                                }
479                            }
480                            Some(Err(e)) => {
481                                tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
482                            }
483                            None => {
484                                tracing::info!("Relay Task, send_handle, Event stream closed");
485                                return;
486                            }
487                        }
488                    }
489                }
490            }
491        });
492
493        // spawn a task to listen on the newly created event stream,
494        // and broadcast the transformed events to the original internal event stream
495        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
496        let recv_handle = spawn(async move {
497            futures::pin_mut!(shutdown_signal);
498
499            let network_recv_stream =
500                stream::unfold(receiver_from_network, |mut recv| async move {
501                    match recv.recv().await {
502                        Ok(event) => Some((Ok(event), recv)),
503                        Err(async_broadcast::RecvError::Closed) => None,
504                        Err(e) => Some((Err(e), recv)),
505                    }
506                });
507
508            let fused_network_recv_stream = network_recv_stream.boxed().fuse();
509            futures::pin_mut!(fused_network_recv_stream);
510
511            loop {
512                futures::select! {
513                    () = shutdown_signal => {
514                        tracing::error!("Shutting down relay receive task");
515                        return;
516                    }
517                    event = fused_network_recv_stream.next() => {
518                        match event {
519                            Some(Ok(msg)) => {
520                                let mut state = state_in.write().await;
521                                let mut results = state.recv_handler(&msg).await;
522                                results.reverse();
523                                while let Some(event) = results.pop() {
524                                    let _ = original_sender.broadcast(event.into()).await;
525                                }
526                            }
527                            Some(Err(e)) => {
528                                tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
529                            }
530                            None => {
531                                tracing::info!("Relay Task, recv_handle, Network event stream closed");
532                                return;
533                            }
534                        }
535                    }
536                }
537            }
538        });
539
540        handle.network_registry.register(send_handle);
541        handle.network_registry.register(recv_handle);
542    }
543
544    /// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
545    fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
546        let network = Arc::clone(&handle.network);
547
548        self.add_network_event_task(handle, network);
549    }
550
551    /// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
552    fn add_network_event_task(
553        &self,
554        handle: &mut SystemContextHandle<TYPES, I, V>,
555        channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
556    ) {
557        add_network_event_task(handle, channel);
558    }
559}
560
561/// adds tasks for sending/receiving messages to/from the network.
562pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
563    handle: &mut SystemContextHandle<TYPES, I, V>,
564) {
565    add_network_message_and_request_receiver_tasks(handle).await;
566
567    add_network_event_tasks(handle);
568}
569
570/// Adds the `NetworkMessageTaskState` tasks and the request / receiver tasks.
571pub async fn add_network_message_and_request_receiver_tasks<
572    TYPES: NodeType,
573    I: NodeImplementation<TYPES>,
574    V: Versions,
575>(
576    handle: &mut SystemContextHandle<TYPES, I, V>,
577) {
578    let network = Arc::clone(&handle.network);
579
580    add_network_message_task(handle, &network);
581
582    add_request_network_task(handle).await;
583    add_response_task(handle);
584}
585
586/// Adds the `NetworkEventTaskState` tasks.
587pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
588    handle: &mut SystemContextHandle<TYPES, I, V>,
589) {
590    add_network_event_task(handle, Arc::clone(&handle.network));
591}