saito_core/core/
verification_thread.rs

1use std::any::Any;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use log::{debug, trace, warn};
8use rayon::prelude::*;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::RwLock;
11
12use crate::core::consensus::block::Block;
13use crate::core::consensus::blockchain::Blockchain;
14use crate::core::consensus::peers::peer_collection::PeerCollection;
15use crate::core::consensus::transaction::Transaction;
16use crate::core::consensus::wallet::Wallet;
17use crate::core::consensus_thread::ConsensusEvent;
18use crate::core::defs::{
19    BlockHash, BlockId, PeerIndex, PrintForLog, StatVariable, Timestamp, CHANNEL_SAFE_BUFFER,
20};
21use crate::core::io::network_event::NetworkEvent;
22use crate::core::process::process_event::ProcessEvent;
23use crate::drain;
24
25#[derive(Debug)]
26pub enum VerifyRequest {
27    Transaction(Transaction),
28    Transactions(VecDeque<Transaction>),
29    Block(Vec<u8>, PeerIndex, BlockHash, BlockId),
30}
31
32pub struct VerificationThread {
33    pub sender_to_consensus: Sender<ConsensusEvent>,
34    pub blockchain_lock: Arc<RwLock<Blockchain>>,
35    pub peer_lock: Arc<RwLock<PeerCollection>>,
36    pub wallet_lock: Arc<RwLock<Wallet>>,
37    pub processed_txs: StatVariable,
38    pub processed_blocks: StatVariable,
39    pub processed_msgs: StatVariable,
40    pub invalid_txs: StatVariable,
41    pub stat_sender: Sender<String>,
42}
43
44impl VerificationThread {
45    pub async fn verify_tx(&mut self, mut transaction: Transaction) {
46        trace!("verifying tx : {:?}", transaction.signature.to_hex());
47        let blockchain = self.blockchain_lock.read().await;
48        let wallet = self.wallet_lock.read().await;
49        let public_key = wallet.public_key;
50        transaction.generate(&public_key, 0, 0);
51
52        // TODO : should we skip validation against utxo if we don't have the full utxo ?
53        if !transaction.validate(&blockchain.utxoset, &blockchain, true) {
54            debug!(
55                "transaction : {:?} not valid",
56                transaction.signature.to_hex()
57            );
58            self.processed_txs.increment();
59            return;
60        }
61
62        self.processed_txs.increment();
63        self.processed_msgs.increment();
64        self.sender_to_consensus
65            .send(ConsensusEvent::NewTransaction { transaction })
66            .await
67            .unwrap();
68        // trace!("releasing blockchain 7");
69    }
70    pub async fn verify_txs(&mut self, transactions: &mut VecDeque<Transaction>) {
71        self.processed_txs.increment_by(transactions.len() as u64);
72        self.processed_msgs.increment_by(transactions.len() as u64);
73        let prev_count = transactions.len();
74        let txs: Vec<Transaction>;
75        {
76            // trace!("locking blockchain 8");
77            let blockchain = self.blockchain_lock.read().await;
78
79            let public_key;
80            let wallet = self.wallet_lock.read().await;
81            public_key = wallet.public_key;
82            txs = drain!(transactions, 10)
83                .filter_map(|mut transaction| {
84                    transaction.generate(&public_key, 0, 0);
85
86                    if !transaction.validate(&blockchain.utxoset, &blockchain, true) {
87                        debug!(
88                            "transaction : {:?} not valid",
89                            transaction.signature.to_hex()
90                        );
91
92                        None
93                    } else {
94                        Some(transaction)
95                    }
96                })
97                .collect();
98        }
99        // trace!("releasing blockchain 8");
100
101        let invalid_txs = prev_count - txs.len();
102        for transaction in txs {
103            self.sender_to_consensus
104                .send(ConsensusEvent::NewTransaction { transaction })
105                .await
106                .unwrap();
107        }
108        self.invalid_txs.increment_by(invalid_txs as u64);
109    }
110    pub async fn verify_block(
111        &mut self,
112        buffer: &[u8],
113        peer_index: PeerIndex,
114        block_hash: BlockHash,
115        block_id: BlockId,
116    ) {
117        // debug!("verifying block buffer of size : {:?}", buffer.len());
118        let buffer_len = buffer.len();
119        let result = Block::deserialize_from_net(buffer);
120        if result.is_err() {
121            warn!(
122                "failed verifying block buffer with length : {:?}",
123                buffer_len
124            );
125            let mut peers = self.peer_lock.write().await;
126            if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
127                // NOTE : this means if we cannot deserialize a block from the buffer we mark it as blacklisted.
128                peer.invalid_block_limiter.increase();
129            }
130
131            return;
132        }
133
134        let mut block = result.unwrap();
135        block.routed_from_peer = Some(peer_index);
136
137        block.generate().unwrap();
138
139        if block.id != block_id || block.hash != block_hash {
140            warn!(
141                "block : {:?}-{:?} fetched. but deserialized block's hash is : {:?}-{:?}",
142                block.id,
143                block.hash.to_hex(),
144                block_id,
145                block_hash.to_hex()
146            );
147            let mut peers = self.peer_lock.write().await;
148            if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
149                // NOTE : this means if we receive an invalid block, peer is blacklisted.
150                peer.invalid_block_limiter.increase();
151            }
152            return;
153        }
154
155        debug!(
156            "block : {:?}-{:?} deserialized from buffer from peer : {:?}",
157            block.id,
158            block.hash.to_hex(),
159            peer_index
160        );
161
162        self.processed_blocks.increment();
163        self.processed_msgs.increment();
164
165        self.sender_to_consensus
166            .send(ConsensusEvent::BlockFetched { peer_index, block })
167            .await
168            .unwrap();
169    }
170}
171
172#[async_trait]
173impl ProcessEvent<VerifyRequest> for VerificationThread {
174    async fn process_network_event(&mut self, _event: NetworkEvent) -> Option<()> {
175        unreachable!();
176    }
177
178    async fn process_timer_event(&mut self, _duration: Duration) -> Option<()> {
179        None
180    }
181
182    async fn process_event(&mut self, request: VerifyRequest) -> Option<()> {
183        trace!(
184            "verification thread processing event : {:?}",
185            request.type_id()
186        );
187        match request {
188            VerifyRequest::Transaction(transaction) => {
189                self.verify_tx(transaction).await;
190            }
191            VerifyRequest::Block(block, peer_index, block_hash, block_id) => {
192                self.verify_block(block.as_slice(), peer_index, block_hash, block_id)
193                    .await;
194            }
195            VerifyRequest::Transactions(mut txs) => {
196                self.verify_txs(&mut txs).await;
197            }
198        }
199
200        Some(())
201    }
202
203    async fn on_init(&mut self) {}
204
205    async fn on_stat_interval(&mut self, current_time: Timestamp) {
206        self.processed_msgs.calculate_stats(current_time).await;
207        self.invalid_txs.calculate_stats(current_time).await;
208        self.processed_txs.calculate_stats(current_time).await;
209        self.processed_blocks.calculate_stats(current_time).await;
210    }
211
212    fn is_ready_to_process(&self) -> bool {
213        self.sender_to_consensus.capacity() > CHANNEL_SAFE_BUFFER
214    }
215}