hotshot_task_impls/
vid.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{OuterConsensus, PayloadWithMetadata},
14    data::{PackedBundle, VidDisperse, VidDisperseAndDuration, VidDisperseShare},
15    epoch_membership::EpochMembershipCoordinator,
16    message::{Proposal, UpgradeLock},
17    simple_vote::HasEpoch,
18    traits::{
19        block_contents::BlockHeader,
20        node_implementation::{NodeImplementation, NodeType, Versions},
21        signature_key::SignatureKey,
22        BlockPayload,
23    },
24    utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30    events::{HotShotEvent, HotShotTaskCompleted},
31    helpers::broadcast_event,
32};
33
34/// Tracks state of a VID task
35pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
36    /// View number this view is executing in.
37    pub cur_view: TYPES::View,
38
39    /// Epoch number this node is executing in.
40    pub cur_epoch: Option<TYPES::Epoch>,
41
42    /// Reference to consensus. Leader will require a read lock on this.
43    pub consensus: OuterConsensus<TYPES>,
44
45    /// The underlying network
46    pub network: Arc<I::Network>,
47
48    /// Membership for the quorum
49    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51    /// This Nodes Public Key
52    pub public_key: TYPES::SignatureKey,
53
54    /// Our Private Key
55    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57    /// This state's ID
58    pub id: u64,
59
60    /// Lock for a decided upgrade
61    pub upgrade_lock: UpgradeLock<TYPES, V>,
62
63    /// Number of blocks in an epoch, zero means there are no epochs
64    pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> VidTaskState<TYPES, I, V> {
68    /// main task event handler
69    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70    pub async fn handle(
71        &mut self,
72        event: Arc<HotShotEvent<TYPES>>,
73        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74    ) -> Option<HotShotTaskCompleted> {
75        match event.as_ref() {
76            HotShotEvent::BlockRecv(packed_bundle) => {
77                let PackedBundle::<TYPES> {
78                    encoded_transactions,
79                    metadata,
80                    view_number,
81                    sequencing_fees,
82                    ..
83                } = packed_bundle;
84                let payload =
85                    <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86                let builder_commitment = payload.builder_commitment(metadata);
87                let epoch = self.cur_epoch;
88                if self
89                    .membership_coordinator
90                    .membership_for_epoch(epoch)
91                    .await
92                    .ok()?
93                    .leader(*view_number)
94                    .await
95                    .ok()?
96                    != self.public_key
97                {
98                    tracing::debug!(
99                        "We are not the leader in the current epoch. Do not send the VID \
100                         dispersal."
101                    );
102                    return None;
103                }
104                let VidDisperseAndDuration {
105                    disperse: vid_disperse,
106                    duration: disperse_duration,
107                } = VidDisperse::calculate_vid_disperse::<V>(
108                    &payload,
109                    &self.membership_coordinator,
110                    *view_number,
111                    epoch,
112                    epoch,
113                    metadata,
114                    &self.upgrade_lock,
115                )
116                .await
117                .ok()?;
118                let payload_commitment = vid_disperse.payload_commitment();
119                let shares = VidDisperseShare::from_vid_disperse(vid_disperse.clone());
120                let payload_with_metadata = Arc::new(PayloadWithMetadata {
121                    payload,
122                    metadata: metadata.clone(),
123                });
124
125                let mut consensus_writer = self.consensus.write().await;
126                consensus_writer
127                    .metrics
128                    .vid_disperse_duration
129                    .add_point(disperse_duration.as_secs_f64());
130                // Make sure we save the payload; we might need it to send the next epoch VID shares.
131                if let Err(e) =
132                    consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
133                {
134                    tracing::debug!(error=?e);
135                }
136                for share in shares {
137                    if let Some(share) = share.to_proposal(&self.private_key) {
138                        consensus_writer.update_vid_shares(*view_number, share);
139                    }
140                }
141                drop(consensus_writer);
142
143                // send the commitment and metadata to consensus for block building
144                broadcast_event(
145                    Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
146                        payload_commitment,
147                        builder_commitment,
148                        metadata.clone(),
149                        *view_number,
150                        sequencing_fees.clone(),
151                    )),
152                    &event_stream,
153                )
154                .await;
155
156                let view_number = *view_number;
157                let Ok(signature) = TYPES::SignatureKey::sign(
158                    &self.private_key,
159                    vid_disperse.payload_commitment_ref(),
160                ) else {
161                    error!("VID: failed to sign dispersal payload");
162                    return None;
163                };
164                debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
165                broadcast_event(
166                    Arc::new(HotShotEvent::VidDisperseSend(
167                        Proposal {
168                            signature,
169                            data: vid_disperse,
170                            _pd: PhantomData,
171                        },
172                        self.public_key.clone(),
173                    )),
174                    &event_stream,
175                )
176                .await;
177            },
178
179            HotShotEvent::ViewChange(view, epoch) => {
180                if *epoch > self.cur_epoch {
181                    self.cur_epoch = *epoch;
182                }
183
184                let view = *view;
185                if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
186                    return None;
187                }
188
189                if *view - *self.cur_view > 1 {
190                    info!("View changed by more than 1 going to view {view}");
191                }
192                self.cur_view = view;
193
194                return None;
195            },
196
197            HotShotEvent::QuorumProposalSend(proposal, _) => {
198                let proposed_block_number = proposal.data.block_header().block_number();
199                if proposal.data.epoch().is_none()
200                    || !is_epoch_transition(proposed_block_number, self.epoch_height)
201                {
202                    // This is not the last block in the epoch, do nothing.
203                    return None;
204                }
205                // We just sent a proposal for the last block in the epoch. We need to calculate
206                // and send VID for the nodes in the next epoch so that they can vote.
207                let proposal_view_number = proposal.data.view_number();
208                let sender_epoch = option_epoch_from_block_number::<TYPES>(
209                    true,
210                    proposed_block_number,
211                    self.epoch_height,
212                );
213                let target_epoch = sender_epoch.map(|x| x + 1);
214
215                let consensus_reader = self.consensus.read().await;
216                let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
217                else {
218                    tracing::warn!(
219                        "We need to calculate VID for the nodes in the next epoch but we don't \
220                         have the transactions"
221                    );
222                    return None;
223                };
224                let payload = Arc::clone(payload);
225                drop(consensus_reader);
226
227                let VidDisperseAndDuration {
228                    disperse: next_epoch_vid_disperse,
229                    duration: _,
230                } = VidDisperse::calculate_vid_disperse::<V>(
231                    &payload.payload,
232                    &self.membership_coordinator,
233                    proposal_view_number,
234                    target_epoch,
235                    sender_epoch,
236                    &payload.metadata,
237                    &self.upgrade_lock,
238                )
239                .await
240                .ok()?;
241                let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
242                    &self.private_key,
243                    next_epoch_vid_disperse.payload_commitment().as_ref(),
244                ) else {
245                    error!("VID: failed to sign dispersal payload for the next epoch");
246                    return None;
247                };
248                debug!(
249                    "publishing VID disperse for view {proposal_view_number} and epoch \
250                     {target_epoch:?}"
251                );
252                broadcast_event(
253                    Arc::new(HotShotEvent::VidDisperseSend(
254                        Proposal {
255                            signature: next_epoch_signature,
256                            data: next_epoch_vid_disperse.clone(),
257                            _pd: PhantomData,
258                        },
259                        self.public_key.clone(),
260                    )),
261                    &event_stream,
262                )
263                .await;
264            },
265            HotShotEvent::Shutdown => {
266                return Some(HotShotTaskCompleted);
267            },
268            _ => {},
269        }
270        None
271    }
272}
273
274#[async_trait]
275/// task state implementation for VID Task
276impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
277    for VidTaskState<TYPES, I, V>
278{
279    type Event = HotShotEvent<TYPES>;
280
281    async fn handle_event(
282        &mut self,
283        event: Arc<Self::Event>,
284        sender: &Sender<Arc<Self::Event>>,
285        _receiver: &Receiver<Arc<Self::Event>>,
286    ) -> Result<()> {
287        self.handle(event, sender.clone()).await;
288        Ok(())
289    }
290
291    fn cancel_subtasks(&mut self) {}
292}