sequencer/state_signature/
relay_server.rs1use std::{path::PathBuf, sync::Arc};
2
3use async_lock::RwLock;
4use clap::Args;
5use futures::FutureExt;
6use hotshot_types::{
7 light_client::{
8 LCV1StateSignatureRequestBody, LCV1StateSignaturesBundle, LCV2StateSignaturesBundle,
9 LCV3StateSignatureRequestBody, LCV3StateSignaturesBundle,
10 },
11 traits::signature_key::LCV1StateSignatureKey,
12};
13use lcv1_relay::{LCV1StateRelayServerDataSource, LCV1StateRelayServerState};
14use lcv2_relay::{LCV2StateRelayServerDataSource, LCV2StateRelayServerState};
15use lcv3_relay::{LCV3StateRelayServerDataSource, LCV3StateRelayServerState};
16use tide_disco::{
17 api::ApiError,
18 error::ServerError,
19 method::{ReadState, WriteState},
20 Api, App, Error as _, StatusCode,
21};
22use tokio::sync::oneshot;
23use url::Url;
24use vbs::version::StaticVersionType;
25
26use super::LCV2StateSignatureRequestBody;
27
28pub mod lcv1_relay;
29pub mod lcv2_relay;
30pub mod lcv3_relay;
31pub mod stake_table_tracker;
32
33pub struct StateRelayServerState {
35 lcv1_state: LCV1StateRelayServerState,
37 lcv2_state: LCV2StateRelayServerState,
39 lcv3_state: LCV3StateRelayServerState,
41 shutdown: Option<oneshot::Receiver<()>>,
43}
44
45impl StateRelayServerState {
46 pub fn new(sequencer_url: Url) -> Self {
48 let stake_table_tracker =
49 Arc::new(stake_table_tracker::StakeTableTracker::new(sequencer_url));
50 Self {
51 lcv1_state: LCV1StateRelayServerState::new(stake_table_tracker.clone()),
52 lcv2_state: LCV2StateRelayServerState::new(stake_table_tracker.clone()),
53 lcv3_state: LCV3StateRelayServerState::new(stake_table_tracker),
54 shutdown: None,
55 }
56 }
57
58 pub fn with_shutdown_signal(
59 mut self,
60 shutdown_listener: Option<oneshot::Receiver<()>>,
61 ) -> Self {
62 if self.shutdown.is_some() {
63 panic!("A shutdown signal is already registered and can not be registered twice");
64 }
65 self.shutdown = shutdown_listener;
66 self
67 }
68}
69
70#[async_trait::async_trait]
71impl LCV1StateRelayServerDataSource for StateRelayServerState {
72 fn get_latest_signature_bundle(&self) -> Result<LCV1StateSignaturesBundle, ServerError> {
73 self.lcv1_state.get_latest_signature_bundle()
74 }
75
76 async fn post_signature(
77 &mut self,
78 req: LCV1StateSignatureRequestBody,
79 ) -> Result<(), ServerError> {
80 self.lcv1_state.post_signature(req).await
81 }
82}
83
84#[async_trait::async_trait]
85impl LCV2StateRelayServerDataSource for StateRelayServerState {
86 fn get_latest_signature_bundle(&self) -> Result<LCV2StateSignaturesBundle, ServerError> {
87 self.lcv2_state.get_latest_signature_bundle()
88 }
89
90 async fn post_signature(
91 &mut self,
92 req: LCV2StateSignatureRequestBody,
93 ) -> Result<(), ServerError> {
94 self.lcv2_state.post_signature(req).await
95 }
96}
97
98#[async_trait::async_trait]
99impl LCV3StateRelayServerDataSource for StateRelayServerState {
100 fn get_latest_signature_bundle(&self) -> Result<LCV3StateSignaturesBundle, ServerError> {
101 self.lcv3_state.get_latest_signature_bundle()
102 }
103
104 async fn post_signature(
105 &mut self,
106 req: LCV3StateSignatureRequestBody,
107 ) -> Result<(), ServerError> {
108 self.lcv3_state.post_signature(req).await
109 }
110}
111
112#[derive(Args, Default)]
114pub struct Options {
115 #[arg(
116 long = "state-relay-server-api-path",
117 env = "STATE_RELAY_SERVER_API_PATH"
118 )]
119 pub api_path: Option<PathBuf>,
121}
122
123fn define_api<State, BindVer: StaticVersionType + 'static>(
125 options: &Options,
126 bind_version: BindVer,
127 api_ver: semver::Version,
128) -> Result<Api<State, ServerError, BindVer>, ApiError>
129where
130 State: 'static + Send + Sync + ReadState + WriteState,
131 <State as ReadState>::State: Send
132 + Sync
133 + LCV1StateRelayServerDataSource
134 + LCV2StateRelayServerDataSource
135 + LCV3StateRelayServerDataSource,
136{
137 let mut api = match &options.api_path {
138 Some(path) => Api::<State, ServerError, BindVer>::from_file(path)?,
139 None => {
140 let toml: toml::Value = toml::from_str(include_str!(
141 "../../api/state_relay_server.toml"
142 ))
143 .map_err(|err| ApiError::CannotReadToml {
144 reason: err.to_string(),
145 })?;
146 Api::<State, ServerError, BindVer>::new(toml)?
147 },
148 };
149
150 api.with_version(api_ver.clone());
151
152 api.post("postlegacystatesignature", move |req, state| {
153 async move {
154 let req = match req.body_auto::<LCV1StateSignatureRequestBody, BindVer>(bind_version) {
155 Ok(req) => req,
156 Err(_) => {
157 match req.body_auto::<LCV2StateSignatureRequestBody, BindVer>(bind_version) {
158 Ok(req) => req.into(),
159 Err(_) => {
160 return Err(ServerError::catch_all(
161 StatusCode::BAD_REQUEST,
162 "Invalid request body".to_string(),
163 ))
164 },
165 }
166 },
167 };
168 LCV1StateRelayServerDataSource::post_signature(state, req).await?;
169 Ok(())
170 }
171 .boxed()
172 })?
173 .post("poststatesignature", move |req, state| {
174 async move {
175 if let Ok(req) = req.body_auto::<LCV3StateSignatureRequestBody, BindVer>(bind_version) {
176 tracing::debug!("Received LCV3 state signature: {req}");
177 if let Err(e) =
178 LCV2StateRelayServerDataSource::post_signature(state, req.clone().into()).await
179 {
180 tracing::error!("Failed to post downgraded LCV2 state signature: {}", e);
181 }
182 LCV3StateRelayServerDataSource::post_signature(state, req).await
183 } else if let Ok(req) =
184 req.body_auto::<LCV2StateSignatureRequestBody, BindVer>(bind_version)
185 {
186 tracing::debug!("Received LCV2 state signature: {req}");
187 if LCV1StateSignatureKey::verify_state_sig(&req.key, &req.signature, &req.state) {
188 LCV1StateRelayServerDataSource::post_signature(state, req.into()).await
189 } else {
190 LCV2StateRelayServerDataSource::post_signature(state, req).await
191 }
192 } else if let Ok(req) =
193 req.body_auto::<LCV1StateSignatureRequestBody, BindVer>(bind_version)
194 {
195 tracing::debug!("Received LCV1 state signature: {req}");
196 LCV1StateRelayServerDataSource::post_signature(state, req).await
197 } else {
198 Err(ServerError::catch_all(
199 StatusCode::BAD_REQUEST,
200 "Invalid request body".to_string(),
201 ))
202 }
203 }
204 .boxed()
205 })?
206 .get("getlatestlegacystate", |_req, state| {
207 async move {
208 LCV1StateRelayServerDataSource::get_latest_signature_bundle(state)
209 .map(LCV2StateSignaturesBundle::from_v1)
210 }
211 .boxed()
212 })?
213 .get("getlateststate", |_req, state| {
214 async move { LCV2StateRelayServerDataSource::get_latest_signature_bundle(state) }.boxed()
215 })?;
216
217 if api_ver.major == 1 {
218 api.get("lateststate", |_req, state| {
219 async move { LCV1StateRelayServerDataSource::get_latest_signature_bundle(state) }
220 .boxed()
221 })?;
222 } else if api_ver.major == 2 {
223 api.get("lateststate", |_req, state| {
224 async move { LCV2StateRelayServerDataSource::get_latest_signature_bundle(state) }
225 .boxed()
226 })?;
227 } else {
228 api.get("lateststate", |_req, state| {
229 async move { LCV3StateRelayServerDataSource::get_latest_signature_bundle(state) }
230 .boxed()
231 })?;
232 }
233 Ok(api)
234}
235
236pub async fn run_relay_server<BindVer: StaticVersionType + 'static>(
237 shutdown_listener: Option<oneshot::Receiver<()>>,
238 sequencer_url: Url,
239 url: Url,
240 bind_version: BindVer,
241) -> anyhow::Result<()> {
242 let options = Options::default();
243
244 let state = RwLock::new(
245 StateRelayServerState::new(sequencer_url).with_shutdown_signal(shutdown_listener),
246 );
247 let mut app = App::<RwLock<StateRelayServerState>, ServerError>::with_state(state);
248
249 let v1_api = define_api(&options, bind_version, "1.0.0".parse().unwrap()).unwrap();
250 let v2_api = define_api(&options, bind_version, "2.0.0".parse().unwrap()).unwrap();
251 let v3_api = define_api(&options, bind_version, "3.0.0".parse().unwrap()).unwrap();
252 app.register_module("api", v1_api)?
253 .register_module("api", v2_api)?
254 .register_module("api", v3_api)?;
255
256 let app_future = app.serve(url.clone(), bind_version);
257 app_future.await?;
258
259 tracing::info!(%url, "Relay server starts serving at ");
260
261 Ok(())
262}
263
264pub async fn run_relay_server_with_state<BindVer: StaticVersionType + 'static>(
265 server_url: Url,
266 bind_version: BindVer,
267 state: StateRelayServerState,
268) -> anyhow::Result<()> {
269 let options = Options::default();
270
271 let mut app = App::<RwLock<StateRelayServerState>, ServerError>::with_state(RwLock::new(state));
272
273 app.register_module(
274 "api",
275 define_api(&options, bind_version, "1.0.0".parse().unwrap()).unwrap(),
276 )
277 .unwrap();
278 app.register_module(
279 "api",
280 define_api(&options, bind_version, "2.0.0".parse().unwrap()).unwrap(),
281 )
282 .unwrap();
283 app.register_module(
284 "api",
285 define_api(&options, bind_version, "3.0.0".parse().unwrap()).unwrap(),
286 )
287 .unwrap();
288
289 let app_future = app.serve(server_url.clone(), bind_version);
290 app_future.await?;
291
292 tracing::info!(%server_url, "Relay server starts serving at ");
293
294 Ok(())
295}