1use std::{sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use committable::Committable;
11use hotshot_types::{
12 consensus::{Consensus, LockedConsensusState, OuterConsensus},
13 data::VidDisperseShare,
14 epoch_membership::EpochMembershipCoordinator,
15 message::{Proposal, UpgradeLock},
16 traits::{
17 network::DataRequest,
18 node_implementation::{NodeType, Versions},
19 signature_key::SignatureKey,
20 },
21 utils::{View, ViewInner},
22};
23use sha2::{Digest, Sha256};
24use tokio::{spawn, task::JoinHandle, time::sleep};
25use tracing::instrument;
26
27use crate::{events::HotShotEvent, helpers::broadcast_event};
/// Pause between a failed VID calculation and the single retry performed in
/// `get_or_calc_vid_share`.
// NOTE(review): the name suggests the pause gives pending transactions time to
// arrive before the retry — confirm against the VID calculation's requirements.
const TXNS_TIMEOUT: Duration = Duration::from_millis(100);
30
/// State for the task that answers VID share requests and quorum proposal
/// requests received from other nodes.
pub struct NetworkResponseState<TYPES: NodeType, V: Versions> {
    /// Shared consensus state; read to look up stored proposals, VID shares and
    /// the validated state map.
    consensus: LockedConsensusState<TYPES>,

    /// Membership coordinator; used to fetch the stake table for an epoch when
    /// checking whether a requester has stake, and handed to the VID calculation.
    membership: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; sent as the first field of every VID response event.
    pub_key: TYPES::SignatureKey,

    /// This node's private key; handed to the VID calculation.
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Node id; recorded as the `id` field on the response loop's tracing span.
    id: u64,

    /// Upgrade lock; handed to the VID calculation.
    upgrade_lock: UpgradeLock<TYPES, V>,
}
53
54impl<TYPES: NodeType, V: Versions> NetworkResponseState<TYPES, V> {
55 pub fn new(
57 consensus: LockedConsensusState<TYPES>,
58 membership: EpochMembershipCoordinator<TYPES>,
59 pub_key: TYPES::SignatureKey,
60 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
61 id: u64,
62 upgrade_lock: UpgradeLock<TYPES, V>,
63 ) -> Self {
64 Self {
65 consensus,
66 membership,
67 pub_key,
68 private_key,
69 id,
70 upgrade_lock,
71 }
72 }
73
74 #[instrument(skip_all, fields(id = self.id), name = "NetworkResponseState")]
76 async fn run_response_loop(
77 self,
78 mut receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
79 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
80 ) {
81 loop {
82 match receiver.recv_direct().await {
83 Ok(event) => {
84 match event.as_ref() {
86 HotShotEvent::VidRequestRecv(request, sender) => {
87 if !valid_signature::<TYPES>(request, sender) {
89 continue;
90 }
91 for vid_share in self.get_or_calc_vid_share(request.view, sender).await
92 {
93 tracing::debug!("Sending VID response {vid_share:?}");
94 broadcast_event(
95 HotShotEvent::VidResponseSend(
96 self.pub_key.clone(),
97 sender.clone(),
98 vid_share,
99 )
100 .into(),
101 &event_sender,
102 )
103 .await;
104 }
105 },
106 HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
107 if !req.key.validate(signature, req.commit().as_ref()) {
109 tracing::warn!("Invalid signature key on proposal request.");
110 return;
111 }
112
113 let quorum_proposal_result = self
114 .consensus
115 .read()
116 .await
117 .last_proposals()
118 .get(&req.view_number)
119 .cloned();
120 if let Some(quorum_proposal) = quorum_proposal_result {
121 broadcast_event(
122 HotShotEvent::QuorumProposalResponseSend(
123 req.key.clone(),
124 quorum_proposal,
125 )
126 .into(),
127 &event_sender,
128 )
129 .await;
130 }
131 },
132 HotShotEvent::Shutdown => {
133 return;
134 },
135 _ => {},
136 }
137 },
138 Err(e) => {
139 tracing::error!("Failed to receive event: {e:?}");
140 },
141 }
142 }
143 }
144
145 async fn get_or_calc_vid_share(
149 &self,
150 view: TYPES::View,
151 sender: &TYPES::SignatureKey,
152 ) -> Vec<Proposal<TYPES, VidDisperseShare<TYPES>>> {
153 let consensus_reader = self.consensus.read().await;
154 let cur_epoch = consensus_reader.cur_epoch();
155 let next_epoch = cur_epoch.map(|epoch| epoch + 1);
156 let is_transition_block = match consensus_reader.validated_state_map().get(&view) {
157 Some(View {
158 view_inner:
159 ViewInner::Leaf {
160 leaf: leaf_commit, ..
161 },
162 }) => consensus_reader.is_epoch_transition(*leaf_commit),
163 _ => false,
164 };
165 drop(consensus_reader);
166
167 let mut target_epochs = vec![];
169 if self.valid_sender(sender, cur_epoch).await {
170 target_epochs.push(cur_epoch);
172 }
173 if is_transition_block && self.valid_sender(sender, next_epoch).await {
174 target_epochs.push(next_epoch);
176 }
177
178 let mut res = vec![];
180 let mut calc_target_epochs = vec![];
182 for target_epoch in target_epochs {
183 if let Some(vid_share) = self
184 .consensus
185 .read()
186 .await
187 .vid_shares()
188 .get(&view)
189 .and_then(|key_map| key_map.get(sender))
190 .and_then(|epoch_map| epoch_map.get(&target_epoch))
191 {
192 res.push(vid_share.clone());
193 } else {
194 calc_target_epochs.push(target_epoch);
195 }
196 }
197
198 if calc_target_epochs.is_empty() {
200 return res;
201 }
202
203 for target_epoch in calc_target_epochs {
204 if Consensus::calculate_and_update_vid::<V>(
205 OuterConsensus::new(Arc::clone(&self.consensus)),
206 view,
207 target_epoch,
208 self.membership.clone(),
209 &self.private_key,
210 &self.upgrade_lock,
211 )
212 .await
213 .is_none()
214 {
215 sleep(TXNS_TIMEOUT).await;
217 Consensus::calculate_and_update_vid::<V>(
218 OuterConsensus::new(Arc::clone(&self.consensus)),
219 view,
220 target_epoch,
221 self.membership.clone(),
222 &self.private_key,
223 &self.upgrade_lock,
224 )
225 .await;
226 }
227 if let Some(vid_share) = self
228 .consensus
229 .read()
230 .await
231 .vid_shares()
232 .get(&view)
233 .and_then(|key_map| key_map.get(sender))
234 .and_then(|epoch_map| epoch_map.get(&target_epoch))
235 {
236 res.push(vid_share.clone());
237 }
238 }
239 res
240 }
241
242 async fn valid_sender(
244 &self,
245 sender: &TYPES::SignatureKey,
246 epoch: Option<TYPES::Epoch>,
247 ) -> bool {
248 let Ok(memb) = self.membership.stake_table_for_epoch(epoch).await else {
249 return false;
250 };
251 memb.has_stake(sender).await
252 }
253}
254
255fn valid_signature<TYPES: NodeType>(
257 req: &DataRequest<TYPES>,
258 sender: &TYPES::SignatureKey,
259) -> bool {
260 let Ok(data) = bincode::serialize(&req.request) else {
261 return false;
262 };
263 sender.validate(&req.signature, &Sha256::digest(data))
264}
265
266pub fn run_response_task<TYPES: NodeType, V: Versions>(
270 task_state: NetworkResponseState<TYPES, V>,
271 event_stream: Receiver<Arc<HotShotEvent<TYPES>>>,
272 sender: Sender<Arc<HotShotEvent<TYPES>>>,
273) -> JoinHandle<()> {
274 spawn(task_state.run_response_loop(event_stream, sender))
275}