use std::{
    cmp::{min, Ordering},
    num::NonZeroUsize,
    pin::Pin,
    result::Result as StdResult,
    sync::Arc,
    time::Instant,
};

use alloy::{
    eips::BlockId,
    hex,
    primitives::{Address, B256, U256},
    providers::{Provider, ProviderBuilder, WsConnect},
    rpc::{
        client::RpcClient,
        json_rpc::{RequestPacket, ResponsePacket},
        types::Block,
    },
    transports::{http::Http, RpcError, TransportErrorKind},
};
use anyhow::Context;
use async_trait::async_trait;
use clap::Parser;
use committable::{Commitment, Committable, RawCommitmentBuilder};
use futures::{
    future::{Future, TryFuture, TryFutureExt},
    stream::{self, StreamExt},
};
use hotshot_contract_adapter::sol_types::FeeContract;
use hotshot_types::traits::metrics::Metrics;
use lru::LruCache;
use parking_lot::RwLock;
use tokio::{
    spawn,
    sync::{Mutex, MutexGuard, Notify},
    time::{sleep, Duration},
};
use tower_service::Service;
use tracing::Instrument;
use url::Url;

use super::{
    v0_1::{L1BlockInfoWithParent, SingleTransport, SingleTransportStatus, SwitchingTransport},
    L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask,
};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};

impl PartialOrd for L1BlockInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

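// Blocks are ordered by height alone; timestamps and hashes are not compared.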
impl Ord for L1BlockInfo {
    fn cmp(&self, other: &Self) -> Ordering {
        self.number.cmp(&other.number)
    }
}

impl From<&Block> for L1BlockInfo {
    fn from(block: &Block) -> Self {
        Self {
            number: block.header.number,
            timestamp: U256::from(block.header.timestamp),
            hash: block.header.hash,
        }
    }
}

impl From<&Block> for L1BlockInfoWithParent {
    fn from(block: &Block) -> Self {
        Self {
            info: block.into(),
            parent_hash: block.header.parent_hash,
        }
    }
}

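// The commitment binds the block number, the little-endian timestamp bytes, and the block
// hash under the "L1BLOCK" tag, so any change to the snapshot yields a different commitment.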
impl Committable for L1BlockInfo {
    fn commit(&self) -> Commitment<Self> {
        let timestamp: [u8; 32] = self.timestamp.to_le_bytes();

        RawCommitmentBuilder::new(&Self::tag())
            .u64_field("number", self.number)
            .constant_str("timestamp")
            .fixed_size_bytes(&timestamp)
            .constant_str("hash")
            .fixed_size_bytes(&self.hash.0)
            .finalize()
    }

    fn tag() -> String {
        "L1BLOCK".into()
    }
}

impl L1BlockInfo {
    pub fn number(&self) -> u64 {
        self.number
    }

    pub fn timestamp(&self) -> U256 {
        self.timestamp
    }

    pub fn hash(&self) -> B256 {
        self.hash
    }
}

impl Drop for L1UpdateTask {
    fn drop(&mut self) {
        if let Some(task) = self.0.get_mut().take() {
            task.abort();
        }
    }
}

impl Default for L1ClientOptions {
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}

impl L1ClientOptions {
    pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self {
        self.metrics = Arc::new(metrics.subgroup("l1".into()));
        self
    }

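    /// Connect to the given list of L1 providers, failing over between them as needed.
    ///
    /// A minimal usage sketch (the endpoint URL is a placeholder, not part of this crate):
    /// ```ignore
    /// let client = L1ClientOptions::default().connect(vec!["http://localhost:8545".parse()?])?;
    /// client.spawn_tasks().await;
    /// ```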
    pub fn connect(self, urls: Vec<Url>) -> anyhow::Result<L1Client> {
        let transport = SwitchingTransport::new(self, urls)
            .with_context(|| "failed to create switching transport")?;
        Ok(L1Client::with_transport(transport))
    }

    fn rate_limit_delay(&self) -> Duration {
        self.l1_rate_limit_delay.unwrap_or(self.l1_retry_delay)
    }
}

impl L1ClientMetrics {
    fn new(metrics: &(impl Metrics + ?Sized), num_urls: usize) -> Self {
        let failures = metrics.counter_family("failed_requests".into(), vec!["provider".into()]);

        let mut failure_metrics = Vec::with_capacity(num_urls);
        for url_index in 0..num_urls {
            failure_metrics.push(failures.create(vec![url_index.to_string()]));
        }

        Self {
            head: metrics.create_gauge("head".into(), None).into(),
            finalized: metrics.create_gauge("finalized".into(), None).into(),
            reconnects: metrics
                .create_counter("stream_reconnects".into(), None)
                .into(),
            failovers: metrics.create_counter("failovers".into(), None).into(),
            failures: Arc::new(failure_metrics),
        }
    }
}

impl SwitchingTransport {
    fn new(opt: L1ClientOptions, urls: Vec<Url>) -> anyhow::Result<Self> {
        let Some(first_url) = urls.first().cloned() else {
            return Err(anyhow::anyhow!("No valid URLs provided"));
        };

        let metrics = L1ClientMetrics::new(&**opt.metrics, urls.len());

        let first_transport = Arc::new(RwLock::new(SingleTransport::new(&first_url, 0, None)));

        Ok(Self {
            urls: Arc::new(urls),
            current_transport: first_transport,
            opt: Arc::new(opt),
            metrics,
            switch_notify: Arc::new(Notify::new()),
        })
    }

    async fn wait_switch(&self) {
        self.switch_notify.notified().await;
    }

    fn options(&self) -> &L1ClientOptions {
        &self.opt
    }

    fn metrics(&self) -> &L1ClientMetrics {
        &self.metrics
    }
}

impl SingleTransportStatus {
    fn log_success(&mut self) {
        self.consecutive_failures = 0;
    }

    fn log_failure(&mut self, opt: &L1ClientOptions) -> bool {
        self.consecutive_failures += 1;

        let should_switch = self.should_switch(opt);

        self.last_failure = Some(Instant::now());

        should_switch
    }

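    /// Decide whether this transport should be abandoned in favor of the next provider.
    ///
    /// There are two trip conditions: too many consecutive failures, or two failures close
    /// enough together (within `l1_frequent_failure_tolerance`).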
    fn should_switch(&mut self, opt: &L1ClientOptions) -> bool {
        if self.shutting_down {
            return false;
        }

        if self.consecutive_failures >= opt.l1_consecutive_failure_tolerance {
            self.shutting_down = true;
            return true;
        }

        let now = Instant::now();
        if let Some(prev) = self.last_failure {
            if now.saturating_duration_since(prev) < opt.l1_frequent_failure_tolerance {
                self.shutting_down = true;
                return true;
            }
        }

        false
    }

    fn should_revert(&mut self, revert_at: Option<Instant>) -> bool {
        if self.shutting_down {
            return false;
        }
        let Some(revert_at) = revert_at else {
            return false;
        };
        if Instant::now() >= revert_at {
            self.shutting_down = true;
            return true;
        }

        false
    }
}

impl SingleTransport {
    fn new(url: &Url, generation: usize, revert_at: Option<Instant>) -> Self {
        Self {
            generation,
            client: Http::new(url.clone()),
            status: Default::default(),
            revert_at,
        }
    }
}

#[async_trait]
impl Service<RequestPacket> for SwitchingTransport {
    type Error = RpcError<TransportErrorKind>;
    type Response = ResponsePacket;
    type Future =
        Pin<Box<dyn Future<Output = Result<ResponsePacket, RpcError<TransportErrorKind>>> + Send>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<StdResult<(), Self::Error>> {
        self.current_transport.read().clone().client.poll_ready(cx)
    }

    fn call(&mut self, req: RequestPacket) -> Self::Future {
        let self_clone = self.clone();

        Box::pin(async move {
            let mut current_transport = self_clone.current_transport.read().clone();

            // If this transport has been on a fallback provider long enough, revert to the
            // next primary generation (a multiple of the number of URLs, i.e. URL index 0).
            let should_revert = current_transport
                .status
                .write()
                .should_revert(current_transport.revert_at);
            if should_revert {
                let n = self_clone.urls.len();
                let prev_primary_gen = (current_transport.generation / n) * n;
                let next_gen = prev_primary_gen + n;
                current_transport = self_clone.switch_to(next_gen, current_transport);
            }

            // Back off while we are rate limited, clearing the limit once it expires.
            if let Some(t) = current_transport.status.read().rate_limited_until {
                if t > Instant::now() {
                    return Err(RpcError::Transport(TransportErrorKind::Custom(
                        "Rate limit exceeded".into(),
                    )));
                } else {
                    current_transport.status.write().rate_limited_until = None;
                }
            }

            match current_transport.client.call(req).await {
                Ok(res) => {
                    current_transport.status.write().log_success();
                    Ok(res)
                },
                Err(err) => {
                    if let Some(f) = self_clone
                        .metrics
                        .failures
                        .get(current_transport.generation % self_clone.urls.len())
                    {
                        f.add(1);
                    }

                    // HTTP 429 means we are being rate limited, not that the provider is
                    // down; pause requests instead of counting this toward failover.
                    if let RpcError::ErrorResp(e) = &err {
                        if e.code == 429 {
                            current_transport.status.write().rate_limited_until =
                                Some(Instant::now() + self_clone.opt.rate_limit_delay());
                            return Err(err);
                        }
                    }

                    tracing::warn!(?err, "L1 client error");

                    if current_transport
                        .status
                        .write()
                        .log_failure(&self_clone.opt)
                    {
                        self_clone.metrics.failovers.add(1);
                        self_clone.switch_to(current_transport.generation + 1, current_transport);
                    }

                    Err(err)
                },
            }
        })
    }
}

impl SwitchingTransport {
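    /// Switch to the transport at `next_gen`, returning the new transport.
    ///
    /// Generations index into `urls` modulo its length, so with 3 URLs, generation 4 maps to
    /// URL index 1, and generations 0, 3, 6, ... all map to the primary provider.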
    fn switch_to(&self, next_gen: usize, current_transport: SingleTransport) -> SingleTransport {
        let next_index = next_gen % self.urls.len();
        let url = self.urls[next_index].clone();
        tracing::info!(%url, next_gen, "switch L1 transport");

        let revert_at = if next_gen % self.urls.len() == 0 {
            // We are reverting to the primary provider; no revert timer needed.
            None
        } else if current_transport.generation % self.urls.len() == 0 {
            // We are failing over from the primary; schedule a revert back to it.
            Some(Instant::now() + self.opt.l1_failover_revert)
        } else {
            // Moving between fallbacks; keep the existing revert deadline.
            current_transport.revert_at
        };

        let new_transport = SingleTransport::new(&url, next_gen, revert_at);

        *self.current_transport.write() = new_transport.clone();

        self.switch_notify.notify_waiters();

        new_transport
    }
}

impl L1Client {
    fn with_transport(transport: SwitchingTransport) -> Self {
        let rpc_client = RpcClient::new(transport.clone(), false);
        let provider = ProviderBuilder::new().on_client(rpc_client);

        let opt = transport.options().clone();

        let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity);
        receiver.set_await_active(false);
        receiver.set_overflow(true);

        Self {
            provider,
            transport,
            state: Arc::new(Mutex::new(L1State::new(opt.l1_blocks_cache_size))),
            sender,
            receiver: receiver.deactivate(),
            update_task: Default::default(),
        }
    }

    pub fn new(urls: Vec<Url>) -> anyhow::Result<Self> {
        L1ClientOptions::default().connect(urls)
    }

    pub fn anvil(anvil: &alloy::node_bindings::AnvilInstance) -> anyhow::Result<Self> {
        L1ClientOptions {
            l1_ws_provider: Some(vec![anvil.ws_endpoint().parse()?]),
            ..Default::default()
        }
        .connect(vec![anvil.endpoint().parse()?])
    }

    pub async fn spawn_tasks(&self) {
        let mut update_task = self.update_task.0.lock().await;
        if update_task.is_none() {
            *update_task = Some(spawn(self.update_loop()));
        }
    }

    pub async fn shut_down_tasks(&self) {
        if let Some(update_task) = self.update_task.0.lock().await.take() {
            update_task.abort();
        }
    }

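    /// The background task that keeps [`L1State`] up to date.
    ///
    /// Each iteration establishes a block stream (a WebSockets subscription if `l1_ws_provider`
    /// is configured, HTTP polling otherwise), then updates the head and finalized snapshots
    /// as blocks arrive, broadcasting events to any waiters. If the stream stalls for longer
    /// than `subscription_timeout` or ends, it is re-established.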
    fn update_loop(&self) -> impl Future<Output = ()> {
        let opt = self.options().clone();
        let rpc = self.provider.clone();
        let ws_urls = opt.l1_ws_provider.clone();
        let retry_delay = opt.l1_retry_delay;
        let subscription_timeout = opt.subscription_timeout;
        let state = self.state.clone();
        let sender = self.sender.clone();
        let metrics = self.metrics().clone();
        let polling_interval = opt.l1_polling_interval;
        let transport = self.transport.clone();

        let span = tracing::warn_span!("L1 client update");

        async move {
            for i in 0.. {
                let ws;

                // Fetch the current L1 head so the stream can start from a known block.
                let l1_head = loop {
                    match rpc.get_block(BlockId::latest()).await {
                        Ok(Some(block)) => break block.header,
                        Ok(None) => {
                            tracing::info!("Failed to fetch L1 head block, will retry");
                        },
                        Err(err) => {
                            tracing::info!("Failed to fetch L1 head block, will retry: {err:#}");
                        },
                    }
                    sleep(retry_delay).await;
                };

                let mut block_stream = {
                    let res = match &ws_urls {
                        Some(urls) => {
                            // Rotate through the WebSockets providers on each reconnect.
                            let provider = i % urls.len();
                            let url = &urls[provider];
                            ws = match ProviderBuilder::new()
                                .on_ws(WsConnect::new(url.clone()))
                                .await
                            {
                                Ok(ws) => ws,
                                Err(err) => {
                                    tracing::warn!(
                                        provider,
                                        "Failed to connect WebSockets provider: {err:#}"
                                    );
                                    sleep(retry_delay).await;
                                    continue;
                                },
                            };
                            ws.subscribe_blocks().await.map(|subscription| {
                                stream::once(async { l1_head.clone() })
                                    .chain(subscription.into_stream())
                                    .boxed()
                            })
                        },
                        None => rpc.watch_blocks().await.map(|poller_builder| {
                            let stream = poller_builder
                                .with_poll_interval(polling_interval)
                                .into_stream();

                            let rpc = rpc.clone();

                            // The poller yields batches of hashes; resolve each hash to a
                            // header, skipping any that cannot be fetched.
                            stream::once(async { l1_head.clone() })
                                .chain(stream.map(stream::iter).flatten().filter_map(
                                    move |hash| {
                                        let rpc = rpc.clone();
                                        async move {
                                            match rpc.get_block(BlockId::hash(hash)).await {
                                                Ok(Some(block)) => Some(block.header),
                                                Ok(None) => {
                                                    tracing::warn!(%hash, "HTTP stream yielded a block hash that was not available");
                                                    None
                                                },
                                                Err(err) => {
                                                    tracing::warn!(%hash, "Error fetching block from HTTP stream: {err:#}");
                                                    None
                                                },
                                            }
                                        }
                                    },
                                ))
                                // End the stream when the underlying transport fails over,
                                // so we reconnect via the new provider.
                                .take_until(transport.wait_switch())
                                .boxed()
                        }),
                    };
                    match res {
                        Ok(stream) => stream,
                        Err(err) => {
                            tracing::error!("Error subscribing to L1 blocks: {err:#}");
                            sleep(retry_delay).await;
                            continue;
                        },
                    }
                };

                tracing::info!("Established L1 block stream");
                loop {
                    let block_timeout =
                        tokio::time::timeout(subscription_timeout, block_stream.next()).await;
                    match block_timeout {
                        Ok(Some(head)) => {
                            let head = head.number;
                            tracing::debug!(head, "Received L1 block");

                            let finalized = loop {
                                match fetch_finalized_block_from_rpc(&rpc).await {
                                    Ok(finalized) => break finalized,
                                    Err(err) => {
                                        tracing::warn!("Error getting finalized block: {err:#}");
                                        sleep(retry_delay).await;
                                    },
                                }
                            };

                            let mut state = state.lock().await;
                            if head > state.snapshot.head {
                                tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
                                metrics.head.set(head as usize);
                                state.snapshot.head = head;
                                sender
                                    .broadcast_direct(L1Event::NewHead { head })
                                    .await
                                    .ok();
                            }
                            if let Some(finalized) = finalized {
                                if Some(finalized.info) > state.snapshot.finalized {
                                    tracing::info!(
                                        ?finalized,
                                        old_finalized = ?state.snapshot.finalized,
                                        "L1 finalized updated",
                                    );
                                    metrics.finalized.set(finalized.info.number as usize);
                                    state.snapshot.finalized = Some(finalized.info);
                                    state.put_finalized(finalized);
                                    sender
                                        .broadcast_direct(L1Event::NewFinalized { finalized })
                                        .await
                                        .ok();
                                }
                            }
                            tracing::debug!("Updated L1 snapshot to {:?}", state.snapshot);
                        },
                        Ok(None) => {
                            tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
                            break;
                        },
                        Err(_) => {
                            tracing::error!(
                                "No block received for {} seconds, trying to re-establish block stream",
                                subscription_timeout.as_secs()
                            );
                            break;
                        },
                    }
                }

                metrics.reconnects.add(1);
            }
        }
        .instrument(span)
    }

    pub async fn snapshot(&self) -> L1Snapshot {
        self.state.lock().await.snapshot
    }

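    /// Wait until the L1 head reaches at least `number`, listening for head events and
    /// rechecking the snapshot if the event stream ever ends.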
    pub async fn wait_for_block(&self, number: u64) {
        loop {
            // Subscribe to events before checking the current state, so we don't miss a
            // head update that arrives in between.
            let mut events = self.receiver.activate_cloned();

            {
                let state = self.state.lock().await;
                if state.snapshot.head >= number {
                    return;
                }
                tracing::info!(number, head = state.snapshot.head, "Waiting for L1 block");
            }

            while let Some(event) = events.next().await {
                let L1Event::NewHead { head } = event else {
                    continue;
                };
                if head >= number {
                    tracing::info!(number, head, "Got L1 block");
                    return;
                }
                tracing::debug!(number, head, "Waiting for L1 block");
            }

            tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        }
    }

    pub async fn wait_for_finalized_block(&self, number: u64) -> L1BlockInfo {
        loop {
            let mut events = self.receiver.activate_cloned();

            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.number >= number {
                        return self.fetch_finalized_block_by_number(state, number).await.1;
                    }
                    tracing::info!(
                        number,
                        finalized = ?state.snapshot.finalized,
                        "waiting for L1 finalized block",
                    );
                };
            }

            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                let mut state = self.state.lock().await;
                state.put_finalized(finalized);
                if finalized.info.number >= number {
                    tracing::info!(number, ?finalized, "got finalized L1 block");
                    return self.fetch_finalized_block_by_number(state, number).await.1;
                }
                tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
            }

            tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        }
    }

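    /// Wait for the earliest finalized block whose timestamp is at least `timestamp`.
    ///
    /// First waits until some finalized block satisfies the timestamp, then binary searches
    /// block heights for the earliest such block. The search relies on L1 timestamps being
    /// non-decreasing with height.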
    pub async fn wait_for_finalized_block_with_timestamp(&self, timestamp: U256) -> L1BlockInfo {
        // Wait until some finalized block has a timestamp >= `timestamp`.
        let (mut state, mut block) = 'outer: loop {
            let mut events = self.receiver.activate_cloned();

            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.timestamp >= timestamp {
                        break 'outer (state, finalized);
                    }
                }
                tracing::info!(
                    %timestamp,
                    finalized = ?state.snapshot.finalized,
                    "waiting for L1 finalized block",
                );
            }

            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                if finalized.info.timestamp >= timestamp {
                    tracing::info!(%timestamp, ?finalized, "got finalized block");
                    break 'outer (self.state.lock().await, finalized.info);
                }
                tracing::debug!(%timestamp, ?finalized, "waiting for L1 finalized block");
            }

            tracing::warn!(%timestamp, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        };

        // Binary search for the earliest block with a sufficient timestamp. Invariant: `block`
        // (at height `upper_bound`) always satisfies the timestamp, while every height below
        // `lower_bound` has been ruled out.
        let mut upper_bound = block.number;
        let mut lower_bound = 0;
        while lower_bound < upper_bound {
            let midpoint = (upper_bound + lower_bound) / 2;
            tracing::debug!(
                lower_bound,
                midpoint,
                upper_bound,
                %timestamp,
                ?block,
                "searching for earliest block with sufficient timestamp"
            );

            let (state_lock, midpoint_block) =
                self.fetch_finalized_block_by_number(state, midpoint).await;
            state = state_lock;

            tracing::debug!(?midpoint_block, %timestamp, "pivot on midpoint block");
            if midpoint_block.timestamp < timestamp {
                lower_bound = midpoint + 1;
            } else {
                upper_bound = midpoint;
                block = midpoint_block;
            }
        }

        block
    }

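    /// Fetch a finalized block by number, verifying it against the finalized chain.
    ///
    /// Unless the block is old enough to fall under `l1_finalized_safety_margin`, it is
    /// located by walking parent hashes backwards from a known finalized block, so a confused
    /// or malicious provider cannot serve a block that is not an ancestor of the finalized
    /// head.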
    async fn fetch_finalized_block_by_number<'a>(
        &'a self,
        mut state: MutexGuard<'a, L1State>,
        number: u64,
    ) -> (MutexGuard<'a, L1State>, L1BlockInfo) {
        let latest_finalized = state
            .snapshot
            .finalized
            .expect("fetch_finalized_block_by_number called before any blocks are finalized");
        assert!(
            number <= latest_finalized.number,
            "requesting a finalized block {number} that isn't finalized; snapshot: {:?}",
            state.snapshot,
        );

        if let Some(safety_margin) = self.options().l1_finalized_safety_margin {
            if number < latest_finalized.number.saturating_sub(safety_margin) {
                // The block is deep enough below the finalized head that we consider a reorg
                // impossible; fetch it directly by number without hash chaining.
                tracing::debug!(
                    number,
                    ?latest_finalized,
                    "skipping hash check for old finalized block"
                );
                let (state, block) = self
                    .load_and_cache_finalized_block(state, number.into())
                    .await;
                return (state, block.info);
            }
        }

        // Find the nearest known finalized block at or above `number`, from which we can walk
        // back to `number` by parent hash.
        let mut successor_number = number;
        let mut successor = loop {
            if let Some(block) = state.finalized.get(&successor_number) {
                break *block;
            }
            successor_number += 1;
            if successor_number > latest_finalized.number {
                // Nothing cached above `number`; fetch the current finalized block from the
                // RPC, releasing the lock while we wait.
                drop(state);
                let block = loop {
                    match fetch_finalized_block_from_rpc(&self.provider).await {
                        Ok(Some(block)) => {
                            break block;
                        },
                        Ok(None) => {
                            tracing::warn!("no finalized block even though finalized snapshot is Some; this can be caused by an L1 client failover");
                            self.retry_delay().await;
                        },
                        Err(err) => {
                            tracing::warn!("Error getting finalized block: {err:#}");
                            self.retry_delay().await;
                        },
                    }
                };
                state = self.state.lock().await;
                state.put_finalized(block);
                break block;
            }
        };

        while successor.info.number > number {
            tracing::debug!(
                number,
                ?successor,
                "checking hash chaining for finalized block"
            );
            (state, successor) = self
                .load_and_cache_finalized_block(state, successor.parent_hash.into())
                .await;
        }

        (state, successor.info)
    }

    async fn load_and_cache_finalized_block<'a>(
        &'a self,
        mut state: MutexGuard<'a, L1State>,
        id: BlockId,
    ) -> (MutexGuard<'a, L1State>, L1BlockInfoWithParent) {
        // Release the lock while fetching from the RPC, so other tasks can make progress.
        drop(state);
        let block = loop {
            let block = match self.provider.get_block(id).await {
                Ok(Some(block)) => block,
                Ok(None) => {
                    tracing::warn!(
                        %id,
                        "provider error: finalized L1 block should always be available"
                    );
                    self.retry_delay().await;
                    continue;
                },
                Err(err) => {
                    tracing::warn!(%id, "failed to get finalized L1 block: {err:#}");
                    self.retry_delay().await;
                    continue;
                },
            };
            break (&block).into();
        };
        state = self.state.lock().await;
        state.put_finalized(block);
        (state, block)
    }

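    /// Fetch all deposits to the fee contract between two finalized blocks.
    ///
    /// Queries `Deposit` events in chunks of at most `l1_events_max_block_range` blocks. For
    /// example, with a max range of 3, blocks 0..=7 are queried as (0, 2), (3, 5), (6, 7).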
    pub async fn get_finalized_deposits(
        &self,
        fee_contract_address: Address,
        prev_finalized: Option<u64>,
        new_finalized: u64,
    ) -> Vec<FeeInfo> {
        if prev_finalized >= Some(new_finalized) {
            return vec![];
        }

        let opt = self.options();

        // `prev_finalized` has already been processed, so start with the next block.
        let prev = prev_finalized.map(|prev| prev + 1).unwrap_or(0);

        // Divide the range into chunks so we don't exceed the maximum block range of the
        // provider's event queries.
        let mut start = prev;
        let end = new_finalized;
        let chunk_size = opt.l1_events_max_block_range;
        let chunks = std::iter::from_fn(move || {
            let chunk_end = min(start + chunk_size - 1, end);
            if chunk_end < start {
                return None;
            }

            let chunk = (start, chunk_end);
            start = chunk_end + 1;
            Some(chunk)
        });

        let events = stream::iter(chunks).then(|(from, to)| {
            let retry_delay = opt.l1_retry_delay;
            let fee_contract = FeeContract::new(fee_contract_address, self.provider.clone());
            async move {
                tracing::debug!(from, to, "fetch events in range");

                // Retry until the query for this chunk succeeds.
                loop {
                    match fee_contract
                        .Deposit_filter()
                        .address(*fee_contract.address())
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "error fetching L1 deposit events");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });
        events
            .flatten()
            .map(|(deposit, _)| FeeInfo::from(deposit))
            .collect()
            .await
    }

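    /// Check whether the contract at `proxy_address` looks like an EIP-1967 proxy.
    ///
    /// Reads the EIP-1967 implementation slot
    /// (`keccak256("eip1967.proxy.implementation") - 1`); a non-zero implementation address
    /// means the contract is a proxy.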
    pub async fn is_proxy_contract(&self, proxy_address: Address) -> anyhow::Result<bool> {
        let hex_bytes =
            hex::decode("360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc")
                .expect("Failed to decode hex string");
        let implementation_slot = B256::from_slice(&hex_bytes);
        let storage = self
            .provider
            .get_storage_at(proxy_address, implementation_slot.into())
            .await?;

        let implementation_address = Address::from_slice(&storage.to_be_bytes::<32>()[12..]);

        Ok(implementation_address != Address::ZERO)
    }

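    /// Retry `op` until it succeeds or every configured provider has been tried.
    ///
    /// Each retry waits `l1_retry_delay`, giving the transport's failure tracking time to
    /// advance to the next provider; once the transport's generation has wrapped past all
    /// URLs, the last error is returned.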
    pub async fn retry_on_all_providers<Fut>(
        &self,
        op: impl Fn() -> Fut,
    ) -> Result<Fut::Ok, Fut::Error>
    where
        Fut: TryFuture,
    {
        let transport = &self.transport;
        let start = transport.current_transport.read().generation % transport.urls.len();
        let end = start + transport.urls.len();
        loop {
            match op().into_future().await {
                Ok(res) => return Ok(res),
                Err(err) => {
                    if transport.current_transport.read().generation >= end {
                        return Err(err);
                    } else {
                        self.retry_delay().await;
                    }
                },
            }
        }
    }

    pub(crate) fn options(&self) -> &L1ClientOptions {
        self.transport.options()
    }

    fn metrics(&self) -> &L1ClientMetrics {
        self.transport.metrics()
    }

    async fn retry_delay(&self) {
        sleep(self.options().l1_retry_delay).await;
    }
}

impl L1State {
    fn new(cache_size: NonZeroUsize) -> Self {
        Self {
            snapshot: Default::default(),
            finalized: LruCache::new(cache_size),
            last_finalized: None,
        }
    }

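    /// Cache a finalized block, checking it for consistency against previous entries.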
    fn put_finalized(&mut self, block: L1BlockInfoWithParent) {
        assert!(
            self.snapshot.finalized.is_some()
                && block.info.number <= self.snapshot.finalized.unwrap().number,
            "inserting a finalized block {block:?} that isn't finalized; snapshot: {:?}",
            self.snapshot,
        );

        if Some(block.info.number()) > self.last_finalized {
            self.last_finalized = Some(block.info.number());
        }

        if let Some((old_number, old_block)) = self.finalized.push(block.info.number, block) {
            if old_number == block.info.number && block != old_block {
                tracing::error!(
                    ?old_block,
                    ?block,
                    "got different info for the same finalized height; something has gone very wrong with the L1",
                );
            }
        }
    }
}

async fn fetch_finalized_block_from_rpc(
    rpc: &impl Provider,
) -> anyhow::Result<Option<L1BlockInfoWithParent>> {
    let Some(block) = rpc.get_block(BlockId::finalized()).await? else {
        tracing::warn!("no finalized block yet");
        return Ok(None);
    };

    Ok(Some((&block).into()))
}

#[cfg(test)]
mod test {
    use std::{ops::Add, time::Duration};

    use alloy::{
        eips::BlockNumberOrTag,
        node_bindings::{Anvil, AnvilInstance},
        primitives::utils::parse_ether,
        providers::layers::AnvilProvider,
    };
    use espresso_contract_deployer::{deploy_fee_contract_proxy, Contracts};
    use portpicker::pick_unused_port;
    use sequencer_utils::test_utils::setup_test;
    use time::OffsetDateTime;

    use super::*;

    async fn new_l1_client_opt(
        anvil: &Arc<AnvilInstance>,
        f: impl FnOnce(&mut L1ClientOptions),
    ) -> L1Client {
        let mut opt = L1ClientOptions {
            l1_events_max_block_range: 1,
            l1_polling_interval: Duration::from_secs(1),
            subscription_timeout: Duration::from_secs(5),
            ..Default::default()
        };
        f(&mut opt);

        let l1_client = opt
            .connect(vec![anvil.endpoint_url()])
            .expect("Failed to create L1 client");

        l1_client.spawn_tasks().await;
        l1_client
    }

    async fn new_l1_client(anvil: &Arc<AnvilInstance>, include_ws: bool) -> L1Client {
        new_l1_client_opt(anvil, |opt| {
            if include_ws {
                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
            }
        })
        .await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_get_finalized_deposits() -> anyhow::Result<()> {
        setup_test();
        let num_deposits = 5;

        let anvil = Anvil::new().spawn();
        let wallet = anvil.wallet().unwrap();
        let deployer = wallet.default_signer().address();
        let inner_provider = ProviderBuilder::new()
            .wallet(wallet)
            .on_http(anvil.endpoint_url());
        let provider = AnvilProvider::new(inner_provider, Arc::new(anvil));
        let mut contracts = Contracts::new();

        let l1_client = new_l1_client(provider.anvil(), false).await;

        let fee_proxy_addr = deploy_fee_contract_proxy(&provider, &mut contracts, deployer).await?;
        let fee_proxy = FeeContract::new(fee_proxy_addr, &provider);
        let num_tx_for_deploy = provider.get_block_number().await?;

        // Make some deposits of increasing size.
        for n in 1..=num_deposits {
            let amount = n as f32 / 10.0;
            let receipt = fee_proxy
                .deposit(deployer)
                .value(parse_ether(&amount.to_string())?)
                .send()
                .await?
                .get_receipt()
                .await?;
            assert!(receipt.inner.is_success());
        }

        let cur_height = provider.get_block_number().await?;
        assert_eq!(num_deposits + num_tx_for_deploy, cur_height);

        // Fetching over the whole range should return every deposit; 0.1 + ... + 0.5 ether
        // adds up to 1.5 ether.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, None, cur_height)
            .await;

        assert_eq!(num_deposits as usize, pending.len(), "{pending:?}");
        assert_eq!(deployer, pending[0].account().0);
        assert_eq!(
            U256::from(1500000000000000000u64),
            pending
                .iter()
                .fold(U256::from(0), |total, info| total.add(info.amount().0))
        );

        // Varying the bounds of the range should yield the expected subsets of deposits.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), cur_height)
            .await;
        assert_eq!(num_deposits as usize, pending.len());

        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 0)
            .await;
        assert_eq!(0, pending.len());

        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 1)
            .await;
        assert_eq!(0, pending.len());

        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), num_tx_for_deploy)
            .await;
        assert_eq!(0, pending.len());

        let pending = l1_client
            .get_finalized_deposits(
                fee_proxy_addr,
                Some(num_tx_for_deploy),
                num_tx_for_deploy + 1,
            )
            .await;
        assert_eq!(1, pending.len());

        // A range that ends before it starts yields no deposits.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), 0)
            .await;
        assert_eq!(0, pending.len());

        Ok(())
    }

    async fn test_wait_for_finalized_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        let block_height = provider.get_block_number().await.unwrap();
        let block = l1_client.wait_for_finalized_block(block_height + 10).await;
        assert_eq!(block.number, block_height + 10);

        // Compare against the true block at the same height.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block_height + 10)))
            .full()
            .await
            .unwrap()
            .unwrap();

        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_ws() {
        test_wait_for_finalized_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_http() {
        test_wait_for_finalized_block_helper(false).await
    }

    async fn test_wait_for_old_finalized_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client_opt(&anvil, |opt| {
            if ws {
                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
            }
            opt.l1_finalized_safety_margin = Some(1);
        })
        .await;
        let provider = &l1_client.provider;

        // Wait until a few blocks have been finalized.
        l1_client.wait_for_finalized_block(2).await;

        // Block 0 is now old enough to be fetched without hash chaining.
        let block = l1_client.wait_for_finalized_block(0).await;

        let true_block = provider.get_block(0.into()).await.unwrap().unwrap();
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_ws() {
        test_wait_for_old_finalized_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_http() {
        test_wait_for_old_finalized_block_helper(false).await
    }

    async fn test_wait_for_finalized_block_by_timestamp_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        // Wait for a block with a timestamp 5 seconds in the future.
        let timestamp = U256::from(OffsetDateTime::now_utc().unix_timestamp() as u64 + 5);
        let block = l1_client
            .wait_for_finalized_block_with_timestamp(timestamp)
            .await;
        assert!(
            block.timestamp >= timestamp,
            "wait_for_finalized_block_with_timestamp({timestamp}) returned too early a block: {block:?}",
        );
        let parent = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number - 1)))
            .full()
            .await
            .unwrap()
            .unwrap();
        assert!(
            parent.header.inner.timestamp < timestamp.to::<u64>(),
            "wait_for_finalized_block_with_timestamp({timestamp}) did not return the earliest possible block: returned {block:?}, but earlier block {parent:?} has an acceptable timestamp too",
        );

        // Compare against the true block at the same height.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_by_timestamp_ws() {
        test_wait_for_finalized_block_by_timestamp_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_by_timestamp_http() {
        test_wait_for_finalized_block_by_timestamp_helper(false).await
    }

    async fn test_wait_for_old_finalized_block_by_timestamp_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;

        // Record the timestamp of the genesis block.
        let true_block = l1_client.wait_for_finalized_block(0).await;
        let timestamp = true_block.timestamp;

        // Wait for some more blocks to be produced.
        l1_client.wait_for_finalized_block(10).await;

        // Searching for the genesis timestamp should return the genesis block, not a later one.
        let block = l1_client
            .wait_for_finalized_block_with_timestamp(U256::from(timestamp))
            .await;
        assert_eq!(block, true_block);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_by_timestamp_ws() {
        test_wait_for_old_finalized_block_by_timestamp_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_by_timestamp_http() {
        test_wait_for_old_finalized_block_by_timestamp_helper(false).await
    }

    async fn test_wait_for_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        let block_height = provider.get_block_number().await.unwrap();
        l1_client.wait_for_block(block_height + 10).await;

        let new_block_height = provider.get_block_number().await.unwrap();
        assert!(
            new_block_height >= block_height + 10,
            "wait_for_block returned too early; initial height = {block_height}, new height = {new_block_height}",
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_block_ws() {
        test_wait_for_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_block_http() {
        test_wait_for_block_helper(false).await
    }

    async fn test_reconnect_update_task_helper(ws: bool) {
        setup_test();

        let port = pick_unused_port().unwrap();
        let anvil = Arc::new(Anvil::new().block_time(1).port(port).spawn());
        let client = new_l1_client(&anvil, ws).await;

        let initial_state = client.snapshot().await;
        tracing::info!(?initial_state, "initial state");

        // Check that the state is updating.
        let mut retry = 0;
        let updated_state = loop {
            assert!(retry < 10, "state did not update in time");

            let updated_state = client.snapshot().await;
            if updated_state.head > initial_state.head {
                break updated_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?updated_state, "state updated");

        // Disconnect the L1 by dropping the Anvil instance, then wait long enough for the
        // client to notice the stream has died before restarting Anvil on the same port.
        drop(anvil);

        tracing::info!("sleep 5");
        sleep(Duration::from_secs(5)).await;

        tracing::info!("restarting L1");
        let _anvil = Anvil::new().block_time(1).port(port).spawn();

        // Check that the state updates again after the client reconnects.
        let mut retry = 0;
        let final_state = loop {
            assert!(retry < 5, "state did not update in time");

            let final_state = client.snapshot().await;
            if final_state.head > updated_state.head {
                break final_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?final_state, "state updated");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_reconnect_update_task_ws() {
        test_reconnect_update_task_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_reconnect_update_task_http() {
        test_reconnect_update_task_helper(false).await
    }

    fn get_failover_index(provider: &L1Client) -> usize {
        let transport = &provider.transport;
        transport.current_transport.read().generation % transport.urls.len()
    }

    async fn test_failover_update_task_helper(ws: bool) {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();

        // Create an L1 client whose first provider is bogus, so it must fail over to Anvil.
        let client = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            // Use a very long subscription timeout so the test exercises provider failover
            // rather than timeout-based reconnects.
            subscription_timeout: Duration::from_secs(1000),
            l1_ws_provider: if ws {
                Some(vec![
                    "ws://notarealurl:1234".parse().unwrap(),
                    anvil.ws_endpoint_url(),
                ])
            } else {
                None
            },
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        client.spawn_tasks().await;

        let initial_state = client.snapshot().await;
        tracing::info!(?initial_state, "initial state");

        // Check that the state is updating, which requires the failover to have succeeded.
        let mut retry = 0;
        let updated_state = loop {
            assert!(retry < 10, "state did not update in time");

            let updated_state = client.snapshot().await;
            if updated_state.head > initial_state.head {
                break updated_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?updated_state, "state updated");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_update_task_ws() {
        test_failover_update_task_helper(true).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_update_task_http() {
        test_failover_update_task_helper(false).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_consecutive_failures() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();

        let l1_options = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_frequent_failure_tolerance: Duration::from_millis(0),
            l1_consecutive_failure_tolerance: 3,
            ..Default::default()
        };

        let provider = l1_options
            .connect(vec![
                "http://notarealurl:1234".parse().unwrap(),
                anvil.endpoint_url(),
            ])
            .expect("Failed to create L1 client");

        // The first two failures should not trigger a failover.
        for _ in 0..2 {
            provider.get_block_number().await.unwrap_err();
            assert!(get_failover_index(&provider) == 0);
        }

        // The third failure should.
        provider.get_block_number().await.unwrap_err();
        assert!(get_failover_index(&provider) == 1);

        // Now requests should succeed against the failover provider.
        provider.get_block_number().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_frequent_failures() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();
        let provider = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_frequent_failure_tolerance: Duration::from_millis(100),
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        // Two failures spaced outside the tolerance window should not trigger a failover.
        provider.get_block_number().await.unwrap_err();
        sleep(Duration::from_secs(1)).await;
        provider.get_block_number().await.unwrap_err();

        assert!(get_failover_index(&provider) == 0);

        sleep(Duration::from_secs(1)).await;

        // Two failures in quick succession should trigger a failover.
        provider.get_block_number().await.unwrap_err();
        provider.get_block_number().await.unwrap_err();
        provider.get_block_number().await.unwrap();
        assert!(get_failover_index(&provider) == 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_revert() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();
        let provider = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_consecutive_failure_tolerance: 1,
            l1_failover_revert: Duration::from_secs(2),
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        // The first failure triggers a failover to the working provider.
        provider.get_block_number().await.unwrap_err();
        assert_eq!(get_failover_index(&provider), 1);

        provider.get_block_number().await.unwrap();

        // After the revert timeout, the next request goes back to the (broken) primary.
        sleep(Duration::from_millis(2100)).await;
        provider.get_block_number().await.unwrap_err();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_update_loop_initializes_l1_state() {
        setup_test();
        let anvil = Arc::new(Anvil::new().port(9988u16).spawn());
        let l1_client = new_l1_client(&anvil, true).await;

        // The update loop should populate both the finalized snapshot and the block cache.
        for _try in 0..10 {
            let mut state = l1_client.state.lock().await;
            let has_snapshot = state.snapshot.finalized.is_some();
            let has_cache = state.finalized.get(&0).is_some();
            drop(state);
            if has_snapshot && has_cache {
                return;
            }
            sleep(Duration::from_millis(200)).await;
        }
        panic!("L1 state of L1Client not initialized");
    }
}
1713}