hotshot_events_service/
events_source.rs

1use std::{marker::PhantomData, sync::Arc};
2
3use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender};
4use async_trait::async_trait;
5use futures::{
6    future::BoxFuture,
7    stream::{BoxStream, Stream, StreamExt},
8};
9use hotshot_types::{
10    event::{Event, EventType, LegacyEvent},
11    traits::node_implementation::NodeType,
12    PeerConfig,
13};
14use serde::{Deserialize, Serialize};
15use tide_disco::method::ReadState;
16const RETAINED_EVENTS_COUNT: usize = 4096;
17
18#[async_trait]
19pub trait EventsSource<Types>
20where
21    Types: NodeType,
22{
23    type EventStream: Stream<Item = Arc<Event<Types>>> + Unpin + Send + 'static;
24    type LegacyEventStream: Stream<Item = Arc<LegacyEvent<Types>>> + Unpin + Send + 'static;
25    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream;
26    async fn get_legacy_event_stream(
27        &self,
28        filter: Option<EventFilterSet<Types>>,
29    ) -> Self::LegacyEventStream;
30    async fn get_startup_info(&self) -> StartupInfo<Types>;
31}
32
33#[derive(Clone, Debug, Serialize, Deserialize)]
34#[serde(bound = "Types::SignatureKey: for<'a> Deserialize<'a>")]
35pub struct StartupInfo<Types: NodeType> {
36    pub known_node_with_stake: Vec<PeerConfig<Types>>,
37    pub non_staked_node_count: usize,
38}
39
40#[async_trait]
41pub trait EventConsumer<Types>
42where
43    Types: NodeType,
44{
45    async fn handle_event(&mut self, event: Event<Types>);
46}
47
48#[derive(Debug)]
49pub struct EventsStreamer<Types: NodeType> {
50    // required for api subscription
51    inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<Event<Types>>>,
52    subscriber_send_channel: BroadcastSender<Arc<Event<Types>>>,
53
54    // required for sending startup info
55    known_nodes_with_stake: Vec<PeerConfig<Types>>,
56    non_staked_node_count: usize,
57}
58
59impl<Types: NodeType> EventsStreamer<Types> {
60    pub fn known_node_with_stake(&self) -> Vec<PeerConfig<Types>> {
61        self.known_nodes_with_stake.clone()
62    }
63
64    pub fn non_staked_node_count(&self) -> usize {
65        self.non_staked_node_count
66    }
67}
68
69#[async_trait]
70impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
71    async fn handle_event(&mut self, event: Event<Types>) {
72        if let Err(e) = self.subscriber_send_channel.broadcast(event.into()).await {
73            tracing::debug!("Error broadcasting the event: {e:?}");
74        }
75    }
76}
77
78/// Wrapper struct representing a set of event filters.
79#[derive(Clone, Debug)]
80pub struct EventFilterSet<Types: NodeType>(pub(crate) Vec<EventFilter<Types>>);
81
82/// `From` trait impl to create an `EventFilterSet` from a vector of `EventFilter`s.
83impl<Types: NodeType> From<Vec<EventFilter<Types>>> for EventFilterSet<Types> {
84    fn from(filters: Vec<EventFilter<Types>>) -> Self {
85        EventFilterSet(filters)
86    }
87}
88
89/// `From` trait impl to create an `EventFilterSet` from a single `EventFilter`.
90impl<Types: NodeType> From<EventFilter<Types>> for EventFilterSet<Types> {
91    fn from(filter: EventFilter<Types>) -> Self {
92        EventFilterSet(vec![filter])
93    }
94}
95
96impl<Types: NodeType> EventFilterSet<Types> {
97    /// Determines whether the given hotshot event should be broadcast based on the filters in the set.
98    ///
99    ///  Returns `true` if the event should be broadcast, `false` otherwise.
100    pub(crate) fn should_broadcast(&self, hotshot_event: &EventType<Types>) -> bool {
101        let filter = &self.0;
102
103        match hotshot_event {
104            EventType::Error { .. } => filter.contains(&EventFilter::Error),
105            EventType::Decide { .. } => filter.contains(&EventFilter::Decide),
106            EventType::ReplicaViewTimeout { .. } => {
107                filter.contains(&EventFilter::ReplicaViewTimeout)
108            },
109            EventType::ViewFinished { .. } => filter.contains(&EventFilter::ViewFinished),
110            EventType::ViewTimeout { .. } => filter.contains(&EventFilter::ViewTimeout),
111            EventType::Transactions { .. } => filter.contains(&EventFilter::Transactions),
112            EventType::DaProposal { .. } => filter.contains(&EventFilter::DaProposal),
113            EventType::QuorumProposal { .. } => filter.contains(&EventFilter::QuorumProposal),
114            EventType::UpgradeProposal { .. } => filter.contains(&EventFilter::UpgradeProposal),
115            _ => false,
116        }
117    }
118}
119
120/// Possible event filters
121/// If the hotshot`EventType` enum is modified, this enum should also be updated
122#[derive(Clone, Debug, PartialEq)]
123pub enum EventFilter<Types: NodeType> {
124    Error,
125    Decide,
126    ReplicaViewTimeout,
127    ViewFinished,
128    ViewTimeout,
129    Transactions,
130    DaProposal,
131    QuorumProposal,
132    UpgradeProposal,
133    Pd(PhantomData<Types>),
134}
135
136#[async_trait]
137impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
138    type EventStream = BoxStream<'static, Arc<Event<Types>>>;
139    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
140
141    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
142        let receiver = self.inactive_to_subscribe_clone_recv.activate_cloned();
143
144        if let Some(filter) = filter {
145            receiver
146                .filter(move |event| {
147                    futures::future::ready(filter.should_broadcast(&event.as_ref().event))
148                })
149                .boxed()
150        } else {
151            receiver.boxed()
152        }
153    }
154
155    async fn get_legacy_event_stream(
156        &self,
157        filter: Option<EventFilterSet<Types>>,
158    ) -> Self::LegacyEventStream {
159        let receiver = self.inactive_to_subscribe_clone_recv.activate_cloned();
160
161        if let Some(filter) = filter {
162            receiver
163                .filter(move |event| {
164                    futures::future::ready(filter.should_broadcast(&event.as_ref().event))
165                })
166                .filter_map(|a| {
167                    futures::future::ready(Event::to_legacy(a.as_ref().clone()).ok().map(Arc::new))
168                })
169                .boxed()
170        } else {
171            receiver
172                .filter_map(|a| {
173                    futures::future::ready(Event::to_legacy(a.as_ref().clone()).ok().map(Arc::new))
174                })
175                .boxed()
176        }
177    }
178
179    async fn get_startup_info(&self) -> StartupInfo<Types> {
180        StartupInfo {
181            known_node_with_stake: self.known_node_with_stake(),
182            non_staked_node_count: self.non_staked_node_count(),
183        }
184    }
185}
186
187impl<Types: NodeType> EventsStreamer<Types> {
188    pub fn new(
189        known_nodes_with_stake: Vec<PeerConfig<Types>>,
190        non_staked_node_count: usize,
191    ) -> Self {
192        let (mut subscriber_send_channel, to_subscribe_clone_recv) =
193            broadcast::<Arc<Event<Types>>>(RETAINED_EVENTS_COUNT);
194        // set the overflow to true to drop older messages from the channel
195        subscriber_send_channel.set_overflow(true);
196        // set the await active to false to not block the sender
197        subscriber_send_channel.set_await_active(false);
198        let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate();
199
200        EventsStreamer {
201            subscriber_send_channel,
202            inactive_to_subscribe_clone_recv,
203            known_nodes_with_stake,
204            non_staked_node_count,
205        }
206    }
207}
208
209#[async_trait]
210impl<Types: NodeType> ReadState for EventsStreamer<Types> {
211    type State = Self;
212
213    async fn read<T>(
214        &self,
215        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
216    ) -> T {
217        op(self).await
218    }
219}