hotshot_libp2p_networking/network/behaviours/
direct_message.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::{debug, error, warn};
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent};
16
17/// Request to direct message a peert
18#[derive(Debug)]
19pub struct DMRequest {
20    /// the recv-ers peer id
21    pub peer_id: PeerId,
22    /// the data
23    pub data: Vec<u8>,
24    /// backoff since last attempted request
25    pub backoff: ExponentialBackoff,
26    /// the number of remaining retries before giving up
27    pub(crate) retry_count: u8,
28}
29
30/// Wrapper metadata around libp2p's request response
31/// usage: direct message peer
32#[derive(Debug, Default)]
33pub struct DMBehaviour {
34    /// In progress queries
35    in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38/// Lilst of direct message output events
39#[derive(Debug)]
40pub enum DMEvent {
41    /// We received as Direct Request
42    DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43    /// We received a Direct Response
44    DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48    /// handle a direct message event
49    pub(crate) fn handle_dm_event(
50        &mut self,
51        event: Event<Vec<u8>, Vec<u8>>,
52        retry_tx: Option<UnboundedSender<ClientRequest>>,
53    ) -> Option<NetworkEvent> {
54        match event {
55            Event::InboundFailure {
56                peer,
57                request_id: _,
58                error,
59                connection_id: _,
60            } => {
61                error!("Inbound message failure from {:?}: {:?}", peer, error);
62                None
63            },
64            Event::OutboundFailure {
65                peer,
66                request_id,
67                error,
68                connection_id: _,
69            } => {
70                warn!("Outbound message failure to {:?}: {:?}", peer, error);
71                if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
72                    if req.retry_count == 0 {
73                        return None;
74                    }
75                    req.retry_count -= 1;
76                    if let Some(retry_tx) = retry_tx {
77                        spawn(async move {
78                            sleep(req.backoff.next_timeout(false)).await;
79                            let _ = retry_tx.send(ClientRequest::DirectRequest {
80                                pid: peer,
81                                contents: req.data,
82                                retry_count: req.retry_count,
83                            });
84                        });
85                    }
86                }
87                None
88            },
89            Event::Message { message, peer, .. } => match message {
90                Message::Request {
91                    request: msg,
92                    channel,
93                    ..
94                } => {
95                    debug!("Received direct request {:?}", msg);
96                    // receiver, not initiator.
97                    // don't track. If we are disconnected, sender will reinitiate
98                    Some(NetworkEvent::DirectRequest(msg, peer, channel))
99                },
100                Message::Response {
101                    request_id,
102                    response: msg,
103                } => {
104                    // success, finished.
105                    if let Some(req) = self.in_progress_rr.remove(&request_id) {
106                        debug!("Received direct response {:?}", msg);
107                        Some(NetworkEvent::DirectResponse(msg, req.peer_id))
108                    } else {
109                        warn!("Received response for unknown request id {:?}", request_id);
110                        None
111                    }
112                },
113            },
114            e @ Event::ResponseSent { .. } => {
115                debug!("Response sent {:?}", e);
116                None
117            },
118        }
119    }
120}
121
122impl DMBehaviour {
123    /// Add a direct request for a given peer
124    pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
125        if req.retry_count == 0 {
126            return;
127        }
128
129        req.retry_count -= 1;
130
131        debug!("Adding direct request {:?}", req);
132
133        self.in_progress_rr.insert(request_id, req);
134    }
135}