saito_core/core/
routing_thread.rs

1use crate::core::consensus::block::BlockType;
2use crate::core::consensus::blockchain::Blockchain;
3use crate::core::consensus::blockchain_sync_state::BlockchainSyncState;
4use crate::core::consensus::mempool::Mempool;
5use crate::core::consensus::peers::peer_service::PeerService;
6use crate::core::consensus::peers::peer_state_writer::{PeerStateEntry, PEER_STATE_WRITE_PERIOD};
7use crate::core::consensus::wallet::Wallet;
8use crate::core::consensus_thread::ConsensusEvent;
9use crate::core::defs::{
10    BlockHash, BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, StatVariable, Timestamp,
11    CHANNEL_SAFE_BUFFER, STAT_BIN_COUNT,
12};
13use crate::core::io::interface_io::InterfaceEvent;
14use crate::core::io::network::{Network, PeerDisconnectType};
15use crate::core::io::network_event::NetworkEvent;
16use crate::core::io::storage::Storage;
17use crate::core::mining_thread::MiningEvent;
18use crate::core::msg::block_request::BlockchainRequest;
19use crate::core::msg::ghost_chain_sync::GhostChainSync;
20use crate::core::msg::message::Message;
21use crate::core::process::keep_time::Timer;
22use crate::core::process::process_event::ProcessEvent;
23use crate::core::process::version::Version;
24use crate::core::util;
25use crate::core::util::configuration::Configuration;
26use crate::core::util::crypto::hash;
27use crate::core::verification_thread::VerifyRequest;
28use async_trait::async_trait;
29use log::{debug, error, info, trace, warn};
30use std::ops::Deref;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::mpsc::Sender;
34use tokio::sync::RwLock;
35
/// Events addressed to the routing thread (the `RoutingEvent` parameter of its
/// `ProcessEvent` implementation).
///
/// NOTE(review): the code that handles these events is outside this section, so
/// the per-variant notes describe the payload only — confirm the exact semantics
/// against the handler.
#[derive(Debug)]
pub enum RoutingEvent {
    /// Carries a block hash; presumably the block the chain was just updated with.
    BlockchainUpdated(BlockHash),
    /// Carries a peer index plus a block hash and block id; presumably a request
    /// to fetch that block from that peer.
    BlockFetchRequest(PeerIndex, BlockHash, BlockId),
    /// Carries a peer index; presumably asks for a blockchain request to be sent
    /// to that peer.
    BlockchainRequest(PeerIndex),
}
42
/// Connection state recorded for a [`StaticPeer`].
#[derive(Debug)]
pub enum PeerState {
    /// A connection to the peer is established.
    Connected,
    /// A connection attempt is in progress.
    Connecting,
    /// There is no connection to the peer.
    Disconnected,
}
49
/// A peer described by a configuration entry, together with its connection state.
///
/// NOTE(review): nothing in this section constructs or reads this type — confirm
/// it is still in use.
pub struct StaticPeer {
    /// Configuration entry describing the peer.
    pub peer_details: util::configuration::PeerConfig,
    /// Current connection state of the peer.
    pub peer_state: PeerState,
    /// Index identifying the peer.
    pub peer_index: u64,
}
55
/// Statistic counters maintained by the routing thread.
pub struct RoutingStats {
    /// Incremented for every `Message::Transaction` received from a peer.
    pub received_transactions: StatVariable,
    /// Counter named `routing::received_blocks`; it is not incremented anywhere
    /// in this section.
    pub received_blocks: StatVariable,
    /// Incremented for every network message that was deserialized successfully.
    pub total_incoming_messages: StatVariable,
}
61
62impl RoutingStats {
63    pub fn new(sender: Sender<String>) -> Self {
64        RoutingStats {
65            received_transactions: StatVariable::new(
66                "routing::received_txs".to_string(),
67                STAT_BIN_COUNT,
68                sender.clone(),
69            ),
70            received_blocks: StatVariable::new(
71                "routing::received_blocks".to_string(),
72                STAT_BIN_COUNT,
73                sender.clone(),
74            ),
75            total_incoming_messages: StatVariable::new(
76                "routing::incoming_msgs".to_string(),
77                STAT_BIN_COUNT,
78                sender,
79            ),
80        }
81    }
82}
83
/// Manages peers and routes messages to correct controller
pub struct RoutingThread {
    /// Shared blockchain; read when answering sync requests and written when
    /// ghost blocks are added.
    pub blockchain_lock: Arc<RwLock<Blockchain>>,
    /// Shared mempool; handed to the network layer when block hashes are processed.
    pub mempool_lock: Arc<RwLock<Mempool>>,
    /// Channel towards the consensus thread (not used in this section).
    pub sender_to_consensus: Sender<ConsensusEvent>,
    /// Channel towards the mining thread (not used in this section).
    pub sender_to_miner: Sender<MiningEvent>,
    /// Shared node configuration.
    pub config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
    /// Time source; queried for the current timestamp in milliseconds.
    pub timer: Timer,
    /// Shared wallet; holds the node's key list and wallet version.
    pub wallet_lock: Arc<RwLock<Wallet>>,
    /// Network layer which owns the peer collection and the IO interface.
    pub network: Network,
    /// Storage handle used when upgrading blocks and during chain reorganization.
    pub storage: Storage,
    /// NOTE(review): presumably accumulates elapsed time between reconnection
    /// attempts — not used in this section, confirm.
    pub reconnection_timer: Timestamp,
    /// NOTE(review): presumably accumulates elapsed time between peer removal
    /// passes — not used in this section, confirm.
    pub peer_removal_timer: Timestamp,
    /// Elapsed time accumulated since the peer state file was last written;
    /// reset to 0 after each write.
    pub peer_file_write_timer: Timestamp,
    /// NOTE(review): presumably the last block fetch count reported to the
    /// interface — not used in this section, confirm.
    pub last_emitted_block_fetch_count: BlockId,
    /// Counters for received transactions, blocks and messages.
    pub stats: RoutingStats,
    /// One channel per verification thread.
    pub senders_to_verification: Vec<Sender<VerifyRequest>>,
    /// Rotating cursor used to pick the next verification thread.
    pub last_verification_thread_index: usize,
    /// Channel for emitting stat strings (not used in this section).
    pub stat_sender: Sender<String>,
    /// Tracks which block hashes still have to be fetched from which peer.
    pub blockchain_sync_state: BlockchainSyncState,
}
105
106impl RoutingThread {
107    ///
108    ///
109    /// # Arguments
110    ///
111    /// * `peer_index`:
112    /// * `message`:
113    ///
114    /// returns: ()
115    ///
116    /// # Examples
117    ///
118    /// ```
119    ///
120    /// ```
121    async fn process_incoming_message(&mut self, peer_index: PeerIndex, message: Message) {
122        self.network.update_peer_timer(peer_index).await;
123        match message {
124            Message::HandshakeChallenge(challenge) => {
125                debug!("received handshake challenge from peer : {:?}", peer_index);
126                self.network
127                    .handle_handshake_challenge(
128                        peer_index,
129                        challenge,
130                        self.wallet_lock.clone(),
131                        self.config_lock.clone(),
132                    )
133                    .await;
134            }
135            Message::HandshakeResponse(response) => {
136                trace!("received handshake response from peer : {:?}", peer_index);
137                self.network
138                    .handle_handshake_response(
139                        peer_index,
140                        response,
141                        self.wallet_lock.clone(),
142                        self.blockchain_lock.clone(),
143                        self.config_lock.clone(),
144                    )
145                    .await;
146            }
147
148            Message::Transaction(transaction) => {
149                trace!(
150                    "received transaction : {} from peer : {:?}",
151                    transaction.signature.to_hex(),
152                    peer_index
153                );
154                self.stats.received_transactions.increment();
155                self.send_to_verification_thread(VerifyRequest::Transaction(transaction))
156                    .await;
157            }
158            Message::BlockchainRequest(request) => {
159                trace!(
160                    "received blockchain request from peer : {:?} with block id : {:?} and hash : {:?}",
161                    peer_index,
162                    request.latest_block_id,
163                    request.latest_block_hash.to_hex()
164                );
165                {
166                    let configs = self.config_lock.read().await;
167                    if configs.is_browser() || configs.is_spv_mode() {
168                        // not processing incoming blockchain request. since we cannot provide any blocks
169                        return;
170                    }
171                }
172                self.process_incoming_blockchain_request(request, peer_index)
173                    .await;
174            }
175            Message::BlockHeaderHash(hash, block_id) => {
176                trace!(
177                    "received block header hash from peer : {:?} with block id : {:?} and hash : {:?}",
178                    peer_index,
179                    block_id,
180                    hash.to_hex()
181                );
182                self.process_incoming_block_hash(hash, block_id, peer_index)
183                    .await;
184            }
185            Message::Ping() => {}
186            Message::SPVChain() => {}
187            Message::Services(services) => {
188                self.process_peer_services(services, peer_index).await;
189            }
190            Message::GhostChain(chain) => {
191                self.process_ghost_chain(chain, peer_index).await;
192            }
193            Message::GhostChainRequest(block_id, block_hash, fork_id) => {
194                self.process_ghost_chain_request(block_id, block_hash, fork_id, peer_index)
195                    .await;
196            }
197            Message::ApplicationMessage(api_message) => {
198                trace!(
199                    "processing application msg with buffer size : {:?} from peer : {:?}",
200                    api_message.data.len(),
201                    peer_index
202                );
203                self.network
204                    .io_interface
205                    .process_api_call(api_message.data, api_message.msg_index, peer_index)
206                    .await;
207            }
208            Message::Result(api_message) => {
209                self.network
210                    .io_interface
211                    .process_api_success(api_message.data, api_message.msg_index, peer_index)
212                    .await;
213            }
214            Message::Error(api_message) => {
215                self.network
216                    .io_interface
217                    .process_api_error(api_message.data, api_message.msg_index, peer_index)
218                    .await;
219            }
220            Message::KeyListUpdate(key_list) => {
221                self.network
222                    .handle_received_key_list(peer_index, key_list)
223                    .await
224                    .unwrap();
225            }
226            Message::Block(_) => {
227                error!("received block message");
228                unreachable!();
229            }
230        }
231    }
232    /// Processes a received ghost chain request from a peer to sync itself with the blockchain
233    ///
234    /// # Arguments
235    ///
236    /// * `block_id`:
237    /// * `block_hash`:
238    /// * `fork_id`:
239    /// * `peer_index`:
240    ///
241    /// returns: ()
242    ///
243    /// # Examples
244    ///
245    /// ```
246    ///
247    /// ```
248    async fn process_ghost_chain_request(
249        &self,
250        block_id: u64,
251        block_hash: SaitoHash,
252        fork_id: SaitoHash,
253        peer_index: u64,
254    ) {
255        debug!("processing ghost chain request from peer : {:?}. block_id : {:?} block_hash: {:?} fork_id: {:?}",
256            peer_index,
257            block_id,
258            block_hash.to_hex(),
259            fork_id.to_hex()
260        );
261        let blockchain = self.blockchain_lock.read().await;
262        let mut peer_key_list: Vec<SaitoPublicKey> = vec![];
263        {
264            let peers = self.network.peer_lock.read().await;
265            let peer = peers.find_peer_by_index(peer_index).unwrap();
266            peer_key_list.push(peer.public_key.unwrap());
267            peer_key_list.append(&mut peer.key_list.clone());
268        }
269
270        let ghost = Self::generate_ghost_chain(
271            block_id,
272            fork_id,
273            &blockchain,
274            peer_key_list,
275            &self.storage,
276        )
277        .await;
278
279        debug!("sending ghost chain to peer : {:?}", peer_index);
280        // debug!("ghost : {:?}", ghost);
281        let buffer = Message::GhostChain(ghost).serialize();
282        self.network
283            .io_interface
284            .send_message(peer_index, buffer.as_slice())
285            .await
286            .unwrap();
287    }
288
289    pub(crate) async fn generate_ghost_chain(
290        block_id: u64,
291        fork_id: SaitoHash,
292        blockchain: &Blockchain,
293        peer_key_list: Vec<SaitoPublicKey>,
294        storage: &Storage,
295    ) -> GhostChainSync {
296        debug!(
297            "generating ghost chain for block_id : {:?} fork_id : {:?}",
298            block_id,
299            fork_id.to_hex()
300        );
301        let mut last_shared_ancestor = blockchain.generate_last_shared_ancestor(block_id, fork_id);
302
303        debug!("last_shared_ancestor 1 : {:?}", last_shared_ancestor);
304
305        debug!(
306            "peer key list: {:?}",
307            peer_key_list
308                .iter()
309                .map(|pk| pk.to_base58())
310                .collect::<Vec<String>>()
311        );
312
313        if last_shared_ancestor == 0 {
314            // if we cannot find the last shared ancestor in a long chain, we just need to sync from peer's block id
315            last_shared_ancestor = block_id;
316        }
317
318        let start = blockchain
319            .blockring
320            .get_longest_chain_block_hash_at_block_id(last_shared_ancestor)
321            .unwrap_or([0; 32]);
322
323        let latest_block_id = blockchain.blockring.get_latest_block_id();
324        debug!("latest_block_id : {:?}", latest_block_id);
325        debug!("last_shared_ancestor : {:?}", last_shared_ancestor);
326        debug!("start : {:?}", start.to_hex());
327
328        let mut ghost = GhostChainSync {
329            start,
330            prehashes: vec![],
331            previous_block_hashes: vec![],
332            block_ids: vec![],
333            block_ts: vec![],
334            txs: vec![],
335            gts: vec![],
336        };
337        for i in (last_shared_ancestor + 1)..=latest_block_id {
338            if let Some(hash) = blockchain
339                .blockring
340                .get_longest_chain_block_hash_at_block_id(i)
341            {
342                let block = blockchain.get_block(&hash);
343                if let Some(block) = block {
344                    ghost.gts.push(block.has_golden_ticket);
345                    ghost.block_ts.push(block.timestamp);
346                    ghost.prehashes.push(block.pre_hash);
347                    ghost.previous_block_hashes.push(block.previous_block_hash);
348                    ghost.block_ids.push(block.id);
349
350                    let mut clone = block.clone();
351                    if !clone
352                        .upgrade_block_to_block_type(BlockType::Full, storage, false)
353                        .await
354                    {
355                        warn!(
356                            "couldn't upgrade block : {:?}-{:?} for ghost chain generation",
357                            clone.id,
358                            clone.hash.to_hex()
359                        );
360                    }
361                    debug!(
362                        "pushing block : {:?} at index : {:?} with txs : {:?} has txs : {:?}",
363                        clone.hash.to_hex(),
364                        i,
365                        clone.transactions.len(),
366                        clone.has_keylist_txs(&peer_key_list)
367                    );
368                    // whether this block has any txs which the peer will be interested in
369                    ghost.txs.push(clone.has_keylist_txs(&peer_key_list));
370                }
371            }
372        }
373        ghost
374    }
375
376    async fn handle_new_peer(&mut self, peer_index: u64, ip: Option<String>) {
377        trace!("handling new peer : {:?}", peer_index);
378        self.network.handle_new_peer(peer_index, ip).await;
379    }
380
381    async fn handle_new_stun_peer(&mut self, peer_index: u64, public_key: SaitoPublicKey) {
382        trace!("handling new stun peer : {:?}", peer_index);
383        self.network
384            .handle_new_stun_peer(peer_index, public_key)
385            .await;
386    }
387
388    async fn remove_stun_peer(&mut self, peer_index: u64) {
389        trace!("removing stun peer : {:?}", peer_index);
390        self.network.remove_stun_peer(peer_index).await;
391    }
392
393    async fn handle_peer_disconnect(
394        &mut self,
395        peer_index: u64,
396        disconnect_type: PeerDisconnectType,
397    ) {
398        trace!("handling peer disconnect, peer_index = {}", peer_index);
399        self.network
400            .handle_peer_disconnect(peer_index, disconnect_type)
401            .await;
402    }
403    pub async fn set_my_key_list(&mut self, mut key_list: Vec<SaitoPublicKey>) {
404        let mut wallet = self.wallet_lock.write().await;
405
406        key_list.sort();
407        // check if key list is different from what we already have
408        if wallet
409            .key_list
410            .iter()
411            .zip(key_list.iter())
412            .any(|(a, b)| a != b)
413        {
414            wallet.set_key_list(key_list);
415            self.network.send_key_list(&wallet.key_list).await;
416        }
417    }
418
419    pub async fn process_incoming_blockchain_request(
420        &self,
421        request: BlockchainRequest,
422        peer_index: u64,
423    ) {
424        debug!(
425            "processing incoming blockchain request : {:?}-{:?}-{:?} from peer : {:?}",
426            request.latest_block_id,
427            request.latest_block_hash.to_hex(),
428            request.fork_id.to_hex(),
429            peer_index
430        );
431        // TODO : can we ignore the functionality if it's a lite node ?
432
433        let blockchain = self.blockchain_lock.read().await;
434
435        let last_shared_ancestor =
436            blockchain.generate_last_shared_ancestor(request.latest_block_id, request.fork_id);
437        debug!(
438            "last shared ancestor = {:?} latest_id = {:?}",
439            last_shared_ancestor,
440            blockchain.blockring.get_latest_block_id()
441        );
442
443        for i in last_shared_ancestor..(blockchain.blockring.get_latest_block_id() + 1) {
444            if let Some(block_hash) = blockchain
445                .blockring
446                .get_longest_chain_block_hash_at_block_id(i)
447            {
448                trace!(
449                    "sending block header hash: {:?}-{:?} to peer : {:?}",
450                    i,
451                    block_hash.to_hex(),
452                    peer_index
453                );
454                let buffer = Message::BlockHeaderHash(block_hash, i).serialize();
455                self.network
456                    .io_interface
457                    .send_message(peer_index, buffer.as_slice())
458                    .await
459                    .unwrap();
460            } else {
461                continue;
462            }
463        }
464    }
465    async fn process_incoming_block_hash(
466        &mut self,
467        block_hash: SaitoHash,
468        block_id: u64,
469        peer_index: u64,
470    ) {
471        debug!(
472            "processing incoming block hash : {:?}-{:?} from peer : {:?}",
473            block_id,
474            block_hash.to_hex(),
475            peer_index
476        );
477        {
478            // trace!("locking blockchain 6");
479            let blockchain = self.blockchain_lock.read().await;
480            if !blockchain.blocks.is_empty() && blockchain.lowest_acceptable_block_id >= block_id {
481                debug!("skipping block header : {:?}-{:?} from peer : {:?} since our lowest acceptable id : {:?}",block_id,block_hash.to_hex(),peer_index, blockchain.lowest_acceptable_block_id);
482                return;
483            }
484        }
485        // trace!("releasing blockchain 6");
486
487        let peers = self.network.peer_lock.read().await;
488        let wallet = self.wallet_lock.read().await;
489
490        if let Some(peer) = peers.index_to_peers.get(&peer_index) {
491            // TODO : check if this check can be removed from here, since network.rs also have the same check
492            if wallet.wallet_version > peer.wallet_version
493                && peer.wallet_version != Version::new(0, 0, 0)
494            {
495                warn!(
496                    "Not Fetching Block: {:?} from peer :{:?} since peer version is old. expected: {:?} actual {:?} ",
497                    block_hash.to_hex(), peer.index, wallet.wallet_version, peer.wallet_version
498                );
499                return;
500            }
501        }
502
503        drop(peers);
504        drop(wallet);
505
506        self.blockchain_sync_state
507            .add_entry(
508                block_hash,
509                block_id,
510                peer_index,
511                self.network.peer_lock.clone(),
512            )
513            .await;
514
515        self.fetch_next_blocks().await;
516    }
517
518    async fn fetch_next_blocks(&mut self) -> bool {
519        let mut work_done = false;
520        {
521            let blockchain = self.blockchain_lock.read().await;
522            self.blockchain_sync_state
523                .build_peer_block_picture(&blockchain);
524        }
525
526        let map = self.blockchain_sync_state.get_blocks_to_fetch_per_peer();
527
528        let fetching_count = self.blockchain_sync_state.get_fetching_block_count();
529        // trace!("fetching next blocks : {:?} from peers", fetching_count);
530        self.network
531            .io_interface
532            .send_interface_event(InterfaceEvent::BlockFetchStatus(fetching_count as BlockId));
533
534        let mut fetched_blocks: Vec<(PeerIndex, SaitoHash)> = Default::default();
535        for (peer_index, vec) in map {
536            for (hash, block_id) in vec.iter() {
537                work_done = true;
538                let result = self
539                    .network
540                    .process_incoming_block_hash(
541                        *hash,
542                        *block_id,
543                        peer_index,
544                        self.blockchain_lock.clone(),
545                        self.mempool_lock.clone(),
546                    )
547                    .await;
548                if result.is_some() {
549                    fetched_blocks.push((peer_index, *hash));
550                } else {
551                    // if we already have the block added don't need to request it from peer
552                    self.blockchain_sync_state.remove_entry(*hash);
553                }
554            }
555        }
556        work_done
557    }
558    async fn send_to_verification_thread(&mut self, request: VerifyRequest) {
559        // waiting till we get an acceptable sender
560        let sender_count = self.senders_to_verification.len();
561        let mut trials = 0;
562        loop {
563            trials += 1;
564            self.last_verification_thread_index += 1;
565            let sender_index: usize = self.last_verification_thread_index % sender_count;
566            let sender = self
567                .senders_to_verification
568                .get(sender_index)
569                .expect("sender should be here as we are using the modulus on index");
570
571            if sender.capacity() > 0 {
572                trace!("sending to verification thread : {:?}", sender_index);
573                sender.send(request).await.unwrap();
574
575                return;
576            }
577            trace!(
578                "verification thread sender : {:?} is full. capacity : {:?} max capacity : {:?}",
579                sender_index,
580                sender.capacity(),
581                sender.max_capacity()
582            );
583            if trials == sender_count {
584                // todo : if all the channels are full, we should wait here. cannot sleep to support wasm interface
585                trials = 0;
586            }
587        }
588    }
589    async fn process_ghost_chain(&mut self, chain: GhostChainSync, peer_index: u64) {
590        debug!("processing ghost chain from peer : {:?}", peer_index);
591
592        let mut previous_block_hash = chain.start;
593        let configs = self.config_lock.read().await;
594        let mut blockchain = self.blockchain_lock.write().await;
595        let mut lowest_id_to_reorg = 0;
596        let mut lowest_hash_to_reorg = [0; 32];
597        let mut need_blocks_fetched = false;
598        for i in 0..chain.prehashes.len() {
599            let buf = [
600                previous_block_hash.as_slice(),
601                chain.prehashes[i].as_slice(),
602            ]
603            .concat();
604            let block_hash = hash(&buf);
605            if chain.txs[i] {
606                debug!(
607                    "ghost block : {:?} has txs for me. fetching from peer : {:?}",
608                    block_hash.to_hex(),
609                    peer_index
610                );
611                self.blockchain_sync_state
612                    .add_entry(
613                        block_hash,
614                        chain.block_ids[i],
615                        peer_index,
616                        self.network.peer_lock.clone(),
617                    )
618                    .await;
619                need_blocks_fetched = true;
620            } else {
621                if !need_blocks_fetched {
622                    lowest_id_to_reorg = chain.block_ids[i];
623                    lowest_hash_to_reorg = block_hash;
624                }
625                debug!(
626                    "ghost block : {:?} doesn't have txs for me. not fetching",
627                    block_hash.to_hex()
628                );
629                blockchain.add_ghost_block(
630                    chain.block_ids[i],
631                    chain.previous_block_hashes[i],
632                    chain.block_ts[i],
633                    chain.prehashes[i],
634                    chain.gts[i],
635                    block_hash,
636                );
637            }
638            previous_block_hash = block_hash;
639        }
640        debug!(
641            "calling reorg with lowest values : {:?}-{:?}",
642            lowest_id_to_reorg,
643            lowest_hash_to_reorg.to_hex()
644        );
645
646        if lowest_id_to_reorg != 0 {
647            blockchain.blockring.on_chain_reorganization(
648                lowest_id_to_reorg,
649                lowest_hash_to_reorg,
650                true,
651            );
652            blockchain
653                .on_chain_reorganization(
654                    lowest_id_to_reorg,
655                    lowest_hash_to_reorg,
656                    true,
657                    &self.storage,
658                    configs.deref(),
659                )
660                .await;
661
662            if let Some(fork_id) = blockchain.generate_fork_id(blockchain.last_block_id) {
663                if fork_id != [0; 32] {
664                    blockchain.set_fork_id(fork_id);
665                }
666            } else {
667                // blockchain.set_fork_id([0; 32]);
668                trace!(
669                    "fork id not generated for last block id : {:?} after ghost chain processing",
670                    blockchain.last_block_id
671                );
672            }
673            self.network
674                .io_interface
675                .send_interface_event(InterfaceEvent::BlockAddSuccess(
676                    lowest_hash_to_reorg,
677                    lowest_id_to_reorg,
678                ));
679        }
680    }
681
682    // TODO : remove if not required
683    async fn process_peer_services(&mut self, services: Vec<PeerService>, peer_index: u64) {
684        let mut peers = self.network.peer_lock.write().await;
685        let peer = peers.index_to_peers.get_mut(&peer_index);
686        if peer.is_some() {
687            let peer = peer.unwrap();
688            peer.services = services;
689        } else {
690            warn!("peer {:?} not found to update services", peer_index);
691        }
692    }
693
694    async fn write_peer_state_data(&mut self, duration_value: Timestamp, work_done: &mut bool) {
695        self.peer_file_write_timer += duration_value;
696        if self.peer_file_write_timer >= PEER_STATE_WRITE_PERIOD {
697            let mut peers = self.network.peer_lock.write().await;
698            let mut data: Vec<PeerStateEntry> = Default::default();
699
700            let current_time = self.timer.get_timestamp_in_ms();
701
702            for (_, peer) in peers.index_to_peers.iter_mut() {
703                data.push(PeerStateEntry {
704                    peer_index: peer.index,
705                    public_key: peer.public_key.unwrap_or([0; 33]),
706                    msg_limit_exceeded: peer.has_message_limit_exceeded(current_time),
707                    invalid_blocks_received: peer.has_invalid_block_limit_exceeded(current_time),
708                    same_depth_blocks_received: false,
709                    too_far_blocks_received: false,
710                    handshake_limit_exceeded: peer.has_handshake_limit_exceeded(current_time),
711                    keylist_limit_exceeded: peer.has_key_list_limit_exceeded(current_time),
712                    limited_till: None,
713                    current_time,
714                    peer_address: peer.ip_address.clone().unwrap_or("NA".to_string()),
715                });
716            }
717            peers
718                .peer_state_writer
719                .write_state(data, &mut self.network.io_interface)
720                .await
721                .unwrap();
722            self.peer_file_write_timer = 0;
723            *work_done = true;
724        }
725    }
726}
727
728#[async_trait]
729impl ProcessEvent<RoutingEvent> for RoutingThread {
730    async fn process_network_event(&mut self, event: NetworkEvent) -> Option<()> {
731        match event {
732            NetworkEvent::IncomingNetworkMessage { peer_index, buffer } => {
733                trace!(
734                    "processing incoming network message from peer : {:?} of size : {}",
735                    peer_index,
736                    buffer.len()
737                );
738                {
739                    // TODO : move this before deserialization to avoid spending CPU time on it. moved here to just print message type
740                    let mut peers = self.network.peer_lock.write().await;
741                    let peer = peers.find_peer_by_index_mut(peer_index)?;
742
743                    let time: u64 = self.timer.get_timestamp_in_ms();
744                    peer.message_limiter.increase();
745                    if peer.has_message_limit_exceeded(time) {
746                        info!(
747                            "peers exceeded for messages from peer : {:?} - {:?} - rates : {:?}",
748                            peer_index,
749                            peer.public_key.unwrap_or([0; 33]).to_base58(),
750                            peer.message_limiter
751                        );
752                        return None;
753                    }
754                }
755                let buffer_len = buffer.len();
756                let message = Message::deserialize(buffer);
757                if message.is_err() {
758                    warn!(
759                        "failed deserializing msg from peer : {:?} with buffer size : {:?}. disconnecting peer",
760                        peer_index, buffer_len
761                    );
762                    self.network
763                        .io_interface
764                        .disconnect_from_peer(peer_index)
765                        .await
766                        .unwrap();
767                    return None;
768                }
769                let message = message.unwrap();
770
771                self.stats.total_incoming_messages.increment();
772                self.process_incoming_message(peer_index, message).await;
773                return Some(());
774            }
775            NetworkEvent::PeerConnectionResult { result } => {
776                if result.is_ok() {
777                    let (peer_index, ip) = result.unwrap();
778                    self.handle_new_peer(peer_index, ip).await;
779                    return Some(());
780                }
781            }
782            NetworkEvent::AddStunPeer {
783                peer_index,
784                public_key,
785            } => {
786                self.handle_new_stun_peer(peer_index, public_key).await;
787                return Some(());
788            }
789            NetworkEvent::RemoveStunPeer { peer_index } => {
790                self.remove_stun_peer(peer_index).await;
791                return Some(());
792            }
793            NetworkEvent::PeerDisconnected {
794                peer_index,
795                disconnect_type,
796            } => {
797                self.handle_peer_disconnect(peer_index, disconnect_type)
798                    .await;
799                return Some(());
800            }
801            NetworkEvent::BlockFetched {
802                block_hash,
803                block_id,
804                peer_index,
805                buffer,
806            } => {
807                debug!("block received : {:?}", block_hash.to_hex());
808                {
809                    let mut peers = self.network.peer_lock.write().await;
810                    let peer = peers.find_peer_by_index_mut(peer_index)?;
811                    let time = self.timer.get_timestamp_in_ms();
812                    if peer.has_invalid_block_limit_exceeded(time) {
813                        info!(
814                            "peers exceeded for invalid blocks from peer : {:?}. disconnecting peer...",
815                            peer_index
816                        );
817                        self.network
818                            .io_interface
819                            .disconnect_from_peer(peer_index)
820                            .await
821                            .unwrap();
822                        return None;
823                    }
824                }
825
826                self.send_to_verification_thread(VerifyRequest::Block(
827                    buffer, peer_index, block_hash, block_id,
828                ))
829                .await;
830
831                self.blockchain_sync_state.mark_as_fetched(block_hash);
832
833                self.fetch_next_blocks().await;
834
835                return Some(());
836            }
837            NetworkEvent::BlockFetchFailed {
838                block_hash,
839                peer_index,
840                block_id,
841            } => {
842                debug!("block fetch failed : {:?}", block_hash.to_hex());
843
844                self.blockchain_sync_state
845                    .mark_as_failed(block_id, block_hash, peer_index);
846            }
847            _ => unreachable!(),
848        }
849        debug!("network event processed");
850        None
851    }
852    async fn process_timer_event(&mut self, duration: Duration) -> Option<()> {
853        // trace!("processing timer event : {:?}", duration.as_micros());
854
855        let duration_value: Timestamp = duration.as_millis() as Timestamp;
856
857        let mut work_done = false;
858
859        const RECONNECTION_PERIOD: Timestamp = Duration::from_secs(2).as_millis() as Timestamp;
860        self.reconnection_timer += duration_value;
861        let current_time = self.timer.get_timestamp_in_ms();
862        if self.reconnection_timer >= RECONNECTION_PERIOD {
863            self.network.connect_to_static_peers(current_time).await;
864            self.network.send_pings().await;
865            self.reconnection_timer = 0;
866            self.fetch_next_blocks().await;
867            work_done = true;
868        }
869
870        const PEER_REMOVAL_TIMER_PERIOD: Timestamp =
871            Duration::from_secs(5).as_millis() as Timestamp;
872        self.peer_removal_timer += duration_value;
873        if self.peer_removal_timer >= PEER_REMOVAL_TIMER_PERIOD {
874            let mut peers = self.network.peer_lock.write().await;
875            peers.remove_disconnected_peers(current_time);
876            self.peer_removal_timer = 0;
877            work_done = true;
878        }
879
880        self.write_peer_state_data(duration_value, &mut work_done)
881            .await;
882
883        if work_done {
884            return Some(());
885        }
886        None
887    }
888
889    async fn process_event(&mut self, event: RoutingEvent) -> Option<()> {
890        match event {
891            RoutingEvent::BlockchainUpdated(block_hash) => {
892                trace!(
893                    "received blockchain update event : {:?}",
894                    block_hash.to_hex()
895                );
896                self.blockchain_sync_state.remove_entry(block_hash);
897                self.fetch_next_blocks().await;
898            }
899
900            RoutingEvent::BlockFetchRequest(peer_index, block_hash, block_id) => {
901                self.blockchain_sync_state
902                    .add_entry(
903                        block_hash,
904                        block_id,
905                        peer_index,
906                        self.network.peer_lock.clone(),
907                    )
908                    .await;
909            }
910            RoutingEvent::BlockchainRequest(peer_index) => {
911                self.network
912                    .request_blockchain_from_peer(peer_index, self.blockchain_lock.clone())
913                    .await;
914            }
915        }
916        None
917    }
918
919    async fn on_init(&mut self) {
920        assert!(!self.senders_to_verification.is_empty());
921        self.reconnection_timer = self.timer.get_timestamp_in_ms();
922        // connect to peers
923        self.network
924            .initialize_static_peers(self.config_lock.clone())
925            .await;
926    }
927    async fn on_stat_interval(&mut self, current_time: Timestamp) {
928        self.stats
929            .received_transactions
930            .calculate_stats(current_time)
931            .await;
932        self.stats
933            .received_blocks
934            .calculate_stats(current_time)
935            .await;
936        self.stats
937            .total_incoming_messages
938            .calculate_stats(current_time)
939            .await;
940
941        let stat = format!(
942            "{} - {} - capacity : {:?} / {:?}",
943            StatVariable::format_timestamp(current_time),
944            format!("{:width$}", "consensus::channel", width = 40),
945            self.sender_to_consensus.capacity(),
946            self.sender_to_consensus.max_capacity()
947        );
948        self.stat_sender.send(stat).await.unwrap();
949        for (index, sender) in self.senders_to_verification.iter().enumerate() {
950            let stat = format!(
951                "{} - {} - capacity : {:?} / {:?}",
952                StatVariable::format_timestamp(current_time),
953                format!(
954                    "{:width$}",
955                    format!("verification_{:?}::channel", index),
956                    width = 40
957                ),
958                sender.capacity(),
959                sender.max_capacity()
960            );
961            self.stat_sender.send(stat).await.unwrap();
962        }
963
964        let stats = self.blockchain_sync_state.get_stats();
965        for stat in stats {
966            self.stat_sender.send(stat).await.unwrap();
967        }
968
969        let peers = self.network.peer_lock.read().await;
970        let mut peer_count = 0;
971        let mut peers_in_handshake = 0;
972
973        for (_, peer) in peers.index_to_peers.iter() {
974            peer_count += 1;
975
976            if peer.challenge_for_peer.is_some() {
977                peers_in_handshake += 1;
978            }
979        }
980
981        let stat = format!(
982            "{} - {} - total peers : {:?}. in handshake : {:?}",
983            StatVariable::format_timestamp(current_time),
984            format!("{:width$}", "peers::state", width = 40),
985            peer_count,
986            peers_in_handshake,
987        );
988        self.stat_sender.send(stat).await.unwrap();
989    }
990
991    fn is_ready_to_process(&self) -> bool {
992        self.sender_to_miner.capacity() > CHANNEL_SAFE_BUFFER
993            && self.sender_to_consensus.capacity() > CHANNEL_SAFE_BUFFER
994            && self
995                .senders_to_verification
996                .iter()
997                .all(|sender| sender.capacity() > CHANNEL_SAFE_BUFFER)
998    }
999}
1000
#[cfg(test)]
mod tests {
    use crate::core::defs::NOLAN_PER_SAITO;
    use crate::core::routing_thread::RoutingThread;
    use crate::core::util::crypto::generate_keys;
    use crate::core::util::test::node_tester::test::NodeTester;

