1use std::{
8 collections::{BTreeMap, HashMap},
9 hash::{DefaultHasher, Hash, Hasher},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17 consensus::OuterConsensus,
18 data::{VidDisperse, VidDisperseShare},
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, HotShotAction},
21 message::{
22 convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23 MessageKind, Proposal, SequencingMessage, UpgradeLock,
24 },
25 simple_vote::HasEpoch,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 network::{
29 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30 ViewMessage,
31 },
32 node_implementation::{ConsensusTime, NodeType, Versions},
33 storage::Storage,
34 },
35 vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42 events::{HotShotEvent, HotShotTaskCompleted},
43 helpers::broadcast_event,
44};
45
/// State for the task that turns messages received from the network into
/// HotShot events (see `handle_message`).
#[derive(Clone)]
pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
    /// Channel on which converted consensus and data messages are broadcast as
    /// internal `HotShotEvent`s.
    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Channel on which received external messages are broadcast as
    /// `EventType::ExternalMessageReceived` events.
    pub external_event_stream: Sender<Event<TYPES>>,

    /// This node's public key; external messages whose sender equals this key
    /// are dropped.
    pub public_key: TYPES::SignatureKey,

    /// Cache keyed by transaction hash, used to drop submitted transactions
    /// that were already seen.
    pub transactions_cache: lru::LruCache<u64, ()>,

    /// Queried to decide which message variants are acceptable for a view.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Identifier recorded in the tracing span of `handle_message`.
    pub id: u64,
}
67
68impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
69 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
70 pub async fn handle_message(&mut self, message: Message<TYPES>) {
72 match &message.kind {
73 MessageKind::Consensus(_) => {
74 tracing::debug!("Received consensus message from network: {:?}", message)
75 },
76 MessageKind::Data(_) => {
77 tracing::trace!("Received data message from network: {:?}", message)
78 },
79 MessageKind::External(_) => {
80 tracing::trace!("Received external message from network: {:?}", message)
81 },
82 }
83
84 let sender = message.sender;
86 match message.kind {
87 MessageKind::Consensus(consensus_message) => {
89 let event = match consensus_message {
90 SequencingMessage::General(general_message) => match general_message {
91 GeneralConsensusMessage::Proposal(proposal) => {
92 if self
93 .upgrade_lock
94 .epochs_enabled(proposal.data.view_number())
95 .await
96 {
97 tracing::warn!(
98 "received GeneralConsensusMessage::Proposal for view {} but \
99 epochs are enabled for that view",
100 proposal.data.view_number()
101 );
102 return;
103 }
104 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
105 },
106 GeneralConsensusMessage::Proposal2Legacy(proposal) => {
107 if !self
108 .upgrade_lock
109 .proposal2_legacy_version(proposal.data.view_number())
110 .await
111 {
112 tracing::warn!(
113 "received GeneralConsensusMessage::Proposal2Legacy for view \
114 {} but we are in the wrong version for that message type",
115 proposal.data.view_number()
116 );
117 return;
118 }
119 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
120 },
121 GeneralConsensusMessage::Proposal2(proposal) => {
122 if !self
123 .upgrade_lock
124 .proposal2_version(proposal.data.view_number())
125 .await
126 {
127 tracing::warn!(
128 "received GeneralConsensusMessage::Proposal2 for view {} but \
129 we are in the wrong version for that message type",
130 proposal.data.view_number()
131 );
132 return;
133 }
134 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
135 },
136 GeneralConsensusMessage::ProposalRequested(req, sig) => {
137 HotShotEvent::QuorumProposalRequestRecv(req, sig)
138 },
139 GeneralConsensusMessage::ProposalResponse(proposal) => {
140 if self
141 .upgrade_lock
142 .epochs_enabled(proposal.data.view_number())
143 .await
144 {
145 tracing::warn!(
146 "received GeneralConsensusMessage::ProposalResponse for view \
147 {} but we are in the wrong version for that message type",
148 proposal.data.view_number()
149 );
150 return;
151 }
152 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
153 },
154 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
155 if !self
156 .upgrade_lock
157 .proposal2_legacy_version(proposal.data.view_number())
158 .await
159 {
160 tracing::warn!(
161 "received GeneralConsensusMessage::ProposalResponse2Legacy \
162 for view {} but we are in the wrong version for that message \
163 type",
164 proposal.data.view_number()
165 );
166 return;
167 }
168 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
169 },
170 GeneralConsensusMessage::ProposalResponse2(proposal) => {
171 if !self
172 .upgrade_lock
173 .proposal2_version(proposal.data.view_number())
174 .await
175 {
176 tracing::warn!(
177 "received GeneralConsensusMessage::ProposalResponse2 for view \
178 {} but epochs are not enabled for that view",
179 proposal.data.view_number()
180 );
181 return;
182 }
183 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
184 },
185 GeneralConsensusMessage::Vote(vote) => {
186 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
187 tracing::warn!(
188 "received GeneralConsensusMessage::Vote for view {} but \
189 epochs are enabled for that view",
190 vote.view_number()
191 );
192 return;
193 }
194 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
195 },
196 GeneralConsensusMessage::Vote2(vote) => {
197 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
198 tracing::warn!(
199 "received GeneralConsensusMessage::Vote2 for view {} but \
200 epochs are not enabled for that view",
201 vote.view_number()
202 );
203 return;
204 }
205 HotShotEvent::QuorumVoteRecv(vote)
206 },
207 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
208 if self
209 .upgrade_lock
210 .epochs_enabled(view_sync_message.view_number())
211 .await
212 {
213 tracing::warn!(
214 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
215 view {} but epochs are enabled for that view",
216 view_sync_message.view_number()
217 );
218 return;
219 }
220 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
221 },
222 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
223 if !self
224 .upgrade_lock
225 .epochs_enabled(view_sync_message.view_number())
226 .await
227 {
228 tracing::warn!(
229 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
230 view {} but epochs are not enabled for that view",
231 view_sync_message.view_number()
232 );
233 return;
234 }
235 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
236 },
237 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
238 view_sync_message,
239 ) => {
240 if self
241 .upgrade_lock
242 .epochs_enabled(view_sync_message.view_number())
243 .await
244 {
245 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
246 return;
247 }
248 HotShotEvent::ViewSyncPreCommitCertificateRecv(
249 view_sync_message.to_vsc2(),
250 )
251 },
252 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
253 view_sync_message,
254 ) => {
255 if !self
256 .upgrade_lock
257 .epochs_enabled(view_sync_message.view_number())
258 .await
259 {
260 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
261 return;
262 }
263 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
264 },
265 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
266 if self
267 .upgrade_lock
268 .epochs_enabled(view_sync_message.view_number())
269 .await
270 {
271 tracing::warn!(
272 "received GeneralConsensusMessage::ViewSyncCommitVote for \
273 view {} but epochs are enabled for that view",
274 view_sync_message.view_number()
275 );
276 return;
277 }
278 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
279 },
280 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
281 if !self
282 .upgrade_lock
283 .epochs_enabled(view_sync_message.view_number())
284 .await
285 {
286 tracing::warn!(
287 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
288 view {} but epochs are not enabled for that view",
289 view_sync_message.view_number()
290 );
291 return;
292 }
293 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
294 },
295 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
296 if self
297 .upgrade_lock
298 .epochs_enabled(view_sync_message.view_number())
299 .await
300 {
301 tracing::warn!(
302 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
303 for view {} but epochs are enabled for that view",
304 view_sync_message.view_number()
305 );
306 return;
307 }
308 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
309 },
310 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
311 if !self
312 .upgrade_lock
313 .epochs_enabled(view_sync_message.view_number())
314 .await
315 {
316 tracing::warn!(
317 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
318 for view {} but epochs are not enabled for that view",
319 view_sync_message.view_number()
320 );
321 return;
322 }
323 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
324 },
325 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
326 if self
327 .upgrade_lock
328 .epochs_enabled(view_sync_message.view_number())
329 .await
330 {
331 tracing::warn!(
332 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
333 view {} but epochs are enabled for that view",
334 view_sync_message.view_number()
335 );
336 return;
337 }
338 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
339 },
340 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
341 if !self
342 .upgrade_lock
343 .epochs_enabled(view_sync_message.view_number())
344 .await
345 {
346 tracing::warn!(
347 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
348 view {} but epochs are not enabled for that view",
349 view_sync_message.view_number()
350 );
351 return;
352 }
353 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
354 },
355 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
356 if self
357 .upgrade_lock
358 .epochs_enabled(view_sync_message.view_number())
359 .await
360 {
361 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
362 return;
363 }
364 HotShotEvent::ViewSyncFinalizeCertificateRecv(
365 view_sync_message.to_vsc2(),
366 )
367 },
368 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
369 view_sync_message,
370 ) => {
371 if !self
372 .upgrade_lock
373 .epochs_enabled(view_sync_message.view_number())
374 .await
375 {
376 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
377 return;
378 }
379 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
380 },
381 GeneralConsensusMessage::TimeoutVote(message) => {
382 if self
383 .upgrade_lock
384 .epochs_enabled(message.view_number())
385 .await
386 {
387 tracing::warn!(
388 "received GeneralConsensusMessage::TimeoutVote for view {} \
389 but epochs are enabled for that view",
390 message.view_number()
391 );
392 return;
393 }
394 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
395 },
396 GeneralConsensusMessage::TimeoutVote2(message) => {
397 if !self
398 .upgrade_lock
399 .epochs_enabled(message.view_number())
400 .await
401 {
402 tracing::warn!(
403 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
404 but epochs are not enabled for that view",
405 message.view_number()
406 );
407 return;
408 }
409 HotShotEvent::TimeoutVoteRecv(message)
410 },
411 GeneralConsensusMessage::UpgradeProposal(message) => {
412 HotShotEvent::UpgradeProposalRecv(message, sender)
413 },
414 GeneralConsensusMessage::UpgradeVote(message) => {
415 tracing::error!("Received upgrade vote!");
416 HotShotEvent::UpgradeVoteRecv(message)
417 },
418 GeneralConsensusMessage::HighQc(qc, next_qc) => {
419 HotShotEvent::HighQcRecv(qc, next_qc, sender)
420 },
421 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
422 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
423 },
424 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
425 if !self
426 .upgrade_lock
427 .proposal2_legacy_version(vote.view_number())
428 .await
429 {
430 tracing::warn!(
431 "received GeneralConsensusMessage::EpochRootQuorumVote for \
432 view {} but we do not expect this message in this version",
433 vote.view_number()
434 );
435 return;
436 }
437 HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
438 },
439 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
440 if !self
441 .upgrade_lock
442 .proposal2_version(vote.view_number())
443 .await
444 {
445 tracing::warn!(
446 "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
447 view {} but we do not expect this message in this version",
448 vote.view_number()
449 );
450 return;
451 }
452 HotShotEvent::EpochRootQuorumVoteRecv(vote)
453 },
454 GeneralConsensusMessage::EpochRootQc(root_qc) => {
455 if !self
456 .upgrade_lock
457 .proposal2_version(root_qc.view_number())
458 .await
459 {
460 tracing::warn!(
461 "received GeneralConsensusMessage::EpochRootQc for view {} \
462 but we are in the wrong version for that message types",
463 root_qc.view_number()
464 );
465 return;
466 }
467 HotShotEvent::EpochRootQcRecv(root_qc, sender)
468 },
469 GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
470 if !self
471 .upgrade_lock
472 .proposal2_legacy_version(root_qc.view_number())
473 .await
474 {
475 tracing::warn!(
476 "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
477 but we are in the wrong version for that message type",
478 root_qc.view_number()
479 );
480 return;
481 }
482 HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
483 },
484 },
485 SequencingMessage::Da(da_message) => match da_message {
486 DaConsensusMessage::DaProposal(proposal) => {
487 if self
488 .upgrade_lock
489 .epochs_enabled(proposal.data.view_number())
490 .await
491 {
492 tracing::warn!(
493 "received DaConsensusMessage::DaProposal for view {} but \
494 epochs are enabled for that view",
495 proposal.data.view_number()
496 );
497 return;
498 }
499 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
500 },
501 DaConsensusMessage::DaProposal2(proposal) => {
502 if !self
503 .upgrade_lock
504 .epochs_enabled(proposal.data.view_number())
505 .await
506 {
507 tracing::warn!(
508 "received DaConsensusMessage::DaProposal2 for view {} but \
509 epochs are not enabled for that view",
510 proposal.data.view_number()
511 );
512 return;
513 }
514 HotShotEvent::DaProposalRecv(proposal, sender)
515 },
516 DaConsensusMessage::DaVote(vote) => {
517 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
518 tracing::warn!(
519 "received DaConsensusMessage::DaVote for view {} but epochs \
520 are enabled for that view",
521 vote.view_number()
522 );
523 return;
524 }
525 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
526 },
527 DaConsensusMessage::DaVote2(vote) => {
528 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
529 tracing::warn!(
530 "received DaConsensusMessage::DaVote2 for view {} but epochs \
531 are not enabled for that view",
532 vote.view_number()
533 );
534 return;
535 }
536 HotShotEvent::DaVoteRecv(vote.clone())
537 },
538 DaConsensusMessage::DaCertificate(cert) => {
539 if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
540 tracing::warn!(
541 "received DaConsensusMessage::DaCertificate for view {} but \
542 epochs are enabled for that view",
543 cert.view_number()
544 );
545 return;
546 }
547 HotShotEvent::DaCertificateRecv(cert.to_dac2())
548 },
549 DaConsensusMessage::DaCertificate2(cert) => {
550 if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
551 tracing::warn!(
552 "received DaConsensusMessage::DaCertificate2 for view {} but \
553 epochs are not enabled for that view",
554 cert.view_number()
555 );
556 return;
557 }
558 HotShotEvent::DaCertificateRecv(cert)
559 },
560 DaConsensusMessage::VidDisperseMsg(proposal) => {
561 if self
562 .upgrade_lock
563 .epochs_enabled(proposal.data.view_number())
564 .await
565 {
566 tracing::warn!(
567 "received DaConsensusMessage::VidDisperseMsg for view {} but \
568 epochs are enabled for that view",
569 proposal.data.view_number()
570 );
571 return;
572 }
573 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
574 },
575 DaConsensusMessage::VidDisperseMsg2(proposal) => {
576 if !self
577 .upgrade_lock
578 .epochs_enabled(proposal.data.view_number())
579 .await
580 {
581 tracing::warn!(
582 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
583 epochs are not enabled for that view",
584 proposal.data.view_number()
585 );
586 return;
587 }
588 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
589 },
590 },
591 };
592 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
593 },
594
595 MessageKind::Data(message) => match message {
597 DataMessage::SubmitTransaction(transaction, _) => {
598 let mut hasher = DefaultHasher::new();
599 transaction.hash(&mut hasher);
600 if self.transactions_cache.put(hasher.finish(), ()).is_some() {
601 return;
602 }
603 broadcast_event(
604 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
605 &self.internal_event_stream,
606 )
607 .await;
608 },
609 DataMessage::DataResponse(response) => {
610 if let ResponseMessage::Found(message) = response {
611 match message {
612 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
613 broadcast_event(
614 Arc::new(HotShotEvent::VidResponseRecv(
615 sender,
616 convert_proposal(proposal),
617 )),
618 &self.internal_event_stream,
619 )
620 .await;
621 },
622 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
623 proposal,
624 )) => {
625 broadcast_event(
626 Arc::new(HotShotEvent::VidResponseRecv(
627 sender,
628 convert_proposal(proposal),
629 )),
630 &self.internal_event_stream,
631 )
632 .await;
633 },
634 _ => {},
635 }
636 }
637 },
638 DataMessage::RequestData(data) => {
639 let req_data = data.clone();
640 if let RequestKind::Vid(_view_number, _key) = req_data.request {
641 broadcast_event(
642 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
643 &self.internal_event_stream,
644 )
645 .await;
646 }
647 },
648 },
649
650 MessageKind::External(data) => {
652 if sender == self.public_key {
653 return;
654 }
655 broadcast_event(
657 Event {
658 view_number: TYPES::View::new(1),
659 event: EventType::ExternalMessageReceived { sender, data },
660 },
661 &self.external_event_stream,
662 )
663 .await;
664 },
665 }
666 }
667}
668
/// State for the task that turns internal `HotShotEvent`s into outgoing
/// network messages (see `handle` / `parse_event`).
pub struct NetworkEventTaskState<
    TYPES: NodeType,
    V: Versions,
    NET: ConnectedNetwork<TYPES::SignatureKey>,
    S: Storage<TYPES>,
