1pub mod bootstrap;
9use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
10
11use futures::{
13 channel::{mpsc, oneshot::Sender},
14 SinkExt,
15};
16use hotshot_types::traits::signature_key::SignatureKey;
17use lazy_static::lazy_static;
18use libp2p::kad::{
19 store::MemoryStore, BootstrapOk, GetClosestPeersOk,
20 GetRecordOk, GetRecordResult, ProgressStep, PutRecordResult, QueryId, QueryResult, Record,
21};
22use libp2p::kad::{
23 store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent,
24};
25use libp2p_identity::PeerId;
26use store::{
27 persistent::{DhtPersistentStorage, PersistentStore},
28 validated::ValidatedStore,
29};
30use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
31use tracing::{debug, error, warn};
32
33pub mod record;
35
36pub mod store;
38
39lazy_static! {
40 static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
42}
43
44use super::exponential_backoff::ExponentialBackoff;
45use crate::network::{ClientRequest, NetworkEvent};
46
/// DHT bookkeeping that sits next to the libp2p Kademlia behaviour.
///
/// Tracks in-flight get-record, put-record and get-closest-peers queries (keyed by
/// Kademlia `QueryId`), the bootstrap state, and the channels used to re-submit
/// failed requests and to talk to the bootstrap task.
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
    /// Client channels waiting on `get_closest_peers` queries; signalled (with `()`)
    /// when the query's final step arrives, whether it succeeded or failed.
    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
    /// In-flight get-record queries, by Kademlia query id.
    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
    /// Record key -> id of the in-flight get query for that key, so concurrent
    /// requests for the same key share a single Kademlia query.
    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
    /// In-flight put-record queries, by Kademlia query id.
    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
    /// Bootstrap status and its backoff.
    pub bootstrap_state: Bootstrap,
    /// This node's peer id (used in log messages).
    pub peer_id: PeerId,
    /// Replication factor given at construction; only exposed via the getter here.
    /// NOTE(review): presumably the Kademlia replication factor — confirm at the caller.
    pub replication_factor: NonZeroUsize,
    /// Channel used to re-submit failed get/put requests after a backoff;
    /// `None` until `set_retry` is called.
    retry_tx: Option<UnboundedSender<ClientRequest>>,
    /// Channel to the bootstrap task; `None` until `set_bootstrap_sender` is called.
    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

    /// Marker for the key and storage type parameters, which are not stored directly.
    phantom: PhantomData<(K, D)>,
}
77
/// Bootstrap status of the DHT together with an associated backoff.
#[derive(Debug, Clone)]
pub struct Bootstrap {
    /// Whether a bootstrap has been started (`DHTBehaviour::new` sets `NotStarted`).
    pub state: State,
    /// Backoff created as `ExponentialBackoff::new(2, 1s)` by `DHTBehaviour::new`.
    /// NOTE(review): presumably spaces out bootstrap attempts; not read in this section.
    pub backoff: ExponentialBackoff,
}
86
/// Bootstrap lifecycle state.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// No bootstrap has been started yet (the initial state).
    NotStarted,
    /// A bootstrap has been started.
    Started,
}
95
/// DHT-level event.
/// NOTE(review): not used in this section — `dht_handle_event` reports bootstrap
/// completion as `NetworkEvent::IsBootstrapped` instead.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// The DHT has finished bootstrapping.
    IsBootstrapped,
}
102
103impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
104 pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
106 self.retry_tx = Some(tx);
107 }
108 pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
110 self.bootstrap_tx = Some(tx);
111 }
112 #[must_use]
114 pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
115 Self {
121 peer_id: pid,
122 in_progress_record_queries: HashMap::default(),
123 in_progress_put_record_queries: HashMap::default(),
124 outstanding_dht_query_keys: HashMap::default(),
125 bootstrap_state: Bootstrap {
126 state: State::NotStarted,
127 backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
128 },
129 in_progress_get_closest_peers: HashMap::default(),
130 replication_factor,
131 retry_tx: None,
132 bootstrap_tx: None,
133 phantom: PhantomData,
134 }
135 }
136
137 pub fn print_routing_table(
139 &mut self,
140 kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
141 ) {
142 let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
143 let v = kadem.kbuckets().collect::<Vec<_>>();
144 for i in v {
145 for j in i.iter() {
146 let s = format!(
147 "node: key: {:?}, val {:?}, status: {:?}",
148 j.node.key, j.node.value, j.status
149 );
150 err.push_str(&s);
151 }
152 }
153 error!("{:?}", err);
154 }
155
156 #[must_use]
158 pub fn replication_factor(&self) -> NonZeroUsize {
159 self.replication_factor
160 }
161 pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
165 self.in_progress_put_record_queries.insert(id, query);
166 }
167
168 pub fn get_record(
170 &mut self,
171 key: Vec<u8>,
172 chans: Vec<Sender<Vec<u8>>>,
173 backoff: ExponentialBackoff,
174 retry_count: u8,
175 kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
176 ) {
177 if retry_count == 0 {
179 return;
180 }
181
182 if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
184 for chan in chans {
186 if chan.send(entry.value.clone()).is_err() {
187 warn!("Get DHT: channel closed before get record request result could be sent");
188 }
189 }
190 } else {
191 if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
193 let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
196 warn!("Get DHT: outstanding query not found");
197 return;
198 };
199
200 query.notify.extend(chans);
202 } else {
203 let qid = kad.get_record(key.clone().into());
205 let query = KadGetQuery {
206 backoff,
207 progress: DHTProgress::InProgress(qid),
208 notify: chans,
209 key: key.clone(),
210 retry_count: retry_count - 1,
211 records: Vec::new(),
212 };
213
214 self.outstanding_dht_query_keys.insert(key, qid);
216 self.in_progress_record_queries.insert(qid, query);
217 }
218 }
219 }
220
221 fn retry_get(&self, mut query: KadGetQuery) {
223 let Some(tx) = self.retry_tx.clone() else {
224 return;
225 };
226 let req = ClientRequest::GetDHT {
227 key: query.key,
228 notify: query.notify,
229 retry_count: query.retry_count,
230 };
231 let backoff = query.backoff.next_timeout(false);
232 spawn(async move {
233 sleep(backoff).await;
234 let _ = tx.send(req);
235 });
236 }
237
238 fn retry_put(&self, mut query: KadPutQuery) {
240 let Some(tx) = self.retry_tx.clone() else {
241 return;
242 };
243 let req = ClientRequest::PutDHT {
244 key: query.key,
245 value: query.value,
246 notify: query.notify,
247 };
248 spawn(async move {
249 sleep(query.backoff.next_timeout(false)).await;
250 let _ = tx.send(req);
251 });
252 }
253
254 fn handle_get_query(
256 &mut self,
257 store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
258 record_results: GetRecordResult,
259 id: QueryId,
260 mut last: bool,
261 ) {
262 let found = match self.in_progress_record_queries.get_mut(&id) {
263 Some(query) => match record_results {
264 Ok(results) => match results {
265 GetRecordOk::FoundRecord(record) => {
266 if record.record.expires.is_some() {
268 query.records.push(record.record);
269 true
270 } else {
271 false
272 }
273 },
274 GetRecordOk::FinishedWithNoAdditionalRecord {
275 cache_candidates: _,
276 } => {
277 tracing::debug!("GetRecord Finished with No Additional Record");
278 last = true;
279 false
280 },
281 },
282 Err(err) => {
283 warn!("Error in Kademlia query: {err:?}");
284 false
285 },
286 },
287 None => {
288 return;
291 },
292 };
293
294 if found || last {
296 if let Some(KadGetQuery {
297 backoff,
298 progress,
299 notify,
300 key,
301 retry_count,
302 records,
303 }) = self.in_progress_record_queries.remove(&id)
304 {
305 self.outstanding_dht_query_keys.remove(&key);
307
308 let notify = notify
310 .into_iter()
311 .filter(|n| !n.is_canceled())
312 .collect::<Vec<_>>();
313
314 if notify.is_empty() {
316 return;
317 }
318
319 if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
321 if store.put(record.clone()).is_ok() {
323 for n in notify {
325 if n.send(record.value.clone()).is_err() {
326 warn!("Get DHT: channel closed before get record request result could be sent");
327 }
328 }
329 } else {
330 error!("Failed to store record in local store");
331 }
332 }
333 else {
335 if retry_count > 0 {
338 let new_retry_count = retry_count - 1;
339 warn!("Get DHT: Internal disagreement for get dht request {progress:?}! requerying with more nodes. {new_retry_count:?} retries left");
340 self.retry_get(KadGetQuery {
341 backoff,
342 progress: DHTProgress::NotStarted,
343 notify,
344 key,
345 retry_count: new_retry_count,
346 records: Vec::new(),
347 });
348 }
349 warn!("Get DHT: Internal disagreement for get dht request {progress:?}! Giving up because out of retries. ");
350 }
351 }
352 }
353 }
354
355 fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
357 if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
358 if query.notify.is_canceled() {
360 return;
361 }
362
363 match record_results {
364 Ok(_) => {
365 if query.notify.send(()).is_err() {
366 warn!("Put DHT: client channel closed before put record request could be sent");
367 }
368 },
369 Err(e) => {
370 query.progress = DHTProgress::NotStarted;
371 query.backoff.start_next(false);
372
373 warn!(
374 "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
375 e, self.peer_id
376 );
377 self.retry_put(query);
379 },
380 }
381 } else {
382 warn!("Put DHT: completed DHT query that is no longer tracked.");
383 }
384 }
385
386 fn finish_bootstrap(&mut self) {
388 if let Some(mut tx) = self.bootstrap_tx.clone() {
389 spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
390 }
391 }
392 #[allow(clippy::too_many_lines)]
393 pub fn dht_handle_event(
395 &mut self,
396 event: KademliaEvent,
397 store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
398 ) -> Option<NetworkEvent> {
399 match event {
400 KademliaEvent::OutboundQueryProgressed {
401 result: QueryResult::PutRecord(record_results),
402 id,
403 step: ProgressStep { last, .. },
404 ..
405 } => {
406 if last {
407 self.handle_put_query(record_results, id);
408 }
409 },
410 KademliaEvent::OutboundQueryProgressed {
411 result: QueryResult::GetClosestPeers(r),
412 id: query_id,
413 stats: _,
414 step: ProgressStep { last: true, .. },
415 ..
416 } => match r {
417 Ok(GetClosestPeersOk { key, peers: _ }) => {
418 if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
419 if chan.send(()).is_err() {
420 warn!("DHT: finished query but client was no longer interested");
421 };
422 };
423 debug!("Successfully got closest peers for key {key:?}");
424 },
425 Err(e) => {
426 if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
427 let _: Result<_, _> = chan.send(());
428 };
429 warn!("Failed to get closest peers: {e:?}");
430 },
431 },
432 KademliaEvent::OutboundQueryProgressed {
433 result: QueryResult::GetRecord(record_results),
434 id,
435 step: ProgressStep { last, .. },
436 ..
437 } => {
438 self.handle_get_query(store, record_results, id, last);
439 },
440 KademliaEvent::OutboundQueryProgressed {
441 result:
442 QueryResult::Bootstrap(Ok(BootstrapOk {
443 peer: _,
444 num_remaining,
445 })),
446 step: ProgressStep { last: true, .. },
447 ..
448 } => {
449 if num_remaining == 0 {
450 self.finish_bootstrap();
451 } else {
452 debug!("Bootstrap in progress, {num_remaining} nodes remaining");
453 }
454 return Some(NetworkEvent::IsBootstrapped);
455 },
456 KademliaEvent::OutboundQueryProgressed {
457 result: QueryResult::Bootstrap(Err(e)),
458 ..
459 } => {
460 let BootstrapError::Timeout { num_remaining, .. } = e;
461 if num_remaining.is_none() {
462 error!("Failed to bootstrap: {e:?}");
463 }
464 self.finish_bootstrap();
465 },
466 KademliaEvent::RoutablePeer { peer, address: _ } => {
467 debug!("Found routable peer {peer:?}");
468 },
469 KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
470 debug!("Found pending routable peer {peer:?}");
471 },
472 KademliaEvent::UnroutablePeer { peer } => {
473 debug!("Found unroutable peer {peer:?}");
474 },
475 KademliaEvent::RoutingUpdated {
476 peer: _,
477 is_new_peer: _,
478 addresses: _,
479 bucket_range: _,
480 old_peer: _,
481 } => {
482 debug!("Routing table updated");
483 },
484 e @ KademliaEvent::OutboundQueryProgressed { .. } => {
485 debug!("Not handling dht event {e:?}");
486 },
487 e => {
488 debug!("New unhandled swarm event: {e:?}");
489 },
490 }
491 None
492 }
493}
494
/// State of one in-flight get-record request.
#[derive(Debug)]
pub(crate) struct KadGetQuery {
    /// Backoff used to delay a re-submission of this request.
    pub(crate) backoff: ExponentialBackoff,
    /// `InProgress(qid)` while a Kademlia query runs; reset to `NotStarted` when re-queued.
    pub(crate) progress: DHTProgress,
    /// Client channels that receive the record's value.
    pub(crate) notify: Vec<Sender<Vec<u8>>>,
    /// Key of the requested record.
    pub(crate) key: Vec<u8>,
    /// Retries left after the current attempt (`get_record` stores `retry_count - 1`).
    pub(crate) retry_count: u8,
    /// Records with an expiry collected so far for this query.
    pub(crate) records: Vec<Record>,
}
511
/// State of one in-flight put-record request.
#[derive(Debug)]
pub struct KadPutQuery {
    /// Backoff advanced on failure and used to delay the retry.
    pub(crate) backoff: ExponentialBackoff,
    /// Query progress; reset to `NotStarted` when the put fails and is retried.
    pub(crate) progress: DHTProgress,
    /// Client channel signalled with `()` when the put succeeds.
    pub(crate) notify: Sender<()>,
    /// Key of the record being stored.
    pub(crate) key: Vec<u8>,
    /// Value of the record being stored.
    pub(crate) value: Vec<u8>,
}
526
/// Progress of a DHT request.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum DHTProgress {
    /// A Kademlia query with this id is running.
    InProgress(QueryId),
    /// No Kademlia query is running yet (e.g. the request is waiting to be retried).
    NotStarted,
}