1pub mod client;
11
12use std::{
13 collections::{HashMap, HashSet},
14 fs,
15 fs::OpenOptions,
16 io,
17 time::Duration,
18};
19
20use alloy::primitives::U256;
21use async_lock::RwLock;
22use client::{BenchResults, BenchResultsDownloadConfig};
23use csv::Writer;
24use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
25use hotshot_types::{
26 network::{BuilderType, NetworkConfig, PublicKeysFile},
27 traits::{
28 node_implementation::NodeType,
29 signature_key::{SignatureKey, StakeTableEntryType},
30 },
31 PeerConfig,
32};
33use libp2p_identity::{
34 ed25519::{Keypair as EdKeypair, SecretKey},
35 Keypair, PeerId,
36};
37use multiaddr::Multiaddr;
38use surf_disco::Url;
39use tide_disco::{
40 api::ApiError,
41 error::ServerError,
42 method::{ReadState, WriteState},
43 Api, App, RequestError,
44};
45use vbs::{
46 version::{StaticVersion, StaticVersionType},
47 BinarySerializer,
48};
49
/// Major version number of the orchestrator API.
pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0;
/// Minor version number of the orchestrator API.
pub const ORCHESTRATOR_MINOR_VERSION: u16 = 1;
/// Static version type built from the orchestrator's major and minor version numbers.
///
/// Used in this file as the version parameter of the binary serializer and of the
/// registered API module.
pub type OrchestratorVersion =
    StaticVersion<ORCHESTRATOR_MAJOR_VERSION, ORCHESTRATOR_MINOR_VERSION>;
