1#![allow(clippy::unwrap_or_default)]
8use std::{collections::BTreeMap, marker::PhantomData};
9
10use async_broadcast::Sender;
11use async_trait::async_trait;
12use committable::Committable;
13use hotshot_example_types::block_types::TestBlockHeader;
14use hotshot_types::{
15 data::Leaf2,
16 event::{Event, EventType},
17 message::UpgradeLock,
18 traits::node_implementation::{ConsensusTime, NodeType, Versions},
19 vote::HasViewNumber,
20};
21use hotshot_utils::anytrace::*;
22use tokio::task::JoinHandle;
23
24use crate::{
25 overall_safety_task::OverallSafetyPropertiesDescription,
26 test_builder::TransactionValidator,
27 test_task::{spawn_timeout_task, TestEvent, TestResult, TestTaskState},
28};
29
30pub type NodeMap<TYPES> = BTreeMap<<TYPES as NodeType>::View, Vec<Leaf2<TYPES>>>;
32
33pub type NodeMapSanitized<TYPES> = BTreeMap<<TYPES as NodeType>::View, Leaf2<TYPES>>;
35
36fn sanitize_node_map<TYPES: NodeType>(
38 node_map: &NodeMap<TYPES>,
39) -> Result<NodeMapSanitized<TYPES>> {
40 let mut result = BTreeMap::new();
41
42 for (view, leaves) in node_map.iter() {
43 let mut reduced = leaves.clone();
44
45 reduced.dedup();
46
47 match reduced.len() {
48 0 => {},
49 1 => {
50 result.insert(*view, reduced[0].clone());
51 },
52 _ => {
53 bail!(
54 "We have received inconsistent leaves for view {view}. Leaves:\n\n{leaves:?}"
55 );
56 },
57 }
58 }
59
60 Ok(result)
61}
62
63async fn validate_node_map<TYPES: NodeType, V: Versions>(
65 node_map: &NodeMapSanitized<TYPES>,
66) -> Result<()> {
67 let leaf_triples = node_map
69 .values()
70 .zip(node_map.values().skip(1))
71 .zip(node_map.values().skip(2))
72 .map(|((a, b), c)| (a, b, c));
73
74 let mut decided_upgrade_certificate = None;
75 let mut view_decided = TYPES::View::new(0);
76
77 for (grandparent, _parent, child) in leaf_triples {
78 if let Some(cert) = grandparent.upgrade_certificate() {
79 if cert.data.decide_by <= child.view_number() {
80 decided_upgrade_certificate = Some(cert);
81 view_decided = child.view_number();
82
83 break;
84 }
85 }
86 }
87
88 let upgrade_lock = UpgradeLock::<TYPES, V>::new();
93
94 let leaf_pairs = node_map.values().zip(node_map.values().skip(1));
95
96 for (parent, child) in leaf_pairs {
98 ensure!(
99 child.justify_qc().view_number >= parent.view_number(),
100 "The node has provided leaf:\n\n{child:?}\n\nbut its quorum certificate points to a \
101 view before the most recent leaf:\n\n{parent:?}"
102 );
103
104 child
105 .extends_upgrade(parent, &upgrade_lock.decided_upgrade_certificate)
106 .await
107 .context(|e| error!("Leaf {child:?} does not extend its parent {parent:?}: {e}"))?;
108
109 ensure!(
110 child.height() > parent.height(),
111 "The node has decided leaf\n\n{child:?}\n\nextending leaf\n\n{parent:?}but the block \
112 height did not increase."
113 );
114
115 if child.justify_qc().view_number == parent.view_number()
118 && child.justify_qc().data.leaf_commit != parent.commit()
119 {
120 bail!(
121 "The node has provided leaf:\n\n{child:?}\n\nwhich points \
122 to:\n\n{parent:?}\n\nbut the commits do not match."
123 );
124 }
125
126 if child.view_number() == view_decided {
127 upgrade_lock
128 .decided_upgrade_certificate
129 .write()
130 .await
131 .clone_from(&decided_upgrade_certificate);
132 }
133 }
134
135 Ok(())
136}
137
138pub type NetworkMap<TYPES> = BTreeMap<usize, NodeMap<TYPES>>;
140
141pub type NetworkMapSanitized<TYPES> = BTreeMap<usize, NodeMapSanitized<TYPES>>;
143
144fn sanitize_network_map<TYPES: NodeType>(
146 network_map: &NetworkMap<TYPES>,
147) -> Result<NetworkMapSanitized<TYPES>> {
148 let mut result = BTreeMap::new();
149
150 for (node, node_map) in network_map {
151 result.insert(
152 *node,
153 sanitize_node_map(node_map)
154 .context(|e| error!("Node {node} produced inconsistent leaves: {e}"))?,
155 );
156 }
157
158 Ok(result)
159}
160
161pub type ViewMap<TYPES> = BTreeMap<<TYPES as NodeType>::View, BTreeMap<usize, Leaf2<TYPES>>>;
162
163async fn invert_network_map<TYPES: NodeType, V: Versions>(
169 network_map: &NetworkMapSanitized<TYPES>,
170) -> Result<ViewMap<TYPES>> {
171 let mut inverted_map = BTreeMap::new();
172 for (node_id, node_map) in network_map.iter() {
173 validate_node_map::<TYPES, V>(node_map)
174 .await
175 .context(|e| error!("Node {node_id} has an invalid leaf history: {e}"))?;
176
177 for (view, leaf) in node_map.iter() {
179 let view_map = inverted_map.entry(*view).or_insert(BTreeMap::new());
180 view_map.insert(*node_id, leaf.clone());
181 }
182 }
183
184 Ok(inverted_map)
185}
186
187pub type ViewMapSanitized<TYPES> = BTreeMap<<TYPES as NodeType>::View, Leaf2<TYPES>>;
189
190fn sanitize_view_map<TYPES: NodeType>(
191 view_map: &ViewMap<TYPES>,
192) -> Result<ViewMapSanitized<TYPES>> {
193 let mut result = BTreeMap::new();
194
195 for (view, leaf_map) in view_map.iter() {
196 let mut node_leaves: Vec<_> = leaf_map.iter().collect();
197
198 node_leaves.dedup_by(|(_node_a, leaf_a), (_node_b, leaf_b)| leaf_a == leaf_b);
199
200 ensure!(
201 node_leaves.len() <= 1,
202 error!(
203 "The network does not agree on the following views: {}",
204 leaf_map
205 .iter()
206 .fold(format!("\n\nView {view}:"), |acc, (node, leaf)| {
207 format!("{acc}\n\nNode {node} sent us leaf:\n\n{leaf:?}")
208 })
209 )
210 );
211
212 if let Some(leaf) = node_leaves.first() {
213 result.insert(*view, leaf.1.clone());
214 }
215 }
216
217 for (parent, child) in result.values().zip(result.values().skip(1)) {
218 if child.justify_qc().data.leaf_commit != parent.commit() {
220 bail!(
221 "The network has decided:\n\n{child:?}\n\nwhich succeeds:\n\n{parent:?}\n\nbut \
222 the commits do not match. Did we miss an intermediate leaf?"
223 );
224 }
225 }
226
227 Ok(result)
228}
229
230enum TestProgress {
231 Incomplete,
232 Finished,
233}
234
235pub struct ConsistencyTask<TYPES: NodeType, V: Versions> {
237 pub consensus_leaves: NetworkMap<TYPES>,
239 pub safety_properties: OverallSafetyPropertiesDescription,
241 pub ensure_upgrade: bool,
243 pub errors: Vec<Error>,
245 pub test_sender: Sender<TestEvent>,
247 pub _pd: PhantomData<V>,
249 pub validate_transactions: TransactionValidator,
251 pub timeout_task: JoinHandle<()>,
253}
254
255impl<TYPES: NodeType<BlockHeader = TestBlockHeader>, V: Versions> ConsistencyTask<TYPES, V> {
256 pub async fn validate(&self) -> Result<()> {
257 let sanitized_network_map = sanitize_network_map(&self.consensus_leaves)?;
258
259 let inverted_map = invert_network_map::<TYPES, V>(&sanitized_network_map).await?;
260
261 let sanitized_view_map = sanitize_view_map(&inverted_map)?;
262 let num_successful_views = sanitized_view_map.iter().len();
263
264 ensure!(
266 num_successful_views >= self.safety_properties.num_successful_views,
267 "Not enough successful views: expected {:?} but got {:?}",
268 self.safety_properties.num_successful_views,
269 num_successful_views,
270 );
271
272 let expected_upgrade = self.ensure_upgrade;
273 let actual_upgrade = sanitized_view_map.iter().fold(false, |acc, (_view, leaf)| {
274 acc || leaf.upgrade_certificate().is_some()
275 });
276
277 let mut transactions = Vec::new();
278
279 transactions = sanitized_view_map
280 .iter()
281 .fold(transactions, |mut acc, (view, leaf)| {
282 acc.push((**view, leaf.block_header().metadata.num_transactions));
283
284 acc
285 });
286
287 (self.validate_transactions)(&transactions)?;
288
289 ensure!(
290 expected_upgrade == actual_upgrade,
291 "Mismatch between expected and actual upgrade. Expected upgrade: {expected_upgrade}. \
292 Actual upgrade: {actual_upgrade}"
293 );
294
295 Ok(())
296 }
297
298 async fn partial_validate(&self) -> Result<TestProgress> {
299 self.check_view_success().await?;
300 self.check_view_failure().await?;
301
302 self.check_total_successes().await
303 }
304
305 fn add_error(&mut self, error: Error) {
306 self.errors.push(error);
307 }
308
309 async fn handle_result(&mut self, result: Result<TestProgress>) {
310 match result {
311 Ok(TestProgress::Finished) => {
312 let _ = self.test_sender.broadcast(TestEvent::Shutdown).await;
313 },
314 Err(e) => {
315 self.add_error(e);
316 let _ = self.test_sender.broadcast(TestEvent::Shutdown).await;
317 },
318 Ok(TestProgress::Incomplete) => {},
319 }
320 }
321
322 async fn check_total_successes(&self) -> Result<TestProgress> {
323 let sanitized_network_map = sanitize_network_map(&self.consensus_leaves)?;
324
325 let inverted_map = invert_network_map::<TYPES, V>(&sanitized_network_map).await?;
326
327 if inverted_map.len() >= self.safety_properties.num_successful_views {
328 Ok(TestProgress::Finished)
329 } else {
330 Ok(TestProgress::Incomplete)
331 }
332 }
333 pub async fn check_view_success(&self) -> Result<()> {
334 for (node_id, node_map) in self.consensus_leaves.iter() {
335 for (view, leaf) in node_map {
336 ensure!(
337 !self.safety_properties.expected_view_failures.contains(view),
338 "Expected a view failure, but got a decided leaf for view {view} from node \
339 {node_id}.\n\nLeaf:\n\n{leaf:?}"
340 );
341 }
342 }
343
344 Ok(())
345 }
346
347 pub async fn check_view_failure(&self) -> Result<()> {
348 let sanitized_network_map = sanitize_network_map(&self.consensus_leaves)?;
349
350 let mut inverted_map = invert_network_map::<TYPES, V>(&sanitized_network_map).await?;
351
352 let (current_view, _) = inverted_map
353 .pop_last()
354 .context(error!("Leaf map is empty, which should be impossible"))?;
355 let Some((last_view, _)) = inverted_map.pop_last() else {
356 return Ok(());
358 };
359
360 let unexpected_failed_views: Vec<_> = (*(last_view + 1)..*current_view)
362 .filter(|view| {
363 !self.safety_properties.expected_view_failures.contains(view)
364 && !self.safety_properties.possible_view_failures.contains(view)
365 })
366 .collect();
367
368 ensure!(
369 unexpected_failed_views.is_empty(),
370 "Unexpected failed views: {:?}",
371 unexpected_failed_views
372 );
373
374 Ok(())
375 }
376}
377
378#[async_trait]
379impl<TYPES: NodeType<BlockHeader = TestBlockHeader>, V: Versions> TestTaskState
380 for ConsistencyTask<TYPES, V>
381{
382 type Event = Event<TYPES>;
383 type Error = Error;
384
385 async fn handle_event(&mut self, (message, id): (Self::Event, usize)) -> Result<()> {
387 if let Event {
388 event:
389 EventType::Decide {
390 leaf_chain,
391 committing_qc,
392 deciding_qc,
393 ..
394 },
395 ..
396 } = message
397 {
398 {
399 let mut timeout_task = spawn_timeout_task(
400 self.test_sender.clone(),
401 self.safety_properties.decide_timeout,
402 );
403
404 std::mem::swap(&mut self.timeout_task, &mut timeout_task);
405
406 timeout_task.abort();
407 }
408
409 match deciding_qc {
410 Some(deciding_qc) => {
411 let last_leaf = &leaf_chain[0].leaf;
412 ensure!(committing_qc.view_number() == last_leaf.view_number());
413 ensure!(committing_qc.leaf_commit() == last_leaf.commit());
414 ensure!(deciding_qc.view_number() == committing_qc.view_number() + 1);
415 },
416 None => {
417 ensure!(
420 committing_qc.epoch().is_none(),
421 "expected 2nd deciding QC for post-epochs decide"
422 );
423 },
424 }
425
426 for leaf_info in leaf_chain.iter().rev() {
427 let map = &mut self.consensus_leaves.entry(id).or_insert(BTreeMap::new());
428
429 map.entry(leaf_info.leaf.view_number())
430 .and_modify(|vec| vec.push(leaf_info.leaf.clone()))
431 .or_insert(vec![leaf_info.leaf.clone()]);
432
433 let result = self.partial_validate().await;
434
435 self.handle_result(result).await;
436 }
437 }
438
439 Ok(())
440 }
441
442 async fn check(&self) -> TestResult {
443 self.timeout_task.abort();
444
445 let mut errors: Vec<_> = self.errors.iter().map(|e| e.to_string()).collect();
446
447 if let Err(e) = self.validate().await {
448 errors.push(e.to_string());
449 }
450
451 if !errors.is_empty() {
452 TestResult::Fail(Box::new(errors))
453 } else {
454 TestResult::Pass
455 }
456 }
457}