1use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14 data::{
15 DaProposal2, EpochNumber, PackedBundle, ViewNumber, vid_commitment,
16 vid_disperse::vid_total_weight,
17 },
18 epoch_membership::EpochMembershipCoordinator,
19 event::{Event, EventType},
20 message::{Proposal, UpgradeLock},
21 simple_certificate::DaCertificate2,
22 simple_vote::{DaData2, DaVote2},
23 storage_metrics::StorageMetricsValue,
24 traits::{
25 BlockPayload, EncodeBytes,
26 network::ConnectedNetwork,
27 node_implementation::{NodeImplementation, NodeType},
28 signature_key::SignatureKey,
29 storage::Storage,
30 },
31 utils::{EpochTransitionIndicator, epoch_from_block_number, is_ge_epoch_root, is_last_block},
32 vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::spawn_blocking};
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::broadcast_event,
42 vote_collection::{VoteCollectorsMap, handle_vote},
43};
44
45pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
49
50 pub cur_view: ViewNumber,
52
53 pub cur_epoch: Option<EpochNumber>,
55
56 pub consensus: OuterConsensus<TYPES>,
58
59 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64 pub network: Arc<I::Network>,
66
67 pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>,
69
70 pub public_key: TYPES::SignatureKey,
72
73 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76 pub id: u64,
78
79 pub storage: I::Storage,
81
82 pub storage_metrics: Arc<StorageMetricsValue>,
84
85 pub upgrade_lock: UpgradeLock<TYPES>,
87}
88
89impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
90 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
92 pub async fn handle(
93 &mut self,
94 event: Arc<HotShotEvent<TYPES>>,
95 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
96 ) -> Result<()> {
97 match event.as_ref() {
98 HotShotEvent::DaProposalRecv(proposal, sender) => {
99 let sender = sender.clone();
100 tracing::debug!(
101 "DA proposal received for view: {}",
102 proposal.data.view_number()
103 );
104 let view = proposal.data.view_number();
106
107 ensure!(
112 self.cur_view <= view + 1,
113 "Throwing away DA proposal that is more than one view older"
114 );
115
116 if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
117 ensure!(
118 entry.payload.encode() == proposal.data.encoded_transactions,
119 "Received DA proposal for view {view} but we already have a payload for \
120 that view and they are not identical. Throwing it away",
121 );
122 }
123
124 let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
125 let view_leader_key = self
126 .membership_coordinator
127 .membership_for_epoch(proposal.data.epoch)
128 .await
129 .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
130 .leader(view)
131 .await?;
132 ensure!(
133 view_leader_key == sender,
134 warn!(
135 "DA proposal doesn't have expected leader key for view {} \n DA proposal \
136 is: {:?}",
137 *view,
138 proposal.data.clone()
139 )
140 );
141
142 ensure!(
143 view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
144 warn!("Could not verify proposal.")
145 );
146
147 broadcast_event(
148 Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
149 &event_stream,
150 )
151 .await;
152 },
153 HotShotEvent::DaProposalValidated(proposal, sender) => {
154 tracing::debug!(
155 "DA proposal validated for view {}",
156 proposal.data.view_number()
157 );
158 let cur_view = self.consensus.read().await.cur_view();
159 let view_number = proposal.data.view_number();
160 let epoch_number = proposal.data.epoch;
161 let membership = self
162 .membership_coordinator
163 .stake_table_for_epoch(epoch_number)
164 .await
165 .context(warn!("No stake table for epoch"))?;
166
167 ensure!(
168 cur_view <= view_number + 1,
169 debug!(
170 "Validated DA proposal for prior view but it's too old now Current view \
171 {cur_view}, DA Proposal view {}",
172 proposal.data.view_number()
173 )
174 );
175
176 broadcast_event(
178 Event {
179 view_number,
180 event: EventType::DaProposal {
181 proposal: proposal.clone(),
182 sender: sender.clone(),
183 },
184 },
185 &self.output_event_stream,
186 )
187 .await;
188
189 ensure!(
190 membership.has_da_stake(&self.public_key).await,
191 debug!(
192 "We were not chosen for consensus committee for view {view_number} in \
193 epoch {epoch_number:?}"
194 )
195 );
196 let total_weight =
197 vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
198
199 let version = self.upgrade_lock.version_infallible(view_number);
200
201 let txns = Arc::clone(&proposal.data.encoded_transactions);
202 let txns_clone = Arc::clone(&txns);
203 let metadata = proposal.data.metadata.encode();
204 let metadata_clone = metadata.clone();
205 let payload_commitment =
206 spawn_blocking(move || vid_commitment(&txns, &metadata, total_weight, version))
207 .await;
208 let payload_commitment = payload_commitment.unwrap();
209 let next_epoch_payload_commitment = if matches!(
210 proposal.data.epoch_transition_indicator,
211 EpochTransitionIndicator::InTransition
212 ) && self
213 .upgrade_lock
214 .epochs_enabled(proposal.data.view_number())
215 && epoch_number.is_some()
216 {
217 let next_epoch_total_weight = vid_total_weight::<TYPES>(
218 &membership
219 .next_epoch_stake_table()
220 .await?
221 .stake_table()
222 .await,
223 epoch_number.map(|epoch| epoch + 1),
224 );
225
226 let commit_result = spawn_blocking(move || {
227 vid_commitment(
228 &txns_clone,
229 &metadata_clone,
230 next_epoch_total_weight,
231 version,
232 )
233 })
234 .await;
235 Some(commit_result.unwrap())
236 } else {
237 None
238 };
239
240 let now = Instant::now();
241 self.storage
242 .append_da2(proposal, payload_commitment)
243 .await
244 .wrap()
245 .context(error!("Failed to append DA proposal to storage"))?;
246 self.storage_metrics
247 .append_da_duration
248 .add_point(now.elapsed().as_secs_f64());
249
250 let vote = DaVote2::create_signed_vote(
252 DaData2 {
253 payload_commit: payload_commitment,
254 next_epoch_payload_commit: next_epoch_payload_commitment,
255 epoch: epoch_number,
256 },
257 view_number,
258 &self.public_key,
259 &self.private_key,
260 &self.upgrade_lock,
261 )
262 .await?;
263
264 tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
265
266 broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
267 let mut consensus_writer = self.consensus.write().await;
268
269 if let Err(e) =
272 consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
273 {
274 tracing::trace!("{e:?}");
275 }
276
277 let payload_with_metadata = Arc::new(PayloadWithMetadata {
278 payload: TYPES::BlockPayload::from_bytes(
279 proposal.data.encoded_transactions.as_ref(),
280 &proposal.data.metadata,
281 ),
282 metadata: proposal.data.metadata.clone(),
283 });
284
285 if let Err(e) =
287 consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
288 {
289 tracing::trace!("{e:?}");
290 }
291 drop(consensus_writer);
292
293 if self.network.is_primary_down() {
295 let my_id = self.id;
296 let consensus =
297 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
298 let pk = self.private_key.clone();
299 let public_key = self.public_key.clone();
300 let chan = event_stream.clone();
301 let upgrade_lock = self.upgrade_lock.clone();
302 let next_epoch = epoch_number.map(|epoch| epoch + 1);
303
304 let mut target_epochs = vec![];
305 if membership.has_stake(&public_key).await {
306 target_epochs.push(epoch_number);
307 }
308 if membership
309 .next_epoch_stake_table()
310 .await?
311 .has_stake(&public_key)
312 .await
313 {
314 target_epochs.push(next_epoch);
315 }
316 if target_epochs.is_empty() {
317 bail!(
318 "Not calculating VID, the node doesn't belong to the current epoch or \
319 the next epoch."
320 );
321 };
322
323 tracing::debug!(
324 "Primary network is down. Optimistically calculate own VID share."
325 );
326 let membership = membership.clone();
327 spawn(async move {
328 for target_epoch in target_epochs {
329 Consensus::calculate_and_update_vid(
330 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
331 view_number,
332 target_epoch,
333 membership.coordinator.clone(),
334 &pk,
335 &upgrade_lock,
336 )
337 .await;
338 if let Some(vid_share) = consensus
339 .read()
340 .await
341 .vid_shares()
342 .get(&view_number)
343 .and_then(|key_map| key_map.get(&public_key))
344 .and_then(|epoch_map| epoch_map.get(&target_epoch))
345 {
346 tracing::debug!(
347 "Primary network is down. Calculated own VID share for epoch \
348 {target_epoch:?}, my id {my_id}"
349 );
350 broadcast_event(
351 Arc::new(HotShotEvent::VidShareRecv(
352 public_key.clone(),
353 vid_share.clone(),
354 )),
355 &chan,
356 )
357 .await;
358 }
359 }
360 });
361 }
362 },
363 HotShotEvent::DaVoteRecv(vote) => {
364 tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
365 let view = vote.view_number();
367 let epoch = vote.data.epoch;
368 let membership = self
369 .membership_coordinator
370 .membership_for_epoch(epoch)
371 .await
372 .context(warn!("No stake table for epoch"))?;
373
374 ensure!(
375 membership.leader(view).await? == self.public_key,
376 debug!(
377 "We are not the DA committee leader for view {} are we leader for next \
378 view? {}",
379 *view,
380 membership.leader(view + 1).await? == self.public_key
381 )
382 );
383
384 handle_vote(
385 &mut self.vote_collectors,
386 vote,
387 self.public_key.clone(),
388 &membership,
389 self.id,
390 &event,
391 &event_stream,
392 &self.upgrade_lock,
393 EpochTransitionIndicator::NotInTransition,
394 )
395 .await?;
396 },
397 HotShotEvent::ViewChange(view, epoch) => {
398 if *epoch > self.cur_epoch {
399 self.cur_epoch = *epoch;
400 }
401
402 let view = *view;
403 ensure!(
404 *self.cur_view < *view,
405 info!("Received a view change to an older view.")
406 );
407
408 if *view - *self.cur_view > 1 {
409 tracing::info!("View changed by more than 1 going to view {view}");
410 }
411 self.cur_view = view;
412 },
413 HotShotEvent::BlockRecv(packed_bundle) => {
414 let PackedBundle::<TYPES> {
415 encoded_transactions,
416 metadata,
417 view_number,
418 ..
419 } = packed_bundle;
420 let view_number = *view_number;
421
422 let encoded_transactions_hash = Sha256::digest(encoded_transactions);
424
425 let signature =
427 TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
428 .wrap()?;
429
430 let epoch = self.cur_epoch;
431 let leader = self
432 .membership_coordinator
433 .membership_for_epoch(epoch)
434 .await
435 .context(warn!("No stake table for epoch"))?
436 .leader(view_number)
437 .await?;
438 if leader != self.public_key {
439 tracing::debug!(
440 "We are not the leader in the current epoch. Do not send the DA proposal"
441 );
442 return Ok(());
443 }
444 let consensus_reader = self.consensus.read().await;
445 let high_qc_block_number = consensus_reader.high_qc().data.block_number;
446 let epoch_transition_indicator = if self.upgrade_lock.epochs_enabled(view_number) {
451 match (high_qc_block_number, self.cur_epoch) {
452 (Some(block_number), Some(cur_epoch)) => {
453 let epoch = epoch_from_block_number(
454 block_number,
455 self.membership_coordinator.epoch_height,
456 );
457 if epoch < *cur_epoch {
458 EpochTransitionIndicator::NotInTransition
460 } else if !is_last_block(
461 block_number,
462 self.membership_coordinator.epoch_height,
463 ) && is_ge_epoch_root(
464 block_number,
465 self.membership_coordinator.epoch_height,
466 ) {
467 EpochTransitionIndicator::InTransition
468 } else {
469 EpochTransitionIndicator::NotInTransition
470 }
471 },
472 _ => EpochTransitionIndicator::NotInTransition,
473 }
474 } else {
475 EpochTransitionIndicator::NotInTransition
476 };
477
478 drop(consensus_reader);
479
480 let data: DaProposal2<TYPES> = DaProposal2 {
481 encoded_transactions: Arc::clone(encoded_transactions),
482 metadata: metadata.clone(),
483 view_number,
485 epoch,
486 epoch_transition_indicator,
487 };
488
489 let message = Proposal {
490 data,
491 signature,
492 _pd: PhantomData,
493 };
494
495 broadcast_event(
496 Arc::new(HotShotEvent::DaProposalSend(
497 message.clone(),
498 self.public_key.clone(),
499 )),
500 &event_stream,
501 )
502 .await;
503 let payload_with_metadata = Arc::new(PayloadWithMetadata {
504 payload: TYPES::BlockPayload::from_bytes(
505 encoded_transactions.as_ref(),
506 metadata,
507 ),
508 metadata: metadata.clone(),
509 });
510 let update_result = self
512 .consensus
513 .write()
514 .await
515 .update_saved_payloads(view_number, payload_with_metadata);
516 if let Err(e) = update_result {
517 tracing::trace!("{e:?}");
518 }
519 },
520 _ => {},
521 }
522 Ok(())
523 }
524}
525
526#[async_trait]
528impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TYPES, I> {
529 type Event = HotShotEvent<TYPES>;
530
531 async fn handle_event(
532 &mut self,
533 event: Arc<Self::Event>,
534 sender: &Sender<Arc<Self::Event>>,
535 _receiver: &Receiver<Arc<Self::Event>>,
536 ) -> Result<()> {
537 self.handle(event, sender.clone()).await
538 }
539
540 fn cancel_subtasks(&mut self) {}
541}