hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    hash::{DefaultHasher, Hash, Hasher},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17    consensus::OuterConsensus,
18    data::{VidDisperse, VidDisperseShare},
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, HotShotAction},
21    message::{
22        convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23        MessageKind, Proposal, SequencingMessage, UpgradeLock,
24    },
25    simple_vote::HasEpoch,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        network::{
29            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30            ViewMessage,
31        },
32        node_implementation::{ConsensusTime, NodeType, Versions},
33        storage::Storage,
34    },
35    vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42    events::{HotShotEvent, HotShotTaskCompleted},
43    helpers::broadcast_event,
44};
45
46/// the network message task state
47#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49    /// Sender to send internal events this task generates to other tasks
50    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52    /// Sender to send external events this task generates to the event stream
53    pub external_event_stream: Sender<Event<TYPES>>,
54
55    /// This nodes public key
56    pub public_key: TYPES::SignatureKey,
57
58    /// Transaction Cache to ignore previously seen transactions
59    pub transactions_cache: lru::LruCache<u64, ()>,
60
61    /// Lock for a decided upgrade
62    pub upgrade_lock: UpgradeLock<TYPES, V>,
63
64    /// Node's id
65    pub id: u64,
66}
67
68impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
69    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
70    /// Handles a (deserialized) message from the network
71    pub async fn handle_message(&mut self, message: Message<TYPES>) {
72        match &message.kind {
73            MessageKind::Consensus(_) => {
74                tracing::debug!("Received consensus message from network: {:?}", message)
75            },
76            MessageKind::Data(_) => {
77                tracing::trace!("Received data message from network: {:?}", message)
78            },
79            MessageKind::External(_) => {
80                tracing::trace!("Received external message from network: {:?}", message)
81            },
82        }
83
84        // Match the message kind and send the appropriate event to the internal event stream
85        let sender = message.sender;
86        match message.kind {
87            // Handle consensus messages
88            MessageKind::Consensus(consensus_message) => {
89                let event = match consensus_message {
90                    SequencingMessage::General(general_message) => match general_message {
91                        GeneralConsensusMessage::Proposal(proposal) => {
92                            if self
93                                .upgrade_lock
94                                .epochs_enabled(proposal.data.view_number())
95                                .await
96                            {
97                                tracing::warn!(
98                                    "received GeneralConsensusMessage::Proposal for view {} but \
99                                     epochs are enabled for that view",
100                                    proposal.data.view_number()
101                                );
102                                return;
103                            }
104                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
105                        },
106                        GeneralConsensusMessage::Proposal2(proposal) => {
107                            if !self
108                                .upgrade_lock
109                                .epochs_enabled(proposal.data.view_number())
110                                .await
111                            {
112                                tracing::warn!(
113                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
114                                     epochs are not enabled for that view",
115                                    proposal.data.view_number()
116                                );
117                                return;
118                            }
119                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
120                        },
121                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
122                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
123                        },
124                        GeneralConsensusMessage::ProposalResponse(proposal) => {
125                            if self
126                                .upgrade_lock
127                                .epochs_enabled(proposal.data.view_number())
128                                .await
129                            {
130                                tracing::warn!(
131                                    "received GeneralConsensusMessage::ProposalResponse for view \
132                                     {} but epochs are enabled for that view",
133                                    proposal.data.view_number()
134                                );
135                                return;
136                            }
137                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
138                        },
139                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
140                            if !self
141                                .upgrade_lock
142                                .epochs_enabled(proposal.data.view_number())
143                                .await
144                            {
145                                tracing::warn!(
146                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
147                                     {} but epochs are not enabled for that view",
148                                    proposal.data.view_number()
149                                );
150                                return;
151                            }
152                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
153                        },
154                        GeneralConsensusMessage::Vote(vote) => {
155                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
156                                tracing::warn!(
157                                    "received GeneralConsensusMessage::Vote for view {} but \
158                                     epochs are enabled for that view",
159                                    vote.view_number()
160                                );
161                                return;
162                            }
163                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
164                        },
165                        GeneralConsensusMessage::Vote2(vote) => {
166                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
167                                tracing::warn!(
168                                    "received GeneralConsensusMessage::Vote2 for view {} but \
169                                     epochs are not enabled for that view",
170                                    vote.view_number()
171                                );
172                                return;
173                            }
174                            HotShotEvent::QuorumVoteRecv(vote)
175                        },
176                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
177                            if self
178                                .upgrade_lock
179                                .epochs_enabled(view_sync_message.view_number())
180                                .await
181                            {
182                                tracing::warn!(
183                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
184                                     view {} but epochs are enabled for that view",
185                                    view_sync_message.view_number()
186                                );
187                                return;
188                            }
189                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
190                        },
191                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
192                            if !self
193                                .upgrade_lock
194                                .epochs_enabled(view_sync_message.view_number())
195                                .await
196                            {
197                                tracing::warn!(
198                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
199                                     view {} but epochs are not enabled for that view",
200                                    view_sync_message.view_number()
201                                );
202                                return;
203                            }
204                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
205                        },
206                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
207                            view_sync_message,
208                        ) => {
209                            if self
210                                .upgrade_lock
211                                .epochs_enabled(view_sync_message.view_number())
212                                .await
213                            {
214                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
215                                return;
216                            }
217                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
218                                view_sync_message.to_vsc2(),
219                            )
220                        },
221                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
222                            view_sync_message,
223                        ) => {
224                            if !self
225                                .upgrade_lock
226                                .epochs_enabled(view_sync_message.view_number())
227                                .await
228                            {
229                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
230                                return;
231                            }
232                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
233                        },
234                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
235                            if self
236                                .upgrade_lock
237                                .epochs_enabled(view_sync_message.view_number())
238                                .await
239                            {
240                                tracing::warn!(
241                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
242                                     view {} but epochs are enabled for that view",
243                                    view_sync_message.view_number()
244                                );
245                                return;
246                            }
247                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
248                        },
249                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
250                            if !self
251                                .upgrade_lock
252                                .epochs_enabled(view_sync_message.view_number())
253                                .await
254                            {
255                                tracing::warn!(
256                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
257                                     view {} but epochs are not enabled for that view",
258                                    view_sync_message.view_number()
259                                );
260                                return;
261                            }
262                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
263                        },
264                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
265                            if self
266                                .upgrade_lock
267                                .epochs_enabled(view_sync_message.view_number())
268                                .await
269                            {
270                                tracing::warn!(
271                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
272                                     for view {} but epochs are enabled for that view",
273                                    view_sync_message.view_number()
274                                );
275                                return;
276                            }
277                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
278                        },
279                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
280                            if !self
281                                .upgrade_lock
282                                .epochs_enabled(view_sync_message.view_number())
283                                .await
284                            {
285                                tracing::warn!(
286                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
287                                     for view {} but epochs are not enabled for that view",
288                                    view_sync_message.view_number()
289                                );
290                                return;
291                            }
292                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
293                        },
294                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
295                            if self
296                                .upgrade_lock
297                                .epochs_enabled(view_sync_message.view_number())
298                                .await
299                            {
300                                tracing::warn!(
301                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
302                                     view {} but epochs are enabled for that view",
303                                    view_sync_message.view_number()
304                                );
305                                return;
306                            }
307                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
308                        },
309                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
310                            if !self
311                                .upgrade_lock
312                                .epochs_enabled(view_sync_message.view_number())
313                                .await
314                            {
315                                tracing::warn!(
316                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
317                                     view {} but epochs are not enabled for that view",
318                                    view_sync_message.view_number()
319                                );
320                                return;
321                            }
322                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
323                        },
324                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
325                            if self
326                                .upgrade_lock
327                                .epochs_enabled(view_sync_message.view_number())
328                                .await
329                            {
330                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
331                                return;
332                            }
333                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
334                                view_sync_message.to_vsc2(),
335                            )
336                        },
337                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
338                            view_sync_message,
339                        ) => {
340                            if !self
341                                .upgrade_lock
342                                .epochs_enabled(view_sync_message.view_number())
343                                .await
344                            {
345                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
346                                return;
347                            }
348                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
349                        },
350                        GeneralConsensusMessage::TimeoutVote(message) => {
351                            if self
352                                .upgrade_lock
353                                .epochs_enabled(message.view_number())
354                                .await
355                            {
356                                tracing::warn!(
357                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
358                                     but epochs are enabled for that view",
359                                    message.view_number()
360                                );
361                                return;
362                            }
363                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
364                        },
365                        GeneralConsensusMessage::TimeoutVote2(message) => {
366                            if !self
367                                .upgrade_lock
368                                .epochs_enabled(message.view_number())
369                                .await
370                            {
371                                tracing::warn!(
372                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
373                                     but epochs are not enabled for that view",
374                                    message.view_number()
375                                );
376                                return;
377                            }
378                            HotShotEvent::TimeoutVoteRecv(message)
379                        },
380                        GeneralConsensusMessage::UpgradeProposal(message) => {
381                            HotShotEvent::UpgradeProposalRecv(message, sender)
382                        },
383                        GeneralConsensusMessage::UpgradeVote(message) => {
384                            tracing::error!("Received upgrade vote!");
385                            HotShotEvent::UpgradeVoteRecv(message)
386                        },
387                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
388                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
389                        },
390                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
391                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
392                        },
393                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
394                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
395                                tracing::warn!(
396                                    "received GeneralConsensusMessage::EpochRootVote for view {} \
397                                     but epochs are not enabled for that view",
398                                    vote.view_number()
399                                );
400                                return;
401                            }
402                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
403                        },
404                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
405                            if !self
406                                .upgrade_lock
407                                .epochs_enabled(root_qc.view_number())
408                                .await
409                            {
410                                tracing::warn!(
411                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
412                                     but epochs are not enabled for that view",
413                                    root_qc.view_number()
414                                );
415                                return;
416                            }
417                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
418                        },
419                    },
420                    SequencingMessage::Da(da_message) => match da_message {
421                        DaConsensusMessage::DaProposal(proposal) => {
422                            if self
423                                .upgrade_lock
424                                .epochs_enabled(proposal.data.view_number())
425                                .await
426                            {
427                                tracing::warn!(
428                                    "received DaConsensusMessage::DaProposal for view {} but \
429                                     epochs are enabled for that view",
430                                    proposal.data.view_number()
431                                );
432                                return;
433                            }
434                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
435                        },
436                        DaConsensusMessage::DaProposal2(proposal) => {
437                            if !self
438                                .upgrade_lock
439                                .epochs_enabled(proposal.data.view_number())
440                                .await
441                            {
442                                tracing::warn!(
443                                    "received DaConsensusMessage::DaProposal2 for view {} but \
444                                     epochs are not enabled for that view",
445                                    proposal.data.view_number()
446                                );
447                                return;
448                            }
449                            HotShotEvent::DaProposalRecv(proposal, sender)
450                        },
451                        DaConsensusMessage::DaVote(vote) => {
452                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
453                                tracing::warn!(
454                                    "received DaConsensusMessage::DaVote for view {} but epochs \
455                                     are enabled for that view",
456                                    vote.view_number()
457                                );
458                                return;
459                            }
460                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
461                        },
462                        DaConsensusMessage::DaVote2(vote) => {
463                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
464                                tracing::warn!(
465                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
466                                     are not enabled for that view",
467                                    vote.view_number()
468                                );
469                                return;
470                            }
471                            HotShotEvent::DaVoteRecv(vote.clone())
472                        },
473                        DaConsensusMessage::DaCertificate(cert) => {
474                            if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
475                                tracing::warn!(
476                                    "received DaConsensusMessage::DaCertificate for view {} but \
477                                     epochs are enabled for that view",
478                                    cert.view_number()
479                                );
480                                return;
481                            }
482                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
483                        },
484                        DaConsensusMessage::DaCertificate2(cert) => {
485                            if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
486                                tracing::warn!(
487                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
488                                     epochs are not enabled for that view",
489                                    cert.view_number()
490                                );
491                                return;
492                            }
493                            HotShotEvent::DaCertificateRecv(cert)
494                        },
495                        DaConsensusMessage::VidDisperseMsg(proposal) => {
496                            if self
497                                .upgrade_lock
498                                .epochs_enabled(proposal.data.view_number())
499                                .await
500                            {
501                                tracing::warn!(
502                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
503                                     epochs are enabled for that view",
504                                    proposal.data.view_number()
505                                );
506                                return;
507                            }
508                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
509                        },
510                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
511                            if !self
512                                .upgrade_lock
513                                .epochs_enabled(proposal.data.view_number())
514                                .await
515                            {
516                                tracing::warn!(
517                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
518                                     epochs are not enabled for that view",
519                                    proposal.data.view_number()
520                                );
521                                return;
522                            }
523                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
524                        },
525                    },
526                };
527                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
528            },
529
530            // Handle data messages
531            MessageKind::Data(message) => match message {
532                DataMessage::SubmitTransaction(transaction, _) => {
533                    let mut hasher = DefaultHasher::new();
534                    transaction.hash(&mut hasher);
535                    if self.transactions_cache.put(hasher.finish(), ()).is_some() {
536                        return;
537                    }
538                    broadcast_event(
539                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
540                        &self.internal_event_stream,
541                    )
542                    .await;
543                },
544                DataMessage::DataResponse(response) => {
545                    if let ResponseMessage::Found(message) = response {
546                        match message {
547                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
548                                broadcast_event(
549                                    Arc::new(HotShotEvent::VidResponseRecv(
550                                        sender,
551                                        convert_proposal(proposal),
552                                    )),
553                                    &self.internal_event_stream,
554                                )
555                                .await;
556                            },
557                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
558                                proposal,
559                            )) => {
560                                broadcast_event(
561                                    Arc::new(HotShotEvent::VidResponseRecv(
562                                        sender,
563                                        convert_proposal(proposal),
564                                    )),
565                                    &self.internal_event_stream,
566                                )
567                                .await;
568                            },
569                            _ => {},
570                        }
571                    }
572                },
573                DataMessage::RequestData(data) => {
574                    let req_data = data.clone();
575                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
576                        broadcast_event(
577                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
578                            &self.internal_event_stream,
579                        )
580                        .await;
581                    }
582                },
583            },
584
585            // Handle external messages
586            MessageKind::External(data) => {
587                if sender == self.public_key {
588                    return;
589                }
590                // Send the external message to the external event stream so it can be processed
591                broadcast_event(
592                    Event {
593                        view_number: TYPES::View::new(1),
594                        event: EventType::ExternalMessageReceived { sender, data },
595                    },
596                    &self.external_event_stream,
597                )
598                .await;
599            },
600        }
601    }
602}
603
604/// network event task state
605pub struct NetworkEventTaskState<
606    TYPES: NodeType,
607    V: Versions,
608    NET: ConnectedNetwork<TYPES::SignatureKey>,
609    S: Storage<TYPES>,
610> {
611    /// comm network
612    pub network: Arc<NET>,
613
614    /// view number
615    pub view: TYPES::View,
616
617    /// epoch number
618    pub epoch: Option<TYPES::Epoch>,
619
620    /// network memberships
621    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
622
623    /// Storage to store actionable events
624    pub storage: S,
625
626    /// Storage metrics
627    pub storage_metrics: Arc<StorageMetricsValue>,
628
629    /// Shared consensus state
630    pub consensus: OuterConsensus<TYPES>,
631
632    /// Lock for a decided upgrade
633    pub upgrade_lock: UpgradeLock<TYPES, V>,
634
635    /// map view number to transmit tasks
636    pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
637
638    /// Number of blocks in an epoch, zero means there are no epochs
639    pub epoch_height: u64,
640
641    /// Node's id
642    pub id: u64,
643}
644
645#[async_trait]
646impl<
647        TYPES: NodeType,
648        V: Versions,
649        NET: ConnectedNetwork<TYPES::SignatureKey>,
650        S: Storage<TYPES> + 'static,
651    > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
652{
653    type Event = HotShotEvent<TYPES>;
654
655    async fn handle_event(
656        &mut self,
657        event: Arc<Self::Event>,
658        _sender: &Sender<Arc<Self::Event>>,
659        _receiver: &Receiver<Arc<Self::Event>>,
660    ) -> Result<()> {
661        self.handle(event).await;
662
663        Ok(())
664    }
665
666    fn cancel_subtasks(&mut self) {}
667}
668
669impl<
670        TYPES: NodeType,
671        V: Versions,
672        NET: ConnectedNetwork<TYPES::SignatureKey>,
673        S: Storage<TYPES> + 'static,
674    > NetworkEventTaskState<TYPES, V, NET, S>
675{
676    /// Handle the given event.
677    ///
678    /// Returns the completion status.
679    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
680    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
681        let mut maybe_action = None;
682        if let Some((sender, message_kind, transmit)) =
683            self.parse_event(event, &mut maybe_action).await
684        {
685            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
686                .await;
687        };
688    }
689
690    /// handle `VidDisperseSend`
691    async fn handle_vid_disperse_proposal(
692        &self,
693        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
694        sender: &<TYPES as NodeType>::SignatureKey,
695    ) -> Option<HotShotTaskCompleted> {
696        let view = vid_proposal.data.view_number();
697        let epoch = vid_proposal.data.epoch();
698        let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
699        let mut messages = HashMap::new();
700
701        for proposal in vid_share_proposals {
702            let recipient = proposal.data.recipient_key().clone();
703            let message = if self
704                .upgrade_lock
705                .epochs_enabled(proposal.data.view_number())
706                .await
707            {
708                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
709                    Proposal {
710                        data,
711                        signature: proposal.signature,
712                        _pd: proposal._pd,
713                    }
714                } else {
715                    tracing::warn!(
716                        "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
717                        proposal.data.view_number()
718                    );
719                    return None;
720                };
721                Message {
722                    sender: sender.clone(),
723                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
724                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
725                    )),
726                }
727            } else {
728                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
729                    Proposal {
730                        data,
731                        signature: proposal.signature,
732                        _pd: proposal._pd,
733                    }
734                } else {
735                    tracing::warn!(
736                        "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
737                        proposal.data.view_number()
738                    );
739                    return None;
740                };
741                Message {
742                    sender: sender.clone(),
743                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
744                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
745                    )),
746                }
747            };
748            let serialized_message = match self.upgrade_lock.serialize(&message).await {
749                Ok(serialized) => serialized,
750                Err(e) => {
751                    tracing::error!("Failed to serialize message: {e}");
752                    continue;
753                },
754            };
755
756            messages.insert(recipient, serialized_message);
757        }
758
759        let net = Arc::clone(&self.network);
760        let storage = self.storage.clone();
761        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
762        spawn(async move {
763            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
764                Some(HotShotAction::VidDisperse),
765                storage,
766                consensus,
767                view,
768                epoch,
769            )
770            .await
771            .is_err()
772            {
773                return;
774            }
775            match net.vid_broadcast_message(messages).await {
776                Ok(()) => {},
777                Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
778            }
779        });
780
781        None
782    }
783
784    /// Record `HotShotAction` if available
785    async fn maybe_record_action(
786        maybe_action: Option<HotShotAction>,
787        storage: S,
788        consensus: OuterConsensus<TYPES>,
789        view: <TYPES as NodeType>::View,
790        epoch: Option<<TYPES as NodeType>::Epoch>,
791    ) -> std::result::Result<(), ()> {
792        if let Some(mut action) = maybe_action {
793            if !consensus.write().await.update_action(action, view) {
794                return Err(());
795            }
796            // If the action was view sync record it as a vote, but we don't
797            // want to limit to 1 View sync vote above so change the action here.
798            if matches!(action, HotShotAction::ViewSyncVote) {
799                action = HotShotAction::Vote;
800            }
801            match storage.record_action(view, epoch, action).await {
802                Ok(()) => Ok(()),
803                Err(e) => {
804                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
805                    Err(())
806                },
807            }
808        } else {
809            Ok(())
810        }
811    }
812
813    /// Cancel all tasks for previous views
814    pub fn cancel_tasks(&mut self, view: TYPES::View) {
815        let keep = self.transmit_tasks.split_off(&view);
816
817        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
818            for task in tasks {
819                task.abort();
820            }
821        }
822
823        self.transmit_tasks = keep;
824    }
825
826    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
827    /// which will be used to create a message and transmit on the wire.
828    /// Returns `None` if the parsing result should not be sent on the wire.
829    /// Handles the `VidDisperseSend` event separately using a helper method.
830    #[allow(clippy::too_many_lines)]
831    async fn parse_event(
832        &mut self,
833        event: Arc<HotShotEvent<TYPES>>,
834        maybe_action: &mut Option<HotShotAction>,
835    ) -> Option<(
836        <TYPES as NodeType>::SignatureKey,
837        MessageKind<TYPES>,
838        TransmitType<TYPES>,
839    )> {
840        match event.as_ref().clone() {
841            HotShotEvent::QuorumProposalSend(proposal, sender) => {
842                *maybe_action = Some(HotShotAction::Propose);
843
844                let message = if self
845                    .upgrade_lock
846                    .epochs_enabled(proposal.data.view_number())
847                    .await
848                {
849                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
850                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
851                    ))
852                } else {
853                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
854                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
855                    ))
856                };
857
858                Some((sender, message, TransmitType::Broadcast))
859            },
860
861            // ED Each network task is subscribed to all these message types.  Need filters per network task
862            HotShotEvent::QuorumVoteSend(vote) => {
863                *maybe_action = Some(HotShotAction::Vote);
864                let view_number = vote.view_number() + 1;
865                let leader = match self
866                    .membership_coordinator
867                    .membership_for_epoch(vote.epoch())
868                    .await
869                    .ok()?
870                    .leader(view_number)
871                    .await
872                {
873                    Ok(l) => l,
874                    Err(e) => {
875                        tracing::warn!(
876                            "Failed to calculate leader for view number {view_number}. Error: \
877                             {e:?}"
878                        );
879                        return None;
880                    },
881                };
882
883                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
884                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
885                        GeneralConsensusMessage::Vote2(vote.clone()),
886                    ))
887                } else {
888                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
889                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
890                    ))
891                };
892
893                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
894            },
895            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
896                *maybe_action = Some(HotShotAction::Vote);
897                let view_number = vote.view_number() + 1;
898                let leader = match self
899                    .membership_coordinator
900                    .membership_for_epoch(vote.epoch())
901                    .await
902                    .ok()?
903                    .leader(view_number)
904                    .await
905                {
906                    Ok(l) => l,
907                    Err(e) => {
908                        tracing::warn!(
909                            "Failed to calculate leader for view number {:?}. Error: {:?}",
910                            view_number,
911                            e
912                        );
913                        return None;
914                    },
915                };
916
917                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
918                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
919                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone()),
920                    ))
921                } else {
922                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
924                    ))
925                };
926
927                Some((
928                    vote.vote.signing_key(),
929                    message,
930                    TransmitType::Direct(leader),
931                ))
932            },
933            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
934                *maybe_action = Some(HotShotAction::Vote);
935                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
936                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
937                        GeneralConsensusMessage::Vote2(vote.clone()),
938                    ))
939                } else {
940                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
941                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
942                    ))
943                };
944
945                Some((vote.signing_key(), message, TransmitType::Broadcast))
946            },
947            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
948                req.key.clone(),
949                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
950                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
951                )),
952                TransmitType::Broadcast,
953            )),
954            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
955                let message = if self
956                    .upgrade_lock
957                    .epochs_enabled(proposal.data.view_number())
958                    .await
959                {
960                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
961                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
962                    ))
963                } else {
964                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
965                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
966                    ))
967                };
968
969                Some((
970                    sender_key.clone(),
971                    message,
972                    TransmitType::Direct(sender_key),
973                ))
974            },
975            HotShotEvent::VidDisperseSend(proposal, sender) => {
976                self.handle_vid_disperse_proposal(proposal, &sender).await;
977                None
978            },
979            HotShotEvent::DaProposalSend(proposal, sender) => {
980                *maybe_action = Some(HotShotAction::DaPropose);
981
982                let message = if self
983                    .upgrade_lock
984                    .epochs_enabled(proposal.data.view_number())
985                    .await
986                {
987                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
988                        DaConsensusMessage::DaProposal2(proposal),
989                    ))
990                } else {
991                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
992                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
993                    ))
994                };
995
996                Some((sender, message, TransmitType::DaCommitteeBroadcast))
997            },
998            HotShotEvent::DaVoteSend(vote) => {
999                *maybe_action = Some(HotShotAction::DaVote);
1000                let view_number = vote.view_number();
1001                let leader = match self
1002                    .membership_coordinator
1003                    .membership_for_epoch(vote.epoch())
1004                    .await
1005                    .ok()?
1006                    .leader(view_number)
1007                    .await
1008                {
1009                    Ok(l) => l,
1010                    Err(e) => {
1011                        tracing::warn!(
1012                            "Failed to calculate leader for view number {view_number}. Error: \
1013                             {e:?}"
1014                        );
1015                        return None;
1016                    },
1017                };
1018
1019                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1020                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1021                        DaConsensusMessage::DaVote2(vote.clone()),
1022                    ))
1023                } else {
1024                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1025                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1026                    ))
1027                };
1028
1029                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1030            },
1031            HotShotEvent::DacSend(certificate, sender) => {
1032                *maybe_action = Some(HotShotAction::DaCert);
1033                let message = if self
1034                    .upgrade_lock
1035                    .epochs_enabled(certificate.view_number())
1036                    .await
1037                {
1038                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1039                        DaConsensusMessage::DaCertificate2(certificate),
1040                    ))
1041                } else {
1042                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1043                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1044                    ))
1045                };
1046
1047                Some((sender, message, TransmitType::Broadcast))
1048            },
1049            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1050                let view_number = vote.view_number() + vote.date().relay;
1051                let leader = match self
1052                    .membership_coordinator
1053                    .membership_for_epoch(self.epoch)
1054                    .await
1055                    .ok()?
1056                    .leader(view_number)
1057                    .await
1058                {
1059                    Ok(l) => l,
1060                    Err(e) => {
1061                        tracing::warn!(
1062                            "Failed to calculate leader for view number {view_number}. Error: \
1063                             {e:?}"
1064                        );
1065                        return None;
1066                    },
1067                };
1068                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1069                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1070                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1071                    ))
1072                } else {
1073                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1074                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1075                    ))
1076                };
1077
1078                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1079            },
1080            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1081                *maybe_action = Some(HotShotAction::ViewSyncVote);
1082                let view_number = vote.view_number() + vote.date().relay;
1083                let leader = match self
1084                    .membership_coordinator
1085                    .membership_for_epoch(self.epoch)
1086                    .await
1087                    .ok()?
1088                    .leader(view_number)
1089                    .await
1090                {
1091                    Ok(l) => l,
1092                    Err(e) => {
1093                        tracing::warn!(
1094                            "Failed to calculate leader for view number {view_number}. Error: \
1095                             {e:?}"
1096                        );
1097                        return None;
1098                    },
1099                };
1100                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1101                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1102                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1103                    ))
1104                } else {
1105                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1106                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1107                    ))
1108                };
1109
1110                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1111            },
1112            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1113                *maybe_action = Some(HotShotAction::ViewSyncVote);
1114                let view_number = vote.view_number() + vote.date().relay;
1115                let leader = match self
1116                    .membership_coordinator
1117                    .membership_for_epoch(self.epoch)
1118                    .await
1119                    .ok()?
1120                    .leader(view_number)
1121                    .await
1122                {
1123                    Ok(l) => l,
1124                    Err(e) => {
1125                        tracing::warn!(
1126                            "Failed to calculate leader for view number {view_number:?}. Error: \
1127                             {e:?}"
1128                        );
1129                        return None;
1130                    },
1131                };
1132                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1133                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1134                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1135                    ))
1136                } else {
1137                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1138                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1139                    ))
1140                };
1141
1142                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1143            },
1144            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1145                let view_number = certificate.view_number();
1146                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1147                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1148                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1149                    ))
1150                } else {
1151                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1152                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1153                    ))
1154                };
1155
1156                Some((sender, message, TransmitType::Broadcast))
1157            },
1158            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1159                let view_number = certificate.view_number();
1160                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1161                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1162                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1163                    ))
1164                } else {
1165                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1166                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1167                    ))
1168                };
1169
1170                Some((sender, message, TransmitType::Broadcast))
1171            },
1172            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1173                let view_number = certificate.view_number();
1174                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1175                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1176                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1177                    ))
1178                } else {
1179                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1180                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1181                    ))
1182                };
1183
1184                Some((sender, message, TransmitType::Broadcast))
1185            },
1186            HotShotEvent::TimeoutVoteSend(vote) => {
1187                *maybe_action = Some(HotShotAction::TimeoutVote);
1188                let view_number = vote.view_number() + 1;
1189                let leader = match self
1190                    .membership_coordinator
1191                    .membership_for_epoch(self.epoch)
1192                    .await
1193                    .ok()?
1194                    .leader(view_number)
1195                    .await
1196                {
1197                    Ok(l) => l,
1198                    Err(e) => {
1199                        tracing::warn!(
1200                            "Failed to calculate leader for view number {view_number}. Error: \
1201                             {e:?}"
1202                        );
1203                        return None;
1204                    },
1205                };
1206                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1207                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1208                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1209                    ))
1210                } else {
1211                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1212                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1213                    ))
1214                };
1215
1216                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1217            },
1218            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1219                sender,
1220                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1221                    GeneralConsensusMessage::UpgradeProposal(proposal),
1222                )),
1223                TransmitType::Broadcast,
1224            )),
1225            HotShotEvent::UpgradeVoteSend(vote) => {
1226                tracing::error!("Sending upgrade vote!");
1227                let view_number = vote.view_number();
1228                let leader = match self
1229                    .membership_coordinator
1230                    .membership_for_epoch(self.epoch)
1231                    .await
1232                    .ok()?
1233                    .leader(view_number)
1234                    .await
1235                {
1236                    Ok(l) => l,
1237                    Err(e) => {
1238                        tracing::warn!(
1239                            "Failed to calculate leader for view number {view_number}. Error: \
1240                             {e:?}"
1241                        );
1242                        return None;
1243                    },
1244                };
1245                Some((
1246                    vote.signing_key(),
1247                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1248                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1249                    )),
1250                    TransmitType::Direct(leader),
1251                ))
1252            },
1253            HotShotEvent::ViewChange(view, epoch) => {
1254                self.view = view;
1255                if epoch > self.epoch {
1256                    self.epoch = epoch;
1257                }
1258                let keep_view = TYPES::View::new(view.saturating_sub(1));
1259                self.cancel_tasks(keep_view);
1260                let net = Arc::clone(&self.network);
1261                let epoch = self.epoch.map(|x| x.u64());
1262                let membership_coordinator = self.membership_coordinator.clone();
1263                spawn(async move {
1264                    net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1265                        .await;
1266                });
1267                None
1268            },
1269            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1270                sender,
1271                MessageKind::Data(DataMessage::RequestData(req)),
1272                TransmitType::Direct(to),
1273            )),
1274            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1275                let epochs_enabled = self
1276                    .upgrade_lock
1277                    .epochs_enabled(proposal.data.view_number())
1278                    .await;
1279                let message = match proposal.data {
1280                    VidDisperseShare::V0(data) => {
1281                        if epochs_enabled {
1282                            tracing::warn!(
1283                                "Epochs are enabled for view {} but didn't receive \
1284                                 VidDisperseShare2",
1285                                data.view_number()
1286                            );
1287                            return None;
1288                        }
1289                        let vid_share_proposal = Proposal {
1290                            data,
1291                            signature: proposal.signature,
1292                            _pd: proposal._pd,
1293                        };
1294                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1295                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1296                                vid_share_proposal,
1297                            )),
1298                        )))
1299                    },
1300                    VidDisperseShare::V1(data) => {
1301                        if !epochs_enabled {
1302                            tracing::warn!(
1303                                "Epochs are enabled for view {} but didn't receive \
1304                                 ADVZDisperseShare",
1305                                data.view_number()
1306                            );
1307                            return None;
1308                        }
1309                        let vid_share_proposal = Proposal {
1310                            data,
1311                            signature: proposal.signature,
1312                            _pd: proposal._pd,
1313                        };
1314                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1315                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1316                                vid_share_proposal,
1317                            )),
1318                        )))
1319                    },
1320                };
1321                Some((sender, message, TransmitType::Direct(to)))
1322            },
1323            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1324                sender,
1325                MessageKind::Consensus(SequencingMessage::General(
1326                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1327                )),
1328                TransmitType::Direct(leader),
1329            )),
1330            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => Some((
1331                sender,
1332                MessageKind::Consensus(SequencingMessage::General(
1333                    GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1334                )),
1335                TransmitType::Direct(leader),
1336            )),
1337            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1338                sender,
1339                MessageKind::Consensus(SequencingMessage::General(
1340                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1341                )),
1342                TransmitType::Broadcast,
1343            )),
1344            _ => None,
1345        }
1346    }
1347
1348    /// Creates a network message and spawns a task that transmits it on the wire.
1349    async fn spawn_transmit_task(
1350        &mut self,
1351        message_kind: MessageKind<TYPES>,
1352        maybe_action: Option<HotShotAction>,
1353        transmit: TransmitType<TYPES>,
1354        sender: TYPES::SignatureKey,
1355    ) {
1356        let broadcast_delay = match &message_kind {
1357            MessageKind::Consensus(
1358                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1359                | SequencingMessage::Da(_),
1360            ) => BroadcastDelay::View(*message_kind.view_number()),
1361            _ => BroadcastDelay::None,
1362        };
1363        let message = Message {
1364            sender,
1365            kind: message_kind,
1366        };
1367        let view_number = message.kind.view_number();
1368        let epoch = message.kind.epoch();
1369        let committee_topic = Topic::Global;
1370        let Ok(mem) = self
1371            .membership_coordinator
1372            .stake_table_for_epoch(self.epoch)
1373            .await
1374        else {
1375            return;
1376        };
1377        let da_committee = mem.da_committee_members(view_number).await;
1378        let network = Arc::clone(&self.network);
1379        let storage = self.storage.clone();
1380        let storage_metrics = Arc::clone(&self.storage_metrics);
1381        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1382        let upgrade_lock = self.upgrade_lock.clone();
1383        let handle = spawn(async move {
1384            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1385                maybe_action,
1386                storage.clone(),
1387                consensus,
1388                view_number,
1389                epoch,
1390            )
1391            .await
1392            .is_err()
1393            {
1394                return;
1395            }
1396            if let MessageKind::Consensus(SequencingMessage::General(
1397                GeneralConsensusMessage::Proposal(prop),
1398            )) = &message.kind
1399            {
1400                let now = Instant::now();
1401                if storage
1402                    .append_proposal2(&convert_proposal(prop.clone()))
1403                    .await
1404                    .is_err()
1405                {
1406                    return;
1407                }
1408                storage_metrics
1409                    .append_quorum_duration
1410                    .add_point(now.elapsed().as_secs_f64());
1411            }
1412
1413            let serialized_message = match upgrade_lock.serialize(&message).await {
1414                Ok(serialized) => serialized,
1415                Err(e) => {
1416                    tracing::error!("Failed to serialize message: {e}");
1417                    return;
1418                },
1419            };
1420
1421            let transmit_result = match transmit {
1422                TransmitType::Direct(recipient) => {
1423                    network.direct_message(serialized_message, recipient).await
1424                },
1425                TransmitType::Broadcast => {
1426                    network
1427                        .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1428                        .await
1429                },
1430                TransmitType::DaCommitteeBroadcast => {
1431                    network
1432                        .da_broadcast_message(
1433                            serialized_message,
1434                            da_committee.iter().cloned().collect(),
1435                            broadcast_delay,
1436                        )
1437                        .await
1438                },
1439            };
1440
1441            match transmit_result {
1442                Ok(()) => {},
1443                Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1444            }
1445        });
1446        self.transmit_tasks
1447            .entry(view_number)
1448            .or_default()
1449            .push(handle);
1450    }
1451}
1452
1453/// A module with test helpers
1454pub mod test {
1455    use std::ops::{Deref, DerefMut};
1456
1457    use async_trait::async_trait;
1458
1459    use super::{
1460        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1461        Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1462    };
1463
1464    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1465    /// and changes it before transmitting on the network.
1466    pub type ModifierClosure<TYPES> = dyn Fn(
1467            &mut <TYPES as NodeType>::SignatureKey,
1468            &mut MessageKind<TYPES>,
1469            &mut TransmitType<TYPES>,
1470            &<TYPES as NodeType>::Membership,
1471        ) + Send
1472        + Sync;
1473
1474    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1475    pub struct NetworkEventTaskStateModifier<
1476        TYPES: NodeType,
1477        V: Versions,
1478        NET: ConnectedNetwork<TYPES::SignatureKey>,
1479        S: Storage<TYPES>,
1480    > {
1481        /// The real `NetworkEventTaskState`
1482        pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1483        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1484        /// changes it before transmitting on the network.
1485        pub modifier: Arc<ModifierClosure<TYPES>>,
1486    }
1487
1488    impl<
1489            TYPES: NodeType,
1490            V: Versions,
1491            NET: ConnectedNetwork<TYPES::SignatureKey>,
1492            S: Storage<TYPES> + 'static,
1493        > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1494    {
1495        /// Handles the received event modifying it before sending on the network.
1496        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1497            let mut maybe_action = None;
1498            if let Some((mut sender, mut message_kind, mut transmit)) =
1499                self.parse_event(event, &mut maybe_action).await
1500            {
1501                // Modify the values acquired by parsing the event.
1502                (self.modifier)(
1503                    &mut sender,
1504                    &mut message_kind,
1505                    &mut transmit,
1506                    &*self.membership_coordinator.membership().read().await,
1507                );
1508
1509                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1510                    .await;
1511            }
1512        }
1513    }
1514
1515    #[async_trait]
1516    impl<
1517            TYPES: NodeType,
1518            V: Versions,
1519            NET: ConnectedNetwork<TYPES::SignatureKey>,
1520            S: Storage<TYPES> + 'static,
1521        > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1522    {
1523        type Event = HotShotEvent<TYPES>;
1524
1525        async fn handle_event(
1526            &mut self,
1527            event: Arc<Self::Event>,
1528            _sender: &Sender<Arc<Self::Event>>,
1529            _receiver: &Receiver<Arc<Self::Event>>,
1530        ) -> Result<()> {
1531            self.handle(event).await;
1532
1533            Ok(())
1534        }
1535
1536        fn cancel_subtasks(&mut self) {}
1537    }
1538
1539    impl<
1540            TYPES: NodeType,
1541            V: Versions,
1542            NET: ConnectedNetwork<TYPES::SignatureKey>,
1543            S: Storage<TYPES>,
1544        > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1545    {
1546        type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1547
1548        fn deref(&self) -> &Self::Target {
1549            &self.network_event_task_state
1550        }
1551    }
1552
1553    impl<
1554            TYPES: NodeType,
1555            V: Versions,
1556            NET: ConnectedNetwork<TYPES::SignatureKey>,
1557            S: Storage<TYPES>,
1558        > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1559    {
1560        fn deref_mut(&mut self) -> &mut Self::Target {
1561            &mut self.network_event_task_state
1562        }
1563    }
1564}