1use std::{
2 cmp::{min, Ordering},
3 num::NonZeroUsize,
4 pin::Pin,
5 result::Result as StdResult,
6 sync::Arc,
7 time::Instant,
8};
9
10use alloy::{
11 eips::BlockId,
12 hex,
13 primitives::{Address, B256, U256},
14 providers::{Provider, ProviderBuilder, WsConnect},
15 rpc::{
16 client::RpcClient,
17 json_rpc::{RequestPacket, ResponsePacket},
18 types::Block,
19 },
20 transports::{http::Http, RpcError, TransportErrorKind},
21};
22use anyhow::Context;
23use async_trait::async_trait;
24use clap::Parser;
25use committable::{Commitment, Committable, RawCommitmentBuilder};
26use futures::{
27 future::{Future, TryFuture, TryFutureExt},
28 stream::{self, StreamExt},
29};
30use hotshot_contract_adapter::sol_types::FeeContract;
31use hotshot_types::traits::metrics::Metrics;
32use lru::LruCache;
33use parking_lot::RwLock;
34use tokio::{
35 spawn,
36 sync::{Mutex, MutexGuard, Notify},
37 time::{sleep, Duration},
38};
39use tower_service::Service;
40use tracing::Instrument;
41use url::Url;
42
43use super::{
44 v0_1::{L1BlockInfoWithParent, SingleTransport, SingleTransportStatus, SwitchingTransport},
45 L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask,
46};
47use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};
48
49impl PartialOrd for L1BlockInfo {
50 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51 Some(self.cmp(other))
52 }
53}
54
55impl Ord for L1BlockInfo {
56 fn cmp(&self, other: &Self) -> Ordering {
57 self.number.cmp(&other.number)
58 }
59}
60
61impl From<&Block> for L1BlockInfo {
62 fn from(block: &Block) -> Self {
63 Self {
64 number: block.header.number,
65 timestamp: U256::from(block.header.timestamp),
66 hash: block.header.hash,
67 }
68 }
69}
70
71impl From<&Block> for L1BlockInfoWithParent {
72 fn from(block: &Block) -> Self {
73 Self {
74 info: block.into(),
75 parent_hash: block.header.parent_hash,
76 }
77 }
78}
79
80impl Committable for L1BlockInfo {
81 fn commit(&self) -> Commitment<Self> {
82 let timestamp: [u8; 32] = self.timestamp.to_le_bytes();
83
84 RawCommitmentBuilder::new(&Self::tag())
85 .u64_field("number", self.number)
86 .constant_str("timestamp")
88 .fixed_size_bytes(×tamp)
89 .constant_str("hash")
90 .fixed_size_bytes(&self.hash.0)
91 .finalize()
92 }
93
94 fn tag() -> String {
95 "L1BLOCK".into()
96 }
97}
98
/// Read-only accessors for the fields of an [`L1BlockInfo`].
impl L1BlockInfo {
    /// Returns the block number.
    pub fn number(&self) -> u64 {
        self.number
    }

    /// Returns the block timestamp.
    pub fn timestamp(&self) -> U256 {
        self.timestamp
    }

    /// Returns the block hash.
    pub fn hash(&self) -> B256 {
        self.hash
    }
}
112
113impl Drop for L1UpdateTask {
114 fn drop(&mut self) {
115 if let Some(task) = self.0.get_mut().take() {
116 task.abort();
117 }
118 }
119}
120
impl Default for L1ClientOptions {
    /// Builds the default options by running the command-line parser on an empty argument list.
    ///
    /// Every option therefore takes whatever value its parser definition supplies when the
    /// option is not given on the command line (the definitions are not visible from here).
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}
126
127impl L1ClientOptions {
128 pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self {
130 self.metrics = Arc::new(metrics.subgroup("l1".into()));
131 self
132 }
133
134 pub fn connect(self, urls: Vec<Url>) -> anyhow::Result<L1Client> {
136 let t = SwitchingTransport::new(self, urls)
138 .with_context(|| "failed to create switching transport")?;
139 Ok(L1Client::with_transport(t))
141 }
142
143 fn rate_limit_delay(&self) -> Duration {
144 self.l1_rate_limit_delay.unwrap_or(self.l1_retry_delay)
145 }
146}
147
148impl L1ClientMetrics {
149 fn new(metrics: &(impl Metrics + ?Sized), num_urls: usize) -> Self {
150 let failures = metrics.counter_family("failed_requests".into(), vec!["provider".into()]);
152
153 let mut failure_metrics = Vec::with_capacity(num_urls);
155 for url_index in 0..num_urls {
156 failure_metrics.push(failures.create(vec![url_index.to_string()]));
157 }
158
159 Self {
160 head: metrics.create_gauge("head".into(), None).into(),
161 finalized: metrics.create_gauge("finalized".into(), None).into(),
162 reconnects: metrics
163 .create_counter("stream_reconnects".into(), None)
164 .into(),
165 failovers: metrics.create_counter("failovers".into(), None).into(),
166 failures: Arc::new(failure_metrics),
167 }
168 }
169}
170
171impl SwitchingTransport {
172 pub fn new(opt: L1ClientOptions, urls: Vec<Url>) -> anyhow::Result<Self> {
174 let Some(first_url) = urls.first().cloned() else {
176 return Err(anyhow::anyhow!("No valid URLs provided"));
177 };
178
179 let metrics = L1ClientMetrics::new(&**opt.metrics, urls.len());
181
182 let first_transport = Arc::new(RwLock::new(SingleTransport::new(&first_url, 0, None)));
184
185 Ok(Self {
186 urls: Arc::new(urls),
187 current_transport: first_transport,
188 opt: Arc::new(opt),
189 metrics,
190 switch_notify: Arc::new(Notify::new()),
191 })
192 }
193
194 async fn wait_switch(&self) {
196 self.switch_notify.notified().await;
197 }
198
199 fn options(&self) -> &L1ClientOptions {
200 &self.opt
201 }
202
203 fn metrics(&self) -> &L1ClientMetrics {
204 &self.metrics
205 }
206}
207
208impl SingleTransportStatus {
209 fn log_success(&mut self) {
211 self.consecutive_failures = 0;
212 }
213
214 fn log_failure(&mut self, opt: &L1ClientOptions) -> bool {
216 self.consecutive_failures += 1;
218
219 let should_switch = self.should_switch(opt);
221
222 self.last_failure = Some(Instant::now());
224
225 should_switch
227 }
228
229 fn should_switch(&mut self, opt: &L1ClientOptions) -> bool {
231 if self.shutting_down {
233 return false;
234 }
235
236 if self.consecutive_failures >= opt.l1_consecutive_failure_tolerance {
238 self.shutting_down = true;
239 return true;
240 }
241
242 let now = Instant::now();
244 if let Some(prev) = self.last_failure {
245 if now.saturating_duration_since(prev) < opt.l1_frequent_failure_tolerance {
246 self.shutting_down = true;
247 return true;
248 }
249 }
250
251 false
252 }
253
254 fn should_revert(&mut self, revert_at: Option<Instant>) -> bool {
256 if self.shutting_down {
257 return false;
259 }
260 let Some(revert_at) = revert_at else {
261 return false;
262 };
263 if Instant::now() >= revert_at {
264 self.shutting_down = true;
265 return true;
266 }
267
268 false
269 }
270}
271
272impl SingleTransport {
273 fn new(url: &Url, generation: usize, revert_at: Option<Instant>) -> Self {
275 Self {
276 generation,
277 client: Http::new(url.clone()),
278 status: Default::default(),
279 revert_at,
280 }
281 }
282}
283
/// Request handling for the switching transport: every request is forwarded to the currently
/// selected [`SingleTransport`], and the outcome is used to drive rate limiting, failover to the
/// next provider, and reverting back after a failover.
#[async_trait]
impl Service<RequestPacket> for SwitchingTransport {
    type Error = RpcError<TransportErrorKind>;
    type Response = ResponsePacket;
    type Future =
        Pin<Box<dyn Future<Output = Result<ResponsePacket, RpcError<TransportErrorKind>>> + Send>>;

