hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    hash::{DefaultHasher, Hash},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17    consensus::OuterConsensus,
18    data::{VidDisperse, VidDisperseShare},
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, HotShotAction},
21    message::{
22        convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23        MessageKind, Proposal, SequencingMessage, UpgradeLock,
24    },
25    simple_vote::HasEpoch,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        network::{
29            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30            ViewMessage,
31        },
32        node_implementation::{ConsensusTime, NodeType, Versions},
33        storage::Storage,
34    },
35    vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42    events::{HotShotEvent, HotShotTaskCompleted},
43    helpers::broadcast_event,
44};
45
/// The network message task state.
///
/// Holds the data `handle_message` uses when it maps a deserialized [`Message`]
/// received from the network onto a [`HotShotEvent`].
#[derive(Clone)]
pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
    /// Sender to send internal events this task generates to other tasks
    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,

    /// Sender to send external events this task generates to the event stream
    pub external_event_stream: Sender<Event<TYPES>>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// Lock for a decided upgrade.
    ///
    /// `handle_message` queries it per view (`epochs_enabled`, `proposal2_version`,
    /// `proposal2_legacy_version`, `upgraded_vid2`) to decide whether a given
    /// message variant is acceptable for that view.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Node's id; recorded as the `id` field of the `handle_message` tracing span
    pub id: u64,
}
64
65impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
66    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67    /// Handles a (deserialized) message from the network
68    pub async fn handle_message(&mut self, message: Message<TYPES>) {
69        match &message.kind {
70            MessageKind::Consensus(_) => {
71                tracing::debug!("Received consensus message from network: {:?}", message)
72            },
73            MessageKind::Data(_) => {
74                tracing::trace!("Received data message from network: {:?}", message)
75            },
76            MessageKind::External(_) => {
77                tracing::trace!("Received external message from network: {:?}", message)
78            },
79        }
80
81        // Match the message kind and send the appropriate event to the internal event stream
82        let sender = message.sender;
83        match message.kind {
84            // Handle consensus messages
85            MessageKind::Consensus(consensus_message) => {
86                let event = match consensus_message {
87                    SequencingMessage::General(general_message) => match general_message {
88                        GeneralConsensusMessage::Proposal(proposal) => {
89                            if self
90                                .upgrade_lock
91                                .epochs_enabled(proposal.data.view_number())
92                                .await
93                            {
94                                tracing::warn!(
95                                    "received GeneralConsensusMessage::Proposal for view {} but \
96                                     epochs are enabled for that view",
97                                    proposal.data.view_number()
98                                );
99                                return;
100                            }
101                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102                        },
103                        GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104                            if !self
105                                .upgrade_lock
106                                .proposal2_legacy_version(proposal.data.view_number())
107                                .await
108                            {
109                                tracing::warn!(
110                                    "received GeneralConsensusMessage::Proposal2Legacy for view \
111                                     {} but we are in the wrong version for that message type",
112                                    proposal.data.view_number()
113                                );
114                                return;
115                            }
116                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
117                        },
118                        GeneralConsensusMessage::Proposal2(proposal) => {
119                            if !self
120                                .upgrade_lock
121                                .proposal2_version(proposal.data.view_number())
122                                .await
123                            {
124                                tracing::warn!(
125                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
126                                     we are in the wrong version for that message type",
127                                    proposal.data.view_number()
128                                );
129                                return;
130                            }
131                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
132                        },
133                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
134                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
135                        },
136                        GeneralConsensusMessage::ProposalResponse(proposal) => {
137                            if self
138                                .upgrade_lock
139                                .epochs_enabled(proposal.data.view_number())
140                                .await
141                            {
142                                tracing::warn!(
143                                    "received GeneralConsensusMessage::ProposalResponse for view \
144                                     {} but we are in the wrong version for that message type",
145                                    proposal.data.view_number()
146                                );
147                                return;
148                            }
149                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
150                        },
151                        GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
152                            if !self
153                                .upgrade_lock
154                                .proposal2_legacy_version(proposal.data.view_number())
155                                .await
156                            {
157                                tracing::warn!(
158                                    "received GeneralConsensusMessage::ProposalResponse2Legacy \
159                                     for view {} but we are in the wrong version for that message \
160                                     type",
161                                    proposal.data.view_number()
162                                );
163                                return;
164                            }
165                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
166                        },
167                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
168                            if !self
169                                .upgrade_lock
170                                .proposal2_version(proposal.data.view_number())
171                                .await
172                            {
173                                tracing::warn!(
174                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
175                                     {} but epochs are not enabled for that view",
176                                    proposal.data.view_number()
177                                );
178                                return;
179                            }
180                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
181                        },
182                        GeneralConsensusMessage::Vote(vote) => {
183                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
184                                tracing::warn!(
185                                    "received GeneralConsensusMessage::Vote for view {} but \
186                                     epochs are enabled for that view",
187                                    vote.view_number()
188                                );
189                                return;
190                            }
191                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
192                        },
193                        GeneralConsensusMessage::Vote2(vote) => {
194                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
195                                tracing::warn!(
196                                    "received GeneralConsensusMessage::Vote2 for view {} but \
197                                     epochs are not enabled for that view",
198                                    vote.view_number()
199                                );
200                                return;
201                            }
202                            HotShotEvent::QuorumVoteRecv(vote)
203                        },
204                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
205                            if self
206                                .upgrade_lock
207                                .epochs_enabled(view_sync_message.view_number())
208                                .await
209                            {
210                                tracing::warn!(
211                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
212                                     view {} but epochs are enabled for that view",
213                                    view_sync_message.view_number()
214                                );
215                                return;
216                            }
217                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
218                        },
219                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
220                            if !self
221                                .upgrade_lock
222                                .epochs_enabled(view_sync_message.view_number())
223                                .await
224                            {
225                                tracing::warn!(
226                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
227                                     view {} but epochs are not enabled for that view",
228                                    view_sync_message.view_number()
229                                );
230                                return;
231                            }
232                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
233                        },
234                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
235                            view_sync_message,
236                        ) => {
237                            if self
238                                .upgrade_lock
239                                .epochs_enabled(view_sync_message.view_number())
240                                .await
241                            {
242                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
243                                return;
244                            }
245                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
246                                view_sync_message.to_vsc2(),
247                            )
248                        },
249                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
250                            view_sync_message,
251                        ) => {
252                            if !self
253                                .upgrade_lock
254                                .epochs_enabled(view_sync_message.view_number())
255                                .await
256                            {
257                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
258                                return;
259                            }
260                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
261                        },
262                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
263                            if self
264                                .upgrade_lock
265                                .epochs_enabled(view_sync_message.view_number())
266                                .await
267                            {
268                                tracing::warn!(
269                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
270                                     view {} but epochs are enabled for that view",
271                                    view_sync_message.view_number()
272                                );
273                                return;
274                            }
275                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
276                        },
277                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
278                            if !self
279                                .upgrade_lock
280                                .epochs_enabled(view_sync_message.view_number())
281                                .await
282                            {
283                                tracing::warn!(
284                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
285                                     view {} but epochs are not enabled for that view",
286                                    view_sync_message.view_number()
287                                );
288                                return;
289                            }
290                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
291                        },
292                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
293                            if self
294                                .upgrade_lock
295                                .epochs_enabled(view_sync_message.view_number())
296                                .await
297                            {
298                                tracing::warn!(
299                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
300                                     for view {} but epochs are enabled for that view",
301                                    view_sync_message.view_number()
302                                );
303                                return;
304                            }
305                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
306                        },
307                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
308                            if !self
309                                .upgrade_lock
310                                .epochs_enabled(view_sync_message.view_number())
311                                .await
312                            {
313                                tracing::warn!(
314                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
315                                     for view {} but epochs are not enabled for that view",
316                                    view_sync_message.view_number()
317                                );
318                                return;
319                            }
320                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
321                        },
322                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
323                            if self
324                                .upgrade_lock
325                                .epochs_enabled(view_sync_message.view_number())
326                                .await
327                            {
328                                tracing::warn!(
329                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
330                                     view {} but epochs are enabled for that view",
331                                    view_sync_message.view_number()
332                                );
333                                return;
334                            }
335                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
336                        },
337                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
338                            if !self
339                                .upgrade_lock
340                                .epochs_enabled(view_sync_message.view_number())
341                                .await
342                            {
343                                tracing::warn!(
344                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
345                                     view {} but epochs are not enabled for that view",
346                                    view_sync_message.view_number()
347                                );
348                                return;
349                            }
350                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
351                        },
352                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
353                            if self
354                                .upgrade_lock
355                                .epochs_enabled(view_sync_message.view_number())
356                                .await
357                            {
358                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
359                                return;
360                            }
361                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
362                                view_sync_message.to_vsc2(),
363                            )
364                        },
365                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
366                            view_sync_message,
367                        ) => {
368                            if !self
369                                .upgrade_lock
370                                .epochs_enabled(view_sync_message.view_number())
371                                .await
372                            {
373                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
374                                return;
375                            }
376                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
377                        },
378                        GeneralConsensusMessage::TimeoutVote(message) => {
379                            if self
380                                .upgrade_lock
381                                .epochs_enabled(message.view_number())
382                                .await
383                            {
384                                tracing::warn!(
385                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
386                                     but epochs are enabled for that view",
387                                    message.view_number()
388                                );
389                                return;
390                            }
391                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
392                        },
393                        GeneralConsensusMessage::TimeoutVote2(message) => {
394                            if !self
395                                .upgrade_lock
396                                .epochs_enabled(message.view_number())
397                                .await
398                            {
399                                tracing::warn!(
400                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
401                                     but epochs are not enabled for that view",
402                                    message.view_number()
403                                );
404                                return;
405                            }
406                            HotShotEvent::TimeoutVoteRecv(message)
407                        },
408                        GeneralConsensusMessage::UpgradeProposal(message) => {
409                            HotShotEvent::UpgradeProposalRecv(message, sender)
410                        },
411                        GeneralConsensusMessage::UpgradeVote(message) => {
412                            tracing::error!("Received upgrade vote!");
413                            HotShotEvent::UpgradeVoteRecv(message)
414                        },
415                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
416                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
417                        },
418                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
419                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
420                        },
421                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
422                            if !self
423                                .upgrade_lock
424                                .proposal2_legacy_version(vote.view_number())
425                                .await
426                            {
427                                tracing::warn!(
428                                    "received GeneralConsensusMessage::EpochRootQuorumVote for \
429                                     view {} but we do not expect this message in this version",
430                                    vote.view_number()
431                                );
432                                return;
433                            }
434                            HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
435                        },
436                        GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
437                            if !self
438                                .upgrade_lock
439                                .proposal2_version(vote.view_number())
440                                .await
441                            {
442                                tracing::warn!(
443                                    "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
444                                     view {} but we do not expect this message in this version",
445                                    vote.view_number()
446                                );
447                                return;
448                            }
449                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
450                        },
451                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
452                            if !self
453                                .upgrade_lock
454                                .proposal2_version(root_qc.view_number())
455                                .await
456                            {
457                                tracing::warn!(
458                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
459                                     but we are in the wrong version for that message types",
460                                    root_qc.view_number()
461                                );
462                                return;
463                            }
464                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
465                        },
466                        GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
467                            if !self
468                                .upgrade_lock
469                                .proposal2_legacy_version(root_qc.view_number())
470                                .await
471                            {
472                                tracing::warn!(
473                                    "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
474                                     but we are in the wrong version for that message type",
475                                    root_qc.view_number()
476                                );
477                                return;
478                            }
479                            HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
480                        },
481                    },
482                    SequencingMessage::Da(da_message) => match da_message {
483                        DaConsensusMessage::DaProposal(proposal) => {
484                            if self
485                                .upgrade_lock
486                                .epochs_enabled(proposal.data.view_number())
487                                .await
488                            {
489                                tracing::warn!(
490                                    "received DaConsensusMessage::DaProposal for view {} but \
491                                     epochs are enabled for that view",
492                                    proposal.data.view_number()
493                                );
494                                return;
495                            }
496                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
497                        },
498                        DaConsensusMessage::DaProposal2(proposal) => {
499                            if !self
500                                .upgrade_lock
501                                .epochs_enabled(proposal.data.view_number())
502                                .await
503                            {
504                                tracing::warn!(
505                                    "received DaConsensusMessage::DaProposal2 for view {} but \
506                                     epochs are not enabled for that view",
507                                    proposal.data.view_number()
508                                );
509                                return;
510                            }
511                            HotShotEvent::DaProposalRecv(proposal, sender)
512                        },
513                        DaConsensusMessage::DaVote(vote) => {
514                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
515                                tracing::warn!(
516                                    "received DaConsensusMessage::DaVote for view {} but epochs \
517                                     are enabled for that view",
518                                    vote.view_number()
519                                );
520                                return;
521                            }
522                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
523                        },
524                        DaConsensusMessage::DaVote2(vote) => {
525                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
526                                tracing::warn!(
527                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
528                                     are not enabled for that view",
529                                    vote.view_number()
530                                );
531                                return;
532                            }
533                            HotShotEvent::DaVoteRecv(vote.clone())
534                        },
535                        DaConsensusMessage::DaCertificate(cert) => {
536                            if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
537                                tracing::warn!(
538                                    "received DaConsensusMessage::DaCertificate for view {} but \
539                                     epochs are enabled for that view",
540                                    cert.view_number()
541                                );
542                                return;
543                            }
544                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
545                        },
546                        DaConsensusMessage::DaCertificate2(cert) => {
547                            if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
548                                tracing::warn!(
549                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
550                                     epochs are not enabled for that view",
551                                    cert.view_number()
552                                );
553                                return;
554                            }
555                            HotShotEvent::DaCertificateRecv(cert)
556                        },
557                        DaConsensusMessage::VidDisperseMsg(proposal) => {
558                            if self
559                                .upgrade_lock
560                                .epochs_enabled(proposal.data.view_number())
561                                .await
562                            {
563                                tracing::warn!(
564                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
565                                     epochs are enabled for that view",
566                                    proposal.data.view_number()
567                                );
568                                return;
569                            }
570                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
571                        },
572                        DaConsensusMessage::VidDisperseMsg1(proposal) => {
573                            if !self
574                                .upgrade_lock
575                                .epochs_enabled(proposal.data.view_number())
576                                .await
577                            {
578                                tracing::warn!(
579                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
580                                     epochs are not enabled for that view",
581                                    proposal.data.view_number()
582                                );
583                                return;
584                            }
585                            if self
586                                .upgrade_lock
587                                .upgraded_vid2(proposal.data.view_number())
588                                .await
589                            {
590                                tracing::warn!(
591                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
592                                     vid2 upgrade is enabled for that view",
593                                    proposal.data.view_number()
594                                );
595                                return;
596                            }
597                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
598                        },
599                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
600                            if !self
601                                .upgrade_lock
602                                .upgraded_vid2(proposal.data.view_number())
603                                .await
604                            {
605                                tracing::warn!(
606                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
607                                     vid2 upgrade is not enabled for that view",
608                                    proposal.data.view_number()
609                                );
610                                return;
611                            }
612                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
613                        },
614                    },
615                };
616                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
617            },
618
619            // Handle data messages
620            MessageKind::Data(message) => match message {
621                DataMessage::SubmitTransaction(transaction, _) => {
622                    let mut hasher = DefaultHasher::new();
623                    transaction.hash(&mut hasher);
624                    broadcast_event(
625                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
626                        &self.internal_event_stream,
627                    )
628                    .await;
629                },
630                DataMessage::DataResponse(response) => {
631                    if let ResponseMessage::Found(message) = response {
632                        match message {
633                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
634                                broadcast_event(
635                                    Arc::new(HotShotEvent::VidResponseRecv(
636                                        sender,
637                                        convert_proposal(proposal),
638                                    )),
639                                    &self.internal_event_stream,
640                                )
641                                .await;
642                            },
643                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
644                                proposal,
645                            )) => {
646                                broadcast_event(
647                                    Arc::new(HotShotEvent::VidResponseRecv(
648                                        sender,
649                                        convert_proposal(proposal),
650                                    )),
651                                    &self.internal_event_stream,
652                                )
653                                .await;
654                            },
655                            _ => {},
656                        }
657                    }
658                },
659                DataMessage::RequestData(data) => {
660                    let req_data = data.clone();
661                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
662                        broadcast_event(
663                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
664                            &self.internal_event_stream,
665                        )
666                        .await;
667                    }
668                },
669            },
670
671            // Handle external messages
672            MessageKind::External(data) => {
673                if sender == self.public_key {
674                    return;
675                }
676                // Send the external message to the external event stream so it can be processed
677                broadcast_event(
678                    Event {
679                        view_number: TYPES::View::new(1),
680                        event: EventType::ExternalMessageReceived { sender, data },
681                    },
682                    &self.external_event_stream,
683                )
684                .await;
685            },
686        }
687    }
688}
689
690/// network event task state
691pub struct NetworkEventTaskState<
692    TYPES: NodeType,
693    V: Versions,
694    NET: ConnectedNetwork<TYPES::SignatureKey>,
695    S: Storage<TYPES>,
696> {
697    /// comm network
698    pub network: Arc<NET>,
699
700    /// view number
701    pub view: TYPES::View,
702
703    /// epoch number
704    pub epoch: Option<TYPES::Epoch>,
705
706    /// network memberships
707    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
708
709    /// Storage to store actionable events
710    pub storage: S,
711
712    /// Storage metrics
713    pub storage_metrics: Arc<StorageMetricsValue>,
714
715    /// Shared consensus state
716    pub consensus: OuterConsensus<TYPES>,
717
718    /// Lock for a decided upgrade
719    pub upgrade_lock: UpgradeLock<TYPES, V>,
720
721    /// map view number to transmit tasks
722    pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
723
724    /// Number of blocks in an epoch, zero means there are no epochs
725    pub epoch_height: u64,
726
727    /// Node's id
728    pub id: u64,
729}
730
731#[async_trait]
732impl<
733        TYPES: NodeType,
734        V: Versions,
735        NET: ConnectedNetwork<TYPES::SignatureKey>,
736        S: Storage<TYPES> + 'static,
737    > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
738{
739    type Event = HotShotEvent<TYPES>;
740
741    async fn handle_event(
742        &mut self,
743        event: Arc<Self::Event>,
744        _sender: &Sender<Arc<Self::Event>>,
745        _receiver: &Receiver<Arc<Self::Event>>,
746    ) -> Result<()> {
747        self.handle(event).await;
748
749        Ok(())
750    }
751
752    fn cancel_subtasks(&mut self) {}
753}
754
755impl<
756        TYPES: NodeType,
757        V: Versions,
758        NET: ConnectedNetwork<TYPES::SignatureKey>,
759        S: Storage<TYPES> + 'static,
760    > NetworkEventTaskState<TYPES, V, NET, S>
761{
762    /// Handle the given event.
763    ///
764    /// Returns the completion status.
765    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
766    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
767        let mut maybe_action = None;
768        if let Some((sender, message_kind, transmit)) =
769            self.parse_event(event, &mut maybe_action).await
770        {
771            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
772                .await;
773        };
774    }
775
776    /// handle `VidDisperseSend`
777    async fn handle_vid_disperse_proposal(
778        &self,
779        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
780        sender: &<TYPES as NodeType>::SignatureKey,
781    ) -> Option<HotShotTaskCompleted> {
782        let view = vid_proposal.data.view_number();
783        let epoch = vid_proposal.data.epoch();
784        let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
785        let mut messages = HashMap::new();
786
787        for proposal in vid_share_proposals {
788            let recipient = proposal.data.recipient_key().clone();
789            let epochs_enabled = self
790                .upgrade_lock
791                .epochs_enabled(proposal.data.view_number())
792                .await;
793            let upgraded_vid2 = self
794                .upgrade_lock
795                .upgraded_vid2(proposal.data.view_number())
796                .await;
797            let message = if !epochs_enabled {
798                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
799                    Proposal {
800                        data,
801                        signature: proposal.signature,
802                        _pd: proposal._pd,
803                    }
804                } else {
805                    tracing::warn!(
806                        "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
807                        proposal.data.view_number()
808                    );
809                    return None;
810                };
811                Message {
812                    sender: sender.clone(),
813                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
814                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
815                    )),
816                }
817            } else if !upgraded_vid2 {
818                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
819                    Proposal {
820                        data,
821                        signature: proposal.signature,
822                        _pd: proposal._pd,
823                    }
824                } else {
825                    tracing::warn!(
826                        "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
827                         receive VidDisperseShare2",
828                        proposal.data.view_number()
829                    );
830                    return None;
831                };
832                Message {
833                    sender: sender.clone(),
834                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
835                        DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
836                    )),
837                }
838            } else {
839                let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
840                    Proposal {
841                        data,
842                        signature: proposal.signature,
843                        _pd: proposal._pd,
844                    }
845                } else {
846                    tracing::warn!(
847                        "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
848                        proposal.data.view_number()
849                    );
850                    return None;
851                };
852                Message {
853                    sender: sender.clone(),
854                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
855                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
856                    )),
857                }
858            };
859            let view = message.view_number();
860            let serialized_message = match self.upgrade_lock.serialize(&message).await {
861                Ok(serialized) => serialized,
862                Err(e) => {
863                    tracing::error!("Failed to serialize message: {e}");
864                    continue;
865                },
866            };
867
868            messages.insert(recipient, (view.u64().into(), serialized_message));
869        }
870
871        let net = Arc::clone(&self.network);
872        let storage = self.storage.clone();
873        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
874        spawn(async move {
875            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
876                Some(HotShotAction::VidDisperse),
877                storage,
878                consensus,
879                view,
880                epoch,
881            )
882            .await
883            .is_err()
884            {
885                return;
886            }
887            match net.vid_broadcast_message(messages).await {
888                Ok(()) => {},
889                Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
890            }
891        });
892
893        None
894    }
895
896    /// Record `HotShotAction` if available
897    async fn maybe_record_action(
898        maybe_action: Option<HotShotAction>,
899        storage: S,
900        consensus: OuterConsensus<TYPES>,
901        view: <TYPES as NodeType>::View,
902        epoch: Option<<TYPES as NodeType>::Epoch>,
903    ) -> std::result::Result<(), ()> {
904        if let Some(mut action) = maybe_action {
905            if !consensus.write().await.update_action(action, view) {
906                return Err(());
907            }
908            // If the action was view sync record it as a vote, but we don't
909            // want to limit to 1 View sync vote above so change the action here.
910            if matches!(action, HotShotAction::ViewSyncVote) {
911                action = HotShotAction::Vote;
912            }
913            match storage.record_action(view, epoch, action).await {
914                Ok(()) => Ok(()),
915                Err(e) => {
916                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
917                    Err(())
918                },
919            }
920        } else {
921            Ok(())
922        }
923    }
924
925    /// Cancel all tasks for previous views
926    pub fn cancel_tasks(&mut self, view: TYPES::View) {
927        let keep = self.transmit_tasks.split_off(&view);
928
929        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
930            for task in tasks {
931                task.abort();
932            }
933        }
934
935        self.transmit_tasks = keep;
936    }
937
938    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
939    /// which will be used to create a message and transmit on the wire.
940    /// Returns `None` if the parsing result should not be sent on the wire.
941    /// Handles the `VidDisperseSend` event separately using a helper method.
942    #[allow(clippy::too_many_lines)]
943    async fn parse_event(
944        &mut self,
945        event: Arc<HotShotEvent<TYPES>>,
946        maybe_action: &mut Option<HotShotAction>,
947    ) -> Option<(
948        <TYPES as NodeType>::SignatureKey,
949        MessageKind<TYPES>,
950        TransmitType<TYPES>,
951    )> {
952        match event.as_ref().clone() {
953            HotShotEvent::QuorumProposalSend(proposal, sender) => {
954                *maybe_action = Some(HotShotAction::Propose);
955
956                let message = if self
957                    .upgrade_lock
958                    .proposal2_version(proposal.data.view_number())
959                    .await
960                {
961                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
962                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
963                    ))
964                } else if self
965                    .upgrade_lock
966                    .proposal2_legacy_version(proposal.data.view_number())
967                    .await
968                {
969                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
970                        GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
971                    ))
972                } else {
973                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
974                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
975                    ))
976                };
977
978                Some((sender, message, TransmitType::Broadcast))
979            },
980
981            // ED Each network task is subscribed to all these message types.  Need filters per network task
982            HotShotEvent::QuorumVoteSend(vote) => {
983                *maybe_action = Some(HotShotAction::Vote);
984                let view_number = vote.view_number() + 1;
985                let leader = match self
986                    .membership_coordinator
987                    .membership_for_epoch(vote.epoch())
988                    .await
989                    .ok()?
990                    .leader(view_number)
991                    .await
992                {
993                    Ok(l) => l,
994                    Err(e) => {
995                        tracing::warn!(
996                            "Failed to calculate leader for view number {view_number}. Error: \
997                             {e:?}"
998                        );
999                        return None;
1000                    },
1001                };
1002
1003                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1004                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1005                        GeneralConsensusMessage::Vote2(vote.clone()),
1006                    ))
1007                } else {
1008                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1009                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1010                    ))
1011                };
1012
1013                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1014            },
1015            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
1016                *maybe_action = Some(HotShotAction::Vote);
1017                let view_number = vote.view_number() + 1;
1018                let leader = match self
1019                    .membership_coordinator
1020                    .membership_for_epoch(vote.epoch())
1021                    .await
1022                    .ok()?
1023                    .leader(view_number)
1024                    .await
1025                {
1026                    Ok(l) => l,
1027                    Err(e) => {
1028                        tracing::warn!(
1029                            "Failed to calculate leader for view number {:?}. Error: {:?}",
1030                            view_number,
1031                            e
1032                        );
1033                        return None;
1034                    },
1035                };
1036
1037                let message = if self
1038                    .upgrade_lock
1039                    .proposal2_version(vote.view_number())
1040                    .await
1041                {
1042                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1043                        GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
1044                    ))
1045                } else if self
1046                    .upgrade_lock
1047                    .proposal2_legacy_version(vote.view_number())
1048                    .await
1049                {
1050                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1051                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
1052                    ))
1053                } else {
1054                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1055                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1056                    ))
1057                };
1058
1059                Some((
1060                    vote.vote.signing_key(),
1061                    message,
1062                    TransmitType::Direct(leader),
1063                ))
1064            },
1065            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1066                *maybe_action = Some(HotShotAction::Vote);
1067                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1068                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1069                        GeneralConsensusMessage::Vote2(vote.clone()),
1070                    ))
1071                } else {
1072                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1073                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1074                    ))
1075                };
1076
1077                Some((vote.signing_key(), message, TransmitType::Broadcast))
1078            },
1079            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1080                req.key.clone(),
1081                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1082                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1083                )),
1084                TransmitType::Broadcast,
1085            )),
1086            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1087                let message = if self
1088                    .upgrade_lock
1089                    .proposal2_version(proposal.data.view_number())
1090                    .await
1091                {
1092                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1093                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1094                    ))
1095                } else if self
1096                    .upgrade_lock
1097                    .proposal2_legacy_version(proposal.data.view_number())
1098                    .await
1099                {
1100                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1101                        GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1102                            proposal,
1103                        )),
1104                    ))
1105                } else {
1106                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1107                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1108                    ))
1109                };
1110
1111                Some((
1112                    sender_key.clone(),
1113                    message,
1114                    TransmitType::Direct(sender_key),
1115                ))
1116            },
1117            HotShotEvent::VidDisperseSend(proposal, sender) => {
1118                self.handle_vid_disperse_proposal(proposal, &sender).await;
1119                None
1120            },
1121            HotShotEvent::DaProposalSend(proposal, sender) => {
1122                *maybe_action = Some(HotShotAction::DaPropose);
1123
1124                let message = if self
1125                    .upgrade_lock
1126                    .epochs_enabled(proposal.data.view_number())
1127                    .await
1128                {
1129                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1130                        DaConsensusMessage::DaProposal2(proposal),
1131                    ))
1132                } else {
1133                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1134                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1135                    ))
1136                };
1137
1138                Some((sender, message, TransmitType::DaCommitteeBroadcast))
1139            },
1140            HotShotEvent::DaVoteSend(vote) => {
1141                *maybe_action = Some(HotShotAction::DaVote);
1142                let view_number = vote.view_number();
1143                let leader = match self
1144                    .membership_coordinator
1145                    .membership_for_epoch(vote.epoch())
1146                    .await
1147                    .ok()?
1148                    .leader(view_number)
1149                    .await
1150                {
1151                    Ok(l) => l,
1152                    Err(e) => {
1153                        tracing::warn!(
1154                            "Failed to calculate leader for view number {view_number}. Error: \
1155                             {e:?}"
1156                        );
1157                        return None;
1158                    },
1159                };
1160
1161                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1162                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1163                        DaConsensusMessage::DaVote2(vote.clone()),
1164                    ))
1165                } else {
1166                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1167                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1168                    ))
1169                };
1170
1171                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1172            },
1173            HotShotEvent::DacSend(certificate, sender) => {
1174                *maybe_action = Some(HotShotAction::DaCert);
1175                let message = if self
1176                    .upgrade_lock
1177                    .epochs_enabled(certificate.view_number())
1178                    .await
1179                {
1180                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1181                        DaConsensusMessage::DaCertificate2(certificate),
1182                    ))
1183                } else {
1184                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1185                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1186                    ))
1187                };
1188
1189                Some((sender, message, TransmitType::Broadcast))
1190            },
1191            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1192                let view_number = vote.view_number() + vote.date().relay;
1193                let leader = match self
1194                    .membership_coordinator
1195                    .membership_for_epoch(self.epoch)
1196                    .await
1197                    .ok()?
1198                    .leader(view_number)
1199                    .await
1200                {
1201                    Ok(l) => l,
1202                    Err(e) => {
1203                        tracing::warn!(
1204                            "Failed to calculate leader for view number {view_number}. Error: \
1205                             {e:?}"
1206                        );
1207                        return None;
1208                    },
1209                };
1210                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1211                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1212                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1213                    ))
1214                } else {
1215                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1216                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1217                    ))
1218                };
1219
1220                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1221            },
1222            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1223                *maybe_action = Some(HotShotAction::ViewSyncVote);
1224                let view_number = vote.view_number() + vote.date().relay;
1225                let leader = match self
1226                    .membership_coordinator
1227                    .membership_for_epoch(self.epoch)
1228                    .await
1229                    .ok()?
1230                    .leader(view_number)
1231                    .await
1232                {
1233                    Ok(l) => l,
1234                    Err(e) => {
1235                        tracing::warn!(
1236                            "Failed to calculate leader for view number {view_number}. Error: \
1237                             {e:?}"
1238                        );
1239                        return None;
1240                    },
1241                };
1242                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1243                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1244                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1245                    ))
1246                } else {
1247                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1248                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1249                    ))
1250                };
1251
1252                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1253            },
1254            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1255                *maybe_action = Some(HotShotAction::ViewSyncVote);
1256                let view_number = vote.view_number() + vote.date().relay;
1257                let leader = match self
1258                    .membership_coordinator
1259                    .membership_for_epoch(self.epoch)
1260                    .await
1261                    .ok()?
1262                    .leader(view_number)
1263                    .await
1264                {
1265                    Ok(l) => l,
1266                    Err(e) => {
1267                        tracing::warn!(
1268                            "Failed to calculate leader for view number {view_number:?}. Error: \
1269                             {e:?}"
1270                        );
1271                        return None;
1272                    },
1273                };
1274                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1275                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1276                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1277                    ))
1278                } else {
1279                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1280                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1281                    ))
1282                };
1283
1284                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1285            },
1286            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1287                let view_number = certificate.view_number();
1288                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1289                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1290                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1291                    ))
1292                } else {
1293                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1294                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1295                    ))
1296                };
1297
1298                Some((sender, message, TransmitType::Broadcast))
1299            },
1300            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1301                let view_number = certificate.view_number();
1302                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1303                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1304                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1305                    ))
1306                } else {
1307                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1308                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1309                    ))
1310                };
1311
1312                Some((sender, message, TransmitType::Broadcast))
1313            },
1314            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1315                let view_number = certificate.view_number();
1316                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1317                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1318                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1319                    ))
1320                } else {
1321                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1322                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1323                    ))
1324                };
1325
1326                Some((sender, message, TransmitType::Broadcast))
1327            },
1328            HotShotEvent::TimeoutVoteSend(vote) => {
1329                *maybe_action = Some(HotShotAction::TimeoutVote);
1330                let view_number = vote.view_number() + 1;
1331                let leader = match self
1332                    .membership_coordinator
1333                    .membership_for_epoch(self.epoch)
1334                    .await
1335                    .ok()?
1336                    .leader(view_number)
1337                    .await
1338                {
1339                    Ok(l) => l,
1340                    Err(e) => {
1341                        tracing::warn!(
1342                            "Failed to calculate leader for view number {view_number}. Error: \
1343                             {e:?}"
1344                        );
1345                        return None;
1346                    },
1347                };
1348                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1349                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1350                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1351                    ))
1352                } else {
1353                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1354                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1355                    ))
1356                };
1357
1358                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1359            },
1360            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1361                sender,
1362                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1363                    GeneralConsensusMessage::UpgradeProposal(proposal),
1364                )),
1365                TransmitType::Broadcast,
1366            )),
1367            HotShotEvent::UpgradeVoteSend(vote) => {
1368                tracing::error!("Sending upgrade vote!");
1369                let view_number = vote.view_number();
1370                let leader = match self
1371                    .membership_coordinator
1372                    .membership_for_epoch(self.epoch)
1373                    .await
1374                    .ok()?
1375                    .leader(view_number)
1376                    .await
1377                {
1378                    Ok(l) => l,
1379                    Err(e) => {
1380                        tracing::warn!(
1381                            "Failed to calculate leader for view number {view_number}. Error: \
1382                             {e:?}"
1383                        );
1384                        return None;
1385                    },
1386                };
1387                Some((
1388                    vote.signing_key(),
1389                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1390                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1391                    )),
1392                    TransmitType::Direct(leader),
1393                ))
1394            },
1395            HotShotEvent::ViewChange(view, epoch) => {
1396                self.view = view;
1397                if epoch > self.epoch {
1398                    self.epoch = epoch;
1399                }
1400                let keep_view = TYPES::View::new(view.saturating_sub(1));
1401                self.cancel_tasks(keep_view);
1402                let net = Arc::clone(&self.network);
1403                let epoch = self.epoch.map(|x| x.u64().into());
1404                let membership_coordinator = self.membership_coordinator.clone();
1405                spawn(async move {
1406                    net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1407                        .await;
1408                });
1409                None
1410            },
1411            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1412                sender,
1413                MessageKind::Data(DataMessage::RequestData(req)),
1414                TransmitType::Direct(to),
1415            )),
1416            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1417                let epochs_enabled = self
1418                    .upgrade_lock
1419                    .epochs_enabled(proposal.data.view_number())
1420                    .await;
1421                let upgraded_vid2 = self
1422                    .upgrade_lock
1423                    .upgraded_vid2(proposal.data.view_number())
1424                    .await;
1425                let message = match proposal.data {
1426                    VidDisperseShare::V0(data) => {
1427                        if epochs_enabled {
1428                            tracing::warn!(
1429                                "Epochs are enabled for view {} but still receive \
1430                                 VidDisperseShare0",
1431                                data.view_number()
1432                            );
1433                            return None;
1434                        }
1435                        let vid_share_proposal = Proposal {
1436                            data,
1437                            signature: proposal.signature,
1438                            _pd: proposal._pd,
1439                        };
1440                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1441                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1442                                vid_share_proposal,
1443                            )),
1444                        )))
1445                    },
1446                    VidDisperseShare::V1(data) => {
1447                        if !epochs_enabled {
1448                            tracing::warn!(
1449                                "Epochs are enabled for view {} but didn't receive \
1450                                 VidDisperseShare0",
1451                                data.view_number()
1452                            );
1453                            return None;
1454                        }
1455                        if upgraded_vid2 {
1456                            tracing::warn!(
1457                                "VID2 upgrade is enabled for view {} but didn't receive \
1458                                 VidDisperseShare2",
1459                                data.view_number()
1460                            );
1461                            return None;
1462                        }
1463                        let vid_share_proposal = Proposal {
1464                            data,
1465                            signature: proposal.signature,
1466                            _pd: proposal._pd,
1467                        };
1468                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1469                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1470                                vid_share_proposal,
1471                            )),
1472                        )))
1473                    },
1474                    VidDisperseShare::V2(data) => {
1475                        if !upgraded_vid2 {
1476                            tracing::warn!(
1477                                "VID2 upgrade is not enabled for view {} but receive \
1478                                 VidDisperseShare2",
1479                                data.view_number()
1480                            );
1481                            return None;
1482                        }
1483                        let vid_share_proposal = Proposal {
1484                            data,
1485                            signature: proposal.signature,
1486                            _pd: proposal._pd,
1487                        };
1488                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1489                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1490                                vid_share_proposal,
1491                            )),
1492                        )))
1493                    },
1494                };
1495                Some((sender, message, TransmitType::Direct(to)))
1496            },
1497            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1498                sender,
1499                MessageKind::Consensus(SequencingMessage::General(
1500                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1501                )),
1502                TransmitType::Direct(leader),
1503            )),
1504            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1505                let message = if self
1506                    .upgrade_lock
1507                    .proposal2_version(epoch_root_qc.view_number())
1508                    .await
1509                {
1510                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1511                        GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1512                    ))
1513                } else {
1514                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1515                        GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1516                    ))
1517                };
1518
1519                Some((sender, message, TransmitType::Direct(leader)))
1520            },
1521            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1522                sender,
1523                MessageKind::Consensus(SequencingMessage::General(
1524                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1525                )),
1526                TransmitType::Broadcast,
1527            )),
1528            _ => None,
1529        }
1530    }
1531
1532    /// Creates a network message and spawns a task that transmits it on the wire.
1533    async fn spawn_transmit_task(
1534        &mut self,
1535        message_kind: MessageKind<TYPES>,
1536        maybe_action: Option<HotShotAction>,
1537        transmit: TransmitType<TYPES>,
1538        sender: TYPES::SignatureKey,
1539    ) {
1540        let broadcast_delay = match &message_kind {
1541            MessageKind::Consensus(
1542                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1543                | SequencingMessage::Da(_),
1544            ) => BroadcastDelay::View(*message_kind.view_number()),
1545            _ => BroadcastDelay::None,
1546        };
1547        let message = Message {
1548            sender,
1549            kind: message_kind,
1550        };
1551        let view_number = message.kind.view_number();
1552        let epoch = message.kind.epoch();
1553        let committee_topic = Topic::Global;
1554        let Ok(mem) = self
1555            .membership_coordinator
1556            .stake_table_for_epoch(self.epoch)
1557            .await
1558        else {
1559            return;
1560        };
1561        let da_committee = mem.da_committee_members(view_number).await;
1562        let network = Arc::clone(&self.network);
1563        let storage = self.storage.clone();
1564        let storage_metrics = Arc::clone(&self.storage_metrics);
1565        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1566        let upgrade_lock = self.upgrade_lock.clone();
1567        let handle = spawn(async move {
1568            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1569                maybe_action,
1570                storage.clone(),
1571                consensus,
1572                view_number,
1573                epoch,
1574            )
1575            .await
1576            .is_err()
1577            {
1578                return;
1579            }
1580            if let MessageKind::Consensus(SequencingMessage::General(
1581                GeneralConsensusMessage::Proposal(prop),
1582            )) = &message.kind
1583            {
1584                let now = Instant::now();
1585                if storage
1586                    .append_proposal2(&convert_proposal(prop.clone()))
1587                    .await
1588                    .is_err()
1589                {
1590                    return;
1591                }
1592                storage_metrics
1593                    .append_quorum_duration
1594                    .add_point(now.elapsed().as_secs_f64());
1595            }
1596
1597            let serialized_message = match upgrade_lock.serialize(&message).await {
1598                Ok(serialized) => serialized,
1599                Err(e) => {
1600                    tracing::error!("Failed to serialize message: {e}");
1601                    return;
1602                },
1603            };
1604
1605            let transmit_result = match transmit {
1606                TransmitType::Direct(recipient) => {
1607                    network
1608                        .direct_message(view_number.u64().into(), serialized_message, recipient)
1609                        .await
1610                },
1611                TransmitType::Broadcast => {
1612                    network
1613                        .broadcast_message(
1614                            view_number.u64().into(),
1615                            serialized_message,
1616                            committee_topic,
1617                            broadcast_delay,
1618                        )
1619                        .await
1620                },
1621                TransmitType::DaCommitteeBroadcast => {
1622                    network
1623                        .da_broadcast_message(
1624                            view_number.u64().into(),
1625                            serialized_message,
1626                            da_committee.iter().cloned().collect(),
1627                            broadcast_delay,
1628                        )
1629                        .await
1630                },
1631            };
1632
1633            match transmit_result {
1634                Ok(()) => {},
1635                Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1636            }
1637        });
1638        self.transmit_tasks
1639            .entry(view_number)
1640            .or_default()
1641            .push(handle);
1642    }
1643}
1644
1645/// A module with test helpers
1646pub mod test {
1647    use std::ops::{Deref, DerefMut};
1648
1649    use async_trait::async_trait;
1650
1651    use super::{
1652        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1653        Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1654    };
1655
1656    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1657    /// and changes it before transmitting on the network.
1658    pub type ModifierClosure<TYPES> = dyn Fn(
1659            &mut <TYPES as NodeType>::SignatureKey,
1660            &mut MessageKind<TYPES>,
1661            &mut TransmitType<TYPES>,
1662            &<TYPES as NodeType>::Membership,
1663        ) + Send
1664        + Sync;
1665
1666    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1667    pub struct NetworkEventTaskStateModifier<
1668        TYPES: NodeType,
1669        V: Versions,
1670        NET: ConnectedNetwork<TYPES::SignatureKey>,
1671        S: Storage<TYPES>,
1672    > {
1673        /// The real `NetworkEventTaskState`
1674        pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1675        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1676        /// changes it before transmitting on the network.
1677        pub modifier: Arc<ModifierClosure<TYPES>>,
1678    }
1679
1680    impl<
1681            TYPES: NodeType,
1682            V: Versions,
1683            NET: ConnectedNetwork<TYPES::SignatureKey>,
1684            S: Storage<TYPES> + 'static,
1685        > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1686    {
1687        /// Handles the received event modifying it before sending on the network.
1688        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1689            let mut maybe_action = None;
1690            if let Some((mut sender, mut message_kind, mut transmit)) =
1691                self.parse_event(event, &mut maybe_action).await
1692            {
1693                // Modify the values acquired by parsing the event.
1694                (self.modifier)(
1695                    &mut sender,
1696                    &mut message_kind,
1697                    &mut transmit,
1698                    &*self.membership_coordinator.membership().read().await,
1699                );
1700
1701                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1702                    .await;
1703            }
1704        }
1705    }
1706
1707    #[async_trait]
1708    impl<
1709            TYPES: NodeType,
1710            V: Versions,
1711            NET: ConnectedNetwork<TYPES::SignatureKey>,
1712            S: Storage<TYPES> + 'static,
1713        > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1714    {
1715        type Event = HotShotEvent<TYPES>;
1716
1717        async fn handle_event(
1718            &mut self,
1719            event: Arc<Self::Event>,
1720            _sender: &Sender<Arc<Self::Event>>,
1721            _receiver: &Receiver<Arc<Self::Event>>,
1722        ) -> Result<()> {
1723            self.handle(event).await;
1724
1725            Ok(())
1726        }
1727
1728        fn cancel_subtasks(&mut self) {}
1729    }
1730
1731    impl<
1732            TYPES: NodeType,
1733            V: Versions,
1734            NET: ConnectedNetwork<TYPES::SignatureKey>,
1735            S: Storage<TYPES>,
1736        > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1737    {
1738        type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1739
1740        fn deref(&self) -> &Self::Target {
1741            &self.network_event_task_state
1742        }
1743    }
1744
1745    impl<
1746            TYPES: NodeType,
1747            V: Versions,
1748            NET: ConnectedNetwork<TYPES::SignatureKey>,
1749            S: Storage<TYPES>,
1750        > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1751    {
1752        fn deref_mut(&mut self) -> &mut Self::Target {
1753            &mut self.network_event_task_state
1754        }
1755    }
1756}