1use std::{
8 collections::{BTreeMap, HashMap},
9 hash::{DefaultHasher, Hash},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17 consensus::OuterConsensus,
18 data::{VidDisperse, VidDisperseShare},
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, HotShotAction},
21 message::{
22 convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23 MessageKind, Proposal, SequencingMessage, UpgradeLock,
24 },
25 simple_vote::HasEpoch,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 network::{
29 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30 ViewMessage,
31 },
32 node_implementation::{ConsensusTime, NodeType, Versions},
33 storage::Storage,
34 },
35 vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42 events::{HotShotEvent, HotShotTaskCompleted},
43 helpers::broadcast_event,
44};
45
46#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49 pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52 pub external_event_stream: Sender<Event<TYPES>>,
54
55 pub public_key: TYPES::SignatureKey,
57
58 pub upgrade_lock: UpgradeLock<TYPES, V>,
60
61 pub id: u64,
63}
64
65impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
66 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67 pub async fn handle_message(&mut self, message: Message<TYPES>) {
69 match &message.kind {
70 MessageKind::Consensus(_) => {
71 tracing::debug!("Received consensus message from network: {:?}", message)
72 },
73 MessageKind::Data(_) => {
74 tracing::trace!("Received data message from network: {:?}", message)
75 },
76 MessageKind::External(_) => {
77 tracing::trace!("Received external message from network: {:?}", message)
78 },
79 }
80
81 let sender = message.sender;
83 match message.kind {
84 MessageKind::Consensus(consensus_message) => {
86 let event = match consensus_message {
87 SequencingMessage::General(general_message) => match general_message {
88 GeneralConsensusMessage::Proposal(proposal) => {
89 if self
90 .upgrade_lock
91 .epochs_enabled(proposal.data.view_number())
92 .await
93 {
94 tracing::warn!(
95 "received GeneralConsensusMessage::Proposal for view {} but \
96 epochs are enabled for that view",
97 proposal.data.view_number()
98 );
99 return;
100 }
101 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102 },
103 GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104 if !self
105 .upgrade_lock
106 .proposal2_legacy_version(proposal.data.view_number())
107 .await
108 {
109 tracing::warn!(
110 "received GeneralConsensusMessage::Proposal2Legacy for view \
111 {} but we are in the wrong version for that message type",
112 proposal.data.view_number()
113 );
114 return;
115 }
116 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
117 },
118 GeneralConsensusMessage::Proposal2(proposal) => {
119 if !self
120 .upgrade_lock
121 .proposal2_version(proposal.data.view_number())
122 .await
123 {
124 tracing::warn!(
125 "received GeneralConsensusMessage::Proposal2 for view {} but \
126 we are in the wrong version for that message type",
127 proposal.data.view_number()
128 );
129 return;
130 }
131 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
132 },
133 GeneralConsensusMessage::ProposalRequested(req, sig) => {
134 HotShotEvent::QuorumProposalRequestRecv(req, sig)
135 },
136 GeneralConsensusMessage::ProposalResponse(proposal) => {
137 if self
138 .upgrade_lock
139 .epochs_enabled(proposal.data.view_number())
140 .await
141 {
142 tracing::warn!(
143 "received GeneralConsensusMessage::ProposalResponse for view \
144 {} but we are in the wrong version for that message type",
145 proposal.data.view_number()
146 );
147 return;
148 }
149 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
150 },
151 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
152 if !self
153 .upgrade_lock
154 .proposal2_legacy_version(proposal.data.view_number())
155 .await
156 {
157 tracing::warn!(
158 "received GeneralConsensusMessage::ProposalResponse2Legacy \
159 for view {} but we are in the wrong version for that message \
160 type",
161 proposal.data.view_number()
162 );
163 return;
164 }
165 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
166 },
167 GeneralConsensusMessage::ProposalResponse2(proposal) => {
168 if !self
169 .upgrade_lock
170 .proposal2_version(proposal.data.view_number())
171 .await
172 {
173 tracing::warn!(
174 "received GeneralConsensusMessage::ProposalResponse2 for view \
175 {} but epochs are not enabled for that view",
176 proposal.data.view_number()
177 );
178 return;
179 }
180 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
181 },
182 GeneralConsensusMessage::Vote(vote) => {
183 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
184 tracing::warn!(
185 "received GeneralConsensusMessage::Vote for view {} but \
186 epochs are enabled for that view",
187 vote.view_number()
188 );
189 return;
190 }
191 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
192 },
193 GeneralConsensusMessage::Vote2(vote) => {
194 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
195 tracing::warn!(
196 "received GeneralConsensusMessage::Vote2 for view {} but \
197 epochs are not enabled for that view",
198 vote.view_number()
199 );
200 return;
201 }
202 HotShotEvent::QuorumVoteRecv(vote)
203 },
204 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
205 if self
206 .upgrade_lock
207 .epochs_enabled(view_sync_message.view_number())
208 .await
209 {
210 tracing::warn!(
211 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
212 view {} but epochs are enabled for that view",
213 view_sync_message.view_number()
214 );
215 return;
216 }
217 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
218 },
219 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
220 if !self
221 .upgrade_lock
222 .epochs_enabled(view_sync_message.view_number())
223 .await
224 {
225 tracing::warn!(
226 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
227 view {} but epochs are not enabled for that view",
228 view_sync_message.view_number()
229 );
230 return;
231 }
232 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
233 },
234 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
235 view_sync_message,
236 ) => {
237 if self
238 .upgrade_lock
239 .epochs_enabled(view_sync_message.view_number())
240 .await
241 {
242 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
243 return;
244 }
245 HotShotEvent::ViewSyncPreCommitCertificateRecv(
246 view_sync_message.to_vsc2(),
247 )
248 },
249 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
250 view_sync_message,
251 ) => {
252 if !self
253 .upgrade_lock
254 .epochs_enabled(view_sync_message.view_number())
255 .await
256 {
257 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
258 return;
259 }
260 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
261 },
262 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
263 if self
264 .upgrade_lock
265 .epochs_enabled(view_sync_message.view_number())
266 .await
267 {
268 tracing::warn!(
269 "received GeneralConsensusMessage::ViewSyncCommitVote for \
270 view {} but epochs are enabled for that view",
271 view_sync_message.view_number()
272 );
273 return;
274 }
275 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
276 },
277 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
278 if !self
279 .upgrade_lock
280 .epochs_enabled(view_sync_message.view_number())
281 .await
282 {
283 tracing::warn!(
284 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
285 view {} but epochs are not enabled for that view",
286 view_sync_message.view_number()
287 );
288 return;
289 }
290 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
291 },
292 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
293 if self
294 .upgrade_lock
295 .epochs_enabled(view_sync_message.view_number())
296 .await
297 {
298 tracing::warn!(
299 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
300 for view {} but epochs are enabled for that view",
301 view_sync_message.view_number()
302 );
303 return;
304 }
305 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
306 },
307 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
308 if !self
309 .upgrade_lock
310 .epochs_enabled(view_sync_message.view_number())
311 .await
312 {
313 tracing::warn!(
314 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
315 for view {} but epochs are not enabled for that view",
316 view_sync_message.view_number()
317 );
318 return;
319 }
320 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
321 },
322 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
323 if self
324 .upgrade_lock
325 .epochs_enabled(view_sync_message.view_number())
326 .await
327 {
328 tracing::warn!(
329 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
330 view {} but epochs are enabled for that view",
331 view_sync_message.view_number()
332 );
333 return;
334 }
335 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
336 },
337 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
338 if !self
339 .upgrade_lock
340 .epochs_enabled(view_sync_message.view_number())
341 .await
342 {
343 tracing::warn!(
344 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
345 view {} but epochs are not enabled for that view",
346 view_sync_message.view_number()
347 );
348 return;
349 }
350 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
351 },
352 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
353 if self
354 .upgrade_lock
355 .epochs_enabled(view_sync_message.view_number())
356 .await
357 {
358 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
359 return;
360 }
361 HotShotEvent::ViewSyncFinalizeCertificateRecv(
362 view_sync_message.to_vsc2(),
363 )
364 },
365 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
366 view_sync_message,
367 ) => {
368 if !self
369 .upgrade_lock
370 .epochs_enabled(view_sync_message.view_number())
371 .await
372 {
373 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
374 return;
375 }
376 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
377 },
378 GeneralConsensusMessage::TimeoutVote(message) => {
379 if self
380 .upgrade_lock
381 .epochs_enabled(message.view_number())
382 .await
383 {
384 tracing::warn!(
385 "received GeneralConsensusMessage::TimeoutVote for view {} \
386 but epochs are enabled for that view",
387 message.view_number()
388 );
389 return;
390 }
391 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
392 },
393 GeneralConsensusMessage::TimeoutVote2(message) => {
394 if !self
395 .upgrade_lock
396 .epochs_enabled(message.view_number())
397 .await
398 {
399 tracing::warn!(
400 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
401 but epochs are not enabled for that view",
402 message.view_number()
403 );
404 return;
405 }
406 HotShotEvent::TimeoutVoteRecv(message)
407 },
408 GeneralConsensusMessage::UpgradeProposal(message) => {
409 HotShotEvent::UpgradeProposalRecv(message, sender)
410 },
411 GeneralConsensusMessage::UpgradeVote(message) => {
412 tracing::error!("Received upgrade vote!");
413 HotShotEvent::UpgradeVoteRecv(message)
414 },
415 GeneralConsensusMessage::HighQc(qc, next_qc) => {
416 HotShotEvent::HighQcRecv(qc, next_qc, sender)
417 },
418 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
419 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
420 },
421 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
422 if !self
423 .upgrade_lock
424 .proposal2_legacy_version(vote.view_number())
425 .await
426 {
427 tracing::warn!(
428 "received GeneralConsensusMessage::EpochRootQuorumVote for \
429 view {} but we do not expect this message in this version",
430 vote.view_number()
431 );
432 return;
433 }
434 HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
435 },
436 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
437 if !self
438 .upgrade_lock
439 .proposal2_version(vote.view_number())
440 .await
441 {
442 tracing::warn!(
443 "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
444 view {} but we do not expect this message in this version",
445 vote.view_number()
446 );
447 return;
448 }
449 HotShotEvent::EpochRootQuorumVoteRecv(vote)
450 },
451 GeneralConsensusMessage::EpochRootQc(root_qc) => {
452 if !self
453 .upgrade_lock
454 .proposal2_version(root_qc.view_number())
455 .await
456 {
457 tracing::warn!(
458 "received GeneralConsensusMessage::EpochRootQc for view {} \
459 but we are in the wrong version for that message types",
460 root_qc.view_number()
461 );
462 return;
463 }
464 HotShotEvent::EpochRootQcRecv(root_qc, sender)
465 },
466 GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
467 if !self
468 .upgrade_lock
469 .proposal2_legacy_version(root_qc.view_number())
470 .await
471 {
472 tracing::warn!(
473 "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
474 but we are in the wrong version for that message type",
475 root_qc.view_number()
476 );
477 return;
478 }
479 HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
480 },
481 },
482 SequencingMessage::Da(da_message) => match da_message {
483 DaConsensusMessage::DaProposal(proposal) => {
484 if self
485 .upgrade_lock
486 .epochs_enabled(proposal.data.view_number())
487 .await
488 {
489 tracing::warn!(
490 "received DaConsensusMessage::DaProposal for view {} but \
491 epochs are enabled for that view",
492 proposal.data.view_number()
493 );
494 return;
495 }
496 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
497 },
498 DaConsensusMessage::DaProposal2(proposal) => {
499 if !self
500 .upgrade_lock
501 .epochs_enabled(proposal.data.view_number())
502 .await
503 {
504 tracing::warn!(
505 "received DaConsensusMessage::DaProposal2 for view {} but \
506 epochs are not enabled for that view",
507 proposal.data.view_number()
508 );
509 return;
510 }
511 HotShotEvent::DaProposalRecv(proposal, sender)
512 },
513 DaConsensusMessage::DaVote(vote) => {
514 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
515 tracing::warn!(
516 "received DaConsensusMessage::DaVote for view {} but epochs \
517 are enabled for that view",
518 vote.view_number()
519 );
520 return;
521 }
522 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
523 },
524 DaConsensusMessage::DaVote2(vote) => {
525 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
526 tracing::warn!(
527 "received DaConsensusMessage::DaVote2 for view {} but epochs \
528 are not enabled for that view",
529 vote.view_number()
530 );
531 return;
532 }
533 HotShotEvent::DaVoteRecv(vote.clone())
534 },
535 DaConsensusMessage::DaCertificate(cert) => {
536 if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
537 tracing::warn!(
538 "received DaConsensusMessage::DaCertificate for view {} but \
539 epochs are enabled for that view",
540 cert.view_number()
541 );
542 return;
543 }
544 HotShotEvent::DaCertificateRecv(cert.to_dac2())
545 },
546 DaConsensusMessage::DaCertificate2(cert) => {
547 if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
548 tracing::warn!(
549 "received DaConsensusMessage::DaCertificate2 for view {} but \
550 epochs are not enabled for that view",
551 cert.view_number()
552 );
553 return;
554 }
555 HotShotEvent::DaCertificateRecv(cert)
556 },
557 DaConsensusMessage::VidDisperseMsg(proposal) => {
558 if self
559 .upgrade_lock
560 .epochs_enabled(proposal.data.view_number())
561 .await
562 {
563 tracing::warn!(
564 "received DaConsensusMessage::VidDisperseMsg for view {} but \
565 epochs are enabled for that view",
566 proposal.data.view_number()
567 );
568 return;
569 }
570 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
571 },
572 DaConsensusMessage::VidDisperseMsg2(proposal) => {
573 if !self
574 .upgrade_lock
575 .epochs_enabled(proposal.data.view_number())
576 .await
577 {
578 tracing::warn!(
579 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
580 epochs are not enabled for that view",
581 proposal.data.view_number()
582 );
583 return;
584 }
585 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
586 },
587 },
588 };
589 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
590 },
591
592 MessageKind::Data(message) => match message {
594 DataMessage::SubmitTransaction(transaction, _) => {
595 let mut hasher = DefaultHasher::new();
596 transaction.hash(&mut hasher);
597 broadcast_event(
598 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
599 &self.internal_event_stream,
600 )
601 .await;
602 },
603 DataMessage::DataResponse(response) => {
604 if let ResponseMessage::Found(message) = response {
605 match message {
606 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
607 broadcast_event(
608 Arc::new(HotShotEvent::VidResponseRecv(
609 sender,
610 convert_proposal(proposal),
611 )),
612 &self.internal_event_stream,
613 )
614 .await;
615 },
616 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
617 proposal,
618 )) => {
619 broadcast_event(
620 Arc::new(HotShotEvent::VidResponseRecv(
621 sender,
622 convert_proposal(proposal),
623 )),
624 &self.internal_event_stream,
625 )
626 .await;
627 },
628 _ => {},
629 }
630 }
631 },
632 DataMessage::RequestData(data) => {
633 let req_data = data.clone();
634 if let RequestKind::Vid(_view_number, _key) = req_data.request {
635 broadcast_event(
636 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
637 &self.internal_event_stream,
638 )
639 .await;
640 }
641 },
642 },
643
644 MessageKind::External(data) => {
646 if sender == self.public_key {
647 return;
648 }
649 broadcast_event(
651 Event {
652 view_number: TYPES::View::new(1),
653 event: EventType::ExternalMessageReceived { sender, data },
654 },
655 &self.external_event_stream,
656 )
657 .await;
658 },
659 }
660 }
661}
662
663pub struct NetworkEventTaskState<
665 TYPES: NodeType,
666 V: Versions,
667 NET: ConnectedNetwork<TYPES::SignatureKey>,
668 S: Storage<TYPES>,
669> {
670 pub network: Arc<NET>,
672
673 pub view: TYPES::View,
675
676 pub epoch: Option<TYPES::Epoch>,
678
679 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
681
682 pub storage: S,
684
685 pub storage_metrics: Arc<StorageMetricsValue>,
687
688 pub consensus: OuterConsensus<TYPES>,
690
691 pub upgrade_lock: UpgradeLock<TYPES, V>,
693
694 pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
696
697 pub epoch_height: u64,
699
700 pub id: u64,
702}
703
704#[async_trait]
705impl<
706 TYPES: NodeType,
707 V: Versions,
708 NET: ConnectedNetwork<TYPES::SignatureKey>,
709 S: Storage<TYPES> + 'static,
710 > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
711{
712 type Event = HotShotEvent<TYPES>;
713
714 async fn handle_event(
715 &mut self,
716 event: Arc<Self::Event>,
717 _sender: &Sender<Arc<Self::Event>>,
718 _receiver: &Receiver<Arc<Self::Event>>,
719 ) -> Result<()> {
720 self.handle(event).await;
721
722 Ok(())
723 }
724
725 fn cancel_subtasks(&mut self) {}
726}
727
728impl<
729 TYPES: NodeType,
730 V: Versions,
731 NET: ConnectedNetwork<TYPES::SignatureKey>,
732 S: Storage<TYPES> + 'static,
733 > NetworkEventTaskState<TYPES, V, NET, S>
734{
735 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
739 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
740 let mut maybe_action = None;
741 if let Some((sender, message_kind, transmit)) =
742 self.parse_event(event, &mut maybe_action).await
743 {
744 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
745 .await;
746 };
747 }
748
749 async fn handle_vid_disperse_proposal(
751 &self,
752 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
753 sender: &<TYPES as NodeType>::SignatureKey,
754 ) -> Option<HotShotTaskCompleted> {
755 let view = vid_proposal.data.view_number();
756 let epoch = vid_proposal.data.epoch();
757 let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
758 let mut messages = HashMap::new();
759
760 for proposal in vid_share_proposals {
761 let recipient = proposal.data.recipient_key().clone();
762 let message = if self
763 .upgrade_lock
764 .epochs_enabled(proposal.data.view_number())
765 .await
766 {
767 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
768 Proposal {
769 data,
770 signature: proposal.signature,
771 _pd: proposal._pd,
772 }
773 } else {
774 tracing::warn!(
775 "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
776 proposal.data.view_number()
777 );
778 return None;
779 };
780 Message {
781 sender: sender.clone(),
782 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
783 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
784 )),
785 }
786 } else {
787 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
788 Proposal {
789 data,
790 signature: proposal.signature,
791 _pd: proposal._pd,
792 }
793 } else {
794 tracing::warn!(
795 "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
796 proposal.data.view_number()
797 );
798 return None;
799 };
800 Message {
801 sender: sender.clone(),
802 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
803 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
804 )),
805 }
806 };
807 let serialized_message = match self.upgrade_lock.serialize(&message).await {
808 Ok(serialized) => serialized,
809 Err(e) => {
810 tracing::error!("Failed to serialize message: {e}");
811 continue;
812 },
813 };
814
815 messages.insert(recipient, serialized_message);
816 }
817
818 let net = Arc::clone(&self.network);
819 let storage = self.storage.clone();
820 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
821 spawn(async move {
822 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
823 Some(HotShotAction::VidDisperse),
824 storage,
825 consensus,
826 view,
827 epoch,
828 )
829 .await
830 .is_err()
831 {
832 return;
833 }
834 match net.vid_broadcast_message(messages).await {
835 Ok(()) => {},
836 Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
837 }
838 });
839
840 None
841 }
842
843 async fn maybe_record_action(
845 maybe_action: Option<HotShotAction>,
846 storage: S,
847 consensus: OuterConsensus<TYPES>,
848 view: <TYPES as NodeType>::View,
849 epoch: Option<<TYPES as NodeType>::Epoch>,
850 ) -> std::result::Result<(), ()> {
851 if let Some(mut action) = maybe_action {
852 if !consensus.write().await.update_action(action, view) {
853 return Err(());
854 }
855 if matches!(action, HotShotAction::ViewSyncVote) {
858 action = HotShotAction::Vote;
859 }
860 match storage.record_action(view, epoch, action).await {
861 Ok(()) => Ok(()),
862 Err(e) => {
863 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
864 Err(())
865 },
866 }
867 } else {
868 Ok(())
869 }
870 }
871
872 pub fn cancel_tasks(&mut self, view: TYPES::View) {
874 let keep = self.transmit_tasks.split_off(&view);
875
876 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
877 for task in tasks {
878 task.abort();
879 }
880 }
881
882 self.transmit_tasks = keep;
883 }
884
885 #[allow(clippy::too_many_lines)]
890 async fn parse_event(
891 &mut self,
892 event: Arc<HotShotEvent<TYPES>>,
893 maybe_action: &mut Option<HotShotAction>,
894 ) -> Option<(
895 <TYPES as NodeType>::SignatureKey,
896 MessageKind<TYPES>,
897 TransmitType<TYPES>,
898 )> {
899 match event.as_ref().clone() {
900 HotShotEvent::QuorumProposalSend(proposal, sender) => {
901 *maybe_action = Some(HotShotAction::Propose);
902
903 let message = if self
904 .upgrade_lock
905 .proposal2_version(proposal.data.view_number())
906 .await
907 {
908 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
909 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
910 ))
911 } else if self
912 .upgrade_lock
913 .proposal2_legacy_version(proposal.data.view_number())
914 .await
915 {
916 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
917 GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
918 ))
919 } else {
920 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
921 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
922 ))
923 };
924
925 Some((sender, message, TransmitType::Broadcast))
926 },
927
928 HotShotEvent::QuorumVoteSend(vote) => {
930 *maybe_action = Some(HotShotAction::Vote);
931 let view_number = vote.view_number() + 1;
932 let leader = match self
933 .membership_coordinator
934 .membership_for_epoch(vote.epoch())
935 .await
936 .ok()?
937 .leader(view_number)
938 .await
939 {
940 Ok(l) => l,
941 Err(e) => {
942 tracing::warn!(
943 "Failed to calculate leader for view number {view_number}. Error: \
944 {e:?}"
945 );
946 return None;
947 },
948 };
949
950 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
951 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
952 GeneralConsensusMessage::Vote2(vote.clone()),
953 ))
954 } else {
955 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
956 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
957 ))
958 };
959
960 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
961 },
962 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
963 *maybe_action = Some(HotShotAction::Vote);
964 let view_number = vote.view_number() + 1;
965 let leader = match self
966 .membership_coordinator
967 .membership_for_epoch(vote.epoch())
968 .await
969 .ok()?
970 .leader(view_number)
971 .await
972 {
973 Ok(l) => l,
974 Err(e) => {
975 tracing::warn!(
976 "Failed to calculate leader for view number {:?}. Error: {:?}",
977 view_number,
978 e
979 );
980 return None;
981 },
982 };
983
984 let message = if self
985 .upgrade_lock
986 .proposal2_version(vote.view_number())
987 .await
988 {
989 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
990 GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
991 ))
992 } else if self
993 .upgrade_lock
994 .proposal2_legacy_version(vote.view_number())
995 .await
996 {
997 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
998 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
999 ))
1000 } else {
1001 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1002 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1003 ))
1004 };
1005
1006 Some((
1007 vote.vote.signing_key(),
1008 message,
1009 TransmitType::Direct(leader),
1010 ))
1011 },
1012 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1013 *maybe_action = Some(HotShotAction::Vote);
1014 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1015 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1016 GeneralConsensusMessage::Vote2(vote.clone()),
1017 ))
1018 } else {
1019 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1020 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1021 ))
1022 };
1023
1024 Some((vote.signing_key(), message, TransmitType::Broadcast))
1025 },
1026 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1027 req.key.clone(),
1028 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1029 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1030 )),
1031 TransmitType::Broadcast,
1032 )),
1033 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1034 let message = if self
1035 .upgrade_lock
1036 .proposal2_version(proposal.data.view_number())
1037 .await
1038 {
1039 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1040 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1041 ))
1042 } else if self
1043 .upgrade_lock
1044 .proposal2_legacy_version(proposal.data.view_number())
1045 .await
1046 {
1047 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1048 GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1049 proposal,
1050 )),
1051 ))
1052 } else {
1053 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1054 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1055 ))
1056 };
1057
1058 Some((
1059 sender_key.clone(),
1060 message,
1061 TransmitType::Direct(sender_key),
1062 ))
1063 },
1064 HotShotEvent::VidDisperseSend(proposal, sender) => {
1065 self.handle_vid_disperse_proposal(proposal, &sender).await;
1066 None
1067 },
1068 HotShotEvent::DaProposalSend(proposal, sender) => {
1069 *maybe_action = Some(HotShotAction::DaPropose);
1070
1071 let message = if self
1072 .upgrade_lock
1073 .epochs_enabled(proposal.data.view_number())
1074 .await
1075 {
1076 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1077 DaConsensusMessage::DaProposal2(proposal),
1078 ))
1079 } else {
1080 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1081 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1082 ))
1083 };
1084
1085 Some((sender, message, TransmitType::DaCommitteeBroadcast))
1086 },
1087 HotShotEvent::DaVoteSend(vote) => {
1088 *maybe_action = Some(HotShotAction::DaVote);
1089 let view_number = vote.view_number();
1090 let leader = match self
1091 .membership_coordinator
1092 .membership_for_epoch(vote.epoch())
1093 .await
1094 .ok()?
1095 .leader(view_number)
1096 .await
1097 {
1098 Ok(l) => l,
1099 Err(e) => {
1100 tracing::warn!(
1101 "Failed to calculate leader for view number {view_number}. Error: \
1102 {e:?}"
1103 );
1104 return None;
1105 },
1106 };
1107
1108 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1109 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1110 DaConsensusMessage::DaVote2(vote.clone()),
1111 ))
1112 } else {
1113 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1114 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1115 ))
1116 };
1117
1118 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1119 },
1120 HotShotEvent::DacSend(certificate, sender) => {
1121 *maybe_action = Some(HotShotAction::DaCert);
1122 let message = if self
1123 .upgrade_lock
1124 .epochs_enabled(certificate.view_number())
1125 .await
1126 {
1127 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1128 DaConsensusMessage::DaCertificate2(certificate),
1129 ))
1130 } else {
1131 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1132 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1133 ))
1134 };
1135
1136 Some((sender, message, TransmitType::Broadcast))
1137 },
1138 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1139 let view_number = vote.view_number() + vote.date().relay;
1140 let leader = match self
1141 .membership_coordinator
1142 .membership_for_epoch(self.epoch)
1143 .await
1144 .ok()?
1145 .leader(view_number)
1146 .await
1147 {
1148 Ok(l) => l,
1149 Err(e) => {
1150 tracing::warn!(
1151 "Failed to calculate leader for view number {view_number}. Error: \
1152 {e:?}"
1153 );
1154 return None;
1155 },
1156 };
1157 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1158 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1159 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1160 ))
1161 } else {
1162 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1163 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1164 ))
1165 };
1166
1167 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1168 },
1169 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1170 *maybe_action = Some(HotShotAction::ViewSyncVote);
1171 let view_number = vote.view_number() + vote.date().relay;
1172 let leader = match self
1173 .membership_coordinator
1174 .membership_for_epoch(self.epoch)
1175 .await
1176 .ok()?
1177 .leader(view_number)
1178 .await
1179 {
1180 Ok(l) => l,
1181 Err(e) => {
1182 tracing::warn!(
1183 "Failed to calculate leader for view number {view_number}. Error: \
1184 {e:?}"
1185 );
1186 return None;
1187 },
1188 };
1189 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1190 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1191 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1192 ))
1193 } else {
1194 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1195 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1196 ))
1197 };
1198
1199 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1200 },
1201 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1202 *maybe_action = Some(HotShotAction::ViewSyncVote);
1203 let view_number = vote.view_number() + vote.date().relay;
1204 let leader = match self
1205 .membership_coordinator
1206 .membership_for_epoch(self.epoch)
1207 .await
1208 .ok()?
1209 .leader(view_number)
1210 .await
1211 {
1212 Ok(l) => l,
1213 Err(e) => {
1214 tracing::warn!(
1215 "Failed to calculate leader for view number {view_number:?}. Error: \
1216 {e:?}"
1217 );
1218 return None;
1219 },
1220 };
1221 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1222 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1223 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1224 ))
1225 } else {
1226 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1227 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1228 ))
1229 };
1230
1231 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1232 },
1233 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1234 let view_number = certificate.view_number();
1235 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1236 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1237 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1238 ))
1239 } else {
1240 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1241 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1242 ))
1243 };
1244
1245 Some((sender, message, TransmitType::Broadcast))
1246 },
1247 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1248 let view_number = certificate.view_number();
1249 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1250 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1251 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1252 ))
1253 } else {
1254 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1255 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1256 ))
1257 };
1258
1259 Some((sender, message, TransmitType::Broadcast))
1260 },
1261 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1262 let view_number = certificate.view_number();
1263 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1264 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1265 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1266 ))
1267 } else {
1268 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1269 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1270 ))
1271 };
1272
1273 Some((sender, message, TransmitType::Broadcast))
1274 },
1275 HotShotEvent::TimeoutVoteSend(vote) => {
1276 *maybe_action = Some(HotShotAction::TimeoutVote);
1277 let view_number = vote.view_number() + 1;
1278 let leader = match self
1279 .membership_coordinator
1280 .membership_for_epoch(self.epoch)
1281 .await
1282 .ok()?
1283 .leader(view_number)
1284 .await
1285 {
1286 Ok(l) => l,
1287 Err(e) => {
1288 tracing::warn!(
1289 "Failed to calculate leader for view number {view_number}. Error: \
1290 {e:?}"
1291 );
1292 return None;
1293 },
1294 };
1295 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1296 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1297 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1298 ))
1299 } else {
1300 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1301 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1302 ))
1303 };
1304
1305 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1306 },
1307 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1308 sender,
1309 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1310 GeneralConsensusMessage::UpgradeProposal(proposal),
1311 )),
1312 TransmitType::Broadcast,
1313 )),
1314 HotShotEvent::UpgradeVoteSend(vote) => {
1315 tracing::error!("Sending upgrade vote!");
1316 let view_number = vote.view_number();
1317 let leader = match self
1318 .membership_coordinator
1319 .membership_for_epoch(self.epoch)
1320 .await
1321 .ok()?
1322 .leader(view_number)
1323 .await
1324 {
1325 Ok(l) => l,
1326 Err(e) => {
1327 tracing::warn!(
1328 "Failed to calculate leader for view number {view_number}. Error: \
1329 {e:?}"
1330 );
1331 return None;
1332 },
1333 };
1334 Some((
1335 vote.signing_key(),
1336 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1337 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1338 )),
1339 TransmitType::Direct(leader),
1340 ))
1341 },
1342 HotShotEvent::ViewChange(view, epoch) => {
1343 self.view = view;
1344 if epoch > self.epoch {
1345 self.epoch = epoch;
1346 }
1347 let keep_view = TYPES::View::new(view.saturating_sub(1));
1348 self.cancel_tasks(keep_view);
1349 let net = Arc::clone(&self.network);
1350 let epoch = self.epoch.map(|x| x.u64());
1351 let membership_coordinator = self.membership_coordinator.clone();
1352 spawn(async move {
1353 net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1354 .await;
1355 });
1356 None
1357 },
1358 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1359 sender,
1360 MessageKind::Data(DataMessage::RequestData(req)),
1361 TransmitType::Direct(to),
1362 )),
1363 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1364 let epochs_enabled = self
1365 .upgrade_lock
1366 .epochs_enabled(proposal.data.view_number())
1367 .await;
1368 let message = match proposal.data {
1369 VidDisperseShare::V0(data) => {
1370 if epochs_enabled {
1371 tracing::warn!(
1372 "Epochs are enabled for view {} but didn't receive \
1373 VidDisperseShare2",
1374 data.view_number()
1375 );
1376 return None;
1377 }
1378 let vid_share_proposal = Proposal {
1379 data,
1380 signature: proposal.signature,
1381 _pd: proposal._pd,
1382 };
1383 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1384 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1385 vid_share_proposal,
1386 )),
1387 )))
1388 },
1389 VidDisperseShare::V1(data) => {
1390 if !epochs_enabled {
1391 tracing::warn!(
1392 "Epochs are enabled for view {} but didn't receive \
1393 ADVZDisperseShare",
1394 data.view_number()
1395 );
1396 return None;
1397 }
1398 let vid_share_proposal = Proposal {
1399 data,
1400 signature: proposal.signature,
1401 _pd: proposal._pd,
1402 };
1403 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1404 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1405 vid_share_proposal,
1406 )),
1407 )))
1408 },
1409 };
1410 Some((sender, message, TransmitType::Direct(to)))
1411 },
1412 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1413 sender,
1414 MessageKind::Consensus(SequencingMessage::General(
1415 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1416 )),
1417 TransmitType::Direct(leader),
1418 )),
1419 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1420 let message = if self
1421 .upgrade_lock
1422 .proposal2_version(epoch_root_qc.view_number())
1423 .await
1424 {
1425 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1426 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1427 ))
1428 } else {
1429 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1430 GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1431 ))
1432 };
1433
1434 Some((sender, message, TransmitType::Direct(leader)))
1435 },
1436 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1437 sender,
1438 MessageKind::Consensus(SequencingMessage::General(
1439 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1440 )),
1441 TransmitType::Broadcast,
1442 )),
1443 _ => None,
1444 }
1445 }
1446
1447 async fn spawn_transmit_task(
1449 &mut self,
1450 message_kind: MessageKind<TYPES>,
1451 maybe_action: Option<HotShotAction>,
1452 transmit: TransmitType<TYPES>,
1453 sender: TYPES::SignatureKey,
1454 ) {
1455 let broadcast_delay = match &message_kind {
1456 MessageKind::Consensus(
1457 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1458 | SequencingMessage::Da(_),
1459 ) => BroadcastDelay::View(*message_kind.view_number()),
1460 _ => BroadcastDelay::None,
1461 };
1462 let message = Message {
1463 sender,
1464 kind: message_kind,
1465 };
1466 let view_number = message.kind.view_number();
1467 let epoch = message.kind.epoch();
1468 let committee_topic = Topic::Global;
1469 let Ok(mem) = self
1470 .membership_coordinator
1471 .stake_table_for_epoch(self.epoch)
1472 .await
1473 else {
1474 return;
1475 };
1476 let da_committee = mem.da_committee_members(view_number).await;
1477 let network = Arc::clone(&self.network);
1478 let storage = self.storage.clone();
1479 let storage_metrics = Arc::clone(&self.storage_metrics);
1480 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1481 let upgrade_lock = self.upgrade_lock.clone();
1482 let handle = spawn(async move {
1483 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1484 maybe_action,
1485 storage.clone(),
1486 consensus,
1487 view_number,
1488 epoch,
1489 )
1490 .await
1491 .is_err()
1492 {
1493 return;
1494 }
1495 if let MessageKind::Consensus(SequencingMessage::General(
1496 GeneralConsensusMessage::Proposal(prop),
1497 )) = &message.kind
1498 {
1499 let now = Instant::now();
1500 if storage
1501 .append_proposal2(&convert_proposal(prop.clone()))
1502 .await
1503 .is_err()
1504 {
1505 return;
1506 }
1507 storage_metrics
1508 .append_quorum_duration
1509 .add_point(now.elapsed().as_secs_f64());
1510 }
1511
1512 let serialized_message = match upgrade_lock.serialize(&message).await {
1513 Ok(serialized) => serialized,
1514 Err(e) => {
1515 tracing::error!("Failed to serialize message: {e}");
1516 return;
1517 },
1518 };
1519
1520 let transmit_result = match transmit {
1521 TransmitType::Direct(recipient) => {
1522 network.direct_message(serialized_message, recipient).await
1523 },
1524 TransmitType::Broadcast => {
1525 network
1526 .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1527 .await
1528 },
1529 TransmitType::DaCommitteeBroadcast => {
1530 network
1531 .da_broadcast_message(
1532 serialized_message,
1533 da_committee.iter().cloned().collect(),
1534 broadcast_delay,
1535 )
1536 .await
1537 },
1538 };
1539
1540 match transmit_result {
1541 Ok(()) => {},
1542 Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1543 }
1544 });
1545 self.transmit_tasks
1546 .entry(view_number)
1547 .or_default()
1548 .push(handle);
1549 }
1550}
1551
1552pub mod test {
1554 use std::ops::{Deref, DerefMut};
1555
1556 use async_trait::async_trait;
1557
1558 use super::{
1559 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1560 Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1561 };
1562
1563 pub type ModifierClosure<TYPES> = dyn Fn(
1566 &mut <TYPES as NodeType>::SignatureKey,
1567 &mut MessageKind<TYPES>,
1568 &mut TransmitType<TYPES>,
1569 &<TYPES as NodeType>::Membership,
1570 ) + Send
1571 + Sync;
1572
1573 pub struct NetworkEventTaskStateModifier<
1575 TYPES: NodeType,
1576 V: Versions,
1577 NET: ConnectedNetwork<TYPES::SignatureKey>,
1578 S: Storage<TYPES>,
1579 > {
1580 pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1582 pub modifier: Arc<ModifierClosure<TYPES>>,
1585 }
1586
1587 impl<
1588 TYPES: NodeType,
1589 V: Versions,
1590 NET: ConnectedNetwork<TYPES::SignatureKey>,
1591 S: Storage<TYPES> + 'static,
1592 > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1593 {
1594 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1596 let mut maybe_action = None;
1597 if let Some((mut sender, mut message_kind, mut transmit)) =
1598 self.parse_event(event, &mut maybe_action).await
1599 {
1600 (self.modifier)(
1602 &mut sender,
1603 &mut message_kind,
1604 &mut transmit,
1605 &*self.membership_coordinator.membership().read().await,
1606 );
1607
1608 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1609 .await;
1610 }
1611 }
1612 }
1613
1614 #[async_trait]
1615 impl<
1616 TYPES: NodeType,
1617 V: Versions,
1618 NET: ConnectedNetwork<TYPES::SignatureKey>,
1619 S: Storage<TYPES> + 'static,
1620 > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1621 {
1622 type Event = HotShotEvent<TYPES>;
1623
1624 async fn handle_event(
1625 &mut self,
1626 event: Arc<Self::Event>,
1627 _sender: &Sender<Arc<Self::Event>>,
1628 _receiver: &Receiver<Arc<Self::Event>>,
1629 ) -> Result<()> {
1630 self.handle(event).await;
1631
1632 Ok(())
1633 }
1634
1635 fn cancel_subtasks(&mut self) {}
1636 }
1637
1638 impl<
1639 TYPES: NodeType,
1640 V: Versions,
1641 NET: ConnectedNetwork<TYPES::SignatureKey>,
1642 S: Storage<TYPES>,
1643 > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1644 {
1645 type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1646
1647 fn deref(&self) -> &Self::Target {
1648 &self.network_event_task_state
1649 }
1650 }
1651
1652 impl<
1653 TYPES: NodeType,
1654 V: Versions,
1655 NET: ConnectedNetwork<TYPES::SignatureKey>,
1656 S: Storage<TYPES>,
1657 > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1658 {
1659 fn deref_mut(&mut self) -> &mut Self::Target {
1660 &mut self.network_event_task_state
1661 }
1662 }
1663}