use std::{
    collections::{BTreeMap, BTreeSet},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};

use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
    consensus::OuterConsensus,
    epoch_membership::EpochMembershipCoordinator,
    simple_vote::HasEpoch,
    traits::{
        block_contents::BlockHeader,
        network::{ConnectedNetwork, DataRequest, RequestKind},
        node_implementation::{NodeImplementation, NodeType},
        signature_key::SignatureKey,
    },
    utils::is_epoch_transition,
    vote::HasViewNumber,
};
use hotshot_utils::anytrace::*;
use rand::{seq::SliceRandom, thread_rng};
use sha2::{Digest, Sha256};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::instrument;

use crate::{events::HotShotEvent, helpers::broadcast_event};

/// Amount of time to wait for a response before trying the next recipient.
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);

/// Long-running task state that requests VID shares for validated proposals from
/// DA committee members until the shares arrive or the view moves on.
pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Network to send requests over.
    pub network: Arc<I::Network>,

    /// Shared consensus state, checked to see whether we already hold the data
    /// before (and while) requesting it.
    pub consensus: OuterConsensus<TYPES>,

    /// Last seen view; we won't request data for proposals older than this.
    pub view: TYPES::View,

    /// How long to wait before requesting from peers.
    pub delay: Duration,

    /// Membership coordinator, used here to look up the DA committee.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key, used to sign requests.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The node's id.
    pub id: u64,

    /// Flag set on shutdown so that spawned request tasks stop.
    pub shutdown_flag: Arc<AtomicBool>,

    /// Handles of spawned request tasks, keyed by the view they request for.
    pub spawned_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,

    /// Number of blocks in an epoch; zero means epochs are disabled.
    pub epoch_height: u64,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
    fn drop(&mut self) {
        self.cancel_subtasks();
    }
}

/// Alias for the assembled signature type of a node's `SignatureKey`.
type Signature<TYPES> =
    <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
    type Event = HotShotEvent<TYPES>;

    #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _) => {
                let prop_view = proposal.data.view_number();
                let prop_epoch = proposal.data.epoch();

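                // Work out which epochs we need a VID share for: the proposal's own
                // epoch if we are staked in it, and additionally the next epoch when
                // this is an epoch-transition block and we are staked in the next
                // stake table.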
                let membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(prop_epoch)
                    .await?;
                let mut target_epochs = BTreeSet::new();
                if membership.has_stake(&self.public_key).await {
                    target_epochs.insert(prop_epoch);
                }
                if is_epoch_transition(
                    proposal.data.block_header().block_number(),
                    self.epoch_height,
                ) && membership
                    .next_epoch_stake_table()
                    .await?
                    .has_stake(&self.public_key)
                    .await
                {
                    target_epochs.insert(prop_epoch.map(|e| e + 1));
                }

                ensure!(
                    !target_epochs.is_empty(),
                    "We belong to neither the current epoch nor the next epoch; not \
                     requesting a VID share."
                );

                let consensus_reader = self.consensus.read().await;
                let maybe_vid_share = consensus_reader
                    .vid_shares()
                    .get(&prop_view)
                    .and_then(|shares| shares.get(&self.public_key));
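                // Only spawn requests if the proposal is not stale and we are still
                // missing a share for at least one target epoch.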
                if prop_view >= self.view
                    && (maybe_vid_share.is_none()
                        || !target_epochs
                            .iter()
                            .all(|e| maybe_vid_share.unwrap().contains_key(e)))
                {
                    drop(consensus_reader);
                    self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
                        .await;
                }
                Ok(())
            },
            HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
                let view = vid_proposal.data.view_number();
                let epoch = vid_proposal.data.epoch();

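                // Build the set of senders we will accept a response from: the DA
                // committee for this view, plus the view leader.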
                let membership_reader = self
                    .membership_coordinator
                    .membership_for_epoch(epoch)
                    .await?;
                let mut da_committee_for_view = membership_reader.da_committee_members(view).await;
                if let Ok(leader) = membership_reader.leader(view).await {
                    da_committee_for_view.insert(leader);
                }
                drop(membership_reader);

                ensure!(
                    self.spawned_tasks.contains_key(&view),
                    info!("Received VidResponseRecv for a view we didn't expect, view {view:?}")
                );

                ensure!(
                    da_committee_for_view.contains(sender_key),
                    warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
                );

                ensure!(
                    sender_key.validate(
                        &vid_proposal.signature,
                        vid_proposal.data.payload_commitment_ref()
                    ),
                    warn!("Received VidResponseRecv with invalid signature")
                );

                tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        sender_key.clone(),
                        vid_proposal.clone(),
                    )),
                    sender,
                )
                .await;
                Ok(())
            },
            HotShotEvent::ViewChange(view, _) => {
                let view = *view;
                if view > self.view {
                    self.view = view;
                }
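                // Prune finished handles for views before the current one, then drop
                // entries that are both stale and empty.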
                self.spawned_tasks
                    .range_mut(..self.view)
                    .for_each(|(_, handles)| {
                        handles.retain(|handle| !handle.is_finished());
                    });
                self.spawned_tasks
                    .retain(|view, handles| view >= &self.view || !handles.is_empty());
                Ok(())
            },
            _ => Ok(()),
        }
    }

    fn cancel_subtasks(&mut self) {
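        // Signal spawned request tasks to stop, then abort any still running.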
        self.shutdown_flag.store(true, Ordering::Relaxed);

        while !self.spawned_tasks.is_empty() {
            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
                break;
            };

            for handle in handles {
                handle.abort();
            }
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
    /// Sign a VID request for `view` and spawn a task that sends it to the DA
    /// committee.
    async fn spawn_requests(
        &mut self,
        view: TYPES::View,
        prop_epoch: Option<TYPES::Epoch>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        target_epochs: BTreeSet<Option<TYPES::Epoch>>,
    ) {
        let request = RequestKind::Vid(view, self.public_key.clone());

        if let Some(signature) = self.serialize_and_sign(&request) {
            self.create_vid_request_task(
                request,
                signature,
                sender.clone(),
                view,
                prop_epoch,
                target_epochs,
            )
            .await;
        }
    }

    /// Spawn a task that sends the VID request to one DA committee member at a
    /// time, in random order, until the share arrives, the view passes, or every
    /// recipient has been tried.
    async fn create_vid_request_task(
        &mut self,
        request: RequestKind<TYPES>,
        signature: Signature<TYPES>,
        sender: Sender<Arc<HotShotEvent<TYPES>>>,
        view: TYPES::View,
        prop_epoch: Option<TYPES::Epoch>,
        mut target_epochs: BTreeSet<Option<TYPES::Epoch>>,
    ) {
        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
        let network = Arc::clone(&self.network);
        let shutdown_flag = Arc::clone(&self.shutdown_flag);
        let delay = self.delay;
        let public_key = self.public_key.clone();

        let membership_reader = match self
            .membership_coordinator
            .membership_for_epoch(prop_epoch)
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(e.message);
                return;
            },
        };
        let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
            .da_committee_members(view)
            .await
            .into_iter()
            .collect();

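        // Randomize the recipient order so the request load is spread across the
        // DA committee rather than always hitting the same members first.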
        recipients.shuffle(&mut thread_rng());

        let data_request = DataRequest::<TYPES> {
            request,
            view,
            signature,
        };
        let my_id = self.id;
        let handle: JoinHandle<()> = spawn(async move {
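            // If the primary network is up, wait `delay` first: the share usually
            // arrives on its own through the normal DA path.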
            if !network.is_primary_down() {
                sleep(delay).await;
            }

            let mut recipients_it = recipients.iter();
            while !Self::cancel_vid_request_task(
                &consensus,
                &sender,
                &public_key,
                &view,
                &shutdown_flag,
                my_id,
                &mut target_epochs,
            )
            .await
            {
                if let Some(recipient) = recipients_it.next() {
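                    // No point requesting from ourselves; move on to the next
                    // recipient.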
                    if *recipient == public_key {
                        continue;
                    }
                    tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
                    broadcast_event(
                        HotShotEvent::VidRequestSend(
                            data_request.clone(),
                            public_key.clone(),
                            recipient.clone(),
                        )
                        .into(),
                        &sender,
                    )
                    .await;
                    sleep(REQUEST_TIMEOUT).await;
                } else {
                    tracing::warn!(
                        "Sent VID request to all available DA members and got no response for \
                         view: {view:?}, my id: {my_id:?}"
                    );
                    return;
                }
            }
        });
        self.spawned_tasks.entry(view).or_default().push(handle);
    }

    /// Check whether the request task for `view` should stop. If we already hold
    /// shares for the view, rebroadcast them and remove their epochs from
    /// `target_epochs`. Returns `true` once shutdown is signalled, the view has
    /// passed, or every target epoch is satisfied.
    async fn cancel_vid_request_task(
        consensus: &OuterConsensus<TYPES>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        public_key: &<TYPES as NodeType>::SignatureKey,
        view: &TYPES::View,
        shutdown_flag: &Arc<AtomicBool>,
        id: u64,
        target_epochs: &mut BTreeSet<Option<TYPES::Epoch>>,
    ) -> bool {
        let consensus_reader = consensus.read().await;

        let maybe_vid_shares = consensus_reader
            .vid_shares()
            .get(view)
            .and_then(|key_map| key_map.get(public_key));
        if let Some(vid_shares) = maybe_vid_shares {
            tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
            for vid_share in vid_shares.values() {
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        public_key.clone(),
                        vid_share.clone(),
                    )),
                    sender,
                )
                .await;
                target_epochs.remove(&vid_share.data.target_epoch());
            }
        }
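        // Stop once shutdown was requested, the view has moved past ours, or every
        // target epoch now has a share.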
        let cancel = shutdown_flag.load(Ordering::Relaxed)
            || consensus_reader.cur_view() > *view
            || target_epochs.is_empty();
        if cancel {
            tracing::debug!(
                "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
                view,
                consensus_reader.cur_view(),
                id,
            );
        }
        cancel
    }

    /// Serialize the request with `bincode` and sign the SHA-256 digest of the
    /// serialized bytes.
    fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
        let Ok(data) = bincode::serialize(&request) else {
            tracing::error!("Failed to serialize request!");
            return None;
        };
        let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
        else {
            tracing::error!("Failed to sign Data Request");
            return None;
        };
        Some(signature)
    }
}