    /// Delegates readiness to (a clone of) the current transport's HTTP client.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<StdResult<(), Self::Error>> {
        self.current_transport.read().clone().client.poll_ready(cx)
    }

    /// Sends `req` through the current transport.
    ///
    /// Before sending: if the transport's revert deadline has passed, switches to the next
    /// generation that is a multiple of the number of URLs; if the transport is still inside a
    /// rate-limit window, fails immediately with a `"Rate limit exceeded"` transport error.
    ///
    /// After a failed send: bumps the per-provider failure counter; an error response with code
    /// 429 opens a rate-limit window and is returned as-is; any other error is logged and, if
    /// the failure tolerances are exceeded, triggers a failover to the next generation. The
    /// original error is returned to the caller in every failure case.
    fn call(&mut self, req: RequestPacket) -> Self::Future {
        // The returned future must own its data, so work on a clone of the transport handle.
        let self_clone = self.clone();

        Box::pin(async move {
            // Snapshot of the transport selected at the time the request starts.
            let mut current_transport = self_clone.current_transport.read().clone();

            let should_revert = current_transport
                .status
                .write()
                .should_revert(current_transport.revert_at);
            if should_revert {
                // Round the generation down to a multiple of the URL count, then advance by one
                // full cycle: the result is again a multiple of the URL count, i.e. it selects
                // URL index 0.
                let n = self_clone.urls.len();
                let prev_primary_gen = (current_transport.generation / n) * n;
                let next_gen = prev_primary_gen + n;
                current_transport = self_clone.switch_to(next_gen, current_transport);
            }

            let rate_limit_until = current_transport.status.read().rate_limited_until;
            if let Some(t) = rate_limit_until {
                if t > Instant::now() {
                    // Still rate limited: fail fast without contacting the provider.
                    return Err(RpcError::Transport(TransportErrorKind::Custom(
                        "Rate limit exceeded".into(),
                    )));
                } else {
                    // The window has elapsed; clear it so later requests skip this check.
                    current_transport.status.write().rate_limited_until = None;
                }
            }

            match current_transport.client.call(req).await {
                Ok(res) => {
                    current_transport.status.write().log_success();
                    Ok(res)
                },
                Err(err) => {
                    // Failure counters are indexed by URL position, not by generation.
                    if let Some(f) = self_clone
                        .metrics
                        .failures
                        .get(current_transport.generation % self_clone.urls.len())
                    {
                        f.add(1);
                    }

                    if let RpcError::ErrorResp(e) = &err {
                        // Code 429 is treated as "rate limited": back off for the configured
                        // delay instead of counting it towards failover.
                        if e.code == 429 {
                            current_transport.status.write().rate_limited_until =
                                Some(Instant::now() + self_clone.opt.rate_limit_delay());
                            return Err(err);
                        }
                    }

                    tracing::warn!(?err, "L1 client error");

                    if current_transport
                        .status
                        .write()
                        .log_failure(&self_clone.opt)
                    {
                        // Tolerance exceeded: fail over to the next provider in the list.
                        self_clone.metrics.failovers.add(1);
                        self_clone.switch_to(current_transport.generation + 1, current_transport);
                    }

                    Err(err)
                },
            }
        })
    }
}
391
392impl SwitchingTransport {
393 fn switch_to(&self, next_gen: usize, current_transport: SingleTransport) -> SingleTransport {
394 let next_index = next_gen % self.urls.len();
395 let url = self.urls[next_index].clone();
396 tracing::info!(%url, next_gen, "switch L1 transport");
397
398 let revert_at = if next_gen % self.urls.len() == 0 {
399 None
401 } else if current_transport.generation % self.urls.len() == 0 {
402 Some(Instant::now() + self.opt.l1_failover_revert)
405 } else {
406 current_transport.revert_at
408 };
409
410 let new_transport = SingleTransport::new(&url, next_gen, revert_at);
412
413 *self.current_transport.write() = new_transport.clone();
415
416 self.switch_notify.notify_waiters();
418
419 new_transport
420 }
421}
422
423impl L1Client {
424 fn with_transport(transport: SwitchingTransport) -> Self {
425 let rpc_client = RpcClient::new(transport.clone(), false);
427 let provider = ProviderBuilder::new().on_client(rpc_client);
428
429 let opt = transport.options().clone();
430
431 let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity);
432 receiver.set_await_active(false);
433 receiver.set_overflow(true);
434
435 Self {
436 provider,
437 transport,
438 state: Arc::new(Mutex::new(L1State::new(opt.l1_blocks_cache_size))),
439 sender,
440 receiver: receiver.deactivate(),
441 update_task: Default::default(),
442 }
443 }
444
    /// Creates a client for the given RPC URLs using the default [`L1ClientOptions`].
    ///
    /// # Errors
    ///
    /// Propagates any error from [`L1ClientOptions::connect`].
    pub fn new(url: Vec<Url>) -> anyhow::Result<Self> {
        L1ClientOptions::default().connect(url)
    }

    /// Creates a client connected to a local Anvil instance, using its HTTP endpoint for
    /// requests and its WebSocket endpoint as the WebSocket provider; all other options are
    /// left at their defaults.
    ///
    /// # Errors
    ///
    /// Fails if either endpoint string cannot be parsed as a URL, or if connecting fails.
    pub fn anvil(anvil: &alloy::node_bindings::AnvilInstance) -> anyhow::Result<Self> {
        L1ClientOptions {
            l1_ws_provider: Some(vec![anvil.ws_endpoint().parse()?]),
            ..Default::default()
        }
        .connect(vec![anvil.endpoint().parse()?])
    }
