sequencer/state_signature/relay_server/
lcv2_relay.rs

1use std::{
2    collections::{hash_map::Entry, BTreeSet, HashMap},
3    sync::Arc,
4};
5
6use alloy::primitives::U256;
7use hotshot_types::{
8    light_client::{
9        LCV2StateSignatureRequestBody, LCV2StateSignaturesBundle, LightClientState, StateVerKey,
10    },
11    traits::signature_key::LCV2StateSignatureKey,
12};
13use tide_disco::{error::ServerError, Error, StatusCode};
14
15use super::stake_table_tracker::StakeTableTracker;
16
17#[async_trait::async_trait]
18pub trait LCV2StateRelayServerDataSource {
19    /// Get the latest available signatures bundle.
20    /// # Errors
21    /// Errors if there's no available signatures bundle.
22    fn get_latest_signature_bundle(&self) -> Result<LCV2StateSignaturesBundle, ServerError>;
23
24    /// Post a signature to the relay server
25    /// # Errors
26    /// Errors if the signature is invalid, already posted, or no longer needed.
27    async fn post_signature(
28        &mut self,
29        req: LCV2StateSignatureRequestBody,
30    ) -> Result<(), ServerError>;
31}
32
33/// Server state that tracks the light client V2 state and signatures
34pub struct LCV2StateRelayServerState {
35    /// Bundles for light client V2
36    bundles: HashMap<u64, HashMap<LightClientState, LCV2StateSignaturesBundle>>,
37
38    /// The latest state signatures bundle for LCV2 light client
39    latest_available_bundle: Option<LCV2StateSignaturesBundle>,
40    /// The block height of the latest available LCV2 state signature bundle
41    latest_block_height: Option<u64>,
42
43    /// A ordered queue of block heights for V2 light client state, used for garbage collection.
44    gc_queue: BTreeSet<u64>,
45
46    /// Stake table tracker
47    stake_table_tracker: Arc<StakeTableTracker>,
48}
49
50#[async_trait::async_trait]
51impl LCV2StateRelayServerDataSource for LCV2StateRelayServerState {
52    fn get_latest_signature_bundle(&self) -> Result<LCV2StateSignaturesBundle, ServerError> {
53        self.latest_available_bundle
54            .clone()
55            .ok_or(ServerError::catch_all(
56                StatusCode::NOT_FOUND,
57                "The light client V2 state signatures are not ready.".to_owned(),
58            ))
59    }
60
61    async fn post_signature(
62        &mut self,
63        req: LCV2StateSignatureRequestBody,
64    ) -> Result<(), ServerError> {
65        let block_height = req.state.block_height;
66        if block_height <= self.latest_block_height.unwrap_or(0) {
67            // This signature is no longer needed
68            return Ok(());
69        }
70        let stake_table = self
71            .stake_table_tracker
72            .stake_table_info_for_block(block_height)
73            .await
74            .map_err(|e| {
75                ServerError::catch_all(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
76            })?;
77        let Some(weight) = stake_table.known_nodes.get(&req.key) else {
78            tracing::warn!("Received LCV2 signature from unknown node: {req}");
79            return Err(ServerError::catch_all(
80                StatusCode::UNAUTHORIZED,
81                "LCV2 signature posted by nodes not on the stake table".to_owned(),
82            ));
83        };
84
85        // sanity check the signature validity first before adding in
86        if !<StateVerKey as LCV2StateSignatureKey>::verify_state_sig(
87            &req.key,
88            &req.signature,
89            &req.state,
90            &req.next_stake,
91        ) {
92            tracing::warn!("Couldn't verify the received LCV2 signature: {req}");
93            return Err(ServerError::catch_all(
94                StatusCode::BAD_REQUEST,
95                "The posted LCV2 signature is not valid.".to_owned(),
96            ));
97        }
98
99        let bundles_at_height = self.bundles.entry(block_height).or_default();
100        self.gc_queue.insert(block_height);
101
102        let bundle = bundles_at_height
103            .entry(req.state)
104            .or_insert(LCV2StateSignaturesBundle {
105                state: req.state,
106                next_stake: req.next_stake,
107                signatures: Default::default(),
108                accumulated_weight: U256::from(0),
109            });
110        tracing::debug!(
111            "Accepting new LCV2 signature for block height {} from {}.",
112            block_height,
113            req.key
114        );
115        match bundle.signatures.entry(req.key) {
116            Entry::Occupied(_) => {
117                // A signature is already posted for this key with this state
118                return Err(ServerError::catch_all(
119                    StatusCode::BAD_REQUEST,
120                    "A LCV2 signature of this light client state is already posted at this block \
121                     height for this key."
122                        .to_owned(),
123                ));
124            },
125            Entry::Vacant(entry) => {
126                entry.insert(req.signature);
127                bundle.accumulated_weight += *weight;
128            },
129        }
130
131        if bundle.accumulated_weight >= stake_table.threshold {
132            tracing::info!(
133                "Light client V2 state signature bundle at block height {} is ready to serve.",
134                block_height
135            );
136            self.latest_block_height = Some(block_height);
137            self.latest_available_bundle = Some(bundle.clone());
138
139            // garbage collect
140            self.prune(block_height);
141        }
142
143        Ok(())
144    }
145}
146
147impl LCV2StateRelayServerState {
148    /// Centralizing all garbage-collection logic, won't panic, won't error, simply do nothing if nothing to prune.
149    /// `until_height` is inclusive, meaning that would also be pruned.
150    pub fn prune(&mut self, until_height: u64) {
151        while let Some(&height) = self.gc_queue.first() {
152            if height > until_height {
153                return;
154            }
155            self.bundles.remove(&height);
156            self.gc_queue.pop_first();
157            tracing::debug!(%height, "garbage collected for ");
158        }
159    }
160
161    pub fn new(stake_table_tracker: Arc<StakeTableTracker>) -> Self {
162        Self {
163            bundles: HashMap::new(),
164            latest_available_bundle: None,
165            latest_block_height: None,
166            gc_queue: BTreeSet::new(),
167            stake_table_tracker,
168        }
169    }
170}