use std::{
    cmp::{min, Ordering},
    num::NonZeroUsize,
    pin::Pin,
    result::Result as StdResult,
    sync::Arc,
    time::Instant,
};

use alloy::{
    eips::BlockId,
    hex,
    primitives::{Address, B256, U256},
    providers::{Provider, ProviderBuilder, WsConnect},
    rpc::{
        client::RpcClient,
        json_rpc::{RequestPacket, ResponsePacket},
        types::Block,
    },
    transports::{http::Http, RpcError, TransportErrorKind},
};
use anyhow::Context;
use async_trait::async_trait;
use clap::Parser;
use committable::{Commitment, Committable, RawCommitmentBuilder};
use futures::{
    future::{Future, TryFuture, TryFutureExt},
    stream::{self, StreamExt},
};
use hotshot_contract_adapter::sol_types::FeeContract;
use hotshot_types::traits::metrics::Metrics;
use lru::LruCache;
use parking_lot::RwLock;
use tokio::{
    spawn,
    sync::{Mutex, MutexGuard, Notify},
    time::{sleep, Duration},
};
use tower_service::Service;
use tracing::Instrument;
use url::Url;

use super::{
    v0_1::{L1BlockInfoWithParent, SingleTransport, SingleTransportStatus, SwitchingTransport},
    L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask,
};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};

impl PartialOrd for L1BlockInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for L1BlockInfo {
    fn cmp(&self, other: &Self) -> Ordering {
        self.number.cmp(&other.number)
    }
}

impl From<&Block> for L1BlockInfo {
    fn from(block: &Block) -> Self {
        Self {
            number: block.header.number,
            timestamp: U256::from(block.header.timestamp),
            hash: block.header.hash,
        }
    }
}

impl From<&Block> for L1BlockInfoWithParent {
    fn from(block: &Block) -> Self {
        Self {
            info: block.into(),
            parent_hash: block.header.parent_hash,
        }
    }
}

impl Committable for L1BlockInfo {
    fn commit(&self) -> Commitment<Self> {
        let timestamp: [u8; 32] = self.timestamp.to_le_bytes();

        RawCommitmentBuilder::new(&Self::tag())
            .u64_field("number", self.number)
            .constant_str("timestamp")
            .fixed_size_bytes(&timestamp)
            .constant_str("hash")
            .fixed_size_bytes(&self.hash.0)
            .finalize()
    }

    fn tag() -> String {
        "L1BLOCK".into()
    }
}
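
// Example (illustrative sketch): a commitment binds all three fields, so two
// `L1BlockInfo` values have equal commitments only if number, timestamp, and
// hash all match. The concrete values below are placeholders.
//
//     let info = L1BlockInfo {
//         number: 42,
//         timestamp: U256::from(1_700_000_000u64),
//         hash: B256::ZERO,
//     };
//     let commitment = info.commit(); // deterministic for equal fields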

impl L1BlockInfo {
    pub fn number(&self) -> u64 {
        self.number
    }

    pub fn timestamp(&self) -> U256 {
        self.timestamp
    }

    pub fn hash(&self) -> B256 {
        self.hash
    }
}

impl Drop for L1UpdateTask {
    fn drop(&mut self) {
        if let Some(task) = self.0.get_mut().take() {
            task.abort();
        }
    }
}

impl Default for L1ClientOptions {
    fn default() -> Self {
        // Parse from an empty argument list so every clap default applies.
        Self::parse_from(std::iter::empty::<String>())
    }
}

impl L1ClientOptions {
    /// Attach a metrics subgroup under the `l1` namespace.
    pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self {
        self.metrics = Arc::new(metrics.subgroup("l1".into()));
        self
    }

    /// Instantiate an `L1Client` that fails over between the given RPC `urls`.
    pub fn connect(self, urls: Vec<Url>) -> anyhow::Result<L1Client> {
        let t = SwitchingTransport::new(self, urls)
            .with_context(|| "failed to create switching transport")?;
        Ok(L1Client::with_transport(t))
    }

    fn rate_limit_delay(&self) -> Duration {
        self.l1_rate_limit_delay.unwrap_or(self.l1_retry_delay)
    }
}
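
// Example (sketch): connecting a client with a primary and a backup endpoint.
// The URLs are illustrative placeholders, not real providers.
//
//     let client = L1ClientOptions::default().connect(vec![
//         "http://primary:8545".parse()?,
//         "http://backup:8545".parse()?,
//     ])?;
//     client.spawn_tasks().await; // start the background update loop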

impl L1ClientMetrics {
    fn new(metrics: &(impl Metrics + ?Sized), num_urls: usize) -> Self {
        // One failure counter per provider URL, labelled by its index.
        let failures = metrics.counter_family("failed_requests".into(), vec!["provider".into()]);

        let mut failure_metrics = Vec::with_capacity(num_urls);
        for url_index in 0..num_urls {
            failure_metrics.push(failures.create(vec![url_index.to_string()]));
        }

        Self {
            head: metrics.create_gauge("head".into(), None).into(),
            finalized: metrics.create_gauge("finalized".into(), None).into(),
            reconnects: metrics
                .create_counter("stream_reconnects".into(), None)
                .into(),
            failovers: metrics.create_counter("failovers".into(), None).into(),
            failures: Arc::new(failure_metrics),
        }
    }
}

impl SwitchingTransport {
    pub fn new(opt: L1ClientOptions, urls: Vec<Url>) -> anyhow::Result<Self> {
        let Some(first_url) = urls.first().cloned() else {
            return Err(anyhow::anyhow!("No valid URLs provided"));
        };

        let metrics = L1ClientMetrics::new(&**opt.metrics, urls.len());

        let first_transport = Arc::new(RwLock::new(SingleTransport::new(&first_url, 0, None)));

        Ok(Self {
            urls: Arc::new(urls),
            current_transport: first_transport,
            opt: Arc::new(opt),
            metrics,
            switch_notify: Arc::new(Notify::new()),
        })
    }

    /// Wait until the transport switches to a different provider.
    async fn wait_switch(&self) {
        self.switch_notify.notified().await;
    }

    fn options(&self) -> &L1ClientOptions {
        &self.opt
    }

    fn metrics(&self) -> &L1ClientMetrics {
        &self.metrics
    }
}

impl SingleTransportStatus {
    fn log_success(&mut self) {
        self.consecutive_failures = 0;
    }

    /// Record a failure; returns `true` if the caller should fail over to the
    /// next provider.
    fn log_failure(&mut self, opt: &L1ClientOptions) -> bool {
        self.consecutive_failures += 1;

        let should_switch = self.should_switch(opt);

        self.last_failure = Some(Instant::now());

        should_switch
    }

    fn should_switch(&mut self, opt: &L1ClientOptions) -> bool {
        // Only the first caller to detect a problem initiates the switch.
        if self.shutting_down {
            return false;
        }

        // Switch after a burst of consecutive failures...
        if self.consecutive_failures >= opt.l1_consecutive_failure_tolerance {
            self.shutting_down = true;
            return true;
        }

        // ...or after two failures within the frequent-failure window.
        let now = Instant::now();
        if let Some(prev) = self.last_failure {
            if now.saturating_duration_since(prev) < opt.l1_frequent_failure_tolerance {
                self.shutting_down = true;
                return true;
            }
        }

        false
    }

    fn should_revert(&mut self, revert_at: Option<Instant>) -> bool {
        if self.shutting_down {
            return false;
        }
        let Some(revert_at) = revert_at else {
            return false;
        };
        if Instant::now() >= revert_at {
            self.shutting_down = true;
            return true;
        }

        false
    }
}
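
// Example (sketch, placeholder values): with
// `l1_consecutive_failure_tolerance = 3`, the third failure in a row triggers
// a failover. With `l1_frequent_failure_tolerance = 100ms`, two failures less
// than 100ms apart also trigger one; a success resets the consecutive counter
// but not the last-failure time, so a failure-success-failure burst can still
// trip the frequent-failure check.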

impl SingleTransport {
    fn new(url: &Url, generation: usize, revert_at: Option<Instant>) -> Self {
        Self {
            generation,
            client: Http::new(url.clone()),
            status: Default::default(),
            revert_at,
        }
    }
}

#[async_trait]
impl Service<RequestPacket> for SwitchingTransport {
    type Error = RpcError<TransportErrorKind>;
    type Response = ResponsePacket;
    type Future =
        Pin<Box<dyn Future<Output = Result<ResponsePacket, RpcError<TransportErrorKind>>> + Send>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<StdResult<(), Self::Error>> {
        self.current_transport.read().clone().client.poll_ready(cx)
    }

    fn call(&mut self, req: RequestPacket) -> Self::Future {
        let self_clone = self.clone();

        Box::pin(async move {
            let mut current_transport = self_clone.current_transport.read().clone();

            // Revert to the primary provider once its cooldown has expired.
            let should_revert = current_transport
                .status
                .write()
                .should_revert(current_transport.revert_at);
            if should_revert {
                // Round up to the next generation that maps to the primary URL.
                let n = self_clone.urls.len();
                let prev_primary_gen = (current_transport.generation / n) * n;
                let next_gen = prev_primary_gen + n;
                current_transport = self_clone.switch_to(next_gen, current_transport);
            }

            // Fail fast while this provider is rate limited.
            if let Some(t) = current_transport.status.read().rate_limited_until {
                if t > Instant::now() {
                    return Err(RpcError::Transport(TransportErrorKind::Custom(
                        "Rate limit exceeded".into(),
                    )));
                } else {
                    current_transport.status.write().rate_limited_until = None;
                }
            }

            match current_transport.client.call(req).await {
                Ok(res) => {
                    current_transport.status.write().log_success();
                    Ok(res)
                },
                Err(err) => {
                    // Record the failure against this provider's counter.
                    if let Some(f) = self_clone
                        .metrics
                        .failures
                        .get(current_transport.generation % self_clone.urls.len())
                    {
                        f.add(1);
                    }

                    // On HTTP 429, back off instead of counting a failure.
                    if let RpcError::ErrorResp(e) = &err {
                        if e.code == 429 {
                            current_transport.status.write().rate_limited_until =
                                Some(Instant::now() + self_clone.opt.rate_limit_delay());
                            return Err(err);
                        }
                    }

                    tracing::warn!(?err, "L1 client error");

                    if current_transport
                        .status
                        .write()
                        .log_failure(&self_clone.opt)
                    {
                        self_clone.metrics.failovers.add(1);
                        self_clone.switch_to(current_transport.generation + 1, current_transport);
                    }

                    Err(err)
                },
            }
        })
    }
}

impl SwitchingTransport {
    fn switch_to(&self, next_gen: usize, current_transport: SingleTransport) -> SingleTransport {
        let next_index = next_gen % self.urls.len();
        let url = self.urls[next_index].clone();
        tracing::info!(%url, next_gen, "switch L1 transport");

        let revert_at = if next_gen % self.urls.len() == 0 {
            // Switching back to the primary provider: no revert timer needed.
            None
        } else if current_transport.generation % self.urls.len() == 0 {
            // Switching away from the primary: schedule a revert back to it.
            Some(Instant::now() + self.opt.l1_failover_revert)
        } else {
            // Moving between fallbacks: keep the existing revert deadline.
            current_transport.revert_at
        };

        let new_transport = SingleTransport::new(&url, next_gen, revert_at);

        *self.current_transport.write() = new_transport.clone();

        // Wake any tasks waiting for a provider switch.
        self.switch_notify.notify_waiters();

        new_transport
    }
}
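
// Example (sketch): with two URLs, generations map to providers as
// gen 0 -> urls[0], gen 1 -> urls[1], gen 2 -> urls[0], and so on. Bumping the
// generation cycles through providers, and any generation divisible by
// `urls.len()` is the primary, which is why reverting rounds up to the next
// multiple of `urls.len()`.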

impl L1Client {
    fn with_transport(transport: SwitchingTransport) -> Self {
        let rpc_client = RpcClient::new(transport.clone(), false);
        let provider = ProviderBuilder::new().on_client(rpc_client);

        let opt = transport.options().clone();

        let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity);
        receiver.set_await_active(false);
        receiver.set_overflow(true);

        Self {
            provider,
            transport,
            state: Arc::new(Mutex::new(L1State::new(opt.l1_blocks_cache_size))),
            sender,
            receiver: receiver.deactivate(),
            update_task: Default::default(),
        }
    }

    pub fn new(url: Vec<Url>) -> anyhow::Result<Self> {
        L1ClientOptions::default().connect(url)
    }

    pub fn anvil(anvil: &alloy::node_bindings::AnvilInstance) -> anyhow::Result<Self> {
        L1ClientOptions {
            l1_ws_provider: Some(vec![anvil.ws_endpoint().parse()?]),
            ..Default::default()
        }
        .connect(vec![anvil.endpoint().parse()?])
    }

    /// Start the background task that keeps the L1 snapshot up to date.
    pub async fn spawn_tasks(&self) {
        let mut update_task = self.update_task.0.lock().await;
        if update_task.is_none() {
            *update_task = Some(spawn(self.update_loop()));
        }
    }

    /// Stop the background update task, if it is running.
    pub async fn shut_down_tasks(&self) {
        if let Some(update_task) = self.update_task.0.lock().await.take() {
            update_task.abort();
        }
    }
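
    // Example (sketch): typical client lifecycle, assuming `client: L1Client`.
    //
    //     client.spawn_tasks().await;     // begin following L1
    //     let snapshot = client.snapshot().await;
    //     client.shut_down_tasks().await; // stop the background task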

    /// The main loop that keeps the snapshot and event stream up to date:
    /// subscribe to L1 blocks (via WebSockets if configured, otherwise HTTP
    /// polling) and re-establish the stream whenever it stalls or the
    /// transport fails over.
    fn update_loop(&self) -> impl Future<Output = ()> {
        let opt = self.options().clone();
        let rpc = self.provider.clone();
        let ws_urls = opt.l1_ws_provider.clone();
        let retry_delay = opt.l1_retry_delay;
        let subscription_timeout = opt.subscription_timeout;
        let state = self.state.clone();
        let sender = self.sender.clone();
        let metrics = self.metrics().clone();
        let polling_interval = opt.l1_polling_interval;
        let transport = self.transport.clone();

        let span = tracing::warn_span!("L1 client update");

        async move {
            for i in 0.. {
                let ws;

                // Fetch the current head so the stream can start from a known
                // block.
                let l1_head = loop {
                    match rpc.get_block(BlockId::latest()).await {
                        Ok(Some(block)) => break block.header,
                        Ok(None) => {
                            tracing::info!("Failed to fetch L1 head block, will retry");
                        },
                        Err(err) => {
                            tracing::info!("Failed to fetch L1 head block, will retry: err {err}");
                        },
                    }
                    sleep(retry_delay).await;
                };

                let mut block_stream = {
                    let res = match &ws_urls {
                        Some(urls) => {
                            // Rotate through the WebSockets providers on each
                            // reconnect.
                            let provider = i % urls.len();
                            let url = &urls[provider];
                            ws = match ProviderBuilder::new()
                                .on_ws(WsConnect::new(url.clone()))
                                .await
                            {
                                Ok(ws) => ws,
                                Err(err) => {
                                    tracing::warn!(
                                        provider,
                                        "Failed to connect WebSockets provider: {err:#}"
                                    );
                                    sleep(retry_delay).await;
                                    continue;
                                },
                            };
                            ws.subscribe_blocks().await.map(|stream| {
                                stream::once(async { l1_head.clone() })
                                    .chain(stream.into_stream())
                                    .boxed()
                            })
                        },
                        None => rpc.watch_blocks().await.map(|poller_builder| {
                            let stream = poller_builder
                                .with_poll_interval(polling_interval)
                                .into_stream();

                            let rpc = rpc.clone();

                            // The poller yields batches of hashes; resolve each
                            // hash to a header, skipping any we fail to fetch.
                            stream::once(async { l1_head.clone() })
                                .chain(stream.map(stream::iter).flatten().filter_map(
                                    move |hash| {
                                        let rpc = rpc.clone();
                                        async move {
                                            match rpc.get_block(BlockId::hash(hash)).await {
                                                Ok(Some(block)) => Some(block.header),
                                                Ok(None) => {
                                                    tracing::warn!(%hash, "HTTP stream yielded a block hash that was not available");
                                                    None
                                                },
                                                Err(err) => {
                                                    tracing::warn!(%hash, "Error fetching block from HTTP stream: {err:#}");
                                                    None
                                                },
                                            }
                                        }
                                    },
                                ))
                                // End the stream on failover, so we re-subscribe
                                // on the new provider.
                                .take_until(transport.wait_switch())
                                .boxed()
                        }),
                    };
                    match res {
                        Ok(stream) => stream,
                        Err(err) => {
                            tracing::error!("Error subscribing to L1 blocks: {err:#}");
                            sleep(retry_delay).await;
                            continue;
                        },
                    }
                };

                tracing::info!("Established L1 block stream");
                loop {
                    let block_timeout =
                        tokio::time::timeout(subscription_timeout, block_stream.next()).await;
                    match block_timeout {
                        Ok(Some(head)) => {
                            let head = head.number;
                            tracing::debug!(head, "Received L1 block");

                            // Fetch the latest finalized block, retrying until
                            // the RPC responds.
                            let finalized = loop {
                                match fetch_finalized_block_from_rpc(&rpc).await {
                                    Ok(finalized) => break finalized,
                                    Err(err) => {
                                        tracing::warn!("Error getting finalized block: {err:#}");
                                        sleep(retry_delay).await;
                                    },
                                }
                            };

                            let mut state = state.lock().await;
                            if head > state.snapshot.head {
                                tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
                                metrics.head.set(head as usize);
                                state.snapshot.head = head;
                                // Send events about the new head. Ignore send
                                // errors; they just mean no one is listening.
                                sender
                                    .broadcast_direct(L1Event::NewHead { head })
                                    .await
                                    .ok();
                            }
                            if let Some(finalized) = finalized {
                                if Some(finalized.info) > state.snapshot.finalized {
                                    tracing::info!(
                                        ?finalized,
                                        old_finalized = ?state.snapshot.finalized,
                                        "L1 finalized updated",
                                    );
                                    metrics.finalized.set(finalized.info.number as usize);
                                    state.snapshot.finalized = Some(finalized.info);
                                    state.put_finalized(finalized);
                                    sender
                                        .broadcast_direct(L1Event::NewFinalized { finalized })
                                        .await
                                        .ok();
                                }
                            }
                            tracing::debug!("Updated L1 snapshot to {:?}", state.snapshot);
                        },
                        Ok(None) => {
                            tracing::error!(
                                "L1 block stream ended unexpectedly, trying to re-establish block stream"
                            );
                            break;
                        },
                        Err(_) => {
                            tracing::error!(
                                "No block received for {} seconds, trying to re-establish block stream",
                                subscription_timeout.as_secs()
                            );
                            break;
                        },
                    }
                }

                metrics.reconnects.add(1);
            }
        }
        .instrument(span)
    }

    /// Get a snapshot of the current L1 state.
    pub async fn snapshot(&self) -> L1Snapshot {
        self.state.lock().await.snapshot
    }

    /// Wait until the L1 head reaches at least the given block number.
    pub async fn wait_for_block(&self, number: u64) {
        loop {
            // Subscribe to events before checking the current state, so we
            // don't miss an update that lands in between.
            let mut events = self.receiver.activate_cloned();

            // Check if the block we are waiting for already exists.
            {
                let state = self.state.lock().await;
                if state.snapshot.head >= number {
                    return;
                }
                tracing::info!(number, head = state.snapshot.head, "waiting for L1 block");
            }

            // Wait for the block.
            while let Some(event) = events.next().await {
                let L1Event::NewHead { head } = event else {
                    continue;
                };
                if head >= number {
                    tracing::info!(number, head, "got L1 block");
                    return;
                }
                tracing::debug!(number, head, "waiting for L1 block");
            }

            // The event stream ended unexpectedly; retry from the top.
            tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        }
    }

    /// Wait until the given block number has been finalized, then return info
    /// about the finalized block with exactly that number.
    pub async fn wait_for_finalized_block(&self, number: u64) -> L1BlockInfo {
        loop {
            // Subscribe to events before checking the current state, so we
            // don't miss an update that lands in between.
            let mut events = self.receiver.activate_cloned();

            // Check if the block we are waiting for has already been finalized.
            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.number >= number {
                        return self.fetch_finalized_block_by_number(state, number).await.1;
                    }
                    tracing::info!(
                        number,
                        finalized = ?state.snapshot.finalized,
                        "waiting for finalized L1 block",
                    );
                };
            }

            // Wait for the block to be finalized.
            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                let mut state = self.state.lock().await;
                state.put_finalized(finalized);
                if finalized.info.number >= number {
                    tracing::info!(number, ?finalized, "got finalized L1 block");
                    return self.fetch_finalized_block_by_number(state, number).await.1;
                }
                tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
            }

            tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        }
    }

    /// Wait for the earliest finalized block whose timestamp is at least
    /// `timestamp`, and return its info.
    pub async fn wait_for_finalized_block_with_timestamp(&self, timestamp: U256) -> L1BlockInfo {
        // Wait until some finalized block has a sufficient timestamp.
        let (mut state, mut block) = 'outer: loop {
            // Subscribe to events before checking the current state, so we
            // don't miss an update that lands in between.
            let mut events = self.receiver.activate_cloned();

            // Check if a suitable block has already been finalized.
            {
                let state = self.state.lock().await;
                if let Some(finalized) = state.snapshot.finalized {
                    if finalized.timestamp >= timestamp {
                        break 'outer (state, finalized);
                    }
                }
                tracing::info!(
                    %timestamp,
                    finalized = ?state.snapshot.finalized,
                    "waiting for finalized L1 block",
                );
            }

            // Wait for a suitable block to be finalized.
            while let Some(event) = events.next().await {
                let L1Event::NewFinalized { finalized } = event else {
                    continue;
                };
                if finalized.info.timestamp >= timestamp {
                    tracing::info!(%timestamp, ?finalized, "got finalized block");
                    break 'outer (self.state.lock().await, finalized.info);
                }
                tracing::debug!(%timestamp, ?finalized, "waiting for finalized L1 block");
            }

            tracing::warn!(%timestamp, "L1 event stream ended unexpectedly; retry");
            self.retry_delay().await;
        };

        // The block we found is only an upper bound: binary search for the
        // earliest finalized block whose timestamp is sufficient.
        let mut upper_bound = block.number;
        let mut lower_bound = 0;
        while lower_bound < upper_bound {
            let midpoint = (upper_bound + lower_bound) / 2;
            tracing::debug!(
                lower_bound,
                midpoint,
                upper_bound,
                %timestamp,
                ?block,
                "searching for earliest block with sufficient timestamp"
            );

            let (state_lock, midpoint_block) =
                self.fetch_finalized_block_by_number(state, midpoint).await;
            state = state_lock;

            tracing::debug!(?midpoint_block, %timestamp, "pivot on midpoint block");
            if midpoint_block.timestamp < timestamp {
                lower_bound = midpoint + 1;
            } else {
                upper_bound = midpoint;
                block = midpoint_block;
            }
        }

        block
    }
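
    // Example (sketch): if finalized blocks 0..=9 carry timestamps
    // 0, 10, ..., 90 and the target timestamp is 35, the bounds narrow as
    // (0, 9) -> (0, 4) -> (3, 4) -> (4, 4), returning block 4 (timestamp 40):
    // the earliest block at or above the target. The search is valid because
    // L1 timestamps are non-decreasing.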

    /// Fetch the finalized block with the given `number`, validating hash
    /// chaining from the latest finalized block unless the requested block is
    /// old enough to be covered by the safety margin.
    async fn fetch_finalized_block_by_number<'a>(
        &'a self,
        mut state: MutexGuard<'a, L1State>,
        number: u64,
    ) -> (MutexGuard<'a, L1State>, L1BlockInfo) {
        let latest_finalized = state
            .snapshot
            .finalized
            .expect("get_finalized_block called before any blocks are finalized");
        assert!(
            number <= latest_finalized.number,
            "requesting a finalized block {number} that isn't finalized; snapshot: {:?}",
            state.snapshot,
        );

        if let Some(safety_margin) = self.options().l1_finalized_safety_margin {
            if number < latest_finalized.number.saturating_sub(safety_margin) {
                // The requested block is old enough that a reorg of finalized
                // blocks reaching it is considered impossible; skip the hash
                // chaining check and fetch it directly by number.
                tracing::debug!(
                    number,
                    ?latest_finalized,
                    "skipping hash check for old finalized block"
                );
                let (state, block) = self
                    .load_and_cache_finalized_block(state, number.into())
                    .await;
                return (state, block.info);
            }
        }

        // Find the nearest cached successor of the requested block, falling
        // back to the latest finalized block from the RPC if none is cached.
        let mut successor_number = number;
        let mut successor = loop {
            if let Some(block) = state.finalized.get(&successor_number) {
                break *block;
            }
            successor_number += 1;
            if successor_number > latest_finalized.number {
                // No cached successor; fetch the latest finalized block,
                // releasing the lock while we do.
                drop(state);
                let block = loop {
                    match fetch_finalized_block_from_rpc(&self.provider).await {
                        Ok(Some(block)) => {
                            break block;
                        },
                        Ok(None) => {
                            tracing::warn!(
                                "no finalized block even though finalized snapshot is Some; this \
                                 can be caused by an L1 client failover"
                            );
                            self.retry_delay().await;
                        },
                        Err(err) => {
                            tracing::warn!("Error getting finalized block: {err:#}");
                            self.retry_delay().await;
                        },
                    }
                };
                state = self.state.lock().await;
                state.put_finalized(block);
                break block;
            }
        };

        // Walk backwards from the successor via parent hashes until we reach
        // the requested block; each step is validated by hash chaining.
        while successor.info.number > number {
            tracing::debug!(
                number,
                ?successor,
                "checking hash chaining for finalized block"
            );
            (state, successor) = self
                .load_and_cache_finalized_block(state, successor.parent_hash.into())
                .await;
        }

        (state, successor.info)
    }
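
    // Example (sketch): requesting block 5 when block 8 is the nearest cached
    // finalized block walks 8 -> 7 -> 6 -> 5, fetching each block by its
    // successor's `parent_hash`. Fetching by hash rather than by number is
    // what guarantees the result lies on the same chain as block 8.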

    async fn load_and_cache_finalized_block<'a>(
        &'a self,
        mut state: MutexGuard<'a, L1State>,
        id: BlockId,
    ) -> (MutexGuard<'a, L1State>, L1BlockInfoWithParent) {
        // Release the lock while fetching from the RPC.
        drop(state);
        let block = loop {
            let block = match self.provider.get_block(id).await {
                Ok(Some(block)) => block,
                Ok(None) => {
                    tracing::warn!(
                        %id,
                        "provider error: finalized L1 block should always be available"
                    );
                    self.retry_delay().await;
                    continue;
                },
                Err(err) => {
                    tracing::warn!(%id, "failed to get finalized L1 block: {err:#}");
                    self.retry_delay().await;
                    continue;
                },
            };
            break (&block).into();
        };
        state = self.state.lock().await;
        state.put_finalized(block);
        (state, block)
    }

    /// Get fee deposit events from the fee contract in the finalized block
    /// range starting just after `prev_finalized` (or at genesis) up to and
    /// including `new_finalized`.
    pub async fn get_finalized_deposits(
        &self,
        fee_contract_address: Address,
        prev_finalized: Option<u64>,
        new_finalized: u64,
    ) -> Vec<FeeInfo> {
        // No new blocks have been finalized, so there are no new deposits.
        if prev_finalized >= Some(new_finalized) {
            return vec![];
        }

        let opt = self.options();

        // Start from the block after the previous finalized block, or from
        // genesis if there is none.
        let prev = prev_finalized.map(|prev| prev + 1).unwrap_or(0);

        // Divide the range into chunks so we don't exceed the maximum block
        // range the RPC provider allows for event queries.
        let mut start = prev;
        let end = new_finalized;
        let chunk_size = opt.l1_events_max_block_range;
        let chunks = std::iter::from_fn(move || {
            let chunk_end = min(start + chunk_size - 1, end);
            if chunk_end < start {
                return None;
            }

            let chunk = (start, chunk_end);
            start = chunk_end + 1;
            Some(chunk)
        });

        // Fetch events for each chunk, retrying until the provider responds.
        let events = stream::iter(chunks).then(|(from, to)| {
            let retry_delay = opt.l1_retry_delay;
            let fee_contract = FeeContract::new(fee_contract_address, self.provider.clone());
            async move {
                tracing::debug!(from, to, "fetch events in range");

                loop {
                    match fee_contract
                        .Deposit_filter()
                        .address(*fee_contract.address())
                        .from_block(from)
                        .to_block(to)
                        .query()
                        .await
                    {
                        Ok(events) => break stream::iter(events),
                        Err(err) => {
                            tracing::warn!(from, to, %err, "Fee L1Event Error");
                            sleep(retry_delay).await;
                        },
                    }
                }
            }
        });
        events
            .flatten()
            .map(|(deposit, _)| FeeInfo::from(deposit))
            .collect()
            .await
    }
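
    // Example (sketch): with `l1_events_max_block_range = 100`, a query over
    // blocks 0..=250 is split into the inclusive chunks (0, 99), (100, 199),
    // and (200, 250), each fetched and retried independently.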

    /// Check whether the given address is a proxy contract by reading its
    /// EIP-1967 implementation slot.
    pub async fn is_proxy_contract(&self, proxy_address: Address) -> anyhow::Result<bool> {
        // The EIP-1967 implementation slot:
        // keccak256("eip1967.proxy.implementation") - 1.
        let hex_bytes =
            hex::decode("360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc")
                .expect("Failed to decode hex string");
        let implementation_slot = B256::from_slice(&hex_bytes);
        let storage = self
            .provider
            .get_storage_at(proxy_address, implementation_slot.into())
            .await?;

        // The slot stores an address in its low 20 bytes; a non-zero value
        // means the contract is a proxy.
        let implementation_address = Address::from_slice(&storage.to_be_bytes::<32>()[12..]);

        Ok(implementation_address != Address::ZERO)
    }

    /// Retry an operation, failing over to the next provider after each error,
    /// until every configured provider has been tried.
    pub async fn retry_on_all_providers<Fut>(
        &self,
        op: impl Fn() -> Fut,
    ) -> Result<Fut::Ok, Fut::Error>
    where
        Fut: TryFuture,
    {
        let transport = &self.transport;
        let start = transport.current_transport.read().generation;
        let end = start + transport.urls.len();
        loop {
            match op().into_future().await {
                Ok(res) => return Ok(res),
                Err(err) => {
                    // Give up once the transport has cycled through every URL.
                    if transport.current_transport.read().generation >= end {
                        return Err(err);
                    } else {
                        self.retry_delay().await;
                    }
                },
            }
        }
    }

    pub(crate) fn options(&self) -> &L1ClientOptions {
        self.transport.options()
    }

    fn metrics(&self) -> &L1ClientMetrics {
        self.transport.metrics()
    }

    async fn retry_delay(&self) {
        sleep(self.options().l1_retry_delay).await;
    }
}

impl L1State {
    fn new(cache_size: NonZeroUsize) -> Self {
        Self {
            snapshot: Default::default(),
            finalized: LruCache::new(cache_size),
            last_finalized: None,
        }
    }

    fn put_finalized(&mut self, block: L1BlockInfoWithParent) {
        assert!(
            self.snapshot.finalized.is_some()
                && block.info.number <= self.snapshot.finalized.unwrap().number,
            "inserting a finalized block {block:?} that isn't finalized; snapshot: {:?}",
            self.snapshot,
        );

        if Some(block.info.number()) > self.last_finalized {
            self.last_finalized = Some(block.info.number());
        }

        if let Some((old_number, old_block)) = self.finalized.push(block.info.number, block) {
            if old_number == block.info.number && block != old_block {
                tracing::error!(
                    ?old_block,
                    ?block,
                    "got different info for the same finalized height; something has gone very \
                     wrong with the L1",
                );
            }
        }
    }
}

async fn fetch_finalized_block_from_rpc(
    rpc: &impl Provider,
) -> anyhow::Result<Option<L1BlockInfoWithParent>> {
    let Some(block) = rpc.get_block(BlockId::finalized()).await? else {
        // This can happen on a very young chain that has not finalized any
        // block yet.
        tracing::warn!("no finalized block yet");
        return Ok(None);
    };

    Ok(Some((&block).into()))
}

#[cfg(test)]
mod test {
    use std::{ops::Add, time::Duration};

    use alloy::{
        eips::BlockNumberOrTag,
        node_bindings::{Anvil, AnvilInstance},
        primitives::utils::parse_ether,
        providers::layers::AnvilProvider,
    };
    use espresso_contract_deployer::{deploy_fee_contract_proxy, Contracts};
    use portpicker::pick_unused_port;
    use sequencer_utils::test_utils::setup_test;
    use time::OffsetDateTime;

    use super::*;

    async fn new_l1_client_opt(
        anvil: &Arc<AnvilInstance>,
        f: impl FnOnce(&mut L1ClientOptions),
    ) -> L1Client {
        let mut opt = L1ClientOptions {
            l1_events_max_block_range: 1,
            l1_polling_interval: Duration::from_secs(1),
            subscription_timeout: Duration::from_secs(5),
            ..Default::default()
        };
        f(&mut opt);

        let l1_client = opt
            .connect(vec![anvil.endpoint_url()])
            .expect("Failed to create L1 client");

        l1_client.spawn_tasks().await;
        l1_client
    }

    async fn new_l1_client(anvil: &Arc<AnvilInstance>, include_ws: bool) -> L1Client {
        new_l1_client_opt(anvil, |opt| {
            if include_ws {
                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
            }
        })
        .await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_get_finalized_deposits() -> anyhow::Result<()> {
        setup_test();
        let num_deposits = 5;

        let anvil = Anvil::new().spawn();
        let wallet = anvil.wallet().unwrap();
        let deployer = wallet.default_signer().address();
        let inner_provider = ProviderBuilder::new()
            .wallet(wallet)
            .on_http(anvil.endpoint_url());
        let provider = AnvilProvider::new(inner_provider, Arc::new(anvil));
        let mut contracts = Contracts::new();

        let l1_client = new_l1_client(provider.anvil(), false).await;

        let fee_proxy_addr = deploy_fee_contract_proxy(&provider, &mut contracts, deployer).await?;
        let fee_proxy = FeeContract::new(fee_proxy_addr, &provider);
        let num_tx_for_deploy = provider.get_block_number().await?;

        for n in 1..=num_deposits {
            let amount = n as f32 / 10.0;
            let receipt = fee_proxy
                .deposit(deployer)
                .value(parse_ether(&amount.to_string())?)
                .send()
                .await?
                .get_receipt()
                .await?;
            assert!(receipt.inner.is_success());
        }

        let cur_height = provider.get_block_number().await?;
        assert_eq!(num_deposits + num_tx_for_deploy, cur_height);

        // Querying the full range sees every deposit; deposits of 0.1..0.5 ETH
        // sum to 1.5 ETH.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, None, cur_height)
            .await;

        assert_eq!(num_deposits as usize, pending.len(), "{pending:?}");
        assert_eq!(deployer, pending[0].account().0);
        assert_eq!(
            U256::from(1500000000000000000u64),
            pending
                .iter()
                .fold(U256::from(0), |total, info| total.add(info.amount().0))
        );

        // Querying from block 1 onward still sees every deposit.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), cur_height)
            .await;
        assert_eq!(num_deposits as usize, pending.len());

        // Ranges that contain no deposits yield nothing.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 0)
            .await;
        assert_eq!(0, pending.len());

        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(0), 1)
            .await;
        assert_eq!(0, pending.len());

        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), num_tx_for_deploy)
            .await;
        assert_eq!(0, pending.len());

        // `new_finalized` is inclusive: the first deposit block is found.
        let pending = l1_client
            .get_finalized_deposits(
                fee_proxy_addr,
                Some(num_tx_for_deploy),
                num_tx_for_deploy + 1,
            )
            .await;
        assert_eq!(1, pending.len());

        // A range that ends before it starts yields nothing.
        let pending = l1_client
            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), 0)
            .await;
        assert_eq!(0, pending.len());

        Ok(())
    }

    async fn test_wait_for_finalized_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        let block_height = provider.get_block_number().await.unwrap();
        let block = l1_client.wait_for_finalized_block(block_height + 10).await;
        assert_eq!(block.number, block_height + 10);

        // Compare against the true block from the provider.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block_height + 10)))
            .full()
            .await
            .unwrap()
            .unwrap();

        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_ws() {
        test_wait_for_finalized_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_http() {
        test_wait_for_finalized_block_helper(false).await
    }

    async fn test_wait_for_old_finalized_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client_opt(&anvil, |opt| {
            if ws {
                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
            }
            opt.l1_finalized_safety_margin = Some(1);
        })
        .await;
        let provider = &l1_client.provider;

        // Wait until a few blocks have been finalized.
        l1_client.wait_for_finalized_block(2).await;

        // Fetch an old finalized block (past the safety margin).
        let block = l1_client.wait_for_finalized_block(0).await;

        // Compare against the true block from the provider.
        let true_block = provider.get_block(0.into()).await.unwrap().unwrap();
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_ws() {
        test_wait_for_old_finalized_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_http() {
        test_wait_for_old_finalized_block_helper(false).await
    }

    async fn test_wait_for_finalized_block_by_timestamp_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        // Wait for a finalized block with a timestamp a few seconds in the
        // future.
        let timestamp = U256::from(OffsetDateTime::now_utc().unix_timestamp() as u64 + 5);
        let block = l1_client
            .wait_for_finalized_block_with_timestamp(timestamp)
            .await;
        assert!(
            block.timestamp >= timestamp,
            "wait_for_finalized_block_with_timestamp({timestamp}) returned too early a block: \
             {block:?}",
        );
        let parent = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number - 1)))
            .full()
            .await
            .unwrap()
            .unwrap();
        assert!(
            parent.header.inner.timestamp < timestamp.to::<u64>(),
            "wait_for_finalized_block_with_timestamp({timestamp}) did not return the earliest \
             possible block: returned {block:?}, but earlier block {parent:?} has an acceptable \
             timestamp too",
        );

        // Compare against the true block from the provider.
        let true_block = provider
            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            block.timestamp.to::<u64>(),
            true_block.header.inner.timestamp
        );
        assert_eq!(block.hash, true_block.header.hash);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_by_timestamp_ws() {
        test_wait_for_finalized_block_by_timestamp_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_finalized_block_by_timestamp_http() {
        test_wait_for_finalized_block_by_timestamp_helper(false).await
    }

    async fn test_wait_for_old_finalized_block_by_timestamp_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;

        // Get the timestamp of the genesis block.
        let true_block = l1_client.wait_for_finalized_block(0).await;
        let timestamp = true_block.timestamp;

        // Wait for some more blocks to be finalized.
        l1_client.wait_for_finalized_block(10).await;

        // Even though the finalized head is well past genesis, we can still
        // look up the genesis block by timestamp.
        let block = l1_client
            .wait_for_finalized_block_with_timestamp(U256::from(timestamp))
            .await;
        assert_eq!(block, true_block);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_by_timestamp_ws() {
        test_wait_for_old_finalized_block_by_timestamp_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_old_finalized_block_by_timestamp_http() {
        test_wait_for_old_finalized_block_by_timestamp_helper(false).await
    }

    async fn test_wait_for_block_helper(ws: bool) {
        setup_test();

        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
        let l1_client = new_l1_client(&anvil, ws).await;
        let provider = &l1_client.provider;

        let block_height = provider.get_block_number().await.unwrap();
        l1_client.wait_for_block(block_height + 10).await;

        let new_block_height = provider.get_block_number().await.unwrap();
        assert!(
            new_block_height >= block_height + 10,
            "wait_for_block returned too early; initial height = {block_height}, new height = \
             {new_block_height}",
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_block_ws() {
        test_wait_for_block_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_wait_for_block_http() {
        test_wait_for_block_helper(false).await
    }

    async fn test_reconnect_update_task_helper(ws: bool) {
        setup_test();

        let port = pick_unused_port().unwrap();
        let anvil = Arc::new(Anvil::new().block_time(1).port(port).spawn());
        let client = new_l1_client(&anvil, ws).await;

        let initial_state = client.snapshot().await;
        tracing::info!(?initial_state, "initial state");

        // Check that the state is updating.
        let mut retry = 0;
        let updated_state = loop {
            assert!(retry < 10, "state did not update in time");

            let updated_state = client.snapshot().await;
            if updated_state.head > initial_state.head {
                break updated_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?updated_state, "state updated");

        // Shut down the L1 node, wait long enough for the client to notice,
        // then restart it on the same port.
        drop(anvil);

        tracing::info!("sleep 5");
        sleep(Duration::from_secs(5)).await;

        tracing::info!("restarting L1");
        let _anvil = Anvil::new().block_time(1).port(port).spawn();

        // Check that the state is updating again.
        let mut retry = 0;
        let final_state = loop {
            assert!(retry < 5, "state did not update in time");

            let final_state = client.snapshot().await;
            if final_state.head > updated_state.head {
                break final_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?final_state, "state updated");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_reconnect_update_task_ws() {
        test_reconnect_update_task_helper(true).await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_reconnect_update_task_http() {
        test_reconnect_update_task_helper(false).await
    }

    fn get_failover_index(provider: &L1Client) -> usize {
        let transport = &provider.transport;
        transport.current_transport.read().generation % transport.urls.len()
    }

    async fn test_failover_update_task_helper(ws: bool) {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();

        // Create a client with a dead primary provider and a working fallback.
        let client = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            // Use a very long subscription timeout, so that we only succeed if
            // the failover logic (not the timeout) kicks in.
            subscription_timeout: Duration::from_secs(1000),
            l1_ws_provider: if ws {
                Some(vec![
                    "ws://notarealurl:1234".parse().unwrap(),
                    anvil.ws_endpoint_url(),
                ])
            } else {
                None
            },
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        client.spawn_tasks().await;

        let initial_state = client.snapshot().await;
        tracing::info!(?initial_state, "initial state");

        // The state should eventually update via the fallback provider.
        let mut retry = 0;
        let updated_state = loop {
            assert!(retry < 10, "state did not update in time");

            let updated_state = client.snapshot().await;
            if updated_state.head > initial_state.head {
                break updated_state;
            }
            tracing::info!(retry, "waiting for state update");
            sleep(Duration::from_secs(1)).await;
            retry += 1;
        };
        tracing::info!(?updated_state, "state updated");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_update_task_ws() {
        test_failover_update_task_helper(true).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_update_task_http() {
        test_failover_update_task_helper(false).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_consecutive_failures() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();

        let l1_options = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_frequent_failure_tolerance: Duration::from_millis(0),
            l1_consecutive_failure_tolerance: 3,
            ..Default::default()
        };

        let provider = l1_options
            .connect(vec![
                "http://notarealurl:1234".parse().unwrap(),
                anvil.endpoint_url(),
            ])
            .expect("Failed to create L1 client");

        // Two failures do not trigger a failover.
        for _ in 0..2 {
            provider.get_block_number().await.unwrap_err();
            assert!(get_failover_index(&provider) == 0);
        }

        // The third failure does.
        provider.get_block_number().await.unwrap_err();
        assert!(get_failover_index(&provider) == 1);

        // Now requests succeed on the fallback provider.
        provider.get_block_number().await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_frequent_failures() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();
        let provider = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_frequent_failure_tolerance: Duration::from_millis(100),
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        // Two failures that are not within the tolerance window do not trigger
        // a failover.
        provider.get_block_number().await.unwrap_err();
        sleep(Duration::from_secs(1)).await;
        provider.get_block_number().await.unwrap_err();

        // We are still on the primary provider.
        assert!(get_failover_index(&provider) == 0);

        sleep(Duration::from_secs(1)).await;

        // Two failures in quick succession do trigger a failover.
        provider.get_block_number().await.unwrap_err();
        provider.get_block_number().await.unwrap_err();
        provider.get_block_number().await.unwrap();
        assert!(get_failover_index(&provider) == 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_failover_revert() {
        setup_test();

        let anvil = Anvil::new().block_time(1).spawn();
        let provider = L1ClientOptions {
            l1_polling_interval: Duration::from_secs(1),
            l1_consecutive_failure_tolerance: 1,
            l1_failover_revert: Duration::from_secs(2),
            ..Default::default()
        }
        .connect(vec![
            "http://notarealurl:1234".parse().unwrap(),
            anvil.endpoint_url(),
        ])
        .expect("Failed to create L1 client");

        // The first failure puts us on the fallback provider.
        provider.get_block_number().await.unwrap_err();
        assert_eq!(get_failover_index(&provider), 1);

        provider.get_block_number().await.unwrap();

        // After the revert period, we switch back to the (still dead) primary.
        sleep(Duration::from_millis(2100)).await;
        provider.get_block_number().await.unwrap_err();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_update_loop_initializes_l1_state() {
        setup_test();
        let anvil = Arc::new(Anvil::new().port(9988u16).spawn());
        let l1_client = new_l1_client(&anvil, true).await;

        // Wait until the update loop has populated both the snapshot and the
        // finalized block cache.
        for _try in 0..10 {
            let mut state = l1_client.state.lock().await;
            let has_snapshot = state.snapshot.finalized.is_some();
            let has_cache = state.finalized.get(&0).is_some();
            drop(state);
            if has_snapshot && has_cache {
                return;
            }
            sleep(Duration::from_millis(200)).await;
        }
        panic!("L1 state of L1Client not initialized");
    }
}