hotshot_task_impls/
da.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14    data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle},
15    epoch_membership::EpochMembershipCoordinator,
16    event::{Event, EventType},
17    message::{Proposal, UpgradeLock},
18    simple_certificate::DaCertificate2,
19    simple_vote::{DaData2, DaVote2},
20    storage_metrics::StorageMetricsValue,
21    traits::{
22        network::ConnectedNetwork,
23        node_implementation::{NodeImplementation, NodeType, Versions},
24        signature_key::SignatureKey,
25        storage::Storage,
26        BlockPayload, EncodeBytes,
27    },
28    utils::EpochTransitionIndicator,
29    vote::HasViewNumber,
30};
31use hotshot_utils::anytrace::*;
32use sha2::{Digest, Sha256};
33use tokio::{spawn, task::spawn_blocking};
34use tracing::instrument;
35
36use crate::{
37    events::HotShotEvent,
38    helpers::broadcast_event,
39    vote_collection::{handle_vote, VoteCollectorsMap},
40};
41
42/// Tracks state of a DA task
43pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
44    /// Output events to application
45    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
46
47    /// View number this view is executing in.
48    pub cur_view: TYPES::View,
49
50    /// Epoch number this node is executing in.
51    pub cur_epoch: Option<TYPES::Epoch>,
52
53    /// Reference to consensus. Leader will require a read lock on this.
54    pub consensus: OuterConsensus<TYPES>,
55
56    /// Membership for the DA committee and quorum committee.
57    /// We need the latter only for calculating the proper VID scheme
58    /// from the number of nodes in the quorum.
59    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
60
61    /// The underlying network
62    pub network: Arc<I::Network>,
63
64    /// A map of `DaVote` collector tasks.
65    pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>, V>,
66
67    /// This Nodes public key
68    pub public_key: TYPES::SignatureKey,
69
70    /// This Nodes private key
71    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
72
73    /// This state's ID
74    pub id: u64,
75
76    /// This node's storage ref
77    pub storage: I::Storage,
78
79    /// Storage metrics
80    pub storage_metrics: Arc<StorageMetricsValue>,
81
82    /// Lock for a decided upgrade
83    pub upgrade_lock: UpgradeLock<TYPES, V>,
84}
85
86impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
87    /// main task event handler
88    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
89    pub async fn handle(
90        &mut self,
91        event: Arc<HotShotEvent<TYPES>>,
92        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
93    ) -> Result<()> {
94        match event.as_ref() {
95            HotShotEvent::DaProposalRecv(proposal, sender) => {
96                let sender = sender.clone();
97                tracing::debug!(
98                    "DA proposal received for view: {}",
99                    proposal.data.view_number()
100                );
101                // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
102                let view = proposal.data.view_number();
103
104                // Allow a DA proposal that is one view older, in case we have voted on a quorum
105                // proposal and updated the view.
106                //
107                // Anything older is discarded because it is no longer relevant.
108                ensure!(
109                    self.cur_view <= view + 1,
110                    "Throwing away DA proposal that is more than one view older"
111                );
112
113                if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
114                    ensure!(
115                        entry.payload.encode() == proposal.data.encoded_transactions,
116                        "Received DA proposal for view {view} but we already have a payload for \
117                         that view and they are not identical.  Throwing it away",
118                    );
119                }
120
121                let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
122                let view_leader_key = self
123                    .membership_coordinator
124                    .membership_for_epoch(proposal.data.epoch)
125                    .await
126                    .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
127                    .leader(view)
128                    .await?;
129                ensure!(
130                    view_leader_key == sender,
131                    warn!(
132                        "DA proposal doesn't have expected leader key for view {} \n DA proposal \
133                         is: {:?}",
134                        *view,
135                        proposal.data.clone()
136                    )
137                );
138
139                ensure!(
140                    view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
141                    warn!("Could not verify proposal.")
142                );
143
144                broadcast_event(
145                    Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
146                    &event_stream,
147                )
148                .await;
149            },
150            HotShotEvent::DaProposalValidated(proposal, sender) => {
151                let cur_view = self.consensus.read().await.cur_view();
152                let view_number = proposal.data.view_number();
153                let epoch_number = proposal.data.epoch;
154                let membership = self
155                    .membership_coordinator
156                    .stake_table_for_epoch(epoch_number)
157                    .await
158                    .context(warn!("No stake table for epoch"))?;
159
160                ensure!(
161                    cur_view <= view_number + 1,
162                    debug!(
163                        "Validated DA proposal for prior view but it's too old now Current view \
164                         {cur_view}, DA Proposal view {}",
165                        proposal.data.view_number()
166                    )
167                );
168
169                // Proposal is fresh and valid, notify the application layer
170                broadcast_event(
171                    Event {
172                        view_number,
173                        event: EventType::DaProposal {
174                            proposal: proposal.clone(),
175                            sender: sender.clone(),
176                        },
177                    },
178                    &self.output_event_stream,
179                )
180                .await;
181
182                ensure!(
183                    membership.has_da_stake(&self.public_key).await,
184                    debug!(
185                        "We were not chosen for consensus committee for view {view_number} in \
186                         epoch {epoch_number:?}"
187                    )
188                );
189                let total_weight =
190                    vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
191
192                let version = self.upgrade_lock.version_infallible(view_number).await;
193
194                let txns = Arc::clone(&proposal.data.encoded_transactions);
195                let txns_clone = Arc::clone(&txns);
196                let metadata = proposal.data.metadata.encode();
197                let metadata_clone = metadata.clone();
198                let payload_commitment = spawn_blocking(move || {
199                    vid_commitment::<V>(&txns, &metadata, total_weight, version)
200                })
201                .await;
202                let payload_commitment = payload_commitment.unwrap();
203                let next_epoch_payload_commitment = if matches!(
204                    proposal.data.epoch_transition_indicator,
205                    EpochTransitionIndicator::InTransition
206                ) && self
207                    .upgrade_lock
208                    .epochs_enabled(proposal.data.view_number())
209                    .await
210                    && epoch_number.is_some()
211                {
212                    let next_epoch_total_weight = vid_total_weight::<TYPES>(
213                        &membership
214                            .next_epoch_stake_table()
215                            .await?
216                            .stake_table()
217                            .await,
218                        epoch_number.map(|epoch| epoch + 1),
219                    );
220
221                    let commit_result = spawn_blocking(move || {
222                        vid_commitment::<V>(
223                            &txns_clone,
224                            &metadata_clone,
225                            next_epoch_total_weight,
226                            version,
227                        )
228                    })
229                    .await;
230                    Some(commit_result.unwrap())
231                } else {
232                    None
233                };
234
235                let now = Instant::now();
236                self.storage
237                    .append_da2(proposal, payload_commitment)
238                    .await
239                    .wrap()
240                    .context(error!("Failed to append DA proposal to storage"))?;
241                self.storage_metrics
242                    .append_da_duration
243                    .add_point(now.elapsed().as_secs_f64());
244
245                // Generate and send vote
246                let vote = DaVote2::create_signed_vote(
247                    DaData2 {
248                        payload_commit: payload_commitment,
249                        next_epoch_payload_commit: next_epoch_payload_commitment,
250                        epoch: epoch_number,
251                    },
252                    view_number,
253                    &self.public_key,
254                    &self.private_key,
255                    &self.upgrade_lock,
256                )
257                .await?;
258
259                tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
260
261                broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
262                let mut consensus_writer = self.consensus.write().await;
263
264                // Ensure this view is in the view map for garbage collection.
265
266                if let Err(e) =
267                    consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
268                {
269                    tracing::trace!("{e:?}");
270                }
271
272                let payload_with_metadata = Arc::new(PayloadWithMetadata {
273                    payload: TYPES::BlockPayload::from_bytes(
274                        proposal.data.encoded_transactions.as_ref(),
275                        &proposal.data.metadata,
276                    ),
277                    metadata: proposal.data.metadata.clone(),
278                });
279
280                // Record the payload we have promised to make available.
281                if let Err(e) =
282                    consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
283                {
284                    tracing::trace!("{e:?}");
285                }
286                drop(consensus_writer);
287
288                // Optimistically calculate and update VID if we know that the primary network is down.
289                if self.network.is_primary_down() {
290                    let my_id = self.id;
291                    let consensus =
292                        OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
293                    let pk = self.private_key.clone();
294                    let public_key = self.public_key.clone();
295                    let chan = event_stream.clone();
296                    let upgrade_lock = self.upgrade_lock.clone();
297                    let next_epoch = epoch_number.map(|epoch| epoch + 1);
298
299                    let mut target_epochs = vec![];
300                    if membership.has_stake(&public_key).await {
301                        target_epochs.push(epoch_number);
302                    }
303                    if membership
304                        .next_epoch_stake_table()
305                        .await?
306                        .has_stake(&public_key)
307                        .await
308                    {
309                        target_epochs.push(next_epoch);
310                    }
311                    if target_epochs.is_empty() {
312                        bail!(
313                            "Not calculating VID, the node doesn't belong to the current epoch or \
314                             the next epoch."
315                        );
316                    };
317
318                    tracing::debug!(
319                        "Primary network is down. Optimistically calculate own VID share."
320                    );
321                    let membership = membership.clone();
322                    spawn(async move {
323                        for target_epoch in target_epochs {
324                            Consensus::calculate_and_update_vid::<V>(
325                                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
326                                view_number,
327                                target_epoch,
328                                membership.coordinator.clone(),
329                                &pk,
330                                &upgrade_lock,
331                            )
332                            .await;
333                            if let Some(vid_share) = consensus
334                                .read()
335                                .await
336                                .vid_shares()
337                                .get(&view_number)
338                                .and_then(|key_map| key_map.get(&public_key))
339                                .and_then(|epoch_map| epoch_map.get(&target_epoch))
340                            {
341                                tracing::debug!(
342                                    "Primary network is down. Calculated own VID share for epoch \
343                                     {target_epoch:?}, my id {my_id}"
344                                );
345                                broadcast_event(
346                                    Arc::new(HotShotEvent::VidShareRecv(
347                                        public_key.clone(),
348                                        vid_share.clone(),
349                                    )),
350                                    &chan,
351                                )
352                                .await;
353                            }
354                        }
355                    });
356                }
357            },
358            HotShotEvent::DaVoteRecv(ref vote) => {
359                tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
360                // Check if we are the leader and the vote is from the sender.
361                let view = vote.view_number();
362                let epoch = vote.data.epoch;
363                let membership = self
364                    .membership_coordinator
365                    .membership_for_epoch(epoch)
366                    .await
367                    .context(warn!("No stake table for epoch"))?;
368
369                ensure!(
370                    membership.leader(view).await? == self.public_key,
371                    debug!(
372                        "We are not the DA committee leader for view {} are we leader for next \
373                         view? {}",
374                        *view,
375                        membership.leader(view + 1).await? == self.public_key
376                    )
377                );
378
379                handle_vote(
380                    &mut self.vote_collectors,
381                    vote,
382                    self.public_key.clone(),
383                    &membership,
384                    self.id,
385                    &event,
386                    &event_stream,
387                    &self.upgrade_lock,
388                    EpochTransitionIndicator::NotInTransition,
389                )
390                .await?;
391            },
392            HotShotEvent::ViewChange(view, epoch) => {
393                if *epoch > self.cur_epoch {
394                    self.cur_epoch = *epoch;
395                }
396
397                let view = *view;
398                ensure!(
399                    *self.cur_view < *view,
400                    info!("Received a view change to an older view.")
401                );
402
403                if *view - *self.cur_view > 1 {
404                    tracing::info!("View changed by more than 1 going to view {view}");
405                }
406                self.cur_view = view;
407            },
408            HotShotEvent::BlockRecv(packed_bundle) => {
409                let PackedBundle::<TYPES> {
410                    encoded_transactions,
411                    metadata,
412                    view_number,
413                    ..
414                } = packed_bundle;
415                let view_number = *view_number;
416
417                // quick hash the encoded txns with sha256
418                let encoded_transactions_hash = Sha256::digest(encoded_transactions);
419
420                // sign the encoded transactions as opposed to the VID commitment
421                let signature =
422                    TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
423                        .wrap()?;
424
425                let epoch = self.cur_epoch;
426                let leader = self
427                    .membership_coordinator
428                    .membership_for_epoch(epoch)
429                    .await
430                    .context(warn!("No stake table for epoch"))?
431                    .leader(view_number)
432                    .await?;
433                if leader != self.public_key {
434                    tracing::debug!(
435                        "We are not the leader in the current epoch. Do not send the DA proposal"
436                    );
437                    return Ok(());
438                }
439                let epoch_transition_indicator =
440                    if self.consensus.read().await.is_high_qc_ge_root_block() {
441                        EpochTransitionIndicator::InTransition
442                    } else {
443                        EpochTransitionIndicator::NotInTransition
444                    };
445                let data: DaProposal2<TYPES> = DaProposal2 {
446                    encoded_transactions: Arc::clone(encoded_transactions),
447                    metadata: metadata.clone(),
448                    // Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
449                    view_number,
450                    epoch,
451                    epoch_transition_indicator,
452                };
453
454                let message = Proposal {
455                    data,
456                    signature,
457                    _pd: PhantomData,
458                };
459
460                broadcast_event(
461                    Arc::new(HotShotEvent::DaProposalSend(
462                        message.clone(),
463                        self.public_key.clone(),
464                    )),
465                    &event_stream,
466                )
467                .await;
468                let payload_with_metadata = Arc::new(PayloadWithMetadata {
469                    payload: TYPES::BlockPayload::from_bytes(
470                        encoded_transactions.as_ref(),
471                        metadata,
472                    ),
473                    metadata: metadata.clone(),
474                });
475                // Save the payload early because we might need it to calculate VID for the next epoch nodes.
476                if let Err(e) = self
477                    .consensus
478                    .write()
479                    .await
480                    .update_saved_payloads(view_number, payload_with_metadata)
481                {
482                    tracing::trace!("{e:?}");
483                }
484            },
485            _ => {},
486        }
487        Ok(())
488    }
489}
490
491#[async_trait]
492/// task state implementation for DA Task
493impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
494    for DaTaskState<TYPES, I, V>
495{
496    type Event = HotShotEvent<TYPES>;
497
498    async fn handle_event(
499        &mut self,
500        event: Arc<Self::Event>,
501        sender: &Sender<Arc<Self::Event>>,
502        _receiver: &Receiver<Arc<Self::Event>>,
503    ) -> Result<()> {
504        self.handle(event, sender.clone()).await
505    }
506
507    fn cancel_subtasks(&mut self) {}
508}