//! `hotshot_task_impls/request.rs`

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, BTreeSet},
9    sync::{
10        Arc,
11        atomic::{AtomicBool, Ordering},
12    },
13    time::Duration,
14};
15
16use async_broadcast::{Receiver, Sender};
17use async_trait::async_trait;
18use hotshot_task::task::TaskState;
19use hotshot_types::{
20    consensus::OuterConsensus,
21    data::{EpochNumber, ViewNumber},
22    epoch_membership::EpochMembershipCoordinator,
23    simple_vote::HasEpoch,
24    traits::{
25        block_contents::BlockHeader,
26        network::{ConnectedNetwork, DataRequest, RequestKind},
27        node_implementation::{NodeImplementation, NodeType},
28        signature_key::SignatureKey,
29    },
30    utils::is_epoch_transition,
31    vote::HasViewNumber,
32};
33use hotshot_utils::anytrace::*;
34use rand::{seq::SliceRandom, thread_rng};
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::JoinHandle, time::sleep};
37use tracing::instrument;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
/// How long to wait for a response from one DA peer before requesting the same
/// data from the next recipient (the per-recipient sleep in `create_vid_request_task`).
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);
43
/// Long running task which will request information after a proposal is received.
/// The task will wait for its `delay` and then send a request iteratively to peers
/// for any data they don't have related to the proposal.  For now it's just requesting VID
/// shares.
pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// The underlying network used to send requests over
    pub network: Arc<I::Network>,

    /// Consensus shared state so we can check if we've gotten the information
    /// before sending a request
    pub consensus: OuterConsensus<TYPES>,

    /// Last seen view; we won't request data for proposals older than this view
    pub view: ViewNumber,

    /// Delay before requesting peers
    pub delay: Duration,

    /// Membership coordinator (used here only for DA committee / stake table lookups)
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// This node's private/signing key, used to sign requests.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The node's id
    pub id: u64,

    /// A flag indicating that `HotShotEvent::Shutdown` has been received
    pub shutdown_flag: Arc<AtomicBool>,

    /// Handles of spawned request tasks, keyed by the view the request was made for.
    /// Finished/stale entries are pruned on `ViewChange`; all are aborted on shutdown.
    pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}
84
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
    /// Abort all outstanding request tasks on drop so they don't outlive the
    /// state that spawned them (delegates to `cancel_subtasks`).
    fn drop(&mut self) {
        self.cancel_subtasks();
    }
}
90
/// Alias for the signature type used to sign serialized requests
type Signature<TYPES> =
    <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;
