1use std::{fmt::Display, ops::Bound, path::PathBuf};
24
25use derive_more::From;
26use futures::FutureExt;
27use hotshot_types::traits::node_implementation::NodeType;
28use serde::{Deserialize, Serialize};
29use snafu::{ResultExt, Snafu};
30use tide_disco::{Api, RequestError, StatusCode, api::ApiError, method::ReadState};
31use vbs::version::StaticVersionType;
32
33use crate::{Header, QueryError, api::load_api, availability::QueryableHeader};
34
35pub(crate) mod data_source;
36pub(crate) mod query_data;
37pub use data_source::*;
38pub use query_data::*;
39
40#[derive(Debug)]
41pub struct Options {
42 pub api_path: Option<PathBuf>,
43
44 pub extensions: Vec<toml::Value>,
49
50 pub window_limit: usize,
52}
53
54impl Default for Options {
55 fn default() -> Self {
56 Self {
57 api_path: None,
58 extensions: vec![],
59 window_limit: 500,
60 }
61 }
62}
63
64#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
65#[snafu(visibility(pub))]
66pub enum Error {
67 Request {
68 source: RequestError,
69 },
70 #[snafu(display("{source}"))]
71 Query {
72 source: QueryError,
73 },
74 #[snafu(display("error fetching VID share for block {block}: {source}"))]
75 #[from(ignore)]
76 QueryVid {
77 source: QueryError,
78 block: String,
79 },
80 #[snafu(display(
81 "error fetching window starting from {start} and ending at time {end}: {source}"
82 ))]
83 #[from(ignore)]
84 QueryWindow {
85 source: QueryError,
86 start: String,
87 end: u64,
88 },
89 #[snafu(display("error {status}: {message}"))]
90 Custom {
91 message: String,
92 status: StatusCode,
93 },
94}
95
96impl Error {
97 pub fn internal<M: Display>(message: M) -> Self {
98 Self::Custom {
99 message: message.to_string(),
100 status: StatusCode::INTERNAL_SERVER_ERROR,
101 }
102 }
103
104 pub fn status(&self) -> StatusCode {
105 match self {
106 Self::Request { .. } => StatusCode::BAD_REQUEST,
107 Self::Query { source, .. }
108 | Self::QueryVid { source, .. }
109 | Self::QueryWindow { source, .. } => source.status(),
110 Self::Custom { status, .. } => *status,
111 }
112 }
113}
114
115pub fn define_api<State, Types, Ver: StaticVersionType + 'static>(
116 options: &Options,
117 _: Ver,
118 api_ver: semver::Version,
119) -> Result<Api<State, Error, Ver>, ApiError>
120where
121 Types: NodeType,
122 Header<Types>: QueryableHeader<Types>,
123 State: 'static + Send + Sync + ReadState,
124 <State as ReadState>::State: NodeDataSource<Types> + Send + Sync,
125{
126 let mut api = load_api::<State, Error, Ver>(
127 options.api_path.as_ref(),
128 include_str!("../api/node.toml"),
129 options.extensions.clone(),
130 )?;
131 let window_limit = options.window_limit;
132 api.with_version(api_ver)
133 .get("block_height", |_req, state| {
134 async move { state.block_height().await.context(QuerySnafu) }.boxed()
135 })?
136 .get("count_transactions", |req, state| {
137 async move {
138 let from: Bound<usize> = match req.opt_integer_param("from")? {
139 Some(from) => Bound::Included(from),
140 None => Bound::Unbounded,
141 };
142 let to = match req.opt_integer_param("to")? {
143 Some(to) => Bound::Included(to),
144 None => Bound::Unbounded,
145 };
146
147 let ns = req.opt_integer_param::<_, i64>("namespace")?;
148
149 Ok(state
150 .count_transactions_in_range((from, to), ns.map(Into::into))
151 .await?)
152 }
153 .boxed()
154 })?
155 .get("payload_size", |req, state| {
156 async move {
157 let from: Bound<usize> = match req.opt_integer_param("from")? {
158 Some(from) => Bound::Included(from),
159 None => Bound::Unbounded,
160 };
161 let to = match req.opt_integer_param("to")? {
162 Some(to) => Bound::Included(to),
163 None => Bound::Unbounded,
164 };
165
166 let ns = req.opt_integer_param::<_, i64>("namespace")?;
167
168 Ok(state
169 .payload_size_in_range((from, to), ns.map(Into::into))
170 .await?)
171 }
172 .boxed()
173 })?
174 .get("get_vid_share", |req, state| {
175 async move {
176 let id = if let Some(height) = req.opt_integer_param("height")? {
177 BlockId::Number(height)
178 } else if let Some(hash) = req.opt_blob_param("hash")? {
179 BlockId::Hash(hash)
180 } else {
181 BlockId::PayloadHash(req.blob_param("payload-hash")?)
182 };
183 state.vid_share(id).await.context(QueryVidSnafu {
184 block: id.to_string(),
185 })
186 }
187 .boxed()
188 })?
189 .get("sync_status", |_req, state| {
190 async move { state.sync_status().await.context(QuerySnafu) }.boxed()
191 })?
192 .get("get_header_window", move |req, state| {
193 async move {
194 let start = if let Some(height) = req.opt_integer_param("height")? {
195 WindowStart::Height(height)
196 } else if let Some(hash) = req.opt_blob_param("hash")? {
197 WindowStart::Hash(hash)
198 } else {
199 WindowStart::Time(req.integer_param("start")?)
200 };
201 let end = req.integer_param("end")?;
202 state
203 .get_header_window(start, end, window_limit)
204 .await
205 .context(QueryWindowSnafu {
206 start: format!("{start:?}"),
207 end,
208 })
209 }
210 .boxed()
211 })?
212 .get("get_limits", move |_req, _state| {
213 async move { Ok(Limits { window_limit }) }.boxed()
214 })?;
215 Ok(api)
216}
217
#[cfg(test)]
mod test {
    use std::time::Duration;

