1use std::sync::Arc;
10
11use anyhow::{anyhow, Context, Result};
12use async_broadcast::{InactiveReceiver, Receiver, Sender};
13use async_lock::RwLock;
14use committable::{Commitment, Committable};
15use futures::Stream;
16use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
17use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
18use hotshot_types::{
19 consensus::Consensus,
20 data::{Leaf2, QuorumProposalWrapper},
21 epoch_membership::EpochMembershipCoordinator,
22 error::HotShotError,
23 message::{Message, MessageKind, Proposal, RecipientList},
24 request_response::ProposalRequestPayload,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 consensus_api::ConsensusApi,
28 network::{BroadcastDelay, ConnectedNetwork, Topic},
29 node_implementation::{ConsensusTime, NodeType},
30 signature_key::SignatureKey,
31 },
32 vote::HasViewNumber,
33};
34use tracing::instrument;
35
36use crate::{traits::NodeImplementation, types::Event, SystemContext, Versions};
37
38pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
44 pub(crate) output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),
47
48 #[allow(clippy::type_complexity)]
50 pub(crate) internal_event_stream: (
51 Sender<Arc<HotShotEvent<TYPES>>>,
52 InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
53 ),
54 pub(crate) consensus_registry: ConsensusTaskRegistry<HotShotEvent<TYPES>>,
56
57 pub(crate) network_registry: NetworkTaskRegistry,
59
60 pub hotshot: Arc<SystemContext<TYPES, I, V>>,
62
63 pub(crate) storage: I::Storage,
65
66 pub network: Arc<I::Network>,
68
69 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
71
72 pub epoch_height: u64,
74}
75
76impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
77 SystemContextHandle<TYPES, I, V>
78{
79 pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
81 let task = Task::new(
82 task_state,
83 self.internal_event_stream.0.clone(),
84 self.internal_event_stream.1.activate_cloned(),
85 );
86
87 self.consensus_registry.run_task(task);
88 }
89
90 pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
92 self.output_event_stream.1.activate_cloned()
93 }
94
95 pub async fn send_external_message(
102 &self,
103 msg: Vec<u8>,
104 recipients: RecipientList<TYPES::SignatureKey>,
105 ) -> Result<()> {
106 let message = Message {
107 sender: self.public_key().clone(),
108 kind: MessageKind::External(msg),
109 };
110 let view: TYPES::View = message.view_number();
111 let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;
112
113 match recipients {
114 RecipientList::Broadcast => {
115 self.network
116 .broadcast_message(
117 view.u64().into(),
118 serialized_message,
119 Topic::Global,
120 BroadcastDelay::None,
121 )
122 .await?;
123 },
124 RecipientList::Direct(recipient) => {
125 self.network
126 .direct_message(view.u64().into(), serialized_message, recipient)
127 .await?;
128 },
129 RecipientList::Many(recipients) => {
130 self.network
131 .da_broadcast_message(
132 view.u64().into(),
133 serialized_message,
134 recipients,
135 BroadcastDelay::None,
136 )
137 .await?;
138 },
139 }
140 Ok(())
141 }
142
143 pub fn request_proposal(
150 &self,
151 view: TYPES::View,
152 leaf_commitment: Commitment<Leaf2<TYPES>>,
153 ) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposalWrapper<TYPES>>>>>
154 {
155 let signed_proposal_request = ProposalRequestPayload {
158 view_number: view,
159 key: self.public_key().clone(),
160 };
161
162 let signature = TYPES::SignatureKey::sign(
164 self.private_key(),
165 signed_proposal_request.commit().as_ref(),
166 )?;
167
168 let mut receiver = self.internal_event_stream.1.activate_cloned();
169 let sender = self.internal_event_stream.0.clone();
170 Ok(async move {
171 broadcast_event(
173 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
174 &sender,
175 )
176 .await;
177 while let std::result::Result::Ok(event) = receiver.recv_direct().await {
178 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
179 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
180 if leaf.view_number() == view && leaf.commit() == leaf_commitment {
181 return Ok(quorum_proposal.clone());
182 }
183 }
184 }
185 Err(anyhow!("No proposal found"))
186 })
187 }
188
189 #[must_use]
194 pub fn event_stream_known_impl(&self) -> Receiver<Event<TYPES>> {
195 self.output_event_stream.1.activate_cloned()
196 }
197
198 #[must_use]
200 pub fn internal_event_stream_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
201 self.internal_event_stream.0.clone()
202 }
203
204 #[must_use]
211 pub fn internal_event_stream_receiver_known_impl(&self) -> Receiver<Arc<HotShotEvent<TYPES>>> {
212 self.internal_event_stream.1.activate_cloned()
213 }
214
215 pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
220 self.hotshot.decided_state().await
221 }
222
223 pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
231 self.hotshot.state(view).await
232 }
233
234 pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
239 self.hotshot.decided_leaf().await
240 }
241
242 #[must_use]
248 pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
249 self.hotshot.try_decided_leaf()
250 }
251
252 pub async fn submit_transaction(
261 &self,
262 tx: TYPES::Transaction,
263 ) -> Result<(), HotShotError<TYPES>> {
264 self.hotshot.publish_transaction_async(tx).await
265 }
266
267 #[must_use]
269 pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
270 self.hotshot.consensus()
271 }
272
273 pub async fn shut_down(&mut self) {
275 self.internal_event_stream.0.set_await_active(false);
278 let _ = self
279 .internal_event_stream
280 .0
281 .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
282 .await
283 .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));
284
285 tracing::error!("Shutting down the network!");
286 self.hotshot.network.shut_down().await;
287
288 tracing::error!("Shutting down network tasks!");
289 self.network_registry.shutdown().await;
290
291 tracing::error!("Shutting down consensus!");
292 self.consensus_registry.shutdown().await;
293 }
294
295 #[must_use]
297 pub fn next_view_timeout(&self) -> u64 {
298 self.hotshot.next_view_timeout()
299 }
300
301 #[allow(clippy::unused_async)] pub async fn leader(
307 &self,
308 view_number: TYPES::View,
309 epoch_number: Option<TYPES::Epoch>,
310 ) -> Result<TYPES::SignatureKey> {
311 self.hotshot
312 .membership_coordinator
313 .membership_for_epoch(epoch_number)
314 .await?
315 .leader(view_number)
316 .await
317 .context("Failed to lookup leader")
318 }
319
320 #[cfg(feature = "hotshot-testing")]
323 #[must_use]
324 pub fn public_key(&self) -> TYPES::SignatureKey {
325 self.hotshot.public_key.clone()
326 }
327
328 #[cfg(feature = "hotshot-testing")]
330 #[must_use]
331 pub fn external_channel_sender(&self) -> Sender<Event<TYPES>> {
332 self.output_event_stream.0.clone()
333 }
334
335 #[cfg(feature = "hotshot-testing")]
337 #[must_use]
338 pub fn internal_channel_sender(&self) -> Sender<Arc<HotShotEvent<TYPES>>> {
339 self.internal_event_stream.0.clone()
340 }
341
342 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
344 pub async fn cur_view(&self) -> TYPES::View {
345 self.hotshot.consensus.read().await.cur_view()
346 }
347
348 #[instrument(skip_all, target = "SystemContextHandle", fields(id = self.hotshot.id))]
350 pub async fn cur_epoch(&self) -> Option<TYPES::Epoch> {
351 self.hotshot.consensus.read().await.cur_epoch()
352 }
353
354 #[must_use]
357 pub fn storage(&self) -> I::Storage {
358 self.storage.clone()
359 }
360
361 #[must_use]
363 pub fn storage_metrics(&self) -> Arc<StorageMetricsValue> {
364 Arc::clone(&self.hotshot.storage_metrics)
365 }
366}