hotshot_task_impls/consensus/
mod.rs1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use handlers::handle_epoch_root_quorum_vote_recv;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 epoch_membership::EpochMembershipCoordinator,
16 event::Event,
17 message::UpgradeLock,
18 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
19 simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
20 traits::{
21 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
22 signature_key::SignatureKey,
23 storage::Storage,
24 },
25 utils::{epoch_from_block_number, is_last_block},
26 vote::HasViewNumber,
27};
28use hotshot_utils::anytrace::*;
29use tokio::task::JoinHandle;
30use tracing::instrument;
31
32use self::handlers::{
33 handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
34};
35use crate::{
36 events::HotShotEvent,
37 helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
38 vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
39};
40
41mod handlers;
43
44pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
46 pub public_key: TYPES::SignatureKey,
48
49 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
51
52 pub instance_state: Arc<TYPES::InstanceState>,
54
55 pub network: Arc<I::Network>,
57
58 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61 pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>, V>,
63
64 pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES, V>,
66
67 pub next_epoch_vote_collectors: VoteCollectorsMap<
69 TYPES,
70 NextEpochQuorumVote2<TYPES>,
71 NextEpochQuorumCertificate2<TYPES>,
72 V,
73 >,
74
75 pub timeout_vote_collectors:
77 VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>, V>,
78
79 pub cur_view: TYPES::View,
81
82 pub cur_view_time: i64,
84
85 pub cur_epoch: Option<TYPES::Epoch>,
87
88 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
90
91 pub timeout_task: JoinHandle<()>,
93
94 pub timeout: u64,
96
97 pub consensus: OuterConsensus<TYPES>,
99
100 pub storage: I::Storage,
102
103 pub id: u64,
105
106 pub upgrade_lock: UpgradeLock<TYPES, V>,
108
109 pub epoch_height: u64,
111
112 pub view_start_time: Instant,
114
115 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
117}
118
119impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
120 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
122 pub async fn handle(
123 &mut self,
124 event: Arc<HotShotEvent<TYPES>>,
125 sender: Sender<Arc<HotShotEvent<TYPES>>>,
126 ) -> Result<()> {
127 match event.as_ref() {
128 HotShotEvent::QuorumVoteRecv(ref vote) => {
129 if let Err(e) =
130 handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
131 {
132 tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
133 }
134 },
135 HotShotEvent::EpochRootQuorumVoteRecv(ref vote) => {
136 if let Err(e) =
137 handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
138 .await
139 {
140 tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
141 }
142 },
143 HotShotEvent::TimeoutVoteRecv(ref vote) => {
144 if let Err(e) =
145 handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
146 {
147 tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
148 }
149 },
150 HotShotEvent::SetFirstEpoch(view, epoch) => {
151 self.first_epoch = Some((*view, *epoch));
152 },
153 HotShotEvent::ViewChange(new_view_number, epoch_number) => {
154 let _ = self
157 .membership_coordinator
158 .membership_for_epoch(epoch_number.map(|e| e + 1))
159 .await;
160
161 if let Err(e) =
162 handle_view_change(*new_view_number, *epoch_number, &sender, self).await
163 {
164 tracing::trace!("Failed to handle ViewChange event; error = {e}");
165 }
166 self.view_start_time = Instant::now();
167 },
168 HotShotEvent::Timeout(view_number, epoch) => {
169 if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
170 tracing::debug!("Failed to handle Timeout event; error = {e}");
171 }
172 },
173 HotShotEvent::ExtendedQc2Formed(eqc) => {
174 let cert_view = eqc.view_number();
175 let Some(cert_block_number) = eqc.data.block_number else {
176 tracing::error!("Received extended QC but no block number");
177 return Ok(());
178 };
179 let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
180 tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
181 let next_epoch = TYPES::Epoch::new(cert_epoch + 1);
183 broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
184 .await;
185 tracing::info!("Entering new epoch: {next_epoch}");
186 tracing::info!(
187 "Stake table for epoch {}:\n\n{:?}",
188 next_epoch,
189 self.membership_coordinator
190 .stake_table_for_epoch(Some(next_epoch))
191 .await?
192 .stake_table()
193 .await
194 );
195 },
196 HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
197 if !high_qc
198 .data
199 .block_number
200 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
201 {
202 tracing::warn!("Received extended QC but we can't verify the leaf is extended");
203 return Ok(());
204 }
205 if let Err(e) = validate_qc_and_next_epoch_qc(
206 high_qc,
207 Some(next_epoch_high_qc),
208 &self.consensus,
209 &self.membership_coordinator,
210 &self.upgrade_lock,
211 self.epoch_height,
212 )
213 .await
214 {
215 tracing::error!("Received invalid extended QC: {e}");
216 return Ok(());
217 }
218
219 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
220
221 let mut consensus_writer = self.consensus.write().await;
222 let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
223 let next_high_qc_updated = consensus_writer
224 .update_next_epoch_high_qc(next_epoch_high_qc.clone())
225 .is_ok();
226 if let Some(next_epoch) = next_epoch {
227 consensus_writer.update_validator_participation_epoch(next_epoch);
228 }
229 drop(consensus_writer);
230
231 self.storage
232 .update_high_qc2(high_qc.clone())
233 .await
234 .map_err(|_| warn!("Failed to update high QC"))?;
235 self.storage
236 .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
237 .await
238 .map_err(|_| warn!("Failed to update next epoch high QC"))?;
239 self.storage
240 .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
241 .await
242 .map_err(|_| warn!("Failed to store eQC"))?;
243
244 tracing::debug!(
245 "Received Extended QC for view {} and epoch {:?}.",
246 high_qc.view_number(),
247 high_qc.epoch()
248 );
249 if high_qc_updated || next_high_qc_updated {
250 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
252 tracing::info!("Entering new epoch: {next_epoch:?}");
253 broadcast_view_change(
254 &sender,
255 high_qc.view_number() + 1,
256 next_epoch,
257 self.first_epoch,
258 )
259 .await;
260 }
261 },
262 _ => {},
263 }
264
265 Ok(())
266 }
267}
268
269#[async_trait]
270impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
271 for ConsensusTaskState<TYPES, I, V>
272{
273 type Event = HotShotEvent<TYPES>;
274
275 async fn handle_event(
276 &mut self,
277 event: Arc<Self::Event>,
278 sender: &Sender<Arc<Self::Event>>,
279 _receiver: &Receiver<Arc<Self::Event>>,
280 ) -> Result<()> {
281 self.handle(event, sender.clone()).await
282 }
283
284 fn cancel_subtasks(&mut self) {
286 std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
288 }
289}