hotshot_libp2p_networking/network/behaviours/dht/
mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

/// Task for doing bootstraps at a regular interval
pub mod bootstrap;
use std::{collections::HashMap, marker::PhantomData, num::NonZeroUsize, time::Duration};

/// a local caching layer for the DHT key value pairs
use futures::{
    channel::{mpsc, oneshot::Sender},
    SinkExt,
};
use hotshot_types::traits::signature_key::SignatureKey;
use lazy_static::lazy_static;
use libp2p::kad::{
    store::MemoryStore, BootstrapOk, GetClosestPeersOk, GetRecordOk, GetRecordResult,
    ProgressStep, PutRecordResult, QueryId, QueryResult, Record,
};
use libp2p::kad::{
    store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent,
};
use libp2p_identity::PeerId;
use store::{
    persistent::{DhtPersistentStorage, PersistentStore},
    validated::ValidatedStore,
};
use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
use tracing::{debug, error, warn};

/// Additional DHT record functionality
pub mod record;

/// Additional DHT store functionality
pub mod store;

lazy_static! {
    /// the maximum number of nodes to query in the DHT at any one time
    static ref MAX_DHT_QUERY_SIZE: NonZeroUsize = NonZeroUsize::new(50).unwrap();
}

use super::exponential_backoff::ExponentialBackoff;
use crate::network::{ClientRequest, NetworkEvent};

/// Behaviour wrapping libp2p's Kademlia
/// Included:
/// - publishing API
/// - request API
/// - bootstrapping into the network
/// - peer discovery
#[derive(Debug)]
pub struct DHTBehaviour<K: SignatureKey + 'static, D: DhtPersistentStorage> {
    /// In-progress queries for nearby peers
    pub in_progress_get_closest_peers: HashMap<QueryId, Sender<()>>,
    /// List of in-progress get requests
    in_progress_record_queries: HashMap<QueryId, KadGetQuery>,
    /// The list of in-progress get requests, indexed by key
    outstanding_dht_query_keys: HashMap<Vec<u8>, QueryId>,
    /// List of in-progress put requests
    in_progress_put_record_queries: HashMap<QueryId, KadPutQuery>,
    /// State of bootstrapping
    pub bootstrap_state: Bootstrap,
    /// The peer id (currently useful only for debugging)
    pub peer_id: PeerId,
    /// The replication factor
    pub replication_factor: NonZeroUsize,
    /// Sender used to retry requests
    retry_tx: Option<UnboundedSender<ClientRequest>>,
    /// Sender to the bootstrap task
    bootstrap_tx: Option<mpsc::Sender<bootstrap::InputEvent>>,

    /// Phantom type for the key and persistent storage
    phantom: PhantomData<(K, D)>,
}

/// State of bootstrapping
#[derive(Debug, Clone)]
pub struct Bootstrap {
    /// State of bootstrap
    pub state: State,
    /// Retry timeout
    pub backoff: ExponentialBackoff,
}

/// State used for random walk and bootstrapping
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// Not in progress
    NotStarted,
    /// In progress
    Started,
}

/// DHT event enum
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DHTEvent {
    /// Only event tracked currently is when we successfully bootstrap into the network
    IsBootstrapped,
}

impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
    /// Give the handler a way to retry requests.
    pub fn set_retry(&mut self, tx: UnboundedSender<ClientRequest>) {
        self.retry_tx = Some(tx);
    }

    /// Set the sender to the bootstrap task
    pub fn set_bootstrap_sender(&mut self, tx: mpsc::Sender<bootstrap::InputEvent>) {
        self.bootstrap_tx = Some(tx);
    }

    /// Create a new DHT behaviour
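    ///
    /// A construction sketch (assuming a local `PeerId` is available; `K` and `D` are the
    /// caller's signature-key and persistent-storage types):
    /// ```ignore
    /// let mut dht = DHTBehaviour::<K, D>::new(peer_id, NonZeroUsize::new(20).unwrap());
    /// dht.set_retry(retry_tx);
    /// dht.set_bootstrap_sender(bootstrap_tx);
    /// ```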
    #[must_use]
    pub fn new(pid: PeerId, replication_factor: NonZeroUsize) -> Self {
        // Necessary because otherwise, when testing locally, we stay in client mode and
        // never publish keys, i.e. the DHT simply doesn't work. The alternative is adding
        // mDNS, which doesn't seem worth it since we won't have a local network.
        // <https://github.com/libp2p/rust-libp2p/issues/4194>
        Self {
            peer_id: pid,
            in_progress_record_queries: HashMap::default(),
            in_progress_put_record_queries: HashMap::default(),
            outstanding_dht_query_keys: HashMap::default(),
            bootstrap_state: Bootstrap {
                state: State::NotStarted,
                backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
            },
            in_progress_get_closest_peers: HashMap::default(),
            replication_factor,
            retry_tx: None,
            bootstrap_tx: None,
            phantom: PhantomData,
        }
    }

    /// Print out the routing table (logged at the `error` level)
    pub fn print_routing_table(
        &mut self,
        kadem: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
    ) {
        let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id);
        let v = kadem.kbuckets().collect::<Vec<_>>();
        for i in v {
            for j in i.iter() {
                let s = format!(
                    "node: key: {:?}, val {:?}, status: {:?}",
                    j.node.key, j.node.value, j.status
                );
                err.push_str(&s);
            }
        }
        error!("{:?}", err);
    }

    /// Get the replication factor for queries
    #[must_use]
    pub fn replication_factor(&self) -> NonZeroUsize {
        self.replication_factor
    }

    /// Track an in-progress request to publish a key/value pair to the kv store.
    /// Once the put query completes, the caller is notified over the query's
    /// `notify` channel.
    pub fn put_record(&mut self, id: QueryId, query: KadPutQuery) {
        self.in_progress_put_record_queries.insert(id, query);
    }

    /// Retrieve a value for a key from the DHT.
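    ///
    /// Checks the local record store first and answers from the cache when possible. If the
    /// key is already being queried, the supplied channels are attached to the outstanding
    /// query; otherwise a new Kademlia query is started.
    ///
    /// A usage sketch (names are illustrative; `kad` is the wrapped Kademlia behaviour):
    /// ```ignore
    /// let (tx, rx) = futures::channel::oneshot::channel();
    /// dht.get_record(
    ///     key_bytes,
    ///     vec![tx],
    ///     ExponentialBackoff::new(2, Duration::from_secs(1)),
    ///     3,
    ///     &mut kad,
    /// );
    /// ```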
    pub fn get_record(
        &mut self,
        key: Vec<u8>,
        chans: Vec<Sender<Vec<u8>>>,
        backoff: ExponentialBackoff,
        retry_count: u8,
        kad: &mut KademliaBehaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
    ) {
        // If there are no retries left, this is a no-op
        if retry_count == 0 {
            return;
        }

        // Check the cache before making the (expensive) query
        if let Some(entry) = kad.store_mut().get(&key.clone().into()) {
            // The key already exists in the cache, send the value to all channels
            for chan in chans {
                if chan.send(entry.value.clone()).is_err() {
                    warn!("Get DHT: channel closed before get record request result could be sent");
                }
            }
        } else {
            // Check if the key is already being queried
            if let Some(qid) = self.outstanding_dht_query_keys.get(&key) {
                // The key is already being queried. Try to get the query from the query id
                let Some(query) = self.in_progress_record_queries.get_mut(qid) else {
                    warn!("Get DHT: outstanding query not found");
                    return;
                };

                // Add the channels to the existing query
                query.notify.extend(chans);
            } else {
                // The key was not already being queried and was not in the cache. Start a new query.
                let qid = kad.get_record(key.clone().into());
                let query = KadGetQuery {
                    backoff,
                    progress: DHTProgress::InProgress(qid),
                    notify: chans,
                    key: key.clone(),
                    retry_count: retry_count - 1,
                    records: Vec::new(),
                };

                // Add the key to the outstanding queries and in-progress queries
                self.outstanding_dht_query_keys.insert(key, qid);
                self.in_progress_record_queries.insert(qid, query);
            }
        }
    }

    /// Spawn a task which will retry the query after a backoff.
    fn retry_get(&self, mut query: KadGetQuery) {
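        // If no retry sender has been registered (via `set_retry`), there is nothing we can
        // do here and the request is dropped.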
        let Some(tx) = self.retry_tx.clone() else {
            return;
        };
        let req = ClientRequest::GetDHT {
            key: query.key,
            notify: query.notify,
            retry_count: query.retry_count,
        };
        let backoff = query.backoff.next_timeout(false);
        spawn(async move {
            sleep(backoff).await;
            let _ = tx.send(req);
        });
    }

    /// Spawn a task which will retry the query after a backoff.
    fn retry_put(&self, mut query: KadPutQuery) {
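        // As with `retry_get`, if no retry sender has been registered the request is dropped.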
        let Some(tx) = self.retry_tx.clone() else {
            return;
        };
        let req = ClientRequest::PutDHT {
            key: query.key,
            value: query.value,
            notify: query.notify,
        };
        spawn(async move {
            sleep(query.backoff.next_timeout(false)).await;
            let _ = tx.send(req);
        });
    }

    /// Update state based on a received get query result
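    ///
    /// Records that carry an expiration time are accumulated until the query finishes; the
    /// record with the latest expiry is then validated against the local store and sent to
    /// every waiting channel. If no usable record was found, the query is retried (with
    /// backoff) while retries remain.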
    fn handle_get_query(
        &mut self,
        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
        record_results: GetRecordResult,
        id: QueryId,
        mut last: bool,
    ) {
        let found = match self.in_progress_record_queries.get_mut(&id) {
            Some(query) => match record_results {
                Ok(results) => match results {
                    GetRecordOk::FoundRecord(record) => {
                        // Make sure the record has an expiration time
                        if record.record.expires.is_some() {
                            query.records.push(record.record);
                            true
                        } else {
                            false
                        }
                    },
                    GetRecordOk::FinishedWithNoAdditionalRecord {
                        cache_candidates: _,
                    } => {
                        tracing::debug!("GetRecord Finished with No Additional Record");
                        last = true;
                        false
                    },
                },
                Err(err) => {
                    warn!("Error in Kademlia query: {err:?}");
                    false
                },
            },
            None => {
                // We already finished the query (or it's been cancelled). Do nothing and exit the
                // function.
                return;
            },
        };

        // If we found a record or the query has completed, we can return the result to the client.
        if found || last {
            if let Some(KadGetQuery {
                backoff,
                progress,
                notify,
                key,
                retry_count,
                records,
            }) = self.in_progress_record_queries.remove(&id)
            {
                // Remove the key from the outstanding queries so we are in sync
                self.outstanding_dht_query_keys.remove(&key);

                // `notify` is all channels that are still open
                let notify = notify
                    .into_iter()
                    .filter(|n| !n.is_canceled())
                    .collect::<Vec<_>>();

                // If all are closed, we can exit
                if notify.is_empty() {
                    return;
                }

                // Find the record with the highest expiry
                if let Some(record) = records.into_iter().max_by_key(|r| r.expires.unwrap()) {
                    // Only return the record if we can store it (validation passed)
                    if store.put(record.clone()).is_ok() {
                        // Send the record to all channels that are still open
                        for n in notify {
                            if n.send(record.value.clone()).is_err() {
                                warn!("Get DHT: channel closed before get record request result could be sent");
                            }
                        }
                    } else {
                        error!("Failed to store record in local store");
                    }
                }
                // disagreement => query more nodes
                else {
                    // There is some internal disagreement or not enough nodes returned.
                    // Initiate a new query that hits more replicas
                    if retry_count > 0 {
                        let new_retry_count = retry_count - 1;
                        warn!("Get DHT: Internal disagreement for get dht request {progress:?}! requerying with more nodes. {new_retry_count:?} retries left");
                        self.retry_get(KadGetQuery {
                            backoff,
                            progress: DHTProgress::NotStarted,
                            notify,
                            key,
                            retry_count: new_retry_count,
                            records: Vec::new(),
                        });
                    } else {
                        warn!("Get DHT: Internal disagreement for get dht request {progress:?}! Giving up because out of retries.");
                    }
                }
            }
        }
    }

    /// Update state based on put query
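    ///
    /// On success, the waiting client is notified over the query's channel; on error, the
    /// request is re-queued (with backoff) via `retry_put`.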
    fn handle_put_query(&mut self, record_results: PutRecordResult, id: QueryId) {
        if let Some(mut query) = self.in_progress_put_record_queries.remove(&id) {
            // If the client channel has been dropped, there is no one to notify; bail out
            if query.notify.is_canceled() {
                return;
            }

            match record_results {
                Ok(_) => {
                    if query.notify.send(()).is_err() {
                        warn!("Put DHT: client channel closed before put record request could be sent");
                    }
                },
                Err(e) => {
                    query.progress = DHTProgress::NotStarted;
                    query.backoff.start_next(false);

                    warn!(
                        "Put DHT: error performing put: {:?}. Retrying on pid {:?}.",
                        e, self.peer_id
                    );
                    // Push the request back onto the retry queue
                    self.retry_put(query);
                },
            }
        } else {
            warn!("Put DHT: completed DHT query that is no longer tracked.");
        }
    }

    /// Notify the bootstrap task that a bootstrap attempt has finished
    fn finish_bootstrap(&mut self) {
        if let Some(mut tx) = self.bootstrap_tx.clone() {
            spawn(async move { tx.send(bootstrap::InputEvent::BootstrapFinished).await });
        }
    }
    #[allow(clippy::too_many_lines)]
    /// Handle a DHT event
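    ///
    /// Returns `Some(NetworkEvent::IsBootstrapped)` when a bootstrap query reports its final
    /// progress step; all other events are handled internally and return `None`.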
    pub fn dht_handle_event(
        &mut self,
        event: KademliaEvent,
        store: &mut PersistentStore<ValidatedStore<MemoryStore, K>, D>,
    ) -> Option<NetworkEvent> {
        match event {
            KademliaEvent::OutboundQueryProgressed {
                result: QueryResult::PutRecord(record_results),
                id,
                step: ProgressStep { last, .. },
                ..
            } => {
                if last {
                    self.handle_put_query(record_results, id);
                }
            },
            KademliaEvent::OutboundQueryProgressed {
                result: QueryResult::GetClosestPeers(r),
                id: query_id,
                stats: _,
                step: ProgressStep { last: true, .. },
                ..
            } => match r {
                Ok(GetClosestPeersOk { key, peers: _ }) => {
                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
                        if chan.send(()).is_err() {
                            warn!("DHT: finished query but client was no longer interested");
                        };
                    };
                    debug!("Successfully got closest peers for key {key:?}");
                },
                Err(e) => {
                    if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
                        let _: Result<_, _> = chan.send(());
                    };
                    warn!("Failed to get closest peers: {e:?}");
                },
            },
            KademliaEvent::OutboundQueryProgressed {
                result: QueryResult::GetRecord(record_results),
                id,
                step: ProgressStep { last, .. },
                ..
            } => {
                self.handle_get_query(store, record_results, id, last);
            },
            KademliaEvent::OutboundQueryProgressed {
                result:
                    QueryResult::Bootstrap(Ok(BootstrapOk {
                        peer: _,
                        num_remaining,
                    })),
                step: ProgressStep { last: true, .. },
                ..
            } => {
                if num_remaining == 0 {
                    self.finish_bootstrap();
                } else {
                    debug!("Bootstrap in progress, {num_remaining} nodes remaining");
                }
                return Some(NetworkEvent::IsBootstrapped);
            },
            KademliaEvent::OutboundQueryProgressed {
                result: QueryResult::Bootstrap(Err(e)),
                ..
            } => {
                let BootstrapError::Timeout { num_remaining, .. } = e;
                if num_remaining.is_none() {
                    error!("Failed to bootstrap: {e:?}");
                }
                self.finish_bootstrap();
            },
            KademliaEvent::RoutablePeer { peer, address: _ } => {
                debug!("Found routable peer {peer:?}");
            },
            KademliaEvent::PendingRoutablePeer { peer, address: _ } => {
                debug!("Found pending routable peer {peer:?}");
            },
            KademliaEvent::UnroutablePeer { peer } => {
                debug!("Found unroutable peer {peer:?}");
            },
            KademliaEvent::RoutingUpdated {
                peer: _,
                is_new_peer: _,
                addresses: _,
                bucket_range: _,
                old_peer: _,
            } => {
                debug!("Routing table updated");
            },
            e @ KademliaEvent::OutboundQueryProgressed { .. } => {
                debug!("Not handling dht event {e:?}");
            },
            e => {
                debug!("New unhandled swarm event: {e:?}");
            },
        }
        None
    }
}

/// Metadata holder for get query
#[derive(Debug)]
pub(crate) struct KadGetQuery {
    /// Exponential retry backoff
    pub(crate) backoff: ExponentialBackoff,
    /// progress through DHT query
    pub(crate) progress: DHTProgress,
    /// The channels to notify of the result
    pub(crate) notify: Vec<Sender<Vec<u8>>>,
    /// the key to look up
    pub(crate) key: Vec<u8>,
    /// the number of remaining retries before giving up
    pub(crate) retry_count: u8,
    /// already received records
    pub(crate) records: Vec<Record>,
}

/// Metadata holder for put query
#[derive(Debug)]
pub struct KadPutQuery {
    /// Exponential retry backoff
    pub(crate) backoff: ExponentialBackoff,
    /// progress through DHT query
    pub(crate) progress: DHTProgress,
    /// notify client of result
    pub(crate) notify: Sender<()>,
    /// the key to put
    pub(crate) key: Vec<u8>,
    /// the value to put
    pub(crate) value: Vec<u8>,
}

/// represents progress through DHT
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum DHTProgress {
    /// The query has been started
    InProgress(QueryId),
    /// The query has not been started
    NotStarted,
}