1use std::{borrow::Cow, fmt::Display, path::PathBuf};
27
28use derive_more::From;
29use futures::FutureExt;
30use serde::{Deserialize, Serialize};
31use snafu::Snafu;
32use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, StatusCode};
33use vbs::version::StaticVersionType;
34
35use crate::api::load_api;
36
37pub(crate) mod data_source;
38
39pub use data_source::*;
40
41#[derive(Default)]
42pub struct Options {
43 pub api_path: Option<PathBuf>,
44
45 pub extensions: Vec<toml::Value>,
50}
51
52#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
53pub enum Error {
54 Request { source: RequestError },
55 Internal { reason: String },
56}
57
58impl Error {
59 pub fn status(&self) -> StatusCode {
60 match self {
61 Self::Request { .. } => StatusCode::BAD_REQUEST,
62 Self::Internal { .. } => StatusCode::INTERNAL_SERVER_ERROR,
63 }
64 }
65}
66
67fn internal<M: Display>(msg: M) -> Error {
68 Error::Internal {
69 reason: msg.to_string(),
70 }
71}
72
73pub fn define_api<State, Ver: StaticVersionType + 'static>(
74 options: &Options,
75 _: Ver,
76 api_ver: semver::Version,
77) -> Result<Api<State, Error, Ver>, ApiError>
78where
79 State: 'static + Send + Sync + ReadState,
80 <State as ReadState>::State: Send + Sync + StatusDataSource,
81{
82 let mut api = load_api::<State, Error, Ver>(
83 options.api_path.as_ref(),
84 include_str!("../api/status.toml"),
85 options.extensions.clone(),
86 )?;
87 api.with_version(api_ver)
88 .get("block_height", |_, state| {
89 async { state.block_height().await.map_err(internal) }.boxed()
90 })?
91 .get("success_rate", |_, state| {
92 async { state.success_rate().await.map_err(internal) }.boxed()
93 })?
94 .get("time_since_last_decide", |_, state| {
95 async {
96 state
97 .elapsed_time_since_last_decide()
98 .await
99 .map_err(internal)
100 }
101 .boxed()
102 })?
103 .metrics("metrics", |_, state| {
104 async { Ok(Cow::Borrowed(state.metrics())) }.boxed()
105 })?;
106 Ok(api)
107}
108
109#[cfg(test)]
110mod test {
111 use std::{str::FromStr, time::Duration};
112
113 use async_lock::RwLock;
114 use futures::FutureExt;
115 use portpicker::pick_unused_port;
116 use reqwest::redirect::Policy;
117 use surf_disco::Client;
118 use tempfile::TempDir;
119 use tide_disco::{App, Url};
120 use toml::toml;
121
122 use super::*;
123 use crate::{
124 data_source::ExtensibleDataSource,
125 task::BackgroundTask,
126 testing::{
127 consensus::{MockDataSource, MockNetwork},
128 mocks::{MockBase, MockVersions},
129 setup_test, sleep,
130 },
131 ApiState, Error,
132 };
133
134 #[tokio::test(flavor = "multi_thread")]
135 async fn test_api() {
136 setup_test();
137
138 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
140
141 let port = pick_unused_port().unwrap();
143 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
144 app.register_module(
145 "status",
146 define_api(
147 &Default::default(),
148 MockBase::instance(),
149 "0.0.1".parse().unwrap(),
150 )
151 .unwrap(),
152 )
153 .unwrap();
154 network.spawn(
155 "server",
156 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
157 );
158
159 let url = Url::from_str(&format!("http://localhost:{port}/status")).unwrap();
161 let client = Client::<Error, MockBase>::new(url.clone());
162 assert!(client.connect(Some(Duration::from_secs(60))).await);
163
164 assert_eq!(client.get::<u64>("block-height").send().await.unwrap(), 0);
166
167 let reqwest_client = reqwest::Client::builder()
170 .redirect(Policy::limited(5))
171 .build()
172 .unwrap();
173
174 let res = reqwest_client
176 .get(format!("{url}/metrics"))
177 .send()
178 .await
179 .unwrap();
180
181 assert_eq!(res.status(), StatusCode::OK);
183 let prometheus = res.text().await.unwrap();
184 let lines = prometheus.lines().collect::<Vec<_>>();
185 assert!(
186 lines.contains(&"consensus_current_view 0"),
187 "Missing consensus_current_view in metrics:\n{prometheus}"
188 );
189
190 network.start().await;
192
193 while client.get::<u64>("block-height").send().await.unwrap() <= 1 {
197 tracing::info!("waiting for block height to update");
198 sleep(Duration::from_secs(1)).await;
199 }
200 let success_rate = client.get::<f64>("success-rate").send().await.unwrap();
201 assert!(success_rate.is_finite(), "{success_rate}");
204 assert!(success_rate > 0.0, "{success_rate}");
206
207 network.shut_down().await;
208 }
209
210 #[tokio::test(flavor = "multi_thread")]
211 async fn test_extensions() {
212 setup_test();
213
214 let dir = TempDir::with_prefix("test_status_extensions").unwrap();
215 let data_source = ExtensibleDataSource::new(
216 MockDataSource::create(dir.path(), Default::default())
217 .await
218 .unwrap(),
219 0,
220 );
221
222 let extensions = toml! {
223 [route.post_ext]
224 PATH = ["/ext/:val"]
225 METHOD = "POST"
226 ":val" = "Integer"
227
228 [route.get_ext]
229 PATH = ["/ext"]
230 METHOD = "GET"
231 };
232
233 let mut api = define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockBase>(
234 &Options {
235 extensions: vec![extensions.into()],
236 ..Default::default()
237 },
238 MockBase::instance(),
239 "0.0.1".parse().unwrap(),
240 )
241 .unwrap();
242 api.get("get_ext", |_, state| {
243 async move { Ok(*state.as_ref()) }.boxed()
244 })
245 .unwrap()
246 .post("post_ext", |req, state| {
247 async move {
248 *state.as_mut() = req.integer_param("val")?;
249 Ok(())
250 }
251 .boxed()
252 })
253 .unwrap();
254
255 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
256 app.register_module("status", api).unwrap();
257
258 let port = pick_unused_port().unwrap();
259 let _server = BackgroundTask::spawn(
260 "server",
261 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
262 );
263
264 let client = Client::<Error, MockBase>::new(
265 format!("http://localhost:{port}/status").parse().unwrap(),
266 );
267 assert!(client.connect(Some(Duration::from_secs(60))).await);
268
269 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
270 client.post::<()>("ext/42").send().await.unwrap();
271 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
272
273 assert_eq!(client.get::<u64>("block-height").send().await.unwrap(), 0);
275 }
276}