1use std::sync::Arc;
10
11use anyhow::{anyhow, Context, Result};
12use async_broadcast::{InactiveReceiver, Receiver, Sender};
13use async_lock::RwLock;
14use committable::{Commitment, Committable};
15use futures::Stream;
16use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
17use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
18use hotshot_types::{
19 consensus::Consensus,
20 data::{Leaf2, QuorumProposalWrapper},
21 epoch_membership::EpochMembershipCoordinator,
22 error::HotShotError,
23 message::{Message, MessageKind, Proposal, RecipientList},
24 request_response::ProposalRequestPayload,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 consensus_api::ConsensusApi,
28 network::{BroadcastDelay, ConnectedNetwork, Topic},
29 node_implementation::NodeType,
30 signature_key::SignatureKey,
31 },
32};
33use tracing::instrument;
34
35use crate::{traits::NodeImplementation, types::Event, SystemContext, Versions};
36
37pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
43 pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
46
47 #[allow(clippy::type_complexity)]
49 pub(crate) internal_event_stream: (
50 Sender<Arc<HotShotEvent<TYPES>>>,
51 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
52 ),
53 pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,
55
56 pub(crate) network_registry: NetworkTaskRegistry,
58
59 pub hotshot: Arc<SystemContext<TYPES, I, V>>,
61
62 pub(crate) storage: I::Storage,
64
65 pub network: Arc<I::Network>,
67
68 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
70
71 pub epoch_height: u64,
73}
74
75impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
76 SystemContextHandle<TYPES, I, V>
77{
78 pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
80 let task = Task::new(
81 task_state,
82 self.internal_event_stream.0.clone(),
83 self.internal_event_stream.1.activate_cloned(),
84 );
85
86 self.consensus_registry.run_task(task);
87 }
88
89 pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
91 self.output_event_stream.1.activate_cloned()
92 }
93
94 pub async fn send_external_message(
101 &self,
102 msg: Vec<u8>,
103 recipients: RecipientList<TYPES::SignatureKey>,
104 ) -> Result<()> {
105 let message = Message {
106 sender: self.public_key().clone(),
107 kind: MessageKind::External(msg),
108 };
109 let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;
110
111 match recipients {
112 RecipientList::Broadcast => {
113 self.network
114 .broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
115 .await?;
116 },
117 RecipientList::Direct(recipient) => {
118 self.network
119 .direct_message(serialized_message, recipient)
120 .await?;
121 },
122 RecipientList::Many(recipients) => {
123 self.network
124 .da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
125 .await?;
126 },
127 }
128 Ok(())
129 }
130
131 pub fn request_proposal(
138 &self,
139 view: TYPES::View,
140 leaf_commitment: Commitment<Leaf2<TYPES>>,
141 ) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposalWrapper<TYPES>>>>>
142 {
143 let signed_proposal_request = ProposalRequestPayload {
146 view_number: view,
147 key: self.public_key().clone(),
148 };
149
150 let signature = TYPES::SignatureKey::sign(
152 self.private_key(),
153 signed_proposal_request.commit().as_ref(),
154 )?;
155
156 let mut receiver = self.internal_event_stream.1.activate_cloned();
157 let sender = self.internal_event_stream.0.clone();
158 Ok(async move {
159 broadcast_event(
161 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
162 &sender,
163 )
164 .await;
165 while let std::result::Result::Ok(event) = receiver.recv_direct().await {
166 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
167 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
168 if leaf.view_number() == view && leaf.commit() == leaf_commitment {
169 return Ok(quorum_proposal.clone());
170 }
171 }
172 }
173 Err(anyhow!("No proposal found"))
174 })
175 }
176
177 #[must_use]
182 pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
183 self.output_event_stream.1.activate_cloned()
184 }
185
186 #[must_use]
188 pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
189 self.internal_event_stream.0.clone()
190 }
191
192 #[must_use]
199 pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
200 self.internal_event_stream.1.activate_cloned()
201 }
202
203 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
208 self.hotshot.decided_state().await
209 }
210
211 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
219 self.hotshot.state(view).await
220 }
221
222 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
227 self.hotshot.decided_leaf().await
228 }
229
230 #[must_use]
236 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
237 self.hotshot.try_decided_leaf()
238 }
239
240 pub async fn submit_transaction(
249 &self,
250 tx: TYPES::Transaction,
251 ) -> Result<(), HotShotError<TYPES>> {
252 self.hotshot.publish_transaction_async(tx).await
253 }
254
255 #[must_use]
257 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
258 self.hotshot.consensus()
259 }
260
261 pub async fn shut_down(&mut self) {
263 self.internal_event_stream.0.set_await_active(false);
266 let _ = self
267 .internal_event_stream
268 .0
269 .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
270 .await
271 .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));
272
273 tracing::error!("Shutting down the network!");
274 self.hotshot.network.shut_down().await;
275
276 tracing::error!("Shutting down network tasks!");
277 self.network_registry.shutdown().await;
278
279 tracing::error!("Shutting down consensus!");
280 self.consensus_registry.shutdown().await;
281 }
282
283 #[must_use]
285 pub fn next_view_timeout(&self) -> u64 {
286 self.hotshot.next_view_timeout()
287 }
288
289 #[allow(clippy::unused_async)] pub async fn leader(
295 &self,
296 view_number: TYPES::View,
297 epoch_number: Option<TYPES::Epoch>,
298 ) -> Result<TYPES::SignatureKey> {
299 self.hotshot
300 .membership_coordinator
301 .membership_for_epoch(epoch_number)
302 .await?
303 .leader(view_number)
304 .await
305 .context("Failed to lookup leader")
306 }
307
308 #[cfg(feature = "hotshot-testing")]
311 #[must_use]
312 pub fn public_key(&self) -> TYPES::SignatureKey {
313 self.hotshot.public_key.clone()
314 }
315
316 #[cfg(feature = "hotshot-testing")]
318 #[must_use]
319 pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
320 self.output_event_stream.0.clone()
321 }
322
323 #[cfg(feature = "hotshot-testing")]
325 #[must_use]
326 pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
327 self.internal_event_stream.0.clone()
328 }
329
330 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
332 pub async fn cur_view(&self) -> TYPES::View {
333 self.hotshot.consensus.read().await.cur_view()
334 }
335
336 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
338 pub async fn cur_epoch(&self) -> Option<TYPES::Epoch> {
339 self.hotshot.consensus.read().await.cur_epoch()
340 }
341
342 #[must_use]
345 pub fn storage(&self) -> I::Storage {
346 self.storage.clone()
347 }
348
349 #[must_use]
351 pub fn storage_metrics(&self) -> Arc<StorageMetricsValue> {
352 Arc::clone(&self.hotshot.storage_metrics)
353 }
354}