1use std::{
8 collections::{BTreeMap, HashMap},
9 hash::{DefaultHasher, Hash},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17 consensus::OuterConsensus,
18 data::{VidDisperse, VidDisperseShare},
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, HotShotAction},
21 message::{
22 convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23 MessageKind, Proposal, SequencingMessage, UpgradeLock,
24 },
25 simple_vote::HasEpoch,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 network::{
29 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30 ViewMessage,
31 },
32 node_implementation::{ConsensusTime, NodeType, Versions},
33 storage::Storage,
34 },
35 vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42 events::{HotShotEvent, HotShotTaskCompleted},
43 helpers::broadcast_event,
44};
45
46#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49 pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52 pub external_event_stream: Sender<Event<TYPES>>,
54
55 pub public_key: TYPES::SignatureKey,
57
58 pub upgrade_lock: UpgradeLock<TYPES, V>,
60
61 pub id: u64,
63}
64
65impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
66 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67 pub async fn handle_message(&mut self, message: Message<TYPES>) {
69 match &message.kind {
70 MessageKind::Consensus(_) => {
71 tracing::debug!("Received consensus message from network: {:?}", message)
72 },
73 MessageKind::Data(_) => {
74 tracing::trace!("Received data message from network: {:?}", message)
75 },
76 MessageKind::External(_) => {
77 tracing::trace!("Received external message from network: {:?}", message)
78 },
79 }
80
81 let sender = message.sender;
83 match message.kind {
84 MessageKind::Consensus(consensus_message) => {
86 let event = match consensus_message {
87 SequencingMessage::General(general_message) => match general_message {
88 GeneralConsensusMessage::Proposal(proposal) => {
89 if self
90 .upgrade_lock
91 .epochs_enabled(proposal.data.view_number())
92 .await
93 {
94 tracing::warn!(
95 "received GeneralConsensusMessage::Proposal for view {} but \
96 epochs are enabled for that view",
97 proposal.data.view_number()
98 );
99 return;
100 }
101 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102 },
103 GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104 if !self
105 .upgrade_lock
106 .proposal2_legacy_version(proposal.data.view_number())
107 .await
108 {
109 tracing::warn!(
110 "received GeneralConsensusMessage::Proposal2Legacy for view \
111 {} but we are in the wrong version for that message type",
112 proposal.data.view_number()
113 );
114 return;
115 }
116 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
117 },
118 GeneralConsensusMessage::Proposal2(proposal) => {
119 if !self
120 .upgrade_lock
121 .proposal2_version(proposal.data.view_number())
122 .await
123 {
124 tracing::warn!(
125 "received GeneralConsensusMessage::Proposal2 for view {} but \
126 we are in the wrong version for that message type",
127 proposal.data.view_number()
128 );
129 return;
130 }
131 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
132 },
133 GeneralConsensusMessage::ProposalRequested(req, sig) => {
134 HotShotEvent::QuorumProposalRequestRecv(req, sig)
135 },
136 GeneralConsensusMessage::ProposalResponse(proposal) => {
137 if self
138 .upgrade_lock
139 .epochs_enabled(proposal.data.view_number())
140 .await
141 {
142 tracing::warn!(
143 "received GeneralConsensusMessage::ProposalResponse for view \
144 {} but we are in the wrong version for that message type",
145 proposal.data.view_number()
146 );
147 return;
148 }
149 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
150 },
151 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
152 if !self
153 .upgrade_lock
154 .proposal2_legacy_version(proposal.data.view_number())
155 .await
156 {
157 tracing::warn!(
158 "received GeneralConsensusMessage::ProposalResponse2Legacy \
159 for view {} but we are in the wrong version for that message \
160 type",
161 proposal.data.view_number()
162 );
163 return;
164 }
165 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
166 },
167 GeneralConsensusMessage::ProposalResponse2(proposal) => {
168 if !self
169 .upgrade_lock
170 .proposal2_version(proposal.data.view_number())
171 .await
172 {
173 tracing::warn!(
174 "received GeneralConsensusMessage::ProposalResponse2 for view \
175 {} but epochs are not enabled for that view",
176 proposal.data.view_number()
177 );
178 return;
179 }
180 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
181 },
182 GeneralConsensusMessage::Vote(vote) => {
183 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
184 tracing::warn!(
185 "received GeneralConsensusMessage::Vote for view {} but \
186 epochs are enabled for that view",
187 vote.view_number()
188 );
189 return;
190 }
191 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
192 },
193 GeneralConsensusMessage::Vote2(vote) => {
194 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
195 tracing::warn!(
196 "received GeneralConsensusMessage::Vote2 for view {} but \
197 epochs are not enabled for that view",
198 vote.view_number()
199 );
200 return;
201 }
202 HotShotEvent::QuorumVoteRecv(vote)
203 },
204 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
205 if self
206 .upgrade_lock
207 .epochs_enabled(view_sync_message.view_number())
208 .await
209 {
210 tracing::warn!(
211 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
212 view {} but epochs are enabled for that view",
213 view_sync_message.view_number()
214 );
215 return;
216 }
217 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
218 },
219 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
220 if !self
221 .upgrade_lock
222 .epochs_enabled(view_sync_message.view_number())
223 .await
224 {
225 tracing::warn!(
226 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
227 view {} but epochs are not enabled for that view",
228 view_sync_message.view_number()
229 );
230 return;
231 }
232 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
233 },
234 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
235 view_sync_message,
236 ) => {
237 if self
238 .upgrade_lock
239 .epochs_enabled(view_sync_message.view_number())
240 .await
241 {
242 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
243 return;
244 }
245 HotShotEvent::ViewSyncPreCommitCertificateRecv(
246 view_sync_message.to_vsc2(),
247 )
248 },
249 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
250 view_sync_message,
251 ) => {
252 if !self
253 .upgrade_lock
254 .epochs_enabled(view_sync_message.view_number())
255 .await
256 {
257 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
258 return;
259 }
260 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
261 },
262 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
263 if self
264 .upgrade_lock
265 .epochs_enabled(view_sync_message.view_number())
266 .await
267 {
268 tracing::warn!(
269 "received GeneralConsensusMessage::ViewSyncCommitVote for \
270 view {} but epochs are enabled for that view",
271 view_sync_message.view_number()
272 );
273 return;
274 }
275 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
276 },
277 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
278 if !self
279 .upgrade_lock
280 .epochs_enabled(view_sync_message.view_number())
281 .await
282 {
283 tracing::warn!(
284 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
285 view {} but epochs are not enabled for that view",
286 view_sync_message.view_number()
287 );
288 return;
289 }
290 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
291 },
292 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
293 if self
294 .upgrade_lock
295 .epochs_enabled(view_sync_message.view_number())
296 .await
297 {
298 tracing::warn!(
299 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
300 for view {} but epochs are enabled for that view",
301 view_sync_message.view_number()
302 );
303 return;
304 }
305 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
306 },
307 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
308 if !self
309 .upgrade_lock
310 .epochs_enabled(view_sync_message.view_number())
311 .await
312 {
313 tracing::warn!(
314 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
315 for view {} but epochs are not enabled for that view",
316 view_sync_message.view_number()
317 );
318 return;
319 }
320 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
321 },
322 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
323 if self
324 .upgrade_lock
325 .epochs_enabled(view_sync_message.view_number())
326 .await
327 {
328 tracing::warn!(
329 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
330 view {} but epochs are enabled for that view",
331 view_sync_message.view_number()
332 );
333 return;
334 }
335 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
336 },
337 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
338 if !self
339 .upgrade_lock
340 .epochs_enabled(view_sync_message.view_number())
341 .await
342 {
343 tracing::warn!(
344 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
345 view {} but epochs are not enabled for that view",
346 view_sync_message.view_number()
347 );
348 return;
349 }
350 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
351 },
352 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
353 if self
354 .upgrade_lock
355 .epochs_enabled(view_sync_message.view_number())
356 .await
357 {
358 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
359 return;
360 }
361 HotShotEvent::ViewSyncFinalizeCertificateRecv(
362 view_sync_message.to_vsc2(),
363 )
364 },
365 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
366 view_sync_message,
367 ) => {
368 if !self
369 .upgrade_lock
370 .epochs_enabled(view_sync_message.view_number())
371 .await
372 {
373 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
374 return;
375 }
376 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
377 },
378 GeneralConsensusMessage::TimeoutVote(message) => {
379 if self
380 .upgrade_lock
381 .epochs_enabled(message.view_number())
382 .await
383 {
384 tracing::warn!(
385 "received GeneralConsensusMessage::TimeoutVote for view {} \
386 but epochs are enabled for that view",
387 message.view_number()
388 );
389 return;
390 }
391 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
392 },
393 GeneralConsensusMessage::TimeoutVote2(message) => {
394 if !self
395 .upgrade_lock
396 .epochs_enabled(message.view_number())
397 .await
398 {
399 tracing::warn!(
400 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
401 but epochs are not enabled for that view",
402 message.view_number()
403 );
404 return;
405 }
406 HotShotEvent::TimeoutVoteRecv(message)
407 },
408 GeneralConsensusMessage::UpgradeProposal(message) => {
409 HotShotEvent::UpgradeProposalRecv(message, sender)
410 },
411 GeneralConsensusMessage::UpgradeVote(message) => {
412 tracing::error!("Received upgrade vote!");
413 HotShotEvent::UpgradeVoteRecv(message)
414 },
415 GeneralConsensusMessage::HighQc(qc, next_qc) => {
416 HotShotEvent::HighQcRecv(qc, next_qc, sender)
417 },
418 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
419 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
420 },
421 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
422 if !self
423 .upgrade_lock
424 .proposal2_legacy_version(vote.view_number())
425 .await
426 {
427 tracing::warn!(
428 "received GeneralConsensusMessage::EpochRootQuorumVote for \
429 view {} but we do not expect this message in this version",
430 vote.view_number()
431 );
432 return;
433 }
434 HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
435 },
436 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
437 if !self
438 .upgrade_lock
439 .proposal2_version(vote.view_number())
440 .await
441 {
442 tracing::warn!(
443 "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
444 view {} but we do not expect this message in this version",
445 vote.view_number()
446 );
447 return;
448 }
449 HotShotEvent::EpochRootQuorumVoteRecv(vote)
450 },
451 GeneralConsensusMessage::EpochRootQc(root_qc) => {
452 if !self
453 .upgrade_lock
454 .proposal2_version(root_qc.view_number())
455 .await
456 {
457 tracing::warn!(
458 "received GeneralConsensusMessage::EpochRootQc for view {} \
459 but we are in the wrong version for that message types",
460 root_qc.view_number()
461 );
462 return;
463 }
464 HotShotEvent::EpochRootQcRecv(root_qc, sender)
465 },
466 GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
467 if !self
468 .upgrade_lock
469 .proposal2_legacy_version(root_qc.view_number())
470 .await
471 {
472 tracing::warn!(
473 "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
474 but we are in the wrong version for that message type",
475 root_qc.view_number()
476 );
477 return;
478 }
479 HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
480 },
481 },
482 SequencingMessage::Da(da_message) => match da_message {
483 DaConsensusMessage::DaProposal(proposal) => {
484 if self
485 .upgrade_lock
486 .epochs_enabled(proposal.data.view_number())
487 .await
488 {
489 tracing::warn!(
490 "received DaConsensusMessage::DaProposal for view {} but \
491 epochs are enabled for that view",
492 proposal.data.view_number()
493 );
494 return;
495 }
496 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
497 },
498 DaConsensusMessage::DaProposal2(proposal) => {
499 if !self
500 .upgrade_lock
501 .epochs_enabled(proposal.data.view_number())
502 .await
503 {
504 tracing::warn!(
505 "received DaConsensusMessage::DaProposal2 for view {} but \
506 epochs are not enabled for that view",
507 proposal.data.view_number()
508 );
509 return;
510 }
511 HotShotEvent::DaProposalRecv(proposal, sender)
512 },
513 DaConsensusMessage::DaVote(vote) => {
514 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
515 tracing::warn!(
516 "received DaConsensusMessage::DaVote for view {} but epochs \
517 are enabled for that view",
518 vote.view_number()
519 );
520 return;
521 }
522 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
523 },
524 DaConsensusMessage::DaVote2(vote) => {
525 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
526 tracing::warn!(
527 "received DaConsensusMessage::DaVote2 for view {} but epochs \
528 are not enabled for that view",
529 vote.view_number()
530 );
531 return;
532 }
533 HotShotEvent::DaVoteRecv(vote.clone())
534 },
535 DaConsensusMessage::DaCertificate(cert) => {
536 if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
537 tracing::warn!(
538 "received DaConsensusMessage::DaCertificate for view {} but \
539 epochs are enabled for that view",
540 cert.view_number()
541 );
542 return;
543 }
544 HotShotEvent::DaCertificateRecv(cert.to_dac2())
545 },
546 DaConsensusMessage::DaCertificate2(cert) => {
547 if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
548 tracing::warn!(
549 "received DaConsensusMessage::DaCertificate2 for view {} but \
550 epochs are not enabled for that view",
551 cert.view_number()
552 );
553 return;
554 }
555 HotShotEvent::DaCertificateRecv(cert)
556 },
557 DaConsensusMessage::VidDisperseMsg(proposal) => {
558 if self
559 .upgrade_lock
560 .epochs_enabled(proposal.data.view_number())
561 .await
562 {
563 tracing::warn!(
564 "received DaConsensusMessage::VidDisperseMsg for view {} but \
565 epochs are enabled for that view",
566 proposal.data.view_number()
567 );
568 return;
569 }
570 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
571 },
572 DaConsensusMessage::VidDisperseMsg1(proposal) => {
573 if !self
574 .upgrade_lock
575 .epochs_enabled(proposal.data.view_number())
576 .await
577 {
578 tracing::warn!(
579 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
580 epochs are not enabled for that view",
581 proposal.data.view_number()
582 );
583 return;
584 }
585 if self
586 .upgrade_lock
587 .upgraded_vid2(proposal.data.view_number())
588 .await
589 {
590 tracing::warn!(
591 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
592 vid2 upgrade is enabled for that view",
593 proposal.data.view_number()
594 );
595 return;
596 }
597 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
598 },
599 DaConsensusMessage::VidDisperseMsg2(proposal) => {
600 if !self
601 .upgrade_lock
602 .upgraded_vid2(proposal.data.view_number())
603 .await
604 {
605 tracing::warn!(
606 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
607 vid2 upgrade is not enabled for that view",
608 proposal.data.view_number()
609 );
610 return;
611 }
612 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
613 },
614 },
615 };
616 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
617 },
618
619 MessageKind::Data(message) => match message {
621 DataMessage::SubmitTransaction(transaction, _) => {
622 let mut hasher = DefaultHasher::new();
623 transaction.hash(&mut hasher);
624 broadcast_event(
625 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
626 &self.internal_event_stream,
627 )
628 .await;
629 },
630 DataMessage::DataResponse(response) => {
631 if let ResponseMessage::Found(message) = response {
632 match message {
633 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
634 broadcast_event(
635 Arc::new(HotShotEvent::VidResponseRecv(
636 sender,
637 convert_proposal(proposal),
638 )),
639 &self.internal_event_stream,
640 )
641 .await;
642 },
643 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
644 proposal,
645 )) => {
646 broadcast_event(
647 Arc::new(HotShotEvent::VidResponseRecv(
648 sender,
649 convert_proposal(proposal),
650 )),
651 &self.internal_event_stream,
652 )
653 .await;
654 },
655 _ => {},
656 }
657 }
658 },
659 DataMessage::RequestData(data) => {
660 let req_data = data.clone();
661 if let RequestKind::Vid(_view_number, _key) = req_data.request {
662 broadcast_event(
663 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
664 &self.internal_event_stream,
665 )
666 .await;
667 }
668 },
669 },
670
671 MessageKind::External(data) => {
673 if sender == self.public_key {
674 return;
675 }
676 broadcast_event(
678 Event {
679 view_number: TYPES::View::new(1),
680 event: EventType::ExternalMessageReceived { sender, data },
681 },
682 &self.external_event_stream,
683 )
684 .await;
685 },
686 }
687 }
688}
689
690pub struct NetworkEventTaskState<
692 TYPES: NodeType,
693 V: Versions,
694 NET: ConnectedNetwork<TYPES::SignatureKey>,
695 S: Storage<TYPES>,
696> {
697 pub network: Arc<NET>,
699
700 pub view: TYPES::View,
702
703 pub epoch: Option<TYPES::Epoch>,
705
706 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
708
709 pub storage: S,
711
712 pub storage_metrics: Arc<StorageMetricsValue>,
714
715 pub consensus: OuterConsensus<TYPES>,
717
718 pub upgrade_lock: UpgradeLock<TYPES, V>,
720
721 pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
723
724 pub epoch_height: u64,
726
727 pub id: u64,
729}
730
731#[async_trait]
732impl<
733 TYPES: NodeType,
734 V: Versions,
735 NET: ConnectedNetwork<TYPES::SignatureKey>,
736 S: Storage<TYPES> + 'static,
737 > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
738{
739 type Event = HotShotEvent<TYPES>;
740
741 async fn handle_event(
742 &mut self,
743 event: Arc<Self::Event>,
744 _sender: &Sender<Arc<Self::Event>>,
745 _receiver: &Receiver<Arc<Self::Event>>,
746 ) -> Result<()> {
747 self.handle(event).await;
748
749 Ok(())
750 }
751
752 fn cancel_subtasks(&mut self) {}
753}
754
755impl<
756 TYPES: NodeType,
757 V: Versions,
758 NET: ConnectedNetwork<TYPES::SignatureKey>,
759 S: Storage<TYPES> + 'static,
760 > NetworkEventTaskState<TYPES, V, NET, S>
761{
762 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
766 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
767 let mut maybe_action = None;
768 if let Some((sender, message_kind, transmit)) =
769 self.parse_event(event, &mut maybe_action).await
770 {
771 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
772 .await;
773 };
774 }
775
776 async fn handle_vid_disperse_proposal(
778 &self,
779 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
780 sender: &<TYPES as NodeType>::SignatureKey,
781 ) -> Option<HotShotTaskCompleted> {
782 let view = vid_proposal.data.view_number();
783 let epoch = vid_proposal.data.epoch();
784 let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
785 let mut messages = HashMap::new();
786
787 for proposal in vid_share_proposals {
788 let recipient = proposal.data.recipient_key().clone();
789 let epochs_enabled = self
790 .upgrade_lock
791 .epochs_enabled(proposal.data.view_number())
792 .await;
793 let upgraded_vid2 = self
794 .upgrade_lock
795 .upgraded_vid2(proposal.data.view_number())
796 .await;
797 let message = if !epochs_enabled {
798 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
799 Proposal {
800 data,
801 signature: proposal.signature,
802 _pd: proposal._pd,
803 }
804 } else {
805 tracing::warn!(
806 "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
807 proposal.data.view_number()
808 );
809 return None;
810 };
811 Message {
812 sender: sender.clone(),
813 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
814 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
815 )),
816 }
817 } else if !upgraded_vid2 {
818 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
819 Proposal {
820 data,
821 signature: proposal.signature,
822 _pd: proposal._pd,
823 }
824 } else {
825 tracing::warn!(
826 "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
827 receive VidDisperseShare2",
828 proposal.data.view_number()
829 );
830 return None;
831 };
832 Message {
833 sender: sender.clone(),
834 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
835 DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
836 )),
837 }
838 } else {
839 let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
840 Proposal {
841 data,
842 signature: proposal.signature,
843 _pd: proposal._pd,
844 }
845 } else {
846 tracing::warn!(
847 "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
848 proposal.data.view_number()
849 );
850 return None;
851 };
852 Message {
853 sender: sender.clone(),
854 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
855 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
856 )),
857 }
858 };
859 let view = message.view_number();
860 let serialized_message = match self.upgrade_lock.serialize(&message).await {
861 Ok(serialized) => serialized,
862 Err(e) => {
863 tracing::error!("Failed to serialize message: {e}");
864 continue;
865 },
866 };
867
868 messages.insert(recipient, (view.u64().into(), serialized_message));
869 }
870
871 let net = Arc::clone(&self.network);
872 let storage = self.storage.clone();
873 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
874 spawn(async move {
875 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
876 Some(HotShotAction::VidDisperse),
877 storage,
878 consensus,
879 view,
880 epoch,
881 )
882 .await
883 .is_err()
884 {
885 return;
886 }
887 match net.vid_broadcast_message(messages).await {
888 Ok(()) => {},
889 Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
890 }
891 });
892
893 None
894 }
895
896 async fn maybe_record_action(
898 maybe_action: Option<HotShotAction>,
899 storage: S,
900 consensus: OuterConsensus<TYPES>,
901 view: <TYPES as NodeType>::View,
902 epoch: Option<<TYPES as NodeType>::Epoch>,
903 ) -> std::result::Result<(), ()> {
904 if let Some(mut action) = maybe_action {
905 if !consensus.write().await.update_action(action, view) {
906 return Err(());
907 }
908 if matches!(action, HotShotAction::ViewSyncVote) {
911 action = HotShotAction::Vote;
912 }
913 match storage.record_action(view, epoch, action).await {
914 Ok(()) => Ok(()),
915 Err(e) => {
916 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
917 Err(())
918 },
919 }
920 } else {
921 Ok(())
922 }
923 }
924
925 pub fn cancel_tasks(&mut self, view: TYPES::View) {
927 let keep = self.transmit_tasks.split_off(&view);
928
929 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
930 for task in tasks {
931 task.abort();
932 }
933 }
934
935 self.transmit_tasks = keep;
936 }
937
938 #[allow(clippy::too_many_lines)]
943 async fn parse_event(
944 &mut self,
945 event: Arc<HotShotEvent<TYPES>>,
946 maybe_action: &mut Option<HotShotAction>,
947 ) -> Option<(
948 <TYPES as NodeType>::SignatureKey,
949 MessageKind<TYPES>,
950 TransmitType<TYPES>,
951 )> {
952 match event.as_ref().clone() {
953 HotShotEvent::QuorumProposalSend(proposal, sender) => {
954 *maybe_action = Some(HotShotAction::Propose);
955
956 let message = if self
957 .upgrade_lock
958 .proposal2_version(proposal.data.view_number())
959 .await
960 {
961 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
962 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
963 ))
964 } else if self
965 .upgrade_lock
966 .proposal2_legacy_version(proposal.data.view_number())
967 .await
968 {
969 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
970 GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
971 ))
972 } else {
973 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
974 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
975 ))
976 };
977
978 Some((sender, message, TransmitType::Broadcast))
979 },
980
981 HotShotEvent::QuorumVoteSend(vote) => {
983 *maybe_action = Some(HotShotAction::Vote);
984 let view_number = vote.view_number() + 1;
985 let leader = match self
986 .membership_coordinator
987 .membership_for_epoch(vote.epoch())
988 .await
989 .ok()?
990 .leader(view_number)
991 .await
992 {
993 Ok(l) => l,
994 Err(e) => {
995 tracing::warn!(
996 "Failed to calculate leader for view number {view_number}. Error: \
997 {e:?}"
998 );
999 return None;
1000 },
1001 };
1002
1003 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1004 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1005 GeneralConsensusMessage::Vote2(vote.clone()),
1006 ))
1007 } else {
1008 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1009 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1010 ))
1011 };
1012
1013 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1014 },
1015 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
1016 *maybe_action = Some(HotShotAction::Vote);
1017 let view_number = vote.view_number() + 1;
1018 let leader = match self
1019 .membership_coordinator
1020 .membership_for_epoch(vote.epoch())
1021 .await
1022 .ok()?
1023 .leader(view_number)
1024 .await
1025 {
1026 Ok(l) => l,
1027 Err(e) => {
1028 tracing::warn!(
1029 "Failed to calculate leader for view number {:?}. Error: {:?}",
1030 view_number,
1031 e
1032 );
1033 return None;
1034 },
1035 };
1036
1037 let message = if self
1038 .upgrade_lock
1039 .proposal2_version(vote.view_number())
1040 .await
1041 {
1042 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1043 GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
1044 ))
1045 } else if self
1046 .upgrade_lock
1047 .proposal2_legacy_version(vote.view_number())
1048 .await
1049 {
1050 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1051 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
1052 ))
1053 } else {
1054 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1055 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1056 ))
1057 };
1058
1059 Some((
1060 vote.vote.signing_key(),
1061 message,
1062 TransmitType::Direct(leader),
1063 ))
1064 },
1065 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1066 *maybe_action = Some(HotShotAction::Vote);
1067 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1068 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1069 GeneralConsensusMessage::Vote2(vote.clone()),
1070 ))
1071 } else {
1072 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1073 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1074 ))
1075 };
1076
1077 Some((vote.signing_key(), message, TransmitType::Broadcast))
1078 },
1079 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1080 req.key.clone(),
1081 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1082 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1083 )),
1084 TransmitType::Broadcast,
1085 )),
1086 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1087 let message = if self
1088 .upgrade_lock
1089 .proposal2_version(proposal.data.view_number())
1090 .await
1091 {
1092 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1093 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1094 ))
1095 } else if self
1096 .upgrade_lock
1097 .proposal2_legacy_version(proposal.data.view_number())
1098 .await
1099 {
1100 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1101 GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1102 proposal,
1103 )),
1104 ))
1105 } else {
1106 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1107 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1108 ))
1109 };
1110
1111 Some((
1112 sender_key.clone(),
1113 message,
1114 TransmitType::Direct(sender_key),
1115 ))
1116 },
1117 HotShotEvent::VidDisperseSend(proposal, sender) => {
1118 self.handle_vid_disperse_proposal(proposal, &sender).await;
1119 None
1120 },
1121 HotShotEvent::DaProposalSend(proposal, sender) => {
1122 *maybe_action = Some(HotShotAction::DaPropose);
1123
1124 let message = if self
1125 .upgrade_lock
1126 .epochs_enabled(proposal.data.view_number())
1127 .await
1128 {
1129 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1130 DaConsensusMessage::DaProposal2(proposal),
1131 ))
1132 } else {
1133 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1134 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1135 ))
1136 };
1137
1138 Some((sender, message, TransmitType::DaCommitteeBroadcast))
1139 },
1140 HotShotEvent::DaVoteSend(vote) => {
1141 *maybe_action = Some(HotShotAction::DaVote);
1142 let view_number = vote.view_number();
1143 let leader = match self
1144 .membership_coordinator
1145 .membership_for_epoch(vote.epoch())
1146 .await
1147 .ok()?
1148 .leader(view_number)
1149 .await
1150 {
1151 Ok(l) => l,
1152 Err(e) => {
1153 tracing::warn!(
1154 "Failed to calculate leader for view number {view_number}. Error: \
1155 {e:?}"
1156 );
1157 return None;
1158 },
1159 };
1160
1161 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1162 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1163 DaConsensusMessage::DaVote2(vote.clone()),
1164 ))
1165 } else {
1166 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1167 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1168 ))
1169 };
1170
1171 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1172 },
1173 HotShotEvent::DacSend(certificate, sender) => {
1174 *maybe_action = Some(HotShotAction::DaCert);
1175 let message = if self
1176 .upgrade_lock
1177 .epochs_enabled(certificate.view_number())
1178 .await
1179 {
1180 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1181 DaConsensusMessage::DaCertificate2(certificate),
1182 ))
1183 } else {
1184 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1185 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1186 ))
1187 };
1188
1189 Some((sender, message, TransmitType::Broadcast))
1190 },
1191 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1192 let view_number = vote.view_number() + vote.date().relay;
1193 let leader = match self
1194 .membership_coordinator
1195 .membership_for_epoch(self.epoch)
1196 .await
1197 .ok()?
1198 .leader(view_number)
1199 .await
1200 {
1201 Ok(l) => l,
1202 Err(e) => {
1203 tracing::warn!(
1204 "Failed to calculate leader for view number {view_number}. Error: \
1205 {e:?}"
1206 );
1207 return None;
1208 },
1209 };
1210 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1211 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1212 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1213 ))
1214 } else {
1215 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1216 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1217 ))
1218 };
1219
1220 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1221 },
1222 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1223 *maybe_action = Some(HotShotAction::ViewSyncVote);
1224 let view_number = vote.view_number() + vote.date().relay;
1225 let leader = match self
1226 .membership_coordinator
1227 .membership_for_epoch(self.epoch)
1228 .await
1229 .ok()?
1230 .leader(view_number)
1231 .await
1232 {
1233 Ok(l) => l,
1234 Err(e) => {
1235 tracing::warn!(
1236 "Failed to calculate leader for view number {view_number}. Error: \
1237 {e:?}"
1238 );
1239 return None;
1240 },
1241 };
1242 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1243 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1244 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1245 ))
1246 } else {
1247 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1248 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1249 ))
1250 };
1251
1252 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1253 },
1254 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1255 *maybe_action = Some(HotShotAction::ViewSyncVote);
1256 let view_number = vote.view_number() + vote.date().relay;
1257 let leader = match self
1258 .membership_coordinator
1259 .membership_for_epoch(self.epoch)
1260 .await
1261 .ok()?
1262 .leader(view_number)
1263 .await
1264 {
1265 Ok(l) => l,
1266 Err(e) => {
1267 tracing::warn!(
1268 "Failed to calculate leader for view number {view_number:?}. Error: \
1269 {e:?}"
1270 );
1271 return None;
1272 },
1273 };
1274 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1275 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1276 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1277 ))
1278 } else {
1279 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1280 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1281 ))
1282 };
1283
1284 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1285 },
1286 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1287 let view_number = certificate.view_number();
1288 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1289 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1290 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1291 ))
1292 } else {
1293 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1294 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1295 ))
1296 };
1297
1298 Some((sender, message, TransmitType::Broadcast))
1299 },
1300 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1301 let view_number = certificate.view_number();
1302 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1303 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1304 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1305 ))
1306 } else {
1307 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1308 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1309 ))
1310 };
1311
1312 Some((sender, message, TransmitType::Broadcast))
1313 },
1314 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1315 let view_number = certificate.view_number();
1316 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1317 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1318 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1319 ))
1320 } else {
1321 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1322 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1323 ))
1324 };
1325
1326 Some((sender, message, TransmitType::Broadcast))
1327 },
1328 HotShotEvent::TimeoutVoteSend(vote) => {
1329 *maybe_action = Some(HotShotAction::TimeoutVote);
1330 let view_number = vote.view_number() + 1;
1331 let leader = match self
1332 .membership_coordinator
1333 .membership_for_epoch(self.epoch)
1334 .await
1335 .ok()?
1336 .leader(view_number)
1337 .await
1338 {
1339 Ok(l) => l,
1340 Err(e) => {
1341 tracing::warn!(
1342 "Failed to calculate leader for view number {view_number}. Error: \
1343 {e:?}"
1344 );
1345 return None;
1346 },
1347 };
1348 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1349 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1350 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1351 ))
1352 } else {
1353 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1354 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1355 ))
1356 };
1357
1358 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1359 },
1360 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1361 sender,
1362 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1363 GeneralConsensusMessage::UpgradeProposal(proposal),
1364 )),
1365 TransmitType::Broadcast,
1366 )),
1367 HotShotEvent::UpgradeVoteSend(vote) => {
1368 tracing::error!("Sending upgrade vote!");
1369 let view_number = vote.view_number();
1370 let leader = match self
1371 .membership_coordinator
1372 .membership_for_epoch(self.epoch)
1373 .await
1374 .ok()?
1375 .leader(view_number)
1376 .await
1377 {
1378 Ok(l) => l,
1379 Err(e) => {
1380 tracing::warn!(
1381 "Failed to calculate leader for view number {view_number}. Error: \
1382 {e:?}"
1383 );
1384 return None;
1385 },
1386 };
1387 Some((
1388 vote.signing_key(),
1389 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1390 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1391 )),
1392 TransmitType::Direct(leader),
1393 ))
1394 },
1395 HotShotEvent::ViewChange(view, epoch) => {
1396 self.view = view;
1397 if epoch > self.epoch {
1398 self.epoch = epoch;
1399 }
1400 let keep_view = TYPES::View::new(view.saturating_sub(1));
1401 self.cancel_tasks(keep_view);
1402 let net = Arc::clone(&self.network);
1403 let epoch = self.epoch.map(|x| x.u64().into());
1404 let membership_coordinator = self.membership_coordinator.clone();
1405 spawn(async move {
1406 net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1407 .await;
1408 });
1409 None
1410 },
1411 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1412 sender,
1413 MessageKind::Data(DataMessage::RequestData(req)),
1414 TransmitType::Direct(to),
1415 )),
1416 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1417 let epochs_enabled = self
1418 .upgrade_lock
1419 .epochs_enabled(proposal.data.view_number())
1420 .await;
1421 let upgraded_vid2 = self
1422 .upgrade_lock
1423 .upgraded_vid2(proposal.data.view_number())
1424 .await;
1425 let message = match proposal.data {
1426 VidDisperseShare::V0(data) => {
1427 if epochs_enabled {
1428 tracing::warn!(
1429 "Epochs are enabled for view {} but still receive \
1430 VidDisperseShare0",
1431 data.view_number()
1432 );
1433 return None;
1434 }
1435 let vid_share_proposal = Proposal {
1436 data,
1437 signature: proposal.signature,
1438 _pd: proposal._pd,
1439 };
1440 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1441 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1442 vid_share_proposal,
1443 )),
1444 )))
1445 },
1446 VidDisperseShare::V1(data) => {
1447 if !epochs_enabled {
1448 tracing::warn!(
1449 "Epochs are enabled for view {} but didn't receive \
1450 VidDisperseShare0",
1451 data.view_number()
1452 );
1453 return None;
1454 }
1455 if upgraded_vid2 {
1456 tracing::warn!(
1457 "VID2 upgrade is enabled for view {} but didn't receive \
1458 VidDisperseShare2",
1459 data.view_number()
1460 );
1461 return None;
1462 }
1463 let vid_share_proposal = Proposal {
1464 data,
1465 signature: proposal.signature,
1466 _pd: proposal._pd,
1467 };
1468 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1469 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1470 vid_share_proposal,
1471 )),
1472 )))
1473 },
1474 VidDisperseShare::V2(data) => {
1475 if !upgraded_vid2 {
1476 tracing::warn!(
1477 "VID2 upgrade is not enabled for view {} but receive \
1478 VidDisperseShare2",
1479 data.view_number()
1480 );
1481 return None;
1482 }
1483 let vid_share_proposal = Proposal {
1484 data,
1485 signature: proposal.signature,
1486 _pd: proposal._pd,
1487 };
1488 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1489 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1490 vid_share_proposal,
1491 )),
1492 )))
1493 },
1494 };
1495 Some((sender, message, TransmitType::Direct(to)))
1496 },
1497 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1498 sender,
1499 MessageKind::Consensus(SequencingMessage::General(
1500 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1501 )),
1502 TransmitType::Direct(leader),
1503 )),
1504 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1505 let message = if self
1506 .upgrade_lock
1507 .proposal2_version(epoch_root_qc.view_number())
1508 .await
1509 {
1510 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1511 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1512 ))
1513 } else {
1514 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1515 GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1516 ))
1517 };
1518
1519 Some((sender, message, TransmitType::Direct(leader)))
1520 },
1521 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1522 sender,
1523 MessageKind::Consensus(SequencingMessage::General(
1524 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1525 )),
1526 TransmitType::Broadcast,
1527 )),
1528 _ => None,
1529 }
1530 }
1531
1532 async fn spawn_transmit_task(
1534 &mut self,
1535 message_kind: MessageKind<TYPES>,
1536 maybe_action: Option<HotShotAction>,
1537 transmit: TransmitType<TYPES>,
1538 sender: TYPES::SignatureKey,
1539 ) {
1540 let broadcast_delay = match &message_kind {
1541 MessageKind::Consensus(
1542 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1543 | SequencingMessage::Da(_),
1544 ) => BroadcastDelay::View(*message_kind.view_number()),
1545 _ => BroadcastDelay::None,
1546 };
1547 let message = Message {
1548 sender,
1549 kind: message_kind,
1550 };
1551 let view_number = message.kind.view_number();
1552 let epoch = message.kind.epoch();
1553 let committee_topic = Topic::Global;
1554 let Ok(mem) = self
1555 .membership_coordinator
1556 .stake_table_for_epoch(self.epoch)
1557 .await
1558 else {
1559 return;
1560 };
1561 let da_committee = mem.da_committee_members(view_number).await;
1562 let network = Arc::clone(&self.network);
1563 let storage = self.storage.clone();
1564 let storage_metrics = Arc::clone(&self.storage_metrics);
1565 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1566 let upgrade_lock = self.upgrade_lock.clone();
1567 let handle = spawn(async move {
1568 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1569 maybe_action,
1570 storage.clone(),
1571 consensus,
1572 view_number,
1573 epoch,
1574 )
1575 .await
1576 .is_err()
1577 {
1578 return;
1579 }
1580 if let MessageKind::Consensus(SequencingMessage::General(
1581 GeneralConsensusMessage::Proposal(prop),
1582 )) = &message.kind
1583 {
1584 let now = Instant::now();
1585 if storage
1586 .append_proposal2(&convert_proposal(prop.clone()))
1587 .await
1588 .is_err()
1589 {
1590 return;
1591 }
1592 storage_metrics
1593 .append_quorum_duration
1594 .add_point(now.elapsed().as_secs_f64());
1595 }
1596
1597 let serialized_message = match upgrade_lock.serialize(&message).await {
1598 Ok(serialized) => serialized,
1599 Err(e) => {
1600 tracing::error!("Failed to serialize message: {e}");
1601 return;
1602 },
1603 };
1604
1605 let transmit_result = match transmit {
1606 TransmitType::Direct(recipient) => {
1607 network
1608 .direct_message(view_number.u64().into(), serialized_message, recipient)
1609 .await
1610 },
1611 TransmitType::Broadcast => {
1612 network
1613 .broadcast_message(
1614 view_number.u64().into(),
1615 serialized_message,
1616 committee_topic,
1617 broadcast_delay,
1618 )
1619 .await
1620 },
1621 TransmitType::DaCommitteeBroadcast => {
1622 network
1623 .da_broadcast_message(
1624 view_number.u64().into(),
1625 serialized_message,
1626 da_committee.iter().cloned().collect(),
1627 broadcast_delay,
1628 )
1629 .await
1630 },
1631 };
1632
1633 match transmit_result {
1634 Ok(()) => {},
1635 Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1636 }
1637 });
1638 self.transmit_tasks
1639 .entry(view_number)
1640 .or_default()
1641 .push(handle);
1642 }
1643}
1644
1645pub mod test {
1647 use std::ops::{Deref, DerefMut};
1648
1649 use async_trait::async_trait;
1650
1651 use super::{
1652 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1653 Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1654 };
1655
/// Closure type stored in [`NetworkEventTaskStateModifier`] and invoked on each parsed
/// outgoing message before it is handed to `spawn_transmit_task`.
///
/// The closure receives mutable references to the sender key, the message kind and the
/// transmit type, plus a shared reference to the membership.
pub type ModifierClosure<TYPES> = dyn Fn(
        &mut <TYPES as NodeType>::SignatureKey,
        &mut MessageKind<TYPES>,
        &mut TransmitType<TYPES>,
        &<TYPES as NodeType>::Membership,
    ) + Send
    + Sync;
1665
1666 pub struct NetworkEventTaskStateModifier<
1668 TYPES: NodeType,
1669 V: Versions,
1670 NET: ConnectedNetwork<TYPES::SignatureKey>,
1671 S: Storage<TYPES>,
1672 > {
1673 pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1675 pub modifier: Arc<ModifierClosure<TYPES>>,
1678 }
1679
1680 impl<
1681 TYPES: NodeType,
1682 V: Versions,
1683 NET: ConnectedNetwork<TYPES::SignatureKey>,
1684 S: Storage<TYPES> + 'static,
1685 > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1686 {
1687 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1689 let mut maybe_action = None;
1690 if let Some((mut sender, mut message_kind, mut transmit)) =
1691 self.parse_event(event, &mut maybe_action).await
1692 {
1693 (self.modifier)(
1695 &mut sender,
1696 &mut message_kind,
1697 &mut transmit,
1698 &*self.membership_coordinator.membership().read().await,
1699 );
1700
1701 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1702 .await;
1703 }
1704 }
1705 }
1706
1707 #[async_trait]
1708 impl<
1709 TYPES: NodeType,
1710 V: Versions,
1711 NET: ConnectedNetwork<TYPES::SignatureKey>,
1712 S: Storage<TYPES> + 'static,
1713 > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1714 {
1715 type Event = HotShotEvent<TYPES>;
1716
1717 async fn handle_event(
1718 &mut self,
1719 event: Arc<Self::Event>,
1720 _sender: &Sender<Arc<Self::Event>>,
1721 _receiver: &Receiver<Arc<Self::Event>>,
1722 ) -> Result<()> {
1723 self.handle(event).await;
1724
1725 Ok(())
1726 }
1727
1728 fn cancel_subtasks(&mut self) {}
1729 }
1730
1731 impl<
1732 TYPES: NodeType,
1733 V: Versions,
1734 NET: ConnectedNetwork<TYPES::SignatureKey>,
1735 S: Storage<TYPES>,
1736 > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1737 {
1738 type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1739
1740 fn deref(&self) -> &Self::Target {
1741 &self.network_event_task_state
1742 }
1743 }
1744
1745 impl<
1746 TYPES: NodeType,
1747 V: Versions,
1748 NET: ConnectedNetwork<TYPES::SignatureKey>,
1749 S: Storage<TYPES>,
1750 > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1751 {
1752 fn deref_mut(&mut self) -> &mut Self::Target {
1753 &mut self.network_event_task_state
1754 }
1755 }
1756}