    /// Builds a ghost chain from two different starting blocks and checks the
    /// number of entries produced and how many of them are flagged in `txs`
    /// for the supplied public key.
    #[tokio::test]
    #[serial_test::serial]
    async fn test_ghost_chain_gen() {
        // Uncomment to get log output while debugging this test:
        // pretty_env_logger::init();
        NodeTester::delete_data().await.unwrap();

        let peer_public_key = generate_keys().0;
        let mut tester = NodeTester::new(1000, None, None);
        tester
            .init_with_staking(0, 60, 100_000 * NOLAN_PER_SAITO)
            .await
            .unwrap();
        tester.wait_till_block_id_with_txs(100, 0, 0).await.unwrap();

        // Phase 1: ghost chain starting at block 50, before any transaction
        // involving `peer_public_key` has been added.
        {
            let start_block_id = 50;
            let expected_entries = 50;

            let fork_id = tester.get_fork_id(start_block_id).await;
            let blockchain = tester.routing_thread.blockchain_lock.read().await;
            let chain = RoutingThread::generate_ghost_chain(
                start_block_id,
                fork_id,
                &blockchain,
                vec![peer_public_key],
                &tester.routing_thread.storage,
            )
            .await;

            assert_eq!(chain.block_ids.len(), expected_entries);
            assert_eq!(chain.block_ts.len(), expected_entries);
            assert_eq!(chain.gts.len(), expected_entries);
            assert_eq!(chain.prehashes.len(), expected_entries);
            assert_eq!(chain.previous_block_hashes.len(), expected_entries);
            // No entry may be flagged yet.
            assert!(!chain.txs.iter().any(|flagged| *flagged));
        }

        // Add one transaction addressed to `peer_public_key`.
        {
            let transaction = tester
                .create_transaction(100, 10, peer_public_key)
                .await
                .unwrap();
            tester.add_transaction(transaction).await;
        }

        tester.wait_till_block_id(101).await.unwrap();
        tester
            .wait_till_block_id_with_txs(105, 10, 0)
            .await
            .unwrap();

        // Phase 2: ghost chain starting at block 101; exactly one entry is
        // expected to be flagged.
        {
            let start_block_id = 101;
            let expected_entries = 5;

            let fork_id = tester.get_fork_id(start_block_id).await;
            let blockchain = tester.routing_thread.blockchain_lock.read().await;
            let chain = RoutingThread::generate_ghost_chain(
                start_block_id,
                fork_id,
                &blockchain,
                vec![peer_public_key],
                &tester.routing_thread.storage,
            )
            .await;

            assert_eq!(chain.block_ids.len(), expected_entries);
            assert_eq!(chain.block_ts.len(), expected_entries);
            assert_eq!(chain.gts.len(), expected_entries);
            assert_eq!(chain.prehashes.len(), expected_entries);
            assert_eq!(chain.previous_block_hashes.len(), expected_entries);
            assert_eq!(chain.txs.iter().filter(|&&flagged| flagged).count(), 1);
        }
    }
}