espresso_types/v0/impls/l1.rs

1use std::{
2    cmp::{min, Ordering},
3    num::NonZeroUsize,
4    pin::Pin,
5    result::Result as StdResult,
6    sync::Arc,
7    time::Instant,
8};
9
10use alloy::{
11    eips::BlockId,
12    hex,
13    primitives::{Address, B256, U256},
14    providers::{Provider, ProviderBuilder, WsConnect},
15    rpc::{
16        client::RpcClient,
17        json_rpc::{RequestPacket, ResponsePacket},
18        types::Block,
19    },
20    transports::{http::Http, RpcError, TransportErrorKind},
21};
22use anyhow::Context;
23use async_trait::async_trait;
24use clap::Parser;
25use committable::{Commitment, Committable, RawCommitmentBuilder};
26use futures::{
27    future::{Future, TryFuture, TryFutureExt},
28    stream::{self, StreamExt},
29};
30use hotshot_contract_adapter::sol_types::FeeContract;
31use hotshot_types::traits::metrics::Metrics;
32use lru::LruCache;
33use parking_lot::RwLock;
34use tokio::{
35    spawn,
36    sync::{Mutex, MutexGuard, Notify},
37    time::{sleep, Duration},
38};
39use tower_service::Service;
40use tracing::Instrument;
41use url::Url;
42
43use super::{
44    v0_1::{L1BlockInfoWithParent, SingleTransport, SingleTransportStatus, SwitchingTransport},
45    L1BlockInfo, L1ClientMetrics, L1State, L1UpdateTask,
46};
47use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};
48
49impl PartialOrd for L1BlockInfo {
50    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51        Some(self.cmp(other))
52    }
53}
54
55impl Ord for L1BlockInfo {
56    fn cmp(&self, other: &Self) -> Ordering {
57        self.number.cmp(&other.number)
58    }
59}
60
61impl From<&Block> for L1BlockInfo {
62    fn from(block: &Block) -> Self {
63        Self {
64            number: block.header.number,
65            timestamp: U256::from(block.header.timestamp),
66            hash: block.header.hash,
67        }
68    }
69}
70
71impl From<&Block> for L1BlockInfoWithParent {
72    fn from(block: &Block) -> Self {
73        Self {
74            info: block.into(),
75            parent_hash: block.header.parent_hash,
76        }
77    }
78}
79
impl Committable for L1BlockInfo {
    /// Compute the commitment to this block info.
    ///
    /// The builder is fed, in this order: the `"number"` u64 field, the label `"timestamp"`
    /// followed by the 32-byte little-endian encoding of `timestamp`, and the label `"hash"`
    /// followed by the raw 32 bytes of `hash`. Reordering, relabelling, or re-encoding any of
    /// these would presumably yield a different commitment, so treat this layout as fixed.
    fn commit(&self) -> Commitment<Self> {
        // Little-endian byte encoding of the 256-bit timestamp.
        let timestamp: [u8; 32] = self.timestamp.to_le_bytes();

        RawCommitmentBuilder::new(&Self::tag())
            .u64_field("number", self.number)
            // `RawCommitmentBuilder` doesn't have a `u256_field` method, so we simulate it:
            .constant_str("timestamp")
            .fixed_size_bytes(&timestamp)
            .constant_str("hash")
            .fixed_size_bytes(&self.hash.0)
            .finalize()
    }

