hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    hash::{DefaultHasher, Hash},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17    consensus::OuterConsensus,
18    data::{EpochNumber, VidDisperse, VidDisperseShare, ViewNumber},
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, HotShotAction},
21    message::{
22        DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal,
23        SequencingMessage, UpgradeLock, convert_proposal,
24    },
25    simple_vote::HasEpoch,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        network::{
29            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30            ViewMessage,
31        },
32        node_implementation::NodeType,
33        storage::Storage,
34    },
35    vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42    events::{HotShotEvent, HotShotTaskCompleted},
43    helpers::broadcast_event,
44};
45
46/// State of the network message task: turns messages received from the network into events
47#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType> {
49    /// Sender for internal events; `handle_message` broadcasts the resulting `HotShotEvent`s on it
50    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52    /// Sender to send external events this task generates to the event stream
53    pub external_event_stream: Sender<Event<TYPES>>,
54
55    /// This node's public key
56    pub public_key: TYPES::SignatureKey,
57
58    /// Lock for a decided upgrade; queried per view to decide which message variants are accepted
59    pub upgrade_lock: UpgradeLock<TYPES>,
60
61    /// Node's id (recorded as the `id` field of the `handle_message` tracing span)
62    pub id: u64,
63}
64
65impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
66    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67    /// Handles a (deserialized) message from the network
68    pub async fn handle_message(&mut self, message: Message<TYPES>) {
69        match &message.kind {
70            MessageKind::Consensus(_) => {
71                tracing::debug!("Received consensus message from network: {:?}", message)
72            },
73            MessageKind::Data(_) => {
74                tracing::trace!("Received data message from network: {:?}", message)
75            },
76            MessageKind::External(_) => {
77                tracing::trace!("Received external message from network: {:?}", message)
78            },
79        }
80
81        // Match the message kind and send the appropriate event to the internal event stream
82        let sender = message.sender;
83        match message.kind {
84            // Handle consensus messages
85            MessageKind::Consensus(consensus_message) => {
86                let event = match consensus_message {
87                    SequencingMessage::General(general_message) => match general_message {
88                        GeneralConsensusMessage::Proposal(proposal) => {
89                            if self
90                                .upgrade_lock
91                                .epochs_enabled(proposal.data.view_number())
92                            {
93                                tracing::warn!(
94                                    "received GeneralConsensusMessage::Proposal for view {} but \
95                                     epochs are enabled for that view",
96                                    proposal.data.view_number()
97                                );
98                                return;
99                            }
100                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
101                        },
102                        GeneralConsensusMessage::Proposal2Legacy(proposal) => {
103                            if !self
104                                .upgrade_lock
105                                .proposal2_legacy_version(proposal.data.view_number())
106                            {
107                                tracing::warn!(
108                                    "received GeneralConsensusMessage::Proposal2Legacy for view \
109                                     {} but we are in the wrong version for that message type",
110                                    proposal.data.view_number()
111                                );
112                                return;
113                            }
114                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
115                        },
116                        GeneralConsensusMessage::Proposal2(proposal) => {
117                            if !self
118                                .upgrade_lock
119                                .proposal2_version(proposal.data.view_number())
120                            {
121                                tracing::warn!(
122                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
123                                     we are in the wrong version for that message type",
124                                    proposal.data.view_number()
125                                );
126                                return;
127                            }
128                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
129                        },
130                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
131                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
132                        },
133                        GeneralConsensusMessage::ProposalResponse(proposal) => {
134                            if self
135                                .upgrade_lock
136                                .epochs_enabled(proposal.data.view_number())
137                            {
138                                tracing::warn!(
139                                    "received GeneralConsensusMessage::ProposalResponse for view \
140                                     {} but we are in the wrong version for that message type",
141                                    proposal.data.view_number()
142                                );
143                                return;
144                            }
145                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
146                        },
147                        GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
148                            if !self
149                                .upgrade_lock
150                                .proposal2_legacy_version(proposal.data.view_number())
151                            {
152                                tracing::warn!(
153                                    "received GeneralConsensusMessage::ProposalResponse2Legacy \
154                                     for view {} but we are in the wrong version for that message \
155                                     type",
156                                    proposal.data.view_number()
157                                );
158                                return;
159                            }
160                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
161                        },
162                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
163                            if !self
164                                .upgrade_lock
165                                .proposal2_version(proposal.data.view_number())
166                            {
167                                tracing::warn!(
168                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
169                                     {} but epochs are not enabled for that view",
170                                    proposal.data.view_number()
171                                );
172                                return;
173                            }
174                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
175                        },
176                        GeneralConsensusMessage::Vote(vote) => {
177                            if self.upgrade_lock.epochs_enabled(vote.view_number()) {
178                                tracing::warn!(
179                                    "received GeneralConsensusMessage::Vote for view {} but \
180                                     epochs are enabled for that view",
181                                    vote.view_number()
182                                );
183                                return;
184                            }
185                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
186                        },
187                        GeneralConsensusMessage::Vote2(vote) => {
188                            if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
189                                tracing::warn!(
190                                    "received GeneralConsensusMessage::Vote2 for view {} but \
191                                     epochs are not enabled for that view",
192                                    vote.view_number()
193                                );
194                                return;
195                            }
196                            HotShotEvent::QuorumVoteRecv(vote)
197                        },
198                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
199                            if self
200                                .upgrade_lock
201                                .epochs_enabled(view_sync_message.view_number())
202                            {
203                                tracing::warn!(
204                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
205                                     view {} but epochs are enabled for that view",
206                                    view_sync_message.view_number()
207                                );
208                                return;
209                            }
210                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
211                        },
212                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
213                            if !self
214                                .upgrade_lock
215                                .epochs_enabled(view_sync_message.view_number())
216                            {
217                                tracing::warn!(
218                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
219                                     view {} but epochs are not enabled for that view",
220                                    view_sync_message.view_number()
221                                );
222                                return;
223                            }
224                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
225                        },
226                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
227                            view_sync_message,
228                        ) => {
229                            if self
230                                .upgrade_lock
231                                .epochs_enabled(view_sync_message.view_number())
232                            {
233                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
234                                return;
235                            }
236                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
237                                view_sync_message.to_vsc2(),
238                            )
239                        },
240                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
241                            view_sync_message,
242                        ) => {
243                            if !self
244                                .upgrade_lock
245                                .epochs_enabled(view_sync_message.view_number())
246                            {
247                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
248                                return;
249                            }
250                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
251                        },
252                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
253                            if self
254                                .upgrade_lock
255                                .epochs_enabled(view_sync_message.view_number())
256                            {
257                                tracing::warn!(
258                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
259                                     view {} but epochs are enabled for that view",
260                                    view_sync_message.view_number()
261                                );
262                                return;
263                            }
264                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
265                        },
266                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
267                            if !self
268                                .upgrade_lock
269                                .epochs_enabled(view_sync_message.view_number())
270                            {
271                                tracing::warn!(
272                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
273                                     view {} but epochs are not enabled for that view",
274                                    view_sync_message.view_number()
275                                );
276                                return;
277                            }
278                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
279                        },
280                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
281                            if self
282                                .upgrade_lock
283                                .epochs_enabled(view_sync_message.view_number())
284                            {
285                                tracing::warn!(
286                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
287                                     for view {} but epochs are enabled for that view",
288                                    view_sync_message.view_number()
289                                );
290                                return;
291                            }
292                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
293                        },
294                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
295                            if !self
296                                .upgrade_lock
297                                .epochs_enabled(view_sync_message.view_number())
298                            {
299                                tracing::warn!(
300                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
301                                     for view {} but epochs are not enabled for that view",
302                                    view_sync_message.view_number()
303                                );
304                                return;
305                            }
306                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
307                        },
308                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
309                            if self
310                                .upgrade_lock
311                                .epochs_enabled(view_sync_message.view_number())
312                            {
313                                tracing::warn!(
314                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
315                                     view {} but epochs are enabled for that view",
316                                    view_sync_message.view_number()
317                                );
318                                return;
319                            }
320                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
321                        },
322                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
323                            if !self
324                                .upgrade_lock
325                                .epochs_enabled(view_sync_message.view_number())
326                            {
327                                tracing::warn!(
328                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
329                                     view {} but epochs are not enabled for that view",
330                                    view_sync_message.view_number()
331                                );
332                                return;
333                            }
334                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
335                        },
336                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
337                            if self
338                                .upgrade_lock
339                                .epochs_enabled(view_sync_message.view_number())
340                            {
341                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
342                                return;
343                            }
344                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
345                                view_sync_message.to_vsc2(),
346                            )
347                        },
348                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
349                            view_sync_message,
350                        ) => {
351                            if !self
352                                .upgrade_lock
353                                .epochs_enabled(view_sync_message.view_number())
354                            {
355                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
356                                return;
357                            }
358                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
359                        },
360                        GeneralConsensusMessage::TimeoutVote(message) => {
361                            if self.upgrade_lock.epochs_enabled(message.view_number()) {
362                                tracing::warn!(
363                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
364                                     but epochs are enabled for that view",
365                                    message.view_number()
366                                );
367                                return;
368                            }
369                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
370                        },
371                        GeneralConsensusMessage::TimeoutVote2(message) => {
372                            if !self.upgrade_lock.epochs_enabled(message.view_number()) {
373                                tracing::warn!(
374                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
375                                     but epochs are not enabled for that view",
376                                    message.view_number()
377                                );
378                                return;
379                            }
380                            HotShotEvent::TimeoutVoteRecv(message)
381                        },
382                        GeneralConsensusMessage::UpgradeProposal(message) => {
383                            HotShotEvent::UpgradeProposalRecv(message, sender)
384                        },
385                        GeneralConsensusMessage::UpgradeVote(message) => {
386                            tracing::error!("Received upgrade vote!");
387                            HotShotEvent::UpgradeVoteRecv(message)
388                        },
389                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
390                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
391                        },
392                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
393                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
394                        },
395                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
396                            if !self
397                                .upgrade_lock
398                                .proposal2_legacy_version(vote.view_number())
399                            {
400                                tracing::warn!(
401                                    "received GeneralConsensusMessage::EpochRootQuorumVote for \
402                                     view {} but we do not expect this message in this version",
403                                    vote.view_number()
404                                );
405                                return;
406                            }
407                            HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
408                        },
409                        GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
410                            if !self.upgrade_lock.proposal2_version(vote.view_number()) {
411                                tracing::warn!(
412                                    "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
413                                     view {} but we do not expect this message in this version",
414                                    vote.view_number()
415                                );
416                                return;
417                            }
418                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
419                        },
420                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
421                            if !self.upgrade_lock.proposal2_version(root_qc.view_number()) {
422                                tracing::warn!(
423                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
424                                     but we are in the wrong version for that message types",
425                                    root_qc.view_number()
426                                );
427                                return;
428                            }
429                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
430                        },
431                        GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
432                            if !self
433                                .upgrade_lock
434                                .proposal2_legacy_version(root_qc.view_number())
435                            {
436                                tracing::warn!(
437                                    "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
438                                     but we are in the wrong version for that message type",
439                                    root_qc.view_number()
440                                );
441                                return;
442                            }
443                            HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
444                        },
445                    },
446                    SequencingMessage::Da(da_message) => match da_message {
447                        DaConsensusMessage::DaProposal(proposal) => {
448                            if self
449                                .upgrade_lock
450                                .epochs_enabled(proposal.data.view_number())
451                            {
452                                tracing::warn!(
453                                    "received DaConsensusMessage::DaProposal for view {} but \
454                                     epochs are enabled for that view",
455                                    proposal.data.view_number()
456                                );
457                                return;
458                            }
459                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
460                        },
461                        DaConsensusMessage::DaProposal2(proposal) => {
462                            if !self
463                                .upgrade_lock
464                                .epochs_enabled(proposal.data.view_number())
465                            {
466                                tracing::warn!(
467                                    "received DaConsensusMessage::DaProposal2 for view {} but \
468                                     epochs are not enabled for that view",
469                                    proposal.data.view_number()
470                                );
471                                return;
472                            }
473                            HotShotEvent::DaProposalRecv(proposal, sender)
474                        },
475                        DaConsensusMessage::DaVote(vote) => {
476                            if self.upgrade_lock.epochs_enabled(vote.view_number()) {
477                                tracing::warn!(
478                                    "received DaConsensusMessage::DaVote for view {} but epochs \
479                                     are enabled for that view",
480                                    vote.view_number()
481                                );
482                                return;
483                            }
484                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
485                        },
486                        DaConsensusMessage::DaVote2(vote) => {
487                            if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
488                                tracing::warn!(
489                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
490                                     are not enabled for that view",
491                                    vote.view_number()
492                                );
493                                return;
494                            }
495                            HotShotEvent::DaVoteRecv(vote.clone())
496                        },
497                        DaConsensusMessage::DaCertificate(cert) => {
498                            if self.upgrade_lock.epochs_enabled(cert.view_number()) {
499                                tracing::warn!(
500                                    "received DaConsensusMessage::DaCertificate for view {} but \
501                                     epochs are enabled for that view",
502                                    cert.view_number()
503                                );
504                                return;
505                            }
506                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
507                        },
508                        DaConsensusMessage::DaCertificate2(cert) => {
509                            if !self.upgrade_lock.epochs_enabled(cert.view_number()) {
510                                tracing::warn!(
511                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
512                                     epochs are not enabled for that view",
513                                    cert.view_number()
514                                );
515                                return;
516                            }
517                            HotShotEvent::DaCertificateRecv(cert)
518                        },
519                        DaConsensusMessage::VidDisperseMsg(proposal) => {
520                            if self
521                                .upgrade_lock
522                                .epochs_enabled(proposal.data.view_number())
523                            {
524                                tracing::warn!(
525                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
526                                     epochs are enabled for that view",
527                                    proposal.data.view_number()
528                                );
529                                return;
530                            }
531                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
532                        },
533                        DaConsensusMessage::VidDisperseMsg1(proposal) => {
534                            if !self
535                                .upgrade_lock
536                                .epochs_enabled(proposal.data.view_number())
537                            {
538                                tracing::warn!(
539                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
540                                     epochs are not enabled for that view",
541                                    proposal.data.view_number()
542                                );
543                                return;
544                            }
545                            if self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
546                                tracing::warn!(
547                                    "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
548                                     vid2 upgrade is enabled for that view",
549                                    proposal.data.view_number()
550                                );
551                                return;
552                            }
553                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
554                        },
555                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
556                            if !self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
557                                tracing::warn!(
558                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
559                                     vid2 upgrade is not enabled for that view",
560                                    proposal.data.view_number()
561                                );
562                                return;
563                            }
564                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
565                        },
566                    },
567                };
568                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
569            },
570
571            // Handle data messages
572            MessageKind::Data(message) => match message {
573                DataMessage::SubmitTransaction(transaction, _) => {
574                    let mut hasher = DefaultHasher::new();
575                    transaction.hash(&mut hasher);
576                    broadcast_event(
577                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
578                        &self.internal_event_stream,
579                    )
580                    .await;
581                },
582                DataMessage::DataResponse(response) => {
583                    if let ResponseMessage::Found(message) = response {
584                        match message {
585                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
586                                broadcast_event(
587                                    Arc::new(HotShotEvent::VidResponseRecv(
588                                        sender,
589                                        convert_proposal(proposal),
590                                    )),
591                                    &self.internal_event_stream,
592                                )
593                                .await;
594                            },
595                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
596                                proposal,
597                            )) => {
598                                broadcast_event(
599                                    Arc::new(HotShotEvent::VidResponseRecv(
600                                        sender,
601                                        convert_proposal(proposal),
602                                    )),
603                                    &self.internal_event_stream,
604                                )
605                                .await;
606                            },
607                            _ => {},
608                        }
609                    }
610                },
611                DataMessage::RequestData(data) => {
612                    let req_data = data.clone();
613                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
614                        broadcast_event(
615                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
616                            &self.internal_event_stream,
617                        )
618                        .await;
619                    }
620                },
621            },
622
623            // Handle external messages
624            MessageKind::External(data) => {
625                if sender == self.public_key {
626                    return;
627                }
628                // Send the external message to the external event stream so it can be processed
629                broadcast_event(
630                    Event {
631                        view_number: ViewNumber::new(1),
632                        event: EventType::ExternalMessageReceived { sender, data },
633                    },
634                    &self.external_event_stream,
635                )
636                .await;
637            },
638        }
639    }
640}
641
642/// network event task state
643pub struct NetworkEventTaskState<
644    TYPES: NodeType,
645    NET: ConnectedNetwork<TYPES::SignatureKey>,
646    S: Storage<TYPES>,
647> {
648    /// comm network
649    pub network: Arc<NET>,
650
651    /// view number
652    pub view: ViewNumber,
653
654    /// epoch number
655    pub epoch: Option<EpochNumber>,
656
657    /// network memberships
658    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
659
660    /// Storage to store actionable events
661    pub storage: S,
662
663    /// Storage metrics
664    pub storage_metrics: Arc<StorageMetricsValue>,
665
666    /// Shared consensus state
667    pub consensus: OuterConsensus<TYPES>,
668
669    /// Lock for a decided upgrade
670    pub upgrade_lock: UpgradeLock<TYPES>,
671
672    /// map view number to transmit tasks
673    pub transmit_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
674
675    /// Number of blocks in an epoch, zero means there are no epochs
676    pub epoch_height: u64,
677
678    /// Node's id
679    pub id: u64,
680}
681
682#[async_trait]
683impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
684    TaskState for NetworkEventTaskState<TYPES, NET, S>
685{
686    type Event = HotShotEvent<TYPES>;
687
688    async fn handle_event(
689        &mut self,
690        event: Arc<Self::Event>,
691        _sender: &Sender<Arc<Self::Event>>,
692        _receiver: &Receiver<Arc<Self::Event>>,
693    ) -> Result<()> {
694        self.handle(event).await;
695
696        Ok(())
697    }
698
699    fn cancel_subtasks(&mut self) {}
700}
701
702impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
703    NetworkEventTaskState<TYPES, NET, S>
704{
705    /// Handle the given event.
706    ///
707    /// Returns the completion status.
708    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
709    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
710        let mut maybe_action = None;
711        if let Some((sender, message_kind, transmit)) =
712            self.parse_event(event, &mut maybe_action).await
713        {
714            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
715                .await;
716        };
717    }
718
719    /// handle `VidDisperseSend`
720    async fn handle_vid_disperse_proposal(
721        &self,
722        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
723        sender: &<TYPES as NodeType>::SignatureKey,
724    ) -> Option<HotShotTaskCompleted> {
725        let view = vid_proposal.data.view_number();
726        let epoch = vid_proposal.data.epoch();
727        let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
728        let mut messages = HashMap::new();
729
730        for proposal in vid_share_proposals {
731            let recipient = proposal.data.recipient_key().clone();
732            let epochs_enabled = self
733                .upgrade_lock
734                .epochs_enabled(proposal.data.view_number());
735            let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
736            let message = if !epochs_enabled {
737                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
738                    Proposal {
739                        data,
740                        signature: proposal.signature,
741                        _pd: proposal._pd,
742                    }
743                } else {
744                    tracing::warn!(
745                        "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
746                        proposal.data.view_number()
747                    );
748                    return None;
749                };
750                Message {
751                    sender: sender.clone(),
752                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
753                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
754                    )),
755                }
756            } else if !upgraded_vid2 {
757                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
758                    Proposal {
759                        data,
760                        signature: proposal.signature,
761                        _pd: proposal._pd,
762                    }
763                } else {
764                    tracing::warn!(
765                        "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
766                         receive VidDisperseShare2",
767                        proposal.data.view_number()
768                    );
769                    return None;
770                };
771                Message {
772                    sender: sender.clone(),
773                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
774                        DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
775                    )),
776                }
777            } else {
778                let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
779                    Proposal {
780                        data,
781                        signature: proposal.signature,
782                        _pd: proposal._pd,
783                    }
784                } else {
785                    tracing::warn!(
786                        "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
787                        proposal.data.view_number()
788                    );
789                    return None;
790                };
791                Message {
792                    sender: sender.clone(),
793                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
794                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
795                    )),
796                }
797            };
798            let view = message.view_number();
799            let serialized_message = match self.upgrade_lock.serialize(&message) {
800                Ok(serialized) => serialized,
801                Err(e) => {
802                    tracing::error!("Failed to serialize message: {e}");
803                    continue;
804                },
805            };
806
807            messages.insert(recipient, (view.u64().into(), serialized_message));
808        }
809
810        let net = Arc::clone(&self.network);
811        let storage = self.storage.clone();
812        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
813        spawn(async move {
814            if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
815                Some(HotShotAction::VidDisperse),
816                storage,
817                consensus,
818                view,
819                epoch,
820            )
821            .await
822            .is_err()
823            {
824                return;
825            }
826            match net.vid_broadcast_message(messages).await {
827                Ok(()) => {},
828                Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
829            }
830        });
831
832        None
833    }
834
835    /// Record `HotShotAction` if available
836    async fn maybe_record_action(
837        maybe_action: Option<HotShotAction>,
838        storage: S,
839        consensus: OuterConsensus<TYPES>,
840        view: ViewNumber,
841        epoch: Option<EpochNumber>,
842    ) -> std::result::Result<(), ()> {
843        if let Some(mut action) = maybe_action {
844            if !consensus.write().await.update_action(action, view) {
845                return Err(());
846            }
847            // If the action was view sync record it as a vote, but we don't
848            // want to limit to 1 View sync vote above so change the action here.
849            if matches!(action, HotShotAction::ViewSyncVote) {
850                action = HotShotAction::Vote;
851            }
852            match storage.record_action(view, epoch, action).await {
853                Ok(()) => Ok(()),
854                Err(e) => {
855                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
856                    Err(())
857                },
858            }
859        } else {
860            Ok(())
861        }
862    }
863
864    /// Cancel all tasks for previous views
865    pub fn cancel_tasks(&mut self, view: ViewNumber) {
866        let keep = self.transmit_tasks.split_off(&view);
867
868        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
869            for task in tasks {
870                task.abort();
871            }
872        }
873
874        self.transmit_tasks = keep;
875    }
876
877    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
878    /// which will be used to create a message and transmit on the wire.
879    /// Returns `None` if the parsing result should not be sent on the wire.
880    /// Handles the `VidDisperseSend` event separately using a helper method.
881    #[allow(clippy::too_many_lines)]
882    async fn parse_event(
883        &mut self,
884        event: Arc<HotShotEvent<TYPES>>,
885        maybe_action: &mut Option<HotShotAction>,
886    ) -> Option<(
887        <TYPES as NodeType>::SignatureKey,
888        MessageKind<TYPES>,
889        TransmitType<TYPES>,
890    )> {
891        match event.as_ref().clone() {
892            HotShotEvent::QuorumProposalSend(proposal, sender) => {
893                *maybe_action = Some(HotShotAction::Propose);
894
895                let message = if self
896                    .upgrade_lock
897                    .proposal2_version(proposal.data.view_number())
898                {
899                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
900                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
901                    ))
902                } else if self
903                    .upgrade_lock
904                    .proposal2_legacy_version(proposal.data.view_number())
905                {
906                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
907                        GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
908                    ))
909                } else {
910                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
911                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
912                    ))
913                };
914
915                Some((sender, message, TransmitType::Broadcast))
916            },
917
918            // ED Each network task is subscribed to all these message types.  Need filters per network task
919            HotShotEvent::QuorumVoteSend(vote) => {
920                *maybe_action = Some(HotShotAction::Vote);
921                let view_number = vote.view_number() + 1;
922                let leader = match self
923                    .membership_coordinator
924                    .membership_for_epoch(vote.epoch())
925                    .await
926                    .ok()?
927                    .leader(view_number)
928                    .await
929                {
930                    Ok(l) => l,
931                    Err(e) => {
932                        tracing::warn!(
933                            "Failed to calculate leader for view number {view_number}. Error: \
934                             {e:?}"
935                        );
936                        return None;
937                    },
938                };
939
940                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
941                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
942                        GeneralConsensusMessage::Vote2(vote.clone()),
943                    ))
944                } else {
945                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
946                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
947                    ))
948                };
949
950                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
951            },
952            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
953                *maybe_action = Some(HotShotAction::Vote);
954                let view_number = vote.view_number() + 1;
955                let leader = match self
956                    .membership_coordinator
957                    .membership_for_epoch(vote.epoch())
958                    .await
959                    .ok()?
960                    .leader(view_number)
961                    .await
962                {
963                    Ok(l) => l,
964                    Err(e) => {
965                        tracing::warn!(
966                            "Failed to calculate leader for view number {:?}. Error: {:?}",
967                            view_number,
968                            e
969                        );
970                        return None;
971                    },
972                };
973
974                let message = if self.upgrade_lock.proposal2_version(vote.view_number()) {
975                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
976                        GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
977                    ))
978                } else if self
979                    .upgrade_lock
980                    .proposal2_legacy_version(vote.view_number())
981                {
982                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
983                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
984                    ))
985                } else {
986                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
987                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
988                    ))
989                };
990
991                Some((
992                    vote.vote.signing_key(),
993                    message,
994                    TransmitType::Direct(leader),
995                ))
996            },
997            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
998                *maybe_action = Some(HotShotAction::Vote);
999                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1000                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1001                        GeneralConsensusMessage::Vote2(vote.clone()),
1002                    ))
1003                } else {
1004                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1005                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1006                    ))
1007                };
1008
1009                Some((vote.signing_key(), message, TransmitType::Broadcast))
1010            },
1011            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1012                req.key.clone(),
1013                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1014                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1015                )),
1016                TransmitType::Broadcast,
1017            )),
1018            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1019                let message = if self
1020                    .upgrade_lock
1021                    .proposal2_version(proposal.data.view_number())
1022                {
1023                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1024                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1025                    ))
1026                } else if self
1027                    .upgrade_lock
1028                    .proposal2_legacy_version(proposal.data.view_number())
1029                {
1030                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1031                        GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1032                            proposal,
1033                        )),
1034                    ))
1035                } else {
1036                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1037                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1038                    ))
1039                };
1040
1041                Some((
1042                    sender_key.clone(),
1043                    message,
1044                    TransmitType::Direct(sender_key),
1045                ))
1046            },
1047            HotShotEvent::VidDisperseSend(proposal, sender) => {
1048                self.handle_vid_disperse_proposal(proposal, &sender).await;
1049                None
1050            },
1051            HotShotEvent::DaProposalSend(proposal, sender) => {
1052                *maybe_action = Some(HotShotAction::DaPropose);
1053
1054                let message = if self
1055                    .upgrade_lock
1056                    .epochs_enabled(proposal.data.view_number())
1057                {
1058                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1059                        DaConsensusMessage::DaProposal2(proposal),
1060                    ))
1061                } else {
1062                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1063                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1064                    ))
1065                };
1066
1067                Some((sender, message, TransmitType::DaCommitteeBroadcast))
1068            },
1069            HotShotEvent::DaVoteSend(vote) => {
1070                *maybe_action = Some(HotShotAction::DaVote);
1071                let view_number = vote.view_number();
1072                let leader = match self
1073                    .membership_coordinator
1074                    .membership_for_epoch(vote.epoch())
1075                    .await
1076                    .ok()?
1077                    .leader(view_number)
1078                    .await
1079                {
1080                    Ok(l) => l,
1081                    Err(e) => {
1082                        tracing::warn!(
1083                            "Failed to calculate leader for view number {view_number}. Error: \
1084                             {e:?}"
1085                        );
1086                        return None;
1087                    },
1088                };
1089
1090                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1091                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1092                        DaConsensusMessage::DaVote2(vote.clone()),
1093                    ))
1094                } else {
1095                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1096                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1097                    ))
1098                };
1099
1100                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1101            },
1102            HotShotEvent::DacSend(certificate, sender) => {
1103                *maybe_action = Some(HotShotAction::DaCert);
1104                let message = if self.upgrade_lock.epochs_enabled(certificate.view_number()) {
1105                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1106                        DaConsensusMessage::DaCertificate2(certificate),
1107                    ))
1108                } else {
1109                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1110                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1111                    ))
1112                };
1113
1114                Some((sender, message, TransmitType::Broadcast))
1115            },
1116            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1117                let view_number = vote.view_number() + vote.date().relay;
1118                let leader = match self
1119                    .membership_coordinator
1120                    .membership_for_epoch(self.epoch)
1121                    .await
1122                    .ok()?
1123                    .leader(view_number)
1124                    .await
1125                {
1126                    Ok(l) => l,
1127                    Err(e) => {
1128                        tracing::warn!(
1129                            "Failed to calculate leader for view number {view_number}. Error: \
1130                             {e:?}"
1131                        );
1132                        return None;
1133                    },
1134                };
1135                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1136                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1137                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1138                    ))
1139                } else {
1140                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1141                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1142                    ))
1143                };
1144
1145                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1146            },
1147            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1148                *maybe_action = Some(HotShotAction::ViewSyncVote);
1149                let view_number = vote.view_number() + vote.date().relay;
1150                let leader = match self
1151                    .membership_coordinator
1152                    .membership_for_epoch(self.epoch)
1153                    .await
1154                    .ok()?
1155                    .leader(view_number)
1156                    .await
1157                {
1158                    Ok(l) => l,
1159                    Err(e) => {
1160                        tracing::warn!(
1161                            "Failed to calculate leader for view number {view_number}. Error: \
1162                             {e:?}"
1163                        );
1164                        return None;
1165                    },
1166                };
1167                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1168                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1169                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1170                    ))
1171                } else {
1172                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1173                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1174                    ))
1175                };
1176
1177                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1178            },
1179            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1180                *maybe_action = Some(HotShotAction::ViewSyncVote);
1181                let view_number = vote.view_number() + vote.date().relay;
1182                let leader = match self
1183                    .membership_coordinator
1184                    .membership_for_epoch(self.epoch)
1185                    .await
1186                    .ok()?
1187                    .leader(view_number)
1188                    .await
1189                {
1190                    Ok(l) => l,
1191                    Err(e) => {
1192                        tracing::warn!(
1193                            "Failed to calculate leader for view number {view_number:?}. Error: \
1194                             {e:?}"
1195                        );
1196                        return None;
1197                    },
1198                };
1199                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1200                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1201                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1202                    ))
1203                } else {
1204                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1205                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1206                    ))
1207                };
1208
1209                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1210            },
1211            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1212                let view_number = certificate.view_number();
1213                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1214                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1215                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1216                    ))
1217                } else {
1218                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1219                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1220                    ))
1221                };
1222
1223                Some((sender, message, TransmitType::Broadcast))
1224            },
1225            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1226                let view_number = certificate.view_number();
1227                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1228                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1229                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1230                    ))
1231                } else {
1232                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1234                    ))
1235                };
1236
1237                Some((sender, message, TransmitType::Broadcast))
1238            },
1239            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1240                let view_number = certificate.view_number();
1241                let message = if self.upgrade_lock.epochs_enabled(view_number) {
1242                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1243                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1244                    ))
1245                } else {
1246                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1248                    ))
1249                };
1250
1251                Some((sender, message, TransmitType::Broadcast))
1252            },
1253            HotShotEvent::TimeoutVoteSend(vote) => {
1254                *maybe_action = Some(HotShotAction::TimeoutVote);
1255                let view_number = vote.view_number() + 1;
1256                let leader = match self
1257                    .membership_coordinator
1258                    .membership_for_epoch(self.epoch)
1259                    .await
1260                    .ok()?
1261                    .leader(view_number)
1262                    .await
1263                {
1264                    Ok(l) => l,
1265                    Err(e) => {
1266                        tracing::warn!(
1267                            "Failed to calculate leader for view number {view_number}. Error: \
1268                             {e:?}"
1269                        );
1270                        return None;
1271                    },
1272                };
1273                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1274                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1275                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1276                    ))
1277                } else {
1278                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1279                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1280                    ))
1281                };
1282
1283                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1284            },
1285            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1286                sender,
1287                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1288                    GeneralConsensusMessage::UpgradeProposal(proposal),
1289                )),
1290                TransmitType::Broadcast,
1291            )),
1292            HotShotEvent::UpgradeVoteSend(vote) => {
1293                tracing::error!("Sending upgrade vote!");
1294                let view_number = vote.view_number();
1295                let leader = match self
1296                    .membership_coordinator
1297                    .membership_for_epoch(self.epoch)
1298                    .await
1299                    .ok()?
1300                    .leader(view_number)
1301                    .await
1302                {
1303                    Ok(l) => l,
1304                    Err(e) => {
1305                        tracing::warn!(
1306                            "Failed to calculate leader for view number {view_number}. Error: \
1307                             {e:?}"
1308                        );
1309                        return None;
1310                    },
1311                };
1312                Some((
1313                    vote.signing_key(),
1314                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1315                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1316                    )),
1317                    TransmitType::Direct(leader),
1318                ))
1319            },
1320            HotShotEvent::ViewChange(view, epoch) => {
1321                self.view = view;
1322                if epoch > self.epoch {
1323                    self.epoch = epoch;
1324                }
1325                let keep_view = ViewNumber::new(view.saturating_sub(1));
1326                self.cancel_tasks(keep_view);
1327                let net = Arc::clone(&self.network);
1328                let epoch = self.epoch.map(|x| x.u64().into());
1329                let membership_coordinator = self.membership_coordinator.clone();
1330                spawn(async move {
1331                    net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1332                        .await;
1333                });
1334                None
1335            },
1336            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1337                sender,
1338                MessageKind::Data(DataMessage::RequestData(req)),
1339                TransmitType::Direct(to),
1340            )),
1341            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1342                let epochs_enabled = self
1343                    .upgrade_lock
1344                    .epochs_enabled(proposal.data.view_number());
1345                let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
1346                let message = match proposal.data {
1347                    VidDisperseShare::V0(data) => {
1348                        if epochs_enabled {
1349                            tracing::warn!(
1350                                "Epochs are enabled for view {} but still receive \
1351                                 VidDisperseShare0",
1352                                data.view_number()
1353                            );
1354                            return None;
1355                        }
1356                        let vid_share_proposal = Proposal {
1357                            data,
1358                            signature: proposal.signature,
1359                            _pd: proposal._pd,
1360                        };
1361                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1362                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1363                                vid_share_proposal,
1364                            )),
1365                        )))
1366                    },
1367                    VidDisperseShare::V1(data) => {
1368                        if !epochs_enabled {
1369                            tracing::warn!(
1370                                "Epochs are enabled for view {} but didn't receive \
1371                                 VidDisperseShare0",
1372                                data.view_number()
1373                            );
1374                            return None;
1375                        }
1376                        if upgraded_vid2 {
1377                            tracing::warn!(
1378                                "VID2 upgrade is enabled for view {} but didn't receive \
1379                                 VidDisperseShare2",
1380                                data.view_number()
1381                            );
1382                            return None;
1383                        }
1384                        let vid_share_proposal = Proposal {
1385                            data,
1386                            signature: proposal.signature,
1387                            _pd: proposal._pd,
1388                        };
1389                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1391                                vid_share_proposal,
1392                            )),
1393                        )))
1394                    },
1395                    VidDisperseShare::V2(data) => {
1396                        if !upgraded_vid2 {
1397                            tracing::warn!(
1398                                "VID2 upgrade is not enabled for view {} but receive \
1399                                 VidDisperseShare2",
1400                                data.view_number()
1401                            );
1402                            return None;
1403                        }
1404                        let vid_share_proposal = Proposal {
1405                            data,
1406                            signature: proposal.signature,
1407                            _pd: proposal._pd,
1408                        };
1409                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411                                vid_share_proposal,
1412                            )),
1413                        )))
1414                    },
1415                };
1416                Some((sender, message, TransmitType::Direct(to)))
1417            },
1418            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419                sender,
1420                MessageKind::Consensus(SequencingMessage::General(
1421                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422                )),
1423                TransmitType::Direct(leader),
1424            )),
1425            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426                let message = if self
1427                    .upgrade_lock
1428                    .proposal2_version(epoch_root_qc.view_number())
1429                {
1430                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1431                        GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1432                    ))
1433                } else {
1434                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1435                        GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1436                    ))
1437                };
1438
1439                Some((sender, message, TransmitType::Direct(leader)))
1440            },
1441            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1442                sender,
1443                MessageKind::Consensus(SequencingMessage::General(
1444                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1445                )),
1446                TransmitType::Broadcast,
1447            )),
1448            _ => None,
1449        }
1450    }
1451
1452    /// Creates a network message and spawns a task that transmits it on the wire.
1453    async fn spawn_transmit_task(
1454        &mut self,
1455        message_kind: MessageKind<TYPES>,
1456        maybe_action: Option<HotShotAction>,
1457        transmit: TransmitType<TYPES>,
1458        sender: TYPES::SignatureKey,
1459    ) {
1460        let broadcast_delay = match &message_kind {
1461            MessageKind::Consensus(
1462                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1463                | SequencingMessage::Da(_),
1464            ) => BroadcastDelay::View(*message_kind.view_number()),
1465            _ => BroadcastDelay::None,
1466        };
1467        let message = Message {
1468            sender,
1469            kind: message_kind,
1470        };
1471        let view_number = message.kind.view_number();
1472        let epoch = message.kind.epoch();
1473        let committee_topic = Topic::Global;
1474        let Ok(mem) = self
1475            .membership_coordinator
1476            .stake_table_for_epoch(self.epoch)
1477            .await
1478        else {
1479            return;
1480        };
1481        let da_committee = mem.da_committee_members(view_number).await;
1482        let network = Arc::clone(&self.network);
1483        let storage = self.storage.clone();
1484        let storage_metrics = Arc::clone(&self.storage_metrics);
1485        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1486        let upgrade_lock = self.upgrade_lock.clone();
1487        let handle = spawn(async move {
1488            if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
1489                maybe_action,
1490                storage.clone(),
1491                consensus,
1492                view_number,
1493                epoch,
1494            )
1495            .await
1496            .is_err()
1497            {
1498                return;
1499            }
1500            if let MessageKind::Consensus(SequencingMessage::General(
1501                GeneralConsensusMessage::Proposal(prop),
1502            )) = &message.kind
1503            {
1504                let now = Instant::now();
1505                if storage
1506                    .append_proposal2(&convert_proposal(prop.clone()))
1507                    .await
1508                    .is_err()
1509                {
1510                    return;
1511                }
1512                storage_metrics
1513                    .append_quorum_duration
1514                    .add_point(now.elapsed().as_secs_f64());
1515            }
1516
1517            let serialized_message = match upgrade_lock.serialize(&message) {
1518                Ok(serialized) => serialized,
1519                Err(e) => {
1520                    tracing::error!("Failed to serialize message: {e}");
1521                    return;
1522                },
1523            };
1524
1525            let transmit_result = match transmit {
1526                TransmitType::Direct(recipient) => {
1527                    network
1528                        .direct_message(view_number.u64().into(), serialized_message, recipient)
1529                        .await
1530                },
1531                TransmitType::Broadcast => {
1532                    network
1533                        .broadcast_message(
1534                            view_number.u64().into(),
1535                            serialized_message,
1536                            committee_topic,
1537                            broadcast_delay,
1538                        )
1539                        .await
1540                },
1541                TransmitType::DaCommitteeBroadcast => {
1542                    network
1543                        .da_broadcast_message(
1544                            view_number.u64().into(),
1545                            serialized_message,
1546                            da_committee.iter().cloned().collect(),
1547                            broadcast_delay,
1548                        )
1549                        .await
1550                },
1551            };
1552
1553            match transmit_result {
1554                Ok(()) => {},
1555                Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1556            }
1557        });
1558        self.transmit_tasks
1559            .entry(view_number)
1560            .or_default()
1561            .push(handle);
1562    }
1563}
1564
1565/// A module with test helpers
1566pub mod test {
1567    use std::ops::{Deref, DerefMut};
1568
1569    use async_trait::async_trait;
1570
1571    use super::{
1572        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1573        Receiver, Result, Sender, Storage, TaskState, TransmitType,
1574    };
1575
1576    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1577    /// and changes it before transmitting on the network.
1578    pub type ModifierClosure<TYPES> = dyn Fn(
1579            &mut <TYPES as NodeType>::SignatureKey,
1580            &mut MessageKind<TYPES>,
1581            &mut TransmitType<TYPES>,
1582            &<TYPES as NodeType>::Membership,
1583        ) + Send
1584        + Sync;
1585
1586    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1587    pub struct NetworkEventTaskStateModifier<
1588        TYPES: NodeType,
1589        NET: ConnectedNetwork<TYPES::SignatureKey>,
1590        S: Storage<TYPES>,
1591    > {
1592        /// The real `NetworkEventTaskState`
1593        pub network_event_task_state: NetworkEventTaskState<TYPES, NET, S>,
1594        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1595        /// changes it before transmitting on the network.
1596        pub modifier: Arc<ModifierClosure<TYPES>>,
1597    }
1598
1599    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1600        NetworkEventTaskStateModifier<TYPES, NET, S>
1601    {
1602        /// Handles the received event modifying it before sending on the network.
1603        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1604            let mut maybe_action = None;
1605            if let Some((mut sender, mut message_kind, mut transmit)) =
1606                self.parse_event(event, &mut maybe_action).await
1607            {
1608                // Modify the values acquired by parsing the event.
1609                (self.modifier)(
1610                    &mut sender,
1611                    &mut message_kind,
1612                    &mut transmit,
1613                    &*self.membership_coordinator.membership().read().await,
1614                );
1615
1616                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1617                    .await;
1618            }
1619        }
1620    }
1621
1622    #[async_trait]
1623    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1624        TaskState for NetworkEventTaskStateModifier<TYPES, NET, S>
1625    {
1626        type Event = HotShotEvent<TYPES>;
1627
1628        async fn handle_event(
1629            &mut self,
1630            event: Arc<Self::Event>,
1631            _sender: &Sender<Arc<Self::Event>>,
1632            _receiver: &Receiver<Arc<Self::Event>>,
1633        ) -> Result<()> {
1634            self.handle(event).await;
1635
1636            Ok(())
1637        }
1638
1639        fn cancel_subtasks(&mut self) {}
1640    }
1641
1642    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> Deref
1643        for NetworkEventTaskStateModifier<TYPES, NET, S>
1644    {
1645        type Target = NetworkEventTaskState<TYPES, NET, S>;
1646
1647        fn deref(&self) -> &Self::Target {
1648            &self.network_event_task_state
1649        }
1650    }
1651
1652    impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> DerefMut
1653        for NetworkEventTaskStateModifier<TYPES, NET, S>
1654    {
1655        fn deref_mut(&mut self) -> &mut Self::Target {
1656            &mut self.network_event_task_state
1657        }
1658    }
1659}