458
459 pub async fn spawn_tasks(&self) {
461 let mut update_task = self.update_task.0.lock().await;
462 if update_task.is_none() {
463 *update_task = Some(spawn(self.update_loop()));
464 }
465 }
466
467 pub async fn shut_down_tasks(&self) {
472 let update_task = self.update_task.0.lock().await.take();
473 if let Some(update_task) = update_task {
474 update_task.abort();
475 }
476 }
477
    /// Returns the future that keeps the shared L1 snapshot up to date.
    ///
    /// The future never completes on its own. It repeatedly:
    /// 1. fetches the latest L1 block header, retrying until it succeeds;
    /// 2. establishes a stream of new block headers, either from a WebSocket subscription (when
    ///    WebSocket URLs are configured; the URL is chosen round-robin per attempt) or by
    ///    polling the HTTP provider for new block hashes, in both cases prefixed with the header
    ///    fetched in step 1;
    /// 3. for each header received, fetches the finalized block and, under the state lock,
    ///    updates the snapshot, the metrics and broadcasts `NewHead` / `NewFinalized` events when
    ///    the head number or finalized block has advanced;
    /// 4. starts over when the stream ends or no block arrives within the subscription timeout.
    ///
    /// Everything the future needs is cloned out of `self` up front, so the future does not
    /// borrow `self`.
    fn update_loop(&self) -> impl Future<Output = ()> {
        let opt = self.options().clone();
        let rpc = self.provider.clone();
        let ws_urls = opt.l1_ws_provider.clone();
        let retry_delay = opt.l1_retry_delay;
        let subscription_timeout = opt.subscription_timeout;
        let state = self.state.clone();
        let sender = self.sender.clone();
        let metrics = self.metrics().clone();
        let polling_interval = opt.l1_polling_interval;
        let transport = self.transport.clone();

        let span = tracing::warn_span!("L1 client update");

        async move {

            // `i` counts connection attempts and selects the WebSocket URL to try.
            for i in 0.. {
                // Declared outside the stream block so the WebSocket provider outlives the
                // subscription stream created from it.
                let ws;

                // Fetch the current head first; it is injected as the first item of the stream
                // below.
                let l1_head = loop {
                    match rpc.get_block(BlockId::latest()).await {
                        Ok(Some(block)) => break block.header,
                        Ok(None) => {
                            tracing::info!("Failed to fetch L1 head block, will retry");
                        },
                        Err(err) => {
                            tracing::info!("Failed to fetch L1 head block, will retry: err {err}");
                        }
                    }
                    sleep(retry_delay).await;
                };

                let mut block_stream = {
                    let res = match &ws_urls {
                        Some(urls) => {
                            // NOTE(review): `i % urls.len()` would panic if the configured
                            // WebSocket URL list is `Some` but empty — confirm this is ruled out
                            // by the option parser.
                            let provider = i % urls.len();
                            let url = &urls[provider];
                            ws = match ProviderBuilder::new().on_ws(WsConnect::new(url.clone())).await {
                                Ok(ws) => ws,
                                Err(err) => {
                                    tracing::warn!(provider, "Failed to connect WebSockets provider: {err:#}");
                                    sleep(retry_delay).await;
                                    // Move on to the next attempt (and thus the next URL).
                                    continue;
                                }
                            };
                            ws.subscribe_blocks().await.map(|stream| {stream::once(async { l1_head.clone() }).chain(stream.into_stream()).boxed()})
                        },
                        None => {
                            rpc
                                .watch_blocks()
                                .await
                                .map(|poller_builder| {
                                    let stream = poller_builder.with_poll_interval(polling_interval).into_stream();

                                    let rpc = rpc.clone();

                                    // The poller yields batches of block hashes; flatten them and
                                    // fetch the header for each hash, dropping hashes whose
                                    // block cannot be fetched.
                                    stream::once(async { l1_head.clone() })
                                        .chain(
                                            stream.map(stream::iter).flatten().filter_map(move |hash| {
                                                let rpc = rpc.clone();
                                                async move {
                                                    match rpc.get_block(BlockId::hash(hash)).await {
                                                        Ok(Some(block)) => Some(block.header),
                                                        Ok(None) => {
                                                            tracing::warn!(%hash, "HTTP stream yielded a block hash that was not available");
                                                            None
                                                        }
                                                        Err(err) => {
                                                            tracing::warn!(%hash, "Error fetching block from HTTP stream: {err:#}");
                                                            None
                                                        }
                                                    }
                                                }
                                            }))
                                // End the polling stream when the transport switches providers,
                                // so the stream is re-established below.
                                }.take_until(transport.wait_switch())
                                .boxed())
                        }
                    };
                    match res {
                        Ok(stream) => stream,
                        Err(err) => {
                            tracing::error!("Error subscribing to L1 blocks: {err:#}");
                            sleep(retry_delay).await;
                            continue;
                        }
                    }
                };

                tracing::info!("Established L1 block stream");
                loop {
                    // Bound the wait for each block so a stalled stream is detected.
                    let block_timeout = tokio::time::timeout(subscription_timeout, block_stream.next()).await;
                    match block_timeout {
                        Ok(Some(head)) => {
                            let head = head.number;
                            tracing::debug!(head, "Received L1 block");

                            // Fetch the finalized block before taking the state lock, retrying
                            // on RPC errors.
                            let finalized = loop {
                                match fetch_finalized_block_from_rpc(&rpc).await {
                                    Ok(finalized) => break finalized,
                                    Err(err) => {
                                        tracing::warn!("Error getting finalized block: {err:#}");
                                        sleep(retry_delay).await;
                                    }
                                }
                            };

                            let mut state = state.lock().await;
                            // Only ever move the head forward.
                            if head > state.snapshot.head {
                                tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
                                metrics.head.set(head as usize);
                                state.snapshot.head = head;
                                // Broadcast failures are deliberately ignored.
                                sender
                                    .broadcast_direct(L1Event::NewHead { head })
                                    .await
                                    .ok();
                            }
                            if let Some(finalized) = finalized {
                                // Only ever move the finalized block forward (blocks compare by
                                // number; `None` compares less than any `Some`).
                                if Some(finalized.info) > state.snapshot.finalized {
                                    tracing::info!(
                                        ?finalized,
                                        old_finalized = ?state.snapshot.finalized,
                                        "L1 finalized updated",
                                    );
                                    metrics.finalized.set(finalized.info.number as usize);
                                    // The snapshot must be updated before `put_finalized`, which
                                    // asserts the inserted block is not beyond the snapshot.
                                    state.snapshot.finalized = Some(finalized.info);
                                    state.put_finalized(finalized);
                                    sender
                                        .broadcast_direct(L1Event::NewFinalized { finalized })
                                        .await
                                        .ok();
                                }
                            }
                            tracing::debug!("Updated L1 snapshot to {:?}", state.snapshot);
                        }
                        Ok(None) => {
                            tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
                            break;
                        }
                        Err(_) => {
                            tracing::error!("No block received for {} seconds, trying to re-establish block stream", subscription_timeout.as_secs());
                            break;
                        }
                    }
                }

                // Reached only after an established stream was lost; failed connection attempts
                // `continue` above and are not counted here.
                metrics.reconnects.add(1);
            }
        }.instrument(span)
    }
