hotshot_task_impls/
stats.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use async_broadcast::{Receiver, Sender};
7use async_trait::async_trait;
8use either::Either;
9use hotshot_task::task::TaskState;
10use hotshot_types::{
11    consensus::OuterConsensus,
12    data::{EpochNumber, ViewNumber},
13    epoch_membership::EpochMembershipCoordinator,
14    traits::{BlockPayload, block_contents::BlockHeader, node_implementation::NodeType},
15    vote::HasViewNumber,
16};
17use hotshot_utils::{
18    anytrace::{Error, Level, Result},
19    line_info, warn,
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use crate::events::HotShotEvent;
25
26#[derive(Serialize, Deserialize)]
27pub struct LeaderViewStats {
28    pub view: ViewNumber,
29    pub prev_proposal_send: Option<i128>,
30    pub proposal_send: Option<i128>,
31    pub vote_recv: Option<i128>,
32    pub da_proposal_send: Option<i128>,
33    pub builder_start: Option<i128>,
34    pub block_built: Option<i128>,
35    pub vid_disperse_send: Option<i128>,
36    pub timeout_certificate_formed: Option<i128>,
37    pub qc_formed: Option<i128>,
38    pub da_cert_send: Option<i128>,
39}
40
41#[derive(Serialize, Deserialize)]
42pub struct ReplicaViewStats {
43    pub view: ViewNumber,
44    pub view_change: Option<i128>,
45    pub proposal_timestamp: Option<i128>,
46    pub proposal_recv: Option<i128>,
47    pub vote_send: Option<i128>,
48    pub timeout_vote_send: Option<i128>,
49    pub da_proposal_received: Option<i128>,
50    pub da_proposal_validated: Option<i128>,
51    pub da_certificate_recv: Option<i128>,
52    pub proposal_prelim_validated: Option<i128>,
53    pub proposal_validated: Option<i128>,
54    pub timeout_triggered: Option<i128>,
55    pub vid_share_validated: Option<i128>,
56    pub vid_share_recv: Option<i128>,
57}
58
59impl LeaderViewStats {
60    fn new(view: ViewNumber) -> Self {
61        Self {
62            view,
63            prev_proposal_send: None,
64            proposal_send: None,
65            vote_recv: None,
66            da_proposal_send: None,
67            builder_start: None,
68            block_built: None,
69            vid_disperse_send: None,
70            timeout_certificate_formed: None,
71            qc_formed: None,
72            da_cert_send: None,
73        }
74    }
75}
76
77impl ReplicaViewStats {
78    fn new(view: ViewNumber) -> Self {
79        Self {
80            view,
81            view_change: None,
82            proposal_timestamp: None,
83            proposal_recv: None,
84            vote_send: None,
85            timeout_vote_send: None,
86            da_proposal_received: None,
87            da_proposal_validated: None,
88            da_certificate_recv: None,
89            proposal_prelim_validated: None,
90            proposal_validated: None,
91            timeout_triggered: None,
92            vid_share_validated: None,
93            vid_share_recv: None,
94        }
95    }
96}
97
98pub struct StatsTaskState<TYPES: NodeType> {
99    view: ViewNumber,
100    epoch: Option<EpochNumber>,
101    public_key: TYPES::SignatureKey,
102    consensus: OuterConsensus<TYPES>,
103    membership_coordinator: EpochMembershipCoordinator<TYPES>,
104    leader_stats: BTreeMap<ViewNumber, LeaderViewStats>,
105    replica_stats: BTreeMap<ViewNumber, ReplicaViewStats>,
106    latencies_by_view: BTreeMap<ViewNumber, i128>,
107    sizes_by_view: BTreeMap<ViewNumber, i128>,
108    epoch_start_times: BTreeMap<EpochNumber, i128>,
109    timeouts: BTreeSet<ViewNumber>,
110}
111
112impl<TYPES: NodeType> StatsTaskState<TYPES> {
113    pub fn new(
114        view: ViewNumber,
115        epoch: Option<EpochNumber>,
116        public_key: TYPES::SignatureKey,
117        consensus: OuterConsensus<TYPES>,
118        membership_coordinator: EpochMembershipCoordinator<TYPES>,
119    ) -> Self {
120        Self {
121            view,
122            epoch,
123            public_key,
124            consensus,
125            membership_coordinator,
126            leader_stats: BTreeMap::new(),
127            replica_stats: BTreeMap::new(),
128            latencies_by_view: BTreeMap::new(),
129            sizes_by_view: BTreeMap::new(),
130            epoch_start_times: BTreeMap::new(),
131            timeouts: BTreeSet::new(),
132        }
133    }
134    fn leader_entry(&mut self, view: ViewNumber) -> &mut LeaderViewStats {
135        self.leader_stats
136            .entry(view)
137            .or_insert_with(|| LeaderViewStats::new(view))
138    }
139    fn replica_entry(&mut self, view: ViewNumber) -> &mut ReplicaViewStats {
140        self.replica_stats
141            .entry(view)
142            .or_insert_with(|| ReplicaViewStats::new(view))
143    }
144    fn garbage_collect(&mut self, view: ViewNumber) {
145        self.leader_stats = self.leader_stats.split_off(&view);
146        self.replica_stats = self.replica_stats.split_off(&view);
147        self.latencies_by_view = self.latencies_by_view.split_off(&view);
148        self.sizes_by_view = self.sizes_by_view.split_off(&view);
149        self.timeouts = BTreeSet::new();
150    }
151
152    fn dump_stats(&self) -> Result<()> {
153        let mut writer = csv::Writer::from_writer(vec![]);
154        for (_, leader_stats) in self.leader_stats.iter() {
155            writer
156                .serialize(leader_stats)
157                .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
158        }
159        let output = writer
160            .into_inner()
161            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
162        tracing::warn!(
163            "Leader stats: {}",
164            String::from_utf8(output)
165                .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
166        );
167        let mut writer = csv::Writer::from_writer(vec![]);
168        for (_, replica_stats) in self.replica_stats.iter() {
169            writer
170                .serialize(replica_stats)
171                .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
172        }
173        let output = writer
174            .into_inner()
175            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
176        tracing::warn!(
177            "Replica stats: {}",
178            String::from_utf8(output)
179                .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
180        );
181        Ok(())
182    }
183
184    fn log_basic_stats(&self, now: i128, epoch: &EpochNumber) {
185        let num_views = self.latencies_by_view.len();
186        let total_size = self.sizes_by_view.values().sum::<i128>();
187
188        // Either we have no views logged yet, no TXNs or we are not in the DA committee and don't know block sizes
189        if num_views == 0 || total_size == 0 {
190            return;
191        }
192
193        let total_latency = self.latencies_by_view.values().sum::<i128>();
194        let average_latency = total_latency / num_views as i128;
195        tracing::warn!("Average latency: {}ms", average_latency);
196        tracing::warn!(
197            "Number of timeouts in epoch: {}, is {}",
198            epoch,
199            self.timeouts.len()
200        );
201        if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) {
202            let elapsed_time = now - epoch_start_time;
203            // multiply by 1000 to convert to seconds
204            let throughput = (total_size / elapsed_time) * 1000;
205            tracing::warn!("Throughput: {} bytes/s", throughput);
206        }
207    }
208}
209
210#[async_trait]
211impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
212    type Event = HotShotEvent<TYPES>;
213
214    async fn handle_event(
215        &mut self,
216        event: Arc<Self::Event>,
217        _sender: &Sender<Arc<Self::Event>>,
218        _receiver: &Receiver<Arc<Self::Event>>,
219    ) -> Result<()> {
220        let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
221
222        match event.as_ref() {
223            HotShotEvent::BlockRecv(block_recv) => {
224                self.leader_entry(block_recv.view_number).block_built = Some(now);
225            },
226            HotShotEvent::QuorumProposalRecv(proposal, _) => {
227                self.replica_entry(proposal.data.view_number())
228                    .proposal_recv = Some(now);
229            },
230            HotShotEvent::QuorumVoteRecv(_vote) => {},
231            HotShotEvent::TimeoutVoteRecv(_vote) => {},
232            HotShotEvent::TimeoutVoteSend(vote) => {
233                self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
234            },
235            HotShotEvent::DaProposalRecv(proposal, _) => {
236                self.replica_entry(proposal.data.view_number())
237                    .da_proposal_received = Some(now);
238            },
239            HotShotEvent::DaProposalValidated(proposal, _) => {
240                self.replica_entry(proposal.data.view_number())
241                    .da_proposal_validated = Some(now);
242            },
243            HotShotEvent::DaVoteRecv(_simple_vote) => {},
244            HotShotEvent::DaCertificateRecv(simple_certificate) => {
245                self.replica_entry(simple_certificate.view_number())
246                    .da_certificate_recv = Some(now);
247            },
248            HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
249            HotShotEvent::QuorumProposalSend(proposal, _) => {
250                self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
251
252                // If the last view succeeded, add the metric for time between proposals
253                if proposal.data.view_change_evidence().is_none()
254                    && let Some(previous_proposal_time) = self
255                        .replica_entry(proposal.data.view_number() - 1)
256                        .proposal_recv
257                {
258                    self.leader_entry(proposal.data.view_number())
259                        .prev_proposal_send = Some(previous_proposal_time);
260
261                    // calculate the elapsed time as milliseconds (from nanoseconds)
262                    let elapsed_time = (now - previous_proposal_time) / 1_000_000;
263                    if elapsed_time > 0 {
264                        self.consensus
265                            .read()
266                            .await
267                            .metrics
268                            .previous_proposal_to_proposal_time
269                            .add_point(elapsed_time as f64);
270                    } else {
271                        tracing::warn!("Previous proposal time is in the future");
272                    }
273                }
274            },
275            HotShotEvent::QuorumVoteSend(simple_vote) => {
276                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
277            },
278            HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
279                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
280            },
281            HotShotEvent::QuorumProposalValidated(proposal, _) => {
282                self.replica_entry(proposal.data.view_number())
283                    .proposal_validated = Some(now);
284                self.replica_entry(proposal.data.view_number())
285                    .proposal_timestamp =
286                    Some(proposal.data.block_header().timestamp_millis() as i128);
287            },
288            HotShotEvent::DaProposalSend(proposal, _) => {
289                self.leader_entry(proposal.data.view_number())
290                    .da_proposal_send = Some(now);
291            },
292            HotShotEvent::DaVoteSend(simple_vote) => {
293                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
294            },
295            HotShotEvent::QcFormed(either) => {
296                match either {
297                    Either::Left(qc) => {
298                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
299                    },
300                    Either::Right(tc) => {
301                        self.leader_entry(tc.view_number())
302                            .timeout_certificate_formed = Some(now)
303                    },
304                };
305            },
306            HotShotEvent::Qc2Formed(either) => {
307                match either {
308                    Either::Left(qc) => {
309                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
310                    },
311                    Either::Right(tc) => {
312                        self.leader_entry(tc.view_number())
313                            .timeout_certificate_formed = Some(now)
314                    },
315                };
316            },
317            HotShotEvent::DacSend(simple_certificate, _) => {
318                self.leader_entry(simple_certificate.view_number())
319                    .da_cert_send = Some(now);
320            },
321            HotShotEvent::ViewChange(view, epoch) => {
322                // Record the timestamp of the first observed view change
323                // This can happen when transitioning to the next view, either due to voting
324                // or receiving a proposal, but we only store the first one
325                if self.replica_entry(*view + 1).view_change.is_none() {
326                    self.replica_entry(*view + 1).view_change = Some(now);
327                }
328
329                if *epoch <= self.epoch && *view <= self.view {
330                    return Ok(());
331                }
332                if self.view < *view {
333                    self.view = *view;
334                }
335                let prev_epoch = self.epoch;
336                let mut new_epoch = false;
337                if self.epoch < *epoch {
338                    self.epoch = *epoch;
339                    new_epoch = true;
340                }
341                if *view == ViewNumber::new(0) {
342                    return Ok(());
343                }
344
345                if new_epoch {
346                    if let Some(prev_epoch) = prev_epoch {
347                        self.log_basic_stats(now, &prev_epoch);
348                    }
349                    let _ = self.dump_stats();
350                    self.garbage_collect(*view - 1);
351                }
352
353                let leader = self
354                    .membership_coordinator
355                    .membership_for_epoch(*epoch)
356                    .await?
357                    .leader(*view)
358                    .await?;
359                if leader == self.public_key {
360                    self.leader_entry(*view).builder_start = Some(now);
361                }
362            },
363            HotShotEvent::Timeout(view, _) => {
364                self.replica_entry(*view).timeout_triggered = Some(now);
365                self.timeouts.insert(*view);
366            },
367            HotShotEvent::TransactionsRecv(_txns) => {
368                // TODO: Track transactions by time
369                // #3526 https://github.com/EspressoSystems/espresso-network/issues/3526
370            },
371            HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
372                self.leader_entry(*view).vid_disperse_send = Some(now);
373            },
374            HotShotEvent::VidShareRecv(_, proposal) => {
375                self.replica_entry(proposal.data.view_number())
376                    .vid_share_recv = Some(now);
377            },
378            HotShotEvent::VidShareValidated(proposal) => {
379                self.replica_entry(proposal.data.view_number())
380                    .vid_share_validated = Some(now);
381            },
382            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
383                self.replica_entry(proposal.data.view_number())
384                    .proposal_prelim_validated = Some(now);
385            },
386            HotShotEvent::LeavesDecided(leaves) => {
387                for leaf in leaves {
388                    if leaf.view_number() == ViewNumber::genesis() {
389                        continue;
390                    }
391                    let view = leaf.view_number();
392                    let timestamp = leaf.block_header().timestamp_millis() as i128;
393                    let now_millis = now / 1_000_000;
394                    let latency = now_millis - timestamp;
395                    tracing::debug!("View {} Latency: {}ms", view, latency);
396                    self.latencies_by_view.insert(view, latency);
397                    self.sizes_by_view.insert(
398                        view,
399                        leaf.block_payload().map(|p| p.txn_bytes()).unwrap_or(0) as i128,
400                    );
401                }
402            },
403            _ => {},
404        }
405        Ok(())
406    }
407
408    fn cancel_subtasks(&mut self) {
409        // No subtasks to cancel
410    }
411}