    use async_lock::RwLock;
    use committable::Committable;
    use futures::{FutureExt, StreamExt};
    use hotshot_types::{
        data::{VidDisperseShare, VidShare},
        event::{EventType, LeafInfo},
        traits::{
            EncodeBytes,
            block_contents::{BlockHeader, BlockPayload},
        },
    };
    use surf_disco::Client;
    use tempfile::TempDir;
    use test_utils::reserve_tcp_port;
    use tide_disco::{App, Error as _};
    use tokio::time::sleep;
    use toml::toml;

    use super::*;
    use crate::{
        ApiState, Error, Header,
        data_source::ExtensibleDataSource,
        task::BackgroundTask,
        testing::{
            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
            mocks::{MockBase, MockTypes, mock_transaction},
        },
    };

    /// Serves the node API on top of a mock network and exercises the limits, block height,
    /// aggregate, VID share, header window, and sync status routes.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_api() {
        let window_limit = 78;

        // Set up the mock network and subscribe to its events before starting it.
        let mut network = MockNetwork::<MockDataSource>::init().await;
        let mut events = network.handle().event_stream();
        network.start().await;

        // Serve the node API under the `node` module.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "node",
            define_api(
                &Options {
                    window_limit,
                    ..Default::default()
                },
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Connect a client rooted at the `node` module.
        let client = Client::<Error, MockBase>::new(
            format!("http://localhost:{port}/node").parse().unwrap(),
        );
        assert!(client.connect(Some(Duration::from_secs(60))).await);

        // The limits route must echo the configured window limit.
        assert_eq!(
            client.get::<Limits>("limits").send().await.unwrap(),
            Limits { window_limit }
        );

        // Poll until the reported block height exceeds the number of nodes.
        let block_height = loop {
            let block_height = client.get::<usize>("block-height").send().await.unwrap();
            if block_height > network.num_nodes() {
                break block_height;
            }
            sleep(Duration::from_secs(1)).await;
        };

        // No transactions were submitted, so both aggregates are expected to be zero.
        assert_eq!(
            client
                .get::<u64>("transactions/count")
                .send()
                .await
                .unwrap(),
            0
        );
        assert_eq!(
            client
                .get::<u64>("payloads/total-size")
                .send()
                .await
                .unwrap(),
            0
        );

        let mut headers = vec![];

        tracing::info!(block_height, "checking VID shares");
        // Walk decided leaves, comparing the share served for each block (looked up by height,
        // hash, and payload hash) until the previously observed block height is reached.
        'outer: while let Some(event) = events.next().await {
            let EventType::Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            for LeafInfo {
                leaf, vid_share, ..
            } in leaf_chain.iter().rev()
            {
                headers.push(leaf.block_header().clone());
                if leaf.block_header().block_number >= block_height as u64 {
                    break 'outer;
                }
                tracing::info!(height = leaf.block_header().block_number, "checking share");

                let share = client
                    .get::<VidShare>(&format!("vid/share/{}", leaf.block_header().block_number))
                    .send()
                    .await
                    .unwrap();
                if let Some(vid_share) = vid_share.as_ref() {
                    let VidDisperseShare::V0(new_share) = vid_share else {
                        panic!("VID share is not V0");
                    };
                    assert_eq!(share, VidShare::V0(new_share.share.clone()));
                }

                // Lookups by block hash and by payload hash must agree with lookup by height.
                assert_eq!(
                    share,
                    client
                        .get(&format!("vid/share/hash/{}", leaf.block_header().commit()))
                        .send()
                        .await
                        .unwrap()
                );
                assert_eq!(
                    share,
                    client
                        .get(&format!(
                            "vid/share/payload-hash/{}",
                            leaf.block_header().payload_commitment
                        ))
                        .send()
                        .await
                        .unwrap()
                );
            }
        }

