hotshot_task_impls/
response.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use committable::Committable;
11use hotshot_types::{
12    consensus::{Consensus, LockedConsensusState, OuterConsensus},
13    data::VidDisperseShare,
14    epoch_membership::EpochMembershipCoordinator,
15    message::{Proposal, UpgradeLock},
16    traits::{
17        network::DataRequest,
18        node_implementation::{NodeType, Versions},
19        signature_key::SignatureKey,
20    },
21    utils::{View, ViewInner},
22};
23use sha2::{Digest, Sha256};
24use tokio::{spawn, task::JoinHandle, time::sleep};
25use tracing::instrument;
26
27use crate::{events::HotShotEvent, helpers::broadcast_event};
28/// Time to wait for txns before sending `ResponseMessage::NotFound`
29const TXNS_TIMEOUT: Duration = Duration::from_millis(100);
30
31/// Task state for the Network Request Task. The task is responsible for handling
32/// requests sent to this node by the network.  It will validate the sender,
33/// parse the request, and try to find the data request in the consensus stores.
pub struct NetworkResponseState<TYPES: NodeType, V: Versions> {
    /// Locked consensus state; read to look up stored VID shares and recent
    /// proposals, and written when VID shares are calculated on demand
    consensus: LockedConsensusState<TYPES>,

    /// Membership coordinator used to check that requesters have stake in the
    /// epoch they are requesting data for
    membership: EpochMembershipCoordinator<TYPES>,

    /// This replica's public key; attached as the sender of outgoing responses
    pub_key: TYPES::SignatureKey,

    /// This replica's private key; used when calculating VID shares on demand
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The node's id (used only for tracing spans)
    id: u64,

