use std::{sync::Arc, time::Duration};

use async_broadcast::Sender;
use chrono::Utc;
use hotshot_types::{
    event::{Event, EventType},
    simple_certificate::EpochRootQuorumCertificate,
    simple_vote::{EpochRootQuorumVote, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
    traits::node_implementation::{ConsensusTime, NodeImplementation, NodeType},
    utils::{is_epoch_root, is_epoch_transition, is_last_block, EpochTransitionIndicator},
    vote::{HasViewNumber, Vote},
};
use hotshot_utils::anytrace::*;
use tokio::{spawn, time::sleep};
use tracing::instrument;
use vbs::version::StaticVersionType;

use super::ConsensusTaskState;
use crate::{
    consensus::Versions,
    events::HotShotEvent,
    helpers::{broadcast_event, check_qc_state_cert_correspondence},
    vote_collection::{handle_epoch_root_vote, handle_vote},
};
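
/// Handle a `QuorumVoteRecv` event.
///
/// The vote is accumulated only if this node leads view `vote.view_number() + 1` or the
/// high QC shows we are in an epoch transition; votes for epoch transition blocks are
/// additionally fed to the next-epoch vote collectors (see below).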
pub(crate) async fn handle_quorum_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &QuorumVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    let in_transition = task_state
        .consensus
        .read()
        .await
        .is_high_qc_for_epoch_transition();
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        in_transition || we_are_leader,
        info!(
            "We are not the leader for view {} and we are not in the epoch transition",
            vote.view_number() + 1
        )
    );

    let transition_indicator = if in_transition {
        EpochTransitionIndicator::InTransition
    } else {
        EpochTransitionIndicator::NotInTransition
    };
    handle_vote(
        &mut task_state.vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        transition_indicator.clone(),
    )
    .await?;
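
    // Votes for epoch transition blocks are also counted against the next epoch's
    // stake table, so that the next epoch can form a QC over the same leaf. Forward
    // the vote to the next-epoch collectors if the signer has stake there.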
    if vote.epoch().is_some()
        && vote
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
    {
        let has_stake = epoch_membership
            .next_epoch_stake_table()
            .await?
            .has_stake(&vote.signing_key())
            .await;
        if has_stake {
            handle_vote(
                &mut task_state.next_epoch_vote_collectors,
                &vote.clone().into(),
                task_state.public_key.clone(),
                &epoch_membership.next_epoch().await?.clone(),
                task_state.id,
                &event,
                sender,
                &task_state.upgrade_lock,
                transition_indicator,
            )
            .await?;
        }
    }

    Ok(())
}
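
/// Handle an `EpochRootQuorumVoteRecv` event.
///
/// The vote must reference an epoch root block and is accumulated only if this node
/// leads view `vote.view_number() + 1`.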
pub(crate) async fn handle_epoch_root_quorum_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &EpochRootQuorumVote<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    ensure!(
        vote.vote
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
        error!("Received an epoch root quorum vote for a non-epoch-root block.")
    );

    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(vote.vote.data.epoch)
        .await
        .context(warn!("No stake table for epoch"))?;

    let we_are_leader =
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key;
    ensure!(
        we_are_leader,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );

    handle_epoch_root_vote(
        &mut task_state.epoch_root_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &epoch_membership,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
    )
    .await?;

    Ok(())
}
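
/// Handle a `TimeoutVoteRecv` event.
///
/// The vote is accumulated only if this node leads view `vote.view_number() + 1`.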
pub(crate) async fn handle_timeout_vote_recv<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    vote: &TimeoutVote2<TYPES>,
    event: Arc<HotShotEvent<TYPES>>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    let epoch_membership = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?;
    ensure!(
        epoch_membership.leader(vote.view_number() + 1).await? == task_state.public_key,
        info!("We are not the leader for view {}", vote.view_number() + 1)
    );
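
    // NOTE: the leader check above uses the current epoch's membership, while the
    // vote itself is collected under the membership for the epoch it was cast in.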
    handle_vote(
        &mut task_state.timeout_vote_collectors,
        vote,
        task_state.public_key.clone(),
        &task_state
            .membership_coordinator
            .membership_for_epoch(vote.data.epoch)
            .await?,
        task_state.id,
        &event,
        sender,
        &task_state.upgrade_lock,
        EpochTransitionIndicator::NotInTransition,
    )
    .await?;

    Ok(())
}
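
/// Send our highest known QC to the leader of `new_view_number` so it can justify the
/// next proposal.
///
/// Depending on the block the high QC references, this emits an `ExtendedQcSend` (last
/// block of the epoch), an `EpochRootQcSend` (epoch root block, paired with the light
/// client state cert), or a `HighQcSend` (attaching the next-epoch transition QC when
/// the high QC is for an epoch transition block). Errors unless the epoch (HotStuff 2)
/// upgrade is already in effect.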
pub async fn send_high_qc<TYPES: NodeType, V: Versions, I: NodeImplementation<TYPES>>(
    new_view_number: TYPES::View,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    let version = task_state.upgrade_lock.version(new_view_number).await?;
    ensure!(
        version >= V::Epochs::VERSION,
        debug!("HotStuff 2 upgrade not yet in effect")
    );

    let consensus_reader = task_state.consensus.read().await;
    let high_qc = consensus_reader.high_qc().clone();
    let is_eqc = high_qc
        .data
        .block_number
        .is_some_and(|b| is_last_block(b, task_state.epoch_height));
    let is_epoch_root = high_qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
    let state_cert = if is_epoch_root {
        consensus_reader.state_cert().cloned()
    } else {
        None
    };
    drop(consensus_reader);
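
    // Three cases follow: an extended QC (eQC, over the last block of the epoch) is
    // broadcast together with its next-epoch twin; otherwise the QC goes to the next
    // leader, either as an epoch root QC with its state cert or as a plain high QC.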
    if is_eqc {
        let maybe_next_epoch_high_qc = task_state
            .consensus
            .read()
            .await
            .next_epoch_high_qc()
            .cloned();
        ensure!(
            maybe_next_epoch_high_qc
                .as_ref()
                .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
            "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
        );

        tracing::debug!(
            "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
            high_qc.view_number(),
            high_qc.epoch(),
            task_state.id
        );
        broadcast_event(
            Arc::new(HotShotEvent::ExtendedQcSend(
                high_qc,
                maybe_next_epoch_high_qc.unwrap(),
                task_state.public_key.clone(),
            )),
            sender,
        )
        .await;
    } else {
        let leader = task_state
            .membership_coordinator
            .membership_for_epoch(task_state.cur_epoch)
            .await?
            .leader(new_view_number)
            .await?;

        let (high_qc, maybe_next_epoch_qc) = if high_qc
            .data
            .block_number
            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
        {
            let Some((qc, next_epoch_qc)) =
                task_state.consensus.read().await.transition_qc().cloned()
            else {
                bail!("We don't have a transition QC");
            };
            ensure!(
                next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
                "Transition QC is invalid because leaf commits are not equal."
            );
            (qc, Some(next_epoch_qc))
        } else {
            (high_qc, None)
        };
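
        // An epoch root QC must travel with the matching light client state cert;
        // verify the pair corresponds before sending it to the leader.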
        if is_epoch_root {
            let Some(state_cert) = state_cert else {
                bail!(
                    "We are sending an epoch root QC but we don't have the corresponding state \
                     cert."
                );
            };
            ensure!(
                check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
                "We are sending an epoch root QC but the state cert does not correspond to it."
            );

            tracing::trace!(
                "Sending epoch root QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::EpochRootQcSend(
                    EpochRootQuorumCertificate {
                        qc: high_qc,
                        state_cert,
                    },
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        } else {
            tracing::trace!(
                "Sending high QC for view {}, height {:?}",
                high_qc.view_number(),
                high_qc.data.block_number
            );
            broadcast_event(
                Arc::new(HotShotEvent::HighQcSend(
                    high_qc,
                    maybe_next_epoch_qc,
                    leader,
                    task_state.public_key.clone(),
                )),
                sender,
            )
            .await;
        }
    }
    Ok(())
}
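
/// Handle a `ViewChange` event.
///
/// Advances the current epoch and view (erroring if the view would not increase), sends
/// the high QC to the next leader, restarts the view timeout timer, updates view
/// metrics, and emits a `ViewFinished` event for the old view.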
#[instrument(skip_all)]
pub(crate) async fn handle_view_change<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    new_view_number: TYPES::View,
    epoch_number: Option<TYPES::Epoch>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    if epoch_number > task_state.cur_epoch {
        task_state.cur_epoch = epoch_number;
        if let Some(new_epoch) = epoch_number {
            let _ = task_state.consensus.write().await.update_epoch(new_epoch);
            tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
        }
    }

    ensure!(
        new_view_number > task_state.cur_view,
        "New view is not larger than the current view"
    );

    let old_view_number = task_state.cur_view;
    tracing::debug!("Updating view from {old_view_number} to {new_view_number}");

    if *old_view_number / 100 != *new_view_number / 100 {
        tracing::info!("Progress: entered view {:>6}", *new_view_number);
    }

    let _ = send_high_qc(new_view_number, sender, task_state)
        .await
        .inspect_err(|e| {
            tracing::debug!("High QC sending failed with error: {e:?}");
        });

    task_state.cur_view = new_view_number;
    task_state
        .consensus
        .write()
        .await
        .update_view(new_view_number)?;

    let decided_upgrade_certificate_read = task_state
        .upgrade_lock
        .decided_upgrade_certificate
        .read()
        .await
        .clone();
    if let Some(cert) = decided_upgrade_certificate_read {
        if new_view_number == cert.data.new_version_first_view {
            tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
        }
    }
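
    // Restart the view timer: spawn a task that fires a `Timeout` event for the new
    // view after `task_state.timeout` milliseconds, then swap it in and abort the
    // timer for the old view.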
    let timeout = task_state.timeout;
    let new_timeout_task = spawn({
        let stream = sender.clone();
        let view_number = new_view_number;
        async move {
            sleep(Duration::from_millis(timeout)).await;
            broadcast_event(
                Arc::new(HotShotEvent::Timeout(
                    TYPES::View::new(*view_number),
                    epoch_number,
                )),
                &stream,
            )
            .await;
        }
    });

    std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();
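
    // Update per-view metrics: current view, time spent as leader of the old view,
    // and the number of views since the last decide.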
    let old_view_leader_key = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(old_view_number)
        .await?;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader
        .metrics
        .current_view
        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
    let cur_view_time = Utc::now().timestamp();
    if old_view_leader_key == task_state.public_key {
        #[allow(clippy::cast_precision_loss)]
        consensus_reader
            .metrics
            .view_duration_as_leader
            .add_point((cur_view_time - task_state.cur_view_time) as f64);
    }
    task_state.cur_view_time = cur_view_time;

    if usize::try_from(task_state.cur_view.u64()).unwrap()
        > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
    {
        consensus_reader
            .metrics
            .number_of_views_since_last_decide
            .set(
                usize::try_from(task_state.cur_view.u64()).unwrap()
                    - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
            );
    }

    broadcast_event(
        Event {
            view_number: old_view_number,
            event: EventType::ViewFinished {
                view_number: old_view_number,
            },
        },
        &task_state.output_event_stream,
    )
    .await;
    Ok(())
}
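
/// Handle a `Timeout` event.
///
/// If this node still considers `view_number` current-or-future and has stake for
/// `epoch`, signs and broadcasts a `TimeoutVoteSend`, emits `ViewTimeout` and
/// `ReplicaViewTimeout` external events, and records timeout metrics.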
#[instrument(skip_all)]
pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    view_number: TYPES::View,
    epoch: Option<TYPES::Epoch>,
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
    ensure!(
        task_state.cur_view <= view_number,
        "Timeout event is for an old view"
    );

    ensure!(
        task_state
            .membership_coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch"))?
            .has_stake(&task_state.public_key)
            .await,
        debug!("We were not chosen for the consensus committee for view {view_number}")
    );

    let vote = TimeoutVote2::create_signed_vote(
        TimeoutData2::<TYPES> {
            view: view_number,
            epoch,
        },
        view_number,
        &task_state.public_key,
        &task_state.private_key,
        &task_state.upgrade_lock,
    )
    .await
    .wrap()
    .context(error!("Failed to sign TimeoutData"))?;

    broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
    broadcast_event(
        Event {
            view_number,
            event: EventType::ViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;

    tracing::error!(
        "We did not receive evidence for view {view_number} in time, sending timeout vote for \
         that view!"
    );

    broadcast_event(
        Event {
            view_number,
            event: EventType::ReplicaViewTimeout { view_number },
        },
        &task_state.output_event_stream,
    )
    .await;
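
    // Count the timeout before resolving the leader so the metric is recorded even if
    // the leader lookup failed.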
    let leader = task_state
        .membership_coordinator
        .membership_for_epoch(task_state.cur_epoch)
        .await
        .context(warn!("No stake table for epoch"))?
        .leader(view_number)
        .await;

    let consensus_reader = task_state.consensus.read().await;
    consensus_reader.metrics.number_of_timeouts.add(1);
    if leader? == task_state.public_key {
        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
    }

    Ok(())
}