hotshot/types/
handle.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides an event-streaming handle for a [`SystemContext`] running in the background

use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use futures::Stream;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_types::{
    consensus::Consensus,
    data::{Leaf2, QuorumProposalWrapper},
    epoch_membership::EpochMembershipCoordinator,
    error::HotShotError,
    message::{Message, MessageKind, Proposal, RecipientList},
    request_response::ProposalRequestPayload,
    storage_metrics::StorageMetricsValue,
    traits::{
        consensus_api::ConsensusApi,
        network::{BroadcastDelay, ConnectedNetwork, Topic},
        node_implementation::NodeType,
        signature_key::SignatureKey,
    },
};
use tracing::instrument;

use crate::{traits::NodeImplementation, types::Event, SystemContext, Versions};

/// Event streaming handle for a [`SystemContext`] instance running in the background
///
/// This type provides the means to message and interact with a background [`SystemContext`] instance,
/// allowing the application to receive [`Event`]s from it, send transactions to it, and interact
/// with the underlying storage.
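///
/// # Usage sketch
///
/// A minimal, hypothetical sketch of consuming the external event stream; `handle` is
/// assumed to be an already-running `SystemContextHandle` (names are illustrative only):
///
/// ```ignore
/// use futures::StreamExt;
///
/// // Receive events emitted by the background SystemContext.
/// let mut events = std::pin::pin!(handle.event_stream());
/// while let Some(event) = events.next().await {
///     // Application-specific handling of `event` goes here.
/// }
/// ```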
pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The [sender](Sender) and [receiver](Receiver),
    /// to allow the application to communicate with HotShot.
    pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    pub(crate) internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),
    /// registry for controlling consensus tasks
    pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,

    /// registry for controlling network tasks
    pub(crate) network_registry: NetworkTaskRegistry,

    /// Internal reference to the underlying [`SystemContext`]
    pub hotshot: Arc<SystemContext<TYPES, I, V>>,

    /// Reference to the internal storage for consensus data.
    pub(crate) storage: I::Storage,

    /// Networks used by the instance of hotshot
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    SystemContextHandle<TYPES, I, V>
{
    /// Adds a hotshot consensus-related task to the `SystemContextHandle`.
    pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
        let task = Task::new(
            task_state,
            self.internal_event_stream.0.clone(),
            self.internal_event_stream.1.activate_cloned(),
        );

        self.consensus_registry.run_task(task);
    }

    /// obtains a stream to expose to the user
    pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
        self.output_event_stream.1.activate_cloned()
    }

    /// Message other participants with a serialized message from the application.
    /// Receivers of this message will get an `Event::ExternalMessageReceived` via
    /// the event stream.
    ///
    /// # Errors
    /// Errors if serializing the message fails, or the message fails to be sent
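    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `handle` is a running `SystemContextHandle` and the
    /// payload bytes were produced by the application:
    ///
    /// ```ignore
    /// // Broadcast an opaque application-level message to every participant.
    /// handle
    ///     .send_external_message(b"app-specific payload".to_vec(), RecipientList::Broadcast)
    ///     .await?;
    /// ```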
    pub async fn send_external_message(
        &self,
        msg: Vec<u8>,
        recipients: RecipientList<TYPES::SignatureKey>,
    ) -> Result<()> {
        let message = Message {
            sender: self.public_key().clone(),
            kind: MessageKind::External(msg),
        };
        let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;

        match recipients {
            RecipientList::Broadcast => {
                self.network
                    .broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
                    .await?;
            },
            RecipientList::Direct(recipient) => {
                self.network
                    .direct_message(serialized_message, recipient)
                    .await?;
            },
            RecipientList::Many(recipients) => {
                self.network
                    .da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
                    .await?;
            },
        }
        Ok(())
    }

    /// Request a proposal from all other nodes. Blocks until some node returns
    /// a valid proposal with the requested commitment. If no node has the
    /// proposal, this will block forever.
    ///
    /// # Errors
    /// Errors if signing the request for proposal fails
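    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `view` and `leaf_commitment` were obtained elsewhere
    /// (e.g. from a decide event). The outer `Result` covers signing the request; the
    /// returned future resolves once a matching proposal arrives:
    ///
    /// ```ignore
    /// let proposal = handle.request_proposal(view, leaf_commitment)?.await?;
    /// ```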
    pub fn request_proposal(
        &self,
        view: TYPES::View,
        leaf_commitment: Commitment<Leaf2<TYPES>>,
    ) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposalWrapper<TYPES>>>>>
    {
        // We need to be able to sign this request before submitting it to the network. Compute the
        // payload first.
        let signed_proposal_request = ProposalRequestPayload {
            view_number: view,
            key: self.public_key().clone(),
        };

        // Finally, compute the signature for the payload.
        let signature = TYPES::SignatureKey::sign(
            self.private_key(),
            signed_proposal_request.commit().as_ref(),
        )?;

        let mut receiver = self.internal_event_stream.1.activate_cloned();
        let sender = self.internal_event_stream.0.clone();
        Ok(async move {
            // First, broadcast that we need a proposal
            broadcast_event(
                HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
                &sender,
            )
            .await;
            while let std::result::Result::Ok(event) = receiver.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view && leaf.commit() == leaf_commitment {
                        return Ok(quorum_proposal.clone());
                    }
                }
            }
            Err(anyhow!("No proposal found"))
        })
    }

    /// HACK so we can know the types when running tests...
    /// there are two cleaner solutions:
    /// - make the stream generic and put it in `NodeType` or `NodeImplementation`
    /// - type wrapper
    #[must_use]
    pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
        self.output_event_stream.1.activate_cloned()
    }

    /// HACK so we can create dependency tasks when running tests
    #[must_use]
    pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.0.clone()
    }

    /// HACK so we can know the types when running tests...
    /// there are two cleaner solutions:
    /// - make the stream generic and put it in `NodeType` or `NodeImplementation`
    /// - type wrapper
    ///
    /// NOTE: this is only used for sanity checks in our tests
    #[must_use]
    pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.1.activate_cloned()
    }

    /// Get the last decided validated state of the [`SystemContext`] instance.
    ///
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        self.hotshot.decided_state().await
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.hotshot.state(view).await
    }

    /// Get the last decided leaf of the [`SystemContext`] instance.
    ///
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.hotshot.decided_leaf().await
    }

    /// Tries to get the most recent decided leaf, returning instantly
    /// if we can't acquire the lock.
    ///
    /// # Panics
    /// Panics if internal consensus is in an inconsistent state.
    #[must_use]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.hotshot.try_decided_leaf()
    }

    /// Submits a transaction to the backing [`SystemContext`] instance.
    ///
    /// The current node broadcasts the transaction to all nodes on the network.
    ///
    /// # Errors
    ///
    /// Will return a [`HotShotError`] if some error occurs in the underlying
    /// [`SystemContext`] instance.
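    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `tx` is a `TYPES::Transaction` built by the
    /// application:
    ///
    /// ```ignore
    /// handle.submit_transaction(tx).await?;
    /// ```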
    pub async fn submit_transaction(
        &self,
        tx: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        self.hotshot.publish_transaction_async(tx).await
    }

    /// Get the underlying consensus state for this [`SystemContext`]
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        self.hotshot.consensus()
    }

    /// Shut down the inner hotshot and wait until all background threads are closed.
    pub async fn shut_down(&mut self) {
        // this is required because `SystemContextHandle` holds an inactive receiver and
        // `broadcast_direct` below can wait indefinitely
        self.internal_event_stream.0.set_await_active(false);
        let _ = self
            .internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
            .await
            .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));

        tracing::error!("Shutting down the network!");
        self.hotshot.network.shut_down().await;

        tracing::error!("Shutting down network tasks!");
        self.network_registry.shutdown().await;

        tracing::error!("Shutting down consensus!");
        self.consensus_registry.shutdown().await;
    }

    /// return the timeout for a view of the underlying `SystemContext`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.hotshot.next_view_timeout()
    }

    /// Wrapper for `HotShotConsensusApi`'s `leader` function
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
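    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, looking up the leader for the node's current view and epoch
    /// (both read from the handle beforehand):
    ///
    /// ```ignore
    /// let view = handle.cur_view().await;
    /// let epoch = handle.cur_epoch().await;
    /// let leader_key = handle.leader(view, epoch).await?;
    /// ```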
    #[allow(clippy::unused_async)] // async for API compatibility reasons
    pub async fn leader(
        &self,
        view_number: TYPES::View,
        epoch_number: Option<TYPES::Epoch>,
    ) -> Result<TYPES::SignatureKey> {
        self.hotshot
            .membership_coordinator
            .membership_for_epoch(epoch_number)
            .await?
            .leader(view_number)
            .await
            .context("Failed to lookup leader")
    }

    // Below is for testing only:
    /// Wrapper to get this node's public key
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn public_key(&self) -> TYPES::SignatureKey {
        self.hotshot.public_key.clone()
    }

    /// Get the sender side of the external event stream for testing purposes
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
        self.output_event_stream.0.clone()
    }

    /// Get the sender side of the internal event stream for testing purposes
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.0.clone()
    }

    /// Wrapper to get the view number this node is on.
    #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
    pub async fn cur_view(&self) -> TYPES::View {
        self.hotshot.consensus.read().await.cur_view()
    }

    /// Wrapper to get the epoch number this node is on.
    #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
    pub async fn cur_epoch(&self) -> Option<TYPES::Epoch> {
        self.hotshot.consensus.read().await.cur_epoch()
    }

    /// Returns a clone of the underlying storage for this [`SystemContext`], allowing access to
    /// historical data
    #[must_use]
    pub fn storage(&self) -> I::Storage {
        self.storage.clone()
    }

    /// Returns a reference to the storage metrics
    #[must_use]
    pub fn storage_metrics(&self) -> Arc<StorageMetricsValue> {
        Arc::clone(&self.hotshot.storage_metrics)
    }
}