hotshot_task_impls/
stats.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use async_broadcast::{Receiver, Sender};
4use async_trait::async_trait;
5use either::Either;
6use hotshot_task::task::TaskState;
7use hotshot_types::{
8    consensus::OuterConsensus,
9    epoch_membership::EpochMembershipCoordinator,
10    traits::node_implementation::{ConsensusTime, NodeType},
11    vote::HasViewNumber,
12};
13use hotshot_utils::{
14    anytrace::{Error, Level, Result},
15    line_info, warn,
16};
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19
20use crate::events::HotShotEvent;
21
22#[derive(Serialize, Deserialize)]
23pub struct LeaderViewStats<TYPES: NodeType> {
24    pub view: TYPES::View,
25    pub prev_proposal_send: Option<i128>,
26    pub proposal_send: Option<i128>,
27    pub vote_recv: Option<i128>,
28    pub da_proposal_send: Option<i128>,
29    pub builder_start: Option<i128>,
30    pub block_built: Option<i128>,
31    pub vid_disperse_send: Option<i128>,
32    pub timeout_certificate_formed: Option<i128>,
33    pub qc_formed: Option<i128>,
34    pub da_cert_send: Option<i128>,
35}
36
37#[derive(Serialize, Deserialize)]
38pub struct ReplicaViewStats<TYPES: NodeType> {
39    pub view: TYPES::View,
40    pub view_change: Option<i128>,
41    pub proposal_recv: Option<i128>,
42    pub vote_send: Option<i128>,
43    pub timeout_vote_send: Option<i128>,
44    pub da_proposal_received: Option<i128>,
45    pub da_proposal_validated: Option<i128>,
46    pub da_certificate_recv: Option<i128>,
47    pub proposal_prelim_validated: Option<i128>,
48    pub proposal_validated: Option<i128>,
49    pub timeout_triggered: Option<i128>,
50    pub vid_share_validated: Option<i128>,
51    pub vid_share_recv: Option<i128>,
52}
53
54impl<TYPES: NodeType> LeaderViewStats<TYPES> {
55    fn new(view: TYPES::View) -> Self {
56        Self {
57            view,
58            prev_proposal_send: None,
59            proposal_send: None,
60            vote_recv: None,
61            da_proposal_send: None,
62            builder_start: None,
63            block_built: None,
64            vid_disperse_send: None,
65            timeout_certificate_formed: None,
66            qc_formed: None,
67            da_cert_send: None,
68        }
69    }
70}
71
72impl<TYPES: NodeType> ReplicaViewStats<TYPES> {
73    fn new(view: TYPES::View) -> Self {
74        Self {
75            view,
76            view_change: None,
77            proposal_recv: None,
78            vote_send: None,
79            timeout_vote_send: None,
80            da_proposal_received: None,
81            da_proposal_validated: None,
82            da_certificate_recv: None,
83            proposal_prelim_validated: None,
84            proposal_validated: None,
85            timeout_triggered: None,
86            vid_share_validated: None,
87            vid_share_recv: None,
88        }
89    }
90}
91
92pub struct StatsTaskState<TYPES: NodeType> {
93    view: TYPES::View,
94    epoch: Option<TYPES::Epoch>,
95    public_key: TYPES::SignatureKey,
96    consensus: OuterConsensus<TYPES>,
97    membership_coordinator: EpochMembershipCoordinator<TYPES>,
98    leader_stats: BTreeMap<TYPES::View, LeaderViewStats<TYPES>>,
99    replica_stats: BTreeMap<TYPES::View, ReplicaViewStats<TYPES>>,
100}
101
102impl<TYPES: NodeType> StatsTaskState<TYPES> {
103    pub fn new(
104        view: TYPES::View,
105        epoch: Option<TYPES::Epoch>,
106        public_key: TYPES::SignatureKey,
107        consensus: OuterConsensus<TYPES>,
108        membership_coordinator: EpochMembershipCoordinator<TYPES>,
109    ) -> Self {
110        Self {
111            view,
112            epoch,
113            public_key,
114            consensus,
115            membership_coordinator,
116            leader_stats: BTreeMap::new(),
117            replica_stats: BTreeMap::new(),
118        }
119    }
120    fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats<TYPES> {
121        self.leader_stats
122            .entry(view)
123            .or_insert_with(|| LeaderViewStats::new(view))
124    }
125    fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats<TYPES> {
126        self.replica_stats
127            .entry(view)
128            .or_insert_with(|| ReplicaViewStats::new(view))
129    }
130    fn garbage_collect(&mut self, view: TYPES::View) {
131        self.leader_stats = self.leader_stats.split_off(&view);
132        self.replica_stats = self.replica_stats.split_off(&view);
133    }
134
135    fn dump_stats(&self) -> Result<()> {
136        let mut writer = csv::Writer::from_writer(vec![]);
137        for (_, leader_stats) in self.leader_stats.iter() {
138            writer
139                .serialize(leader_stats)
140                .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
141        }
142        let output = writer
143            .into_inner()
144            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
145        tracing::warn!(
146            "Leader stats: {}",
147            String::from_utf8(output)
148                .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
149        );
150        let mut writer = csv::Writer::from_writer(vec![]);
151        for (_, replica_stats) in self.replica_stats.iter() {
152            writer
153                .serialize(replica_stats)
154                .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
155        }
156        let output = writer
157            .into_inner()
158            .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
159        tracing::warn!(
160            "Replica stats: {}",
161            String::from_utf8(output)
162                .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
163        );
164        Ok(())
165    }
166}
167
168#[async_trait]
169impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
170    type Event = HotShotEvent<TYPES>;
171
172    async fn handle_event(
173        &mut self,
174        event: Arc<Self::Event>,
175        _sender: &Sender<Arc<Self::Event>>,
176        _receiver: &Receiver<Arc<Self::Event>>,
177    ) -> Result<()> {
178        let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
179
180        match event.as_ref() {
181            HotShotEvent::BlockRecv(block_recv) => {
182                self.leader_entry(block_recv.view_number).block_built = Some(now);
183            },
184            HotShotEvent::QuorumProposalRecv(proposal, _) => {
185                self.replica_entry(proposal.data.view_number())
186                    .proposal_recv = Some(now);
187            },
188            HotShotEvent::QuorumVoteRecv(_vote) => {},
189            HotShotEvent::TimeoutVoteRecv(_vote) => {},
190            HotShotEvent::TimeoutVoteSend(vote) => {
191                self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
192            },
193            HotShotEvent::DaProposalRecv(proposal, _) => {
194                self.replica_entry(proposal.data.view_number())
195                    .da_proposal_received = Some(now);
196            },
197            HotShotEvent::DaProposalValidated(proposal, _) => {
198                self.replica_entry(proposal.data.view_number())
199                    .da_proposal_validated = Some(now);
200            },
201            HotShotEvent::DaVoteRecv(_simple_vote) => {},
202            HotShotEvent::DaCertificateRecv(simple_certificate) => {
203                self.replica_entry(simple_certificate.view_number())
204                    .da_certificate_recv = Some(now);
205            },
206            HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
207            HotShotEvent::QuorumProposalSend(proposal, _) => {
208                self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
209
210                // If the last view succeeded, add the metric for time between proposals
211                if proposal.data.view_change_evidence().is_none() {
212                    if let Some(previous_proposal_time) = self
213                        .replica_entry(proposal.data.view_number() - 1)
214                        .proposal_recv
215                    {
216                        self.leader_entry(proposal.data.view_number())
217                            .prev_proposal_send = Some(previous_proposal_time);
218
219                        // calculate the elapsed time as milliseconds (from nanoseconds)
220                        let elapsed_time = (now - previous_proposal_time) / 1_000_000;
221                        if elapsed_time > 0 {
222                            self.consensus
223                                .read()
224                                .await
225                                .metrics
226                                .previous_proposal_to_proposal_time
227                                .add_point(elapsed_time as f64);
228                        } else {
229                            tracing::warn!("Previous proposal time is in the future");
230                        }
231                    }
232                }
233            },
234            HotShotEvent::QuorumVoteSend(simple_vote) => {
235                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
236            },
237            HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
238                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
239            },
240            HotShotEvent::QuorumProposalValidated(proposal, _) => {
241                self.replica_entry(proposal.data.view_number())
242                    .proposal_validated = Some(now);
243            },
244            HotShotEvent::DaProposalSend(proposal, _) => {
245                self.leader_entry(proposal.data.view_number())
246                    .da_proposal_send = Some(now);
247            },
248            HotShotEvent::DaVoteSend(simple_vote) => {
249                self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
250            },
251            HotShotEvent::QcFormed(either) => {
252                match either {
253                    Either::Left(qc) => {
254                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
255                    },
256                    Either::Right(tc) => {
257                        self.leader_entry(tc.view_number())
258                            .timeout_certificate_formed = Some(now)
259                    },
260                };
261            },
262            HotShotEvent::Qc2Formed(either) => {
263                match either {
264                    Either::Left(qc) => {
265                        self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
266                    },
267                    Either::Right(tc) => {
268                        self.leader_entry(tc.view_number())
269                            .timeout_certificate_formed = Some(now)
270                    },
271                };
272            },
273            HotShotEvent::DacSend(simple_certificate, _) => {
274                self.leader_entry(simple_certificate.view_number())
275                    .da_cert_send = Some(now);
276            },
277            HotShotEvent::ViewChange(view, epoch) => {
278                // Record the timestamp of the first observed view change
279                // This can happen when transitioning to the next view, either due to voting
280                // or receiving a proposal, but we only store the first one
281                if self.replica_entry(*view + 1).view_change.is_none() {
282                    self.replica_entry(*view + 1).view_change = Some(now);
283                }
284
285                if *epoch <= self.epoch && *view <= self.view {
286                    return Ok(());
287                }
288                if self.view < *view {
289                    self.view = *view;
290                }
291                let mut new_epoch = false;
292                if self.epoch < *epoch {
293                    self.epoch = *epoch;
294                    new_epoch = true;
295                }
296                if *view == TYPES::View::new(0) {
297                    return Ok(());
298                }
299
300                if new_epoch {
301                    let _ = self.dump_stats();
302                    self.garbage_collect(*view - 1);
303                }
304
305                let leader = self
306                    .membership_coordinator
307                    .membership_for_epoch(*epoch)
308                    .await?
309                    .leader(*view)
310                    .await?;
311                if leader == self.public_key {
312                    self.leader_entry(*view).builder_start = Some(now);
313                }
314            },
315            HotShotEvent::Timeout(..) => {
316                self.replica_entry(self.view).timeout_triggered = Some(now);
317            },
318            HotShotEvent::TransactionsRecv(_txns) => {
319                // TODO: Track transactions by time
320                // #3526 https://github.com/EspressoSystems/espresso-network/issues/3526
321            },
322            HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
323                self.leader_entry(*view).vid_disperse_send = Some(now);
324            },
325            HotShotEvent::VidShareRecv(_, proposal) => {
326                self.replica_entry(proposal.data.view_number())
327                    .vid_share_recv = Some(now);
328            },
329            HotShotEvent::VidShareValidated(proposal) => {
330                self.replica_entry(proposal.data.view_number())
331                    .vid_share_validated = Some(now);
332            },
333            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
334                self.replica_entry(proposal.data.view_number())
335                    .proposal_prelim_validated = Some(now);
336            },
337            _ => {},
338        }
339        Ok(())
340    }
341
342    fn cancel_subtasks(&mut self) {
343        // No subtasks to cancel
344    }
345}