hotshot_libp2p_networking/network/behaviours/
direct_message.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::{debug, error, warn};
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent};
16
17/// Request to direct message a peert
18#[derive(Debug)]
19pub struct DMRequest {
20    /// the recv-ers peer id
21    pub peer_id: PeerId,
22    /// the data
23    pub data: Vec<u8>,
24    /// backoff since last attempted request
25    pub backoff: ExponentialBackoff,
26    /// the number of remaining retries before giving up
27    pub(crate) retry_count: u8,
28}
29
30/// Wrapper metadata around libp2p's request response
31/// usage: direct message peer
32#[derive(Debug, Default)]
33pub struct DMBehaviour {
34    /// In progress queries
35    in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38/// Lilst of direct message output events
39#[derive(Debug)]
40pub enum DMEvent {
41    /// We received as Direct Request
42    DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43    /// We received a Direct Response
44    DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48    /// handle a direct message event
49    pub(crate) fn handle_dm_event(
50        &mut self,
51        event: Event<Vec<u8>, Vec<u8>>,
52        retry_tx: Option<UnboundedSender<ClientRequest>>,
53    ) -> Option<NetworkEvent> {
54        match event {
55            Event::InboundFailure {
56                peer,
57                request_id: _,
58                error,
59            } => {
60                error!("Inbound message failure from {:?}: {:?}", peer, error);
61                None
62            },
63            Event::OutboundFailure {
64                peer,
65                request_id,
66                error,
67            } => {
68                warn!("Outbound message failure to {:?}: {:?}", peer, error);
69                if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
70                    if req.retry_count == 0 {
71                        return None;
72                    }
73                    req.retry_count -= 1;
74                    if let Some(retry_tx) = retry_tx {
75                        spawn(async move {
76                            sleep(req.backoff.next_timeout(false)).await;
77                            let _ = retry_tx.send(ClientRequest::DirectRequest {
78                                pid: peer,
79                                contents: req.data,
80                                retry_count: req.retry_count,
81                            });
82                        });
83                    }
84                }
85                None
86            },
87            Event::Message { message, peer, .. } => match message {
88                Message::Request {
89                    request: msg,
90                    channel,
91                    ..
92                } => {
93                    debug!("Received direct request {:?}", msg);
94                    // receiver, not initiator.
95                    // don't track. If we are disconnected, sender will reinitiate
96                    Some(NetworkEvent::DirectRequest(msg, peer, channel))
97                },
98                Message::Response {
99                    request_id,
100                    response: msg,
101                } => {
102                    // success, finished.
103                    if let Some(req) = self.in_progress_rr.remove(&request_id) {
104                        debug!("Received direct response {:?}", msg);
105                        Some(NetworkEvent::DirectResponse(msg, req.peer_id))
106                    } else {
107                        warn!("Received response for unknown request id {:?}", request_id);
108                        None
109                    }
110                },
111            },
112            e @ Event::ResponseSent { .. } => {
113                debug!("Response sent {:?}", e);
114                None
115            },
116        }
117    }
118}
119
120impl DMBehaviour {
121    /// Add a direct request for a given peer
122    pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
123        if req.retry_count == 0 {
124            return;
125        }
126
127        req.retry_count -= 1;
128
129        debug!("Adding direct request {:?}", req);
130
131        self.in_progress_rr.insert(request_id, req);
132    }
133}