hotshot_task_impls/consensus/
mod.rs1use std::{sync::Arc, time::Instant};
8
9use alloy::primitives::U256;
10use async_broadcast::{Receiver, Sender};
11use async_trait::async_trait;
12use handlers::handle_epoch_root_quorum_vote_recv;
13use hotshot_task::task::TaskState;
14use hotshot_types::{
15 consensus::OuterConsensus,
16 data::{EpochNumber, ViewNumber},
17 epoch_membership::EpochMembershipCoordinator,
18 event::Event,
19 message::UpgradeLock,
20 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
21 simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
22 stake_table::HSStakeTable,
23 traits::{
24 node_implementation::{NodeImplementation, NodeType},
25 signature_key::SignatureKey,
26 storage::Storage,
27 },
28 utils::{epoch_from_block_number, is_last_block},
29 vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use tokio::task::JoinHandle;
33use tracing::instrument;
34
35use self::handlers::{
36 handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
37};
38use crate::{
39 events::HotShotEvent,
40 helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
41 vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
42};
43
44mod handlers;
46
47pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
49 pub public_key: TYPES::SignatureKey,
51
52 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
54
55 pub instance_state: Arc<TYPES::InstanceState>,
57
58 pub network: Arc<I::Network>,
60
61 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64 pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>,
66
67 pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES>,
69
70 pub next_epoch_vote_collectors:
72 VoteCollectorsMap<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>,
73
74 pub timeout_vote_collectors:
76 VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>,
77
78 pub cur_view: ViewNumber,
80
81 pub cur_view_time: i64,
83
84 pub cur_epoch: Option<EpochNumber>,
86
87 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
89
90 pub timeout_task: JoinHandle<()>,
92
93 pub timeout: u64,
95
96 pub consensus: OuterConsensus<TYPES>,
98
99 pub storage: I::Storage,
101
102 pub id: u64,
104
105 pub upgrade_lock: UpgradeLock<TYPES>,
107
108 pub epoch_height: u64,
110
111 pub view_start_time: Instant,
113
114 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
116}
117
118impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusTaskState<TYPES, I> {
119 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
121 pub async fn handle(
122 &mut self,
123 event: Arc<HotShotEvent<TYPES>>,
124 sender: Sender<Arc<HotShotEvent<TYPES>>>,
125 ) -> Result<()> {
126 match event.as_ref() {
127 HotShotEvent::QuorumVoteRecv(vote) => {
128 if let Err(e) =
129 handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
130 {
131 tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
132 }
133 },
134 HotShotEvent::EpochRootQuorumVoteRecv(vote) => {
135 if let Err(e) =
136 handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
137 .await
138 {
139 tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
140 }
141 },
142 HotShotEvent::TimeoutVoteRecv(vote) => {
143 if let Err(e) =
144 handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
145 {
146 tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
147 }
148 },
149 HotShotEvent::SetFirstEpoch(view, epoch) => {
150 self.first_epoch = Some((*view, *epoch));
151 },
152 HotShotEvent::ViewChange(new_view_number, epoch_number) => {
153 let frequency = (self.epoch_height / 30).clamp(1, 100);
159 if **new_view_number % frequency == 0 {
160 let _ = self
161 .membership_coordinator
162 .membership_for_epoch(epoch_number.map(|e| e + 1))
163 .await;
164 }
165
166 if let Err(e) =
167 handle_view_change(*new_view_number, *epoch_number, &sender, self).await
168 {
169 tracing::trace!("Failed to handle ViewChange event; error = {e}");
170 }
171 self.view_start_time = Instant::now();
172 },
173 HotShotEvent::Timeout(view_number, epoch) => {
174 if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
175 tracing::debug!("Failed to handle Timeout event; error = {e}");
176 }
177 },
178 HotShotEvent::ExtendedQc2Formed(eqc) => {
179 let cert_view = eqc.view_number();
180 let Some(cert_block_number) = eqc.data.block_number else {
181 tracing::error!("Received extended QC but no block number");
182 return Ok(());
183 };
184 let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
185 tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
186 let next_epoch = EpochNumber::new(cert_epoch + 1);
188 broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
189 .await;
190 tracing::info!("Entering new epoch: {next_epoch}");
191 tracing::info!(
192 "Stake table for epoch {}:\n\n{:?}",
193 next_epoch,
194 self.membership_coordinator
195 .stake_table_for_epoch(Some(next_epoch))
196 .await?
197 .stake_table()
198 .await
199 );
200 },
201 HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
202 if !high_qc
203 .data
204 .block_number
205 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
206 {
207 tracing::warn!("Received extended QC but we can't verify the leaf is extended");
208 return Ok(());
209 }
210 if let Err(e) = validate_qc_and_next_epoch_qc(
211 high_qc,
212 Some(next_epoch_high_qc),
213 &self.consensus,
214 &self.membership_coordinator,
215 &self.upgrade_lock,
216 self.epoch_height,
217 )
218 .await
219 {
220 tracing::error!("Received invalid extended QC: {e}");
221 return Ok(());
222 }
223
224 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
225
226 let mut consensus_writer = self.consensus.write().await;
227 let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
228 let next_high_qc_updated = consensus_writer
229 .update_next_epoch_high_qc(next_epoch_high_qc.clone())
230 .is_ok();
231 if let Some(next_epoch) = next_epoch {
232 consensus_writer.update_validator_participation_epoch(next_epoch);
233 let (stake_table, success_threshold) = if let Ok(epoch_membership) = self
234 .membership_coordinator
235 .stake_table_for_epoch(Some(next_epoch))
236 .await
237 {
238 (
239 epoch_membership.stake_table().await,
240 epoch_membership.success_threshold().await,
241 )
242 } else {
243 tracing::warn!(
244 "Failed to get stake table for epoch {} while updating vote \
245 participation",
246 next_epoch
247 );
248 (HSStakeTable::default(), U256::MAX)
249 };
250 consensus_writer
251 .update_vote_participation_epoch(
252 stake_table,
253 success_threshold,
254 Some(next_epoch),
255 )
256 .context(warn!("Updating vote participation"))?;
257 }
258 drop(consensus_writer);
259
260 self.storage
261 .update_high_qc2(high_qc.clone())
262 .await
263 .map_err(|_| warn!("Failed to update high QC"))?;
264 self.storage
265 .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
266 .await
267 .map_err(|_| warn!("Failed to update next epoch high QC"))?;
268 self.storage
269 .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
270 .await
271 .map_err(|_| warn!("Failed to store eQC"))?;
272
273 tracing::debug!(
274 "Received Extended QC for view {} and epoch {:?}.",
275 high_qc.view_number(),
276 high_qc.epoch()
277 );
278 if high_qc_updated || next_high_qc_updated {
279 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
281 tracing::info!("Entering new epoch: {next_epoch:?}");
282 broadcast_view_change(
283 &sender,
284 high_qc.view_number() + 1,
285 next_epoch,
286 self.first_epoch,
287 )
288 .await;
289 }
290 },
291 _ => {},
292 }
293
294 Ok(())
295 }
296}
297
298#[async_trait]
299impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for ConsensusTaskState<TYPES, I> {
300 type Event = HotShotEvent<TYPES>;
301
302 async fn handle_event(
303 &mut self,
304 event: Arc<Self::Event>,
305 sender: &Sender<Arc<Self::Event>>,
306 _receiver: &Receiver<Arc<Self::Event>>,
307 ) -> Result<()> {
308 self.handle(event, sender.clone()).await
309 }
310
311 fn cancel_subtasks(&mut self) {
313 std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
315 }
316}