1use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14 data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle},
15 epoch_membership::EpochMembershipCoordinator,
16 event::{Event, EventType},
17 message::{Proposal, UpgradeLock},
18 simple_certificate::DaCertificate2,
19 simple_vote::{DaData2, DaVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 network::ConnectedNetwork,
23 node_implementation::{NodeImplementation, NodeType, Versions},
24 signature_key::SignatureKey,
25 storage::Storage,
26 BlockPayload, EncodeBytes,
27 },
28 utils::{epoch_from_block_number, is_ge_epoch_root, is_last_block, EpochTransitionIndicator},
29 vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use sha2::{Digest, Sha256};
33use tokio::{spawn, task::spawn_blocking};
34use tracing::instrument;
35
36use crate::{
37 events::HotShotEvent,
38 helpers::broadcast_event,
39 vote_collection::{handle_vote, VoteCollectorsMap},
40};
41
42pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
44 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
46
47 pub cur_view: TYPES::View,
49
50 pub cur_epoch: Option<TYPES::Epoch>,
52
53 pub consensus: OuterConsensus<TYPES>,
55
56 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61 pub network: Arc<I::Network>,
63
64 pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>,
66
67 pub public_key: TYPES::SignatureKey,
69
70 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
72
73 pub id: u64,
75
76 pub storage: I::Storage,
78
79 pub storage_metrics: Arc<StorageMetricsValue>,
81
82 pub upgrade_lock: UpgradeLock<TYPES, V>,
84}
85
86impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
87 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
89 pub async fn handle(
90 &mut self,
91 event: Arc<HotShotEvent<TYPES>>,
92 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
93 ) -> Result<()> {
94 match event.as_ref() {
95 HotShotEvent::DaProposalRecv(proposal, sender) => {
96 let sender = sender.clone();
97 tracing::debug!(
98 "DA proposal received for view: {}",
99 proposal.data.view_number()
100 );
101 let view = proposal.data.view_number();
103
104 ensure!(
109 self.cur_view <= view + 1,
110 "Throwing away DA proposal that is more than one view older"
111 );
112
113 if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
114 ensure!(
115 entry.payload.encode() == proposal.data.encoded_transactions,
116 "Received DA proposal for view {view} but we already have a payload for \
117 that view and they are not identical. Throwing it away",
118 );
119 }
120
121 let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
122 let view_leader_key = self
123 .membership_coordinator
124 .membership_for_epoch(proposal.data.epoch)
125 .await
126 .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
127 .leader(view)
128 .await?;
129 ensure!(
130 view_leader_key == sender,
131 warn!(
132 "DA proposal doesn't have expected leader key for view {} \n DA proposal \
133 is: {:?}",
134 *view,
135 proposal.data.clone()
136 )
137 );
138
139 ensure!(
140 view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
141 warn!("Could not verify proposal.")
142 );
143
144 broadcast_event(
145 Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
146 &event_stream,
147 )
148 .await;
149 },
150 HotShotEvent::DaProposalValidated(proposal, sender) => {
151 tracing::debug!(
152 "DA proposal validated for view {}",
153 proposal.data.view_number()
154 );
155 let cur_view = self.consensus.read().await.cur_view();
156 let view_number = proposal.data.view_number();
157 let epoch_number = proposal.data.epoch;
158 let membership = self
159 .membership_coordinator
160 .stake_table_for_epoch(epoch_number)
161 .await
162 .context(warn!("No stake table for epoch"))?;
163
164 ensure!(
165 cur_view <= view_number + 1,
166 debug!(
167 "Validated DA proposal for prior view but it's too old now Current view \
168 {cur_view}, DA Proposal view {}",
169 proposal.data.view_number()
170 )
171 );
172
173 broadcast_event(
175 Event {
176 view_number,
177 event: EventType::DaProposal {
178 proposal: proposal.clone(),
179 sender: sender.clone(),
180 },
181 },
182 &self.output_event_stream,
183 )
184 .await;
185
186 ensure!(
187 membership.has_da_stake(&self.public_key).await,
188 debug!(
189 "We were not chosen for consensus committee for view {view_number} in \
190 epoch {epoch_number:?}"
191 )
192 );
193 let total_weight =
194 vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
195
196 let version = self.upgrade_lock.version_infallible(view_number).await;
197
198 let txns = Arc::clone(&proposal.data.encoded_transactions);
199 let txns_clone = Arc::clone(&txns);
200 let metadata = proposal.data.metadata.encode();
201 let metadata_clone = metadata.clone();
202 let payload_commitment = spawn_blocking(move || {
203 vid_commitment::<V>(&txns, &metadata, total_weight, version)
204 })
205 .await;
206 let payload_commitment = payload_commitment.unwrap();
207 let next_epoch_payload_commitment = if matches!(
208 proposal.data.epoch_transition_indicator,
209 EpochTransitionIndicator::InTransition
210 ) && self
211 .upgrade_lock
212 .epochs_enabled(proposal.data.view_number())
213 .await
214 && epoch_number.is_some()
215 {
216 let next_epoch_total_weight = vid_total_weight::<TYPES>(
217 &membership
218 .next_epoch_stake_table()
219 .await?
220 .stake_table()
221 .await,
222 epoch_number.map(|epoch| epoch + 1),
223 );
224
225 let commit_result = spawn_blocking(move || {
226 vid_commitment::<V>(
227 &txns_clone,
228 &metadata_clone,
229 next_epoch_total_weight,
230 version,
231 )
232 })
233 .await;
234 Some(commit_result.unwrap())
235 } else {
236 None
237 };
238
239 let now = Instant::now();
240 self.storage
241 .append_da2(proposal, payload_commitment)
242 .await
243 .wrap()
244 .context(error!("Failed to append DA proposal to storage"))?;
245 self.storage_metrics
246 .append_da_duration
247 .add_point(now.elapsed().as_secs_f64());
248
249 let vote = DaVote2::create_signed_vote(
251 DaData2 {
252 payload_commit: payload_commitment,
253 next_epoch_payload_commit: next_epoch_payload_commitment,
254 epoch: epoch_number,
255 },
256 view_number,
257 &self.public_key,
258 &self.private_key,
259 &self.upgrade_lock,
260 )
261 .await?;
262
263 tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
264
265 broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
266 let mut consensus_writer = self.consensus.write().await;
267
268 if let Err(e) =
271 consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
272 {
273 tracing::trace!("{e:?}");
274 }
275
276 let payload_with_metadata = Arc::new(PayloadWithMetadata {
277 payload: TYPES::BlockPayload::from_bytes(
278 proposal.data.encoded_transactions.as_ref(),
279 &proposal.data.metadata,
280 ),
281 metadata: proposal.data.metadata.clone(),
282 });
283
284 if let Err(e) =
286 consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
287 {
288 tracing::trace!("{e:?}");
289 }
290 drop(consensus_writer);
291
292 if self.network.is_primary_down() {
294 let my_id = self.id;
295 let consensus =
296 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
297 let pk = self.private_key.clone();
298 let public_key = self.public_key.clone();
299 let chan = event_stream.clone();
300 let upgrade_lock = self.upgrade_lock.clone();
301 let next_epoch = epoch_number.map(|epoch| epoch + 1);
302
303 let mut target_epochs = vec![];
304 if membership.has_stake(&public_key).await {
305 target_epochs.push(epoch_number);
306 }
307 if membership
308 .next_epoch_stake_table()
309 .await?
310 .has_stake(&public_key)
311 .await
312 {
313 target_epochs.push(next_epoch);
314 }
315 if target_epochs.is_empty() {
316 bail!(
317 "Not calculating VID, the node doesn't belong to the current epoch or \
318 the next epoch."
319 );
320 };
321
322 tracing::debug!(
323 "Primary network is down. Optimistically calculate own VID share."
324 );
325 let membership = membership.clone();
326 spawn(async move {
327 for target_epoch in target_epochs {
328 Consensus::calculate_and_update_vid::<V>(
329 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
330 view_number,
331 target_epoch,
332 membership.coordinator.clone(),
333 &pk,
334 &upgrade_lock,
335 )
336 .await;
337 if let Some(vid_share) = consensus
338 .read()
339 .await
340 .vid_shares()
341 .get(&view_number)
342 .and_then(|key_map| key_map.get(&public_key))
343 .and_then(|epoch_map| epoch_map.get(&target_epoch))
344 {
345 tracing::debug!(
346 "Primary network is down. Calculated own VID share for epoch \
347 {target_epoch:?}, my id {my_id}"
348 );
349 broadcast_event(
350 Arc::new(HotShotEvent::VidShareRecv(
351 public_key.clone(),
352 vid_share.clone(),
353 )),
354 &chan,
355 )
356 .await;
357 }
358 }
359 });
360 }
361 },
362 HotShotEvent::DaVoteRecv(ref vote) => {
363 tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
364 let view = vote.view_number();
366 let epoch = vote.data.epoch;
367 let membership = self
368 .membership_coordinator
369 .membership_for_epoch(epoch)
370 .await
371 .context(warn!("No stake table for epoch"))?;
372
373 ensure!(
374 membership.leader(view).await? == self.public_key,
375 debug!(
376 "We are not the DA committee leader for view {} are we leader for next \
377 view? {}",
378 *view,
379 membership.leader(view + 1).await? == self.public_key
380 )
381 );
382
383 handle_vote(
384 &mut self.vote_collectors,
385 vote,
386 self.public_key.clone(),
387 &membership,
388 self.id,
389 &event,
390 &event_stream,
391 &self.upgrade_lock,
392 EpochTransitionIndicator::NotInTransition,
393 )
394 .await?;
395 },
396 HotShotEvent::ViewChange(view, epoch) => {
397 if *epoch > self.cur_epoch {
398 self.cur_epoch = *epoch;
399 }
400
401 let view = *view;
402 ensure!(
403 *self.cur_view < *view,
404 info!("Received a view change to an older view.")
405 );
406
407 if *view - *self.cur_view > 1 {
408 tracing::info!("View changed by more than 1 going to view {view}");
409 }
410 self.cur_view = view;
411 },
412 HotShotEvent::BlockRecv(packed_bundle) => {
413 let PackedBundle::<TYPES> {
414 encoded_transactions,
415 metadata,
416 view_number,
417 ..
418 } = packed_bundle;
419 let view_number = *view_number;
420
421 let encoded_transactions_hash = Sha256::digest(encoded_transactions);
423
424 let signature =
426 TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
427 .wrap()?;
428
429 let epoch = self.cur_epoch;
430 let leader = self
431 .membership_coordinator
432 .membership_for_epoch(epoch)
433 .await
434 .context(warn!("No stake table for epoch"))?
435 .leader(view_number)
436 .await?;
437 if leader != self.public_key {
438 tracing::debug!(
439 "We are not the leader in the current epoch. Do not send the DA proposal"
440 );
441 return Ok(());
442 }
443 let consensus_reader = self.consensus.read().await;
444 let high_qc_block_number = consensus_reader.high_qc().data.block_number;
445 let epoch_transition_indicator =
450 if self.upgrade_lock.epochs_enabled(view_number).await {
451 match (high_qc_block_number, self.cur_epoch) {
452 (Some(block_number), Some(cur_epoch)) => {
453 let epoch = epoch_from_block_number(
454 block_number,
455 self.membership_coordinator.epoch_height,
456 );
457 if epoch < *cur_epoch {
458 EpochTransitionIndicator::NotInTransition
460 } else if !is_last_block(
461 block_number,
462 self.membership_coordinator.epoch_height,
463 ) && is_ge_epoch_root(
464 block_number,
465 self.membership_coordinator.epoch_height,
466 ) {
467 EpochTransitionIndicator::InTransition
468 } else {
469 EpochTransitionIndicator::NotInTransition
470 }
471 },
472 _ => EpochTransitionIndicator::NotInTransition,
473 }
474 } else {
475 EpochTransitionIndicator::NotInTransition
476 };
477
478 drop(consensus_reader);
479
480 let data: DaProposal2<TYPES> = DaProposal2 {
481 encoded_transactions: Arc::clone(encoded_transactions),
482 metadata: metadata.clone(),
483 view_number,
485 epoch,
486 epoch_transition_indicator,
487 };
488
489 let message = Proposal {
490 data,
491 signature,
492 _pd: PhantomData,
493 };
494
495 broadcast_event(
496 Arc::new(HotShotEvent::DaProposalSend(
497 message.clone(),
498 self.public_key.clone(),
499 )),
500 &event_stream,
501 )
502 .await;
503 let payload_with_metadata = Arc::new(PayloadWithMetadata {
504 payload: TYPES::BlockPayload::from_bytes(
505 encoded_transactions.as_ref(),
506 metadata,
507 ),
508 metadata: metadata.clone(),
509 });
510 let update_result = self
512 .consensus
513 .write()
514 .await
515 .update_saved_payloads(view_number, payload_with_metadata);
516 if let Err(e) = update_result {
517 tracing::trace!("{e:?}");
518 }
519 },
520 _ => {},
521 }
522 Ok(())
523 }
524}
525
526#[async_trait]
527impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
529 for DaTaskState<TYPES, I, V>
530{
531 type Event = HotShotEvent<TYPES>;
532
533 async fn handle_event(
534 &mut self,
535 event: Arc<Self::Event>,
536 sender: &Sender<Arc<Self::Event>>,
537 _receiver: &Receiver<Arc<Self::Event>>,
538 ) -> Result<()> {
539 self.handle(event, sender.clone()).await
540 }
541
542 fn cancel_subtasks(&mut self) {}
543}