sequencer/
external_event_handler.rs1use std::{marker::PhantomData, sync::Arc};
4
5use anyhow::{Context, Result};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9 message::MessageKind,
10 traits::{
11 network::{BroadcastDelay, ConnectedNetwork, Topic},
12 node_implementation::Versions,
13 },
14};
15use request_response::network::Bytes;
16use serde::{Deserialize, Serialize};
17use tokio::sync::mpsc::{Receiver, Sender};
18use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
19
20use crate::context::TaskList;
21
22#[derive(Debug, Serialize, Deserialize, Clone)]
24pub enum ExternalMessage {
25 RequestResponse(Vec<u8>),
26}
27
28#[derive(Clone)]
30pub struct ExternalEventHandler<V: Versions> {
31 request_response_sender: Sender<Bytes>,
33
34 phantom: PhantomData<V>,
36}
37
38#[derive(Debug)]
40#[allow(dead_code)]
41pub enum OutboundMessage {
42 Direct(MessageKind<SeqTypes>, PubKey),
43 Broadcast(MessageKind<SeqTypes>),
44}
45
46impl<V: Versions> ExternalEventHandler<V> {
47 pub async fn new<N: ConnectedNetwork<PubKey>>(
49 tasks: &mut TaskList,
50 request_response_sender: Sender<Bytes>,
51 outbound_message_receiver: Receiver<OutboundMessage>,
52 network: Arc<N>,
53 public_key: PubKey,
54 ) -> Result<Self> {
55 tasks.spawn(
57 "ExternalEventHandler",
58 Self::outbound_message_loop(outbound_message_receiver, network, public_key),
59 );
60
61 Ok(Self {
62 request_response_sender,
63 phantom: PhantomData,
64 })
65 }
66
67 pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
72 let external_message = bincode::deserialize(external_message_bytes)
74 .with_context(|| "Failed to deserialize external message")?;
75
76 match external_message {
78 ExternalMessage::RequestResponse(request_response) => {
79 self.request_response_sender
81 .send(request_response.into())
82 .await?;
83 },
84 }
85 Ok(())
86 }
87
88 async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
90 mut receiver: Receiver<OutboundMessage>,
91 network: Arc<N>,
92 public_key: PubKey,
93 ) {
94 while let Some(message) = receiver.recv().await {
95 match message {
97 OutboundMessage::Direct(message, recipient) => {
98 let message_inner = Message {
100 sender: public_key,
101 kind: message,
102 };
103
104 let message_bytes =
106 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
107 Ok(message_bytes) => message_bytes,
108 Err(err) => {
109 tracing::warn!("Failed to serialize direct message: {}", err);
110 continue;
111 },
112 };
113
114 if let Err(err) = network.direct_message(message_bytes, recipient).await {
116 tracing::warn!("Failed to send message: {:?}", err);
117 };
118 },
119
120 OutboundMessage::Broadcast(message) => {
121 let message_inner = Message {
123 sender: public_key,
124 kind: message,
125 };
126
127 let message_bytes =
129 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
130 Ok(message_bytes) => message_bytes,
131 Err(err) => {
132 tracing::warn!("Failed to serialize broadcast message: {}", err);
133 continue;
134 },
135 };
136
137 if let Err(err) = network
139 .broadcast_message(message_bytes, Topic::Global, BroadcastDelay::None)
140 .await
141 {
142 tracing::error!("Failed to broadcast message: {:?}", err);
143 };
144 },
145 }
146 }
147 }
148}