// hotshot_task_impls/consensus/mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use alloy::primitives::U256;
10use async_broadcast::{Receiver, Sender};
11use async_trait::async_trait;
12use handlers::handle_epoch_root_quorum_vote_recv;
13use hotshot_task::task::TaskState;
14use hotshot_types::{
15    consensus::OuterConsensus,
16    data::{EpochNumber, ViewNumber},
17    epoch_membership::EpochMembershipCoordinator,
18    event::Event,
19    message::UpgradeLock,
20    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
21    simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
22    stake_table::HSStakeTable,
23    traits::{
24        node_implementation::{NodeImplementation, NodeType},
25        signature_key::SignatureKey,
26        storage::Storage,
27    },
28    utils::{epoch_from_block_number, is_last_block},
29    vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use tokio::task::JoinHandle;
33use tracing::instrument;
34
35use self::handlers::{
36    handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
37};
38use crate::{
39    events::HotShotEvent,
40    helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
41    vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
42};
43
44/// Event handlers for use in the `handle` method.
45mod handlers;
46
/// Task state for the Consensus task.
///
/// Holds everything the consensus replica needs to process quorum/timeout
/// votes, view changes, timeouts, and extended-QC events (see `handle`).
pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Our public key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Membership for Quorum Certs/votes; also used to trigger stake-table
    /// catchup and DRB computation for upcoming epochs (see `ViewChange` handling).
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// A map of `QuorumVote` collector tasks.
    pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>,

    /// A map of `EpochRootQuorumVote` collector tasks.
    pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES>,

    /// A map of `QuorumVote` collector tasks. They collect votes from the nodes in the next epoch.
    pub next_epoch_vote_collectors:
        VoteCollectorsMap<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>,

    /// A map of `TimeoutVote` collector tasks.
    pub timeout_vote_collectors:
        VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>,

    /// The view number that this node is currently executing in.
    pub cur_view: ViewNumber,

    /// Timestamp this view starts at.
    // NOTE(review): unit (seconds vs millis) is not evident from this file — confirm against writers.
    pub cur_view_time: i64,

    /// The epoch number that this node is currently executing in.
    pub cur_epoch: Option<EpochNumber>,

    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// Timeout task handle
    pub timeout_task: JoinHandle<()>,

    /// View timeout from config.
    pub timeout: u64,

    /// Shared consensus state handle (read/write locked in `handle`).
    // NOTE(review): the previous comment said "a reference to the metrics trait",
    // which does not match the type or its use (`self.consensus.write().await`).
    pub consensus: OuterConsensus<TYPES>,

    /// A reference to the storage trait.
    pub storage: I::Storage,

    /// The node's id
    pub id: u64,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// The time this view started
    pub view_start_time: Instant,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
}
117
118impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusTaskState<TYPES, I> {
119    /// Handles a consensus event received on the event stream
120    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
121    pub async fn handle(
122        &mut self,
123        event: Arc<HotShotEvent<TYPES>>,
124        sender: Sender<Arc<HotShotEvent<TYPES>>>,
125    ) -> Result<()> {
126        match event.as_ref() {
127            HotShotEvent::QuorumVoteRecv(vote) => {
128                if let Err(e) =
129                    handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
130                {
131                    tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
132                }
133            },
134            HotShotEvent::EpochRootQuorumVoteRecv(vote) => {
135                if let Err(e) =
136                    handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
137                        .await
138                {
139                    tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
140                }
141            },
142            HotShotEvent::TimeoutVoteRecv(vote) => {
143                if let Err(e) =
144                    handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
145                {
146                    tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
147                }
148            },
149            HotShotEvent::SetFirstEpoch(view, epoch) => {
150                self.first_epoch = Some((*view, *epoch));
151            },
152            HotShotEvent::ViewChange(new_view_number, epoch_number) => {
153                // Request the randomized stake table for the subsequent epoch,
154                // to trigger catchup and the DRB calculation if it happens to be missing.
155                //
156                // the frequency is dynamic, depending on the epoch height. if the epoch height is low
157                // (e.g. like it is in tests), we do this every view
158                let frequency = (self.epoch_height / 30).clamp(1, 100);
159                if **new_view_number % frequency == 0 {
160                    let _ = self
161                        .membership_coordinator
162                        .membership_for_epoch(epoch_number.map(|e| e + 1))
163                        .await;
164                }
165
166                if let Err(e) =
167                    handle_view_change(*new_view_number, *epoch_number, &sender, self).await
168                {
169                    tracing::trace!("Failed to handle ViewChange event; error = {e}");
170                }
171                self.view_start_time = Instant::now();
172            },
173            HotShotEvent::Timeout(view_number, epoch) => {
174                if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
175                    tracing::debug!("Failed to handle Timeout event; error = {e}");
176                }
177            },
178            HotShotEvent::ExtendedQc2Formed(eqc) => {
179                let cert_view = eqc.view_number();
180                let Some(cert_block_number) = eqc.data.block_number else {
181                    tracing::error!("Received extended QC but no block number");
182                    return Ok(());
183                };
184                let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
185                tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
186                // Transition to the new epoch by sending ViewChange
187                let next_epoch = EpochNumber::new(cert_epoch + 1);
188                broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
189                    .await;
190                tracing::info!("Entering new epoch: {next_epoch}");
191                tracing::info!(
192                    "Stake table for epoch {}:\n\n{:?}",
193                    next_epoch,
194                    self.membership_coordinator
195                        .stake_table_for_epoch(Some(next_epoch))
196                        .await?
197                        .stake_table()
198                        .await
199                );
200            },
201            HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
202                if !high_qc
203                    .data
204                    .block_number
205                    .is_some_and(|bn| is_last_block(bn, self.epoch_height))
206                {
207                    tracing::warn!("Received extended QC but we can't verify the leaf is extended");
208                    return Ok(());
209                }
210                if let Err(e) = validate_qc_and_next_epoch_qc(
211                    high_qc,
212                    Some(next_epoch_high_qc),
213                    &self.consensus,
214                    &self.membership_coordinator,
215                    &self.upgrade_lock,
216                    self.epoch_height,
217                )
218                .await
219                {
220                    tracing::error!("Received invalid extended QC: {e}");
221                    return Ok(());
222                }
223
224                let next_epoch = high_qc.data.epoch().map(|x| x + 1);
225
226                let mut consensus_writer = self.consensus.write().await;
227                let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
228                let next_high_qc_updated = consensus_writer
229                    .update_next_epoch_high_qc(next_epoch_high_qc.clone())
230                    .is_ok();
231                if let Some(next_epoch) = next_epoch {
232                    consensus_writer.update_validator_participation_epoch(next_epoch);
233                    let (stake_table, success_threshold) = if let Ok(epoch_membership) = self
234                        .membership_coordinator
235                        .stake_table_for_epoch(Some(next_epoch))
236                        .await
237                    {
238                        (
239                            epoch_membership.stake_table().await,
240                            epoch_membership.success_threshold().await,
241                        )
242                    } else {
243                        tracing::warn!(
244                            "Failed to get stake table for epoch {} while updating vote \
245                             participation",
246                            next_epoch
247                        );
248                        (HSStakeTable::default(), U256::MAX)
249                    };
250                    consensus_writer
251                        .update_vote_participation_epoch(
252                            stake_table,
253                            success_threshold,
254                            Some(next_epoch),
255                        )
256                        .context(warn!("Updating vote participation"))?;
257                }
258                drop(consensus_writer);
259
260                self.storage
261                    .update_high_qc2(high_qc.clone())
262                    .await
263                    .map_err(|_| warn!("Failed to update high QC"))?;
264                self.storage
265                    .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
266                    .await
267                    .map_err(|_| warn!("Failed to update next epoch high QC"))?;
268                self.storage
269                    .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
270                    .await
271                    .map_err(|_| warn!("Failed to store eQC"))?;
272
273                tracing::debug!(
274                    "Received Extended QC for view {} and epoch {:?}.",
275                    high_qc.view_number(),
276                    high_qc.epoch()
277                );
278                if high_qc_updated || next_high_qc_updated {
279                    // Send ViewChange indicating new view and new epoch.
280                    let next_epoch = high_qc.data.epoch().map(|x| x + 1);
281                    tracing::info!("Entering new epoch: {next_epoch:?}");
282                    broadcast_view_change(
283                        &sender,
284                        high_qc.view_number() + 1,
285                        next_epoch,
286                        self.first_epoch,
287                    )
288                    .await;
289                }
290            },
291            _ => {},
292        }
293
294        Ok(())
295    }
296}
297
298#[async_trait]
299impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for ConsensusTaskState<TYPES, I> {
300    type Event = HotShotEvent<TYPES>;
301
302    async fn handle_event(
303        &mut self,
304        event: Arc<Self::Event>,
305        sender: &Sender<Arc<Self::Event>>,
306        _receiver: &Receiver<Arc<Self::Event>>,
307    ) -> Result<()> {
308        self.handle(event, sender.clone()).await
309    }
310
311    /// Joins all subtasks.
312    fn cancel_subtasks(&mut self) {
313        // Cancel the old timeout task
314        std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
315    }
316}