hotshot_task_impls/
upgrade.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    data::UpgradeProposal,
16    epoch_membership::EpochMembershipCoordinator,
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_certificate::UpgradeCertificate,
20    simple_vote::{UpgradeProposalData, UpgradeVote},
21    traits::{
22        block_contents::BlockHeader,
23        node_implementation::{ConsensusTime, NodeType, Versions},
24        signature_key::SignatureKey,
25    },
26    utils::{epoch_from_block_number, EpochTransitionIndicator},
27    vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tracing::instrument;
31use vbs::version::StaticVersionType;
32
33use crate::{
34    events::HotShotEvent,
35    helpers::broadcast_event,
36    vote_collection::{handle_vote, VoteCollectorsMap},
37};
38
/// Tracks state of an upgrade task.
///
/// Holds the configured proposing/voting windows for an upgrade from `V::Base` to
/// `V::Upgrade`, the keys used to sign upgrade proposals and votes, and the handles used
/// to look up view leaders and to inspect consensus state.
///
/// NOTE: field declaration order is also drop order; keep it stable.
pub struct UpgradeTaskState<TYPES: NodeType, V: Versions> {
    /// Output events to application (e.g. `EventType::UpgradeProposal` once a received
    /// proposal has passed validation).
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// View number this view is executing in; updated on `ViewChange` events.
    pub cur_view: TYPES::View,

    /// Epoch number this node is executing in. `ViewChange` only replaces it with a
    /// strictly greater value (with `Option` ordering, `None` is less than any `Some`).
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Membership for Quorum Certs/votes; used here to look up the leader of a view.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// A map of `UpgradeVote` collector tasks, handed to `handle_vote` when this node is
    /// the leader of a received vote's view.
    pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>,

    /// This node's public key; compared against view leaders and attached to votes and
    /// proposals.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; signs upgrade proposals and upgrade votes.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// This state's ID (recorded in the tracing span and passed to vote handling).
    pub id: u64,

    /// Target block for the epoch upgrade. For the epochs upgrade, the epoch containing
    /// this block must equal the epoch of the latest quorum proposal's block.
    pub epoch_start_block: u64,

    /// First view (inclusive) at which this node may propose an upgrade.
    pub start_proposing_view: u64,

    /// View (exclusive) from which this node no longer proposes an upgrade.
    pub stop_proposing_view: u64,

    /// First proposal view (inclusive) that this node will vote on.
    pub start_voting_view: u64,

    /// Proposal view (exclusive) from which this node no longer votes.
    pub stop_voting_view: u64,

    /// Unix time in seconds (inclusive) at which we start proposing an upgrade.
    pub start_proposing_time: u64,

    /// Unix time in seconds (exclusive) at which we stop proposing an upgrade.
    pub stop_proposing_time: u64,

    /// Unix time in seconds (inclusive) at which we start voting on an upgrade.
    pub start_voting_time: u64,

    /// Unix time in seconds (exclusive) at which we stop voting on an upgrade.
    pub stop_voting_time: u64,

