1use std::sync::Arc;
10
11use anyhow::{Context, Result, anyhow};
12use async_broadcast::{InactiveReceiver, Receiver, Sender};
13use async_lock::RwLock;
14use committable::{Commitment, Committable};
15use futures::Stream;
16use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
17use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
18use hotshot_types::{
19 consensus::Consensus,
20 data::{EpochNumber, Leaf2, QuorumProposalWrapper, ViewNumber},
21 epoch_membership::EpochMembershipCoordinator,
22 error::HotShotError,
23 message::{Message, MessageKind, Proposal, RecipientList},
24 request_response::ProposalRequestPayload,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 consensus_api::ConsensusApi,
28 network::{BroadcastDelay, ConnectedNetwork, Topic},
29 node_implementation::NodeType,
30 signature_key::SignatureKey,
31 },
32 vote::HasViewNumber,
33};
34use tracing::instrument;
35
36use crate::{SystemContext, traits::NodeImplementation, types::Event};
37
/// Handle through which callers interact with a [`SystemContext`] instance.
///
/// It bundles the event channels, the task registries, and the shared
/// references (context, storage, network, membership) that the methods on
/// this type operate on.
pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Channel carrying [`Event`]s: the sender half plus an inactive receiver.
    /// The receiver is activated (as a clone) each time a caller asks for an
    /// event stream.
    pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Channel carrying internal [`HotShotEvent`]s: the sender half plus an
    /// inactive receiver. Tasks added through this handle are wired to it, and
    /// the shutdown event is broadcast on it.
    #[allow(clippy::type_complexity)]
    pub(crate) internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),
    /// Registry that runs and shuts down the consensus tasks.
    pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,

    /// Registry that shuts down the network tasks.
    pub(crate) network_registry: NetworkTaskRegistry,

    /// Shared reference to the underlying [`SystemContext`]; most methods on
    /// this handle delegate to it.
    pub hotshot: Arc<SystemContext<TYPES, I>>,

    /// Storage implementation; handed out as a clone by `storage()`.
    pub(crate) storage: I::Storage,

    /// Network used to send external messages from this handle.
    pub network: Arc<I::Network>,

    /// Epoch membership coordinator exposed to callers.
    // NOTE(review): `leader()` reads the coordinator stored on `hotshot`, not
    // this field — presumably both refer to the same coordinator; confirm.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Epoch height.
    // NOTE(review): presumably the number of blocks per epoch — confirm the
    // unit and the meaning of zero against the code that populates it.
    pub epoch_height: u64,
}
75
76impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> SystemContextHandle<TYPES, I> {
77 pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
79 let task = Task::new(
80 task_state,
81 self.internal_event_stream.0.clone(),
82 self.internal_event_stream.1.activate_cloned(),
83 );
84
85 self.consensus_registry.run_task(task);
86 }
87
88 pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> + use<TYPES, I> {
90 self.output_event_stream.1.activate_cloned()
91 }
92
93 pub async fn send_external_message(
100 &self,
101 msg: Vec<u8>,
102 recipients: RecipientList<TYPES::SignatureKey>,
103 ) -> Result<()> {
104 let message = Message {
105 sender: self.public_key().clone(),
106 kind: MessageKind::<TYPES>::External(msg),
107 };
108 let view: ViewNumber = message.view_number();
109 let serialized_message = self.hotshot.upgrade_lock.serialize(&message)?;
110
111 match recipients {
112 RecipientList::Broadcast => {
113 self.network
114 .broadcast_message(
115 view.u64().into(),
116 serialized_message,
117 Topic::Global,
118 BroadcastDelay::None,
119 )
120 .await?;
121 },
122 RecipientList::Direct(recipient) => {
123 self.network
124 .direct_message(view.u64().into(), serialized_message, recipient)
125 .await?;
126 },
127 RecipientList::Many(recipients) => {
128 self.network
129 .da_broadcast_message(
130 view.u64().into(),
131 serialized_message,
132 recipients,
133 BroadcastDelay::None,
134 )
135 .await?;
136 },
137 }
138 Ok(())
139 }
140
141 pub fn request_proposal(
148 &self,
149 view: ViewNumber,
150 leaf_commitment: Commitment<Leaf2<TYPES>>,
151 ) -> Result<
152 impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposalWrapper<TYPES>>>>
153 + use<TYPES, I>,
154 > {
155 let signed_proposal_request = ProposalRequestPayload {
158 view_number: view,
159 key: self.public_key().clone(),
160 };
161
162 let signature = TYPES::SignatureKey::sign(
164 self.private_key(),
165 signed_proposal_request.commit().as_ref(),
166 )?;
167
168 let mut receiver = self.internal_event_stream.1.activate_cloned();
169 let sender = self.internal_event_stream.0.clone();
170 Ok(async move {
171 broadcast_event(
173 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
174 &sender,
175 )
176 .await;
177 while let std::result::Result::Ok(event) = receiver.recv_direct().await {
178 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
179 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
180 if leaf.view_number() == view && leaf.commit() == leaf_commitment {
181 return Ok(quorum_proposal.clone());
182 }
183 }
184 }
185 Err(anyhow!("No proposal found"))
186 })
187 }
188
189 #[must_use]
194 pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
195 self.output_event_stream.1.activate_cloned()
196 }
197
198 #[must_use]
200 pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
201 self.internal_event_stream.0.clone()
202 }
203
204 #[must_use]
211 pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
212 self.internal_event_stream.1.activate_cloned()
213 }
214
215 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
220 self.hotshot.decided_state().await
221 }
222
223 pub async fn state(&self, view: ViewNumber) -> Option<Arc<TYPES::ValidatedState>> {
231 self.hotshot.state(view).await
232 }
233
234 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
239 self.hotshot.decided_leaf().await
240 }
241
242 #[must_use]
248 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
249 self.hotshot.try_decided_leaf()
250 }
251
252 pub async fn submit_transaction(
261 &self,
262 tx: TYPES::Transaction,
263 ) -> Result<(), HotShotError<TYPES>> {
264 self.hotshot.publish_transaction_async(tx).await
265 }
266
267 #[must_use]
269 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
270 self.hotshot.consensus()
271 }
272
273 pub async fn shut_down(&mut self) {
275 self.internal_event_stream.0.set_await_active(false);
278 let _ = self
279 .internal_event_stream
280 .0
281 .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
282 .await
283 .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));
284
285 tracing::error!("Shutting down the network!");
286 self.hotshot.network.shut_down().await;
287
288 tracing::error!("Shutting down network tasks!");
289 self.network_registry.shutdown().await;
290
291 tracing::error!("Shutting down consensus!");
292 self.consensus_registry.shutdown().await;
293 }
294
295 #[must_use]
297 pub fn next_view_timeout(&self) -> u64 {
298 self.hotshot.next_view_timeout()
299 }
300
301 #[allow(clippy::unused_async)] pub async fn leader(
307 &self,
308 view_number: ViewNumber,
309 epoch_number: Option<EpochNumber>,
310 ) -> Result<TYPES::SignatureKey> {
311 self.hotshot
312 .membership_coordinator
313 .membership_for_epoch(epoch_number)
314 .await?
315 .leader(view_number)
316 .await
317 .context("Failed to lookup leader")
318 }
319
/// Returns a copy of this node's public key, taken from the underlying
/// [`SystemContext`].
///
/// Only compiled with the `hotshot-testing` feature.
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn public_key(&self) -> TYPES::SignatureKey {
    let context = &self.hotshot;
    context.public_key.clone()
}
327
/// Returns a clone of the sender half of the output [`Event`] stream.
///
/// Only compiled with the `hotshot-testing` feature.
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
    let (output_tx, _) = &self.output_event_stream;
    output_tx.clone()
}
334
/// Returns a clone of the sender half of the internal [`HotShotEvent`]
/// stream.
///
/// Only compiled with the `hotshot-testing` feature.
#[cfg(feature = "hotshot-testing")]
#[must_use]
pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
    let (internal_tx, _) = &self.internal_event_stream;
    internal_tx.clone()
}
341
342 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
344 pub async fn cur_view(&self) -> ViewNumber {
345 self.hotshot.consensus.read().await.cur_view()
346 }
347
348 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
350 pub async fn cur_epoch(&self) -> Option<EpochNumber> {
351 self.hotshot.consensus.read().await.cur_epoch()
352 }
353
354 #[must_use]
357 pub fn storage(&self) -> I::Storage {
358 self.storage.clone()
359 }
360
361 #[must_use]
363 pub fn storage_metrics(&self) -> Arc<StorageMetricsValue> {
364 Arc::clone(&self.hotshot.storage_metrics)
365 }
366}