1use std::io::{Error, ErrorKind};
2use std::sync::Arc;
3
4use log::{debug, error, info, trace, warn};
5use tokio::sync::RwLock;
6
7use crate::core::consensus::block::Block;
8use crate::core::consensus::blockchain::Blockchain;
9use crate::core::consensus::mempool::Mempool;
10use crate::core::consensus::peers::peer::{Peer, PeerStatus};
11use crate::core::consensus::peers::peer_collection::PeerCollection;
12use crate::core::consensus::transaction::{Transaction, TransactionType};
13use crate::core::consensus::wallet::Wallet;
14use crate::core::defs::{BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp};
15use crate::core::io::interface_io::{InterfaceEvent, InterfaceIO};
16use crate::core::msg::block_request::BlockchainRequest;
17use crate::core::msg::handshake::{HandshakeChallenge, HandshakeResponse};
18use crate::core::msg::message::Message;
19use crate::core::process::keep_time::Timer;
20use crate::core::process::version::Version;
21
22#[derive(Debug)]
23pub enum PeerDisconnectType {
24 ExternalDisconnect,
26 InternalDisconnect,
28}
29use crate::core::util::configuration::Configuration;
30
31pub struct Network {
33 pub peer_lock: Arc<RwLock<PeerCollection>>,
35 pub io_interface: Box<dyn InterfaceIO + Send + Sync>,
36 pub wallet_lock: Arc<RwLock<Wallet>>,
37 pub config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
38 pub timer: Timer,
39}
40
41impl Network {
42 pub fn new(
43 io_handler: Box<dyn InterfaceIO + Send + Sync>,
44 peer_lock: Arc<RwLock<PeerCollection>>,
45 wallet_lock: Arc<RwLock<Wallet>>,
46 config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
47 timer: Timer,
48 ) -> Network {
49 Network {
50 peer_lock,
51 io_interface: io_handler,
52 wallet_lock,
53 config_lock,
54 timer,
55 }
56 }
57 pub async fn propagate_block(&self, block: &Block) {
58 debug!("propagating block : {:?}", block.hash.to_hex());
59
60 let mut excluded_peers = vec![];
61 if let Some(index) = block.routed_from_peer.as_ref() {
63 excluded_peers.push(*index);
64 }
65
66 {
67 let peers = self.peer_lock.read().await;
68 for (index, peer) in peers.index_to_peers.iter() {
69 if peer.get_public_key().is_none() {
70 excluded_peers.push(*index);
71 continue;
72 }
73 }
74 }
75
76 debug!("sending block : {:?} to peers", block.hash.to_hex());
77 let message = Message::BlockHeaderHash(block.hash, block.id);
78 self.io_interface
79 .send_message_to_all(message.serialize().as_slice(), excluded_peers)
80 .await
81 .unwrap();
82 }
83
84 pub async fn propagate_transaction(&self, transaction: &Transaction) {
85 let peers = self.peer_lock.read().await;
88 let mut wallet = self.wallet_lock.write().await;
89
90 let public_key = wallet.public_key;
91
92 if transaction
93 .from
94 .first()
95 .expect("from slip should exist")
96 .public_key
97 == public_key
98 {
99 if let TransactionType::GoldenTicket = transaction.transaction_type {
100 } else {
101 wallet.add_to_pending(transaction.clone());
102 }
103 }
104
105 for (index, peer) in peers.index_to_peers.iter() {
106 if peer.get_public_key().is_none() {
107 continue;
108 }
109 let public_key = peer.get_public_key().unwrap();
110 if transaction.is_in_path(&public_key) {
111 continue;
112 }
113 let mut transaction = transaction.clone();
114 transaction.add_hop(&wallet.private_key, &wallet.public_key, &public_key);
115 let message = Message::Transaction(transaction);
116 self.io_interface
117 .send_message(*index, message.serialize().as_slice())
118 .await
119 .unwrap();
120 }
121 }
122
123 pub async fn handle_peer_disconnect(
124 &mut self,
125 peer_index: u64,
126 disconnect_type: PeerDisconnectType,
127 ) {
128 debug!("handling peer disconnect, peer_index = {}", peer_index);
129
130 if let PeerDisconnectType::ExternalDisconnect = disconnect_type {
131 self.io_interface
132 .disconnect_from_peer(peer_index)
133 .await
134 .unwrap();
135 }
136
137 let mut peers = self.peer_lock.write().await;
138 if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
139 if peer.get_public_key().is_some() {
140 self.io_interface
142 .send_interface_event(InterfaceEvent::PeerConnectionDropped(
143 peer_index,
144 peer.get_public_key().unwrap(),
145 ));
146 }
147
148 peer.mark_as_disconnected(self.timer.get_timestamp_in_ms());
149 } else {
150 error!("unknown peer : {:?} disconnected", peer_index);
151 }
152 }
153 pub async fn handle_new_peer(&mut self, peer_index: u64, ip_addr: Option<String>) {
154 let mut peers = self.peer_lock.write().await;
156 if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
157 debug!("static peer : {:?} connected", peer_index);
158 peer.peer_status = PeerStatus::Connecting;
159 peer.ip_address = ip_addr;
160 } else {
161 debug!("new peer added : {:?}", peer_index);
162 let mut peer = Peer::new(peer_index);
163 peer.peer_status = PeerStatus::Connecting;
164 peer.ip_address = ip_addr;
165 peers.index_to_peers.insert(peer_index, peer);
166 }
167
168 if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
169 if peer.static_peer_config.is_none() {
170 peer.initiate_handshake(self.io_interface.as_ref())
171 .await
172 .unwrap();
173 }
174 }
175
176 debug!("current peer count = {:?}", peers.index_to_peers.len());
177 }
178
179 pub async fn handle_handshake_challenge(
180 &self,
181 peer_index: u64,
182 challenge: HandshakeChallenge,
183 wallet_lock: Arc<RwLock<Wallet>>,
184 config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
185 ) {
186 let mut peers = self.peer_lock.write().await;
187
188 let peer = peers.index_to_peers.get_mut(&peer_index);
189 if peer.is_none() {
190 error!(
191 "peer not found for index : {:?}. cannot handle handshake challenge",
192 peer_index
193 );
194 return;
195 }
196 let peer = peer.unwrap();
197 let current_time = self.timer.get_timestamp_in_ms();
198
199 peer.handshake_limiter.increase();
201 if peer.has_handshake_limit_exceeded(current_time) {
202 warn!(
203 "peer {:?} exceeded rate peers for handshake challenge",
204 peer_index
205 );
206 return;
207 }
208
209 peer.handle_handshake_challenge(
210 challenge,
211 self.io_interface.as_ref(),
212 wallet_lock.clone(),
213 config_lock,
214 )
215 .await
216 .unwrap();
217 }
218 pub async fn handle_handshake_response(
219 &mut self,
220 peer_index: u64,
221 response: HandshakeResponse,
222 wallet_lock: Arc<RwLock<Wallet>>,
223 blockchain_lock: Arc<RwLock<Blockchain>>,
224 configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
225 ) {
226 let mut peers = self.peer_lock.write().await;
227 let public_key;
228 {
229 let peer = peers.index_to_peers.get_mut(&peer_index);
230 if peer.is_none() {
231 error!(
232 "peer not found for index : {:?}. cannot handle handshake response",
233 peer_index
234 );
235 return;
236 }
237 let peer: &mut Peer = peer.unwrap();
238 let current_time = self.timer.get_timestamp_in_ms();
239 peer.handshake_limiter.increase();
240 if peer.has_handshake_limit_exceeded(current_time) {
241 warn!(
242 "peer {:?} exceeded rate peers for handshake challenge",
243 peer_index
244 );
245 return;
246 }
247 let result = peer
248 .handle_handshake_response(
249 response,
250 self.io_interface.as_ref(),
251 wallet_lock.clone(),
252 configs_lock.clone(),
253 current_time,
254 )
255 .await;
256 if result.is_err() || peer.get_public_key().is_none() {
257 info!(
258 "disconnecting peer : {:?} as handshake response was not handled",
259 peer_index
260 );
261 self.io_interface
262 .disconnect_from_peer(peer_index)
263 .await
264 .unwrap();
265 return;
266 }
267 public_key = peer.get_public_key().unwrap();
268 debug!(
269 "peer : {:?} handshake successful for peer : {:?}",
270 peer.index,
271 public_key.to_base58()
272 );
273 }
274 if let Some(old_peer) = peers.remove_reconnected_peer(&public_key) {
275 let peer = peers
279 .find_peer_by_index_mut(peer_index)
280 .expect("peer should exist here since it was accessed previously");
281 peer.join_as_reconnection(old_peer);
282 } else {
283 peers.address_to_peers.insert(public_key, peer_index);
284 }
285
286 for (index, peer) in &peers.index_to_peers {
287 if peer.public_key.is_none() {
288 continue;
289 }
290 debug!(
291 "peer : {:?} with key : {:?} is currently connected : {:?}",
292 index,
293 peer.public_key.unwrap().to_base58(),
294 peer.peer_status
295 );
296 }
297
298 self.io_interface
299 .send_interface_event(InterfaceEvent::PeerConnected(peer_index));
300 self.request_blockchain_from_peer(peer_index, blockchain_lock.clone())
302 .await;
303 }
304 pub async fn handle_received_key_list(
305 &mut self,
306 peer_index: PeerIndex,
307 key_list: Vec<SaitoPublicKey>,
308 ) -> Result<(), Error> {
309 trace!(
310 "handler received key list of length : {:?} from peer : {:?}",
311 key_list.len(),
312 peer_index
313 );
314
315 let current_time = self.timer.get_timestamp_in_ms();
316 let mut peers = self.peer_lock.write().await;
318 let peer = peers.index_to_peers.get_mut(&peer_index);
319
320 if let Some(peer) = peer {
321 peer.key_list_limiter.increase();
323 if peer.has_key_list_limit_exceeded(current_time) {
324 debug!(
325 "peer {} - {} exceeded the rate for key list",
326 peer_index,
327 peer.public_key.unwrap().to_base58()
328 );
329 return Err(Error::from(ErrorKind::Other));
330 }
331
332 trace!(
333 "handling received keylist of length : {:?} from peer : {:?}",
334 key_list.len(),
335 peer_index
336 );
337 peer.key_list = key_list;
338 Ok(())
339 } else {
340 error!(
341 "peer not found for index : {:?}. cannot handle received key list",
342 peer_index
343 );
344 Err(Error::from(ErrorKind::NotFound))
345 }
346 }
347
348 pub async fn send_key_list(&self, key_list: &[SaitoPublicKey]) {
349 debug!(
350 "sending key list to all the peers {:?}",
351 key_list
352 .iter()
353 .map(|key| key.to_base58())
354 .collect::<Vec<String>>()
355 );
356
357 self.io_interface
358 .send_message_to_all(
359 Message::KeyListUpdate(key_list.to_vec())
360 .serialize()
361 .as_slice(),
362 vec![],
363 )
364 .await
365 .unwrap();
366 }
367
368 pub(crate) async fn request_blockchain_from_peer(
369 &self,
370 peer_index: u64,
371 blockchain_lock: Arc<RwLock<Blockchain>>,
372 ) {
373 let configs = self.config_lock.read().await;
374 let blockchain = blockchain_lock.read().await;
376 let buffer: Vec<u8>;
377 debug!(
378 "requesting blockchain from peer : {:?} latest_block_id : {:?}, last_block_id : {:?}",
379 peer_index,
380 blockchain.get_latest_block_id(),
381 blockchain.last_block_id
382 );
383
384 if configs.is_spv_mode() {
385 let request;
386 {
387 debug!(
388 "blockchain last block id : {:?}, latest block id : {:?}",
389 blockchain.last_block_id,
390 blockchain.get_latest_block_id()
391 );
392 if blockchain.last_block_id >= blockchain.get_latest_block_id() {
393 let fork_id = blockchain.fork_id.unwrap_or([0; 32]);
394 debug!(
395 "blockchain request 1 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
396 blockchain.last_block_id,
397 blockchain.last_block_hash.to_hex(),
398 fork_id.to_hex()
399 );
400 request = BlockchainRequest {
401 latest_block_id: blockchain.last_block_id,
402 latest_block_hash: blockchain.last_block_hash,
403 fork_id,
404 };
405 } else if let Some(fork_id) =
406 blockchain.generate_fork_id(blockchain.get_latest_block_id())
407 {
408 debug!(
409 "blockchain request 2 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
410 blockchain.get_latest_block_id(),
411 blockchain.get_latest_block_hash().to_hex(),
412 fork_id.to_hex()
413 );
414 request = BlockchainRequest {
415 latest_block_id: blockchain.get_latest_block_id(),
416 latest_block_hash: blockchain.get_latest_block_hash(),
417 fork_id,
418 };
419 } else {
420 debug!(
421 "blockchain request 3 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
422 blockchain.get_latest_block_id(),
423 blockchain.get_latest_block_hash().to_hex(),
424 [0; 32]
425 );
426 request = BlockchainRequest {
427 latest_block_id: blockchain.get_latest_block_id(),
428 latest_block_hash: blockchain.get_latest_block_hash(),
429 fork_id: [0; 32],
430 };
431 }
432 }
433 debug!("sending ghost chain request to peer : {:?}", peer_index);
434 buffer = Message::GhostChainRequest(
435 request.latest_block_id,
436 request.latest_block_hash,
437 request.fork_id,
438 )
439 .serialize();
440 } else {
441 let request;
442 if let Some(fork_id) = blockchain.generate_fork_id(blockchain.get_latest_block_id()) {
443 request = BlockchainRequest {
444 latest_block_id: blockchain.get_latest_block_id(),
445 latest_block_hash: blockchain.get_latest_block_hash(),
446 fork_id,
447 };
448 debug!(
449 "blockchain request 4 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
450 blockchain.get_latest_block_id(),
451 blockchain.get_latest_block_hash().to_hex(),
452 fork_id.to_hex()
453 );
454 } else {
455 request = BlockchainRequest {
456 latest_block_id: blockchain.get_latest_block_id(),
457 latest_block_hash: blockchain.get_latest_block_hash(),
458 fork_id: [0; 32],
459 };
460 debug!(
461 "blockchain request 5 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
462 blockchain.get_latest_block_id(),
463 blockchain.get_latest_block_hash().to_hex(),
464 [0; 32]
465 );
466 }
467 debug!("sending blockchain request to peer : {:?}", peer_index);
468 buffer = Message::BlockchainRequest(request).serialize();
469 }
470 drop(blockchain);
473 drop(configs);
474 self.io_interface
477 .send_message(peer_index, buffer.as_slice())
478 .await
479 .unwrap();
480 trace!("blockchain request sent to peer : {:?}", peer_index);
481 }
482 pub async fn process_incoming_block_hash(
483 &mut self,
484 block_hash: SaitoHash,
485 block_id: BlockId,
486 peer_index: PeerIndex,
487 blockchain_lock: Arc<RwLock<Blockchain>>,
488 mempool_lock: Arc<RwLock<Mempool>>,
489 ) -> Option<()> {
490 trace!(
491 "fetching block : {:?}-{:?} from peer : {:?}",
492 block_id,
493 block_hash.to_hex(),
494 peer_index
495 );
496 let block_exists;
497 let my_public_key;
498
499 {
500 let blockchain = blockchain_lock.read().await;
502 if blockchain.is_block_indexed(block_hash) {
503 block_exists = true;
504 } else {
505 let mempool = mempool_lock.read().await;
506 block_exists = mempool.blocks_queue.iter().any(|b| b.hash == block_hash);
507 }
508 }
509 {
511 let wallet = self.wallet_lock.read().await;
512 my_public_key = wallet.public_key;
513 }
514 if block_exists {
515 debug!(
516 "block : {:?}-{:?} already exists in chain. not fetching",
517 block_id,
518 block_hash.to_hex()
519 );
520 return None;
521 }
522 let url;
523 {
524 let configs = self.config_lock.read().await;
525 let peers = self.peer_lock.read().await;
526 let wallet = self.wallet_lock.read().await;
527
528 if let Some(peer) = peers.index_to_peers.get(&peer_index) {
529 if wallet.wallet_version > peer.wallet_version
530 && peer.wallet_version != Version::new(0, 0, 0)
531 {
532 warn!(
533 "Not Fetching Block: {:?} from peer :{:?} since peer version is old. expected: {:?} actual {:?} ",
534 block_hash.to_hex(), peer.index, wallet.wallet_version, peer.wallet_version
535 );
536 return None;
537 }
538
539 if peer.block_fetch_url.is_empty() {
540 debug!(
541 "won't fetch block : {:?} from peer : {:?} since no url found",
542 block_hash.to_hex(),
543 peer_index
544 );
545 return None;
546 }
547 url = peer.get_block_fetch_url(block_hash, configs.is_spv_mode(), my_public_key);
548 } else {
549 warn!(
550 "peer : {:?} is not in peer list. cannot generate the block fetch url",
551 peer_index
552 );
553 return None;
554 }
555 }
556
557 debug!(
558 "fetching block for incoming hash : {:?}-{:?}",
559 block_id,
560 block_hash.to_hex()
561 );
562
563 if self
564 .io_interface
565 .fetch_block_from_peer(block_hash, peer_index, url.as_str(), block_id)
566 .await
567 .is_err()
568 {
569 warn!(
571 "failed fetching block : {:?} for block hash. so unmarking block as fetching",
572 block_hash.to_hex()
573 );
574 }
575 Some(())
576 }
577
578 pub async fn initialize_static_peers(
579 &mut self,
580 configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
581 ) {
582 let configs = configs_lock.read().await;
583 let mut peers = self.peer_lock.write().await;
584
585 configs
587 .get_peer_configs()
588 .clone()
589 .drain(..)
590 .for_each(|config| {
591 let mut peer = Peer::new(peers.peer_counter.get_next_index());
592
593 peer.static_peer_config = Some(config);
594
595 peers.index_to_peers.insert(peer.index, peer);
596 });
597
598 info!("added {:?} static peers", peers.index_to_peers.len());
599 }
600
601 pub async fn handle_new_stun_peer(
602 &mut self,
603 peer_index: PeerIndex,
604 public_key: SaitoPublicKey,
605 ) {
606 debug!(
607 "Adding STUN peer with index: {} and public key: {}",
608 peer_index,
609 public_key.to_base58()
610 );
611 let mut peers = self.peer_lock.write().await;
612 if peers.index_to_peers.contains_key(&peer_index) {
613 error!(
614 "Failed to add STUN peer: Peer with index {} already exists",
615 peer_index
616 );
617 return;
618 }
619 let peer = Peer::new_stun(peer_index, public_key, self.io_interface.as_ref());
620 peers.index_to_peers.insert(peer_index, peer);
621 peers.address_to_peers.insert(public_key, peer_index);
622 debug!("STUN peer added successfully");
623 self.io_interface
624 .send_interface_event(InterfaceEvent::StunPeerConnected(peer_index));
625 }
626
627 pub async fn remove_stun_peer(&mut self, peer_index: PeerIndex) {
628 debug!("Removing STUN peer with index: {}", peer_index);
629 let mut peers = self.peer_lock.write().await;
630 let peer_public_key: SaitoPublicKey;
631 if let Some(peer) = peers.index_to_peers.remove(&peer_index) {
632 if let Some(public_key) = peer.get_public_key() {
633 peer_public_key = public_key;
634 peers.address_to_peers.remove(&public_key);
635 debug!("STUN peer removed from network successfully");
636 self.io_interface
637 .send_interface_event(InterfaceEvent::StunPeerDisconnected(
638 peer_index,
639 peer_public_key,
640 ));
641 }
642 } else {
643 error!(
644 "Failed to remove STUN peer: Peer with index {} not found",
645 peer_index
646 );
647 }
648 }
649
650 pub async fn connect_to_static_peers(&mut self, current_time: Timestamp) {
651 let mut peers = self.peer_lock.write().await;
653 for (peer_index, peer) in &mut peers.index_to_peers {
654 let url = peer.get_url();
655 if let PeerStatus::Disconnected(connect_time, period) = &mut peer.peer_status {
656 if current_time < *connect_time {
657 continue;
658 }
659 if let Some(config) = peer.static_peer_config.as_ref() {
660 info!(
661 "trying to connect to static peer : {:?} with {:?}",
662 peer_index, config
663 );
664 self.io_interface
665 .connect_to_peer(url, peer.index)
666 .await
667 .unwrap();
668 *period *= 2;
669 *connect_time = current_time + *period;
670 }
671 }
672 }
673 }
674
675 pub async fn send_pings(&mut self) {
676 let current_time = self.timer.get_timestamp_in_ms();
677 let mut peers = self.peer_lock.write().await;
678 for (_, peer) in peers.index_to_peers.iter_mut() {
679 if peer.get_public_key().is_some() {
680 peer.send_ping(current_time, self.io_interface.as_ref())
681 .await;
682 }
683 }
684 }
685
686 pub async fn update_peer_timer(&mut self, peer_index: PeerIndex) {
687 let mut peers = self.peer_lock.write().await;
688 let peer = peers.index_to_peers.get_mut(&peer_index);
689 if peer.is_none() {
690 return;
691 }
692 let peer = peer.unwrap();
693 peer.last_msg_at = self.timer.get_timestamp_in_ms();
694 }
695}
696
697#[cfg(test)]
698mod tests {
699 use super::*;
700 use crate::core::util::{configuration::PeerConfig, test::test_manager};
701 use rand::Rng;
702
703 #[serial_test::serial]
704 #[tokio::test]
705 async fn test_keylist_rate_limiter() {
706 let mut t1 = test_manager::test::TestManager::default();
707 let limit: u64 = 10;
708 let peer2_index: u64 = 0;
709 let mut peer2;
710
711 {
712 let mut peers = t1.network.peer_lock.write().await;
713
714 peer2 = Peer::new(peer2_index);
715
716 peer2.key_list_limiter.set_limit(limit);
717 let peer_data = PeerConfig {
718 host: String::from(""),
719 port: 8080,
720 protocol: String::from(""),
721 synctype: String::from(""),
722 };
724
725 peer2.static_peer_config = Some(peer_data);
726 peers.index_to_peers.insert(peer2_index, peer2.clone());
727 println!("peer count = {:?}", peers.index_to_peers.len());
728 }
729
730 for i in 0..40 {
731 let key_list: Vec<SaitoPublicKey> = (0..11)
732 .map(|_| {
733 let mut key = [0u8; 33];
734 rand::thread_rng().fill(&mut key[..]);
735 key
736 })
737 .collect();
738
739 let result = t1
740 .network
741 .handle_received_key_list(peer2_index, key_list)
742 .await;
743
744 dbg!(&result);
745
746 if i < limit {
747 assert!(result.is_ok(), "Expected Ok, got {:?}", result);
748 } else {
749 assert!(result.is_err(), "Expected Err, got {:?}", result);
750 }
751 }
752 dbg!(peer2);
753 }
754}