1use std::{
13 fmt::{self, Debug},
14 marker::PhantomData,
15 sync::Arc,
16};
17
18use async_lock::RwLock;
19use committable::Committable;
20use hotshot_utils::anytrace::*;
21use serde::{de::DeserializeOwned, Deserialize, Serialize};
22use vbs::{
23 version::{StaticVersion, StaticVersionType, Version},
24 BinarySerializer, Serializer,
25};
26
/// Version tag `0.0`.
///
/// `UpgradeLock::deserialize` special-cases messages whose leading version equals this
/// constant: they are decoded with `StaticVersion<0, 0>` and returned without comparing the
/// version against the one expected for the message's view.
pub const EXTERNAL_MESSAGE_VERSION: Version = Version { major: 0, minor: 0 };
29
30use crate::{
31 data::{
32 DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
33 QuorumProposal2Legacy, QuorumProposalWrapper, UpgradeProposal, VidDisperseShare0,
34 VidDisperseShare1, VidDisperseShare2,
35 },
36 epoch_membership::EpochMembership,
37 request_response::ProposalRequestPayload,
38 simple_certificate::{
39 DaCertificate, DaCertificate2, EpochRootQuorumCertificateV1, EpochRootQuorumCertificateV2,
40 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
41 ViewSyncCommitCertificate, ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate,
42 ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate, ViewSyncPreCommitCertificate2,
43 },
44 simple_vote::{
45 DaVote, DaVote2, EpochRootQuorumVote, EpochRootQuorumVote2, HasEpoch, QuorumVote,
46 QuorumVote2, TimeoutVote, TimeoutVote2, UpgradeVote, ViewSyncCommitVote,
47 ViewSyncCommitVote2, ViewSyncFinalizeVote, ViewSyncFinalizeVote2, ViewSyncPreCommitVote,
48 ViewSyncPreCommitVote2,
49 },
50 traits::{
51 election::Membership,
52 network::{DataRequest, ResponseMessage, ViewMessage},
53 node_implementation::{ConsensusTime, NodeType, Versions},
54 signature_key::SignatureKey,
55 },
56 utils::mnemonic,
57 vote::HasViewNumber,
58};
59
/// A message payload together with the signature key of its sender.
///
/// `Debug` is not derived; it is implemented by hand in this file so that the sender is
/// printed through `mnemonic`.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
pub struct Message<TYPES: NodeType> {
    /// Signature key of the sender.
    pub sender: TYPES::SignatureKey,

