hotshot_task_impls/
stats.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_broadcast::{Receiver, Sender};
7use async_trait::async_trait;
8use either::Either;
9use hotshot_task::task::TaskState;
10use hotshot_types::{
11    consensus::OuterConsensus,
12    epoch_membership::EpochMembershipCoordinator,
13    traits::{
14        block_contents::BlockHeader,
15        node_implementation::{ConsensusTime, NodeType},
16        BlockPayload,
17    },
18    vote::HasViewNumber,
19};
20use hotshot_utils::{
21    anytrace::{Error, Level, Result},
22    line_info, warn,
23};
24use serde::{Deserialize, Serialize};
25use time::OffsetDateTime;
26
27use crate::events::HotShotEvent;
28
29#[derive(Serialize, Deserialize)]
30pub struct LeaderViewStats<TYPES: NodeType> {
31    pub view: TYPES::View,
32    pub prev_proposal_send: Option<i128>,
33    pub proposal_send: Option<i128>,
34    pub vote_recv: Option<i128>,
35    pub da_proposal_send: Option<i128>,
36    pub builder_start: Option<i128>,
37    pub block_built: Option<i128>,
38    pub vid_disperse_send: Option<i128>,
39    pub timeout_certificate_formed: Option<i128>,
40    pub qc_formed: Option<i128>,
41    pub da_cert_send: Option<i128>,
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct ReplicaViewStats<TYPES: NodeType> {
46    pub view: TYPES::View,
47    pub view_change: Option<i128>,
48    pub proposal_timestamp: Option<i128>,
49    pub proposal_recv: Option<i128>,
50    pub vote_send: Option<i128>,
51    pub timeout_vote_send: Option<i128>,
52    pub da_proposal_received: Option<i128>,
53    pub da_proposal_validated: Option<i128>,
54    pub da_certificate_recv: Option<i128>,
55    pub proposal_prelim_validated: Option<i128>,
56    pub proposal_validated: Option<i128>,
57    pub timeout_triggered: Option<i128>,
58    pub vid_share_validated: Option<i128>,
59    pub vid_share_recv: Option<i128>,
60}
61
62impl<TYPES: NodeType> LeaderViewStats<TYPES> {
63    fn new(view: TYPES::View) -> Self {
64        Self {
65            view,
66            prev_proposal_send: None,
67            proposal_send: None,
68            vote_recv: None,
69            da_proposal_send: None,
70            builder_start: None,
71            block_built: None,
72            vid_disperse_send: None,
73            timeout_certificate_formed: None,
74            qc_formed: None,
75            da_cert_send: None,
76        }
77    }
78}
79
80impl<TYPES: NodeType> ReplicaViewStats<TYPES> {
81    fn new(view: TYPES::View) -> Self {
82        Self {
83            view,
84            view_change: None,
85            proposal_timestamp: None,
86            proposal_recv: None,
87            vote_send: None,
88            timeout_vote_send: None,
89            da_proposal_received: None,
90            da_proposal_validated: None,
91            da_certificate_recv: None,
92            proposal_prelim_validated: None,
93            proposal_validated: None,
94            timeout_triggered: None,
95            vid_share_validated: None,
96            vid_share_recv: None,
97        }
98    }
99}
100
101pub struct StatsTaskState<TYPES: NodeType> {
102    view: TYPES::View,
103    epoch: Option<TYPES::Epoch>,
104    public_key: TYPES::SignatureKey,
105    consensus: OuterConsensus<TYPES>,
106    membership_coordinator: EpochMembershipCoordinator<TYPES>,
107    leader_stats: BTreeMap<TYPES::View, LeaderViewStats<TYPES>>,
108    replica_stats: BTreeMap<TYPES::View, ReplicaViewStats<TYPES>>,
109    latencies_by_view: BTreeMap<TYPES::View, i128>,
110    sizes_by_view: BTreeMap<TYPES::View, i128>,
111    epoch_start_times: BTreeMap<TYPES::Epoch, i128>,
112    timeouts: BTreeSet<TYPES::View>,
113}
114
115impl<TYPES: NodeType> StatsTaskState<TYPES> {
116    pub fn new(
117        view: TYPES::View,
118        epoch: Option<TYPES::Epoch>,
119        public_key: TYPES::SignatureKey,
120        consensus: OuterConsensus<TYPES>,
121        membership_coordinator: EpochMembershipCoordinator<TYPES>,
122    ) -> Self {
123        Self {
124            view,
125            epoch,
126            public_key,
127            consensus,
128            membership_coordinator,
129            leader_stats: BTreeMap::new(),
130            replica_stats: BTreeMap::new(),
131            latencies_by_view: BTreeMap::new(),
132            sizes_by_view: BTreeMap::new(),
133            epoch_start_times: BTreeMap::new(),
134            timeouts: BTreeSet::new(),
135        }
136    }
137    fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats<TYPES> {
138        self.leader_stats
139            .entry(view)
140            .or_insert_with(|| LeaderViewStats::new(view))
141    }
142    fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats<TYPES> {
143        self.replica_stats
144            .entry(view)
145            .or_insert_with(|| ReplicaViewStats::new(view))
146    }
147    fn garbage_collect(&mut self, view: TYPES::View) {
148        self.leader_stats = self.leader_stats.split_off(&view);
149        self.replica_stats = self.replica_stats.split_off(&view);
150        self.latencies_by_view = self.latencies_by_view.split_off(&view);
151        self.sizes_by_view = self.sizes_by_view.split_off(&view);
152        self.timeouts = BTreeSet::new();
153    }
154
155    fn dump_stats(&self) -> Result<()> {
156        let mut writer = csv::Writer::from_writer(vec![]);
157        for (_, leader_stats) in self.leader_stats.iter() {
158            writer
159                .serialize(leader_stats)
160                .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
161        }
162        let output = writer
163            .into_inner()
164            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
165        tracing::warn!(
166            "Leader stats: {}",
167            String::from_utf8(output)
168                .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
169        );
170        let mut writer = csv::Writer::from_writer(vec![]);
171        for (_, replica_stats) in self.replica_stats.iter() {
172            writer
173                .serialize(replica_stats)
174                .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
175        }
176        let output = writer
177            .into_inner()
178            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
179        tracing::warn!(
180            "Replica stats: {}",
181            String::from_utf8(output)
182                .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
183        );
184        Ok(())
185    }
186
187    fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) {
188        let num_views = self.latencies_by_view.len();
189        let total_size = self.sizes_by_view.values().sum::<i128>();
190
191        // Either we have no views logged yet, no TXNs or we are not in the DA committee and don't know block sizes
192        if num_views == 0 || total_size == 0 {
193            return;
194        }
195
196        let total_latency = self.latencies_by_view.values().sum::<i128>();
197        let average_latency = total_latency / num_views as i128;
198        tracing::warn!("Average latency: {}ms", average_latency);
199        tracing::warn!(
200            "Number of timeouts in epoch: {}, is {}",
201            epoch,
202            self.timeouts.len()
203        );
204        if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) {
205            let elapsed_time = now - epoch_start_time;
206            // multiply by 1000 to convert to seconds
207            let throughput = (total_size / elapsed_time) * 1000;
208            tracing::warn!("Throughput: {} bytes/s", throughput);
209        }
210    }
211}
212
213#[async_trait]
214impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
215    type Event = HotShotEvent<TYPES>;
216
217    async fn handle_event(
218        &mut self,
219        event: Arc<Self::Event>,
220        _sender: &Sender<Arc<Self::Event>>,
221        _receiver: &Receiver<Arc<Self::Event>>,
222    ) -> Result<()> {
223        let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
224
225        match event.as_ref() {
226            HotShotEvent::BlockRecv(block_recv) => {
227                self.leader_entry(block_recv.view_number).block_built = Some(now);
228            },
229            HotShotEvent::QuorumProposalRecv(proposal, _) => {
230                self.replica_entry(proposal.data.view_number())
231                    .proposal_recv = Some(now);
232            },
233            HotShotEvent::QuorumVoteRecv(_vote) => {},
234            HotShotEvent::TimeoutVoteRecv(_vote) => {},
235            HotShotEvent::TimeoutVoteSend(vote) => {
236                self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
237            },
238            HotShotEvent::DaProposalRecv(proposal, _) => {
239                self.replica_entry(proposal.data.view_number())
240                    .da_proposal_received = Some(now);
241            },
242            HotShotEvent::DaProposalValidated(proposal, _) => {
243                self.replica_entry(proposal.data.view_number())
244                    .da_proposal_validated = Some(now);
245            },
246            HotShotEvent::DaVoteRecv(_simple_vote) => {},
247            HotShotEvent::DaCertificateRecv(simple_certificate) => {
248                self.replica_entry(simple_certificate.view_number())
249                    .da_certificate_recv = Some(now);
250            },
251            HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
252            HotShotEvent::QuorumProposalSend(proposal, _) => {
253                self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
254
255                // If the last view succeeded, add the metric for time between proposals
256                if proposal.data.view_change_evidence().is_none() {
257                    if let Some(previous_proposal_time) = self
258                        .replica_entry(proposal.data.view_number() - 1)
259                        .proposal_recv
260                    {
261                        self.leader_entry(proposal.data.view_number())
262                            .prev_proposal_send = Some(previous_proposal_time);
263
264                        // calculate the elapsed time as milliseconds (from nanoseconds)
265                        let elapsed_time = (now - previous_proposal_time) / 1_000_000;
266                        if elapsed_time > 0 {
267                            self.consensus
268                                .read()
269                                .await
270                                .metrics
271                                .previous_proposal_to_proposal_time
272                                .add_point(elapsed_time as f64);
273                        } else {
274                            tracing::warn!("Previous proposal time is in the future");
275                        }
276                    }
277                }
278            },
279            HotShotEvent::QuorumVoteSend(simple_vote) => {
280                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
281            },
282            HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
283                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
284            },
285            HotShotEvent::QuorumProposalValidated(proposal, _) => {
286                self.replica_entry(proposal.data.view_number())
287                    .proposal_validated = Some(now);
288                self.replica_entry(proposal.data.view_number())
289                    .proposal_timestamp =
290                    Some(proposal.data.block_header().timestamp_millis() as i128);
291            },
292            HotShotEvent::DaProposalSend(proposal, _) => {
293                self.leader_entry(proposal.data.view_number())
294                    .da_proposal_send = Some(now);
295            },
296            HotShotEvent::DaVoteSend(simple_vote) => {
297                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
298            },
299            HotShotEvent::QcFormed(either) => {
300                match either {
301                    Either::Left(qc) => {
302                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
303                    },
304                    Either::Right(tc) => {
305                        self.leader_entry(tc.view_number())
306                            .timeout_certificate_formed = Some(now)
307                    },
308                };
309            },
310            HotShotEvent::Qc2Formed(either) => {
311                match either {
312                    Either::Left(qc) => {
313                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
314                    },
315                    Either::Right(tc) => {
316                        self.leader_entry(tc.view_number())
317                            .timeout_certificate_formed = Some(now)
318                    },
319                };
320            },
321            HotShotEvent::DacSend(simple_certificate, _) => {
322                self.leader_entry(simple_certificate.view_number())
323                    .da_cert_send = Some(now);
324            },
325            HotShotEvent::ViewChange(view, epoch) => {
326                // Record the timestamp of the first observed view change
327                // This can happen when transitioning to the next view, either due to voting
328                // or receiving a proposal, but we only store the first one
329                if self.replica_entry(*view + 1).view_change.is_none() {
330                    self.replica_entry(*view + 1).view_change = Some(now);
331                }
332
333                if *epoch <= self.epoch && *view <= self.view {
334                    return Ok(());
335                }
336                if self.view < *view {
337                    self.view = *view;
338                }
339                let prev_epoch = self.epoch;
340                let mut new_epoch = false;
341                if self.epoch < *epoch {
342                    self.epoch = *epoch;
343                    new_epoch = true;
344                }
345                if *view == TYPES::View::new(0) {
346                    return Ok(());
347                }
348
349                if new_epoch {
350                    if let Some(prev_epoch) = prev_epoch {
351                        self.log_basic_stats(now, &prev_epoch);
352                    }
353                    let _ = self.dump_stats();
354                    self.garbage_collect(*view - 1);
355                }
356
357                let leader = self
358                    .membership_coordinator
359                    .membership_for_epoch(*epoch)
360                    .await?
361                    .leader(*view)
362                    .await?;
363                if leader == self.public_key {
364                    self.leader_entry(*view).builder_start = Some(now);
365                }
366            },
367            HotShotEvent::Timeout(view, _) => {
368                self.replica_entry(*view).timeout_triggered = Some(now);
369                self.timeouts.insert(*view);
370            },
371            HotShotEvent::TransactionsRecv(_txns) => {
372                // TODO: Track transactions by time
373                // #3526 https://github.com/EspressoSystems/espresso-network/issues/3526
374            },
375            HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
376                self.leader_entry(*view).vid_disperse_send = Some(now);
377            },
378            HotShotEvent::VidShareRecv(_, proposal) => {
379                self.replica_entry(proposal.data.view_number())
380                    .vid_share_recv = Some(now);
381            },
382            HotShotEvent::VidShareValidated(proposal) => {
383                self.replica_entry(proposal.data.view_number())
384                    .vid_share_validated = Some(now);
385            },
386            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
387                self.replica_entry(proposal.data.view_number())
388                    .proposal_prelim_validated = Some(now);
389            },
390            HotShotEvent::LeavesDecided(leaves) => {
391                for leaf in leaves {
392                    if leaf.view_number() == TYPES::View::genesis() {
393                        continue;
394                    }
395                    let view = leaf.view_number();
396                    let timestamp = leaf.block_header().timestamp_millis() as i128;
397                    let now_millis = now / 1_000_000;
398                    let latency = now_millis - timestamp;
399                    tracing::debug!("View {} Latency: {}ms", view, latency);
400                    self.latencies_by_view.insert(view, latency);
401                    self.sizes_by_view.insert(
402                        view,
403                        leaf.block_payload().map(|p| p.txn_bytes()).unwrap_or(0) as i128,
404                    );
405                }
406            },
407            _ => {},
408        }
409        Ok(())
410    }
411
412    fn cancel_subtasks(&mut self) {
413        // No subtasks to cancel
414    }
415}