hotshot_orchestrator/
lib.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

//! Orchestrator for manipulating nodes and recording results during a run of `HotShot` tests
/// The orchestrator's clients: types used by nodes to talk to this orchestrator
/// (e.g. [`client::BenchResults`] posted back at the end of a run).
pub mod client;
11
12use std::{
13    collections::{HashMap, HashSet},
14    fs,
15    fs::OpenOptions,
16    io,
17    time::Duration,
18};
19
20use alloy::primitives::U256;
21use async_lock::RwLock;
22use client::{BenchResults, BenchResultsDownloadConfig};
23use csv::Writer;
24use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
25use hotshot_types::{
26    network::{BuilderType, NetworkConfig, PublicKeysFile},
27    traits::{
28        node_implementation::NodeType,
29        signature_key::{SignatureKey, StakeTableEntryType},
30    },
31    PeerConfig,
32};
33use libp2p_identity::{
34    ed25519::{Keypair as EdKeypair, SecretKey},
35    Keypair, PeerId,
36};
37use multiaddr::Multiaddr;
38use surf_disco::Url;
39use tide_disco::{
40    api::ApiError,
41    error::ServerError,
42    method::{ReadState, WriteState},
43    Api, App, RequestError,
44};
45use vbs::{
46    version::{StaticVersion, StaticVersionType},
47    BinarySerializer,
48};
49
/// Orchestrator is not, strictly speaking, bound to the network; it can have its own versioning.
/// Orchestrator Version (major)
pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0;
/// Orchestrator Version (minor)
pub const ORCHESTRATOR_MINOR_VERSION: u16 = 1;
/// Orchestrator Version as a type, used to parameterize `vbs` (de)serialization
/// of request bodies and the tide-disco API.
pub type OrchestratorVersion =
    StaticVersion<ORCHESTRATOR_MAJOR_VERSION, ORCHESTRATOR_MINOR_VERSION>;
