hotshot/tasks/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a number of tasks that run continuously
8
9/// Provides trait to create task states from a `SystemContextHandle`
10pub mod task_state;
11use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
12
13use async_broadcast::{broadcast, RecvError};
14use async_lock::RwLock;
15use async_trait::async_trait;
16use futures::{
17    future::{BoxFuture, FutureExt},
18    stream, StreamExt,
19};
20use hotshot_task::task::Task;
21#[cfg(feature = "rewind")]
22use hotshot_task_impls::rewind::RewindTaskState;
23use hotshot_task_impls::{
24    da::DaTaskState,
25    events::HotShotEvent,
26    network::{NetworkEventTaskState, NetworkMessageTaskState},
27    request::NetworkRequestState,
28    response::{run_response_task, NetworkResponseState},
29    transactions::TransactionTaskState,
30    upgrade::UpgradeTaskState,
31    vid::VidTaskState,
32    view_sync::ViewSyncTaskState,
33};
34use hotshot_types::{
35    consensus::{Consensus, OuterConsensus},
36    constants::EVENT_CHANNEL_SIZE,
37    message::{Message, UpgradeLock},
38    storage_metrics::StorageMetricsValue,
39    traits::{
40        network::ConnectedNetwork,
41        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
42    },
43};
44use tokio::{spawn, time::sleep};
45use vbs::version::StaticVersionType;
46
47use crate::{
48    genesis_epoch_from_version, tasks::task_state::CreateTaskState, types::SystemContextHandle,
49    ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, EpochMembershipCoordinator,
50    HotShotConfig, HotShotInitializer, NetworkTaskRegistry, SignatureKey, StateSignatureKey,
51    SystemContext, Versions,
52};
53
/// Events carried on the global event stream.
#[derive(Clone, Debug)]
pub enum GlobalEvent {
    /// Request that everything shut down.
    Shutdown,
    /// Placeholder variant with no behavior (TODO delete later).
    Dummy,
}
62
63/// Add tasks for network requests and responses
64pub async fn add_request_network_task<
65    TYPES: NodeType,
66    I: NodeImplementation<TYPES>,
67    V: Versions,
68>(
69    handle: &mut SystemContextHandle<TYPES, I, V>,
70) {
71    let state = NetworkRequestState::<TYPES, I>::create_from(handle).await;
72
73    let task = Task::new(
74        state,
75        handle.internal_event_stream.0.clone(),
76        handle.internal_event_stream.1.activate_cloned(),
77    );
78    handle.consensus_registry.run_task(task);
79}
80
81/// Add a task which responds to requests on the network.
82pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
83    handle: &mut SystemContextHandle<TYPES, I, V>,
84) {
85    let state = NetworkResponseState::<TYPES, V>::new(
86        handle.hotshot.consensus(),
87        handle.membership_coordinator.clone(),
88        handle.public_key().clone(),
89        handle.private_key().clone(),
90        handle.hotshot.id,
91        handle.hotshot.upgrade_lock.clone(),
92    );
93    handle
94        .network_registry
95        .register(run_response_task::<TYPES, V>(
96            state,
97            handle.internal_event_stream.1.activate_cloned(),
98            handle.internal_event_stream.0.clone(),
99        ));
100}
101
102/// Add a task which updates our queue length metric at a set interval
103pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
104    handle: &mut SystemContextHandle<TYPES, I, V>,
105) {
106    let consensus = handle.hotshot.consensus();
107    let rx = handle.internal_event_stream.1.clone();
108    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
109    let task_handle = spawn(async move {
110        futures::pin_mut!(shutdown_signal);
111        loop {
112            futures::select! {
113                () = shutdown_signal => {
114                    return;
115                },
116                () = sleep(Duration::from_millis(500)).fuse() => {
117                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
118                }
119            }
120        }
121    });
122    handle.network_registry.register(task_handle);
123}
124
125/// Add the network task to handle messages and publish events.
126#[allow(clippy::missing_panics_doc)]
127pub fn add_network_message_task<
128    TYPES: NodeType,
129    I: NodeImplementation<TYPES>,
130    NET: ConnectedNetwork<TYPES::SignatureKey>,
131    V: Versions,
132>(
133    handle: &mut SystemContextHandle<TYPES, I, V>,
134    channel: &Arc<NET>,
135) {
136    let upgrade_lock = handle.hotshot.upgrade_lock.clone();
137
138    let network_state: NetworkMessageTaskState<TYPES, V> = NetworkMessageTaskState {
139        internal_event_stream: handle.internal_event_stream.0.clone(),
140        external_event_stream: handle.output_event_stream.0.clone(),
141        public_key: handle.public_key().clone(),
142        transactions_cache: lru::LruCache::new(NonZeroUsize::new(100_000).unwrap()),
143        upgrade_lock: upgrade_lock.clone(),
144        id: handle.hotshot.id,
145    };
146
147    let network = Arc::clone(channel);
148    let mut state = network_state.clone();
149    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
150    let task_handle = spawn(async move {
151        futures::pin_mut!(shutdown_signal);
152
153        loop {
154            // Wait for one of the following to resolve:
155            futures::select! {
156                // Wait for a shutdown signal
157                () = shutdown_signal => {
158                    tracing::error!("Shutting down network message task");
159                    return;
160                }
161
162                // Wait for a message from the network
163                message = network.recv_message().fuse() => {
164                    // Make sure the message did not fail
165                    let Ok(message) = message else {
166                        continue;
167                    };
168
169                    // Deserialize the message
170                    let deserialized_message: Message<TYPES> = match upgrade_lock.deserialize(&message).await {
171                        Ok(message) => message,
172                        Err(e) => {
173                            tracing::error!("Failed to deserialize message: {:?}", e);
174                            continue;
175                        }
176                    };
177
178                    // Handle the message
179                    state.handle_message(deserialized_message).await;
180                }
181            }
182        }
183    });
184    handle.network_registry.register(task_handle);
185}
186
187/// Add the network task to handle events and send messages.
188pub fn add_network_event_task<
189    TYPES: NodeType,
190    I: NodeImplementation<TYPES>,
191    V: Versions,
192    NET: ConnectedNetwork<TYPES::SignatureKey>,
193>(
194    handle: &mut SystemContextHandle<TYPES, I, V>,
195    network: Arc<NET>,
196) {
197    let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
198        network,
199        view: TYPES::View::genesis(),
200        epoch: genesis_epoch_from_version::<V, TYPES>(),
201        membership_coordinator: handle.membership_coordinator.clone(),
202        storage: handle.storage(),
203        storage_metrics: handle.storage_metrics(),
204        consensus: OuterConsensus::new(handle.consensus()),
205        upgrade_lock: handle.hotshot.upgrade_lock.clone(),
206        transmit_tasks: BTreeMap::new(),
207        epoch_height: handle.epoch_height,
208        id: handle.hotshot.id,
209    };
210    let task = Task::new(
211        network_state,
212        handle.internal_event_stream.0.clone(),
213        handle.internal_event_stream.1.activate_cloned(),
214    );
215    handle.consensus_registry.run_task(task);
216}
217
218/// Adds consensus-related tasks to a `SystemContextHandle`.
219pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
220    handle: &mut SystemContextHandle<TYPES, I, V>,
221) {
222    handle.add_task(ViewSyncTaskState::<TYPES, V>::create_from(handle).await);
223    handle.add_task(VidTaskState::<TYPES, I, V>::create_from(handle).await);
224    handle.add_task(DaTaskState::<TYPES, I, V>::create_from(handle).await);
225    handle.add_task(TransactionTaskState::<TYPES, V>::create_from(handle).await);
226
227    {
228        let mut upgrade_certificate_lock = handle
229            .hotshot
230            .upgrade_lock
231            .decided_upgrade_certificate
232            .write()
233            .await;
234
235        // clear the loaded certificate if it's now outdated
236        if upgrade_certificate_lock
237            .as_ref()
238            .is_some_and(|cert| V::Base::VERSION >= cert.data.new_version)
239        {
240            tracing::warn!("Discarding loaded upgrade certificate due to version configuration.");
241            *upgrade_certificate_lock = None;
242        }
243    }
244
245    // only spawn the upgrade task if we are actually configured to perform an upgrade.
246    if V::Base::VERSION < V::Upgrade::VERSION {
247        tracing::warn!("Consensus was started with an upgrade configured. Spawning upgrade task.");
248        handle.add_task(UpgradeTaskState::<TYPES, V>::create_from(handle).await);
249    }
250
251    {
252        use hotshot_task_impls::{
253            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
254            quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
255        };
256
257        handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
258        handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
259        handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
260        handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
261    }
262    add_queue_len_task(handle);
263    #[cfg(feature = "rewind")]
264    handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
265}
266
267/// Creates a monitor for shutdown events.
268///
269/// # Returns
270/// A `BoxFuture<'static, ()>` that resolves when a `HotShotEvent::Shutdown` is detected.
271///
272/// # Usage
273/// Use in `select!` macros or similar constructs for graceful shutdowns:
274#[must_use]
275pub fn create_shutdown_event_monitor<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
276    handle: &SystemContextHandle<TYPES, I, V>,
277) -> BoxFuture<'static, ()> {
278    // Activate the cloned internal event stream
279    let mut event_stream = handle.internal_event_stream.1.activate_cloned();
280
281    // Create a future that completes when the `HotShotEvent::Shutdown` is received
282    async move {
283        loop {
284            match event_stream.recv_direct().await {
285                Ok(event) => {
286                    if matches!(event.as_ref(), HotShotEvent::Shutdown) {
287                        return;
288                    }
289                },
290                Err(RecvError::Closed) => {
291                    return;
292                },
293                Err(e) => {
294                    tracing::error!("Shutdown event monitor channel recv error: {}", e);
295                },
296            }
297        }
298    }
299    .boxed()
300}
301
302#[async_trait]
303/// Trait for intercepting and modifying messages between the network and consensus layers.
304///
305/// Consensus <-> [Byzantine logic layer] <-> Network
306pub trait EventTransformerState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
307where
308    Self: std::fmt::Debug + Send + Sync + 'static,
309{
310    /// modify incoming messages from the network
311    async fn recv_handler(&mut self, event: &HotShotEvent<TYPES>) -> Vec<HotShotEvent<TYPES>>;
312
313    /// modify outgoing messages from the network
314    async fn send_handler(
315        &mut self,
316        event: &HotShotEvent<TYPES>,
317        public_key: &TYPES::SignatureKey,
318        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
319        upgrade_lock: &UpgradeLock<TYPES, V>,
320        consensus: Arc<RwLock<Consensus<TYPES>>>,
321    ) -> Vec<HotShotEvent<TYPES>>;
322
323    #[allow(clippy::too_many_arguments)]
324    /// Creates a `SystemContextHandle` with the given even transformer
325    async fn spawn_handle(
326        &'static mut self,
327        public_key: TYPES::SignatureKey,
328        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
329        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
330        nonce: u64,
331        config: HotShotConfig<TYPES>,
332        memberships: EpochMembershipCoordinator<TYPES>,
333        network: Arc<I::Network>,
334        initializer: HotShotInitializer<TYPES>,
335        consensus_metrics: ConsensusMetricsValue,
336        storage: I::Storage,
337        storage_metrics: StorageMetricsValue,
338    ) -> SystemContextHandle<TYPES, I, V> {
339        let epoch_height = config.epoch_height;
340
341        let hotshot = SystemContext::new(
342            public_key,
343            private_key,
344            state_private_key,
345            nonce,
346            config,
347            memberships.clone(),
348            network,
349            initializer,
350            consensus_metrics,
351            storage.clone(),
352            storage_metrics,
353        )
354        .await;
355        let consensus_registry = ConsensusTaskRegistry::new();
356        let network_registry = NetworkTaskRegistry::new();
357
358        let output_event_stream = hotshot.external_event_stream.clone();
359        let internal_event_stream = hotshot.internal_event_stream.clone();
360
361        let mut handle = SystemContextHandle {
362            consensus_registry,
363            network_registry,
364            output_event_stream: output_event_stream.clone(),
365            internal_event_stream: internal_event_stream.clone(),
366            hotshot: Arc::clone(&hotshot),
367            storage,
368            network: Arc::clone(&hotshot.network),
369            membership_coordinator: memberships.clone(),
370            epoch_height,
371        };
372
373        add_consensus_tasks::<TYPES, I, V>(&mut handle).await;
374        self.add_network_tasks(&mut handle).await;
375
376        handle
377    }
378
379    /// Add byzantine network tasks with the trait
380    #[allow(clippy::too_many_lines)]
381    async fn add_network_tasks(&'static mut self, handle: &mut SystemContextHandle<TYPES, I, V>) {
382        // channels between the task spawned in this function and the network tasks.
383        // with this, we can control exactly what events the network tasks see.
384
385        // channel to the network task
386        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
387        // channel from the network task
388        let (network_task_sender, receiver_from_network) = broadcast(EVENT_CHANNEL_SIZE);
389        // create a copy of the original receiver
390        let (original_sender, original_receiver) = (
391            handle.internal_event_stream.0.clone(),
392            handle.internal_event_stream.1.activate_cloned(),
393        );
394
395        // replace the internal event stream with the one we just created,
396        // so that the network tasks are spawned with our channel.
397        let mut internal_event_stream = (
398            network_task_sender.clone(),
399            network_task_receiver.clone().deactivate(),
400        );
401        std::mem::swap(
402            &mut internal_event_stream,
403            &mut handle.internal_event_stream,
404        );
405
406        // spawn the network tasks with our newly-created channel
407        add_network_message_and_request_receiver_tasks(handle).await;
408        self.add_network_event_tasks(handle);
409
410        std::mem::swap(
411            &mut internal_event_stream,
412            &mut handle.internal_event_stream,
413        );
414
415        let state_in = Arc::new(RwLock::new(self));
416        let state_out = Arc::clone(&state_in);
417        // spawn a task to listen on the (original) internal event stream,
418        // and broadcast the transformed events to the replacement event stream we just created.
419        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
420        let public_key = handle.public_key().clone();
421        let private_key = handle.private_key().clone();
422        let upgrade_lock = handle.hotshot.upgrade_lock.clone();
423        let consensus = Arc::clone(&handle.hotshot.consensus());
424        let send_handle = spawn(async move {
425            futures::pin_mut!(shutdown_signal);
426
427            let recv_stream = stream::unfold(original_receiver, |mut recv| async move {
428                match recv.recv().await {
429                    Ok(event) => Some((Ok(event), recv)),
430                    Err(async_broadcast::RecvError::Closed) => None,
431                    Err(e) => Some((Err(e), recv)),
432                }
433            })
434            .boxed();
435
436            let fused_recv_stream = recv_stream.fuse();
437            futures::pin_mut!(fused_recv_stream);
438
439            loop {
440                futures::select! {
441                    () = shutdown_signal => {
442                        tracing::error!("Shutting down relay send task");
443                        let _ = sender_to_network.broadcast(HotShotEvent::<TYPES>::Shutdown.into()).await;
444                        return;
445                    }
446                    event = fused_recv_stream.next() => {
447                        match event {
448                            Some(Ok(msg)) => {
449                                let mut state = state_out.write().await;
450                                let mut results = state.send_handler(
451                                    &msg,
452                                    &public_key,
453                                    &private_key,
454                                    &upgrade_lock,
455                                    Arc::clone(&consensus)
456                                ).await;
457                                results.reverse();
458                                while let Some(event) = results.pop() {
459                                    let _ = sender_to_network.broadcast(event.into()).await;
460                                }
461                            }
462                            Some(Err(e)) => {
463                                tracing::error!("Relay Task, send_handle, Error receiving event: {e:?}");
464                            }
465                            None => {
466                                tracing::info!("Relay Task, send_handle, Event stream closed");
467                                return;
468                            }
469                        }
470                    }
471                }
472            }
473        });
474
475        // spawn a task to listen on the newly created event stream,
476        // and broadcast the transformed events to the original internal event stream
477        let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
478        let recv_handle = spawn(async move {
479            futures::pin_mut!(shutdown_signal);
480
481            let network_recv_stream =
482                stream::unfold(receiver_from_network, |mut recv| async move {
483                    match recv.recv().await {
484                        Ok(event) => Some((Ok(event), recv)),
485                        Err(async_broadcast::RecvError::Closed) => None,
486                        Err(e) => Some((Err(e), recv)),
487                    }
488                });
489
490            let fused_network_recv_stream = network_recv_stream.boxed().fuse();
491            futures::pin_mut!(fused_network_recv_stream);
492
493            loop {
494                futures::select! {
495                    () = shutdown_signal => {
496                        tracing::error!("Shutting down relay receive task");
497                        return;
498                    }
499                    event = fused_network_recv_stream.next() => {
500                        match event {
501                            Some(Ok(msg)) => {
502                                let mut state = state_in.write().await;
503                                let mut results = state.recv_handler(&msg).await;
504                                results.reverse();
505                                while let Some(event) = results.pop() {
506                                    let _ = original_sender.broadcast(event.into()).await;
507                                }
508                            }
509                            Some(Err(e)) => {
510                                tracing::error!("Relay Task, recv_handle, Error receiving event from network: {e:?}");
511                            }
512                            None => {
513                                tracing::info!("Relay Task, recv_handle, Network event stream closed");
514                                return;
515                            }
516                        }
517                    }
518                }
519            }
520        });
521
522        handle.network_registry.register(send_handle);
523        handle.network_registry.register(recv_handle);
524    }
525
526    /// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
527    fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
528        let network = Arc::clone(&handle.network);
529
530        self.add_network_event_task(handle, network);
531    }
532
533    /// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
534    fn add_network_event_task(
535        &self,
536        handle: &mut SystemContextHandle<TYPES, I, V>,
537        channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
538    ) {
539        add_network_event_task(handle, channel);
540    }
541}
542
543/// adds tasks for sending/receiving messages to/from the network.
544pub async fn add_network_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
545    handle: &mut SystemContextHandle<TYPES, I, V>,
546) {
547    add_network_message_and_request_receiver_tasks(handle).await;
548
549    add_network_event_tasks(handle);
550}
551
552/// Adds the `NetworkMessageTaskState` tasks and the request / receiver tasks.
553pub async fn add_network_message_and_request_receiver_tasks<
554    TYPES: NodeType,
555    I: NodeImplementation<TYPES>,
556    V: Versions,
557>(
558    handle: &mut SystemContextHandle<TYPES, I, V>,
559) {
560    let network = Arc::clone(&handle.network);
561
562    add_network_message_task(handle, &network);
563
564    add_request_network_task(handle).await;
565    add_response_task(handle);
566}
567
568/// Adds the `NetworkEventTaskState` tasks.
569pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
570    handle: &mut SystemContextHandle<TYPES, I, V>,
571) {
572    add_network_event_task(handle, Arc::clone(&handle.network));
573}