1use std::{collections::BTreeMap, sync::Arc};
2
3use async_broadcast::{Receiver, Sender};
4use async_trait::async_trait;
5use either::Either;
6use hotshot_task::task::TaskState;
7use hotshot_types::{
8 consensus::OuterConsensus,
9 epoch_membership::EpochMembershipCoordinator,
10 traits::node_implementation::{ConsensusTime, NodeType},
11 vote::HasViewNumber,
12};
13use hotshot_utils::{
14 anytrace::{Error, Level, Result},
15 line_info, warn,
16};
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19
20use crate::events::HotShotEvent;
21
22#[derive(Serialize, Deserialize)]
23pub struct LeaderViewStats<TYPES: NodeType> {
24 pub view: TYPES::View,
25 pub prev_proposal_send: Option<i128>,
26 pub proposal_send: Option<i128>,
27 pub vote_recv: Option<i128>,
28 pub da_proposal_send: Option<i128>,
29 pub builder_start: Option<i128>,
30 pub block_built: Option<i128>,
31 pub vid_disperse_send: Option<i128>,
32 pub timeout_certificate_formed: Option<i128>,
33 pub qc_formed: Option<i128>,
34 pub da_cert_send: Option<i128>,
35}
36
37#[derive(Serialize, Deserialize)]
38pub struct ReplicaViewStats<TYPES: NodeType> {
39 pub view: TYPES::View,
40 pub view_change: Option<i128>,
41 pub proposal_recv: Option<i128>,
42 pub vote_send: Option<i128>,
43 pub timeout_vote_send: Option<i128>,
44 pub da_proposal_received: Option<i128>,
45 pub da_proposal_validated: Option<i128>,
46 pub da_certificate_recv: Option<i128>,
47 pub proposal_prelim_validated: Option<i128>,
48 pub proposal_validated: Option<i128>,
49 pub timeout_triggered: Option<i128>,
50 pub vid_share_validated: Option<i128>,
51 pub vid_share_recv: Option<i128>,
52}
53
54impl<TYPES: NodeType> LeaderViewStats<TYPES> {
55 fn new(view: TYPES::View) -> Self {
56 Self {
57 view,
58 prev_proposal_send: None,
59 proposal_send: None,
60 vote_recv: None,
61 da_proposal_send: None,
62 builder_start: None,
63 block_built: None,
64 vid_disperse_send: None,
65 timeout_certificate_formed: None,
66 qc_formed: None,
67 da_cert_send: None,
68 }
69 }
70}
71
72impl<TYPES: NodeType> ReplicaViewStats<TYPES> {
73 fn new(view: TYPES::View) -> Self {
74 Self {
75 view,
76 view_change: None,
77 proposal_recv: None,
78 vote_send: None,
79 timeout_vote_send: None,
80 da_proposal_received: None,
81 da_proposal_validated: None,
82 da_certificate_recv: None,
83 proposal_prelim_validated: None,
84 proposal_validated: None,
85 timeout_triggered: None,
86 vid_share_validated: None,
87 vid_share_recv: None,
88 }
89 }
90}
91
92pub struct StatsTaskState<TYPES: NodeType> {
93 view: TYPES::View,
94 epoch: Option<TYPES::Epoch>,
95 public_key: TYPES::SignatureKey,
96 consensus: OuterConsensus<TYPES>,
97 membership_coordinator: EpochMembershipCoordinator<TYPES>,
98 leader_stats: BTreeMap<TYPES::View, LeaderViewStats<TYPES>>,
99 replica_stats: BTreeMap<TYPES::View, ReplicaViewStats<TYPES>>,
100}
101
102impl<TYPES: NodeType> StatsTaskState<TYPES> {
103 pub fn new(
104 view: TYPES::View,
105 epoch: Option<TYPES::Epoch>,
106 public_key: TYPES::SignatureKey,
107 consensus: OuterConsensus<TYPES>,
108 membership_coordinator: EpochMembershipCoordinator<TYPES>,
109 ) -> Self {
110 Self {
111 view,
112 epoch,
113 public_key,
114 consensus,
115 membership_coordinator,
116 leader_stats: BTreeMap::new(),
117 replica_stats: BTreeMap::new(),
118 }
119 }
120 fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats<TYPES> {
121 self.leader_stats
122 .entry(view)
123 .or_insert_with(|| LeaderViewStats::new(view))
124 }
125 fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats<TYPES> {
126 self.replica_stats
127 .entry(view)
128 .or_insert_with(|| ReplicaViewStats::new(view))
129 }
130 fn garbage_collect(&mut self, view: TYPES::View) {
131 self.leader_stats = self.leader_stats.split_off(&view);
132 self.replica_stats = self.replica_stats.split_off(&view);
133 }
134
135 fn dump_stats(&self) -> Result<()> {
136 let mut writer = csv::Writer::from_writer(vec![]);
137 for (_, leader_stats) in self.leader_stats.iter() {
138 writer
139 .serialize(leader_stats)
140 .map_err(|e| warn!("Failed to serialize leader stats: {}", e))?;
141 }
142 let output = writer
143 .into_inner()
144 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
145 tracing::warn!(
146 "Leader stats: {}",
147 String::from_utf8(output)
148 .map_err(|e| warn!("Failed to convert leader stats to string: {}", e))?
149 );
150 let mut writer = csv::Writer::from_writer(vec![]);
151 for (_, replica_stats) in self.replica_stats.iter() {
152 writer
153 .serialize(replica_stats)
154 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
155 }
156 let output = writer
157 .into_inner()
158 .map_err(|e| warn!("Failed to serialize replica stats: {}", e))?;
159 tracing::warn!(
160 "Replica stats: {}",
161 String::from_utf8(output)
162 .map_err(|e| warn!("Failed to convert replica stats to string: {}", e))?
163 );
164 Ok(())
165 }
166}
167
168#[async_trait]
169impl<TYPES: NodeType> TaskState for StatsTaskState<TYPES> {
170 type Event = HotShotEvent<TYPES>;
171
172 async fn handle_event(
173 &mut self,
174 event: Arc<Self::Event>,
175 _sender: &Sender<Arc<Self::Event>>,
176 _receiver: &Receiver<Arc<Self::Event>>,
177 ) -> Result<()> {
178 let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
179
180 match event.as_ref() {
181 HotShotEvent::BlockRecv(block_recv) => {
182 self.leader_entry(block_recv.view_number).block_built = Some(now);
183 },
184 HotShotEvent::QuorumProposalRecv(proposal, _) => {
185 self.replica_entry(proposal.data.view_number())
186 .proposal_recv = Some(now);
187 },
188 HotShotEvent::QuorumVoteRecv(_vote) => {},
189 HotShotEvent::TimeoutVoteRecv(_vote) => {},
190 HotShotEvent::TimeoutVoteSend(vote) => {
191 self.replica_entry(vote.view_number()).timeout_vote_send = Some(now);
192 },
193 HotShotEvent::DaProposalRecv(proposal, _) => {
194 self.replica_entry(proposal.data.view_number())
195 .da_proposal_received = Some(now);
196 },
197 HotShotEvent::DaProposalValidated(proposal, _) => {
198 self.replica_entry(proposal.data.view_number())
199 .da_proposal_validated = Some(now);
200 },
201 HotShotEvent::DaVoteRecv(_simple_vote) => {},
202 HotShotEvent::DaCertificateRecv(simple_certificate) => {
203 self.replica_entry(simple_certificate.view_number())
204 .da_certificate_recv = Some(now);
205 },
206 HotShotEvent::DaCertificateValidated(_simple_certificate) => {},
207 HotShotEvent::QuorumProposalSend(proposal, _) => {
208 self.leader_entry(proposal.data.view_number()).proposal_send = Some(now);
209
210 if proposal.data.view_change_evidence().is_none() {
212 if let Some(previous_proposal_time) = self
213 .replica_entry(proposal.data.view_number() - 1)
214 .proposal_recv
215 {
216 self.leader_entry(proposal.data.view_number())
217 .prev_proposal_send = Some(previous_proposal_time);
218
219 let elapsed_time = (now - previous_proposal_time) / 1_000_000;
221 if elapsed_time > 0 {
222 self.consensus
223 .read()
224 .await
225 .metrics
226 .previous_proposal_to_proposal_time
227 .add_point(elapsed_time as f64);
228 } else {
229 tracing::warn!("Previous proposal time is in the future");
230 }
231 }
232 }
233 },
234 HotShotEvent::QuorumVoteSend(simple_vote) => {
235 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
236 },
237 HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => {
238 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
239 },
240 HotShotEvent::QuorumProposalValidated(proposal, _) => {
241 self.replica_entry(proposal.data.view_number())
242 .proposal_validated = Some(now);
243 },
244 HotShotEvent::DaProposalSend(proposal, _) => {
245 self.leader_entry(proposal.data.view_number())
246 .da_proposal_send = Some(now);
247 },
248 HotShotEvent::DaVoteSend(simple_vote) => {
249 self.replica_entry(simple_vote.view_number()).vote_send = Some(now);
250 },
251 HotShotEvent::QcFormed(either) => {
252 match either {
253 Either::Left(qc) => {
254 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
255 },
256 Either::Right(tc) => {
257 self.leader_entry(tc.view_number())
258 .timeout_certificate_formed = Some(now)
259 },
260 };
261 },
262 HotShotEvent::Qc2Formed(either) => {
263 match either {
264 Either::Left(qc) => {
265 self.leader_entry(qc.view_number() + 1).qc_formed = Some(now)
266 },
267 Either::Right(tc) => {
268 self.leader_entry(tc.view_number())
269 .timeout_certificate_formed = Some(now)
270 },
271 };
272 },
273 HotShotEvent::DacSend(simple_certificate, _) => {
274 self.leader_entry(simple_certificate.view_number())
275 .da_cert_send = Some(now);
276 },
277 HotShotEvent::ViewChange(view, epoch) => {
278 if self.replica_entry(*view + 1).view_change.is_none() {
282 self.replica_entry(*view + 1).view_change = Some(now);
283 }
284
285 if *epoch <= self.epoch && *view <= self.view {
286 return Ok(());
287 }
288 if self.view < *view {
289 self.view = *view;
290 }
291 let mut new_epoch = false;
292 if self.epoch < *epoch {
293 self.epoch = *epoch;
294 new_epoch = true;
295 }
296 if *view == TYPES::View::new(0) {
297 return Ok(());
298 }
299
300 if new_epoch {
301 let _ = self.dump_stats();
302 self.garbage_collect(*view - 1);
303 }
304
305 let leader = self
306 .membership_coordinator
307 .membership_for_epoch(*epoch)
308 .await?
309 .leader(*view)
310 .await?;
311 if leader == self.public_key {
312 self.leader_entry(*view).builder_start = Some(now);
313 }
314 },
315 HotShotEvent::Timeout(..) => {
316 self.replica_entry(self.view).timeout_triggered = Some(now);
317 },
318 HotShotEvent::TransactionsRecv(_txns) => {
319 },
322 HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => {
323 self.leader_entry(*view).vid_disperse_send = Some(now);
324 },
325 HotShotEvent::VidShareRecv(_, proposal) => {
326 self.replica_entry(proposal.data.view_number())
327 .vid_share_recv = Some(now);
328 },
329 HotShotEvent::VidShareValidated(proposal) => {
330 self.replica_entry(proposal.data.view_number())
331 .vid_share_validated = Some(now);
332 },
333 HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
334 self.replica_entry(proposal.data.view_number())
335 .proposal_prelim_validated = Some(now);
336 },
337 _ => {},
338 }
339 Ok(())
340 }
341
342 fn cancel_subtasks(&mut self) {
343 }
345}