1use std::{
13 fmt::{self, Debug},
14 marker::PhantomData,
15 sync::Arc,
16};
17
18use async_lock::RwLock;
19use committable::Committable;
20use hotshot_utils::anytrace::*;
21use serde::{de::DeserializeOwned, Deserialize, Serialize};
22use vbs::{
23 version::{StaticVersion, StaticVersionType, Version},
24 BinarySerializer, Serializer,
25};
26
27pub const EXTERNAL_MESSAGE_VERSION: Version = Version { major: 0, minor: 0 };
29
30use crate::{
31 data::{
32 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
33 DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
34 QuorumProposalWrapper, UpgradeProposal,
35 },
36 epoch_membership::EpochMembership,
37 request_response::ProposalRequestPayload,
38 simple_certificate::{
39 DaCertificate, DaCertificate2, EpochRootQuorumCertificate, NextEpochQuorumCertificate2,
40 QuorumCertificate2, UpgradeCertificate, ViewSyncCommitCertificate,
41 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate, ViewSyncFinalizeCertificate2,
42 ViewSyncPreCommitCertificate, ViewSyncPreCommitCertificate2,
43 },
44 simple_vote::{
45 DaVote, DaVote2, EpochRootQuorumVote, HasEpoch, QuorumVote, QuorumVote2, TimeoutVote,
46 TimeoutVote2, UpgradeVote, ViewSyncCommitVote, ViewSyncCommitVote2, ViewSyncFinalizeVote,
47 ViewSyncFinalizeVote2, ViewSyncPreCommitVote, ViewSyncPreCommitVote2,
48 },
49 traits::{
50 election::Membership,
51 network::{DataRequest, ResponseMessage, ViewMessage},
52 node_implementation::{ConsensusTime, NodeType, Versions},
53 signature_key::SignatureKey,
54 },
55 utils::mnemonic,
56 vote::HasViewNumber,
57};
58
59#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
61#[serde(bound(deserialize = "", serialize = ""))]
62pub struct Message<TYPES: NodeType> {
63 pub sender: TYPES::SignatureKey,
65
66 pub kind: MessageKind<TYPES>,
68}
69
70impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
71 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
72 fmt.debug_struct("Message")
73 .field("sender", &mnemonic(&self.sender))
74 .field("kind", &self.kind)
75 .finish()
76 }
77}
78
79impl<TYPES: NodeType> HasViewNumber<TYPES> for Message<TYPES> {
80 fn view_number(&self) -> TYPES::View {
82 self.kind.view_number()
83 }
84}
85
86#[derive(Clone, Debug)]
88pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
89
90#[derive(PartialEq, Copy, Clone)]
92pub enum MessagePurpose {
93 Proposal,
95 LatestProposal,
97 LatestViewSyncCertificate,
99 Vote,
101 ViewSyncVote,
103 ViewSyncCertificate,
105 DaCertificate,
107 Internal,
109 Data,
111 VidDisperse,
113 UpgradeProposal,
115 UpgradeVote,
117 External,
119}
120
121#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
125#[serde(bound(deserialize = "", serialize = ""))]
126pub enum MessageKind<TYPES: NodeType> {
127 Consensus(SequencingMessage<TYPES>),
129 Data(DataMessage<TYPES>),
131 External(Vec<u8>),
133}
134
135pub enum RecipientList<K: SignatureKey> {
137 Broadcast,
139 Direct(K),
141 Many(Vec<K>),
143}
144
145impl<TYPES: NodeType> MessageKind<TYPES> {
146 pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
150 Self::Consensus(m)
151 }
152}
153
154impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
155 fn from(m: DataMessage<TYPES>) -> Self {
156 Self::Data(m)
157 }
158}
159
160impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
161 fn view_number(&self) -> TYPES::View {
162 match &self {
163 MessageKind::Consensus(message) => message.view_number(),
164 MessageKind::Data(DataMessage::SubmitTransaction(_, v)) => *v,
165 MessageKind::Data(DataMessage::RequestData(msg)) => msg.view,
166 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
167 ResponseMessage::Found(m) => m.view_number(),
168 ResponseMessage::NotFound | ResponseMessage::Denied => TYPES::View::new(1),
169 },
170 MessageKind::External(_) => TYPES::View::new(1),
171 }
172 }
173}
174
175impl<TYPES: NodeType> HasEpoch<TYPES> for MessageKind<TYPES> {
176 fn epoch(&self) -> Option<TYPES::Epoch> {
177 match &self {
178 MessageKind::Consensus(message) => message.epoch_number(),
179 MessageKind::Data(DataMessage::SubmitTransaction(..) | DataMessage::RequestData(_))
180 | MessageKind::External(_) => None,
181 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
182 ResponseMessage::Found(m) => m.epoch_number(),
183 ResponseMessage::NotFound | ResponseMessage::Denied => None,
184 },
185 }
186 }
187}
188
189#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
190#[serde(bound(deserialize = "", serialize = ""))]
191pub enum GeneralConsensusMessage<TYPES: NodeType> {
193 Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),
195
196 Vote(QuorumVote<TYPES>),
198
199 ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),
201
202 ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),
204
205 ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),
207
208 ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate<TYPES>),
210
211 ViewSyncCommitCertificate(ViewSyncCommitCertificate<TYPES>),
213
214 ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate<TYPES>),
216
217 TimeoutVote(TimeoutVote<TYPES>),
219
220 UpgradeProposal(Proposal<TYPES, UpgradeProposal<TYPES>>),
222
223 UpgradeVote(UpgradeVote<TYPES>),
225
226 ProposalRequested(
228 ProposalRequestPayload<TYPES>,
229 <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
230 ),
231
232 ProposalResponse(Proposal<TYPES, QuorumProposal<TYPES>>),
234
235 Proposal2(Proposal<TYPES, QuorumProposal2<TYPES>>),
237
238 Vote2(QuorumVote2<TYPES>),
240
241 EpochRootQuorumVote(EpochRootQuorumVote<TYPES>),
243
244 ProposalResponse2(Proposal<TYPES, QuorumProposal2<TYPES>>),
246
247 HighQc(
249 QuorumCertificate2<TYPES>,
250 Option<NextEpochQuorumCertificate2<TYPES>>,
251 ),
252
253 ExtendedQc(
255 QuorumCertificate2<TYPES>,
256 NextEpochQuorumCertificate2<TYPES>,
257 ),
258
259 EpochRootQc(EpochRootQuorumCertificate<TYPES>),
261
262 ViewSyncPreCommitVote2(ViewSyncPreCommitVote2<TYPES>),
264
265 ViewSyncCommitVote2(ViewSyncCommitVote2<TYPES>),
267
268 ViewSyncFinalizeVote2(ViewSyncFinalizeVote2<TYPES>),
270
271 ViewSyncPreCommitCertificate2(ViewSyncPreCommitCertificate2<TYPES>),
273
274 ViewSyncCommitCertificate2(ViewSyncCommitCertificate2<TYPES>),
276
277 ViewSyncFinalizeCertificate2(ViewSyncFinalizeCertificate2<TYPES>),
279
280 TimeoutVote2(TimeoutVote2<TYPES>),
282}
283
284#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
285#[serde(bound(deserialize = "", serialize = ""))]
286pub enum DaConsensusMessage<TYPES: NodeType> {
288 DaProposal(Proposal<TYPES, DaProposal<TYPES>>),
290
291 DaVote(DaVote<TYPES>),
293
294 DaCertificate(DaCertificate<TYPES>),
296
297 VidDisperseMsg(Proposal<TYPES, ADVZDisperseShare<TYPES>>),
301
302 DaProposal2(Proposal<TYPES, DaProposal2<TYPES>>),
304
305 DaVote2(DaVote2<TYPES>),
307
308 DaCertificate2(DaCertificate2<TYPES>),
310
311 VidDisperseMsg2(Proposal<TYPES, VidDisperseShare2<TYPES>>),
315}
316
317#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
319#[serde(bound(deserialize = "", serialize = ""))]
320#[allow(clippy::large_enum_variant)]
321pub enum SequencingMessage<TYPES: NodeType> {
322 General(GeneralConsensusMessage<TYPES>),
324
325 Da(DaConsensusMessage<TYPES>),
327}
328
329impl<TYPES: NodeType> SequencingMessage<TYPES> {
330 fn view_number(&self) -> TYPES::View {
332 match &self {
333 SequencingMessage::General(general_message) => {
334 match general_message {
335 GeneralConsensusMessage::Proposal(p) => {
336 p.data.view_number()
339 },
340 GeneralConsensusMessage::Proposal2(p) => {
341 p.data.view_number()
344 },
345 GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number,
346 GeneralConsensusMessage::ProposalResponse(proposal) => {
347 proposal.data.view_number()
348 },
349 GeneralConsensusMessage::ProposalResponse2(proposal) => {
350 proposal.data.view_number()
351 },
352 GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(),
353 GeneralConsensusMessage::Vote2(vote_message) => vote_message.view_number(),
354 GeneralConsensusMessage::TimeoutVote(message) => message.view_number(),
355 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
356 message.view_number()
357 },
358 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.view_number(),
359 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.view_number(),
360 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
361 message.view_number()
362 },
363 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => {
364 message.view_number()
365 },
366 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
367 message.view_number()
368 },
369 GeneralConsensusMessage::TimeoutVote2(message) => message.view_number(),
370 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => {
371 message.view_number()
372 },
373 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.view_number(),
374 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => {
375 message.view_number()
376 },
377 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
378 message.view_number()
379 },
380 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => {
381 message.view_number()
382 },
383 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
384 message.view_number()
385 },
386 GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(),
387 GeneralConsensusMessage::UpgradeVote(message) => message.view_number(),
388 GeneralConsensusMessage::HighQc(qc, _)
389 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.view_number(),
390 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.view_number(),
391 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.view_number(),
392 }
393 },
394 SequencingMessage::Da(da_message) => {
395 match da_message {
396 DaConsensusMessage::DaProposal(p) => {
397 p.data.view_number()
400 },
401 DaConsensusMessage::DaVote(vote_message) => vote_message.view_number(),
402 DaConsensusMessage::DaCertificate(cert) => cert.view_number,
403 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.view_number(),
404 DaConsensusMessage::DaProposal2(p) => {
405 p.data.view_number()
408 },
409 DaConsensusMessage::DaVote2(vote_message) => vote_message.view_number(),
410 DaConsensusMessage::DaCertificate2(cert) => cert.view_number,
411 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.view_number(),
412 }
413 },
414 }
415 }
416
417 fn epoch_number(&self) -> Option<TYPES::Epoch> {
419 match &self {
420 SequencingMessage::General(general_message) => {
421 match general_message {
422 GeneralConsensusMessage::Proposal(p) => {
423 p.data.epoch()
426 },
427 GeneralConsensusMessage::Proposal2(p) => {
428 p.data.epoch()
431 },
432 GeneralConsensusMessage::ProposalRequested(..) => None,
433 GeneralConsensusMessage::ProposalResponse(proposal) => proposal.data.epoch(),
434 GeneralConsensusMessage::ProposalResponse2(proposal) => proposal.data.epoch(),
435 GeneralConsensusMessage::Vote(vote_message) => vote_message.epoch(),
436 GeneralConsensusMessage::Vote2(vote_message) => vote_message.epoch(),
437 GeneralConsensusMessage::TimeoutVote(message) => message.epoch(),
438 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => message.epoch(),
439 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.epoch(),
440 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.epoch(),
441 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
442 message.epoch()
443 },
444 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => message.epoch(),
445 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
446 message.epoch()
447 },
448 GeneralConsensusMessage::TimeoutVote2(message) => message.epoch(),
449 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => message.epoch(),
450 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.epoch(),
451 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => message.epoch(),
452 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
453 message.epoch()
454 },
455 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => message.epoch(),
456 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
457 message.epoch()
458 },
459 GeneralConsensusMessage::UpgradeProposal(message) => message.data.epoch(),
460 GeneralConsensusMessage::UpgradeVote(message) => message.epoch(),
461 GeneralConsensusMessage::HighQc(qc, _)
462 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.epoch(),
463 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.epoch(),
464 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.epoch(),
465 }
466 },
467 SequencingMessage::Da(da_message) => {
468 match da_message {
469 DaConsensusMessage::DaProposal(p) => {
470 p.data.epoch()
473 },
474 DaConsensusMessage::DaVote(vote_message) => vote_message.epoch(),
475 DaConsensusMessage::DaCertificate(cert) => cert.epoch(),
476 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.epoch(),
477 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.epoch(),
478 DaConsensusMessage::DaProposal2(p) => {
479 p.data.epoch()
482 },
483 DaConsensusMessage::DaVote2(vote_message) => vote_message.epoch(),
484 DaConsensusMessage::DaCertificate2(cert) => cert.epoch(),
485 }
486 },
487 }
488 }
489}
490
491#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
492#[serde(bound(deserialize = ""))]
493#[allow(clippy::large_enum_variant)]
494pub enum DataMessage<TYPES: NodeType> {
497 SubmitTransaction(TYPES::Transaction, TYPES::View),
501 RequestData(DataRequest<TYPES>),
503 DataResponse(ResponseMessage<TYPES>),
505}
506
507#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
508#[serde(bound(deserialize = ""))]
509pub struct Proposal<
511 TYPES: NodeType,
512 PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
513> {
514 pub data: PROPOSAL,
518 pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
520 pub _pd: PhantomData<TYPES>,
522}
523
524pub fn convert_proposal<TYPES, PROPOSAL, PROPOSAL2>(
526 proposal: Proposal<TYPES, PROPOSAL>,
527) -> Proposal<TYPES, PROPOSAL2>
528where
529 TYPES: NodeType,
530 PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
531 PROPOSAL2: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned + From<PROPOSAL>,
532{
533 Proposal {
534 data: proposal.data.into(),
535 signature: proposal.signature,
536 _pd: proposal._pd,
537 }
538}
539
540impl<TYPES> Proposal<TYPES, QuorumProposal<TYPES>>
541where
542 TYPES: NodeType,
543{
544 pub async fn validate_signature<V: Versions>(
548 &self,
549 membership: &TYPES::Membership,
550 _epoch_height: u64,
551 upgrade_lock: &UpgradeLock<TYPES, V>,
552 ) -> Result<()> {
553 let view_number = self.data.view_number();
554 let view_leader_key = membership.leader(view_number, None)?;
555 let proposed_leaf = Leaf::from_quorum_proposal(&self.data);
556
557 ensure!(
558 view_leader_key.validate(
559 &self.signature,
560 proposed_leaf.commit(upgrade_lock).await.as_ref()
561 ),
562 "Proposal signature is invalid."
563 );
564
565 Ok(())
566 }
567}
568
569impl<TYPES> Proposal<TYPES, QuorumProposalWrapper<TYPES>>
570where
571 TYPES: NodeType,
572{
573 pub async fn validate_signature(&self, membership: &EpochMembership<TYPES>) -> Result<()> {
577 let view_number = self.data.proposal.view_number();
578 let view_leader_key = membership.leader(view_number).await?;
579 let proposed_leaf = Leaf2::from_quorum_proposal(&self.data);
580
581 ensure!(
582 view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
583 "Proposal signature is invalid."
584 );
585
586 Ok(())
587 }
588}
589
590#[derive(Clone, Debug)]
591pub struct UpgradeLock<TYPES: NodeType, V: Versions> {
593 pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
595
596 pub _pd: PhantomData<V>,
598}
599
600impl<TYPES: NodeType, V: Versions> UpgradeLock<TYPES, V> {
601 #[allow(clippy::new_without_default)]
602 pub fn new() -> Self {
604 Self {
605 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
606 _pd: PhantomData::<V>,
607 }
608 }
609
610 #[allow(clippy::new_without_default)]
611 pub fn from_certificate(certificate: &Option<UpgradeCertificate<TYPES>>) -> Self {
613 Self {
614 decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
615 _pd: PhantomData::<V>,
616 }
617 }
618
619 pub async fn upgrade_view(&self) -> Option<TYPES::View> {
620 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
621 upgrade_certificate
622 .as_ref()
623 .map(|cert| cert.data.new_version_first_view)
624 }
625
626 pub async fn version(&self, view: TYPES::View) -> Result<Version> {
631 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
632
633 let version = match *upgrade_certificate {
634 Some(ref cert) => {
635 if view >= cert.data.new_version_first_view {
636 if cert.data.new_version == V::Upgrade::VERSION {
637 V::Upgrade::VERSION
638 } else {
639 bail!("The network has upgraded to a new version that we do not support!");
640 }
641 } else {
642 V::Base::VERSION
643 }
644 },
645 None => V::Base::VERSION,
646 };
647
648 Ok(version)
649 }
650
651 pub async fn version_infallible(&self, view: TYPES::View) -> Version {
655 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
656
657 match *upgrade_certificate {
658 Some(ref cert) => {
659 if view >= cert.data.new_version_first_view {
660 cert.data.new_version
661 } else {
662 cert.data.old_version
663 }
664 },
665 None => V::Base::VERSION,
666 }
667 }
668
669 pub async fn epochs_enabled(&self, view: TYPES::View) -> bool {
671 self.version_infallible(view).await >= V::Epochs::VERSION
672 }
673
674 pub async fn upgraded_drb_and_header(&self, view: TYPES::View) -> bool {
676 self.version_infallible(view).await >= V::DrbAndHeaderUpgrade::VERSION
677 }
678
679 pub async fn serialize<M: HasViewNumber<TYPES> + Serialize>(
685 &self,
686 message: &M,
687 ) -> Result<Vec<u8>> {
688 let view = message.view_number();
689
690 let version = self.version(view).await?;
691
692 let serialized_message = match version {
693 v if v == V::Base::VERSION => Serializer::<V::Base>::serialize(&message),
695 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::serialize(&message),
696 v => {
697 bail!(
698 "Attempted to serialize with version {v}, which is incompatible. This should \
699 be impossible."
700 );
701 },
702 };
703
704 serialized_message
705 .wrap()
706 .context(info!("Failed to serialize message!"))
707 }
708
709 pub async fn deserialize<M: HasViewNumber<TYPES> + for<'a> Deserialize<'a>>(
716 &self,
717 message: &[u8],
718 ) -> Result<(M, Version)> {
719 let actual_version = Version::deserialize(message)
721 .wrap()
722 .context(info!("Failed to read message version!"))?
723 .0;
724
725 let deserialized_message: M = match actual_version {
727 v if v == EXTERNAL_MESSAGE_VERSION => {
729 Serializer::<StaticVersion<0, 0>>::deserialize(message)
730 },
731 v if v == V::Base::VERSION => Serializer::<V::Base>::deserialize(message),
732 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::deserialize(message),
733 v => {
734 bail!("Cannot deserialize message with stated version {v}");
735 },
736 }
737 .wrap()
738 .context(info!("Failed to deserialize message!"))?;
739
740 if actual_version == EXTERNAL_MESSAGE_VERSION {
743 return Ok((deserialized_message, actual_version));
744 }
745
746 let view = deserialized_message.view_number();
748
749 let expected_version = self.version(view).await?;
751
752 if actual_version != expected_version {
754 return Err(error!(format!(
755 "Message has invalid version number for its view. Expected: {expected_version}, \
756 Actual: {actual_version}, View: {view:?}"
757 )));
758 };
759
760 Ok((deserialized_message, actual_version))
761 }
762}