hotshot_libp2p_networking/network/behaviours/dht/
bootstrap.rs1use std::time::Duration;
8
9use futures::{channel::mpsc, StreamExt};
10use tokio::{spawn, sync::mpsc::UnboundedSender, time::timeout};
11
12use crate::network::ClientRequest;
13
14pub enum InputEvent {
16 StartBootstrap,
18 BootstrapFinished,
20 ShutdownBootstrap,
22}
23pub struct DHTBootstrapTask {
25 rx: mpsc::Receiver<InputEvent>,
27 network_tx: UnboundedSender<ClientRequest>,
29 in_progress: bool,
31}
32
33impl DHTBootstrapTask {
34 pub fn run(rx: mpsc::Receiver<InputEvent>, tx: UnboundedSender<ClientRequest>) {
36 spawn(async move {
37 let state = Self {
38 rx,
39 network_tx: tx,
40 in_progress: false,
41 };
42 state.run_loop().await;
43 });
44 }
45 async fn run_loop(mut self) {
47 loop {
48 if self.in_progress {
49 match self.rx.next().await {
50 Some(InputEvent::BootstrapFinished) => {
51 tracing::debug!("Bootstrap finished");
52 self.in_progress = false;
53 },
54 Some(InputEvent::ShutdownBootstrap) => {
55 tracing::info!("ShutdownBootstrap received, shutting down");
56 break;
57 },
58 Some(InputEvent::StartBootstrap) => {
59 tracing::warn!("Trying to start bootstrap that's already in progress");
60 continue;
61 },
62 None => {
63 tracing::debug!("Bootstrap channel closed, exiting loop");
64 break;
65 },
66 }
67 } else if let Ok(maybe_event) = timeout(Duration::from_secs(120), self.rx.next()).await
68 {
69 match maybe_event {
70 Some(InputEvent::StartBootstrap) => {
71 tracing::debug!("Start bootstrap in bootstrap task");
72 self.bootstrap();
73 },
74 Some(InputEvent::ShutdownBootstrap) => {
75 tracing::debug!("ShutdownBootstrap received, shutting down");
76 break;
77 },
78 Some(InputEvent::BootstrapFinished) => {
79 tracing::debug!("not in progress got bootstrap finished");
80 },
81 None => {
82 tracing::debug!("Bootstrap channel closed, exiting loop");
83 break;
84 },
85 }
86 } else {
87 tracing::debug!("Start bootstrap in bootstrap task after timeout");
88 self.bootstrap();
89 }
90 }
91 }
92
93 fn bootstrap(&mut self) {
95 self.in_progress = true;
96 let _ = self.network_tx.send(ClientRequest::BeginBootstrap);
97 }
98}