hotshot_task_impls/consensus/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use handlers::handle_epoch_root_quorum_vote_recv;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    epoch_membership::EpochMembershipCoordinator,
16    event::Event,
17    message::UpgradeLock,
18    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
19    simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
20    traits::{
21        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
22        signature_key::SignatureKey,
23        storage::Storage,
24    },
25    utils::{epoch_from_block_number, is_last_block},
26    vote::HasViewNumber,
27};
28use hotshot_utils::anytrace::*;
29use tokio::task::JoinHandle;
30use tracing::instrument;
31
32use self::handlers::{
33    handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
34};
35use crate::{
36    events::HotShotEvent,
37    helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
38    vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
39};
40
41/// Event handlers for use in the `handle` method.
42mod handlers;
43
44/// Task state for the Consensus task.
45pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
46    /// Our public key
47    pub public_key: TYPES::SignatureKey,
48
49    /// Our Private Key
50    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
51
52    /// Immutable instance state
53    pub instance_state: Arc<TYPES::InstanceState>,
54
55    /// The underlying network
56    pub network: Arc<I::Network>,
57
58    /// Membership for Quorum Certs/votes
59    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61    /// A map of `QuorumVote` collector tasks.
62    pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>,
63
64    /// A map of `EpochRootQuorumVote` collector tasks.
65    pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES, V>,
66
67    /// A map of `QuorumVote` collector tasks. They collect votes from the nodes in the next epoch.
68    pub next_epoch_vote_collectors: VoteCollectorsMap<
69        TYPES,
70        NextEpochQuorumVote2<TYPES>,
71        NextEpochQuorumCertificate2<TYPES>,
72        V,
73    >,
74
75    /// A map of `TimeoutVote` collector tasks.
76    pub timeout_vote_collectors:
77        VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>, V>,
78
79    /// The view number that this node is currently executing in.
80    pub cur_view: TYPES::View,
81
82    /// Timestamp this view starts at.
83    pub cur_view_time: i64,
84
85    /// The epoch number that this node is currently executing in.
86    pub cur_epoch: Option<TYPES::Epoch>,
87
88    /// Output events to application
89    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
90
91    /// Timeout task handle
92    pub timeout_task: JoinHandle<()>,
93
94    /// View timeout from config.
95    pub timeout: u64,
96
97    /// A reference to the metrics trait.
98    pub consensus: OuterConsensus<TYPES>,
99
100    /// A reference to the storage trait.
101    pub storage: I::Storage,
102
103    /// The node's id
104    pub id: u64,
105
106    /// Lock for a decided upgrade
107    pub upgrade_lock: UpgradeLock<TYPES, V>,
108
109    /// Number of blocks in an epoch, zero means there are no epochs
110    pub epoch_height: u64,
111
112    /// The time this view started
113    pub view_start_time: Instant,
114
115    /// First view in which epoch version takes effect
116    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
117}
118
119impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
120    /// Handles a consensus event received on the event stream
121    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
122    pub async fn handle(
123        &mut self,
124        event: Arc<HotShotEvent<TYPES>>,
125        sender: Sender<Arc<HotShotEvent<TYPES>>>,
126    ) -> Result<()> {
127        match event.as_ref() {
128            HotShotEvent::QuorumVoteRecv(ref vote) => {
129                if let Err(e) =
130                    handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
131                {
132                    tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
133                }
134            },
135            HotShotEvent::EpochRootQuorumVoteRecv(ref vote) => {
136                if let Err(e) =
137                    handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
138                        .await
139                {
140                    tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
141                }
142            },
143            HotShotEvent::TimeoutVoteRecv(ref vote) => {
144                if let Err(e) =
145                    handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
146                {
147                    tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
148                }
149            },
150            HotShotEvent::SetFirstEpoch(view, epoch) => {
151                self.first_epoch = Some((*view, *epoch));
152            },
153            HotShotEvent::ViewChange(new_view_number, epoch_number) => {
154                // Request the randomized stake table for the subsequent epoch,
155                // to trigger catchup and the DRB calculation if it happens to be missing.
156                let _ = self
157                    .membership_coordinator
158                    .membership_for_epoch(epoch_number.map(|e| e + 1))
159                    .await;
160
161                if let Err(e) =
162                    handle_view_change(*new_view_number, *epoch_number, &sender, self).await
163                {
164                    tracing::trace!("Failed to handle ViewChange event; error = {e}");
165                }
166                self.view_start_time = Instant::now();
167            },
168            HotShotEvent::Timeout(view_number, epoch) => {
169                if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
170                    tracing::debug!("Failed to handle Timeout event; error = {e}");
171                }
172            },
173            HotShotEvent::ExtendedQc2Formed(eqc) => {
174                let cert_view = eqc.view_number();
175                let Some(cert_block_number) = eqc.data.block_number else {
176                    tracing::error!("Received extended QC but no block number");
177                    return Ok(());
178                };
179                let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
180                tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
181                // Transition to the new epoch by sending ViewChange
182                let next_epoch = TYPES::Epoch::new(cert_epoch + 1);
183                broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
184                    .await;
185                tracing::info!("Entering new epoch: {next_epoch}");
186                tracing::info!(
187                    "Stake table for epoch {}:\n\n{:?}",
188                    next_epoch,
189                    self.membership_coordinator
190                        .stake_table_for_epoch(Some(next_epoch))
191                        .await?
192                        .stake_table()
193                        .await
194                );
195            },
196            HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
197                if !high_qc
198                    .data
199                    .block_number
200                    .is_some_and(|bn| is_last_block(bn, self.epoch_height))
201                {
202                    tracing::warn!("Received extended QC but we can't verify the leaf is extended");
203                    return Ok(());
204                }
205                if let Err(e) = validate_qc_and_next_epoch_qc(
206                    high_qc,
207                    Some(next_epoch_high_qc),
208                    &self.consensus,
209                    &self.membership_coordinator,
210                    &self.upgrade_lock,
211                    self.epoch_height,
212                )
213                .await
214                {
215                    tracing::error!("Received invalid extended QC: {e}");
216                    return Ok(());
217                }
218
219                let next_epoch = high_qc.data.epoch().map(|x| x + 1);
220
221                let mut consensus_writer = self.consensus.write().await;
222                let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
223                let next_high_qc_updated = consensus_writer
224                    .update_next_epoch_high_qc(next_epoch_high_qc.clone())
225                    .is_ok();
226                if let Some(next_epoch) = next_epoch {
227                    consensus_writer.update_validator_participation_epoch(next_epoch);
228                }
229                drop(consensus_writer);
230
231                self.storage
232                    .update_high_qc2(high_qc.clone())
233                    .await
234                    .map_err(|_| warn!("Failed to update high QC"))?;
235                self.storage
236                    .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
237                    .await
238                    .map_err(|_| warn!("Failed to update next epoch high QC"))?;
239                self.storage
240                    .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
241                    .await
242                    .map_err(|_| warn!("Failed to store eQC"))?;
243
244                tracing::debug!(
245                    "Received Extended QC for view {} and epoch {:?}.",
246                    high_qc.view_number(),
247                    high_qc.epoch()
248                );
249                if high_qc_updated || next_high_qc_updated {
250                    // Send ViewChange indicating new view and new epoch.
251                    let next_epoch = high_qc.data.epoch().map(|x| x + 1);
252                    tracing::info!("Entering new epoch: {next_epoch:?}");
253                    broadcast_view_change(
254                        &sender,
255                        high_qc.view_number() + 1,
256                        next_epoch,
257                        self.first_epoch,
258                    )
259                    .await;
260                }
261            },
262            _ => {},
263        }
264
265        Ok(())
266    }
267}
268
269#[async_trait]
270impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
271    for ConsensusTaskState<TYPES, I, V>
272{
273    type Event = HotShotEvent<TYPES>;
274
275    async fn handle_event(
276        &mut self,
277        event: Arc<Self::Event>,
278        sender: &Sender<Arc<Self::Event>>,
279        _receiver: &Receiver<Arc<Self::Event>>,
280    ) -> Result<()> {
281        self.handle(event, sender.clone()).await
282    }
283
284    /// Joins all subtasks.
285    fn cancel_subtasks(&mut self) {
286        // Cancel the old timeout task
287        std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
288    }
289}