sequencer/
external_event_handler.rs

1//! Should probably rename this to "external" or something
2
3use std::{marker::PhantomData, sync::Arc};
4
5use anyhow::{Context, Result};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9    message::MessageKind,
10    traits::{
11        network::{BroadcastDelay, ConnectedNetwork, Topic},
12        node_implementation::Versions,
13    },
14};
15use request_response::network::Bytes;
16use serde::{Deserialize, Serialize};
17use tokio::sync::mpsc::{Receiver, Sender};
18use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
19
20use crate::context::TaskList;
21
22/// An external message that can be sent to or received from a node
23#[derive(Debug, Serialize, Deserialize, Clone)]
24pub enum ExternalMessage {
25    RequestResponse(Vec<u8>),
26}
27
28/// The external event handler
29#[derive(Clone)]
30pub struct ExternalEventHandler<V: Versions> {
31    /// The sender to the request-response protocol
32    request_response_sender: Sender<Bytes>,
33
34    /// The type phantom
35    phantom: PhantomData<V>,
36}
37
38// The different types of outbound messages (broadcast or direct)
39#[derive(Debug)]
40#[allow(dead_code)]
41pub enum OutboundMessage {
42    Direct(MessageKind<SeqTypes>, PubKey),
43    Broadcast(MessageKind<SeqTypes>),
44}
45
46impl<V: Versions> ExternalEventHandler<V> {
47    /// Creates a new `ExternalEventHandler` with the given network
48    pub async fn new<N: ConnectedNetwork<PubKey>>(
49        tasks: &mut TaskList,
50        request_response_sender: Sender<Bytes>,
51        outbound_message_receiver: Receiver<OutboundMessage>,
52        network: Arc<N>,
53        public_key: PubKey,
54    ) -> Result<Self> {
55        // Spawn the outbound message handling loop
56        tasks.spawn(
57            "ExternalEventHandler",
58            Self::outbound_message_loop(outbound_message_receiver, network, public_key),
59        );
60
61        Ok(Self {
62            request_response_sender,
63            phantom: PhantomData,
64        })
65    }
66
67    /// Handles an event
68    ///
69    /// # Errors
70    /// If the message type is unknown or if there is an error serializing or deserializing the message
71    pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
72        // Deserialize the external message
73        let external_message = bincode::deserialize(external_message_bytes)
74            .with_context(|| "Failed to deserialize external message")?;
75
76        // Match the type
77        match external_message {
78            ExternalMessage::RequestResponse(request_response) => {
79                // Send the inner message to the request-response protocol
80                self.request_response_sender
81                    .send(request_response.into())
82                    .await?;
83            },
84        }
85        Ok(())
86    }
87
88    /// The main loop for sending outbound messages.
89    async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
90        mut receiver: Receiver<OutboundMessage>,
91        network: Arc<N>,
92        public_key: PubKey,
93    ) {
94        while let Some(message) = receiver.recv().await {
95            // Match the message type
96            match message {
97                OutboundMessage::Direct(message, recipient) => {
98                    // Wrap it in the real message type
99                    let message_inner = Message {
100                        sender: public_key,
101                        kind: message,
102                    };
103
104                    // Serialize it
105                    let message_bytes =
106                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
107                            Ok(message_bytes) => message_bytes,
108                            Err(err) => {
109                                tracing::warn!("Failed to serialize direct message: {}", err);
110                                continue;
111                            },
112                        };
113
114                    // Send the message to the recipient
115                    if let Err(err) = network.direct_message(message_bytes, recipient).await {
116                        tracing::warn!("Failed to send message: {:?}", err);
117                    };
118                },
119
120                OutboundMessage::Broadcast(message) => {
121                    // Wrap it in the real message type
122                    let message_inner = Message {
123                        sender: public_key,
124                        kind: message,
125                    };
126
127                    // Serialize it
128                    let message_bytes =
129                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
130                            Ok(message_bytes) => message_bytes,
131                            Err(err) => {
132                                tracing::warn!("Failed to serialize broadcast message: {}", err);
133                                continue;
134                            },
135                        };
136
137                    // Broadcast the message to the global topic
138                    if let Err(err) = network
139                        .broadcast_message(message_bytes, Topic::Global, BroadcastDelay::None)
140                        .await
141                    {
142                        tracing::error!("Failed to broadcast message: {:?}", err);
143                    };
144                },
145            }
146        }
147    }
148}