    /// Tag passed to `RawCommitmentBuilder::new` when computing the commitment.
    fn tag() -> String {
        "L1BLOCK".into()
    }
}
98
99impl L1BlockInfo {
100    pub fn number(&self) -> u64 {
101        self.number
102    }
103
104    pub fn timestamp(&self) -> U256 {
105        self.timestamp
106    }
107
108    pub fn hash(&self) -> B256 {
109        self.hash
110    }
111}
112
113impl Drop for L1UpdateTask {
114    fn drop(&mut self) {
115        if let Some(task) = self.0.get_mut().take() {
116            task.abort();
117        }
118    }
119}
120
121impl Default for L1ClientOptions {
122    fn default() -> Self {
123        Self::parse_from(std::iter::empty::<String>())
124    }
125}
126
127impl L1ClientOptions {
128    /// Use the given metrics collector to publish metrics related to the L1 client.
129    pub fn with_metrics(mut self, metrics: &(impl Metrics + ?Sized)) -> Self {
130        self.metrics = Arc::new(metrics.subgroup("l1".into()));
131        self
132    }
133
134    /// Instantiate an `L1Client` for a given list of provider `Url`s.
135    pub fn connect(self, urls: Vec<Url>) -> anyhow::Result<L1Client> {
136        // create custom transport
137        let t = SwitchingTransport::new(self, urls)
138            .with_context(|| "failed to create switching transport")?;
139        // Create a new L1 client with the transport
140        Ok(L1Client::with_transport(t))
141    }
142
143    fn rate_limit_delay(&self) -> Duration {
144        self.l1_rate_limit_delay.unwrap_or(self.l1_retry_delay)
145    }
146}
147
148impl L1ClientMetrics {
149    fn new(metrics: &(impl Metrics + ?Sized), num_urls: usize) -> Self {
150        // Create a counter family for the failures per URL
151        let failures = metrics.counter_family("failed_requests".into(), vec!["provider".into()]);
152
153        // Create a counter for each URL
154        let mut failure_metrics = Vec::with_capacity(num_urls);
155        for url_index in 0..num_urls {
156            failure_metrics.push(failures.create(vec![url_index.to_string()]));
157        }
158
159        Self {
160            head: metrics.create_gauge("head".into(), None).into(),
161            finalized: metrics.create_gauge("finalized".into(), None).into(),
162            reconnects: metrics
163                .create_counter("stream_reconnects".into(), None)
164                .into(),
165            failovers: metrics.create_counter("failovers".into(), None).into(),
166            failures: Arc::new(failure_metrics),
167        }
168    }
169}
170
171impl SwitchingTransport {
172    /// Create a new `SwitchingTransport` with the given options and URLs
173    pub fn new(opt: L1ClientOptions, urls: Vec<Url>) -> anyhow::Result<Self> {
174        // Return early if there were no URLs provided
175        let Some(first_url) = urls.first().cloned() else {
176            return Err(anyhow::anyhow!("No valid URLs provided"));
177        };
178
179        // Create the metrics
180        let metrics = L1ClientMetrics::new(&**opt.metrics, urls.len());
181
182        // Create a new `SingleTransport` for the first URL
183        let first_transport = Arc::new(RwLock::new(SingleTransport::new(&first_url, 0, None)));
184
185        Ok(Self {
186            urls: Arc::new(urls),
187            current_transport: first_transport,
188            opt: Arc::new(opt),
189            metrics,
190            switch_notify: Arc::new(Notify::new()),
191        })
192    }
193
194    /// Returns when the transport has been switched
195    async fn wait_switch(&self) {
196        self.switch_notify.notified().await;
197    }
198
199    fn options(&self) -> &L1ClientOptions {
200        &self.opt
201    }
202
203    fn metrics(&self) -> &L1ClientMetrics {
204        &self.metrics
205    }
206}
207
208impl SingleTransportStatus {
209    /// Log a successful call to the inner transport
210    fn log_success(&mut self) {
211        self.consecutive_failures = 0;
212    }
213
214    /// Log a failure to call the inner transport. Returns whether or not the transport should be switched to the next URL
215    fn log_failure(&mut self, opt: &L1ClientOptions) -> bool {
216        // Increment the consecutive failures
217        self.consecutive_failures += 1;
218
219        // Check if we should switch to the next URL
220        let should_switch = self.should_switch(opt);
221
222        // Update the last failure time
223        self.last_failure = Some(Instant::now());
224
225        // Return whether or not we should switch
226        should_switch
227    }
228
229    /// Whether or not the transport should be switched to the next URL
230    fn should_switch(&mut self, opt: &L1ClientOptions) -> bool {
231        // If someone else already beat us to switching, return false
232        if self.shutting_down {
233            return false;
234        }
235
236        // If we've reached the max number of consecutive failures, switch to the next URL
237        if self.consecutive_failures >= opt.l1_consecutive_failure_tolerance {
238            self.shutting_down = true;
239            return true;
240        }
241
242        // If we've failed recently, switch to the next URL
243        let now = Instant::now();
244        if let Some(prev) = self.last_failure {
245            if now.saturating_duration_since(prev) < opt.l1_frequent_failure_tolerance {
246                self.shutting_down = true;
247                return true;
248            }
249        }
250
251        false
252    }
253
254    /// Whether or not the transport should be switched back to the primary URL.
255    fn should_revert(&mut self, revert_at: Option<Instant>) -> bool {
256        if self.shutting_down {
257            // We have already switched away from this transport in another thread.
258            return false;
259        }
260        let Some(revert_at) = revert_at else {
261            return false;
262        };
263        if Instant::now() >= revert_at {
264            self.shutting_down = true;
265            return true;
266        }
267
268        false
269    }
270}
271
272impl SingleTransport {
273    /// Create a new `SingleTransport` with the given URL
274    fn new(url: &Url, generation: usize, revert_at: Option<Instant>) -> Self {
275        Self {
276            generation,
277            client: Http::new(url.clone()),
278            status: Default::default(),
279            revert_at,
280        }
281    }
282}
283
284/// `SwitchingTransport` is an alternative [`Client`](https://docs.rs/alloy/0.12.5/alloy/transports/http/struct.Client.html)
285/// which by implementing `tower_service::Service`, traits like [`Transport`](https://docs.rs/alloy/0.12.5/alloy/transports/trait.Transport.html)
286/// are auto-derived, thus can be used as an alt [`RpcClient`](https://docs.rs/alloy/0.12.5/alloy/rpc/client/struct.RpcClient.html#method.new)
287/// that can be further hooked with `Provider` via `Provider::on_client()`.
288#[async_trait]
289impl Service<RequestPacket> for SwitchingTransport {
290    type Error = RpcError<TransportErrorKind>;
291    type Response = ResponsePacket;
292    type Future =
293        Pin<Box<dyn Future<Output = Result<ResponsePacket, RpcError<TransportErrorKind>>> + Send>>;
294
295    fn poll_ready(
296        &mut self,
297        cx: &mut std::task::Context<'_>,
298    ) -> std::task::Poll<StdResult<(), Self::Error>> {
299        // Just poll the (current) inner client
300        self.current_transport.read().clone().client.poll_ready(cx)
301    }
302
303    fn call(&mut self, req: RequestPacket) -> Self::Future {
304        // Clone ourselves
305        let self_clone = self.clone();
306
307        // Pin and box, which turns this into a future
308        Box::pin(async move {
309            // Clone the current transport
310            let mut current_transport = self_clone.current_transport.read().clone();
311
312            // Revert back to the primary transport if it's time.
313            let should_revert = current_transport
314                .status
315                .write()
316                .should_revert(current_transport.revert_at);
317            if should_revert {
318                // Switch to the next generation which maps to index 0.
319                let n = self_clone.urls.len();
320                // Rounding down to a multiple of n gives us the last generation of the primary transport.
321                let prev_primary_gen = (current_transport.generation / n) * n;
322                // Adding n jumps to the next generation.
323                let next_gen = prev_primary_gen + n;
324                current_transport = self_clone.switch_to(next_gen, current_transport);
325            }
326
327            // If we've been rate limited, back off until the limit (hopefully) expires.
328            let rate_limit_until = current_transport.status.read().rate_limited_until;
329            if let Some(t) = rate_limit_until {
330                if t > Instant::now() {
331                    // Return an error with a non-standard code to indicate client-side rate limit.
332                    return Err(RpcError::Transport(TransportErrorKind::Custom(
333                        "Rate limit exceeded".into(),
334                    )));
335                } else {
336                    // Reset the rate limit if we are passed it so we don't check every time
337                    current_transport.status.write().rate_limited_until = None;
338                }
339            }
340
341            // Call the inner client, match on the result
342            match current_transport.client.call(req).await {
343                Ok(res) => {
344                    // If it's okay, log the success to the status
345                    current_transport.status.write().log_success();
346                    Ok(res)
347                },
348                Err(err) => {
349                    // Increment the failure metric
350                    if let Some(f) = self_clone
351                        .metrics
352                        .failures
353                        .get(current_transport.generation % self_clone.urls.len())
354                    {
355                        f.add(1);
356                    }
357
358                    // Treat rate limited errors specially; these should not cause failover, but instead
359                    // should only cause us to temporarily back off on making requests to the RPC
360                    // server.
361                    if let RpcError::ErrorResp(e) = &err {
362                        // 429 == Too Many Requests
363                        if e.code == 429 {
364                            current_transport.status.write().rate_limited_until =
365                                Some(Instant::now() + self_clone.opt.rate_limit_delay());
366                            return Err(err);
367                        }
368                    }
369
370                    // Log the error and indicate a failure
371                    tracing::warn!(?err, "L1 client error");
372
373                    // If the transport should switch, do so. We don't need to worry about
374                    // race conditions here, since it will only return true once.
375                    if current_transport
376                        .status
377                        .write()
378                        .log_failure(&self_clone.opt)
379                    {
380                        // Increment the failovers metric
381                        self_clone.metrics.failovers.add(1);
382                        self_clone.switch_to(current_transport.generation + 1, current_transport);
383                    }
384
385                    Err(err)
386                },
387            }
388        })
389    }
390}
391
392impl SwitchingTransport {
393    fn switch_to(&self, next_gen: usize, current_transport: SingleTransport) -> SingleTransport {
394        let next_index = next_gen % self.urls.len();
395        let url = self.urls[next_index].clone();
396        tracing::info!(%url, next_gen, "switch L1 transport");
397
398        let revert_at = if next_gen % self.urls.len() == 0 {
399            // If we are reverting to the primary transport, clear our scheduled revert time.
400            None
401        } else if current_transport.generation % self.urls.len() == 0 {
402            // If we are failing over from the primary transport, schedule a time to automatically
403            // revert back.
404            Some(Instant::now() + self.opt.l1_failover_revert)
405        } else {
406            // Otherwise keep the currently scheduled revert time.
407            current_transport.revert_at
408        };
409
410        // Create a new transport from the next URL and index
411        let new_transport = SingleTransport::new(&url, next_gen, revert_at);
412
413        // Switch to the next URL
414        *self.current_transport.write() = new_transport.clone();
415
416        // Notify the transport that it has been switched
417        self.switch_notify.notify_waiters();
418
419        new_transport
420    }
421}
422
423impl L1Client {
424    fn with_transport(transport: SwitchingTransport) -> Self {
425        // Create a new provider with that RPC client using the custom transport
426        let rpc_client = RpcClient::new(transport.clone(), false);
427        let provider = ProviderBuilder::new().on_client(rpc_client);
428
429        let opt = transport.options().clone();
430
431        let (sender, mut receiver) = async_broadcast::broadcast(opt.l1_events_channel_capacity);
432        receiver.set_await_active(false);
433        receiver.set_overflow(true);
434
435        Self {
436            provider,
437            transport,
438            state: Arc::new(Mutex::new(L1State::new(opt.l1_blocks_cache_size))),
439            sender,
440            receiver: receiver.deactivate(),
441            update_task: Default::default(),
442        }
443    }
444
445    /// Construct a new L1 client with the default options.
446    pub fn new(url: Vec<Url>) -> anyhow::Result<Self> {
447        L1ClientOptions::default().connect(url)
448    }
449
450    /// test only
451    pub fn anvil(anvil: &alloy::node_bindings::AnvilInstance) -> anyhow::Result<Self> {
452        L1ClientOptions {
453            l1_ws_provider: Some(vec![anvil.ws_endpoint().parse()?]),
454            ..Default::default()
455        }
456        .connect(vec![anvil.endpoint().parse()?])
457    }
458
459    /// Start the background tasks which keep the L1 client up to date.
460    pub async fn spawn_tasks(&self) {
461        let mut update_task = self.update_task.0.lock().await;
462        if update_task.is_none() {
463            *update_task = Some(spawn(self.update_loop()));
464        }
465    }
466
467    /// Shut down background tasks associated with this L1 client.
468    ///
469    /// The L1 client will still be usable, but will stop updating until [`start`](Self::start) is
470    /// called again.
471    pub async fn shut_down_tasks(&self) {
472        let update_task = self.update_task.0.lock().await.take();
473        if let Some(update_task) = update_task {
474            update_task.abort();
475        }
476    }
477
    /// Build the future run by the background update task (spawned by `spawn_tasks`).
    ///
    /// Each iteration of the outer loop:
    /// 1. fetches the current L1 head through the HTTP provider, retrying every
    ///    `l1_retry_delay` until it succeeds;
    /// 2. opens a block stream whose first item is that head. When `l1_ws_provider` is set, this
    ///    is a WebSocket subscription that rotates through the configured WS URLs on each
    ///    attempt. Otherwise it polls over HTTP via `watch_blocks` every `l1_polling_interval`;
    ///    that stream also ends as soon as the switching transport changes URL;
    /// 3. for every received block, fetches the finalized block (retrying until success),
    ///    updates the shared snapshot and gauges, and broadcasts `L1Event::NewHead` /
    ///    `L1Event::NewFinalized` when they advance;
    /// 4. re-establishes the stream when it ends or no block arrives within
    ///    `subscription_timeout`, incrementing the `reconnects` counter. Connection or
    ///    subscription failures `continue` directly and do not increment it.
    ///
    /// The loop has no exit condition of its own; it stops when the spawned task is aborted
    /// (`shut_down_tasks`, or dropping `L1UpdateTask`).
    fn update_loop(&self) -> impl Future<Output = ()> {
        let opt = self.options().clone();
        let rpc = self.provider.clone();
        let ws_urls = opt.l1_ws_provider.clone();
        let retry_delay = opt.l1_retry_delay;
        let subscription_timeout = opt.subscription_timeout;
        let state = self.state.clone();
        let sender = self.sender.clone();
        let metrics = self.metrics().clone();
        let polling_interval = opt.l1_polling_interval;
        let transport = self.transport.clone();

        let span = tracing::warn_span!("L1 client update");

        async move {

            for i in 0.. {
                // Declared outside the subscription block, presumably so the WebSocket provider
                // outlives the block stream derived from it.
                let ws;

                // Fetch current L1 head block for the first value of the stream to avoid having
                // to wait for new L1 blocks until the update loop starts processing blocks.
                let l1_head = loop {
                    match rpc.get_block(BlockId::latest()).await {
                        Ok(Some(block)) => break block.header,
                        Ok(None) => {
                            tracing::info!("Failed to fetch L1 head block, will retry");
                        },
                        Err(err) => {
                            tracing::info!("Failed to fetch L1 head block, will retry: err {err}");
                        }
                    }
                    sleep(retry_delay).await;
                };

                // Subscribe to new blocks.
                let mut block_stream = {
                    let res = match &ws_urls {
                        Some(urls) => {
                            // Use a new WebSockets host each time we retry in case there is a
                            // problem with one of the hosts specifically.
                            let provider = i % urls.len();
                            let url = &urls[provider];
                            ws = match ProviderBuilder::new().on_ws(WsConnect::new(url.clone())).await {
                                Ok(ws) => ws,
                                Err(err) => {
                                    tracing::warn!(provider, "Failed to connect WebSockets provider: {err:#}");
                                    sleep(retry_delay).await;
                                    continue;
                                }
                            };
                            // Prepend the head fetched above, then yield subscribed headers.
                            ws.subscribe_blocks().await.map(|stream| {stream::once(async { l1_head.clone() }).chain(stream.into_stream()).boxed()})
                        },
                        None => {
                           rpc
                            .watch_blocks()
                            .await
                            .map(|poller_builder| {
                                // Configure it and get the stream
                                let stream = poller_builder.with_poll_interval(polling_interval).into_stream();

                                let rpc = rpc.clone();

                                // For HTTP, we simulate a subscription by polling. The polling
                                // stream provided by ethers only yields block hashes, so for each
                                // one, we have to go fetch the block itself.
                                stream::once(async { l1_head.clone() })
                                 .chain(
                                    stream.map(stream::iter).flatten().filter_map(move |hash| {
                                        let rpc = rpc.clone();
                                        async move {
                                            match rpc.get_block(BlockId::hash(hash)).await {
                                                Ok(Some(block)) => Some(block.header),
                                                // If we can't fetch the block for some reason, we can
                                                // just skip it.
                                                Ok(None) => {
                                                    tracing::warn!(%hash, "HTTP stream yielded a block hash that was not available");
                                                    None
                                                }
                                                Err(err) => {
                                                    tracing::warn!(%hash, "Error fetching block from HTTP stream: {err:#}");
                                                    None
                                                }
                                            }
                                        }
                                    }))
                                // Take until the transport is switched, so we will call `watch_blocks` instantly on it
                            }.take_until(transport.wait_switch())
                            .boxed())
                        }
                    };
                    match res {
                        Ok(stream) => stream,
                        Err(err) => {
                            tracing::error!("Error subscribing to L1 blocks: {err:#}");
                            sleep(retry_delay).await;
                            continue;
                        }
                    }
                };

                tracing::info!("Established L1 block stream");
                loop {
                    // Wait for a block, timing out if we don't get one soon enough
                    let block_timeout = tokio::time::timeout(subscription_timeout, block_stream.next()).await;
                    match block_timeout {
                        // We got a block
                        Ok(Some(head)) => {
                            let head = head.number;
                            tracing::debug!(head, "Received L1 block");

                            // A new block has been produced. This happens fairly rarely, so it is now ok to
                            // poll to see if a new block has been finalized.
                            let finalized = loop {
                                match fetch_finalized_block_from_rpc(&rpc).await {
                                    Ok(finalized) => break finalized,
                                    Err(err) => {
                                        tracing::warn!("Error getting finalized block: {err:#}");
                                        sleep(retry_delay).await;
                                    }
                                }
                            };

                            // Update the state snapshot;
                            // NOTE(review): the state lock is held across the
                            // `broadcast_direct(..).await` calls below.
                            let mut state = state.lock().await;
                            if head > state.snapshot.head {
                                tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
                                metrics.head.set(head as usize);
                                state.snapshot.head = head;
                                // Emit an event about the new L1 head. Ignore send errors; it just means no
                                // one is listening to events right now.
                                sender
                                    .broadcast_direct(L1Event::NewHead { head })
                                    .await
                                    .ok();
                            }
                            if let Some(finalized) = finalized {
                                // `None < Some(_)`, so the first finalized block seen always
                                // counts as newer; `L1BlockInfo` compares by block number.
                                if Some(finalized.info) > state.snapshot.finalized {
                                    tracing::info!(
                                        ?finalized,
                                        old_finalized = ?state.snapshot.finalized,
                                        "L1 finalized updated",
                                    );
                                    metrics.finalized.set(finalized.info.number as usize);
                                    state.snapshot.finalized = Some(finalized.info);
                                    state.put_finalized(finalized);
                                    sender
                                        .broadcast_direct(L1Event::NewFinalized { finalized })
                                        .await
                                        .ok();
                                }
                            }
                            tracing::debug!("Updated L1 snapshot to {:?}", state.snapshot);
                        }
                        // The stream ended. For the HTTP poller this also happens whenever the
                        // transport switches URL (`take_until(wait_switch)` above), so this error
                        // log is expected in that case.
                        Ok(None) => {
                            tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
                            break;
                        }
                        // We timed out waiting for a block
                        Err(_) => {
                            tracing::error!("No block received for {} seconds, trying to re-establish block stream", subscription_timeout.as_secs());
                            break;
                        }
                    }
                }

                metrics.reconnects.add(1);
            }
        }.instrument(span)
    }
648
649    /// Get a snapshot from the l1.
650    pub async fn snapshot(&self) -> L1Snapshot {
651        self.state.lock().await.snapshot
652    }
653
654    /// Wait until the highest L1 block number reaches at least `number`.
655    ///
656    /// This function does not return any information about the block, since the block is not
657    /// necessarily finalized when it returns. It is only used to guarantee that some block at
658    /// height `number` exists, possibly in the unsafe part of the L1 chain.
659    pub async fn wait_for_block(&self, number: u64) {
660        loop {
661            // Subscribe to events before checking the current state, to ensure we don't miss a
662            // relevant event.
663            let mut events = self.receiver.activate_cloned();
664
665            // Check if the block we are waiting for already exists.
666            {
667                let state = self.state.lock().await;
668                if state.snapshot.head >= number {
669                    return;
670                }
671                tracing::info!(number, head = state.snapshot.head, "Waiting for l1 block");
672            }
673
674            // Wait for the block.
675            while let Some(event) = events.next().await {
676                let L1Event::NewHead { head } = event else {
677                    continue;
678                };
679                if head >= number {
680                    tracing::info!(number, head, "Got L1 block");
681                    return;
682                }
683                tracing::debug!(number, head, "Waiting for L1 block");
684            }
685
686            // This should not happen: the event stream ended. All we can do is try again.
687            tracing::warn!(number, "L1 event stream ended unexpectedly; retry");
688            self.retry_delay().await;
689        }
690    }
691
692    /// Get information about the given block.
693    ///
694    /// If the desired block number is not finalized yet, this function will block until it becomes
695    /// finalized.
696    pub async fn wait_for_finalized_block(&self, number: u64) -> L1BlockInfo {
697        loop {
698            // Subscribe to events before checking the current state, to ensure we don't miss a relevant
699            // event.
700            let mut events = self.receiver.activate_cloned();
701
702            // Check if the block we are waiting for already exists.
703            {
704                let state = self.state.lock().await;
705                if let Some(finalized) = state.snapshot.finalized {
706                    if finalized.number >= number {
707                        return self.fetch_finalized_block_by_number(state, number).await.1;
708                    }
709                    tracing::info!(
710                        number,
711                        finalized = ?state.snapshot.finalized,
712                        "waiting for l1 finalized block",
713                    );
714                };
715            }
716
717            // Wait for the block.
718            while let Some(event) = events.next().await {
719                let L1Event::NewFinalized { finalized } = event else {
720                    continue;
721                };
722                let mut state = self.state.lock().await;
723                state.put_finalized(finalized);
724                if finalized.info.number >= number {
725                    tracing::info!(number, ?finalized, "got finalized L1 block");
726                    return self.fetch_finalized_block_by_number(state, number).await.1;
727                }
728                tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
729            }
730
731            // This should not happen: the event stream ended. All we can do is try again.
732            tracing::warn!(number, "L1 event stream ended unexpectedly; retry",);
733            self.retry_delay().await;
734        }
735    }
736
737    /// Get information about the first finalized block with timestamp greater than or equal
738    /// `timestamp`.
739    pub async fn wait_for_finalized_block_with_timestamp(&self, timestamp: U256) -> L1BlockInfo {
740        // Wait until the finalized block has timestamp >= `timestamp`.
741        let (mut state, mut block) = 'outer: loop {
742            // Subscribe to events before checking the current state, to ensure we don't miss a
743            // relevant event.
744            let mut events = self.receiver.activate_cloned();
745
746            // Check if the block we are waiting for already exists.
747            {
748                let state = self.state.lock().await;
749                if let Some(finalized) = state.snapshot.finalized {
750                    if finalized.timestamp >= timestamp {
751                        break 'outer (state, finalized);
752                    }
753                }
754                tracing::info!(
755                    %timestamp,
756                    finalized = ?state.snapshot.finalized,
757                    "waiting for L1 finalized block",
758                );
759            }
760
761            // Wait for the block.
762            while let Some(event) = events.next().await {
763                let L1Event::NewFinalized { finalized } = event else {
764                    continue;
765                };
766                if finalized.info.timestamp >= timestamp {
767                    tracing::info!(%timestamp, ?finalized, "got finalized block");
768                    break 'outer (self.state.lock().await, finalized.info);
769                }
770                tracing::debug!(%timestamp, ?finalized, "waiting for L1 finalized block");
771            }
772
773            // This should not happen: the event stream ended. All we can do is try again.
774            tracing::warn!(%timestamp, "L1 event stream ended unexpectedly; retry",);
775            self.retry_delay().await;
776        };
777
778        // It is possible there is some earlier block that also has the proper timestamp. Binary
779        // search until we find the true earliest block with timestamp >= `timestamp`.
780        //
781        // Invariants:
782        // * `upper_bound <= lower_bound`
783        // * `upper_bound = block.number`
784        // * Block number `lower_bound - 1` has timestamp < `timestamp` (strictly)
785        // * `block` has timestamp >= `timestamp`
786        let mut upper_bound = block.number;
787        let mut lower_bound = 0;
788        while lower_bound < upper_bound {
789            let midpoint = (upper_bound + lower_bound) / 2;
790            tracing::debug!(
791                lower_bound,
792                midpoint,
793                upper_bound,
794                %timestamp,
795                ?block,
796                "searching for earliest block with sufficient timestamp"
797            );
798
799            let (state_lock, midpoint_block) =
800                self.fetch_finalized_block_by_number(state, midpoint).await;
801            state = state_lock;
802
803            tracing::debug!(?midpoint_block, %timestamp, "pivot on midpoint block");
804            if midpoint_block.timestamp < timestamp {
805                lower_bound = midpoint + 1;
806            } else {
807                upper_bound = midpoint;
808                block = midpoint_block;
809            }
810        }
811
812        block
813    }
814
815    async fn fetch_finalized_block_by_number<'a>(
816        &'a self,
817        mut state: MutexGuard<'a, L1State>,
818        number: u64,
819    ) -> (MutexGuard<'a, L1State>, L1BlockInfo) {
820        let latest_finalized = state
821            .snapshot
822            .finalized
823            .expect("get_finalized_block called before any blocks are finalized");
824        assert!(
825            number <= latest_finalized.number,
826            "requesting a finalized block {number} that isn't finalized; snapshot: {:?}",
827            state.snapshot,
828        );
829
830        if let Some(safety_margin) = self.options().l1_finalized_safety_margin {
831            if number < latest_finalized.number.saturating_sub(safety_margin) {
832                // If the requested block height is so old that we can assume all L1 providers have
833                // finalized it, we don't need to worry about failing over to a lagging L1 provider
834                // which has yet to finalize the block, so we don't need to bother with the
835                // expensive hash chaining logic below. Just look up the block by number and assume
836                // the response is finalized.
837                tracing::debug!(
838                    number,
839                    ?latest_finalized,
840                    "skipping hash check for old finalized block"
841                );
842                let (state, block) = self
843                    .load_and_cache_finalized_block(state, number.into())
844                    .await;
845                return (state, block.info);
846            }
847        }
848
849        // To get this block and be sure we are getting the correct finalized block, we first need
850        // to find an equal or later block so we can find the expected hash of this block. If we
851        // were to just look up the block by number, there could be problems if we failed over to a
852        // different (lagging) L1 provider, which has yet to finalize this block and reports a
853        // different block with the same number.
854        let mut successor_number = number;
855        let mut successor = loop {
856            if let Some(block) = state.finalized.get(&successor_number) {
857                break *block;
858            }
859            successor_number += 1;
860            if successor_number > latest_finalized.number {
861                // We don't have any cached finalized block after the requested one; fetch the
862                // current finalized block from the network.
863                // Don't hold state lock while fetching from network.
864                drop(state);
865                let block = loop {
866                    match fetch_finalized_block_from_rpc(&self.provider).await {
867                        Ok(Some(block)) => {
868                            break block;
869                        },
870                        Ok(None) => {
871                            tracing::warn!(
872                                "no finalized block even though finalized snapshot is Some; this \
873                                 can be caused by an L1 client failover"
874                            );
875                            self.retry_delay().await;
876                        },
877                        Err(err) => {
878                            tracing::warn!("Error getting finalized block: {err:#}");
879                            self.retry_delay().await;
880                        },
881                    }
882                };
883                state = self.state.lock().await;
884                state.put_finalized(block);
885                break block;
886            }
887        };
888
889        // Work backwards from the known finalized successor, fetching blocks by parent hash so we
890        // know we are getting the correct block.
891        while successor.info.number > number {
892            tracing::debug!(
893                number,
894                ?successor,
895                "checking hash chaining for finalized block"
896            );
897            (state, successor) = self
898                .load_and_cache_finalized_block(state, successor.parent_hash.into())
899                .await;
900        }
901
902        (state, successor.info)
903    }
904
905    async fn load_and_cache_finalized_block<'a>(
906        &'a self,
907        mut state: MutexGuard<'a, L1State>,
908        id: BlockId,
909    ) -> (MutexGuard<'a, L1State>, L1BlockInfoWithParent) {
910        // Don't hold state lock while fetching from network.
911        drop(state);
912        let block = loop {
913            let block = match self.provider.get_block(id).await {
914                Ok(Some(block)) => block,
915                Ok(None) => {
916                    tracing::warn!(
917                        %id,
918                        "provider error: finalized L1 block should always be available"
919                    );
920                    self.retry_delay().await;
921                    continue;
922                },
923                Err(err) => {
924                    tracing::warn!(%id, "failed to get finalized L1 block: {err:#}");
925                    self.retry_delay().await;
926                    continue;
927                },
928            };
929            break (&block).into();
930        };
931        state = self.state.lock().await;
932        state.put_finalized(block);
933        (state, block)
934    }
935
936    /// Get fee info for each `Deposit` occurring between `prev`
937    /// and `new`. Returns `Vec<FeeInfo>`
938    pub async fn get_finalized_deposits(
939        &self,
940        fee_contract_address: Address,
941        prev_finalized: Option<u64>,
942        new_finalized: u64,
943    ) -> Vec<FeeInfo> {
944        // No new blocks have been finalized, therefore there are no
945        // new deposits.
946        if prev_finalized >= Some(new_finalized) {
947            return vec![];
948        }
949
950        let opt = self.options();
951
952        // `prev` should have already been processed unless we
953        // haven't processed *any* blocks yet.
954        let prev = prev_finalized.map(|prev| prev + 1).unwrap_or(0);
955
956        // Divide the range `prev_finalized..=new_finalized` into chunks of size
957        // `events_max_block_range`.
958        let mut start = prev;
959        let end = new_finalized;
960        let chunk_size = opt.l1_events_max_block_range;
961        let chunks = std::iter::from_fn(move || {
962            let chunk_end = min(start + chunk_size - 1, end);
963            if chunk_end < start {
964                return None;
965            }
966
967            let chunk = (start, chunk_end);
968            start = chunk_end + 1;
969            Some(chunk)
970        });
971
972        // Fetch events for each chunk.
973        let events = stream::iter(chunks).then(|(from, to)| {
974            let retry_delay = opt.l1_retry_delay;
975            let fee_contract = FeeContract::new(fee_contract_address, self.provider.clone());
976            async move {
977                tracing::debug!(from, to, "fetch events in range");
978
979                // query for deposit events, loop until successful.
980                loop {
981                    match fee_contract
982                        .Deposit_filter()
983                        .address(*fee_contract.address())
984                        .from_block(from)
985                        .to_block(to)
986                        .query()
987                        .await
988                    {
989                        Ok(events) => break stream::iter(events),
990                        Err(err) => {
991                            tracing::warn!(from, to, %err, "Fee L1Event Error");
992                            sleep(retry_delay).await;
993                        },
994                    }
995                }
996            }
997        });
998        events
999            .flatten()
1000            .map(|(deposit, _)| FeeInfo::from(deposit))
1001            .collect()
1002            .await
1003    }
1004
1005    /// Check if the given address is a proxy contract.
1006    pub async fn is_proxy_contract(&self, proxy_address: Address) -> anyhow::Result<bool> {
1007        // confirm that the proxy_address is a proxy
1008        // using the implementation slot, 0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc, which is the keccak-256 hash of "eip1967.proxy.implementation" subtracted by 1
1009        let hex_bytes =
1010            hex::decode("360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc")
1011                .expect("Failed to decode hex string");
1012        let implementation_slot = B256::from_slice(&hex_bytes);
1013        let storage = self
1014            .provider
1015            .get_storage_at(proxy_address, implementation_slot.into())
1016            .await?;
1017
1018        let implementation_address = Address::from_slice(&storage.to_be_bytes::<32>()[12..]);
1019
1020        // when the implementation address is not equal to zero, it's a proxy
1021        Ok(implementation_address != Address::ZERO)
1022    }
1023
1024    pub async fn retry_on_all_providers<Fut>(
1025        &self,
1026        op: impl Fn() -> Fut,
1027    ) -> Result<Fut::Ok, Fut::Error>
1028    where
1029        Fut: TryFuture,
1030    {
1031        let transport = &self.transport;
1032        let start = transport.current_transport.read().generation % transport.urls.len();
1033        let end = start + transport.urls.len();
1034        loop {
1035            match op().into_future().await {
1036                Ok(res) => return Ok(res),
1037                Err(err) => {
1038                    if transport.current_transport.read().generation >= end {
1039                        return Err(err);
1040                    } else {
1041                        self.retry_delay().await;
1042                    }
1043                },
1044            }
1045        }
1046    }
1047
1048    pub(crate) fn options(&self) -> &L1ClientOptions {
1049        self.transport.options()
1050    }
1051
1052    fn metrics(&self) -> &L1ClientMetrics {
1053        self.transport.metrics()
1054    }
1055
1056    async fn retry_delay(&self) {
1057        sleep(self.options().l1_retry_delay).await;
1058    }
1059}
1060
1061impl L1State {
1062    fn new(cache_size: NonZeroUsize) -> Self {
1063        Self {
1064            snapshot: Default::default(),
1065            finalized: LruCache::new(cache_size),
1066            last_finalized: None,
1067        }
1068    }
1069
1070    fn put_finalized(&mut self, block: L1BlockInfoWithParent) {
1071        assert!(
1072            self.snapshot.finalized.is_some()
1073                && block.info.number <= self.snapshot.finalized.unwrap().number,
1074            "inserting a finalized block {block:?} that isn't finalized; snapshot: {:?}",
1075            self.snapshot,
1076        );
1077
1078        if Some(block.info.number()) > self.last_finalized {
1079            self.last_finalized = Some(block.info.number());
1080        }
1081
1082        if let Some((old_number, old_block)) = self.finalized.push(block.info.number, block) {
1083            if old_number == block.info.number && block != old_block {
1084                tracing::error!(
1085                    ?old_block,
1086                    ?block,
1087                    "got different info for the same finalized height; something has gone very \
1088                     wrong with the L1",
1089                );
1090            }
1091        }
1092    }
1093}
1094
1095async fn fetch_finalized_block_from_rpc(
1096    rpc: &impl Provider,
1097) -> anyhow::Result<Option<L1BlockInfoWithParent>> {
1098    let Some(block) = rpc.get_block(BlockId::finalized()).await? else {
1099        // This can happen in rare cases where the L1 chain is very young and has not finalized a
1100        // block yet. This is more common in testing and demo environments. In any case, we proceed
1101        // with a null L1 block rather than wait for the L1 to finalize a block, which can take a
1102        // long time.
1103        tracing::warn!("no finalized block yet");
1104        return Ok(None);
1105    };
1106
1107    Ok(Some((&block).into()))
1108}
1109
1110#[cfg(test)]
1111mod test {
1112    use std::{ops::Add, time::Duration};
1113
1114    use alloy::{
1115        eips::BlockNumberOrTag,
1116        node_bindings::{Anvil, AnvilInstance},
1117        primitives::utils::parse_ether,
1118        providers::layers::AnvilProvider,
1119    };
1120    use espresso_contract_deployer::{deploy_fee_contract_proxy, Contracts};
1121    use portpicker::pick_unused_port;
1122    use time::OffsetDateTime;
1123
1124    use super::*;
1125
1126    async fn new_l1_client_opt(
1127        anvil: &Arc<AnvilInstance>,
1128        f: impl FnOnce(&mut L1ClientOptions),
1129    ) -> L1Client {
1130        let mut opt = L1ClientOptions {
1131            l1_events_max_block_range: 1,
1132            l1_polling_interval: Duration::from_secs(1),
1133            subscription_timeout: Duration::from_secs(5),
1134            ..Default::default()
1135        };
1136        f(&mut opt);
1137
1138        let l1_client = opt
1139            .connect(vec![anvil.endpoint_url()])
1140            .expect("Failed to create L1 client");
1141
1142        l1_client.spawn_tasks().await;
1143        l1_client
1144    }
1145
1146    async fn new_l1_client(anvil: &Arc<AnvilInstance>, include_ws: bool) -> L1Client {
1147        new_l1_client_opt(anvil, |opt| {
1148            if include_ws {
1149                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
1150            }
1151        })
1152        .await
1153    }
1154
1155    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1156    async fn test_get_finalized_deposits() -> anyhow::Result<()> {
1157        let num_deposits = 5;
1158
1159        let anvil = Anvil::new().spawn();
1160        let wallet = anvil.wallet().unwrap();
1161        let deployer = wallet.default_signer().address();
1162        let inner_provider = ProviderBuilder::new()
1163            .wallet(wallet)
1164            .on_http(anvil.endpoint_url());
1165        // a provider that holds both anvil (to avoid accidental drop) and wallet-enabled L1 provider
1166        let provider = AnvilProvider::new(inner_provider, Arc::new(anvil));
1167        // cache store for deployed contracts
1168        let mut contracts = Contracts::new();
1169
1170        // init and kick off the L1Client which wraps around standard L1 provider with more app-specific state management
1171        let l1_client = new_l1_client(provider.anvil(), false).await;
1172
1173        // Initialize a contract with some deposits
1174        let fee_proxy_addr = deploy_fee_contract_proxy(&provider, &mut contracts, deployer).await?;
1175        let fee_proxy = FeeContract::new(fee_proxy_addr, &provider);
1176        let num_tx_for_deploy = provider.get_block_number().await?;
1177
1178        // make some deposits.
1179        for n in 1..=num_deposits {
1180            // Varied amounts are less boring.
1181            let amount = n as f32 / 10.0;
1182            let receipt = fee_proxy
1183                .deposit(deployer)
1184                .value(parse_ether(&amount.to_string())?)
1185                .send()
1186                .await?
1187                .get_receipt()
1188                .await?;
1189            assert!(receipt.inner.is_success());
1190        }
1191
1192        let cur_height = provider.get_block_number().await?;
1193        assert_eq!(num_deposits + num_tx_for_deploy, cur_height);
1194
1195        // Set prev deposits to `None` so `Filter` will start at block
1196        // 0. The test would also succeed if we pass `0` (b/c first
1197        // block did not deposit).
1198        let pending = l1_client
1199            .get_finalized_deposits(fee_proxy_addr, None, cur_height)
1200            .await;
1201
1202        assert_eq!(num_deposits as usize, pending.len(), "{pending:?}");
1203        assert_eq!(deployer, pending[0].account().0);
1204        assert_eq!(
1205            U256::from(1500000000000000000u64),
1206            pending
1207                .iter()
1208                .fold(U256::from(0), |total, info| total.add(info.amount().0))
1209        );
1210
1211        // check a few more cases
1212        let pending = l1_client
1213            .get_finalized_deposits(fee_proxy_addr, Some(0), cur_height)
1214            .await;
1215        assert_eq!(num_deposits as usize, pending.len());
1216
1217        let pending = l1_client
1218            .get_finalized_deposits(fee_proxy_addr, Some(0), 0)
1219            .await;
1220        assert_eq!(0, pending.len());
1221
1222        let pending = l1_client
1223            .get_finalized_deposits(fee_proxy_addr, Some(0), 1)
1224            .await;
1225        assert_eq!(0, pending.len());
1226
1227        let pending = l1_client
1228            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), num_tx_for_deploy)
1229            .await;
1230        assert_eq!(0, pending.len());
1231
1232        let pending = l1_client
1233            .get_finalized_deposits(
1234                fee_proxy_addr,
1235                Some(num_tx_for_deploy),
1236                num_tx_for_deploy + 1,
1237            )
1238            .await;
1239        assert_eq!(1, pending.len());
1240
1241        // what happens if `new_finalized` is `0`?
1242        let pending = l1_client
1243            .get_finalized_deposits(fee_proxy_addr, Some(num_tx_for_deploy), 0)
1244            .await;
1245        assert_eq!(0, pending.len());
1246
1247        Ok(())
1248    }
1249
1250    async fn test_wait_for_finalized_block_helper(ws: bool) {
1251        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
1252        let l1_client = new_l1_client(&anvil, ws).await;
1253        let provider = &l1_client.provider;
1254
1255        // Wait for a block 10 blocks in the future.
1256        let block_height = provider.get_block_number().await.unwrap();
1257        let block = l1_client.wait_for_finalized_block(block_height + 10).await;
1258        assert_eq!(block.number, block_height + 10);
1259
1260        // Compare against underlying provider.
1261        let true_block = provider
1262            .get_block(BlockId::Number(BlockNumberOrTag::Number(block_height + 10)))
1263            .full()
1264            .await
1265            .unwrap()
1266            .unwrap();
1267
1268        assert_eq!(
1269            block.timestamp.to::<u64>(),
1270            true_block.header.inner.timestamp
1271        );
1272        assert_eq!(block.hash, true_block.header.hash);
1273    }
1274
1275    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1276    async fn test_wait_for_finalized_block_ws() {
1277        test_wait_for_finalized_block_helper(true).await
1278    }
1279
1280    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1281    async fn test_wait_for_finalized_block_http() {
1282        test_wait_for_finalized_block_helper(false).await
1283    }
1284
1285    async fn test_wait_for_old_finalized_block_helper(ws: bool) {
1286        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
1287        let l1_client = new_l1_client_opt(&anvil, |opt| {
1288            if ws {
1289                opt.l1_ws_provider = Some(vec![anvil.ws_endpoint_url()]);
1290            }
1291            opt.l1_finalized_safety_margin = Some(1);
1292        })
1293        .await;
1294        let provider = &l1_client.provider;
1295
1296        // Wait for anvil to finalize a few blocks.
1297        l1_client.wait_for_finalized_block(2).await;
1298
1299        // Get an old finalized block.
1300        let block = l1_client.wait_for_finalized_block(0).await;
1301
1302        // Compare against underlying provider.
1303        let true_block = provider.get_block(0.into()).await.unwrap().unwrap();
1304        assert_eq!(block.hash, true_block.header.hash);
1305    }
1306
1307    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1308    async fn test_wait_for_old_finalized_block_ws() {
1309        test_wait_for_old_finalized_block_helper(true).await
1310    }
1311
1312    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1313    async fn test_wait_for_old_finalized_block_http() {
1314        test_wait_for_old_finalized_block_helper(false).await
1315    }
1316
1317    async fn test_wait_for_finalized_block_by_timestamp_helper(ws: bool) {
1318        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
1319        let l1_client = new_l1_client(&anvil, ws).await;
1320        let provider = &l1_client.provider;
1321
1322        // Wait for a block 5 blocks in the future.
1323        let timestamp = U256::from(OffsetDateTime::now_utc().unix_timestamp() as u64 + 5);
1324        let block = l1_client
1325            .wait_for_finalized_block_with_timestamp(timestamp)
1326            .await;
1327        assert!(
1328            block.timestamp >= timestamp,
1329            "wait_for_finalized_block_with_timestamp({timestamp}) returned too early a block: \
1330             {block:?}",
1331        );
1332        let parent = provider
1333            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number - 1)))
1334            .full()
1335            .await
1336            .unwrap()
1337            .unwrap();
1338        assert!(
1339            parent.header.inner.timestamp < timestamp.to::<u64>(),
1340            "wait_for_finalized_block_with_timestamp({timestamp}) did not return the earliest \
1341             possible block: returned {block:?}, but earlier block {parent:?} has an acceptable \
1342             timestamp too",
1343        );
1344
1345        // Compare against underlying provider.
1346        let true_block = provider
1347            .get_block(BlockId::Number(BlockNumberOrTag::Number(block.number)))
1348            .await
1349            .unwrap()
1350            .unwrap();
1351        assert_eq!(
1352            block.timestamp.to::<u64>(),
1353            true_block.header.inner.timestamp
1354        );
1355        assert_eq!(block.hash, true_block.header.hash);
1356    }
1357
1358    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1359    async fn test_wait_for_finalized_block_by_timestamp_ws() {
1360        test_wait_for_finalized_block_by_timestamp_helper(true).await
1361    }
1362
1363    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1364    async fn test_wait_for_finalized_block_by_timestamp_http() {
1365        test_wait_for_finalized_block_by_timestamp_helper(false).await
1366    }
1367
1368    async fn test_wait_for_old_finalized_block_by_timestamp_helper(ws: bool) {
1369        let anvil = Arc::new(Anvil::new().block_time_f64(0.2).spawn());
1370        let l1_client = new_l1_client(&anvil, ws).await;
1371
1372        // Get the timestamp of the first block.
1373        let true_block = l1_client.wait_for_finalized_block(0).await;
1374        let timestamp = true_block.timestamp;
1375
1376        // Wait for some more blocks to be produced.
1377        l1_client.wait_for_finalized_block(10).await;
1378
1379        // Get the old block by timestamp.
1380        let block = l1_client
1381            .wait_for_finalized_block_with_timestamp(U256::from(timestamp))
1382            .await;
1383        assert_eq!(block, true_block);
1384    }
1385
1386    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1387    async fn test_wait_for_old_finalized_block_by_timestamp_ws() {
1388        test_wait_for_old_finalized_block_by_timestamp_helper(true).await
1389    }
1390
1391    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1392    async fn test_wait_for_old_finalized_block_by_timestamp_http() {
1393        test_wait_for_old_finalized_block_by_timestamp_helper(false).await
1394    }
1395
1396    async fn test_wait_for_block_helper(ws: bool) {
1397        let anvil = Arc::new(Anvil::new().block_time_f64(0.1).spawn());
1398        let l1_client = new_l1_client(&anvil, ws).await;
1399        let provider = &l1_client.provider;
1400
1401        // Wait for a block 10 blocks in the future.
1402        let block_height = provider.get_block_number().await.unwrap();
1403        l1_client.wait_for_block(block_height + 10).await;
1404
1405        let new_block_height = provider.get_block_number().await.unwrap();
1406        assert!(
1407            new_block_height >= block_height + 10,
1408            "wait_for_block returned too early; initial height = {block_height}, new height = \
1409             {new_block_height}",
1410        );
1411    }
1412
1413    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1414    async fn test_wait_for_block_ws() {
1415        test_wait_for_block_helper(true).await
1416    }
1417
1418    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1419    async fn test_wait_for_block_http() {
1420        test_wait_for_block_helper(false).await
1421    }
1422
1423    async fn test_reconnect_update_task_helper(ws: bool) {
1424        let port = pick_unused_port().unwrap();
1425        let anvil = Arc::new(Anvil::new().block_time(1).port(port).spawn());
1426        let client = new_l1_client(&anvil, ws).await;
1427
1428        let initial_state = client.snapshot().await;
1429        tracing::info!(?initial_state, "initial state");
1430
1431        // Check the state is updating.
1432        let mut retry = 0;
1433        let updated_state = loop {
1434            assert!(retry < 10, "state did not update in time");
1435
1436            let updated_state = client.snapshot().await;
1437            if updated_state.head > initial_state.head {
1438                break updated_state;
1439            }
1440            tracing::info!(retry, "waiting for state update");
1441            sleep(Duration::from_secs(1)).await;
1442            retry += 1;
1443        };
1444        tracing::info!(?updated_state, "state updated");
1445
1446        // Disconnect the WebSocket and reconnect it. Technically this spawns a whole new Anvil
1447        // chain, but for the purposes of this test it should look to the client like an L1 server
1448        // closing a WebSocket connection.
1449        drop(anvil);
1450
1451        // Let the connection stay down for a little while: Ethers internally tries to reconnect,
1452        // and starting up to fast again might hit that and cause a false positive. The problem is,
1453        // Ethers doesn't try very hard, and if we wait a bit, we will test the worst possible case
1454        // where the internal retry logic gives up and just kills the whole provider.
1455        tracing::info!("sleep 5");
1456        sleep(Duration::from_secs(5)).await;
1457
1458        // Once a connection is reestablished, the state will eventually start to update again.
1459        tracing::info!("restarting L1");
1460        let _anvil = Anvil::new().block_time(1).port(port).spawn();
1461
1462        let mut retry = 0;
1463        let final_state = loop {
1464            assert!(retry < 5, "state did not update in time");
1465
1466            let final_state = client.snapshot().await;
1467            if final_state.head > updated_state.head {
1468                break final_state;
1469            }
1470            tracing::info!(retry, "waiting for state update");
1471            sleep(Duration::from_secs(1)).await;
1472            retry += 1;
1473        };
1474        tracing::info!(?final_state, "state updated");
1475    }
1476
1477    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1478    async fn test_reconnect_update_task_ws() {
1479        test_reconnect_update_task_helper(true).await
1480    }
1481
1482    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1483    async fn test_reconnect_update_task_http() {
1484        test_reconnect_update_task_helper(false).await
1485    }
1486
1487    // #[test_log::test(tokio::test(flavor = "multi_thread"))]
1488    // async fn test_fetch_stake_table() -> anyhow::Result<()> {
1489
1490    //     let anvil = Anvil::new().spawn();
1491    //     let wallet = anvil.wallet().unwrap();
1492    //     let inner_provider = ProviderBuilder::new()
1493    //         .wallet(wallet)
1494    //         .on_http(anvil.endpoint_url());
1495    //     let provider = AnvilProvider::new(inner_provider, Arc::new(anvil));
1496
1497    //     let l1_client = new_l1_client(provider.anvil(), false).await;
1498    //     let mut contracts = Contracts::new();
1499
1500    //     let stake_table_addr =
1501    //         deploy_permissioned_stake_table(&provider, &mut contracts, vec![]).await?;
1502    //     let stake_table_contract = PermissionedStakeTable::new(stake_table_addr, &provider);
1503
1504    //     let mut rng = rand::thread_rng();
1505    //     let node = NodeInfoSol::rand(&mut rng);
1506
1507    //     let new_nodes: Vec<NodeInfoSol> = vec![node];
1508    //     stake_table_contract
1509    //         .update(vec![], new_nodes)
1510    //         .send()
1511    //         .await?
1512    //         .watch()
1513    //         .await?;
1514
1515    //     let block = l1_client.get_block(BlockId::latest()).await?.unwrap();
1516    //     let nodes = l1_client
1517    //         .get_stake_table(stake_table_addr, block.header.inner.number)
1518    //         .await?;
1519
1520    //     let result = nodes.stake_table.0[0].clone();
1521    //     assert_eq!(result.stake_table_entry.stake_amount.to::<u64>(), 1);
1522    //     Ok(())
1523    // }
1524
1525    /// A helper function to get the index of the current provider in the failover list.
1526    fn get_failover_index(provider: &L1Client) -> usize {
1527        let transport = &provider.transport;
1528        provider.transport.current_transport.read().generation % transport.urls.len()
1529    }
1530
1531    async fn test_failover_update_task_helper(ws: bool) {
1532        let anvil = Anvil::new().block_time(1).spawn();
1533
1534        // Create an L1 client with fake providers, and check that the state is still updated after
1535        // it correctly fails over to the real providers.
1536        let client = L1ClientOptions {
1537            l1_polling_interval: Duration::from_secs(1),
1538            // Use a very long subscription timeout, so that we only succeed by triggering a
1539            // failover.
1540            subscription_timeout: Duration::from_secs(1000),
1541            l1_ws_provider: if ws {
1542                Some(vec![
1543                    "ws://notarealurl:1234".parse().unwrap(),
1544                    anvil.ws_endpoint_url(),
1545                ])
1546            } else {
1547                None
1548            },
1549            ..Default::default()
1550        }
1551        .connect(vec![
1552            "http://notarealurl:1234".parse().unwrap(),
1553            anvil.endpoint_url(),
1554        ])
1555        .expect("Failed to create L1 client");
1556
1557        client.spawn_tasks().await;
1558
1559        let initial_state = client.snapshot().await;
1560        tracing::info!(?initial_state, "initial state");
1561
1562        // Check the state is updating.
1563        let mut retry = 0;
1564        let updated_state = loop {
1565            assert!(retry < 10, "state did not update in time");
1566
1567            let updated_state = client.snapshot().await;
1568            if updated_state.head > initial_state.head {
1569                break updated_state;
1570            }
1571            tracing::info!(retry, "waiting for state update");
1572            sleep(Duration::from_secs(1)).await;
1573            retry += 1;
1574        };
1575        tracing::info!(?updated_state, "state updated");
1576    }
1577
1578    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1579    async fn test_failover_update_task_ws() {
1580        test_failover_update_task_helper(true).await;
1581    }
1582
1583    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1584    async fn test_failover_update_task_http() {
1585        test_failover_update_task_helper(false).await;
1586    }
1587
1588    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1589    async fn test_failover_consecutive_failures() {
1590        let anvil = Anvil::new().block_time(1).spawn();
1591
1592        let l1_options = L1ClientOptions {
1593            l1_polling_interval: Duration::from_secs(1),
1594            l1_frequent_failure_tolerance: Duration::from_millis(0),
1595            l1_consecutive_failure_tolerance: 3,
1596            ..Default::default()
1597        };
1598
1599        let provider = l1_options
1600            .connect(vec![
1601                "http://notarealurl:1234".parse().unwrap(),
1602                anvil.endpoint_url(),
1603            ])
1604            .expect("Failed to create L1 client");
1605
1606        // Make just enough failed requests not to trigger a failover.
1607        for _ in 0..2 {
1608            provider.get_block_number().await.unwrap_err();
1609            assert!(get_failover_index(&provider) == 0);
1610        }
1611
1612        // The final request triggers failover.
1613        provider.get_block_number().await.unwrap_err();
1614        assert!(get_failover_index(&provider) == 1);
1615
1616        // Now requests succeed.
1617        provider.get_block_number().await.unwrap();
1618    }
1619
1620    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1621    async fn test_failover_frequent_failures() {
1622        let anvil = Anvil::new().block_time(1).spawn();
1623        let provider = L1ClientOptions {
1624            l1_polling_interval: Duration::from_secs(1),
1625            l1_frequent_failure_tolerance: Duration::from_millis(100),
1626            ..Default::default()
1627        }
1628        .connect(vec![
1629            "http://notarealurl:1234".parse().unwrap(),
1630            anvil.endpoint_url(),
1631        ])
1632        .expect("Failed to create L1 client");
1633
1634        // Two failed requests that are not within the tolerance window do not trigger a failover.
1635        provider.get_block_number().await.unwrap_err();
1636        sleep(Duration::from_secs(1)).await;
1637        provider.get_block_number().await.unwrap_err();
1638
1639        // Check that we didn't fail over.
1640        assert!(get_failover_index(&provider) == 0);
1641
1642        // Reset the window.
1643        sleep(Duration::from_secs(1)).await;
1644
1645        // Two failed requests in a row trigger failover.
1646        provider.get_block_number().await.unwrap_err();
1647        provider.get_block_number().await.unwrap_err();
1648        provider.get_block_number().await.unwrap();
1649        assert!(get_failover_index(&provider) == 1);
1650    }
1651
1652    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1653    async fn test_failover_revert() {
1654        let anvil = Anvil::new().block_time(1).spawn();
1655        let provider = L1ClientOptions {
1656            l1_polling_interval: Duration::from_secs(1),
1657            l1_consecutive_failure_tolerance: 1,
1658            l1_failover_revert: Duration::from_secs(2),
1659            ..Default::default()
1660        }
1661        .connect(vec![
1662            "http://notarealurl:1234".parse().unwrap(),
1663            anvil.endpoint_url(),
1664        ])
1665        .expect("Failed to create L1 client");
1666
1667        // The first request fails and triggers a failover.
1668        provider.get_block_number().await.unwrap_err();
1669        assert_eq!(get_failover_index(&provider), 1);
1670
1671        // The next request succeeds from the other provider.
1672        provider.get_block_number().await.unwrap();
1673
1674        // Eventually we revert back to the primary and requests fail again.
1675        sleep(Duration::from_millis(2100)).await;
1676        provider.get_block_number().await.unwrap_err();
1677    }
1678
1679    // Checks that the L1 client initialized the state on startup even
1680    // if the L1 is not currently mining blocks. It's useful for testing that we
1681    // don't require an L1 that is continuously mining blocks.
1682    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1683    async fn test_update_loop_initializes_l1_state() {
1684        let anvil = Arc::new(Anvil::new().port(9988u16).spawn());
1685        let l1_client = new_l1_client(&anvil, true).await;
1686
1687        for _try in 0..10 {
1688            let mut state = l1_client.state.lock().await;
1689            let has_snapshot = state.snapshot.finalized.is_some();
1690            let has_cache = state.finalized.get(&0).is_some();
1691            drop(state);
1692            if has_snapshot && has_cache {
1693                return;
1694            }
1695            sleep(Duration::from_millis(200)).await;
1696        }
1697        panic!("L1 state of L1Client not initialized");
1698    }
1699}