hotshot_task_impls/
vid.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{OuterConsensus, PayloadWithMetadata},
14    data::{PackedBundle, VidDisperse, VidDisperseAndDuration},
15    epoch_membership::EpochMembershipCoordinator,
16    message::{Proposal, UpgradeLock},
17    simple_vote::HasEpoch,
18    traits::{
19        block_contents::BlockHeader,
20        node_implementation::{NodeImplementation, NodeType, Versions},
21        signature_key::SignatureKey,
22        BlockPayload,
23    },
24    utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30    events::{HotShotEvent, HotShotTaskCompleted},
31    helpers::broadcast_event,
32};
33
34/// Tracks state of a VID task
35pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
36    /// View number this view is executing in.
37    pub cur_view: TYPES::View,
38
39    /// Epoch number this node is executing in.
40    pub cur_epoch: Option<TYPES::Epoch>,
41
42    /// Reference to consensus. Leader will require a read lock on this.
43    pub consensus: OuterConsensus<TYPES>,
44
45    /// The underlying network
46    pub network: Arc<I::Network>,
47
48    /// Membership for the quorum
49    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51    /// This Nodes Public Key
52    pub public_key: TYPES::SignatureKey,
53
54    /// Our Private Key
55    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57    /// This state's ID
58    pub id: u64,
59
60    /// Lock for a decided upgrade
61    pub upgrade_lock: UpgradeLock<TYPES, V>,
62
63    /// Number of blocks in an epoch, zero means there are no epochs
64    pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> VidTaskState<TYPES, I, V> {
68    /// main task event handler
69    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70    pub async fn handle(
71        &mut self,
72        event: Arc<HotShotEvent<TYPES>>,
73        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74    ) -> Option<HotShotTaskCompleted> {
75        match event.as_ref() {
76            HotShotEvent::BlockRecv(packed_bundle) => {
77                let PackedBundle::<TYPES> {
78                    encoded_transactions,
79                    metadata,
80                    view_number,
81                    sequencing_fees,
82                    ..
83                } = packed_bundle;
84                let payload =
85                    <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86                let builder_commitment = payload.builder_commitment(metadata);
87                let epoch = self.cur_epoch;
88                if self
89                    .membership_coordinator
90                    .membership_for_epoch(epoch)
91                    .await
92                    .ok()?
93                    .leader(*view_number)
94                    .await
95                    .ok()?
96                    != self.public_key
97                {
98                    tracing::debug!(
99                        "We are not the leader in the current epoch. Do not send the VID \
100                         dispersal."
101                    );
102                    return None;
103                }
104                let VidDisperseAndDuration {
105                    disperse: vid_disperse,
106                    duration: disperse_duration,
107                } = VidDisperse::calculate_vid_disperse::<V>(
108                    &payload,
109                    &self.membership_coordinator,
110                    *view_number,
111                    epoch,
112                    epoch,
113                    metadata,
114                    &self.upgrade_lock,
115                )
116                .await
117                .ok()?;
118                let payload_commitment = vid_disperse.payload_commitment();
119                let payload_with_metadata = Arc::new(PayloadWithMetadata {
120                    payload,
121                    metadata: metadata.clone(),
122                });
123
124                let mut consensus_writer = self.consensus.write().await;
125                consensus_writer
126                    .metrics
127                    .vid_disperse_duration
128                    .add_point(disperse_duration.as_secs_f64());
129                // Make sure we save the payload; we might need it to send the next epoch VID shares.
130                if let Err(e) =
131                    consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
132                {
133                    tracing::debug!(error=?e);
134                }
135                for share in vid_disperse.clone().to_shares() {
136                    if let Some(share) = share.to_proposal(&self.private_key) {
137                        consensus_writer.update_vid_shares(*view_number, share);
138                    }
139                }
140                drop(consensus_writer);
141
142                // send the commitment and metadata to consensus for block building
143                broadcast_event(
144                    Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
145                        payload_commitment,
146                        builder_commitment,
147                        metadata.clone(),
148                        *view_number,
149                        sequencing_fees.clone(),
150                    )),
151                    &event_stream,
152                )
153                .await;
154
155                let view_number = *view_number;
156                let Ok(signature) = TYPES::SignatureKey::sign(
157                    &self.private_key,
158                    vid_disperse.payload_commitment_ref(),
159                ) else {
160                    error!("VID: failed to sign dispersal payload");
161                    return None;
162                };
163                debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
164                broadcast_event(
165                    Arc::new(HotShotEvent::VidDisperseSend(
166                        Proposal {
167                            signature,
168                            data: vid_disperse,
169                            _pd: PhantomData,
170                        },
171                        self.public_key.clone(),
172                    )),
173                    &event_stream,
174                )
175                .await;
176            },
177
178            HotShotEvent::ViewChange(view, epoch) => {
179                if *epoch > self.cur_epoch {
180                    self.cur_epoch = *epoch;
181                }
182
183                let view = *view;
184                if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
185                    return None;
186                }
187
188                if *view - *self.cur_view > 1 {
189                    info!("View changed by more than 1 going to view {view}");
190                }
191                self.cur_view = view;
192
193                return None;
194            },
195
196            HotShotEvent::QuorumProposalSend(proposal, _) => {
197                let proposed_block_number = proposal.data.block_header().block_number();
198                if proposal.data.epoch().is_none()
199                    || !is_epoch_transition(proposed_block_number, self.epoch_height)
200                {
201                    // This is not the last block in the epoch, do nothing.
202                    return None;
203                }
204                // We just sent a proposal for the last block in the epoch. We need to calculate
205                // and send VID for the nodes in the next epoch so that they can vote.
206                let proposal_view_number = proposal.data.view_number();
207                let sender_epoch = option_epoch_from_block_number::<TYPES>(
208                    true,
209                    proposed_block_number,
210                    self.epoch_height,
211                );
212                let target_epoch = sender_epoch.map(|x| x + 1);
213
214                let consensus_reader = self.consensus.read().await;
215                let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
216                else {
217                    tracing::warn!(
218                        "We need to calculate VID for the nodes in the next epoch but we don't \
219                         have the transactions"
220                    );
221                    return None;
222                };
223                let payload = Arc::clone(payload);
224                drop(consensus_reader);
225
226                let VidDisperseAndDuration {
227                    disperse: next_epoch_vid_disperse,
228                    duration: _,
229                } = VidDisperse::calculate_vid_disperse::<V>(
230                    &payload.payload,
231                    &self.membership_coordinator,
232                    proposal_view_number,
233                    target_epoch,
234                    sender_epoch,
235                    &payload.metadata,
236                    &self.upgrade_lock,
237                )
238                .await
239                .ok()?;
240                let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
241                    &self.private_key,
242                    next_epoch_vid_disperse.payload_commitment().as_ref(),
243                ) else {
244                    error!("VID: failed to sign dispersal payload for the next epoch");
245                    return None;
246                };
247                debug!(
248                    "publishing VID disperse for view {proposal_view_number} and epoch \
249                     {target_epoch:?}"
250                );
251                broadcast_event(
252                    Arc::new(HotShotEvent::VidDisperseSend(
253                        Proposal {
254                            signature: next_epoch_signature,
255                            data: next_epoch_vid_disperse.clone(),
256                            _pd: PhantomData,
257                        },
258                        self.public_key.clone(),
259                    )),
260                    &event_stream,
261                )
262                .await;
263            },
264            HotShotEvent::Shutdown => {
265                return Some(HotShotTaskCompleted);
266            },
267            _ => {},
268        }
269        None
270    }
271}
272
273#[async_trait]
274/// task state implementation for VID Task
275impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
276    for VidTaskState<TYPES, I, V>
277{
278    type Event = HotShotEvent<TYPES>;
279
280    async fn handle_event(
281        &mut self,
282        event: Arc<Self::Event>,
283        sender: &Sender<Arc<Self::Event>>,
284        _receiver: &Receiver<Arc<Self::Event>>,
285    ) -> Result<()> {
286        self.handle(event, sender.clone()).await;
287        Ok(())
288    }
289
290    fn cancel_subtasks(&mut self) {}
291}