// hotshot_orchestrator/client.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{net::SocketAddr, time::Duration};
8
9use clap::Parser;
10use futures::{Future, FutureExt};
11use hotshot_types::{
12    network::{NetworkConfig, NetworkConfigSource},
13    traits::node_implementation::NodeType,
14    PeerConfig, ValidatorConfig,
15};
16use libp2p_identity::PeerId;
17use multiaddr::Multiaddr;
18use surf_disco::{error::ClientError, Client};
19use tide_disco::Url;
20use tokio::time::sleep;
21use tracing::{info, instrument};
22use vbs::BinarySerializer;
23
24use crate::OrchestratorVersion;
25
26/// Holds the client connection to the orchestrator
27pub struct OrchestratorClient {
28    /// the client
29    pub client: surf_disco::Client<ClientError, OrchestratorVersion>,
30}
31
32/// Struct describing a benchmark result
/// Struct describing a benchmark result
///
/// Posted back to the orchestrator as JSON (see `post_bench_results`).
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct BenchResults {
    /// Whether it's partial collected results
    /// (presumably "One"/"Half"/"Full" as documented on
    /// `BenchResultsDownloadConfig::partial_results` — TODO confirm)
    pub partial_results: String,
    /// The average latency of the transactions
    pub avg_latency_in_sec: i64,
    /// The number of transactions that were latency measured
    pub num_latency: i64,
    /// The minimum latency of the transactions
    pub minimum_latency_in_sec: i64,
    /// The maximum latency of the transactions
    pub maximum_latency_in_sec: i64,
    /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes
    pub throughput_bytes_per_sec: u64,
    /// The number of transactions committed during benchmarking
    pub total_transactions_committed: u64,
    /// The size of each transaction in bytes
    pub transaction_size_in_bytes: u64,
    /// The total time elapsed for benchmarking
    pub total_time_elapsed_in_sec: u64,
    /// The total number of views during benchmarking
    pub total_num_views: usize,
    /// The number of failed views during benchmarking
    pub failed_num_views: usize,
    /// The membership committee type used
    pub committee_type: String,
}
60
61impl BenchResults {
62    /// printout the results of one example run
63    pub fn printout(&self) {
64        println!("=====================");
65        println!("{0} Benchmark results:", self.partial_results);
66        println!("Committee type: {}", self.committee_type);
67        println!(
68            "Average latency: {} seconds, Minimum latency: {} seconds, Maximum latency: {} seconds",
69            self.avg_latency_in_sec, self.minimum_latency_in_sec, self.maximum_latency_in_sec
70        );
71        println!("Throughput: {} bytes/sec", self.throughput_bytes_per_sec);
72        println!(
73            "Total transactions committed: {}",
74            self.total_transactions_committed
75        );
76        println!(
77            "Total number of views: {}, Failed number of views: {}",
78            self.total_num_views, self.failed_num_views
79        );
80        println!("=====================");
81    }
82}
83
/// Struct describing a benchmark result needed for download, also include the config
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct BenchResultsDownloadConfig {
    // Config starting here
    /// The commit this benchmark was run on
    pub commit_sha: String,
    /// Total number of nodes
    pub total_nodes: usize,
    /// The size of the da committee
    pub da_committee_size: usize,
    /// The number of fixed_leader_for_gpuvid when we enable the feature [fixed-leader-election]
    pub fixed_leader_for_gpuvid: usize,
    /// Number of transactions submitted per round
    pub transactions_per_round: usize,
    /// The size of each transaction in bytes
    pub transaction_size: u64,
    /// The number of rounds
    pub rounds: usize,

    // Results starting here (mirrors the fields of `BenchResults`)
    /// Whether the results are partially collected
    /// "One" when the results are collected for one node
    /// "Half" when the results are collective for half running nodes if not all nodes terminate successfully
    /// "Full" if the results are successfully collected from all nodes
    pub partial_results: String,
    /// The average latency of the transactions
    pub avg_latency_in_sec: i64,
    /// The minimum latency of the transactions
    pub minimum_latency_in_sec: i64,
    /// The maximum latency of the transactions
    pub maximum_latency_in_sec: i64,
    /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes
    pub throughput_bytes_per_sec: u64,
    /// The number of transactions committed during benchmarking
    pub total_transactions_committed: u64,
    /// The total time elapsed for benchmarking
    pub total_time_elapsed_in_sec: u64,
    /// The total number of views during benchmarking
    pub total_num_views: usize,
    /// The number of failed views during benchmarking
    pub failed_num_views: usize,
    /// The membership committee type used
    pub committee_type: String,
}
128
129// VALIDATOR
130
// NOTE: the `///` doc comments below double as clap help text (runtime-visible),
// so they are left untouched; review notes are in `//` comments instead.
#[derive(Parser, Debug, Clone)]
#[command(
    name = "Multi-machine consensus",
    about = "Simulates consensus among multiple machines"
)]
/// Arguments passed to the validator
pub struct ValidatorArgs {
    // Fields without an #[arg] attribute are positional CLI arguments.
    /// The address the orchestrator runs on
    pub url: Url,
    /// The optional advertise address to use for Libp2p
    pub advertise_address: Option<String>,
    /// Optional address to run builder on. Address must be accessible by other nodes
    pub builder_address: Option<SocketAddr>,
    /// An optional network config file to save to/load from
    /// Allows for rejoining the network on a complete state loss
    #[arg(short, long)]
    pub network_config_file: Option<String>,
}
149
/// arguments to run multiple validators
// NOTE: `///` doc comments double as clap help text (runtime-visible), so they
// are left untouched; fields without #[arg] are positional CLI arguments.
#[derive(Parser, Debug, Clone)]
pub struct MultiValidatorArgs {
    /// Number of validators to run
    pub num_nodes: u16,
    /// The address the orchestrator runs on
    pub url: Url,
    /// The optional advertise address to use for Libp2p
    pub advertise_address: Option<String>,
    /// An optional network config file to save to/load from
    /// Allows for rejoining the network on a complete state loss
    #[arg(short, long)]
    pub network_config_file: Option<String>,
}
164
165/// Asynchronously retrieves a `NetworkConfig` from an orchestrator.
166/// The retrieved one includes correct `node_index` and peer's public config.
167///
168/// # Errors
169/// If we are unable to get the configuration from the orchestrator
170pub async fn get_complete_config<TYPES: NodeType>(
171    client: &OrchestratorClient,
172    mut validator_config: ValidatorConfig<TYPES>,
173    libp2p_advertise_address: Option<Multiaddr>,
174    libp2p_public_key: Option<PeerId>,
175) -> anyhow::Result<(
176    NetworkConfig<TYPES>,
177    ValidatorConfig<TYPES>,
178    NetworkConfigSource,
179)> {
180    // get the configuration from the orchestrator
181    let run_config: NetworkConfig<TYPES> = client
182        .post_and_wait_all_public_keys::<TYPES>(
183            &mut validator_config,
184            libp2p_advertise_address,
185            libp2p_public_key,
186        )
187        .await;
188
189    info!(
190        "Retrieved config; our node index is {}. DA committee member: {}",
191        run_config.node_index, validator_config.is_da
192    );
193    Ok((
194        run_config,
195        validator_config,
196        NetworkConfigSource::Orchestrator,
197    ))
198}
199
200impl ValidatorArgs {
201    /// Constructs `ValidatorArgs` from `MultiValidatorArgs` and a node index.
202    ///
203    /// If `network_config_file` is present in `MultiValidatorArgs`, it appends the node index to it to create a unique file name for each node.
204    ///
205    /// # Arguments
206    ///
207    /// * `multi_args` - A `MultiValidatorArgs` instance containing the base arguments for the construction.
208    /// * `node_index` - A `u16` representing the index of the node for which the args are being constructed.
209    ///
210    /// # Returns
211    ///
212    /// This function returns a new instance of `ValidatorArgs`.
213    ///
214    /// # Examples
215    ///
216    /// ```ignore
217    /// // NOTE this is a toy example,
218    /// // the user will need to construct a multivalidatorargs since `new` does not exist
219    /// # use hotshot_orchestrator::client::MultiValidatorArgs;
220    /// let multi_args = MultiValidatorArgs::new();
221    /// let node_index = 1;
222    /// let instance = Self::from_multi_args(multi_args, node_index);
223    /// ```
224    #[must_use]
225    pub fn from_multi_args(multi_args: MultiValidatorArgs, node_index: u16) -> Self {
226        Self {
227            url: multi_args.url,
228            advertise_address: multi_args.advertise_address,
229            builder_address: None,
230            network_config_file: multi_args
231                .network_config_file
232                .map(|s| format!("{s}-{node_index}")),
233        }
234    }
235}
236
237impl OrchestratorClient {
238    /// Creates the client that will connect to the orchestrator
239    #[must_use]
240    pub fn new(url: Url) -> Self {
241        let client = surf_disco::Client::<ClientError, OrchestratorVersion>::new(url);
242        // TODO ED: Add healthcheck wait here
243        OrchestratorClient { client }
244    }
245
246    /// Get the config from the orchestrator.
247    /// If the identity is provided, register the identity with the orchestrator.
248    /// If not, just retrieving the config (for passive observers)
249    ///
250    /// # Panics
251    /// if unable to convert the node index from usize into u64
252    /// (only applicable on 32 bit systems)
253    ///
254    /// # Errors
255    /// If we were unable to serialize the Libp2p data
256    #[allow(clippy::type_complexity)]
257    pub async fn get_config_without_peer<TYPES: NodeType>(
258        &self,
259        libp2p_advertise_address: Option<Multiaddr>,
260        libp2p_public_key: Option<PeerId>,
261    ) -> anyhow::Result<NetworkConfig<TYPES>> {
262        // Serialize our (possible) libp2p-specific data
263        let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&(
264            libp2p_advertise_address,
265            libp2p_public_key,
266        ))?;
267
268        let identity = |client: Client<ClientError, OrchestratorVersion>| {
269            // We need to clone here to move it into the closure
270            let request_body = request_body.clone();
271            async move {
272                let node_index: Result<u16, ClientError> = client
273                    .post("api/identity")
274                    .body_binary(&request_body)
275                    .expect("failed to set request body")
276                    .send()
277                    .await;
278
279                node_index
280            }
281            .boxed()
282        };
283        let node_index = self.wait_for_fn_from_orchestrator(identity).await;
284
285        // get the corresponding config
286        let f = |client: Client<ClientError, OrchestratorVersion>| {
287            async move {
288                let config: Result<NetworkConfig<TYPES>, ClientError> = client
289                    .post(&format!("api/config/{node_index}"))
290                    .send()
291                    .await;
292                config
293            }
294            .boxed()
295        };
296
297        let mut config = self.wait_for_fn_from_orchestrator(f).await;
298        config.node_index = From::<u16>::from(node_index);
299
300        Ok(config)
301    }
302
303    /// Post to the orchestrator and get the latest `node_index`
304    /// Then return it for the init validator config
305    /// # Panics
306    /// if unable to post
307    #[instrument(skip_all, name = "orchestrator node index for validator config")]
308    pub async fn get_node_index_for_init_validator_config(&self) -> u16 {
309        let cur_node_index = |client: Client<ClientError, OrchestratorVersion>| {
310            async move {
311                let cur_node_index: Result<u16, ClientError> = client
312                    .post("api/get_tmp_node_index")
313                    .send()
314                    .await
315                    .inspect_err(|err| tracing::error!("{err}"));
316
317                cur_node_index
318            }
319            .boxed()
320        };
321        self.wait_for_fn_from_orchestrator(cur_node_index).await
322    }
323
324    /// Requests the configuration from the orchestrator with the stipulation that
325    /// a successful call requires all nodes to be registered.
326    ///
327    /// Does not fail, retries internally until success.
328    #[instrument(skip_all, name = "orchestrator config")]
329    pub async fn get_config_after_collection<TYPES: NodeType>(&self) -> NetworkConfig<TYPES> {
330        // Define the request for post-register configurations
331        let get_config_after_collection = |client: Client<ClientError, OrchestratorVersion>| {
332            async move {
333                let result = client
334                    .post("api/post_config_after_peer_collected")
335                    .send()
336                    .await;
337
338                if let Err(ref err) = result {
339                    tracing::error!("{err}");
340                }
341
342                result
343            }
344            .boxed()
345        };
346
347        // Loop until successful
348        self.wait_for_fn_from_orchestrator(get_config_after_collection)
349            .await
350    }
351
352    /// Registers a builder URL with the orchestrator
353    ///
354    /// # Panics
355    /// if unable to serialize `address`
356    pub async fn post_builder_addresses(&self, addresses: Vec<Url>) {
357        let send_builder_f = |client: Client<ClientError, OrchestratorVersion>| {
358            let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&addresses)
359                .expect("Failed to serialize request");
360
361            async move {
362                let result: Result<_, ClientError> = client
363                    .post("api/builder")
364                    .body_binary(&request_body)
365                    .unwrap()
366                    .send()
367                    .await
368                    .inspect_err(|err| tracing::error!("{err}"));
369                result
370            }
371            .boxed()
372        };
373        self.wait_for_fn_from_orchestrator::<_, _, ()>(send_builder_f)
374            .await;
375    }
376
377    /// Requests a builder URL from orchestrator
378    pub async fn get_builder_addresses(&self) -> Vec<Url> {
379        // Define the request for post-register configurations
380        let get_builder = |client: Client<ClientError, OrchestratorVersion>| {
381            async move {
382                let result = client.get("api/builders").send().await;
383
384                if let Err(ref err) = result {
385                    tracing::error!("{err}");
386                }
387
388                result
389            }
390            .boxed()
391        };
392
393        // Loop until successful
394        self.wait_for_fn_from_orchestrator(get_builder).await
395    }
396
397    /// Sends my public key to the orchestrator so that it can collect all public keys
398    /// And get the updated config
399    /// Blocks until the orchestrator collects all peer's public keys/configs
400    /// # Panics
401    /// if unable to post
402    #[instrument(skip(self), name = "orchestrator public keys")]
403    pub async fn post_and_wait_all_public_keys<TYPES: NodeType>(
404        &self,
405        validator_config: &mut ValidatorConfig<TYPES>,
406        libp2p_advertise_address: Option<Multiaddr>,
407        libp2p_public_key: Option<PeerId>,
408    ) -> NetworkConfig<TYPES> {
409        let pubkey: Vec<u8> =
410            PeerConfig::<TYPES>::to_bytes(&validator_config.public_config()).clone();
411        let da_requested: bool = validator_config.is_da;
412
413        // Serialize our (possible) libp2p-specific data
414        let request_body = vbs::Serializer::<OrchestratorVersion>::serialize(&(
415            pubkey,
416            libp2p_advertise_address,
417            libp2p_public_key,
418        ))
419        .expect("failed to serialize request");
420
421        // register our public key with the orchestrator
422        let (node_index, is_da): (u64, bool) = loop {
423            let result = self
424                .client
425                .post(&format!("api/pubkey/{da_requested}"))
426                .body_binary(&request_body)
427                .expect("Failed to form request")
428                .send()
429                .await
430                .inspect_err(|err| tracing::error!("{err}"));
431
432            if let Ok((index, is_da)) = result {
433                break (index, is_da);
434            }
435
436            sleep(Duration::from_millis(250)).await;
437        };
438
439        validator_config.is_da = is_da;
440
441        // wait for all nodes' public keys
442        let wait_for_all_nodes_pub_key = |client: Client<ClientError, OrchestratorVersion>| {
443            async move {
444                client
445                    .get("api/peer_pub_ready")
446                    .send()
447                    .await
448                    .inspect_err(|err| tracing::error!("{err}"))
449            }
450            .boxed()
451        };
452        self.wait_for_fn_from_orchestrator::<_, _, ()>(wait_for_all_nodes_pub_key)
453            .await;
454
455        let mut network_config = self.get_config_after_collection().await;
456
457        network_config.node_index = node_index;
458
459        network_config
460    }
461
462    /// Tells the orchestrator this validator is ready to start
463    /// Blocks until the orchestrator indicates all nodes are ready to start
464    /// # Panics
465    /// Panics if unable to post.
466    #[instrument(skip(self), name = "orchestrator ready signal")]
467    pub async fn wait_for_all_nodes_ready(&self, peer_config: Vec<u8>) -> bool {
468        let send_ready_f = |client: Client<ClientError, OrchestratorVersion>| {
469            let pk = peer_config.clone();
470            async move {
471                let result: Result<_, ClientError> = client
472                    .post("api/ready")
473                    .body_binary(&pk)
474                    .unwrap()
475                    .send()
476                    .await
477                    .inspect_err(|err| tracing::error!("{err}"));
478                result
479            }
480            .boxed()
481        };
482        self.wait_for_fn_from_orchestrator::<_, _, ()>(send_ready_f)
483            .await;
484
485        let wait_for_all_nodes_ready_f = |client: Client<ClientError, OrchestratorVersion>| {
486            async move { client.get("api/start").send().await }.boxed()
487        };
488        self.wait_for_fn_from_orchestrator(wait_for_all_nodes_ready_f)
489            .await
490    }
491
492    /// Sends the benchmark metrics to the orchestrator
493    /// # Panics
494    /// Panics if unable to post
495    #[instrument(skip_all, name = "orchestrator metrics")]
496    pub async fn post_bench_results(&self, bench_results: BenchResults) {
497        let _send_metrics_f: Result<(), ClientError> = self
498            .client
499            .post("api/results")
500            .body_json(&bench_results)
501            .unwrap()
502            .send()
503            .await
504            .inspect_err(|err| tracing::warn!("{err}"));
505    }
506
507    /// Generic function that waits for the orchestrator to return a non-error
508    /// Returns whatever type the given function returns
509    #[instrument(skip_all, name = "waiting for orchestrator")]
510    async fn wait_for_fn_from_orchestrator<F, Fut, GEN>(&self, f: F) -> GEN
511    where
512        F: Fn(Client<ClientError, OrchestratorVersion>) -> Fut,
513        Fut: Future<Output = Result<GEN, ClientError>>,
514    {
515        loop {
516            let client = self.client.clone();
517            let res = f(client).await;
518            match res {
519                Ok(x) => break x,
520                Err(err) => {
521                    tracing::info!("{err}");
522                    sleep(Duration::from_millis(250)).await;
523                },
524            }
525        }
526    }
527}