    /// Lock for a decided upgrade. Once it holds a certificate, this task neither
    /// proposes nor votes.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Reference to consensus. The replica will require a write lock on this; this task
    /// only takes read locks, to inspect the latest quorum proposal.
    pub consensus: OuterConsensus<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs.
    pub epoch_height: u64,
}
101
102impl<TYPES: NodeType, V: Versions> UpgradeTaskState<TYPES, V> {
103    /// Check if we have decided on an upgrade certificate
104    async fn upgraded(&self) -> bool {
105        self.upgrade_lock
106            .decided_upgrade_certificate
107            .read()
108            .await
109            .is_some()
110    }
111
112    /// main task event handler
113    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
114    pub async fn handle(
115        &mut self,
116        event: Arc<HotShotEvent<TYPES>>,
117        tx: Sender<Arc<HotShotEvent<TYPES>>>,
118    ) -> Result<()> {
119        match event.as_ref() {
120            HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
121                tracing::info!("Received upgrade proposal: {proposal:?}");
122
123                let view = *proposal.data.view_number();
124
125                // Skip voting if the version has already been upgraded.
126                ensure!(
127                    !self.upgraded().await,
128                    info!("Already upgraded to {:?}; not voting.", V::Upgrade::VERSION)
129                );
130
131                let time = SystemTime::now()
132                    .duration_since(SystemTime::UNIX_EPOCH)
133                    .wrap()
134                    .context(error!(
135                        "Failed to calculate duration. This should never happen."
136                    ))?
137                    .as_secs();
138
139                ensure!(
140                    time >= self.start_voting_time && time < self.stop_voting_time,
141                    "Refusing to vote because we are no longer in the configured vote time window."
142                );
143
144                ensure!(
145                    view >= self.start_voting_view && view < self.stop_voting_view,
146                    "Refusing to vote because we are no longer in the configured vote view window."
147                );
148
149                // If the proposal does not match our upgrade target, we immediately exit.
150                ensure!(
151                    proposal.data.upgrade_proposal.new_version_hash == V::UPGRADE_HASH
152                        && proposal.data.upgrade_proposal.old_version == V::Base::VERSION
153                        && proposal.data.upgrade_proposal.new_version == V::Upgrade::VERSION,
154                    "Proposal does not match our upgrade target"
155                );
156
157                // If we have an upgrade target, we validate that the proposal is relevant for the current view.
158                tracing::info!(
159                    "Upgrade proposal received for view: {:?}",
160                    proposal.data.view_number()
161                );
162
163                let epoch_upgrade_checks = if V::Upgrade::VERSION == V::Epochs::VERSION {
164                    let consensus_reader = self.consensus.read().await;
165
166                    let Some((_, last_proposal)) =
167                        consensus_reader.last_proposals().last_key_value()
168                    else {
169                        tracing::error!(
170                            "No recent quorum proposals in consensus state -- skipping upgrade \
171                             proposal vote."
172                        );
173                        return Err(error!(
174                            "No recent quorum proposals in consensus state -- skipping upgrade \
175                             proposal vote."
176                        ));
177                    };
178
179                    let last_proposal_view: u64 = *last_proposal.data.view_number();
180                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
181
182                    drop(consensus_reader);
183
184                    let target_start_epoch =
185                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
186                    let last_proposal_epoch =
187                        epoch_from_block_number(last_proposal_block, self.epoch_height);
188                    let upgrade_finish_epoch = epoch_from_block_number(
189                        last_proposal_block
190                            + (*proposal.data.upgrade_proposal.new_version_first_view
191                                - last_proposal_view)
192                            + 10,
193                        self.epoch_height,
194                    );
195
196                    target_start_epoch == last_proposal_epoch
197                        && last_proposal_epoch == upgrade_finish_epoch
198                } else {
199                    true
200                };
201
202                ensure!(
203                    epoch_upgrade_checks,
204                    error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
205                );
206
207                let view = proposal.data.view_number();
208
209                // At this point, we could choose to validate
210                // that the proposal was issued by the correct leader
211                // for the indicated view.
212                //
213                // We choose not to, because we don't gain that much from it.
214                // The certificate itself is only useful to the leader for that view anyway,
215                // and from the node's perspective it doesn't matter who the sender is.
216                // All we'd save is the cost of signing the vote, and we'd lose some flexibility.
217
218                // Allow an upgrade proposal that is one view older, in case we have voted on a quorum
219                // proposal and updated the view.
220                // `self.cur_view` should be at least 1 since there is a view change before getting
221                // the `UpgradeProposalRecv` event. Otherwise, the view number subtraction below will
222                // cause an overflow error.
223                // TODO Come back to this - we probably don't need this, but we should also never receive a UpgradeCertificate where this fails, investigate block ready so it doesn't make one for the genesis block
224                ensure!(
225                    self.cur_view != TYPES::View::genesis()
226                        && *view >= self.cur_view.saturating_sub(1),
227                    warn!(
228                        "Discarding old upgrade proposal; the proposal is for view {view}, but \
229                         the current view is {}.",
230                        self.cur_view
231                    )
232                );
233
234                // We then validate that the proposal was issued by the leader for the view.
235                let view_leader_key = self
236                    .membership_coordinator
237                    .membership_for_epoch(self.cur_epoch)
238                    .await?
239                    .leader(view)
240                    .await?;
241                ensure!(
242                    view_leader_key == *sender,
243                    info!(
244                        "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
245                         proposal is: {:?}",
246                        *view, proposal.data
247                    )
248                );
249
250                // At this point, we've checked that:
251                //   * the proposal was expected,
252                //   * the proposal is valid, and
253                // so we notify the application layer
254                broadcast_event(
255                    Event {
256                        view_number: self.cur_view,
257                        event: EventType::UpgradeProposal {
258                            proposal: proposal.clone(),
259                            sender: sender.clone(),
260                        },
261                    },
262                    &self.output_event_stream,
263                )
264                .await;
265
266                // If everything is fine up to here, we generate and send a vote on the proposal.
267                let vote = UpgradeVote::create_signed_vote(
268                    proposal.data.upgrade_proposal.clone(),
269                    view,
270                    &self.public_key,
271                    &self.private_key,
272                    &self.upgrade_lock,
273                )
274                .await?;
275
276                tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
277                broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
278            },
279            HotShotEvent::UpgradeVoteRecv(ref vote) => {
280                tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
281
282                // Check if we are the leader.
283                let view = vote.view_number();
284                let epoch_membership = self
285                    .membership_coordinator
286                    .membership_for_epoch(self.cur_epoch)
287                    .await?;
288                ensure!(
289                    epoch_membership.leader(view).await? == self.public_key,
290                    debug!(
291                        "We are not the leader for view {} are we leader for next view? {}",
292                        *view,
293                        epoch_membership.leader(view + 1).await? == self.public_key
294                    )
295                );
296
297                handle_vote(
298                    &mut self.vote_collectors,
299                    vote,
300                    self.public_key.clone(),
301                    &epoch_membership,
302                    self.id,
303                    &event,
304                    &tx,
305                    &self.upgrade_lock,
306                    EpochTransitionIndicator::NotInTransition,
307                )
308                .await?;
309            },
310            HotShotEvent::ViewChange(new_view, epoch_number) => {
311                if *epoch_number > self.cur_epoch {
312                    self.cur_epoch = *epoch_number;
313                }
314                ensure!(self.cur_view < *new_view || *self.cur_view == 0);
315
316                self.cur_view = *new_view;
317
318                let view: u64 = *self.cur_view;
319                let time = SystemTime::now()
320                    .duration_since(SystemTime::UNIX_EPOCH)
321                    .wrap()
322                    .context(error!(
323                        "Failed to calculate duration. This should never happen."
324                    ))?
325                    .as_secs();
326
327                let leader = self
328                    .membership_coordinator
329                    .membership_for_epoch(self.cur_epoch)
330                    .await?
331                    .leader(TYPES::View::new(
332                        view + TYPES::UPGRADE_CONSTANTS.propose_offset,
333                    ))
334                    .await?;
335
336                let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
337                let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
338                let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
339
340                let epoch_upgrade_checks = if V::Upgrade::VERSION == V::Epochs::VERSION {
341                    let consensus_reader = self.consensus.read().await;
342
343                    let Some((_, last_proposal)) =
344                        consensus_reader.last_proposals().last_key_value()
345                    else {
346                        tracing::error!(
347                            "No recent quorum proposals in consensus state -- skipping upgrade \
348                             proposal."
349                        );
350                        return Err(error!(
351                            "No recent quorum proposals in consensus state -- skipping upgrade \
352                             proposal."
353                        ));
354                    };
355
356                    let last_proposal_view: u64 = *last_proposal.data.view_number();
357                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
358
359                    drop(consensus_reader);
360
361                    let target_start_epoch =
362                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
363                    let last_proposal_epoch =
364                        epoch_from_block_number(last_proposal_block, self.epoch_height);
365                    let upgrade_finish_epoch = epoch_from_block_number(
366                        last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
367                        self.epoch_height,
368                    );
369
370                    target_start_epoch == last_proposal_epoch
371                        && last_proposal_epoch == upgrade_finish_epoch
372                } else {
373                    true
374                };
375
376                // We try to form a certificate 5 views before we're leader.
377                if view >= self.start_proposing_view
378                    && view < self.stop_proposing_view
379                    && time >= self.start_proposing_time
380                    && time < self.stop_proposing_time
381                    && !self.upgraded().await
382                    && epoch_upgrade_checks
383                    && leader == self.public_key
384                {
385                    let upgrade_proposal_data = UpgradeProposalData {
386                        old_version: V::Base::VERSION,
387                        new_version: V::Upgrade::VERSION,
388                        new_version_hash: V::UPGRADE_HASH.to_vec(),
389                        old_version_last_view: TYPES::View::new(old_version_last_view),
390                        new_version_first_view: TYPES::View::new(new_version_first_view),
391                        decide_by: TYPES::View::new(decide_by),
392                    };
393
394                    let upgrade_proposal = UpgradeProposal {
395                        upgrade_proposal: upgrade_proposal_data.clone(),
396                        view_number: TYPES::View::new(
397                            view + TYPES::UPGRADE_CONSTANTS.propose_offset,
398                        ),
399                    };
400
401                    let signature = TYPES::SignatureKey::sign(
402                        &self.private_key,
403                        upgrade_proposal_data.commit().as_ref(),
404                    )
405                    .expect("Failed to sign upgrade proposal commitment!");
406
407                    tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
408
409                    let message = Proposal {
410                        data: upgrade_proposal,
411                        signature,
412                        _pd: PhantomData,
413                    };
414
415                    broadcast_event(
416                        Arc::new(HotShotEvent::UpgradeProposalSend(
417                            message,
418                            self.public_key.clone(),
419                        )),
420                        &tx,
421                    )
422                    .await;
423                }
424            },
425            _ => {},
426        }
427        Ok(())
428    }
429}
430
431#[async_trait]
432/// task state implementation for the upgrade task
433impl<TYPES: NodeType, V: Versions> TaskState for UpgradeTaskState<TYPES, V> {
434    type Event = HotShotEvent<TYPES>;
435
436    async fn handle_event(
437        &mut self,
438        event: Arc<Self::Event>,
439        sender: &Sender<Arc<Self::Event>>,
440        _receiver: &Receiver<Arc<Self::Event>>,
441    ) -> Result<()> {
442        self.handle(event, sender.clone()).await?;
443
444        Ok(())
445    }
446
447    fn cancel_subtasks(&mut self) {}
448}