// sequencer/proposal_fetcher.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use async_lock::RwLock;
6use clap::Parser;
7use committable::Commitment;
8use derivative::Derivative;
9use espresso_types::{parse_duration, v0::traits::SequencerPersistence, PubKey, ValidatedState};
10use futures::stream::StreamExt;
11use hotshot::types::EventType;
12use hotshot_types::{
13    data::{Leaf2, ViewNumber},
14    traits::{
15        metrics::{Counter, Gauge, Metrics},
16        network::ConnectedNetwork,
17        node_implementation::{ConsensusTime, Versions},
18        ValidatedState as _,
19    },
20    utils::{View, ViewInner},
21};
22use tokio::time::{sleep, timeout};
23use tracing::Instrument;
24
25use crate::{
26    context::{Consensus, TaskList},
27    SeqTypes,
28};
29
30#[derive(Clone, Copy, Debug, Parser)]
31pub struct ProposalFetcherConfig {
32    #[clap(
33        long = "proposal-fetcher-num-workers",
34        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
35        default_value = "2"
36    )]
37    pub num_workers: usize,
38
39    #[clap(
40        long = "proposal-fetcher-fetch-timeout",
41        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
42        default_value = "2s",
43        value_parser = parse_duration,
44    )]
45    pub fetch_timeout: Duration,
46}
47
48impl Default for ProposalFetcherConfig {
49    fn default() -> Self {
50        Self::parse_from(std::iter::empty::<String>())
51    }
52}
53
54impl ProposalFetcherConfig {
55    pub(crate) fn spawn<N, P, V>(
56        self,
57        tasks: &mut TaskList,
58        consensus: Arc<RwLock<Consensus<N, P, V>>>,
59        persistence: Arc<P>,
60        metrics: &(impl Metrics + ?Sized),
61    ) where
62        N: ConnectedNetwork<PubKey>,
63        P: SequencerPersistence,
64        V: Versions,
65    {
66        let (sender, receiver) = async_channel::unbounded();
67        let fetcher = ProposalFetcher {
68            sender,
69            consensus,
70            persistence,
71            cfg: self,
72            metrics: ProposalFetcherMetrics::new(metrics),
73        };
74
75        tasks.spawn("proposal scanner", fetcher.clone().scan());
76        for i in 0..self.num_workers {
77            tasks.spawn(
78                format!("proposal fetcher {i}"),
79                fetcher.clone().fetch(receiver.clone()),
80            );
81        }
82    }
83}
84
/// Counters and gauges reported by the proposal fetcher.
#[derive(Clone, Debug)]
struct ProposalFetcherMetrics {
    /// Number of proposals fetched from the network and saved to storage.
    fetched: Arc<dyn Counter>,
    /// Number of failed fetch attempts (a failed request is re-enqueued and retried).
    failed: Arc<dyn Counter>,
    /// Number of requests waiting in the fetch queue, as last sampled by the fetcher.
    queue_len: Arc<dyn Gauge>,
    /// View number of the most recently enqueued request.
    last_seen: Arc<dyn Gauge>,
    /// View number of the most recently fetched proposal.
    last_fetched: Arc<dyn Gauge>,
}
93
94impl ProposalFetcherMetrics {
95    fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
96        let metrics = metrics.subgroup("proposal_fetcher".into());
97        Self {
98            fetched: metrics.create_counter("fetched".into(), None).into(),
99            failed: metrics.create_counter("failed".into(), None).into(),
100            queue_len: metrics.create_gauge("queue_len".into(), None).into(),
101            last_seen: metrics
102                .create_gauge("last_seen".into(), Some("view".into()))
103                .into(),
104            last_fetched: metrics
105                .create_gauge("last_fetched".into(), Some("view".into()))
106                .into(),
107        }
108    }
109}
110
/// A queued fetch request: a view number together with the commitment of the leaf for that
/// view, as taken from a justify QC.
type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);
112
/// State shared by the proposal scanner task and every fetch worker; each task owns a clone.
#[derive(Derivative)]
// `bound = ""` drops derivative's default `N/P/V: Clone`/`Debug` bounds, so the impls do not
// require the type parameters themselves to be `Clone` or `Debug`.
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ProposalFetcher<N, P, V>
where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
    V: Versions,
{
    /// Producer side of the request queue drained by the fetch workers.
    sender: Sender<Request>,
    /// Consensus handle: source of the event stream, of network proposal requests, and the
    /// HotShot state that fetched leaves are inserted into.
    #[derivative(Debug = "ignore")]
    consensus: Arc<RwLock<Consensus<N, P, V>>>,
    /// Storage consulted for the anchor view and for stored quorum proposals.
    #[derivative(Debug = "ignore")]
    persistence: Arc<P>,
    cfg: ProposalFetcherConfig,
    metrics: ProposalFetcherMetrics,
}
129
130impl<N, P, V> ProposalFetcher<N, P, V>
131where
132    N: ConnectedNetwork<PubKey>,
133    P: SequencerPersistence,
134    V: Versions,
135{
136    #[tracing::instrument(skip_all)]
137    async fn scan(self) {
138        let mut events = self.consensus.read().await.event_stream();
139        while let Some(event) = events.next().await {
140            let EventType::QuorumProposal { proposal, .. } = event.event else {
141                continue;
142            };
143            // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
144            // to the anchor. This allows state replay from the decided state.
145            let parent_view = proposal.data.justify_qc().view_number;
146            let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
147            self.request((parent_view, parent_leaf)).await;
148        }
149    }
150
151    #[tracing::instrument(skip_all)]
152    async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
153        let mut receiver = std::pin::pin!(receiver);
154        while let Some(req) = receiver.next().await {
155            self.fetch_request(req).await;
156        }
157    }
158
159    async fn request(&self, req: Request) {
160        self.sender.send(req).await.ok();
161        self.metrics.queue_len.set(self.sender.len());
162        self.metrics.last_seen.set(req.0.u64() as usize);
163    }
164
165    async fn fetch_request(&self, (view, leaf): Request) {
166        let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
167        let res: anyhow::Result<()> = async {
168            let anchor_view = self
169                .persistence
170                .load_anchor_view()
171                .await
172                .context("loading anchor view")?;
173            if view <= anchor_view {
174                tracing::debug!(?anchor_view, "skipping already-decided proposal");
175                return Ok(());
176            }
177
178            match self.persistence.load_quorum_proposal(view).await {
179                Ok(proposal) => {
180                    // If we already have the proposal in storage, keep traversing the chain to its
181                    // parent.
182                    let view = proposal.data.justify_qc().view_number;
183                    let leaf = proposal.data.justify_qc().data.leaf_commit;
184                    self.request((view, leaf)).await;
185                    return Ok(());
186                },
187                Err(err) => {
188                    tracing::info!("proposal missing from storage; fetching from network: {err:#}");
189                },
190            }
191
192            let future = self.consensus.read().await.request_proposal(view, leaf)?;
193            let proposal = timeout(self.cfg.fetch_timeout, future)
194                .await
195                .context("timed out fetching proposal")?
196                .context("error fetching proposal")?;
197            self.persistence
198                .append_quorum_proposal2(&proposal)
199                .await
200                .context("error saving fetched proposal")?;
201
202            // Add the fetched leaf to HotShot state, so consensus can make use of it.
203            let leaf = Leaf2::from_quorum_proposal(&proposal.data);
204            let handle = self.consensus.read().await;
205            let consensus = handle.consensus();
206            let mut consensus = consensus.write().await;
207            if matches!(
208                consensus.validated_state_map().get(&view),
209                None | Some(View {
210                    // Replace a Da-only view with a Leaf view, which has strictly more information.
211                    view_inner: ViewInner::Da { .. }
212                })
213            ) {
214                let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
215                if let Err(err) = consensus.update_leaf(leaf, state, None) {
216                    tracing::warn!("unable to update leaf: {err:#}");
217                }
218            }
219
220            self.metrics.last_fetched.set(view.u64() as usize);
221            self.metrics.fetched.add(1);
222
223            Ok(())
224        }
225        .instrument(span)
226        .await;
227        if let Err(err) = res {
228            tracing::warn!("failed to fetch proposal: {err:#}");
229            self.metrics.failed.add(1);
230
231            // Avoid busy loop when operations are failing.
232            sleep(Duration::from_secs(1)).await;
233
234            // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
235            // it back onto the queue and move onto the next proposal.
236            self.request((view, leaf)).await;
237        }
238    }
239}