1use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::UpgradeProposal,
16 epoch_membership::EpochMembershipCoordinator,
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_certificate::UpgradeCertificate,
20 simple_vote::{UpgradeProposalData, UpgradeVote},
21 traits::{
22 block_contents::BlockHeader,
23 node_implementation::{ConsensusTime, NodeType, Versions},
24 signature_key::SignatureKey,
25 },
26 utils::{epoch_from_block_number, EpochTransitionIndicator},
27 vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tracing::instrument;
31use vbs::version::StaticVersionType;
32
33use crate::{
34 events::HotShotEvent,
35 helpers::broadcast_event,
36 vote_collection::{handle_vote, VoteCollectorsMap},
37};
38
39pub struct UpgradeTaskState<TYPES: NodeType, V: Versions> {
41 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
43
44 pub cur_view: TYPES::View,
46
47 pub cur_epoch: Option<TYPES::Epoch>,
49
50 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
52
53 pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>,
55
56 pub public_key: TYPES::SignatureKey,
58
59 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
61
62 pub id: u64,
64
65 pub epoch_start_block: u64,
67
68 pub start_proposing_view: u64,
70
71 pub stop_proposing_view: u64,
73
74 pub start_voting_view: u64,
76
77 pub stop_voting_view: u64,
79
80 pub start_proposing_time: u64,
82
83 pub stop_proposing_time: u64,
85
86 pub start_voting_time: u64,
88
89 pub stop_voting_time: u64,
91
92 pub upgrade_lock: UpgradeLock<TYPES, V>,
94
95 pub consensus: OuterConsensus<TYPES>,
97
98 pub epoch_height: u64,
100}
101
102impl<TYPES: NodeType, V: Versions> UpgradeTaskState<TYPES, V> {
103 async fn upgraded(&self) -> bool {
105 self.upgrade_lock
106 .decided_upgrade_certificate
107 .read()
108 .await
109 .is_some()
110 }
111
112 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
114 pub async fn handle(
115 &mut self,
116 event: Arc<HotShotEvent<TYPES>>,
117 tx: Sender<Arc<HotShotEvent<TYPES>>>,
118 ) -> Result<()> {
119 match event.as_ref() {
120 HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
121 tracing::info!("Received upgrade proposal: {proposal:?}");
122
123 let view = *proposal.data.view_number();
124
125 ensure!(
127 !self.upgraded().await,
128 info!("Already upgraded to {:?}; not voting.", V::Upgrade::VERSION)
129 );
130
131 let time = SystemTime::now()
132 .duration_since(SystemTime::UNIX_EPOCH)
133 .wrap()
134 .context(error!(
135 "Failed to calculate duration. This should never happen."
136 ))?
137 .as_secs();
138
139 ensure!(
140 time >= self.start_voting_time && time < self.stop_voting_time,
141 "Refusing to vote because we are no longer in the configured vote time window."
142 );
143
144 ensure!(
145 view >= self.start_voting_view && view < self.stop_voting_view,
146 "Refusing to vote because we are no longer in the configured vote view window."
147 );
148
149 ensure!(
151 proposal.data.upgrade_proposal.new_version_hash == V::UPGRADE_HASH
152 && proposal.data.upgrade_proposal.old_version == V::Base::VERSION
153 && proposal.data.upgrade_proposal.new_version == V::Upgrade::VERSION,
154 "Proposal does not match our upgrade target"
155 );
156
157 tracing::info!(
159 "Upgrade proposal received for view: {:?}",
160 proposal.data.view_number()
161 );
162
163 let epoch_upgrade_checks = if V::Upgrade::VERSION >= V::Epochs::VERSION
164 && V::Base::VERSION < V::Epochs::VERSION
165 {
166 let consensus_reader = self.consensus.read().await;
167
168 let Some((_, last_proposal)) =
169 consensus_reader.last_proposals().last_key_value()
170 else {
171 tracing::error!(
172 "No recent quorum proposals in consensus state -- skipping upgrade \
173 proposal vote."
174 );
175 return Err(error!(
176 "No recent quorum proposals in consensus state -- skipping upgrade \
177 proposal vote."
178 ));
179 };
180
181 let last_proposal_view: u64 = *last_proposal.data.view_number();
182 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
183
184 drop(consensus_reader);
185
186 let target_start_epoch =
187 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
188 let last_proposal_epoch =
189 epoch_from_block_number(last_proposal_block, self.epoch_height);
190 let upgrade_finish_epoch = epoch_from_block_number(
191 last_proposal_block
192 + (*proposal.data.upgrade_proposal.new_version_first_view
193 - last_proposal_view)
194 + 10,
195 self.epoch_height,
196 );
197
198 target_start_epoch == last_proposal_epoch
199 && last_proposal_epoch == upgrade_finish_epoch
200 } else {
201 true
202 };
203
204 ensure!(
205 epoch_upgrade_checks,
206 error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
207 );
208
209 let view = proposal.data.view_number();
210
211 ensure!(
227 self.cur_view != TYPES::View::genesis()
228 && *view >= self.cur_view.saturating_sub(1),
229 warn!(
230 "Discarding old upgrade proposal; the proposal is for view {view}, but \
231 the current view is {}.",
232 self.cur_view
233 )
234 );
235
236 let view_leader_key = self
238 .membership_coordinator
239 .membership_for_epoch(self.cur_epoch)
240 .await?
241 .leader(view)
242 .await?;
243 ensure!(
244 view_leader_key == *sender,
245 info!(
246 "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
247 proposal is: {:?}",
248 *view, proposal.data
249 )
250 );
251
252 broadcast_event(
257 Event {
258 view_number: self.cur_view,
259 event: EventType::UpgradeProposal {
260 proposal: proposal.clone(),
261 sender: sender.clone(),
262 },
263 },
264 &self.output_event_stream,
265 )
266 .await;
267
268 let vote = UpgradeVote::create_signed_vote(
270 proposal.data.upgrade_proposal.clone(),
271 view,
272 &self.public_key,
273 &self.private_key,
274 &self.upgrade_lock,
275 )
276 .await?;
277
278 tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
279 broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
280 },
281 HotShotEvent::UpgradeVoteRecv(ref vote) => {
282 tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
283
284 let view = vote.view_number();
286 let epoch_membership = self
287 .membership_coordinator
288 .membership_for_epoch(self.cur_epoch)
289 .await?;
290 ensure!(
291 epoch_membership.leader(view).await? == self.public_key,
292 debug!(
293 "We are not the leader for view {} are we leader for next view? {}",
294 *view,
295 epoch_membership.leader(view + 1).await? == self.public_key
296 )
297 );
298
299 handle_vote(
300 &mut self.vote_collectors,
301 vote,
302 self.public_key.clone(),
303 &epoch_membership,
304 self.id,
305 &event,
306 &tx,
307 &self.upgrade_lock,
308 EpochTransitionIndicator::NotInTransition,
309 )
310 .await?;
311 },
312 HotShotEvent::ViewChange(new_view, epoch_number) => {
313 if *epoch_number > self.cur_epoch {
314 self.cur_epoch = *epoch_number;
315 }
316 ensure!(self.cur_view < *new_view || *self.cur_view == 0);
317
318 self.cur_view = *new_view;
319
320 let view: u64 = *self.cur_view;
321 let time = SystemTime::now()
322 .duration_since(SystemTime::UNIX_EPOCH)
323 .wrap()
324 .context(error!(
325 "Failed to calculate duration. This should never happen."
326 ))?
327 .as_secs();
328
329 let leader = self
330 .membership_coordinator
331 .membership_for_epoch(self.cur_epoch)
332 .await?
333 .leader(TYPES::View::new(
334 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
335 ))
336 .await?;
337
338 let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
339 let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
340 let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
341
342 let epoch_upgrade_checks = if V::Upgrade::VERSION >= V::Epochs::VERSION
343 && V::Base::VERSION < V::Epochs::VERSION
344 {
345 let consensus_reader = self.consensus.read().await;
346
347 let Some((_, last_proposal)) =
348 consensus_reader.last_proposals().last_key_value()
349 else {
350 tracing::error!(
351 "No recent quorum proposals in consensus state -- skipping upgrade \
352 proposal."
353 );
354 return Err(error!(
355 "No recent quorum proposals in consensus state -- skipping upgrade \
356 proposal."
357 ));
358 };
359
360 let last_proposal_view: u64 = *last_proposal.data.view_number();
361 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
362
363 drop(consensus_reader);
364
365 let target_start_epoch =
366 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
367 let last_proposal_epoch =
368 epoch_from_block_number(last_proposal_block, self.epoch_height);
369 let upgrade_finish_epoch = epoch_from_block_number(
370 last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
371 self.epoch_height,
372 );
373
374 target_start_epoch == last_proposal_epoch
375 && last_proposal_epoch == upgrade_finish_epoch
376 } else {
377 true
378 };
379
380 if view >= self.start_proposing_view
382 && view < self.stop_proposing_view
383 && time >= self.start_proposing_time
384 && time < self.stop_proposing_time
385 && !self.upgraded().await
386 && epoch_upgrade_checks
387 && leader == self.public_key
388 {
389 let upgrade_proposal_data = UpgradeProposalData {
390 old_version: V::Base::VERSION,
391 new_version: V::Upgrade::VERSION,
392 new_version_hash: V::UPGRADE_HASH.to_vec(),
393 old_version_last_view: TYPES::View::new(old_version_last_view),
394 new_version_first_view: TYPES::View::new(new_version_first_view),
395 decide_by: TYPES::View::new(decide_by),
396 };
397
398 let upgrade_proposal = UpgradeProposal {
399 upgrade_proposal: upgrade_proposal_data.clone(),
400 view_number: TYPES::View::new(
401 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
402 ),
403 };
404
405 let signature = TYPES::SignatureKey::sign(
406 &self.private_key,
407 upgrade_proposal_data.commit().as_ref(),
408 )
409 .expect("Failed to sign upgrade proposal commitment!");
410
411 tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
412
413 let message = Proposal {
414 data: upgrade_proposal,
415 signature,
416 _pd: PhantomData,
417 };
418
419 broadcast_event(
420 Arc::new(HotShotEvent::UpgradeProposalSend(
421 message,
422 self.public_key.clone(),
423 )),
424 &tx,
425 )
426 .await;
427 }
428 },
429 _ => {},
430 }
431 Ok(())
432 }
433}
434
435#[async_trait]
436impl<TYPES: NodeType, V: Versions> TaskState for UpgradeTaskState<TYPES, V> {
438 type Event = HotShotEvent<TYPES>;
439
440 async fn handle_event(
441 &mut self,
442 event: Arc<Self::Event>,
443 sender: &Sender<Arc<Self::Event>>,
444 _receiver: &Receiver<Arc<Self::Event>>,
445 ) -> Result<()> {
446 self.handle(event, sender.clone()).await?;
447
448 Ok(())
449 }
450
451 fn cancel_subtasks(&mut self) {}
452}