1use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::{EpochNumber, UpgradeProposal, ViewNumber},
16 epoch_membership::EpochMembershipCoordinator,
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_certificate::UpgradeCertificate,
20 simple_vote::{UpgradeProposalData, UpgradeVote},
21 traits::{
22 block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
23 },
24 utils::{EpochTransitionIndicator, epoch_from_block_number},
25 vote::HasViewNumber,
26};
27use hotshot_utils::anytrace::*;
28use tracing::instrument;
29use versions::EPOCH_VERSION;
30
31use crate::{
32 events::HotShotEvent,
33 helpers::broadcast_event,
34 vote_collection::{VoteCollectorsMap, handle_vote},
35};
36
37pub struct UpgradeTaskState<TYPES: NodeType> {
39 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
41
42 pub cur_view: ViewNumber,
44
45 pub cur_epoch: Option<EpochNumber>,
47
48 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51 pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>,
53
54 pub public_key: TYPES::SignatureKey,
56
57 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
59
60 pub id: u64,
62
63 pub epoch_start_block: u64,
65
66 pub start_proposing_view: u64,
68
69 pub stop_proposing_view: u64,
71
72 pub start_voting_view: u64,
74
75 pub stop_voting_view: u64,
77
78 pub start_proposing_time: u64,
80
81 pub stop_proposing_time: u64,
83
84 pub start_voting_time: u64,
86
87 pub stop_voting_time: u64,
89
90 pub upgrade_lock: UpgradeLock<TYPES>,
92
93 pub consensus: OuterConsensus<TYPES>,
95
96 pub epoch_height: u64,
98}
99
100impl<TYPES: NodeType> UpgradeTaskState<TYPES> {
101 async fn upgraded(&self) -> bool {
103 self.upgrade_lock.decided_upgrade_cert().is_some()
104 }
105
106 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
108 pub async fn handle(
109 &mut self,
110 event: Arc<HotShotEvent<TYPES>>,
111 tx: Sender<Arc<HotShotEvent<TYPES>>>,
112 ) -> Result<()> {
113 let upgrade = self.upgrade_lock.upgrade();
114 match event.as_ref() {
115 HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
116 tracing::info!("Received upgrade proposal: {proposal:?}");
117
118 let view = *proposal.data.view_number();
119
120 ensure!(
122 !self.upgraded().await,
123 info!("Already upgraded to {upgrade:?}; not voting.")
124 );
125
126 let time = SystemTime::now()
127 .duration_since(SystemTime::UNIX_EPOCH)
128 .wrap()
129 .context(error!(
130 "Failed to calculate duration. This should never happen."
131 ))?
132 .as_secs();
133
134 ensure!(
135 time >= self.start_voting_time && time < self.stop_voting_time,
136 "Refusing to vote because we are no longer in the configured vote time window."
137 );
138
139 ensure!(
140 view >= self.start_voting_view && view < self.stop_voting_view,
141 "Refusing to vote because we are no longer in the configured vote view window."
142 );
143
144 ensure!(
146 proposal.data.upgrade_proposal.new_version_hash == *upgrade.hash()
147 && proposal.data.upgrade_proposal.old_version == upgrade.base
148 && proposal.data.upgrade_proposal.new_version == upgrade.target,
149 "Proposal does not match our upgrade target"
150 );
151
152 tracing::info!(
154 "Upgrade proposal received for view: {:?}",
155 proposal.data.view_number()
156 );
157
158 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
159 && upgrade.base < EPOCH_VERSION
160 {
161 let consensus_reader = self.consensus.read().await;
162
163 let Some((_, last_proposal)) =
164 consensus_reader.last_proposals().last_key_value()
165 else {
166 tracing::error!(
167 "No recent quorum proposals in consensus state -- skipping upgrade \
168 proposal vote."
169 );
170 return Err(error!(
171 "No recent quorum proposals in consensus state -- skipping upgrade \
172 proposal vote."
173 ));
174 };
175
176 let last_proposal_view: u64 = *last_proposal.data.view_number();
177 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
178
179 drop(consensus_reader);
180
181 let target_start_epoch =
182 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
183 let last_proposal_epoch =
184 epoch_from_block_number(last_proposal_block, self.epoch_height);
185 let upgrade_finish_epoch = epoch_from_block_number(
186 last_proposal_block
187 + (*proposal.data.upgrade_proposal.new_version_first_view
188 - last_proposal_view)
189 + 10,
190 self.epoch_height,
191 );
192
193 target_start_epoch == last_proposal_epoch
194 && last_proposal_epoch == upgrade_finish_epoch
195 } else {
196 true
197 };
198
199 ensure!(
200 epoch_upgrade_checks,
201 error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
202 );
203
204 let view = proposal.data.view_number();
205
206 ensure!(
222 self.cur_view != ViewNumber::genesis()
223 && *view >= self.cur_view.saturating_sub(1),
224 warn!(
225 "Discarding old upgrade proposal; the proposal is for view {view}, but \
226 the current view is {}.",
227 self.cur_view
228 )
229 );
230
231 let view_leader_key = self
233 .membership_coordinator
234 .membership_for_epoch(self.cur_epoch)
235 .await?
236 .leader(view)
237 .await?;
238 ensure!(
239 view_leader_key == *sender,
240 info!(
241 "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
242 proposal is: {:?}",
243 *view, proposal.data
244 )
245 );
246
247 broadcast_event(
252 Event {
253 view_number: self.cur_view,
254 event: EventType::UpgradeProposal {
255 proposal: proposal.clone(),
256 sender: sender.clone(),
257 },
258 },
259 &self.output_event_stream,
260 )
261 .await;
262
263 let vote = UpgradeVote::create_signed_vote(
265 proposal.data.upgrade_proposal.clone(),
266 view,
267 &self.public_key,
268 &self.private_key,
269 &self.upgrade_lock,
270 )
271 .await?;
272
273 tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
274 broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
275 },
276 HotShotEvent::UpgradeVoteRecv(vote) => {
277 tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
278
279 let view = vote.view_number();
281 let epoch_membership = self
282 .membership_coordinator
283 .membership_for_epoch(self.cur_epoch)
284 .await?;
285 ensure!(
286 epoch_membership.leader(view).await? == self.public_key,
287 debug!(
288 "We are not the leader for view {} are we leader for next view? {}",
289 *view,
290 epoch_membership.leader(view + 1).await? == self.public_key
291 )
292 );
293
294 handle_vote(
295 &mut self.vote_collectors,
296 vote,
297 self.public_key.clone(),
298 &epoch_membership,
299 self.id,
300 &event,
301 &tx,
302 &self.upgrade_lock,
303 EpochTransitionIndicator::NotInTransition,
304 )
305 .await?;
306 },
307 HotShotEvent::ViewChange(new_view, epoch_number) => {
308 if *epoch_number > self.cur_epoch {
309 self.cur_epoch = *epoch_number;
310 }
311 ensure!(self.cur_view < *new_view || *self.cur_view == 0);
312
313 self.cur_view = *new_view;
314
315 let view: u64 = *self.cur_view;
316 let time = SystemTime::now()
317 .duration_since(SystemTime::UNIX_EPOCH)
318 .wrap()
319 .context(error!(
320 "Failed to calculate duration. This should never happen."
321 ))?
322 .as_secs();
323
324 let leader = self
325 .membership_coordinator
326 .membership_for_epoch(self.cur_epoch)
327 .await?
328 .leader(ViewNumber::new(
329 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
330 ))
331 .await?;
332
333 let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
334 let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
335 let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
336
337 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
338 && upgrade.base < EPOCH_VERSION
339 {
340 let consensus_reader = self.consensus.read().await;
341
342 let Some((_, last_proposal)) =
343 consensus_reader.last_proposals().last_key_value()
344 else {
345 tracing::error!(
346 "No recent quorum proposals in consensus state -- skipping upgrade \
347 proposal."
348 );
349 return Err(error!(
350 "No recent quorum proposals in consensus state -- skipping upgrade \
351 proposal."
352 ));
353 };
354
355 let last_proposal_view: u64 = *last_proposal.data.view_number();
356 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
357
358 drop(consensus_reader);
359
360 let target_start_epoch =
361 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
362 let last_proposal_epoch =
363 epoch_from_block_number(last_proposal_block, self.epoch_height);
364 let upgrade_finish_epoch = epoch_from_block_number(
365 last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
366 self.epoch_height,
367 );
368
369 target_start_epoch == last_proposal_epoch
370 && last_proposal_epoch == upgrade_finish_epoch
371 } else {
372 true
373 };
374
375 if view >= self.start_proposing_view
377 && view < self.stop_proposing_view
378 && time >= self.start_proposing_time
379 && time < self.stop_proposing_time
380 && !self.upgraded().await
381 && epoch_upgrade_checks
382 && leader == self.public_key
383 {
384 let upgrade_proposal_data = UpgradeProposalData {
385 old_version: upgrade.base,
386 new_version: upgrade.target,
387 new_version_hash: upgrade.hash().into(),
388 old_version_last_view: ViewNumber::new(old_version_last_view),
389 new_version_first_view: ViewNumber::new(new_version_first_view),
390 decide_by: ViewNumber::new(decide_by),
391 };
392
393 let upgrade_proposal = UpgradeProposal {
394 upgrade_proposal: upgrade_proposal_data.clone(),
395 view_number: ViewNumber::new(
396 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
397 ),
398 };
399
400 let signature = TYPES::SignatureKey::sign(
401 &self.private_key,
402 upgrade_proposal_data.commit().as_ref(),
403 )
404 .expect("Failed to sign upgrade proposal commitment!");
405
406 tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
407
408 let message = Proposal {
409 data: upgrade_proposal,
410 signature,
411 _pd: PhantomData,
412 };
413
414 broadcast_event(
415 Arc::new(HotShotEvent::UpgradeProposalSend(
416 message,
417 self.public_key.clone(),
418 )),
419 &tx,
420 )
421 .await;
422 }
423 },
424 _ => {},
425 }
426 Ok(())
427 }
428}
429
430#[async_trait]
432impl<TYPES: NodeType> TaskState for UpgradeTaskState<TYPES> {
433 type Event = HotShotEvent<TYPES>;
434
435 async fn handle_event(
436 &mut self,
437 event: Arc<Self::Event>,
438 sender: &Sender<Arc<Self::Event>>,
439 _receiver: &Receiver<Arc<Self::Event>>,
440 ) -> Result<()> {
441 self.handle(event, sender.clone()).await?;
442
443 Ok(())
444 }
445
446 fn cancel_subtasks(&mut self) {}
447}