hotshot_events_service/
test.rs1#[cfg(test)]
2mod tests {
3 use std::sync::Arc;
4
5 use alloy::primitives::U256;
6 use async_lock::RwLock;
7 use futures::stream::StreamExt;
8 use hotshot_example_types::node_types::TestTypes;
9 use hotshot_types::{
10 data::ViewNumber,
11 event::{Event, EventType},
12 light_client::StateKeyPair,
13 signature_key::BLSPubKey,
14 traits::{
15 node_implementation::{ConsensusTime, NodeType},
16 signature_key::SignatureKey,
17 },
18 PeerConfig,
19 };
20 use surf_disco::Client;
21 use tide_disco::{App, Url};
22 use tokio::spawn;
23 use tracing_test::traced_test;
24 use vbs::version::{StaticVersion, StaticVersionType};
25
26 use crate::events::{define_api, Error, Options};
28 use crate::events_source::{EventConsumer, EventsStreamer, StartupInfo}; fn generate_event<Types: NodeType<View = ViewNumber>>(view_number: u64) -> Event<Types> {
32 Event {
33 view_number: ViewNumber::new(view_number),
34 event: EventType::Transactions {
35 transactions: vec![],
36 },
37 }
38 }
39
40 #[tokio::test]
41 #[traced_test]
42 async fn test_no_active_receiver() {
43 tracing::info!("Starting test_no_active_receiver");
44 let port = portpicker::pick_unused_port().expect("Could not find an open port");
45 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
46
47 let known_nodes_with_stake = vec![];
48 let non_staked_node_count = 0;
49 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
50 known_nodes_with_stake,
51 non_staked_node_count,
52 )));
53
54 let mut app = App::<_, Error>::with_state(events_streamer.clone());
56
57 let hotshot_events_api = define_api::<
58 Arc<RwLock<EventsStreamer<TestTypes>>>,
59 TestTypes,
60 StaticVersion<0, 1>,
61 >(&Options::default(), "0.0.1".parse().unwrap())
62 .expect("Failed to define hotshot eventsAPI");
63
64 app.register_module("hotshot_events", hotshot_events_api)
65 .expect("Failed to register hotshot events API");
66
67 spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
68 let total_count = 5;
69 let send_handle = spawn(async move {
70 let mut send_count = 0;
71 loop {
72 let tx_event = generate_event(send_count);
73 tracing::debug!("Before writing to events_source");
74 events_streamer
75 .write()
76 .await
77 .handle_event(tx_event.clone())
78 .await;
79 send_count += 1;
80 tracing::debug!("After writing to events_source");
81 if send_count >= total_count {
82 break;
83 }
84 }
85 });
86
87 send_handle.await.unwrap();
88 }
89
90 #[tokio::test]
91 #[traced_test]
92 async fn test_startup_info_endpoint() {
93 let port = portpicker::pick_unused_port().expect("Could not find an open port");
94 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
95
96 let private_key =
97 <BLSPubKey as SignatureKey>::PrivateKey::generate(&mut rand::thread_rng());
98 let pub_key = BLSPubKey::from_private(&private_key);
99 let state_key_pair = StateKeyPair::generate();
100
101 let peer_config = PeerConfig::<TestTypes> {
102 stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
103 state_ver_key: state_key_pair.ver_key(),
104 };
105
106 let known_nodes_with_stake = vec![peer_config];
107 let non_staked_node_count = 10;
108
109 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
110 known_nodes_with_stake.clone(),
111 non_staked_node_count,
112 )));
113
114 let mut app = App::<_, Error>::with_state(events_streamer.clone());
116
117 let hotshot_events_api = define_api::<
118 Arc<RwLock<EventsStreamer<TestTypes>>>,
119 TestTypes,
120 StaticVersion<0, 1>,
121 >(&Options::default(), "0.0.1".parse().unwrap())
122 .expect("Failed to define hotshot eventsAPI");
123
124 app.register_module("api", hotshot_events_api)
125 .expect("Failed to register hotshot events API");
126
127 spawn(app.serve(api_url.clone(), StaticVersion::<0, 1>::instance()));
128
129 let client = Client::<Error, StaticVersion<0, 1>>::new(
130 format!("http://localhost:{port}/api").parse().unwrap(),
131 );
132 client.connect(None).await;
133
134 let startup_info: StartupInfo<TestTypes> = client
135 .get("startup_info")
136 .send()
137 .await
138 .expect("failed to get startup_info");
139
140 assert_eq!(startup_info.known_node_with_stake, known_nodes_with_stake);
141 assert_eq!(startup_info.non_staked_node_count, non_staked_node_count);
142 }
143
144 #[tokio::test]
145 #[traced_test]
146 async fn test_event_stream() {
147 tracing::info!("Starting test_event_stream");
148
149 let port = portpicker::pick_unused_port().expect("Could not find an open port");
150 let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
151
152 let known_nodes_with_stake = vec![];
153 let non_staked_node_count = 0;
154 let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
155 known_nodes_with_stake,
156 non_staked_node_count,
157 )));
158
159 let mut app = App::<_, Error>::with_state(events_streamer.clone());
161
162 let hotshot_events_api = define_api::<
163 Arc<RwLock<EventsStreamer<TestTypes>>>,
164 TestTypes,
165 StaticVersion<0, 1>,
166 >(&Options::default(), "1.0.0".parse().unwrap())
167 .expect("Failed to define hotshot eventsAPI");
168
169 app.register_module("hotshot_events", hotshot_events_api)
170 .expect("Failed to register hotshot events API");
171
172 spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
173
174 let client_1 = Client::<Error, StaticVersion<0, 1>>::new(
176 format!("http://localhost:{port}/hotshot_events")
177 .parse()
178 .unwrap(),
179 );
180 client_1.connect(None).await;
181
182 tracing::info!("Client 1 Connected to server");
183
184 let mut events_1 = client_1
186 .socket("events")
187 .subscribe::<Event<TestTypes>>()
188 .await
189 .unwrap();
190
191 tracing::info!("Client 1 Subscribed to events");
192
193 let client_2 = Client::<Error, StaticVersion<0, 1>>::new(
195 format!("http://localhost:{port}/hotshot_events")
196 .parse()
197 .unwrap(),
198 );
199 client_2.connect(None).await;
200
201 tracing::info!("Client 2 Connected to server");
202
203 let mut events_2 = client_2
205 .socket("events")
206 .subscribe::<Event<TestTypes>>()
207 .await
208 .unwrap();
209
210 tracing::info!("Client 2 Subscribed to events");
211
212 let total_count = 5;
213 let receive_handle_1 = spawn(async move {
215 let mut receive_count = 0;
216 while let Some(event) = events_1.next().await {
217 let event = event.unwrap();
218 tracing::info!("Received event in Client 1: {event:?}");
219
220 receive_count += 1;
221
222 if receive_count == total_count {
223 tracing::info!("Client1 Received all sent events, exiting loop");
224 break;
225 }
226 }
227
228 assert_eq!(receive_count, total_count);
229
230 tracing::info!("stream ended");
231 });
232
233 let receive_handle_2 = spawn(async move {
235 let mut receive_count = 0;
236 while let Some(event) = events_2.next().await {
237 let event = event.unwrap();
238
239 tracing::info!("Received event in Client 2: {event:?}");
240 receive_count += 1;
241
242 if receive_count == total_count {
243 tracing::info!("Client 2 Received all sent events, exiting loop");
244 break;
245 }
246 }
247
248 assert_eq!(receive_count, total_count);
249
250 tracing::info!("stream ended");
251 });
252
253 let send_handle = spawn(async move {
254 let mut send_count = 0;
255 loop {
256 let tx_event = generate_event(send_count);
257 tracing::debug!("Before writing to events_source");
258 events_streamer
259 .write()
260 .await
261 .handle_event(tx_event.clone())
262 .await;
263 send_count += 1;
264 tracing::debug!("After writing to events_source");
265 tracing::info!("Event sent: {tx_event:?}");
266 if send_count >= total_count {
267 break;
268 }
269 }
270 });
271
272 send_handle.await.unwrap();
273 receive_handle_1.await.unwrap();
274 receive_handle_2.await.unwrap();
275 }
276}