saito_rust/
network_controller.rs

1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::net::SocketAddr;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::stream::{SplitSink, SplitStream};
9use futures::{SinkExt, StreamExt};
10use log::{debug, error, info, trace, warn};
11use reqwest::Client;
12use tokio::fs::File;
13use tokio::io::AsyncReadExt;
14use tokio::net::TcpStream;
15use tokio::select;
16use tokio::sync::mpsc::{Receiver, Sender};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio::time::Instant;
20use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
21use warp::http::StatusCode;
22use warp::ws::WebSocket;
23use warp::Filter;
24
25use saito_core::core::consensus::block::{Block, BlockType};
26use saito_core::core::consensus::blockchain::Blockchain;
27use saito_core::core::consensus::peers::peer_collection::PeerCollection;
28use saito_core::core::defs::{
29    BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, StatVariable, BLOCK_FILE_EXTENSION,
30    STAT_BIN_COUNT,
31};
32use saito_core::core::io::network::PeerDisconnectType;
33use saito_core::core::io::network_event::NetworkEvent;
34use saito_core::core::process::keep_time::Timer;
35use saito_core::core::util::configuration::Configuration;
36
37use crate::io_event::IoEvent;
38use crate::rust_io_handler::BLOCKS_DIR_PATH;
39
40// use crate::{IoEvent, NetworkEvent, TimeKeeper};
41
42type SocketSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>;
43type SocketReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
44
45pub struct NetworkController {
46    sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
47    currently_queried_urls: Arc<Mutex<HashSet<String>>>,
48    pub sender_to_saito_controller: Sender<IoEvent>,
49}
50
51impl NetworkController {
52    pub async fn send(connection: &mut PeerSender, peer_index: u64, buffer: Vec<u8>) -> bool {
53        let mut send_failed = false;
54        // TODO : can be better optimized if we buffer the messages and flush once per timer event
55        match connection {
56            PeerSender::Warp(sender) => {
57                if let Err(error) = sender.send(warp::ws::Message::binary(buffer)).await {
58                    error!(
59                        "Error sending message, Peer Index = {:?}, Reason {:?}",
60                        peer_index, error
61                    );
62
63                    send_failed = true;
64                }
65            }
66            PeerSender::Tungstenite(sender) => {
67                if let Err(error) = sender
68                    .send(tokio_tungstenite::tungstenite::Message::Binary(buffer))
69                    .await
70                {
71                    error!(
72                        "Error sending message, Peer Index = {:?}, Reason {:?}",
73                        peer_index, error
74                    );
75                    send_failed = true;
76                }
77            }
78        }
79
80        !send_failed
81    }
82
83    pub async fn send_outgoing_message(
84        sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
85        peer_index: u64,
86        buffer: Vec<u8>,
87    ) {
88        let buf_len = buffer.len();
89
90        let mut sockets = sockets.lock().await;
91        let socket = sockets.get_mut(&peer_index);
92        if socket.is_none() {
93            error!(
94                "Cannot find the corresponding sender socket, Peer Index : {:?}",
95                peer_index
96            );
97            return;
98        }
99
100        let socket = socket.unwrap();
101
102        if !Self::send(socket, peer_index, buffer).await {
103            warn!(
104                "failed sending buffer of size : {:?} to peer : {:?}",
105                buf_len, peer_index
106            );
107            sockets.remove(&peer_index);
108        }
109    }
110
111    pub async fn connect_to_peer(
112        event_id: u64,
113        io_controller: Arc<RwLock<NetworkController>>,
114        url: String,
115        peer_index: PeerIndex,
116    ) {
117        // TODO : handle connecting to an already connected (via incoming connection) node.
118
119        debug!("connecting to peer : {:?}", url);
120
121        let result = connect_async(url.clone()).await;
122        if result.is_ok() {
123            let result = result.unwrap();
124            let socket: WebSocketStream<MaybeTlsStream<TcpStream>> = result.0;
125
126            let ip = match socket.get_ref() {
127                MaybeTlsStream::NativeTls(s) => {
128                    if let Ok(socket_address) = s.get_ref().get_ref().get_ref().peer_addr() {
129                        Some(socket_address.ip().to_string())
130                    } else {
131                        None
132                    }
133                }
134                MaybeTlsStream::Plain(t) => {
135                    if let Ok(socket_address) = t.peer_addr() {
136                        Some(socket_address.ip().to_string())
137                    } else {
138                        None
139                    }
140                }
141                _ => None,
142            };
143
144            let network_controller = io_controller.read().await;
145
146            let sender_to_controller = network_controller.sender_to_saito_controller.clone();
147            let (socket_sender, socket_receiver): (SocketSender, SocketReceiver) = socket.split();
148
149            info!(
150                "connected to peer : {:?} with index : {:?}",
151                url, peer_index
152            );
153
154            NetworkController::send_new_peer(
155                event_id,
156                peer_index,
157                ip,
158                network_controller.sockets.clone(),
159                PeerSender::Tungstenite(socket_sender),
160                PeerReceiver::Tungstenite(socket_receiver),
161                sender_to_controller,
162            )
163            .await;
164        } else {
165            warn!(
166                "failed connecting to : {:?}, reason {:?}",
167                url,
168                result.err().unwrap()
169            );
170        }
171    }
172
173    pub async fn send_to_all(
174        sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
175        buffer: Vec<u8>,
176        exceptions: Vec<u64>,
177    ) {
178        trace!("sending buffer of size : {:?} to all", buffer.len());
179        let mut sockets = sockets.lock().await;
180        let mut peers_with_errors: Vec<u64> = Default::default();
181
182        for entry in sockets.iter_mut() {
183            let peer_index = entry.0;
184            if exceptions.contains(peer_index) {
185                continue;
186            }
187            let socket = entry.1;
188
189            trace!(
190                "sending buffer of size : {:?} to peer : {:?}",
191                buffer.len(),
192                peer_index
193            );
194            if !Self::send(socket, *peer_index, buffer.clone()).await {
195                warn!(
196                    "failed sending buffer of size : {:?} to peer : {:?}",
197                    buffer.len(),
198                    peer_index
199                );
200                peers_with_errors.push(*peer_index)
201            }
202        }
203
204        for peer in peers_with_errors {
205            sockets.remove(&peer);
206        }
207
208        // trace!("message sent to all");
209    }
210    pub async fn fetch_block(
211        block_hash: SaitoHash,
212        peer_index: u64,
213        url: String,
214        event_id: u64,
215        sender_to_core: Sender<IoEvent>,
216        current_queries: Arc<Mutex<HashSet<String>>>,
217        client: Client,
218        block_id: BlockId,
219    ) {
220        debug!("fetching block : {:?}", url);
221
222        {
223            // since the block sizes can be large, we need to make sure same block is not fetched multiple times before first fetch finishes.
224            let mut queries = current_queries.lock().await;
225            if queries.contains(&url) {
226                debug!("url : {:?} is already being fetched", url);
227                return;
228            }
229            queries.insert(url.clone());
230        }
231        let block_fetch_timeout_in_ms = 10_000;
232        let result = client
233            .get(url.clone())
234            .timeout(Duration::from_millis(block_fetch_timeout_in_ms))
235            .send()
236            .await;
237        if result.is_err() {
238            // TODO : should we retry here?
239            warn!("failed fetching : {:?}", url);
240            let mut queries = current_queries.lock().await;
241            queries.remove(&url);
242            sender_to_core
243                .send(IoEvent {
244                    event_processor_id: 1,
245                    event_id,
246                    event: NetworkEvent::BlockFetchFailed {
247                        block_hash,
248                        peer_index,
249                        block_id,
250                    },
251                })
252                .await
253                .unwrap();
254            return;
255        }
256        let response = result.unwrap();
257        if !matches!(response.status(), StatusCode::OK) {
258            warn!(
259                "failed fetching block : {:?}, with error code : {:?} from url : {:?}",
260                block_hash.to_hex(),
261                response.status(),
262                url
263            );
264            let mut queries = current_queries.lock().await;
265            queries.remove(&url);
266            sender_to_core
267                .send(IoEvent {
268                    event_processor_id: 1,
269                    event_id,
270                    event: NetworkEvent::BlockFetchFailed {
271                        block_hash,
272                        peer_index,
273                        block_id,
274                    },
275                })
276                .await
277                .unwrap();
278            return;
279        }
280        let result = response.bytes().await;
281        if result.is_err() {
282            warn!("failed getting byte buffer from fetching block : {:?}", url);
283            let mut queries = current_queries.lock().await;
284            queries.remove(&url);
285            sender_to_core
286                .send(IoEvent {
287                    event_processor_id: 1,
288                    event_id,
289                    event: NetworkEvent::BlockFetchFailed {
290                        block_hash,
291                        peer_index,
292                        block_id,
293                    },
294                })
295                .await
296                .unwrap();
297            return;
298        }
299        let result = result.unwrap();
300        let buffer = result.to_vec();
301
302        debug!(
303            "block buffer received with size : {:?} for url : {:?}",
304            buffer.len(),
305            url
306        );
307        // RustIOHandler::set_event_response(event_id, FutureState::BlockFetched(block));
308        sender_to_core
309            .send(IoEvent {
310                event_processor_id: 1,
311                event_id,
312                event: NetworkEvent::BlockFetched {
313                    block_hash,
314                    block_id,
315                    peer_index,
316                    buffer,
317                },
318            })
319            .await
320            .unwrap();
321        {
322            // since we have already fetched the block, we will remove it from the set.
323            let mut queries = current_queries.lock().await;
324            queries.remove(&url);
325        }
326        // debug!("block buffer sent to blockchain controller");
327    }
328    pub async fn send_new_peer(
329        event_id: u64,
330        peer_index: u64,
331        ip: Option<String>,
332        sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
333        sender: PeerSender,
334        receiver: PeerReceiver,
335        sender_to_core: Sender<IoEvent>,
336    ) {
337        {
338            sockets.lock().await.insert(peer_index, sender);
339        }
340
341        debug!("sending new peer : {:?}", peer_index);
342
343        sender_to_core
344            .send(IoEvent {
345                event_processor_id: 1,
346                event_id,
347                event: NetworkEvent::PeerConnectionResult {
348                    result: Ok((peer_index, ip)),
349                },
350            })
351            .await
352            .expect("sending failed");
353
354        NetworkController::receive_message_from_peer(
355            receiver,
356            sender_to_core.clone(),
357            peer_index,
358            sockets,
359        )
360        .await;
361    }
362
363    pub async fn disconnect_socket(
364        sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
365        peer_index: PeerIndex,
366        sender_to_core: Sender<IoEvent>,
367    ) {
368        info!("disconnect peer : {:?}", peer_index);
369        let mut sockets = sockets.lock().await;
370        let socket = sockets.remove(&peer_index);
371        if let Some(_socket) = socket {
372            // TODO : disconnect the socket here
373        }
374
375        sender_to_core
376            .send(IoEvent {
377                event_processor_id: 1,
378                event_id: 0,
379                event: NetworkEvent::PeerDisconnected {
380                    peer_index,
381                    disconnect_type: PeerDisconnectType::InternalDisconnect,
382                },
383            })
384            .await
385            .expect("sending failed");
386    }
387    pub async fn receive_message_from_peer(
388        receiver: PeerReceiver,
389        sender: Sender<IoEvent>,
390        peer_index: u64,
391        sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
392    ) {
393        debug!("starting new task for reading from peer : {:?}", peer_index);
394        tokio::task::Builder::new()
395            .name(format!("saito-peer-receiver-{:?}", peer_index).as_str())
396            .spawn(async move {
397                debug!("new thread started for peer receiving");
398                match receiver {
399                    PeerReceiver::Warp(mut receiver) => loop {
400                        let result = receiver.next().await;
401                        if result.is_none() {
402                            trace!("no message received");
403                            continue;
404                        }
405                        let result = result.unwrap();
406                        if result.is_err() {
407                            // TODO : handle peer disconnections
408                            warn!("failed receiving message [1] : {:?}", result.err().unwrap());
409                            NetworkController::disconnect_socket(sockets, peer_index, sender).await;
410                            break;
411                        }
412                        let result = result.unwrap();
413
414                        if result.is_binary() {
415                            let buffer = result.into_bytes();
416                            trace!("received buffer of size : {:?}", buffer.len());
417
418                            let message = IoEvent {
419                                event_processor_id: 1,
420                                event_id: 0,
421                                event: NetworkEvent::IncomingNetworkMessage { peer_index, buffer },
422                            };
423                            sender.send(message).await.expect("sending failed");
424                        } else if result.is_close() {
425                            warn!("connection closed by remote peer : {:?}", peer_index);
426                            NetworkController::disconnect_socket(sockets, peer_index, sender).await;
427                            break;
428                        }
429                    },
430                    PeerReceiver::Tungstenite(mut receiver) => loop {
431                        let result = receiver.next().await;
432                        if result.is_none() {
433                            trace!("no message received");
434                            continue;
435                        }
436                        let result = result.unwrap();
437                        if result.is_err() {
438                            warn!("failed receiving message [2] : {:?}", result.err().unwrap());
439                            NetworkController::disconnect_socket(sockets, peer_index, sender).await;
440                            break;
441                        }
442                        let result = result.unwrap();
443                        match result {
444                            tokio_tungstenite::tungstenite::Message::Binary(buffer) => {
445                                trace!("received buffer of size : {:?}", buffer.len());
446                                let message = IoEvent {
447                                    event_processor_id: 1,
448                                    event_id: 0,
449                                    event: NetworkEvent::IncomingNetworkMessage {
450                                        peer_index,
451                                        buffer,
452                                    },
453                                };
454                                sender.send(message).await.expect("sending failed");
455                            }
456                            tokio_tungstenite::tungstenite::Message::Close(_) => {
457                                info!("socket for peer : {:?} was closed", peer_index);
458                                NetworkController::disconnect_socket(sockets, peer_index, sender)
459                                    .await;
460                                break;
461                            }
462                            _ => {
463                                // Not handling these scenarios
464                            }
465                        }
466                    },
467                }
468                debug!("listening thread existed for peer : {:?}", peer_index);
469            })
470            .unwrap();
471    }
472}
473
474///
475///
476/// # Arguments
477///
478/// * `receiver`:
479/// * `sender_to_core`:
480/// * `configs_lock`:
481/// * `blockchain_lock`:
482/// * `sender_to_stat`:
483/// * `peers_lock`:
484/// * `sender_to_network`: sender for this thread. only used for reading performance stats
485///
486/// returns: ()
487///
488/// # Examples
489///
490/// ```
491///
492/// ```
493// TODO : refactor to use ProcessEvent trait
494pub async fn run_network_controller(
495    mut receiver: Receiver<IoEvent>,
496    sender_to_core: Sender<IoEvent>,
497    configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
498    blockchain_lock: Arc<RwLock<Blockchain>>,
499    sender_to_stat: Sender<String>,
500    peers_lock: Arc<RwLock<PeerCollection>>,
501    sender_to_network: Sender<IoEvent>,
502    timer: &Timer,
503) -> (JoinHandle<()>, JoinHandle<()>) {
504    info!("running network handler");
505
506    let host;
507    let url;
508    let port;
509    let public_key;
510    {
511        let configs = configs_lock.read().await;
512
513        url = configs.get_server_configs().unwrap().host.clone()
514            + ":"
515            + configs
516                .get_server_configs()
517                .unwrap()
518                .port
519                .to_string()
520                .as_str();
521        port = configs.get_server_configs().unwrap().port;
522        host = configs.get_server_configs().unwrap().host.clone();
523
524        // trace!("locking blockchain 9");
525        let blockchain = blockchain_lock.read().await;
526        let wallet = blockchain.wallet_lock.read().await;
527        public_key = wallet.public_key;
528    }
529    // trace!("releasing blockchain 9");
530
531    info!("starting server on : {:?}", url);
532    let sender_clone = sender_to_core.clone();
533
534    let network_controller_lock = Arc::new(RwLock::new(NetworkController {
535        sockets: Arc::new(Mutex::new(HashMap::new())),
536        sender_to_saito_controller: sender_to_core,
537        currently_queried_urls: Arc::new(Default::default()),
538    }));
539
540    let server_handle = run_websocket_server(
541        sender_clone.clone(),
542        network_controller_lock.clone(),
543        port,
544        host,
545        public_key,
546        peers_lock,
547    );
548
549    let time_keeper = timer.clone();
550
551    let controller_handle = tokio::task::Builder::new()
552        .name("saito-io-controller")
553        .spawn(async move {
554            let mut outgoing_messages = StatVariable::new(
555                "network::outgoing_msgs".to_string(),
556                STAT_BIN_COUNT,
557                sender_to_stat.clone(),
558            );
559            let stat_timer_in_ms;
560            {
561                let configs_temp = configs_lock.read().await;
562                stat_timer_in_ms = configs_temp.get_server_configs().unwrap().stat_timer_in_ms;
563            }
564            let mut stat_interval = tokio::time::interval(Duration::from_millis(stat_timer_in_ms));
565
566            let io_pool = tokio::runtime::Builder::new_multi_thread()
567                .worker_threads(10)
568                .enable_io()
569                .enable_time()
570                .thread_name("saito-io-thread-pool")
571                .build()
572                .unwrap();
573
574            let mut last_stat_on: Instant = Instant::now();
575            loop {
576                select!{
577                    result = receiver.recv()=>{
578                        if result.is_some() {
579                            let event = result.unwrap();
580                            let event_id = event.event_id;
581                            let interface_event = event.event;
582                            match interface_event {
583                                NetworkEvent::OutgoingNetworkMessageForAll { buffer, exceptions } => {
584                                    let sockets;
585                                    {
586                                        let network_controller = network_controller_lock.read().await;
587                                        sockets = network_controller.sockets.clone();
588                                    }
589
590                                    NetworkController::send_to_all(sockets, buffer, exceptions).await;
591                                    outgoing_messages.increment();
592                                }
593                                NetworkEvent::OutgoingNetworkMessage {
594                                    peer_index: index,
595                                    buffer,
596                                } => {
597                                    let sockets;
598                                    {
599                                        let network_controller = network_controller_lock.read().await;
600                                        sockets = network_controller.sockets.clone();
601                                    }
602                                    NetworkController::send_outgoing_message(sockets, index, buffer).await;
603                                    outgoing_messages.increment();
604                                }
605                                NetworkEvent::ConnectToPeer {url,peer_index  } => {
606                                    NetworkController::connect_to_peer(
607                                        event_id,
608                                        network_controller_lock.clone(),
609                                        url,peer_index
610                                    )
611                                    .await;
612                                }
613
614                                NetworkEvent::BlockFetchRequest {
615                                    block_hash,
616                                    peer_index,
617                                    url,
618                                    block_id,
619                                } => {
620                                    let sender;
621                                    let current_queries;
622                                    {
623                                        let network_controller = network_controller_lock.read().await;
624
625                                        sender = network_controller.sender_to_saito_controller.clone();
626                                        current_queries = network_controller.currently_queried_urls.clone();
627                                    }
628                                    // starting new thread to stop io controller from getting blocked
629                                    io_pool.spawn(async move {
630                                        let client = reqwest::Client::new();
631
632                                        NetworkController::fetch_block(
633                                            block_hash,
634                                            peer_index,
635                                            url,
636                                            event_id,
637                                            sender,
638                                            current_queries,
639                                            client,
640                                            block_id,
641                                        )
642                                        .await
643                                    });
644                                }
645
646                                NetworkEvent::DisconnectFromPeer { peer_index } => {
647                                    let sockets;
648                                    let sender;
649                                    {
650                                        let network_controller = network_controller_lock.read().await;
651                                        sockets = network_controller.sockets.clone();
652                                        sender = network_controller.sender_to_saito_controller.clone();
653                                    }
654                                    NetworkController::disconnect_socket(
655                                        sockets,
656                                        peer_index,
657                                        sender
658                                    )
659                                    .await
660                                }
661                                _ => unreachable!()
662                            }
663                        }
664                    }
665                    _ = stat_interval.tick() => {
666                        {
667                            if Instant::now().duration_since(last_stat_on)
668                                > Duration::from_millis(stat_timer_in_ms)
669                            {
670                                last_stat_on = Instant::now();
671                                outgoing_messages
672                                    .calculate_stats(time_keeper.get_timestamp_in_ms())
673                                    .await;
674                                let network_controller = network_controller_lock.read().await;
675
676                                let stat = format!(
677                                    "{} - {} - capacity : {:?} / {:?}",
678                                    StatVariable::format_timestamp(time_keeper.get_timestamp_in_ms()),
679                                    format!("{:width$}", "network::channel_to_core", width = 40),
680                                    network_controller.sender_to_saito_controller.capacity(),
681                                    network_controller.sender_to_saito_controller.max_capacity()
682                                );
683                                sender_to_stat.send(stat).await.unwrap();
684
685                                let stat = format!(
686                                    "{} - {} - capacity : {:?} / {:?}",
687                                    StatVariable::format_timestamp(time_keeper.get_timestamp_in_ms()),
688                                    format!("{:width$}", "network::channel_outgoing", width = 40),
689                                    sender_to_network.capacity(),
690                                    sender_to_network.max_capacity()
691                                );
692                                sender_to_stat.send(stat).await.unwrap();
693                            }
694                        }
695                    }
696                }
697            }
698        })
699        .unwrap();
700    (server_handle, controller_handle)
701}
702
703pub enum PeerSender {
704    Warp(SplitSink<WebSocket, warp::ws::Message>),
705    Tungstenite(SocketSender),
706}
707
708pub enum PeerReceiver {
709    Warp(SplitStream<WebSocket>),
710    Tungstenite(SocketReceiver),
711}
712
713fn run_websocket_server(
714    sender_clone: Sender<IoEvent>,
715    io_controller: Arc<RwLock<NetworkController>>,
716    port: u16,
717    host: String,
718    public_key: SaitoPublicKey,
719    peers_lock: Arc<RwLock<PeerCollection>>,
720) -> JoinHandle<()> {
721    info!("running websocket server on {:?}", port);
722    tokio::task::Builder::new()
723        .name("saito-ws-server")
724        .spawn(async move {
725            info!("starting websocket server");
726            let io_controller = io_controller.clone();
727            let peers_lock_1 = peers_lock.clone();
728            let sender_to_io = sender_clone.clone();
729            let ws_route = warp::path("wsopen")
730                .and(warp::ws())
731                .and(warp::addr::remote())
732                .map(move |ws: warp::ws::Ws, addr: Option<SocketAddr>| {
733                    debug!("incoming connection received");
734                    let clone = io_controller.clone();
735                    let peers = peers_lock_1.clone();
736                    let sender_to_io = sender_to_io.clone();
737                    let ws = ws.max_message_size(10_000_000_000);
738                    let ws = ws.max_frame_size(10_000_000_000);
739
740                    ws.on_upgrade(move |socket| async move {
741                        debug!("socket connection established");
742
743                        let (sender, receiver) = socket.split();
744
745                        let network_controller = clone.read().await;
746
747                        let peer_index;
748                        {
749                            let mut peers = peers.write().await;
750                            peer_index = peers.peer_counter.get_next_index();
751                        }
752
753                        NetworkController::send_new_peer(
754                            0,
755                            peer_index,
756                            addr.map(|a| a.ip().to_string()),
757                            network_controller.sockets.clone(),
758                            PeerSender::Warp(sender),
759                            PeerReceiver::Warp(receiver),
760                            sender_to_io,
761                        )
762                        .await
763                    })
764                });
765            let http_route =
766                warp::path!("block" / String).and_then(|block_hash: String| async move {
767                    // debug!("serving block : {:?}", block_hash);
768                    let mut buffer: Vec<u8> = Default::default();
769                    let result = fs::read_dir(BLOCKS_DIR_PATH.to_string());
770                    if result.is_err() {
771                        debug!("no blocks found");
772                        return Err(warp::reject::not_found());
773                    }
774                    let paths: Vec<_> = result
775                        .unwrap()
776                        .map(|r| r.unwrap())
777                        .filter(|r| {
778                            let filename = r.file_name().into_string().unwrap();
779                            if !filename.contains(BLOCK_FILE_EXTENSION) {
780                                return false;
781                            }
782                            if !filename.contains(block_hash.as_str()) {
783                                return false;
784                            }
785                            // debug!("selected file : {:?}", filename);
786                            true
787                        })
788                        .collect();
789
790                    if paths.is_empty() {
791                        return Err(warp::reject::not_found());
792                    }
793                    let path = paths.first().unwrap();
794                    let file_path = BLOCKS_DIR_PATH.to_string()
795                        + "/"
796                        + path.file_name().into_string().unwrap().as_str();
797                    let result = File::open(file_path.as_str()).await;
798                    if result.is_err() {
799                        error!("failed opening file : {:?}", result.err().unwrap());
800                        return Err(warp::reject::not_found());
801                    }
802                    let mut file = result.unwrap();
803
804                    let result = file.read_to_end(&mut buffer).await;
805                    if result.is_err() {
806                        error!("failed reading file : {:?}", result.err().unwrap());
807                        return Err(warp::reject::not_found());
808                    }
809                    drop(file);
810
811                    let _buffer_len = buffer.len();
812                    let result = Ok(warp::reply::with_status(buffer, StatusCode::OK));
813                    // debug!("served block with : {:?} length", buffer_len);
814                    result
815                });
816
817            // TODO : review this code
818            let opt = warp::path::param::<String>()
819                .map(Some)
820                .or_else(|_| async { Ok::<(Option<String>,), std::convert::Infallible>((None,)) });
821            let lite_route = warp::path!("lite-block" / String / ..)
822                .and(opt)
823                .and(warp::path::end())
824                .and(warp::any().map(move || peers_lock.clone()))
825                .and_then(
826                    move |block_hash: String,
827                          key: Option<String>,
828                          peer_lock: Arc<RwLock<PeerCollection>>| async move {
829                        // debug!("serving lite block : {:?}", block_hash);
830
831                        let key1;
832                        if key.is_some() {
833                            key1 = key.unwrap();
834                        } else {
835                            warn!("key is not set to request lite blocks");
836                            return Err(warp::reject::reject());
837                        }
838
839                        let key;
840                        if key1.is_empty() {
841                            key = public_key;
842                        } else {
843                            let result: Result<SaitoPublicKey, String>;
844                            if key1.len() == 66 {
845                                result = SaitoPublicKey::from_hex(key1.as_str());
846                            } else {
847                                result = SaitoPublicKey::from_base58(key1.as_str());
848                            }
849                            if result.is_err() {
850                                warn!("key : {:?} couldn't be decoded", key1);
851                                return Err(warp::reject::reject());
852                            }
853
854                            let result = result.unwrap();
855                            if result.len() != 33 {
856                                warn!("key length : {:?} is not for public key", result.len());
857                                return Err(warp::reject::reject());
858                            }
859                            key = result;
860                        }
861                        let mut keylist;
862                        {
863                            let peers = peer_lock.read().await;
864                            let peer = peers.find_peer_by_address(&key);
865                            if peer.is_none() {
866                                debug!(
867                                    "lite block requester : {:?} is not connected as a peer",
868                                    key.to_hex()
869                                );
870                                keylist = vec![key];
871                            } else {
872                                keylist = peer.as_ref().unwrap().key_list.clone();
873                                keylist.push(key);
874                            }
875                        }
876
877                        let mut buffer: Vec<u8> = Default::default();
878                        let result = fs::read_dir(BLOCKS_DIR_PATH.to_string());
879                        if result.is_err() {
880                            debug!("no blocks found");
881                            return Err(warp::reject::not_found());
882                        }
883                        let paths: Vec<_> = result
884                            .unwrap()
885                            .map(|r| r.unwrap())
886                            .filter(|r| {
887                                let filename = r.file_name().into_string().unwrap();
888                                if !filename.contains(BLOCK_FILE_EXTENSION) {
889                                    return false;
890                                }
891                                if !filename.contains(block_hash.as_str()) {
892                                    return false;
893                                }
894                                true
895                            })
896                            .collect();
897
898                        if paths.is_empty() {
899                            return Err(warp::reject::not_found());
900                        }
901                        let path = paths.first().unwrap();
902                        let file_path = BLOCKS_DIR_PATH.to_string()
903                            + "/"
904                            + path.file_name().into_string().unwrap().as_str();
905                        let result = File::open(file_path.as_str()).await;
906                        if result.is_err() {
907                            error!("failed opening file : {:?}", result.err().unwrap());
908                            return Err(warp::reject::not_found());
909                        }
910                        let mut file = result.unwrap();
911
912                        let result = file.read_to_end(&mut buffer).await;
913                        if result.is_err() {
914                            error!("failed reading file : {:?}", result.err().unwrap());
915                            return Err(warp::reject::not_found());
916                        }
917                        drop(file);
918
919                        let block = Block::deserialize_from_net(&buffer);
920                        if block.is_err() {
921                            error!("failed parsing buffer into a block");
922                            return Err(warp::reject::not_found());
923                        }
924                        let mut block = block.unwrap();
925                        if block.generate().is_err() {
926                            error!("failed generating block : {}", block_hash);
927                            return Err(warp::reject::not_found());
928                        }
929                        let block = block.generate_lite_block(keylist);
930                        let buffer = block.serialize_for_net(BlockType::Full);
931                        Ok(warp::reply::with_status(buffer, StatusCode::OK))
932                    },
933                );
934            let routes = http_route.or(ws_route).or(lite_route);
935            // let (_, server) =
936            //     warp::serve(ws_route).bind_with_graceful_shutdown(([127, 0, 0, 1], port), async {
937            //         // tokio::signal::ctrl_c().await.ok();
938            //     });
939            // server.await;
940            let address =
941                SocketAddr::from_str((host + ":" + port.to_string().as_str()).as_str()).unwrap();
942            warp::serve(routes).run(address).await;
943        })
944        .unwrap()
945}
946
947#[cfg(test)]
948mod tests {
949    use futures::SinkExt;
950    use log::info;
951
952    use saito_core::core::msg::handshake::HandshakeChallenge;
953    use saito_core::core::msg::message::Message;
954    use saito_core::core::util::crypto::generate_random_bytes;
955    use tokio_tungstenite::connect_async;
956
957    #[tokio::test]
958    async fn multi_peer_perf_test() {
959        // pretty_env_logger::init();
960        let url = "ws://127.0.0.1:12101/wsopen";
961
962        info!("url = {:?}", url);
963
964        let mut sockets = vec![];
965        let it = 10000;
966        for i in 0..it {
967            let result = connect_async(url).await;
968            if result.is_err() {
969                println!("{:?}", result.err().unwrap());
970                return;
971            }
972            let result = result.unwrap();
973            let mut socket = result.0;
974
975            let challenge = HandshakeChallenge {
976                challenge: generate_random_bytes(32).await.try_into().unwrap(),
977            };
978            // challenge_for_peer = Some(challenge.challenge);
979            let message = Message::HandshakeChallenge(challenge);
980
981            socket
982                .send(tokio_tungstenite::tungstenite::Message::Binary(
983                    message.serialize(),
984                ))
985                .await
986                .unwrap();
987
988            sockets.push(socket);
989
990            // let (socket_sender, socket_receiver): (SocketSender, SocketReceiver) = socket.split();
991            info!("connecting ... : {:?}", i);
992        }
993    }
994}