hotshot_libp2p_networking/network/behaviours/dht/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// Task for doing bootstraps at a regular interval
8pub mod bootstrap;
9use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
10
11/// a local caching layer for the DHT key value pairs
12use futures::{
13    channel::{mpsc, oneshot::Sender},
14    SinkExt,
15};
16use hotshot_types::traits::signature_key::SignatureKey;
17use lazy_static::lazy_static;
18use libp2p::kad::{
19    /* handler::KademliaHandlerIn, */ store::MemoryStore, BootstrapOk, GetClosestPeersOk,
20    GetRecordOk, GetRecordResult, ProgressStep, PutRecordResult, QueryId, QueryResult, Record,
21};
22use libp2p::kad::{
23    store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent,
24};
25use libp2p_identity::PeerId;
26use store::{
27    persistent::{DhtPersistentStorage, PersistentStore},
28    validated::ValidatedStore,
29};
30use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
31use tracing::{debug, error, warn};
32
33/// Additional DHT record functionality
34pub mod record;
35
36/// Additional DHT store functionality
37pub mod store;
38
39lazy_static! {
40    /// the maximum number of nodes to query in the DHT at any one time
41    static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
42}
43
44use super::exponential_backoff::ExponentialBackoff;
45use crate::network::{ClientRequest, NetworkEvent};
46
/// Bookkeeping layer on top of libp2p's Kademlia behaviour.
///
/// This type does not own the Kademlia behaviour; the methods that need it (or its
/// record store) take it as an argument. It tracks:
/// - in-flight put-record queries (publishing API) and the client waiting on each,
/// - in-flight get-record queries (request API) and the clients waiting on each,
/// - in-flight closest-peer lookups (peer discovery),
/// - bootstrap state and the channel to the bootstrap task,
/// - an optional channel used to retry failed queries after a backoff.
///
/// `K` (signature key) and `D` (persistent storage backend) parameterise the record
/// store type and are only carried here as `PhantomData`.
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
    /// In-progress closest-peer lookups, keyed by Kademlia query id. The sender is
    /// signalled with `()` when the lookup's final step arrives, on success or failure.
    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
    /// In-progress get-record queries, keyed by Kademlia query id.
    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
    /// Reverse index from a record key to the id of the get query currently fetching
    /// it, so concurrent gets for the same key join one query instead of starting more.
    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
    /// In-progress put-record queries, keyed by Kademlia query id.
    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
    /// Bootstrap state together with its retry backoff.
    pub bootstrap_state: Bootstrap,
    /// the peer id (useful only for debugging right now)
    pub peer_id: PeerId,
    /// Replication factor, exposed through `replication_factor()`.
    pub replication_factor: NonZeroUsize,
    /// Channel used to re-submit failed get/put requests after a backoff.
    /// While `None`, failed queries are not retried.
    retry_tx: Option<UnboundedSender<ClientRequest>>,
    /// Sender to the bootstrap task, used to report that a bootstrap finished.
    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

    /// Phantom type for the key and persistent storage
    phantom: PhantomData<(K, D)>,
}
77
/// State of bootstrapping, plus the backoff used between bootstrap attempts.
#[derive(Debug, Clone)]
pub struct Bootstrap {
    /// Whether a bootstrap is currently running.
    pub state: State,
    /// Retry timeout (exponential backoff between attempts).
    pub backoff: ExponentialBackoff,
}
86
/// State used for random walk and bootstrapping: whether the operation is running.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Not in progress (the initial state set by `DHTBehaviour::new`)
    NotStarted,
    /// In progress
    Started,
}
95
/// Events surfaced by the DHT behaviour.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// Only event tracked currently is when we successfully bootstrap into the network
    IsBootstrapped,
}
102
103impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
104    /// Give the handler a way to retry requests.
105    pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
106        self.retry_tx = Some(tx);
107    }
108    /// Sets a sender to bootstrap task
109    pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
110        self.bootstrap_tx = Some(tx);
111    }
112    /// Create a new DHT behaviour
113    #[must_use]
114    pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
115        // needed because otherwise we stay in client mode when testing locally
116        // and don't publish keys stuff
117        // e.g. dht just doesn't work. We'd need to add mdns and that doesn't seem worth it since
118        // we won't have a local network
119        // <https://github.com/libp2p/rust-libp2p/issues/4194>
120        Self {
121            peer_id: pid,
122            in_progress_record_queries: HashMap::default(),
123            in_progress_put_record_queries: HashMap::default(),
124            outstanding_dht_query_keys: HashMap::default(),
125            bootstrap_state: Bootstrap {
126                state: State::NotStarted,
127                backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
128            },
129            in_progress_get_closest_peers: HashMap::default(),
130            replication_factor,
131            retry_tx: None,
132            bootstrap_tx: None,
133            phantom: PhantomData,
134        }
135    }
136
137    /// print out the routing table to stderr
138    pub fn print_routing_table(
139        &mut self,
140        kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
141    ) {
142        let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
143        let v = kadem.kbuckets().collect::<Vec<_>>();
144        for i in v {
145            for j in i.iter() {
146                let s = format!(
147                    "node: key: {:?}, val {:?}, status: {:?}",
148                    j.node.key, j.node.value, j.status
149                );
150                err.push_str(&s);
151            }
152        }
153        error!("{:?}", err);
154    }
155
156    /// Get the replication factor for queries
157    #[must_use]
158    pub fn replication_factor(&self) -> NonZeroUsize {
159        self.replication_factor
160    }
161    /// Publish a key/value to the kv store.
162    /// Once replicated upon all nodes, the caller is notified over
163    /// `chan`
164    pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
165        self.in_progress_put_record_queries.insert(id, query);
166    }
167
168    /// Retrieve a value for a key from the DHT.
169    pub fn get_record(
170        &mut self,
171        key: Vec<u8>,
172        chans: Vec<Sender<Vec<u8>>>,
173        backoff: ExponentialBackoff,
174        retry_count: u8,
175        kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
176    ) {
177        // noop
178        if retry_count == 0 {
179            return;
180        }
181
182        // Check the cache before making the (expensive) query
183        if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
184            // The key already exists in the cache, send the value to all channels
185            for chan in chans {
186                if chan.send(entry.value.clone()).is_err() {
187                    warn!("Get DHT: channel closed before get record request result could be sent");
188                }
189            }
190        } else {
191            // Check if the key is already being queried
192            if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
193                // The key was already being queried. Add the channel to the existing query
194                // Try to get the query from the query id
195                let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
196                    warn!("Get DHT: outstanding query not found");
197                    return;
198                };
199
200                // Add the channel to the existing query
201                query.notify.extend(chans);
202            } else {
203                // The key was not already being queried and was not in the cache. Start a new query.
204                let qid = kad.get_record(key.clone().into());
205                let query = KadGetQuery {
206                    backoff,
207                    progress: DHTProgress::InProgress(qid),
208                    notify: chans,
209                    key: key.clone(),
210                    retry_count: retry_count - 1,
211                    records: Vec::new(),
212                };
213
214                // Add the key to the outstanding queries and in-progress queries
215                self.outstanding_dht_query_keys.insert(key, qid);
216                self.in_progress_record_queries.insert(qid, query);
217            }
218        }
219    }
220
221    /// Spawn a task which will retry the query after a backoff.
222    fn retry_get(&self, mut query: KadGetQuery) {
223        let Some(tx) = self.retry_tx.clone() else {
224            return;
225        };
226        let req = ClientRequest::GetDHT {
227            key: query.key,
228            notify: query.notify,
229            retry_count: query.retry_count,
230        };
231        let backoff = query.backoff.next_timeout(false);
232        spawn(async move {
233            sleep(backoff).await;
234            let _ = tx.send(req);
235        });
236    }
237
238    /// Spawn a task which will retry the query after a backoff.
239    fn retry_put(&self, mut query: KadPutQuery) {
240        let Some(tx) = self.retry_tx.clone() else {
241            return;
242        };
243        let req = ClientRequest::PutDHT {
244            key: query.key,
245            value: query.value,
246            notify: query.notify,
247        };
248        spawn(async move {
249            sleep(query.backoff.next_timeout(false)).await;
250            let _ = tx.send(req);
251        });
252    }
253
254    /// update state based on recv-ed get query
255    fn handle_get_query(
256        &mut self,
257        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
258        record_results: GetRecordResult,
259        id: QueryId,
260        mut last: bool,
261    ) {
262        let found = match self.in_progress_record_queries.get_mut(&id) {
263            Some(query) => match record_results {
264                Ok(results) => match results {
265                    GetRecordOk::FoundRecord(record) => {
266                        // Make sure the record has an expiration time
267                        if record.record.expires.is_some() {
268                            query.records.push(record.record);
269                            true
270                        } else {
271                            false
272                        }
273                    },
274                    GetRecordOk::FinishedWithNoAdditionalRecord {
275                        cache_candidates: _,
276                    } => {
277                        tracing::debug!("GetRecord Finished with No Additional Record");
278                        last = true;
279                        false
280                    },
281                },
282                Err(err) => {
283                    warn!("Error in Kademlia query: {err:?}");
284                    false
285                },
286            },
287            None => {
288                // We already finished the query (or it's been cancelled). Do nothing and exit the
289                // function.
290                return;
291            },
292        };
293
294        // If we have more than one record or the query has completed, we can return the record to the client.
295        if found || last {
296            if let Some(KadGetQuery {
297                backoff,
298                progress,
299                notify,
300                key,
301                retry_count,
302                records,
303            }) = self.in_progress_record_queries.remove(&id)
304            {
305                // Remove the key from the outstanding queries so we are in sync
306                self.outstanding_dht_query_keys.remove(&key);
307
308                // `notify` is all channels that are still open
309                let notify = notify
310                    .into_iter()
311                    .filter(|n| !n.is_canceled())
312                    .collect::<Vec<_>>();
313
314                // If all are closed, we can exit
315                if notify.is_empty() {
316                    return;
317                }
318
319                // Find the record with the highest expiry
320                if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
321                    // Only return the record if we can store it (validation passed)
322                    if store.put(record.clone()).is_ok() {
323                        // Send the record to all channels that are still open
324                        for n in notify {
325                            if n.send(record.value.clone()).is_err() {
326                                warn!(
327                                    "Get DHT: channel closed before get record request result \
328                                     could be sent"
329                                );
330                            }
331                        }
332                    } else {
333                        error!("Failed to store record in local store");
334                    }
335                }
336                // disagreement => query more nodes
337                else {
338                    // there is some internal disagreement or not enough nodes returned
339                    // Initiate new query that hits more replicas
340                    if retry_count > 0 {
341                        let new_retry_count = retry_count - 1;
342                        warn!(
343                            "Get DHT: Internal disagreement for get dht request {progress:?}! \
344                             requerying with more nodes. {new_retry_count:?} retries left"
345                        );
346                        self.retry_get(KadGetQuery {
347                            backoff,
348                            progress: DHTProgress::NotStarted,
349                            notify,
350                            key,
351                            retry_count: new_retry_count,
352                            records: Vec::new(),
353                        });
354                    }
355                    warn!(
356                        "Get DHT: Internal disagreement for get dht request {progress:?}! Giving \
357                         up because out of retries. "
358                    );
359                }
360            }
361        }
362    }
363
364    /// Update state based on put query
365    fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
366        if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
367            // dropped so we handle further
368            if query.notify.is_canceled() {
369                return;
370            }
371
372            match record_results {
373                Ok(_) => {
374                    if query.notify.send(()).is_err() {
375                        warn!(
376                            "Put DHT: client channel closed before put record request could be \
377                             sent"
378                        );
379                    }
380                },
381                Err(e) => {
382                    query.progress = DHTProgress::NotStarted;
383                    query.backoff.start_next(false);
384
385                    warn!(
386                        "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
387                        e, self.peer_id
388                    );
389                    // push back onto the queue
390                    self.retry_put(query);
391                },
392            }
393        } else {
394            warn!("Put DHT: completed DHT query that is no longer tracked.");
395        }
396    }
397
398    /// Send that the bootstrap succeeded
399    fn finish_bootstrap(&mut self) {
400        if let Some(mut tx) = self.bootstrap_tx.clone() {
401            spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
402        }
403    }
404    #[allow(clippy::too_many_lines)]
405    /// handle a DHT event
406    pub fn dht_handle_event(
407        &mut self,
408        event: KademliaEvent,
409        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
410    ) -> Option<NetworkEvent> {
411        match event {
412            KademliaEvent::OutboundQueryProgressed {
413                result: QueryResult::PutRecord(record_results),
414                id,
415                step: ProgressStep { last, .. },
416                ..
417            } => {
418                if last {
419                    self.handle_put_query(record_results, id);
420                }
421            },
422            KademliaEvent::OutboundQueryProgressed {
423                result: QueryResult::GetClosestPeers(r),
424                id: query_id,
425                stats: _,
426                step: ProgressStep { last: true, .. },
427                ..
428            } => match r {
429                Ok(GetClosestPeersOk { key, peers: _ }) => {
430                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
431                        if chan.send(()).is_err() {
432                            warn!("DHT: finished query but client was no longer interested");
433                        };
434                    };
435                    debug!("Successfully got closest peers for key {key:?}");
436                },
437                Err(e) => {
438                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
439                        let _: Result<_, _> = chan.send(());
440                    };
441                    warn!("Failed to get closest peers: {e:?}");
442                },
443            },
444            KademliaEvent::OutboundQueryProgressed {
445                result: QueryResult::GetRecord(record_results),
446                id,
447                step: ProgressStep { last, .. },
448                ..
449            } => {
450                self.handle_get_query(store, record_results, id, last);
451            },
452            KademliaEvent::OutboundQueryProgressed {
453                result:
454                    QueryResult::Bootstrap(Ok(BootstrapOk {
455                        peer: _,
456                        num_remaining,
457                    })),
458                step: ProgressStep { last: true, .. },
459                ..
460            } => {
461                if num_remaining == 0 {
462                    self.finish_bootstrap();
463                } else {
464                    debug!("Bootstrap in progress, {num_remaining} nodes remaining");
465                }
466                return Some(NetworkEvent::IsBootstrapped);
467            },
468            KademliaEvent::OutboundQueryProgressed {
469                result: QueryResult::Bootstrap(Err(e)),
470                ..
471            } => {
472                let BootstrapError::Timeout { num_remaining, .. } = e;
473                if num_remaining.is_none() {
474                    error!("Failed to bootstrap: {e:?}");
475                }
476                self.finish_bootstrap();
477            },
478            KademliaEvent::RoutablePeer { peer, address: _ } => {
479                debug!("Found routable peer {peer:?}");
480            },
481            KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
482                debug!("Found pending routable peer {peer:?}");
483            },
484            KademliaEvent::UnroutablePeer { peer } => {
485                debug!("Found unroutable peer {peer:?}");
486            },
487            KademliaEvent::RoutingUpdated {
488                peer: _,
489                is_new_peer: _,
490                addresses: _,
491                bucket_range: _,
492                old_peer: _,
493            } => {
494                debug!("Routing table updated");
495            },
496            e @ KademliaEvent::OutboundQueryProgressed { .. } => {
497                debug!("Not handling dht event {e:?}");
498            },
499            e => {
500                debug!("New unhandled swarm event: {e:?}");
501            },
502        }
503        None
504    }
505}
506
/// Metadata holder for get query
///
/// One in-flight get-record query, shared by every client waiting on the same key.
#[derive(Debug)]
pub(crate) struct KadGetQuery {
    /// Exponential retry backoff
    pub(crate) backoff: ExponentialBackoff,
    /// progress through DHT query (`InProgress` carries the Kademlia query id)
    pub(crate) progress: DHTProgress,
    /// The channels to notify of the result; each receives the same record value
    pub(crate) notify: Vec<Sender<Vec<u8>>>,
    /// the key to look up
    pub(crate) key: Vec<u8>,
    /// the number of remaining retries before giving up (the current attempt is not
    /// counted)
    pub(crate) retry_count: u8,
    /// already received records (only records that carry an expiry are collected)
    pub(crate) records: Vec<Record>,
}
523
/// Metadata holder for put query
///
/// One in-flight put-record query and the client waiting for its outcome.
#[derive(Debug)]
pub struct KadPutQuery {
    /// Exponential retry backoff
    pub(crate) backoff: ExponentialBackoff,
    /// progress through DHT query
    pub(crate) progress: DHTProgress,
    /// notify client of result (`()` is sent once the put succeeds)
    pub(crate) notify: Sender<()>,
    /// the key to put
    pub(crate) key: Vec<u8>,
    /// the value to put
    pub(crate) value: Vec<u8>,
}
538
/// represents progress through DHT: whether a query has been issued to Kademlia yet
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum DHTProgress {
    /// The query has been started, under the given Kademlia query id
    InProgress(QueryId),
    /// The query has not been started (e.g. it is waiting to be retried)
    NotStarted,
}