648
    /// Returns a copy of the current L1 snapshot, read under the state lock.
    pub async fn snapshot(&self) -> L1Snapshot {
        self.state.lock().await.snapshot
    }
653
654 pub async fn wait_for_block(&self, number: u64) {
660 loop {
661 let mut events = self.receiver.activate_cloned();
664
665 {
667 let state = self.state.lock().await;
668 if state.snapshot.head >= number {
669 return;
670 }
671 tracing::info!(number, head = state.snapshot.head, "Waiting for l1 block");
672 }
673
674 while let Some(event) = events.next().await {
676 let L1Event::NewHead { head } = event else {
677 continue;
678 };
679 if head >= number {
680 tracing::info!(number, head, "Got L1 block");
681 return;
682 }
683 tracing::debug!(number, head, "Waiting for L1 block");
684 }
685
686 tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
688 self.retry_delay().await;
689 }
690 }
691
    /// Waits until L1 block `number` is finalized and returns that block.
    ///
    /// If the snapshot's finalized block is already at or past `number`, the block is looked up
    /// right away; otherwise this listens for `NewFinalized` events until one reaches `number`.
    /// If the event stream ends, waits for the retry delay and starts over.
    pub async fn wait_for_finalized_block(&self, number: u64) -> L1BlockInfo {
        loop {
            // Subscribe before inspecting the snapshot, so that an update landing between the
            // check and the wait is still delivered.
            let mut events = self.receiver.activate_cloned();

            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.number >= number {
                        // Hand the held lock over to the lookup; `.1` is the block itself.
                        return self.fetch_finalized_block_by_number(state, number).await.1;
                    }
                    tracing::info!(
                        number,
                        finalized = ?state.snapshot.finalized,
                        "waiting for l1 finalized block",
                    );
                };
                // The lock is released here, before waiting on events.
            }

            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                let mut state = self.state.lock().await;
                // Cache every finalized block seen on the way.
                state.put_finalized(finalized);
                if finalized.info.number >= number {
                    tracing::info!(number, ?finalized, "got finalized L1 block");
                    return self.fetch_finalized_block_by_number(state, number).await.1;
                }
                tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
            }

            tracing::warn!(number, "L1 event stream ended unexpectedly; retry",);
            self.retry_delay().await;
        }
    }
736
    /// Waits until a finalized L1 block with timestamp at least `timestamp` exists, then returns
    /// the earliest finalized block found with such a timestamp.
    ///
    /// First waits (via the snapshot or `NewFinalized` events) for any finalized block whose
    /// timestamp is at least `timestamp`. Then binary-searches the block numbers from 0 up to
    /// that block for the lowest-numbered block satisfying the condition.
    // NOTE(review): the binary search presumably relies on timestamps being non-decreasing in
    // block number — confirm.
    pub async fn wait_for_finalized_block_with_timestamp(&self, timestamp: U256) -> L1BlockInfo {
        // Phase 1: obtain the state lock together with some finalized block that is late enough.
        let (mut state, mut block) = 'outer: loop {
            // Subscribe before inspecting the snapshot, so no update is missed in between.
            let mut events = self.receiver.activate_cloned();

            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.timestamp >= timestamp {
                        break 'outer (state, finalized);
                    }
                }
                tracing::info!(
                    %timestamp,
                    finalized = ?state.snapshot.finalized,
                    "waiting for L1 finalized block",
                );
                // The lock is released here, before waiting on events.
            }

            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                if finalized.info.timestamp >= timestamp {
                    tracing::info!(%timestamp, ?finalized, "got finalized block");
                    break 'outer (self.state.lock().await, finalized.info);
                }
                tracing::debug!(%timestamp, ?finalized, "waiting for L1 finalized block");
            }

            tracing::warn!(%timestamp, "L1 event stream ended unexpectedly; retry",);
            self.retry_delay().await;
        };

        // Phase 2: binary search. Invariant: `block` (at `upper_bound`) has a sufficient
        // timestamp; every block below `lower_bound` has been ruled out.
        let mut upper_bound = block.number;
        let mut lower_bound = 0;
        while lower_bound < upper_bound {
            let midpoint = (upper_bound + lower_bound) / 2;
            tracing::debug!(
                lower_bound,
                midpoint,
                upper_bound,
                %timestamp,
                ?block,
                "searching for earliest block with sufficient timestamp"
            );

            // The lookup takes the lock guard by value and hands one back; keep it for the next
            // iteration.
            let (state_lock, midpoint_block) =
                self.fetch_finalized_block_by_number(state, midpoint).await;
            state = state_lock;

            tracing::debug!(?midpoint_block, %timestamp, "pivot on midpoint block");
            if midpoint_block.timestamp < timestamp {
                // Too early: the answer lies strictly above the midpoint.
                lower_bound = midpoint + 1;
            } else {
                // Late enough: remember it and keep looking for an earlier one.
                upper_bound = midpoint;
                block = midpoint_block;
            }
        }

        block
    }
