hotshot/types/handle.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Provides an event-streaming handle for a [`SystemContext`] running in the background

use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use futures::Stream;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_types::{
    consensus::Consensus,
    data::{Leaf2, QuorumProposalWrapper},
    epoch_membership::EpochMembershipCoordinator,
    error::HotShotError,
    message::{Message, MessageKind, Proposal, RecipientList},
    request_response::ProposalRequestPayload,
    storage_metrics::StorageMetricsValue,
    traits::{
        consensus_api::ConsensusApi,
        network::{BroadcastDelay, ConnectedNetwork, Topic},
        node_implementation::{ConsensusTime, NodeType},
        signature_key::SignatureKey,
    },
    vote::HasViewNumber,
};
use tracing::instrument;

use crate::{traits::NodeImplementation, types::Event, SystemContext, Versions};

/// Event streaming handle for a [`SystemContext`] instance running in the background
///
/// This type provides the means to message and interact with a background [`SystemContext`]
/// instance: it allows the application to receive [`Event`]s from it, send transactions to it,
/// and interact with the underlying storage.
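///
/// # Example
///
/// A minimal usage sketch (not part of this crate's test suite), assuming `handle` is an
/// already-running `SystemContextHandle` for some concrete `TYPES`, `I`, and `V`:
///
/// ```rust,ignore
/// use futures::StreamExt;
///
/// // Subscribe to the external event stream and log each event as it arrives.
/// let mut events = handle.event_stream();
/// while let Some(event) = events.next().await {
///     tracing::info!("event in view {:?}: {:?}", event.view_number, event.event);
/// }
/// ```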
pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The [sender](Sender) and [receiver](Receiver),
    /// to allow the application to communicate with HotShot.
    pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Access to the internal event stream, in case we need to, say, shut something down
    #[allow(clippy::type_complexity)]
    pub(crate) internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// Registry for controlling consensus tasks
    pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,

    /// Registry for controlling network tasks
    pub(crate) network_registry: NetworkTaskRegistry,

    /// Internal reference to the underlying [`SystemContext`]
    pub hotshot: Arc<SystemContext<TYPES, I, V>>,

    /// Reference to the internal storage for consensus data.
    pub(crate) storage: I::Storage,

    /// Networks used by the instance of hotshot
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Number of blocks in an epoch; zero means there are no epochs
    pub epoch_height: u64,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    SystemContextHandle<TYPES, I, V>
{
    /// Adds a HotShot consensus-related task to the `SystemContextHandle`.
    pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
        let task = Task::new(
            task_state,
            self.internal_event_stream.0.clone(),
            self.internal_event_stream.1.activate_cloned(),
        );

        self.consensus_registry.run_task(task);
    }

    /// Obtains a stream of events to expose to the user
    pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
        self.output_event_stream.1.activate_cloned()
    }

    /// Message other participants with a serialized message from the application.
    /// Receivers of this message will get an `Event::ExternalMessageReceived` via
    /// the event stream.
    ///
    /// # Errors
    /// Errors if serializing the message fails, or if the message fails to be sent
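    ///
    /// # Example
    ///
    /// A minimal sketch (not from this crate's tests), assuming `handle` is a running
    /// `SystemContextHandle` and the payload bytes are application-defined:
    ///
    /// ```rust,ignore
    /// use hotshot_types::message::RecipientList;
    ///
    /// // Broadcast an application-level message to every node on the global topic.
    /// handle
    ///     .send_external_message(b"application payload".to_vec(), RecipientList::Broadcast)
    ///     .await?;
    /// ```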
    pub async fn send_external_message(
        &self,
        msg: Vec<u8>,
        recipients: RecipientList<TYPES::SignatureKey>,
    ) -> Result<()> {
        let message = Message {
            sender: self.public_key().clone(),
            kind: MessageKind::External(msg),
        };
        let view: TYPES::View = message.view_number();
        let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;

        match recipients {
            RecipientList::Broadcast => {
                self.network
                    .broadcast_message(
                        view.u64().into(),
                        serialized_message,
                        Topic::Global,
                        BroadcastDelay::None,
                    )
                    .await?;
            },
            RecipientList::Direct(recipient) => {
                self.network
                    .direct_message(view.u64().into(), serialized_message, recipient)
                    .await?;
            },
            RecipientList::Many(recipients) => {
                self.network
                    .da_broadcast_message(
                        view.u64().into(),
                        serialized_message,
                        recipients,
                        BroadcastDelay::None,
                    )
                    .await?;
            },
        }
        Ok(())
    }

    /// Request a proposal from all other nodes. Blocks until some node
    /// returns a valid proposal with the requested commitment. If nobody has the
    /// proposal, this blocks forever.
    ///
    /// # Errors
    /// Errors if signing the proposal request fails
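    ///
    /// # Example
    ///
    /// A minimal sketch (not from this crate's tests), assuming `view` and `leaf_commitment`
    /// identify a proposal that some peer is able to serve:
    ///
    /// ```rust,ignore
    /// // `request_proposal` returns a future; awaiting it blocks until a matching
    /// // proposal arrives, so callers typically wrap it in a timeout.
    /// let request = handle.request_proposal(view, leaf_commitment)?;
    /// let proposal = request.await?;
    /// ```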
    pub fn request_proposal(
        &self,
        view: TYPES::View,
        leaf_commitment: Commitment<Leaf2<TYPES>>,
    ) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposalWrapper<TYPES>>>>>
    {
        // We need to be able to sign this request before submitting it to the network. Compute the
        // payload first.
        let signed_proposal_request = ProposalRequestPayload {
            view_number: view,
            key: self.public_key().clone(),
        };

        // Finally, compute the signature for the payload.
        let signature = TYPES::SignatureKey::sign(
            self.private_key(),
            signed_proposal_request.commit().as_ref(),
        )?;

        let mut receiver = self.internal_event_stream.1.activate_cloned();
        let sender = self.internal_event_stream.0.clone();
        Ok(async move {
            // First, broadcast that we need a proposal
            broadcast_event(
                HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
                &sender,
            )
            .await;
            while let std::result::Result::Ok(event) = receiver.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view && leaf.commit() == leaf_commitment {
                        return Ok(quorum_proposal.clone());
                    }
                }
            }
            Err(anyhow!("No proposal found"))
        })
    }

    /// HACK so we can know the types when running tests...
    /// There are two cleaner solutions:
    /// - make the stream generic and part of `NodeType` or `NodeImplementation`
    /// - use a type wrapper
    #[must_use]
    pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
        self.output_event_stream.1.activate_cloned()
    }

    /// HACK so we can create dependency tasks when running tests
    #[must_use]
    pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.0.clone()
    }

    /// HACK so we can know the types when running tests...
    /// There are two cleaner solutions:
    /// - make the stream generic and part of `NodeType` or `NodeImplementation`
    /// - use a type wrapper
    ///
    /// NOTE: this is only used for sanity checks in our tests
    #[must_use]
    pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.1.activate_cloned()
    }

    /// Get the last decided validated state of the [`SystemContext`] instance.
    ///
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        self.hotshot.decided_state().await
    }

    /// Get the validated state from a given `view`.
    ///
    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
    /// tracks views that have not yet been decided but could be in the future. This function may
    /// return [`None`] if the requested view has already been decided (but see
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
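    ///
    /// # Example
    ///
    /// A minimal sketch (not from this crate's tests), falling back to the decided state
    /// when the requested view is no longer tracked:
    ///
    /// ```rust,ignore
    /// let state = match handle.state(view).await {
    ///     Some(state) => state,
    ///     // The view was already decided (or will never be); use the decided state instead.
    ///     None => handle.decided_state().await,
    /// };
    /// ```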
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.hotshot.state(view).await
    }

    /// Get the last decided leaf of the [`SystemContext`] instance.
    ///
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.hotshot.decided_leaf().await
    }

    /// Tries to get the most recent decided leaf, returning instantly
    /// if we can't acquire the lock.
    ///
    /// # Panics
    /// Panics if internal consensus is in an inconsistent state.
    #[must_use]
    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
        self.hotshot.try_decided_leaf()
    }

    /// Submits a transaction to the backing [`SystemContext`] instance.
    ///
    /// The current node broadcasts the transaction to all nodes on the network.
    ///
    /// # Errors
    ///
    /// Will return a [`HotShotError`] if some error occurs in the underlying
    /// [`SystemContext`] instance.
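    ///
    /// # Example
    ///
    /// A minimal sketch (not from this crate's tests), assuming `tx` is a transaction of the
    /// application's `TYPES::Transaction` type:
    ///
    /// ```rust,ignore
    /// // Broadcast the transaction; inclusion in a block is observed later
    /// // via `Decide` events on the event stream.
    /// handle.submit_transaction(tx).await?;
    /// ```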
    pub async fn submit_transaction(
        &self,
        tx: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        self.hotshot.publish_transaction_async(tx).await
    }

    /// Get the underlying consensus state for this [`SystemContext`]
    #[must_use]
    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
        self.hotshot.consensus()
    }

    /// Shut down the inner hotshot and wait until all background threads are closed.
    pub async fn shut_down(&mut self) {
        // this is required because `SystemContextHandle` holds an inactive receiver and
        // `broadcast_direct` below can wait indefinitely
        self.internal_event_stream.0.set_await_active(false);
        let _ = self
            .internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
            .await
            .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));

        tracing::error!("Shutting down the network!");
        self.hotshot.network.shut_down().await;

        tracing::error!("Shutting down network tasks!");
        self.network_registry.shutdown().await;

        tracing::error!("Shutting down consensus!");
        self.consensus_registry.shutdown().await;
    }

    /// Return the timeout for a view of the underlying `SystemContext`
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.hotshot.next_view_timeout()
    }

    /// Wrapper for `HotShotConsensusApi`'s `leader` function
    ///
    /// # Errors
    /// Returns an error if the leader cannot be calculated
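    ///
    /// # Example
    ///
    /// A minimal sketch (not from this crate's tests), looking up the leader for the
    /// current view and epoch:
    ///
    /// ```rust,ignore
    /// let view = handle.cur_view().await;
    /// let epoch = handle.cur_epoch().await;
    /// let leader_key = handle.leader(view, epoch).await?;
    /// ```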
    #[allow(clippy::unused_async)] // async for API compatibility reasons
    pub async fn leader(
        &self,
        view_number: TYPES::View,
        epoch_number: Option<TYPES::Epoch>,
    ) -> Result<TYPES::SignatureKey> {
        self.hotshot
            .membership_coordinator
            .membership_for_epoch(epoch_number)
            .await?
            .leader(view_number)
            .await
            .context("Failed to lookup leader")
    }

    // Below is for testing only:
    /// Wrapper to get this node's public key
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn public_key(&self) -> TYPES::SignatureKey {
        self.hotshot.public_key.clone()
    }

    /// Get the sender side of the external event stream for testing purposes
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
        self.output_event_stream.0.clone()
    }

    /// Get the sender side of the internal event stream for testing purposes
    #[cfg(feature = "hotshot-testing")]
    #[must_use]
    pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
        self.internal_event_stream.0.clone()
    }

    /// Wrapper to get the view number this node is on.
    #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
    pub async fn cur_view(&self) -> TYPES::View {
        self.hotshot.consensus.read().await.cur_view()
    }

    /// Wrapper to get the epoch number this node is on.
    #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
    pub async fn cur_epoch(&self) -> Option<TYPES::Epoch> {
        self.hotshot.consensus.read().await.cur_epoch()
    }

    /// Provides a reference to the underlying storage for this [`SystemContext`], allowing access to
    /// historical data
    #[must_use]
    pub fn storage(&self) -> I::Storage {
        self.storage.clone()
    }

    /// Returns a reference to the storage metrics
    #[must_use]
    pub fn storage_metrics(&self) -> Arc<StorageMetricsValue> {
        Arc::clone(&self.hotshot.storage_metrics)
    }
}