1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::net::SocketAddr;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::stream::{SplitSink, SplitStream};
9use futures::{SinkExt, StreamExt};
10use log::{debug, error, info, trace, warn};
11use reqwest::Client;
12use tokio::fs::File;
13use tokio::io::AsyncReadExt;
14use tokio::net::TcpStream;
15use tokio::select;
16use tokio::sync::mpsc::{Receiver, Sender};
17use tokio::sync::{Mutex, RwLock};
18use tokio::task::JoinHandle;
19use tokio::time::Instant;
20use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
21use warp::http::StatusCode;
22use warp::ws::WebSocket;
23use warp::Filter;
24
25use saito_core::core::consensus::block::{Block, BlockType};
26use saito_core::core::consensus::blockchain::Blockchain;
27use saito_core::core::consensus::peers::peer_collection::PeerCollection;
28use saito_core::core::defs::{
29 BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, StatVariable, BLOCK_FILE_EXTENSION,
30 STAT_BIN_COUNT,
31};
32use saito_core::core::io::network::PeerDisconnectType;
33use saito_core::core::io::network_event::NetworkEvent;
34use saito_core::core::process::keep_time::Timer;
35use saito_core::core::util::configuration::Configuration;
36
37use crate::io_event::IoEvent;
38use crate::rust_io_handler::BLOCKS_DIR_PATH;
39
40type SocketSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>;
43type SocketReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
44
45pub struct NetworkController {
46 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
47 currently_queried_urls: Arc<Mutex<HashSet<String>>>,
48 pub sender_to_saito_controller: Sender<IoEvent>,
49}
50
51impl NetworkController {
52 pub async fn send(connection: &mut PeerSender, peer_index: u64, buffer: Vec<u8>) -> bool {
53 let mut send_failed = false;
54 match connection {
56 PeerSender::Warp(sender) => {
57 if let Err(error) = sender.send(warp::ws::Message::binary(buffer)).await {
58 error!(
59 "Error sending message, Peer Index = {:?}, Reason {:?}",
60 peer_index, error
61 );
62
63 send_failed = true;
64 }
65 }
66 PeerSender::Tungstenite(sender) => {
67 if let Err(error) = sender
68 .send(tokio_tungstenite::tungstenite::Message::Binary(buffer))
69 .await
70 {
71 error!(
72 "Error sending message, Peer Index = {:?}, Reason {:?}",
73 peer_index, error
74 );
75 send_failed = true;
76 }
77 }
78 }
79
80 !send_failed
81 }
82
83 pub async fn send_outgoing_message(
84 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
85 peer_index: u64,
86 buffer: Vec<u8>,
87 ) {
88 let buf_len = buffer.len();
89
90 let mut sockets = sockets.lock().await;
91 let socket = sockets.get_mut(&peer_index);
92 if socket.is_none() {
93 error!(
94 "Cannot find the corresponding sender socket, Peer Index : {:?}",
95 peer_index
96 );
97 return;
98 }
99
100 let socket = socket.unwrap();
101
102 if !Self::send(socket, peer_index, buffer).await {
103 warn!(
104 "failed sending buffer of size : {:?} to peer : {:?}",
105 buf_len, peer_index
106 );
107 sockets.remove(&peer_index);
108 }
109 }
110
111 pub async fn connect_to_peer(
112 event_id: u64,
113 io_controller: Arc<RwLock<NetworkController>>,
114 url: String,
115 peer_index: PeerIndex,
116 ) {
117 debug!("connecting to peer : {:?}", url);
120
121 let result = connect_async(url.clone()).await;
122 if result.is_ok() {
123 let result = result.unwrap();
124 let socket: WebSocketStream<MaybeTlsStream<TcpStream>> = result.0;
125
126 let ip = match socket.get_ref() {
127 MaybeTlsStream::NativeTls(s) => {
128 if let Ok(socket_address) = s.get_ref().get_ref().get_ref().peer_addr() {
129 Some(socket_address.ip().to_string())
130 } else {
131 None
132 }
133 }
134 MaybeTlsStream::Plain(t) => {
135 if let Ok(socket_address) = t.peer_addr() {
136 Some(socket_address.ip().to_string())
137 } else {
138 None
139 }
140 }
141 _ => None,
142 };
143
144 let network_controller = io_controller.read().await;
145
146 let sender_to_controller = network_controller.sender_to_saito_controller.clone();
147 let (socket_sender, socket_receiver): (SocketSender, SocketReceiver) = socket.split();
148
149 info!(
150 "connected to peer : {:?} with index : {:?}",
151 url, peer_index
152 );
153
154 NetworkController::send_new_peer(
155 event_id,
156 peer_index,
157 ip,
158 network_controller.sockets.clone(),
159 PeerSender::Tungstenite(socket_sender),
160 PeerReceiver::Tungstenite(socket_receiver),
161 sender_to_controller,
162 )
163 .await;
164 } else {
165 warn!(
166 "failed connecting to : {:?}, reason {:?}",
167 url,
168 result.err().unwrap()
169 );
170 }
171 }
172
173 pub async fn send_to_all(
174 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
175 buffer: Vec<u8>,
176 exceptions: Vec<u64>,
177 ) {
178 trace!("sending buffer of size : {:?} to all", buffer.len());
179 let mut sockets = sockets.lock().await;
180 let mut peers_with_errors: Vec<u64> = Default::default();
181
182 for entry in sockets.iter_mut() {
183 let peer_index = entry.0;
184 if exceptions.contains(peer_index) {
185 continue;
186 }
187 let socket = entry.1;
188
189 trace!(
190 "sending buffer of size : {:?} to peer : {:?}",
191 buffer.len(),
192 peer_index
193 );
194 if !Self::send(socket, *peer_index, buffer.clone()).await {
195 warn!(
196 "failed sending buffer of size : {:?} to peer : {:?}",
197 buffer.len(),
198 peer_index
199 );
200 peers_with_errors.push(*peer_index)
201 }
202 }
203
204 for peer in peers_with_errors {
205 sockets.remove(&peer);
206 }
207
208 }
210 pub async fn fetch_block(
211 block_hash: SaitoHash,
212 peer_index: u64,
213 url: String,
214 event_id: u64,
215 sender_to_core: Sender<IoEvent>,
216 current_queries: Arc<Mutex<HashSet<String>>>,
217 client: Client,
218 block_id: BlockId,
219 ) {
220 debug!("fetching block : {:?}", url);
221
222 {
223 let mut queries = current_queries.lock().await;
225 if queries.contains(&url) {
226 debug!("url : {:?} is already being fetched", url);
227 return;
228 }
229 queries.insert(url.clone());
230 }
231 let block_fetch_timeout_in_ms = 10_000;
232 let result = client
233 .get(url.clone())
234 .timeout(Duration::from_millis(block_fetch_timeout_in_ms))
235 .send()
236 .await;
237 if result.is_err() {
238 warn!("failed fetching : {:?}", url);
240 let mut queries = current_queries.lock().await;
241 queries.remove(&url);
242 sender_to_core
243 .send(IoEvent {
244 event_processor_id: 1,
245 event_id,
246 event: NetworkEvent::BlockFetchFailed {
247 block_hash,
248 peer_index,
249 block_id,
250 },
251 })
252 .await
253 .unwrap();
254 return;
255 }
256 let response = result.unwrap();
257 if !matches!(response.status(), StatusCode::OK) {
258 warn!(
259 "failed fetching block : {:?}, with error code : {:?} from url : {:?}",
260 block_hash.to_hex(),
261 response.status(),
262 url
263 );
264 let mut queries = current_queries.lock().await;
265 queries.remove(&url);
266 sender_to_core
267 .send(IoEvent {
268 event_processor_id: 1,
269 event_id,
270 event: NetworkEvent::BlockFetchFailed {
271 block_hash,
272 peer_index,
273 block_id,
274 },
275 })
276 .await
277 .unwrap();
278 return;
279 }
280 let result = response.bytes().await;
281 if result.is_err() {
282 warn!("failed getting byte buffer from fetching block : {:?}", url);
283 let mut queries = current_queries.lock().await;
284 queries.remove(&url);
285 sender_to_core
286 .send(IoEvent {
287 event_processor_id: 1,
288 event_id,
289 event: NetworkEvent::BlockFetchFailed {
290 block_hash,
291 peer_index,
292 block_id,
293 },
294 })
295 .await
296 .unwrap();
297 return;
298 }
299 let result = result.unwrap();
300 let buffer = result.to_vec();
301
302 debug!(
303 "block buffer received with size : {:?} for url : {:?}",
304 buffer.len(),
305 url
306 );
307 sender_to_core
309 .send(IoEvent {
310 event_processor_id: 1,
311 event_id,
312 event: NetworkEvent::BlockFetched {
313 block_hash,
314 block_id,
315 peer_index,
316 buffer,
317 },
318 })
319 .await
320 .unwrap();
321 {
322 let mut queries = current_queries.lock().await;
324 queries.remove(&url);
325 }
326 }
328 pub async fn send_new_peer(
329 event_id: u64,
330 peer_index: u64,
331 ip: Option<String>,
332 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
333 sender: PeerSender,
334 receiver: PeerReceiver,
335 sender_to_core: Sender<IoEvent>,
336 ) {
337 {
338 sockets.lock().await.insert(peer_index, sender);
339 }
340
341 debug!("sending new peer : {:?}", peer_index);
342
343 sender_to_core
344 .send(IoEvent {
345 event_processor_id: 1,
346 event_id,
347 event: NetworkEvent::PeerConnectionResult {
348 result: Ok((peer_index, ip)),
349 },
350 })
351 .await
352 .expect("sending failed");
353
354 NetworkController::receive_message_from_peer(
355 receiver,
356 sender_to_core.clone(),
357 peer_index,
358 sockets,
359 )
360 .await;
361 }
362
363 pub async fn disconnect_socket(
364 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
365 peer_index: PeerIndex,
366 sender_to_core: Sender<IoEvent>,
367 ) {
368 info!("disconnect peer : {:?}", peer_index);
369 let mut sockets = sockets.lock().await;
370 let socket = sockets.remove(&peer_index);
371 if let Some(_socket) = socket {
372 }
374
375 sender_to_core
376 .send(IoEvent {
377 event_processor_id: 1,
378 event_id: 0,
379 event: NetworkEvent::PeerDisconnected {
380 peer_index,
381 disconnect_type: PeerDisconnectType::InternalDisconnect,
382 },
383 })
384 .await
385 .expect("sending failed");
386 }
387 pub async fn receive_message_from_peer(
388 receiver: PeerReceiver,
389 sender: Sender<IoEvent>,
390 peer_index: u64,
391 sockets: Arc<Mutex<HashMap<u64, PeerSender>>>,
392 ) {
393 debug!("starting new task for reading from peer : {:?}", peer_index);
394 tokio::task::Builder::new()
395 .name(format!("saito-peer-receiver-{:?}", peer_index).as_str())
396 .spawn(async move {
397 debug!("new thread started for peer receiving");
398 match receiver {
399 PeerReceiver::Warp(mut receiver) => loop {
400 let result = receiver.next().await;
401 if result.is_none() {
402 trace!("no message received");
403 continue;
404 }
405 let result = result.unwrap();
406 if result.is_err() {
407 warn!("failed receiving message [1] : {:?}", result.err().unwrap());
409 NetworkController::disconnect_socket(sockets, peer_index, sender).await;
410 break;
411 }
412 let result = result.unwrap();
413
414 if result.is_binary() {
415 let buffer = result.into_bytes();
416 trace!("received buffer of size : {:?}", buffer.len());
417
418 let message = IoEvent {
419 event_processor_id: 1,
420 event_id: 0,
421 event: NetworkEvent::IncomingNetworkMessage { peer_index, buffer },
422 };
423 sender.send(message).await.expect("sending failed");
424 } else if result.is_close() {
425 warn!("connection closed by remote peer : {:?}", peer_index);
426 NetworkController::disconnect_socket(sockets, peer_index, sender).await;
427 break;
428 }
429 },
430 PeerReceiver::Tungstenite(mut receiver) => loop {
431 let result = receiver.next().await;
432 if result.is_none() {
433 trace!("no message received");
434 continue;
435 }
436 let result = result.unwrap();
437 if result.is_err() {
438 warn!("failed receiving message [2] : {:?}", result.err().unwrap());
439 NetworkController::disconnect_socket(sockets, peer_index, sender).await;
440 break;
441 }
442 let result = result.unwrap();
443 match result {
444 tokio_tungstenite::tungstenite::Message::Binary(buffer) => {
445 trace!("received buffer of size : {:?}", buffer.len());
446 let message = IoEvent {
447 event_processor_id: 1,
448 event_id: 0,
449 event: NetworkEvent::IncomingNetworkMessage {
450 peer_index,
451 buffer,
452 },
453 };
454 sender.send(message).await.expect("sending failed");
455 }
456 tokio_tungstenite::tungstenite::Message::Close(_) => {
457 info!("socket for peer : {:?} was closed", peer_index);
458 NetworkController::disconnect_socket(sockets, peer_index, sender)
459 .await;
460 break;
461 }
462 _ => {
463 }
465 }
466 },
467 }
468 debug!("listening thread existed for peer : {:?}", peer_index);
469 })
470 .unwrap();
471 }
472}
473
474pub async fn run_network_controller(
495 mut receiver: Receiver<IoEvent>,
496 sender_to_core: Sender<IoEvent>,
497 configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
498 blockchain_lock: Arc<RwLock<Blockchain>>,
499 sender_to_stat: Sender<String>,
500 peers_lock: Arc<RwLock<PeerCollection>>,
501 sender_to_network: Sender<IoEvent>,
502 timer: &Timer,
503) -> (JoinHandle<()>, JoinHandle<()>) {
504 info!("running network handler");
505
506 let host;
507 let url;
508 let port;
509 let public_key;
510 {
511 let configs = configs_lock.read().await;
512
513 url = configs.get_server_configs().unwrap().host.clone()
514 + ":"
515 + configs
516 .get_server_configs()
517 .unwrap()
518 .port
519 .to_string()
520 .as_str();
521 port = configs.get_server_configs().unwrap().port;
522 host = configs.get_server_configs().unwrap().host.clone();
523
524 let blockchain = blockchain_lock.read().await;
526 let wallet = blockchain.wallet_lock.read().await;
527 public_key = wallet.public_key;
528 }
529 info!("starting server on : {:?}", url);
532 let sender_clone = sender_to_core.clone();
533
534 let network_controller_lock = Arc::new(RwLock::new(NetworkController {
535 sockets: Arc::new(Mutex::new(HashMap::new())),
536 sender_to_saito_controller: sender_to_core,
537 currently_queried_urls: Arc::new(Default::default()),
538 }));
539
540 let server_handle = run_websocket_server(
541 sender_clone.clone(),
542 network_controller_lock.clone(),
543 port,
544 host,
545 public_key,
546 peers_lock,
547 );
548
549 let time_keeper = timer.clone();
550
551 let controller_handle = tokio::task::Builder::new()
552 .name("saito-io-controller")
553 .spawn(async move {
554 let mut outgoing_messages = StatVariable::new(
555 "network::outgoing_msgs".to_string(),
556 STAT_BIN_COUNT,
557 sender_to_stat.clone(),
558 );
559 let stat_timer_in_ms;
560 {
561 let configs_temp = configs_lock.read().await;
562 stat_timer_in_ms = configs_temp.get_server_configs().unwrap().stat_timer_in_ms;
563 }
564 let mut stat_interval = tokio::time::interval(Duration::from_millis(stat_timer_in_ms));
565
566 let io_pool = tokio::runtime::Builder::new_multi_thread()
567 .worker_threads(10)
568 .enable_io()
569 .enable_time()
570 .thread_name("saito-io-thread-pool")
571 .build()
572 .unwrap();
573
574 let mut last_stat_on: Instant = Instant::now();
575 loop {
576 select!{
577 result = receiver.recv()=>{
578 if result.is_some() {
579 let event = result.unwrap();
580 let event_id = event.event_id;
581 let interface_event = event.event;
582 match interface_event {
583 NetworkEvent::OutgoingNetworkMessageForAll { buffer, exceptions } => {
584 let sockets;
585 {
586 let network_controller = network_controller_lock.read().await;
587 sockets = network_controller.sockets.clone();
588 }
589
590 NetworkController::send_to_all(sockets, buffer, exceptions).await;
591 outgoing_messages.increment();
592 }
593 NetworkEvent::OutgoingNetworkMessage {
594 peer_index: index,
595 buffer,
596 } => {
597 let sockets;
598 {
599 let network_controller = network_controller_lock.read().await;
600 sockets = network_controller.sockets.clone();
601 }
602 NetworkController::send_outgoing_message(sockets, index, buffer).await;
603 outgoing_messages.increment();
604 }
605 NetworkEvent::ConnectToPeer {url,peer_index } => {
606 NetworkController::connect_to_peer(
607 event_id,
608 network_controller_lock.clone(),
609 url,peer_index
610 )
611 .await;
612 }
613
614 NetworkEvent::BlockFetchRequest {
615 block_hash,
616 peer_index,
617 url,
618 block_id,
619 } => {
620 let sender;
621 let current_queries;
622 {
623 let network_controller = network_controller_lock.read().await;
624
625 sender = network_controller.sender_to_saito_controller.clone();
626 current_queries = network_controller.currently_queried_urls.clone();
627 }
628 io_pool.spawn(async move {
630 let client = reqwest::Client::new();
631
632 NetworkController::fetch_block(
633 block_hash,
634 peer_index,
635 url,
636 event_id,
637 sender,
638 current_queries,
639 client,
640 block_id,
641 )
642 .await
643 });
644 }
645
646 NetworkEvent::DisconnectFromPeer { peer_index } => {
647 let sockets;
648 let sender;
649 {
650 let network_controller = network_controller_lock.read().await;
651 sockets = network_controller.sockets.clone();
652 sender = network_controller.sender_to_saito_controller.clone();
653 }
654 NetworkController::disconnect_socket(
655 sockets,
656 peer_index,
657 sender
658 )
659 .await
660 }
661 _ => unreachable!()
662 }
663 }
664 }
665 _ = stat_interval.tick() => {
666 {
667 if Instant::now().duration_since(last_stat_on)
668 > Duration::from_millis(stat_timer_in_ms)
669 {
670 last_stat_on = Instant::now();
671 outgoing_messages
672 .calculate_stats(time_keeper.get_timestamp_in_ms())
673 .await;
674 let network_controller = network_controller_lock.read().await;
675
676 let stat = format!(
677 "{} - {} - capacity : {:?} / {:?}",
678 StatVariable::format_timestamp(time_keeper.get_timestamp_in_ms()),
679 format!("{:width$}", "network::channel_to_core", width = 40),
680 network_controller.sender_to_saito_controller.capacity(),
681 network_controller.sender_to_saito_controller.max_capacity()
682 );
683 sender_to_stat.send(stat).await.unwrap();
684
685 let stat = format!(
686 "{} - {} - capacity : {:?} / {:?}",
687 StatVariable::format_timestamp(time_keeper.get_timestamp_in_ms()),
688 format!("{:width$}", "network::channel_outgoing", width = 40),
689 sender_to_network.capacity(),
690 sender_to_network.max_capacity()
691 );
692 sender_to_stat.send(stat).await.unwrap();
693 }
694 }
695 }
696 }
697 }
698 })
699 .unwrap();
700 (server_handle, controller_handle)
701}
702
703pub enum PeerSender {
704 Warp(SplitSink<WebSocket, warp::ws::Message>),
705 Tungstenite(SocketSender),
706}
707
708pub enum PeerReceiver {
709 Warp(SplitStream<WebSocket>),
710 Tungstenite(SocketReceiver),
711}
712
713fn run_websocket_server(
714 sender_clone: Sender<IoEvent>,
715 io_controller: Arc<RwLock<NetworkController>>,
716 port: u16,
717 host: String,
718 public_key: SaitoPublicKey,
719 peers_lock: Arc<RwLock<PeerCollection>>,
720) -> JoinHandle<()> {
721 info!("running websocket server on {:?}", port);
722 tokio::task::Builder::new()
723 .name("saito-ws-server")
724 .spawn(async move {
725 info!("starting websocket server");
726 let io_controller = io_controller.clone();
727 let peers_lock_1 = peers_lock.clone();
728 let sender_to_io = sender_clone.clone();
729 let ws_route = warp::path("wsopen")
730 .and(warp::ws())
731 .and(warp::addr::remote())
732 .map(move |ws: warp::ws::Ws, addr: Option<SocketAddr>| {
733 debug!("incoming connection received");
734 let clone = io_controller.clone();
735 let peers = peers_lock_1.clone();
736 let sender_to_io = sender_to_io.clone();
737 let ws = ws.max_message_size(10_000_000_000);
738 let ws = ws.max_frame_size(10_000_000_000);
739
740 ws.on_upgrade(move |socket| async move {
741 debug!("socket connection established");
742
743 let (sender, receiver) = socket.split();
744
745 let network_controller = clone.read().await;
746
747 let peer_index;
748 {
749 let mut peers = peers.write().await;
750 peer_index = peers.peer_counter.get_next_index();
751 }
752
753 NetworkController::send_new_peer(
754 0,
755 peer_index,
756 addr.map(|a| a.ip().to_string()),
757 network_controller.sockets.clone(),
758 PeerSender::Warp(sender),
759 PeerReceiver::Warp(receiver),
760 sender_to_io,
761 )
762 .await
763 })
764 });
765 let http_route =
766 warp::path!("block" / String).and_then(|block_hash: String| async move {
767 let mut buffer: Vec<u8> = Default::default();
769 let result = fs::read_dir(BLOCKS_DIR_PATH.to_string());
770 if result.is_err() {
771 debug!("no blocks found");
772 return Err(warp::reject::not_found());
773 }
774 let paths: Vec<_> = result
775 .unwrap()
776 .map(|r| r.unwrap())
777 .filter(|r| {
778 let filename = r.file_name().into_string().unwrap();
779 if !filename.contains(BLOCK_FILE_EXTENSION) {
780 return false;
781 }
782 if !filename.contains(block_hash.as_str()) {
783 return false;
784 }
785 true
787 })
788 .collect();
789
790 if paths.is_empty() {
791 return Err(warp::reject::not_found());
792 }
793 let path = paths.first().unwrap();
794 let file_path = BLOCKS_DIR_PATH.to_string()
795 + "/"
796 + path.file_name().into_string().unwrap().as_str();
797 let result = File::open(file_path.as_str()).await;
798 if result.is_err() {
799 error!("failed opening file : {:?}", result.err().unwrap());
800 return Err(warp::reject::not_found());
801 }
802 let mut file = result.unwrap();
803
804 let result = file.read_to_end(&mut buffer).await;
805 if result.is_err() {
806 error!("failed reading file : {:?}", result.err().unwrap());
807 return Err(warp::reject::not_found());
808 }
809 drop(file);
810
811 let _buffer_len = buffer.len();
812 let result = Ok(warp::reply::with_status(buffer, StatusCode::OK));
813 result
815 });
816
817 let opt = warp::path::param::<String>()
819 .map(Some)
820 .or_else(|_| async { Ok::<(Option<String>,), std::convert::Infallible>((None,)) });
821 let lite_route = warp::path!("lite-block" / String / ..)
822 .and(opt)
823 .and(warp::path::end())
824 .and(warp::any().map(move || peers_lock.clone()))
825 .and_then(
826 move |block_hash: String,
827 key: Option<String>,
828 peer_lock: Arc<RwLock<PeerCollection>>| async move {
829 let key1;
832 if key.is_some() {
833 key1 = key.unwrap();
834 } else {
835 warn!("key is not set to request lite blocks");
836 return Err(warp::reject::reject());
837 }
838
839 let key;
840 if key1.is_empty() {
841 key = public_key;
842 } else {
843 let result: Result<SaitoPublicKey, String>;
844 if key1.len() == 66 {
845 result = SaitoPublicKey::from_hex(key1.as_str());
846 } else {
847 result = SaitoPublicKey::from_base58(key1.as_str());
848 }
849 if result.is_err() {
850 warn!("key : {:?} couldn't be decoded", key1);
851 return Err(warp::reject::reject());
852 }
853
854 let result = result.unwrap();
855 if result.len() != 33 {
856 warn!("key length : {:?} is not for public key", result.len());
857 return Err(warp::reject::reject());
858 }
859 key = result;
860 }
861 let mut keylist;
862 {
863 let peers = peer_lock.read().await;
864 let peer = peers.find_peer_by_address(&key);
865 if peer.is_none() {
866 debug!(
867 "lite block requester : {:?} is not connected as a peer",
868 key.to_hex()
869 );
870 keylist = vec![key];
871 } else {
872 keylist = peer.as_ref().unwrap().key_list.clone();
873 keylist.push(key);
874 }
875 }
876
877 let mut buffer: Vec<u8> = Default::default();
878 let result = fs::read_dir(BLOCKS_DIR_PATH.to_string());
879 if result.is_err() {
880 debug!("no blocks found");
881 return Err(warp::reject::not_found());
882 }
883 let paths: Vec<_> = result
884 .unwrap()
885 .map(|r| r.unwrap())
886 .filter(|r| {
887 let filename = r.file_name().into_string().unwrap();
888 if !filename.contains(BLOCK_FILE_EXTENSION) {
889 return false;
890 }
891 if !filename.contains(block_hash.as_str()) {
892 return false;
893 }
894 true
895 })
896 .collect();
897
898 if paths.is_empty() {
899 return Err(warp::reject::not_found());
900 }
901 let path = paths.first().unwrap();
902 let file_path = BLOCKS_DIR_PATH.to_string()
903 + "/"
904 + path.file_name().into_string().unwrap().as_str();
905 let result = File::open(file_path.as_str()).await;
906 if result.is_err() {
907 error!("failed opening file : {:?}", result.err().unwrap());
908 return Err(warp::reject::not_found());
909 }
910 let mut file = result.unwrap();
911
912 let result = file.read_to_end(&mut buffer).await;
913 if result.is_err() {
914 error!("failed reading file : {:?}", result.err().unwrap());
915 return Err(warp::reject::not_found());
916 }
917 drop(file);
918
919 let block = Block::deserialize_from_net(&buffer);
920 if block.is_err() {
921 error!("failed parsing buffer into a block");
922 return Err(warp::reject::not_found());
923 }
924 let mut block = block.unwrap();
925 if block.generate().is_err() {
926 error!("failed generating block : {}", block_hash);
927 return Err(warp::reject::not_found());
928 }
929 let block = block.generate_lite_block(keylist);
930 let buffer = block.serialize_for_net(BlockType::Full);
931 Ok(warp::reply::with_status(buffer, StatusCode::OK))
932 },
933 );
934 let routes = http_route.or(ws_route).or(lite_route);
935 let address =
941 SocketAddr::from_str((host + ":" + port.to_string().as_str()).as_str()).unwrap();
942 warp::serve(routes).run(address).await;
943 })
944 .unwrap()
945}
946
#[cfg(test)]
mod tests {
    use futures::SinkExt;
    use log::info;

    use saito_core::core::msg::handshake::HandshakeChallenge;
    use saito_core::core::msg::message::Message;
    use saito_core::core::util::crypto::generate_random_bytes;
    use tokio_tungstenite::connect_async;

    /// Opens many websocket connections against a locally running node and sends a
    /// handshake challenge over each one. Returns early (without failing) when a
    /// connection cannot be established.
    #[tokio::test]
    async fn multi_peer_perf_test() {
        let url = "ws://127.0.0.1:12101/wsopen";

        info!("url = {:?}", url);

        let connection_count = 10000;
        // Sockets are kept in this list so the connections stay open for the whole run.
        let mut sockets = vec![];
        for i in 0..connection_count {
            let (mut socket, _response) = match connect_async(url).await {
                Ok(connection) => connection,
                Err(reason) => {
                    println!("{:?}", reason);
                    return;
                }
            };

            let message = Message::HandshakeChallenge(HandshakeChallenge {
                challenge: generate_random_bytes(32).await.try_into().unwrap(),
            });
            let frame = tokio_tungstenite::tungstenite::Message::Binary(message.serialize());
            socket.send(frame).await.unwrap();

            sockets.push(socket);

            info!("connecting ... : {:?}", i);
        }
    }
}