hotshot_task_impls/
da.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14    data::{
15        DaProposal2, EpochNumber, PackedBundle, ViewNumber, vid_commitment,
16        vid_disperse::vid_total_weight,
17    },
18    epoch_membership::EpochMembershipCoordinator,
19    event::{Event, EventType},
20    message::{Proposal, UpgradeLock},
21    simple_certificate::DaCertificate2,
22    simple_vote::{DaData2, DaVote2},
23    storage_metrics::StorageMetricsValue,
24    traits::{
25        BlockPayload, EncodeBytes,
26        network::ConnectedNetwork,
27        node_implementation::{NodeImplementation, NodeType},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{EpochTransitionIndicator, epoch_from_block_number, is_ge_epoch_root, is_last_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::spawn_blocking};
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::broadcast_event,
42    vote_collection::{VoteCollectorsMap, handle_vote},
43};
44
45/// Tracks state of a DA task
46pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47    /// Output events to application
48    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
49
50    /// View number this view is executing in.
51    pub cur_view: ViewNumber,
52
53    /// Epoch number this node is executing in.
54    pub cur_epoch: Option<EpochNumber>,
55
56    /// Reference to consensus. Leader will require a read lock on this.
57    pub consensus: OuterConsensus<TYPES>,
58
59    /// Membership for the DA committee and quorum committee.
60    /// We need the latter only for calculating the proper VID scheme
61    /// from the number of nodes in the quorum.
62    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64    /// The underlying network
65    pub network: Arc<I::Network>,
66
67    /// A map of `DaVote` collector tasks.
68    pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>,
69
70    /// This Nodes public key
71    pub public_key: TYPES::SignatureKey,
72
73    /// This Nodes private key
74    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76    /// This state's ID
77    pub id: u64,
78
79    /// This node's storage ref
80    pub storage: I::Storage,
81
82    /// Storage metrics
83    pub storage_metrics: Arc<StorageMetricsValue>,
84
85    /// Lock for a decided upgrade
86    pub upgrade_lock: UpgradeLock<TYPES>,
87}
88
89impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
90    /// main task event handler
91    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
92    pub async fn handle(
93        &mut self,
94        event: Arc<HotShotEvent<TYPES>>,
95        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
96    ) -> Result<()> {
97        match event.as_ref() {
98            HotShotEvent::DaProposalRecv(proposal, sender) => {
99                let sender = sender.clone();
100                tracing::debug!(
101                    "DA proposal received for view: {}",
102                    proposal.data.view_number()
103                );
104                // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
105                let view = proposal.data.view_number();
106
107                // Allow a DA proposal that is one view older, in case we have voted on a quorum
108                // proposal and updated the view.
109                //
110                // Anything older is discarded because it is no longer relevant.
111                ensure!(
112                    self.cur_view <= view + 1,
113                    "Throwing away DA proposal that is more than one view older"
114                );
115
116                if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
117                    ensure!(
118                        entry.payload.encode() == proposal.data.encoded_transactions,
119                        "Received DA proposal for view {view} but we already have a payload for \
120                         that view and they are not identical.  Throwing it away",
121                    );
122                }
123
124                let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
125                let view_leader_key = self
126                    .membership_coordinator
127                    .membership_for_epoch(proposal.data.epoch)
128                    .await
129                    .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
130                    .leader(view)
131                    .await?;
132                ensure!(
133                    view_leader_key == sender,
134                    warn!(
135                        "DA proposal doesn't have expected leader key for view {} \n DA proposal \
136                         is: {:?}",
137                        *view,
138                        proposal.data.clone()
139                    )
140                );
141
142                ensure!(
143                    view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
144                    warn!("Could not verify proposal.")
145                );
146
147                broadcast_event(
148                    Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
149                    &event_stream,
150                )
151                .await;
152            },
153            HotShotEvent::DaProposalValidated(proposal, sender) => {
154                tracing::debug!(
155                    "DA proposal validated for view {}",
156                    proposal.data.view_number()
157                );
158                let cur_view = self.consensus.read().await.cur_view();
159                let view_number = proposal.data.view_number();
160                let epoch_number = proposal.data.epoch;
161                let membership = self
162                    .membership_coordinator
163                    .stake_table_for_epoch(epoch_number)
164                    .await
165                    .context(warn!("No stake table for epoch"))?;
166
167                ensure!(
168                    cur_view <= view_number + 1,
169                    debug!(
170                        "Validated DA proposal for prior view but it's too old now Current view \
171                         {cur_view}, DA Proposal view {}",
172                        proposal.data.view_number()
173                    )
174                );
175
176                // Proposal is fresh and valid, notify the application layer
177                broadcast_event(
178                    Event {
179                        view_number,
180                        event: EventType::DaProposal {
181                            proposal: proposal.clone(),
182                            sender: sender.clone(),
183                        },
184                    },
185                    &self.output_event_stream,
186                )
187                .await;
188
189                ensure!(
190                    membership.has_da_stake(&self.public_key).await,
191                    debug!(
192                        "We were not chosen for consensus committee for view {view_number} in \
193                         epoch {epoch_number:?}"
194                    )
195                );
196                let total_weight =
197                    vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
198
199                let version = self.upgrade_lock.version_infallible(view_number);
200
201                let txns = Arc::clone(&proposal.data.encoded_transactions);
202                let txns_clone = Arc::clone(&txns);
203                let metadata = proposal.data.metadata.encode();
204                let metadata_clone = metadata.clone();
205                let payload_commitment =
206                    spawn_blocking(move || vid_commitment(&txns, &metadata, total_weight, version))
207                        .await;
208                let payload_commitment = payload_commitment.unwrap();
209                let next_epoch_payload_commitment = if matches!(
210                    proposal.data.epoch_transition_indicator,
211                    EpochTransitionIndicator::InTransition
212                ) && self
213                    .upgrade_lock
214                    .epochs_enabled(proposal.data.view_number())
215                    && epoch_number.is_some()
216                {
217                    let next_epoch_total_weight = vid_total_weight::<TYPES>(
218                        &membership
219                            .next_epoch_stake_table()
220                            .await?
221                            .stake_table()
222                            .await,
223                        epoch_number.map(|epoch| epoch + 1),
224                    );
225
226                    let commit_result = spawn_blocking(move || {
227                        vid_commitment(
228                            &txns_clone,
229                            &metadata_clone,
230                            next_epoch_total_weight,
231                            version,
232                        )
233                    })
234                    .await;
235                    Some(commit_result.unwrap())
236                } else {
237                    None
238                };
239
240                let now = Instant::now();
241                self.storage
242                    .append_da2(proposal, payload_commitment)
243                    .await
244                    .wrap()
245                    .context(error!("Failed to append DA proposal to storage"))?;
246                self.storage_metrics
247                    .append_da_duration
248                    .add_point(now.elapsed().as_secs_f64());
249
250                // Generate and send vote
251                let vote = DaVote2::create_signed_vote(
252                    DaData2 {
253                        payload_commit: payload_commitment,
254                        next_epoch_payload_commit: next_epoch_payload_commitment,
255                        epoch: epoch_number,
256                    },
257                    view_number,
258                    &self.public_key,
259                    &self.private_key,
260                    &self.upgrade_lock,
261                )
262                .await?;
263
264                tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
265
266                broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
267                let mut consensus_writer = self.consensus.write().await;
268
269                // Ensure this view is in the view map for garbage collection.
270
271                if let Err(e) =
272                    consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
273                {
274                    tracing::trace!("{e:?}");
275                }
276
277                let payload_with_metadata = Arc::new(PayloadWithMetadata {
278                    payload: TYPES::BlockPayload::from_bytes(
279                        proposal.data.encoded_transactions.as_ref(),
280                        &proposal.data.metadata,
281                    ),
282                    metadata: proposal.data.metadata.clone(),
283                });
284
285                // Record the payload we have promised to make available.
286                if let Err(e) =
287                    consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
288                {
289                    tracing::trace!("{e:?}");
290                }
291                drop(consensus_writer);
292
293                // Optimistically calculate and update VID if we know that the primary network is down.
294                if self.network.is_primary_down() {
295                    let my_id = self.id;
296                    let consensus =
297                        OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
298                    let pk = self.private_key.clone();
299                    let public_key = self.public_key.clone();
300                    let chan = event_stream.clone();
301                    let upgrade_lock = self.upgrade_lock.clone();
302                    let next_epoch = epoch_number.map(|epoch| epoch + 1);
303
304                    let mut target_epochs = vec![];
305                    if membership.has_stake(&public_key).await {
306                        target_epochs.push(epoch_number);
307                    }
308                    if membership
309                        .next_epoch_stake_table()
310                        .await?
311                        .has_stake(&public_key)
312                        .await
313                    {
314                        target_epochs.push(next_epoch);
315                    }
316                    if target_epochs.is_empty() {
317                        bail!(
318                            "Not calculating VID, the node doesn't belong to the current epoch or \
319                             the next epoch."
320                        );
321                    };
322
323                    tracing::debug!(
324                        "Primary network is down. Optimistically calculate own VID share."
325                    );
326                    let membership = membership.clone();
327                    spawn(async move {
328                        for target_epoch in target_epochs {
329                            Consensus::calculate_and_update_vid(
330                                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
331                                view_number,
332                                target_epoch,
333                                membership.coordinator.clone(),
334                                &pk,
335                                &upgrade_lock,
336                            )
337                            .await;
338                            if let Some(vid_share) = consensus
339                                .read()
340                                .await
341                                .vid_shares()
342                                .get(&view_number)
343                                .and_then(|key_map| key_map.get(&public_key))
344                                .and_then(|epoch_map| epoch_map.get(&target_epoch))
345                            {
346                                tracing::debug!(
347                                    "Primary network is down. Calculated own VID share for epoch \
348                                     {target_epoch:?}, my id {my_id}"
349                                );
350                                broadcast_event(
351                                    Arc::new(HotShotEvent::VidShareRecv(
352                                        public_key.clone(),
353                                        vid_share.clone(),
354                                    )),
355                                    &chan,
356                                )
357                                .await;
358                            }
359                        }
360                    });
361                }
362            },
363            HotShotEvent::DaVoteRecv(vote) => {
364                tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
365                // Check if we are the leader and the vote is from the sender.
366                let view = vote.view_number();
367                let epoch = vote.data.epoch;
368                let membership = self
369                    .membership_coordinator
370                    .membership_for_epoch(epoch)
371                    .await
372                    .context(warn!("No stake table for epoch"))?;
373
374                ensure!(
375                    membership.leader(view).await? == self.public_key,
376                    debug!(
377                        "We are not the DA committee leader for view {} are we leader for next \
378                         view? {}",
379                        *view,
380                        membership.leader(view + 1).await? == self.public_key
381                    )
382                );
383
384                handle_vote(
385                    &mut self.vote_collectors,
386                    vote,
387                    self.public_key.clone(),
388                    &membership,
389                    self.id,
390                    &event,
391                    &event_stream,
392                    &self.upgrade_lock,
393                    EpochTransitionIndicator::NotInTransition,
394                )
395                .await?;
396            },
397            HotShotEvent::ViewChange(view, epoch) => {
398                if *epoch > self.cur_epoch {
399                    self.cur_epoch = *epoch;
400                }
401
402                let view = *view;
403                ensure!(
404                    *self.cur_view < *view,
405                    info!("Received a view change to an older view.")
406                );
407
408                if *view - *self.cur_view > 1 {
409                    tracing::info!("View changed by more than 1 going to view {view}");
410                }
411                self.cur_view = view;
412            },
413            HotShotEvent::BlockRecv(packed_bundle) => {
414                let PackedBundle::<TYPES> {
415                    encoded_transactions,
416                    metadata,
417                    view_number,
418                    ..
419                } = packed_bundle;
420                let view_number = *view_number;
421
422                // quick hash the encoded txns with sha256
423                let encoded_transactions_hash = Sha256::digest(encoded_transactions);
424
425                // sign the encoded transactions as opposed to the VID commitment
426                let signature =
427                    TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
428                        .wrap()?;
429
430                let epoch = self.cur_epoch;
431                let leader = self
432                    .membership_coordinator
433                    .membership_for_epoch(epoch)
434                    .await
435                    .context(warn!("No stake table for epoch"))?
436                    .leader(view_number)
437                    .await?;
438                if leader != self.public_key {
439                    tracing::debug!(
440                        "We are not the leader in the current epoch. Do not send the DA proposal"
441                    );
442                    return Ok(());
443                }
444                let consensus_reader = self.consensus.read().await;
445                let high_qc_block_number = consensus_reader.high_qc().data.block_number;
446                // We in transition if our high QC is after the epoch root block, not the last block,
447                // And we aren't in an epoch greater than the high qc's epoch.  In other words
448                // we expect to propose to both epochs if the next block after our current high QC is
449                // going to be a transition block.  We most likely will propose the high QC's block height + 1.
450                let epoch_transition_indicator = if self.upgrade_lock.epochs_enabled(view_number) {
451                    match (high_qc_block_number, self.cur_epoch) {
452                        (Some(block_number), Some(cur_epoch)) => {
453                            let epoch = epoch_from_block_number(
454                                block_number,
455                                self.membership_coordinator.epoch_height,
456                            );
457                            if epoch < *cur_epoch {
458                                // We are in a new epoch, we can't be in transition
459                                EpochTransitionIndicator::NotInTransition
460                            } else if !is_last_block(
461                                block_number,
462                                self.membership_coordinator.epoch_height,
463                            ) && is_ge_epoch_root(
464                                block_number,
465                                self.membership_coordinator.epoch_height,
466                            ) {
467                                EpochTransitionIndicator::InTransition
468                            } else {
469                                EpochTransitionIndicator::NotInTransition
470                            }
471                        },
472                        _ => EpochTransitionIndicator::NotInTransition,
473                    }
474                } else {
475                    EpochTransitionIndicator::NotInTransition
476                };
477
478                drop(consensus_reader);
479
480                let data: DaProposal2<TYPES> = DaProposal2 {
481                    encoded_transactions: Arc::clone(encoded_transactions),
482                    metadata: metadata.clone(),
483                    // Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
484                    view_number,
485                    epoch,
486                    epoch_transition_indicator,
487                };
488
489                let message = Proposal {
490                    data,
491                    signature,
492                    _pd: PhantomData,
493                };
494
495                broadcast_event(
496                    Arc::new(HotShotEvent::DaProposalSend(
497                        message.clone(),
498                        self.public_key.clone(),
499                    )),
500                    &event_stream,
501                )
502                .await;
503                let payload_with_metadata = Arc::new(PayloadWithMetadata {
504                    payload: TYPES::BlockPayload::from_bytes(
505                        encoded_transactions.as_ref(),
506                        metadata,
507                    ),
508                    metadata: metadata.clone(),
509                });
510                // Save the payload early because we might need it to calculate VID for the next epoch nodes.
511                let update_result = self
512                    .consensus
513                    .write()
514                    .await
515                    .update_saved_payloads(view_number, payload_with_metadata);
516                if let Err(e) = update_result {
517                    tracing::trace!("{e:?}");
518                }
519            },
520            _ => {},
521        }
522        Ok(())
523    }
524}
525
526/// task state implementation for DA Task
527#[async_trait]
528impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TYPES, I> {
529    type Event = HotShotEvent<TYPES>;
530
531    async fn handle_event(
532        &mut self,
533        event: Arc<Self::Event>,
534        sender: &Sender<Arc<Self::Event>>,
535        _receiver: &Receiver<Arc<Self::Event>>,
536    ) -> Result<()> {
537        self.handle(event, sender.clone()).await
538    }
539
540    fn cancel_subtasks(&mut self) {}
541}