    /// Lock for a decided upgrade; forwarded to VID calculation
    upgrade_lock: UpgradeLock<TYPES, V>,
}
53
54impl<TYPES: NodeType, V: Versions> NetworkResponseState<TYPES, V> {
55    /// Create the network request state with the info it needs
56    pub fn new(
57        consensus: LockedConsensusState<TYPES>,
58        membership: EpochMembershipCoordinator<TYPES>,
59        pub_key: TYPES::SignatureKey,
60        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
61        id: u64,
62        upgrade_lock: UpgradeLock<TYPES, V>,
63    ) -> Self {
64        Self {
65            consensus,
66            membership,
67            pub_key,
68            private_key,
69            id,
70            upgrade_lock,
71        }
72    }
73
74    /// Process request events or loop until a `HotShotEvent::Shutdown` is received.
75    #[instrument(skip_all, fields(id = self.id), name = "NetworkResponseState")]
76    async fn run_response_loop(
77        self,
78        mut receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
79        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
80    ) {
81        loop {
82            match receiver.recv_direct().await {
83                Ok(event) => {
84                    // break loop when false, this means shutdown received
85                    match event.as_ref() {
86                        HotShotEvent::VidRequestRecv(request, sender) => {
87                            // Verify request is valid
88                            if !valid_signature::<TYPES>(request, sender) {
89                                continue;
90                            }
91                            for vid_share in self.get_or_calc_vid_share(request.view, sender).await
92                            {
93                                tracing::debug!("Sending VID response {vid_share:?}");
94                                broadcast_event(
95                                    HotShotEvent::VidResponseSend(
96                                        self.pub_key.clone(),
97                                        sender.clone(),
98                                        vid_share,
99                                    )
100                                    .into(),
101                                    &event_sender,
102                                )
103                                .await;
104                            }
105                        },
106                        HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
107                            // Make sure that this request came from who we think it did
108                            if !req.key.validate(signature, req.commit().as_ref()) {
109                                tracing::warn!("Invalid signature key on proposal request.");
110                                return;
111                            }
112
113                            let quorum_proposal_result = self
114                                .consensus
115                                .read()
116                                .await
117                                .last_proposals()
118                                .get(&req.view_number)
119                                .cloned();
120                            if let Some(quorum_proposal) = quorum_proposal_result {
121                                broadcast_event(
122                                    HotShotEvent::QuorumProposalResponseSend(
123                                        req.key.clone(),
124                                        quorum_proposal,
125                                    )
126                                    .into(),
127                                    &event_sender,
128                                )
129                                .await;
130                            }
131                        },
132                        HotShotEvent::Shutdown => {
133                            return;
134                        },
135                        _ => {},
136                    }
137                },
138                Err(e) => {
139                    tracing::error!("Failed to receive event: {e:?}");
140                },
141            }
142        }
143    }
144
145    /// Get the VID share from consensus storage, or calculate it from the payload for
146    /// the view, if we have the payload.  Stores all the shares calculated from the payload
147    /// if the calculation was done
148    async fn get_or_calc_vid_share(
149        &self,
150        view: TYPES::View,
151        sender: &TYPES::SignatureKey,
152    ) -> Vec<Proposal<TYPES, VidDisperseShare<TYPES>>> {
153        let consensus_reader = self.consensus.read().await;
154        let cur_epoch = consensus_reader.cur_epoch();
155        let next_epoch = cur_epoch.map(|epoch| epoch + 1);
156        let is_transition_block = match consensus_reader.validated_state_map().get(&view) {
157            Some(View {
158                view_inner:
159                    ViewInner::Leaf {
160                        leaf: leaf_commit, ..
161                    },
162            }) => consensus_reader.is_epoch_transition(*leaf_commit),
163            _ => false,
164        };
165        drop(consensus_reader);
166
167        // Epochs for which vid shares are required
168        let mut target_epochs = vec![];
169        if self.valid_sender(sender, cur_epoch).await {
170            // The sender belongs to the current epoch.
171            target_epochs.push(cur_epoch);
172        }
173        if is_transition_block && self.valid_sender(sender, next_epoch).await {
174            // It's the last block in epoch and the sender belongs to the next epoch.
175            target_epochs.push(next_epoch);
176        }
177
178        // Vector of vid shares that we return
179        let mut res = vec![];
180        // Epochs for which vid shares need to be calculated
181        let mut calc_target_epochs = vec![];
182        for target_epoch in target_epochs {
183            if let Some(vid_share) = self
184                .consensus
185                .read()
186                .await
187                .vid_shares()
188                .get(&view)
189                .and_then(|key_map| key_map.get(sender))
190                .and_then(|epoch_map| epoch_map.get(&target_epoch))
191            {
192                res.push(vid_share.clone());
193            } else {
194                calc_target_epochs.push(target_epoch);
195            }
196        }
197
198        // We have all the required vid shares, return them
199        if calc_target_epochs.is_empty() {
200            return res;
201        }
202
203        for target_epoch in calc_target_epochs {
204            if Consensus::calculate_and_update_vid::<V>(
205                OuterConsensus::new(Arc::clone(&self.consensus)),
206                view,
207                target_epoch,
208                self.membership.clone(),
209                &self.private_key,
210                &self.upgrade_lock,
211            )
212            .await
213            .is_none()
214            {
215                // Sleep in hope we receive txns in the meantime
216                sleep(TXNS_TIMEOUT).await;
217                Consensus::calculate_and_update_vid::<V>(
218                    OuterConsensus::new(Arc::clone(&self.consensus)),
219                    view,
220                    target_epoch,
221                    self.membership.clone(),
222                    &self.private_key,
223                    &self.upgrade_lock,
224                )
225                .await;
226            }
227            if let Some(vid_share) = self
228                .consensus
229                .read()
230                .await
231                .vid_shares()
232                .get(&view)
233                .and_then(|key_map| key_map.get(sender))
234                .and_then(|epoch_map| epoch_map.get(&target_epoch))
235            {
236                res.push(vid_share.clone());
237            }
238        }
239        res
240    }
241
242    /// Makes sure the sender is allowed to send a request in the given epoch.
243    async fn valid_sender(
244        &self,
245        sender: &TYPES::SignatureKey,
246        epoch: Option<TYPES::Epoch>,
247    ) -> bool {
248        let Ok(memb) = self.membership.stake_table_for_epoch(epoch).await else {
249            return false;
250        };
251        memb.has_stake(sender).await
252    }
253}
254
255/// Check the signature
256fn valid_signature<TYPES: NodeType>(
257    req: &DataRequest<TYPES>,
258    sender: &TYPES::SignatureKey,
259) -> bool {
260    let Ok(data) = bincode::serialize(&req.request) else {
261        return false;
262    };
263    sender.validate(&req.signature, &Sha256::digest(data))
264}
265
266/// Spawn the network response task to handle incoming request for data
267/// from other nodes.  It will shutdown when it gets `HotshotEvent::Shutdown`
268/// on the `event_stream` arg.
269pub fn run_response_task<TYPES: NodeType, V: Versions>(
270    task_state: NetworkResponseState<TYPES, V>,
271    event_stream: Receiver<Arc<HotShotEvent<TYPES>>>,
272    sender: Sender<Arc<HotShotEvent<TYPES>>>,
273) -> JoinHandle<()> {
274    spawn(task_state.run_response_loop(event_stream, sender))
275}