hotshot_libp2p_networking/network/behaviours/dht/
bootstrap.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::time::Duration;
8
9use futures::{channel::mpsc, StreamExt};
10use tokio::{spawn, sync::mpsc::UnboundedSender, time::timeout};
11
12use crate::network::ClientRequest;
13
14/// Internal bootstrap events
15pub enum InputEvent {
16    /// Start bootstrap
17    StartBootstrap,
18    /// Bootstrap has finished
19    BootstrapFinished,
20    /// Shutdown bootstrap
21    ShutdownBootstrap,
22}
23/// Bootstrap task's state
24pub struct DHTBootstrapTask {
25    /// Task's receiver
26    rx: mpsc::Receiver<InputEvent>,
27    /// Task's sender
28    network_tx: UnboundedSender<ClientRequest>,
29    /// Field indicating progress state
30    in_progress: bool,
31}
32
33impl DHTBootstrapTask {
34    /// Run bootstrap task
35    pub fn run(rx: mpsc::Receiver<InputEvent>, tx: UnboundedSender<ClientRequest>) {
36        spawn(async move {
37            let state = Self {
38                rx,
39                network_tx: tx,
40                in_progress: false,
41            };
42            state.run_loop().await;
43        });
44    }
45    /// Task's loop
46    async fn run_loop(mut self) {
47        loop {
48            if self.in_progress {
49                match self.rx.next().await {
50                    Some(InputEvent::BootstrapFinished) => {
51                        tracing::debug!("Bootstrap finished");
52                        self.in_progress = false;
53                    },
54                    Some(InputEvent::ShutdownBootstrap) => {
55                        tracing::info!("ShutdownBootstrap received, shutting down");
56                        break;
57                    },
58                    Some(InputEvent::StartBootstrap) => {
59                        tracing::warn!("Trying to start bootstrap that's already in progress");
60                        continue;
61                    },
62                    None => {
63                        tracing::debug!("Bootstrap channel closed, exiting loop");
64                        break;
65                    },
66                }
67            } else if let Ok(maybe_event) = timeout(Duration::from_secs(120), self.rx.next()).await
68            {
69                match maybe_event {
70                    Some(InputEvent::StartBootstrap) => {
71                        tracing::debug!("Start bootstrap in bootstrap task");
72                        self.bootstrap();
73                    },
74                    Some(InputEvent::ShutdownBootstrap) => {
75                        tracing::debug!("ShutdownBootstrap received, shutting down");
76                        break;
77                    },
78                    Some(InputEvent::BootstrapFinished) => {
79                        tracing::debug!("not in progress got bootstrap finished");
80                    },
81                    None => {
82                        tracing::debug!("Bootstrap channel closed, exiting loop");
83                        break;
84                    },
85                }
86            } else {
87                tracing::debug!("Start bootstrap in bootstrap task after timeout");
88                self.bootstrap();
89            }
90        }
91    }
92
93    /// Start bootstrap
94    fn bootstrap(&mut self) {
95        self.in_progress = true;
96        let _ = self.network_tx.send(ClientRequest::BeginBootstrap);
97    }
98}