> {
    /// Network handle used to send messages (e.g. `vid_broadcast_message`).
    pub network: Arc<NET>,

    /// View recorded in the tracing span of `handle`.
    pub view: TYPES::View,

    /// Epoch passed to `membership_for_epoch` when looking up the leader for
    /// view-sync votes.
    pub epoch: Option<TYPES::Epoch>,

    /// Used to obtain the membership for an epoch and, from it, a view's leader.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Storage on which actions are recorded (`record_action`) before sending.
    pub storage: S,

    /// Storage metrics handle.
    // NOTE(review): not used in the portion of the file reviewed here.
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// Shared consensus state; `update_action` is called on it before an
    /// action is recorded to storage.
    pub consensus: OuterConsensus<TYPES>,

    /// Queried to pick the message variant for a view and used to serialize
    /// outgoing messages.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Join handles of spawned transmit tasks, grouped by view; `cancel_tasks`
    /// aborts the handles of all views below a given view.
    pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,

    /// Epoch height.
    // NOTE(review): presumably the number of blocks per epoch; not used in the
    // portion of the file reviewed here, so confirm against the callers.
    pub epoch_height: u64,

    /// Identifier recorded in the tracing span of `handle`.
    pub id: u64,
}
709
710#[async_trait]
711impl<
712 TYPES: NodeType,
713 V: Versions,
714 NET: ConnectedNetwork<TYPES::SignatureKey>,
715 S: Storage<TYPES> + 'static,
716 > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
717{
718 type Event = HotShotEvent<TYPES>;
719
720 async fn handle_event(
721 &mut self,
722 event: Arc<Self::Event>,
723 _sender: &Sender<Arc<Self::Event>>,
724 _receiver: &Receiver<Arc<Self::Event>>,
725 ) -> Result<()> {
726 self.handle(event).await;
727
728 Ok(())
729 }
730
731 fn cancel_subtasks(&mut self) {}
732}
733
734impl<
735 TYPES: NodeType,
736 V: Versions,
737 NET: ConnectedNetwork<TYPES::SignatureKey>,
738 S: Storage<TYPES> + 'static,
739 > NetworkEventTaskState<TYPES, V, NET, S>
740{
741 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
745 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
746 let mut maybe_action = None;
747 if let Some((sender, message_kind, transmit)) =
748 self.parse_event(event, &mut maybe_action).await
749 {
750 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
751 .await;
752 };
753 }
754
755 async fn handle_vid_disperse_proposal(
757 &self,
758 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
759 sender: &<TYPES as NodeType>::SignatureKey,
760 ) -> Option<HotShotTaskCompleted> {
761 let view = vid_proposal.data.view_number();
762 let epoch = vid_proposal.data.epoch();
763 let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
764 let mut messages = HashMap::new();
765
766 for proposal in vid_share_proposals {
767 let recipient = proposal.data.recipient_key().clone();
768 let message = if self
769 .upgrade_lock
770 .epochs_enabled(proposal.data.view_number())
771 .await
772 {
773 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
774 Proposal {
775 data,
776 signature: proposal.signature,
777 _pd: proposal._pd,
778 }
779 } else {
780 tracing::warn!(
781 "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
782 proposal.data.view_number()
783 );
784 return None;
785 };
786 Message {
787 sender: sender.clone(),
788 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
789 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
790 )),
791 }
792 } else {
793 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
794 Proposal {
795 data,
796 signature: proposal.signature,
797 _pd: proposal._pd,
798 }
799 } else {
800 tracing::warn!(
801 "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
802 proposal.data.view_number()
803 );
804 return None;
805 };
806 Message {
807 sender: sender.clone(),
808 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
809 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
810 )),
811 }
812 };
813 let serialized_message = match self.upgrade_lock.serialize(&message).await {
814 Ok(serialized) => serialized,
815 Err(e) => {
816 tracing::error!("Failed to serialize message: {e}");
817 continue;
818 },
819 };
820
821 messages.insert(recipient, serialized_message);
822 }
823
824 let net = Arc::clone(&self.network);
825 let storage = self.storage.clone();
826 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
827 spawn(async move {
828 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
829 Some(HotShotAction::VidDisperse),
830 storage,
831 consensus,
832 view,
833 epoch,
834 )
835 .await
836 .is_err()
837 {
838 return;
839 }
840 match net.vid_broadcast_message(messages).await {
841 Ok(()) => {},
842 Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
843 }
844 });
845
846 None
847 }
848
849 async fn maybe_record_action(
851 maybe_action: Option<HotShotAction>,
852 storage: S,
853 consensus: OuterConsensus<TYPES>,
854 view: <TYPES as NodeType>::View,
855 epoch: Option<<TYPES as NodeType>::Epoch>,
856 ) -> std::result::Result<(), ()> {
857 if let Some(mut action) = maybe_action {
858 if !consensus.write().await.update_action(action, view) {
859 return Err(());
860 }
861 if matches!(action, HotShotAction::ViewSyncVote) {
864 action = HotShotAction::Vote;
865 }
866 match storage.record_action(view, epoch, action).await {
867 Ok(()) => Ok(()),
868 Err(e) => {
869 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
870 Err(())
871 },
872 }
873 } else {
874 Ok(())
875 }
876 }
877
878 pub fn cancel_tasks(&mut self, view: TYPES::View) {
880 let keep = self.transmit_tasks.split_off(&view);
881
882 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
883 for task in tasks {
884 task.abort();
885 }
886 }
887
888 self.transmit_tasks = keep;
889 }
890
891 #[allow(clippy::too_many_lines)]
896 async fn parse_event(
897 &mut self,
898 event: Arc<HotShotEvent<TYPES>>,
899 maybe_action: &mut Option<HotShotAction>,
900 ) -> Option<(
901 <TYPES as NodeType>::SignatureKey,
902 MessageKind<TYPES>,
903 TransmitType<TYPES>,
904 )> {
905 match event.as_ref().clone() {
906 HotShotEvent::QuorumProposalSend(proposal, sender) => {
907 *maybe_action = Some(HotShotAction::Propose);
908
909 let message = if self
910 .upgrade_lock
911 .proposal2_version(proposal.data.view_number())
912 .await
913 {
914 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
915 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
916 ))
917 } else if self
918 .upgrade_lock
919 .proposal2_legacy_version(proposal.data.view_number())
920 .await
921 {
922 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923 GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
924 ))
925 } else {
926 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
927 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
928 ))
929 };
930
931 Some((sender, message, TransmitType::Broadcast))
932 },
933
934 HotShotEvent::QuorumVoteSend(vote) => {
936 *maybe_action = Some(HotShotAction::Vote);
937 let view_number = vote.view_number() + 1;
938 let leader = match self
939 .membership_coordinator
940 .membership_for_epoch(vote.epoch())
941 .await
942 .ok()?
943 .leader(view_number)
944 .await
945 {
946 Ok(l) => l,
947 Err(e) => {
948 tracing::warn!(
949 "Failed to calculate leader for view number {view_number}. Error: \
950 {e:?}"
951 );
952 return None;
953 },
954 };
955
956 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
957 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
958 GeneralConsensusMessage::Vote2(vote.clone()),
959 ))
960 } else {
961 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
962 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
963 ))
964 };
965
966 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
967 },
968 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
969 *maybe_action = Some(HotShotAction::Vote);
970 let view_number = vote.view_number() + 1;
971 let leader = match self
972 .membership_coordinator
973 .membership_for_epoch(vote.epoch())
974 .await
975 .ok()?
976 .leader(view_number)
977 .await
978 {
979 Ok(l) => l,
980 Err(e) => {
981 tracing::warn!(
982 "Failed to calculate leader for view number {:?}. Error: {:?}",
983 view_number,
984 e
985 );
986 return None;
987 },
988 };
989
990 let message = if self
991 .upgrade_lock
992 .proposal2_version(vote.view_number())
993 .await
994 {
995 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
996 GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
997 ))
998 } else if self
999 .upgrade_lock
1000 .proposal2_legacy_version(vote.view_number())
1001 .await
1002 {
1003 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1004 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
1005 ))
1006 } else {
1007 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1008 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1009 ))
1010 };
1011
1012 Some((
1013 vote.vote.signing_key(),
1014 message,
1015 TransmitType::Direct(leader),
1016 ))
1017 },
1018 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1019 *maybe_action = Some(HotShotAction::Vote);
1020 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1021 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1022 GeneralConsensusMessage::Vote2(vote.clone()),
1023 ))
1024 } else {
1025 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1026 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1027 ))
1028 };
1029
1030 Some((vote.signing_key(), message, TransmitType::Broadcast))
1031 },
1032 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1033 req.key.clone(),
1034 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1035 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1036 )),
1037 TransmitType::Broadcast,
1038 )),
1039 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1040 let message = if self
1041 .upgrade_lock
1042 .proposal2_version(proposal.data.view_number())
1043 .await
1044 {
1045 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1046 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1047 ))
1048 } else if self
1049 .upgrade_lock
1050 .proposal2_legacy_version(proposal.data.view_number())
1051 .await
1052 {
1053 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1054 GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1055 proposal,
1056 )),
1057 ))
1058 } else {
1059 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1060 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1061 ))
1062 };
1063
1064 Some((
1065 sender_key.clone(),
1066 message,
1067 TransmitType::Direct(sender_key),
1068 ))
1069 },
1070 HotShotEvent::VidDisperseSend(proposal, sender) => {
1071 self.handle_vid_disperse_proposal(proposal, &sender).await;
1072 None
1073 },
1074 HotShotEvent::DaProposalSend(proposal, sender) => {
1075 *maybe_action = Some(HotShotAction::DaPropose);
1076
1077 let message = if self
1078 .upgrade_lock
1079 .epochs_enabled(proposal.data.view_number())
1080 .await
1081 {
1082 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1083 DaConsensusMessage::DaProposal2(proposal),
1084 ))
1085 } else {
1086 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1087 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1088 ))
1089 };
1090
1091 Some((sender, message, TransmitType::DaCommitteeBroadcast))
1092 },
1093 HotShotEvent::DaVoteSend(vote) => {
1094 *maybe_action = Some(HotShotAction::DaVote);
1095 let view_number = vote.view_number();
1096 let leader = match self
1097 .membership_coordinator
1098 .membership_for_epoch(vote.epoch())
1099 .await
1100 .ok()?
1101 .leader(view_number)
1102 .await
1103 {
1104 Ok(l) => l,
1105 Err(e) => {
1106 tracing::warn!(
1107 "Failed to calculate leader for view number {view_number}. Error: \
1108 {e:?}"
1109 );
1110 return None;
1111 },
1112 };
1113
1114 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1115 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1116 DaConsensusMessage::DaVote2(vote.clone()),
1117 ))
1118 } else {
1119 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1120 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1121 ))
1122 };
1123
1124 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1125 },
1126 HotShotEvent::DacSend(certificate, sender) => {
1127 *maybe_action = Some(HotShotAction::DaCert);
1128 let message = if self
1129 .upgrade_lock
1130 .epochs_enabled(certificate.view_number())
1131 .await
1132 {
1133 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1134 DaConsensusMessage::DaCertificate2(certificate),
1135 ))
1136 } else {
1137 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1138 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1139 ))
1140 };
1141
1142 Some((sender, message, TransmitType::Broadcast))
1143 },
1144 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1145 let view_number = vote.view_number() + vote.date().relay;
1146 let leader = match self
1147 .membership_coordinator
1148 .membership_for_epoch(self.epoch)
1149 .await
1150 .ok()?
1151 .leader(view_number)
1152 .await
1153 {
1154 Ok(l) => l,
1155 Err(e) => {
1156 tracing::warn!(
1157 "Failed to calculate leader for view number {view_number}. Error: \
1158 {e:?}"
1159 );
1160 return None;
1161 },
1162 };
1163 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1164 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1165 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1166 ))
1167 } else {
1168 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1169 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1170 ))
1171 };
1172
1173 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1174 },
1175 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1176 *maybe_action = Some(HotShotAction::ViewSyncVote);
1177 let view_number = vote.view_number() + vote.date().relay;
1178 let leader = match self
1179 .membership_coordinator
1180 .membership_for_epoch(self.epoch)
1181 .await
1182 .ok()?
1183 .leader(view_number)
1184 .await
1185 {
1186 Ok(l) => l,
1187 Err(e) => {
1188 tracing::warn!(
1189 "Failed to calculate leader for view number {view_number}. Error: \
1190 {e:?}"
1191 );
1192 return None;
1193 },
1194 };
1195 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1196 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1197 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1198 ))
1199 } else {
1200 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1201 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1202 ))
1203 };
1204
1205 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1206 },
1207 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1208 *maybe_action = Some(HotShotAction::ViewSyncVote);
1209 let view_number = vote.view_number() + vote.date().relay;
1210 let leader = match self
1211 .membership_coordinator
1212 .membership_for_epoch(self.epoch)
1213 .await
1214 .ok()?
1215 .leader(view_number)
1216 .await
1217 {
1218 Ok(l) => l,
1219 Err(e) => {
1220 tracing::warn!(
1221 "Failed to calculate leader for view number {view_number:?}. Error: \
1222 {e:?}"
1223 );
1224 return None;
1225 },
1226 };
1227 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1228 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1229 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1230 ))
1231 } else {
1232 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1234 ))
1235 };
1236
1237 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1238 },
1239 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1240 let view_number = certificate.view_number();
1241 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1242 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1243 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1244 ))
1245 } else {
1246 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1248 ))
1249 };
1250
1251 Some((sender, message, TransmitType::Broadcast))
1252 },
1253 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1254 let view_number = certificate.view_number();
1255 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1256 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1257 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1258 ))
1259 } else {
1260 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1261 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1262 ))
1263 };
1264
1265 Some((sender, message, TransmitType::Broadcast))
1266 },
1267 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1268 let view_number = certificate.view_number();
1269 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1270 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1271 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1272 ))
1273 } else {
1274 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1275 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1276 ))
1277 };
1278
1279 Some((sender, message, TransmitType::Broadcast))
1280 },
1281 HotShotEvent::TimeoutVoteSend(vote) => {
1282 *maybe_action = Some(HotShotAction::TimeoutVote);
1283 let view_number = vote.view_number() + 1;
1284 let leader = match self
1285 .membership_coordinator
1286 .membership_for_epoch(self.epoch)
1287 .await
1288 .ok()?
1289 .leader(view_number)
1290 .await
1291 {
1292 Ok(l) => l,
1293 Err(e) => {
1294 tracing::warn!(
1295 "Failed to calculate leader for view number {view_number}. Error: \
1296 {e:?}"
1297 );
1298 return None;
1299 },
1300 };
1301 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1302 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1303 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1304 ))
1305 } else {
1306 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1307 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1308 ))
1309 };
1310
1311 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1312 },
1313 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1314 sender,
1315 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1316 GeneralConsensusMessage::UpgradeProposal(proposal),
1317 )),
1318 TransmitType::Broadcast,
1319 )),
1320 HotShotEvent::UpgradeVoteSend(vote) => {
1321 tracing::error!("Sending upgrade vote!");
1322 let view_number = vote.view_number();
1323 let leader = match self
1324 .membership_coordinator
1325 .membership_for_epoch(self.epoch)
1326 .await
1327 .ok()?
1328 .leader(view_number)
1329 .await
1330 {
1331 Ok(l) => l,
1332 Err(e) => {
1333 tracing::warn!(
1334 "Failed to calculate leader for view number {view_number}. Error: \
1335 {e:?}"
1336 );
1337 return None;
1338 },
1339 };
1340 Some((
1341 vote.signing_key(),
1342 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1343 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1344 )),
1345 TransmitType::Direct(leader),
1346 ))
1347 },
1348 HotShotEvent::ViewChange(view, epoch) => {
1349 self.view = view;
1350 if epoch > self.epoch {
1351 self.epoch = epoch;
1352 }
1353 let keep_view = TYPES::View::new(view.saturating_sub(1));
1354 self.cancel_tasks(keep_view);
1355 let net = Arc::clone(&self.network);
1356 let epoch = self.epoch.map(|x| x.u64());
1357 let membership_coordinator = self.membership_coordinator.clone();
1358 spawn(async move {
1359 net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1360 .await;
1361 });
1362 None
1363 },
1364 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1365 sender,
1366 MessageKind::Data(DataMessage::RequestData(req)),
1367 TransmitType::Direct(to),
1368 )),
1369 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1370 let epochs_enabled = self
1371 .upgrade_lock
1372 .epochs_enabled(proposal.data.view_number())
1373 .await;
1374 let message = match proposal.data {
1375 VidDisperseShare::V0(data) => {
1376 if epochs_enabled {
1377 tracing::warn!(
1378 "Epochs are enabled for view {} but didn't receive \
1379 VidDisperseShare2",
1380 data.view_number()
1381 );
1382 return None;
1383 }
1384 let vid_share_proposal = Proposal {
1385 data,
1386 signature: proposal.signature,
1387 _pd: proposal._pd,
1388 };
1389 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1391 vid_share_proposal,
1392 )),
1393 )))
1394 },
1395 VidDisperseShare::V1(data) => {
1396 if !epochs_enabled {
1397 tracing::warn!(
1398 "Epochs are enabled for view {} but didn't receive \
1399 ADVZDisperseShare",
1400 data.view_number()
1401 );
1402 return None;
1403 }
1404 let vid_share_proposal = Proposal {
1405 data,
1406 signature: proposal.signature,
1407 _pd: proposal._pd,
1408 };
1409 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411 vid_share_proposal,
1412 )),
1413 )))
1414 },
1415 };
1416 Some((sender, message, TransmitType::Direct(to)))
1417 },
1418 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419 sender,
1420 MessageKind::Consensus(SequencingMessage::General(
1421 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422 )),
1423 TransmitType::Direct(leader),
1424 )),
1425 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426 let message = if self
1427 .upgrade_lock
1428 .proposal2_version(epoch_root_qc.view_number())
1429 .await
1430 {
1431 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1432 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1433 ))
1434 } else {
1435 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1436 GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1437 ))
1438 };
1439
1440 Some((sender, message, TransmitType::Direct(leader)))
1441 },
1442 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1443 sender,
1444 MessageKind::Consensus(SequencingMessage::General(
1445 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1446 )),
1447 TransmitType::Broadcast,
1448 )),
1449 _ => None,
1450 }
1451 }
1452
1453 async fn spawn_transmit_task(
1455 &mut self,
1456 message_kind: MessageKind<TYPES>,
1457 maybe_action: Option<HotShotAction>,
1458 transmit: TransmitType<TYPES>,
1459 sender: TYPES::SignatureKey,
1460 ) {
1461 let broadcast_delay = match &message_kind {
1462 MessageKind::Consensus(
1463 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1464 | SequencingMessage::Da(_),
1465 ) => BroadcastDelay::View(*message_kind.view_number()),
1466 _ => BroadcastDelay::None,
1467 };
1468 let message = Message {
1469 sender,
1470 kind: message_kind,
1471 };
1472 let view_number = message.kind.view_number();
1473 let epoch = message.kind.epoch();
1474 let committee_topic = Topic::Global;
1475 let Ok(mem) = self
1476 .membership_coordinator
1477 .stake_table_for_epoch(self.epoch)
1478 .await
1479 else {
1480 return;
1481 };
1482 let da_committee = mem.da_committee_members(view_number).await;
1483 let network = Arc::clone(&self.network);
1484 let storage = self.storage.clone();
1485 let storage_metrics = Arc::clone(&self.storage_metrics);
1486 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1487 let upgrade_lock = self.upgrade_lock.clone();
1488 let handle = spawn(async move {
1489 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1490 maybe_action,
1491 storage.clone(),
1492 consensus,
1493 view_number,
1494 epoch,
1495 )
1496 .await
1497 .is_err()
1498 {
1499 return;
1500 }
1501 if let MessageKind::Consensus(SequencingMessage::General(
1502 GeneralConsensusMessage::Proposal(prop),
1503 )) = &message.kind
1504 {
1505 let now = Instant::now();
1506 if storage
1507 .append_proposal2(&convert_proposal(prop.clone()))
1508 .await
1509 .is_err()
1510 {
1511 return;
1512 }
1513 storage_metrics
1514 .append_quorum_duration
1515 .add_point(now.elapsed().as_secs_f64());
1516 }
1517
1518 let serialized_message = match upgrade_lock.serialize(&message).await {
1519 Ok(serialized) => serialized,
1520 Err(e) => {
1521 tracing::error!("Failed to serialize message: {e}");
1522 return;
1523 },
1524 };
1525
1526 let transmit_result = match transmit {
1527 TransmitType::Direct(recipient) => {
1528 network.direct_message(serialized_message, recipient).await
1529 },
1530 TransmitType::Broadcast => {
1531 network
1532 .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1533 .await
1534 },
1535 TransmitType::DaCommitteeBroadcast => {
1536 network
1537 .da_broadcast_message(
1538 serialized_message,
1539 da_committee.iter().cloned().collect(),
1540 broadcast_delay,
1541 )
1542 .await
1543 },
1544 };
1545
1546 match transmit_result {
1547 Ok(()) => {},
1548 Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1549 }
1550 });
1551 self.transmit_tasks
1552 .entry(view_number)
1553 .or_default()
1554 .push(handle);
1555 }
1556}
1557
1558pub mod test {
1560 use std::ops::{Deref, DerefMut};
1561
1562 use async_trait::async_trait;
1563
1564 use super::{
1565 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1566 Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1567 };
1568
1569 pub type ModifierClosure<TYPES> = dyn Fn(
1572 &mut <TYPES as NodeType>::SignatureKey,
1573 &mut MessageKind<TYPES>,
1574 &mut TransmitType<TYPES>,
1575 &<TYPES as NodeType>::Membership,
1576 ) + Send
1577 + Sync;
1578
1579 pub struct NetworkEventTaskStateModifier<
1581 TYPES: NodeType,
1582 V: Versions,
1583 NET: ConnectedNetwork<TYPES::SignatureKey>,
1584 S: Storage<TYPES>,
1585 > {
1586 pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1588 pub modifier: Arc<ModifierClosure<TYPES>>,
1591 }
1592
1593 impl<
1594 TYPES: NodeType,
1595 V: Versions,
1596 NET: ConnectedNetwork<TYPES::SignatureKey>,
1597 S: Storage<TYPES> + 'static,
1598 > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1599 {
1600 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1602 let mut maybe_action = None;
1603 if let Some((mut sender, mut message_kind, mut transmit)) =
1604 self.parse_event(event, &mut maybe_action).await
1605 {
1606 (self.modifier)(
1608 &mut sender,
1609 &mut message_kind,
1610 &mut transmit,
1611 &*self.membership_coordinator.membership().read().await,
1612 );
1613
1614 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1615 .await;
1616 }
1617 }
1618 }
1619
1620 #[async_trait]
1621 impl<
1622 TYPES: NodeType,
1623 V: Versions,
1624 NET: ConnectedNetwork<TYPES::SignatureKey>,
1625 S: Storage<TYPES> + 'static,
1626 > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1627 {
1628 type Event = HotShotEvent<TYPES>;
1629
1630 async fn handle_event(
1631 &mut self,
1632 event: Arc<Self::Event>,
1633 _sender: &Sender<Arc<Self::Event>>,
1634 _receiver: &Receiver<Arc<Self::Event>>,
1635 ) -> Result<()> {
1636 self.handle(event).await;
1637
1638 Ok(())
1639 }
1640
1641 fn cancel_subtasks(&mut self) {}
1642 }
1643
1644 impl<
1645 TYPES: NodeType,
1646 V: Versions,
1647 NET: ConnectedNetwork<TYPES::SignatureKey>,
1648 S: Storage<TYPES>,
1649 > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1650 {
1651 type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1652
1653 fn deref(&self) -> &Self::Target {
1654 &self.network_event_task_state
1655 }
1656 }
1657
1658 impl<
1659 TYPES: NodeType,
1660 V: Versions,
1661 NET: ConnectedNetwork<TYPES::SignatureKey>,
1662 S: Storage<TYPES>,
1663 > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1664 {
1665 fn deref_mut(&mut self) -> &mut Self::Target {
1666 &mut self.network_event_task_state
1667 }
1668 }
1669}