1use crate::core::consensus::block::BlockType;
2use crate::core::consensus::blockchain::Blockchain;
3use crate::core::consensus::blockchain_sync_state::BlockchainSyncState;
4use crate::core::consensus::mempool::Mempool;
5use crate::core::consensus::peers::peer_service::PeerService;
6use crate::core::consensus::peers::peer_state_writer::{PeerStateEntry, PEER_STATE_WRITE_PERIOD};
7use crate::core::consensus::wallet::Wallet;
8use crate::core::consensus_thread::ConsensusEvent;
9use crate::core::defs::{
10 BlockHash, BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, StatVariable, Timestamp,
11 CHANNEL_SAFE_BUFFER, STAT_BIN_COUNT,
12};
13use crate::core::io::interface_io::InterfaceEvent;
14use crate::core::io::network::{Network, PeerDisconnectType};
15use crate::core::io::network_event::NetworkEvent;
16use crate::core::io::storage::Storage;
17use crate::core::mining_thread::MiningEvent;
18use crate::core::msg::block_request::BlockchainRequest;
19use crate::core::msg::ghost_chain_sync::GhostChainSync;
20use crate::core::msg::message::Message;
21use crate::core::process::keep_time::Timer;
22use crate::core::process::process_event::ProcessEvent;
23use crate::core::process::version::Version;
24use crate::core::util;
25use crate::core::util::configuration::Configuration;
26use crate::core::util::crypto::hash;
27use crate::core::verification_thread::VerifyRequest;
28use async_trait::async_trait;
29use log::{debug, error, info, trace, warn};
30use std::ops::Deref;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::mpsc::Sender;
34use tokio::sync::RwLock;
35
36#[derive(Debug)]
37pub enum RoutingEvent {
38 BlockchainUpdated(BlockHash),
39 BlockFetchRequest(PeerIndex, BlockHash, BlockId),
40 BlockchainRequest(PeerIndex),
41}
42
43#[derive(Debug)]
44pub enum PeerState {
45 Connected,
46 Connecting,
47 Disconnected,
48}
49
50pub struct StaticPeer {
51 pub peer_details: util::configuration::PeerConfig,
52 pub peer_state: PeerState,
53 pub peer_index: u64,
54}
55
56pub struct RoutingStats {
57 pub received_transactions: StatVariable,
58 pub received_blocks: StatVariable,
59 pub total_incoming_messages: StatVariable,
60}
61
62impl RoutingStats {
63 pub fn new(sender: Sender<String>) -> Self {
64 RoutingStats {
65 received_transactions: StatVariable::new(
66 "routing::received_txs".to_string(),
67 STAT_BIN_COUNT,
68 sender.clone(),
69 ),
70 received_blocks: StatVariable::new(
71 "routing::received_blocks".to_string(),
72 STAT_BIN_COUNT,
73 sender.clone(),
74 ),
75 total_incoming_messages: StatVariable::new(
76 "routing::incoming_msgs".to_string(),
77 STAT_BIN_COUNT,
78 sender,
79 ),
80 }
81 }
82}
83
84pub struct RoutingThread {
86 pub blockchain_lock: Arc<RwLock<Blockchain>>,
87 pub mempool_lock: Arc<RwLock<Mempool>>,
88 pub sender_to_consensus: Sender<ConsensusEvent>,
89 pub sender_to_miner: Sender<MiningEvent>,
90 pub config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
91 pub timer: Timer,
92 pub wallet_lock: Arc<RwLock<Wallet>>,
93 pub network: Network,
94 pub storage: Storage,
95 pub reconnection_timer: Timestamp,
96 pub peer_removal_timer: Timestamp,
97 pub peer_file_write_timer: Timestamp,
98 pub last_emitted_block_fetch_count: BlockId,
99 pub stats: RoutingStats,
100 pub senders_to_verification: Vec<Sender<VerifyRequest>>,
101 pub last_verification_thread_index: usize,
102 pub stat_sender: Sender<String>,
103 pub blockchain_sync_state: BlockchainSyncState,
104}
105
106impl RoutingThread {
107 async fn process_incoming_message(&mut self, peer_index: PeerIndex, message: Message) {
122 self.network.update_peer_timer(peer_index).await;
123 match message {
124 Message::HandshakeChallenge(challenge) => {
125 debug!("received handshake challenge from peer : {:?}", peer_index);
126 self.network
127 .handle_handshake_challenge(
128 peer_index,
129 challenge,
130 self.wallet_lock.clone(),
131 self.config_lock.clone(),
132 )
133 .await;
134 }
135 Message::HandshakeResponse(response) => {
136 trace!("received handshake response from peer : {:?}", peer_index);
137 self.network
138 .handle_handshake_response(
139 peer_index,
140 response,
141 self.wallet_lock.clone(),
142 self.blockchain_lock.clone(),
143 self.config_lock.clone(),
144 )
145 .await;
146 }
147
148 Message::Transaction(transaction) => {
149 trace!(
150 "received transaction : {} from peer : {:?}",
151 transaction.signature.to_hex(),
152 peer_index
153 );
154 self.stats.received_transactions.increment();
155 self.send_to_verification_thread(VerifyRequest::Transaction(transaction))
156 .await;
157 }
158 Message::BlockchainRequest(request) => {
159 trace!(
160 "received blockchain request from peer : {:?} with block id : {:?} and hash : {:?}",
161 peer_index,
162 request.latest_block_id,
163 request.latest_block_hash.to_hex()
164 );
165 {
166 let configs = self.config_lock.read().await;
167 if configs.is_browser() || configs.is_spv_mode() {
168 return;
170 }
171 }
172 self.process_incoming_blockchain_request(request, peer_index)
173 .await;
174 }
175 Message::BlockHeaderHash(hash, block_id) => {
176 trace!(
177 "received block header hash from peer : {:?} with block id : {:?} and hash : {:?}",
178 peer_index,
179 block_id,
180 hash.to_hex()
181 );
182 self.process_incoming_block_hash(hash, block_id, peer_index)
183 .await;
184 }
185 Message::Ping() => {}
186 Message::SPVChain() => {}
187 Message::Services(services) => {
188 self.process_peer_services(services, peer_index).await;
189 }
190 Message::GhostChain(chain) => {
191 self.process_ghost_chain(chain, peer_index).await;
192 }
193 Message::GhostChainRequest(block_id, block_hash, fork_id) => {
194 self.process_ghost_chain_request(block_id, block_hash, fork_id, peer_index)
195 .await;
196 }
197 Message::ApplicationMessage(api_message) => {
198 trace!(
199 "processing application msg with buffer size : {:?} from peer : {:?}",
200 api_message.data.len(),
201 peer_index
202 );
203 self.network
204 .io_interface
205 .process_api_call(api_message.data, api_message.msg_index, peer_index)
206 .await;
207 }
208 Message::Result(api_message) => {
209 self.network
210 .io_interface
211 .process_api_success(api_message.data, api_message.msg_index, peer_index)
212 .await;
213 }
214 Message::Error(api_message) => {
215 self.network
216 .io_interface
217 .process_api_error(api_message.data, api_message.msg_index, peer_index)
218 .await;
219 }
220 Message::KeyListUpdate(key_list) => {
221 self.network
222 .handle_received_key_list(peer_index, key_list)
223 .await
224 .unwrap();
225 }
226 Message::Block(_) => {
227 error!("received block message");
228 unreachable!();
229 }
230 }
231 }
232 async fn process_ghost_chain_request(
249 &self,
250 block_id: u64,
251 block_hash: SaitoHash,
252 fork_id: SaitoHash,
253 peer_index: u64,
254 ) {
255 debug!("processing ghost chain request from peer : {:?}. block_id : {:?} block_hash: {:?} fork_id: {:?}",
256 peer_index,
257 block_id,
258 block_hash.to_hex(),
259 fork_id.to_hex()
260 );
261 let blockchain = self.blockchain_lock.read().await;
262 let mut peer_key_list: Vec<SaitoPublicKey> = vec![];
263 {
264 let peers = self.network.peer_lock.read().await;
265 let peer = peers.find_peer_by_index(peer_index).unwrap();
266 peer_key_list.push(peer.public_key.unwrap());
267 peer_key_list.append(&mut peer.key_list.clone());
268 }
269
270 let ghost = Self::generate_ghost_chain(
271 block_id,
272 fork_id,
273 &blockchain,
274 peer_key_list,
275 &self.storage,
276 )
277 .await;
278
279 debug!("sending ghost chain to peer : {:?}", peer_index);
280 let buffer = Message::GhostChain(ghost).serialize();
282 self.network
283 .io_interface
284 .send_message(peer_index, buffer.as_slice())
285 .await
286 .unwrap();
287 }
288
289 pub(crate) async fn generate_ghost_chain(
290 block_id: u64,
291 fork_id: SaitoHash,
292 blockchain: &Blockchain,
293 peer_key_list: Vec<SaitoPublicKey>,
294 storage: &Storage,
295 ) -> GhostChainSync {
296 debug!(
297 "generating ghost chain for block_id : {:?} fork_id : {:?}",
298 block_id,
299 fork_id.to_hex()
300 );
301 let mut last_shared_ancestor = blockchain.generate_last_shared_ancestor(block_id, fork_id);
302
303 debug!("last_shared_ancestor 1 : {:?}", last_shared_ancestor);
304
305 debug!(
306 "peer key list: {:?}",
307 peer_key_list
308 .iter()
309 .map(|pk| pk.to_base58())
310 .collect::<Vec<String>>()
311 );
312
313 if last_shared_ancestor == 0 {
314 last_shared_ancestor = block_id;
316 }
317
318 let start = blockchain
319 .blockring
320 .get_longest_chain_block_hash_at_block_id(last_shared_ancestor)
321 .unwrap_or([0; 32]);
322
323 let latest_block_id = blockchain.blockring.get_latest_block_id();
324 debug!("latest_block_id : {:?}", latest_block_id);
325 debug!("last_shared_ancestor : {:?}", last_shared_ancestor);
326 debug!("start : {:?}", start.to_hex());
327
328 let mut ghost = GhostChainSync {
329 start,
330 prehashes: vec![],
331 previous_block_hashes: vec![],
332 block_ids: vec![],
333 block_ts: vec![],
334 txs: vec![],
335 gts: vec![],
336 };
337 for i in (last_shared_ancestor + 1)..=latest_block_id {
338 if let Some(hash) = blockchain
339 .blockring
340 .get_longest_chain_block_hash_at_block_id(i)
341 {
342 let block = blockchain.get_block(&hash);
343 if let Some(block) = block {
344 ghost.gts.push(block.has_golden_ticket);
345 ghost.block_ts.push(block.timestamp);
346 ghost.prehashes.push(block.pre_hash);
347 ghost.previous_block_hashes.push(block.previous_block_hash);
348 ghost.block_ids.push(block.id);
349
350 let mut clone = block.clone();
351 if !clone
352 .upgrade_block_to_block_type(BlockType::Full, storage, false)
353 .await
354 {
355 warn!(
356 "couldn't upgrade block : {:?}-{:?} for ghost chain generation",
357 clone.id,
358 clone.hash.to_hex()
359 );
360 }
361 debug!(
362 "pushing block : {:?} at index : {:?} with txs : {:?} has txs : {:?}",
363 clone.hash.to_hex(),
364 i,
365 clone.transactions.len(),
366 clone.has_keylist_txs(&peer_key_list)
367 );
368 ghost.txs.push(clone.has_keylist_txs(&peer_key_list));
370 }
371 }
372 }
373 ghost
374 }
375
376 async fn handle_new_peer(&mut self, peer_index: u64, ip: Option<String>) {
377 trace!("handling new peer : {:?}", peer_index);
378 self.network.handle_new_peer(peer_index, ip).await;
379 }
380
381 async fn handle_new_stun_peer(&mut self, peer_index: u64, public_key: SaitoPublicKey) {
382 trace!("handling new stun peer : {:?}", peer_index);
383 self.network
384 .handle_new_stun_peer(peer_index, public_key)
385 .await;
386 }
387
388 async fn remove_stun_peer(&mut self, peer_index: u64) {
389 trace!("removing stun peer : {:?}", peer_index);
390 self.network.remove_stun_peer(peer_index).await;
391 }
392
393 async fn handle_peer_disconnect(
394 &mut self,
395 peer_index: u64,
396 disconnect_type: PeerDisconnectType,
397 ) {
398 trace!("handling peer disconnect, peer_index = {}", peer_index);
399 self.network
400 .handle_peer_disconnect(peer_index, disconnect_type)
401 .await;
402 }
403 pub async fn set_my_key_list(&mut self, mut key_list: Vec<SaitoPublicKey>) {
404 let mut wallet = self.wallet_lock.write().await;
405
406 key_list.sort();
407 if wallet
409 .key_list
410 .iter()
411 .zip(key_list.iter())
412 .any(|(a, b)| a != b)
413 {
414 wallet.set_key_list(key_list);
415 self.network.send_key_list(&wallet.key_list).await;
416 }
417 }
418
419 pub async fn process_incoming_blockchain_request(
420 &self,
421 request: BlockchainRequest,
422 peer_index: u64,
423 ) {
424 debug!(
425 "processing incoming blockchain request : {:?}-{:?}-{:?} from peer : {:?}",
426 request.latest_block_id,
427 request.latest_block_hash.to_hex(),
428 request.fork_id.to_hex(),
429 peer_index
430 );
431 let blockchain = self.blockchain_lock.read().await;
434
435 let last_shared_ancestor =
436 blockchain.generate_last_shared_ancestor(request.latest_block_id, request.fork_id);
437 debug!(
438 "last shared ancestor = {:?} latest_id = {:?}",
439 last_shared_ancestor,
440 blockchain.blockring.get_latest_block_id()
441 );
442
443 for i in last_shared_ancestor..(blockchain.blockring.get_latest_block_id() + 1) {
444 if let Some(block_hash) = blockchain
445 .blockring
446 .get_longest_chain_block_hash_at_block_id(i)
447 {
448 trace!(
449 "sending block header hash: {:?}-{:?} to peer : {:?}",
450 i,
451 block_hash.to_hex(),
452 peer_index
453 );
454 let buffer = Message::BlockHeaderHash(block_hash, i).serialize();
455 self.network
456 .io_interface
457 .send_message(peer_index, buffer.as_slice())
458 .await
459 .unwrap();
460 } else {
461 continue;
462 }
463 }
464 }
465 async fn process_incoming_block_hash(
466 &mut self,
467 block_hash: SaitoHash,
468 block_id: u64,
469 peer_index: u64,
470 ) {
471 debug!(
472 "processing incoming block hash : {:?}-{:?} from peer : {:?}",
473 block_id,
474 block_hash.to_hex(),
475 peer_index
476 );
477 {
478 let blockchain = self.blockchain_lock.read().await;
480 if !blockchain.blocks.is_empty() && blockchain.lowest_acceptable_block_id >= block_id {
481 debug!("skipping block header : {:?}-{:?} from peer : {:?} since our lowest acceptable id : {:?}",block_id,block_hash.to_hex(),peer_index, blockchain.lowest_acceptable_block_id);
482 return;
483 }
484 }
485 let peers = self.network.peer_lock.read().await;
488 let wallet = self.wallet_lock.read().await;
489
490 if let Some(peer) = peers.index_to_peers.get(&peer_index) {
491 if wallet.wallet_version > peer.wallet_version
493 && peer.wallet_version != Version::new(0, 0, 0)
494 {
495 warn!(
496 "Not Fetching Block: {:?} from peer :{:?} since peer version is old. expected: {:?} actual {:?} ",
497 block_hash.to_hex(), peer.index, wallet.wallet_version, peer.wallet_version
498 );
499 return;
500 }
501 }
502
503 drop(peers);
504 drop(wallet);
505
506 self.blockchain_sync_state
507 .add_entry(
508 block_hash,
509 block_id,
510 peer_index,
511 self.network.peer_lock.clone(),
512 )
513 .await;
514
515 self.fetch_next_blocks().await;
516 }
517
518 async fn fetch_next_blocks(&mut self) -> bool {
519 let mut work_done = false;
520 {
521 let blockchain = self.blockchain_lock.read().await;
522 self.blockchain_sync_state
523 .build_peer_block_picture(&blockchain);
524 }
525
526 let map = self.blockchain_sync_state.get_blocks_to_fetch_per_peer();
527
528 let fetching_count = self.blockchain_sync_state.get_fetching_block_count();
529 self.network
531 .io_interface
532 .send_interface_event(InterfaceEvent::BlockFetchStatus(fetching_count as BlockId));
533
534 let mut fetched_blocks: Vec<(PeerIndex, SaitoHash)> = Default::default();
535 for (peer_index, vec) in map {
536 for (hash, block_id) in vec.iter() {
537 work_done = true;
538 let result = self
539 .network
540 .process_incoming_block_hash(
541 *hash,
542 *block_id,
543 peer_index,
544 self.blockchain_lock.clone(),
545 self.mempool_lock.clone(),
546 )
547 .await;
548 if result.is_some() {
549 fetched_blocks.push((peer_index, *hash));
550 } else {
551 self.blockchain_sync_state.remove_entry(*hash);
553 }
554 }
555 }
556 work_done
557 }
558 async fn send_to_verification_thread(&mut self, request: VerifyRequest) {
559 let sender_count = self.senders_to_verification.len();
561 let mut trials = 0;
562 loop {
563 trials += 1;
564 self.last_verification_thread_index += 1;
565 let sender_index: usize = self.last_verification_thread_index % sender_count;
566 let sender = self
567 .senders_to_verification
568 .get(sender_index)
569 .expect("sender should be here as we are using the modulus on index");
570
571 if sender.capacity() > 0 {
572 trace!("sending to verification thread : {:?}", sender_index);
573 sender.send(request).await.unwrap();
574
575 return;
576 }
577 trace!(
578 "verification thread sender : {:?} is full. capacity : {:?} max capacity : {:?}",
579 sender_index,
580 sender.capacity(),
581 sender.max_capacity()
582 );
583 if trials == sender_count {
584 trials = 0;
586 }
587 }
588 }
589 async fn process_ghost_chain(&mut self, chain: GhostChainSync, peer_index: u64) {
590 debug!("processing ghost chain from peer : {:?}", peer_index);
591
592 let mut previous_block_hash = chain.start;
593 let configs = self.config_lock.read().await;
594 let mut blockchain = self.blockchain_lock.write().await;
595 let mut lowest_id_to_reorg = 0;
596 let mut lowest_hash_to_reorg = [0; 32];
597 let mut need_blocks_fetched = false;
598 for i in 0..chain.prehashes.len() {
599 let buf = [
600 previous_block_hash.as_slice(),
601 chain.prehashes[i].as_slice(),
602 ]
603 .concat();
604 let block_hash = hash(&buf);
605 if chain.txs[i] {
606 debug!(
607 "ghost block : {:?} has txs for me. fetching from peer : {:?}",
608 block_hash.to_hex(),
609 peer_index
610 );
611 self.blockchain_sync_state
612 .add_entry(
613 block_hash,
614 chain.block_ids[i],
615 peer_index,
616 self.network.peer_lock.clone(),
617 )
618 .await;
619 need_blocks_fetched = true;
620 } else {
621 if !need_blocks_fetched {
622 lowest_id_to_reorg = chain.block_ids[i];
623 lowest_hash_to_reorg = block_hash;
624 }
625 debug!(
626 "ghost block : {:?} doesn't have txs for me. not fetching",
627 block_hash.to_hex()
628 );
629 blockchain.add_ghost_block(
630 chain.block_ids[i],
631 chain.previous_block_hashes[i],
632 chain.block_ts[i],
633 chain.prehashes[i],
634 chain.gts[i],
635 block_hash,
636 );
637 }
638 previous_block_hash = block_hash;
639 }
640 debug!(
641 "calling reorg with lowest values : {:?}-{:?}",
642 lowest_id_to_reorg,
643 lowest_hash_to_reorg.to_hex()
644 );
645
646 if lowest_id_to_reorg != 0 {
647 blockchain.blockring.on_chain_reorganization(
648 lowest_id_to_reorg,
649 lowest_hash_to_reorg,
650 true,
651 );
652 blockchain
653 .on_chain_reorganization(
654 lowest_id_to_reorg,
655 lowest_hash_to_reorg,
656 true,
657 &self.storage,
658 configs.deref(),
659 )
660 .await;
661
662 if let Some(fork_id) = blockchain.generate_fork_id(blockchain.last_block_id) {
663 if fork_id != [0; 32] {
664 blockchain.set_fork_id(fork_id);
665 }
666 } else {
667 trace!(
669 "fork id not generated for last block id : {:?} after ghost chain processing",
670 blockchain.last_block_id
671 );
672 }
673 self.network
674 .io_interface
675 .send_interface_event(InterfaceEvent::BlockAddSuccess(
676 lowest_hash_to_reorg,
677 lowest_id_to_reorg,
678 ));
679 }
680 }
681
682 async fn process_peer_services(&mut self, services: Vec<PeerService>, peer_index: u64) {
684 let mut peers = self.network.peer_lock.write().await;
685 let peer = peers.index_to_peers.get_mut(&peer_index);
686 if peer.is_some() {
687 let peer = peer.unwrap();
688 peer.services = services;
689 } else {
690 warn!("peer {:?} not found to update services", peer_index);
691 }
692 }
693
694 async fn write_peer_state_data(&mut self, duration_value: Timestamp, work_done: &mut bool) {
695 self.peer_file_write_timer += duration_value;
696 if self.peer_file_write_timer >= PEER_STATE_WRITE_PERIOD {
697 let mut peers = self.network.peer_lock.write().await;
698 let mut data: Vec<PeerStateEntry> = Default::default();
699
700 let current_time = self.timer.get_timestamp_in_ms();
701
702 for (_, peer) in peers.index_to_peers.iter_mut() {
703 data.push(PeerStateEntry {
704 peer_index: peer.index,
705 public_key: peer.public_key.unwrap_or([0; 33]),
706 msg_limit_exceeded: peer.has_message_limit_exceeded(current_time),
707 invalid_blocks_received: peer.has_invalid_block_limit_exceeded(current_time),
708 same_depth_blocks_received: false,
709 too_far_blocks_received: false,
710 handshake_limit_exceeded: peer.has_handshake_limit_exceeded(current_time),
711 keylist_limit_exceeded: peer.has_key_list_limit_exceeded(current_time),
712 limited_till: None,
713 current_time,
714 peer_address: peer.ip_address.clone().unwrap_or("NA".to_string()),
715 });
716 }
717 peers
718 .peer_state_writer
719 .write_state(data, &mut self.network.io_interface)
720 .await
721 .unwrap();
722 self.peer_file_write_timer = 0;
723 *work_done = true;
724 }
725 }
726}
727
728#[async_trait]
729impl ProcessEvent<RoutingEvent> for RoutingThread {
730 async fn process_network_event(&mut self, event: NetworkEvent) -> Option<()> {
731 match event {
732 NetworkEvent::IncomingNetworkMessage { peer_index, buffer } => {
733 trace!(
734 "processing incoming network message from peer : {:?} of size : {}",
735 peer_index,
736 buffer.len()
737 );
738 {
739 let mut peers = self.network.peer_lock.write().await;
741 let peer = peers.find_peer_by_index_mut(peer_index)?;
742
743 let time: u64 = self.timer.get_timestamp_in_ms();
744 peer.message_limiter.increase();
745 if peer.has_message_limit_exceeded(time) {
746 info!(
747 "peers exceeded for messages from peer : {:?} - {:?} - rates : {:?}",
748 peer_index,
749 peer.public_key.unwrap_or([0; 33]).to_base58(),
750 peer.message_limiter
751 );
752 return None;
753 }
754 }
755 let buffer_len = buffer.len();
756 let message = Message::deserialize(buffer);
757 if message.is_err() {
758 warn!(
759 "failed deserializing msg from peer : {:?} with buffer size : {:?}. disconnecting peer",
760 peer_index, buffer_len
761 );
762 self.network
763 .io_interface
764 .disconnect_from_peer(peer_index)
765 .await
766 .unwrap();
767 return None;
768 }
769 let message = message.unwrap();
770
771 self.stats.total_incoming_messages.increment();
772 self.process_incoming_message(peer_index, message).await;
773 return Some(());
774 }
775 NetworkEvent::PeerConnectionResult { result } => {
776 if result.is_ok() {
777 let (peer_index, ip) = result.unwrap();
778 self.handle_new_peer(peer_index, ip).await;
779 return Some(());
780 }
781 }
782 NetworkEvent::AddStunPeer {
783 peer_index,
784 public_key,
785 } => {
786 self.handle_new_stun_peer(peer_index, public_key).await;
787 return Some(());
788 }
789 NetworkEvent::RemoveStunPeer { peer_index } => {
790 self.remove_stun_peer(peer_index).await;
791 return Some(());
792 }
793 NetworkEvent::PeerDisconnected {
794 peer_index,
795 disconnect_type,
796 } => {
797 self.handle_peer_disconnect(peer_index, disconnect_type)
798 .await;
799 return Some(());
800 }
801 NetworkEvent::BlockFetched {
802 block_hash,
803 block_id,
804 peer_index,
805 buffer,
806 } => {
807 debug!("block received : {:?}", block_hash.to_hex());
808 {
809 let mut peers = self.network.peer_lock.write().await;
810 let peer = peers.find_peer_by_index_mut(peer_index)?;
811 let time = self.timer.get_timestamp_in_ms();
812 if peer.has_invalid_block_limit_exceeded(time) {
813 info!(
814 "peers exceeded for invalid blocks from peer : {:?}. disconnecting peer...",
815 peer_index
816 );
817 self.network
818 .io_interface
819 .disconnect_from_peer(peer_index)
820 .await
821 .unwrap();
822 return None;
823 }
824 }
825
826 self.send_to_verification_thread(VerifyRequest::Block(
827 buffer, peer_index, block_hash, block_id,
828 ))
829 .await;
830
831 self.blockchain_sync_state.mark_as_fetched(block_hash);
832
833 self.fetch_next_blocks().await;
834
835 return Some(());
836 }
837 NetworkEvent::BlockFetchFailed {
838 block_hash,
839 peer_index,
840 block_id,
841 } => {
842 debug!("block fetch failed : {:?}", block_hash.to_hex());
843
844 self.blockchain_sync_state
845 .mark_as_failed(block_id, block_hash, peer_index);
846 }
847 _ => unreachable!(),
848 }
849 debug!("network event processed");
850 None
851 }
852 async fn process_timer_event(&mut self, duration: Duration) -> Option<()> {
853 let duration_value: Timestamp = duration.as_millis() as Timestamp;
856
857 let mut work_done = false;
858
859 const RECONNECTION_PERIOD: Timestamp = Duration::from_secs(2).as_millis() as Timestamp;
860 self.reconnection_timer += duration_value;
861 let current_time = self.timer.get_timestamp_in_ms();
862 if self.reconnection_timer >= RECONNECTION_PERIOD {
863 self.network.connect_to_static_peers(current_time).await;
864 self.network.send_pings().await;
865 self.reconnection_timer = 0;
866 self.fetch_next_blocks().await;
867 work_done = true;
868 }
869
870 const PEER_REMOVAL_TIMER_PERIOD: Timestamp =
871 Duration::from_secs(5).as_millis() as Timestamp;
872 self.peer_removal_timer += duration_value;
873 if self.peer_removal_timer >= PEER_REMOVAL_TIMER_PERIOD {
874 let mut peers = self.network.peer_lock.write().await;
875 peers.remove_disconnected_peers(current_time);
876 self.peer_removal_timer = 0;
877 work_done = true;
878 }
879
880 self.write_peer_state_data(duration_value, &mut work_done)
881 .await;
882
883 if work_done {
884 return Some(());
885 }
886 None
887 }
888
889 async fn process_event(&mut self, event: RoutingEvent) -> Option<()> {
890 match event {
891 RoutingEvent::BlockchainUpdated(block_hash) => {
892 trace!(
893 "received blockchain update event : {:?}",
894 block_hash.to_hex()
895 );
896 self.blockchain_sync_state.remove_entry(block_hash);
897 self.fetch_next_blocks().await;
898 }
899
900 RoutingEvent::BlockFetchRequest(peer_index, block_hash, block_id) => {
901 self.blockchain_sync_state
902 .add_entry(
903 block_hash,
904 block_id,
905 peer_index,
906 self.network.peer_lock.clone(),
907 )
908 .await;
909 }
910 RoutingEvent::BlockchainRequest(peer_index) => {
911 self.network
912 .request_blockchain_from_peer(peer_index, self.blockchain_lock.clone())
913 .await;
914 }
915 }
916 None
917 }
918
919 async fn on_init(&mut self) {
920 assert!(!self.senders_to_verification.is_empty());
921 self.reconnection_timer = self.timer.get_timestamp_in_ms();
922 self.network
924 .initialize_static_peers(self.config_lock.clone())
925 .await;
926 }
927 async fn on_stat_interval(&mut self, current_time: Timestamp) {
928 self.stats
929 .received_transactions
930 .calculate_stats(current_time)
931 .await;
932 self.stats
933 .received_blocks
934 .calculate_stats(current_time)
935 .await;
936 self.stats
937 .total_incoming_messages
938 .calculate_stats(current_time)
939 .await;
940
941 let stat = format!(
942 "{} - {} - capacity : {:?} / {:?}",
943 StatVariable::format_timestamp(current_time),
944 format!("{:width$}", "consensus::channel", width = 40),
945 self.sender_to_consensus.capacity(),
946 self.sender_to_consensus.max_capacity()
947 );
948 self.stat_sender.send(stat).await.unwrap();
949 for (index, sender) in self.senders_to_verification.iter().enumerate() {
950 let stat = format!(
951 "{} - {} - capacity : {:?} / {:?}",
952 StatVariable::format_timestamp(current_time),
953 format!(
954 "{:width$}",
955 format!("verification_{:?}::channel", index),
956 width = 40
957 ),
958 sender.capacity(),
959 sender.max_capacity()
960 );
961 self.stat_sender.send(stat).await.unwrap();
962 }
963
964 let stats = self.blockchain_sync_state.get_stats();
965 for stat in stats {
966 self.stat_sender.send(stat).await.unwrap();
967 }
968
969 let peers = self.network.peer_lock.read().await;
970 let mut peer_count = 0;
971 let mut peers_in_handshake = 0;
972
973 for (_, peer) in peers.index_to_peers.iter() {
974 peer_count += 1;
975
976 if peer.challenge_for_peer.is_some() {
977 peers_in_handshake += 1;
978 }
979 }
980
981 let stat = format!(
982 "{} - {} - total peers : {:?}. in handshake : {:?}",
983 StatVariable::format_timestamp(current_time),
984 format!("{:width$}", "peers::state", width = 40),
985 peer_count,
986 peers_in_handshake,
987 );
988 self.stat_sender.send(stat).await.unwrap();
989 }
990
991 fn is_ready_to_process(&self) -> bool {
992 self.sender_to_miner.capacity() > CHANNEL_SAFE_BUFFER
993 && self.sender_to_consensus.capacity() > CHANNEL_SAFE_BUFFER
994 && self
995 .senders_to_verification
996 .iter()
997 .all(|sender| sender.capacity() > CHANNEL_SAFE_BUFFER)
998 }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use crate::core::defs::NOLAN_PER_SAITO;
1004 use crate::core::routing_thread::RoutingThread;
1005 use crate::core::util::crypto::generate_keys;
1006 use crate::core::util::test::node_tester::test::NodeTester;
1007
1008 #[tokio::test]
1009 #[serial_test::serial]
1010 async fn test_ghost_chain_gen() {
1011 NodeTester::delete_data().await.unwrap();
1013 let peer_public_key = generate_keys().0;
1014 let mut tester = NodeTester::new(1000, None, None);
1015 tester
1016 .init_with_staking(0, 60, 100_000 * NOLAN_PER_SAITO)
1017 .await
1018 .unwrap();
1019
1020 tester.wait_till_block_id_with_txs(100, 0, 0).await.unwrap();
1021
1022 {
1023 let fork_id = tester.get_fork_id(50).await;
1024 let blockchain = tester.routing_thread.blockchain_lock.read().await;
1025
1026 let ghost_chain = RoutingThread::generate_ghost_chain(
1027 50,
1028 fork_id,
1029 &blockchain,
1030 vec![peer_public_key],
1031 &tester.routing_thread.storage,
1032 )
1033 .await;
1034
1035 assert_eq!(ghost_chain.block_ids.len(), 50);
1036 assert_eq!(ghost_chain.block_ts.len(), 50);
1037 assert_eq!(ghost_chain.gts.len(), 50);
1038 assert_eq!(ghost_chain.prehashes.len(), 50);
1039 assert_eq!(ghost_chain.previous_block_hashes.len(), 50);
1040 assert!(ghost_chain.txs.iter().all(|x| !(*x)));
1041 }
1042
1043 {
1044 let tx = tester
1045 .create_transaction(100, 10, peer_public_key)
1046 .await
1047 .unwrap();
1048 tester.add_transaction(tx).await;
1049 }
1050
1051 tester.wait_till_block_id(101).await.unwrap();
1052
1053 tester
1054 .wait_till_block_id_with_txs(105, 10, 0)
1055 .await
1056 .unwrap();
1057
1058 {
1059 let block_id = 101;
1060 let fork_id = tester.get_fork_id(block_id).await;
1061 let blockchain = tester.routing_thread.blockchain_lock.read().await;
1062 let ghost_chain = RoutingThread::generate_ghost_chain(
1063 block_id,
1064 fork_id,
1065 &blockchain,
1066 vec![peer_public_key],
1067 &tester.routing_thread.storage,
1068 )
1069 .await;
1070
1071 assert_eq!(ghost_chain.block_ids.len(), 5);
1072 assert_eq!(ghost_chain.block_ts.len(), 5);
1073 assert_eq!(ghost_chain.gts.len(), 5);
1074 assert_eq!(ghost_chain.prehashes.len(), 5);
1075 assert_eq!(ghost_chain.previous_block_hashes.len(), 5);
1076 assert_eq!(ghost_chain.txs.iter().filter(|x| **x).count(), 1);
1077 }
1078 }
1079}