hotshot_task_impls/
network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    hash::{DefaultHasher, Hash, Hasher},
10    sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17    consensus::OuterConsensus,
18    data::{VidDisperse, VidDisperseShare},
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, HotShotAction},
21    message::{
22        convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
23        MessageKind, Proposal, SequencingMessage, UpgradeLock,
24    },
25    simple_vote::HasEpoch,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        network::{
29            BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30            ViewMessage,
31        },
32        node_implementation::{ConsensusTime, NodeType, Versions},
33        storage::Storage,
34    },
35    vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42    events::{HotShotEvent, HotShotTaskCompleted},
43    helpers::broadcast_event,
44};
45
46/// State of the network message task, which turns (deserialized) messages received from the network into events
47#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType, V: Versions> {
49    /// Sender on which the internal `HotShotEvent`s this task generates are broadcast to other tasks
50    pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52    /// Sender used to send the external events this task generates to the external event stream
53    pub external_event_stream: Sender<Event<TYPES>>,
54
55    /// This node's public key
56    pub public_key: TYPES::SignatureKey,
57
58    /// LRU cache keyed by the `DefaultHasher` hash of a transaction, used to ignore previously seen transactions
59    pub transactions_cache: lru::LruCache<u64, ()>,
60
61    /// Lock for a decided upgrade; queried per view to decide whether a received message variant is accepted
62    pub upgrade_lock: UpgradeLock<TYPES, V>,
63
64    /// This node's id (recorded as the `id` field of the `handle_message` tracing span)
65    pub id: u64,
66}
67
68impl<TYPES: NodeType, V: Versions> NetworkMessageTaskState<TYPES, V> {
69    #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
70    /// Handles a (deserialized) message from the network
71    pub async fn handle_message(&mut self, message: Message<TYPES>) {
72        match &message.kind {
73            MessageKind::Consensus(_) => {
74                tracing::debug!("Received consensus message from network: {:?}", message)
75            },
76            MessageKind::Data(_) => {
77                tracing::trace!("Received data message from network: {:?}", message)
78            },
79            MessageKind::External(_) => {
80                tracing::trace!("Received external message from network: {:?}", message)
81            },
82        }
83
84        // Match the message kind and send the appropriate event to the internal event stream
85        let sender = message.sender;
86        match message.kind {
87            // Handle consensus messages
88            MessageKind::Consensus(consensus_message) => {
89                let event = match consensus_message {
90                    SequencingMessage::General(general_message) => match general_message {
91                        GeneralConsensusMessage::Proposal(proposal) => {
92                            if self
93                                .upgrade_lock
94                                .epochs_enabled(proposal.data.view_number())
95                                .await
96                            {
97                                tracing::warn!(
98                                    "received GeneralConsensusMessage::Proposal for view {} but \
99                                     epochs are enabled for that view",
100                                    proposal.data.view_number()
101                                );
102                                return;
103                            }
104                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
105                        },
106                        GeneralConsensusMessage::Proposal2Legacy(proposal) => {
107                            if !self
108                                .upgrade_lock
109                                .proposal2_legacy_version(proposal.data.view_number())
110                                .await
111                            {
112                                tracing::warn!(
113                                    "received GeneralConsensusMessage::Proposal2Legacy for view \
114                                     {} but we are in the wrong version for that message type",
115                                    proposal.data.view_number()
116                                );
117                                return;
118                            }
119                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
120                        },
121                        GeneralConsensusMessage::Proposal2(proposal) => {
122                            if !self
123                                .upgrade_lock
124                                .proposal2_version(proposal.data.view_number())
125                                .await
126                            {
127                                tracing::warn!(
128                                    "received GeneralConsensusMessage::Proposal2 for view {} but \
129                                     we are in the wrong version for that message type",
130                                    proposal.data.view_number()
131                                );
132                                return;
133                            }
134                            HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
135                        },
136                        GeneralConsensusMessage::ProposalRequested(req, sig) => {
137                            HotShotEvent::QuorumProposalRequestRecv(req, sig)
138                        },
139                        GeneralConsensusMessage::ProposalResponse(proposal) => {
140                            if self
141                                .upgrade_lock
142                                .epochs_enabled(proposal.data.view_number())
143                                .await
144                            {
145                                tracing::warn!(
146                                    "received GeneralConsensusMessage::ProposalResponse for view \
147                                     {} but we are in the wrong version for that message type",
148                                    proposal.data.view_number()
149                                );
150                                return;
151                            }
152                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
153                        },
154                        GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
155                            if !self
156                                .upgrade_lock
157                                .proposal2_legacy_version(proposal.data.view_number())
158                                .await
159                            {
160                                tracing::warn!(
161                                    "received GeneralConsensusMessage::ProposalResponse2Legacy \
162                                     for view {} but we are in the wrong version for that message \
163                                     type",
164                                    proposal.data.view_number()
165                                );
166                                return;
167                            }
168                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
169                        },
170                        GeneralConsensusMessage::ProposalResponse2(proposal) => {
171                            if !self
172                                .upgrade_lock
173                                .proposal2_version(proposal.data.view_number())
174                                .await
175                            {
176                                tracing::warn!(
177                                    "received GeneralConsensusMessage::ProposalResponse2 for view \
178                                     {} but epochs are not enabled for that view",
179                                    proposal.data.view_number()
180                                );
181                                return;
182                            }
183                            HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
184                        },
185                        GeneralConsensusMessage::Vote(vote) => {
186                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
187                                tracing::warn!(
188                                    "received GeneralConsensusMessage::Vote for view {} but \
189                                     epochs are enabled for that view",
190                                    vote.view_number()
191                                );
192                                return;
193                            }
194                            HotShotEvent::QuorumVoteRecv(vote.to_vote2())
195                        },
196                        GeneralConsensusMessage::Vote2(vote) => {
197                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
198                                tracing::warn!(
199                                    "received GeneralConsensusMessage::Vote2 for view {} but \
200                                     epochs are not enabled for that view",
201                                    vote.view_number()
202                                );
203                                return;
204                            }
205                            HotShotEvent::QuorumVoteRecv(vote)
206                        },
207                        GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
208                            if self
209                                .upgrade_lock
210                                .epochs_enabled(view_sync_message.view_number())
211                                .await
212                            {
213                                tracing::warn!(
214                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
215                                     view {} but epochs are enabled for that view",
216                                    view_sync_message.view_number()
217                                );
218                                return;
219                            }
220                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
221                        },
222                        GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
223                            if !self
224                                .upgrade_lock
225                                .epochs_enabled(view_sync_message.view_number())
226                                .await
227                            {
228                                tracing::warn!(
229                                    "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
230                                     view {} but epochs are not enabled for that view",
231                                    view_sync_message.view_number()
232                                );
233                                return;
234                            }
235                            HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
236                        },
237                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(
238                            view_sync_message,
239                        ) => {
240                            if self
241                                .upgrade_lock
242                                .epochs_enabled(view_sync_message.view_number())
243                                .await
244                            {
245                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
246                                return;
247                            }
248                            HotShotEvent::ViewSyncPreCommitCertificateRecv(
249                                view_sync_message.to_vsc2(),
250                            )
251                        },
252                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
253                            view_sync_message,
254                        ) => {
255                            if !self
256                                .upgrade_lock
257                                .epochs_enabled(view_sync_message.view_number())
258                                .await
259                            {
260                                tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
261                                return;
262                            }
263                            HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
264                        },
265                        GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
266                            if self
267                                .upgrade_lock
268                                .epochs_enabled(view_sync_message.view_number())
269                                .await
270                            {
271                                tracing::warn!(
272                                    "received GeneralConsensusMessage::ViewSyncCommitVote for \
273                                     view {} but epochs are enabled for that view",
274                                    view_sync_message.view_number()
275                                );
276                                return;
277                            }
278                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
279                        },
280                        GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
281                            if !self
282                                .upgrade_lock
283                                .epochs_enabled(view_sync_message.view_number())
284                                .await
285                            {
286                                tracing::warn!(
287                                    "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
288                                     view {} but epochs are not enabled for that view",
289                                    view_sync_message.view_number()
290                                );
291                                return;
292                            }
293                            HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
294                        },
295                        GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
296                            if self
297                                .upgrade_lock
298                                .epochs_enabled(view_sync_message.view_number())
299                                .await
300                            {
301                                tracing::warn!(
302                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate \
303                                     for view {} but epochs are enabled for that view",
304                                    view_sync_message.view_number()
305                                );
306                                return;
307                            }
308                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
309                        },
310                        GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
311                            if !self
312                                .upgrade_lock
313                                .epochs_enabled(view_sync_message.view_number())
314                                .await
315                            {
316                                tracing::warn!(
317                                    "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
318                                     for view {} but epochs are not enabled for that view",
319                                    view_sync_message.view_number()
320                                );
321                                return;
322                            }
323                            HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
324                        },
325                        GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
326                            if self
327                                .upgrade_lock
328                                .epochs_enabled(view_sync_message.view_number())
329                                .await
330                            {
331                                tracing::warn!(
332                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
333                                     view {} but epochs are enabled for that view",
334                                    view_sync_message.view_number()
335                                );
336                                return;
337                            }
338                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
339                        },
340                        GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
341                            if !self
342                                .upgrade_lock
343                                .epochs_enabled(view_sync_message.view_number())
344                                .await
345                            {
346                                tracing::warn!(
347                                    "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
348                                     view {} but epochs are not enabled for that view",
349                                    view_sync_message.view_number()
350                                );
351                                return;
352                            }
353                            HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
354                        },
355                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
356                            if self
357                                .upgrade_lock
358                                .epochs_enabled(view_sync_message.view_number())
359                                .await
360                            {
361                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
362                                return;
363                            }
364                            HotShotEvent::ViewSyncFinalizeCertificateRecv(
365                                view_sync_message.to_vsc2(),
366                            )
367                        },
368                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
369                            view_sync_message,
370                        ) => {
371                            if !self
372                                .upgrade_lock
373                                .epochs_enabled(view_sync_message.view_number())
374                                .await
375                            {
376                                tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
377                                return;
378                            }
379                            HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
380                        },
381                        GeneralConsensusMessage::TimeoutVote(message) => {
382                            if self
383                                .upgrade_lock
384                                .epochs_enabled(message.view_number())
385                                .await
386                            {
387                                tracing::warn!(
388                                    "received GeneralConsensusMessage::TimeoutVote for view {} \
389                                     but epochs are enabled for that view",
390                                    message.view_number()
391                                );
392                                return;
393                            }
394                            HotShotEvent::TimeoutVoteRecv(message.to_vote2())
395                        },
396                        GeneralConsensusMessage::TimeoutVote2(message) => {
397                            if !self
398                                .upgrade_lock
399                                .epochs_enabled(message.view_number())
400                                .await
401                            {
402                                tracing::warn!(
403                                    "received GeneralConsensusMessage::TimeoutVote2 for view {} \
404                                     but epochs are not enabled for that view",
405                                    message.view_number()
406                                );
407                                return;
408                            }
409                            HotShotEvent::TimeoutVoteRecv(message)
410                        },
411                        GeneralConsensusMessage::UpgradeProposal(message) => {
412                            HotShotEvent::UpgradeProposalRecv(message, sender)
413                        },
414                        GeneralConsensusMessage::UpgradeVote(message) => {
415                            tracing::error!("Received upgrade vote!");
416                            HotShotEvent::UpgradeVoteRecv(message)
417                        },
418                        GeneralConsensusMessage::HighQc(qc, next_qc) => {
419                            HotShotEvent::HighQcRecv(qc, next_qc, sender)
420                        },
421                        GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
422                            HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
423                        },
424                        GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
425                            if !self
426                                .upgrade_lock
427                                .proposal2_legacy_version(vote.view_number())
428                                .await
429                            {
430                                tracing::warn!(
431                                    "received GeneralConsensusMessage::EpochRootQuorumVote for \
432                                     view {} but we do not expect this message in this version",
433                                    vote.view_number()
434                                );
435                                return;
436                            }
437                            HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
438                        },
439                        GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
440                            if !self
441                                .upgrade_lock
442                                .proposal2_version(vote.view_number())
443                                .await
444                            {
445                                tracing::warn!(
446                                    "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
447                                     view {} but we do not expect this message in this version",
448                                    vote.view_number()
449                                );
450                                return;
451                            }
452                            HotShotEvent::EpochRootQuorumVoteRecv(vote)
453                        },
454                        GeneralConsensusMessage::EpochRootQc(root_qc) => {
455                            if !self
456                                .upgrade_lock
457                                .proposal2_version(root_qc.view_number())
458                                .await
459                            {
460                                tracing::warn!(
461                                    "received GeneralConsensusMessage::EpochRootQc for view {} \
462                                     but we are in the wrong version for that message types",
463                                    root_qc.view_number()
464                                );
465                                return;
466                            }
467                            HotShotEvent::EpochRootQcRecv(root_qc, sender)
468                        },
469                        GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
470                            if !self
471                                .upgrade_lock
472                                .proposal2_legacy_version(root_qc.view_number())
473                                .await
474                            {
475                                tracing::warn!(
476                                    "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
477                                     but we are in the wrong version for that message type",
478                                    root_qc.view_number()
479                                );
480                                return;
481                            }
482                            HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
483                        },
484                    },
485                    SequencingMessage::Da(da_message) => match da_message {
486                        DaConsensusMessage::DaProposal(proposal) => {
487                            if self
488                                .upgrade_lock
489                                .epochs_enabled(proposal.data.view_number())
490                                .await
491                            {
492                                tracing::warn!(
493                                    "received DaConsensusMessage::DaProposal for view {} but \
494                                     epochs are enabled for that view",
495                                    proposal.data.view_number()
496                                );
497                                return;
498                            }
499                            HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
500                        },
501                        DaConsensusMessage::DaProposal2(proposal) => {
502                            if !self
503                                .upgrade_lock
504                                .epochs_enabled(proposal.data.view_number())
505                                .await
506                            {
507                                tracing::warn!(
508                                    "received DaConsensusMessage::DaProposal2 for view {} but \
509                                     epochs are not enabled for that view",
510                                    proposal.data.view_number()
511                                );
512                                return;
513                            }
514                            HotShotEvent::DaProposalRecv(proposal, sender)
515                        },
516                        DaConsensusMessage::DaVote(vote) => {
517                            if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
518                                tracing::warn!(
519                                    "received DaConsensusMessage::DaVote for view {} but epochs \
520                                     are enabled for that view",
521                                    vote.view_number()
522                                );
523                                return;
524                            }
525                            HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
526                        },
527                        DaConsensusMessage::DaVote2(vote) => {
528                            if !self.upgrade_lock.epochs_enabled(vote.view_number()).await {
529                                tracing::warn!(
530                                    "received DaConsensusMessage::DaVote2 for view {} but epochs \
531                                     are not enabled for that view",
532                                    vote.view_number()
533                                );
534                                return;
535                            }
536                            HotShotEvent::DaVoteRecv(vote.clone())
537                        },
538                        DaConsensusMessage::DaCertificate(cert) => {
539                            if self.upgrade_lock.epochs_enabled(cert.view_number()).await {
540                                tracing::warn!(
541                                    "received DaConsensusMessage::DaCertificate for view {} but \
542                                     epochs are enabled for that view",
543                                    cert.view_number()
544                                );
545                                return;
546                            }
547                            HotShotEvent::DaCertificateRecv(cert.to_dac2())
548                        },
549                        DaConsensusMessage::DaCertificate2(cert) => {
550                            if !self.upgrade_lock.epochs_enabled(cert.view_number()).await {
551                                tracing::warn!(
552                                    "received DaConsensusMessage::DaCertificate2 for view {} but \
553                                     epochs are not enabled for that view",
554                                    cert.view_number()
555                                );
556                                return;
557                            }
558                            HotShotEvent::DaCertificateRecv(cert)
559                        },
560                        DaConsensusMessage::VidDisperseMsg(proposal) => {
561                            if self
562                                .upgrade_lock
563                                .epochs_enabled(proposal.data.view_number())
564                                .await
565                            {
566                                tracing::warn!(
567                                    "received DaConsensusMessage::VidDisperseMsg for view {} but \
568                                     epochs are enabled for that view",
569                                    proposal.data.view_number()
570                                );
571                                return;
572                            }
573                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
574                        },
575                        DaConsensusMessage::VidDisperseMsg2(proposal) => {
576                            if !self
577                                .upgrade_lock
578                                .epochs_enabled(proposal.data.view_number())
579                                .await
580                            {
581                                tracing::warn!(
582                                    "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
583                                     epochs are not enabled for that view",
584                                    proposal.data.view_number()
585                                );
586                                return;
587                            }
588                            HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
589                        },
590                    },
591                };
592                broadcast_event(Arc::new(event), &self.internal_event_stream).await;
593            },
594
595            // Handle data messages
596            MessageKind::Data(message) => match message {
597                DataMessage::SubmitTransaction(transaction, _) => {
598                    let mut hasher = DefaultHasher::new();
599                    transaction.hash(&mut hasher);
600                    if self.transactions_cache.put(hasher.finish(), ()).is_some() {
601                        return;
602                    }
603                    broadcast_event(
604                        Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
605                        &self.internal_event_stream,
606                    )
607                    .await;
608                },
609                DataMessage::DataResponse(response) => {
610                    if let ResponseMessage::Found(message) = response {
611                        match message {
612                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
613                                broadcast_event(
614                                    Arc::new(HotShotEvent::VidResponseRecv(
615                                        sender,
616                                        convert_proposal(proposal),
617                                    )),
618                                    &self.internal_event_stream,
619                                )
620                                .await;
621                            },
622                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
623                                proposal,
624                            )) => {
625                                broadcast_event(
626                                    Arc::new(HotShotEvent::VidResponseRecv(
627                                        sender,
628                                        convert_proposal(proposal),
629                                    )),
630                                    &self.internal_event_stream,
631                                )
632                                .await;
633                            },
634                            _ => {},
635                        }
636                    }
637                },
638                DataMessage::RequestData(data) => {
639                    let req_data = data.clone();
640                    if let RequestKind::Vid(_view_number, _key) = req_data.request {
641                        broadcast_event(
642                            Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
643                            &self.internal_event_stream,
644                        )
645                        .await;
646                    }
647                },
648            },
649
650            // Handle external messages
651            MessageKind::External(data) => {
652                if sender == self.public_key {
653                    return;
654                }
655                // Send the external message to the external event stream so it can be processed
656                broadcast_event(
657                    Event {
658                        view_number: TYPES::View::new(1),
659                        event: EventType::ExternalMessageReceived { sender, data },
660                    },
661                    &self.external_event_stream,
662                )
663                .await;
664            },
665        }
666    }
667}
668
669/// network event task state
670pub struct NetworkEventTaskState<
671    TYPES: NodeType,
672    V: Versions,
673    NET: ConnectedNetwork<TYPES::SignatureKey>,
674    S: Storage<TYPES>,
675> {
676    /// comm network
677    pub network: Arc<NET>,
678
679    /// view number
680    pub view: TYPES::View,
681
682    /// epoch number
683    pub epoch: Option<TYPES::Epoch>,
684
685    /// network memberships
686    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
687
688    /// Storage to store actionable events
689    pub storage: S,
690
691    /// Storage metrics
692    pub storage_metrics: Arc<StorageMetricsValue>,
693
694    /// Shared consensus state
695    pub consensus: OuterConsensus<TYPES>,
696
697    /// Lock for a decided upgrade
698    pub upgrade_lock: UpgradeLock<TYPES, V>,
699
700    /// map view number to transmit tasks
701    pub transmit_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
702
703    /// Number of blocks in an epoch, zero means there are no epochs
704    pub epoch_height: u64,
705
706    /// Node's id
707    pub id: u64,
708}
709
710#[async_trait]
711impl<
712        TYPES: NodeType,
713        V: Versions,
714        NET: ConnectedNetwork<TYPES::SignatureKey>,
715        S: Storage<TYPES> + 'static,
716    > TaskState for NetworkEventTaskState<TYPES, V, NET, S>
717{
718    type Event = HotShotEvent<TYPES>;
719
720    async fn handle_event(
721        &mut self,
722        event: Arc<Self::Event>,
723        _sender: &Sender<Arc<Self::Event>>,
724        _receiver: &Receiver<Arc<Self::Event>>,
725    ) -> Result<()> {
726        self.handle(event).await;
727
728        Ok(())
729    }
730
731    fn cancel_subtasks(&mut self) {}
732}
733
734impl<
735        TYPES: NodeType,
736        V: Versions,
737        NET: ConnectedNetwork<TYPES::SignatureKey>,
738        S: Storage<TYPES> + 'static,
739    > NetworkEventTaskState<TYPES, V, NET, S>
740{
741    /// Handle the given event.
742    ///
743    /// Returns the completion status.
744    #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
745    pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
746        let mut maybe_action = None;
747        if let Some((sender, message_kind, transmit)) =
748            self.parse_event(event, &mut maybe_action).await
749        {
750            self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
751                .await;
752        };
753    }
754
755    /// handle `VidDisperseSend`
756    async fn handle_vid_disperse_proposal(
757        &self,
758        vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
759        sender: &<TYPES as NodeType>::SignatureKey,
760    ) -> Option<HotShotTaskCompleted> {
761        let view = vid_proposal.data.view_number();
762        let epoch = vid_proposal.data.epoch();
763        let vid_share_proposals = VidDisperseShare::to_vid_share_proposals(vid_proposal);
764        let mut messages = HashMap::new();
765
766        for proposal in vid_share_proposals {
767            let recipient = proposal.data.recipient_key().clone();
768            let message = if self
769                .upgrade_lock
770                .epochs_enabled(proposal.data.view_number())
771                .await
772            {
773                let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
774                    Proposal {
775                        data,
776                        signature: proposal.signature,
777                        _pd: proposal._pd,
778                    }
779                } else {
780                    tracing::warn!(
781                        "Epochs are enabled for view {} but didn't receive VidDisperseShare2",
782                        proposal.data.view_number()
783                    );
784                    return None;
785                };
786                Message {
787                    sender: sender.clone(),
788                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
789                        DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
790                    )),
791                }
792            } else {
793                let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
794                    Proposal {
795                        data,
796                        signature: proposal.signature,
797                        _pd: proposal._pd,
798                    }
799                } else {
800                    tracing::warn!(
801                        "Epochs are not enabled for view {} but didn't receive ADVZDisperseShare",
802                        proposal.data.view_number()
803                    );
804                    return None;
805                };
806                Message {
807                    sender: sender.clone(),
808                    kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
809                        DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
810                    )),
811                }
812            };
813            let serialized_message = match self.upgrade_lock.serialize(&message).await {
814                Ok(serialized) => serialized,
815                Err(e) => {
816                    tracing::error!("Failed to serialize message: {e}");
817                    continue;
818                },
819            };
820
821            messages.insert(recipient, serialized_message);
822        }
823
824        let net = Arc::clone(&self.network);
825        let storage = self.storage.clone();
826        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
827        spawn(async move {
828            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
829                Some(HotShotAction::VidDisperse),
830                storage,
831                consensus,
832                view,
833                epoch,
834            )
835            .await
836            .is_err()
837            {
838                return;
839            }
840            match net.vid_broadcast_message(messages).await {
841                Ok(()) => {},
842                Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
843            }
844        });
845
846        None
847    }
848
849    /// Record `HotShotAction` if available
850    async fn maybe_record_action(
851        maybe_action: Option<HotShotAction>,
852        storage: S,
853        consensus: OuterConsensus<TYPES>,
854        view: <TYPES as NodeType>::View,
855        epoch: Option<<TYPES as NodeType>::Epoch>,
856    ) -> std::result::Result<(), ()> {
857        if let Some(mut action) = maybe_action {
858            if !consensus.write().await.update_action(action, view) {
859                return Err(());
860            }
861            // If the action was view sync record it as a vote, but we don't
862            // want to limit to 1 View sync vote above so change the action here.
863            if matches!(action, HotShotAction::ViewSyncVote) {
864                action = HotShotAction::Vote;
865            }
866            match storage.record_action(view, epoch, action).await {
867                Ok(()) => Ok(()),
868                Err(e) => {
869                    tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
870                    Err(())
871                },
872            }
873        } else {
874            Ok(())
875        }
876    }
877
878    /// Cancel all tasks for previous views
879    pub fn cancel_tasks(&mut self, view: TYPES::View) {
880        let keep = self.transmit_tasks.split_off(&view);
881
882        while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
883            for task in tasks {
884                task.abort();
885            }
886        }
887
888        self.transmit_tasks = keep;
889    }
890
891    /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`)
892    /// which will be used to create a message and transmit on the wire.
893    /// Returns `None` if the parsing result should not be sent on the wire.
894    /// Handles the `VidDisperseSend` event separately using a helper method.
895    #[allow(clippy::too_many_lines)]
896    async fn parse_event(
897        &mut self,
898        event: Arc<HotShotEvent<TYPES>>,
899        maybe_action: &mut Option<HotShotAction>,
900    ) -> Option<(
901        <TYPES as NodeType>::SignatureKey,
902        MessageKind<TYPES>,
903        TransmitType<TYPES>,
904    )> {
905        match event.as_ref().clone() {
906            HotShotEvent::QuorumProposalSend(proposal, sender) => {
907                *maybe_action = Some(HotShotAction::Propose);
908
909                let message = if self
910                    .upgrade_lock
911                    .proposal2_version(proposal.data.view_number())
912                    .await
913                {
914                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
915                        GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
916                    ))
917                } else if self
918                    .upgrade_lock
919                    .proposal2_legacy_version(proposal.data.view_number())
920                    .await
921                {
922                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
923                        GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
924                    ))
925                } else {
926                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
927                        GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
928                    ))
929                };
930
931                Some((sender, message, TransmitType::Broadcast))
932            },
933
934            // ED Each network task is subscribed to all these message types.  Need filters per network task
935            HotShotEvent::QuorumVoteSend(vote) => {
936                *maybe_action = Some(HotShotAction::Vote);
937                let view_number = vote.view_number() + 1;
938                let leader = match self
939                    .membership_coordinator
940                    .membership_for_epoch(vote.epoch())
941                    .await
942                    .ok()?
943                    .leader(view_number)
944                    .await
945                {
946                    Ok(l) => l,
947                    Err(e) => {
948                        tracing::warn!(
949                            "Failed to calculate leader for view number {view_number}. Error: \
950                             {e:?}"
951                        );
952                        return None;
953                    },
954                };
955
956                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
957                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
958                        GeneralConsensusMessage::Vote2(vote.clone()),
959                    ))
960                } else {
961                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
962                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
963                    ))
964                };
965
966                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
967            },
968            HotShotEvent::EpochRootQuorumVoteSend(vote) => {
969                *maybe_action = Some(HotShotAction::Vote);
970                let view_number = vote.view_number() + 1;
971                let leader = match self
972                    .membership_coordinator
973                    .membership_for_epoch(vote.epoch())
974                    .await
975                    .ok()?
976                    .leader(view_number)
977                    .await
978                {
979                    Ok(l) => l,
980                    Err(e) => {
981                        tracing::warn!(
982                            "Failed to calculate leader for view number {:?}. Error: {:?}",
983                            view_number,
984                            e
985                        );
986                        return None;
987                    },
988                };
989
990                let message = if self
991                    .upgrade_lock
992                    .proposal2_version(vote.view_number())
993                    .await
994                {
995                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
996                        GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
997                    ))
998                } else if self
999                    .upgrade_lock
1000                    .proposal2_legacy_version(vote.view_number())
1001                    .await
1002                {
1003                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1004                        GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
1005                    ))
1006                } else {
1007                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1008                        GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
1009                    ))
1010                };
1011
1012                Some((
1013                    vote.vote.signing_key(),
1014                    message,
1015                    TransmitType::Direct(leader),
1016                ))
1017            },
1018            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
1019                *maybe_action = Some(HotShotAction::Vote);
1020                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1021                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1022                        GeneralConsensusMessage::Vote2(vote.clone()),
1023                    ))
1024                } else {
1025                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1026                        GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1027                    ))
1028                };
1029
1030                Some((vote.signing_key(), message, TransmitType::Broadcast))
1031            },
1032            HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1033                req.key.clone(),
1034                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1035                    GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1036                )),
1037                TransmitType::Broadcast,
1038            )),
1039            HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1040                let message = if self
1041                    .upgrade_lock
1042                    .proposal2_version(proposal.data.view_number())
1043                    .await
1044                {
1045                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1046                        GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1047                    ))
1048                } else if self
1049                    .upgrade_lock
1050                    .proposal2_legacy_version(proposal.data.view_number())
1051                    .await
1052                {
1053                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1054                        GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1055                            proposal,
1056                        )),
1057                    ))
1058                } else {
1059                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1060                        GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1061                    ))
1062                };
1063
1064                Some((
1065                    sender_key.clone(),
1066                    message,
1067                    TransmitType::Direct(sender_key),
1068                ))
1069            },
1070            HotShotEvent::VidDisperseSend(proposal, sender) => {
1071                self.handle_vid_disperse_proposal(proposal, &sender).await;
1072                None
1073            },
1074            HotShotEvent::DaProposalSend(proposal, sender) => {
1075                *maybe_action = Some(HotShotAction::DaPropose);
1076
1077                let message = if self
1078                    .upgrade_lock
1079                    .epochs_enabled(proposal.data.view_number())
1080                    .await
1081                {
1082                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1083                        DaConsensusMessage::DaProposal2(proposal),
1084                    ))
1085                } else {
1086                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1087                        DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1088                    ))
1089                };
1090
1091                Some((sender, message, TransmitType::DaCommitteeBroadcast))
1092            },
1093            HotShotEvent::DaVoteSend(vote) => {
1094                *maybe_action = Some(HotShotAction::DaVote);
1095                let view_number = vote.view_number();
1096                let leader = match self
1097                    .membership_coordinator
1098                    .membership_for_epoch(vote.epoch())
1099                    .await
1100                    .ok()?
1101                    .leader(view_number)
1102                    .await
1103                {
1104                    Ok(l) => l,
1105                    Err(e) => {
1106                        tracing::warn!(
1107                            "Failed to calculate leader for view number {view_number}. Error: \
1108                             {e:?}"
1109                        );
1110                        return None;
1111                    },
1112                };
1113
1114                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1115                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1116                        DaConsensusMessage::DaVote2(vote.clone()),
1117                    ))
1118                } else {
1119                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1120                        DaConsensusMessage::DaVote(vote.clone().to_vote()),
1121                    ))
1122                };
1123
1124                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1125            },
1126            HotShotEvent::DacSend(certificate, sender) => {
1127                *maybe_action = Some(HotShotAction::DaCert);
1128                let message = if self
1129                    .upgrade_lock
1130                    .epochs_enabled(certificate.view_number())
1131                    .await
1132                {
1133                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1134                        DaConsensusMessage::DaCertificate2(certificate),
1135                    ))
1136                } else {
1137                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1138                        DaConsensusMessage::DaCertificate(certificate.to_dac()),
1139                    ))
1140                };
1141
1142                Some((sender, message, TransmitType::Broadcast))
1143            },
1144            HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1145                let view_number = vote.view_number() + vote.date().relay;
1146                let leader = match self
1147                    .membership_coordinator
1148                    .membership_for_epoch(self.epoch)
1149                    .await
1150                    .ok()?
1151                    .leader(view_number)
1152                    .await
1153                {
1154                    Ok(l) => l,
1155                    Err(e) => {
1156                        tracing::warn!(
1157                            "Failed to calculate leader for view number {view_number}. Error: \
1158                             {e:?}"
1159                        );
1160                        return None;
1161                    },
1162                };
1163                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1164                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1165                        GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1166                    ))
1167                } else {
1168                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1169                        GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1170                    ))
1171                };
1172
1173                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1174            },
1175            HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1176                *maybe_action = Some(HotShotAction::ViewSyncVote);
1177                let view_number = vote.view_number() + vote.date().relay;
1178                let leader = match self
1179                    .membership_coordinator
1180                    .membership_for_epoch(self.epoch)
1181                    .await
1182                    .ok()?
1183                    .leader(view_number)
1184                    .await
1185                {
1186                    Ok(l) => l,
1187                    Err(e) => {
1188                        tracing::warn!(
1189                            "Failed to calculate leader for view number {view_number}. Error: \
1190                             {e:?}"
1191                        );
1192                        return None;
1193                    },
1194                };
1195                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1196                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1197                        GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1198                    ))
1199                } else {
1200                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1201                        GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1202                    ))
1203                };
1204
1205                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1206            },
1207            HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1208                *maybe_action = Some(HotShotAction::ViewSyncVote);
1209                let view_number = vote.view_number() + vote.date().relay;
1210                let leader = match self
1211                    .membership_coordinator
1212                    .membership_for_epoch(self.epoch)
1213                    .await
1214                    .ok()?
1215                    .leader(view_number)
1216                    .await
1217                {
1218                    Ok(l) => l,
1219                    Err(e) => {
1220                        tracing::warn!(
1221                            "Failed to calculate leader for view number {view_number:?}. Error: \
1222                             {e:?}"
1223                        );
1224                        return None;
1225                    },
1226                };
1227                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1228                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1229                        GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1230                    ))
1231                } else {
1232                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233                        GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1234                    ))
1235                };
1236
1237                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1238            },
1239            HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1240                let view_number = certificate.view_number();
1241                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1242                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1243                        GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1244                    ))
1245                } else {
1246                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247                        GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1248                    ))
1249                };
1250
1251                Some((sender, message, TransmitType::Broadcast))
1252            },
1253            HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1254                let view_number = certificate.view_number();
1255                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1256                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1257                        GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1258                    ))
1259                } else {
1260                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1261                        GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1262                    ))
1263                };
1264
1265                Some((sender, message, TransmitType::Broadcast))
1266            },
1267            HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1268                let view_number = certificate.view_number();
1269                let message = if self.upgrade_lock.epochs_enabled(view_number).await {
1270                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1271                        GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1272                    ))
1273                } else {
1274                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1275                        GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1276                    ))
1277                };
1278
1279                Some((sender, message, TransmitType::Broadcast))
1280            },
1281            HotShotEvent::TimeoutVoteSend(vote) => {
1282                *maybe_action = Some(HotShotAction::TimeoutVote);
1283                let view_number = vote.view_number() + 1;
1284                let leader = match self
1285                    .membership_coordinator
1286                    .membership_for_epoch(self.epoch)
1287                    .await
1288                    .ok()?
1289                    .leader(view_number)
1290                    .await
1291                {
1292                    Ok(l) => l,
1293                    Err(e) => {
1294                        tracing::warn!(
1295                            "Failed to calculate leader for view number {view_number}. Error: \
1296                             {e:?}"
1297                        );
1298                        return None;
1299                    },
1300                };
1301                let message = if self.upgrade_lock.epochs_enabled(vote.view_number()).await {
1302                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1303                        GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1304                    ))
1305                } else {
1306                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1307                        GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1308                    ))
1309                };
1310
1311                Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1312            },
1313            HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1314                sender,
1315                MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1316                    GeneralConsensusMessage::UpgradeProposal(proposal),
1317                )),
1318                TransmitType::Broadcast,
1319            )),
1320            HotShotEvent::UpgradeVoteSend(vote) => {
1321                tracing::error!("Sending upgrade vote!");
1322                let view_number = vote.view_number();
1323                let leader = match self
1324                    .membership_coordinator
1325                    .membership_for_epoch(self.epoch)
1326                    .await
1327                    .ok()?
1328                    .leader(view_number)
1329                    .await
1330                {
1331                    Ok(l) => l,
1332                    Err(e) => {
1333                        tracing::warn!(
1334                            "Failed to calculate leader for view number {view_number}. Error: \
1335                             {e:?}"
1336                        );
1337                        return None;
1338                    },
1339                };
1340                Some((
1341                    vote.signing_key(),
1342                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1343                        GeneralConsensusMessage::UpgradeVote(vote.clone()),
1344                    )),
1345                    TransmitType::Direct(leader),
1346                ))
1347            },
1348            HotShotEvent::ViewChange(view, epoch) => {
1349                self.view = view;
1350                if epoch > self.epoch {
1351                    self.epoch = epoch;
1352                }
1353                let keep_view = TYPES::View::new(view.saturating_sub(1));
1354                self.cancel_tasks(keep_view);
1355                let net = Arc::clone(&self.network);
1356                let epoch = self.epoch.map(|x| x.u64());
1357                let membership_coordinator = self.membership_coordinator.clone();
1358                spawn(async move {
1359                    net.update_view::<TYPES>(*keep_view, epoch, membership_coordinator)
1360                        .await;
1361                });
1362                None
1363            },
1364            HotShotEvent::VidRequestSend(req, sender, to) => Some((
1365                sender,
1366                MessageKind::Data(DataMessage::RequestData(req)),
1367                TransmitType::Direct(to),
1368            )),
1369            HotShotEvent::VidResponseSend(sender, to, proposal) => {
1370                let epochs_enabled = self
1371                    .upgrade_lock
1372                    .epochs_enabled(proposal.data.view_number())
1373                    .await;
1374                let message = match proposal.data {
1375                    VidDisperseShare::V0(data) => {
1376                        if epochs_enabled {
1377                            tracing::warn!(
1378                                "Epochs are enabled for view {} but didn't receive \
1379                                 VidDisperseShare2",
1380                                data.view_number()
1381                            );
1382                            return None;
1383                        }
1384                        let vid_share_proposal = Proposal {
1385                            data,
1386                            signature: proposal.signature,
1387                            _pd: proposal._pd,
1388                        };
1389                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1391                                vid_share_proposal,
1392                            )),
1393                        )))
1394                    },
1395                    VidDisperseShare::V1(data) => {
1396                        if !epochs_enabled {
1397                            tracing::warn!(
1398                                "Epochs are enabled for view {} but didn't receive \
1399                                 ADVZDisperseShare",
1400                                data.view_number()
1401                            );
1402                            return None;
1403                        }
1404                        let vid_share_proposal = Proposal {
1405                            data,
1406                            signature: proposal.signature,
1407                            _pd: proposal._pd,
1408                        };
1409                        MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410                            SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411                                vid_share_proposal,
1412                            )),
1413                        )))
1414                    },
1415                };
1416                Some((sender, message, TransmitType::Direct(to)))
1417            },
1418            HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419                sender,
1420                MessageKind::Consensus(SequencingMessage::General(
1421                    GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422                )),
1423                TransmitType::Direct(leader),
1424            )),
1425            HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426                let message = if self
1427                    .upgrade_lock
1428                    .proposal2_version(epoch_root_qc.view_number())
1429                    .await
1430                {
1431                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1432                        GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1433                    ))
1434                } else {
1435                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1436                        GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1437                    ))
1438                };
1439
1440                Some((sender, message, TransmitType::Direct(leader)))
1441            },
1442            HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1443                sender,
1444                MessageKind::Consensus(SequencingMessage::General(
1445                    GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1446                )),
1447                TransmitType::Broadcast,
1448            )),
1449            _ => None,
1450        }
1451    }
1452
1453    /// Creates a network message and spawns a task that transmits it on the wire.
1454    async fn spawn_transmit_task(
1455        &mut self,
1456        message_kind: MessageKind<TYPES>,
1457        maybe_action: Option<HotShotAction>,
1458        transmit: TransmitType<TYPES>,
1459        sender: TYPES::SignatureKey,
1460    ) {
1461        let broadcast_delay = match &message_kind {
1462            MessageKind::Consensus(
1463                SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1464                | SequencingMessage::Da(_),
1465            ) => BroadcastDelay::View(*message_kind.view_number()),
1466            _ => BroadcastDelay::None,
1467        };
1468        let message = Message {
1469            sender,
1470            kind: message_kind,
1471        };
1472        let view_number = message.kind.view_number();
1473        let epoch = message.kind.epoch();
1474        let committee_topic = Topic::Global;
1475        let Ok(mem) = self
1476            .membership_coordinator
1477            .stake_table_for_epoch(self.epoch)
1478            .await
1479        else {
1480            return;
1481        };
1482        let da_committee = mem.da_committee_members(view_number).await;
1483        let network = Arc::clone(&self.network);
1484        let storage = self.storage.clone();
1485        let storage_metrics = Arc::clone(&self.storage_metrics);
1486        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1487        let upgrade_lock = self.upgrade_lock.clone();
1488        let handle = spawn(async move {
1489            if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
1490                maybe_action,
1491                storage.clone(),
1492                consensus,
1493                view_number,
1494                epoch,
1495            )
1496            .await
1497            .is_err()
1498            {
1499                return;
1500            }
1501            if let MessageKind::Consensus(SequencingMessage::General(
1502                GeneralConsensusMessage::Proposal(prop),
1503            )) = &message.kind
1504            {
1505                let now = Instant::now();
1506                if storage
1507                    .append_proposal2(&convert_proposal(prop.clone()))
1508                    .await
1509                    .is_err()
1510                {
1511                    return;
1512                }
1513                storage_metrics
1514                    .append_quorum_duration
1515                    .add_point(now.elapsed().as_secs_f64());
1516            }
1517
1518            let serialized_message = match upgrade_lock.serialize(&message).await {
1519                Ok(serialized) => serialized,
1520                Err(e) => {
1521                    tracing::error!("Failed to serialize message: {e}");
1522                    return;
1523                },
1524            };
1525
1526            let transmit_result = match transmit {
1527                TransmitType::Direct(recipient) => {
1528                    network.direct_message(serialized_message, recipient).await
1529                },
1530                TransmitType::Broadcast => {
1531                    network
1532                        .broadcast_message(serialized_message, committee_topic, broadcast_delay)
1533                        .await
1534                },
1535                TransmitType::DaCommitteeBroadcast => {
1536                    network
1537                        .da_broadcast_message(
1538                            serialized_message,
1539                            da_committee.iter().cloned().collect(),
1540                            broadcast_delay,
1541                        )
1542                        .await
1543                },
1544            };
1545
1546            match transmit_result {
1547                Ok(()) => {},
1548                Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1549            }
1550        });
1551        self.transmit_tasks
1552            .entry(view_number)
1553            .or_default()
1554            .push(handle);
1555    }
1556}
1557
1558/// A module with test helpers
1559pub mod test {
1560    use std::ops::{Deref, DerefMut};
1561
1562    use async_trait::async_trait;
1563
1564    use super::{
1565        Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1566        Receiver, Result, Sender, Storage, TaskState, TransmitType, Versions,
1567    };
1568
1569    /// A dynamic type alias for a function that takes the result of `NetworkEventTaskState::parse_event`
1570    /// and changes it before transmitting on the network.
1571    pub type ModifierClosure<TYPES> = dyn Fn(
1572            &mut <TYPES as NodeType>::SignatureKey,
1573            &mut MessageKind<TYPES>,
1574            &mut TransmitType<TYPES>,
1575            &<TYPES as NodeType>::Membership,
1576        ) + Send
1577        + Sync;
1578
1579    /// A helper wrapper around `NetworkEventTaskState` that can modify its behaviour for tests
1580    pub struct NetworkEventTaskStateModifier<
1581        TYPES: NodeType,
1582        V: Versions,
1583        NET: ConnectedNetwork<TYPES::SignatureKey>,
1584        S: Storage<TYPES>,
1585    > {
1586        /// The real `NetworkEventTaskState`
1587        pub network_event_task_state: NetworkEventTaskState<TYPES, V, NET, S>,
1588        /// A function that takes the result of `NetworkEventTaskState::parse_event` and
1589        /// changes it before transmitting on the network.
1590        pub modifier: Arc<ModifierClosure<TYPES>>,
1591    }
1592
1593    impl<
1594            TYPES: NodeType,
1595            V: Versions,
1596            NET: ConnectedNetwork<TYPES::SignatureKey>,
1597            S: Storage<TYPES> + 'static,
1598        > NetworkEventTaskStateModifier<TYPES, V, NET, S>
1599    {
1600        /// Handles the received event modifying it before sending on the network.
1601        pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1602            let mut maybe_action = None;
1603            if let Some((mut sender, mut message_kind, mut transmit)) =
1604                self.parse_event(event, &mut maybe_action).await
1605            {
1606                // Modify the values acquired by parsing the event.
1607                (self.modifier)(
1608                    &mut sender,
1609                    &mut message_kind,
1610                    &mut transmit,
1611                    &*self.membership_coordinator.membership().read().await,
1612                );
1613
1614                self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1615                    .await;
1616            }
1617        }
1618    }
1619
1620    #[async_trait]
1621    impl<
1622            TYPES: NodeType,
1623            V: Versions,
1624            NET: ConnectedNetwork<TYPES::SignatureKey>,
1625            S: Storage<TYPES> + 'static,
1626        > TaskState for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1627    {
1628        type Event = HotShotEvent<TYPES>;
1629
1630        async fn handle_event(
1631            &mut self,
1632            event: Arc<Self::Event>,
1633            _sender: &Sender<Arc<Self::Event>>,
1634            _receiver: &Receiver<Arc<Self::Event>>,
1635        ) -> Result<()> {
1636            self.handle(event).await;
1637
1638            Ok(())
1639        }
1640
1641        fn cancel_subtasks(&mut self) {}
1642    }
1643
1644    impl<
1645            TYPES: NodeType,
1646            V: Versions,
1647            NET: ConnectedNetwork<TYPES::SignatureKey>,
1648            S: Storage<TYPES>,
1649        > Deref for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1650    {
1651        type Target = NetworkEventTaskState<TYPES, V, NET, S>;
1652
1653        fn deref(&self) -> &Self::Target {
1654            &self.network_event_task_state
1655        }
1656    }
1657
1658    impl<
1659            TYPES: NodeType,
1660            V: Versions,
1661            NET: ConnectedNetwork<TYPES::SignatureKey>,
1662            S: Storage<TYPES>,
1663        > DerefMut for NetworkEventTaskStateModifier<TYPES, V, NET, S>
1664    {
1665        fn deref_mut(&mut self) -> &mut Self::Target {
1666            &mut self.network_event_task_state
1667        }
1668    }
1669}