814
    /// Returns the finalized L1 block with the given `number`, together with a lock guard on the
    /// client state.
    ///
    /// The caller passes in a held state lock. The guard may be released and re-acquired
    /// internally while network requests are in flight, so the returned guard is not
    /// necessarily the same critical section.
    ///
    /// Unless the block is old enough to skip the check (see the safety margin below), the block
    /// is located by starting from a finalized block at or after `number` and following parent
    /// hashes backwards until `number` is reached.
    ///
    /// # Panics
    ///
    /// Panics if no block has been finalized yet according to the snapshot, or if `number` is
    /// greater than the snapshot's finalized block number.
    async fn fetch_finalized_block_by_number<'a>(
        &'a self,
        mut state: MutexGuard<'a, L1State>,
        number: u64,
    ) -> (MutexGuard<'a, L1State>, L1BlockInfo) {
        let latest_finalized = state
            .snapshot
            .finalized
            .expect("get_finalized_block called before any blocks are finalized");
        assert!(
            number <= latest_finalized.number,
            "requesting a finalized block {number} that isn't finalized; snapshot: {:?}",
            state.snapshot,
        );

        if let Some(safety_margin) = self.options().l1_finalized_safety_margin {
            // Blocks more than `safety_margin` below the latest finalized block are loaded
            // directly by number, without walking the hash chain.
            if number < latest_finalized.number.saturating_sub(safety_margin) {
                tracing::debug!(
                    number,
                    ?latest_finalized,
                    "skipping hash check for old finalized block"
                );
                let (state, block) = self
                    .load_and_cache_finalized_block(state, number.into())
                    .await;
                return (state, block.info);
            }
        }

        // Find the closest cached finalized block at or after `number`; if there is none up to
        // the latest finalized number, fetch the current finalized block from the provider.
        let mut successor_number = number;
        let mut successor = loop {
            if let Some(block) = state.finalized.get(&successor_number) {
                break *block;
            }
            successor_number += 1;
            if successor_number > latest_finalized.number {
                // Release the lock while talking to the network.
                drop(state);
                let block = loop {
                    match fetch_finalized_block_from_rpc(&self.provider).await {
                        Ok(Some(block)) => {
                            break block;
                        },
                        Ok(None) => {
                            tracing::warn!(
                                "no finalized block even though finalized snapshot is Some; this \
                                 can be caused by an L1 client failover"
                            );
                            self.retry_delay().await;
                        },
                        Err(err) => {
                            tracing::warn!("Error getting finalized block: {err:#}");
                            self.retry_delay().await;
                        },
                    }
                };
                state = self.state.lock().await;
                // NOTE(review): `put_finalized` asserts the block is not beyond the snapshot's
                // finalized block; if the provider reports a newer finalized block than the
                // snapshot holds at this point, this would panic — confirm this cannot happen.
                state.put_finalized(block);
                break block;
            }
        };

        // Walk backwards along parent hashes until the requested block number is reached,
        // caching each block on the way.
        while successor.info.number > number {
            tracing::debug!(
                number,
                ?successor,
                "checking hash chaining for finalized block"
            );
            (state, successor) = self
                .load_and_cache_finalized_block(state, successor.parent_hash.into())
                .await;
        }

        (state, successor.info)
    }
904
905 async fn load_and_cache_finalized_block<'a>(
906 &'a self,
907 mut state: MutexGuard<'a, L1State>,
908 id: BlockId,
909 ) -> (MutexGuard<'a, L1State>, L1BlockInfoWithParent) {
910 drop(state);
912 let block = loop {
913 let block = match self.provider.get_block(id).await {
914 Ok(Some(block)) => block,
915 Ok(None) => {
916 tracing::warn!(
917 %id,
918 "provider error: finalized L1 block should always be available"
919 );
920 self.retry_delay().await;
921 continue;
922 },
923 Err(err) => {
924 tracing::warn!(%id, "failed to get finalized L1 block: {err:#}");
925 self.retry_delay().await;
926 continue;
927 },
928 };
929 break (&block).into();
930 };
931 state = self.state.lock().await;
932 state.put_finalized(block);
933 (state, block)
934 }
935
936 pub async fn get_finalized_deposits(
939 &self,
940 fee_contract_address: Address,
941 prev_finalized: Option<u64>,
942 new_finalized: u64,
943 ) -> Vec<FeeInfo> {
944 if prev_finalized >= Some(new_finalized) {
947 return vec![];
948 }
949
950 let opt = self.options();
951
952 let prev = prev_finalized.map(|prev| prev + 1).unwrap_or(0);
955
956 let mut start = prev;
959 let end = new_finalized;
960 let chunk_size = opt.l1_events_max_block_range;
961 let chunks = std::iter::from_fn(move || {
962 let chunk_end = min(start + chunk_size - 1, end);
963 if chunk_end < start {
964 return None;
965 }
966
967 let chunk = (start, chunk_end);
968 start = chunk_end + 1;
969 Some(chunk)
970 });
971
972 let events = stream::iter(chunks).then(|(from, to)| {
974 let retry_delay = opt.l1_retry_delay;
975 let fee_contract = FeeContract::new(fee_contract_address, self.provider.clone());
976 async move {
977 tracing::debug!(from, to, "fetch events in range");
978
979 loop {
981 match fee_contract
982 .Deposit_filter()
983 .address(*fee_contract.address())
984 .from_block(from)
985 .to_block(to)
986 .query()
987 .await
988 {
989 Ok(events) => break stream::iter(events),
990 Err(err) => {
991 tracing::warn!(from, to, %err, "Fee L1Event Error");
992 sleep(retry_delay).await;
993 },
994 }
995 }
996 }
997 });
998 events
999 .flatten()
1000 .map(|(deposit, _)| FeeInfo::from(deposit))
1001 .collect()
1002 .await
1003 }
1004
1005 pub async fn is_proxy_contract(&self, proxy_address: Address) -> anyhow::Result<bool> {
1007 let hex_bytes =
1010 hex::decode("360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc")
1011 .expect("Failed to decode hex string");
1012 let implementation_slot = B256::from_slice(&hex_bytes);
1013 let storage = self
1014 .provider
1015 .get_storage_at(proxy_address, implementation_slot.into())
1016 .await?;
1017
1018 let implementation_address = Address::from_slice(&storage.to_be_bytes::<32>()[12..]);
1019
1020 Ok(implementation_address != Address::ZERO)
1022 }
1023
1024 pub async fn retry_on_all_providers<Fut>(
1025 &self,
1026 op: impl Fn() -> Fut,
1027 ) -> Result<Fut::Ok, Fut::Error>
1028 where
1029 Fut: TryFuture,
1030 {
1031 let transport = &self.transport;
1032 let start = transport.current_transport.read().generation % transport.urls.len();
1033 let end = start + transport.urls.len();
1034 loop {
1035 match op().into_future().await {
1036 Ok(res) => return Ok(res),
1037 Err(err) => {
1038 if transport.current_transport.read().generation >= end {
1039 return Err(err);
1040 } else {
1041 self.retry_delay().await;
1042 }
1043 },
1044 }
1045 }
1046 }
1047
    /// The options this client was created with (held by the underlying transport).
    pub(crate) fn options(&self) -> &L1ClientOptions {
        self.transport.options()
    }

    /// The metrics this client reports to (held by the underlying transport).
    fn metrics(&self) -> &L1ClientMetrics {
        self.transport.metrics()
    }

    /// Sleeps for the configured L1 retry delay.
    async fn retry_delay(&self) {
        sleep(self.options().l1_retry_delay).await;
    }
