hotshot_events_service/
test.rs1#[cfg(test)]
2mod tests {
3 use std::sync::Arc;
4
5 use alloy::primitives::U256;
6 use async_lock::RwLock;
7 use futures::stream::StreamExt;
8 use hotshot_example_types::node_types::TestTypes;
9 use hotshot_types::{
10 PeerConfig,
11 data::ViewNumber,
12 event::{Event, EventType},
13 light_client::StateKeyPair,
14 signature_key::BLSPubKey,
15 traits::{node_implementation::NodeType, signature_key::SignatureKey},
16 };
17 use surf_disco::Client;
18 use test_utils::reserve_tcp_port;
19 use tide_disco::{App, Url};
20 use tokio::spawn;
21 use tracing_test::traced_test;
22 use vbs::version::{StaticVersion, StaticVersionType};
23
24 use crate::events::{Error, Options, define_api};
26 use crate::events_source::{EventConsumer, EventsStreamer, StartupInfo}; fn generate_event<Types: NodeType>(view_number: u64) -> Event<Types> {
30 Event {
31 view_number: ViewNumber::new(view_number),
32 event: EventType::Transactions {
33 transactions: vec![],
34 },
35 }
36 }
37
38 #[tokio::test]
39 #[traced_test]
40 async fn test_no_active_receiver() {
41 tracing::info!("Starting test_no_active_receiver");
42 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
43 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
44
45 let known_nodes_with_stake = vec![];
46 let non_staked_node_count = 0;
47 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
48 known_nodes_with_stake,
49 non_staked_node_count,
50 )));
51
52 let mut app = App::<_, Error>::with_state(events_streamer.clone());
54
55 let hotshot_events_api = define_api::<
56 Arc<RwLock<EventsStreamer<TestTypes>>>,
57 TestTypes,
58 StaticVersion<0, 1>,
59 >(&Options::default(), "0.0.1".parse().unwrap())
60 .expect("Failed to define hotshot eventsAPI");
61
62 app.register_module("hotshot_events", hotshot_events_api)
63 .expect("Failed to register hotshot events API");
64
65 spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
66 let total_count = 5;
67 let send_handle = spawn(async move {
68 let mut send_count = 0;
69 loop {
70 let tx_event = generate_event(send_count);
71 tracing::debug!("Before writing to events_source");
72 events_streamer
73 .write()
74 .await
75 .handle_event(tx_event.clone())
76 .await;
77 send_count += 1;
78 tracing::debug!("After writing to events_source");
79 if send_count >= total_count {
80 break;
81 }
82 }
83 });
84
85 send_handle.await.unwrap();
86 }
87
88 #[tokio::test]
89 #[traced_test]
90 async fn test_startup_info_endpoint() {
91 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
92 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
93
94 let private_key =
95 <BLSPubKey as SignatureKey>::PrivateKey::generate(&mut rand::thread_rng());
96 let pub_key = BLSPubKey::from_private(&private_key);
97 let state_key_pair = StateKeyPair::generate();
98
99 let peer_config = PeerConfig::<TestTypes> {
100 stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
101 state_ver_key: state_key_pair.ver_key(),
102 connect_info: None,
103 };
104
105 let known_nodes_with_stake = vec![peer_config];
106 let non_staked_node_count = 10;
107
108 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
109 known_nodes_with_stake.clone(),
110 non_staked_node_count,
111 )));
112
113 let mut app = App::<_, Error>::with_state(events_streamer.clone());
115
116 let hotshot_events_api = define_api::<
117 Arc<RwLock<EventsStreamer<TestTypes>>>,
118 TestTypes,
119 StaticVersion<0, 1>,
120 >(&Options::default(), "0.0.1".parse().unwrap())
121 .expect("Failed to define hotshot eventsAPI");
122
123 app.register_module("api", hotshot_events_api)
124 .expect("Failed to register hotshot events API");
125
126 spawn(app.serve(api_url.clone(), StaticVersion::<0, 1>::instance()));
127
128 let client = Client::<Error, StaticVersion<0, 1>>::new(
129 format!("http://localhost:{port}/api").parse().unwrap(),
130 );
131 client.connect(None).await;
132
133 let startup_info: StartupInfo<TestTypes> = client
134 .get("startup_info")
135 .send()
136 .await
137 .expect("failed to get startup_info");
138
139 assert_eq!(startup_info.known_node_with_stake, known_nodes_with_stake);
140 assert_eq!(startup_info.non_staked_node_count, non_staked_node_count);
141 }
142
143 #[tokio::test]
144 #[traced_test]
145 async fn test_event_stream() {
146 tracing::info!("Starting test_event_stream");
147
148 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
149 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
150
151 let known_nodes_with_stake = vec![];
152 let non_staked_node_count = 0;
153 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
154 known_nodes_with_stake,
155 non_staked_node_count,
156 )));
157
158 let mut app = App::<_, Error>::with_state(events_streamer.clone());
160
161 let hotshot_events_api = define_api::<
162 Arc<RwLock<EventsStreamer<TestTypes>>>,
163 TestTypes,
164 StaticVersion<0, 1>,
165 >(&Options::default(), "1.0.0".parse().unwrap())
166 .expect("Failed to define hotshot eventsAPI");
167
168 app.register_module("hotshot_events", hotshot_events_api)
169 .expect("Failed to register hotshot events API");
170
171 spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
172
173 let client_1 = Client::<Error, StaticVersion<0, 1>>::new(
175 format!("http://localhost:{port}/hotshot_events")
176 .parse()
177 .unwrap(),
178 );
179 client_1.connect(None).await;
180
181 tracing::info!("Client 1 Connected to server");
182
183 let mut events_1 = client_1
185 .socket("events")
186 .subscribe::<Event<TestTypes>>()
187 .await
188 .unwrap();
189
190 tracing::info!("Client 1 Subscribed to events");
191
192 let client_2 = Client::<Error, StaticVersion<0, 1>>::new(
194 format!("http://localhost:{port}/hotshot_events")
195 .parse()
196 .unwrap(),
197 );
198 client_2.connect(None).await;
199
200 tracing::info!("Client 2 Connected to server");
201
202 let mut events_2 = client_2
204 .socket("events")
205 .subscribe::<Event<TestTypes>>()
206 .await
207 .unwrap();
208
209 tracing::info!("Client 2 Subscribed to events");
210
211 let total_count = 5;
212 let receive_handle_1 = spawn(async move {
214 let mut receive_count = 0;
215 while let Some(event) = events_1.next().await {
216 let event = event.unwrap();
217 tracing::info!("Received event in Client 1: {event:?}");
218
219 receive_count += 1;
220
221 if receive_count == total_count {
222 tracing::info!("Client1 Received all sent events, exiting loop");
223 break;
224 }
225 }
226
227 assert_eq!(receive_count, total_count);
228
229 tracing::info!("stream ended");
230 });
231
232 let receive_handle_2 = spawn(async move {
234 let mut receive_count = 0;
235 while let Some(event) = events_2.next().await {
236 let event = event.unwrap();
237
238 tracing::info!("Received event in Client 2: {event:?}");
239 receive_count += 1;
240
241 if receive_count == total_count {
242 tracing::info!("Client 2 Received all sent events, exiting loop");
243 break;
244 }
245 }
246
247 assert_eq!(receive_count, total_count);
248
249 tracing::info!("stream ended");
250 });
251
252 let send_handle = spawn(async move {
253 let mut send_count = 0;
254 loop {
255 let tx_event = generate_event(send_count);
256 tracing::debug!("Before writing to events_source");
257 events_streamer
258 .write()
259 .await
260 .handle_event(tx_event.clone())
261 .await;
262 send_count += 1;
263 tracing::debug!("After writing to events_source");
264 tracing::info!("Event sent: {tx_event:?}");
265 if send_count >= total_count {
266 break;
267 }
268 }
269 });
270
271 send_handle.await.unwrap();
272 receive_handle_1.await.unwrap();
273 receive_handle_2.await.unwrap();
274 }
275}