    /// The payload carried by this message.
    pub kind: MessageKind<TYPES>,
}
70
71impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
72 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
73 fmt.debug_struct("Message")
74 .field("sender", &mnemonic(&self.sender))
75 .field("kind", &self.kind)
76 .finish()
77 }
78}
79
80impl<TYPES: NodeType> HasViewNumber<TYPES> for Message<TYPES> {
81 fn view_number(&self) -> TYPES::View {
83 self.kind.view_number()
84 }
85}
86
/// Newtype wrapper around a `Vec` of [`Message`] values.
#[derive(Clone, Debug)]
pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
90
/// Payload-free, copyable tag naming the purpose of a message.
///
/// NOTE(review): nothing in the visible part of this file constructs or matches on these
/// tags, so the precise meaning of each variant is known only from its name — confirm
/// against the callers before relying on it.
#[derive(PartialEq, Copy, Clone)]
pub enum MessagePurpose {
    Proposal,
    LatestProposal,
    LatestViewSyncCertificate,
    Vote,
    ViewSyncVote,
    ViewSyncCertificate,
    DaCertificate,
    Internal,
    Data,
    VidDisperse,
    UpgradeProposal,
    UpgradeVote,
    External,
}
121
/// The payload of a [`Message`].
///
/// The variant order is part of the serialized representation produced by the derived
/// `Serialize`/`Deserialize` implementations, so variants must not be reordered.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum MessageKind<TYPES: NodeType> {
    /// A consensus message (general or DA).
    Consensus(SequencingMessage<TYPES>),
    /// A data message: transaction submission, data request, or data response.
    Data(DataMessage<TYPES>),
    /// Raw bytes that this file never interprets. The `ViewMessage` and `HasEpoch`
    /// implementations below report view `1` and no epoch for this variant.
    External(Vec<u8>),
}
135
/// A set of recipients, expressed in terms of signature keys of type `K`.
///
/// NOTE(review): this type is not used in the visible part of the file; the variant
/// descriptions follow from the payload types only.
pub enum RecipientList<K: SignatureKey> {
    /// No explicit key; presumably addresses every node — confirm against the network layer.
    Broadcast,
    /// Exactly one key.
    Direct(K),
    /// A list of keys.
    Many(Vec<K>),
}
145
146impl<TYPES: NodeType> MessageKind<TYPES> {
147 pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
151 Self::Consensus(m)
152 }
153}
154
155impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
156 fn from(m: DataMessage<TYPES>) -> Self {
157 Self::Data(m)
158 }
159}
160
161impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
162 fn view_number(&self) -> TYPES::View {
163 match &self {
164 MessageKind::Consensus(message) => message.view_number(),
165 MessageKind::Data(DataMessage::SubmitTransaction(_, v)) => *v,
166 MessageKind::Data(DataMessage::RequestData(msg)) => msg.view,
167 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
168 ResponseMessage::Found(m) => m.view_number(),
169 ResponseMessage::NotFound | ResponseMessage::Denied => TYPES::View::new(1),
170 },
171 MessageKind::External(_) => TYPES::View::new(1),
172 }
173 }
174}
175
176impl<TYPES: NodeType> HasEpoch<TYPES> for MessageKind<TYPES> {
177 fn epoch(&self) -> Option<TYPES::Epoch> {
178 match &self {
179 MessageKind::Consensus(message) => message.epoch_number(),
180 MessageKind::Data(DataMessage::SubmitTransaction(..) | DataMessage::RequestData(_))
181 | MessageKind::External(_) => None,
182 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
183 ResponseMessage::Found(m) => m.epoch_number(),
184 ResponseMessage::NotFound | ResponseMessage::Denied => None,
185 },
186 }
187 }
188}
189
/// Consensus messages that are not DA-specific.
///
/// The variant order is part of the serialized representation produced by the derived
/// `Serialize`/`Deserialize` implementations, so new variants must only be appended and
/// existing ones must not be reordered. The per-variant notes below describe the payload
/// types only.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum GeneralConsensusMessage<TYPES: NodeType> {
    /// A signed `QuorumProposal`.
    Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),

    /// A `QuorumVote`.
    Vote(QuorumVote<TYPES>),

    /// A `ViewSyncPreCommitVote`.
    ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),

    /// A `ViewSyncCommitVote`.
    ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),

    /// A `ViewSyncFinalizeVote`.
    ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),

    /// A `ViewSyncPreCommitCertificate`.
    ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate<TYPES>),

    /// A `ViewSyncCommitCertificate`.
    ViewSyncCommitCertificate(ViewSyncCommitCertificate<TYPES>),

    /// A `ViewSyncFinalizeCertificate`.
    ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate<TYPES>),

    /// A `TimeoutVote`.
    TimeoutVote(TimeoutVote<TYPES>),

    /// A signed `UpgradeProposal`.
    UpgradeProposal(Proposal<TYPES, UpgradeProposal<TYPES>>),

    /// An `UpgradeVote`.
    UpgradeVote(UpgradeVote<TYPES>),

    /// A proposal request payload plus a signature value. The view number of this variant
    /// is read from the payload's `view_number` field; it has no epoch.
    /// NOTE(review): presumably the signature is the requester's signature over the
    /// payload — confirm against the request/response code.
    ProposalRequested(
        ProposalRequestPayload<TYPES>,
        <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    ),

    /// A signed `QuorumProposal`; same payload type as `Proposal`.
    ProposalResponse(Proposal<TYPES, QuorumProposal<TYPES>>),

    /// A signed `QuorumProposal2Legacy`.
    Proposal2Legacy(Proposal<TYPES, QuorumProposal2Legacy<TYPES>>),

    /// A `QuorumVote2`.
    Vote2(QuorumVote2<TYPES>),

    /// An `EpochRootQuorumVote`.
    EpochRootQuorumVote(EpochRootQuorumVote<TYPES>),

    /// A signed `QuorumProposal2Legacy`; same payload type as `Proposal2Legacy`.
    ProposalResponse2Legacy(Proposal<TYPES, QuorumProposal2Legacy<TYPES>>),

    /// A `QuorumCertificate2` with an optional `NextEpochQuorumCertificate2`. View and
    /// epoch are taken from the first element.
    HighQc(
        QuorumCertificate2<TYPES>,
        Option<NextEpochQuorumCertificate2<TYPES>>,
    ),

    /// A `QuorumCertificate2` with a mandatory `NextEpochQuorumCertificate2`. View and
    /// epoch are taken from the first element.
    ExtendedQc(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    ),

    /// An `EpochRootQuorumCertificateV1`.
    EpochRootQcV1(EpochRootQuorumCertificateV1<TYPES>),

    /// A `ViewSyncPreCommitVote2`.
    ViewSyncPreCommitVote2(ViewSyncPreCommitVote2<TYPES>),

    /// A `ViewSyncCommitVote2`.
    ViewSyncCommitVote2(ViewSyncCommitVote2<TYPES>),

    /// A `ViewSyncFinalizeVote2`.
    ViewSyncFinalizeVote2(ViewSyncFinalizeVote2<TYPES>),

    /// A `ViewSyncPreCommitCertificate2`.
    ViewSyncPreCommitCertificate2(ViewSyncPreCommitCertificate2<TYPES>),

    /// A `ViewSyncCommitCertificate2`.
    ViewSyncCommitCertificate2(ViewSyncCommitCertificate2<TYPES>),

    /// A `ViewSyncFinalizeCertificate2`.
    ViewSyncFinalizeCertificate2(ViewSyncFinalizeCertificate2<TYPES>),

    /// A `TimeoutVote2`.
    TimeoutVote2(TimeoutVote2<TYPES>),

    /// An `EpochRootQuorumCertificateV2`.
    EpochRootQc(EpochRootQuorumCertificateV2<TYPES>),

    /// A signed `QuorumProposal2`.
    Proposal2(Proposal<TYPES, QuorumProposal2<TYPES>>),

    /// A signed `QuorumProposal2`; same payload type as `Proposal2`.
    ProposalResponse2(Proposal<TYPES, QuorumProposal2<TYPES>>),

    /// An `EpochRootQuorumVote2`.
    EpochRootQuorumVote2(EpochRootQuorumVote2<TYPES>),
}
296
/// DA-related consensus messages.
///
/// The variant order is part of the serialized representation produced by the derived
/// `Serialize`/`Deserialize` implementations, so existing variants must not be reordered.
/// The per-variant notes below describe the payload types only.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
#[serde(bound(deserialize = "", serialize = ""))]
pub enum DaConsensusMessage<TYPES: NodeType> {
    /// A signed `DaProposal`.
    DaProposal(Proposal<TYPES, DaProposal<TYPES>>),

