1use std::{
13 fmt::{self, Debug},
14 marker::PhantomData,
15 sync::Arc,
16};
17
18use async_lock::RwLock;
19use committable::Committable;
20use hotshot_utils::anytrace::*;
21use serde::{de::DeserializeOwned, Deserialize, Serialize};
22use vbs::{
23 version::{StaticVersionType, Version},
24 BinarySerializer, Serializer,
25};
26
27use crate::{
28 data::{
29 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
30 DaProposal, DaProposal2, Leaf, Leaf2, QuorumProposal, QuorumProposal2,
31 QuorumProposalWrapper, UpgradeProposal,
32 },
33 epoch_membership::EpochMembership,
34 request_response::ProposalRequestPayload,
35 simple_certificate::{
36 DaCertificate, DaCertificate2, EpochRootQuorumCertificate, NextEpochQuorumCertificate2,
37 QuorumCertificate2, UpgradeCertificate, ViewSyncCommitCertificate,
38 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate, ViewSyncFinalizeCertificate2,
39 ViewSyncPreCommitCertificate, ViewSyncPreCommitCertificate2,
40 },
41 simple_vote::{
42 DaVote, DaVote2, EpochRootQuorumVote, HasEpoch, QuorumVote, QuorumVote2, TimeoutVote,
43 TimeoutVote2, UpgradeVote, ViewSyncCommitVote, ViewSyncCommitVote2, ViewSyncFinalizeVote,
44 ViewSyncFinalizeVote2, ViewSyncPreCommitVote, ViewSyncPreCommitVote2,
45 },
46 traits::{
47 election::Membership,
48 network::{DataRequest, ResponseMessage, ViewMessage},
49 node_implementation::{ConsensusTime, NodeType, Versions},
50 signature_key::SignatureKey,
51 },
52 utils::mnemonic,
53 vote::HasViewNumber,
54};
55
56#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
58#[serde(bound(deserialize = "", serialize = ""))]
59pub struct Message<TYPES: NodeType> {
60 pub sender: TYPES::SignatureKey,
62
63 pub kind: MessageKind<TYPES>,
65}
66
67impl<TYPES: NodeType> fmt::Debug for Message<TYPES> {
68 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
69 fmt.debug_struct("Message")
70 .field("sender", &mnemonic(&self.sender))
71 .field("kind", &self.kind)
72 .finish()
73 }
74}
75
76impl<TYPES: NodeType> HasViewNumber<TYPES> for Message<TYPES> {
77 fn view_number(&self) -> TYPES::View {
79 self.kind.view_number()
80 }
81}
82
83#[derive(Clone, Debug)]
85pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
86
87#[derive(PartialEq, Copy, Clone)]
89pub enum MessagePurpose {
90 Proposal,
92 LatestProposal,
94 LatestViewSyncCertificate,
96 Vote,
98 ViewSyncVote,
100 ViewSyncCertificate,
102 DaCertificate,
104 Internal,
106 Data,
108 VidDisperse,
110 UpgradeProposal,
112 UpgradeVote,
114 External,
116}
117
118#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
122#[serde(bound(deserialize = "", serialize = ""))]
123pub enum MessageKind<TYPES: NodeType> {
124 Consensus(SequencingMessage<TYPES>),
126 Data(DataMessage<TYPES>),
128 External(Vec<u8>),
130}
131
132pub enum RecipientList<K: SignatureKey> {
134 Broadcast,
136 Direct(K),
138 Many(Vec<K>),
140}
141
142impl<TYPES: NodeType> MessageKind<TYPES> {
143 pub fn from_consensus_message(m: SequencingMessage<TYPES>) -> Self {
147 Self::Consensus(m)
148 }
149}
150
151impl<TYPES: NodeType> From<DataMessage<TYPES>> for MessageKind<TYPES> {
152 fn from(m: DataMessage<TYPES>) -> Self {
153 Self::Data(m)
154 }
155}
156
157impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
158 fn view_number(&self) -> TYPES::View {
159 match &self {
160 MessageKind::Consensus(message) => message.view_number(),
161 MessageKind::Data(DataMessage::SubmitTransaction(_, v)) => *v,
162 MessageKind::Data(DataMessage::RequestData(msg)) => msg.view,
163 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
164 ResponseMessage::Found(m) => m.view_number(),
165 ResponseMessage::NotFound | ResponseMessage::Denied => TYPES::View::new(1),
166 },
167 MessageKind::External(_) => TYPES::View::new(1),
168 }
169 }
170}
171
172impl<TYPES: NodeType> HasEpoch<TYPES> for MessageKind<TYPES> {
173 fn epoch(&self) -> Option<TYPES::Epoch> {
174 match &self {
175 MessageKind::Consensus(message) => message.epoch_number(),
176 MessageKind::Data(DataMessage::SubmitTransaction(..) | DataMessage::RequestData(_))
177 | MessageKind::External(_) => None,
178 MessageKind::Data(DataMessage::DataResponse(msg)) => match msg {
179 ResponseMessage::Found(m) => m.epoch_number(),
180 ResponseMessage::NotFound | ResponseMessage::Denied => None,
181 },
182 }
183 }
184}
185
186#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
187#[serde(bound(deserialize = "", serialize = ""))]
188pub enum GeneralConsensusMessage<TYPES: NodeType> {
190 Proposal(Proposal<TYPES, QuorumProposal<TYPES>>),
192
193 Vote(QuorumVote<TYPES>),
195
196 ViewSyncPreCommitVote(ViewSyncPreCommitVote<TYPES>),
198
199 ViewSyncCommitVote(ViewSyncCommitVote<TYPES>),
201
202 ViewSyncFinalizeVote(ViewSyncFinalizeVote<TYPES>),
204
205 ViewSyncPreCommitCertificate(ViewSyncPreCommitCertificate<TYPES>),
207
208 ViewSyncCommitCertificate(ViewSyncCommitCertificate<TYPES>),
210
211 ViewSyncFinalizeCertificate(ViewSyncFinalizeCertificate<TYPES>),
213
214 TimeoutVote(TimeoutVote<TYPES>),
216
217 UpgradeProposal(Proposal<TYPES, UpgradeProposal<TYPES>>),
219
220 UpgradeVote(UpgradeVote<TYPES>),
222
223 ProposalRequested(
225 ProposalRequestPayload<TYPES>,
226 <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
227 ),
228
229 ProposalResponse(Proposal<TYPES, QuorumProposal<TYPES>>),
231
232 Proposal2(Proposal<TYPES, QuorumProposal2<TYPES>>),
234
235 Vote2(QuorumVote2<TYPES>),
237
238 EpochRootQuorumVote(EpochRootQuorumVote<TYPES>),
240
241 ProposalResponse2(Proposal<TYPES, QuorumProposal2<TYPES>>),
243
244 HighQc(
246 QuorumCertificate2<TYPES>,
247 Option<NextEpochQuorumCertificate2<TYPES>>,
248 ),
249
250 ExtendedQc(
252 QuorumCertificate2<TYPES>,
253 NextEpochQuorumCertificate2<TYPES>,
254 ),
255
256 EpochRootQc(EpochRootQuorumCertificate<TYPES>),
258
259 ViewSyncPreCommitVote2(ViewSyncPreCommitVote2<TYPES>),
261
262 ViewSyncCommitVote2(ViewSyncCommitVote2<TYPES>),
264
265 ViewSyncFinalizeVote2(ViewSyncFinalizeVote2<TYPES>),
267
268 ViewSyncPreCommitCertificate2(ViewSyncPreCommitCertificate2<TYPES>),
270
271 ViewSyncCommitCertificate2(ViewSyncCommitCertificate2<TYPES>),
273
274 ViewSyncFinalizeCertificate2(ViewSyncFinalizeCertificate2<TYPES>),
276
277 TimeoutVote2(TimeoutVote2<TYPES>),
279}
280
281#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
282#[serde(bound(deserialize = "", serialize = ""))]
283pub enum DaConsensusMessage<TYPES: NodeType> {
285 DaProposal(Proposal<TYPES, DaProposal<TYPES>>),
287
288 DaVote(DaVote<TYPES>),
290
291 DaCertificate(DaCertificate<TYPES>),
293
294 VidDisperseMsg(Proposal<TYPES, ADVZDisperseShare<TYPES>>),
298
299 DaProposal2(Proposal<TYPES, DaProposal2<TYPES>>),
301
302 DaVote2(DaVote2<TYPES>),
304
305 DaCertificate2(DaCertificate2<TYPES>),
307
308 VidDisperseMsg2(Proposal<TYPES, VidDisperseShare2<TYPES>>),
312}
313
314#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
316#[serde(bound(deserialize = "", serialize = ""))]
317#[allow(clippy::large_enum_variant)]
318pub enum SequencingMessage<TYPES: NodeType> {
319 General(GeneralConsensusMessage<TYPES>),
321
322 Da(DaConsensusMessage<TYPES>),
324}
325
326impl<TYPES: NodeType> SequencingMessage<TYPES> {
327 fn view_number(&self) -> TYPES::View {
329 match &self {
330 SequencingMessage::General(general_message) => {
331 match general_message {
332 GeneralConsensusMessage::Proposal(p) => {
333 p.data.view_number()
336 },
337 GeneralConsensusMessage::Proposal2(p) => {
338 p.data.view_number()
341 },
342 GeneralConsensusMessage::ProposalRequested(req, _) => req.view_number,
343 GeneralConsensusMessage::ProposalResponse(proposal) => {
344 proposal.data.view_number()
345 },
346 GeneralConsensusMessage::ProposalResponse2(proposal) => {
347 proposal.data.view_number()
348 },
349 GeneralConsensusMessage::Vote(vote_message) => vote_message.view_number(),
350 GeneralConsensusMessage::Vote2(vote_message) => vote_message.view_number(),
351 GeneralConsensusMessage::TimeoutVote(message) => message.view_number(),
352 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
353 message.view_number()
354 },
355 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.view_number(),
356 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.view_number(),
357 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
358 message.view_number()
359 },
360 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => {
361 message.view_number()
362 },
363 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
364 message.view_number()
365 },
366 GeneralConsensusMessage::TimeoutVote2(message) => message.view_number(),
367 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => {
368 message.view_number()
369 },
370 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.view_number(),
371 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => {
372 message.view_number()
373 },
374 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
375 message.view_number()
376 },
377 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => {
378 message.view_number()
379 },
380 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
381 message.view_number()
382 },
383 GeneralConsensusMessage::UpgradeProposal(message) => message.data.view_number(),
384 GeneralConsensusMessage::UpgradeVote(message) => message.view_number(),
385 GeneralConsensusMessage::HighQc(qc, _)
386 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.view_number(),
387 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.view_number(),
388 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.view_number(),
389 }
390 },
391 SequencingMessage::Da(da_message) => {
392 match da_message {
393 DaConsensusMessage::DaProposal(p) => {
394 p.data.view_number()
397 },
398 DaConsensusMessage::DaVote(vote_message) => vote_message.view_number(),
399 DaConsensusMessage::DaCertificate(cert) => cert.view_number,
400 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.view_number(),
401 DaConsensusMessage::DaProposal2(p) => {
402 p.data.view_number()
405 },
406 DaConsensusMessage::DaVote2(vote_message) => vote_message.view_number(),
407 DaConsensusMessage::DaCertificate2(cert) => cert.view_number,
408 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.view_number(),
409 }
410 },
411 }
412 }
413
414 fn epoch_number(&self) -> Option<TYPES::Epoch> {
416 match &self {
417 SequencingMessage::General(general_message) => {
418 match general_message {
419 GeneralConsensusMessage::Proposal(p) => {
420 p.data.epoch()
423 },
424 GeneralConsensusMessage::Proposal2(p) => {
425 p.data.epoch()
428 },
429 GeneralConsensusMessage::ProposalRequested(..) => None,
430 GeneralConsensusMessage::ProposalResponse(proposal) => proposal.data.epoch(),
431 GeneralConsensusMessage::ProposalResponse2(proposal) => proposal.data.epoch(),
432 GeneralConsensusMessage::Vote(vote_message) => vote_message.epoch(),
433 GeneralConsensusMessage::Vote2(vote_message) => vote_message.epoch(),
434 GeneralConsensusMessage::TimeoutVote(message) => message.epoch(),
435 GeneralConsensusMessage::ViewSyncPreCommitVote(message) => message.epoch(),
436 GeneralConsensusMessage::ViewSyncCommitVote(message) => message.epoch(),
437 GeneralConsensusMessage::ViewSyncFinalizeVote(message) => message.epoch(),
438 GeneralConsensusMessage::ViewSyncPreCommitCertificate(message) => {
439 message.epoch()
440 },
441 GeneralConsensusMessage::ViewSyncCommitCertificate(message) => message.epoch(),
442 GeneralConsensusMessage::ViewSyncFinalizeCertificate(message) => {
443 message.epoch()
444 },
445 GeneralConsensusMessage::TimeoutVote2(message) => message.epoch(),
446 GeneralConsensusMessage::ViewSyncPreCommitVote2(message) => message.epoch(),
447 GeneralConsensusMessage::ViewSyncCommitVote2(message) => message.epoch(),
448 GeneralConsensusMessage::ViewSyncFinalizeVote2(message) => message.epoch(),
449 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(message) => {
450 message.epoch()
451 },
452 GeneralConsensusMessage::ViewSyncCommitCertificate2(message) => message.epoch(),
453 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(message) => {
454 message.epoch()
455 },
456 GeneralConsensusMessage::UpgradeProposal(message) => message.data.epoch(),
457 GeneralConsensusMessage::UpgradeVote(message) => message.epoch(),
458 GeneralConsensusMessage::HighQc(qc, _)
459 | GeneralConsensusMessage::ExtendedQc(qc, _) => qc.epoch(),
460 GeneralConsensusMessage::EpochRootQuorumVote(vote) => vote.epoch(),
461 GeneralConsensusMessage::EpochRootQc(root_qc) => root_qc.epoch(),
462 }
463 },
464 SequencingMessage::Da(da_message) => {
465 match da_message {
466 DaConsensusMessage::DaProposal(p) => {
467 p.data.epoch()
470 },
471 DaConsensusMessage::DaVote(vote_message) => vote_message.epoch(),
472 DaConsensusMessage::DaCertificate(cert) => cert.epoch(),
473 DaConsensusMessage::VidDisperseMsg(disperse) => disperse.data.epoch(),
474 DaConsensusMessage::VidDisperseMsg2(disperse) => disperse.data.epoch(),
475 DaConsensusMessage::DaProposal2(p) => {
476 p.data.epoch()
479 },
480 DaConsensusMessage::DaVote2(vote_message) => vote_message.epoch(),
481 DaConsensusMessage::DaCertificate2(cert) => cert.epoch(),
482 }
483 },
484 }
485 }
486}
487
488#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
489#[serde(bound(deserialize = ""))]
490#[allow(clippy::large_enum_variant)]
491pub enum DataMessage<TYPES: NodeType> {
494 SubmitTransaction(TYPES::Transaction, TYPES::View),
498 RequestData(DataRequest<TYPES>),
500 DataResponse(ResponseMessage<TYPES>),
502}
503
504#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
505#[serde(bound(deserialize = ""))]
506pub struct Proposal<
508 TYPES: NodeType,
509 PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
510> {
511 pub data: PROPOSAL,
515 pub signature: <TYPES::SignatureKey as SignatureKey>::PureAssembledSignatureType,
517 pub _pd: PhantomData<TYPES>,
519}
520
521pub fn convert_proposal<TYPES, PROPOSAL, PROPOSAL2>(
523 proposal: Proposal<TYPES, PROPOSAL>,
524) -> Proposal<TYPES, PROPOSAL2>
525where
526 TYPES: NodeType,
527 PROPOSAL: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned,
528 PROPOSAL2: HasViewNumber<TYPES> + HasEpoch<TYPES> + DeserializeOwned + From<PROPOSAL>,
529{
530 Proposal {
531 data: proposal.data.into(),
532 signature: proposal.signature,
533 _pd: proposal._pd,
534 }
535}
536
537impl<TYPES> Proposal<TYPES, QuorumProposal<TYPES>>
538where
539 TYPES: NodeType,
540{
541 pub async fn validate_signature<V: Versions>(
545 &self,
546 membership: &TYPES::Membership,
547 _epoch_height: u64,
548 upgrade_lock: &UpgradeLock<TYPES, V>,
549 ) -> Result<()> {
550 let view_number = self.data.view_number();
551 let view_leader_key = membership.leader(view_number, None)?;
552 let proposed_leaf = Leaf::from_quorum_proposal(&self.data);
553
554 ensure!(
555 view_leader_key.validate(
556 &self.signature,
557 proposed_leaf.commit(upgrade_lock).await.as_ref()
558 ),
559 "Proposal signature is invalid."
560 );
561
562 Ok(())
563 }
564}
565
566impl<TYPES> Proposal<TYPES, QuorumProposalWrapper<TYPES>>
567where
568 TYPES: NodeType,
569{
570 pub async fn validate_signature(&self, membership: &EpochMembership<TYPES>) -> Result<()> {
574 let view_number = self.data.proposal.view_number();
575 let view_leader_key = membership.leader(view_number).await?;
576 let proposed_leaf = Leaf2::from_quorum_proposal(&self.data);
577
578 ensure!(
579 view_leader_key.validate(&self.signature, proposed_leaf.commit().as_ref()),
580 "Proposal signature is invalid."
581 );
582
583 Ok(())
584 }
585}
586
587#[derive(Clone, Debug)]
588pub struct UpgradeLock<TYPES: NodeType, V: Versions> {
590 pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
592
593 pub _pd: PhantomData<V>,
595}
596
597impl<TYPES: NodeType, V: Versions> UpgradeLock<TYPES, V> {
598 #[allow(clippy::new_without_default)]
599 pub fn new() -> Self {
601 Self {
602 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
603 _pd: PhantomData::<V>,
604 }
605 }
606
607 #[allow(clippy::new_without_default)]
608 pub fn from_certificate(certificate: &Option<UpgradeCertificate<TYPES>>) -> Self {
610 Self {
611 decided_upgrade_certificate: Arc::new(RwLock::new(certificate.clone())),
612 _pd: PhantomData::<V>,
613 }
614 }
615
616 pub async fn upgrade_view(&self) -> Option<TYPES::View> {
617 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
618 upgrade_certificate
619 .as_ref()
620 .map(|cert| cert.data.new_version_first_view)
621 }
622
623 pub async fn version(&self, view: TYPES::View) -> Result<Version> {
628 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
629
630 let version = match *upgrade_certificate {
631 Some(ref cert) => {
632 if view >= cert.data.new_version_first_view {
633 if cert.data.new_version == V::Upgrade::VERSION {
634 V::Upgrade::VERSION
635 } else {
636 bail!("The network has upgraded to a new version that we do not support!");
637 }
638 } else {
639 V::Base::VERSION
640 }
641 },
642 None => V::Base::VERSION,
643 };
644
645 Ok(version)
646 }
647
648 pub async fn version_infallible(&self, view: TYPES::View) -> Version {
652 let upgrade_certificate = self.decided_upgrade_certificate.read().await;
653
654 match *upgrade_certificate {
655 Some(ref cert) => {
656 if view >= cert.data.new_version_first_view {
657 cert.data.new_version
658 } else {
659 cert.data.old_version
660 }
661 },
662 None => V::Base::VERSION,
663 }
664 }
665
666 pub async fn epochs_enabled(&self, view: TYPES::View) -> bool {
668 self.version_infallible(view).await >= V::Epochs::VERSION
669 }
670
671 pub async fn serialize<M: HasViewNumber<TYPES> + Serialize>(
677 &self,
678 message: &M,
679 ) -> Result<Vec<u8>> {
680 let view = message.view_number();
681
682 let version = self.version(view).await?;
683
684 let serialized_message = match version {
685 v if v == V::Base::VERSION => Serializer::<V::Base>::serialize(&message),
687 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::serialize(&message),
688 v => {
689 bail!("Attempted to serialize with version {}, which is incompatible. This should be impossible.", v);
690 },
691 };
692
693 serialized_message
694 .wrap()
695 .context(info!("Failed to serialize message!"))
696 }
697
698 pub async fn deserialize<M: HasViewNumber<TYPES> + for<'a> Deserialize<'a>>(
704 &self,
705 message: &[u8],
706 ) -> Result<M> {
707 let actual_version = Version::deserialize(message)
708 .wrap()
709 .context(info!("Failed to read message version!"))?
710 .0;
711
712 let deserialized_message: M = match actual_version {
713 v if v == V::Base::VERSION => Serializer::<V::Base>::deserialize(message),
714 v if v == V::Upgrade::VERSION => Serializer::<V::Upgrade>::deserialize(message),
715 v => {
716 bail!("Cannot deserialize message with stated version {}", v);
717 },
718 }
719 .wrap()
720 .context(info!("Failed to deserialize message!"))?;
721
722 let view = deserialized_message.view_number();
723
724 let expected_version = self.version(view).await?;
725
726 if actual_version != expected_version {
727 return Err(error!(format!("Message has invalid version number for its view. Expected: {expected_version}, Actual: {actual_version}, View: {view:?}")));
728 };
729
730 Ok(deserialized_message)
731 }
732}