hotshot_task_impls/
request.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};

use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
    consensus::OuterConsensus,
    epoch_membership::EpochMembershipCoordinator,
    simple_vote::HasEpoch,
    traits::{
        block_contents::BlockHeader,
        network::{ConnectedNetwork, DataRequest, RequestKind},
        node_implementation::{NodeImplementation, NodeType},
        signature_key::SignatureKey,
    },
    utils::is_epoch_transition,
    vote::HasViewNumber,
};
use hotshot_utils::anytrace::*;
use rand::{seq::SliceRandom, thread_rng};
use sha2::{Digest, Sha256};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::instrument;

use crate::{events::HotShotEvent, helpers::broadcast_event};

/// Amount of time to try for a request before timing out.
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500);
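// In the request loop below, this is the pause between successive recipients:
// after a request is sent to one DA member, the task waits this long before
// trying the next one.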

/// Long-running task which will request information after a proposal is received.
/// The task waits for its `delay` and then iteratively sends requests to peers
/// for any data related to the proposal that it doesn't have. For now it only requests
/// VID shares.
pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// The underlying network to send requests over
    pub network: Arc<I::Network>,

    /// Consensus shared state so we can check if we've gotten the information
    /// before sending a request
    pub consensus: OuterConsensus<TYPES>,

    /// Last seen view; we won't request data for proposals older than this view
    pub view: TYPES::View,

    /// Delay before requesting peers
    pub delay: Duration,

    /// Membership (used here only for DA)
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// This node's private/signing key, used to sign requests.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The node's id
    pub id: u64,

    /// A flag indicating that `HotShotEvent::Shutdown` has been received
    pub shutdown_flag: Arc<AtomicBool>,

    /// Handles of the request tasks spawned for each view
    pub spawned_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,

    /// Number of blocks in an epoch; zero means there are no epochs
    pub epoch_height: u64,
}
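
// A minimal construction sketch (placeholder values only; `network`, `consensus`,
// `coordinator`, the keys, and `initial_view` come from the node's setup code, and
// `MyTypes`/`MyImpl` stand in for concrete `NodeType`/`NodeImplementation` types):
//
//     let state = NetworkRequestState::<MyTypes, MyImpl> {
//         network,
//         consensus,
//         view: initial_view,
//         delay: Duration::from_millis(100),
//         membership_coordinator: coordinator,
//         public_key,
//         private_key,
//         id: 0,
//         shutdown_flag: Arc::new(AtomicBool::new(false)),
//         spawned_tasks: BTreeMap::new(),
//         epoch_height: 0,
//     };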

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Drop for NetworkRequestState<TYPES, I> {
    fn drop(&mut self) {
        self.cancel_subtasks();
    }
}

/// Alias for a signature
type Signature<TYPES> =
    <<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType;

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
    type Event = HotShotEvent<TYPES>;

    #[instrument(skip_all, fields(id = self.id), name = "NetworkRequestState")]
    async fn handle_event(
        &mut self,
        event: Arc<Self::Event>,
        sender: &Sender<Arc<Self::Event>>,
        _receiver: &Receiver<Arc<Self::Event>>,
    ) -> Result<()> {
        match event.as_ref() {
            HotShotEvent::QuorumProposalValidated(proposal, _) => {
                let prop_view = proposal.data.view_number();
                let prop_epoch = proposal.data.epoch();

                // Request the VID share only if:
                // 1. we are part of the current epoch, or
                // 2. we are part of the next epoch and this is a proposal for the epoch transition.
                let membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(prop_epoch)
                    .await?;
                let mut target_epochs = BTreeSet::new();
                if membership.has_stake(&self.public_key).await {
                    target_epochs.insert(prop_epoch);
                }
                if is_epoch_transition(
                    proposal.data.block_header().block_number(),
                    self.epoch_height,
                ) && membership
                    .next_epoch_stake_table()
                    .await?
                    .has_stake(&self.public_key)
                    .await
                {
                    target_epochs.insert(prop_epoch.map(|e| e + 1));
                }
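                // `target_epochs` now holds every epoch this node belongs to for this
                // proposal: the proposal's epoch, plus the next epoch when the proposal
                // sits at an epoch transition. Shares are requested below for any of
                // these epochs we don't already hold.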

                ensure!(
                    !target_epochs.is_empty(),
                    "We don't belong to the current epoch and we don't belong to the next epoch. \
                     Do not request VID share."
                );

                let consensus_reader = self.consensus.read().await;
                let maybe_vid_share = consensus_reader
                    .vid_shares()
                    .get(&prop_view)
                    .and_then(|shares| shares.get(&self.public_key));
                // If we already have the VID shares for this view (for every target epoch), do nothing.
                if prop_view >= self.view
                    && (maybe_vid_share.is_none()
                        || !target_epochs
                            .iter()
                            .all(|e| maybe_vid_share.unwrap().contains_key(e)))
                {
                    drop(consensus_reader);
                    self.spawn_requests(prop_view, prop_epoch, sender, target_epochs)
                        .await;
                }
                Ok(())
            },
            HotShotEvent::VidResponseRecv(sender_key, vid_proposal) => {
                let view = vid_proposal.data.view_number();
                let epoch = vid_proposal.data.epoch();

                // Get the committee members for the view and the leader, if applicable
                let membership_reader = self
                    .membership_coordinator
                    .membership_for_epoch(epoch)
                    .await?;
                let mut da_committee_for_view = membership_reader.da_committee_members(view).await;
                if let Ok(leader) = membership_reader.leader(view).await {
                    da_committee_for_view.insert(leader);
                }
                drop(membership_reader);

                ensure!(
                    self.spawned_tasks.contains_key(&view),
                    info!("Received VidResponseRecv for view we didn't expect, view {view:?}")
                );

                ensure!(
                    da_committee_for_view.contains(sender_key),
                    warn!("Received VidResponseRecv from unexpected sender key {sender_key:?}")
                );

                ensure!(
                    sender_key.validate(
                        &vid_proposal.signature,
                        vid_proposal.data.payload_commitment_ref()
                    ),
                    warn!("Received VidResponseRecv with invalid signature")
                );

                tracing::debug!("Received VidResponseRecv {vid_proposal:?}");
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        sender_key.clone(),
                        vid_proposal.clone(),
                    )),
                    sender,
                )
                .await;
                Ok(())
            },
            HotShotEvent::ViewChange(view, _) => {
                let view = *view;
                if view > self.view {
                    self.view = view;
                }
                // Clean old tasks' handles
                self.spawned_tasks
                    .range_mut(..self.view)
                    .for_each(|(_, handles)| {
                        handles.retain(|handle| !handle.is_finished());
                    });
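                // Drop entries for views older than the current one once all of their
                // handles have finished; entries for the current and future views are kept.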
                self.spawned_tasks
                    .retain(|view, handles| view >= &self.view || !handles.is_empty());
                Ok(())
            },
            _ => Ok(()),
        }
    }

    fn cancel_subtasks(&mut self) {
        self.shutdown_flag.store(true, Ordering::Relaxed);

        while !self.spawned_tasks.is_empty() {
            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
                break;
            };

            for handle in handles {
                handle.abort();
            }
        }
    }
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I> {
    /// Creates and signs the request payload, then creates a request task
    async fn spawn_requests(
        &mut self,
        view: TYPES::View,
        prop_epoch: Option<TYPES::Epoch>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        target_epochs: BTreeSet<Option<TYPES::Epoch>>,
    ) {
        let request = RequestKind::Vid(view, self.public_key.clone());

        // First sign the request for the VID shares.
        if let Some(signature) = self.serialize_and_sign(&request) {
            self.create_vid_request_task(
                request,
                signature,
                sender.clone(),
                view,
                prop_epoch,
                target_epochs,
            )
            .await;
        }
    }

    /// Creates a task that will request a VID share from a DA member and wait for the `HotShotEvent::VidResponseRecv` event.
    /// If we get the VID disperse share, broadcast `HotShotEvent::VidShareRecv` and terminate the task.
    async fn create_vid_request_task(
        &mut self,
        request: RequestKind<TYPES>,
        signature: Signature<TYPES>,
        sender: Sender<Arc<HotShotEvent<TYPES>>>,
        view: TYPES::View,
        prop_epoch: Option<TYPES::Epoch>,
        mut target_epochs: BTreeSet<Option<TYPES::Epoch>>,
    ) {
        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
        let network = Arc::clone(&self.network);
        let shutdown_flag = Arc::clone(&self.shutdown_flag);
        let delay = self.delay;
        let public_key = self.public_key.clone();

        // Get the committee members for the view and the leader, if applicable
        let membership_reader = match self
            .membership_coordinator
            .membership_for_epoch(prop_epoch)
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(e.message);
                return;
            },
        };
        // Get committee members for view
        let mut recipients: Vec<TYPES::SignatureKey> = membership_reader
            .da_committee_members(view)
            .await
            .into_iter()
            .collect();

        // Randomize the recipients so all replicas don't overload the same recipient,
        // and so we don't implicitly rely on the same replica all the time.
        recipients.shuffle(&mut thread_rng());

        // Prepare the request
        let data_request = DataRequest::<TYPES> {
            request,
            view,
            signature,
        };
        let my_id = self.id;
        let handle: JoinHandle<()> = spawn(async move {
            // Only delay if the primary network is up, then start sending
            if !network.is_primary_down() {
                sleep(delay).await;
            }

            let mut recipients_it = recipients.iter();
            // First check if we got the data before continuing
            while !Self::cancel_vid_request_task(
                &consensus,
                &sender,
                &public_key,
                &view,
                &shutdown_flag,
                my_id,
                &mut target_epochs,
            )
            .await
            {
                // Cycle through the DA members we send the request to each time
                if let Some(recipient) = recipients_it.next() {
                    if *recipient == public_key {
                        // No need to send a message to ourselves;
                        // just check for the data at the start of the loop in `cancel_vid_request_task`
                        continue;
                    }
                    tracing::debug!("Sending VidRequestSend {data_request:?}, my id {my_id:?}");
                    // Send the request to the next DA member for this view (order randomized above)
                    broadcast_event(
                        HotShotEvent::VidRequestSend(
                            data_request.clone(),
                            public_key.clone(),
                            recipient.clone(),
                        )
                        .into(),
                        &sender,
                    )
                    .await;
                    // Wait before sending the request to the next recipient.
                    sleep(REQUEST_TIMEOUT).await;
                } else {
                    // We have tried every available DA member without receiving the data; give up for this view.
                    tracing::warn!(
                        "Sent VID request to all available DA members and got no response for \
                         view: {view:?}, my id: {my_id:?}"
                    );
                    return;
                }
            }
        });
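        // Track the handle so `ViewChange` cleanup and `cancel_subtasks` can find it.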
        self.spawned_tasks.entry(view).or_default().push(handle);
    }

    /// Returns true if we got the data we wanted, a shutdown event was received, or the view has moved on.
    async fn cancel_vid_request_task(
        consensus: &OuterConsensus<TYPES>,
        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
        public_key: &<TYPES as NodeType>::SignatureKey,
        view: &TYPES::View,
        shutdown_flag: &Arc<AtomicBool>,
        id: u64,
        target_epochs: &mut BTreeSet<Option<TYPES::Epoch>>,
    ) -> bool {
        let consensus_reader = consensus.read().await;

        let maybe_vid_shares = consensus_reader
            .vid_shares()
            .get(view)
            .and_then(|key_map| key_map.get(public_key));
        if let Some(vid_shares) = maybe_vid_shares {
            tracing::debug!("Send own vid share: {vid_shares:?}, my id {id:?}");
            for vid_share in vid_shares.values() {
                broadcast_event(
                    Arc::new(HotShotEvent::VidShareRecv(
                        public_key.clone(),
                        vid_share.clone(),
                    )),
                    sender,
                )
                .await;
                target_epochs.remove(&vid_share.data.target_epoch());
            }
        }
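        // Every epoch we already hold a share for has been removed from `target_epochs`;
        // cancel once we're shutting down, the view has moved on, or there is nothing
        // left to fetch.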
        let cancel = shutdown_flag.load(Ordering::Relaxed)
            || consensus_reader.cur_view() > *view
            || target_epochs.is_empty();
        if cancel {
            tracing::debug!(
                "Canceling vid request for view {:?}, cur view is {:?}, my id {:?}",
                view,
                consensus_reader.cur_view(),
                id,
            );
        }
        cancel
    }

    /// Sign the serialized version of the request
    fn serialize_and_sign(&self, request: &RequestKind<TYPES>) -> Option<Signature<TYPES>> {
        let Ok(data) = bincode::serialize(&request) else {
            tracing::error!("Failed to serialize request!");
            return None;
        };
        let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data))
        else {
            tracing::error!("Failed to sign Data Request");
            return None;
        };
        Some(signature)
    }
417}