1059}
1060
1061impl L1State {
1062 fn new(cache_size: NonZeroUsize) -> Self {
1063 Self {
1064 snapshot: Default::default(),
1065 finalized: LruCache::new(cache_size),
1066 last_finalized: None,
1067 }
1068 }
1069
1070 fn put_finalized(&mut self, block: L1BlockInfoWithParent) {
1071 assert!(
1072 self.snapshot.finalized.is_some()
1073 && block.info.number <= self.snapshot.finalized.unwrap().number,
1074 "inserting a finalized block {block:?} that isn't finalized; snapshot: {:?}",
1075 self.snapshot,
1076 );
1077
1078 if Some(block.info.number()) > self.last_finalized {
1079 self.last_finalized = Some(block.info.number());
1080 }
1081
1082 if let Some((old_number, old_block)) = self.finalized.push(block.info.number, block) {
1083 if old_number == block.info.number && block != old_block {
1084 tracing::error!(
1085 ?old_block,
1086 ?block,
1087 "got different info for the same finalized height; something has gone very \
1088 wrong with the L1",
1089 );
1090 }
1091 }
1092 }
1093}
1094
1095async fn fetch_finalized_block_from_rpc(
1096 rpc: &impl Provider,
1097) -> anyhow::Result<Option<L1BlockInfoWithParent>> {
1098 let Some(block) = rpc.get_block(BlockId::finalized()).await? else {
1099 tracing::warn!("no finalized block yet");
1104 return Ok(None);
1105 };
1106
1107 Ok(Some((&block).into()))
1108}
1109
1110#[cfg(test)]
1111mod test {
1112 use std::{ops::Add, time::Duration};
1113
1114 use alloy::{
1115 eips::BlockNumberOrTag,
1116 node_bindings::{Anvil, AnvilInstance},
1117 primitives::utils::parse_ether,
1118 providers::layers::AnvilProvider,
1119 };
1120 use espresso_contract_deployer::{deploy_fee_contract_proxy, Contracts};
1121 use portpicker::pick_unused_port;
1122 use time::OffsetDateTime;
1123
1124 use super::*;
1125
1126 async fn new_l1_client_opt(
1127 anvil: &Arc<AnvilInstance>,
1128 f: impl FnOnce(&mut L1ClientOptions),
1129 ) -> L1Client {
1130 let mut opt = L1ClientOptions {
1131 l1_events_max_block_range: 1,
1132 l1_polling_interval: Duration::from_secs(1),
1133 subscription_timeout: Duration::from_secs(5),
1134 ..Default::default()
1135 };
1136 f(&mut opt);
1137
1138 let l1_client = opt
1139 .connect(vec![anvil.endpoint_url()])
1140 .expect("Failed to create L1 client");
1141
1142 l1_client.spawn_tasks().await;
1143 l1_client
1144 }
1145
1146 async fn new_l1_client(anvil: &Arc<AnvilInstance>, include_ws: bool) -> L1Client {
1147 new_l1_client_opt(anvil, |opt| {
1148 if include_ws {
1149 opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
1150 }
1151 })
1152 .await
1153 }
1154
    /// Deploys the fee contract on a fresh Anvil node, makes five deposits, and checks that
    /// `get_finalized_deposits` returns the expected deposits for a variety of block ranges.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_get_finalized_deposits() -> anyhow::Result<()> {
        let num_deposits = 5;

        let anvil = Anvil::new().spawn();
        let wallet = anvil.wallet().unwrap();
        let deployer = wallet.default_signer().address();
        let inner_provider = ProviderBuilder::new()
            .wallet(wallet)
            .on_http(anvil.endpoint_url());
        let provider = AnvilProvider::new(inner_provider, Arc::new(anvil));
        let mut contracts = Contracts::new();

        let l1_client = new_l1_client(provider.anvil(), false).await;

        let fee_proxy_addr = deploy_fee_contract_proxy(&provider, &mut contracts, deployer).await?;
        let fee_proxy = FeeContract::new(fee_proxy_addr, &provider);
        // Block height reached by the deployment; the deposits below come after it.
        let num_tx_for_deploy = provider.get_block_number().await?;

        // Deposit 0.1, 0.2, ..., 0.5 ether.
        for n in 1..=num_deposits {
            let amount = n as f32 / 10.0;
            let receipt = fee_proxy
                .deposit(deployer)
                .value(parse_ether(&amount.to_string())?)
                .send()
                .await?
                .get_receipt()
                .await?;
            assert!(receipt.inner.is_success());
        }

        // The test expects each deposit to have advanced the chain by exactly one block.
        let cur_height = provider.get_block_number().await?;
        assert_eq!(num_deposits + num_tx_for_deploy, cur_height);

        // No previous finalized block: everything from block 0 up to the current height.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, None, cur_height)
            .await;

        assert_eq!(num_deposits as usize, pending.len(), "{pending:?}");
        assert_eq!(deployer, pending[0].account().0);
        // 0.1 + 0.2 + 0.3 + 0.4 + 0.5 ether = 1.5 ether, in wei.
        assert_eq!(
            U256::from(1500000000000000000u64),
            pending
                .iter()
                .fold(U256::from(0), |total, info| total.add(info.amount().0))
        );

        // Starting after block 0 still covers every deposit.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), cur_height)
            .await;
        assert_eq!(num_deposits as usize, pending.len());

        // Empty range: previous and new finalized block are the same.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 0)
            .await;
        assert_eq!(0, pending.len());

        // Block 1 only: no deposit is expected there.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 1)
            .await;
        assert_eq!(0, pending.len());

        // Empty range at the deployment height.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), num_tx_for_deploy)
            .await;
        assert_eq!(0, pending.len());

        // Exactly the first block after deployment: one deposit.
        let pending = l1_client
            .get_finalized_deposits(
                fee_proxy_addr,
                Some(num_tx_for_deploy),
                num_tx_for_deploy + 1,
            )
            .await;
        assert_eq!(1, pending.len());

        // Previous finalized block ahead of the new one: nothing is returned.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), 0)
            .await;
        assert_eq!(0, pending.len());

        Ok(())
    }