    /// A `DaVote`.
    DaVote(DaVote<TYPES>),

    /// A `DaCertificate`.
    DaCertificate(DaCertificate<TYPES>),

    /// A signed `VidDisperseShare0`.
    VidDisperseMsg(Proposal<TYPES, VidDisperseShare0<TYPES>>),

    /// A signed `DaProposal2`.
    DaProposal2(Proposal<TYPES, DaProposal2<TYPES>>),

    /// A `DaVote2`.
    DaVote2(DaVote2<TYPES>),

    /// A `DaCertificate2`.
    DaCertificate2(DaCertificate2<TYPES>),

    /// A signed `VidDisperseShare1`.
    VidDisperseMsg1(Proposal<TYPES, VidDisperseShare1<TYPES>>),

    /// A signed `VidDisperseShare2`.
    VidDisperseMsg2(Proposal<TYPES, VidDisperseShare2<TYPES>>),
}
330
/// A consensus message: either a general one or a DA one.
///
/// The variant order is part of the serialized representation produced by the derived
/// `Serialize`/`Deserialize` implementations. The `large_enum_variant` lint is silenced
/// rather than boxing a variant.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = "", serialize = ""))]
#[allow(clippy::large_enum_variant)]
pub enum SequencingMessage<TYPES: NodeType> {
    /// A message from the general consensus set.
    General(GeneralConsensusMessage<TYPES>),

