hotshot_events_service/
events.rs1use std::path::PathBuf;
2
3use clap::Args;
4use derive_more::From;
5use futures::{FutureExt, StreamExt, TryFutureExt};
6use hotshot_types::traits::node_implementation::NodeType;
7use serde::{Deserialize, Serialize};
8use snafu::Snafu;
9use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, StatusCode};
10use vbs::version::StaticVersionType;
11
12use crate::{api::load_api, events_source::EventsSource};
13
14#[derive(Args, Default, Debug)]
15pub struct Options {
16 #[arg(
17 long = "hotshot-events-service-api-path",
18 env = "HOTSHOT_EVENTS_SERVICE_API_PATH"
19 )]
20 pub api_path: Option<PathBuf>,
21
22 #[arg(
27 long = "hotshot-events-extension",
28 env = "HOTSHOT_EVENTS_SERVICE_EXTENSIONS",
29 value_delimiter = ','
30 )]
31 pub extensions: Vec<toml::Value>,
32}
33
34#[derive(Clone, Debug, Snafu, Deserialize, Serialize)]
35#[snafu(visibility(pub))]
36pub enum EventError {
37 NotFound,
39 Missing,
41 #[snafu(display("Failed to fetch requested resource: {message}"))]
43 Error { message: String },
44}
45
46#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
47#[snafu(visibility(pub))]
48pub enum Error {
49 Request {
50 source: RequestError,
51 },
52 #[snafu(display("error receiving events {resource}: {source}"))]
53 #[from(ignore)]
54 EventAvailable {
55 source: EventError,
56 resource: String,
57 },
58 Custom {
59 message: String,
60 status: StatusCode,
61 },
62}
63
64impl tide_disco::error::Error for Error {
65 fn catch_all(status: StatusCode, msg: String) -> Self {
66 Error::Custom {
67 message: msg,
68 status,
69 }
70 }
71 fn status(&self) -> StatusCode {
72 match self {
73 Error::Request { .. } => StatusCode::BAD_REQUEST,
74 Error::EventAvailable { source, .. } => match source {
75 EventError::NotFound => StatusCode::NOT_FOUND,
76 EventError::Missing => StatusCode::NOT_FOUND,
77 EventError::Error { .. } => StatusCode::INTERNAL_SERVER_ERROR,
78 },
79 Error::Custom { .. } => StatusCode::INTERNAL_SERVER_ERROR,
80 }
81 }
82}
83
84pub fn define_api<State, Types, Ver>(
85 options: &Options,
86 api_ver: semver::Version,
87) -> Result<Api<State, Error, Ver>, ApiError>
88where
89 State: 'static + Send + Sync + ReadState,
90 <State as ReadState>::State: Send + Sync + EventsSource<Types>,
91 Types: NodeType,
92 Ver: StaticVersionType + 'static,
93{
94 let mut api = load_api::<State, Error, Ver>(
95 options.api_path.as_ref(),
96 include_str!("../api/hotshot_events.toml"),
97 options.extensions.clone(),
98 )?;
99
100 api.with_version(api_ver.clone());
101
102 if api_ver.major == 0 {
103 api.stream("events", move |_, state| {
104 async move {
105 tracing::info!("client subscribed to legacy events");
106 state
107 .read(|state| {
108 async move { Ok(state.get_legacy_event_stream(None).await.map(Ok)) }.boxed()
109 })
110 .await
111 }
112 .try_flatten_stream()
113 .boxed()
114 })?;
115 } else {
116 api.stream("events", move |_, state| {
117 async move {
118 tracing::info!("client subscribed to events");
119 state
120 .read(|state| {
121 async move { Ok(state.get_event_stream(None).await.map(Ok)) }.boxed()
122 })
123 .await
124 }
125 .try_flatten_stream()
126 .boxed()
127 })?;
128 }
129
130 api.get("startup_info", |_, state| {
131 async move { Ok(state.get_startup_info().await) }.boxed()
132 })?;
133
134 Ok(api)
135}