1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4};
5
6use async_broadcast::{Receiver, Sender};
7use async_trait::async_trait;
8use either::Either;
9use hotshot_task::task::TaskState;
10use hotshot_types::{
11 consensus::OuterConsensus,
12 epoch_membership::EpochMembershipCoordinator,
13 traits::{
14 block_contents::BlockHeader,
15 node_implementation::{ConsensusTime, NodeType},
16 BlockPayload,
17 },
18 vote::HasViewNumber,
19};
20use hotshot_utils::{
21 anytrace::{Error, Level, Result},
22 line_info, warn,
23};
24use serde::{Deserialize, Serialize};
25use time::OffsetDateTime;
26
27use crate::events::HotShotEvent;
28
29#[derive(Serialize, Deserialize)]
30pub struct LeaderViewStats<TYPES: NodeType> {
31 pub view: TYPES::View,
32 pub prev_proposal_send: Option<i128>,
33 pub proposal_send: Option<i128>,
34 pub vote_recv: Option<i128>,
35 pub da_proposal_send: Option<i128>,
36 pub builder_start: Option<i128>,
37 pub block_built: Option<i128>,
38 pub vid_disperse_send: Option<i128>,
39 pub timeout_certificate_formed: Option<i128>,
40 pub qc_formed: Option<i128>,
41 pub da_cert_send: Option<i128>,
42}
43
44#[derive(Serialize, Deserialize)]
45pub struct ReplicaViewStats<TYPES: NodeType> {
46 pub view: TYPES::View,
47 pub view_change: Option<i128>,
48 pub proposal_timestamp: Option<i128>,
49 pub proposal_recv: Option<i128>,
50 pub vote_send: Option<i128>,
51 pub timeout_vote_send: Option<i128>,
52 pub da_proposal_received: Option<i128>,
53 pub da_proposal_validated: Option<i128>,
54 pub da_certificate_recv: Option<i128>,
55 pub proposal_prelim_validated: Option<i128>,
56 pub proposal_validated: Option<i128>,
57 pub timeout_triggered: Option<i128>,
58 pub vid_share_validated: Option<i128>,
59 pub vid_share_recv: Option<i128>,
60}
61
62impl<TYPES: NodeType> LeaderViewStats<TYPES> {
63 fn new(view: TYPES::View) -> Self {
64 Self {
65 view,
66 prev_proposal_send: None,
67 proposal_send: None,
68 vote_recv: None,
69 da_proposal_send: None,
70 builder_start: None,
71 block_built: None,
72 vid_disperse_send: None,
73 timeout_certificate_formed: None,
74 qc_formed: None,
75 da_cert_send: None,
76 }
77 }
78}
79
80impl<TYPES: NodeType> ReplicaViewStats<TYPES> {
81 fn new(view: TYPES::View) -> Self {
82 Self {
83 view,
84 view_change: None,
85 proposal_timestamp: None,
86 proposal_recv: None,
87 vote_send: None,
88 timeout_vote_send: None,
89 da_proposal_received: None,
90 da_proposal_validated: None,
91 da_certificate_recv: None,
92 proposal_prelim_validated: None,
93 proposal_validated: None,
94 timeout_triggered: None,
95 vid_share_validated: None,
96 vid_share_recv: None,
97 }
98 }
99}
100
101pub struct StatsTaskState<TYPES: NodeType> {
102 view: TYPES::View,
103 epoch: Option<TYPES::Epoch>,
104 public_key: TYPES::SignatureKey,
105 consensus: OuterConsensus<TYPES>,
106 membership_coordinator: EpochMembershipCoordinator<TYPES>,
107 leader_stats: BTreeMap<TYPES::View, LeaderViewStats<TYPES>>,
108 replica_stats: BTreeMap<TYPES::View, ReplicaViewStats<TYPES>>,
109 latencies_by_view: BTreeMap<TYPES::View, i128>,
110 sizes_by_view: BTreeMap<TYPES::View, i128>,
111 epoch_start_times: BTreeMap<TYPES::Epoch, i128>,
112 timeouts: BTreeSet<TYPES::View>,
113}
114
115impl<TYPES: NodeType> StatsTaskState<TYPES> {
116 pub fn new(
117 view: TYPES::View,
118 epoch: Option<TYPES::Epoch>,
119 public_key: TYPES::SignatureKey,
120 consensus: OuterConsensus<TYPES>,
121 membership_coordinator: EpochMembershipCoordinator<TYPES>,
122 ) -> Self {
123 Self {
124 view,
125 epoch,
126 public_key,
127 consensus,
128 membership_coordinator,
129 leader_stats: BTreeMap::new(),
130 replica_stats: BTreeMap::new(),
131 latencies_by_view: BTreeMap::new(),
132 sizes_by_view: BTreeMap::new(),
133 epoch_start_times: BTreeMap::new(),
134 timeouts: BTreeSet::new(),
135 }
136 }
137 fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats<TYPES> {
138 self.leader_stats
139 .entry(view)
140 .or_insert_with(|| LeaderViewStats::new(view))
141 }
142 fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats<TYPES> {
143 self.replica_stats
144 .entry(view)
145 .or_insert_with(|| ReplicaViewStats::new(view))
146 }
147 fn garbage_collect(&mut self, view: TYPES::View) {
148 self.leader_stats = self.leader_stats.split_off(&view);
149 self.replica_stats = self.replica_stats.split_off(&view);
150 self.latencies_by_view = self.latencies_by_view.split_off(&view);
151 self.sizes_by_view = self.sizes_by_view.split_off(&view);
152 self.timeouts = BTreeSet::new();
153 }
154
155 fn dump_stats(&self) -> Result<()> {
156 let mut writer = csv::Writer::from_writer(vec![]);
157 for (_, leader_stats) in self.leader_stats.iter() {
158 writer
159 .serialize(leader_stats)
160 .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
161 }
162 let output = writer
163 .into_inner()
164 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
165 tracing::warn!(
166 "Leader stats: {}",
167 String::from_utf8(output)
168 .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
169 );
170 let mut writer = csv::Writer::from_writer(vec![]);
171 for (_, replica_stats) in self.replica_stats.iter() {
172 writer
173 .serialize(replica_stats)
174 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
175 }
176 let output = writer
177 .into_inner()
178 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
179 tracing::warn!(
180 "Replica stats: {}",
181 String::from_utf8(output)
182 .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
183 );
184 Ok(())
185 }
186
187 fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) {
188 let num_views = self.latencies_by_view.len();
189 let total_size = self.sizes_by_view.values().sum::<i128>();
190
191 if num_views == 0 || total_size == 0 {
193 return;
194 }
195
196 let total_latency = self.latencies_by_view.values().sum::<i128>();
197 let average_latency = total_latency / num_views as i128;
198 tracing::warn!("Average latency: {}ms", average_latency);
199 tracing::warn!(
200 "Number of timeouts in epoch: {}, is {}",
201 epoch,
202 self.timeouts.len()
203 );
204 if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) {
205 let elapsed_time = now - epoch_start_time;
206 let throughput = (total_size / elapsed_time) * 1000;
208 tracing::warn!("Throughput: {} bytes/s", throughput);
209 }
210 }
211}
212
213#[async_trait]
214impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
215 type Event = HotShotEvent<TYPES>;
216
217 async fn handle_event(
218 &mut self,
219 event: Arc<Self::Event>,
220 _sender: &Sender<Arc<Self::Event>>,
221 _receiver: &Receiver<Arc<Self::Event>>,
222 ) -> Result<()> {
223 let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
224
225 match event.as_ref() {
226 HotShotEvent::BlockRecv(block_recv) => {
227 self.leader_entry(block_recv.view_number).block_built = Some(now);
228 },
229 HotShotEvent::QuorumProposalRecv(proposal, _) => {
230 self.replica_entry(proposal.data.view_number())
231 .proposal_recv = Some(now);
232 },
233 HotShotEvent::QuorumVoteRecv(_vote) => {},
234 HotShotEvent::TimeoutVoteRecv(_vote) => {},
235 HotShotEvent::TimeoutVoteSend(vote) => {
236 self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
237 },
238 HotShotEvent::DaProposalRecv(proposal, _) => {
239 self.replica_entry(proposal.data.view_number())
240 .da_proposal_received = Some(now);
241 },
242 HotShotEvent::DaProposalValidated(proposal, _) => {
243 self.replica_entry(proposal.data.view_number())
244 .da_proposal_validated = Some(now);
245 },
246 HotShotEvent::DaVoteRecv(_simple_vote) => {},
247 HotShotEvent::DaCertificateRecv(simple_certificate) => {
248 self.replica_entry(simple_certificate.view_number())
249 .da_certificate_recv = Some(now);
250 },
251 HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
252 HotShotEvent::QuorumProposalSend(proposal, _) => {
253 self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
254
255 if proposal.data.view_change_evidence().is_none() {
257 if let Some(previous_proposal_time) = self
258 .replica_entry(proposal.data.view_number() - 1)
259 .proposal_recv
260 {
261 self.leader_entry(proposal.data.view_number())
262 .prev_proposal_send = Some(previous_proposal_time);
263
264 let elapsed_time = (now - previous_proposal_time) / 1_000_000;
266 if elapsed_time > 0 {
267 self.consensus
268 .read()
269 .await
270 .metrics
271 .previous_proposal_to_proposal_time
272 .add_point(elapsed_time as f64);
273 } else {
274 tracing::warn!("Previous proposal time is in the future");
275 }
276 }
277 }
278 },
279 HotShotEvent::QuorumVoteSend(simple_vote) => {
280 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
281 },
282 HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
283 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
284 },
285 HotShotEvent::QuorumProposalValidated(proposal, _) => {
286 self.replica_entry(proposal.data.view_number())
287 .proposal_validated = Some(now);
288 self.replica_entry(proposal.data.view_number())
289 .proposal_timestamp =
290 Some(proposal.data.block_header().timestamp_millis() as i128);
291 },
292 HotShotEvent::DaProposalSend(proposal, _) => {
293 self.leader_entry(proposal.data.view_number())
294 .da_proposal_send = Some(now);
295 },
296 HotShotEvent::DaVoteSend(simple_vote) => {
297 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
298 },
299 HotShotEvent::QcFormed(either) => {
300 match either {
301 Either::Left(qc) => {
302 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
303 },
304 Either::Right(tc) => {
305 self.leader_entry(tc.view_number())
306 .timeout_certificate_formed = Some(now)
307 },
308 };
309 },
310 HotShotEvent::Qc2Formed(either) => {
311 match either {
312 Either::Left(qc) => {
313 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
314 },
315 Either::Right(tc) => {
316 self.leader_entry(tc.view_number())
317 .timeout_certificate_formed = Some(now)
318 },
319 };
320 },
321 HotShotEvent::DacSend(simple_certificate, _) => {
322 self.leader_entry(simple_certificate.view_number())
323 .da_cert_send = Some(now);
324 },
325 HotShotEvent::ViewChange(view, epoch) => {
326 if self.replica_entry(*view + 1).view_change.is_none() {
330 self.replica_entry(*view + 1).view_change = Some(now);
331 }
332
333 if *epoch <= self.epoch && *view <= self.view {
334 return Ok(());
335 }
336 if self.view < *view {
337 self.view = *view;
338 }
339 let prev_epoch = self.epoch;
340 let mut new_epoch = false;
341 if self.epoch < *epoch {
342 self.epoch = *epoch;
343 new_epoch = true;
344 }
345 if *view == TYPES::View::new(0) {
346 return Ok(());
347 }
348
349 if new_epoch {
350 if let Some(prev_epoch) = prev_epoch {
351 self.log_basic_stats(now, &prev_epoch);
352 }
353 let _ = self.dump_stats();
354 self.garbage_collect(*view - 1);
355 }
356
357 let leader = self
358 .membership_coordinator
359 .membership_for_epoch(*epoch)
360 .await?
361 .leader(*view)
362 .await?;
363 if leader == self.public_key {
364 self.leader_entry(*view).builder_start = Some(now);
365 }
366 },
367 HotShotEvent::Timeout(view, _) => {
368 self.replica_entry(*view).timeout_triggered = Some(now);
369 self.timeouts.insert(*view);
370 },
371 HotShotEvent::TransactionsRecv(_txns) => {
372 },
375 HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
376 self.leader_entry(*view).vid_disperse_send = Some(now);
377 },
378 HotShotEvent::VidShareRecv(_, proposal) => {
379 self.replica_entry(proposal.data.view_number())
380 .vid_share_recv = Some(now);
381 },
382 HotShotEvent::VidShareValidated(proposal) => {
383 self.replica_entry(proposal.data.view_number())
384 .vid_share_validated = Some(now);
385 },
386 HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
387 self.replica_entry(proposal.data.view_number())
388 .proposal_prelim_validated = Some(now);
389 },
390 HotShotEvent::LeavesDecided(leaves) => {
391 for leaf in leaves {
392 if leaf.view_number() == TYPES::View::genesis() {
393 continue;
394 }
395 let view = leaf.view_number();
396 let timestamp = leaf.block_header().timestamp_millis() as i128;
397 let now_millis = now / 1_000_000;
398 let latency = now_millis - timestamp;
399 tracing::debug!("View {} Latency: {}ms", view, latency);
400 self.latencies_by_view.insert(view, latency);
401 self.sizes_by_view.insert(
402 view,
403 leaf.block_payload().map(|p| p.txn_bytes()).unwrap_or(0) as i128,
404 );
405 }
406 },
407 _ => {},
408 }
409 Ok(())
410 }
411
412 fn cancel_subtasks(&mut self) {
413 }
415}