1use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14 data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle},
15 epoch_membership::EpochMembershipCoordinator,
16 event::{Event, EventType},
17 message::{Proposal, UpgradeLock},
18 simple_certificate::DaCertificate2,
19 simple_vote::{DaData2, DaVote2},
20 storage_metrics::StorageMetricsValue,
21 traits::{
22 network::ConnectedNetwork,
23 node_implementation::{NodeImplementation, NodeType, Versions},
24 signature_key::SignatureKey,
25 storage::Storage,
26 BlockPayload, EncodeBytes,
27 },
28 utils::EpochTransitionIndicator,
29 vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use sha2::{Digest, Sha256};
33use tokio::{spawn, task::spawn_blocking};
34use tracing::instrument;
35
36use crate::{
37 events::HotShotEvent,
38 helpers::broadcast_event,
39 vote_collection::{handle_vote, VoteCollectorsMap},
40};
41
42pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
44 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
46
47 pub cur_view: TYPES::View,
49
50 pub cur_epoch: Option<TYPES::Epoch>,
52
53 pub consensus: OuterConsensus<TYPES>,
55
56 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61 pub network: Arc<I::Network>,
63
64 pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>,
66
67 pub public_key: TYPES::SignatureKey,
69
70 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
72
73 pub id: u64,
75
76 pub storage: I::Storage,
78
79 pub storage_metrics: Arc<StorageMetricsValue>,
81
82 pub upgrade_lock: UpgradeLock<TYPES, V>,
84}
85
86impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
87 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
89 pub async fn handle(
90 &mut self,
91 event: Arc<HotShotEvent<TYPES>>,
92 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
93 ) -> Result<()> {
94 match event.as_ref() {
95 HotShotEvent::DaProposalRecv(proposal, sender) => {
96 let sender = sender.clone();
97 tracing::debug!(
98 "DA proposal received for view: {}",
99 proposal.data.view_number()
100 );
101 let view = proposal.data.view_number();
103
104 ensure!(
109 self.cur_view <= view + 1,
110 "Throwing away DA proposal that is more than one view older"
111 );
112
113 if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
114 ensure!(
115 entry.payload.encode() == proposal.data.encoded_transactions,
116 "Received DA proposal for view {view} but we already have a payload for \
117 that view and they are not identical. Throwing it away",
118 );
119 }
120
121 let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
122 let view_leader_key = self
123 .membership_coordinator
124 .membership_for_epoch(proposal.data.epoch)
125 .await
126 .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
127 .leader(view)
128 .await?;
129 ensure!(
130 view_leader_key == sender,
131 warn!(
132 "DA proposal doesn't have expected leader key for view {} \n DA proposal \
133 is: {:?}",
134 *view,
135 proposal.data.clone()
136 )
137 );
138
139 ensure!(
140 view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
141 warn!("Could not verify proposal.")
142 );
143
144 broadcast_event(
145 Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
146 &event_stream,
147 )
148 .await;
149 },
150 HotShotEvent::DaProposalValidated(proposal, sender) => {
151 let cur_view = self.consensus.read().await.cur_view();
152 let view_number = proposal.data.view_number();
153 let epoch_number = proposal.data.epoch;
154 let membership = self
155 .membership_coordinator
156 .stake_table_for_epoch(epoch_number)
157 .await
158 .context(warn!("No stake table for epoch"))?;
159
160 ensure!(
161 cur_view <= view_number + 1,
162 debug!(
163 "Validated DA proposal for prior view but it's too old now Current view \
164 {cur_view}, DA Proposal view {}",
165 proposal.data.view_number()
166 )
167 );
168
169 broadcast_event(
171 Event {
172 view_number,
173 event: EventType::DaProposal {
174 proposal: proposal.clone(),
175 sender: sender.clone(),
176 },
177 },
178 &self.output_event_stream,
179 )
180 .await;
181
182 ensure!(
183 membership.has_da_stake(&self.public_key).await,
184 debug!(
185 "We were not chosen for consensus committee for view {view_number} in \
186 epoch {epoch_number:?}"
187 )
188 );
189 let total_weight =
190 vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
191
192 let version = self.upgrade_lock.version_infallible(view_number).await;
193
194 let txns = Arc::clone(&proposal.data.encoded_transactions);
195 let txns_clone = Arc::clone(&txns);
196 let metadata = proposal.data.metadata.encode();
197 let metadata_clone = metadata.clone();
198 let payload_commitment = spawn_blocking(move || {
199 vid_commitment::<V>(&txns, &metadata, total_weight, version)
200 })
201 .await;
202 let payload_commitment = payload_commitment.unwrap();
203 let next_epoch_payload_commitment = if matches!(
204 proposal.data.epoch_transition_indicator,
205 EpochTransitionIndicator::InTransition
206 ) && self
207 .upgrade_lock
208 .epochs_enabled(proposal.data.view_number())
209 .await
210 && epoch_number.is_some()
211 {
212 let next_epoch_total_weight = vid_total_weight::<TYPES>(
213 &membership
214 .next_epoch_stake_table()
215 .await?
216 .stake_table()
217 .await,
218 epoch_number.map(|epoch| epoch + 1),
219 );
220
221 let commit_result = spawn_blocking(move || {
222 vid_commitment::<V>(
223 &txns_clone,
224 &metadata_clone,
225 next_epoch_total_weight,
226 version,
227 )
228 })
229 .await;
230 Some(commit_result.unwrap())
231 } else {
232 None
233 };
234
235 let now = Instant::now();
236 self.storage
237 .append_da2(proposal, payload_commitment)
238 .await
239 .wrap()
240 .context(error!("Failed to append DA proposal to storage"))?;
241 self.storage_metrics
242 .append_da_duration
243 .add_point(now.elapsed().as_secs_f64());
244
245 let vote = DaVote2::create_signed_vote(
247 DaData2 {
248 payload_commit: payload_commitment,
249 next_epoch_payload_commit: next_epoch_payload_commitment,
250 epoch: epoch_number,
251 },
252 view_number,
253 &self.public_key,
254 &self.private_key,
255 &self.upgrade_lock,
256 )
257 .await?;
258
259 tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
260
261 broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
262 let mut consensus_writer = self.consensus.write().await;
263
264 if let Err(e) =
267 consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
268 {
269 tracing::trace!("{e:?}");
270 }
271
272 let payload_with_metadata = Arc::new(PayloadWithMetadata {
273 payload: TYPES::BlockPayload::from_bytes(
274 proposal.data.encoded_transactions.as_ref(),
275 &proposal.data.metadata,
276 ),
277 metadata: proposal.data.metadata.clone(),
278 });
279
280 if let Err(e) =
282 consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
283 {
284 tracing::trace!("{e:?}");
285 }
286 drop(consensus_writer);
287
288 if self.network.is_primary_down() {
290 let my_id = self.id;
291 let consensus =
292 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
293 let pk = self.private_key.clone();
294 let public_key = self.public_key.clone();
295 let chan = event_stream.clone();
296 let upgrade_lock = self.upgrade_lock.clone();
297 let next_epoch = epoch_number.map(|epoch| epoch + 1);
298
299 let mut target_epochs = vec![];
300 if membership.has_stake(&public_key).await {
301 target_epochs.push(epoch_number);
302 }
303 if membership
304 .next_epoch_stake_table()
305 .await?
306 .has_stake(&public_key)
307 .await
308 {
309 target_epochs.push(next_epoch);
310 }
311 if target_epochs.is_empty() {
312 bail!(
313 "Not calculating VID, the node doesn't belong to the current epoch or \
314 the next epoch."
315 );
316 };
317
318 tracing::debug!(
319 "Primary network is down. Optimistically calculate own VID share."
320 );
321 let membership = membership.clone();
322 spawn(async move {
323 for target_epoch in target_epochs {
324 Consensus::calculate_and_update_vid::<V>(
325 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
326 view_number,
327 target_epoch,
328 membership.coordinator.clone(),
329 &pk,
330 &upgrade_lock,
331 )
332 .await;
333 if let Some(vid_share) = consensus
334 .read()
335 .await
336 .vid_shares()
337 .get(&view_number)
338 .and_then(|key_map| key_map.get(&public_key))
339 .and_then(|epoch_map| epoch_map.get(&target_epoch))
340 {
341 tracing::debug!(
342 "Primary network is down. Calculated own VID share for epoch \
343 {target_epoch:?}, my id {my_id}"
344 );
345 broadcast_event(
346 Arc::new(HotShotEvent::VidShareRecv(
347 public_key.clone(),
348 vid_share.clone(),
349 )),
350 &chan,
351 )
352 .await;
353 }
354 }
355 });
356 }
357 },
358 HotShotEvent::DaVoteRecv(ref vote) => {
359 tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
360 let view = vote.view_number();
362 let epoch = vote.data.epoch;
363 let membership = self
364 .membership_coordinator
365 .membership_for_epoch(epoch)
366 .await
367 .context(warn!("No stake table for epoch"))?;
368
369 ensure!(
370 membership.leader(view).await? == self.public_key,
371 debug!(
372 "We are not the DA committee leader for view {} are we leader for next \
373 view? {}",
374 *view,
375 membership.leader(view + 1).await? == self.public_key
376 )
377 );
378
379 handle_vote(
380 &mut self.vote_collectors,
381 vote,
382 self.public_key.clone(),
383 &membership,
384 self.id,
385 &event,
386 &event_stream,
387 &self.upgrade_lock,
388 EpochTransitionIndicator::NotInTransition,
389 )
390 .await?;
391 },
392 HotShotEvent::ViewChange(view, epoch) => {
393 if *epoch > self.cur_epoch {
394 self.cur_epoch = *epoch;
395 }
396
397 let view = *view;
398 ensure!(
399 *self.cur_view < *view,
400 info!("Received a view change to an older view.")
401 );
402
403 if *view - *self.cur_view > 1 {
404 tracing::info!("View changed by more than 1 going to view {view}");
405 }
406 self.cur_view = view;
407 },
408 HotShotEvent::BlockRecv(packed_bundle) => {
409 let PackedBundle::<TYPES> {
410 encoded_transactions,
411 metadata,
412 view_number,
413 ..
414 } = packed_bundle;
415 let view_number = *view_number;
416
417 let encoded_transactions_hash = Sha256::digest(encoded_transactions);
419
420 let signature =
422 TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
423 .wrap()?;
424
425 let epoch = self.cur_epoch;
426 let leader = self
427 .membership_coordinator
428 .membership_for_epoch(epoch)
429 .await
430 .context(warn!("No stake table for epoch"))?
431 .leader(view_number)
432 .await?;
433 if leader != self.public_key {
434 tracing::debug!(
435 "We are not the leader in the current epoch. Do not send the DA proposal"
436 );
437 return Ok(());
438 }
439 let epoch_transition_indicator =
440 if self.consensus.read().await.is_high_qc_ge_root_block() {
441 EpochTransitionIndicator::InTransition
442 } else {
443 EpochTransitionIndicator::NotInTransition
444 };
445 let data: DaProposal2<TYPES> = DaProposal2 {
446 encoded_transactions: Arc::clone(encoded_transactions),
447 metadata: metadata.clone(),
448 view_number,
450 epoch,
451 epoch_transition_indicator,
452 };
453
454 let message = Proposal {
455 data,
456 signature,
457 _pd: PhantomData,
458 };
459
460 broadcast_event(
461 Arc::new(HotShotEvent::DaProposalSend(
462 message.clone(),
463 self.public_key.clone(),
464 )),
465 &event_stream,
466 )
467 .await;
468 let payload_with_metadata = Arc::new(PayloadWithMetadata {
469 payload: TYPES::BlockPayload::from_bytes(
470 encoded_transactions.as_ref(),
471 metadata,
472 ),
473 metadata: metadata.clone(),
474 });
475 if let Err(e) = self
477 .consensus
478 .write()
479 .await
480 .update_saved_payloads(view_number, payload_with_metadata)
481 {
482 tracing::trace!("{e:?}");
483 }
484 },
485 _ => {},
486 }
487 Ok(())
488 }
489}
490
491#[async_trait]
492impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
494 for DaTaskState<TYPES, I, V>
495{
496 type Event = HotShotEvent<TYPES>;
497
498 async fn handle_event(
499 &mut self,
500 event: Arc<Self::Event>,
501 sender: &Sender<Arc<Self::Event>>,
502 _receiver: &Receiver<Arc<Self::Event>>,
503 ) -> Result<()> {
504 self.handle(event, sender.clone()).await
505 }
506
507 fn cancel_subtasks(&mut self) {}
508}