hotshot_task_impls/
upgrade.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    data::UpgradeProposal,
16    epoch_membership::EpochMembershipCoordinator,
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_certificate::UpgradeCertificate,
20    simple_vote::{UpgradeProposalData, UpgradeVote},
21    traits::{
22        block_contents::BlockHeader,
23        node_implementation::{ConsensusTime, NodeType, Versions},
24        signature_key::SignatureKey,
25    },
26    utils::{epoch_from_block_number, EpochTransitionIndicator},
27    vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tracing::instrument;
31use vbs::version::StaticVersionType;
32
33use crate::{
34    events::HotShotEvent,
35    helpers::broadcast_event,
36    vote_collection::{handle_vote, VoteCollectorsMap},
37};
38
39/// Tracks state of an upgrade task
40pub struct UpgradeTaskState<TYPES: NodeType, V: Versions> {
41    /// Output events to application
42    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
43
44    /// View number this view is executing in.
45    pub cur_view: TYPES::View,
46
47    /// Epoch number this node is executing in.
48    pub cur_epoch: Option<TYPES::Epoch>,
49
50    /// Membership for Quorum Certs/votes
51    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
52
53    /// A map of `UpgradeVote` collector tasks
54    pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>, V>,
55
56    /// This Nodes public key
57    pub public_key: TYPES::SignatureKey,
58
59    /// This Nodes private key
60    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
61
62    /// This state's ID
63    pub id: u64,
64
65    /// Target block for the epoch upgrade
66    pub epoch_start_block: u64,
67
68    /// View to start proposing an upgrade
69    pub start_proposing_view: u64,
70
71    /// View to stop proposing an upgrade
72    pub stop_proposing_view: u64,
73
74    /// View to start voting on an upgrade
75    pub start_voting_view: u64,
76
77    /// View to stop voting on an upgrade
78    pub stop_voting_view: u64,
79
80    /// Unix time in seconds at which we start proposing an upgrade
81    pub start_proposing_time: u64,
82
83    /// Unix time in seconds at which we stop proposing an upgrade
84    pub stop_proposing_time: u64,
85
86    /// Unix time in seconds at which we start voting on an upgrade
87    pub start_voting_time: u64,
88
89    /// Unix time in seconds at which we stop voting on an upgrade
90    pub stop_voting_time: u64,
91
92    /// Lock for a decided upgrade
93    pub upgrade_lock: UpgradeLock<TYPES, V>,
94
95    /// Reference to consensus. The replica will require a write lock on this.
96    pub consensus: OuterConsensus<TYPES>,
97
98    /// Number of blocks in an epoch, zero means there are no epochs
99    pub epoch_height: u64,
100}
101
102impl<TYPES: NodeType, V: Versions> UpgradeTaskState<TYPES, V> {
103    /// Check if we have decided on an upgrade certificate
104    async fn upgraded(&self) -> bool {
105        self.upgrade_lock
106            .decided_upgrade_certificate
107            .read()
108            .await
109            .is_some()
110    }
111
112    /// main task event handler
113    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
114    pub async fn handle(
115        &mut self,
116        event: Arc<HotShotEvent<TYPES>>,
117        tx: Sender<Arc<HotShotEvent<TYPES>>>,
118    ) -> Result<()> {
119        match event.as_ref() {
120            HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
121                tracing::info!("Received upgrade proposal: {proposal:?}");
122
123                let view = *proposal.data.view_number();
124
125                // Skip voting if the version has already been upgraded.
126                ensure!(
127                    !self.upgraded().await,
128                    info!("Already upgraded to {:?}; not voting.", V::Upgrade::VERSION)
129                );
130
131                let time = SystemTime::now()
132                    .duration_since(SystemTime::UNIX_EPOCH)
133                    .wrap()
134                    .context(error!(
135                        "Failed to calculate duration. This should never happen."
136                    ))?
137                    .as_secs();
138
139                ensure!(
140                    time >= self.start_voting_time && time < self.stop_voting_time,
141                    "Refusing to vote because we are no longer in the configured vote time window."
142                );
143
144                ensure!(
145                    view >= self.start_voting_view && view < self.stop_voting_view,
146                    "Refusing to vote because we are no longer in the configured vote view window."
147                );
148
149                // If the proposal does not match our upgrade target, we immediately exit.
150                ensure!(
151                    proposal.data.upgrade_proposal.new_version_hash == V::UPGRADE_HASH
152                        && proposal.data.upgrade_proposal.old_version == V::Base::VERSION
153                        && proposal.data.upgrade_proposal.new_version == V::Upgrade::VERSION,
154                    "Proposal does not match our upgrade target"
155                );
156
157                // If we have an upgrade target, we validate that the proposal is relevant for the current view.
158                tracing::info!(
159                    "Upgrade proposal received for view: {:?}",
160                    proposal.data.view_number()
161                );
162
163                let epoch_upgrade_checks = if V::Upgrade::VERSION >= V::Epochs::VERSION
164                    && V::Base::VERSION < V::Epochs::VERSION
165                {
166                    let consensus_reader = self.consensus.read().await;
167
168                    let Some((_, last_proposal)) =
169                        consensus_reader.last_proposals().last_key_value()
170                    else {
171                        tracing::error!(
172                            "No recent quorum proposals in consensus state -- skipping upgrade \
173                             proposal vote."
174                        );
175                        return Err(error!(
176                            "No recent quorum proposals in consensus state -- skipping upgrade \
177                             proposal vote."
178                        ));
179                    };
180
181                    let last_proposal_view: u64 = *last_proposal.data.view_number();
182                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
183
184                    drop(consensus_reader);
185
186                    let target_start_epoch =
187                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
188                    let last_proposal_epoch =
189                        epoch_from_block_number(last_proposal_block, self.epoch_height);
190                    let upgrade_finish_epoch = epoch_from_block_number(
191                        last_proposal_block
192                            + (*proposal.data.upgrade_proposal.new_version_first_view
193                                - last_proposal_view)
194                            + 10,
195                        self.epoch_height,
196                    );
197
198                    target_start_epoch == last_proposal_epoch
199                        && last_proposal_epoch == upgrade_finish_epoch
200                } else {
201                    true
202                };
203
204                ensure!(
205                    epoch_upgrade_checks,
206                    error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
207                );
208
209                let view = proposal.data.view_number();
210
211                // At this point, we could choose to validate
212                // that the proposal was issued by the correct leader
213                // for the indicated view.
214                //
215                // We choose not to, because we don't gain that much from it.
216                // The certificate itself is only useful to the leader for that view anyway,
217                // and from the node's perspective it doesn't matter who the sender is.
218                // All we'd save is the cost of signing the vote, and we'd lose some flexibility.
219
220                // Allow an upgrade proposal that is one view older, in case we have voted on a quorum
221                // proposal and updated the view.
222                // `self.cur_view` should be at least 1 since there is a view change before getting
223                // the `UpgradeProposalRecv` event. Otherwise, the view number subtraction below will
224                // cause an overflow error.
225                // TODO Come back to this - we probably don't need this, but we should also never receive a UpgradeCertificate where this fails, investigate block ready so it doesn't make one for the genesis block
226                ensure!(
227                    self.cur_view != TYPES::View::genesis()
228                        && *view >= self.cur_view.saturating_sub(1),
229                    warn!(
230                        "Discarding old upgrade proposal; the proposal is for view {view}, but \
231                         the current view is {}.",
232                        self.cur_view
233                    )
234                );
235
236                // We then validate that the proposal was issued by the leader for the view.
237                let view_leader_key = self
238                    .membership_coordinator
239                    .membership_for_epoch(self.cur_epoch)
240                    .await?
241                    .leader(view)
242                    .await?;
243                ensure!(
244                    view_leader_key == *sender,
245                    info!(
246                        "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
247                         proposal is: {:?}",
248                        *view, proposal.data
249                    )
250                );
251
252                // At this point, we've checked that:
253                //   * the proposal was expected,
254                //   * the proposal is valid, and
255                // so we notify the application layer
256                broadcast_event(
257                    Event {
258                        view_number: self.cur_view,
259                        event: EventType::UpgradeProposal {
260                            proposal: proposal.clone(),
261                            sender: sender.clone(),
262                        },
263                    },
264                    &self.output_event_stream,
265                )
266                .await;
267
268                // If everything is fine up to here, we generate and send a vote on the proposal.
269                let vote = UpgradeVote::create_signed_vote(
270                    proposal.data.upgrade_proposal.clone(),
271                    view,
272                    &self.public_key,
273                    &self.private_key,
274                    &self.upgrade_lock,
275                )
276                .await?;
277
278                tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
279                broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
280            },
281            HotShotEvent::UpgradeVoteRecv(ref vote) => {
282                tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
283
284                // Check if we are the leader.
285                let view = vote.view_number();
286                let epoch_membership = self
287                    .membership_coordinator
288                    .membership_for_epoch(self.cur_epoch)
289                    .await?;
290                ensure!(
291                    epoch_membership.leader(view).await? == self.public_key,
292                    debug!(
293                        "We are not the leader for view {} are we leader for next view? {}",
294                        *view,
295                        epoch_membership.leader(view + 1).await? == self.public_key
296                    )
297                );
298
299                handle_vote(
300                    &mut self.vote_collectors,
301                    vote,
302                    self.public_key.clone(),
303                    &epoch_membership,
304                    self.id,
305                    &event,
306                    &tx,
307                    &self.upgrade_lock,
308                    EpochTransitionIndicator::NotInTransition,
309                )
310                .await?;
311            },
312            HotShotEvent::ViewChange(new_view, epoch_number) => {
313                if *epoch_number > self.cur_epoch {
314                    self.cur_epoch = *epoch_number;
315                }
316                ensure!(self.cur_view < *new_view || *self.cur_view == 0);
317
318                self.cur_view = *new_view;
319
320                let view: u64 = *self.cur_view;
321                let time = SystemTime::now()
322                    .duration_since(SystemTime::UNIX_EPOCH)
323                    .wrap()
324                    .context(error!(
325                        "Failed to calculate duration. This should never happen."
326                    ))?
327                    .as_secs();
328
329                let leader = self
330                    .membership_coordinator
331                    .membership_for_epoch(self.cur_epoch)
332                    .await?
333                    .leader(TYPES::View::new(
334                        view + TYPES::UPGRADE_CONSTANTS.propose_offset,
335                    ))
336                    .await?;
337
338                let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
339                let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
340                let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
341
342                let epoch_upgrade_checks = if V::Upgrade::VERSION >= V::Epochs::VERSION
343                    && V::Base::VERSION < V::Epochs::VERSION
344                {
345                    let consensus_reader = self.consensus.read().await;
346
347                    let Some((_, last_proposal)) =
348                        consensus_reader.last_proposals().last_key_value()
349                    else {
350                        tracing::error!(
351                            "No recent quorum proposals in consensus state -- skipping upgrade \
352                             proposal."
353                        );
354                        return Err(error!(
355                            "No recent quorum proposals in consensus state -- skipping upgrade \
356                             proposal."
357                        ));
358                    };
359
360                    let last_proposal_view: u64 = *last_proposal.data.view_number();
361                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
362
363                    drop(consensus_reader);
364
365                    let target_start_epoch =
366                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
367                    let last_proposal_epoch =
368                        epoch_from_block_number(last_proposal_block, self.epoch_height);
369                    let upgrade_finish_epoch = epoch_from_block_number(
370                        last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
371                        self.epoch_height,
372                    );
373
374                    target_start_epoch == last_proposal_epoch
375                        && last_proposal_epoch == upgrade_finish_epoch
376                } else {
377                    true
378                };
379
380                // We try to form a certificate 5 views before we're leader.
381                if view >= self.start_proposing_view
382                    && view < self.stop_proposing_view
383                    && time >= self.start_proposing_time
384                    && time < self.stop_proposing_time
385                    && !self.upgraded().await
386                    && epoch_upgrade_checks
387                    && leader == self.public_key
388                {
389                    let upgrade_proposal_data = UpgradeProposalData {
390                        old_version: V::Base::VERSION,
391                        new_version: V::Upgrade::VERSION,
392                        new_version_hash: V::UPGRADE_HASH.to_vec(),
393                        old_version_last_view: TYPES::View::new(old_version_last_view),
394                        new_version_first_view: TYPES::View::new(new_version_first_view),
395                        decide_by: TYPES::View::new(decide_by),
396                    };
397
398                    let upgrade_proposal = UpgradeProposal {
399                        upgrade_proposal: upgrade_proposal_data.clone(),
400                        view_number: TYPES::View::new(
401                            view + TYPES::UPGRADE_CONSTANTS.propose_offset,
402                        ),
403                    };
404
405                    let signature = TYPES::SignatureKey::sign(
406                        &self.private_key,
407                        upgrade_proposal_data.commit().as_ref(),
408                    )
409                    .expect("Failed to sign upgrade proposal commitment!");
410
411                    tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
412
413                    let message = Proposal {
414                        data: upgrade_proposal,
415                        signature,
416                        _pd: PhantomData,
417                    };
418
419                    broadcast_event(
420                        Arc::new(HotShotEvent::UpgradeProposalSend(
421                            message,
422                            self.public_key.clone(),
423                        )),
424                        &tx,
425                    )
426                    .await;
427                }
428            },
429            _ => {},
430        }
431        Ok(())
432    }
433}
434
435#[async_trait]
436/// task state implementation for the upgrade task
437impl<TYPES: NodeType, V: Versions> TaskState for UpgradeTaskState<TYPES, V> {
438    type Event = HotShotEvent<TYPES>;
439
440    async fn handle_event(
441        &mut self,
442        event: Arc<Self::Event>,
443        sender: &Sender<Arc<Self::Event>>,
444        _receiver: &Receiver<Arc<Self::Event>>,
445    ) -> Result<()> {
446        self.handle(event, sender.clone()).await?;
447
448        Ok(())
449    }
450
451    fn cancel_subtasks(&mut self) {}
452}