// hotshot_task_impls/upgrade.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    data::{EpochNumber, UpgradeProposal, ViewNumber},
16    epoch_membership::EpochMembershipCoordinator,
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_certificate::UpgradeCertificate,
20    simple_vote::{UpgradeProposalData, UpgradeVote},
21    traits::{
22        block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
23    },
24    utils::{EpochTransitionIndicator, epoch_from_block_number},
25    vote::HasViewNumber,
26};
27use hotshot_utils::anytrace::*;
28use tracing::instrument;
29use versions::EPOCH_VERSION;
30
31use crate::{
32    events::HotShotEvent,
33    helpers::broadcast_event,
34    vote_collection::{VoteCollectorsMap, handle_vote},
35};
36
37/// Tracks state of an upgrade task
38pub struct UpgradeTaskState<TYPES: NodeType> {
39    /// Output events to application
40    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
41
42    /// View number this view is executing in.
43    pub cur_view: ViewNumber,
44
45    /// Epoch number this node is executing in.
46    pub cur_epoch: Option<EpochNumber>,
47
48    /// Membership for Quorum Certs/votes
49    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51    /// A map of `UpgradeVote` collector tasks
52    pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>,
53
54    /// This Nodes public key
55    pub public_key: TYPES::SignatureKey,
56
57    /// This Nodes private key
58    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
59
60    /// This state's ID
61    pub id: u64,
62
63    /// Target block for the epoch upgrade
64    pub epoch_start_block: u64,
65
66    /// View to start proposing an upgrade
67    pub start_proposing_view: u64,
68
69    /// View to stop proposing an upgrade
70    pub stop_proposing_view: u64,
71
72    /// View to start voting on an upgrade
73    pub start_voting_view: u64,
74
75    /// View to stop voting on an upgrade
76    pub stop_voting_view: u64,
77
78    /// Unix time in seconds at which we start proposing an upgrade
79    pub start_proposing_time: u64,
80
81    /// Unix time in seconds at which we stop proposing an upgrade
82    pub stop_proposing_time: u64,
83
84    /// Unix time in seconds at which we start voting on an upgrade
85    pub start_voting_time: u64,
86
87    /// Unix time in seconds at which we stop voting on an upgrade
88    pub stop_voting_time: u64,
89
90    /// Lock for a decided upgrade
91    pub upgrade_lock: UpgradeLock<TYPES>,
92
93    /// Reference to consensus. The replica will require a write lock on this.
94    pub consensus: OuterConsensus<TYPES>,
95
96    /// Number of blocks in an epoch, zero means there are no epochs
97    pub epoch_height: u64,
98}
99
100impl<TYPES: NodeType> UpgradeTaskState<TYPES> {
101    /// Check if we have decided on an upgrade certificate
102    async fn upgraded(&self) -> bool {
103        self.upgrade_lock.decided_upgrade_cert().is_some()
104    }
105
106    /// main task event handler
107    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
108    pub async fn handle(
109        &mut self,
110        event: Arc<HotShotEvent<TYPES>>,
111        tx: Sender<Arc<HotShotEvent<TYPES>>>,
112    ) -> Result<()> {
113        let upgrade = self.upgrade_lock.upgrade();
114        match event.as_ref() {
115            HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
116                tracing::info!("Received upgrade proposal: {proposal:?}");
117
118                let view = *proposal.data.view_number();
119
120                // Skip voting if the version has already been upgraded.
121                ensure!(
122                    !self.upgraded().await,
123                    info!("Already upgraded to {upgrade:?}; not voting.")
124                );
125
126                let time = SystemTime::now()
127                    .duration_since(SystemTime::UNIX_EPOCH)
128                    .wrap()
129                    .context(error!(
130                        "Failed to calculate duration. This should never happen."
131                    ))?
132                    .as_secs();
133
134                ensure!(
135                    time >= self.start_voting_time && time < self.stop_voting_time,
136                    "Refusing to vote because we are no longer in the configured vote time window."
137                );
138
139                ensure!(
140                    view >= self.start_voting_view && view < self.stop_voting_view,
141                    "Refusing to vote because we are no longer in the configured vote view window."
142                );
143
144                // If the proposal does not match our upgrade target, we immediately exit.
145                ensure!(
146                    proposal.data.upgrade_proposal.new_version_hash == *upgrade.hash()
147                        && proposal.data.upgrade_proposal.old_version == upgrade.base
148                        && proposal.data.upgrade_proposal.new_version == upgrade.target,
149                    "Proposal does not match our upgrade target"
150                );
151
152                // If we have an upgrade target, we validate that the proposal is relevant for the current view.
153                tracing::info!(
154                    "Upgrade proposal received for view: {:?}",
155                    proposal.data.view_number()
156                );
157
158                let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
159                    && upgrade.base < EPOCH_VERSION
160                {
161                    let consensus_reader = self.consensus.read().await;
162
163                    let Some((_, last_proposal)) =
164                        consensus_reader.last_proposals().last_key_value()
165                    else {
166                        tracing::error!(
167                            "No recent quorum proposals in consensus state -- skipping upgrade \
168                             proposal vote."
169                        );
170                        return Err(error!(
171                            "No recent quorum proposals in consensus state -- skipping upgrade \
172                             proposal vote."
173                        ));
174                    };
175
176                    let last_proposal_view: u64 = *last_proposal.data.view_number();
177                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
178
179                    drop(consensus_reader);
180
181                    let target_start_epoch =
182                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
183                    let last_proposal_epoch =
184                        epoch_from_block_number(last_proposal_block, self.epoch_height);
185                    let upgrade_finish_epoch = epoch_from_block_number(
186                        last_proposal_block
187                            + (*proposal.data.upgrade_proposal.new_version_first_view
188                                - last_proposal_view)
189                            + 10,
190                        self.epoch_height,
191                    );
192
193                    target_start_epoch == last_proposal_epoch
194                        && last_proposal_epoch == upgrade_finish_epoch
195                } else {
196                    true
197                };
198
199                ensure!(
200                    epoch_upgrade_checks,
201                    error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
202                );
203
204                let view = proposal.data.view_number();
205
206                // At this point, we could choose to validate
207                // that the proposal was issued by the correct leader
208                // for the indicated view.
209                //
210                // We choose not to, because we don't gain that much from it.
211                // The certificate itself is only useful to the leader for that view anyway,
212                // and from the node's perspective it doesn't matter who the sender is.
213                // All we'd save is the cost of signing the vote, and we'd lose some flexibility.
214
215                // Allow an upgrade proposal that is one view older, in case we have voted on a quorum
216                // proposal and updated the view.
217                // `self.cur_view` should be at least 1 since there is a view change before getting
218                // the `UpgradeProposalRecv` event. Otherwise, the view number subtraction below will
219                // cause an overflow error.
220                // TODO Come back to this - we probably don't need this, but we should also never receive a UpgradeCertificate where this fails, investigate block ready so it doesn't make one for the genesis block
221                ensure!(
222                    self.cur_view != ViewNumber::genesis()
223                        && *view >= self.cur_view.saturating_sub(1),
224                    warn!(
225                        "Discarding old upgrade proposal; the proposal is for view {view}, but \
226                         the current view is {}.",
227                        self.cur_view
228                    )
229                );
230
231                // We then validate that the proposal was issued by the leader for the view.
232                let view_leader_key = self
233                    .membership_coordinator
234                    .membership_for_epoch(self.cur_epoch)
235                    .await?
236                    .leader(view)
237                    .await?;
238                ensure!(
239                    view_leader_key == *sender,
240                    info!(
241                        "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
242                         proposal is: {:?}",
243                        *view, proposal.data
244                    )
245                );
246
247                // At this point, we've checked that:
248                //   * the proposal was expected,
249                //   * the proposal is valid, and
250                // so we notify the application layer
251                broadcast_event(
252                    Event {
253                        view_number: self.cur_view,
254                        event: EventType::UpgradeProposal {
255                            proposal: proposal.clone(),
256                            sender: sender.clone(),
257                        },
258                    },
259                    &self.output_event_stream,
260                )
261                .await;
262
263                // If everything is fine up to here, we generate and send a vote on the proposal.
264                let vote = UpgradeVote::create_signed_vote(
265                    proposal.data.upgrade_proposal.clone(),
266                    view,
267                    &self.public_key,
268                    &self.private_key,
269                    &self.upgrade_lock,
270                )
271                .await?;
272
273                tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
274                broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
275            },
276            HotShotEvent::UpgradeVoteRecv(vote) => {
277                tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
278
279                // Check if we are the leader.
280                let view = vote.view_number();
281                let epoch_membership = self
282                    .membership_coordinator
283                    .membership_for_epoch(self.cur_epoch)
284                    .await?;
285                ensure!(
286                    epoch_membership.leader(view).await? == self.public_key,
287                    debug!(
288                        "We are not the leader for view {} are we leader for next view? {}",
289                        *view,
290                        epoch_membership.leader(view + 1).await? == self.public_key
291                    )
292                );
293
294                handle_vote(
295                    &mut self.vote_collectors,
296                    vote,
297                    self.public_key.clone(),
298                    &epoch_membership,
299                    self.id,
300                    &event,
301                    &tx,
302                    &self.upgrade_lock,
303                    EpochTransitionIndicator::NotInTransition,
304                )
305                .await?;
306            },
307            HotShotEvent::ViewChange(new_view, epoch_number) => {
308                if *epoch_number > self.cur_epoch {
309                    self.cur_epoch = *epoch_number;
310                }
311                ensure!(self.cur_view < *new_view || *self.cur_view == 0);
312
313                self.cur_view = *new_view;
314
315                let view: u64 = *self.cur_view;
316                let time = SystemTime::now()
317                    .duration_since(SystemTime::UNIX_EPOCH)
318                    .wrap()
319                    .context(error!(
320                        "Failed to calculate duration. This should never happen."
321                    ))?
322                    .as_secs();
323
324                let leader = self
325                    .membership_coordinator
326                    .membership_for_epoch(self.cur_epoch)
327                    .await?
328                    .leader(ViewNumber::new(
329                        view + TYPES::UPGRADE_CONSTANTS.propose_offset,
330                    ))
331                    .await?;
332
333                let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
334                let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
335                let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
336
337                let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
338                    && upgrade.base < EPOCH_VERSION
339                {
340                    let consensus_reader = self.consensus.read().await;
341
342                    let Some((_, last_proposal)) =
343                        consensus_reader.last_proposals().last_key_value()
344                    else {
345                        tracing::error!(
346                            "No recent quorum proposals in consensus state -- skipping upgrade \
347                             proposal."
348                        );
349                        return Err(error!(
350                            "No recent quorum proposals in consensus state -- skipping upgrade \
351                             proposal."
352                        ));
353                    };
354
355                    let last_proposal_view: u64 = *last_proposal.data.view_number();
356                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
357
358                    drop(consensus_reader);
359
360                    let target_start_epoch =
361                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
362                    let last_proposal_epoch =
363                        epoch_from_block_number(last_proposal_block, self.epoch_height);
364                    let upgrade_finish_epoch = epoch_from_block_number(
365                        last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
366                        self.epoch_height,
367                    );
368
369                    target_start_epoch == last_proposal_epoch
370                        && last_proposal_epoch == upgrade_finish_epoch
371                } else {
372                    true
373                };
374
375                // We try to form a certificate 5 views before we're leader.
376                if view >= self.start_proposing_view
377                    && view < self.stop_proposing_view
378                    && time >= self.start_proposing_time
379                    && time < self.stop_proposing_time
380                    && !self.upgraded().await
381                    && epoch_upgrade_checks
382                    && leader == self.public_key
383                {
384                    let upgrade_proposal_data = UpgradeProposalData {
385                        old_version: upgrade.base,
386                        new_version: upgrade.target,
387                        new_version_hash: upgrade.hash().into(),
388                        old_version_last_view: ViewNumber::new(old_version_last_view),
389                        new_version_first_view: ViewNumber::new(new_version_first_view),
390                        decide_by: ViewNumber::new(decide_by),
391                    };
392
393                    let upgrade_proposal = UpgradeProposal {
394                        upgrade_proposal: upgrade_proposal_data.clone(),
395                        view_number: ViewNumber::new(
396                            view + TYPES::UPGRADE_CONSTANTS.propose_offset,
397                        ),
398                    };
399
400                    let signature = TYPES::SignatureKey::sign(
401                        &self.private_key,
402                        upgrade_proposal_data.commit().as_ref(),
403                    )
404                    .expect("Failed to sign upgrade proposal commitment!");
405
406                    tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
407
408                    let message = Proposal {
409                        data: upgrade_proposal,
410                        signature,
411                        _pd: PhantomData,
412                    };
413
414                    broadcast_event(
415                        Arc::new(HotShotEvent::UpgradeProposalSend(
416                            message,
417                            self.public_key.clone(),
418                        )),
419                        &tx,
420                    )
421                    .await;
422                }
423            },
424            _ => {},
425        }
426        Ok(())
427    }
428}
429
430/// task state implementation for the upgrade task
431#[async_trait]
432impl<TYPES: NodeType> TaskState for UpgradeTaskState<TYPES> {
433    type Event = HotShotEvent<TYPES>;
434
435    async fn handle_event(
436        &mut self,
437        event: Arc<Self::Event>,
438        sender: &Sender<Arc<Self::Event>>,
439        _receiver: &Receiver<Arc<Self::Event>>,
440    ) -> Result<()> {
441        self.handle(event, sender.clone()).await?;
442
443        Ok(())
444    }
445
446    fn cancel_subtasks(&mut self) {}
447}