hotshot_example_types/membership/
fetcher.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13    data::{Leaf2, ViewNumber},
14    event::{Event, EventType},
15    message::{Message, MessageKind},
16    traits::{
17        block_contents::BlockHeader, network::ConnectedNetwork, node_implementation::NodeType,
18    },
19    vote::HasViewNumber,
20};
21use tokio::task::JoinHandle;
22use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
23
24use crate::storage_types::TestStorage;
25
26pub struct Leaf2Fetcher<TYPES: NodeType> {
27    pub network_functions: NetworkFunctions<TYPES>,
28    pub storage: TestStorage<TYPES>,
29    pub listener: Option<JoinHandle<()>>,
30    pub public_key: TYPES::SignatureKey,
31    pub network_receiver: Option<Receiver<Event<TYPES>>>,
32}
33
34pub type RecvMessageFn =
35    std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
36
37pub type DirectMessageFn<TYPES> = std::sync::Arc<
38    dyn Fn(
39            ViewNumber,
40            Vec<u8>,
41            <TYPES as NodeType>::SignatureKey,
42        ) -> BoxFuture<'static, anyhow::Result<()>>
43        + Send
44        + Sync,
45>;
46
47#[derive(Clone)]
48pub struct NetworkFunctions<TYPES: NodeType> {
49    direct_message: DirectMessageFn<TYPES>,
50}
51
52pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
53    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
54    view: ViewNumber,
55    message: Vec<u8>,
56    recipient: <TYPES as NodeType>::SignatureKey,
57) -> anyhow::Result<()> {
58    network
59        .direct_message(view, message, recipient.clone())
60        .await
61        .context(format!("Failed to send message to recipient {recipient}"))
62}
63
64pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
65    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
66) -> DirectMessageFn<TYPES> {
67    Arc::new(move |view, message, recipient| {
68        let network = network.clone();
69        Box::pin(direct_message_impl::<TYPES, I>(
70            network, view, message, recipient,
71        ))
72    })
73}
74
75pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
76    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
77) -> NetworkFunctions<TYPES> {
78    let direct_message = direct_message_fn::<TYPES, I>(network.clone());
79
80    NetworkFunctions { direct_message }
81}
82
83impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
84    pub fn new<I: NodeImplementation<TYPES>>(
85        network: Arc<<I as NodeImplementation<TYPES>>::Network>,
86        storage: TestStorage<TYPES>,
87        public_key: TYPES::SignatureKey,
88    ) -> Self {
89        let listener = None;
90
91        let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
92        Self {
93            network_functions,
94            storage,
95            listener,
96            public_key,
97            network_receiver: None,
98        }
99    }
100
101    pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
102        let public_key = self.public_key.clone();
103        let storage = self.storage.clone();
104        let network_functions = self.network_functions.clone();
105
106        self.network_receiver = Some(network_receiver.clone());
107
108        let listener = tokio::spawn(async move {
109            loop {
110                match network_receiver.recv_direct().await {
111                    Ok(Event {
112                        view_number: view,
113                        event: EventType::ExternalMessageReceived { sender: _, data },
114                    }) => {
115                        let (requested_height, requester): (u64, TYPES::SignatureKey) =
116                            match bincode::deserialize(&data) {
117                                Ok(message) => message,
118                                Err(e) => {
119                                    tracing::debug!("Failed to deserialize message: {e:?}");
120                                    continue;
121                                },
122                            };
123
124                        let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
125                            .inner
126                            .read()
127                            .await
128                            .proposals_wrapper
129                            .values()
130                            .map(|proposal| {
131                                (
132                                    proposal.data.block_header().block_number(),
133                                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
134                                )
135                            })
136                            .collect();
137
138                        let heights = leaves.keys().collect::<Vec<_>>();
139
140                        let Some(leaf) = leaves.get(&requested_height) else {
141                            tracing::error!(
142                                "Block at height {requested_height} not found in storage.\n\n \
143                                 stored leaf heights: {heights:?}"
144                            );
145                            continue;
146                        };
147
148                        let leaf_response = Message {
149                            sender: public_key.clone(),
150                            kind: MessageKind::<TYPES>::External(
151                                bincode::serialize(&leaf).expect("Failed to serialize leaf"),
152                            ),
153                        };
154
155                        let serialized_leaf_response =
156                            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
157                                .expect("Failed to serialize leaf response");
158
159                        if let Err(e) = (network_functions.direct_message)(
160                            view.u64().into(),
161                            serialized_leaf_response,
162                            requester,
163                        )
164                        .await
165                        {
166                            tracing::error!(
167                                "Failed to send leaf response in test membership fetcher: {e}, \
168                                 requested height: {requested_height}"
169                            );
170                        };
171                    },
172                    Err(RecvError::Closed) => {
173                        break;
174                    },
175                    _ => {
176                        continue;
177                    },
178                }
179            }
180        });
181
182        self.listener = Some(listener);
183    }
184
185    pub async fn fetch_leaf(
186        &self,
187        height: u64,
188        source: TYPES::SignatureKey,
189    ) -> anyhow::Result<Leaf2<TYPES>> {
190        let leaf_request = Message {
191            sender: self.public_key.clone(),
192            kind: MessageKind::<TYPES>::External(
193                bincode::serialize(&(height, self.public_key.clone()))
194                    .expect("Failed to serialize leaf request"),
195            ),
196        };
197        let view = leaf_request.view_number();
198
199        let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
200            .storage
201            .inner
202            .read()
203            .await
204            .proposals_wrapper
205            .values()
206            .map(|proposal| {
207                (
208                    proposal.data.block_header().block_number(),
209                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
210                )
211            })
212            .collect();
213
214        let heights = leaves.keys().collect::<Vec<_>>();
215
216        if let Some(leaf) = leaves.get(&height) {
217            return Ok(leaf.clone());
218        };
219        tracing::debug!(
220            "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
221        );
222
223        let mut network_receiver = self
224            .network_receiver
225            .clone()
226            .expect("Tried to fetch leaf before calling `set_external_channel`");
227
228        let serialized_leaf_request =
229            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
230                .expect("Failed to serialize leaf request");
231
232        if let Err(e) = (self.network_functions.direct_message)(
233            view.u64().into(),
234            serialized_leaf_request,
235            source,
236        )
237        .await
238        {
239            tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
240        };
241
242        tokio::time::timeout(std::time::Duration::from_millis(20), async {
243            loop {
244                match network_receiver.recv_direct().await {
245                    Ok(Event {
246                        view_number: _,
247                        event: EventType::ExternalMessageReceived { sender: _, data },
248                    }) => {
249                        let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
250                            Ok(message) => message,
251                            Err(e) => {
252                                tracing::debug!("Failed to deserialize message: {e:?}");
253                                continue;
254                            },
255                        };
256
257                        if leaf.height() == height {
258                            return Ok(leaf);
259                        }
260                    },
261                    Err(RecvError::Closed) => {
262                        break Err(anyhow::anyhow!(
263                            "Failed to fetch leaf: network task receiver closed"
264                        ));
265                    },
266                    _ => {
267                        continue;
268                    },
269                }
270            }
271        })
272        .await
273        .context("Leaf fetch timed out")?
274    }
275}