1use std::{sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use committable::Committable;
11use hotshot_types::{
12 consensus::{Consensus, LockedConsensusState, OuterConsensus},
13 data::{EpochNumber, VidDisperseShare, ViewNumber},
14 epoch_membership::EpochMembershipCoordinator,
15 message::{Proposal, UpgradeLock},
16 traits::{network::DataRequest, node_implementation::NodeType, signature_key::SignatureKey},
17 utils::{View, ViewInner},
18};
19use sha2::{Digest, Sha256};
20use tokio::{spawn, task::JoinHandle, time::sleep};
21use tracing::instrument;
22
23use crate::{events::HotShotEvent, helpers::broadcast_event};
/// Pause inserted before the single retry of VID share calculation in
/// `get_or_calc_vid_share` when the first attempt produces nothing.
///
/// NOTE(review): the name suggests this waits for transaction/payload data to
/// arrive — confirm against the VID calculation code.
const TXNS_TIMEOUT: Duration = Duration::from_millis(100);
26
/// State owned by the task that answers data requests arriving on the event
/// stream: VID share requests and quorum proposal requests.
pub struct NetworkResponseState<TYPES: NodeType> {
    /// Locked consensus state; read to look up stored proposals and VID shares,
    /// and handed to VID calculation when a share is missing.
    consensus: LockedConsensusState<TYPES>,

    /// Membership coordinator used to fetch the stake table for an epoch and
    /// passed along to VID calculation.
    membership: EpochMembershipCoordinator<TYPES>,

    /// Public key attached to outgoing `VidResponseSend` events
    /// (presumably this node's own key — confirm with the caller).
    pub_key: TYPES::SignatureKey,

    /// Private key passed to `Consensus::calculate_and_update_vid`.
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Identifier recorded on the tracing span of the response loop.
    id: u64,

    /// Upgrade lock passed to `Consensus::calculate_and_update_vid`.
    upgrade_lock: UpgradeLock<TYPES>,
}
49
50impl<TYPES: NodeType> NetworkResponseState<TYPES> {
51 pub fn new(
53 consensus: LockedConsensusState<TYPES>,
54 membership: EpochMembershipCoordinator<TYPES>,
55 pub_key: TYPES::SignatureKey,
56 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
57 id: u64,
58 upgrade_lock: UpgradeLock<TYPES>,
59 ) -> Self {
60 Self {
61 consensus,
62 membership,
63 pub_key,
64 private_key,
65 id,
66 upgrade_lock,
67 }
68 }
69
70 #[instrument(skip_all, fields(id = self.id), name = "NetworkResponseState")]
72 async fn run_response_loop(
73 self,
74 mut receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
75 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
76 ) {
77 loop {
78 match receiver.recv_direct().await {
79 Ok(event) => {
80 match event.as_ref() {
82 HotShotEvent::VidRequestRecv(request, sender) => {
83 if !valid_signature::<TYPES>(request, sender) {
85 continue;
86 }
87 for vid_share in self.get_or_calc_vid_share(request.view, sender).await
88 {
89 tracing::debug!("Sending VID response {vid_share:?}");
90 broadcast_event(
91 HotShotEvent::VidResponseSend(
92 self.pub_key.clone(),
93 sender.clone(),
94 vid_share,
95 )
96 .into(),
97 &event_sender,
98 )
99 .await;
100 }
101 },
102 HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
103 if !req.key.validate(signature, req.commit().as_ref()) {
105 tracing::warn!("Invalid signature key on proposal request.");
106 return;
107 }
108
109 let quorum_proposal_result = self
110 .consensus
111 .read()
112 .await
113 .last_proposals()
114 .get(&req.view_number)
115 .cloned();
116 if let Some(quorum_proposal) = quorum_proposal_result {
117 broadcast_event(
118 HotShotEvent::QuorumProposalResponseSend(
119 req.key.clone(),
120 quorum_proposal,
121 )
122 .into(),
123 &event_sender,
124 )
125 .await;
126 }
127 },
128 HotShotEvent::Shutdown => {
129 return;
130 },
131 _ => {},
132 }
133 },
134 Err(e) => {
135 tracing::error!("Failed to receive event: {e:?}");
136 },
137 }
138 }
139 }
140
141 async fn get_or_calc_vid_share(
145 &self,
146 view: ViewNumber,
147 sender: &TYPES::SignatureKey,
148 ) -> Vec<Proposal<TYPES, VidDisperseShare<TYPES>>> {
149 let consensus_reader = self.consensus.read().await;
150 let cur_epoch = consensus_reader.cur_epoch();
151 let next_epoch = cur_epoch.map(|epoch| epoch + 1);
152 let is_transition_block = match consensus_reader.validated_state_map().get(&view) {
153 Some(View {
154 view_inner:
155 ViewInner::Leaf {
156 leaf: leaf_commit, ..
157 },
158 }) => consensus_reader.is_epoch_transition(*leaf_commit),
159 _ => false,
160 };
161 drop(consensus_reader);
162
163 let mut target_epochs = vec![];
165 if self.valid_sender(sender, cur_epoch).await {
166 target_epochs.push(cur_epoch);
168 }
169 if is_transition_block && self.valid_sender(sender, next_epoch).await {
170 target_epochs.push(next_epoch);
172 }
173
174 let mut res = vec![];
176 let mut calc_target_epochs = vec![];
178 for target_epoch in target_epochs {
179 if let Some(vid_share) = self
180 .consensus
181 .read()
182 .await
183 .vid_shares()
184 .get(&view)
185 .and_then(|key_map| key_map.get(sender))
186 .and_then(|epoch_map| epoch_map.get(&target_epoch))
187 {
188 res.push(vid_share.clone());
189 } else {
190 calc_target_epochs.push(target_epoch);
191 }
192 }
193
194 if calc_target_epochs.is_empty() {
196 return res;
197 }
198
199 for target_epoch in calc_target_epochs {
200 if Consensus::calculate_and_update_vid(
201 OuterConsensus::new(Arc::clone(&self.consensus)),
202 view,
203 target_epoch,
204 self.membership.clone(),
205 &self.private_key,
206 &self.upgrade_lock,
207 )
208 .await
209 .is_none()
210 {
211 sleep(TXNS_TIMEOUT).await;
213 Consensus::calculate_and_update_vid(
214 OuterConsensus::new(Arc::clone(&self.consensus)),
215 view,
216 target_epoch,
217 self.membership.clone(),
218 &self.private_key,
219 &self.upgrade_lock,
220 )
221 .await;
222 }
223 if let Some(vid_share) = self
224 .consensus
225 .read()
226 .await
227 .vid_shares()
228 .get(&view)
229 .and_then(|key_map| key_map.get(sender))
230 .and_then(|epoch_map| epoch_map.get(&target_epoch))
231 {
232 res.push(vid_share.clone());
233 }
234 }
235 res
236 }
237
238 async fn valid_sender(&self, sender: &TYPES::SignatureKey, epoch: Option<EpochNumber>) -> bool {
240 let Ok(memb) = self.membership.stake_table_for_epoch(epoch).await else {
241 return false;
242 };
243 memb.has_stake(sender).await
244 }
245}
246
247fn valid_signature<TYPES: NodeType>(
249 req: &DataRequest<TYPES>,
250 sender: &TYPES::SignatureKey,
251) -> bool {
252 let Ok(data) = bincode::serialize(&req.request) else {
253 return false;
254 };
255 sender.validate(&req.signature, &Sha256::digest(data))
256}
257
258pub fn run_response_task<TYPES: NodeType>(
262 task_state: NetworkResponseState<TYPES>,
263 event_stream: Receiver<Arc<HotShotEvent<TYPES>>>,
264 sender: Sender<Arc<HotShotEvent<TYPES>>>,
265) -> JoinHandle<()> {
266 spawn(task_state.run_response_loop(event_stream, sender))
267}