hotshot_task_impls/
da.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14    data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle},
15    epoch_membership::EpochMembershipCoordinator,
16    event::{Event, EventType},
17    message::{Proposal, UpgradeLock},
18    simple_certificate::DaCertificate2,
19    simple_vote::{DaData2, DaVote2},
20    storage_metrics::StorageMetricsValue,
21    traits::{
22        network::ConnectedNetwork,
23        node_implementation::{NodeImplementation, NodeType, Versions},
24        signature_key::SignatureKey,
25        storage::Storage,
26        BlockPayload, EncodeBytes,
27    },
28    utils::{epoch_from_block_number, is_ge_epoch_root, is_last_block, EpochTransitionIndicator},
29    vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use sha2::{Digest, Sha256};
33use tokio::{spawn, task::spawn_blocking};
34use tracing::instrument;
35
36use crate::{
37    events::HotShotEvent,
38    helpers::broadcast_event,
39    vote_collection::{handle_vote, VoteCollectorsMap},
40};
41
42/// Tracks state of a DA task
43pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
44    /// Output events to application
45    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
46
47    /// View number this view is executing in.
48    pub cur_view: TYPES::View,
49
50    /// Epoch number this node is executing in.
51    pub cur_epoch: Option<TYPES::Epoch>,
52
53    /// Reference to consensus. Leader will require a read lock on this.
54    pub consensus: OuterConsensus<TYPES>,
55
56    /// Membership for the DA committee and quorum committee.
57    /// We need the latter only for calculating the proper VID scheme
58    /// from the number of nodes in the quorum.
59    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61    /// The underlying network
62    pub network: Arc<I::Network>,
63
64    /// A map of `DaVote` collector tasks.
65    pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>,
66
67    /// This Nodes public key
68    pub public_key: TYPES::SignatureKey,
69
70    /// This Nodes private key
71    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
72
73    /// This state's ID
74    pub id: u64,
75
76    /// This node's storage ref
77    pub storage: I::Storage,
78
79    /// Storage metrics
80    pub storage_metrics: Arc<StorageMetricsValue>,
81
82    /// Lock for a decided upgrade
83    pub upgrade_lock: UpgradeLock<TYPES, V>,
84}
85
86impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
87    /// main task event handler
88    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
89    pub async fn handle(
90        &mut self,
91        event: Arc<HotShotEvent<TYPES>>,
92        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
93    ) -> Result<()> {
94        match event.as_ref() {
95            HotShotEvent::DaProposalRecv(proposal, sender) => {
96                let sender = sender.clone();
97                tracing::debug!(
98                    "DA proposal received for view: {}",
99                    proposal.data.view_number()
100                );
101                // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
102                let view = proposal.data.view_number();
103
104                // Allow a DA proposal that is one view older, in case we have voted on a quorum
105                // proposal and updated the view.
106                //
107                // Anything older is discarded because it is no longer relevant.
108                ensure!(
109                    self.cur_view <= view + 1,
110                    "Throwing away DA proposal that is more than one view older"
111                );
112
113                if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
114                    ensure!(
115                        entry.payload.encode() == proposal.data.encoded_transactions,
116                        "Received DA proposal for view {view} but we already have a payload for \
117                         that view and they are not identical.  Throwing it away",
118                    );
119                }
120
121                let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
122                let view_leader_key = self
123                    .membership_coordinator
124                    .membership_for_epoch(proposal.data.epoch)
125                    .await
126                    .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
127                    .leader(view)
128                    .await?;
129                ensure!(
130                    view_leader_key == sender,
131                    warn!(
132                        "DA proposal doesn't have expected leader key for view {} \n DA proposal \
133                         is: {:?}",
134                        *view,
135                        proposal.data.clone()
136                    )
137                );
138
139                ensure!(
140                    view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
141                    warn!("Could not verify proposal.")
142                );
143
144                broadcast_event(
145                    Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
146                    &event_stream,
147                )
148                .await;
149            },
150            HotShotEvent::DaProposalValidated(proposal, sender) => {
151                tracing::debug!(
152                    "DA proposal validated for view {}",
153                    proposal.data.view_number()
154                );
155                let cur_view = self.consensus.read().await.cur_view();
156                let view_number = proposal.data.view_number();
157                let epoch_number = proposal.data.epoch;
158                let membership = self
159                    .membership_coordinator
160                    .stake_table_for_epoch(epoch_number)
161                    .await
162                    .context(warn!("No stake table for epoch"))?;
163
164                ensure!(
165                    cur_view <= view_number + 1,
166                    debug!(
167                        "Validated DA proposal for prior view but it's too old now Current view \
168                         {cur_view}, DA Proposal view {}",
169                        proposal.data.view_number()
170                    )
171                );
172
173                // Proposal is fresh and valid, notify the application layer
174                broadcast_event(
175                    Event {
176                        view_number,
177                        event: EventType::DaProposal {
178                            proposal: proposal.clone(),
179                            sender: sender.clone(),
180                        },
181                    },
182                    &self.output_event_stream,
183                )
184                .await;
185
186                ensure!(
187                    membership.has_da_stake(&self.public_key).await,
188                    debug!(
189                        "We were not chosen for consensus committee for view {view_number} in \
190                         epoch {epoch_number:?}"
191                    )
192                );
193                let total_weight =
194                    vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
195
196                let version = self.upgrade_lock.version_infallible(view_number).await;
197
198                let txns = Arc::clone(&proposal.data.encoded_transactions);
199                let txns_clone = Arc::clone(&txns);
200                let metadata = proposal.data.metadata.encode();
201                let metadata_clone = metadata.clone();
202                let payload_commitment = spawn_blocking(move || {
203                    vid_commitment::<V>(&txns, &metadata, total_weight, version)
204                })
205                .await;
206                let payload_commitment = payload_commitment.unwrap();
207                let next_epoch_payload_commitment = if matches!(
208                    proposal.data.epoch_transition_indicator,
209                    EpochTransitionIndicator::InTransition
210                ) && self
211                    .upgrade_lock
212                    .epochs_enabled(proposal.data.view_number())
213                    .await
214                    && epoch_number.is_some()
215                {
216                    let next_epoch_total_weight = vid_total_weight::<TYPES>(
217                        &membership
218                            .next_epoch_stake_table()
219                            .await?
220                            .stake_table()
221                            .await,
222                        epoch_number.map(|epoch| epoch + 1),
223                    );
224
225                    let commit_result = spawn_blocking(move || {
226                        vid_commitment::<V>(
227                            &txns_clone,
228                            &metadata_clone,
229                            next_epoch_total_weight,
230                            version,
231                        )
232                    })
233                    .await;
234                    Some(commit_result.unwrap())
235                } else {
236                    None
237                };
238
239                let now = Instant::now();
240                self.storage
241                    .append_da2(proposal, payload_commitment)
242                    .await
243                    .wrap()
244                    .context(error!("Failed to append DA proposal to storage"))?;
245                self.storage_metrics
246                    .append_da_duration
247                    .add_point(now.elapsed().as_secs_f64());
248
249                // Generate and send vote
250                let vote = DaVote2::create_signed_vote(
251                    DaData2 {
252                        payload_commit: payload_commitment,
253                        next_epoch_payload_commit: next_epoch_payload_commitment,
254                        epoch: epoch_number,
255                    },
256                    view_number,
257                    &self.public_key,
258                    &self.private_key,
259                    &self.upgrade_lock,
260                )
261                .await?;
262
263                tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
264
265                broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
266                let mut consensus_writer = self.consensus.write().await;
267
268                // Ensure this view is in the view map for garbage collection.
269
270                if let Err(e) =
271                    consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
272                {
273                    tracing::trace!("{e:?}");
274                }
275
276                let payload_with_metadata = Arc::new(PayloadWithMetadata {
277                    payload: TYPES::BlockPayload::from_bytes(
278                        proposal.data.encoded_transactions.as_ref(),
279                        &proposal.data.metadata,
280                    ),
281                    metadata: proposal.data.metadata.clone(),
282                });
283
284                // Record the payload we have promised to make available.
285                if let Err(e) =
286                    consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
287                {
288                    tracing::trace!("{e:?}");
289                }
290                drop(consensus_writer);
291
292                // Optimistically calculate and update VID if we know that the primary network is down.
293                if self.network.is_primary_down() {
294                    let my_id = self.id;
295                    let consensus =
296                        OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
297                    let pk = self.private_key.clone();
298                    let public_key = self.public_key.clone();
299                    let chan = event_stream.clone();
300                    let upgrade_lock = self.upgrade_lock.clone();
301                    let next_epoch = epoch_number.map(|epoch| epoch + 1);
302
303                    let mut target_epochs = vec![];
304                    if membership.has_stake(&public_key).await {
305                        target_epochs.push(epoch_number);
306                    }
307                    if membership
308                        .next_epoch_stake_table()
309                        .await?
310                        .has_stake(&public_key)
311                        .await
312                    {
313                        target_epochs.push(next_epoch);
314                    }
315                    if target_epochs.is_empty() {
316                        bail!(
317                            "Not calculating VID, the node doesn't belong to the current epoch or \
318                             the next epoch."
319                        );
320                    };
321
322                    tracing::debug!(
323                        "Primary network is down. Optimistically calculate own VID share."
324                    );
325                    let membership = membership.clone();
326                    spawn(async move {
327                        for target_epoch in target_epochs {
328                            Consensus::calculate_and_update_vid::<V>(
329                                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
330                                view_number,
331                                target_epoch,
332                                membership.coordinator.clone(),
333                                &pk,
334                                &upgrade_lock,
335                            )
336                            .await;
337                            if let Some(vid_share) = consensus
338                                .read()
339                                .await
340                                .vid_shares()
341                                .get(&view_number)
342                                .and_then(|key_map| key_map.get(&public_key))
343                                .and_then(|epoch_map| epoch_map.get(&target_epoch))
344                            {
345                                tracing::debug!(
346                                    "Primary network is down. Calculated own VID share for epoch \
347                                     {target_epoch:?}, my id {my_id}"
348                                );
349                                broadcast_event(
350                                    Arc::new(HotShotEvent::VidShareRecv(
351                                        public_key.clone(),
352                                        vid_share.clone(),
353                                    )),
354                                    &chan,
355                                )
356                                .await;
357                            }
358                        }
359                    });
360                }
361            },
362            HotShotEvent::DaVoteRecv(ref vote) => {
363                tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
364                // Check if we are the leader and the vote is from the sender.
365                let view = vote.view_number();
366                let epoch = vote.data.epoch;
367                let membership = self
368                    .membership_coordinator
369                    .membership_for_epoch(epoch)
370                    .await
371                    .context(warn!("No stake table for epoch"))?;
372
373                ensure!(
374                    membership.leader(view).await? == self.public_key,
375                    debug!(
376                        "We are not the DA committee leader for view {} are we leader for next \
377                         view? {}",
378                        *view,
379                        membership.leader(view + 1).await? == self.public_key
380                    )
381                );
382
383                handle_vote(
384                    &mut self.vote_collectors,
385                    vote,
386                    self.public_key.clone(),
387                    &membership,
388                    self.id,
389                    &event,
390                    &event_stream,
391                    &self.upgrade_lock,
392                    EpochTransitionIndicator::NotInTransition,
393                )
394                .await?;
395            },
396            HotShotEvent::ViewChange(view, epoch) => {
397                if *epoch > self.cur_epoch {
398                    self.cur_epoch = *epoch;
399                }
400
401                let view = *view;
402                ensure!(
403                    *self.cur_view < *view,
404                    info!("Received a view change to an older view.")
405                );
406
407                if *view - *self.cur_view > 1 {
408                    tracing::info!("View changed by more than 1 going to view {view}");
409                }
410                self.cur_view = view;
411            },
412            HotShotEvent::BlockRecv(packed_bundle) => {
413                let PackedBundle::<TYPES> {
414                    encoded_transactions,
415                    metadata,
416                    view_number,
417                    ..
418                } = packed_bundle;
419                let view_number = *view_number;
420
421                // quick hash the encoded txns with sha256
422                let encoded_transactions_hash = Sha256::digest(encoded_transactions);
423
424                // sign the encoded transactions as opposed to the VID commitment
425                let signature =
426                    TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
427                        .wrap()?;
428
429                let epoch = self.cur_epoch;
430                let leader = self
431                    .membership_coordinator
432                    .membership_for_epoch(epoch)
433                    .await
434                    .context(warn!("No stake table for epoch"))?
435                    .leader(view_number)
436                    .await?;
437                if leader != self.public_key {
438                    tracing::debug!(
439                        "We are not the leader in the current epoch. Do not send the DA proposal"
440                    );
441                    return Ok(());
442                }
443                let consensus_reader = self.consensus.read().await;
444                let high_qc_block_number = consensus_reader.high_qc().data.block_number;
445                // We in transition if our high QC is after the epoch root block, not the last block,
446                // And we aren't in an epoch greater than the high qc's epoch.  In other words
447                // we expect to propose to both epochs if the next block after our current high QC is
448                // going to be a transition block.  We most likely will propose the high QC's block height + 1.
449                let epoch_transition_indicator =
450                    if self.upgrade_lock.epochs_enabled(view_number).await {
451                        match (high_qc_block_number, self.cur_epoch) {
452                            (Some(block_number), Some(cur_epoch)) => {
453                                let epoch = epoch_from_block_number(
454                                    block_number,
455                                    self.membership_coordinator.epoch_height,
456                                );
457                                if epoch < *cur_epoch {
458                                    // We are in a new epoch, we can't be in transition
459                                    EpochTransitionIndicator::NotInTransition
460                                } else if !is_last_block(
461                                    block_number,
462                                    self.membership_coordinator.epoch_height,
463                                ) && is_ge_epoch_root(
464                                    block_number,
465                                    self.membership_coordinator.epoch_height,
466                                ) {
467                                    EpochTransitionIndicator::InTransition
468                                } else {
469                                    EpochTransitionIndicator::NotInTransition
470                                }
471                            },
472                            _ => EpochTransitionIndicator::NotInTransition,
473                        }
474                    } else {
475                        EpochTransitionIndicator::NotInTransition
476                    };
477
478                drop(consensus_reader);
479
480                let data: DaProposal2<TYPES> = DaProposal2 {
481                    encoded_transactions: Arc::clone(encoded_transactions),
482                    metadata: metadata.clone(),
483                    // Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
484                    view_number,
485                    epoch,
486                    epoch_transition_indicator,
487                };
488
489                let message = Proposal {
490                    data,
491                    signature,
492                    _pd: PhantomData,
493                };
494
495                broadcast_event(
496                    Arc::new(HotShotEvent::DaProposalSend(
497                        message.clone(),
498                        self.public_key.clone(),
499                    )),
500                    &event_stream,
501                )
502                .await;
503                let payload_with_metadata = Arc::new(PayloadWithMetadata {
504                    payload: TYPES::BlockPayload::from_bytes(
505                        encoded_transactions.as_ref(),
506                        metadata,
507                    ),
508                    metadata: metadata.clone(),
509                });
510                // Save the payload early because we might need it to calculate VID for the next epoch nodes.
511                let update_result = self
512                    .consensus
513                    .write()
514                    .await
515                    .update_saved_payloads(view_number, payload_with_metadata);
516                if let Err(e) = update_result {
517                    tracing::trace!("{e:?}");
518                }
519            },
520            _ => {},
521        }
522        Ok(())
523    }
524}
525
526#[async_trait]
527/// task state implementation for DA Task
528impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
529    for DaTaskState<TYPES, I, V>
530{
531    type Event = HotShotEvent<TYPES>;
532
533    async fn handle_event(
534        &mut self,
535        event: Arc<Self::Event>,
536        sender: &Sender<Arc<Self::Event>>,
537        _receiver: &Receiver<Arc<Self::Event>>,
538    ) -> Result<()> {
539        self.handle(event, sender.clone()).await
540    }
541
542    fn cancel_subtasks(&mut self) {}
543}