hotshot_task_impls/quorum_proposal_recv/mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#![allow(unused_imports)]
8
9use std::{collections::BTreeMap, sync::Arc};
10
11use async_broadcast::{broadcast, Receiver, Sender};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use either::Either;
15use futures::future::{err, join_all};
16use hotshot_task::task::{Task, TaskState};
17use hotshot_types::{
18    consensus::{Consensus, OuterConsensus},
19    data::{EpochNumber, Leaf, ViewChangeEvidence2},
20    epoch_membership::{self, EpochMembership, EpochMembershipCoordinator},
21    event::Event,
22    message::UpgradeLock,
23    simple_certificate::UpgradeCertificate,
24    simple_vote::HasEpoch,
25    traits::{
26        block_contents::BlockHeader,
27        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28        signature_key::SignatureKey,
29    },
30    utils::option_epoch_from_block_number,
31    vote::{Certificate, HasViewNumber},
32};
33use hotshot_utils::anytrace::{bail, Result};
34use tokio::task::JoinHandle;
35use tracing::{debug, error, info, instrument, warn};
36use vbs::version::Version;
37
38use self::handlers::handle_quorum_proposal_recv;
39use crate::{
40    events::{HotShotEvent, ProposalMissing},
41    helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
42};
43/// Event handlers for this task.
44mod handlers;
45
/// The state for the quorum proposal task. Contains all of the information for
/// handling [`HotShotEvent::QuorumProposalRecv`] events.
pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Our public key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// View number this view is executing in.
    pub cur_view: TYPES::View,

    /// Epoch number this node is executing in. `None` before epochs are active.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Membership for Quorum Certs/votes
    pub membership: EpochMembershipCoordinator<TYPES>,

    /// View timeout from config.
    pub timeout: u64,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// This node's storage ref
    pub storage: I::Storage,

    /// Spawned tasks related to a specific view, so we can cancel them when
    /// they are stale. Keyed by the view the task belongs to.
    pub spawned_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,

    /// The node's id
    pub id: u64,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// First view in which epoch version takes effect, with the epoch it starts;
    /// `None` until `SetFirstEpoch` has been received.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
92
/// All the info we need to validate a proposal. This makes it easy to spawn an ephemeral task to
/// do all the proposal validation without blocking the long-running one.
pub(crate) struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// The node's id
    pub id: u64,

    /// Our public key
    pub(crate) public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Reference to consensus. The replica will require a write lock on this.
    pub(crate) consensus: OuterConsensus<TYPES>,

    /// Membership for Quorum Certs/votes, already resolved for the proposal's epoch.
    pub membership: EpochMembership<TYPES>,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// This node's storage ref
    pub(crate) storage: I::Storage,

    /// Lock for a decided upgrade
    pub(crate) upgrade_lock: UpgradeLock<TYPES, V>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
126
127impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
128    QuorumProposalRecvTaskState<TYPES, I, V>
129{
130    /// Cancel all tasks the consensus tasks has spawned before the given view
131    pub fn cancel_tasks(&mut self, view: TYPES::View) {
132        let keep = self.spawned_tasks.split_off(&view);
133        while let Some((_, tasks)) = self.spawned_tasks.pop_first() {
134            for task in tasks {
135                task.abort();
136            }
137        }
138        self.spawned_tasks = keep;
139    }
140
141    /// Handles all consensus events relating to propose and vote-enabling events.
142    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error")]
143    #[allow(unused_variables)]
144    pub async fn handle(
145        &mut self,
146        event: Arc<HotShotEvent<TYPES>>,
147        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
148        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
149    ) {
150        match event.as_ref() {
151            HotShotEvent::QuorumProposalRecv(proposal, sender) => {
152                tracing::debug!(
153                    "Quorum proposal recv for view {}",
154                    proposal.data.view_number()
155                );
156                if self.consensus.read().await.cur_view() > proposal.data.view_number()
157                    || self.cur_view > proposal.data.view_number()
158                {
159                    tracing::warn!(
160                        "Throwing away old proposal for view {}",
161                        proposal.data.view_number()
162                    );
163                    return;
164                }
165                let proposal_epoch = option_epoch_from_block_number::<TYPES>(
166                    proposal.data.proposal.epoch().is_some(),
167                    proposal.data.block_header().block_number(),
168                    self.epoch_height,
169                );
170                let Ok(epoch_membership) =
171                    self.membership.membership_for_epoch(proposal_epoch).await
172                else {
173                    tracing::warn!("No Stake table for epoch = {proposal_epoch:?}");
174                    return;
175                };
176                let validation_info = ValidationInfo::<TYPES, I, V> {
177                    id: self.id,
178                    public_key: self.public_key.clone(),
179                    private_key: self.private_key.clone(),
180                    consensus: self.consensus.clone(),
181                    membership: epoch_membership,
182                    output_event_stream: self.output_event_stream.clone(),
183                    storage: self.storage.clone(),
184                    upgrade_lock: self.upgrade_lock.clone(),
185                    epoch_height: self.epoch_height,
186                    first_epoch: self.first_epoch,
187                };
188                match handle_quorum_proposal_recv(
189                    proposal,
190                    sender,
191                    &event_sender,
192                    &event_receiver,
193                    validation_info,
194                )
195                .await
196                {
197                    Ok(()) => {},
198                    Err(e) => tracing::error!(?e, "Failed to validate the proposal"),
199                }
200            },
201            HotShotEvent::ViewChange(view, epoch) => {
202                if *epoch > self.cur_epoch {
203                    self.cur_epoch = *epoch;
204                }
205                if self.cur_view >= *view {
206                    return;
207                }
208                self.cur_view = *view;
209                // cancel task for any view 2 views prior or more.  The view here is the oldest
210                // view we want to KEEP tasks for.  We keep the view prior to this because
211                // we might still be processing the proposal from view V which caused us
212                // to enter view V + 1.
213                let oldest_view_to_keep = TYPES::View::new(view.saturating_sub(1));
214                self.cancel_tasks(oldest_view_to_keep);
215            },
216            HotShotEvent::SetFirstEpoch(view, epoch) => {
217                self.first_epoch = Some((*view, *epoch));
218            },
219            _ => {},
220        }
221    }
222}
223
224#[async_trait]
225impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
226    for QuorumProposalRecvTaskState<TYPES, I, V>
227{
228    type Event = HotShotEvent<TYPES>;
229
230    async fn handle_event(
231        &mut self,
232        event: Arc<Self::Event>,
233        sender: &Sender<Arc<Self::Event>>,
234        receiver: &Receiver<Arc<Self::Event>>,
235    ) -> Result<()> {
236        self.handle(event, sender.clone(), receiver.clone()).await;
237
238        Ok(())
239    }
240
241    fn cancel_subtasks(&mut self) {
242        while !self.spawned_tasks.is_empty() {
243            let Some((_, handles)) = self.spawned_tasks.pop_first() else {
244                break;
245            };
246            for handle in handles {
247                handle.abort();
248            }
249        }
250    }
251}