hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    hash::{DefaultHasher, Hash},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17    consensus::OuterConsensus,
18    data::{VidDisperse, VidDisperseShare},
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, HotShotAction},
21    message::{
22        convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23        MessageKind, Proposal, SequencingMessage, UpgradeLock,
24    },
25    simple_vote::HasEpoch,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        network::{
29            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30            ViewMessage,
31        },
32        node_implementation::{ConsensusTime, NodeType, Versions},
33        storage::Storage,
34    },
35    vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42    events::{HotShotEvent, HotShotTaskCompleted},
43    helpers::broadcast_event,
44};
45
46/// the network message task state
47#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49    /// Sender to send internal events this task generates to other tasks
50    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52    /// Sender to send external events this task generates to the event stream
53    pub external_event_stream: Sender<Event<TYPES>>,
54
55    /// This nodes public key
56    pub public_key: TYPES::SignatureKey,
57
58    /// Lock for a decided upgrade
59    pub upgrade_lock: UpgradeLock<TYPES, V>,
60
61    /// Node's id
62    pub id: u64,
63}
64
65impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
66    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67    /// Handles a (deserialized) message from the network
68    pub async fn handle_message(&mut self, message: Message<TYPES>) {
69        match &message.kind {
70            MessageKind::Consensus(_) => {
71                tracing::debug!("Received consensus message from network: {:?}", message)
72            },
73            MessageKind::Data(_) => {
74                tracing::trace!("Received data message from network: {:?}", message)
75            },
76            MessageKind::External(_) => {
77                tracing::trace!("Received external message from network: {:?}", message)
78            },
79        }
80
81        // Match the message kind and send the appropriate event to the internal event stream
82        let sender = message.sender;
83        match message.kind {
84            // Handle consensus messages
85            MessageKind::Consensus(consensus_message) => {
86                let event = match consensus_message {
87                    SequencingMessage::General(general_message) => match general_message {
88                        GeneralConsensusMessage::Proposal(proposal) => {
89                            if self
90                                .upgrade_lock
91                                .epochs_enabled(proposal.data.view_number())
92                                .await
93                            {
94                                tracing::warn!(
95                                    "received GeneralConsensusMessage::Proposal for view {} but \
96                                     epochs are enabled for that view",
97                                    proposal.data.view_number()
98                                );
99                                return;
100                            }
101                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
102                        },
103                        GeneralConsensusMessage::Proposal2Legacy(proposal) => {
104                            if !self
105                                .upgrade_lock
106                                .proposal2_legacy_version(proposal.data.view_number())
107                                .await
108                            {
109                                tracing::warn!(
110                                    "received GeneralConsensusMessage::Proposal2Legacy for view \
111                                     {} but we are in the wrong version for that message type",
112                                    proposal.data.view_number()
113                                );
114                                return;
115                            }
116                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
117                        },
118                        GeneralConsensusMessage::Proposal2(proposal) => {
119                            if !self
120                                .upgrade_lock
121                                .proposal2_version(proposal.data.view_number())
122                                .await
123                            {
124                                tracing::warn!(
125                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
126                                     we are in the wrong version for that message type",
127                                    proposal.data.view_number()
128                                );
129                                return;
130                            }
131                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
132                        },
133                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
134                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
135                        },
136                        GeneralConsensusMessage::ProposalResponse(proposal) => {
137                            if self
138                                .upgrade_lock
139                                .epochs_enabled(proposal.data.view_number())
140                                .await
141                            {
142                                tracing::warn!(
143                                    "received GeneralConsensusMessage::ProposalResponse for view \
144                                     {} but we are in the wrong version for that message type",
145                                    proposal.data.view_number()
146                                );
147                                return;
148                            }
149                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
150                        },
151                        GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
152                            if !self
153                                .upgrade_lock
154                                .proposal2_legacy_version(proposal.data.view_number())
155                                .await
156                            {
157                                tracing::warn!(
158                                    "received GeneralConsensusMessage::ProposalResponse2Legacy \
159                                     for view {} but we are in the wrong version for that message \
160                                     type",
161                                    proposal.data.view_number()
162                                );
163                                return;
164                            }
165                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
166                        },
167                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
168                            if !self
169                                .upgrade_lock
170                                .proposal2_version(proposal.data.view_number())
171                                .await
172                            {
173                                tracing::warn!(
174                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
175                                     {} but epochs are not enabled for that view",
176                                    proposal.data.view_number()
177                                );
178                                return;
179                            }
180                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
181                        },
182                        GeneralConsensusMessage::Vote(vote) => {
183                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
184                                tracing::warn!(
185                                    "received GeneralConsensusMessage::Vote for view {} but \
186                                     epochs are enabled for that view",
187                                    vote.view_number()
188                                );
189                                return;
190                            }
191                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
192                        },
193                        GeneralConsensusMessage::Vote2(vote) => {
194                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
195                                tracing::warn!(
196                                    "received GeneralConsensusMessage::Vote2 for view {} but \
197                                     epochs are not enabled for that view",
198                                    vote.view_number()
199                                );
200                                return;
201                            }
202                            HotShotEvent::QuorumVoteRecv(vote)
203                        },
204                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
205                            if self
206                                .upgrade_lock
207                                .epochs_enabled(view_sync_message.view_number())
208                                .await
209                            {
210                                tracing::warn!(
211                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
212                                     view {} but epochs are enabled for that view",
213                                    view_sync_message.view_number()
214                                );
215                                return;
216                            }
217                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
218                        },
219                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
220                            if !self
221                                .upgrade_lock
222                                .epochs_enabled(view_sync_message.view_number())
223                                .await
224                            {
225                                tracing::warn!(
226                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
227                                     view {} but epochs are not enabled for that view",
228                                    view_sync_message.view_number()
229                                );
230                                return;
231                            }
232                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
233                        },
234                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
235                            view_sync_message,
236                        ) => {
237                            if self
238                                .upgrade_lock
239                                .epochs_enabled(view_sync_message.view_number())
240                                .await
241                            {
242                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
243                                return;
244                            }
245                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
246                                view_sync_message.to_vsc2(),
247                            )
248                        },
249                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
250                            view_sync_message,
251                        ) => {
252                            if !self
253                                .upgrade_lock
254                                .epochs_enabled(view_sync_message.view_number())
255                                .await
256                            {
257                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
258                                return;
259                            }
260                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
261                        },
262                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
263                            if self
264                                .upgrade_lock
265                                .epochs_enabled(view_sync_message.view_number())
266                                .await
267                            {
268                                tracing::warn!(
269                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
270                                     view {} but epochs are enabled for that view",
271                                    view_sync_message.view_number()
272                                );
273                                return;
274                            }
275                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
276                        },
277                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
278                            if !self
279                                .upgrade_lock
280                                .epochs_enabled(view_sync_message.view_number())
281                                .await
282                            {
283                                tracing::warn!(
284                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
285                                     view {} but epochs are not enabled for that view",
286                                    view_sync_message.view_number()
287                                );
288                                return;
289                            }
290                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
291                        },
292                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
293                            if self
294                                .upgrade_lock
295                                .epochs_enabled(view_sync_message.view_number())
296                                .await
297                            {
298                                tracing::warn!(
299                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
300                                     for view {} but epochs are enabled for that view",
301                                    view_sync_message.view_number()
302                                );
303                                return;
304                            }
305                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
306                        },
307                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
308                            if !self
309                                .upgrade_lock
310                                .epochs_enabled(view_sync_message.view_number())
311                                .await
312                            {
313                                tracing::warn!(
314                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
315                                     for view {} but epochs are not enabled for that view",
316                                    view_sync_message.view_number()
317                                );
318                                return;
319                            }
320                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
321                        },
322                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
323                            if self
324                                .upgrade_lock
325                                .epochs_enabled(view_sync_message.view_number())
326                                .await
327                            {
328                                tracing::warn!(
329                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
330                                     view {} but epochs are enabled for that view",
331                                    view_sync_message.view_number()
332                                );
333                                return;
334                            }
335                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
336                        },
337                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
338                            if !self
339                                .upgrade_lock
340                                .epochs_enabled(view_sync_message.view_number())
341                                .await
342                            {
343                                tracing::warn!(
344                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
345                                     view {} but epochs are not enabled for that view",
346                                    view_sync_message.view_number()
347                                );
348                                return;
349                            }
350                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
351                        },
352                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
353                            if self
354                                .upgrade_lock
355                                .epochs_enabled(view_sync_message.view_number())
356                                .await
357                            {
358                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
359                                return;
360                            }
361                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
362                                view_sync_message.to_vsc2(),
363                            )
364                        },
365                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
366                            view_sync_message,
367                        ) => {
368                            if !self
369                                .upgrade_lock
370                                .epochs_enabled(view_sync_message.view_number())
371                                .await
372                            {
373                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
374                                return;
375                            }
376                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
377                        },
378                        GeneralConsensusMessage::TimeoutVote(message) => {
379                            if self
380                                .upgrade_lock
381                                .epochs_enabled(message.view_number())
382                                .await
383                            {
384                                tracing::warn!(
385                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
386                                     but epochs are enabled for that view",
387                                    message.view_number()
388                                );
389                                return;
390                            }
391                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
392                        },
393                        GeneralConsensusMessage::TimeoutVote2(message) => {
394                            if !self
395                                .upgrade_lock
396                                .epochs_enabled(message.view_number())
397                                .await
398                            {
399                                tracing::warn!(
400                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
401                                     but epochs are not enabled for that view",
402                                    message.view_number()
403                                );
404                                return;
405                            }
406                            HotShotEvent::TimeoutVoteRecv(message)
407                        },
408                        GeneralConsensusMessage::UpgradeProposal(message) => {
409                            HotShotEvent::UpgradeProposalRecv(message, sender)
410                        },
411                        GeneralConsensusMessage::UpgradeVote(message) => {
412                            tracing::error!("Received upgrade vote!");
413                            HotShotEvent::UpgradeVoteRecv(message)
414                        },
415                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
416                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
417                        },
418                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
419                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
420                        },
421                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
422                            if !self
423                                .upgrade_lock
424                                .proposal2_legacy_version(vote.view_number())
425                                .await
426                            {
427                                tracing::warn!(
428                                    "received GeneralConsensusMessage::EpochRootQuorumVote for \
429                                     view {} but we do not expect this message in this version",
430                                    vote.view_number()
431                                );
432                                return;
433                            }
434                            HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
435                        },
436                        GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
437                            if !self
438                                .upgrade_lock
439                                .proposal2_version(vote.view_number())
440                                .await
441                            {
442                                tracing::warn!(
443                                    "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
444                                     view {} but we do not expect this message in this version",
445                                    vote.view_number()
446                                );
447                                return;
448                            }
449                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
450                        },
451                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
452                            if !self
453                                .upgrade_lock
454                                .proposal2_version(root_qc.view_number())
455                                .await
456                            {
457                                tracing::warn!(
458                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
459                                     but we are in the wrong version for that message types",
460                                    root_qc.view_number()
461                                );
462                                return;
463                            }
464                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
465                        },
466                        GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
467                            if !self
468                                .upgrade_lock
469                                .proposal2_legacy_version(root_qc.view_number())
470                                .await
471                            {
472                                tracing::warn!(
473                                    "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
474                                     but we are in the wrong version for that message type",
475                                    root_qc.view_number()
476                                );
477                                return;
478                            }
479                            HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
480                        },
481                    },
482                    SequencingMessage::Da(da_message) => match da_message {
483                        DaConsensusMessage::DaProposal(proposal) => {
484                            if self
485                                .upgrade_lock
486                                .epochs_enabled(proposal.data.view_number())
487                                .await
488                            {
489                                tracing::warn!(
490                                    "received DaConsensusMessage::DaProposal for view {} but \
491                                     epochs are enabled for that view",
492                                    proposal.data.view_number()
493                                );
494                                return;
495                            }
496                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
497                        },
498                        DaConsensusMessage::DaProposal2(proposal) => {
499                            if !self
500                                .upgrade_lock
501                                .epochs_enabled(proposal.data.view_number())
502                                .await
503                            {
504                                tracing::warn!(
505                                    "received DaConsensusMessage::DaProposal2 for view {} but \
506                                     epochs are not enabled for that view",
507                                    proposal.data.view_number()
508                                );
509                                return;
510                            }
511                            HotShotEvent::DaProposalRecv(proposal, sender)
512                        },
513                        DaConsensusMessage::DaVote(vote) => {
514                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
515                                tracing::warn!(
516                                    "received DaConsensusMessage::DaVote for view {} but epochs \
517                                     are enabled for that view",
518                                    vote.view_number()
519                                );
520                                return;
521                            }
522                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
523                        },
524                        DaConsensusMessage::DaVote2(vote) => {
525                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
526                                tracing::warn!(
527                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
528                                     are not enabled for that view",
529                                    vote.view_number()
530                                );
531                                return;
532                            }
533                            HotShotEvent::DaVoteRecv(vote.clone())
534                        },
535                        DaConsensusMessage::DaCertificate(cert) => {
536                            if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
537                                tracing::warn!(
538                                    "received DaConsensusMessage::DaCertificate for view {} but \
539                                     epochs are enabled for that view",
540                                    cert.view_number()
541                                );
542                                return;
543                            }
544                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
545                        },
546                        DaConsensusMessage::DaCertificate2(cert) => {
547                            if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
548                                tracing::warn!(
549                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
550                                     epochs are not enabled for that view",
551                                    cert.view_number()
552                                );
553                                return;
554                            }
555                            HotShotEvent::DaCertificateRecv(cert)
556                        },
557                        DaConsensusMessage::VidDisperseMsg(proposal) => {
558                            if self
559                                .upgrade_lock
560                                .epochs_enabled(proposal.data.view_number())
561                                .await
562                            {
563                                tracing::warn!(
564                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
565                                     epochs are enabled for that view",
566                                    proposal.data.view_number()
567                                );
568                                return;
569                            }
570                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
571                        },
572                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
573                            if !self
574                                .upgrade_lock
575                                .epochs_enabled(proposal.data.view_number())
576                                .await
577                            {
578                                tracing::warn!(
579                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
580                                     epochs are not enabled for that view",
581                                    proposal.data.view_number()
582                                );
583                                return;
584                            }
585                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
586                        },
587                    },
588                };
589                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
590            },
591
592            // Handle data messages
593            MessageKind::Data(message) => match message {
594                DataMessage::SubmitTransaction(transaction, _) => {
595                    let mut hasher = DefaultHasher::new();
596                    transaction.hash(&mut hasher);
597                    broadcast_event(
598                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
599                        &self.internal_event_stream,
600                    )
601                    .await;
602                },
603                DataMessage::DataResponse(response) => {
604                    if let ResponseMessage::Found(message) = response {
605                        match message {
606                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
607                                broadcast_event(
608                                    Arc::new(HotShotEvent::VidResponseRecv(
609                                        sender,
610                                        convert_proposal(proposal),
611                                    )),
612                                    &self.internal_event_stream,
613                                )
614                                .await;
615                            },
616                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
617                                proposal,
618                            )) => {
619                                broadcast_event(
620                                    Arc::new(HotShotEvent::VidResponseRecv(
621                                        sender,
622                                        convert_proposal(proposal),
623                                    )),
624                                    &self.internal_event_stream,
625                                )
626                                .await;
627                            },
628                            _ => {},
629                        }
630                    }
631                },
632                DataMessage::RequestData(data) => {
633                    let req_data = data.clone();
634                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
635                        broadcast_event(
636                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
637                            &self.internal_event_stream,
638                        )
639                        .await;
640                    }
641                },
642            },
643
644            // Handle external messages
645            MessageKind::External(data) => {
646                if sender == self.public_key {
647                    return;
648                }
649                // Send the external message to the external event stream so it can be processed
650                broadcast_event(
651                    Event {
652                        view_number: TYPES::View::new(1),
653                        event: EventType::ExternalMessageReceived { sender, data },
654                    },
655                    &self.external_event_stream,
656                )
657                .await;
658            },
659        }
660    }
661}
662
663/// network event task state
664pub struct NetworkEventTaskState<
665    TYPES: NodeType,
666    V: Versions,
667    NET: ConnectedNetwork<TYPES::SignatureKey>,
668    S: Storage<TYPES>,
669> {
670    /// comm network
671    pub network: Arc<NET>,
672
673    /// view number
674    pub view: TYPES::View,
675
676    /// epoch number
677    pub epoch: Option<TYPES::Epoch>,
678
679    /// network memberships
680    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
681
682    /// Storage to store actionable events
683    pub storage: S,
684
685    /// Storage metrics
686    pub storage_metrics: Arc<StorageMetricsValue>,
687
688    /// Shared consensus state
689    pub consensus: OuterConsensus<TYPES>,
690
691    /// Lock for a decided upgrade
692    pub upgrade_lock: UpgradeLock<TYPES, V>,
693
694    /// map view number to transmit tasks
695    pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
696
697    /// Number of blocks in an epoch, zero means there are no epochs
698    pub epoch_height: u64,
699
700    /// Node's id
701    pub id: u64,
702}
703
704#[async_trait]
705impl<
706        TYPES: NodeType,
707        V: Versions,
708        NET: ConnectedNetwork<TYPES::SignatureKey>,
709        S: Storage<TYPES> + 'static,
710    > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
711{
712    type Event = HotShotEvent<TYPES>;
713
714    async fn handle_event(
715        &mut self,
716        event: Arc<Self::Event>,
717        _sender: &Sender<Arc<Self::Event>>,
718        _receiver: &Receiver<Arc<Self::Event>>,
719    ) -> Result<()> {
720        self.handle(event).await;
721
722        Ok(())
723    }
724
725    fn cancel_subtasks(&mut self) {}
726}
727
728impl<
729        TYPES: NodeType,
730        V: Versions,
731        NET: ConnectedNetwork<TYPES::SignatureKey>,
732        S: Storage<TYPES> + 'static,
733    > NetworkEventTaskState<TYPES, V, NET, S>
734{
735    /// Handle the given event.
736    ///
737    /// Returns the completion status.
738    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
739    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
740        let mut maybe_action = None;
741        if let Some((sender, message_kind, transmit)) =
742            self.parse_event(event, &mut maybe_action).await
743        {
744            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
745                .await;
746        };
747    }
748
749    /// handle `VidDisperseSend`
750    async fn handle_vid_disperse_proposal(
751        &self,
752        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
753        sender: &<TYPES as NodeType>::SignatureKey,
754    ) -> Option<HotShotTaskCompleted> {
755        let view = vid_proposal.data.view_number();
756        let epoch = vid_proposal.data.epoch();
757        let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
758        let mut messages = HashMap::new();
759
760        for proposal in vid_share_proposals {
761            let recipient = proposal.data.recipient_key().clone();
762            let message = if self
763                .upgrade_lock
764                .epochs_enabled(proposal.data.view_number())
765                .await
766            {
767                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
768                    Proposal {
769                        data,
770                        signature: proposal.signature,
771                        _pd: proposal._pd,
772                    }
773                } else {
774                    tracing::warn!(
775                        "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
776                        proposal.data.view_number()
777                    );
778                    return None;
779                };
780                Message {
781                    sender: sender.clone(),
782                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
783                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
784                    )),
785                }
786            } else {
787                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
788                    Proposal {
789                        data,
790                        signature: proposal.signature,
791                        _pd: proposal._pd,
792                    }
793                } else {
794                    tracing::warn!(
795                        "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
796                        proposal.data.view_number()
797                    );
798                    return None;
799                };
800                Message {
801                    sender: sender.clone(),
802                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
803                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
804                    )),
805                }
806            };
807            let serialized_message = match self.upgrade_lock.serialize(&message).await {
808                Ok(serialized) => serialized,
809                Err(e) => {
810                    tracing::error!("Failed to serialize message: {e}");
811                    continue;
812                },
813            };
814
815            messages.insert(recipient, serialized_message);
816        }
817
818        let net = Arc::clone(&self.network);
819        let storage = self.storage.clone();
820        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
821        spawn(async move {
822            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
823                Some(HotShotAction::VidDisperse),
824                storage,
825                consensus,
826                view,
827                epoch,
828            )
829            .await
830            .is_err()
831            {
832                return;
833            }
834            match net.vid_broadcast_message(messages).await {
835                Ok(()) => {},
836                Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
837            }
838        });
839
840        None
841    }
842
843    /// Record `HotShotAction` if available
844    async fn maybe_record_action(
845        maybe_action: Option<HotShotAction>,
846        storage: S,
847        consensus: OuterConsensus<TYPES>,
848        view: <TYPES as NodeType>::View,
849        epoch: Option<<TYPES as NodeType>::Epoch>,
850    ) -> std::result::Result<(), ()> {
851        if let Some(mut action) = maybe_action {
852            if !consensus.write().await.update_action(action, view) {
853                return Err(());
854            }
855            // If the action was view sync record it as a vote, but we don't
856            // want to limit to 1 View sync vote above so change the action here.
857            if matches!(action, HotShotAction::ViewSyncVote) {
858                action = HotShotAction::Vote;
859            }
860            match storage.record_action(view, epoch, action).await {
861                Ok(()) => Ok(()),
862                Err(e) => {
863                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
864                    Err(())
865                },
866            }
867        } else {
868            Ok(())
869        }
870    }
871
872    /// Cancel all tasks for previous views
873    pub fn cancel_tasks(&mut self, view: TYPES::View) {
874        let keep = self.transmit_tasks.split_off(&view);
875
876        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
877            for task in tasks {
878                task.abort();
879            }
880        }
881
882        self.transmit_tasks = keep;
883    }
884
885    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
886    /// which will be used to create a message and transmit on the wire.
887    /// Returns `None` if the parsing result should not be sent on the wire.
888    /// Handles the `VidDisperseSend` event separately using a helper method.
889    #[allow(clippy::too_many_lines)]
890    async fn parse_event(
891        &mut self,
892        event: Arc<HotShotEvent<TYPES>>,
893        maybe_action: &mut Option<HotShotAction>,
894    ) -> Option<(
895        <TYPES as NodeType>::SignatureKey,
896        MessageKind<TYPES>,
897        TransmitType<TYPES>,
898    )> {
899        match event.as_ref().clone() {
900            HotShotEvent::QuorumProposalSend(proposal, sender) => {
901                *maybe_action = Some(HotShotAction::Propose);
902
903                let message = if self
904                    .upgrade_lock
905                    .proposal2_version(proposal.data.view_number())
906                    .await
907                {
908                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
909                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
910                    ))
911                } else if self
912                    .upgrade_lock
913                    .proposal2_legacy_version(proposal.data.view_number())
914                    .await
915                {
916                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
917                        GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
918                    ))
919                } else {
920                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
921                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
922                    ))
923                };
924
925                Some((sender, message, TransmitType::Broadcast))
926            },
927
928            // ED Each network task is subscribed to all these message types.  Need filters per network task
929            HotShotEvent::QuorumVoteSend(vote) => {
930                *maybe_action = Some(HotShotAction::Vote);
931                let view_number = vote.view_number() + 1;
932                let leader = match self
933                    .membership_coordinator
934                    .membership_for_epoch(vote.epoch())
935                    .await
936                    .ok()?
937                    .leader(view_number)
938                    .await
939                {
940                    Ok(l) => l,
941                    Err(e) => {
942                        tracing::warn!(
943                            "Failed to calculate leader for view number {view_number}. Error: \
944                             {e:?}"
945                        );
946                        return None;
947                    },
948                };
949
950                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
951                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
952                        GeneralConsensusMessage::Vote2(vote.clone()),
953                    ))
954                } else {
955                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
956                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
957                    ))
958                };
959
960                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
961            },
962            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
963                *maybe_action = Some(HotShotAction::Vote);
964                let view_number = vote.view_number() + 1;
965                let leader = match self
966                    .membership_coordinator
967                    .membership_for_epoch(vote.epoch())
968                    .await
969                    .ok()?
970                    .leader(view_number)
971                    .await
972                {
973                    Ok(l) => l,
974                    Err(e) => {
975                        tracing::warn!(
976                            "Failed to calculate leader for view number {:?}. Error: {:?}",
977                            view_number,
978                            e
979                        );
980                        return None;
981                    },
982                };
983
984                let message = if self
985                    .upgrade_lock
986                    .proposal2_version(vote.view_number())
987                    .await
988                {
989                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
990                        GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
991                    ))
992                } else if self
993                    .upgrade_lock
994                    .proposal2_legacy_version(vote.view_number())
995                    .await
996                {
997                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
998                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
999                    ))
1000                } else {
1001                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1002                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1003                    ))
1004                };
1005
1006                Some((
1007                    vote.vote.signing_key(),
1008                    message,
1009                    TransmitType::Direct(leader),
1010                ))
1011            },
1012            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1013                *maybe_action = Some(HotShotAction::Vote);
1014                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1015                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1016                        GeneralConsensusMessage::Vote2(vote.clone()),
1017                    ))
1018                } else {
1019                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1020                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1021                    ))
1022                };
1023
1024                Some((vote.signing_key(), message, TransmitType::Broadcast))
1025            },
1026            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1027                req.key.clone(),
1028                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1029                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1030                )),
1031                TransmitType::Broadcast,
1032            )),
1033            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1034                let message = if self
1035                    .upgrade_lock
1036                    .proposal2_version(proposal.data.view_number())
1037                    .await
1038                {
1039                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1040                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1041                    ))
1042                } else if self
1043                    .upgrade_lock
1044                    .proposal2_legacy_version(proposal.data.view_number())
1045                    .await
1046                {
1047                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1048                        GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1049                            proposal,
1050                        )),
1051                    ))
1052                } else {
1053                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1054                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1055                    ))
1056                };
1057
1058                Some((
1059                    sender_key.clone(),
1060                    message,
1061                    TransmitType::Direct(sender_key),
1062                ))
1063            },
1064            HotShotEvent::VidDisperseSend(proposal, sender) => {
1065                self.handle_vid_disperse_proposal(proposal, &sender).await;
1066                None
1067            },
1068            HotShotEvent::DaProposalSend(proposal, sender) => {
1069                *maybe_action = Some(HotShotAction::DaPropose);
1070
1071                let message = if self
1072                    .upgrade_lock
1073                    .epochs_enabled(proposal.data.view_number())
1074                    .await
1075                {
1076                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1077                        DaConsensusMessage::DaProposal2(proposal),
1078                    ))
1079                } else {
1080                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1081                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1082                    ))
1083                };
1084
1085                Some((sender, message, TransmitType::DaCommitteeBroadcast))
1086            },
1087            HotShotEvent::DaVoteSend(vote) => {
1088                *maybe_action = Some(HotShotAction::DaVote);
1089                let view_number = vote.view_number();
1090                let leader = match self
1091                    .membership_coordinator
1092                    .membership_for_epoch(vote.epoch())
1093                    .await
1094                    .ok()?
1095                    .leader(view_number)
1096                    .await
1097                {
1098                    Ok(l) => l,
1099                    Err(e) => {
1100                        tracing::warn!(
1101                            "Failed to calculate leader for view number {view_number}. Error: \
1102                             {e:?}"
1103                        );
1104                        return None;
1105                    },
1106                };
1107
1108                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1109                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1110                        DaConsensusMessage::DaVote2(vote.clone()),
1111                    ))
1112                } else {
1113                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1114                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1115                    ))
1116                };
1117
1118                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1119            },
1120            HotShotEvent::DacSend(certificate, sender) => {
1121                *maybe_action = Some(HotShotAction::DaCert);
1122                let message = if self
1123                    .upgrade_lock
1124                    .epochs_enabled(certificate.view_number())
1125                    .await
1126                {
1127                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1128                        DaConsensusMessage::DaCertificate2(certificate),
1129                    ))
1130                } else {
1131                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1132                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1133                    ))
1134                };
1135
1136                Some((sender, message, TransmitType::Broadcast))
1137            },
1138            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1139                let view_number = vote.view_number() + vote.date().relay;
1140                let leader = match self
1141                    .membership_coordinator
1142                    .membership_for_epoch(self.epoch)
1143                    .await
1144                    .ok()?
1145                    .leader(view_number)
1146                    .await
1147                {
1148                    Ok(l) => l,
1149                    Err(e) => {
1150                        tracing::warn!(
1151                            "Failed to calculate leader for view number {view_number}. Error: \
1152                             {e:?}"
1153                        );
1154                        return None;
1155                    },
1156                };
1157                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1158                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1159                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1160                    ))
1161                } else {
1162                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1163                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1164                    ))
1165                };
1166
1167                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1168            },
1169            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1170                *maybe_action = Some(HotShotAction::ViewSyncVote);
1171                let view_number = vote.view_number() + vote.date().relay;
1172                let leader = match self
1173                    .membership_coordinator
1174                    .membership_for_epoch(self.epoch)
1175                    .await
1176                    .ok()?
1177                    .leader(view_number)
1178                    .await
1179                {
1180                    Ok(l) => l,
1181                    Err(e) => {
1182                        tracing::warn!(
1183                            "Failed to calculate leader for view number {view_number}. Error: \
1184                             {e:?}"
1185                        );
1186                        return None;
1187                    },
1188                };
1189                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1190                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1191                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1192                    ))
1193                } else {
1194                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1195                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1196                    ))
1197                };
1198
1199                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1200            },
1201            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1202                *maybe_action = Some(HotShotAction::ViewSyncVote);
1203                let view_number = vote.view_number() + vote.date().relay;
1204                let leader = match self
1205                    .membership_coordinator
1206                    .membership_for_epoch(self.epoch)
1207                    .await
1208                    .ok()?
1209                    .leader(view_number)
1210                    .await
1211                {
1212                    Ok(l) => l,
1213                    Err(e) => {
1214                        tracing::warn!(
1215                            "Failed to calculate leader for view number {view_number:?}. Error: \
1216                             {e:?}"
1217                        );
1218                        return None;
1219                    },
1220                };
1221                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1222                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1223                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1224                    ))
1225                } else {
1226                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1227                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1228                    ))
1229                };
1230
1231                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1232            },
1233            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1234                let view_number = certificate.view_number();
1235                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1236                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1237                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1238                    ))
1239                } else {
1240                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1241                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1242                    ))
1243                };
1244
1245                Some((sender, message, TransmitType::Broadcast))
1246            },
1247            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1248                let view_number = certificate.view_number();
1249                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1250                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1251                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1252                    ))
1253                } else {
1254                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1255                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1256                    ))
1257                };
1258
1259                Some((sender, message, TransmitType::Broadcast))
1260            },
1261            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1262                let view_number = certificate.view_number();
1263                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1264                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1265                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1266                    ))
1267                } else {
1268                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1269                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1270                    ))
1271                };
1272
1273                Some((sender, message, TransmitType::Broadcast))
1274            },
1275            HotShotEvent::TimeoutVoteSend(vote) => {
1276                *maybe_action = Some(HotShotAction::TimeoutVote);
1277                let view_number = vote.view_number() + 1;
1278                let leader = match self
1279                    .membership_coordinator
1280                    .membership_for_epoch(self.epoch)
1281                    .await
1282                    .ok()?
1283                    .leader(view_number)
1284                    .await
1285                {
1286                    Ok(l) => l,
1287                    Err(e) => {
1288                        tracing::warn!(
1289                            "Failed to calculate leader for view number {view_number}. Error: \
1290                             {e:?}"
1291                        );
1292                        return None;
1293                    },
1294                };
1295                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1296                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1297                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1298                    ))
1299                } else {
1300                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1301                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1302                    ))
1303                };
1304
1305                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1306            },
1307            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1308                sender,
1309                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1310                    GeneralConsensusMessage::UpgradeProposal(proposal),
1311                )),
1312                TransmitType::Broadcast,
1313            )),
1314            HotShotEvent::UpgradeVoteSend(vote) => {
1315                tracing::error!("Sending upgrade vote!");
1316                let view_number = vote.view_number();
1317                let leader = match self
1318                    .membership_coordinator
1319                    .membership_for_epoch(self.epoch)
1320                    .await
1321                    .ok()?
1322                    .leader(view_number)
1323                    .await
1324                {
1325                    Ok(l) => l,
1326                    Err(e) => {
1327                        tracing::warn!(
1328                            "Failed to calculate leader for view number {view_number}. Error: \
1329                             {e:?}"
1330                        );
1331                        return None;
1332                    },
1333                };
1334                Some((
1335                    vote.signing_key(),
1336                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1337                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1338                    )),
1339                    TransmitType::Direct(leader),
1340                ))
1341            },
1342            HotShotEvent::ViewChange(view, epoch) => {
1343                self.view = view;
1344                if epoch > self.epoch {
1345                    self.epoch = epoch;
1346                }
1347                let keep_view = TYPES::View::new(view.saturating_sub(1));
1348                self.cancel_tasks(keep_view);
1349                let net = Arc::clone(&self.network);
1350                let epoch = self.epoch.map(|x| x.u64());
1351                let membership_coordinator = self.membership_coordinator.clone();
1352                spawn(async move {
1353                    net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1354                        .await;
1355                });
1356                None
1357            },
1358            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1359                sender,
1360                MessageKind::Data(DataMessage::RequestData(req)),
1361                TransmitType::Direct(to),
1362            )),
1363            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1364                let epochs_enabled = self
1365                    .upgrade_lock
1366                    .epochs_enabled(proposal.data.view_number())
1367                    .await;
1368                let message = match proposal.data {
1369                    VidDisperseShare::V0(data) => {
1370                        if epochs_enabled {
1371                            tracing::warn!(
1372                                "Epochs are enabled for view {} but didn't receive \
1373                                 VidDisperseShare2",
1374                                data.view_number()
1375                            );
1376                            return None;
1377                        }
1378                        let vid_share_proposal = Proposal {
1379                            data,
1380                            signature: proposal.signature,
1381                            _pd: proposal._pd,
1382                        };
1383                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1384                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1385                                vid_share_proposal,
1386                            )),
1387                        )))
1388                    },
1389                    VidDisperseShare::V1(data) => {
1390                        if !epochs_enabled {
1391                            tracing::warn!(
1392                                "Epochs are enabled for view {} but didn't receive \
1393                                 ADVZDisperseShare",
1394                                data.view_number()
1395                            );
1396                            return None;
1397                        }
1398                        let vid_share_proposal = Proposal {
1399                            data,
1400                            signature: proposal.signature,
1401                            _pd: proposal._pd,
1402                        };
1403                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1404                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1405                                vid_share_proposal,
1406                            )),
1407                        )))
1408                    },
1409                };
1410                Some((sender, message, TransmitType::Direct(to)))
1411            },
1412            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1413                sender,
1414                MessageKind::Consensus(SequencingMessage::General(
1415                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1416                )),
1417                TransmitType::Direct(leader),
1418            )),
1419            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1420                let message = if self
1421                    .upgrade_lock
1422                    .proposal2_version(epoch_root_qc.view_number())
1423                    .await
1424                {
1425                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1426                        GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1427                    ))
1428                } else {
1429                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1430                        GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1431                    ))
1432                };
1433
1434                Some((sender, message, TransmitType::Direct(leader)))
1435            },
1436            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1437                sender,
1438                MessageKind::Consensus(SequencingMessage::General(
1439                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1440                )),
1441                TransmitType::Broadcast,
1442            )),
1443            _ => None,
1444        }
1445    }
1446
1447    /// Creates a network message and spawns a task that transmits it on the wire.
1448    async fn spawn_transmit_task(
1449        &mut self,
1450        message_kind: MessageKind<TYPES>,
1451        maybe_action: Option<HotShotAction>,
1452        transmit: TransmitType<TYPES>,
1453        sender: TYPES::SignatureKey,
1454    ) {
1455        let broadcast_delay = match &message_kind {
1456            MessageKind::Consensus(
1457                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1458                | SequencingMessage::Da(_),
1459            ) => BroadcastDelay::View(*message_kind.view_number()),
1460            _ => BroadcastDelay::None,
1461        };
1462        let message = Message {
1463            sender,
1464            kind: message_kind,
1465        };
1466        let view_number = message.kind.view_number();
1467        let epoch = message.kind.epoch();
1468        let committee_topic = Topic::Global;
1469        let Ok(mem) = self
1470            .membership_coordinator
1471            .stake_table_for_epoch(self.epoch)
1472            .await
1473        else {
1474            return;
1475        };
1476        let da_committee = mem.da_committee_members(view_number).await;
1477        let network = Arc::clone(&self.network);
1478        let storage = self.storage.clone();
1479        let storage_metrics = Arc::clone(&self.storage_metrics);
1480        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1481        let upgrade_lock = self.upgrade_lock.clone();
1482        let handle = spawn(async move {
1483            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1484                maybe_action,
1485                storage.clone(),
1486                consensus,
1487                view_number,
1488                epoch,
1489            )
1490            .await
1491            .is_err()
1492            {
1493                return;
1494            }
1495            if let MessageKind::Consensus(SequencingMessage::General(
1496                GeneralConsensusMessage::Proposal(prop),
1497            )) = &message.kind
1498            {
1499                let now = Instant::now();
1500                if storage
1501                    .append_proposal2(&convert_proposal(prop.clone()))
1502                    .await
1503                    .is_err()
1504                {
1505                    return;
1506                }
1507                storage_metrics
1508                    .append_quorum_duration
1509                    .add_point(now.elapsed().as_secs_f64());
1510            }
1511
1512            let serialized_message = match upgrade_lock.serialize(&message).await {
1513                Ok(serialized) => serialized,
1514                Err(e) => {
1515                    tracing::error!("Failed to serialize message: {e}");
1516                    return;
1517                },
1518            };
1519
1520            let transmit_result = match transmit {
1521                TransmitType::Direct(recipient) => {
1522                    network.direct_message(serialized_message, recipient).await
1523                },
1524                TransmitType::Broadcast => {
1525                    network
1526                        .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1527                        .await
1528                },
1529                TransmitType::DaCommitteeBroadcast => {
1530                    network
1531                        .da_broadcast_message(
1532                            serialized_message,
1533                            da_committee.iter().cloned().collect(),
1534                            broadcast_delay,
1535                        )
1536                        .await
1537                },
1538            };
1539
1540            match transmit_result {
1541                Ok(()) => {},
1542                Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1543            }
1544        });
1545        self.transmit_tasks
1546            .entry(view_number)
1547            .or_default()
1548            .push(handle);
1549    }
1550}
1551
1552/// A module with test helpers
1553pub mod test {
1554    use std::ops::{Deref, DerefMut};
1555
1556    use async_trait::async_trait;
1557
1558    use super::{
1559        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1560        Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1561    };
1562
1563    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1564    /// and changes it before transmitting on the network.
1565    pub type ModifierClosure<TYPES> = dyn Fn(
1566            &mut <TYPES as NodeType>::SignatureKey,
1567            &mut MessageKind<TYPES>,
1568            &mut TransmitType<TYPES>,
1569            &<TYPES as NodeType>::Membership,
1570        ) + Send
1571        + Sync;
1572
1573    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1574    pub struct NetworkEventTaskStateModifier<
1575        TYPES: NodeType,
1576        V: Versions,
1577        NET: ConnectedNetwork<TYPES::SignatureKey>,
1578        S: Storage<TYPES>,
1579    > {
1580        /// The real `NetworkEventTaskState`
1581        pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1582        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1583        /// changes it before transmitting on the network.
1584        pub modifier: Arc<ModifierClosure<TYPES>>,
1585    }
1586
1587    impl<
1588            TYPES: NodeType,
1589            V: Versions,
1590            NET: ConnectedNetwork<TYPES::SignatureKey>,
1591            S: Storage<TYPES> + 'static,
1592        > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1593    {
1594        /// Handles the received event modifying it before sending on the network.
1595        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1596            let mut maybe_action = None;
1597            if let Some((mut sender, mut message_kind, mut transmit)) =
1598                self.parse_event(event, &mut maybe_action).await
1599            {
1600                // Modify the values acquired by parsing the event.
1601                (self.modifier)(
1602                    &mut sender,
1603                    &mut message_kind,
1604                    &mut transmit,
1605                    &*self.membership_coordinator.membership().read().await,
1606                );
1607
1608                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1609                    .await;
1610            }
1611        }
1612    }
1613
1614    #[async_trait]
1615    impl<
1616            TYPES: NodeType,
1617            V: Versions,
1618            NET: ConnectedNetwork<TYPES::SignatureKey>,
1619            S: Storage<TYPES> + 'static,
1620        > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1621    {
1622        type Event = HotShotEvent<TYPES>;
1623
1624        async fn handle_event(
1625            &mut self,
1626            event: Arc<Self::Event>,
1627            _sender: &Sender<Arc<Self::Event>>,
1628            _receiver: &Receiver<Arc<Self::Event>>,
1629        ) -> Result<()> {
1630            self.handle(event).await;
1631
1632            Ok(())
1633        }
1634
1635        fn cancel_subtasks(&mut self) {}
1636    }
1637
1638    impl<
1639            TYPES: NodeType,
1640            V: Versions,
1641            NET: ConnectedNetwork<TYPES::SignatureKey>,
1642            S: Storage<TYPES>,
1643        > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1644    {
1645        type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1646
1647        fn deref(&self) -> &Self::Target {
1648            &self.network_event_task_state
1649        }
1650    }
1651
1652    impl<
1653            TYPES: NodeType,
1654            V: Versions,
1655            NET: ConnectedNetwork<TYPES::SignatureKey>,
1656            S: Storage<TYPES>,
1657        > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1658    {
1659        fn deref_mut(&mut self) -> &mut Self::Target {
1660            &mut self.network_event_task_state
1661        }
1662    }
1663}