hotshot_events_service/
events.rs

1use std::path::PathBuf;
2
3use clap::Args;
4use derive_more::From;
5use futures::{FutureExt, StreamExt, TryFutureExt};
6use hotshot_types::traits::node_implementation::NodeType;
7use serde::{Deserialize, Serialize};
8use snafu::Snafu;
9use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, StatusCode};
10use vbs::version::StaticVersionType;
11
12use crate::{api::load_api, events_source::EventsSource};
13
14#[derive(Args, Default, Debug)]
15pub struct Options {
16    #[arg(
17        long = "hotshot-events-service-api-path",
18        env = "HOTSHOT_EVENTS_SERVICE_API_PATH"
19    )]
20    pub api_path: Option<PathBuf>,
21
22    /// Additional API specification files to merge with `hotshot-events-service-api-path`.
23    ///
24    /// These optional files may contain route definitions for application-specific routes that have
25    /// been added as extensions to the basic hotshot-events-service API.
26    #[arg(
27        long = "hotshot-events-extension",
28        env = "HOTSHOT_EVENTS_SERVICE_EXTENSIONS",
29        value_delimiter = ','
30    )]
31    pub extensions: Vec<toml::Value>,
32}
33
34#[derive(Clone, Debug, Snafu, Deserialize, Serialize)]
35#[snafu(visibility(pub))]
36pub enum EventError {
37    /// The requested resource does not exist or is not known to this hotshot node.
38    NotFound,
39    /// The requested resource exists but is not currently available.
40    Missing,
41    /// There was an error while trying to fetch the requested resource.
42    #[snafu(display("Failed to fetch requested resource: {message}"))]
43    Error { message: String },
44}
45
46#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
47#[snafu(visibility(pub))]
48pub enum Error {
49    Request {
50        source: RequestError,
51    },
52    #[snafu(display("error receiving events {resource}: {source}"))]
53    #[from(ignore)]
54    EventAvailable {
55        source: EventError,
56        resource: String,
57    },
58    Custom {
59        message: String,
60        status: StatusCode,
61    },
62}
63
64impl tide_disco::error::Error for Error {
65    fn catch_all(status: StatusCode, msg: String) -> Self {
66        Error::Custom {
67            message: msg,
68            status,
69        }
70    }
71    fn status(&self) -> StatusCode {
72        match self {
73            Error::Request { .. } => StatusCode::BAD_REQUEST,
74            Error::EventAvailable { source, .. } => match source {
75                EventError::NotFound => StatusCode::NOT_FOUND,
76                EventError::Missing => StatusCode::NOT_FOUND,
77                EventError::Error { .. } => StatusCode::INTERNAL_SERVER_ERROR,
78            },
79            Error::Custom { .. } => StatusCode::INTERNAL_SERVER_ERROR,
80        }
81    }
82}
83
84pub fn define_api<State, Types, Ver>(
85    options: &Options,
86    api_ver: semver::Version,
87) -> Result<Api<State, Error, Ver>, ApiError>
88where
89    State: 'static + Send + Sync + ReadState,
90    <State as ReadState>::State: Send + Sync + EventsSource<Types>,
91    Types: NodeType,
92    Ver: StaticVersionType + 'static,
93{
94    let mut api = load_api::<State, Error, Ver>(
95        options.api_path.as_ref(),
96        include_str!("../api/hotshot_events.toml"),
97        options.extensions.clone(),
98    )?;
99
100    api.with_version(api_ver.clone());
101
102    if api_ver.major == 0 {
103        api.stream("events", move |_, state| {
104            async move {
105                tracing::info!("client subscribed to legacy events");
106                state
107                    .read(|state| {
108                        async move { Ok(state.get_legacy_event_stream(None).await.map(Ok)) }.boxed()
109                    })
110                    .await
111            }
112            .try_flatten_stream()
113            .boxed()
114        })?;
115    } else {
116        api.stream("events", move |_, state| {
117            async move {
118                tracing::info!("client subscribed to events");
119                state
120                    .read(|state| {
121                        async move { Ok(state.get_event_stream(None).await.map(Ok)) }.boxed()
122                    })
123                    .await
124            }
125            .try_flatten_stream()
126            .boxed()
127        })?;
128    }
129
130    api.get("startup_info", |_, state| {
131        async move { Ok(state.get_startup_info().await) }.boxed()
132    })?;
133
134    Ok(api)
135}