1use std::{
8 collections::{BTreeMap, HashMap},
9 hash::{DefaultHasher, Hash},
10 sync::Arc,
11};
12
13use async_broadcast::{Receiver, Sender};
14use async_trait::async_trait;
15use hotshot_task::task::TaskState;
16use hotshot_types::{
17 consensus::OuterConsensus,
18 data::{EpochNumber, VidDisperse, VidDisperseShare, ViewNumber},
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, HotShotAction},
21 message::{
22 DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal,
23 SequencingMessage, UpgradeLock, convert_proposal,
24 },
25 simple_vote::HasEpoch,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 network::{
29 BroadcastDelay, ConnectedNetwork, RequestKind, ResponseMessage, Topic, TransmitType,
30 ViewMessage,
31 },
32 node_implementation::NodeType,
33 storage::Storage,
34 },
35 vote::{HasViewNumber, Vote},
36};
37use hotshot_utils::anytrace::*;
38use tokio::{spawn, task::JoinHandle, time::Instant};
39use tracing::instrument;
40
41use crate::{
42 events::{HotShotEvent, HotShotTaskCompleted},
43 helpers::broadcast_event,
44};
45
46#[derive(Clone)]
48pub struct NetworkMessageTaskState<TYPES: NodeType> {
49 pub internal_event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
51
52 pub external_event_stream: Sender<Event<TYPES>>,
54
55 pub public_key: TYPES::SignatureKey,
57
58 pub upgrade_lock: UpgradeLock<TYPES>,
60
61 pub id: u64,
63}
64
65impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
66 #[instrument(skip_all, name = "Network message task", fields(id = self.id), level = "trace")]
67 pub async fn handle_message(&mut self, message: Message<TYPES>) {
69 match &message.kind {
70 MessageKind::Consensus(_) => {
71 tracing::debug!("Received consensus message from network: {:?}", message)
72 },
73 MessageKind::Data(_) => {
74 tracing::trace!("Received data message from network: {:?}", message)
75 },
76 MessageKind::External(_) => {
77 tracing::trace!("Received external message from network: {:?}", message)
78 },
79 }
80
81 let sender = message.sender;
83 match message.kind {
84 MessageKind::Consensus(consensus_message) => {
86 let event = match consensus_message {
87 SequencingMessage::General(general_message) => match general_message {
88 GeneralConsensusMessage::Proposal(proposal) => {
89 if self
90 .upgrade_lock
91 .epochs_enabled(proposal.data.view_number())
92 {
93 tracing::warn!(
94 "received GeneralConsensusMessage::Proposal for view {} but \
95 epochs are enabled for that view",
96 proposal.data.view_number()
97 );
98 return;
99 }
100 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
101 },
102 GeneralConsensusMessage::Proposal2Legacy(proposal) => {
103 if !self
104 .upgrade_lock
105 .proposal2_legacy_version(proposal.data.view_number())
106 {
107 tracing::warn!(
108 "received GeneralConsensusMessage::Proposal2Legacy for view \
109 {} but we are in the wrong version for that message type",
110 proposal.data.view_number()
111 );
112 return;
113 }
114 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
115 },
116 GeneralConsensusMessage::Proposal2(proposal) => {
117 if !self
118 .upgrade_lock
119 .proposal2_version(proposal.data.view_number())
120 {
121 tracing::warn!(
122 "received GeneralConsensusMessage::Proposal2 for view {} but \
123 we are in the wrong version for that message type",
124 proposal.data.view_number()
125 );
126 return;
127 }
128 HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender)
129 },
130 GeneralConsensusMessage::ProposalRequested(req, sig) => {
131 HotShotEvent::QuorumProposalRequestRecv(req, sig)
132 },
133 GeneralConsensusMessage::ProposalResponse(proposal) => {
134 if self
135 .upgrade_lock
136 .epochs_enabled(proposal.data.view_number())
137 {
138 tracing::warn!(
139 "received GeneralConsensusMessage::ProposalResponse for view \
140 {} but we are in the wrong version for that message type",
141 proposal.data.view_number()
142 );
143 return;
144 }
145 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
146 },
147 GeneralConsensusMessage::ProposalResponse2Legacy(proposal) => {
148 if !self
149 .upgrade_lock
150 .proposal2_legacy_version(proposal.data.view_number())
151 {
152 tracing::warn!(
153 "received GeneralConsensusMessage::ProposalResponse2Legacy \
154 for view {} but we are in the wrong version for that message \
155 type",
156 proposal.data.view_number()
157 );
158 return;
159 }
160 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
161 },
162 GeneralConsensusMessage::ProposalResponse2(proposal) => {
163 if !self
164 .upgrade_lock
165 .proposal2_version(proposal.data.view_number())
166 {
167 tracing::warn!(
168 "received GeneralConsensusMessage::ProposalResponse2 for view \
169 {} but epochs are not enabled for that view",
170 proposal.data.view_number()
171 );
172 return;
173 }
174 HotShotEvent::QuorumProposalResponseRecv(convert_proposal(proposal))
175 },
176 GeneralConsensusMessage::Vote(vote) => {
177 if self.upgrade_lock.epochs_enabled(vote.view_number()) {
178 tracing::warn!(
179 "received GeneralConsensusMessage::Vote for view {} but \
180 epochs are enabled for that view",
181 vote.view_number()
182 );
183 return;
184 }
185 HotShotEvent::QuorumVoteRecv(vote.to_vote2())
186 },
187 GeneralConsensusMessage::Vote2(vote) => {
188 if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
189 tracing::warn!(
190 "received GeneralConsensusMessage::Vote2 for view {} but \
191 epochs are not enabled for that view",
192 vote.view_number()
193 );
194 return;
195 }
196 HotShotEvent::QuorumVoteRecv(vote)
197 },
198 GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => {
199 if self
200 .upgrade_lock
201 .epochs_enabled(view_sync_message.view_number())
202 {
203 tracing::warn!(
204 "received GeneralConsensusMessage::ViewSyncPreCommitVote for \
205 view {} but epochs are enabled for that view",
206 view_sync_message.view_number()
207 );
208 return;
209 }
210 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message.to_vote2())
211 },
212 GeneralConsensusMessage::ViewSyncPreCommitVote2(view_sync_message) => {
213 if !self
214 .upgrade_lock
215 .epochs_enabled(view_sync_message.view_number())
216 {
217 tracing::warn!(
218 "received GeneralConsensusMessage::ViewSyncPreCommitVote2 for \
219 view {} but epochs are not enabled for that view",
220 view_sync_message.view_number()
221 );
222 return;
223 }
224 HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message)
225 },
226 GeneralConsensusMessage::ViewSyncPreCommitCertificate(
227 view_sync_message,
228 ) => {
229 if self
230 .upgrade_lock
231 .epochs_enabled(view_sync_message.view_number())
232 {
233 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
234 return;
235 }
236 HotShotEvent::ViewSyncPreCommitCertificateRecv(
237 view_sync_message.to_vsc2(),
238 )
239 },
240 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(
241 view_sync_message,
242 ) => {
243 if !self
244 .upgrade_lock
245 .epochs_enabled(view_sync_message.view_number())
246 {
247 tracing::warn!("received GeneralConsensusMessage::ViewSyncPreCommitCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
248 return;
249 }
250 HotShotEvent::ViewSyncPreCommitCertificateRecv(view_sync_message)
251 },
252 GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => {
253 if self
254 .upgrade_lock
255 .epochs_enabled(view_sync_message.view_number())
256 {
257 tracing::warn!(
258 "received GeneralConsensusMessage::ViewSyncCommitVote for \
259 view {} but epochs are enabled for that view",
260 view_sync_message.view_number()
261 );
262 return;
263 }
264 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message.to_vote2())
265 },
266 GeneralConsensusMessage::ViewSyncCommitVote2(view_sync_message) => {
267 if !self
268 .upgrade_lock
269 .epochs_enabled(view_sync_message.view_number())
270 {
271 tracing::warn!(
272 "received GeneralConsensusMessage::ViewSyncCommitVote2 for \
273 view {} but epochs are not enabled for that view",
274 view_sync_message.view_number()
275 );
276 return;
277 }
278 HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message)
279 },
280 GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => {
281 if self
282 .upgrade_lock
283 .epochs_enabled(view_sync_message.view_number())
284 {
285 tracing::warn!(
286 "received GeneralConsensusMessage::ViewSyncCommitCertificate \
287 for view {} but epochs are enabled for that view",
288 view_sync_message.view_number()
289 );
290 return;
291 }
292 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message.to_vsc2())
293 },
294 GeneralConsensusMessage::ViewSyncCommitCertificate2(view_sync_message) => {
295 if !self
296 .upgrade_lock
297 .epochs_enabled(view_sync_message.view_number())
298 {
299 tracing::warn!(
300 "received GeneralConsensusMessage::ViewSyncCommitCertificate2 \
301 for view {} but epochs are not enabled for that view",
302 view_sync_message.view_number()
303 );
304 return;
305 }
306 HotShotEvent::ViewSyncCommitCertificateRecv(view_sync_message)
307 },
308 GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => {
309 if self
310 .upgrade_lock
311 .epochs_enabled(view_sync_message.view_number())
312 {
313 tracing::warn!(
314 "received GeneralConsensusMessage::ViewSyncFinalizeVote for \
315 view {} but epochs are enabled for that view",
316 view_sync_message.view_number()
317 );
318 return;
319 }
320 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message.to_vote2())
321 },
322 GeneralConsensusMessage::ViewSyncFinalizeVote2(view_sync_message) => {
323 if !self
324 .upgrade_lock
325 .epochs_enabled(view_sync_message.view_number())
326 {
327 tracing::warn!(
328 "received GeneralConsensusMessage::ViewSyncFinalizeVote2 for \
329 view {} but epochs are not enabled for that view",
330 view_sync_message.view_number()
331 );
332 return;
333 }
334 HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message)
335 },
336 GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => {
337 if self
338 .upgrade_lock
339 .epochs_enabled(view_sync_message.view_number())
340 {
341 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate for view {} but epochs are enabled for that view", view_sync_message.view_number());
342 return;
343 }
344 HotShotEvent::ViewSyncFinalizeCertificateRecv(
345 view_sync_message.to_vsc2(),
346 )
347 },
348 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(
349 view_sync_message,
350 ) => {
351 if !self
352 .upgrade_lock
353 .epochs_enabled(view_sync_message.view_number())
354 {
355 tracing::warn!("received GeneralConsensusMessage::ViewSyncFinalizeCertificate2 for view {} but epochs are not enabled for that view", view_sync_message.view_number());
356 return;
357 }
358 HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_message)
359 },
360 GeneralConsensusMessage::TimeoutVote(message) => {
361 if self.upgrade_lock.epochs_enabled(message.view_number()) {
362 tracing::warn!(
363 "received GeneralConsensusMessage::TimeoutVote for view {} \
364 but epochs are enabled for that view",
365 message.view_number()
366 );
367 return;
368 }
369 HotShotEvent::TimeoutVoteRecv(message.to_vote2())
370 },
371 GeneralConsensusMessage::TimeoutVote2(message) => {
372 if !self.upgrade_lock.epochs_enabled(message.view_number()) {
373 tracing::warn!(
374 "received GeneralConsensusMessage::TimeoutVote2 for view {} \
375 but epochs are not enabled for that view",
376 message.view_number()
377 );
378 return;
379 }
380 HotShotEvent::TimeoutVoteRecv(message)
381 },
382 GeneralConsensusMessage::UpgradeProposal(message) => {
383 HotShotEvent::UpgradeProposalRecv(message, sender)
384 },
385 GeneralConsensusMessage::UpgradeVote(message) => {
386 tracing::error!("Received upgrade vote!");
387 HotShotEvent::UpgradeVoteRecv(message)
388 },
389 GeneralConsensusMessage::HighQc(qc, next_qc) => {
390 HotShotEvent::HighQcRecv(qc, next_qc, sender)
391 },
392 GeneralConsensusMessage::ExtendedQc(qc, next_epoch_qc) => {
393 HotShotEvent::ExtendedQcRecv(qc, next_epoch_qc, sender)
394 },
395 GeneralConsensusMessage::EpochRootQuorumVote(vote) => {
396 if !self
397 .upgrade_lock
398 .proposal2_legacy_version(vote.view_number())
399 {
400 tracing::warn!(
401 "received GeneralConsensusMessage::EpochRootQuorumVote for \
402 view {} but we do not expect this message in this version",
403 vote.view_number()
404 );
405 return;
406 }
407 HotShotEvent::EpochRootQuorumVoteRecv(vote.to_vote2())
408 },
409 GeneralConsensusMessage::EpochRootQuorumVote2(vote) => {
410 if !self.upgrade_lock.proposal2_version(vote.view_number()) {
411 tracing::warn!(
412 "received GeneralConsensusMessage::EpochRootQuorumVote2 for \
413 view {} but we do not expect this message in this version",
414 vote.view_number()
415 );
416 return;
417 }
418 HotShotEvent::EpochRootQuorumVoteRecv(vote)
419 },
420 GeneralConsensusMessage::EpochRootQc(root_qc) => {
421 if !self.upgrade_lock.proposal2_version(root_qc.view_number()) {
422 tracing::warn!(
423 "received GeneralConsensusMessage::EpochRootQc for view {} \
424 but we are in the wrong version for that message types",
425 root_qc.view_number()
426 );
427 return;
428 }
429 HotShotEvent::EpochRootQcRecv(root_qc, sender)
430 },
431 GeneralConsensusMessage::EpochRootQcV1(root_qc) => {
432 if !self
433 .upgrade_lock
434 .proposal2_legacy_version(root_qc.view_number())
435 {
436 tracing::warn!(
437 "received GeneralConsensusMessage::EpochRootQcV1 for view {} \
438 but we are in the wrong version for that message type",
439 root_qc.view_number()
440 );
441 return;
442 }
443 HotShotEvent::EpochRootQcRecv(root_qc.into(), sender)
444 },
445 },
446 SequencingMessage::Da(da_message) => match da_message {
447 DaConsensusMessage::DaProposal(proposal) => {
448 if self
449 .upgrade_lock
450 .epochs_enabled(proposal.data.view_number())
451 {
452 tracing::warn!(
453 "received DaConsensusMessage::DaProposal for view {} but \
454 epochs are enabled for that view",
455 proposal.data.view_number()
456 );
457 return;
458 }
459 HotShotEvent::DaProposalRecv(convert_proposal(proposal), sender)
460 },
461 DaConsensusMessage::DaProposal2(proposal) => {
462 if !self
463 .upgrade_lock
464 .epochs_enabled(proposal.data.view_number())
465 {
466 tracing::warn!(
467 "received DaConsensusMessage::DaProposal2 for view {} but \
468 epochs are not enabled for that view",
469 proposal.data.view_number()
470 );
471 return;
472 }
473 HotShotEvent::DaProposalRecv(proposal, sender)
474 },
475 DaConsensusMessage::DaVote(vote) => {
476 if self.upgrade_lock.epochs_enabled(vote.view_number()) {
477 tracing::warn!(
478 "received DaConsensusMessage::DaVote for view {} but epochs \
479 are enabled for that view",
480 vote.view_number()
481 );
482 return;
483 }
484 HotShotEvent::DaVoteRecv(vote.clone().to_vote2())
485 },
486 DaConsensusMessage::DaVote2(vote) => {
487 if !self.upgrade_lock.epochs_enabled(vote.view_number()) {
488 tracing::warn!(
489 "received DaConsensusMessage::DaVote2 for view {} but epochs \
490 are not enabled for that view",
491 vote.view_number()
492 );
493 return;
494 }
495 HotShotEvent::DaVoteRecv(vote.clone())
496 },
497 DaConsensusMessage::DaCertificate(cert) => {
498 if self.upgrade_lock.epochs_enabled(cert.view_number()) {
499 tracing::warn!(
500 "received DaConsensusMessage::DaCertificate for view {} but \
501 epochs are enabled for that view",
502 cert.view_number()
503 );
504 return;
505 }
506 HotShotEvent::DaCertificateRecv(cert.to_dac2())
507 },
508 DaConsensusMessage::DaCertificate2(cert) => {
509 if !self.upgrade_lock.epochs_enabled(cert.view_number()) {
510 tracing::warn!(
511 "received DaConsensusMessage::DaCertificate2 for view {} but \
512 epochs are not enabled for that view",
513 cert.view_number()
514 );
515 return;
516 }
517 HotShotEvent::DaCertificateRecv(cert)
518 },
519 DaConsensusMessage::VidDisperseMsg(proposal) => {
520 if self
521 .upgrade_lock
522 .epochs_enabled(proposal.data.view_number())
523 {
524 tracing::warn!(
525 "received DaConsensusMessage::VidDisperseMsg for view {} but \
526 epochs are enabled for that view",
527 proposal.data.view_number()
528 );
529 return;
530 }
531 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
532 },
533 DaConsensusMessage::VidDisperseMsg1(proposal) => {
534 if !self
535 .upgrade_lock
536 .epochs_enabled(proposal.data.view_number())
537 {
538 tracing::warn!(
539 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
540 epochs are not enabled for that view",
541 proposal.data.view_number()
542 );
543 return;
544 }
545 if self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
546 tracing::warn!(
547 "received DaConsensusMessage::VidDisperseMsg1 for view {} but \
548 vid2 upgrade is enabled for that view",
549 proposal.data.view_number()
550 );
551 return;
552 }
553 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
554 },
555 DaConsensusMessage::VidDisperseMsg2(proposal) => {
556 if !self.upgrade_lock.upgraded_vid2(proposal.data.view_number()) {
557 tracing::warn!(
558 "received DaConsensusMessage::VidDisperseMsg2 for view {} but \
559 vid2 upgrade is not enabled for that view",
560 proposal.data.view_number()
561 );
562 return;
563 }
564 HotShotEvent::VidShareRecv(sender, convert_proposal(proposal))
565 },
566 },
567 };
568 broadcast_event(Arc::new(event), &self.internal_event_stream).await;
569 },
570
571 MessageKind::Data(message) => match message {
573 DataMessage::SubmitTransaction(transaction, _) => {
574 let mut hasher = DefaultHasher::new();
575 transaction.hash(&mut hasher);
576 broadcast_event(
577 Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])),
578 &self.internal_event_stream,
579 )
580 .await;
581 },
582 DataMessage::DataResponse(response) => {
583 if let ResponseMessage::Found(message) = response {
584 match message {
585 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(proposal)) => {
586 broadcast_event(
587 Arc::new(HotShotEvent::VidResponseRecv(
588 sender,
589 convert_proposal(proposal),
590 )),
591 &self.internal_event_stream,
592 )
593 .await;
594 },
595 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
596 proposal,
597 )) => {
598 broadcast_event(
599 Arc::new(HotShotEvent::VidResponseRecv(
600 sender,
601 convert_proposal(proposal),
602 )),
603 &self.internal_event_stream,
604 )
605 .await;
606 },
607 _ => {},
608 }
609 }
610 },
611 DataMessage::RequestData(data) => {
612 let req_data = data.clone();
613 if let RequestKind::Vid(_view_number, _key) = req_data.request {
614 broadcast_event(
615 Arc::new(HotShotEvent::VidRequestRecv(data, sender)),
616 &self.internal_event_stream,
617 )
618 .await;
619 }
620 },
621 },
622
623 MessageKind::External(data) => {
625 if sender == self.public_key {
626 return;
627 }
628 broadcast_event(
630 Event {
631 view_number: ViewNumber::new(1),
632 event: EventType::ExternalMessageReceived { sender, data },
633 },
634 &self.external_event_stream,
635 )
636 .await;
637 },
638 }
639 }
640}
641
642pub struct NetworkEventTaskState<
644 TYPES: NodeType,
645 NET: ConnectedNetwork<TYPES::SignatureKey>,
646 S: Storage<TYPES>,
647> {
648 pub network: Arc<NET>,
650
651 pub view: ViewNumber,
653
654 pub epoch: Option<EpochNumber>,
656
657 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
659
660 pub storage: S,
662
663 pub storage_metrics: Arc<StorageMetricsValue>,
665
666 pub consensus: OuterConsensus<TYPES>,
668
669 pub upgrade_lock: UpgradeLock<TYPES>,
671
672 pub transmit_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
674
675 pub epoch_height: u64,
677
678 pub id: u64,
680}
681
682#[async_trait]
683impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
684 TaskState for NetworkEventTaskState<TYPES, NET, S>
685{
686 type Event = HotShotEvent<TYPES>;
687
688 async fn handle_event(
689 &mut self,
690 event: Arc<Self::Event>,
691 _sender: &Sender<Arc<Self::Event>>,
692 _receiver: &Receiver<Arc<Self::Event>>,
693 ) -> Result<()> {
694 self.handle(event).await;
695
696 Ok(())
697 }
698
699 fn cancel_subtasks(&mut self) {}
700}
701
702impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
703 NetworkEventTaskState<TYPES, NET, S>
704{
705 #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "Network Task", level = "error")]
709 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
710 let mut maybe_action = None;
711 if let Some((sender, message_kind, transmit)) =
712 self.parse_event(event, &mut maybe_action).await
713 {
714 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
715 .await;
716 };
717 }
718
719 async fn handle_vid_disperse_proposal(
721 &self,
722 vid_proposal: Proposal<TYPES, VidDisperse<TYPES>>,
723 sender: &<TYPES as NodeType>::SignatureKey,
724 ) -> Option<HotShotTaskCompleted> {
725 let view = vid_proposal.data.view_number();
726 let epoch = vid_proposal.data.epoch();
727 let vid_share_proposals = VidDisperse::to_share_proposals(vid_proposal);
728 let mut messages = HashMap::new();
729
730 for proposal in vid_share_proposals {
731 let recipient = proposal.data.recipient_key().clone();
732 let epochs_enabled = self
733 .upgrade_lock
734 .epochs_enabled(proposal.data.view_number());
735 let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
736 let message = if !epochs_enabled {
737 let vid_share_proposal = if let VidDisperseShare::V0(data) = proposal.data {
738 Proposal {
739 data,
740 signature: proposal.signature,
741 _pd: proposal._pd,
742 }
743 } else {
744 tracing::warn!(
745 "Epochs are not enabled for view {} but didn't receive VidDisperseShare1",
746 proposal.data.view_number()
747 );
748 return None;
749 };
750 Message {
751 sender: sender.clone(),
752 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
753 DaConsensusMessage::VidDisperseMsg(vid_share_proposal),
754 )),
755 }
756 } else if !upgraded_vid2 {
757 let vid_share_proposal = if let VidDisperseShare::V1(data) = proposal.data {
758 Proposal {
759 data,
760 signature: proposal.signature,
761 _pd: proposal._pd,
762 }
763 } else {
764 tracing::warn!(
765 "Epochs are enabled and Vid2Upgrade is not enabled for view {} but didn't \
766 receive VidDisperseShare2",
767 proposal.data.view_number()
768 );
769 return None;
770 };
771 Message {
772 sender: sender.clone(),
773 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
774 DaConsensusMessage::VidDisperseMsg1(vid_share_proposal),
775 )),
776 }
777 } else {
778 let vid_share_proposal = if let VidDisperseShare::V2(data) = proposal.data {
779 Proposal {
780 data,
781 signature: proposal.signature,
782 _pd: proposal._pd,
783 }
784 } else {
785 tracing::warn!(
786 "Vid2Upgrade is enabled for view {} but didn't receive VidDisperseShare2",
787 proposal.data.view_number()
788 );
789 return None;
790 };
791 Message {
792 sender: sender.clone(),
793 kind: MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
794 DaConsensusMessage::VidDisperseMsg2(vid_share_proposal),
795 )),
796 }
797 };
798 let view = message.view_number();
799 let serialized_message = match self.upgrade_lock.serialize(&message) {
800 Ok(serialized) => serialized,
801 Err(e) => {
802 tracing::error!("Failed to serialize message: {e}");
803 continue;
804 },
805 };
806
807 messages.insert(recipient, (view.u64().into(), serialized_message));
808 }
809
810 let net = Arc::clone(&self.network);
811 let storage = self.storage.clone();
812 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
813 spawn(async move {
814 if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
815 Some(HotShotAction::VidDisperse),
816 storage,
817 consensus,
818 view,
819 epoch,
820 )
821 .await
822 .is_err()
823 {
824 return;
825 }
826 match net.vid_broadcast_message(messages).await {
827 Ok(()) => {},
828 Err(e) => tracing::warn!("Failed to send message from network task: {e:?}"),
829 }
830 });
831
832 None
833 }
834
835 async fn maybe_record_action(
837 maybe_action: Option<HotShotAction>,
838 storage: S,
839 consensus: OuterConsensus<TYPES>,
840 view: ViewNumber,
841 epoch: Option<EpochNumber>,
842 ) -> std::result::Result<(), ()> {
843 if let Some(mut action) = maybe_action {
844 if !consensus.write().await.update_action(action, view) {
845 return Err(());
846 }
847 if matches!(action, HotShotAction::ViewSyncVote) {
850 action = HotShotAction::Vote;
851 }
852 match storage.record_action(view, epoch, action).await {
853 Ok(()) => Ok(()),
854 Err(e) => {
855 tracing::warn!("Not Sending {action:?} because of storage error: {e:?}");
856 Err(())
857 },
858 }
859 } else {
860 Ok(())
861 }
862 }
863
864 pub fn cancel_tasks(&mut self, view: ViewNumber) {
866 let keep = self.transmit_tasks.split_off(&view);
867
868 while let Some((_, tasks)) = self.transmit_tasks.pop_first() {
869 for task in tasks {
870 task.abort();
871 }
872 }
873
874 self.transmit_tasks = keep;
875 }
876
877 #[allow(clippy::too_many_lines)]
882 async fn parse_event(
883 &mut self,
884 event: Arc<HotShotEvent<TYPES>>,
885 maybe_action: &mut Option<HotShotAction>,
886 ) -> Option<(
887 <TYPES as NodeType>::SignatureKey,
888 MessageKind<TYPES>,
889 TransmitType<TYPES>,
890 )> {
891 match event.as_ref().clone() {
892 HotShotEvent::QuorumProposalSend(proposal, sender) => {
893 *maybe_action = Some(HotShotAction::Propose);
894
895 let message = if self
896 .upgrade_lock
897 .proposal2_version(proposal.data.view_number())
898 {
899 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
900 GeneralConsensusMessage::Proposal2(convert_proposal(proposal)),
901 ))
902 } else if self
903 .upgrade_lock
904 .proposal2_legacy_version(proposal.data.view_number())
905 {
906 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
907 GeneralConsensusMessage::Proposal2Legacy(convert_proposal(proposal)),
908 ))
909 } else {
910 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
911 GeneralConsensusMessage::Proposal(convert_proposal(proposal)),
912 ))
913 };
914
915 Some((sender, message, TransmitType::Broadcast))
916 },
917
918 HotShotEvent::QuorumVoteSend(vote) => {
920 *maybe_action = Some(HotShotAction::Vote);
921 let view_number = vote.view_number() + 1;
922 let leader = match self
923 .membership_coordinator
924 .membership_for_epoch(vote.epoch())
925 .await
926 .ok()?
927 .leader(view_number)
928 .await
929 {
930 Ok(l) => l,
931 Err(e) => {
932 tracing::warn!(
933 "Failed to calculate leader for view number {view_number}. Error: \
934 {e:?}"
935 );
936 return None;
937 },
938 };
939
940 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
941 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
942 GeneralConsensusMessage::Vote2(vote.clone()),
943 ))
944 } else {
945 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
946 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
947 ))
948 };
949
950 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
951 },
952 HotShotEvent::EpochRootQuorumVoteSend(vote) => {
953 *maybe_action = Some(HotShotAction::Vote);
954 let view_number = vote.view_number() + 1;
955 let leader = match self
956 .membership_coordinator
957 .membership_for_epoch(vote.epoch())
958 .await
959 .ok()?
960 .leader(view_number)
961 .await
962 {
963 Ok(l) => l,
964 Err(e) => {
965 tracing::warn!(
966 "Failed to calculate leader for view number {:?}. Error: {:?}",
967 view_number,
968 e
969 );
970 return None;
971 },
972 };
973
974 let message = if self.upgrade_lock.proposal2_version(vote.view_number()) {
975 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
976 GeneralConsensusMessage::EpochRootQuorumVote2(vote.clone()),
977 ))
978 } else if self
979 .upgrade_lock
980 .proposal2_legacy_version(vote.view_number())
981 {
982 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
983 GeneralConsensusMessage::EpochRootQuorumVote(vote.clone().to_vote()),
984 ))
985 } else {
986 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
987 GeneralConsensusMessage::Vote(vote.vote.clone().to_vote()),
988 ))
989 };
990
991 Some((
992 vote.vote.signing_key(),
993 message,
994 TransmitType::Direct(leader),
995 ))
996 },
997 HotShotEvent::ExtendedQuorumVoteSend(vote) => {
998 *maybe_action = Some(HotShotAction::Vote);
999 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1000 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1001 GeneralConsensusMessage::Vote2(vote.clone()),
1002 ))
1003 } else {
1004 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1005 GeneralConsensusMessage::Vote(vote.clone().to_vote()),
1006 ))
1007 };
1008
1009 Some((vote.signing_key(), message, TransmitType::Broadcast))
1010 },
1011 HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
1012 req.key.clone(),
1013 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1014 GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
1015 )),
1016 TransmitType::Broadcast,
1017 )),
1018 HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => {
1019 let message = if self
1020 .upgrade_lock
1021 .proposal2_version(proposal.data.view_number())
1022 {
1023 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1024 GeneralConsensusMessage::ProposalResponse2(convert_proposal(proposal)),
1025 ))
1026 } else if self
1027 .upgrade_lock
1028 .proposal2_legacy_version(proposal.data.view_number())
1029 {
1030 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1031 GeneralConsensusMessage::ProposalResponse2Legacy(convert_proposal(
1032 proposal,
1033 )),
1034 ))
1035 } else {
1036 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1037 GeneralConsensusMessage::ProposalResponse(convert_proposal(proposal)),
1038 ))
1039 };
1040
1041 Some((
1042 sender_key.clone(),
1043 message,
1044 TransmitType::Direct(sender_key),
1045 ))
1046 },
1047 HotShotEvent::VidDisperseSend(proposal, sender) => {
1048 self.handle_vid_disperse_proposal(proposal, &sender).await;
1049 None
1050 },
1051 HotShotEvent::DaProposalSend(proposal, sender) => {
1052 *maybe_action = Some(HotShotAction::DaPropose);
1053
1054 let message = if self
1055 .upgrade_lock
1056 .epochs_enabled(proposal.data.view_number())
1057 {
1058 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1059 DaConsensusMessage::DaProposal2(proposal),
1060 ))
1061 } else {
1062 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1063 DaConsensusMessage::DaProposal(convert_proposal(proposal)),
1064 ))
1065 };
1066
1067 Some((sender, message, TransmitType::DaCommitteeBroadcast))
1068 },
1069 HotShotEvent::DaVoteSend(vote) => {
1070 *maybe_action = Some(HotShotAction::DaVote);
1071 let view_number = vote.view_number();
1072 let leader = match self
1073 .membership_coordinator
1074 .membership_for_epoch(vote.epoch())
1075 .await
1076 .ok()?
1077 .leader(view_number)
1078 .await
1079 {
1080 Ok(l) => l,
1081 Err(e) => {
1082 tracing::warn!(
1083 "Failed to calculate leader for view number {view_number}. Error: \
1084 {e:?}"
1085 );
1086 return None;
1087 },
1088 };
1089
1090 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1091 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1092 DaConsensusMessage::DaVote2(vote.clone()),
1093 ))
1094 } else {
1095 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1096 DaConsensusMessage::DaVote(vote.clone().to_vote()),
1097 ))
1098 };
1099
1100 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1101 },
1102 HotShotEvent::DacSend(certificate, sender) => {
1103 *maybe_action = Some(HotShotAction::DaCert);
1104 let message = if self.upgrade_lock.epochs_enabled(certificate.view_number()) {
1105 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1106 DaConsensusMessage::DaCertificate2(certificate),
1107 ))
1108 } else {
1109 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::Da(
1110 DaConsensusMessage::DaCertificate(certificate.to_dac()),
1111 ))
1112 };
1113
1114 Some((sender, message, TransmitType::Broadcast))
1115 },
1116 HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
1117 let view_number = vote.view_number() + vote.date().relay;
1118 let leader = match self
1119 .membership_coordinator
1120 .membership_for_epoch(self.epoch)
1121 .await
1122 .ok()?
1123 .leader(view_number)
1124 .await
1125 {
1126 Ok(l) => l,
1127 Err(e) => {
1128 tracing::warn!(
1129 "Failed to calculate leader for view number {view_number}. Error: \
1130 {e:?}"
1131 );
1132 return None;
1133 },
1134 };
1135 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1136 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1137 GeneralConsensusMessage::ViewSyncPreCommitVote2(vote.clone()),
1138 ))
1139 } else {
1140 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1141 GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone().to_vote()),
1142 ))
1143 };
1144
1145 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1146 },
1147 HotShotEvent::ViewSyncCommitVoteSend(vote) => {
1148 *maybe_action = Some(HotShotAction::ViewSyncVote);
1149 let view_number = vote.view_number() + vote.date().relay;
1150 let leader = match self
1151 .membership_coordinator
1152 .membership_for_epoch(self.epoch)
1153 .await
1154 .ok()?
1155 .leader(view_number)
1156 .await
1157 {
1158 Ok(l) => l,
1159 Err(e) => {
1160 tracing::warn!(
1161 "Failed to calculate leader for view number {view_number}. Error: \
1162 {e:?}"
1163 );
1164 return None;
1165 },
1166 };
1167 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1168 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1169 GeneralConsensusMessage::ViewSyncCommitVote2(vote.clone()),
1170 ))
1171 } else {
1172 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1173 GeneralConsensusMessage::ViewSyncCommitVote(vote.clone().to_vote()),
1174 ))
1175 };
1176
1177 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1178 },
1179 HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
1180 *maybe_action = Some(HotShotAction::ViewSyncVote);
1181 let view_number = vote.view_number() + vote.date().relay;
1182 let leader = match self
1183 .membership_coordinator
1184 .membership_for_epoch(self.epoch)
1185 .await
1186 .ok()?
1187 .leader(view_number)
1188 .await
1189 {
1190 Ok(l) => l,
1191 Err(e) => {
1192 tracing::warn!(
1193 "Failed to calculate leader for view number {view_number:?}. Error: \
1194 {e:?}"
1195 );
1196 return None;
1197 },
1198 };
1199 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1200 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1201 GeneralConsensusMessage::ViewSyncFinalizeVote2(vote.clone()),
1202 ))
1203 } else {
1204 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1205 GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone().to_vote()),
1206 ))
1207 };
1208
1209 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1210 },
1211 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, sender) => {
1212 let view_number = certificate.view_number();
1213 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1214 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1215 GeneralConsensusMessage::ViewSyncPreCommitCertificate2(certificate),
1216 ))
1217 } else {
1218 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1219 GeneralConsensusMessage::ViewSyncPreCommitCertificate(certificate.to_vsc()),
1220 ))
1221 };
1222
1223 Some((sender, message, TransmitType::Broadcast))
1224 },
1225 HotShotEvent::ViewSyncCommitCertificateSend(certificate, sender) => {
1226 let view_number = certificate.view_number();
1227 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1228 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1229 GeneralConsensusMessage::ViewSyncCommitCertificate2(certificate),
1230 ))
1231 } else {
1232 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1233 GeneralConsensusMessage::ViewSyncCommitCertificate(certificate.to_vsc()),
1234 ))
1235 };
1236
1237 Some((sender, message, TransmitType::Broadcast))
1238 },
1239 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, sender) => {
1240 let view_number = certificate.view_number();
1241 let message = if self.upgrade_lock.epochs_enabled(view_number) {
1242 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1243 GeneralConsensusMessage::ViewSyncFinalizeCertificate2(certificate),
1244 ))
1245 } else {
1246 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1247 GeneralConsensusMessage::ViewSyncFinalizeCertificate(certificate.to_vsc()),
1248 ))
1249 };
1250
1251 Some((sender, message, TransmitType::Broadcast))
1252 },
1253 HotShotEvent::TimeoutVoteSend(vote) => {
1254 *maybe_action = Some(HotShotAction::TimeoutVote);
1255 let view_number = vote.view_number() + 1;
1256 let leader = match self
1257 .membership_coordinator
1258 .membership_for_epoch(self.epoch)
1259 .await
1260 .ok()?
1261 .leader(view_number)
1262 .await
1263 {
1264 Ok(l) => l,
1265 Err(e) => {
1266 tracing::warn!(
1267 "Failed to calculate leader for view number {view_number}. Error: \
1268 {e:?}"
1269 );
1270 return None;
1271 },
1272 };
1273 let message = if self.upgrade_lock.epochs_enabled(vote.view_number()) {
1274 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1275 GeneralConsensusMessage::TimeoutVote2(vote.clone()),
1276 ))
1277 } else {
1278 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1279 GeneralConsensusMessage::TimeoutVote(vote.clone().to_vote()),
1280 ))
1281 };
1282
1283 Some((vote.signing_key(), message, TransmitType::Direct(leader)))
1284 },
1285 HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
1286 sender,
1287 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1288 GeneralConsensusMessage::UpgradeProposal(proposal),
1289 )),
1290 TransmitType::Broadcast,
1291 )),
1292 HotShotEvent::UpgradeVoteSend(vote) => {
1293 tracing::error!("Sending upgrade vote!");
1294 let view_number = vote.view_number();
1295 let leader = match self
1296 .membership_coordinator
1297 .membership_for_epoch(self.epoch)
1298 .await
1299 .ok()?
1300 .leader(view_number)
1301 .await
1302 {
1303 Ok(l) => l,
1304 Err(e) => {
1305 tracing::warn!(
1306 "Failed to calculate leader for view number {view_number}. Error: \
1307 {e:?}"
1308 );
1309 return None;
1310 },
1311 };
1312 Some((
1313 vote.signing_key(),
1314 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1315 GeneralConsensusMessage::UpgradeVote(vote.clone()),
1316 )),
1317 TransmitType::Direct(leader),
1318 ))
1319 },
1320 HotShotEvent::ViewChange(view, epoch) => {
1321 self.view = view;
1322 if epoch > self.epoch {
1323 self.epoch = epoch;
1324 }
1325 let keep_view = ViewNumber::new(view.saturating_sub(1));
1326 self.cancel_tasks(keep_view);
1327 let net = Arc::clone(&self.network);
1328 let epoch = self.epoch.map(|x| x.u64().into());
1329 let membership_coordinator = self.membership_coordinator.clone();
1330 spawn(async move {
1331 net.update_view::<TYPES>(keep_view.u64().into(), epoch, membership_coordinator)
1332 .await;
1333 });
1334 None
1335 },
1336 HotShotEvent::VidRequestSend(req, sender, to) => Some((
1337 sender,
1338 MessageKind::Data(DataMessage::RequestData(req)),
1339 TransmitType::Direct(to),
1340 )),
1341 HotShotEvent::VidResponseSend(sender, to, proposal) => {
1342 let epochs_enabled = self
1343 .upgrade_lock
1344 .epochs_enabled(proposal.data.view_number());
1345 let upgraded_vid2 = self.upgrade_lock.upgraded_vid2(proposal.data.view_number());
1346 let message = match proposal.data {
1347 VidDisperseShare::V0(data) => {
1348 if epochs_enabled {
1349 tracing::warn!(
1350 "Epochs are enabled for view {} but still receive \
1351 VidDisperseShare0",
1352 data.view_number()
1353 );
1354 return None;
1355 }
1356 let vid_share_proposal = Proposal {
1357 data,
1358 signature: proposal.signature,
1359 _pd: proposal._pd,
1360 };
1361 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1362 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg(
1363 vid_share_proposal,
1364 )),
1365 )))
1366 },
1367 VidDisperseShare::V1(data) => {
1368 if !epochs_enabled {
1369 tracing::warn!(
1370 "Epochs are enabled for view {} but didn't receive \
1371 VidDisperseShare0",
1372 data.view_number()
1373 );
1374 return None;
1375 }
1376 if upgraded_vid2 {
1377 tracing::warn!(
1378 "VID2 upgrade is enabled for view {} but didn't receive \
1379 VidDisperseShare2",
1380 data.view_number()
1381 );
1382 return None;
1383 }
1384 let vid_share_proposal = Proposal {
1385 data,
1386 signature: proposal.signature,
1387 _pd: proposal._pd,
1388 };
1389 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1390 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg1(
1391 vid_share_proposal,
1392 )),
1393 )))
1394 },
1395 VidDisperseShare::V2(data) => {
1396 if !upgraded_vid2 {
1397 tracing::warn!(
1398 "VID2 upgrade is not enabled for view {} but receive \
1399 VidDisperseShare2",
1400 data.view_number()
1401 );
1402 return None;
1403 }
1404 let vid_share_proposal = Proposal {
1405 data,
1406 signature: proposal.signature,
1407 _pd: proposal._pd,
1408 };
1409 MessageKind::Data(DataMessage::DataResponse(ResponseMessage::Found(
1410 SequencingMessage::Da(DaConsensusMessage::VidDisperseMsg2(
1411 vid_share_proposal,
1412 )),
1413 )))
1414 },
1415 };
1416 Some((sender, message, TransmitType::Direct(to)))
1417 },
1418 HotShotEvent::HighQcSend(quorum_cert, next_epoch_qc, leader, sender) => Some((
1419 sender,
1420 MessageKind::Consensus(SequencingMessage::General(
1421 GeneralConsensusMessage::HighQc(quorum_cert, next_epoch_qc),
1422 )),
1423 TransmitType::Direct(leader),
1424 )),
1425 HotShotEvent::EpochRootQcSend(epoch_root_qc, sender, leader) => {
1426 let message = if self
1427 .upgrade_lock
1428 .proposal2_version(epoch_root_qc.view_number())
1429 {
1430 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1431 GeneralConsensusMessage::EpochRootQc(epoch_root_qc),
1432 ))
1433 } else {
1434 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
1435 GeneralConsensusMessage::EpochRootQcV1(epoch_root_qc.into()),
1436 ))
1437 };
1438
1439 Some((sender, message, TransmitType::Direct(leader)))
1440 },
1441 HotShotEvent::ExtendedQcSend(quorum_cert, next_epoch_qc, sender) => Some((
1442 sender,
1443 MessageKind::Consensus(SequencingMessage::General(
1444 GeneralConsensusMessage::ExtendedQc(quorum_cert, next_epoch_qc),
1445 )),
1446 TransmitType::Broadcast,
1447 )),
1448 _ => None,
1449 }
1450 }
1451
1452 async fn spawn_transmit_task(
1454 &mut self,
1455 message_kind: MessageKind<TYPES>,
1456 maybe_action: Option<HotShotAction>,
1457 transmit: TransmitType<TYPES>,
1458 sender: TYPES::SignatureKey,
1459 ) {
1460 let broadcast_delay = match &message_kind {
1461 MessageKind::Consensus(
1462 SequencingMessage::General(GeneralConsensusMessage::Vote(_))
1463 | SequencingMessage::Da(_),
1464 ) => BroadcastDelay::View(*message_kind.view_number()),
1465 _ => BroadcastDelay::None,
1466 };
1467 let message = Message {
1468 sender,
1469 kind: message_kind,
1470 };
1471 let view_number = message.kind.view_number();
1472 let epoch = message.kind.epoch();
1473 let committee_topic = Topic::Global;
1474 let Ok(mem) = self
1475 .membership_coordinator
1476 .stake_table_for_epoch(self.epoch)
1477 .await
1478 else {
1479 return;
1480 };
1481 let da_committee = mem.da_committee_members(view_number).await;
1482 let network = Arc::clone(&self.network);
1483 let storage = self.storage.clone();
1484 let storage_metrics = Arc::clone(&self.storage_metrics);
1485 let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
1486 let upgrade_lock = self.upgrade_lock.clone();
1487 let handle = spawn(async move {
1488 if NetworkEventTaskState::<TYPES, NET, S>::maybe_record_action(
1489 maybe_action,
1490 storage.clone(),
1491 consensus,
1492 view_number,
1493 epoch,
1494 )
1495 .await
1496 .is_err()
1497 {
1498 return;
1499 }
1500 if let MessageKind::Consensus(SequencingMessage::General(
1501 GeneralConsensusMessage::Proposal(prop),
1502 )) = &message.kind
1503 {
1504 let now = Instant::now();
1505 if storage
1506 .append_proposal2(&convert_proposal(prop.clone()))
1507 .await
1508 .is_err()
1509 {
1510 return;
1511 }
1512 storage_metrics
1513 .append_quorum_duration
1514 .add_point(now.elapsed().as_secs_f64());
1515 }
1516
1517 let serialized_message = match upgrade_lock.serialize(&message) {
1518 Ok(serialized) => serialized,
1519 Err(e) => {
1520 tracing::error!("Failed to serialize message: {e}");
1521 return;
1522 },
1523 };
1524
1525 let transmit_result = match transmit {
1526 TransmitType::Direct(recipient) => {
1527 network
1528 .direct_message(view_number.u64().into(), serialized_message, recipient)
1529 .await
1530 },
1531 TransmitType::Broadcast => {
1532 network
1533 .broadcast_message(
1534 view_number.u64().into(),
1535 serialized_message,
1536 committee_topic,
1537 broadcast_delay,
1538 )
1539 .await
1540 },
1541 TransmitType::DaCommitteeBroadcast => {
1542 network
1543 .da_broadcast_message(
1544 view_number.u64().into(),
1545 serialized_message,
1546 da_committee.iter().cloned().collect(),
1547 broadcast_delay,
1548 )
1549 .await
1550 },
1551 };
1552
1553 match transmit_result {
1554 Ok(()) => {},
1555 Err(e) => tracing::warn!("Failed to send message task: {e:?}"),
1556 }
1557 });
1558 self.transmit_tasks
1559 .entry(view_number)
1560 .or_default()
1561 .push(handle);
1562 }
1563}
1564
1565pub mod test {
1567 use std::ops::{Deref, DerefMut};
1568
1569 use async_trait::async_trait;
1570
1571 use super::{
1572 Arc, ConnectedNetwork, HotShotEvent, MessageKind, NetworkEventTaskState, NodeType,
1573 Receiver, Result, Sender, Storage, TaskState, TransmitType,
1574 };
1575
1576 pub type ModifierClosure<TYPES> = dyn Fn(
1579 &mut <TYPES as NodeType>::SignatureKey,
1580 &mut MessageKind<TYPES>,
1581 &mut TransmitType<TYPES>,
1582 &<TYPES as NodeType>::Membership,
1583 ) + Send
1584 + Sync;
1585
1586 pub struct NetworkEventTaskStateModifier<
1588 TYPES: NodeType,
1589 NET: ConnectedNetwork<TYPES::SignatureKey>,
1590 S: Storage<TYPES>,
1591 > {
1592 pub network_event_task_state: NetworkEventTaskState<TYPES, NET, S>,
1594 pub modifier: Arc<ModifierClosure<TYPES>>,
1597 }
1598
1599 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1600 NetworkEventTaskStateModifier<TYPES, NET, S>
1601 {
1602 pub async fn handle(&mut self, event: Arc<HotShotEvent<TYPES>>) {
1604 let mut maybe_action = None;
1605 if let Some((mut sender, mut message_kind, mut transmit)) =
1606 self.parse_event(event, &mut maybe_action).await
1607 {
1608 (self.modifier)(
1610 &mut sender,
1611 &mut message_kind,
1612 &mut transmit,
1613 &*self.membership_coordinator.membership().read().await,
1614 );
1615
1616 self.spawn_transmit_task(message_kind, maybe_action, transmit, sender)
1617 .await;
1618 }
1619 }
1620 }
1621
1622 #[async_trait]
1623 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES> + 'static>
1624 TaskState for NetworkEventTaskStateModifier<TYPES, NET, S>
1625 {
1626 type Event = HotShotEvent<TYPES>;
1627
1628 async fn handle_event(
1629 &mut self,
1630 event: Arc<Self::Event>,
1631 _sender: &Sender<Arc<Self::Event>>,
1632 _receiver: &Receiver<Arc<Self::Event>>,
1633 ) -> Result<()> {
1634 self.handle(event).await;
1635
1636 Ok(())
1637 }
1638
1639 fn cancel_subtasks(&mut self) {}
1640 }
1641
1642 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> Deref
1643 for NetworkEventTaskStateModifier<TYPES, NET, S>
1644 {
1645 type Target = NetworkEventTaskState<TYPES, NET, S>;
1646
1647 fn deref(&self) -> &Self::Target {
1648 &self.network_event_task_state
1649 }
1650 }
1651
1652 impl<TYPES: NodeType, NET: ConnectedNetwork<TYPES::SignatureKey>, S: Storage<TYPES>> DerefMut
1653 for NetworkEventTaskStateModifier<TYPES, NET, S>
1654 {
1655 fn deref_mut(&mut self) -> &mut Self::Target {
1656 &mut self.network_event_task_state
1657 }
1658 }
1659}