1249
    /// Waits for a finalized block ten blocks past the current height and checks that the
    /// returned block matches what the provider reports for that block number.
    ///
    /// `ws` selects whether the client's block stream uses the WebSocket endpoint.
    async fn test_wait_for_finalized_block_helper(ws: bool) {
        // Anvil is configured with a block time of 0.1 so that new blocks keep arriving.
        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        let block_height = provider.get_block_number().await.unwrap();
        let block = l1_client.wait_for_finalized_block(block_height + 10).await;
        assert_eq!(block.number, block_height + 10);

        // Fetch the same block directly from the provider for comparison.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block_height + 10)))
            .full()
            .await
            .unwrap()
            .unwrap();

        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }
1274
    /// Runs the finalized-block wait test with the WebSocket block stream.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_wait_for_finalized_block_ws() {
        test_wait_for_finalized_block_helper(true).await
    }

    /// Runs the finalized-block wait test with the HTTP polling block stream.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_wait_for_finalized_block_http() {
        test_wait_for_finalized_block_helper(false).await
    }
1284
    /// Checks that a finalized block older than the configured safety margin can still be
    /// fetched, and that it matches the block the provider reports.
    ///
    /// `ws` selects whether the client's block stream uses the WebSocket endpoint.
    async fn test_wait_for_old_finalized_block_helper(ws: bool) {
        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client_opt(&anvil, |opt| {
            if ws {
                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
            }
            // With a margin of 1, block 0 counts as "old" once block 2 is finalized.
            opt.l1_finalized_safety_margin = Some(1);
        })
        .await;
        let provider = &l1_client.provider;

        // Make sure the finalized block has advanced far enough past block 0.
        l1_client.wait_for_finalized_block(2).await;

        // Now request the old block.
        let block = l1_client.wait_for_finalized_block(0).await;

        let true_block = provider.get_block(0.into()).await.unwrap().unwrap();
        assert_eq!(block.hash, true_block.header.hash);
    }
1306
    /// Runs the old-finalized-block test with the WebSocket block stream.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_wait_for_old_finalized_block_ws() {
        test_wait_for_old_finalized_block_helper(true).await
    }

    /// Runs the old-finalized-block test with the HTTP polling block stream.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_wait_for_old_finalized_block_http() {
        test_wait_for_old_finalized_block_helper(false).await
    }
1316
    /// Waits for a finalized block with a timestamp five seconds in the future and checks that
    /// the returned block is late enough, that its parent is too early (i.e. the returned block
    /// is the earliest acceptable one), and that it matches what the provider reports.
    ///
    /// `ws` selects whether the client's block stream uses the WebSocket endpoint.
    async fn test_wait_for_finalized_block_by_timestamp_helper(ws: bool) {
        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        // Target timestamp: five seconds from now.
        let timestamp = U256::from(OffsetDateTime::now_utc().unix_timestamp() as u64 + 5);
        let block = l1_client
            .wait_for_finalized_block_with_timestamp(timestamp)
            .await;
        assert!(
            block.timestamp >= timestamp,
            "wait_for_finalized_block_with_timestamp({timestamp}) returned too early a block: \
             {block:?}",
        );
        // The block just before the returned one must not satisfy the timestamp.
        let parent = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number - 1)))
            .full()
            .await
            .unwrap()
            .unwrap();
        assert!(
            parent.header.inner.timestamp < timestamp.to::<u64>(),
            "wait_for_finalized_block_with_timestamp({timestamp}) did not return the earliest \
             possible block: returned {block:?}, but earlier block {parent:?} has an acceptable \
             timestamp too",
        );

        // Compare against the provider's view of the returned block.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }
