1use std::{sync::Arc, time::Duration};
2
3use anyhow::Context;
4use async_channel::{Receiver, Sender};
5use async_lock::RwLock;
6use clap::Parser;
7use committable::Commitment;
8use derivative::Derivative;
9use espresso_types::{parse_duration, v0::traits::SequencerPersistence, PubKey, ValidatedState};
10use futures::stream::StreamExt;
11use hotshot::types::EventType;
12use hotshot_types::{
13 data::{Leaf2, ViewNumber},
14 traits::{
15 metrics::{Counter, Gauge, Metrics},
16 network::ConnectedNetwork,
17 node_implementation::{ConsensusTime, Versions},
18 ValidatedState as _,
19 },
20 utils::{View, ViewInner},
21};
22use tokio::time::{sleep, timeout};
23use tracing::Instrument;
24
25use crate::{
26 context::{Consensus, TaskList},
27 SeqTypes,
28};
29
30#[derive(Clone, Copy, Debug, Parser)]
31pub struct ProposalFetcherConfig {
32 #[clap(
33 long = "proposal-fetcher-num-workers",
34 env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
35 default_value = "2"
36 )]
37 pub num_workers: usize,
38
39 #[clap(
40 long = "proposal-fetcher-fetch-timeout",
41 env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
42 default_value = "2s",
43 value_parser = parse_duration,
44 )]
45 pub fetch_timeout: Duration,
46}
47
48impl Default for ProposalFetcherConfig {
49 fn default() -> Self {
50 Self::parse_from(std::iter::empty::<String>())
51 }
52}
53
54impl ProposalFetcherConfig {
55 pub(crate) fn spawn<N, P, V>(
56 self,
57 tasks: &mut TaskList,
58 consensus: Arc<RwLock<Consensus<N, P, V>>>,
59 persistence: Arc<P>,
60 metrics: &(impl Metrics + ?Sized),
61 ) where
62 N: ConnectedNetwork<PubKey>,
63 P: SequencerPersistence,
64 V: Versions,
65 {
66 let (sender, receiver) = async_channel::unbounded();
67 let fetcher = ProposalFetcher {
68 sender,
69 consensus,
70 persistence,
71 cfg: self,
72 metrics: ProposalFetcherMetrics::new(metrics),
73 };
74
75 tasks.spawn("proposal scanner", fetcher.clone().scan());
76 for i in 0..self.num_workers {
77 tasks.spawn(
78 format!("proposal fetcher {i}"),
79 fetcher.clone().fetch(receiver.clone()),
80 );
81 }
82 }
83}
84
85#[derive(Clone, Debug)]
86struct ProposalFetcherMetrics {
87 fetched: Arc<dyn Counter>,
88 failed: Arc<dyn Counter>,
89 queue_len: Arc<dyn Gauge>,
90 last_seen: Arc<dyn Gauge>,
91 last_fetched: Arc<dyn Gauge>,
92}
93
94impl ProposalFetcherMetrics {
95 fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
96 let metrics = metrics.subgroup("proposal_fetcher".into());
97 Self {
98 fetched: metrics.create_counter("fetched".into(), None).into(),
99 failed: metrics.create_counter("failed".into(), None).into(),
100 queue_len: metrics.create_gauge("queue_len".into(), None).into(),
101 last_seen: metrics
102 .create_gauge("last_seen".into(), Some("view".into()))
103 .into(),
104 last_fetched: metrics
105 .create_gauge("last_fetched".into(), Some("view".into()))
106 .into(),
107 }
108 }
109}
110
111type Request = (ViewNumber, Commitment<Leaf2<SeqTypes>>);
112
113#[derive(Derivative)]
114#[derivative(Clone(bound = ""), Debug(bound = ""))]
115struct ProposalFetcher<N, P, V>
116where
117 N: ConnectedNetwork<PubKey>,
118 P: SequencerPersistence,
119 V: Versions,
120{
121 sender: Sender<Request>,
122 #[derivative(Debug = "ignore")]
123 consensus: Arc<RwLock<Consensus<N, P, V>>>,
124 #[derivative(Debug = "ignore")]
125 persistence: Arc<P>,
126 cfg: ProposalFetcherConfig,
127 metrics: ProposalFetcherMetrics,
128}
129
130impl<N, P, V> ProposalFetcher<N, P, V>
131where
132 N: ConnectedNetwork<PubKey>,
133 P: SequencerPersistence,
134 V: Versions,
135{
136 #[tracing::instrument(skip_all)]
137 async fn scan(self) {
138 let mut events = self.consensus.read().await.event_stream();
139 while let Some(event) = events.next().await {
140 let EventType::QuorumProposal { proposal, .. } = event.event else {
141 continue;
142 };
143 let parent_view = proposal.data.justify_qc().view_number;
146 let parent_leaf = proposal.data.justify_qc().data.leaf_commit;
147 self.request((parent_view, parent_leaf)).await;
148 }
149 }
150
151 #[tracing::instrument(skip_all)]
152 async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>) {
153 let mut receiver = std::pin::pin!(receiver);
154 while let Some(req) = receiver.next().await {
155 self.fetch_request(req).await;
156 }
157 }
158
159 async fn request(&self, req: Request) {
160 self.sender.send(req).await.ok();
161 self.metrics.queue_len.set(self.sender.len());
162 self.metrics.last_seen.set(req.0.u64() as usize);
163 }
164
165 async fn fetch_request(&self, (view, leaf): Request) {
166 let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
167 let res: anyhow::Result<()> = async {
168 let anchor_view = self
169 .persistence
170 .load_anchor_view()
171 .await
172 .context("loading anchor view")?;
173 if view <= anchor_view {
174 tracing::debug!(?anchor_view, "skipping already-decided proposal");
175 return Ok(());
176 }
177
178 match self.persistence.load_quorum_proposal(view).await {
179 Ok(proposal) => {
180 let view = proposal.data.justify_qc().view_number;
183 let leaf = proposal.data.justify_qc().data.leaf_commit;
184 self.request((view, leaf)).await;
185 return Ok(());
186 },
187 Err(err) => {
188 tracing::info!("proposal missing from storage; fetching from network: {err:#}");
189 },
190 }
191
192 let future = self.consensus.read().await.request_proposal(view, leaf)?;
193 let proposal = timeout(self.cfg.fetch_timeout, future)
194 .await
195 .context("timed out fetching proposal")?
196 .context("error fetching proposal")?;
197 self.persistence
198 .append_quorum_proposal2(&proposal)
199 .await
200 .context("error saving fetched proposal")?;
201
202 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
204 let handle = self.consensus.read().await;
205 let consensus = handle.consensus();
206 let mut consensus = consensus.write().await;
207 if matches!(
208 consensus.validated_state_map().get(&view),
209 None | Some(View {
210 view_inner: ViewInner::Da { .. }
212 })
213 ) {
214 let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
215 if let Err(err) = consensus.update_leaf(leaf, state, None) {
216 tracing::warn!("unable to update leaf: {err:#}");
217 }
218 }
219
220 self.metrics.last_fetched.set(view.u64() as usize);
221 self.metrics.fetched.add(1);
222
223 Ok(())
224 }
225 .instrument(span)
226 .await;
227 if let Err(err) = res {
228 tracing::warn!("failed to fetch proposal: {err:#}");
229 self.metrics.failed.add(1);
230
231 sleep(Duration::from_secs(1)).await;
233
234 self.request((view, leaf)).await;
237 }
238 }
239}