hotshot_example_types/membership/
fetcher.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6use std::{collections::BTreeMap, sync::Arc};
7
8use alloy::transports::BoxFuture;
9use anyhow::Context;
10use async_broadcast::{Receiver, RecvError};
11use hotshot::traits::NodeImplementation;
12use hotshot_types::{
13    data::{Leaf2, ViewNumber},
14    event::{Event, EventType},
15    message::{Message, MessageKind},
16    traits::{
17        block_contents::BlockHeader,
18        network::ConnectedNetwork,
19        node_implementation::{ConsensusTime, NodeType},
20    },
21    vote::HasViewNumber,
22};
23use tokio::task::JoinHandle;
24use vbs::{bincode_serializer::BincodeSerializer, version::StaticVersion, BinarySerializer};
25
26use crate::storage_types::TestStorage;
27
28pub struct Leaf2Fetcher<TYPES: NodeType> {
29    pub network_functions: NetworkFunctions<TYPES>,
30    pub storage: TestStorage<TYPES>,
31    pub listener: Option<JoinHandle<()>>,
32    pub public_key: TYPES::SignatureKey,
33    pub network_receiver: Option<Receiver<Event<TYPES>>>,
34}
35
36pub type RecvMessageFn =
37    std::sync::Arc<dyn Fn() -> BoxFuture<'static, anyhow::Result<Vec<u8>>> + Send + Sync>;
38
39pub type DirectMessageFn<TYPES> = std::sync::Arc<
40    dyn Fn(
41            ViewNumber,
42            Vec<u8>,
43            <TYPES as NodeType>::SignatureKey,
44        ) -> BoxFuture<'static, anyhow::Result<()>>
45        + Send
46        + Sync,
47>;
48
49#[derive(Clone)]
50pub struct NetworkFunctions<TYPES: NodeType> {
51    direct_message: DirectMessageFn<TYPES>,
52}
53
54pub async fn direct_message_impl<TYPES: NodeType, I: NodeImplementation<TYPES>>(
55    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
56    view: ViewNumber,
57    message: Vec<u8>,
58    recipient: <TYPES as NodeType>::SignatureKey,
59) -> anyhow::Result<()> {
60    network
61        .direct_message(view, message, recipient.clone())
62        .await
63        .context(format!("Failed to send message to recipient {recipient}"))
64}
65
66pub fn direct_message_fn<TYPES: NodeType, I: NodeImplementation<TYPES>>(
67    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
68) -> DirectMessageFn<TYPES> {
69    Arc::new(move |view, message, recipient| {
70        let network = network.clone();
71        Box::pin(direct_message_impl::<TYPES, I>(
72            network, view, message, recipient,
73        ))
74    })
75}
76
77pub fn network_functions<TYPES: NodeType, I: NodeImplementation<TYPES>>(
78    network: Arc<<I as NodeImplementation<TYPES>>::Network>,
79) -> NetworkFunctions<TYPES> {
80    let direct_message = direct_message_fn::<TYPES, I>(network.clone());
81
82    NetworkFunctions { direct_message }
83}
84
85impl<TYPES: NodeType> Leaf2Fetcher<TYPES> {
86    pub fn new<I: NodeImplementation<TYPES>>(
87        network: Arc<<I as NodeImplementation<TYPES>>::Network>,
88        storage: TestStorage<TYPES>,
89        public_key: TYPES::SignatureKey,
90    ) -> Self {
91        let listener = None;
92
93        let network_functions: NetworkFunctions<TYPES> = network_functions::<TYPES, I>(network);
94        Self {
95            network_functions,
96            storage,
97            listener,
98            public_key,
99            network_receiver: None,
100        }
101    }
102
103    pub fn set_external_channel(&mut self, mut network_receiver: Receiver<Event<TYPES>>) {
104        let public_key = self.public_key.clone();
105        let storage = self.storage.clone();
106        let network_functions = self.network_functions.clone();
107
108        self.network_receiver = Some(network_receiver.clone());
109
110        let listener = tokio::spawn(async move {
111            loop {
112                match network_receiver.recv_direct().await {
113                    Ok(Event {
114                        view_number: view,
115                        event: EventType::ExternalMessageReceived { sender: _, data },
116                    }) => {
117                        let (requested_height, requester): (u64, TYPES::SignatureKey) =
118                            match bincode::deserialize(&data) {
119                                Ok(message) => message,
120                                Err(e) => {
121                                    tracing::debug!("Failed to deserialize message: {e:?}");
122                                    continue;
123                                },
124                            };
125
126                        let leaves: BTreeMap<u64, Leaf2<TYPES>> = storage
127                            .inner
128                            .read()
129                            .await
130                            .proposals_wrapper
131                            .values()
132                            .map(|proposal| {
133                                (
134                                    proposal.data.block_header().block_number(),
135                                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
136                                )
137                            })
138                            .collect();
139
140                        let heights = leaves.keys().collect::<Vec<_>>();
141
142                        let Some(leaf) = leaves.get(&requested_height) else {
143                            tracing::error!(
144                                "Block at height {requested_height} not found in storage.\n\n \
145                                 stored leaf heights: {heights:?}"
146                            );
147                            continue;
148                        };
149
150                        let leaf_response = Message {
151                            sender: public_key.clone(),
152                            kind: MessageKind::<TYPES>::External(
153                                bincode::serialize(&leaf).expect("Failed to serialize leaf"),
154                            ),
155                        };
156
157                        let serialized_leaf_response =
158                            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_response)
159                                .expect("Failed to serialize leaf response");
160
161                        if let Err(e) = (network_functions.direct_message)(
162                            view.u64().into(),
163                            serialized_leaf_response,
164                            requester,
165                        )
166                        .await
167                        {
168                            tracing::error!(
169                                "Failed to send leaf response in test membership fetcher: {e}, \
170                                 requested height: {requested_height}"
171                            );
172                        };
173                    },
174                    Err(RecvError::Closed) => {
175                        break;
176                    },
177                    _ => {
178                        continue;
179                    },
180                }
181            }
182        });
183
184        self.listener = Some(listener);
185    }
186
187    pub async fn fetch_leaf(
188        &self,
189        height: u64,
190        source: TYPES::SignatureKey,
191    ) -> anyhow::Result<Leaf2<TYPES>> {
192        let leaf_request = Message {
193            sender: self.public_key.clone(),
194            kind: MessageKind::<TYPES>::External(
195                bincode::serialize(&(height, self.public_key.clone()))
196                    .expect("Failed to serialize leaf request"),
197            ),
198        };
199        let view = leaf_request.view_number();
200
201        let leaves: BTreeMap<u64, Leaf2<TYPES>> = self
202            .storage
203            .inner
204            .read()
205            .await
206            .proposals_wrapper
207            .values()
208            .map(|proposal| {
209                (
210                    proposal.data.block_header().block_number(),
211                    Leaf2::from_quorum_proposal(&proposal.data.clone()),
212                )
213            })
214            .collect();
215
216        let heights = leaves.keys().collect::<Vec<_>>();
217
218        if let Some(leaf) = leaves.get(&height) {
219            return Ok(leaf.clone());
220        };
221        tracing::debug!(
222            "Leaf at height {height} not found in storage. Stored leaf heights: {heights:?}"
223        );
224
225        let mut network_receiver = self
226            .network_receiver
227            .clone()
228            .expect("Tried to fetch leaf before calling `set_external_channel`");
229
230        let serialized_leaf_request =
231            BincodeSerializer::<StaticVersion<0, 0>>::serialize(&leaf_request)
232                .expect("Failed to serialize leaf request");
233
234        if let Err(e) = (self.network_functions.direct_message)(
235            view.u64().into(),
236            serialized_leaf_request,
237            source,
238        )
239        .await
240        {
241            tracing::error!("Failed to send leaf request in test membership fetcher: {e}");
242        };
243
244        tokio::time::timeout(std::time::Duration::from_millis(20), async {
245            loop {
246                match network_receiver.recv_direct().await {
247                    Ok(Event {
248                        view_number: _,
249                        event: EventType::ExternalMessageReceived { sender: _, data },
250                    }) => {
251                        let leaf: Leaf2<TYPES> = match bincode::deserialize(&data) {
252                            Ok(message) => message,
253                            Err(e) => {
254                                tracing::debug!("Failed to deserialize message: {e:?}");
255                                continue;
256                            },
257                        };
258
259                        if leaf.height() == height {
260                            return Ok(leaf);
261                        }
262                    },
263                    Err(RecvError::Closed) => {
264                        break Err(anyhow::anyhow!(
265                            "Failed to fetch leaf: network task receiver closed"
266                        ));
267                    },
268                    _ => {
269                        continue;
270                    },
271                }
272            }
273        })
274        .await
275        .context("Leaf fetch timed out")?
276    }
277}