use std::{sync::Arc, time::Duration};

use async_broadcast::Sender;
use chrono::Utc;
use hotshot_types::{
    data::{EpochNumber, ViewNumber},
    event::{Event, EventType},
    simple_certificate::EpochRootQuorumCertificateV2,
    simple_vote::{EpochRootQuorumVote2, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
    traits::node_implementation::{NodeImplementation, NodeType},
    utils::{EpochTransitionIndicator, is_epoch_root, is_epoch_transition, is_last_block},
    vote::{HasViewNumber, Vote},
};
use hotshot_utils::anytrace::*;
use tokio::{spawn, time::sleep};
use tracing::instrument;
use versions::EPOCH_VERSION;

use super::ConsensusTaskState;
use crate::{
    events::HotShotEvent,
    helpers::{broadcast_event, check_qc_state_cert_correspondence},
    vote_collection::{handle_epoch_root_vote, handle_vote},
};

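/// Handle a received quorum vote. The vote is accepted only if we are the leader of the next
/// view or the high QC is for an epoch transition; it is then accumulated by the vote
/// collectors. Votes on an epoch transition block are additionally accumulated against the
/// next epoch's stake table when the voter has stake there.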
pub(crate) async fn handle_quorum_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    vote: &QuorumVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    let in_transition = task_state
        .consensus
        .read()
        .await
        .is_high_qc_for_epoch_transition();
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        in_transition || we_are_leader,
        info!(
            "We are not the leader for view {} and we are not in the epoch transition",
            vote.view_number() + 1
        )
    );

    let transition_indicator = if in_transition {
        EpochTransitionIndicator::InTransition
    } else {
        EpochTransitionIndicator::NotInTransition
    };
    handle_vote(
        &mut task_state.vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        transition_indicator.clone(),
    )
    .await?;

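    // A vote on an epoch transition block is also counted against the next epoch's stake
    // table, so that a certificate can be formed under the next epoch's membership as well.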
    if vote.epoch().is_some()
        && vote
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
    {
        let has_stake = epoch_membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&vote.signing_key())
            .await;
        if has_stake {
            handle_vote(
                &mut task_state.next_epoch_vote_collectors,
                &vote.clone().into(),
                task_state.public_key.clone(),
                &epoch_membership.next_epoch().await?.clone(),
                task_state.id,
                &event,
                sender,
                &task_state.upgrade_lock,
                transition_indicator,
            )
            .await?;
        }
    }

    Ok(())
}

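/// Handle a received epoch root quorum vote. The vote must be for an epoch root block and we
/// must be the leader of the next view; the vote is then accumulated by the epoch root vote
/// collectors.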
pub(crate) async fn handle_epoch_root_quorum_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    vote: &EpochRootQuorumVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    ensure!(
        vote.vote
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
        error!("Received epoch root quorum vote for a non-epoch-root block.")
    );

    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        we_are_leader,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

    handle_epoch_root_vote(
        &mut task_state.epoch_root_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
    )
    .await?;

    Ok(())
}

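/// Handle a received timeout vote. The vote is accepted only if we are the leader of the next
/// view; it is then accumulated by the timeout vote collectors.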
pub(crate) async fn handle_timeout_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    vote: &TimeoutVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?;
    ensure!(
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

    handle_vote(
        &mut task_state.timeout_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &task_state
            .membership_coordinator
            .membership_for_epoch(vote.data.epoch)
            .await?,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        EpochTransitionIndicator::NotInTransition,
    )
    .await?;

    Ok(())
}

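/// Send our current high QC when entering `new_view_number`, once the epoch (HotStuff 2)
/// upgrade is in effect. If the high QC is an extended QC (for the last block of the epoch),
/// it is broadcast together with the next-epoch extended QC; otherwise the high QC (plus the
/// next-epoch QC for an epoch transition block, or the state cert for an epoch root block) is
/// sent toward the leader of the new view.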
pub async fn send_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    new_view_number: ViewNumber,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    let version = task_state.upgrade_lock.version(new_view_number)?;
    ensure!(
        version >= EPOCH_VERSION,
        debug!("HotStuff 2 upgrade not yet in effect")
    );

    let consensus_reader = task_state.consensus.read().await;
    let high_qc = consensus_reader.high_qc().clone();
    let is_eqc = high_qc
        .data
        .block_number
        .is_some_and(|b| is_last_block(b, task_state.epoch_height));
    let is_epoch_root = high_qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
    let state_cert = if is_epoch_root {
        consensus_reader.state_cert().cloned()
    } else {
        None
    };
    drop(consensus_reader);

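    // An extended QC is broadcast to all nodes; any other high QC is routed to the leader of
    // the new view.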
    if is_eqc {
        let maybe_next_epoch_high_qc = task_state
            .consensus
            .read()
            .await
            .next_epoch_high_qc()
            .cloned();
        ensure!(
            maybe_next_epoch_high_qc
                .as_ref()
                .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
            "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
        );

        tracing::debug!(
            "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
            high_qc.view_number(),
            high_qc.epoch(),
            task_state.id
        );
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQcSend(
                high_qc,
                maybe_next_epoch_high_qc.unwrap(),
                task_state.public_key.clone(),
            )),
            sender,
        )
        .await;
    } else {
        let leader = task_state
            .membership_coordinator
            .membership_for_epoch(task_state.cur_epoch)
            .await?
            .leader(new_view_number)
            .await?;

        let (high_qc, maybe_next_epoch_qc) = if high_qc
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
        {
            let Some((qc, next_epoch_qc)) =
                task_state.consensus.read().await.transition_qc().cloned()
            else {
                bail!("We don't have a transition QC");
            };
            ensure!(
                next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
                "Transition QC is invalid because leaf commits are not equal."
            );
            (qc, Some(next_epoch_qc))
        } else {
            (high_qc, None)
        };

        if is_epoch_root {
            let Some(state_cert) = state_cert else {
                bail!(
                    "We are sending an epoch root QC but we don't have the corresponding state \
                     cert."
                );
            };
            ensure!(
                check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
                "We are sending an epoch root QC but the state cert does not correspond to it."
            );

            tracing::trace!(
                "Sending epoch root QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::EpochRootQcSend(
                    EpochRootQuorumCertificateV2 {
                        qc: high_qc,
                        state_cert,
                    },
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        } else {
            tracing::trace!(
                "Sending high QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::HighQcSend(
                    high_qc,
                    maybe_next_epoch_qc,
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        }
    }
    Ok(())
}

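/// Handle a view change to `new_view_number`: update the current epoch and view, send our high
/// QC, restart the view timeout task, and update the view-related metrics before emitting a
/// `ViewFinished` event for the old view.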
#[instrument(skip_all)]
pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    new_view_number: ViewNumber,
    epoch_number: Option<EpochNumber>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    if epoch_number > task_state.cur_epoch {
        task_state.cur_epoch = epoch_number;
        if let Some(new_epoch) = epoch_number {
            let _ = task_state.consensus.write().await.update_epoch(new_epoch);
            tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
        }
    }

    ensure!(
        new_view_number > task_state.cur_view,
        "New view is not larger than the current view"
    );

    let old_view_number = task_state.cur_view;
    tracing::debug!("Updating view from {old_view_number} to {new_view_number}");

    if *old_view_number / 100 != *new_view_number / 100 {
        tracing::info!("Progress: entered view {:>6}", *new_view_number);
    }

    let _ = send_high_qc(new_view_number, sender, task_state)
        .await
        .inspect_err(|e| {
            tracing::debug!("High QC sending failed with error: {e:?}");
        });

    task_state.cur_view = new_view_number;
    task_state
        .consensus
        .write()
        .await
        .update_view(new_view_number)?;

    if let Some(cert) = task_state.upgrade_lock.decided_upgrade_cert()
        && new_view_number == cert.data.new_version_first_view
    {
        tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
    }

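    // Spawn a timeout task for the new view; it fires a `Timeout` event unless it is cancelled
    // by a later view change. The task for the previous view is aborted below.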
    let timeout = task_state.timeout;
    let new_timeout_task = spawn({
        let stream = sender.clone();
        let view_number = new_view_number;
        async move {
            sleep(Duration::from_millis(timeout)).await;
            broadcast_event(
                Arc::new(HotShotEvent::Timeout(
                    ViewNumber::new(*view_number),
                    epoch_number,
                )),
                &stream,
            )
            .await;
        }
    });

    std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();

    let old_view_leader_key = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(old_view_number)
        .await?;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader
        .metrics
        .current_view
        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
    let cur_view_time = Utc::now().timestamp();
    if old_view_leader_key == task_state.public_key {
        #[allow(clippy::cast_precision_loss)]
        consensus_reader
            .metrics
            .view_duration_as_leader
            .add_point((cur_view_time - task_state.cur_view_time) as f64);
    }
    task_state.cur_view_time = cur_view_time;

    if usize::try_from(task_state.cur_view.u64()).unwrap()
        > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
    {
        consensus_reader
            .metrics
            .number_of_views_since_last_decide
            .set(
                usize::try_from(task_state.cur_view.u64()).unwrap()
                    - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
            );
    }

    broadcast_event(
        Event {
            view_number: old_view_number,
            event: EventType::ViewFinished {
                view_number: old_view_number,
            },
        },
        &task_state.output_event_stream,
    )
    .await;
    Ok(())
}

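/// Handle a view timeout: if we have stake for this view, sign and broadcast a timeout vote,
/// emit `ViewTimeout` and `ReplicaViewTimeout` events, update the timeout metrics, and record
/// that the view's leader failed to produce evidence in time.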
#[instrument(skip_all)]
pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    view_number: ViewNumber,
    epoch: Option<EpochNumber>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I>,
) -> Result<()> {
    ensure!(
        task_state.cur_view <= view_number,
        "Timeout event is for an old view"
    );

    ensure!(
        task_state
            .membership_coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch"))?
            .has_stake(&task_state.public_key)
            .await,
        debug!("We were not chosen for the consensus committee for view {view_number}")
    );

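    // We failed to see evidence for this view in time: sign a timeout vote and broadcast it,
    // then surface the timeout through the external event stream.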
    let vote = TimeoutVote2::create_signed_vote(
        TimeoutData2 {
            view: view_number,
            epoch,
        },
        view_number,
        &task_state.public_key,
        &task_state.private_key,
        &task_state.upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign TimeoutData"))?;

    broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
    broadcast_event(
        Event {
            view_number,
            event: EventType::ViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;

    tracing::error!(
        "We did not receive evidence for view {view_number} in time, sending timeout vote for \
         that view!"
    );

    broadcast_event(
        Event {
            view_number,
            event: EventType::ReplicaViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;

    let leader = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(view_number)
        .await;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader.metrics.number_of_timeouts.add(1);
    if leader.as_ref().is_ok_and(|l| *l == task_state.public_key) {
        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
    }
    drop(consensus_reader);
    task_state
        .consensus
        .write()
        .await
        .update_validator_participation(
            leader?,
            task_state.cur_epoch.ok_or(debug!("No epoch"))?,
            false,
        );

    Ok(())
}