hotshot_events_service/
events_source.rs1use std::{marker::PhantomData, sync::Arc};
2
3use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender};
4use async_trait::async_trait;
5use futures::{
6 future::BoxFuture,
7 stream::{BoxStream, Stream, StreamExt},
8};
9use hotshot_types::{
10 event::{Event, EventType, LegacyEvent},
11 traits::node_implementation::NodeType,
12 PeerConfig,
13};
14use serde::{Deserialize, Serialize};
15use tide_disco::method::ReadState;
16const RETAINED_EVENTS_COUNT: usize = 4096;
17
18#[async_trait]
19pub trait EventsSource<Types>
20where
21 Types: NodeType,
22{
23 type EventStream: Stream<Item = Arc<Event<Types>>> + Unpin + Send + 'static;
24 type LegacyEventStream: Stream<Item = Arc<LegacyEvent<Types>>> + Unpin + Send + 'static;
25 async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream;
26 async fn get_legacy_event_stream(
27 &self,
28 filter: Option<EventFilterSet<Types>>,
29 ) -> Self::LegacyEventStream;
30 async fn get_startup_info(&self) -> StartupInfo<Types>;
31}
32
33#[derive(Clone, Debug, Serialize, Deserialize)]
34#[serde(bound = "Types::SignatureKey: for<'a> Deserialize<'a>")]
35pub struct StartupInfo<Types: NodeType> {
36 pub known_node_with_stake: Vec<PeerConfig<Types>>,
37 pub non_staked_node_count: usize,
38}
39
40#[async_trait]
41pub trait EventConsumer<Types>
42where
43 Types: NodeType,
44{
45 async fn handle_event(&mut self, event: Event<Types>);
46}
47
48#[derive(Debug)]
49pub struct EventsStreamer<Types: NodeType> {
50 inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<Event<Types>>>,
52 subscriber_send_channel: BroadcastSender<Arc<Event<Types>>>,
53
54 known_nodes_with_stake: Vec<PeerConfig<Types>>,
56 non_staked_node_count: usize,
57}
58
59impl<Types: NodeType> EventsStreamer<Types> {
60 pub fn known_node_with_stake(&self) -> Vec<PeerConfig<Types>> {
61 self.known_nodes_with_stake.clone()
62 }
63
64 pub fn non_staked_node_count(&self) -> usize {
65 self.non_staked_node_count
66 }
67}
68
69#[async_trait]
70impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
71 async fn handle_event(&mut self, event: Event<Types>) {
72 if let Err(e) = self.subscriber_send_channel.broadcast(event.into()).await {
73 tracing::debug!("Error broadcasting the event: {e:?}");
74 }
75 }
76}
77
78#[derive(Clone, Debug)]
80pub struct EventFilterSet<Types: NodeType>(pub(crate) Vec<EventFilter<Types>>);
81
82impl<Types: NodeType> From<Vec<EventFilter<Types>>> for EventFilterSet<Types> {
84 fn from(filters: Vec<EventFilter<Types>>) -> Self {
85 EventFilterSet(filters)
86 }
87}
88
89impl<Types: NodeType> From<EventFilter<Types>> for EventFilterSet<Types> {
91 fn from(filter: EventFilter<Types>) -> Self {
92 EventFilterSet(vec![filter])
93 }
94}
95
96impl<Types: NodeType> EventFilterSet<Types> {
97 pub(crate) fn should_broadcast(&self, hotshot_event: &EventType<Types>) -> bool {
101 let filter = &self.0;
102
103 match hotshot_event {
104 EventType::Error { .. } => filter.contains(&EventFilter::Error),
105 EventType::Decide { .. } => filter.contains(&EventFilter::Decide),
106 EventType::ReplicaViewTimeout { .. } => {
107 filter.contains(&EventFilter::ReplicaViewTimeout)
108 },
109 EventType::ViewFinished { .. } => filter.contains(&EventFilter::ViewFinished),
110 EventType::ViewTimeout { .. } => filter.contains(&EventFilter::ViewTimeout),
111 EventType::Transactions { .. } => filter.contains(&EventFilter::Transactions),
112 EventType::DaProposal { .. } => filter.contains(&EventFilter::DaProposal),
113 EventType::QuorumProposal { .. } => filter.contains(&EventFilter::QuorumProposal),
114 EventType::UpgradeProposal { .. } => filter.contains(&EventFilter::UpgradeProposal),
115 _ => false,
116 }
117 }
118}
119
120#[derive(Clone, Debug, PartialEq)]
123pub enum EventFilter<Types: NodeType> {
124 Error,
125 Decide,
126 ReplicaViewTimeout,
127 ViewFinished,
128 ViewTimeout,
129 Transactions,
130 DaProposal,
131 QuorumProposal,
132 UpgradeProposal,
133 Pd(PhantomData<Types>),
134}
135
136#[async_trait]
137impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
138 type EventStream = BoxStream<'static, Arc<Event<Types>>>;
139 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
140
141 async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
142 let receiver = self.inactive_to_subscribe_clone_recv.activate_cloned();
143
144 if let Some(filter) = filter {
145 receiver
146 .filter(move |event| {
147 futures::future::ready(filter.should_broadcast(&event.as_ref().event))
148 })
149 .boxed()
150 } else {
151 receiver.boxed()
152 }
153 }
154
155 async fn get_legacy_event_stream(
156 &self,
157 filter: Option<EventFilterSet<Types>>,
158 ) -> Self::LegacyEventStream {
159 let receiver = self.inactive_to_subscribe_clone_recv.activate_cloned();
160
161 if let Some(filter) = filter {
162 receiver
163 .filter(move |event| {
164 futures::future::ready(filter.should_broadcast(&event.as_ref().event))
165 })
166 .filter_map(|a| {
167 futures::future::ready(Event::to_legacy(a.as_ref().clone()).ok().map(Arc::new))
168 })
169 .boxed()
170 } else {
171 receiver
172 .filter_map(|a| {
173 futures::future::ready(Event::to_legacy(a.as_ref().clone()).ok().map(Arc::new))
174 })
175 .boxed()
176 }
177 }
178
179 async fn get_startup_info(&self) -> StartupInfo<Types> {
180 StartupInfo {
181 known_node_with_stake: self.known_node_with_stake(),
182 non_staked_node_count: self.non_staked_node_count(),
183 }
184 }
185}
186
187impl<Types: NodeType> EventsStreamer<Types> {
188 pub fn new(
189 known_nodes_with_stake: Vec<PeerConfig<Types>>,
190 non_staked_node_count: usize,
191 ) -> Self {
192 let (mut subscriber_send_channel, to_subscribe_clone_recv) =
193 broadcast::<Arc<Event<Types>>>(RETAINED_EVENTS_COUNT);
194 subscriber_send_channel.set_overflow(true);
196 subscriber_send_channel.set_await_active(false);
198 let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate();
199
200 EventsStreamer {
201 subscriber_send_channel,
202 inactive_to_subscribe_clone_recv,
203 known_nodes_with_stake,
204 non_staked_node_count,
205 }
206 }
207}
208
209#[async_trait]
210impl<Types: NodeType> ReadState for EventsStreamer<Types> {
211 type State = Self;
212
213 async fn read<T>(
214 &self,
215 op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
216 ) -> T {
217 op(self).await
218 }
219}