    /// A message from the DA set.
    Da(DaConsensusMessage<TYPES>),
}
342
343impl<TYPES: NodeType> SequencingMessage<TYPES> {
344 fn view_number(&self) -> TYPES::View {
346 match &self {
347 SequencingMessage::General(general_message) => {
348 match general_message {
349 GeneralConsensusMessage::Proposal(p) => {
350 p.data.view_number()
353 },
354 GeneralConsensusMessage::Proposal2Legacy(p) => {
355 p.data.view_number()
358 },
359 GeneralConsensusMessage::Proposal2(p) => {
360 p.data.view_number()
363 },
364 GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number,
365 GeneralConsensusMessage::ProposalResponse(proposal) => {
366 proposal.data.view_number()
367 },
368 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
369 proposal.data.view_number()
370 },
371 GeneralConsensusMessage::ProposalResponse2(proposal) => {
372 proposal.data.view_number()
373 },
374 GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(),
375 GeneralConsensusMessage::Vote2(vote_message) => vote_message.view_number(),
376 GeneralConsensusMessage::TimeoutVote(message) => message.view_number(),
377 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
378 message.view_number()
379 },
380 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.view_number(),
381 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.view_number(),
382 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
383 message.view_number()
384 },
385 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => {
386 message.view_number()
387 },
388 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
389 message.view_number()
390 },
391 GeneralConsensusMessage::TimeoutVote2(message) => message.view_number(),
392 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => {
393 message.view_number()
394 },
395 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.view_number(),
396 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => {
397 message.view_number()
398 },
399 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
400 message.view_number()
401 },
402 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => {
403 message.view_number()
404 },
405 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
406 message.view_number()
407 },
408 GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(),
409 GeneralConsensusMessage::UpgradeVote(message) => message.view_number(),
410 GeneralConsensusMessage::HighQc(qc, _)
411 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.view_number(),
412 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.view_number(),
413 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => vote.view_number(),
414 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.view_number(),
415 GeneralConsensusMessage::EpochRootQcV1(root_qc) => root_qc.view_number(),
416 }
417 },
418 SequencingMessage::Da(da_message) => {
419 match da_message {
420 DaConsensusMessage::DaProposal(p) => {
421 p.data.view_number()
424 },
425 DaConsensusMessage::DaVote(vote_message) => vote_message.view_number(),
426 DaConsensusMessage::DaCertificate(cert) => cert.view_number,
427 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.view_number(),
428 DaConsensusMessage::DaProposal2(p) => {
429 p.data.view_number()
432 },
433 DaConsensusMessage::DaVote2(vote_message) => vote_message.view_number(),
434 DaConsensusMessage::DaCertificate2(cert) => cert.view_number,
435 DaConsensusMessage::VidDisperseMsg1(disperse) => disperse.data.view_number(),
436 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.view_number(),
437 }
438 },
439 }
440 }
441
442 fn epoch_number(&self) -> Option<TYPES::Epoch> {
444 match &self {
445 SequencingMessage::General(general_message) => {
446 match general_message {
447 GeneralConsensusMessage::Proposal(p) => {
448 p.data.epoch()
451 },
452 GeneralConsensusMessage::Proposal2Legacy(p) => {
453 p.data.epoch()
456 },
457 GeneralConsensusMessage::Proposal2(p) => {
458 p.data.epoch()
461 },
462 GeneralConsensusMessage::ProposalRequested(..) => None,
463 GeneralConsensusMessage::ProposalResponse(proposal) => proposal.data.epoch(),
464 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
465 proposal.data.epoch()
466 },
467 GeneralConsensusMessage::ProposalResponse2(proposal) => proposal.data.epoch(),
468 GeneralConsensusMessage::Vote(vote_message) => vote_message.epoch(),
469 GeneralConsensusMessage::Vote2(vote_message) => vote_message.epoch(),
470 GeneralConsensusMessage::TimeoutVote(message) => message.epoch(),
471 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => message.epoch(),
472 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.epoch(),
473 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.epoch(),
474 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
475 message.epoch()
476 },
477 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => message.epoch(),
478 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
479 message.epoch()
480 },
481 GeneralConsensusMessage::TimeoutVote2(message) => message.epoch(),
482 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => message.epoch(),
483 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.epoch(),
484 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => message.epoch(),
485 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
486 message.epoch()
487 },
488 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => message.epoch(),
489 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
490 message.epoch()
491 },
492 GeneralConsensusMessage::UpgradeProposal(message) => message.data.epoch(),
493 GeneralConsensusMessage::UpgradeVote(message) => message.epoch(),
494 GeneralConsensusMessage::HighQc(qc, _)
495 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.epoch(),
496 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.epoch(),
497 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => vote.epoch(),
498 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.epoch(),
499 GeneralConsensusMessage::EpochRootQcV1(root_qc) => root_qc.epoch(),
500 }
501 },
502 SequencingMessage::Da(da_message) => {
503 match da_message {
504 DaConsensusMessage::DaProposal(p) => {
505 p.data.epoch()
508 },
509 DaConsensusMessage::DaVote(vote_message) => vote_message.epoch(),
510 DaConsensusMessage::DaCertificate(cert) => cert.epoch(),
511 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.epoch(),
512 DaConsensusMessage::VidDisperseMsg1(disperse) => disperse.data.epoch(),
513 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.epoch(),
514 DaConsensusMessage::DaProposal2(p) => {
515 p.data.epoch()
518 },
519 DaConsensusMessage::DaVote2(vote_message) => vote_message.epoch(),
520 DaConsensusMessage::DaCertificate2(cert) => cert.epoch(),
521 }
522 },
523 }
524 }
525}
526
/// Data-related messages: transaction submission and data request/response.
///
/// Unlike the consensus enums in this file, only the `deserialize` serde bound is
/// overridden here. The variant order is part of the serialized representation.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
#[allow(clippy::large_enum_variant)]
pub enum DataMessage<TYPES: NodeType> {
    /// A transaction together with a view number; that view is what
    /// `MessageKind::view_number` reports for this variant.
    SubmitTransaction(TYPES::Transaction, TYPES::View),
    /// A data request; its `view` field supplies the view number of the message.
    RequestData(DataRequest<TYPES>),
    /// A response to a data request (`Found`, `NotFound` or `Denied`).
    DataResponse(ResponseMessage<TYPES>),
}
542
/// A proposal payload bundled with a signature.
///
/// `PROPOSAL` is the payload type; it must expose a view number and an epoch and be
/// deserializable into an owned value.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
pub struct Proposal<
    TYPES: NodeType,
    PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