94
#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
    type Event = HotShotEvent<TYPES>;

    /// Handle an incoming event:
    /// - `QuorumProposalValidated`: spawn VID request tasks if we still lack shares
    ///   for any epoch we have stake in.
    /// - `VidResponseRecv`: validate the responder and signature, then re-broadcast
    ///   the share as `VidShareRecv`.
    /// - `ViewChange`: advance `self.view` and prune finished/stale task handles.
    #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _) => {
                let prop_view = proposal.data.view_number();
                let prop_epoch = proposal.data.epoch();

                // Request VID share only if:
                // 1. we are part of the current epoch or
                // 2. we are part of the next epoch and this is a proposal for in transition.
                let membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(prop_epoch)
                    .await?;
                // Epochs for which we need a VID share (may contain both the proposal's
                // epoch and the next epoch during an epoch transition).
                let mut target_epochs = BTreeSet::new();
                if membership.has_stake(&self.public_key).await {
                    target_epochs.insert(prop_epoch);
                }
                if is_epoch_transition(
                    proposal.data.block_header().block_number(),
                    self.epoch_height,
                ) && membership
                    .next_epoch_stake_table()
                    .await?
                    .has_stake(&self.public_key)
                    .await
                {
                    target_epochs.insert(prop_epoch.map(|e| e + 1));
                }

                ensure!(
                    !target_epochs.is_empty(),
                    "We don't belong to the current epoch and we don't belong to the next epoch. \
                     Do not request VID share."
                );

                let consensus_reader = self.consensus.read().await;
                let maybe_vid_share = consensus_reader
                    .vid_shares()
                    .get(&prop_view)
                    .and_then(|shares| shares.get(&self.public_key));
                // Spawn requests only when the proposal is not stale and we are still
                // missing our VID share for at least one target epoch; otherwise do nothing.
                if prop_view >= self.view
                    && (maybe_vid_share.is_none()
                        || !target_epochs
                            .iter()
                            .all(|e| maybe_vid_share.unwrap().contains_key(e)))
                {
                    // Release the read lock before spawning; `spawn_requests` takes
                    // `&mut self` and the spawned task re-reads consensus itself.
                    drop(consensus_reader);
                    self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
                        .await;
                }
                Ok(())
            },
            HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
                let view = vid_proposal.data.view_number();
                let epoch = vid_proposal.data.epoch();

                // Get the committee members for the view and the leader, if applicable
                let membership_reader = self
                    .membership_coordinator
                    .membership_for_epoch(epoch)
                    .await?;
                let mut da_committee_for_view = membership_reader.da_committee_members(view).await;
                if let Ok(leader) = membership_reader.leader(view).await {
                    da_committee_for_view.insert(leader);
                }
                drop(membership_reader);

                // Only accept responses for views we actually requested.
                ensure!(
                    self.spawned_tasks.contains_key(&view),
                    info!("Received VidResponseRecv for view we didn't expect, view {view:?}")
                );

                // The responder must be a DA committee member (or the leader) for the view.
                ensure!(
                    da_committee_for_view.contains(sender_key),
                    warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
                );

                // Verify the responder's signature over the payload commitment.
                ensure!(
                    sender_key.validate(
                        &vid_proposal.signature,
                        vid_proposal.data.payload_commitment_ref()
                    ),
                    warn!("Received VidResponseRecv with invalid signature")
                );

                tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        sender_key.clone(),
                        vid_proposal.clone(),
                    )),
                    sender,
                )
                .await;
                Ok(())
            },
            HotShotEvent::ViewChange(view, _) => {
                let view = *view;
                // Views only move forward; ignore stale view changes.
                if view > self.view {
                    self.view = view;
                }
                // Clean old tasks' handles: drop finished handles for views before the
                // current one, then remove old views whose handle lists are now empty.
                self.spawned_tasks
                    .range_mut(..self.view)
                    .for_each(|(_, handles)| {
                        handles.retain(|handle| !handle.is_finished());
                    });
                self.spawned_tasks
                    .retain(|view, handles| view >= &self.view || !handles.is_empty());
                Ok(())
            },
            _ => Ok(()),
        }
    }

    /// Signal all spawned request tasks to stop (via `shutdown_flag`) and abort
    /// every outstanding task handle.
    fn cancel_subtasks(&mut self) {
        self.shutdown_flag.store(true, Ordering::Relaxed);

        while !self.spawned_tasks.is_empty() {
            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
                break;
            };

            for handle in handles {
                handle.abort();
            }
        }
    }
}
235
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
    /// Creates and signs the VID request payload, then spawns the request task.
    /// Silently does nothing if serialization or signing fails (logged inside
    /// `serialize_and_sign`).
    async fn spawn_requests(
        &mut self,
        view: ViewNumber,
        prop_epoch: Option<EpochNumber>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        target_epochs: BTreeSet<Option<EpochNumber>>,
    ) {
        let request = RequestKind::Vid(view, self.public_key.clone());

        // First sign the request for the VID shares.
        if let Some(signature) = self.serialize_and_sign(&request) {
            self.create_vid_request_task(
                request,
                signature,
                sender.clone(),
                view,
                prop_epoch,
                target_epochs,
            )
            .await;
        }
    }

    /// Creates a task that will request a VID share from a DA member and wait for the `HotShotEvent::VidResponseRecv` event
    /// If we get the VID disperse share, broadcast `HotShotEvent::VidShareRecv` and terminate task
    async fn create_vid_request_task(
        &mut self,
        request: RequestKind<TYPES>,
        signature: Signature<TYPES>,
        sender: Sender<Arc<HotShotEvent<TYPES>>>,
        view: ViewNumber,
        prop_epoch: Option<EpochNumber>,
        mut target_epochs: BTreeSet<Option<EpochNumber>>,
    ) {
        // Clone the shared handles the spawned task needs to own.
        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
        let network = Arc::clone(&self.network);
        let shutdown_flag = Arc::clone(&self.shutdown_flag);
        let delay = self.delay;
        let public_key = self.public_key.clone();

        // Get the committee members for the view and the leader, if applicable
        let membership_reader = match self
            .membership_coordinator
            .membership_for_epoch(prop_epoch)
            .await
        {
            Ok(m) => m,
            Err(e) => {
                // Best-effort: if we can't resolve membership, log and skip the request.
                tracing::warn!(e.message);
                return;
            },
        };
        // Get committee members for view
        let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
            .da_committee_members(view)
            .await
            .into_iter()
            .collect();

        // Randomize the recipients so all replicas don't overload the same recipient
        // and so we don't implicitly rely on the same replica all the time.
        recipients.shuffle(&mut thread_rng());

        // prepare request
        let data_request = DataRequest::<TYPES> {
            request,
            view,
            signature,
        };
        let my_id = self.id;
        let handle: JoinHandle<()> = spawn(async move {
            // Do the delay only if primary is up and then start sending
            if !network.is_primary_down() {
                sleep(delay).await;
            }

            let mut recipients_it = recipients.iter();
            // First check if we got the data before continuing
            while !Self::cancel_vid_request_task(
                &consensus,
                &public_key,
                &view,
                &shutdown_flag,
                my_id,
                &mut target_epochs,
            )
            .await
            {
                // Cycle da members we send the request to each time
                if let Some(recipient) = recipients_it.next() {
                    if *recipient == public_key {
                        // no need to send a message to ourselves.
                        // just check for the data at start of loop in `cancel_vid_request_task`
                        continue;
                    }
                    tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
                    // First send request to a random DA member for the view
                    broadcast_event(
                        HotShotEvent::VidRequestSend(
                            data_request.clone(),
                            public_key.clone(),
                            recipient.clone(),
                        )
                        .into(),
                        &sender,
                    )
                    .await;
                    // Wait before sending the request to the next recipient.
                    sleep(REQUEST_TIMEOUT).await;
                } else {
                    // Every DA member has been tried once; the task gives up for this view.
                    // NOTE(review): an earlier comment claimed `recipients_it` restarts from
                    // the beginning when exhausted, but it does NOT cycle — confirm whether
                    // retrying from the start is the intended behavior here.
                    tracing::warn!(
                        "Sent VID request to all available DA members and got no response for \
                         view: {view:?}, my id: {my_id:?}"
                    );
                    return;
                }
            }
        });
        // Track the handle so it can be pruned on view change or aborted on shutdown.
        self.spawned_tasks.entry(view).or_default().push(handle);
    }

    /// Returns true if we got the data we wanted, a shutdown event was received, or the view has moved on.
    /// Also removes from `target_epochs` every epoch for which a share has already arrived,
    /// so the request loop only keeps going for epochs still missing data.
    async fn cancel_vid_request_task(
        consensus: &OuterConsensus<TYPES>,
        public_key: &<TYPES as NodeType>::SignatureKey,
        view: &ViewNumber,
        shutdown_flag: &Arc<AtomicBool>,
        id: u64,
        target_epochs: &mut BTreeSet<Option<EpochNumber>>,
    ) -> bool {
        let consensus_reader = consensus.read().await;

        let maybe_vid_shares = consensus_reader
            .vid_shares()
            .get(view)
            .and_then(|key_map| key_map.get(public_key));
        if let Some(vid_shares) = maybe_vid_shares {
            tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
            // Mark each epoch whose share we now hold as satisfied.
            for vid_share in vid_shares.values() {
                target_epochs.remove(&vid_share.data.target_epoch());
            }
        }
        // Cancel if: shutting down, consensus moved past this view, or nothing left to fetch.
        let cancel = shutdown_flag.load(Ordering::Relaxed)
            || consensus_reader.cur_view() > *view
            || target_epochs.is_empty();
        if cancel {
            tracing::debug!(
                "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
                view,
                consensus_reader.cur_view(),
                id,
            );
        }
        cancel
    }

    /// Sign the SHA-256 digest of the bincode-serialized request.
    /// Returns `None` (after logging) if serialization or signing fails.
    fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
        let Ok(data) = bincode::serialize(&request) else {
            tracing::error!("Failed to serialize request!");
            return None;
        };
        let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
        else {
            tracing::error!("Failed to sign Data Request");
            return None;
        };
        Some(signature)
    }
}
408}