1357
1358 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1359 async fn test_wait_for_finalized_block_by_timestamp_ws() {
1360 test_wait_for_finalized_block_by_timestamp_helper(true).await
1361 }
1362
1363 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1364 async fn test_wait_for_finalized_block_by_timestamp_http() {
1365 test_wait_for_finalized_block_by_timestamp_helper(false).await
1366 }
1367
1368 async fn test_wait_for_old_finalized_block_by_timestamp_helper(ws: bool) {
1369 let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
1370 let l1_client = new_l1_client(&anvil, ws).await;
1371
1372 let true_block = l1_client.wait_for_finalized_block(0).await;
1374 let timestamp = true_block.timestamp;
1375
1376 l1_client.wait_for_finalized_block(10).await;
1378
1379 let block = l1_client
1381 .wait_for_finalized_block_with_timestamp(U256::from(timestamp))
1382 .await;
1383 assert_eq!(block, true_block);
1384 }
1385
1386 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1387 async fn test_wait_for_old_finalized_block_by_timestamp_ws() {
1388 test_wait_for_old_finalized_block_by_timestamp_helper(true).await
1389 }
1390
1391 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1392 async fn test_wait_for_old_finalized_block_by_timestamp_http() {
1393 test_wait_for_old_finalized_block_by_timestamp_helper(false).await
1394 }
1395
1396 async fn test_wait_for_block_helper(ws: bool) {
1397 let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
1398 let l1_client = new_l1_client(&anvil, ws).await;
1399 let provider = &l1_client.provider;
1400
1401 let block_height = provider.get_block_number().await.unwrap();
1403 l1_client.wait_for_block(block_height + 10).await;
1404
1405 let new_block_height = provider.get_block_number().await.unwrap();
1406 assert!(
1407 new_block_height >= block_height + 10,
1408 "wait_for_block returned too early; initial height = {block_height}, new height = \
1409 {new_block_height}",
1410 );
1411 }
1412
1413 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1414 async fn test_wait_for_block_ws() {
1415 test_wait_for_block_helper(true).await
1416 }
1417
1418 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1419 async fn test_wait_for_block_http() {
1420 test_wait_for_block_helper(false).await
1421 }
1422
1423 async fn test_reconnect_update_task_helper(ws: bool) {
1424 let port = pick_unused_port().unwrap();
1425 let anvil = Arc::new(Anvil::new().block_time(1).port(port).spawn());
1426 let client = new_l1_client(&anvil, ws).await;
1427
1428 let initial_state = client.snapshot().await;
1429 tracing::info!(?initial_state, "initial state");
1430
1431 let mut retry = 0;
1433 let updated_state = loop {
1434 assert!(retry < 10, "state did not update in time");
1435
1436 let updated_state = client.snapshot().await;
1437 if updated_state.head > initial_state.head {
1438 break updated_state;
1439 }
1440 tracing::info!(retry, "waiting for state update");
1441 sleep(Duration::from_secs(1)).await;
1442 retry += 1;
1443 };
1444 tracing::info!(?updated_state, "state updated");
1445
1446 drop(anvil);
1450
1451 tracing::info!("sleep 5");
1456 sleep(Duration::from_secs(5)).await;
1457
1458 tracing::info!("restarting L1");
1460 let _anvil = Anvil::new().block_time(1).port(port).spawn();
1461
1462 let mut retry = 0;
1463 let final_state = loop {
1464 assert!(retry < 5, "state did not update in time");
1465
1466 let final_state = client.snapshot().await;
1467 if final_state.head > updated_state.head {
1468 break final_state;
1469 }
1470 tracing::info!(retry, "waiting for state update");
1471 sleep(Duration::from_secs(1)).await;
1472 retry += 1;
1473 };
1474 tracing::info!(?final_state, "state updated");
1475 }
1476
1477 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1478 async fn test_reconnect_update_task_ws() {
1479 test_reconnect_update_task_helper(true).await
1480 }
1481
1482 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1483 async fn test_reconnect_update_task_http() {
1484 test_reconnect_update_task_helper(false).await
1485 }
1486
1487 fn get_failover_index(provider: &L1Client) -> usize {
1527 let transport = &provider.transport;
1528 provider.transport.current_transport.read().generation % transport.urls.len()
1529 }
1530
1531 async fn test_failover_update_task_helper(ws: bool) {
1532 let anvil = Anvil::new().block_time(1).spawn();
1533
1534 let client = L1ClientOptions {
1537 l1_polling_interval: Duration::from_secs(1),
1538 subscription_timeout: Duration::from_secs(1000),
1541 l1_ws_provider: if ws {
1542 Some(vec![
1543 "ws://notarealurl:1234".parse().unwrap(),
1544 anvil.ws_endpoint_url(),
1545 ])
1546 } else {
1547 None
1548 },
1549 ..Default::default()
1550 }
1551 .connect(vec![
1552 "http://notarealurl:1234".parse().unwrap(),
1553 anvil.endpoint_url(),
1554 ])
1555 .expect("Failed to create L1 client");
1556
1557 client.spawn_tasks().await;
1558
1559 let initial_state = client.snapshot().await;
1560 tracing::info!(?initial_state, "initial state");
1561
1562 let mut retry = 0;
1564 let updated_state = loop {
1565 assert!(retry < 10, "state did not update in time");
1566
1567 let updated_state = client.snapshot().await;
1568 if updated_state.head > initial_state.head {
1569 break updated_state;
1570 }
1571 tracing::info!(retry, "waiting for state update");
1572 sleep(Duration::from_secs(1)).await;
1573 retry += 1;
1574 };
1575 tracing::info!(?updated_state, "state updated");
1576 }
1577
1578 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1579 async fn test_failover_update_task_ws() {
1580 test_failover_update_task_helper(true).await;
1581 }
1582
1583 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1584 async fn test_failover_update_task_http() {
1585 test_failover_update_task_helper(false).await;
1586 }
1587
1588 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1589 async fn test_failover_consecutive_failures() {
1590 let anvil = Anvil::new().block_time(1).spawn();
1591
1592 let l1_options = L1ClientOptions {
1593 l1_polling_interval: Duration::from_secs(1),
1594 l1_frequent_failure_tolerance: Duration::from_millis(0),
1595 l1_consecutive_failure_tolerance: 3,
1596 ..Default::default()
1597 };
1598
1599 let provider = l1_options
1600 .connect(vec![
1601 "http://notarealurl:1234".parse().unwrap(),
1602 anvil.endpoint_url(),
1603 ])
1604 .expect("Failed to create L1 client");
1605
1606 for _ in 0..2 {
1608 provider.get_block_number().await.unwrap_err();
1609 assert!(get_failover_index(&provider) == 0);
1610 }
1611
1612 provider.get_block_number().await.unwrap_err();
1614 assert!(get_failover_index(&provider) == 1);
1615
1616 provider.get_block_number().await.unwrap();
1618 }
1619
1620 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1621 async fn test_failover_frequent_failures() {
1622 let anvil = Anvil::new().block_time(1).spawn();
1623 let provider = L1ClientOptions {
1624 l1_polling_interval: Duration::from_secs(1),
1625 l1_frequent_failure_tolerance: Duration::from_millis(100),
1626 ..Default::default()
1627 }
1628 .connect(vec![
1629 "http://notarealurl:1234".parse().unwrap(),
1630 anvil.endpoint_url(),
1631 ])
1632 .expect("Failed to create L1 client");
1633
1634 provider.get_block_number().await.unwrap_err();
1636 sleep(Duration::from_secs(1)).await;
1637 provider.get_block_number().await.unwrap_err();
1638
1639 assert!(get_failover_index(&provider) == 0);
1641
1642 sleep(Duration::from_secs(1)).await;
1644
1645 provider.get_block_number().await.unwrap_err();
1647 provider.get_block_number().await.unwrap_err();
1648 provider.get_block_number().await.unwrap();
1649 assert!(get_failover_index(&provider) == 1);
1650 }
1651
1652 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1653 async fn test_failover_revert() {
1654 let anvil = Anvil::new().block_time(1).spawn();
1655 let provider = L1ClientOptions {
1656 l1_polling_interval: Duration::from_secs(1),
1657 l1_consecutive_failure_tolerance: 1,
1658 l1_failover_revert: Duration::from_secs(2),
1659 ..Default::default()
1660 }
1661 .connect(vec![
1662 "http://notarealurl:1234".parse().unwrap(),
1663 anvil.endpoint_url(),
1664 ])
1665 .expect("Failed to create L1 client");
1666
1667 provider.get_block_number().await.unwrap_err();
1669 assert_eq!(get_failover_index(&provider), 1);
1670
1671 provider.get_block_number().await.unwrap();
1673
1674 sleep(Duration::from_millis(2100)).await;
1676 provider.get_block_number().await.unwrap_err();
1677 }
1678
1679 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1683 async fn test_update_loop_initializes_l1_state() {
1684 let anvil = Arc::new(Anvil::new().port(9988u16).spawn());
1685 let l1_client = new_l1_client(&anvil, true).await;
1686
1687 for _try in 0..10 {
1688 let mut state = l1_client.state.lock().await;
1689 let has_snapshot = state.snapshot.finalized.is_some();
1690 let has_cache = state.finalized.get(&0).is_some();
1691 drop(state);
1692 if has_snapshot && has_cache {
1693 return;
1694 }
1695 sleep(Duration::from_millis(200)).await;
1696 }
1697 panic!("L1 state of L1Client not initialized");
1698 }
1699}