hotshot_libp2p_networking/network/
def.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use hotshot_types::traits::signature_key::SignatureKey;
8use libp2p::{
9    autonat,
10    gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
11    identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent},
12    kad::store::MemoryStore,
13    request_response::{OutboundRequestId, ResponseChannel},
14    Multiaddr,
15};
16use libp2p_identity::PeerId;
17use libp2p_swarm_derive::NetworkBehaviour;
18use tracing::{debug, error};
19
20use super::{
21    behaviours::dht::store::{
22        persistent::{DhtPersistentStorage, PersistentStore},
23        validated::ValidatedStore,
24    },
25    cbor, NetworkEventInternal,
26};
27
28/// Overarching network behaviour performing:
29/// - network topology discovery
30/// - direct messaging
31/// - p2p broadcast
32/// - connection management
33#[derive(NetworkBehaviour, derive_more::Debug)]
34#[behaviour(to_swarm = "NetworkEventInternal")]
35pub struct NetworkDef<K: SignatureKey + 'static, D: DhtPersistentStorage> {
36    /// purpose: broadcasting messages to many peers
37    /// NOTE gossipsub works ONLY for sharing messages right now
38    /// in the future it may be able to do peer discovery and routing
39    /// <https://github.com/libp2p/rust-libp2p/issues/2398>
40    #[debug(skip)]
41    gossipsub: GossipBehaviour,
42
43    /// The DHT store. We use a `PersistentStore` to occasionally save the DHT to
44    /// some persistent store and a `ValidatedStore` to validate the records stored.
45    #[debug(skip)]
46    pub dht: libp2p::kad::Behaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
47
48    /// purpose: identifying the addresses from an outside POV
49    #[debug(skip)]
50    identify: IdentifyBehaviour,
51
52    /// purpose: directly messaging peer
53    #[debug(skip)]
54    pub direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,
55
56    /// Auto NAT behaviour to determine if we are publicly reachable and
57    /// by which address
58    #[debug(skip)]
59    pub autonat: libp2p::autonat::Behaviour,
60}
61
62impl<K: SignatureKey + 'static, D: DhtPersistentStorage> NetworkDef<K, D> {
63    /// Create a new instance of a `NetworkDef`
64    #[must_use]
65    pub fn new(
66        gossipsub: GossipBehaviour,
67        dht: libp2p::kad::Behaviour<PersistentStore<ValidatedStore<MemoryStore, K>, D>>,
68        identify: IdentifyBehaviour,
69        direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>>,
70        autonat: autonat::Behaviour,
71    ) -> NetworkDef<K, D> {
72        Self {
73            gossipsub,
74            dht,
75            identify,
76            direct_message,
77            autonat,
78        }
79    }
80}
81
82/// Address functions
83impl<K: SignatureKey + 'static, D: DhtPersistentStorage> NetworkDef<K, D> {
84    /// Add an address
85    pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
86        // NOTE to get this address to play nice with the other
87        // behaviours using the DHT for routing
88        // we only need to add this address to the DHT since it
89        // is always enabled. If it were not always enabled,
90        // we would need to manually add the address to
91        // the direct message behaviour
92        self.dht.add_address(peer_id, address);
93    }
94}
95
96/// Gossip functions
97impl<K: SignatureKey + 'static, D: DhtPersistentStorage> NetworkDef<K, D> {
98    /// Publish a given gossip
99    pub fn publish_gossip(&mut self, topic: IdentTopic, contents: Vec<u8>) {
100        if let Err(e) = self.gossipsub.publish(topic, contents) {
101            tracing::warn!("Failed to publish gossip message. Error: {:?}", e);
102        }
103    }
104    /// Subscribe to a given topic
105    pub fn subscribe_gossip(&mut self, t: &str) {
106        if let Err(e) = self.gossipsub.subscribe(&IdentTopic::new(t)) {
107            error!("Failed to subscribe to topic {:?}. Error: {:?}", t, e);
108        }
109    }
110
111    /// Unsubscribe from a given topic
112    pub fn unsubscribe_gossip(&mut self, t: &str) {
113        if let Err(e) = self.gossipsub.unsubscribe(&IdentTopic::new(t)) {
114            error!("Failed to unsubscribe from topic {:?}. Error: {:?}", t, e);
115        }
116    }
117}
118
119/// Request/response functions
120impl<K: SignatureKey + 'static, D: DhtPersistentStorage> NetworkDef<K, D> {
121    /// Add a direct request for a given peer
122    pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec<u8>) -> OutboundRequestId {
123        self.direct_message.send_request(&peer_id, data)
124    }
125
126    /// Add a direct response for a channel
127    pub fn add_direct_response(&mut self, chan: ResponseChannel<Vec<u8>>, msg: Vec<u8>) {
128        let _ = self.direct_message.send_response(chan, msg);
129    }
130}
131
132impl From<GossipEvent> for NetworkEventInternal {
133    fn from(event: GossipEvent) -> Self {
134        Self::GossipEvent(Box::new(event))
135    }
136}
137
138impl From<libp2p::kad::Event> for NetworkEventInternal {
139    fn from(event: libp2p::kad::Event) -> Self {
140        Self::DHTEvent(event)
141    }
142}
143
144impl From<IdentifyEvent> for NetworkEventInternal {
145    fn from(event: IdentifyEvent) -> Self {
146        Self::IdentifyEvent(Box::new(event))
147    }
148}
149impl From<libp2p::request_response::Event<Vec<u8>, Vec<u8>>> for NetworkEventInternal {
150    fn from(value: libp2p::request_response::Event<Vec<u8>, Vec<u8>>) -> Self {
151        Self::DMEvent(value)
152    }
153}
154
155impl From<libp2p::autonat::Event> for NetworkEventInternal {
156    fn from(event: libp2p::autonat::Event) -> Self {
157        Self::AutonatEvent(event)
158    }
159}