hotshot_libp2p_networking/network/behaviours/
direct_message.rs1use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::{debug, error, warn};
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent};
16
17#[derive(Debug)]
19pub struct DMRequest {
20 pub peer_id: PeerId,
22 pub data: Vec<u8>,
24 pub backoff: ExponentialBackoff,
26 pub(crate) retry_count: u8,
28}
29
30#[derive(Debug, Default)]
33pub struct DMBehaviour {
34 in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38#[derive(Debug)]
40pub enum DMEvent {
41 DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43 DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48 pub(crate) fn handle_dm_event(
50 &mut self,
51 event: Event<Vec<u8>, Vec<u8>>,
52 retry_tx: Option<UnboundedSender<ClientRequest>>,
53 ) -> Option<NetworkEvent> {
54 match event {
55 Event::InboundFailure {
56 peer,
57 request_id: _,
58 error,
59 } => {
60 error!("Inbound message failure from {:?}: {:?}", peer, error);
61 None
62 },
63 Event::OutboundFailure {
64 peer,
65 request_id,
66 error,
67 } => {
68 warn!("Outbound message failure to {:?}: {:?}", peer, error);
69 if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
70 if req.retry_count == 0 {
71 return None;
72 }
73 req.retry_count -= 1;
74 if let Some(retry_tx) = retry_tx {
75 spawn(async move {
76 sleep(req.backoff.next_timeout(false)).await;
77 let _ = retry_tx.send(ClientRequest::DirectRequest {
78 pid: peer,
79 contents: req.data,
80 retry_count: req.retry_count,
81 });
82 });
83 }
84 }
85 None
86 },
87 Event::Message { message, peer, .. } => match message {
88 Message::Request {
89 request: msg,
90 channel,
91 ..
92 } => {
93 debug!("Received direct request {:?}", msg);
94 Some(NetworkEvent::DirectRequest(msg, peer, channel))
97 },
98 Message::Response {
99 request_id,
100 response: msg,
101 } => {
102 if let Some(req) = self.in_progress_rr.remove(&request_id) {
104 debug!("Received direct response {:?}", msg);
105 Some(NetworkEvent::DirectResponse(msg, req.peer_id))
106 } else {
107 warn!("Received response for unknown request id {:?}", request_id);
108 None
109 }
110 },
111 },
112 e @ Event::ResponseSent { .. } => {
113 debug!("Response sent {:?}", e);
114 None
115 },
116 }
117 }
118}
119
120impl DMBehaviour {
121 pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
123 if req.retry_count == 0 {
124 return;
125 }
126
127 req.retry_count -= 1;
128
129 debug!("Adding direct request {:?}", req);
130
131 self.in_progress_rr.insert(request_id, req);
132 }
133}