1pub mod bootstrap;
9use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
10
11use futures::{
13 channel::{mpsc, oneshot::Sender},
14 SinkExt,
15};
16use hotshot_types::traits::signature_key::SignatureKey;
17use lazy_static::lazy_static;
18use libp2p::kad::{
19 store::MemoryStore, BootstrapOk, GetClosestPeersOk,
20 GetRecordOk, GetRecordResult, ProgressStep, PutRecordResult, QueryId, QueryResult, Record,
21};
22use libp2p::kad::{
23 store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent,
24};
25use libp2p_identity::PeerId;
26use store::{
27 persistent::{DhtPersistentStorage, PersistentStore},
28 validated::ValidatedStore,
29};
30use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
31use tracing::{debug, error, warn};
32
33pub mod record;
35
36pub mod store;
38
lazy_static! {
    // Upper bound (50 peers) used when sizing DHT queries.
    // NOTE(review): not referenced within this chunk — presumably consumed by
    // query configuration elsewhere in this file; confirm the usage site.
    static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
}
43
44use super::exponential_backoff::ExponentialBackoff;
45use crate::network::{ClientRequest, NetworkEvent};
46
/// Bookkeeping for this node's Kademlia DHT activity: in-flight get/put/
/// closest-peer queries, bootstrap status, and the channels used to retry
/// failed queries and to signal the bootstrap task.
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
    /// Completion channels for in-flight `get_closest_peers` queries, keyed by query id
    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
    /// In-flight `get_record` queries, keyed by query id
    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
    /// Map from key bytes to the query id already fetching that key, so
    /// concurrent gets for the same key join one query instead of issuing new ones
    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
    /// In-flight `put_record` queries, keyed by query id
    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
    /// Current bootstrap state and the backoff pacing bootstrap attempts
    pub bootstrap_state: Bootstrap,
    /// Our own libp2p peer id
    pub peer_id: PeerId,
    /// The configured Kademlia replication factor
    pub replication_factor: NonZeroUsize,
    /// Channel on which failed queries are re-submitted for retry (set via `set_retry`)
    retry_tx: Option<UnboundedSender<ClientRequest>>,
    /// Channel used to notify the bootstrap task (set via `set_bootstrap_sender`)
    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

    /// Ties the unused `K`/`D` type parameters to this struct
    phantom: PhantomData<(K, D)>,
}
77
/// Bootstrap status of this node, paired with the backoff used to pace
/// bootstrap attempts.
#[derive(Debug, Clone)]
pub struct Bootstrap {
    /// Whether a bootstrap has been started
    pub state: State,
    /// Backoff between bootstrap attempts
    pub backoff: ExponentialBackoff,
}
86
/// Coarse bootstrap state: a bootstrap has either not begun or is underway.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Bootstrap has not yet been started
    NotStarted,
    /// Bootstrap has been started
    Started,
}
95
/// DHT-level events surfaced to the rest of the networking stack.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// The DHT has finished bootstrapping
    IsBootstrapped,
}
102
103impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
104 pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
106 self.retry_tx = Some(tx);
107 }
108 pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
110 self.bootstrap_tx = Some(tx);
111 }
112 #[must_use]
114 pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
115 Self {
121 peer_id: pid,
122 in_progress_record_queries: HashMap::default(),
123 in_progress_put_record_queries: HashMap::default(),
124 outstanding_dht_query_keys: HashMap::default(),
125 bootstrap_state: Bootstrap {
126 state: State::NotStarted,
127 backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
128 },
129 in_progress_get_closest_peers: HashMap::default(),
130 replication_factor,
131 retry_tx: None,
132 bootstrap_tx: None,
133 phantom: PhantomData,
134 }
135 }
136
137 pub fn print_routing_table(
139 &mut self,
140 kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
141 ) {
142 let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
143 let v = kadem.kbuckets().collect::<Vec<_>>();
144 for i in v {
145 for j in i.iter() {
146 let s = format!(
147 "node: key: {:?}, val {:?}, status: {:?}",
148 j.node.key, j.node.value, j.status
149 );
150 err.push_str(&s);
151 }
152 }
153 error!("{:?}", err);
154 }
155
156 #[must_use]
158 pub fn replication_factor(&self) -> NonZeroUsize {
159 self.replication_factor
160 }
161 pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
165 self.in_progress_put_record_queries.insert(id, query);
166 }
167
168 pub fn get_record(
170 &mut self,
171 key: Vec<u8>,
172 chans: Vec<Sender<Vec<u8>>>,
173 backoff: ExponentialBackoff,
174 retry_count: u8,
175 kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
176 ) {
177 if retry_count == 0 {
179 return;
180 }
181
182 if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
184 for chan in chans {
186 if chan.send(entry.value.clone()).is_err() {
187 warn!("Get DHT: channel closed before get record request result could be sent");
188 }
189 }
190 } else {
191 if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
193 let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
196 warn!("Get DHT: outstanding query not found");
197 return;
198 };
199
200 query.notify.extend(chans);
202 } else {
203 let qid = kad.get_record(key.clone().into());
205 let query = KadGetQuery {
206 backoff,
207 progress: DHTProgress::InProgress(qid),
208 notify: chans,
209 key: key.clone(),
210 retry_count: retry_count - 1,
211 records: Vec::new(),
212 };
213
214 self.outstanding_dht_query_keys.insert(key, qid);
216 self.in_progress_record_queries.insert(qid, query);
217 }
218 }
219 }
220
221 fn retry_get(&self, mut query: KadGetQuery) {
223 let Some(tx) = self.retry_tx.clone() else {
224 return;
225 };
226 let req = ClientRequest::GetDHT {
227 key: query.key,
228 notify: query.notify,
229 retry_count: query.retry_count,
230 };
231 let backoff = query.backoff.next_timeout(false);
232 spawn(async move {
233 sleep(backoff).await;
234 let _ = tx.send(req);
235 });
236 }
237
238 fn retry_put(&self, mut query: KadPutQuery) {
240 let Some(tx) = self.retry_tx.clone() else {
241 return;
242 };
243 let req = ClientRequest::PutDHT {
244 key: query.key,
245 value: query.value,
246 notify: query.notify,
247 };
248 spawn(async move {
249 sleep(query.backoff.next_timeout(false)).await;
250 let _ = tx.send(req);
251 });
252 }
253
254 fn handle_get_query(
256 &mut self,
257 store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
258 record_results: GetRecordResult,
259 id: QueryId,
260 mut last: bool,
261 ) {
262 let found = match self.in_progress_record_queries.get_mut(&id) {
263 Some(query) => match record_results {
264 Ok(results) => match results {
265 GetRecordOk::FoundRecord(record) => {
266 if record.record.expires.is_some() {
268 query.records.push(record.record);
269 true
270 } else {
271 false
272 }
273 },
274 GetRecordOk::FinishedWithNoAdditionalRecord {
275 cache_candidates: _,
276 } => {
277 tracing::debug!("GetRecord Finished with No Additional Record");
278 last = true;
279 false
280 },
281 },
282 Err(err) => {
283 warn!("Error in Kademlia query: {err:?}");
284 false
285 },
286 },
287 None => {
288 return;
291 },
292 };
293
294 if found || last {
296 if let Some(KadGetQuery {
297 backoff,
298 progress,
299 notify,
300 key,
301 retry_count,
302 records,
303 }) = self.in_progress_record_queries.remove(&id)
304 {
305 self.outstanding_dht_query_keys.remove(&key);
307
308 let notify = notify
310 .into_iter()
311 .filter(|n| !n.is_canceled())
312 .collect::<Vec<_>>();
313
314 if notify.is_empty() {
316 return;
317 }
318
319 if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
321 if store.put(record.clone()).is_ok() {
323 for n in notify {
325 if n.send(record.value.clone()).is_err() {
326 warn!(
327 "Get DHT: channel closed before get record request result \
328 could be sent"
329 );
330 }
331 }
332 } else {
333 error!("Failed to store record in local store");
334 }
335 }
336 else {
338 if retry_count > 0 {
341 let new_retry_count = retry_count - 1;
342 warn!(
343 "Get DHT: Internal disagreement for get dht request {progress:?}! \
344 requerying with more nodes. {new_retry_count:?} retries left"
345 );
346 self.retry_get(KadGetQuery {
347 backoff,
348 progress: DHTProgress::NotStarted,
349 notify,
350 key,
351 retry_count: new_retry_count,
352 records: Vec::new(),
353 });
354 }
355 warn!(
356 "Get DHT: Internal disagreement for get dht request {progress:?}! Giving \
357 up because out of retries. "
358 );
359 }
360 }
361 }
362 }
363
364 fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
366 if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
367 if query.notify.is_canceled() {
369 return;
370 }
371
372 match record_results {
373 Ok(_) => {
374 if query.notify.send(()).is_err() {
375 warn!(
376 "Put DHT: client channel closed before put record request could be \
377 sent"
378 );
379 }
380 },
381 Err(e) => {
382 query.progress = DHTProgress::NotStarted;
383 query.backoff.start_next(false);
384
385 warn!(
386 "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
387 e, self.peer_id
388 );
389 self.retry_put(query);
391 },
392 }
393 } else {
394 warn!("Put DHT: completed DHT query that is no longer tracked.");
395 }
396 }
397
398 fn finish_bootstrap(&mut self) {
400 if let Some(mut tx) = self.bootstrap_tx.clone() {
401 spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
402 }
403 }
404 #[allow(clippy::too_many_lines)]
405 pub fn dht_handle_event(
407 &mut self,
408 event: KademliaEvent,
409 store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
410 ) -> Option<NetworkEvent> {
411 match event {
412 KademliaEvent::OutboundQueryProgressed {
413 result: QueryResult::PutRecord(record_results),
414 id,
415 step: ProgressStep { last, .. },
416 ..
417 } => {
418 if last {
419 self.handle_put_query(record_results, id);
420 }
421 },
422 KademliaEvent::OutboundQueryProgressed {
423 result: QueryResult::GetClosestPeers(r),
424 id: query_id,
425 stats: _,
426 step: ProgressStep { last: true, .. },
427 ..
428 } => match r {
429 Ok(GetClosestPeersOk { key, peers: _ }) => {
430 if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
431 if chan.send(()).is_err() {
432 warn!("DHT: finished query but client was no longer interested");
433 };
434 };
435 debug!("Successfully got closest peers for key {key:?}");
436 },
437 Err(e) => {
438 if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
439 let _: Result<_, _> = chan.send(());
440 };
441 warn!("Failed to get closest peers: {e:?}");
442 },
443 },
444 KademliaEvent::OutboundQueryProgressed {
445 result: QueryResult::GetRecord(record_results),
446 id,
447 step: ProgressStep { last, .. },
448 ..
449 } => {
450 self.handle_get_query(store, record_results, id, last);
451 },
452 KademliaEvent::OutboundQueryProgressed {
453 result:
454 QueryResult::Bootstrap(Ok(BootstrapOk {
455 peer: _,
456 num_remaining,
457 })),
458 step: ProgressStep { last: true, .. },
459 ..
460 } => {
461 if num_remaining == 0 {
462 self.finish_bootstrap();
463 } else {
464 debug!("Bootstrap in progress, {num_remaining} nodes remaining");
465 }
466 return Some(NetworkEvent::IsBootstrapped);
467 },
468 KademliaEvent::OutboundQueryProgressed {
469 result: QueryResult::Bootstrap(Err(e)),
470 ..
471 } => {
472 let BootstrapError::Timeout { num_remaining, .. } = e;
473 if num_remaining.is_none() {
474 error!("Failed to bootstrap: {e:?}");
475 }
476 self.finish_bootstrap();
477 },
478 KademliaEvent::RoutablePeer { peer, address: _ } => {
479 debug!("Found routable peer {peer:?}");
480 },
481 KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
482 debug!("Found pending routable peer {peer:?}");
483 },
484 KademliaEvent::UnroutablePeer { peer } => {
485 debug!("Found unroutable peer {peer:?}");
486 },
487 KademliaEvent::RoutingUpdated {
488 peer: _,
489 is_new_peer: _,
490 addresses: _,
491 bucket_range: _,
492 old_peer: _,
493 } => {
494 debug!("Routing table updated");
495 },
496 e @ KademliaEvent::OutboundQueryProgressed { .. } => {
497 debug!("Not handling dht event {e:?}");
498 },
499 e => {
500 debug!("New unhandled swarm event: {e:?}");
501 },
502 }
503 None
504 }
505}
506
/// Bookkeeping for an in-flight Kademlia `get_record` query.
#[derive(Debug)]
pub(crate) struct KadGetQuery {
    /// Backoff used to pace retries of this query
    pub(crate) backoff: ExponentialBackoff,
    /// Whether the query has been started (and under which `QueryId`)
    pub(crate) progress: DHTProgress,
    /// Channels to send the record's value on once the query resolves
    pub(crate) notify: Vec<Sender<Vec<u8>>>,
    /// The DHT key being looked up
    pub(crate) key: Vec<u8>,
    /// Remaining number of retries before giving up
    pub(crate) retry_count: u8,
    /// Records collected from the network so far for this key
    pub(crate) records: Vec<Record>,
}
523
/// Bookkeeping for an in-flight Kademlia `put_record` query.
#[derive(Debug)]
pub struct KadPutQuery {
    /// Backoff used to pace retries of this put
    pub(crate) backoff: ExponentialBackoff,
    /// Whether the query has been started (and under which `QueryId`)
    pub(crate) progress: DHTProgress,
    /// Channel notified once the put completes successfully
    pub(crate) notify: Sender<()>,
    /// The DHT key being written
    pub(crate) key: Vec<u8>,
    /// The value being written under `key`
    pub(crate) value: Vec<u8>,
}
538
/// Progress of a DHT query: either running under a libp2p query id, or not
/// yet started (including queries reset while awaiting a retry).
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum DHTProgress {
    /// The query is in flight, identified by its Kademlia `QueryId`
    InProgress(QueryId),
    /// The query has not been started
    NotStarted,
}