hotshot_events_service/
test.rs

1#[cfg(test)]
2mod tests {
3    use std::sync::Arc;
4
5    use alloy::primitives::U256;
6    use async_lock::RwLock;
7    use futures::stream::StreamExt;
8    use hotshot_example_types::node_types::TestTypes;
9    use hotshot_types::{
10        data::ViewNumber,
11        event::{Event, EventType},
12        light_client::StateKeyPair,
13        signature_key::BLSPubKey,
14        traits::{
15            node_implementation::{ConsensusTime, NodeType},
16            signature_key::SignatureKey,
17        },
18        PeerConfig,
19    };
20    use surf_disco::Client;
21    use tide_disco::{App, Url};
22    use tokio::spawn;
23    use tracing_test::traced_test;
24    use vbs::version::{StaticVersion, StaticVersionType};
25
26    //use crate::fetch::Fetch;
27    use crate::events::{define_api, Error, Options};
28    use crate::events_source::{EventConsumer, EventsStreamer, StartupInfo}; // EventsUpdater};
29
30    // return a empty transaction event
31    fn generate_event<Types: NodeType<View = ViewNumber>>(view_number: u64) -> Event<Types> {
32        Event {
33            view_number: ViewNumber::new(view_number),
34            event: EventType::Transactions {
35                transactions: vec![],
36            },
37        }
38    }
39
40    #[tokio::test]
41    #[traced_test]
42    async fn test_no_active_receiver() {
43        tracing::info!("Starting test_no_active_receiver");
44        let port = portpicker::pick_unused_port().expect("Could not find an open port");
45        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
46
47        let known_nodes_with_stake = vec![];
48        let non_staked_node_count = 0;
49        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
50            known_nodes_with_stake,
51            non_staked_node_count,
52        )));
53
54        // Start the web server.
55        let mut app = App::<_, Error>::with_state(events_streamer.clone());
56
57        let hotshot_events_api = define_api::<
58            Arc<RwLock<EventsStreamer<TestTypes>>>,
59            TestTypes,
60            StaticVersion<0, 1>,
61        >(&Options::default(), "0.0.1".parse().unwrap())
62        .expect("Failed to define hotshot eventsAPI");
63
64        app.register_module("hotshot_events", hotshot_events_api)
65            .expect("Failed to register hotshot events API");
66
67        spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
68        let total_count = 5;
69        let send_handle = spawn(async move {
70            let mut send_count = 0;
71            loop {
72                let tx_event = generate_event(send_count);
73                tracing::debug!("Before writing to events_source");
74                events_streamer
75                    .write()
76                    .await
77                    .handle_event(tx_event.clone())
78                    .await;
79                send_count += 1;
80                tracing::debug!("After writing to events_source");
81                if send_count >= total_count {
82                    break;
83                }
84            }
85        });
86
87        send_handle.await.unwrap();
88    }
89
90    #[tokio::test]
91    #[traced_test]
92    async fn test_startup_info_endpoint() {
93        let port = portpicker::pick_unused_port().expect("Could not find an open port");
94        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
95
96        let private_key =
97            <BLSPubKey as SignatureKey>::PrivateKey::generate(&mut rand::thread_rng());
98        let pub_key = BLSPubKey::from_private(&private_key);
99        let state_key_pair = StateKeyPair::generate();
100
101        let peer_config = PeerConfig::<TestTypes> {
102            stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
103            state_ver_key: state_key_pair.ver_key(),
104        };
105
106        let known_nodes_with_stake = vec![peer_config];
107        let non_staked_node_count = 10;
108
109        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
110            known_nodes_with_stake.clone(),
111            non_staked_node_count,
112        )));
113
114        // Start the web server.
115        let mut app = App::<_, Error>::with_state(events_streamer.clone());
116
117        let hotshot_events_api = define_api::<
118            Arc<RwLock<EventsStreamer<TestTypes>>>,
119            TestTypes,
120            StaticVersion<0, 1>,
121        >(&Options::default(), "0.0.1".parse().unwrap())
122        .expect("Failed to define hotshot eventsAPI");
123
124        app.register_module("api", hotshot_events_api)
125            .expect("Failed to register hotshot events API");
126
127        spawn(app.serve(api_url.clone(), StaticVersion::<0, 1>::instance()));
128
129        let client = Client::<Error, StaticVersion<0, 1>>::new(
130            format!("http://localhost:{port}/api").parse().unwrap(),
131        );
132        client.connect(None).await;
133
134        let startup_info: StartupInfo<TestTypes> = client
135            .get("startup_info")
136            .send()
137            .await
138            .expect("failed to get startup_info");
139
140        assert_eq!(startup_info.known_node_with_stake, known_nodes_with_stake);
141        assert_eq!(startup_info.non_staked_node_count, non_staked_node_count);
142    }
143
144    #[tokio::test]
145    #[traced_test]
146    async fn test_event_stream() {
147        tracing::info!("Starting test_event_stream");
148
149        let port = portpicker::pick_unused_port().expect("Could not find an open port");
150        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
151
152        let known_nodes_with_stake = vec![];
153        let non_staked_node_count = 0;
154        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
155            known_nodes_with_stake,
156            non_staked_node_count,
157        )));
158
159        // Start the web server.
160        let mut app = App::<_, Error>::with_state(events_streamer.clone());
161
162        let hotshot_events_api = define_api::<
163            Arc<RwLock<EventsStreamer<TestTypes>>>,
164            TestTypes,
165            StaticVersion<0, 1>,
166        >(&Options::default(), "1.0.0".parse().unwrap())
167        .expect("Failed to define hotshot eventsAPI");
168
169        app.register_module("hotshot_events", hotshot_events_api)
170            .expect("Failed to register hotshot events API");
171
172        spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
173
174        // Start Client 1
175        let client_1 = Client::<Error, StaticVersion<0, 1>>::new(
176            format!("http://localhost:{port}/hotshot_events")
177                .parse()
178                .unwrap(),
179        );
180        client_1.connect(None).await;
181
182        tracing::info!("Client 1 Connected to server");
183
184        // client 1 subscribe to hotshot events
185        let mut events_1 = client_1
186            .socket("events")
187            .subscribe::<Event<TestTypes>>()
188            .await
189            .unwrap();
190
191        tracing::info!("Client 1 Subscribed to events");
192
193        // Start Client 2
194        let client_2 = Client::<Error, StaticVersion<0, 1>>::new(
195            format!("http://localhost:{port}/hotshot_events")
196                .parse()
197                .unwrap(),
198        );
199        client_2.connect(None).await;
200
201        tracing::info!("Client 2 Connected to server");
202
203        // client 2 subscrive to hotshot events
204        let mut events_2 = client_2
205            .socket("events")
206            .subscribe::<Event<TestTypes>>()
207            .await
208            .unwrap();
209
210        tracing::info!("Client 2 Subscribed to events");
211
212        let total_count = 5;
213        // wait for these events to receive on client 1
214        let receive_handle_1 = spawn(async move {
215            let mut receive_count = 0;
216            while let Some(event) = events_1.next().await {
217                let event = event.unwrap();
218                tracing::info!("Received event in Client 1: {event:?}");
219
220                receive_count += 1;
221
222                if receive_count == total_count {
223                    tracing::info!("Client1 Received all sent events, exiting loop");
224                    break;
225                }
226            }
227
228            assert_eq!(receive_count, total_count);
229
230            tracing::info!("stream ended");
231        });
232
233        // wait for these events to receive on client 2
234        let receive_handle_2 = spawn(async move {
235            let mut receive_count = 0;
236            while let Some(event) = events_2.next().await {
237                let event = event.unwrap();
238
239                tracing::info!("Received event in Client 2: {event:?}");
240                receive_count += 1;
241
242                if receive_count == total_count {
243                    tracing::info!("Client 2 Received all sent events, exiting loop");
244                    break;
245                }
246            }
247
248            assert_eq!(receive_count, total_count);
249
250            tracing::info!("stream ended");
251        });
252
253        let send_handle = spawn(async move {
254            let mut send_count = 0;
255            loop {
256                let tx_event = generate_event(send_count);
257                tracing::debug!("Before writing to events_source");
258                events_streamer
259                    .write()
260                    .await
261                    .handle_event(tx_event.clone())
262                    .await;
263                send_count += 1;
264                tracing::debug!("After writing to events_source");
265                tracing::info!("Event sent: {tx_event:?}");
266                if send_count >= total_count {
267                    break;
268                }
269            }
270        });
271
272        send_handle.await.unwrap();
273        receive_handle_1.await.unwrap();
274        receive_handle_2.await.unwrap();
275    }
276}