1use std::{
8 collections::{BTreeMap, HashMap},
9 hash::{DefaultHasher, Hash, Hasher},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17 consensus::OuterConsensus,
18 data::{VidDisperse, VidDisperseShare},
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, HotShotAction},
21 message::{
22 convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23 MessageKind, Proposal, SequencingMessage, UpgradeLock,
24 },
25 simple_vote::HasEpoch,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 network::{
29 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30 ViewMessage,
31 },
32 node_implementation::{ConsensusTime, NodeType, Versions},
33 storage::Storage,
34 },
35 vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42 events::{HotShotEvent, HotShotTaskCompleted},
43 helpers::broadcast_event,
44};
45
46#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49 pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52 pub external_event_stream: Sender<Event<TYPES>>,
54
55 pub public_key: TYPES::SignatureKey,
57
58 pub transactions_cache: lru::LruCache<u64, ()>,
60
61 pub upgrade_lock: UpgradeLock<TYPES, V>,
63
64 pub id: u64,
66}
67
68impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
69 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
70 pub async fn handle_message(&mut self, message: Message<TYPES>) {
72 match &message.kind {
73 MessageKind::Consensus(_) => {
74 tracing::debug!("Received consensus message from network: {:?}", message)
75 },
76 MessageKind::Data(_) => {
77 tracing::trace!("Received data message from network: {:?}", message)
78 },
79 MessageKind::External(_) => {
80 tracing::trace!("Received external message from network: {:?}", message)
81 },
82 }
83
84 let sender = message.sender;
86 match message.kind {
87 MessageKind::Consensus(consensus_message) => {
89 let event = match consensus_message {
90 SequencingMessage::General(general_message) => match general_message {
91 GeneralConsensusMessage::Proposal(proposal) => {
92 if self
93 .upgrade_lock
94 .epochs_enabled(proposal.data.view_number())
95 .await
96 {
97 tracing::warn!(
98 "received GeneralConsensusMessage::Proposal for view {} but \
99 epochs are enabled for that view",
100 proposal.data.view_number()
101 );
102 return;
103 }
104 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
105 },
106 GeneralConsensusMessage::Proposal2(proposal) => {
107 if !self
108 .upgrade_lock
109 .epochs_enabled(proposal.data.view_number())
110 .await
111 {
112 tracing::warn!(
113 "received GeneralConsensusMessage::Proposal2 for view {} but \
114 epochs are not enabled for that view",
115 proposal.data.view_number()
116 );
117 return;
118 }
119 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
120 },
121 GeneralConsensusMessage::ProposalRequested(req, sig) => {
122 HotShotEvent::QuorumProposalRequestRecv(req, sig)
123 },
124 GeneralConsensusMessage::ProposalResponse(proposal) => {
125 if self
126 .upgrade_lock
127 .epochs_enabled(proposal.data.view_number())
128 .await
129 {
130 tracing::warn!(
131 "received GeneralConsensusMessage::ProposalResponse for view \
132 {} but epochs are enabled for that view",
133 proposal.data.view_number()
134 );
135 return;
136 }
137 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
138 },
139 GeneralConsensusMessage::ProposalResponse2(proposal) => {
140 if !self
141 .upgrade_lock
142 .epochs_enabled(proposal.data.view_number())
143 .await
144 {
145 tracing::warn!(
146 "received GeneralConsensusMessage::ProposalResponse2 for view \
147 {} but epochs are not enabled for that view",
148 proposal.data.view_number()
149 );
150 return;
151 }
152 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
153 },
154 GeneralConsensusMessage::Vote(vote) => {
155 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
156 tracing::warn!(
157 "received GeneralConsensusMessage::Vote for view {} but \
158 epochs are enabled for that view",
159 vote.view_number()
160 );
161 return;
162 }
163 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
164 },
165 GeneralConsensusMessage::Vote2(vote) => {
166 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
167 tracing::warn!(
168 "received GeneralConsensusMessage::Vote2 for view {} but \
169 epochs are not enabled for that view",
170 vote.view_number()
171 );
172 return;
173 }
174 HotShotEvent::QuorumVoteRecv(vote)
175 },
176 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
177 if self
178 .upgrade_lock
179 .epochs_enabled(view_sync_message.view_number())
180 .await
181 {
182 tracing::warn!(
183 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
184 view {} but epochs are enabled for that view",
185 view_sync_message.view_number()
186 );
187 return;
188 }
189 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
190 },
191 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
192 if !self
193 .upgrade_lock
194 .epochs_enabled(view_sync_message.view_number())
195 .await
196 {
197 tracing::warn!(
198 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
199 view {} but epochs are not enabled for that view",
200 view_sync_message.view_number()
201 );
202 return;
203 }
204 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
205 },
206 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
207 view_sync_message,
208 ) => {
209 if self
210 .upgrade_lock
211 .epochs_enabled(view_sync_message.view_number())
212 .await
213 {
214 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
215 return;
216 }
217 HotShotEvent::ViewSyncPreCommitCertificateRecv(
218 view_sync_message.to_vsc2(),
219 )
220 },
221 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
222 view_sync_message,
223 ) => {
224 if !self
225 .upgrade_lock
226 .epochs_enabled(view_sync_message.view_number())
227 .await
228 {
229 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
230 return;
231 }
232 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
233 },
234 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
235 if self
236 .upgrade_lock
237 .epochs_enabled(view_sync_message.view_number())
238 .await
239 {
240 tracing::warn!(
241 "received GeneralConsensusMessage::ViewSyncCommitVote for \
242 view {} but epochs are enabled for that view",
243 view_sync_message.view_number()
244 );
245 return;
246 }
247 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
248 },
249 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
250 if !self
251 .upgrade_lock
252 .epochs_enabled(view_sync_message.view_number())
253 .await
254 {
255 tracing::warn!(
256 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
257 view {} but epochs are not enabled for that view",
258 view_sync_message.view_number()
259 );
260 return;
261 }
262 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
263 },
264 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
265 if self
266 .upgrade_lock
267 .epochs_enabled(view_sync_message.view_number())
268 .await
269 {
270 tracing::warn!(
271 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
272 for view {} but epochs are enabled for that view",
273 view_sync_message.view_number()
274 );
275 return;
276 }
277 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
278 },
279 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
280 if !self
281 .upgrade_lock
282 .epochs_enabled(view_sync_message.view_number())
283 .await
284 {
285 tracing::warn!(
286 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
287 for view {} but epochs are not enabled for that view",
288 view_sync_message.view_number()
289 );
290 return;
291 }
292 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
293 },
294 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
295 if self
296 .upgrade_lock
297 .epochs_enabled(view_sync_message.view_number())
298 .await
299 {
300 tracing::warn!(
301 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
302 view {} but epochs are enabled for that view",
303 view_sync_message.view_number()
304 );
305 return;
306 }
307 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
308 },
309 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
310 if !self
311 .upgrade_lock
312 .epochs_enabled(view_sync_message.view_number())
313 .await
314 {
315 tracing::warn!(
316 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
317 view {} but epochs are not enabled for that view",
318 view_sync_message.view_number()
319 );
320 return;
321 }
322 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
323 },
324 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
325 if self
326 .upgrade_lock
327 .epochs_enabled(view_sync_message.view_number())
328 .await
329 {
330 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
331 return;
332 }
333 HotShotEvent::ViewSyncFinalizeCertificateRecv(
334 view_sync_message.to_vsc2(),
335 )
336 },
337 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
338 view_sync_message,
339 ) => {
340 if !self
341 .upgrade_lock
342 .epochs_enabled(view_sync_message.view_number())
343 .await
344 {
345 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
346 return;
347 }
348 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
349 },
350 GeneralConsensusMessage::TimeoutVote(message) => {
351 if self
352 .upgrade_lock
353 .epochs_enabled(message.view_number())
354 .await
355 {
356 tracing::warn!(
357 "received GeneralConsensusMessage::TimeoutVote for view {} \
358 but epochs are enabled for that view",
359 message.view_number()
360 );
361 return;
362 }
363 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
364 },
365 GeneralConsensusMessage::TimeoutVote2(message) => {
366 if !self
367 .upgrade_lock
368 .epochs_enabled(message.view_number())
369 .await
370 {
371 tracing::warn!(
372 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
373 but epochs are not enabled for that view",
374 message.view_number()
375 );
376 return;
377 }
378 HotShotEvent::TimeoutVoteRecv(message)
379 },
380 GeneralConsensusMessage::UpgradeProposal(message) => {
381 HotShotEvent::UpgradeProposalRecv(message, sender)
382 },
383 GeneralConsensusMessage::UpgradeVote(message) => {
384 tracing::error!("Received upgrade vote!");
385 HotShotEvent::UpgradeVoteRecv(message)
386 },
387 GeneralConsensusMessage::HighQc(qc, next_qc) => {
388 HotShotEvent::HighQcRecv(qc, next_qc, sender)
389 },
390 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
391 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
392 },
393 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
394 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
395 tracing::warn!(
396 "received GeneralConsensusMessage::EpochRootVote for view {} \
397 but epochs are not enabled for that view",
398 vote.view_number()
399 );
400 return;
401 }
402 HotShotEvent::EpochRootQuorumVoteRecv(vote)
403 },
404 GeneralConsensusMessage::EpochRootQc(root_qc) => {
405 if !self
406 .upgrade_lock
407 .epochs_enabled(root_qc.view_number())
408 .await
409 {
410 tracing::warn!(
411 "received GeneralConsensusMessage::EpochRootQc for view {} \
412 but epochs are not enabled for that view",
413 root_qc.view_number()
414 );
415 return;
416 }
417 HotShotEvent::EpochRootQcRecv(root_qc, sender)
418 },
419 },
420 SequencingMessage::Da(da_message) => match da_message {
421 DaConsensusMessage::DaProposal(proposal) => {
422 if self
423 .upgrade_lock
424 .epochs_enabled(proposal.data.view_number())
425 .await
426 {
427 tracing::warn!(
428 "received DaConsensusMessage::DaProposal for view {} but \
429 epochs are enabled for that view",
430 proposal.data.view_number()
431 );
432 return;
433 }
434 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
435 },
436 DaConsensusMessage::DaProposal2(proposal) => {
437 if !self
438 .upgrade_lock
439 .epochs_enabled(proposal.data.view_number())
440 .await
441 {
442 tracing::warn!(
443 "received DaConsensusMessage::DaProposal2 for view {} but \
444 epochs are not enabled for that view",
445 proposal.data.view_number()
446 );
447 return;
448 }
449 HotShotEvent::DaProposalRecv(proposal, sender)
450 },
451 DaConsensusMessage::DaVote(vote) => {
452 if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
453 tracing::warn!(
454 "received DaConsensusMessage::DaVote for view {} but epochs \
455 are enabled for that view",
456 vote.view_number()
457 );
458 return;
459 }
460 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
461 },
462 DaConsensusMessage::DaVote2(vote) => {
463 if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
464 tracing::warn!(
465 "received DaConsensusMessage::DaVote2 for view {} but epochs \
466 are not enabled for that view",
467 vote.view_number()
468 );
469 return;
470 }
471 HotShotEvent::DaVoteRecv(vote.clone())
472 },
473 DaConsensusMessage::DaCertificate(cert) => {
474 if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
475 tracing::warn!(
476 "received DaConsensusMessage::DaCertificate for view {} but \
477 epochs are enabled for that view",
478 cert.view_number()
479 );
480 return;
481 }
482 HotShotEvent::DaCertificateRecv(cert.to_dac2())
483 },
484 DaConsensusMessage::DaCertificate2(cert) => {
485 if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
486 tracing::warn!(
487 "received DaConsensusMessage::DaCertificate2 for view {} but \
488 epochs are not enabled for that view",
489 cert.view_number()
490 );
491 return;
492 }
493 HotShotEvent::DaCertificateRecv(cert)
494 },
495 DaConsensusMessage::VidDisperseMsg(proposal) => {
496 if self
497 .upgrade_lock
498 .epochs_enabled(proposal.data.view_number())
499 .await
500 {
501 tracing::warn!(
502 "received DaConsensusMessage::VidDisperseMsg for view {} but \
503 epochs are enabled for that view",
504 proposal.data.view_number()
505 );
506 return;
507 }
508 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
509 },
510 DaConsensusMessage::VidDisperseMsg2(proposal) => {
511 if !self
512 .upgrade_lock
513 .epochs_enabled(proposal.data.view_number())
514 .await
515 {
516 tracing::warn!(
517 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
518 epochs are not enabled for that view",
519 proposal.data.view_number()
520 );
521 return;
522 }
523 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
524 },
525 },
526 };
527 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
528 },
529
530 MessageKind::Data(message) => match message {
532 DataMessage::SubmitTransaction(transaction, _) => {
533 let mut hasher = DefaultHasher::new();
534 transaction.hash(&mut hasher);
535 if self.transactions_cache.put(hasher.finish(), ()).is_some() {
536 return;
537 }
538 broadcast_event(
539 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
540 &self.internal_event_stream,
541 )
542 .await;
543 },
544 DataMessage::DataResponse(response) => {
545 if let ResponseMessage::Found(message) = response {
546 match message {
547 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
548 broadcast_event(
549 Arc::new(HotShotEvent::VidResponseRecv(
550 sender,
551 convert_proposal(proposal),
552 )),
553 &self.internal_event_stream,
554 )
555 .await;
556 },
557 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
558 proposal,
559 )) => {
560 broadcast_event(
561 Arc::new(HotShotEvent::VidResponseRecv(
562 sender,
563 convert_proposal(proposal),
564 )),
565 &self.internal_event_stream,
566 )
567 .await;
568 },
569 _ => {},
570 }
571 }
572 },
573 DataMessage::RequestData(data) => {
574 let req_data = data.clone();
575 if let RequestKind::Vid(_view_number, _key) = req_data.request {
576 broadcast_event(
577 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
578 &self.internal_event_stream,
579 )
580 .await;
581 }
582 },
583 },
584
585 MessageKind::External(data) => {
587 if sender == self.public_key {
588 return;
589 }
590 broadcast_event(
592 Event {
593 view_number: TYPES::View::new(1),
594 event: EventType::ExternalMessageReceived { sender, data },
595 },
596 &self.external_event_stream,
597 )
598 .await;
599 },
600 }
601 }
602}
603
604pub struct NetworkEventTaskState<
606 TYPES: NodeType,
607 V: Versions,
608 NET: ConnectedNetwork<TYPES::SignatureKey>,
609 S: Storage<TYPES>,
610> {
611 pub network: Arc<NET>,
613
614 pub view: TYPES::View,
616
617 pub epoch: Option<TYPES::Epoch>,
619
620 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
622
623 pub storage: S,
625
626 pub storage_metrics: Arc<StorageMetricsValue>,
628
629 pub consensus: OuterConsensus<TYPES>,
631
632 pub upgrade_lock: UpgradeLock<TYPES, V>,
634
635 pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
637
638 pub epoch_height: u64,
640
641 pub id: u64,
643}
644
645#[async_trait]
646impl<
647 TYPES: NodeType,
648 V: Versions,
649 NET: ConnectedNetwork<TYPES::SignatureKey>,
650 S: Storage<TYPES> + 'static,
651 > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
652{
653 type Event = HotShotEvent<TYPES>;
654
655 async fn handle_event(
656 &mut self,
657 event: Arc<Self::Event>,
658 _sender: &Sender<Arc<Self::Event>>,
659 _receiver: &Receiver<Arc<Self::Event>>,
660 ) -> Result<()> {
661 self.handle(event).await;
662
663 Ok(())
664 }
665
666 fn cancel_subtasks(&mut self) {}
667}
668
669impl<
670 TYPES: NodeType,
671 V: Versions,
672 NET: ConnectedNetwork<TYPES::SignatureKey>,
673 S: Storage<TYPES> + 'static,
674 > NetworkEventTaskState<TYPES, V, NET, S>
675{
676 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
680 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
681 let mut maybe_action = None;
682 if let Some((sender, message_kind, transmit)) =
683 self.parse_event(event, &mut maybe_action).await
684 {
685 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
686 .await;
687 };
688 }
689
690 async fn handle_vid_disperse_proposal(
692 &self,
693 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
694 sender: &<TYPES as NodeType>::SignatureKey,
695 ) -> Option<HotShotTaskCompleted> {
696 let view = vid_proposal.data.view_number();
697 let epoch = vid_proposal.data.epoch();
698 let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
699 let mut messages = HashMap::new();
700
701 for proposal in vid_share_proposals {
702 let recipient = proposal.data.recipient_key().clone();
703 let message = if self
704 .upgrade_lock
705 .epochs_enabled(proposal.data.view_number())
706 .await
707 {
708 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
709 Proposal {
710 data,
711 signature: proposal.signature,
712 _pd: proposal._pd,
713 }
714 } else {
715 tracing::warn!(
716 "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
717 proposal.data.view_number()
718 );
719 return None;
720 };
721 Message {
722 sender: sender.clone(),
723 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
724 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
725 )),
726 }
727 } else {
728 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
729 Proposal {
730 data,
731 signature: proposal.signature,
732 _pd: proposal._pd,
733 }
734 } else {
735 tracing::warn!(
736 "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
737 proposal.data.view_number()
738 );
739 return None;
740 };
741 Message {
742 sender: sender.clone(),
743 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
744 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
745 )),
746 }
747 };
748 let serialized_message = match self.upgrade_lock.serialize(&message).await {
749 Ok(serialized) => serialized,
750 Err(e) => {
751 tracing::error!("Failed to serialize message: {e}");
752 continue;
753 },
754 };
755
756 messages.insert(recipient, serialized_message);
757 }
758
759 let net = Arc::clone(&self.network);
760 let storage = self.storage.clone();
761 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
762 spawn(async move {
763 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
764 Some(HotShotAction::VidDisperse),
765 storage,
766 consensus,
767 view,
768 epoch,
769 )
770 .await
771 .is_err()
772 {
773 return;
774 }
775 match net.vid_broadcast_message(messages).await {
776 Ok(()) => {},
777 Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
778 }
779 });
780
781 None
782 }
783
784 async fn maybe_record_action(
786 maybe_action: Option<HotShotAction>,
787 storage: S,
788 consensus: OuterConsensus<TYPES>,
789 view: <TYPES as NodeType>::View,
790 epoch: Option<<TYPES as NodeType>::Epoch>,
791 ) -> std::result::Result<(), ()> {
792 if let Some(mut action) = maybe_action {
793 if !consensus.write().await.update_action(action, view) {
794 return Err(());
795 }
796 if matches!(action, HotShotAction::ViewSyncVote) {
799 action = HotShotAction::Vote;
800 }
801 match storage.record_action(view, epoch, action).await {
802 Ok(()) => Ok(()),
803 Err(e) => {
804 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
805 Err(())
806 },
807 }
808 } else {
809 Ok(())
810 }
811 }
812
813 pub fn cancel_tasks(&mut self, view: TYPES::View) {
815 let keep = self.transmit_tasks.split_off(&view);
816
817 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
818 for task in tasks {
819 task.abort();
820 }
821 }
822
823 self.transmit_tasks = keep;
824 }
825
826 #[allow(clippy::too_many_lines)]
831 async fn parse_event(
832 &mut self,
833 event: Arc<HotShotEvent<TYPES>>,
834 maybe_action: &mut Option<HotShotAction>,
835 ) -> Option<(
836 <TYPES as NodeType>::SignatureKey,
837 MessageKind<TYPES>,
838 TransmitType<TYPES>,
839 )> {
840 match event.as_ref().clone() {
841 HotShotEvent::QuorumProposalSend(proposal, sender) => {
842 *maybe_action = Some(HotShotAction::Propose);
843
844 let message = if self
845 .upgrade_lock
846 .epochs_enabled(proposal.data.view_number())
847 .await
848 {
849 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
850 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
851 ))
852 } else {
853 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
854 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
855 ))
856 };
857
858 Some((sender, message, TransmitType::Broadcast))
859 },
860
861 HotShotEvent::QuorumVoteSend(vote) => {
863 *maybe_action = Some(HotShotAction::Vote);
864 let view_number = vote.view_number() + 1;
865 let leader = match self
866 .membership_coordinator
867 .membership_for_epoch(vote.epoch())
868 .await
869 .ok()?
870 .leader(view_number)
871 .await
872 {
873 Ok(l) => l,
874 Err(e) => {
875 tracing::warn!(
876 "Failed to calculate leader for view number {view_number}. Error: \
877 {e:?}"
878 );
879 return None;
880 },
881 };
882
883 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
884 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
885 GeneralConsensusMessage::Vote2(vote.clone()),
886 ))
887 } else {
888 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
889 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
890 ))
891 };
892
893 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
894 },
895 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
896 *maybe_action = Some(HotShotAction::Vote);
897 let view_number = vote.view_number() + 1;
898 let leader = match self
899 .membership_coordinator
900 .membership_for_epoch(vote.epoch())
901 .await
902 .ok()?
903 .leader(view_number)
904 .await
905 {
906 Ok(l) => l,
907 Err(e) => {
908 tracing::warn!(
909 "Failed to calculate leader for view number {:?}. Error: {:?}",
910 view_number,
911 e
912 );
913 return None;
914 },
915 };
916
917 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
918 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
919 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone()),
920 ))
921 } else {
922 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
924 ))
925 };
926
927 Some((
928 vote.vote.signing_key(),
929 message,
930 TransmitType::Direct(leader),
931 ))
932 },
933 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
934 *maybe_action = Some(HotShotAction::Vote);
935 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
936 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
937 GeneralConsensusMessage::Vote2(vote.clone()),
938 ))
939 } else {
940 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
941 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
942 ))
943 };
944
945 Some((vote.signing_key(), message, TransmitType::Broadcast))
946 },
947 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
948 req.key.clone(),
949 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
950 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
951 )),
952 TransmitType::Broadcast,
953 )),
954 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
955 let message = if self
956 .upgrade_lock
957 .epochs_enabled(proposal.data.view_number())
958 .await
959 {
960 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
961 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
962 ))
963 } else {
964 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
965 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
966 ))
967 };
968
969 Some((
970 sender_key.clone(),
971 message,
972 TransmitType::Direct(sender_key),
973 ))
974 },
975 HotShotEvent::VidDisperseSend(proposal, sender) => {
976 self.handle_vid_disperse_proposal(proposal, &sender).await;
977 None
978 },
979 HotShotEvent::DaProposalSend(proposal, sender) => {
980 *maybe_action = Some(HotShotAction::DaPropose);
981
982 let message = if self
983 .upgrade_lock
984 .epochs_enabled(proposal.data.view_number())
985 .await
986 {
987 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
988 DaConsensusMessage::DaProposal2(proposal),
989 ))
990 } else {
991 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
992 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
993 ))
994 };
995
996 Some((sender, message, TransmitType::DaCommitteeBroadcast))
997 },
998 HotShotEvent::DaVoteSend(vote) => {
999 *maybe_action = Some(HotShotAction::DaVote);
1000 let view_number = vote.view_number();
1001 let leader = match self
1002 .membership_coordinator
1003 .membership_for_epoch(vote.epoch())
1004 .await
1005 .ok()?
1006 .leader(view_number)
1007 .await
1008 {
1009 Ok(l) => l,
1010 Err(e) => {
1011 tracing::warn!(
1012 "Failed to calculate leader for view number {view_number}. Error: \
1013 {e:?}"
1014 );
1015 return None;
1016 },
1017 };
1018
1019 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1020 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1021 DaConsensusMessage::DaVote2(vote.clone()),
1022 ))
1023 } else {
1024 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1025 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1026 ))
1027 };
1028
1029 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1030 },
1031 HotShotEvent::DacSend(certificate, sender) => {
1032 *maybe_action = Some(HotShotAction::DaCert);
1033 let message = if self
1034 .upgrade_lock
1035 .epochs_enabled(certificate.view_number())
1036 .await
1037 {
1038 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1039 DaConsensusMessage::DaCertificate2(certificate),
1040 ))
1041 } else {
1042 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1043 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1044 ))
1045 };
1046
1047 Some((sender, message, TransmitType::Broadcast))
1048 },
1049 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1050 let view_number = vote.view_number() + vote.date().relay;
1051 let leader = match self
1052 .membership_coordinator
1053 .membership_for_epoch(self.epoch)
1054 .await
1055 .ok()?
1056 .leader(view_number)
1057 .await
1058 {
1059 Ok(l) => l,
1060 Err(e) => {
1061 tracing::warn!(
1062 "Failed to calculate leader for view number {view_number}. Error: \
1063 {e:?}"
1064 );
1065 return None;
1066 },
1067 };
1068 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1069 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1070 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1071 ))
1072 } else {
1073 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1074 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1075 ))
1076 };
1077
1078 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1079 },
1080 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1081 *maybe_action = Some(HotShotAction::ViewSyncVote);
1082 let view_number = vote.view_number() + vote.date().relay;
1083 let leader = match self
1084 .membership_coordinator
1085 .membership_for_epoch(self.epoch)
1086 .await
1087 .ok()?
1088 .leader(view_number)
1089 .await
1090 {
1091 Ok(l) => l,
1092 Err(e) => {
1093 tracing::warn!(
1094 "Failed to calculate leader for view number {view_number}. Error: \
1095 {e:?}"
1096 );
1097 return None;
1098 },
1099 };
1100 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1101 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1102 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1103 ))
1104 } else {
1105 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1106 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1107 ))
1108 };
1109
1110 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1111 },
1112 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1113 *maybe_action = Some(HotShotAction::ViewSyncVote);
1114 let view_number = vote.view_number() + vote.date().relay;
1115 let leader = match self
1116 .membership_coordinator
1117 .membership_for_epoch(self.epoch)
1118 .await
1119 .ok()?
1120 .leader(view_number)
1121 .await
1122 {
1123 Ok(l) => l,
1124 Err(e) => {
1125 tracing::warn!(
1126 "Failed to calculate leader for view number {view_number:?}. Error: \
1127 {e:?}"
1128 );
1129 return None;
1130 },
1131 };
1132 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1133 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1134 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1135 ))
1136 } else {
1137 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1138 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1139 ))
1140 };
1141
1142 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1143 },
1144 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1145 let view_number = certificate.view_number();
1146 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1147 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1148 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1149 ))
1150 } else {
1151 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1152 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1153 ))
1154 };
1155
1156 Some((sender, message, TransmitType::Broadcast))
1157 },
1158 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1159 let view_number = certificate.view_number();
1160 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1161 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1162 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1163 ))
1164 } else {
1165 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1166 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1167 ))
1168 };
1169
1170 Some((sender, message, TransmitType::Broadcast))
1171 },
1172 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1173 let view_number = certificate.view_number();
1174 let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1175 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1176 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1177 ))
1178 } else {
1179 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1180 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1181 ))
1182 };
1183
1184 Some((sender, message, TransmitType::Broadcast))
1185 },
1186 HotShotEvent::TimeoutVoteSend(vote) => {
1187 *maybe_action = Some(HotShotAction::TimeoutVote);
1188 let view_number = vote.view_number() + 1;
1189 let leader = match self
1190 .membership_coordinator
1191 .membership_for_epoch(self.epoch)
1192 .await
1193 .ok()?
1194 .leader(view_number)
1195 .await
1196 {
1197 Ok(l) => l,
1198 Err(e) => {
1199 tracing::warn!(
1200 "Failed to calculate leader for view number {view_number}. Error: \
1201 {e:?}"
1202 );
1203 return None;
1204 },
1205 };
1206 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1207 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1208 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1209 ))
1210 } else {
1211 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1212 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1213 ))
1214 };
1215
1216 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1217 },
1218 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1219 sender,
1220 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1221 GeneralConsensusMessage::UpgradeProposal(proposal),
1222 )),
1223 TransmitType::Broadcast,
1224 )),
1225 HotShotEvent::UpgradeVoteSend(vote) => {
1226 tracing::error!("Sending upgrade vote!");
1227 let view_number = vote.view_number();
1228 let leader = match self
1229 .membership_coordinator
1230 .membership_for_epoch(self.epoch)
1231 .await
1232 .ok()?
1233 .leader(view_number)
1234 .await
1235 {
1236 Ok(l) => l,
1237 Err(e) => {
1238 tracing::warn!(
1239 "Failed to calculate leader for view number {view_number}. Error: \
1240 {e:?}"
1241 );
1242 return None;
1243 },
1244 };
1245 Some((
1246 vote.signing_key(),
1247 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1248 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1249 )),
1250 TransmitType::Direct(leader),
1251 ))
1252 },
1253 HotShotEvent::ViewChange(view, epoch) => {
1254 self.view = view;
1255 if epoch > self.epoch {
1256 self.epoch = epoch;
1257 }
1258 let keep_view = TYPES::View::new(view.saturating_sub(1));
1259 self.cancel_tasks(keep_view);
1260 let net = Arc::clone(&self.network);
1261 let epoch = self.epoch.map(|x| x.u64());
1262 let membership_coordinator = self.membership_coordinator.clone();
1263 spawn(async move {
1264 net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1265 .await;
1266 });
1267 None
1268 },
1269 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1270 sender,
1271 MessageKind::Data(DataMessage::RequestData(req)),
1272 TransmitType::Direct(to),
1273 )),
1274 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1275 let epochs_enabled = self
1276 .upgrade_lock
1277 .epochs_enabled(proposal.data.view_number())
1278 .await;
1279 let message = match proposal.data {
1280 VidDisperseShare::V0(data) => {
1281 if epochs_enabled {
1282 tracing::warn!(
1283 "Epochs are enabled for view {} but didn't receive \
1284 VidDisperseShare2",
1285 data.view_number()
1286 );
1287 return None;
1288 }
1289 let vid_share_proposal = Proposal {
1290 data,
1291 signature: proposal.signature,
1292 _pd: proposal._pd,
1293 };
1294 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1295 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1296 vid_share_proposal,
1297 )),
1298 )))
1299 },
1300 VidDisperseShare::V1(data) => {
1301 if !epochs_enabled {
1302 tracing::warn!(
1303 "Epochs are enabled for view {} but didn't receive \
1304 ADVZDisperseShare",
1305 data.view_number()
1306 );
1307 return None;
1308 }
1309 let vid_share_proposal = Proposal {
1310 data,
1311 signature: proposal.signature,
1312 _pd: proposal._pd,
1313 };
1314 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1315 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1316 vid_share_proposal,
1317 )),
1318 )))
1319 },
1320 };
1321 Some((sender, message, TransmitType::Direct(to)))
1322 },
1323 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1324 sender,
1325 MessageKind::Consensus(SequencingMessage::General(
1326 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1327 )),
1328 TransmitType::Direct(leader),
1329 )),
1330 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => Some((
1331 sender,
1332 MessageKind::Consensus(SequencingMessage::General(
1333 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1334 )),
1335 TransmitType::Direct(leader),
1336 )),
1337 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1338 sender,
1339 MessageKind::Consensus(SequencingMessage::General(
1340 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1341 )),
1342 TransmitType::Broadcast,
1343 )),
1344 _ => None,
1345 }
1346 }
1347
1348 async fn spawn_transmit_task(
1350 &mut self,
1351 message_kind: MessageKind<TYPES>,
1352 maybe_action: Option<HotShotAction>,
1353 transmit: TransmitType<TYPES>,
1354 sender: TYPES::SignatureKey,
1355 ) {
1356 let broadcast_delay = match &message_kind {
1357 MessageKind::Consensus(
1358 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1359 | SequencingMessage::Da(_),
1360 ) => BroadcastDelay::View(*message_kind.view_number()),
1361 _ => BroadcastDelay::None,
1362 };
1363 let message = Message {
1364 sender,
1365 kind: message_kind,
1366 };
1367 let view_number = message.kind.view_number();
1368 let epoch = message.kind.epoch();
1369 let committee_topic = Topic::Global;
1370 let Ok(mem) = self
1371 .membership_coordinator
1372 .stake_table_for_epoch(self.epoch)
1373 .await
1374 else {
1375 return;
1376 };
1377 let da_committee = mem.da_committee_members(view_number).await;
1378 let network = Arc::clone(&self.network);
1379 let storage = self.storage.clone();
1380 let storage_metrics = Arc::clone(&self.storage_metrics);
1381 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1382 let upgrade_lock = self.upgrade_lock.clone();
1383 let handle = spawn(async move {
1384 if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1385 maybe_action,
1386 storage.clone(),
1387 consensus,
1388 view_number,
1389 epoch,
1390 )
1391 .await
1392 .is_err()
1393 {
1394 return;
1395 }
1396 if let MessageKind::Consensus(SequencingMessage::General(
1397 GeneralConsensusMessage::Proposal(prop),
1398 )) = &message.kind
1399 {
1400 let now = Instant::now();
1401 if storage
1402 .append_proposal2(&convert_proposal(prop.clone()))
1403 .await
1404 .is_err()
1405 {
1406 return;
1407 }
1408 storage_metrics
1409 .append_quorum_duration
1410 .add_point(now.elapsed().as_secs_f64());
1411 }
1412
1413 let serialized_message = match upgrade_lock.serialize(&message).await {
1414 Ok(serialized) => serialized,
1415 Err(e) => {
1416 tracing::error!("Failed to serialize message: {e}");
1417 return;
1418 },
1419 };
1420
1421 let transmit_result = match transmit {
1422 TransmitType::Direct(recipient) => {
1423 network.direct_message(serialized_message, recipient).await
1424 },
1425 TransmitType::Broadcast => {
1426 network
1427 .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1428 .await
1429 },
1430 TransmitType::DaCommitteeBroadcast => {
1431 network
1432 .da_broadcast_message(
1433 serialized_message,
1434 da_committee.iter().cloned().collect(),
1435 broadcast_delay,
1436 )
1437 .await
1438 },
1439 };
1440
1441 match transmit_result {
1442 Ok(()) => {},
1443 Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1444 }
1445 });
1446 self.transmit_tasks
1447 .entry(view_number)
1448 .or_default()
1449 .push(handle);
1450 }
1451}
1452
1453pub mod test {
1455 use std::ops::{Deref, DerefMut};
1456
1457 use async_trait::async_trait;
1458
1459 use super::{
1460 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1461 Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1462 };
1463
1464 pub type ModifierClosure<TYPES> = dyn Fn(
1467 &mut <TYPES as NodeType>::SignatureKey,
1468 &mut MessageKind<TYPES>,
1469 &mut TransmitType<TYPES>,
1470 &<TYPES as NodeType>::Membership,
1471 ) + Send
1472 + Sync;
1473
1474 pub struct NetworkEventTaskStateModifier<
1476 TYPES: NodeType,
1477 V: Versions,
1478 NET: ConnectedNetwork<TYPES::SignatureKey>,
1479 S: Storage<TYPES>,
1480 > {
1481 pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1483 pub modifier: Arc<ModifierClosure<TYPES>>,
1486 }
1487
1488 impl<
1489 TYPES: NodeType,
1490 V: Versions,
1491 NET: ConnectedNetwork<TYPES::SignatureKey>,
1492 S: Storage<TYPES> + 'static,
1493 > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1494 {
1495 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1497 let mut maybe_action = None;
1498 if let Some((mut sender, mut message_kind, mut transmit)) =
1499 self.parse_event(event, &mut maybe_action).await
1500 {
1501 (self.modifier)(
1503 &mut sender,
1504 &mut message_kind,
1505 &mut transmit,
1506 &*self.membership_coordinator.membership().read().await,
1507 );
1508
1509 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1510 .await;
1511 }
1512 }
1513 }
1514
1515 #[async_trait]
1516 impl<
1517 TYPES: NodeType,
1518 V: Versions,
1519 NET: ConnectedNetwork<TYPES::SignatureKey>,
1520 S: Storage<TYPES> + 'static,
1521 > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1522 {
1523 type Event = HotShotEvent<TYPES>;
1524
1525 async fn handle_event(
1526 &mut self,
1527 event: Arc<Self::Event>,
1528 _sender: &Sender<Arc<Self::Event>>,
1529 _receiver: &Receiver<Arc<Self::Event>>,
1530 ) -> Result<()> {
1531 self.handle(event).await;
1532
1533 Ok(())
1534 }
1535
1536 fn cancel_subtasks(&mut self) {}
1537 }
1538
1539 impl<
1540 TYPES: NodeType,
1541 V: Versions,
1542 NET: ConnectedNetwork<TYPES::SignatureKey>,
1543 S: Storage<TYPES>,
1544 > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1545 {
1546 type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1547
1548 fn deref(&self) -> &Self::Target {
1549 &self.network_event_task_state
1550 }
1551 }
1552
1553 impl<
1554 TYPES: NodeType,
1555 V: Versions,
1556 NET: ConnectedNetwork<TYPES::SignatureKey>,
1557 S: Storage<TYPES>,
1558 > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1559 {
1560 fn deref_mut(&mut self) -> &mut Self::Target {
1561 &mut self.network_event_task_state
1562 }
1563 }
1564}