hotshot_events_service/
test.rs

1#[cfg(test)]
2mod tests {
3    use std::sync::Arc;
4
5    use alloy::primitives::U256;
6    use async_lock::RwLock;
7    use futures::stream::StreamExt;
8    use hotshot_example_types::node_types::TestTypes;
9    use hotshot_types::{
10        PeerConfig,
11        data::ViewNumber,
12        event::{Event, EventType},
13        light_client::StateKeyPair,
14        signature_key::BLSPubKey,
15        traits::{node_implementation::NodeType, signature_key::SignatureKey},
16    };
17    use surf_disco::Client;
18    use test_utils::reserve_tcp_port;
19    use tide_disco::{App, Url};
20    use tokio::spawn;
21    use tracing_test::traced_test;
22    use vbs::version::{StaticVersion, StaticVersionType};
23
24    //use crate::fetch::Fetch;
25    use crate::events::{Error, Options, define_api};
26    use crate::events_source::{EventConsumer, EventsStreamer, StartupInfo}; // EventsUpdater};
27
28    // return a empty transaction event
29    fn generate_event<Types: NodeType>(view_number: u64) -> Event<Types> {
30        Event {
31            view_number: ViewNumber::new(view_number),
32            event: EventType::Transactions {
33                transactions: vec![],
34            },
35        }
36    }
37
38    #[tokio::test]
39    #[traced_test]
40    async fn test_no_active_receiver() {
41        tracing::info!("Starting test_no_active_receiver");
42        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
43        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
44
45        let known_nodes_with_stake = vec![];
46        let non_staked_node_count = 0;
47        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
48            known_nodes_with_stake,
49            non_staked_node_count,
50        )));
51
52        // Start the web server.
53        let mut app = App::<_, Error>::with_state(events_streamer.clone());
54
55        let hotshot_events_api = define_api::<
56            Arc<RwLock<EventsStreamer<TestTypes>>>,
57            TestTypes,
58            StaticVersion<0, 1>,
59        >(&Options::default(), "0.0.1".parse().unwrap())
60        .expect("Failed to define hotshot eventsAPI");
61
62        app.register_module("hotshot_events", hotshot_events_api)
63            .expect("Failed to register hotshot events API");
64
65        spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
66        let total_count = 5;
67        let send_handle = spawn(async move {
68            let mut send_count = 0;
69            loop {
70                let tx_event = generate_event(send_count);
71                tracing::debug!("Before writing to events_source");
72                events_streamer
73                    .write()
74                    .await
75                    .handle_event(tx_event.clone())
76                    .await;
77                send_count += 1;
78                tracing::debug!("After writing to events_source");
79                if send_count >= total_count {
80                    break;
81                }
82            }
83        });
84
85        send_handle.await.unwrap();
86    }
87
88    #[tokio::test]
89    #[traced_test]
90    async fn test_startup_info_endpoint() {
91        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
92        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
93
94        let private_key =
95            <BLSPubKey as SignatureKey>::PrivateKey::generate(&mut rand::thread_rng());
96        let pub_key = BLSPubKey::from_private(&private_key);
97        let state_key_pair = StateKeyPair::generate();
98
99        let peer_config = PeerConfig::<TestTypes> {
100            stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
101            state_ver_key: state_key_pair.ver_key(),
102            connect_info: None,
103        };
104
105        let known_nodes_with_stake = vec![peer_config];
106        let non_staked_node_count = 10;
107
108        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
109            known_nodes_with_stake.clone(),
110            non_staked_node_count,
111        )));
112
113        // Start the web server.
114        let mut app = App::<_, Error>::with_state(events_streamer.clone());
115
116        let hotshot_events_api = define_api::<
117            Arc<RwLock<EventsStreamer<TestTypes>>>,
118            TestTypes,
119            StaticVersion<0, 1>,
120        >(&Options::default(), "0.0.1".parse().unwrap())
121        .expect("Failed to define hotshot eventsAPI");
122
123        app.register_module("api", hotshot_events_api)
124            .expect("Failed to register hotshot events API");
125
126        spawn(app.serve(api_url.clone(), StaticVersion::<0, 1>::instance()));
127
128        let client = Client::<Error, StaticVersion<0, 1>>::new(
129            format!("http://localhost:{port}/api").parse().unwrap(),
130        );
131        client.connect(None).await;
132
133        let startup_info: StartupInfo<TestTypes> = client
134            .get("startup_info")
135            .send()
136            .await
137            .expect("failed to get startup_info");
138
139        assert_eq!(startup_info.known_node_with_stake, known_nodes_with_stake);
140        assert_eq!(startup_info.non_staked_node_count, non_staked_node_count);
141    }
142
143    #[tokio::test]
144    #[traced_test]
145    async fn test_event_stream() {
146        tracing::info!("Starting test_event_stream");
147
148        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
149        let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();
150
151        let known_nodes_with_stake = vec![];
152        let non_staked_node_count = 0;
153        let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
154            known_nodes_with_stake,
155            non_staked_node_count,
156        )));
157
158        // Start the web server.
159        let mut app = App::<_, Error>::with_state(events_streamer.clone());
160
161        let hotshot_events_api = define_api::<
162            Arc<RwLock<EventsStreamer<TestTypes>>>,
163            TestTypes,
164            StaticVersion<0, 1>,
165        >(&Options::default(), "1.0.0".parse().unwrap())
166        .expect("Failed to define hotshot eventsAPI");
167
168        app.register_module("hotshot_events", hotshot_events_api)
169            .expect("Failed to register hotshot events API");
170
171        spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
172
173        // Start Client 1
174        let client_1 = Client::<Error, StaticVersion<0, 1>>::new(
175            format!("http://localhost:{port}/hotshot_events")
176                .parse()
177                .unwrap(),
178        );
179        client_1.connect(None).await;
180
181        tracing::info!("Client 1 Connected to server");
182
183        // client 1 subscribe to hotshot events
184        let mut events_1 = client_1
185            .socket("events")
186            .subscribe::<Event<TestTypes>>()
187            .await
188            .unwrap();
189
190        tracing::info!("Client 1 Subscribed to events");
191
192        // Start Client 2
193        let client_2 = Client::<Error, StaticVersion<0, 1>>::new(
194            format!("http://localhost:{port}/hotshot_events")
195                .parse()
196                .unwrap(),
197        );
198        client_2.connect(None).await;
199
200        tracing::info!("Client 2 Connected to server");
201
202        // client 2 subscrive to hotshot events
203        let mut events_2 = client_2
204            .socket("events")
205            .subscribe::<Event<TestTypes>>()
206            .await
207            .unwrap();
208
209        tracing::info!("Client 2 Subscribed to events");
210
211        let total_count = 5;
212        // wait for these events to receive on client 1
213        let receive_handle_1 = spawn(async move {
214            let mut receive_count = 0;
215            while let Some(event) = events_1.next().await {
216                let event = event.unwrap();
217                tracing::info!("Received event in Client 1: {event:?}");
218
219                receive_count += 1;
220
221                if receive_count == total_count {
222                    tracing::info!("Client1 Received all sent events, exiting loop");
223                    break;
224                }
225            }
226
227            assert_eq!(receive_count, total_count);
228
229            tracing::info!("stream ended");
230        });
231
232        // wait for these events to receive on client 2
233        let receive_handle_2 = spawn(async move {
234            let mut receive_count = 0;
235            while let Some(event) = events_2.next().await {
236                let event = event.unwrap();
237
238                tracing::info!("Received event in Client 2: {event:?}");
239                receive_count += 1;
240
241                if receive_count == total_count {
242                    tracing::info!("Client 2 Received all sent events, exiting loop");
243                    break;
244                }
245            }
246
247            assert_eq!(receive_count, total_count);
248
249            tracing::info!("stream ended");
250        });
251
252        let send_handle = spawn(async move {
253            let mut send_count = 0;
254            loop {
255                let tx_event = generate_event(send_count);
256                tracing::debug!("Before writing to events_source");
257                events_streamer
258                    .write()
259                    .await
260                    .handle_event(tx_event.clone())
261                    .await;
262                send_count += 1;
263                tracing::debug!("After writing to events_source");
264                tracing::info!("Event sent: {tx_event:?}");
265                if send_count >= total_count {
266                    break;
267                }
268            }
269        });
270
271        send_handle.await.unwrap();
272        receive_handle_1.await.unwrap();
273        receive_handle_2.await.unwrap();
274    }
275}