1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_broadcast::{Receiver, Sender};
7use async_trait::async_trait;
8use either::Either;
9use hotshot_task::task::TaskState;
10use hotshot_types::{
11 consensus::OuterConsensus,
12 data::{EpochNumber, ViewNumber},
13 epoch_membership::EpochMembershipCoordinator,
14 traits::{BlockPayload, block_contents::BlockHeader, node_implementation::NodeType},
15 vote::HasViewNumber,
16};
17use hotshot_utils::{
18 anytrace::{Error, Level, Result},
19 line_info, warn,
20};
21use serde::{Deserialize, Serialize};
22use time::OffsetDateTime;
23
24use crate::events::HotShotEvent;
25
26#[derive(Serialize, Deserialize)]
27pub struct LeaderViewStats {
28 pub view: ViewNumber,
29 pub prev_proposal_send: Option<i128>,
30 pub proposal_send: Option<i128>,
31 pub vote_recv: Option<i128>,
32 pub da_proposal_send: Option<i128>,
33 pub builder_start: Option<i128>,
34 pub block_built: Option<i128>,
35 pub vid_disperse_send: Option<i128>,
36 pub timeout_certificate_formed: Option<i128>,
37 pub qc_formed: Option<i128>,
38 pub da_cert_send: Option<i128>,
39}
40
41#[derive(Serialize, Deserialize)]
42pub struct ReplicaViewStats {
43 pub view: ViewNumber,
44 pub view_change: Option<i128>,
45 pub proposal_timestamp: Option<i128>,
46 pub proposal_recv: Option<i128>,
47 pub vote_send: Option<i128>,
48 pub timeout_vote_send: Option<i128>,
49 pub da_proposal_received: Option<i128>,
50 pub da_proposal_validated: Option<i128>,
51 pub da_certificate_recv: Option<i128>,
52 pub proposal_prelim_validated: Option<i128>,
53 pub proposal_validated: Option<i128>,
54 pub timeout_triggered: Option<i128>,
55 pub vid_share_validated: Option<i128>,
56 pub vid_share_recv: Option<i128>,
57}
58
59impl LeaderViewStats {
60 fn new(view: ViewNumber) -> Self {
61 Self {
62 view,
63 prev_proposal_send: None,
64 proposal_send: None,
65 vote_recv: None,
66 da_proposal_send: None,
67 builder_start: None,
68 block_built: None,
69 vid_disperse_send: None,
70 timeout_certificate_formed: None,
71 qc_formed: None,
72 da_cert_send: None,
73 }
74 }
75}
76
77impl ReplicaViewStats {
78 fn new(view: ViewNumber) -> Self {
79 Self {
80 view,
81 view_change: None,
82 proposal_timestamp: None,
83 proposal_recv: None,
84 vote_send: None,
85 timeout_vote_send: None,
86 da_proposal_received: None,
87 da_proposal_validated: None,
88 da_certificate_recv: None,
89 proposal_prelim_validated: None,
90 proposal_validated: None,
91 timeout_triggered: None,
92 vid_share_validated: None,
93 vid_share_recv: None,
94 }
95 }
96}
97
98pub struct StatsTaskState<TYPES: NodeType> {
99 view: ViewNumber,
100 epoch: Option<EpochNumber>,
101 public_key: TYPES::SignatureKey,
102 consensus: OuterConsensus<TYPES>,
103 membership_coordinator: EpochMembershipCoordinator<TYPES>,
104 leader_stats: BTreeMap<ViewNumber, LeaderViewStats>,
105 replica_stats: BTreeMap<ViewNumber, ReplicaViewStats>,
106 latencies_by_view: BTreeMap<ViewNumber, i128>,
107 sizes_by_view: BTreeMap<ViewNumber, i128>,
108 epoch_start_times: BTreeMap<EpochNumber, i128>,
109 timeouts: BTreeSet<ViewNumber>,
110}
111
112impl<TYPES: NodeType> StatsTaskState<TYPES> {
113 pub fn new(
114 view: ViewNumber,
115 epoch: Option<EpochNumber>,
116 public_key: TYPES::SignatureKey,
117 consensus: OuterConsensus<TYPES>,
118 membership_coordinator: EpochMembershipCoordinator<TYPES>,
119 ) -> Self {
120 Self {
121 view,
122 epoch,
123 public_key,
124 consensus,
125 membership_coordinator,
126 leader_stats: BTreeMap::new(),
127 replica_stats: BTreeMap::new(),
128 latencies_by_view: BTreeMap::new(),
129 sizes_by_view: BTreeMap::new(),
130 epoch_start_times: BTreeMap::new(),
131 timeouts: BTreeSet::new(),
132 }
133 }
134 fn leader_entry(&mut self, view: ViewNumber) -> &mut LeaderViewStats {
135 self.leader_stats
136 .entry(view)
137 .or_insert_with(|| LeaderViewStats::new(view))
138 }
139 fn replica_entry(&mut self, view: ViewNumber) -> &mut ReplicaViewStats {
140 self.replica_stats
141 .entry(view)
142 .or_insert_with(|| ReplicaViewStats::new(view))
143 }
144 fn garbage_collect(&mut self, view: ViewNumber) {
145 self.leader_stats = self.leader_stats.split_off(&view);
146 self.replica_stats = self.replica_stats.split_off(&view);
147 self.latencies_by_view = self.latencies_by_view.split_off(&view);
148 self.sizes_by_view = self.sizes_by_view.split_off(&view);
149 self.timeouts = BTreeSet::new();
150 }
151
152 fn dump_stats(&self) -> Result<()> {
153 let mut writer = csv::Writer::from_writer(vec![]);
154 for (_, leader_stats) in self.leader_stats.iter() {
155 writer
156 .serialize(leader_stats)
157 .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
158 }
159 let output = writer
160 .into_inner()
161 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
162 tracing::warn!(
163 "Leader stats: {}",
164 String::from_utf8(output)
165 .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
166 );
167 let mut writer = csv::Writer::from_writer(vec![]);
168 for (_, replica_stats) in self.replica_stats.iter() {
169 writer
170 .serialize(replica_stats)
171 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
172 }
173 let output = writer
174 .into_inner()
175 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
176 tracing::warn!(
177 "Replica stats: {}",
178 String::from_utf8(output)
179 .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
180 );
181 Ok(())
182 }
183
184 fn log_basic_stats(&self, now: i128, epoch: &EpochNumber) {
185 let num_views = self.latencies_by_view.len();
186 let total_size = self.sizes_by_view.values().sum::<i128>();
187
188 if num_views == 0 || total_size == 0 {
190 return;
191 }
192
193 let total_latency = self.latencies_by_view.values().sum::<i128>();
194 let average_latency = total_latency / num_views as i128;
195 tracing::warn!("Average latency: {}ms", average_latency);
196 tracing::warn!(
197 "Number of timeouts in epoch: {}, is {}",
198 epoch,
199 self.timeouts.len()
200 );
201 if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) {
202 let elapsed_time = now - epoch_start_time;
203 let throughput = (total_size / elapsed_time) * 1000;
205 tracing::warn!("Throughput: {} bytes/s", throughput);
206 }
207 }
208}
209
210#[async_trait]
211impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
212 type Event = HotShotEvent<TYPES>;
213
214 async fn handle_event(
215 &mut self,
216 event: Arc<Self::Event>,
217 _sender: &Sender<Arc<Self::Event>>,
218 _receiver: &Receiver<Arc<Self::Event>>,
219 ) -> Result<()> {
220 let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
221
222 match event.as_ref() {
223 HotShotEvent::BlockRecv(block_recv) => {
224 self.leader_entry(block_recv.view_number).block_built = Some(now);
225 },
226 HotShotEvent::QuorumProposalRecv(proposal, _) => {
227 self.replica_entry(proposal.data.view_number())
228 .proposal_recv = Some(now);
229 },
230 HotShotEvent::QuorumVoteRecv(_vote) => {},
231 HotShotEvent::TimeoutVoteRecv(_vote) => {},
232 HotShotEvent::TimeoutVoteSend(vote) => {
233 self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
234 },
235 HotShotEvent::DaProposalRecv(proposal, _) => {
236 self.replica_entry(proposal.data.view_number())
237 .da_proposal_received = Some(now);
238 },
239 HotShotEvent::DaProposalValidated(proposal, _) => {
240 self.replica_entry(proposal.data.view_number())
241 .da_proposal_validated = Some(now);
242 },
243 HotShotEvent::DaVoteRecv(_simple_vote) => {},
244 HotShotEvent::DaCertificateRecv(simple_certificate) => {
245 self.replica_entry(simple_certificate.view_number())
246 .da_certificate_recv = Some(now);
247 },
248 HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
249 HotShotEvent::QuorumProposalSend(proposal, _) => {
250 self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
251
252 if proposal.data.view_change_evidence().is_none()
254 && let Some(previous_proposal_time) = self
255 .replica_entry(proposal.data.view_number() - 1)
256 .proposal_recv
257 {
258 self.leader_entry(proposal.data.view_number())
259 .prev_proposal_send = Some(previous_proposal_time);
260
261 let elapsed_time = (now - previous_proposal_time) / 1_000_000;
263 if elapsed_time > 0 {
264 self.consensus
265 .read()
266 .await
267 .metrics
268 .previous_proposal_to_proposal_time
269 .add_point(elapsed_time as f64);
270 } else {
271 tracing::warn!("Previous proposal time is in the future");
272 }
273 }
274 },
275 HotShotEvent::QuorumVoteSend(simple_vote) => {
276 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
277 },
278 HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
279 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
280 },
281 HotShotEvent::QuorumProposalValidated(proposal, _) => {
282 self.replica_entry(proposal.data.view_number())
283 .proposal_validated = Some(now);
284 self.replica_entry(proposal.data.view_number())
285 .proposal_timestamp =
286 Some(proposal.data.block_header().timestamp_millis() as i128);
287 },
288 HotShotEvent::DaProposalSend(proposal, _) => {
289 self.leader_entry(proposal.data.view_number())
290 .da_proposal_send = Some(now);
291 },
292 HotShotEvent::DaVoteSend(simple_vote) => {
293 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
294 },
295 HotShotEvent::QcFormed(either) => {
296 match either {
297 Either::Left(qc) => {
298 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
299 },
300 Either::Right(tc) => {
301 self.leader_entry(tc.view_number())
302 .timeout_certificate_formed = Some(now)
303 },
304 };
305 },
306 HotShotEvent::Qc2Formed(either) => {
307 match either {
308 Either::Left(qc) => {
309 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
310 },
311 Either::Right(tc) => {
312 self.leader_entry(tc.view_number())
313 .timeout_certificate_formed = Some(now)
314 },
315 };
316 },
317 HotShotEvent::DacSend(simple_certificate, _) => {
318 self.leader_entry(simple_certificate.view_number())
319 .da_cert_send = Some(now);
320 },
321 HotShotEvent::ViewChange(view, epoch) => {
322 if self.replica_entry(*view + 1).view_change.is_none() {
326 self.replica_entry(*view + 1).view_change = Some(now);
327 }
328
329 if *epoch <= self.epoch && *view <= self.view {
330 return Ok(());
331 }
332 if self.view < *view {
333 self.view = *view;
334 }
335 let prev_epoch = self.epoch;
336 let mut new_epoch = false;
337 if self.epoch < *epoch {
338 self.epoch = *epoch;
339 new_epoch = true;
340 }
341 if *view == ViewNumber::new(0) {
342 return Ok(());
343 }
344
345 if new_epoch {
346 if let Some(prev_epoch) = prev_epoch {
347 self.log_basic_stats(now, &prev_epoch);
348 }
349 let _ = self.dump_stats();
350 self.garbage_collect(*view - 1);
351 }
352
353 let leader = self
354 .membership_coordinator
355 .membership_for_epoch(*epoch)
356 .await?
357 .leader(*view)
358 .await?;
359 if leader == self.public_key {
360 self.leader_entry(*view).builder_start = Some(now);
361 }
362 },
363 HotShotEvent::Timeout(view, _) => {
364 self.replica_entry(*view).timeout_triggered = Some(now);
365 self.timeouts.insert(*view);
366 },
367 HotShotEvent::TransactionsRecv(_txns) => {
368 },
371 HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
372 self.leader_entry(*view).vid_disperse_send = Some(now);
373 },
374 HotShotEvent::VidShareRecv(_, proposal) => {
375 self.replica_entry(proposal.data.view_number())
376 .vid_share_recv = Some(now);
377 },
378 HotShotEvent::VidShareValidated(proposal) => {
379 self.replica_entry(proposal.data.view_number())
380 .vid_share_validated = Some(now);
381 },
382 HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
383 self.replica_entry(proposal.data.view_number())
384 .proposal_prelim_validated = Some(now);
385 },
386 HotShotEvent::LeavesDecided(leaves) => {
387 for leaf in leaves {
388 if leaf.view_number() == ViewNumber::genesis() {
389 continue;
390 }
391 let view = leaf.view_number();
392 let timestamp = leaf.block_header().timestamp_millis() as i128;
393 let now_millis = now / 1_000_000;
394 let latency = now_millis - timestamp;
395 tracing::debug!("View {} Latency: {}ms", view, latency);
396 self.latencies_by_view.insert(view, latency);
397 self.sizes_by_view.insert(
398 view,
399 leaf.block_payload().map(|p| p.txn_bytes()).unwrap_or(0) as i128,
400 );
401 }
402 },
403 _ => {},
404 }
405 Ok(())
406 }
407
408 fn cancel_subtasks(&mut self) {
409 }
411}