hotshot_libp2p_networking/network/behaviours/
direct_message.rs1use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::{debug, error, warn};
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent};
16
17#[derive(Debug)]
19pub struct DMRequest {
20 pub peer_id: PeerId,
22 pub data: Vec<u8>,
24 pub backoff: ExponentialBackoff,
26 pub(crate) retry_count: u8,
28}
29
30#[derive(Debug, Default)]
33pub struct DMBehaviour {
34 in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38#[derive(Debug)]
40pub enum DMEvent {
41 DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43 DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48 pub(crate) fn handle_dm_event(
50 &mut self,
51 event: Event<Vec<u8>, Vec<u8>>,
52 retry_tx: Option<UnboundedSender<ClientRequest>>,
53 ) -> Option<NetworkEvent> {
54 match event {
55 Event::InboundFailure {
56 peer,
57 request_id: _,
58 error,
59 connection_id: _,
60 } => {
61 error!("Inbound message failure from {:?}: {:?}", peer, error);
62 None
63 },
64 Event::OutboundFailure {
65 peer,
66 request_id,
67 error,
68 connection_id: _,
69 } => {
70 warn!("Outbound message failure to {:?}: {:?}", peer, error);
71 if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
72 if req.retry_count == 0 {
73 return None;
74 }
75 req.retry_count -= 1;
76 if let Some(retry_tx) = retry_tx {
77 spawn(async move {
78 sleep(req.backoff.next_timeout(false)).await;
79 let _ = retry_tx.send(ClientRequest::DirectRequest {
80 pid: peer,
81 contents: req.data,
82 retry_count: req.retry_count,
83 });
84 });
85 }
86 }
87 None
88 },
89 Event::Message { message, peer, .. } => match message {
90 Message::Request {
91 request: msg,
92 channel,
93 ..
94 } => {
95 debug!("Received direct request {:?}", msg);
96 Some(NetworkEvent::DirectRequest(msg, peer, channel))
99 },
100 Message::Response {
101 request_id,
102 response: msg,
103 } => {
104 if let Some(req) = self.in_progress_rr.remove(&request_id) {
106 debug!("Received direct response {:?}", msg);
107 Some(NetworkEvent::DirectResponse(msg, req.peer_id))
108 } else {
109 warn!("Received response for unknown request id {:?}", request_id);
110 None
111 }
112 },
113 },
114 e @ Event::ResponseSent { .. } => {
115 debug!("Response sent {:?}", e);
116 None
117 },
118 }
119 }
120}
121
122impl DMBehaviour {
123 pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
125 if req.retry_count == 0 {
126 return;
127 }
128
129 req.retry_count -= 1;
130
131 debug!("Adding direct request {:?}", req);
132
133 self.in_progress_rr.insert(request_id, req);
134 }
135}