1use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::UpgradeProposal,
16 epoch_membership::EpochMembershipCoordinator,
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_certificate::UpgradeCertificate,
20 simple_vote::{UpgradeProposalData, UpgradeVote},
21 traits::{
22 block_contents::BlockHeader,
23 node_implementation::{ConsensusTime, NodeType, Versions},
24 signature_key::SignatureKey,
25 },
26 utils::{epoch_from_block_number, EpochTransitionIndicator},
27 vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tracing::instrument;
31use vbs::version::StaticVersionType;
32
33use crate::{
34 events::HotShotEvent,
35 helpers::broadcast_event,
36 vote_collection::{handle_vote, VoteCollectorsMap},
37};
38
39pub struct UpgradeTaskState<TYPES: NodeType, V: Versions> {
41 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
43
44 pub cur_view: TYPES::View,
46
47 pub cur_epoch: Option<TYPES::Epoch>,
49
50 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
52
53 pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>,
55
56 pub public_key: TYPES::SignatureKey,
58
59 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
61
62 pub id: u64,
64
65 pub epoch_start_block: u64,
67
68 pub start_proposing_view: u64,
70
71 pub stop_proposing_view: u64,
73
74 pub start_voting_view: u64,
76
77 pub stop_voting_view: u64,
79
80 pub start_proposing_time: u64,
82
83 pub stop_proposing_time: u64,
85
86 pub start_voting_time: u64,
88
89 pub stop_voting_time: u64,
91
92 pub upgrade_lock: UpgradeLock<TYPES, V>,
94
95 pub consensus: OuterConsensus<TYPES>,
97
98 pub epoch_height: u64,
100}
101
102impl<TYPES: NodeType, V: Versions> UpgradeTaskState<TYPES, V> {
103 async fn upgraded(&self) -> bool {
105 self.upgrade_lock
106 .decided_upgrade_certificate
107 .read()
108 .await
109 .is_some()
110 }
111
112 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
114 pub async fn handle(
115 &mut self,
116 event: Arc<HotShotEvent<TYPES>>,
117 tx: Sender<Arc<HotShotEvent<TYPES>>>,
118 ) -> Result<()> {
119 match event.as_ref() {
120 HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
121 tracing::info!("Received upgrade proposal: {proposal:?}");
122
123 let view = *proposal.data.view_number();
124
125 ensure!(
127 !self.upgraded().await,
128 info!("Already upgraded to {:?}; not voting.", V::Upgrade::VERSION)
129 );
130
131 let time = SystemTime::now()
132 .duration_since(SystemTime::UNIX_EPOCH)
133 .wrap()
134 .context(error!(
135 "Failed to calculate duration. This should never happen."
136 ))?
137 .as_secs();
138
139 ensure!(
140 time >= self.start_voting_time && time < self.stop_voting_time,
141 "Refusing to vote because we are no longer in the configured vote time window."
142 );
143
144 ensure!(
145 view >= self.start_voting_view && view < self.stop_voting_view,
146 "Refusing to vote because we are no longer in the configured vote view window."
147 );
148
149 ensure!(
151 proposal.data.upgrade_proposal.new_version_hash == V::UPGRADE_HASH
152 && proposal.data.upgrade_proposal.old_version == V::Base::VERSION
153 && proposal.data.upgrade_proposal.new_version == V::Upgrade::VERSION,
154 "Proposal does not match our upgrade target"
155 );
156
157 tracing::info!(
159 "Upgrade proposal received for view: {:?}",
160 proposal.data.view_number()
161 );
162
163 let epoch_upgrade_checks = if V::Upgrade::VERSION == V::Epochs::VERSION {
164 let consensus_reader = self.consensus.read().await;
165
166 let Some((_, last_proposal)) =
167 consensus_reader.last_proposals().last_key_value()
168 else {
169 tracing::error!(
170 "No recent quorum proposals in consensus state -- skipping upgrade \
171 proposal vote."
172 );
173 return Err(error!(
174 "No recent quorum proposals in consensus state -- skipping upgrade \
175 proposal vote."
176 ));
177 };
178
179 let last_proposal_view: u64 = *last_proposal.data.view_number();
180 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
181
182 drop(consensus_reader);
183
184 let target_start_epoch =
185 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
186 let last_proposal_epoch =
187 epoch_from_block_number(last_proposal_block, self.epoch_height);
188 let upgrade_finish_epoch = epoch_from_block_number(
189 last_proposal_block
190 + (*proposal.data.upgrade_proposal.new_version_first_view
191 - last_proposal_view)
192 + 10,
193 self.epoch_height,
194 );
195
196 target_start_epoch == last_proposal_epoch
197 && last_proposal_epoch == upgrade_finish_epoch
198 } else {
199 true
200 };
201
202 ensure!(
203 epoch_upgrade_checks,
204 error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
205 );
206
207 let view = proposal.data.view_number();
208
209 ensure!(
225 self.cur_view != TYPES::View::genesis()
226 && *view >= self.cur_view.saturating_sub(1),
227 warn!(
228 "Discarding old upgrade proposal; the proposal is for view {view}, but \
229 the current view is {}.",
230 self.cur_view
231 )
232 );
233
234 let view_leader_key = self
236 .membership_coordinator
237 .membership_for_epoch(self.cur_epoch)
238 .await?
239 .leader(view)
240 .await?;
241 ensure!(
242 view_leader_key == *sender,
243 info!(
244 "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
245 proposal is: {:?}",
246 *view, proposal.data
247 )
248 );
249
250 broadcast_event(
255 Event {
256 view_number: self.cur_view,
257 event: EventType::UpgradeProposal {
258 proposal: proposal.clone(),
259 sender: sender.clone(),
260 },
261 },
262 &self.output_event_stream,
263 )
264 .await;
265
266 let vote = UpgradeVote::create_signed_vote(
268 proposal.data.upgrade_proposal.clone(),
269 view,
270 &self.public_key,
271 &self.private_key,
272 &self.upgrade_lock,
273 )
274 .await?;
275
276 tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
277 broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
278 },
279 HotShotEvent::UpgradeVoteRecv(ref vote) => {
280 tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
281
282 let view = vote.view_number();
284 let epoch_membership = self
285 .membership_coordinator
286 .membership_for_epoch(self.cur_epoch)
287 .await?;
288 ensure!(
289 epoch_membership.leader(view).await? == self.public_key,
290 debug!(
291 "We are not the leader for view {} are we leader for next view? {}",
292 *view,
293 epoch_membership.leader(view + 1).await? == self.public_key
294 )
295 );
296
297 handle_vote(
298 &mut self.vote_collectors,
299 vote,
300 self.public_key.clone(),
301 &epoch_membership,
302 self.id,
303 &event,
304 &tx,
305 &self.upgrade_lock,
306 EpochTransitionIndicator::NotInTransition,
307 )
308 .await?;
309 },
310 HotShotEvent::ViewChange(new_view, epoch_number) => {
311 if *epoch_number > self.cur_epoch {
312 self.cur_epoch = *epoch_number;
313 }
314 ensure!(self.cur_view < *new_view || *self.cur_view == 0);
315
316 self.cur_view = *new_view;
317
318 let view: u64 = *self.cur_view;
319 let time = SystemTime::now()
320 .duration_since(SystemTime::UNIX_EPOCH)
321 .wrap()
322 .context(error!(
323 "Failed to calculate duration. This should never happen."
324 ))?
325 .as_secs();
326
327 let leader = self
328 .membership_coordinator
329 .membership_for_epoch(self.cur_epoch)
330 .await?
331 .leader(TYPES::View::new(
332 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
333 ))
334 .await?;
335
336 let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
337 let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
338 let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
339
340 let epoch_upgrade_checks = if V::Upgrade::VERSION == V::Epochs::VERSION {
341 let consensus_reader = self.consensus.read().await;
342
343 let Some((_, last_proposal)) =
344 consensus_reader.last_proposals().last_key_value()
345 else {
346 tracing::error!(
347 "No recent quorum proposals in consensus state -- skipping upgrade \
348 proposal."
349 );
350 return Err(error!(
351 "No recent quorum proposals in consensus state -- skipping upgrade \
352 proposal."
353 ));
354 };
355
356 let last_proposal_view: u64 = *last_proposal.data.view_number();
357 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
358
359 drop(consensus_reader);
360
361 let target_start_epoch =
362 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
363 let last_proposal_epoch =
364 epoch_from_block_number(last_proposal_block, self.epoch_height);
365 let upgrade_finish_epoch = epoch_from_block_number(
366 last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
367 self.epoch_height,
368 );
369
370 target_start_epoch == last_proposal_epoch
371 && last_proposal_epoch == upgrade_finish_epoch
372 } else {
373 true
374 };
375
376 if view >= self.start_proposing_view
378 && view < self.stop_proposing_view
379 && time >= self.start_proposing_time
380 && time < self.stop_proposing_time
381 && !self.upgraded().await
382 && epoch_upgrade_checks
383 && leader == self.public_key
384 {
385 let upgrade_proposal_data = UpgradeProposalData {
386 old_version: V::Base::VERSION,
387 new_version: V::Upgrade::VERSION,
388 new_version_hash: V::UPGRADE_HASH.to_vec(),
389 old_version_last_view: TYPES::View::new(old_version_last_view),
390 new_version_first_view: TYPES::View::new(new_version_first_view),
391 decide_by: TYPES::View::new(decide_by),
392 };
393
394 let upgrade_proposal = UpgradeProposal {
395 upgrade_proposal: upgrade_proposal_data.clone(),
396 view_number: TYPES::View::new(
397 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
398 ),
399 };
400
401 let signature = TYPES::SignatureKey::sign(
402 &self.private_key,
403 upgrade_proposal_data.commit().as_ref(),
404 )
405 .expect("Failed to sign upgrade proposal commitment!");
406
407 tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
408
409 let message = Proposal {
410 data: upgrade_proposal,
411 signature,
412 _pd: PhantomData,
413 };
414
415 broadcast_event(
416 Arc::new(HotShotEvent::UpgradeProposalSend(
417 message,
418 self.public_key.clone(),
419 )),
420 &tx,
421 )
422 .await;
423 }
424 },
425 _ => {},
426 }
427 Ok(())
428 }
429}
430
431#[async_trait]
432impl<TYPES: NodeType, V: Versions> TaskState for UpgradeTaskState<TYPES, V> {
434 type Event = HotShotEvent<TYPES>;
435
436 async fn handle_event(
437 &mut self,
438 event: Arc<Self::Event>,
439 sender: &Sender<Arc<Self::Event>>,
440 _receiver: &Receiver<Arc<Self::Event>>,
441 ) -> Result<()> {
442 self.handle(event, sender.clone()).await?;
443
444 Ok(())
445 }
446
447 fn cancel_subtasks(&mut self) {}
448}