use std::{
    collections::{BTreeMap, BTreeSet},
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
    consensus::OuterConsensus,
    data::{EpochNumber, ViewNumber},
    epoch_membership::EpochMembershipCoordinator,
    simple_vote::HasEpoch,
    traits::{
        block_contents::BlockHeader,
        network::{ConnectedNetwork, DataRequest, RequestKind},
        node_implementation::{NodeImplementation, NodeType},
        signature_key::SignatureKey,
    },
    utils::is_epoch_transition,
    vote::HasViewNumber,
};
use hotshot_utils::anytrace::*;
use rand::{seq::SliceRandom, thread_rng};
use sha2::{Digest, Sha256};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::instrument;

use crate::{events::HotShotEvent, helpers::broadcast_event};

/// How long to wait for a response from one recipient before moving on to the
/// next DA committee member.
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);

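// Requests go out to one recipient at a time, so the worst case before the
// request task gives up is roughly `da_committee.len() * REQUEST_TIMEOUT`
// after the initial `delay` (e.g. about 5s of retries for a 10-member
// committee).
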
/// Long-running task state that fetches this node's VID share for each view by
/// requesting it from the DA committee when it has not arrived on its own.
pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Network to send requests over.
    pub network: Arc<I::Network>,
    /// Shared consensus state, used to check whether we already hold a share.
    pub consensus: OuterConsensus<TYPES>,
    /// Last seen view; we won't request shares for views older than this.
    pub view: ViewNumber,
    /// Delay before requesting peers, giving the share a chance to arrive unprompted.
    pub delay: Duration,
    /// Membership coordinator, used to look up stake tables and DA committees per epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
    /// This node's public key.
    pub public_key: TYPES::SignatureKey,
    /// This node's private key, used to sign requests.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    /// The node's id, used for logging.
    pub id: u64,
    /// Flag set on shutdown so spawned request tasks stop themselves.
    pub shutdown_flag: Arc<AtomicBool>,
    /// Handles of spawned request tasks, keyed by the view they request for.
    pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
    /// Number of blocks in an epoch.
    pub epoch_height: u64,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
    fn drop(&mut self) {
        // Abort any in-flight request tasks when the state is torn down.
        self.cancel_subtasks();
    }
}

/// Alias for the signature type produced when signing a request.
type Signature<TYPES> =
    <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;

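// A sketch of the event flow this task implements (the event names are the
// `HotShotEvent` variants handled below):
//
//     QuorumProposalValidated   -- missing our VID share for the proposal's view?
//         -> VidRequestSend     -- ask DA committee members, one at a time
//     VidResponseRecv           -- a committee member answered; validate it, then
//         -> VidShareRecv       -- re-broadcast the share internally
//     ViewChange                -- advance `self.view` and garbage-collect tasks
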
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
    type Event = HotShotEvent<TYPES>;

    #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _) => {
                let prop_view = proposal.data.view_number();
                let prop_epoch = proposal.data.epoch();

                // Work out which epochs we need a share for: the proposal's
                // epoch if we have stake there, plus the next epoch when this
                // block sits on an epoch transition.
                let membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(prop_epoch)
                    .await?;
                let mut target_epochs = BTreeSet::new();
                if membership.has_stake(&self.public_key).await {
                    target_epochs.insert(prop_epoch);
                }
                if is_epoch_transition(
                    proposal.data.block_header().block_number(),
                    self.epoch_height,
                ) && membership
                    .next_epoch_stake_table()
                    .await?
                    .has_stake(&self.public_key)
                    .await
                {
                    target_epochs.insert(prop_epoch.map(|e| e + 1));
                }

                ensure!(
                    !target_epochs.is_empty(),
                    "We don't belong to the current epoch and we don't belong to the next epoch. \
                     Do not request VID share."
                );

                // Only spawn requests if the view is current and we are still
                // missing a share for at least one target epoch.
                let consensus_reader = self.consensus.read().await;
                let maybe_vid_share = consensus_reader
                    .vid_shares()
                    .get(&prop_view)
                    .and_then(|shares| shares.get(&self.public_key));
                if prop_view >= self.view
                    && (maybe_vid_share.is_none()
                        || !target_epochs
                            .iter()
                            .all(|e| maybe_vid_share.unwrap().contains_key(e)))
                {
                    drop(consensus_reader);
                    self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
                        .await;
                }
                Ok(())
            },
            HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
                let view = vid_proposal.data.view_number();
                let epoch = vid_proposal.data.epoch();

                // A response is only acceptable from a DA committee member
                // (or the leader) for the proposal's view.
                let membership_reader = self
                    .membership_coordinator
                    .membership_for_epoch(epoch)
                    .await?;
                let mut da_committee_for_view = membership_reader.da_committee_members(view).await;
                if let Ok(leader) = membership_reader.leader(view).await {
                    da_committee_for_view.insert(leader);
                }
                drop(membership_reader);

                ensure!(
                    self.spawned_tasks.contains_key(&view),
                    info!("Received VidResponseRecv for view we didn't expect, view {view:?}")
                );

                ensure!(
                    da_committee_for_view.contains(sender_key),
                    warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
                );

                ensure!(
                    sender_key.validate(
                        &vid_proposal.signature,
                        vid_proposal.data.payload_commitment_ref()
                    ),
                    warn!("Received VidResponseRecv with invalid signature")
                );

                tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        sender_key.clone(),
                        vid_proposal.clone(),
                    )),
                    sender,
                )
                .await;
                Ok(())
            },
            HotShotEvent::ViewChange(view, _) => {
                let view = *view;
                if view > self.view {
                    self.view = view;
                }
                // Drop handles of finished tasks for views we've moved past,
                // keeping entries that still have running tasks.
                self.spawned_tasks
                    .range_mut(..self.view)
                    .for_each(|(_, handles)| {
                        handles.retain(|handle| !handle.is_finished());
                    });
                self.spawned_tasks
                    .retain(|view, handles| view >= &self.view || !handles.is_empty());
                Ok(())
            },
            _ => Ok(()),
        }
    }

    fn cancel_subtasks(&mut self) {
        // Signal running tasks to stop, then abort them outright.
        self.shutdown_flag.store(true, Ordering::Relaxed);

        while !self.spawned_tasks.is_empty() {
            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
                break;
            };

            for handle in handles {
                handle.abort();
            }
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
    /// Sign a VID request for `view` and spawn the task that sends it out.
    async fn spawn_requests(
        &mut self,
        view: ViewNumber,
        prop_epoch: Option<EpochNumber>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        target_epochs: BTreeSet<Option<EpochNumber>>,
    ) {
        let request = RequestKind::Vid(view, self.public_key.clone());

        if let Some(signature) = self.serialize_and_sign(&request) {
            self.create_vid_request_task(
                request,
                signature,
                sender.clone(),
                view,
                prop_epoch,
                target_epochs,
            )
            .await;
        }
    }

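    // Note: each spawned task is recorded in `spawned_tasks` under its view,
    // which is what lets the `ViewChange` arm above garbage-collect stale
    // requests and `cancel_subtasks` abort everything on shutdown.
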
    /// Spawn a task that cycles through the (shuffled) DA committee, requesting
    /// our VID share from one member at a time until we hold shares for every
    /// target epoch, the view passes, or we shut down.
    async fn create_vid_request_task(
        &mut self,
        request: RequestKind<TYPES>,
        signature: Signature<TYPES>,
        sender: Sender<Arc<HotShotEvent<TYPES>>>,
        view: ViewNumber,
        prop_epoch: Option<EpochNumber>,
        mut target_epochs: BTreeSet<Option<EpochNumber>>,
    ) {
        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
        let network = Arc::clone(&self.network);
        let shutdown_flag = Arc::clone(&self.shutdown_flag);
        let delay = self.delay;
        let public_key = self.public_key.clone();

        let membership_reader = match self
            .membership_coordinator
            .membership_for_epoch(prop_epoch)
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(e.message);
                return;
            },
        };
        let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
            .da_committee_members(view)
            .await
            .into_iter()
            .collect();

        // Shuffle so requests spread evenly across the committee instead of
        // always hitting the same members first.
        recipients.shuffle(&mut thread_rng());

        let data_request = DataRequest::<TYPES> {
            request,
            view,
            signature,
        };
        let my_id = self.id;
        let handle: JoinHandle<()> = spawn(async move {
            // If the primary network is down, request immediately; otherwise
            // wait a bit, since the share may still arrive unprompted.
            if !network.is_primary_down() {
                sleep(delay).await;
            }

            let mut recipients_it = recipients.iter();
            while !Self::cancel_vid_request_task(
                &consensus,
                &public_key,
                &view,
                &shutdown_flag,
                my_id,
                &mut target_epochs,
            )
            .await
            {
                if let Some(recipient) = recipients_it.next() {
                    // Don't request from ourselves.
                    if *recipient == public_key {
                        continue;
                    }
                    tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
                    broadcast_event(
                        HotShotEvent::VidRequestSend(
                            data_request.clone(),
                            public_key.clone(),
                            recipient.clone(),
                        )
                        .into(),
                        &sender,
                    )
                    .await;
                    sleep(REQUEST_TIMEOUT).await;
                } else {
                    tracing::warn!(
                        "Sent VID request to all available DA members and got no response for \
                         view: {view:?}, my id: {my_id:?}"
                    );
                    return;
                }
            }
        });
        self.spawned_tasks.entry(view).or_default().push(handle);
    }

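    // The loop above re-checks `cancel_vid_request_task` before each send; the
    // task winds down as soon as (a) shutdown was signalled, (b) consensus has
    // moved past `view`, or (c) shares for every remaining target epoch arrived.
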
    /// Check whether the request task for `view` should stop. As a side effect,
    /// removes from `target_epochs` any epoch whose share we already hold.
    async fn cancel_vid_request_task(
        consensus: &OuterConsensus<TYPES>,
        public_key: &<TYPES as NodeType>::SignatureKey,
        view: &ViewNumber,
        shutdown_flag: &Arc<AtomicBool>,
        id: u64,
        target_epochs: &mut BTreeSet<Option<EpochNumber>>,
    ) -> bool {
        let consensus_reader = consensus.read().await;

        let maybe_vid_shares = consensus_reader
            .vid_shares()
            .get(view)
            .and_then(|key_map| key_map.get(public_key));
        if let Some(vid_shares) = maybe_vid_shares {
            tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
            for vid_share in vid_shares.values() {
                target_epochs.remove(&vid_share.data.target_epoch());
            }
        }
        let cancel = shutdown_flag.load(Ordering::Relaxed)
            || consensus_reader.cur_view() > *view
            || target_epochs.is_empty();
        if cancel {
            tracing::debug!(
                "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
                view,
                consensus_reader.cur_view(),
                id,
            );
        }
        cancel
    }

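    // The responder side (not in this file) is expected to re-derive the same
    // digest and check the signature; a minimal sketch of that check, assuming
    // the responder holds the deserialized `request` and the requester's key:
    //
    //     let data = bincode::serialize(&request).unwrap();
    //     let valid = requester_key.validate(&signature, &Sha256::digest(data));
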
    /// Serialize `request` with bincode and sign the SHA-256 digest of the bytes.
    /// Returns `None` (after logging) if serialization or signing fails.
    fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
        let Ok(data) = bincode::serialize(&request) else {
            tracing::error!("Failed to serialize request!");
            return None;
        };
        let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
        else {
            tracing::error!("Failed to sign Data Request");
            return None;
        };
        Some(signature)
    }
}