/// Orchestrator Version as a type-binding instance
pub const ORCHESTRATOR_VERSION: OrchestratorVersion = StaticVersion {};
60
61/// Generate an keypair based on a `seed` and an `index`
62/// # Panics
63/// This panics if libp2p is unable to generate a secret key from the seed
64#[must_use]
65pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
66    let mut hasher = blake3::Hasher::new();
67    hasher.update(&seed);
68    hasher.update(&index.to_le_bytes());
69    let new_seed = *hasher.finalize().as_bytes();
70    let sk_bytes = SecretKey::try_from_bytes(new_seed).unwrap();
71    <EdKeypair as From<SecretKey>>::from(sk_bytes).into()
72}
73
/// The state of the orchestrator: registration progress, the evolving network
/// config, and aggregated benchmark results for the current run.
#[derive(Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
struct OrchestratorState<TYPES: NodeType> {
    /// Tracks the latest node index we have generated a configuration for
    latest_index: u16,
    /// Tracks the latest temporary index we have generated for init validator's key pair
    tmp_latest_index: u16,
    /// The network configuration
    config: NetworkConfig<TYPES>,
    /// Whether the network configuration has been updated with all the peer's public keys/configs
    peer_pub_ready: bool,
    /// A map from serialized public keys to `(node_index, is_da)`; also serves as
    /// the registration record, so re-posting the same key is idempotent.
    pub_posted: HashMap<Vec<u8>, (u64, bool)>,
    /// Whether nodes should start their HotShot instances
    /// Will be set to true once all nodes post they are ready to start
    start: bool,
    /// The total nodes that have posted they are ready to start
    nodes_connected: HashSet<PeerConfig<TYPES>>,
    /// The results of the benchmarks
    bench_results: BenchResults,
    /// The number of nodes that have posted their results
    nodes_post_results: u64,
    /// Whether the orchestrator can be started manually
    manual_start_allowed: bool,
    /// Whether we are still accepting new keys for registration
    accepting_new_keys: bool,
    /// Builder address pool
    builders: Vec<Url>,
    /// whether we are using a fixed stake table, disabling public key registration
    fixed_stake_table: bool,
}
106
107impl<TYPES: NodeType> OrchestratorState<TYPES> {
108    /// create a new [`OrchestratorState`]
109    pub fn new(network_config: NetworkConfig<TYPES>) -> Self {
110        let mut peer_pub_ready = false;
111        let mut fixed_stake_table = false;
112
113        if network_config.config.known_nodes_with_stake.is_empty() {
114            println!(
115                "No nodes were loaded from the config file. Nodes will be allowed to register \
116                 dynamically."
117            );
118        } else {
119            println!("Initializing orchestrator with fixed stake table.");
120            peer_pub_ready = true;
121            fixed_stake_table = true;
122        }
123
124        let builders = if matches!(network_config.builder, BuilderType::External) {
125            network_config.config.builder_urls.clone().into()
126        } else {
127            vec![]
128        };
129
130        OrchestratorState {
131            latest_index: 0,
132            tmp_latest_index: 0,
133            config: network_config,
134            peer_pub_ready,
135            pub_posted: HashMap::new(),
136            nodes_connected: HashSet::new(),
137            start: false,
138            bench_results: BenchResults::default(),
139            nodes_post_results: 0,
140            manual_start_allowed: true,
141            accepting_new_keys: true,
142            builders,
143            fixed_stake_table,
144        }
145    }
146
147    /// Output the results to a csv file according to orchestrator state
148    pub fn output_to_csv(&self) {
149        let output_csv = BenchResultsDownloadConfig {
150            commit_sha: self.config.commit_sha.clone(),
151            total_nodes: self.config.config.num_nodes_with_stake.into(),
152            da_committee_size: self.config.config.da_staked_committee_size,
153            fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid,
154            transactions_per_round: self.config.transactions_per_round,
155            transaction_size: self.bench_results.transaction_size_in_bytes,
156            rounds: self.config.rounds,
157            partial_results: self.bench_results.partial_results.clone(),
158            avg_latency_in_sec: self.bench_results.avg_latency_in_sec,
159            minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec,
160            maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec,
161            throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec,
162            total_transactions_committed: self.bench_results.total_transactions_committed,
163            total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec,
164            total_num_views: self.bench_results.total_num_views,
165            failed_num_views: self.bench_results.failed_num_views,
166            committee_type: self.bench_results.committee_type.clone(),
167        };
168        // Open the CSV file in append mode
169        let results_csv_file = OpenOptions::new()
170            .create(true)
171            .append(true) // Open in append mode
172            .open("scripts/benchmarks_results/results.csv")
173            .unwrap();
174        // Open a file for writing
175        let mut wtr = Writer::from_writer(results_csv_file);
176        let _ = wtr.serialize(output_csv);
177        let _ = wtr.flush();
178        println!("Results successfully saved in scripts/benchmarks_results/results.csv");
179    }
180}
181
182/// An api exposed by the orchestrator
/// An api exposed by the orchestrator
pub trait OrchestratorApi<TYPES: NodeType> {
    /// Post an identity to the orchestrator. Takes in optional
    /// arguments so others can identify us on the Libp2p network.
    /// # Errors
    /// If we were unable to serve the request
    fn post_identity(
        &mut self,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<u16, ServerError>;
    /// post endpoint for each node's config
    /// # Errors
    /// if unable to serve
    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// get endpoint for the next available temporary node index
    /// (used during key-pair generation, before real registration)
    /// # Errors
    /// if unable to serve
    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError>;
    /// post endpoint for each node's public key
    /// # Errors
    /// if unable to serve
    fn register_public_key(
        &mut self,
        pubkey: &mut Vec<u8>,
        is_da: bool,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<(u64, bool), ServerError>;
    /// post endpoint reporting whether all peers' public keys are ready
    /// # Errors
    /// if unable to serve
    fn peer_pub_ready(&self) -> Result<bool, ServerError>;
    /// get endpoint for the network config after all peers' public keys are collected
    /// # Errors
    /// if unable to serve
    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// get endpoint for whether or not the run has started
    /// # Errors
    /// if unable to serve
    fn get_start(&self) -> Result<bool, ServerError>;
    /// post endpoint for the results of the run
    /// # Errors
    /// if unable to serve
    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>;
    /// A node POSTs its public key to let the orchestrator know that it is ready
    /// # Errors
    /// if unable to serve
    fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError>;
    /// post endpoint for manually starting the orchestrator
    /// # Errors
    /// if unable to serve
    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError>;
    /// post endpoint for registering a builder with the orchestrator
    /// # Errors
    /// if unable to serve
    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError>;
    /// get endpoints for builders
    /// # Errors
    /// if not all builders are registered yet
    fn get_builders(&self) -> Result<Vec<Url>, ServerError>;
}
244
245impl<TYPES: NodeType> OrchestratorState<TYPES>
246where
247    TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
248{
249    /// register a node with an unknown public key.
250    /// this method should be used when we don't have a fixed stake table
251    fn register_unknown(
252        &mut self,
253        pubkey: &mut Vec<u8>,
254        da_requested: bool,
255        libp2p_address: Option<Multiaddr>,
256        libp2p_public_key: Option<PeerId>,
257    ) -> Result<(u64, bool), ServerError> {
258        if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
259            return Ok((*node_index, *is_da));
260        }
261
262        if !self.accepting_new_keys {
263            return Err(ServerError {
264                status: tide_disco::StatusCode::FORBIDDEN,
265                message: "Network has been started manually, and is no longer registering new \
266                          keys."
267                    .to_string(),
268            });
269        }
270
271        let node_index = self.pub_posted.len() as u64;
272
273        // Deserialize the public key
274        let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
275
276        self.config
277            .config
278            .known_nodes_with_stake
279            .push(staked_pubkey.clone());
280
281        let mut added_to_da = false;
282
283        let da_full =
284            self.config.config.known_da_nodes.len() >= self.config.config.da_staked_committee_size;
285
286        #[allow(clippy::nonminimal_bool)]
287        // We add the node to the DA committee depending on either its node index or whether it requested membership.
288        //
289        // Since we issue `node_index` incrementally, if we are deciding DA membership by node_index
290        // we only need to check that the DA committee is not yet full.
291        //
292        // Note: this logically simplifies to (self.config.indexed_da || da_requested) && !da_full,
293        // but writing it that way makes it a little less clear to me.
294        if (self.config.indexed_da || (!self.config.indexed_da && da_requested)) && !da_full {
295            self.config.config.known_da_nodes.push(staked_pubkey);
296            added_to_da = true;
297        }
298
299        self.pub_posted
300            .insert(pubkey.clone(), (node_index, added_to_da));
301
302        // If the orchestrator is set up for libp2p and we have supplied the proper
303        // Libp2p data, add our node to the list of bootstrap nodes.
304        if self.config.libp2p_config.clone().is_some() {
305            if let (Some(libp2p_public_key), Some(libp2p_address)) =
306                (libp2p_public_key, libp2p_address)
307            {
308                // Push to our bootstrap nodes
309                self.config
310                    .libp2p_config
311                    .as_mut()
312                    .unwrap()
313                    .bootstrap_nodes
314                    .push((libp2p_public_key, libp2p_address));
315            }
316        }
317
318        tracing::error!("Posted public key for node_index {node_index}");
319
320        // node_index starts at 0, so once it matches `num_nodes_with_stake`
321        // we will have registered one node too many. hence, we want `node_index + 1`.
322        if node_index + 1 >= (self.config.config.num_nodes_with_stake.get() as u64) {
323            self.peer_pub_ready = true;
324            self.accepting_new_keys = false;
325        }
326        Ok((node_index, added_to_da))
327    }
328
329    /// register a node on the fixed stake table, which was loaded at startup
330    fn register_from_list(
331        &mut self,
332        pubkey: &mut Vec<u8>,
333        da_requested: bool,
334        libp2p_address: Option<Multiaddr>,
335        libp2p_public_key: Option<PeerId>,
336    ) -> Result<(u64, bool), ServerError> {
337        // if we've already registered this node before, we just retrieve its info from `pub_posted`
338        if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
339            return Ok((*node_index, *is_da));
340        }
341
342        // Deserialize the public key
343        let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
344
345        // Check if the node is allowed to connect, returning its index and config entry if so.
346        let Some((node_index, node_config)) =
347            self.config.public_keys.iter().enumerate().find(|keys| {
348                keys.1.stake_table_key == staked_pubkey.stake_table_entry.public_key()
349            })
350        else {
351            return Err(ServerError {
352                status: tide_disco::StatusCode::FORBIDDEN,
353                message: "You are unauthorized to register with the orchestrator".to_string(),
354            });
355        };
356
357        // Check that our recorded DA status for the node matches what the node actually requested
358        if node_config.da != da_requested {
359            return Err(ServerError {
360                status: tide_disco::StatusCode::BAD_REQUEST,
361                message: format!(
362                    "Mismatch in DA status in registration for node {}. DA requested: {}, \
363                     expected: {}",
364                    node_index, da_requested, node_config.da
365                ),
366            });
367        }
368
369        let added_to_da = node_config.da;
370
371        self.pub_posted
372            .insert(pubkey.clone(), (node_index as u64, added_to_da));
373
374        // If the orchestrator is set up for libp2p and we have supplied the proper
375        // Libp2p data, add our node to the list of bootstrap nodes.
376        if self.config.libp2p_config.clone().is_some() {
377            if let (Some(libp2p_public_key), Some(libp2p_address)) =
378                (libp2p_public_key, libp2p_address)
379            {
380                // Push to our bootstrap nodes
381                self.config
382                    .libp2p_config
383                    .as_mut()
384                    .unwrap()
385                    .bootstrap_nodes
386                    .push((libp2p_public_key, libp2p_address));
387            }
388        }
389
390        tracing::error!("Node {node_index} has registered.");
391
392        Ok((node_index as u64, added_to_da))
393    }
394}
395
396impl<TYPES: NodeType> OrchestratorApi<TYPES> for OrchestratorState<TYPES>
397where
398    TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
399{
400    /// Post an identity to the orchestrator. Takes in optional
401    /// arguments so others can identify us on the Libp2p network.
402    /// # Errors
403    /// If we were unable to serve the request
404    fn post_identity(
405        &mut self,
406        libp2p_address: Option<Multiaddr>,
407        libp2p_public_key: Option<PeerId>,
408    ) -> Result<u16, ServerError> {
409        let node_index = self.latest_index;
410        self.latest_index += 1;
411
412        if usize::from(node_index) >= self.config.config.num_nodes_with_stake.get() {
413            return Err(ServerError {
414                status: tide_disco::StatusCode::BAD_REQUEST,
415                message: "Network has reached capacity".to_string(),
416            });
417        }
418
419        // If the orchestrator is set up for libp2p and we have supplied the proper
420        // Libp2p data, add our node to the list of bootstrap nodes.
421        if self.config.libp2p_config.clone().is_some() {
422            if let (Some(libp2p_public_key), Some(libp2p_address)) =
423                (libp2p_public_key, libp2p_address)
424            {
425                // Push to our bootstrap nodes
426                self.config
427                    .libp2p_config
428                    .as_mut()
429                    .unwrap()
430                    .bootstrap_nodes
431                    .push((libp2p_public_key, libp2p_address));
432            }
433        }
434        Ok(node_index)
435    }
436
437    // Assumes nodes will set their own index that they received from the
438    // 'identity' endpoint
439    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError> {
440        Ok(self.config.clone())
441    }
442
443    // Assumes one node do not get twice
444    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError> {
445        let tmp_node_index = self.tmp_latest_index;
446        self.tmp_latest_index += 1;
447
448        if usize::from(tmp_node_index) >= self.config.config.num_nodes_with_stake.get() {
449            return Err(ServerError {
450                status: tide_disco::StatusCode::BAD_REQUEST,
451                message: "Node index getter for key pair generation has reached capacity"
452                    .to_string(),
453            });
454        }
455        Ok(tmp_node_index)
456    }
457
458    #[allow(clippy::cast_possible_truncation)]
459    fn register_public_key(
460        &mut self,
461        pubkey: &mut Vec<u8>,
462        da_requested: bool,
463        libp2p_address: Option<Multiaddr>,
464        libp2p_public_key: Option<PeerId>,
465    ) -> Result<(u64, bool), ServerError> {
466        if self.fixed_stake_table {
467            self.register_from_list(pubkey, da_requested, libp2p_address, libp2p_public_key)
468        } else {
469            self.register_unknown(pubkey, da_requested, libp2p_address, libp2p_public_key)
470        }
471    }
472
473    fn peer_pub_ready(&self) -> Result<bool, ServerError> {
474        if !self.peer_pub_ready {
475            return Err(ServerError {
476                status: tide_disco::StatusCode::BAD_REQUEST,
477                message: "Peer's public configs are not ready".to_string(),
478            });
479        }
480        Ok(self.peer_pub_ready)
481    }
482
483    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError> {
484        if !self.peer_pub_ready {
485            return Err(ServerError {
486                status: tide_disco::StatusCode::BAD_REQUEST,
487                message: "Peer's public configs are not ready".to_string(),
488            });
489        }
490
491        Ok(self.config.clone())
492    }
493
494    fn get_start(&self) -> Result<bool, ServerError> {
495        // println!("{}", self.start);
496        if !self.start {
497            return Err(ServerError {
498                status: tide_disco::StatusCode::BAD_REQUEST,
499                message: "Network is not ready to start".to_string(),
500            });
501        }
502        Ok(self.start)
503    }
504
505    // Assumes nodes do not post 'ready' twice
506    fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError> {
507        // If we have not disabled registration verification.
508        // Is this node allowed to connect?
509        if !self
510            .config
511            .config
512            .known_nodes_with_stake
513            .contains(peer_config)
514        {
515            return Err(ServerError {
516                status: tide_disco::StatusCode::FORBIDDEN,
517                message: "You are unauthorized to register with the orchestrator".to_string(),
518            });
519        }
520
521        // `HashSet::insert()` returns whether the node was newly inserted (true) or not
522        if self.nodes_connected.insert(peer_config.clone()) {
523            tracing::error!(
524                "Node {peer_config} connected. Total nodes connected: {}",
525                self.nodes_connected.len()
526            );
527        }
528
529        // i.e. nodes_connected >= num_nodes_with_stake * (start_threshold.0 / start_threshold.1)
530        if self.nodes_connected.len() as u64 * self.config.config.start_threshold.1
531            >= (self.config.config.num_nodes_with_stake.get() as u64)
532                * self.config.config.start_threshold.0
533        {
534            self.accepting_new_keys = false;
535            self.manual_start_allowed = false;
536            self.start = true;
537        }
538
539        Ok(())
540    }
541
542    /// Manually start the network
543    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError> {
544        if !self.manual_start_allowed {
545            return Err(ServerError {
546                status: tide_disco::StatusCode::FORBIDDEN,
547                message: "Configs have already been distributed to nodes, and the network can no \
548                          longer be started manually."
549                    .to_string(),
550            });
551        }
552
553        let password = String::from_utf8(password_bytes)
554            .expect("Failed to decode raw password as UTF-8 string.");
555
556        // Check that the password matches
557        if self.config.manual_start_password != Some(password) {
558            return Err(ServerError {
559                status: tide_disco::StatusCode::FORBIDDEN,
560                message: "Incorrect password.".to_string(),
561            });
562        }
563
564        let registered_nodes_with_stake = self.config.config.known_nodes_with_stake.len();
565        let registered_da_nodes = self.config.config.known_da_nodes.len();
566
567        if registered_da_nodes > 1 {
568            self.config.config.num_nodes_with_stake =
569                std::num::NonZeroUsize::new(registered_nodes_with_stake)
570                    .expect("Failed to convert to NonZeroUsize; this should be impossible.");
571
572            self.config.config.da_staked_committee_size = registered_da_nodes;
573        } else {
574            return Err(ServerError {
575                status: tide_disco::StatusCode::FORBIDDEN,
576                message: format!(
577                    "We cannot manually start the network, because we only have \
578                     {registered_nodes_with_stake} nodes with stake registered, with \
579                     {registered_da_nodes} DA nodes."
580                ),
581            });
582        }
583
584        self.accepting_new_keys = false;
585        self.manual_start_allowed = false;
586        self.peer_pub_ready = true;
587        self.start = true;
588
589        Ok(())
590    }
591
592    // Aggregates results of the run from all nodes
593    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> {
594        if metrics.total_transactions_committed != 0 {
595            // Deal with the bench results
596            if self.bench_results.total_transactions_committed == 0 {
597                self.bench_results = metrics;
598            } else {
599                // Deal with the bench results from different nodes
600                let cur_metrics = self.bench_results.clone();
601                self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec
602                    * metrics.num_latency
603                    + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency)
604                    / (metrics.num_latency + cur_metrics.num_latency);
605                self.bench_results.num_latency += metrics.num_latency;
606                self.bench_results.minimum_latency_in_sec = metrics
607                    .minimum_latency_in_sec
608                    .min(cur_metrics.minimum_latency_in_sec);
609                self.bench_results.maximum_latency_in_sec = metrics
610                    .maximum_latency_in_sec
611                    .max(cur_metrics.maximum_latency_in_sec);
612                self.bench_results.throughput_bytes_per_sec = metrics
613                    .throughput_bytes_per_sec
614                    .max(cur_metrics.throughput_bytes_per_sec);
615                self.bench_results.total_transactions_committed = metrics
616                    .total_transactions_committed
617                    .max(cur_metrics.total_transactions_committed);
618                self.bench_results.total_time_elapsed_in_sec = metrics
619                    .total_time_elapsed_in_sec
620                    .max(cur_metrics.total_time_elapsed_in_sec);
621                self.bench_results.total_num_views =
622                    metrics.total_num_views.min(cur_metrics.total_num_views);
623                self.bench_results.failed_num_views =
624                    metrics.failed_num_views.max(cur_metrics.failed_num_views);
625            }
626        }
627        self.nodes_post_results += 1;
628        if self.bench_results.partial_results == "Unset" {
629            self.bench_results.partial_results = "One".to_string();
630            self.bench_results.printout();
631            self.output_to_csv();
632        }
633        if self.bench_results.partial_results == "One"
634            && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2)
635        {
636            self.bench_results.partial_results = "HalfDA".to_string();
637            self.bench_results.printout();
638            self.output_to_csv();
639        }
640        if self.bench_results.partial_results == "HalfDA"
641            && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2)
642        {
643            self.bench_results.partial_results = "Half".to_string();
644            self.bench_results.printout();
645            self.output_to_csv();
646        }
647        if self.bench_results.partial_results != "Full"
648            && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64)
649        {
650            self.bench_results.partial_results = "Full".to_string();
651            self.bench_results.printout();
652            self.output_to_csv();
653        }
654        Ok(())
655    }
656
657    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> {
658        self.builders.push(builder);
659        Ok(())
660    }
661
662    fn get_builders(&self) -> Result<Vec<Url>, ServerError> {
663        if !matches!(self.config.builder, BuilderType::External)
664            && self.builders.len() != self.config.config.da_staked_committee_size
665        {
666            return Err(ServerError {
667                status: tide_disco::StatusCode::NOT_FOUND,
668                message: "Not all builders are registered yet".to_string(),
669            });
670        }
671        Ok(self.builders.clone())
672    }
673}
674
675/// Sets up all API routes
676#[allow(clippy::too_many_lines)]
677fn define_api<TYPES, State, VER>() -> Result<Api<State, ServerError, VER>, ApiError>
678where
679    TYPES: NodeType,
680    State: 'static + Send + Sync + ReadState + WriteState,
681    <State as ReadState>::State: Send + Sync + OrchestratorApi<TYPES>,
682    TYPES::SignatureKey: serde::Serialize,
683    VER: StaticVersionType + 'static,
684{
685    let api_toml = toml::from_str::<toml::Value>(include_str!(concat!(
686        env!("CARGO_MANIFEST_DIR"),
687        "/api.toml"
688    )))
689    .expect("API file is not valid toml");
690    let mut api = Api::<State, ServerError, VER>::new(api_toml)?;
691    api.post("post_identity", |req, state| {
692        async move {
693            // Read the bytes from the body
694            let mut body_bytes = req.body_bytes();
695            body_bytes.drain(..12);
696
697            // Decode the libp2p data so we can add to our bootstrap nodes (if supplied)
698            let Ok((libp2p_address, libp2p_public_key)) =
699                vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
700            else {
701                return Err(ServerError {
702                    status: tide_disco::StatusCode::BAD_REQUEST,
703                    message: "Malformed body".to_string(),
704                });
705            };
706
707            // Call our state function to process the request
708            state.post_identity(libp2p_address, libp2p_public_key)
709        }
710        .boxed()
711    })?
712    .post("post_getconfig", |req, state| {
713        async move {
714            let node_index = req.integer_param("node_index")?;
715            state.post_getconfig(node_index)
716        }
717        .boxed()
718    })?
719    .post("get_tmp_node_index", |_req, state| {
720        async move { state.get_tmp_node_index() }.boxed()
721    })?
722    .post("post_pubkey", |req, state| {
723        async move {
724            let is_da = req.boolean_param("is_da")?;
725            // Read the bytes from the body
726            let mut body_bytes = req.body_bytes();
727            body_bytes.drain(..12);
728
729            // Decode the libp2p data so we can add to our bootstrap nodes (if supplied)
730            let Ok((mut pubkey, libp2p_address, libp2p_public_key)) =
731                vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
732            else {
733                return Err(ServerError {
734                    status: tide_disco::StatusCode::BAD_REQUEST,
735                    message: "Malformed body".to_string(),
736                });
737            };
738
739            state.register_public_key(&mut pubkey, is_da, libp2p_address, libp2p_public_key)
740        }
741        .boxed()
742    })?
743    .get("peer_pubconfig_ready", |_req, state| {
744        async move { state.peer_pub_ready() }.boxed()
745    })?
746    .post("post_config_after_peer_collected", |_req, state| {
747        async move { state.post_config_after_peer_collected() }.boxed()
748    })?
749    .post(
750        "post_ready",
751        |req, state: &mut <State as ReadState>::State| {
752            async move {
753                let mut body_bytes = req.body_bytes();
754                body_bytes.drain(..12);
755                // Decode the payload-supplied pubkey
756                let Some(pubkey) = PeerConfig::<TYPES>::from_bytes(&body_bytes) else {
757                    return Err(ServerError {
758                        status: tide_disco::StatusCode::BAD_REQUEST,
759                        message: "Malformed body".to_string(),
760                    });
761                };
762                state.post_ready(&pubkey)
763            }
764            .boxed()
765        },
766    )?
767    .post(
768        "post_manual_start",
769        |req, state: &mut <State as ReadState>::State| {
770            async move {
771                let password = req.body_bytes();
772                state.post_manual_start(password)
773            }
774            .boxed()
775        },
776    )?
777    .get("get_start", |_req, state| {
778        async move { state.get_start() }.boxed()
779    })?
780    .post("post_results", |req, state| {
781        async move {
782            let metrics: Result<BenchResults, RequestError> = req.body_json();
783            state.post_run_results(metrics.unwrap())
784        }
785        .boxed()
786    })?
787    .post("post_builder", |req, state| {
788        async move {
789            // Read the bytes from the body
790            let mut body_bytes = req.body_bytes();
791            body_bytes.drain(..12);
792
793            let Ok(urls) =
794                vbs::Serializer::<OrchestratorVersion>::deserialize::<Vec<Url>>(&body_bytes)
795            else {
796                return Err(ServerError {
797                    status: tide_disco::StatusCode::BAD_REQUEST,
798                    message: "Malformed body".to_string(),
799                });
800            };
801
802            let mut futures = urls
803                .into_iter()
804                .map(|url| async {
805                    let client: surf_disco::Client<ServerError, OrchestratorVersion> =
806                        surf_disco::client::Client::builder(url.clone()).build();
807                    if client.connect(Some(Duration::from_secs(2))).await {
808                        Some(url)
809                    } else {
810                        None
811                    }
812                })
813                .collect::<FuturesUnordered<_>>()
814                .filter_map(futures::future::ready);
815
816            if let Some(url) = futures.next().await {
817                state.post_builder(url)
818            } else {
819                Err(ServerError {
820                    status: tide_disco::StatusCode::BAD_REQUEST,
821                    message: "No reachable addresses".to_string(),
822                })
823            }
824        }
825        .boxed()
826    })?
827    .get("get_builders", |_req, state| {
828        async move { state.get_builders() }.boxed()
829    })?;
830    Ok(api)
831}
832
833/// Runs the orchestrator
834/// # Errors
835/// This errors if tide disco runs into an issue during serving
836/// # Panics
837/// This panics if unable to register the api with tide disco
838pub async fn run_orchestrator<TYPES: NodeType>(
839    mut network_config: NetworkConfig<TYPES>,
840    url: Url,
841) -> io::Result<()>
842where
843    TYPES::SignatureKey: 'static + serde::Serialize,
844{
845    let env_password = std::env::var("ORCHESTRATOR_MANUAL_START_PASSWORD");
846
847    if env_password.is_ok() {
848        tracing::warn!(
849            "Took orchestrator manual start password from the environment variable: \
850             ORCHESTRATOR_MANUAL_START_PASSWORD={:?}",
851            env_password
852        );
853        network_config.manual_start_password = env_password.ok();
854    }
855
856    // Try to overwrite the network_config public keys
857    // from the file the env var points to, or panic.
858    {
859        let env_public_keys = std::env::var("ORCHESTRATOR_PUBLIC_KEYS");
860
861        if let Ok(filepath) = env_public_keys {
862            #[allow(clippy::panic)]
863            let config_file_as_string: String = fs::read_to_string(filepath.clone())
864                .unwrap_or_else(|_| panic!("Could not read config file located at {filepath}"));
865
866            let file: PublicKeysFile<TYPES> =
867                toml::from_str::<PublicKeysFile<TYPES>>(&config_file_as_string)
868                    .expect("Unable to convert config file to TOML");
869
870            network_config.public_keys = file.public_keys;
871        }
872    }
873
874    network_config.config.known_nodes_with_stake = network_config
875        .public_keys
876        .iter()
877        .map(|keys| PeerConfig {
878            stake_table_entry: keys
879                .stake_table_key
880                .stake_table_entry(U256::from(keys.stake)),
881            state_ver_key: keys.state_ver_key.clone(),
882        })
883        .collect();
884
885    network_config.config.known_da_nodes = network_config
886        .public_keys
887        .iter()
888        .filter(|keys| keys.da)
889        .map(|keys| PeerConfig {
890            stake_table_entry: keys
891                .stake_table_key
892                .stake_table_entry(U256::from(keys.stake)),
893            state_ver_key: keys.state_ver_key.clone(),
894        })
895        .collect();
896
897    let web_api = define_api().map_err(|_e| io::Error::other("Failed to define api"));
898
899    let state: RwLock<OrchestratorState<TYPES>> =
900        RwLock::new(OrchestratorState::new(network_config));
901
902    let mut app = App::<RwLock<OrchestratorState<TYPES>>, ServerError>::with_state(state);
903    app.register_module::<ServerError, OrchestratorVersion>("api", web_api.unwrap())
904        .expect("Error registering api");
905    tracing::error!("listening on {url:?}");
906    app.serve(url, ORCHESTRATOR_VERSION).await
907}