// cliquenet/metrics.rs

1use std::{collections::HashMap, fmt::Display, hash::Hash, sync::Arc, time::Duration};
2
3use hotshot_types::traits::metrics::{Counter, CounterFamily, Gauge, GaugeFamily, Metrics};
4
/// Name of the per-peer counter family incremented by `add_connect_attempt`.
const CONNECT_ATTEMPTS: &str = "connect_attempts";
/// Name of the per-peer gauge family holding the latency in milliseconds.
const LATENCY: &str = "latency_ms";
/// Name of the per-peer gauge family written by `set_peer_oqueue_cap`.
const PEER_OQUEUE_CAP: &str = "peer_oqueue_cap";
/// Name of the per-peer gauge family written by `set_peer_iqueue_cap`.
const PEER_IQUEUE_CAP: &str = "peer_iqueue_cap";
9
10#[derive(Debug, Clone)]
11#[non_exhaustive]
12pub struct NetworkMetrics<K> {
13    pub connections: Box<dyn Gauge>,
14    pub iqueue: Box<dyn Gauge>,
15    pub oqueue: Box<dyn Gauge>,
16    peer_counter_fams: HashMap<&'static str, Arc<dyn CounterFamily>>,
17    peer_gauge_fams: HashMap<&'static str, Arc<dyn GaugeFamily>>,
18    connects: HashMap<K, Box<dyn Counter>>,
19    latencies: HashMap<K, Box<dyn Gauge>>,
20    peer_oqueues: HashMap<K, Box<dyn Gauge>>,
21    peer_iqueues: HashMap<K, Box<dyn Gauge>>,
22}
23
24impl<K> NetworkMetrics<K>
25where
26    K: Display + Eq + Hash + Clone,
27{
28    pub fn new<P>(label: &str, metrics: &dyn Metrics, parties: P) -> Self
29    where
30        P: IntoIterator<Item = K>,
31    {
32        let group = metrics.subgroup(format!("cliquenet_{label})"));
33
34        let peers = vec!["peers".into()];
35
36        let mut cf: HashMap<&'static str, Arc<dyn CounterFamily>> = HashMap::new();
37        cf.insert(
38            CONNECT_ATTEMPTS,
39            group
40                .counter_family(CONNECT_ATTEMPTS.into(), peers.clone())
41                .into(),
42        );
43
44        let mut gf: HashMap<&'static str, Arc<dyn GaugeFamily>> = HashMap::new();
45        gf.insert(
46            LATENCY,
47            group.gauge_family(LATENCY.into(), peers.clone()).into(),
48        );
49        gf.insert(
50            PEER_OQUEUE_CAP,
51            group
52                .gauge_family(PEER_OQUEUE_CAP.into(), peers.clone())
53                .into(),
54        );
55        gf.insert(
56            PEER_IQUEUE_CAP,
57            group.gauge_family(PEER_IQUEUE_CAP.into(), peers).into(),
58        );
59
60        let connects = peer_counters(&*cf[CONNECT_ATTEMPTS], parties);
61
62        Self {
63            connections: group.create_gauge("connections".into(), None),
64            iqueue: group.create_gauge("iqueue_cap".into(), None),
65            oqueue: group.create_gauge("oqueue_cap".into(), None),
66            latencies: peer_gauges(&*gf[LATENCY], connects.keys().cloned()),
67            peer_oqueues: peer_gauges(&*gf[PEER_OQUEUE_CAP], connects.keys().cloned()),
68            peer_iqueues: peer_gauges(&*gf[PEER_IQUEUE_CAP], connects.keys().cloned()),
69            connects,
70            peer_counter_fams: cf,
71            peer_gauge_fams: gf,
72        }
73    }
74
75    pub fn add_connect_attempt(&self, k: &K) {
76        if let Some(c) = self.connects.get(k) {
77            c.add(1)
78        }
79    }
80
81    pub fn set_latency(&self, k: &K, d: Duration) {
82        if let Some(g) = self.latencies.get(k) {
83            g.set(d.as_millis() as usize)
84        }
85    }
86
87    pub fn set_peer_oqueue_cap(&self, k: &K, n: usize) {
88        if let Some(g) = self.peer_oqueues.get(k) {
89            g.set(n)
90        }
91    }
92
93    pub fn set_peer_iqueue_cap(&self, k: &K, n: usize) {
94        if let Some(g) = self.peer_iqueues.get(k) {
95            g.set(n)
96        }
97    }
98
99    pub fn add_parties<P>(&mut self, parties: P)
100    where
101        P: IntoIterator<Item = K>,
102    {
103        for k in parties {
104            if !self.connects.contains_key(&k) {
105                let c = self.peer_counter_fams[CONNECT_ATTEMPTS].create(vec![k.to_string()]);
106                self.connects.insert(k.clone(), c);
107            }
108            if !self.latencies.contains_key(&k) {
109                let g = self.peer_gauge_fams[LATENCY].create(vec![k.to_string()]);
110                self.latencies.insert(k.clone(), g);
111            }
112            if !self.peer_oqueues.contains_key(&k) {
113                let g = self.peer_gauge_fams[PEER_OQUEUE_CAP].create(vec![k.to_string()]);
114                self.peer_oqueues.insert(k.clone(), g);
115            }
116            if !self.peer_iqueues.contains_key(&k) {
117                let g = self.peer_gauge_fams[PEER_IQUEUE_CAP].create(vec![k.to_string()]);
118                self.peer_iqueues.insert(k, g);
119            }
120        }
121    }
122
123    pub fn remove_parties<'a, P>(&mut self, parties: P)
124    where
125        P: IntoIterator<Item = &'a K>,
126        K: 'a,
127    {
128        // TODO: Counters and gauges should be de-registered.
129        for k in parties {
130            self.connects.remove(k);
131            self.latencies.remove(k);
132            self.peer_oqueues.remove(k);
133            self.peer_iqueues.remove(k);
134        }
135    }
136}
137
138fn peer_counters<P, K>(fam: &dyn CounterFamily, peers: P) -> HashMap<K, Box<dyn Counter>>
139where
140    P: IntoIterator<Item = K>,
141    K: Display + Eq + Hash + Clone,
142{
143    peers
144        .into_iter()
145        .map(|k| {
146            let c = fam.create(vec![k.to_string()]);
147            (k, c)
148        })
149        .collect()
150}
151
152fn peer_gauges<P, K>(fam: &dyn GaugeFamily, peers: P) -> HashMap<K, Box<dyn Gauge>>
153where
154    P: IntoIterator<Item = K>,
155    K: Display + Eq + Hash + Clone,
156{
157    peers
158        .into_iter()
159        .map(|k| {
160            let c = fam.create(vec![k.to_string()]);
161            (k, c)
162        })
163        .collect()
164}