/// Value-level instance of [`OrchestratorVersion`], handed to the server when it starts
/// serving.
pub const ORCHESTRATOR_VERSION: OrchestratorVersion = StaticVersion {};
60
61#[must_use]
65pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
66 let mut hasher = blake3::Hasher::new();
67 hasher.update(&seed);
68 hasher.update(&index.to_le_bytes());
69 let new_seed = *hasher.finalize().as_bytes();
70 let sk_bytes = SecretKey::try_from_bytes(new_seed).unwrap();
71 <EdKeypair as From<SecretKey>>::from(sk_bytes).into()
72}
73
/// Mutable state held by the orchestrator server and shared by all request handlers.
#[derive(Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
struct OrchestratorState<TYPES: NodeType> {
    /// Node index that the next `post_identity` call hands out.
    latest_index: u16,
    /// Index that the next `get_tmp_node_index` call hands out.
    tmp_latest_index: u16,
    /// Network configuration returned to nodes; registration appends to its known-node
    /// lists and to its libp2p bootstrap nodes.
    config: NetworkConfig<TYPES>,
    /// Whether the peers' public configs may be served. Set at construction when the stake
    /// table is fixed, when the last expected key registers, or on a manual start.
    peer_pub_ready: bool,
    /// Registered public keys (raw bytes) mapped to `(node_index, is_da)`.
    pub_posted: HashMap<Vec<u8>, (u64, bool)>,
    /// Whether nodes are told to start. Set once the ready threshold is met or on a manual
    /// start.
    start: bool,
    /// Peers that have reported ready through `post_ready`.
    nodes_connected: HashSet<PeerConfig<TYPES>>,
    /// Benchmark results merged from the nodes' `post_run_results` submissions.
    bench_results: BenchResults,
    /// Number of `post_run_results` submissions received so far.
    nodes_post_results: u64,
    /// Whether a manual start is still permitted; cleared once the network starts.
    manual_start_allowed: bool,
    /// Whether previously unseen public keys may still be registered.
    accepting_new_keys: bool,
    /// Builder URLs: seeded from the config for an external builder and extended by
    /// `post_builder`.
    builders: Vec<Url>,
    /// `true` when the config already contained a stake table, in which case only listed
    /// keys may register; `false` when nodes register dynamically.
    fixed_stake_table: bool,
}
106
107impl<TYPES: NodeType> OrchestratorState<TYPES> {
108 pub fn new(network_config: NetworkConfig<TYPES>) -> Self {
110 let mut peer_pub_ready = false;
111 let mut fixed_stake_table = false;
112
113 if network_config.config.known_nodes_with_stake.is_empty() {
114 println!(
115 "No nodes were loaded from the config file. Nodes will be allowed to register \
116 dynamically."
117 );
118 } else {
119 println!("Initializing orchestrator with fixed stake table.");
120 peer_pub_ready = true;
121 fixed_stake_table = true;
122 }
123
124 let builders = if matches!(network_config.builder, BuilderType::External) {
125 network_config.config.builder_urls.clone().into()
126 } else {
127 vec![]
128 };
129
130 OrchestratorState {
131 latest_index: 0,
132 tmp_latest_index: 0,
133 config: network_config,
134 peer_pub_ready,
135 pub_posted: HashMap::new(),
136 nodes_connected: HashSet::new(),
137 start: false,
138 bench_results: BenchResults::default(),
139 nodes_post_results: 0,
140 manual_start_allowed: true,
141 accepting_new_keys: true,
142 builders,
143 fixed_stake_table,
144 }
145 }
146
147 pub fn output_to_csv(&self) {
149 let output_csv = BenchResultsDownloadConfig {
150 commit_sha: self.config.commit_sha.clone(),
151 total_nodes: self.config.config.num_nodes_with_stake.into(),
152 da_committee_size: self.config.config.da_staked_committee_size,
153 fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid,
154 transactions_per_round: self.config.transactions_per_round,
155 transaction_size: self.bench_results.transaction_size_in_bytes,
156 rounds: self.config.rounds,
157 partial_results: self.bench_results.partial_results.clone(),
158 avg_latency_in_sec: self.bench_results.avg_latency_in_sec,
159 minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec,
160 maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec,
161 throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec,
162 total_transactions_committed: self.bench_results.total_transactions_committed,
163 total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec,
164 total_num_views: self.bench_results.total_num_views,
165 failed_num_views: self.bench_results.failed_num_views,
166 committee_type: self.bench_results.committee_type.clone(),
167 };
168 let results_csv_file = OpenOptions::new()
170 .create(true)
171 .append(true) .open("scripts/benchmarks_results/results.csv")
173 .unwrap();
174 let mut wtr = Writer::from_writer(results_csv_file);
176 let _ = wtr.serialize(output_csv);
177 let _ = wtr.flush();
178 println!("Results successfully saved in scripts/benchmarks_results/results.csv");
179 }
180}
181
/// Operations the orchestrator exposes to nodes and builders over its HTTP API.
///
/// Every method returns a `ServerError` when the request is rejected.
pub trait OrchestratorApi<TYPES: NodeType> {
    /// Hands out the next node index and, when both libp2p values are supplied and libp2p
    /// is configured, records them as a bootstrap node.
    ///
    /// # Errors
    /// If the network has reached capacity.
    fn post_identity(
        &mut self,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<u16, ServerError>;
    /// Returns the network configuration to the requesting node.
    ///
    /// # Errors
    /// If the configuration cannot be served.
    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// Hands out the next temporary node index, used for key pair generation.
    ///
    /// # Errors
    /// If all indices have been handed out.
    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError>;
    /// Registers a node's public key (given as raw bytes) and returns its node index
    /// together with whether it was placed in the DA committee.
    ///
    /// `is_da` is the DA membership requested by the node; the libp2p values, when both
    /// are present, are recorded as a bootstrap node.
    ///
    /// # Errors
    /// If the registration is refused.
    fn register_public_key(
        &mut self,
        pubkey: &mut Vec<u8>,
        is_da: bool,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<(u64, bool), ServerError>;
    /// Reports whether the peers' public configs have all been collected.
    ///
    /// # Errors
    /// If the peers' public configs are not ready yet.
    fn peer_pub_ready(&self) -> Result<bool, ServerError>;
    /// Returns the network configuration once the peers' public configs have been
    /// collected.
    ///
    /// # Errors
    /// If the peers' public configs are not ready yet.
    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// Reports whether the network has been started.
    ///
    /// # Errors
    /// If the network is not ready to start.
    fn get_start(&self) -> Result<bool, ServerError>;
    /// Records the benchmark results reported by one node.
    ///
    /// # Errors
    /// If the results cannot be recorded.
    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>;
    /// Marks the given peer as ready to start.
    ///
    /// # Errors
    /// If the peer is not authorized.
    fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError>;
    /// Starts the network manually, authenticated by the password carried in
    /// `password_bytes`.
    ///
    /// # Errors
    /// If a manual start is not permitted or the password is wrong.
    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError>;
    /// Registers a builder URL.
    ///
    /// # Errors
    /// If the builder cannot be registered.
    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError>;
    /// Returns the registered builder URLs.
    ///
    /// # Errors
    /// If not all builders are registered yet.
    fn get_builders(&self) -> Result<Vec<Url>, ServerError>;
}
244
245impl<TYPES: NodeType> OrchestratorState<TYPES>
246where
247 TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
248{
249 fn register_unknown(
252 &mut self,
253 pubkey: &mut Vec<u8>,
254 da_requested: bool,
255 libp2p_address: Option<Multiaddr>,
256 libp2p_public_key: Option<PeerId>,
257 ) -> Result<(u64, bool), ServerError> {
258 if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
259 return Ok((*node_index, *is_da));
260 }
261
262 if !self.accepting_new_keys {
263 return Err(ServerError {
264 status: tide_disco::StatusCode::FORBIDDEN,
265 message: "Network has been started manually, and is no longer registering new \
266 keys."
267 .to_string(),
268 });
269 }
270
271 let node_index = self.pub_posted.len() as u64;
272
273 let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
275
276 self.config
277 .config
278 .known_nodes_with_stake
279 .push(staked_pubkey.clone());
280
281 let mut added_to_da = false;
282
283 let da_full =
284 self.config.config.known_da_nodes.len() >= self.config.config.da_staked_committee_size;
285
286 #[allow(clippy::nonminimal_bool)]
287 if (self.config.indexed_da || (!self.config.indexed_da && da_requested)) && !da_full {
295 self.config.config.known_da_nodes.push(staked_pubkey);
296 added_to_da = true;
297 }
298
299 self.pub_posted
300 .insert(pubkey.clone(), (node_index, added_to_da));
301
302 if self.config.libp2p_config.clone().is_some() {
305 if let (Some(libp2p_public_key), Some(libp2p_address)) =
306 (libp2p_public_key, libp2p_address)
307 {
308 self.config
310 .libp2p_config
311 .as_mut()
312 .unwrap()
313 .bootstrap_nodes
314 .push((libp2p_public_key, libp2p_address));
315 }
316 }
317
318 tracing::error!("Posted public key for node_index {node_index}");
319
320 if node_index + 1 >= (self.config.config.num_nodes_with_stake.get() as u64) {
323 self.peer_pub_ready = true;
324 self.accepting_new_keys = false;
325 }
326 Ok((node_index, added_to_da))
327 }
328
329 fn register_from_list(
331 &mut self,
332 pubkey: &mut Vec<u8>,
333 da_requested: bool,
334 libp2p_address: Option<Multiaddr>,
335 libp2p_public_key: Option<PeerId>,
336 ) -> Result<(u64, bool), ServerError> {
337 if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
339 return Ok((*node_index, *is_da));
340 }
341
342 let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
344
345 let Some((node_index, node_config)) =
347 self.config.public_keys.iter().enumerate().find(|keys| {
348 keys.1.stake_table_key == staked_pubkey.stake_table_entry.public_key()
349 })
350 else {
351 return Err(ServerError {
352 status: tide_disco::StatusCode::FORBIDDEN,
353 message: "You are unauthorized to register with the orchestrator".to_string(),
354 });
355 };
356
357 if node_config.da != da_requested {
359 return Err(ServerError {
360 status: tide_disco::StatusCode::BAD_REQUEST,
361 message: format!(
362 "Mismatch in DA status in registration for node {}. DA requested: {}, \
363 expected: {}",
364 node_index, da_requested, node_config.da
365 ),
366 });
367 }
368
369 let added_to_da = node_config.da;
370
371 self.pub_posted
372 .insert(pubkey.clone(), (node_index as u64, added_to_da));
373
374 if self.config.libp2p_config.clone().is_some() {
377 if let (Some(libp2p_public_key), Some(libp2p_address)) =
378 (libp2p_public_key, libp2p_address)
379 {
380 self.config
382 .libp2p_config
383 .as_mut()
384 .unwrap()
385 .bootstrap_nodes
386 .push((libp2p_public_key, libp2p_address));
387 }
388 }
389
390 tracing::error!("Node {node_index} has registered.");
391
392 Ok((node_index as u64, added_to_da))
393 }
394}
395
396impl<TYPES: NodeType> OrchestratorApi<TYPES> for OrchestratorState<TYPES>
397where
398 TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
399{
400 fn post_identity(
405 &mut self,
406 libp2p_address: Option<Multiaddr>,
407 libp2p_public_key: Option<PeerId>,
408 ) -> Result<u16, ServerError> {
409 let node_index = self.latest_index;
410 self.latest_index += 1;
411
412 if usize::from(node_index) >= self.config.config.num_nodes_with_stake.get() {
413 return Err(ServerError {
414 status: tide_disco::StatusCode::BAD_REQUEST,
415 message: "Network has reached capacity".to_string(),
416 });
417 }
418
419 if self.config.libp2p_config.clone().is_some() {
422 if let (Some(libp2p_public_key), Some(libp2p_address)) =
423 (libp2p_public_key, libp2p_address)
424 {
425 self.config
427 .libp2p_config
428 .as_mut()
429 .unwrap()
430 .bootstrap_nodes
431 .push((libp2p_public_key, libp2p_address));
432 }
433 }
434 Ok(node_index)
435 }
436
437 fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError> {
440 Ok(self.config.clone())
441 }
442
443 fn get_tmp_node_index(&mut self) -> Result<u16, ServerError> {
445 let tmp_node_index = self.tmp_latest_index;
446 self.tmp_latest_index += 1;
447
448 if usize::from(tmp_node_index) >= self.config.config.num_nodes_with_stake.get() {
449 return Err(ServerError {
450 status: tide_disco::StatusCode::BAD_REQUEST,
451 message: "Node index getter for key pair generation has reached capacity"
452 .to_string(),
453 });
454 }
455 Ok(tmp_node_index)
456 }
457
458 #[allow(clippy::cast_possible_truncation)]
459 fn register_public_key(
460 &mut self,
461 pubkey: &mut Vec<u8>,
462 da_requested: bool,
463 libp2p_address: Option<Multiaddr>,
464 libp2p_public_key: Option<PeerId>,
465 ) -> Result<(u64, bool), ServerError> {
466 if self.fixed_stake_table {
467 self.register_from_list(pubkey, da_requested, libp2p_address, libp2p_public_key)
468 } else {
469 self.register_unknown(pubkey, da_requested, libp2p_address, libp2p_public_key)
470 }
471 }
472
473 fn peer_pub_ready(&self) -> Result<bool, ServerError> {
474 if !self.peer_pub_ready {
475 return Err(ServerError {
476 status: tide_disco::StatusCode::BAD_REQUEST,
477 message: "Peer's public configs are not ready".to_string(),
478 });
479 }
480 Ok(self.peer_pub_ready)
481 }
482
483 fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError> {
484 if !self.peer_pub_ready {
485 return Err(ServerError {
486 status: tide_disco::StatusCode::BAD_REQUEST,
487 message: "Peer's public configs are not ready".to_string(),
488 });
489 }
490
491 Ok(self.config.clone())
492 }
493
494 fn get_start(&self) -> Result<bool, ServerError> {
495 if !self.start {
497 return Err(ServerError {
498 status: tide_disco::StatusCode::BAD_REQUEST,
499 message: "Network is not ready to start".to_string(),
500 });
501 }
502 Ok(self.start)
503 }
504
505 fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError> {
507 if !self
510 .config
511 .config
512 .known_nodes_with_stake
513 .contains(peer_config)
514 {
515 return Err(ServerError {
516 status: tide_disco::StatusCode::FORBIDDEN,
517 message: "You are unauthorized to register with the orchestrator".to_string(),
518 });
519 }
520
521 if self.nodes_connected.insert(peer_config.clone()) {
523 tracing::error!(
524 "Node {peer_config} connected. Total nodes connected: {}",
525 self.nodes_connected.len()
526 );
527 }
528
529 if self.nodes_connected.len() as u64 * self.config.config.start_threshold.1
531 >= (self.config.config.num_nodes_with_stake.get() as u64)
532 * self.config.config.start_threshold.0
533 {
534 self.accepting_new_keys = false;
535 self.manual_start_allowed = false;
536 self.start = true;
537 }
538
539 Ok(())
540 }
541
542 fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError> {
544 if !self.manual_start_allowed {
545 return Err(ServerError {
546 status: tide_disco::StatusCode::FORBIDDEN,
547 message: "Configs have already been distributed to nodes, and the network can no \
548 longer be started manually."
549 .to_string(),
550 });
551 }
552
553 let password = String::from_utf8(password_bytes)
554 .expect("Failed to decode raw password as UTF-8 string.");
555
556 if self.config.manual_start_password != Some(password) {
558 return Err(ServerError {
559 status: tide_disco::StatusCode::FORBIDDEN,
560 message: "Incorrect password.".to_string(),
561 });
562 }
563
564 let registered_nodes_with_stake = self.config.config.known_nodes_with_stake.len();
565 let registered_da_nodes = self.config.config.known_da_nodes.len();
566
567 if registered_da_nodes > 1 {
568 self.config.config.num_nodes_with_stake =
569 std::num::NonZeroUsize::new(registered_nodes_with_stake)
570 .expect("Failed to convert to NonZeroUsize; this should be impossible.");
571
572 self.config.config.da_staked_committee_size = registered_da_nodes;
573 } else {
574 return Err(ServerError {
575 status: tide_disco::StatusCode::FORBIDDEN,
576 message: format!(
577 "We cannot manually start the network, because we only have \
578 {registered_nodes_with_stake} nodes with stake registered, with \
579 {registered_da_nodes} DA nodes."
580 ),
581 });
582 }
583
584 self.accepting_new_keys = false;
585 self.manual_start_allowed = false;
586 self.peer_pub_ready = true;
587 self.start = true;
588
589 Ok(())
590 }
591
592 fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> {
594 if metrics.total_transactions_committed != 0 {
595 if self.bench_results.total_transactions_committed == 0 {
597 self.bench_results = metrics;
598 } else {
599 let cur_metrics = self.bench_results.clone();
601 self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec
602 * metrics.num_latency
603 + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency)
604 / (metrics.num_latency + cur_metrics.num_latency);
605 self.bench_results.num_latency += metrics.num_latency;
606 self.bench_results.minimum_latency_in_sec = metrics
607 .minimum_latency_in_sec
608 .min(cur_metrics.minimum_latency_in_sec);
609 self.bench_results.maximum_latency_in_sec = metrics
610 .maximum_latency_in_sec
611 .max(cur_metrics.maximum_latency_in_sec);
612 self.bench_results.throughput_bytes_per_sec = metrics
613 .throughput_bytes_per_sec
614 .max(cur_metrics.throughput_bytes_per_sec);
615 self.bench_results.total_transactions_committed = metrics
616 .total_transactions_committed
617 .max(cur_metrics.total_transactions_committed);
618 self.bench_results.total_time_elapsed_in_sec = metrics
619 .total_time_elapsed_in_sec
620 .max(cur_metrics.total_time_elapsed_in_sec);
621 self.bench_results.total_num_views =
622 metrics.total_num_views.min(cur_metrics.total_num_views);
623 self.bench_results.failed_num_views =
624 metrics.failed_num_views.max(cur_metrics.failed_num_views);
625 }
626 }
627 self.nodes_post_results += 1;
628 if self.bench_results.partial_results == "Unset" {
629 self.bench_results.partial_results = "One".to_string();
630 self.bench_results.printout();
631 self.output_to_csv();
632 }
633 if self.bench_results.partial_results == "One"
634 && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2)
635 {
636 self.bench_results.partial_results = "HalfDA".to_string();
637 self.bench_results.printout();
638 self.output_to_csv();
639 }
640 if self.bench_results.partial_results == "HalfDA"
641 && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2)
642 {
643 self.bench_results.partial_results = "Half".to_string();
644 self.bench_results.printout();
645 self.output_to_csv();
646 }
647 if self.bench_results.partial_results != "Full"
648 && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64)
649 {
650 self.bench_results.partial_results = "Full".to_string();
651 self.bench_results.printout();
652 self.output_to_csv();
653 }
654 Ok(())
655 }
656
657 fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> {
658 self.builders.push(builder);
659 Ok(())
660 }
661
662 fn get_builders(&self) -> Result<Vec<Url>, ServerError> {
663 if !matches!(self.config.builder, BuilderType::External)
664 && self.builders.len() != self.config.config.da_staked_committee_size
665 {
666 return Err(ServerError {
667 status: tide_disco::StatusCode::NOT_FOUND,
668 message: "Not all builders are registered yet".to_string(),
669 });
670 }
671 Ok(self.builders.clone())
672 }
673}
674
675#[allow(clippy::too_many_lines)]
677fn define_api<TYPES, State, VER>() -> Result<Api<State, ServerError, VER>, ApiError>
678where
679 TYPES: NodeType,
680 State: 'static + Send + Sync + ReadState + WriteState,
681 <State as ReadState>::State: Send + Sync + OrchestratorApi<TYPES>,
682 TYPES::SignatureKey: serde::Serialize,
683 VER: StaticVersionType + 'static,
684{
685 let api_toml = toml::from_str::<toml::Value>(include_str!(concat!(
686 env!("CARGO_MANIFEST_DIR"),
687 "/api.toml"
688 )))
689 .expect("API file is not valid toml");
690 let mut api = Api::<State, ServerError, VER>::new(api_toml)?;
691 api.post("post_identity", |req, state| {
692 async move {
693 let mut body_bytes = req.body_bytes();
695 body_bytes.drain(..12);
696
697 let Ok((libp2p_address, libp2p_public_key)) =
699 vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
700 else {
701 return Err(ServerError {
702 status: tide_disco::StatusCode::BAD_REQUEST,
703 message: "Malformed body".to_string(),
704 });
705 };
706
707 state.post_identity(libp2p_address, libp2p_public_key)
709 }
710 .boxed()
711 })?
712 .post("post_getconfig", |req, state| {
713 async move {
714 let node_index = req.integer_param("node_index")?;
715 state.post_getconfig(node_index)
716 }
717 .boxed()
718 })?
719 .post("get_tmp_node_index", |_req, state| {
720 async move { state.get_tmp_node_index() }.boxed()
721 })?
722 .post("post_pubkey", |req, state| {
723 async move {
724 let is_da = req.boolean_param("is_da")?;
725 let mut body_bytes = req.body_bytes();
727 body_bytes.drain(..12);
728
729 let Ok((mut pubkey, libp2p_address, libp2p_public_key)) =
731 vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
732 else {
733 return Err(ServerError {
734 status: tide_disco::StatusCode::BAD_REQUEST,
735 message: "Malformed body".to_string(),
736 });
737 };
738
739 state.register_public_key(&mut pubkey, is_da, libp2p_address, libp2p_public_key)
740 }
741 .boxed()
742 })?
743 .get("peer_pubconfig_ready", |_req, state| {
744 async move { state.peer_pub_ready() }.boxed()
745 })?
746 .post("post_config_after_peer_collected", |_req, state| {
747 async move { state.post_config_after_peer_collected() }.boxed()
748 })?
749 .post(
750 "post_ready",
751 |req, state: &mut <State as ReadState>::State| {
752 async move {
753 let mut body_bytes = req.body_bytes();
754 body_bytes.drain(..12);
755 let Some(pubkey) = PeerConfig::<TYPES>::from_bytes(&body_bytes) else {
757 return Err(ServerError {
758 status: tide_disco::StatusCode::BAD_REQUEST,
759 message: "Malformed body".to_string(),
760 });
761 };
762 state.post_ready(&pubkey)
763 }
764 .boxed()
765 },
766 )?
767 .post(
768 "post_manual_start",
769 |req, state: &mut <State as ReadState>::State| {
770 async move {
771 let password = req.body_bytes();
772 state.post_manual_start(password)
773 }
774 .boxed()
775 },
776 )?
777 .get("get_start", |_req, state| {
778 async move { state.get_start() }.boxed()
779 })?
780 .post("post_results", |req, state| {
781 async move {
782 let metrics: Result<BenchResults, RequestError> = req.body_json();
783 state.post_run_results(metrics.unwrap())
784 }
785 .boxed()
786 })?
787 .post("post_builder", |req, state| {
788 async move {
789 let mut body_bytes = req.body_bytes();
791 body_bytes.drain(..12);
792
793 let Ok(urls) =
794 vbs::Serializer::<OrchestratorVersion>::deserialize::<Vec<Url>>(&body_bytes)
795 else {
796 return Err(ServerError {
797 status: tide_disco::StatusCode::BAD_REQUEST,
798 message: "Malformed body".to_string(),
799 });
800 };
801
802 let mut futures = urls
803 .into_iter()
804 .map(|url| async {
805 let client: surf_disco::Client<ServerError, OrchestratorVersion> =
806 surf_disco::client::Client::builder(url.clone()).build();
807 if client.connect(Some(Duration::from_secs(2))).await {
808 Some(url)
809 } else {
810 None
811 }
812 })
813 .collect::<FuturesUnordered<_>>()
814 .filter_map(futures::future::ready);
815
816 if let Some(url) = futures.next().await {
817 state.post_builder(url)
818 } else {
819 Err(ServerError {
820 status: tide_disco::StatusCode::BAD_REQUEST,
821 message: "No reachable addresses".to_string(),
822 })
823 }
824 }
825 .boxed()
826 })?
827 .get("get_builders", |_req, state| {
828 async move { state.get_builders() }.boxed()
829 })?;
830 Ok(api)
831}
832
833pub async fn run_orchestrator<TYPES: NodeType>(
839 mut network_config: NetworkConfig<TYPES>,
840 url: Url,
841) -> io::Result<()>
842where
843 TYPES::SignatureKey: 'static + serde::Serialize,
844{
845 let env_password = std::env::var("ORCHESTRATOR_MANUAL_START_PASSWORD");
846
847 if env_password.is_ok() {
848 tracing::warn!(
849 "Took orchestrator manual start password from the environment variable: \
850 ORCHESTRATOR_MANUAL_START_PASSWORD={:?}",
851 env_password
852 );
853 network_config.manual_start_password = env_password.ok();
854 }
855
856 {
859 let env_public_keys = std::env::var("ORCHESTRATOR_PUBLIC_KEYS");
860
861 if let Ok(filepath) = env_public_keys {
862 #[allow(clippy::panic)]
863 let config_file_as_string: String = fs::read_to_string(filepath.clone())
864 .unwrap_or_else(|_| panic!("Could not read config file located at {filepath}"));
865
866 let file: PublicKeysFile<TYPES> =
867 toml::from_str::<PublicKeysFile<TYPES>>(&config_file_as_string)
868 .expect("Unable to convert config file to TOML");
869
870 network_config.public_keys = file.public_keys;
871 }
872 }
873
874 network_config.config.known_nodes_with_stake = network_config
875 .public_keys
876 .iter()
877 .map(|keys| PeerConfig {
878 stake_table_entry: keys
879 .stake_table_key
880 .stake_table_entry(U256::from(keys.stake)),
881 state_ver_key: keys.state_ver_key.clone(),
882 })
883 .collect();
884
885 network_config.config.known_da_nodes = network_config
886 .public_keys
887 .iter()
888 .filter(|keys| keys.da)
889 .map(|keys| PeerConfig {
890 stake_table_entry: keys
891 .stake_table_key
892 .stake_table_entry(U256::from(keys.stake)),
893 state_ver_key: keys.state_ver_key.clone(),
894 })
895 .collect();
896
897 let web_api = define_api().map_err(|_e| io::Error::other("Failed to define api"));
898
899 let state: RwLock<OrchestratorState<TYPES>> =
900 RwLock::new(OrchestratorState::new(network_config));
901
902 let mut app = App::<RwLock<OrchestratorState<TYPES>>, ServerError>::with_state(state);
903 app.register_module::<ServerError, OrchestratorVersion>("api", web_api.unwrap())
904 .expect("Error registering api");
905 tracing::error!("listening on {url:?}");
906 app.serve(url, ORCHESTRATOR_VERSION).await
907}