hotshot_example_types/membership/
fetcher.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13    data::Leaf2,
14    event::{Event, EventType},
15    message::{Message, MessageKind},
16    traits::{
17        block_contents::BlockHeader, network::ConnectedNetwork, node_implementation::NodeType,
18    },
19};
20use tokio::task::JoinHandle;
21use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
22
23use crate::storage_types::TestStorage;
24
25pub struct Leaf2Fetcher<TYPES: NodeType> {
26    pub network_functions: NetworkFunctions<TYPES>,
27    pub storage: TestStorage<TYPES>,
28    pub listener: Option<JoinHandle<()>>,
29    pub public_key: TYPES::SignatureKey,
30    pub network_receiver: Option<Receiver<Event<TYPES>>>,
31}
32
33pub type RecvMessageFn =
34    std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
35
36pub type DirectMessageFn<TYPES> = std::sync::Arc<
37    dyn Fn(Vec<u8>, <TYPES as NodeType>::SignatureKey) -> BoxFuture<'static, anyhow::Result<()>>
38        + Send
39        + Sync,
40>;
41
42#[derive(Clone)]
43pub struct NetworkFunctions<TYPES: NodeType> {
44    direct_message: DirectMessageFn<TYPES>,
45}
46
47pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
48    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
49    message: Vec<u8>,
50    recipient: <TYPES as NodeType>::SignatureKey,
51) -> anyhow::Result<()> {
52    network
53        .direct_message(message, recipient.clone())
54        .await
55        .context(format!("Failed to send message to recipient {recipient}"))
56}
57
58pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
59    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
60) -> DirectMessageFn<TYPES> {
61    Arc::new(move |message, recipient| {
62        let network = network.clone();
63        Box::pin(direct_message_impl::<TYPES, I>(network, message, recipient))
64    })
65}
66
67pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
68    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
69) -> NetworkFunctions<TYPES> {
70    let direct_message = direct_message_fn::<TYPES, I>(network.clone());
71
72    NetworkFunctions { direct_message }
73}
74
75impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
76    pub fn new<I: NodeImplementation<TYPES>>(
77        network: Arc<<I as NodeImplementation<TYPES>>::Network>,
78        storage: TestStorage<TYPES>,
79        public_key: TYPES::SignatureKey,
80    ) -> Self {
81        let listener = None;
82
83        let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
84        Self {
85            network_functions,
86            storage,
87            listener,
88            public_key,
89            network_receiver: None,
90        }
91    }
92
93    pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
94        let public_key = self.public_key.clone();
95        let storage = self.storage.clone();
96        let network_functions = self.network_functions.clone();
97
98        self.network_receiver = Some(network_receiver.clone());
99
100        let listener = tokio::spawn(async move {
101            loop {
102                match network_receiver.recv_direct().await {
103                    Ok(Event {
104                        view_number: _,
105                        event: EventType::ExternalMessageReceived { sender: _, data },
106                    }) => {
107                        let (requested_height, requester): (u64, TYPES::SignatureKey) =
108                            match bincode::deserialize(&data) {
109                                Ok(message) => message,
110                                Err(e) => {
111                                    tracing::debug!("Failed to deserialize message: {e:?}");
112                                    continue;
113                                },
114                            };
115
116                        let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
117                            .inner
118                            .read()
119                            .await
120                            .proposals_wrapper
121                            .values()
122                            .map(|proposal| {
123                                (
124                                    proposal.data.block_header().block_number(),
125                                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
126                                )
127                            })
128                            .collect();
129
130                        let heights = leaves.keys().collect::<Vec<_>>();
131
132                        let Some(leaf) = leaves.get(&requested_height) else {
133                            tracing::error!(
134                                "Block at height {requested_height} not found in storage.\n\n \
135                                 stored leaf heights: {heights:?}"
136                            );
137                            continue;
138                        };
139
140                        let leaf_response = Message {
141                            sender: public_key.clone(),
142                            kind: MessageKind::<TYPES>::External(
143                                bincode::serialize(&leaf).expect("Failed to serialize leaf"),
144                            ),
145                        };
146
147                        let serialized_leaf_response =
148                            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
149                                .expect("Failed to serialize leaf response");
150
151                        if let Err(e) =
152                            (network_functions.direct_message)(serialized_leaf_response, requester)
153                                .await
154                        {
155                            tracing::error!(
156                                "Failed to send leaf response in test membership fetcher: {e}, \
157                                 requested height: {requested_height}"
158                            );
159                        };
160                    },
161                    Err(RecvError::Closed) => {
162                        break;
163                    },
164                    _ => {
165                        continue;
166                    },
167                }
168            }
169        });
170
171        self.listener = Some(listener);
172    }
173
174    pub async fn fetch_leaf(
175        &self,
176        height: u64,
177        source: TYPES::SignatureKey,
178    ) -> anyhow::Result<Leaf2<TYPES>> {
179        let leaf_request = Message {
180            sender: self.public_key.clone(),
181            kind: MessageKind::<TYPES>::External(
182                bincode::serialize(&(height, self.public_key.clone()))
183                    .expect("Failed to serialize leaf request"),
184            ),
185        };
186
187        let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
188            .storage
189            .inner
190            .read()
191            .await
192            .proposals_wrapper
193            .values()
194            .map(|proposal| {
195                (
196                    proposal.data.block_header().block_number(),
197                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
198                )
199            })
200            .collect();
201
202        let heights = leaves.keys().collect::<Vec<_>>();
203
204        if let Some(leaf) = leaves.get(&height) {
205            return Ok(leaf.clone());
206        };
207        tracing::debug!(
208            "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
209        );
210
211        let mut network_receiver = self
212            .network_receiver
213            .clone()
214            .expect("Tried to fetch leaf before calling `set_external_channel`");
215
216        let serialized_leaf_request =
217            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
218                .expect("Failed to serialize leaf request");
219
220        if let Err(e) =
221            (self.network_functions.direct_message)(serialized_leaf_request, source).await
222        {
223            tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
224        };
225
226        tokio::time::timeout(std::time::Duration::from_millis(20), async {
227            loop {
228                match network_receiver.recv_direct().await {
229                    Ok(Event {
230                        view_number: _,
231                        event: EventType::ExternalMessageReceived { sender: _, data },
232                    }) => {
233                        let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
234                            Ok(message) => message,
235                            Err(e) => {
236                                tracing::debug!("Failed to deserialize message: {e:?}");
237                                continue;
238                            },
239                        };
240
241                        if leaf.height() == height {
242                            return Ok(leaf);
243                        }
244                    },
245                    Err(RecvError::Closed) => {
246                        break Err(anyhow::anyhow!(
247                            "Failed to fetch leaf: network task receiver closed"
248                        ));
249                    },
250                    _ => {
251                        continue;
252                    },
253                }
254            }
255        })
256        .await
257        .context("Leaf fetch timed out")?
258    }
259}