1use std::{net::SocketAddr, time::Duration};
8
9use clap::Parser;
10use futures::{Future, FutureExt};
11use hotshot_types::{
12 network::{NetworkConfig, NetworkConfigSource},
13 traits::node_implementation::NodeType,
14 PeerConfig, ValidatorConfig,
15};
16use libp2p_identity::PeerId;
17use multiaddr::Multiaddr;
18use surf_disco::{error::ClientError, Client};
19use tide_disco::Url;
20use tokio::time::sleep;
21use tracing::{info, instrument};
22use vbs::BinarySerializer;
23
24use crate::OrchestratorVersion;
25
26pub struct OrchestratorClient {
28 pub client: surf_disco::Client<ClientError, OrchestratorVersion>,
30}
31
32#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
34pub struct BenchResults {
35 pub partial_results: String,
37 pub avg_latency_in_sec: i64,
39 pub num_latency: i64,
41 pub minimum_latency_in_sec: i64,
43 pub maximum_latency_in_sec: i64,
45 pub throughput_bytes_per_sec: u64,
47 pub total_transactions_committed: u64,
49 pub transaction_size_in_bytes: u64,
51 pub total_time_elapsed_in_sec: u64,
53 pub total_num_views: usize,
55 pub failed_num_views: usize,
57 pub committee_type: String,
59}
60
61impl BenchResults {
62 pub fn printout(&self) {
64 println!("=====================");
65 println!("{0} Benchmark results:", self.partial_results);
66 println!("Committee type: {}", self.committee_type);
67 println!(
68 "Average latency: {} seconds, Minimum latency: {} seconds, Maximum latency: {} seconds",
69 self.avg_latency_in_sec, self.minimum_latency_in_sec, self.maximum_latency_in_sec
70 );
71 println!("Throughput: {} bytes/sec", self.throughput_bytes_per_sec);
72 println!(
73 "Total transactions committed: {}",
74 self.total_transactions_committed
75 );
76 println!(
77 "Total number of views: {}, Failed number of views: {}",
78 self.total_num_views, self.failed_num_views
79 );
80 println!("=====================");
81 }
82}
83
84#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
86pub struct BenchResultsDownloadConfig {
87 pub commit_sha: String,
90 pub total_nodes: usize,
92 pub da_committee_size: usize,
94 pub fixed_leader_for_gpuvid: usize,
96 pub transactions_per_round: usize,
98 pub transaction_size: u64,
100 pub rounds: usize,
102
103 pub partial_results: String,
109 pub avg_latency_in_sec: i64,
111 pub minimum_latency_in_sec: i64,
113 pub maximum_latency_in_sec: i64,
115 pub throughput_bytes_per_sec: u64,
117 pub total_transactions_committed: u64,
119 pub total_time_elapsed_in_sec: u64,
121 pub total_num_views: usize,
123 pub failed_num_views: usize,
125 pub committee_type: String,
127}
128
129#[derive(Parser, Debug, Clone)]
132#[command(
133 name = "Multi-machine consensus",
134 about = "Simulates consensus among multiple machines"
135)]
136pub struct ValidatorArgs {
138 pub url: Url,
140 pub advertise_address: Option<String>,
142 pub builder_address: Option<SocketAddr>,
144 #[arg(short, long)]
147 pub network_config_file: Option<String>,
148}
149
150#[derive(Parser, Debug, Clone)]
152pub struct MultiValidatorArgs {
153 pub num_nodes: u16,
155 pub url: Url,
157 pub advertise_address: Option<String>,
159 #[arg(short, long)]
162 pub network_config_file: Option<String>,
163}
164
165pub async fn get_complete_config<TYPES: NodeType>(
171 client: &OrchestratorClient,
172 mut validator_config: ValidatorConfig<TYPES>,
173 libp2p_advertise_address: Option<Multiaddr>,
174 libp2p_public_key: Option<PeerId>,
175) -> anyhow::Result<(
176 NetworkConfig<TYPES>,
177 ValidatorConfig<TYPES>,
178 NetworkConfigSource,
179)> {
180 let run_config: NetworkConfig<TYPES> = client
182 .post_and_wait_all_public_keys::<TYPES>(
183 &mut validator_config,
184 libp2p_advertise_address,
185 libp2p_public_key,
186 )
187 .await;
188
189 info!(
190 "Retrieved config; our node index is {}. DA committee member: {}",
191 run_config.node_index, validator_config.is_da
192 );
193 Ok((
194 run_config,
195 validator_config,
196 NetworkConfigSource::Orchestrator,
197 ))
198}
199
200impl ValidatorArgs {
201 #[must_use]
225 pub fn from_multi_args(multi_args: MultiValidatorArgs, node_index: u16) -> Self {
226 Self {
227 url: multi_args.url,
228 advertise_address: multi_args.advertise_address,
229 builder_address: None,
230 network_config_file: multi_args
231 .network_config_file
232 .map(|s| format!("{s}-{node_index}")),
233 }
234 }
235}
236
237impl OrchestratorClient {
238 #[must_use]
240 pub fn new(url: Url) -> Self {
241 let client = surf_disco::Client::<ClientError, OrchestratorVersion>::new(url);
242 OrchestratorClient { client }
244 }
245
246 #[allow(clippy::type_complexity)]
257 pub async fn get_config_without_peer<TYPES: NodeType>(
258 &self,
259 libp2p_advertise_address: Option<Multiaddr>,
260 libp2p_public_key: Option<PeerId>,
261 ) -> anyhow::Result<NetworkConfig<TYPES>> {
262 let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&(
264 libp2p_advertise_address,
265 libp2p_public_key,
266 ))?;
267
268 let identity = |client: Client<ClientError, OrchestratorVersion>| {
269 let request_body = request_body.clone();
271 async move {
272 let node_index: Result<u16, ClientError> = client
273 .post("api/identity")
274 .body_binary(&request_body)
275 .expect("failed to set request body")
276 .send()
277 .await;
278
279 node_index
280 }
281 .boxed()
282 };
283 let node_index = self.wait_for_fn_from_orchestrator(identity).await;
284
285 let f = |client: Client<ClientError, OrchestratorVersion>| {
287 async move {
288 let config: Result<NetworkConfig<TYPES>, ClientError> = client
289 .post(&format!("api/config/{node_index}"))
290 .send()
291 .await;
292 config
293 }
294 .boxed()
295 };
296
297 let mut config = self.wait_for_fn_from_orchestrator(f).await;
298 config.node_index = From::<u16>::from(node_index);
299
300 Ok(config)
301 }
302
303 #[instrument(skip_all, name = "orchestrator node index for validator config")]
308 pub async fn get_node_index_for_init_validator_config(&self) -> u16 {
309 let cur_node_index = |client: Client<ClientError, OrchestratorVersion>| {
310 async move {
311 let cur_node_index: Result<u16, ClientError> = client
312 .post("api/get_tmp_node_index")
313 .send()
314 .await
315 .inspect_err(|err| tracing::error!("{err}"));
316
317 cur_node_index
318 }
319 .boxed()
320 };
321 self.wait_for_fn_from_orchestrator(cur_node_index).await
322 }
323
324 #[instrument(skip_all, name = "orchestrator config")]
329 pub async fn get_config_after_collection<TYPES: NodeType>(&self) -> NetworkConfig<TYPES> {
330 let get_config_after_collection = |client: Client<ClientError, OrchestratorVersion>| {
332 async move {
333 let result = client
334 .post("api/post_config_after_peer_collected")
335 .send()
336 .await;
337
338 if let Err(ref err) = result {
339 tracing::error!("{err}");
340 }
341
342 result
343 }
344 .boxed()
345 };
346
347 self.wait_for_fn_from_orchestrator(get_config_after_collection)
349 .await
350 }
351
352 pub async fn post_builder_addresses(&self, addresses: Vec<Url>) {
357 let send_builder_f = |client: Client<ClientError, OrchestratorVersion>| {
358 let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&addresses)
359 .expect("Failed to serialize request");
360
361 async move {
362 let result: Result<_, ClientError> = client
363 .post("api/builder")
364 .body_binary(&request_body)
365 .unwrap()
366 .send()
367 .await
368 .inspect_err(|err| tracing::error!("{err}"));
369 result
370 }
371 .boxed()
372 };
373 self.wait_for_fn_from_orchestrator::<_, _, ()>(send_builder_f)
374 .await;
375 }
376
377 pub async fn get_builder_addresses(&self) -> Vec<Url> {
379 let get_builder = |client: Client<ClientError, OrchestratorVersion>| {
381 async move {
382 let result = client.get("api/builders").send().await;
383
384 if let Err(ref err) = result {
385 tracing::error!("{err}");
386 }
387
388 result
389 }
390 .boxed()
391 };
392
393 self.wait_for_fn_from_orchestrator(get_builder).await
395 }
396
397 #[instrument(skip(self), name = "orchestrator public keys")]
403 pub async fn post_and_wait_all_public_keys<TYPES: NodeType>(
404 &self,
405 validator_config: &mut ValidatorConfig<TYPES>,
406 libp2p_advertise_address: Option<Multiaddr>,
407 libp2p_public_key: Option<PeerId>,
408 ) -> NetworkConfig<TYPES> {
409 let pubkey: Vec<u8> =
410 PeerConfig::<TYPES>::to_bytes(&validator_config.public_config()).clone();
411 let da_requested: bool = validator_config.is_da;
412
413 let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&(
415 pubkey,
416 libp2p_advertise_address,
417 libp2p_public_key,
418 ))
419 .expect("failed to serialize request");
420
421 let (node_index, is_da): (u64, bool) = loop {
423 let result = self
424 .client
425 .post(&format!("api/pubkey/{da_requested}"))
426 .body_binary(&request_body)
427 .expect("Failed to form request")
428 .send()
429 .await
430 .inspect_err(|err| tracing::error!("{err}"));
431
432 if let Ok((index, is_da)) = result {
433 break (index, is_da);
434 }
435
436 sleep(Duration::from_millis(250)).await;
437 };
438
439 validator_config.is_da = is_da;
440
441 let wait_for_all_nodes_pub_key = |client: Client<ClientError, OrchestratorVersion>| {
443 async move {
444 client
445 .get("api/peer_pub_ready")
446 .send()
447 .await
448 .inspect_err(|err| tracing::error!("{err}"))
449 }
450 .boxed()
451 };
452 self.wait_for_fn_from_orchestrator::<_, _, ()>(wait_for_all_nodes_pub_key)
453 .await;
454
455 let mut network_config = self.get_config_after_collection().await;
456
457 network_config.node_index = node_index;
458
459 network_config
460 }
461
462 #[instrument(skip(self), name = "orchestrator ready signal")]
467 pub async fn wait_for_all_nodes_ready(&self, peer_config: Vec<u8>) -> bool {
468 let send_ready_f = |client: Client<ClientError, OrchestratorVersion>| {
469 let pk = peer_config.clone();
470 async move {
471 let result: Result<_, ClientError> = client
472 .post("api/ready")
473 .body_binary(&pk)
474 .unwrap()
475 .send()
476 .await
477 .inspect_err(|err| tracing::error!("{err}"));
478 result
479 }
480 .boxed()
481 };
482 self.wait_for_fn_from_orchestrator::<_, _, ()>(send_ready_f)
483 .await;
484
485 let wait_for_all_nodes_ready_f = |client: Client<ClientError, OrchestratorVersion>| {
486 async move { client.get("api/start").send().await }.boxed()
487 };
488 self.wait_for_fn_from_orchestrator(wait_for_all_nodes_ready_f)
489 .await
490 }
491
492 #[instrument(skip_all, name = "orchestrator metrics")]
496 pub async fn post_bench_results(&self, bench_results: BenchResults) {
497 let _send_metrics_f: Result<(), ClientError> = self
498 .client
499 .post("api/results")
500 .body_json(&bench_results)
501 .unwrap()
502 .send()
503 .await
504 .inspect_err(|err| tracing::warn!("{err}"));
505 }
506
507 #[instrument(skip_all, name = "waiting for orchestrator")]
510 async fn wait_for_fn_from_orchestrator<F, Fut, GEN>(&self, f: F) -> GEN
511 where
512 F: Fn(Client<ClientError, OrchestratorVersion>) -> Fut,
513 Fut: Future<Output = Result<GEN, ClientError>>,
514 {
515 loop {
516 let client = self.client.clone();
517 let res = f(client).await;
518 match res {
519 Ok(x) => break x,
520 Err(err) => {
521 tracing::info!("{err}");
522 sleep(Duration::from_millis(250)).await;
523 },
524 }
525 }
526 }
527}