1use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{OuterConsensus, PayloadWithMetadata},
14 data::{PackedBundle, VidDisperse, VidDisperseAndDuration},
15 epoch_membership::EpochMembershipCoordinator,
16 message::{Proposal, UpgradeLock},
17 simple_vote::HasEpoch,
18 traits::{
19 block_contents::BlockHeader,
20 node_implementation::{NodeImplementation, NodeType, Versions},
21 signature_key::SignatureKey,
22 BlockPayload,
23 },
24 utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30 events::{HotShotEvent, HotShotTaskCompleted},
31 helpers::broadcast_event,
32};
33
34pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
36 pub cur_view: TYPES::View,
38
39 pub cur_epoch: Option<TYPES::Epoch>,
41
42 pub consensus: OuterConsensus<TYPES>,
44
45 pub network: Arc<I::Network>,
47
48 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51 pub public_key: TYPES::SignatureKey,
53
54 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57 pub id: u64,
59
60 pub upgrade_lock: UpgradeLock<TYPES, V>,
62
63 pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> VidTaskState<TYPES, I, V> {
68 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70 pub async fn handle(
71 &mut self,
72 event: Arc<HotShotEvent<TYPES>>,
73 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74 ) -> Option<HotShotTaskCompleted> {
75 match event.as_ref() {
76 HotShotEvent::BlockRecv(packed_bundle) => {
77 let PackedBundle::<TYPES> {
78 encoded_transactions,
79 metadata,
80 view_number,
81 sequencing_fees,
82 ..
83 } = packed_bundle;
84 let payload =
85 <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86 let builder_commitment = payload.builder_commitment(metadata);
87 let epoch = self.cur_epoch;
88 if self
89 .membership_coordinator
90 .membership_for_epoch(epoch)
91 .await
92 .ok()?
93 .leader(*view_number)
94 .await
95 .ok()?
96 != self.public_key
97 {
98 tracing::debug!(
99 "We are not the leader in the current epoch. Do not send the VID \
100 dispersal."
101 );
102 return None;
103 }
104 let VidDisperseAndDuration {
105 disperse: vid_disperse,
106 duration: disperse_duration,
107 } = VidDisperse::calculate_vid_disperse::<V>(
108 &payload,
109 &self.membership_coordinator,
110 *view_number,
111 epoch,
112 epoch,
113 metadata,
114 &self.upgrade_lock,
115 )
116 .await
117 .ok()?;
118 let payload_commitment = vid_disperse.payload_commitment();
119 let payload_with_metadata = Arc::new(PayloadWithMetadata {
120 payload,
121 metadata: metadata.clone(),
122 });
123
124 let mut consensus_writer = self.consensus.write().await;
125 consensus_writer
126 .metrics
127 .vid_disperse_duration
128 .add_point(disperse_duration.as_secs_f64());
129 if let Err(e) =
131 consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
132 {
133 tracing::debug!(error=?e);
134 }
135 for share in vid_disperse.clone().to_shares() {
136 if let Some(share) = share.to_proposal(&self.private_key) {
137 consensus_writer.update_vid_shares(*view_number, share);
138 }
139 }
140 drop(consensus_writer);
141
142 broadcast_event(
144 Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
145 payload_commitment,
146 builder_commitment,
147 metadata.clone(),
148 *view_number,
149 sequencing_fees.clone(),
150 )),
151 &event_stream,
152 )
153 .await;
154
155 let view_number = *view_number;
156 let Ok(signature) = TYPES::SignatureKey::sign(
157 &self.private_key,
158 vid_disperse.payload_commitment_ref(),
159 ) else {
160 error!("VID: failed to sign dispersal payload");
161 return None;
162 };
163 debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
164 broadcast_event(
165 Arc::new(HotShotEvent::VidDisperseSend(
166 Proposal {
167 signature,
168 data: vid_disperse,
169 _pd: PhantomData,
170 },
171 self.public_key.clone(),
172 )),
173 &event_stream,
174 )
175 .await;
176 },
177
178 HotShotEvent::ViewChange(view, epoch) => {
179 if *epoch > self.cur_epoch {
180 self.cur_epoch = *epoch;
181 }
182
183 let view = *view;
184 if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
185 return None;
186 }
187
188 if *view - *self.cur_view > 1 {
189 info!("View changed by more than 1 going to view {view}");
190 }
191 self.cur_view = view;
192
193 return None;
194 },
195
196 HotShotEvent::QuorumProposalSend(proposal, _) => {
197 let proposed_block_number = proposal.data.block_header().block_number();
198 if proposal.data.epoch().is_none()
199 || !is_epoch_transition(proposed_block_number, self.epoch_height)
200 {
201 return None;
203 }
204 let proposal_view_number = proposal.data.view_number();
207 let sender_epoch = option_epoch_from_block_number::<TYPES>(
208 true,
209 proposed_block_number,
210 self.epoch_height,
211 );
212 let target_epoch = sender_epoch.map(|x| x + 1);
213
214 let consensus_reader = self.consensus.read().await;
215 let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
216 else {
217 tracing::warn!(
218 "We need to calculate VID for the nodes in the next epoch but we don't \
219 have the transactions"
220 );
221 return None;
222 };
223 let payload = Arc::clone(payload);
224 drop(consensus_reader);
225
226 let VidDisperseAndDuration {
227 disperse: next_epoch_vid_disperse,
228 duration: _,
229 } = VidDisperse::calculate_vid_disperse::<V>(
230 &payload.payload,
231 &self.membership_coordinator,
232 proposal_view_number,
233 target_epoch,
234 sender_epoch,
235 &payload.metadata,
236 &self.upgrade_lock,
237 )
238 .await
239 .ok()?;
240 let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
241 &self.private_key,
242 next_epoch_vid_disperse.payload_commitment().as_ref(),
243 ) else {
244 error!("VID: failed to sign dispersal payload for the next epoch");
245 return None;
246 };
247 debug!(
248 "publishing VID disperse for view {proposal_view_number} and epoch \
249 {target_epoch:?}"
250 );
251 broadcast_event(
252 Arc::new(HotShotEvent::VidDisperseSend(
253 Proposal {
254 signature: next_epoch_signature,
255 data: next_epoch_vid_disperse.clone(),
256 _pd: PhantomData,
257 },
258 self.public_key.clone(),
259 )),
260 &event_stream,
261 )
262 .await;
263 },
264 HotShotEvent::Shutdown => {
265 return Some(HotShotTaskCompleted);
266 },
267 _ => {},
268 }
269 None
270 }
271}
272
273#[async_trait]
274impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
276 for VidTaskState<TYPES, I, V>
277{
278 type Event = HotShotEvent<TYPES>;
279
280 async fn handle_event(
281 &mut self,
282 event: Arc<Self::Event>,
283 sender: &Sender<Arc<Self::Event>>,
284 _receiver: &Receiver<Arc<Self::Event>>,
285 ) -> Result<()> {
286 self.handle(event, sender.clone()).await;
287 Ok(())
288 }
289
290 fn cancel_subtasks(&mut self) {}
291}