> {
    /// The proposal payload.
    pub data: PROPOSAL,
    /// Signature accompanying the payload. For the quorum-proposal specializations in this
    /// file, `validate_signature` checks it with the view leader's key against the
    /// commitment of the leaf built from `data`.
    pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    /// Zero-sized marker tying the struct to `TYPES`.
    pub _pd: PhantomData<TYPES>,
}
559
560pub fn convert_proposal<TYPES, PROPOSAL, PROPOSAL2>(
562 proposal: Proposal<TYPES, PROPOSAL>,
563) -> Proposal<TYPES, PROPOSAL2>
564where
565 TYPES: NodeType,
566 PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
567 PROPOSAL2: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned + From<PROPOSAL>,
568{
569 Proposal {
570 data: proposal.data.into(),
571 signature: proposal.signature,
572 _pd: proposal._pd,
573 }
574}
575
576impl<TYPES> Proposal<TYPES, QuorumProposal<TYPES>>
577where
578 TYPES: NodeType,
579{
580 pub async fn validate_signature<V: Versions>(
584 &self,
585 membership: &TYPES::Membership,
586 _epoch_height: u64,
587 upgrade_lock: &UpgradeLock<TYPES, V>,
588 ) -> Result<()> {
589 let view_number = self.data.view_number();
590 let view_leader_key = membership.leader(view_number, None)?;
591 let proposed_leaf = Leaf::from_quorum_proposal(&self.data);
592
593 ensure!(
594 view_leader_key.validate(
595 &self.signature,
596 proposed_leaf.commit(upgrade_lock).await.as_ref()
597 ),
598 "Proposal signature is invalid."
599 );
600
601 Ok(())
602 }
603}
604
605impl<TYPES> Proposal<TYPES, QuorumProposalWrapper<TYPES>>
606where
607 TYPES: NodeType,
608{
609 pub async fn validate_signature(&self, membership: &EpochMembership<TYPES>) -> Result<()> {
613 let view_number = self.data.proposal.view_number();
614 let view_leader_key = membership.leader(view_number).await?;
615 let proposed_leaf = Leaf2::from_quorum_proposal(&self.data);
616
617 ensure!(
618 view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
619 "Proposal signature is invalid."
620 );
621
622 Ok(())
623 }
624}
625
/// Shared handle to the decided upgrade certificate, parameterised by the version set `V`.
///
/// Cloning the lock clones the `Arc`, so all clones observe the same certificate slot.
#[derive(Clone, Debug)]
pub struct UpgradeLock<TYPES: NodeType, V: Versions> {
    /// The decided upgrade certificate, if any, behind an async read/write lock.
    pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,

