hotshot_task_impls/quorum_proposal_recv/
handlers.rs1#![allow(dead_code)]
8
9use std::sync::Arc;
10
11use async_broadcast::{Receiver, Sender, broadcast};
12use async_lock::{RwLock, RwLockUpgradableReadGuard};
13use committable::Committable;
14use hotshot_types::{
15 consensus::OuterConsensus,
16 data::{Leaf2, QuorumProposal, QuorumProposalWrapper, ViewNumber},
17 epoch_membership::EpochMembershipCoordinator,
18 message::Proposal,
19 simple_certificate::{QuorumCertificate, QuorumCertificate2},
20 simple_vote::HasEpoch,
21 traits::{
22 ValidatedState,
23 block_contents::{BlockHeader, BlockPayload},
24 election::Membership,
25 node_implementation::{NodeImplementation, NodeType},
26 signature_key::SignatureKey,
27 storage::Storage,
28 },
29 utils::{
30 View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
31 is_transition_block, option_epoch_from_block_number,
32 },
33 vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tokio::spawn;
37use tracing::instrument;
38use versions::EPOCH_VERSION;
39
40use super::{QuorumProposalRecvTaskState, ValidationInfo};
41use crate::{
42 events::HotShotEvent,
43 helpers::{
44 broadcast_event, broadcast_view_change, check_qc_state_cert_correspondence, fetch_proposal,
45 update_high_qc, validate_epoch_transition_qc,
46 validate_light_client_state_update_certificate, validate_proposal_safety_and_liveness,
47 validate_proposal_view_and_certs, validate_qc_and_next_epoch_qc, verify_drb_result,
48 },
49 quorum_proposal_recv::UpgradeLock,
50};
51
52#[allow(clippy::too_many_arguments)]
54fn spawn_fetch_proposal<TYPES: NodeType>(
55 qc: &QuorumCertificate2<TYPES>,
56 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
57 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
58 membership: EpochMembershipCoordinator<TYPES>,
59 consensus: OuterConsensus<TYPES>,
60 sender_public_key: TYPES::SignatureKey,
61 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
62 upgrade_lock: UpgradeLock<TYPES>,
63 epoch_height: u64,
64) {
65 let qc = qc.clone();
66 spawn(async move {
67 let lock = upgrade_lock;
68
69 let _ = fetch_proposal(
70 &qc,
71 event_sender,
72 event_receiver,
73 membership,
74 consensus,
75 sender_public_key,
76 sender_private_key,
77 &lock,
78 epoch_height,
79 )
80 .await;
81 });
82}
83
84#[instrument(skip_all)]
86pub async fn validate_proposal_liveness<TYPES: NodeType, I: NodeImplementation<TYPES>>(
87 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
88 validation_info: &ValidationInfo<TYPES, I>,
89) -> Result<()> {
90 let mut valid_epoch_transition = false;
91 if validation_info
92 .upgrade_lock
93 .version(proposal.data.view_number())
94 .is_ok_and(|v| v >= EPOCH_VERSION)
95 {
96 let Some(block_number) = proposal.data.justify_qc().data.block_number else {
97 bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
98 };
99 if is_epoch_transition(block_number, validation_info.epoch_height) {
100 validate_epoch_transition_qc(proposal, validation_info).await?;
101 valid_epoch_transition = true;
102 }
103 }
104 let mut consensus_writer = validation_info.consensus.write().await;
105
106 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
107
108 let state = Arc::new(
109 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
110 );
111
112 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), state, None) {
113 tracing::trace!("{e:?}");
114 }
115
116 let liveness_check = proposal.data.justify_qc().view_number() > consensus_writer.locked_view();
117 if liveness_check
119 && validation_info
120 .upgrade_lock
121 .version(leaf.view_number())
122 .is_ok_and(|v| v >= EPOCH_VERSION)
123 {
124 consensus_writer.update_locked_view(proposal.data.justify_qc().view_number())?;
125 }
126
127 drop(consensus_writer);
128
129 if !liveness_check && !valid_epoch_transition {
130 bail!("Quorum Proposal failed the liveness check");
131 }
132
133 Ok(())
134}
135
136async fn validate_epoch_transition_block<TYPES: NodeType, I: NodeImplementation<TYPES>>(
137 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
138 validation_info: &ValidationInfo<TYPES, I>,
139) -> Result<()> {
140 if !validation_info
141 .upgrade_lock
142 .epochs_enabled(proposal.data.view_number())
143 {
144 return Ok(());
145 }
146 if !is_epoch_transition(
147 proposal.data.block_header().block_number(),
148 validation_info.epoch_height,
149 ) {
150 return Ok(());
151 }
152 if is_transition_block(
154 proposal.data.block_header().block_number(),
155 validation_info.epoch_height,
156 ) {
157 return Ok(());
158 }
159 let (empty_payload, metadata) = <TYPES as NodeType>::BlockPayload::empty();
161 let header = proposal.data.block_header();
162 ensure!(
163 empty_payload.builder_commitment(&metadata) == header.builder_commitment()
164 && &metadata == header.metadata(),
165 "Block is not empty"
166 );
167 Ok(())
168}
169
170async fn validate_current_epoch<TYPES: NodeType, I: NodeImplementation<TYPES>>(
171 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
172 validation_info: &ValidationInfo<TYPES, I>,
173) -> Result<()> {
174 let upgrade_view = validation_info
175 .upgrade_lock
176 .upgrade_view()
177 .unwrap_or(ViewNumber::new(0));
178 if !validation_info
179 .upgrade_lock
180 .epochs_enabled(proposal.data.view_number())
181 || proposal.data.justify_qc().view_number() <= upgrade_view
182 {
183 return Ok(());
184 }
185 if validation_info
186 .consensus
187 .read()
188 .await
189 .high_qc()
190 .view_number()
191 <= upgrade_view
192 {
193 return Ok(());
194 }
195
196 let block_number = proposal.data.block_header().block_number();
197
198 let Some(high_block_number) = validation_info
199 .consensus
200 .read()
201 .await
202 .high_qc()
203 .data
204 .block_number
205 else {
206 return Ok(());
207 };
208
209 ensure!(
210 epoch_from_block_number(block_number, validation_info.epoch_height)
211 >= epoch_from_block_number(high_block_number + 1, validation_info.epoch_height),
212 "Quorum proposal has an inconsistent epoch"
213 );
214
215 Ok(())
216}
217
218async fn validate_block_height<TYPES: NodeType>(
220 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
221) -> Result<()> {
222 let Some(qc_block_number) = proposal.data.justify_qc().data.block_number else {
223 return Ok(());
224 };
225 ensure!(
226 qc_block_number + 1 == proposal.data.block_header().block_number(),
227 "Quorum proposal has an inconsistent block height"
228 );
229 Ok(())
230}
231
/// Handles a received quorum proposal.
///
/// The proposal first goes through a series of preliminary validations (epoch, view and
/// attached certificates, block height, DRB result when the epoch version is active, state
/// certificate for an epoch-root QC, epoch transition block, and the justify QC together with
/// the optional next-epoch QC). Once those pass, a `QuorumProposalPreliminarilyValidated` event
/// is broadcast.
///
/// Afterwards the parent leaf referenced by the justify QC is looked up in the saved leaves:
/// - If it is missing, a background fetch for it is spawned and the proposal only goes through
///   `validate_proposal_liveness`.
/// - If it is present, the proposal goes through `validate_proposal_safety_and_liveness`.
///
/// In both cases the highest block is updated and a view change is broadcast on success.
///
/// # Errors
/// Returns an error if any validation step fails, or if the parent leaf is stored but no state
/// is found for its view.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all)]
pub(crate) async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    quorum_proposal_sender_key: &TYPES::SignatureKey,
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    validation_info: ValidationInfo<TYPES, I>,
) -> Result<()> {
    // Epoch checks: the proposal's own epoch first, then against our current high QC.
    proposal
        .data
        .validate_epoch(&validation_info.upgrade_lock, validation_info.epoch_height)
        .await?;
    validate_current_epoch(proposal, &validation_info).await?;
    // Owned copy of the sender key; it is moved into the safety-and-liveness validation below.
    let quorum_proposal_sender_key = quorum_proposal_sender_key.clone();

    validate_proposal_view_and_certs(proposal, &validation_info)
        .await
        .context(warn!("Failed to validate proposal view or attached certs"))?;

    validate_block_height(proposal).await?;

    let version = validation_info
        .upgrade_lock
        .version(proposal.data.view_number())?;

    // The DRB result is only verified once the epoch version is active.
    if version >= EPOCH_VERSION {
        verify_drb_result(&proposal.data, &validation_info).await?;
    }

    let view_number = proposal.data.view_number();

    let justify_qc = proposal.data.justify_qc().clone();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc().clone();

    let proposal_block_number = proposal.data.block_header().block_number();
    let proposal_epoch = option_epoch_from_block_number(
        proposal.data.epoch().is_some(),
        proposal_block_number,
        validation_info.epoch_height,
    );

    // A justify QC for an epoch root block must come with a matching, valid state certificate.
    if justify_qc
        .data
        .block_number
        .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
    {
        let Some(state_cert) = proposal.data.state_cert() else {
            bail!("Epoch root QC has no state cert");
        };
        ensure!(
            check_qc_state_cert_correspondence(
                &justify_qc,
                state_cert,
                validation_info.epoch_height
            ),
            "Epoch root QC has no corresponding state cert"
        );
        validate_light_client_state_update_certificate(
            state_cert,
            &validation_info.membership.coordinator,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    validate_epoch_transition_block(proposal, &validation_info).await?;

    validate_qc_and_next_epoch_qc(
        &justify_qc,
        maybe_next_epoch_justify_qc.as_ref(),
        &validation_info.consensus,
        &validation_info.membership.coordinator,
        &validation_info.upgrade_lock,
        validation_info.epoch_height,
    )
    .await?;

    // All preliminary checks passed: announce the proposal before the parent-dependent checks.
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalPreliminarilyValidated(
            proposal.clone(),
        )),
        event_sender,
    )
    .await;

    // Look up the parent leaf by the leaf commitment in the justify QC.
    let parent_leaf = validation_info
        .consensus
        .read()
        .await
        .saved_leaves()
        .get(&justify_qc.data.leaf_commit)
        .cloned();

    // Parent unknown: start fetching it in the background; this call does not wait for it.
    if parent_leaf.is_none() {
        spawn_fetch_proposal(
            &justify_qc,
            event_sender.clone(),
            event_receiver.clone(),
            validation_info.membership.coordinator.clone(),
            OuterConsensus::new(Arc::clone(&validation_info.consensus.inner_consensus)),
            validation_info.public_key.clone(),
            validation_info.private_key.clone(),
            validation_info.upgrade_lock.clone(),
            validation_info.epoch_height,
        );
    }
    let consensus_reader = validation_info.consensus.read().await;

    // Pair the parent leaf with its state. A stored leaf without a state is treated as an
    // internal inconsistency and aborts the handler.
    let parent = match parent_leaf {
        Some(leaf) => {
            if let (Some(state), _) = consensus_reader.state_and_delta(leaf.view_number()) {
                Some((leaf, Arc::clone(&state)))
            } else {
                bail!("Parent state not found! Consensus internally inconsistent");
            }
        },
        None => None,
    };
    // Release the read lock before the steps below take the lock again.
    drop(consensus_reader);
    // Only update the high QC when the proposal's justify QC is newer than the current one.
    if justify_qc.view_number()
        > validation_info
            .consensus
            .read()
            .await
            .high_qc()
            .view_number()
    {
        update_high_qc(proposal, &validation_info).await?;
    }

    // Without a parent only the liveness path can be checked; on success record the block and
    // broadcast the view change, then return early.
    let Some((parent_leaf, _parent_state)) = parent else {
        tracing::warn!(
            "Proposal's parent missing from storage with commitment: {:?}",
            justify_qc.data.leaf_commit
        );
        validate_proposal_liveness(proposal, &validation_info).await?;
        validation_info
            .consensus
            .write()
            .await
            .update_highest_block(proposal_block_number);
        broadcast_view_change(
            event_sender,
            view_number,
            proposal_epoch,
            validation_info.first_epoch,
        )
        .await;
        return Ok(());
    };

    // Parent is available: run the full safety and liveness validation.
    validate_proposal_safety_and_liveness::<TYPES, I>(
        proposal.clone(),
        parent_leaf,
        &validation_info,
        event_sender.clone(),
        quorum_proposal_sender_key,
    )
    .await?;

    validation_info
        .consensus
        .write()
        .await
        .update_highest_block(proposal_block_number);
    // NOTE(review): this assigns `highest_block` directly, right after `update_highest_block`
    // was called with the same value, and takes the write lock a second time. The missing-parent
    // branch above does not do this. If `update_highest_block` applies any condition before
    // storing, this assignment overrides it -- confirm whether it is intentional or redundant.
    {
        validation_info.consensus.write().await.highest_block = proposal_block_number;
    }
    broadcast_view_change(
        event_sender,
        view_number,
        proposal_epoch,
        validation_info.first_epoch,
    )
    .await;

    Ok(())
}