1use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{OuterConsensus, PayloadWithMetadata},
14 data::{PackedBundle, VidDisperse, VidDisperseAndDuration, VidDisperseShare},
15 epoch_membership::EpochMembershipCoordinator,
16 message::{Proposal, UpgradeLock},
17 simple_vote::HasEpoch,
18 traits::{
19 block_contents::BlockHeader,
20 node_implementation::{NodeImplementation, NodeType, Versions},
21 signature_key::SignatureKey,
22 BlockPayload,
23 },
24 utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30 events::{HotShotEvent, HotShotTaskCompleted},
31 helpers::broadcast_event,
32};
33
34pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
36 pub cur_view: TYPES::View,
38
39 pub cur_epoch: Option<TYPES::Epoch>,
41
42 pub consensus: OuterConsensus<TYPES>,
44
45 pub network: Arc<I::Network>,
47
48 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51 pub public_key: TYPES::SignatureKey,
53
54 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57 pub id: u64,
59
60 pub upgrade_lock: UpgradeLock<TYPES, V>,
62
63 pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> VidTaskState<TYPES, I, V> {
68 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70 pub async fn handle(
71 &mut self,
72 event: Arc<HotShotEvent<TYPES>>,
73 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74 ) -> Option<HotShotTaskCompleted> {
75 match event.as_ref() {
76 HotShotEvent::BlockRecv(packed_bundle) => {
77 let PackedBundle::<TYPES> {
78 encoded_transactions,
79 metadata,
80 view_number,
81 sequencing_fees,
82 ..
83 } = packed_bundle;
84 let payload =
85 <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86 let builder_commitment = payload.builder_commitment(metadata);
87 let epoch = self.cur_epoch;
88 if self
89 .membership_coordinator
90 .membership_for_epoch(epoch)
91 .await
92 .ok()?
93 .leader(*view_number)
94 .await
95 .ok()?
96 != self.public_key
97 {
98 tracing::debug!(
99 "We are not the leader in the current epoch. Do not send the VID \
100 dispersal."
101 );
102 return None;
103 }
104 let VidDisperseAndDuration {
105 disperse: vid_disperse,
106 duration: disperse_duration,
107 } = VidDisperse::calculate_vid_disperse::<V>(
108 &payload,
109 &self.membership_coordinator,
110 *view_number,
111 epoch,
112 epoch,
113 metadata,
114 &self.upgrade_lock,
115 )
116 .await
117 .ok()?;
118 let payload_commitment = vid_disperse.payload_commitment();
119 let shares = VidDisperseShare::from_vid_disperse(vid_disperse.clone());
120 let payload_with_metadata = Arc::new(PayloadWithMetadata {
121 payload,
122 metadata: metadata.clone(),
123 });
124
125 let mut consensus_writer = self.consensus.write().await;
126 consensus_writer
127 .metrics
128 .vid_disperse_duration
129 .add_point(disperse_duration.as_secs_f64());
130 if let Err(e) =
132 consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
133 {
134 tracing::debug!(error=?e);
135 }
136 for share in shares {
137 if let Some(share) = share.to_proposal(&self.private_key) {
138 consensus_writer.update_vid_shares(*view_number, share);
139 }
140 }
141 drop(consensus_writer);
142
143 broadcast_event(
145 Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
146 payload_commitment,
147 builder_commitment,
148 metadata.clone(),
149 *view_number,
150 sequencing_fees.clone(),
151 )),
152 &event_stream,
153 )
154 .await;
155
156 let view_number = *view_number;
157 let Ok(signature) = TYPES::SignatureKey::sign(
158 &self.private_key,
159 vid_disperse.payload_commitment_ref(),
160 ) else {
161 error!("VID: failed to sign dispersal payload");
162 return None;
163 };
164 debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
165 broadcast_event(
166 Arc::new(HotShotEvent::VidDisperseSend(
167 Proposal {
168 signature,
169 data: vid_disperse,
170 _pd: PhantomData,
171 },
172 self.public_key.clone(),
173 )),
174 &event_stream,
175 )
176 .await;
177 },
178
179 HotShotEvent::ViewChange(view, epoch) => {
180 if *epoch > self.cur_epoch {
181 self.cur_epoch = *epoch;
182 }
183
184 let view = *view;
185 if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
186 return None;
187 }
188
189 if *view - *self.cur_view > 1 {
190 info!("View changed by more than 1 going to view {view}");
191 }
192 self.cur_view = view;
193
194 return None;
195 },
196
197 HotShotEvent::QuorumProposalSend(proposal, _) => {
198 let proposed_block_number = proposal.data.block_header().block_number();
199 if proposal.data.epoch().is_none()
200 || !is_epoch_transition(proposed_block_number, self.epoch_height)
201 {
202 return None;
204 }
205 let proposal_view_number = proposal.data.view_number();
208 let sender_epoch = option_epoch_from_block_number::<TYPES>(
209 true,
210 proposed_block_number,
211 self.epoch_height,
212 );
213 let target_epoch = sender_epoch.map(|x| x + 1);
214
215 let consensus_reader = self.consensus.read().await;
216 let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
217 else {
218 tracing::warn!(
219 "We need to calculate VID for the nodes in the next epoch but we don't \
220 have the transactions"
221 );
222 return None;
223 };
224 let payload = Arc::clone(payload);
225 drop(consensus_reader);
226
227 let VidDisperseAndDuration {
228 disperse: next_epoch_vid_disperse,
229 duration: _,
230 } = VidDisperse::calculate_vid_disperse::<V>(
231 &payload.payload,
232 &self.membership_coordinator,
233 proposal_view_number,
234 target_epoch,
235 sender_epoch,
236 &payload.metadata,
237 &self.upgrade_lock,
238 )
239 .await
240 .ok()?;
241 let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
242 &self.private_key,
243 next_epoch_vid_disperse.payload_commitment().as_ref(),
244 ) else {
245 error!("VID: failed to sign dispersal payload for the next epoch");
246 return None;
247 };
248 debug!(
249 "publishing VID disperse for view {proposal_view_number} and epoch \
250 {target_epoch:?}"
251 );
252 broadcast_event(
253 Arc::new(HotShotEvent::VidDisperseSend(
254 Proposal {
255 signature: next_epoch_signature,
256 data: next_epoch_vid_disperse.clone(),
257 _pd: PhantomData,
258 },
259 self.public_key.clone(),
260 )),
261 &event_stream,
262 )
263 .await;
264 },
265 HotShotEvent::Shutdown => {
266 return Some(HotShotTaskCompleted);
267 },
268 _ => {},
269 }
270 None
271 }
272}
273
274#[async_trait]
275impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
277 for VidTaskState<TYPES, I, V>
278{
279 type Event = HotShotEvent<TYPES>;
280
281 async fn handle_event(
282 &mut self,
283 event: Arc<Self::Event>,
284 sender: &Sender<Arc<Self::Event>>,
285 _receiver: &Receiver<Arc<Self::Event>>,
286 ) -> Result<()> {
287 self.handle(event, sender.clone()).await;
288 Ok(())
289 }
290
291 fn cancel_subtasks(&mut self) {}
292}