    /// Zero-sized marker tying the lock to the version set `V`.
    pub _pd: PhantomData<V>,
}
635
636impl<TYPES: NodeType, V: Versions> UpgradeLock<TYPES, V> {
637 #[allow(clippy::new_without_default)]
638 pub fn new() -> Self {
640 Self {
641 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
642 _pd: PhantomData::<V>,
643 }
644 }
645
646 #[allow(clippy::new_without_default)]
647 pub fn from_certificate(certificate: &Option<UpgradeCertificate<TYPES>>) -> Self {
649 Self {
650 decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
651 _pd: PhantomData::<V>,
652 }
653 }
654
655 pub async fn upgrade_view(&self) -> Option<TYPES::View> {
656 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
657 upgrade_certificate
658 .as_ref()
659 .map(|cert| cert.data.new_version_first_view)
660 }
661
662 pub async fn version(&self, view: TYPES::View) -> Result<Version> {
667 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
668
669 let version = match *upgrade_certificate {
670 Some(ref cert) => {
671 if view >= cert.data.new_version_first_view {
672 if cert.data.new_version == V::Upgrade::VERSION {
673 V::Upgrade::VERSION
674 } else {
675 bail!("The network has upgraded to a new version that we do not support!");
676 }
677 } else {
678 V::Base::VERSION
679 }
680 },
681 None => V::Base::VERSION,
682 };
683
684 Ok(version)
685 }
686
687 pub async fn version_infallible(&self, view: TYPES::View) -> Version {
691 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
692
693 match *upgrade_certificate {
694 Some(ref cert) => {
695 if view >= cert.data.new_version_first_view {
696 cert.data.new_version
697 } else {
698 cert.data.old_version
699 }
700 },
701 None => V::Base::VERSION,
702 }
703 }
704
705 pub async fn epochs_enabled(&self, view: TYPES::View) -> bool {
707 self.version_infallible(view).await >= V::Epochs::VERSION
708 }
709
710 pub async fn proposal2_legacy_version(&self, view: TYPES::View) -> bool {
712 self.epochs_enabled(view).await && !self.upgraded_drb_and_header(view).await
713 }
714
715 pub async fn proposal2_version(&self, view: TYPES::View) -> bool {
717 self.epochs_enabled(view).await && self.upgraded_drb_and_header(view).await
718 }
719
720 pub async fn upgraded_drb_and_header(&self, view: TYPES::View) -> bool {
722 self.version_infallible(view).await >= V::DrbAndHeaderUpgrade::VERSION
723 }
724
725 pub async fn upgraded_vid2(&self, view: TYPES::View) -> bool {
726 self.version_infallible(view).await >= V::Vid2Upgrade::VERSION
727 }
728
729 pub async fn serialize<M: HasViewNumber<TYPES> + Serialize>(
735 &self,
736 message: &M,
737 ) -> Result<Vec<u8>> {
738 let view = message.view_number();
739
740 let version = self.version(view).await?;
741
742 let serialized_message = match version {
743 v if v == V::Base::VERSION => Serializer::<V::Base>::serialize(&message),
745 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::serialize(&message),
746 v => {
747 bail!(
748 "Attempted to serialize with version {v}, which is incompatible. This should \
749 be impossible."
750 );
751 },
752 };
753
754 serialized_message
755 .wrap()
756 .context(info!("Failed to serialize message!"))
757 }
758
759 pub async fn deserialize<M: Debug + HasViewNumber<TYPES> + for<'a> Deserialize<'a>>(
766 &self,
767 message: &[u8],
768 ) -> Result<(M, Version)> {
769 let (actual_version, rest) = Version::deserialize(message)
771 .wrap()
772 .context(info!("Failed to read message version!"))?;
773
774 let deserialized_message: M = match actual_version {
776 v if v == EXTERNAL_MESSAGE_VERSION => {
778 Serializer::<StaticVersion<0, 0>>::deserialize(message)
779 },
780 v if v == V::Base::VERSION => Serializer::<V::Base>::deserialize(message),
781 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::deserialize(message),
782 v => {
783 let attempted_deserialization: M = match bincode::deserialize(rest) {
784 Ok(m) => m,
785 Err(e) => {
786 bail!("Cannot deserialize message with stated version: {v}. Error: {e}");
787 },
788 };
789
790 bail!(warn!(
791 "Received a message with state version {v} which is invalid for its view: {:?}",
792 attempted_deserialization
793 ));
794 },
795 }
796 .wrap()
797 .context(info!("Failed to deserialize message!"))?;
798
799 if actual_version == EXTERNAL_MESSAGE_VERSION {
802 return Ok((deserialized_message, actual_version));
803 }
804
805 let view = deserialized_message.view_number();
807
808 let expected_version = self.version(view).await?;
810
811 if actual_version != expected_version {
813 return Err(error!(format!(
814 "Message has invalid version number for its view. Expected: {expected_version}, \
815 Actual: {actual_version}, View: {view:?}\n\n{deserialized_message:?}"
816 )));
817 };
818
819 Ok((deserialized_message, actual_version))
820 }
821}