        // Pause before querying the header window.
        // NOTE(review): presumably gives the data source time to move past the last collected
        // header so that `next` is populated; confirm.
        sleep(Duration::from_secs(2)).await;
        let first_header = &headers[0];
        let last_header = headers.last().unwrap();
        let window: TimeWindowQueryData<Header<MockTypes>> = client
            .get(&format!(
                "header/window/{}/{}",
                first_header.timestamp,
                last_header.timestamp + 1
            ))
            .send()
            .await
            .unwrap();
        assert!(window.window.contains(first_header));
        assert!(window.window.contains(last_header));
        assert!(window.next.is_some());

        // The same window must be returned when it is addressed by height or by hash.
        assert_eq!(
            window,
            client
                .get(&format!(
                    "header/window/from/0/{}",
                    last_header.timestamp + 1
                ))
                .send()
                .await
                .unwrap()
        );
        assert_eq!(
            window,
            client
                .get(&format!(
                    "header/window/from/hash/{}/{}",
                    first_header.commit(),
                    last_header.timestamp + 1
                ))
                .send()
                .await
                .unwrap()
        );

        let sync_status = client
            .get::<SyncStatusQueryData>("sync-status")
            .send()
            .await
            .unwrap();
        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");

        network.shut_down().await;
    }

    /// Submits two transactions and checks the transaction-count and payload-size aggregates
    /// over several ranges.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_aggregate_ranges() {
        // Set up the mock network and subscribe to its events before starting it.
        let mut network = MockNetwork::<MockSqlDataSource>::init().await;
        let mut events = network.handle().event_stream();
        network.start().await;

        // Serve the node API under the `node` module.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "node",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // This client is rooted at the server itself, so every path below starts with `node/`.
        let client =
            Client::<Error, MockBase>::new(format!("http://localhost:{port}").parse().unwrap());
        assert!(client.connect(Some(Duration::from_secs(60))).await);

        // Submit two transactions of different sizes, recording the height and encoded payload
        // size of the block in which each one is decided.
        let mut tx_heights = vec![];
        let mut tx_sizes = vec![];
        for i in [1, 2] {
            let txn = mock_transaction(vec![0; i]);
            let hash = txn.commit();

            network.submit_transaction(txn).await;

            let leaf = 'outer: loop {
                let EventType::Decide { leaf_chain, .. } = events.next().await.unwrap().event
                else {
                    continue;
                };
                for info in leaf_chain.iter().rev() {
                    let leaf = &info.leaf;
                    if BlockPayload::<MockTypes>::transaction_commitments(
                        &leaf.block_payload().unwrap(),
                        BlockHeader::<MockTypes>::metadata(leaf.block_header()),
                    )
                    .contains(&hash)
                    {
                        break 'outer leaf.clone();
                    }
                }

                tracing::info!("waiting for tx {i}");
                sleep(Duration::from_secs(1)).await;
            };
            tx_heights.push(leaf.height());
            tx_sizes.push(leaf.block_payload().unwrap().encode().len());
        }
        tracing::info!(?tx_heights, ?tx_sizes, "transactions sequenced");

        // Retry while the count up to the second transaction's height is reported as not found;
        // any other error fails the test.
        while let Err(err) = client
            .get::<usize>(&format!("node/transactions/count/{}", tx_heights[1]))
            .send()
            .await
        {
            if err.status() == StatusCode::NOT_FOUND {
                tracing::info!(?tx_heights, "waiting for aggregator");
                sleep(Duration::from_secs(1)).await;
                continue;
            } else {
                panic!("unexpected error: {err:#}");
            }
        }

        // Up to height 0 nothing has been sequenced.
        assert_eq!(
            0,
            client
                .get::<usize>("node/transactions/count/0")
                .send()
                .await
                .unwrap()
        );
        assert_eq!(
            0,
            client
                .get::<usize>("node/payloads/size/0")
                .send()
                .await
                .unwrap()
        );

        // Up to the first transaction's height exactly one transaction is counted.
        assert_eq!(
            1,
            client
                .get::<usize>(&format!("node/transactions/count/{}", tx_heights[0]))
                .send()
                .await
                .unwrap()
        );
        assert_eq!(
            tx_sizes[0],
            client
                .get::<usize>(&format!("node/payloads/size/{}", tx_heights[0]))
                .send()
                .await
                .unwrap()
        );

        // The range starting just after the first transaction covers only the second one.
        assert_eq!(
            1,
            client
                .get::<usize>(&format!(
                    "node/transactions/count/{}/{}",
                    tx_heights[0] + 1,
                    tx_heights[1]
                ))
                .send()
                .await
                .unwrap()
        );
        assert_eq!(
            tx_sizes[1],
            client
                .get::<usize>(&format!(
                    "node/payloads/size/{}/{}",
                    tx_heights[0] + 1,
                    tx_heights[1]
                ))
                .send()
                .await
                .unwrap()
        );

        // With no bounds, both transactions are counted.
        assert_eq!(
            2,
            client
                .get::<usize>("node/transactions/count")
                .send()
                .await
                .unwrap()
        );
        assert_eq!(
            tx_sizes[0] + tx_sizes[1],
            client
                .get::<usize>("node/payloads/size")
                .send()
                .await
                .unwrap()
        );

        network.shut_down().await;
    }

    /// Checks that routes declared through `Options::extensions` can be bound to handlers and
    /// served next to the built-in routes.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_extensions() {
        let dir = TempDir::with_prefix("test_node_extensions").unwrap();
        let data_source = ExtensibleDataSource::new(
            MockDataSource::create(dir.path(), Default::default())
                .await
                .unwrap(),
            0,
        );

        // Declare one POST and one GET route on top of the base specification.
        let extensions = toml! {
            [route.post_ext]
            PATH = ["/ext/:val"]
            METHOD = "POST"
            ":val" = "Integer"

            [route.get_ext]
            PATH = ["/ext"]
            METHOD = "GET"
        };

        let mut api =
            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
                &Options {
                    extensions: vec![extensions.into()],
                    ..Default::default()
                },
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap();
        // The extension handlers read and write the `u64` carried by the extensible data source.
        api.get("get_ext", |_, state| {
            async move { Ok(*state.as_ref()) }.boxed()
        })
        .unwrap()
        .post("post_ext", |req, state| {
            async move {
                *state.as_mut() = req.integer_param("val")?;
                Ok(())
            }
            .boxed()
        })
        .unwrap();

        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
        app.register_module("node", api).unwrap();

        let port = reserve_tcp_port().unwrap();
        let _server = BackgroundTask::spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        let client = Client::<Error, MockBase>::new(
            format!("http://localhost:{port}/node").parse().unwrap(),
        );
        assert!(client.connect(Some(Duration::from_secs(60))).await);

        // The extension state starts at 0 and reflects the value posted to it.
        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
        client.post::<()>("ext/42").send().await.unwrap();
        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);

        // The built-in routes remain available next to the extensions.
        let sync_status: SyncStatusQueryData = client.get("sync-status").send().await.unwrap();
        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
    }
}