// hotshot_task_impls/response.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{sync::Arc, time::Duration};

use async_broadcast::{Receiver, Sender};
use committable::Committable;
use hotshot_types::{
    consensus::{Consensus, LockedConsensusState, OuterConsensus},
    data::{EpochNumber, VidDisperseShare, ViewNumber},
    epoch_membership::EpochMembershipCoordinator,
    message::{Proposal, UpgradeLock},
    traits::{network::DataRequest, node_implementation::NodeType, signature_key::SignatureKey},
    utils::{View, ViewInner},
};
use sha2::{Digest, Sha256};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::instrument;

use crate::{events::HotShotEvent, helpers::broadcast_event};
/// Time to wait for txns before sending `ResponseMessage::NotFound`.
/// Used as the back-off before retrying a failed VID calculation, in the
/// hope that the missing payload/transactions arrive in the meantime.
const TXNS_TIMEOUT: Duration = Duration::from_millis(100);
26
/// Task state for the Network Response Task. The task is responsible for handling
/// requests sent to this node by the network.  It will validate the sender,
/// parse the request, and try to find the data request in the consensus stores.
pub struct NetworkResponseState<TYPES: NodeType> {
    /// Locked consensus state, read to look up stored VID shares and proposals
    consensus: LockedConsensusState<TYPES>,

    /// Membership coordinator, used to check that requesters have stake in the
    /// epoch they are requesting data for
    membership: EpochMembershipCoordinator<TYPES>,

    /// This replica's public key (attached to outgoing VID responses)
    pub_key: TYPES::SignatureKey,

    /// This replica's private key (used when calculating VID shares)
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The node's id; only used for tracing instrumentation
    id: u64,

    /// Lock for a decided upgrade
    upgrade_lock: UpgradeLock<TYPES>,
}
49
50impl<TYPES: NodeType> NetworkResponseState<TYPES> {
51    /// Create the network request state with the info it needs
52    pub fn new(
53        consensus: LockedConsensusState<TYPES>,
54        membership: EpochMembershipCoordinator<TYPES>,
55        pub_key: TYPES::SignatureKey,
56        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
57        id: u64,
58        upgrade_lock: UpgradeLock<TYPES>,
59    ) -> Self {
60        Self {
61            consensus,
62            membership,
63            pub_key,
64            private_key,
65            id,
66            upgrade_lock,
67        }
68    }
69
70    /// Process request events or loop until a `HotShotEvent::Shutdown` is received.
71    #[instrument(skip_all, fields(id = self.id), name = "NetworkResponseState")]
72    async fn run_response_loop(
73        self,
74        mut receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
75        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
76    ) {
77        loop {
78            match receiver.recv_direct().await {
79                Ok(event) => {
80                    // break loop when false, this means shutdown received
81                    match event.as_ref() {
82                        HotShotEvent::VidRequestRecv(request, sender) => {
83                            // Verify request is valid
84                            if !valid_signature::<TYPES>(request, sender) {
85                                continue;
86                            }
87                            for vid_share in self.get_or_calc_vid_share(request.view, sender).await
88                            {
89                                tracing::debug!("Sending VID response {vid_share:?}");
90                                broadcast_event(
91                                    HotShotEvent::VidResponseSend(
92                                        self.pub_key.clone(),
93                                        sender.clone(),
94                                        vid_share,
95                                    )
96                                    .into(),
97                                    &event_sender,
98                                )
99                                .await;
100                            }
101                        },
102                        HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
103                            // Make sure that this request came from who we think it did
104                            if !req.key.validate(signature, req.commit().as_ref()) {
105                                tracing::warn!("Invalid signature key on proposal request.");
106                                return;
107                            }
108
109                            let quorum_proposal_result = self
110                                .consensus
111                                .read()
112                                .await
113                                .last_proposals()
114                                .get(&req.view_number)
115                                .cloned();
116                            if let Some(quorum_proposal) = quorum_proposal_result {
117                                broadcast_event(
118                                    HotShotEvent::QuorumProposalResponseSend(
119                                        req.key.clone(),
120                                        quorum_proposal,
121                                    )
122                                    .into(),
123                                    &event_sender,
124                                )
125                                .await;
126                            }
127                        },
128                        HotShotEvent::Shutdown => {
129                            return;
130                        },
131                        _ => {},
132                    }
133                },
134                Err(e) => {
135                    tracing::error!("Failed to receive event: {e:?}");
136                },
137            }
138        }
139    }
140
141    /// Get the VID share from consensus storage, or calculate it from the payload for
142    /// the view, if we have the payload.  Stores all the shares calculated from the payload
143    /// if the calculation was done
144    async fn get_or_calc_vid_share(
145        &self,
146        view: ViewNumber,
147        sender: &TYPES::SignatureKey,
148    ) -> Vec<Proposal<TYPES, VidDisperseShare<TYPES>>> {
149        let consensus_reader = self.consensus.read().await;
150        let cur_epoch = consensus_reader.cur_epoch();
151        let next_epoch = cur_epoch.map(|epoch| epoch + 1);
152        let is_transition_block = match consensus_reader.validated_state_map().get(&view) {
153            Some(View {
154                view_inner:
155                    ViewInner::Leaf {
156                        leaf: leaf_commit, ..
157                    },
158            }) => consensus_reader.is_epoch_transition(*leaf_commit),
159            _ => false,
160        };
161        drop(consensus_reader);
162
163        // Epochs for which vid shares are required
164        let mut target_epochs = vec![];
165        if self.valid_sender(sender, cur_epoch).await {
166            // The sender belongs to the current epoch.
167            target_epochs.push(cur_epoch);
168        }
169        if is_transition_block && self.valid_sender(sender, next_epoch).await {
170            // It's the last block in epoch and the sender belongs to the next epoch.
171            target_epochs.push(next_epoch);
172        }
173
174        // Vector of vid shares that we return
175        let mut res = vec![];
176        // Epochs for which vid shares need to be calculated
177        let mut calc_target_epochs = vec![];
178        for target_epoch in target_epochs {
179            if let Some(vid_share) = self
180                .consensus
181                .read()
182                .await
183                .vid_shares()
184                .get(&view)
185                .and_then(|key_map| key_map.get(sender))
186                .and_then(|epoch_map| epoch_map.get(&target_epoch))
187            {
188                res.push(vid_share.clone());
189            } else {
190                calc_target_epochs.push(target_epoch);
191            }
192        }
193
194        // We have all the required vid shares, return them
195        if calc_target_epochs.is_empty() {
196            return res;
197        }
198
199        for target_epoch in calc_target_epochs {
200            if Consensus::calculate_and_update_vid(
201                OuterConsensus::new(Arc::clone(&self.consensus)),
202                view,
203                target_epoch,
204                self.membership.clone(),
205                &self.private_key,
206                &self.upgrade_lock,
207            )
208            .await
209            .is_none()
210            {
211                // Sleep in hope we receive txns in the meantime
212                sleep(TXNS_TIMEOUT).await;
213                Consensus::calculate_and_update_vid(
214                    OuterConsensus::new(Arc::clone(&self.consensus)),
215                    view,
216                    target_epoch,
217                    self.membership.clone(),
218                    &self.private_key,
219                    &self.upgrade_lock,
220                )
221                .await;
222            }
223            if let Some(vid_share) = self
224                .consensus
225                .read()
226                .await
227                .vid_shares()
228                .get(&view)
229                .and_then(|key_map| key_map.get(sender))
230                .and_then(|epoch_map| epoch_map.get(&target_epoch))
231            {
232                res.push(vid_share.clone());
233            }
234        }
235        res
236    }
237
238    /// Makes sure the sender is allowed to send a request in the given epoch.
239    async fn valid_sender(&self, sender: &TYPES::SignatureKey, epoch: Option<EpochNumber>) -> bool {
240        let Ok(memb) = self.membership.stake_table_for_epoch(epoch).await else {
241            return false;
242        };
243        memb.has_stake(sender).await
244    }
245}
246
247/// Check the signature
248fn valid_signature<TYPES: NodeType>(
249    req: &DataRequest<TYPES>,
250    sender: &TYPES::SignatureKey,
251) -> bool {
252    let Ok(data) = bincode::serialize(&req.request) else {
253        return false;
254    };
255    sender.validate(&req.signature, &Sha256::digest(data))
256}
257
258/// Spawn the network response task to handle incoming request for data
259/// from other nodes.  It will shutdown when it gets `HotshotEvent::Shutdown`
260/// on the `event_stream` arg.
261pub fn run_response_task<TYPES: NodeType>(
262    task_state: NetworkResponseState<TYPES>,
263    event_stream: Receiver<Arc<HotShotEvent<TYPES>>>,
264    sender: Sender<Arc<HotShotEvent<TYPES>>>,
265) -> JoinHandle<()> {
266    spawn(task_state.run_response_loop(event_stream, sender))
267}