saito_core/core/
verification_thread.rs1use std::any::Any;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use log::{debug, trace, warn};
8use rayon::prelude::*;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::RwLock;
11
12use crate::core::consensus::block::Block;
13use crate::core::consensus::blockchain::Blockchain;
14use crate::core::consensus::peers::peer_collection::PeerCollection;
15use crate::core::consensus::transaction::Transaction;
16use crate::core::consensus::wallet::Wallet;
17use crate::core::consensus_thread::ConsensusEvent;
18use crate::core::defs::{
19 BlockHash, BlockId, PeerIndex, PrintForLog, StatVariable, Timestamp, CHANNEL_SAFE_BUFFER,
20};
21use crate::core::io::network_event::NetworkEvent;
22use crate::core::process::process_event::ProcessEvent;
23use crate::drain;
24
25#[derive(Debug)]
26pub enum VerifyRequest {
27 Transaction(Transaction),
28 Transactions(VecDeque<Transaction>),
29 Block(Vec<u8>, PeerIndex, BlockHash, BlockId),
30}
31
32pub struct VerificationThread {
33 pub sender_to_consensus: Sender<ConsensusEvent>,
34 pub blockchain_lock: Arc<RwLock<Blockchain>>,
35 pub peer_lock: Arc<RwLock<PeerCollection>>,
36 pub wallet_lock: Arc<RwLock<Wallet>>,
37 pub processed_txs: StatVariable,
38 pub processed_blocks: StatVariable,
39 pub processed_msgs: StatVariable,
40 pub invalid_txs: StatVariable,
41 pub stat_sender: Sender<String>,
42}
43
44impl VerificationThread {
45 pub async fn verify_tx(&mut self, mut transaction: Transaction) {
46 trace!("verifying tx : {:?}", transaction.signature.to_hex());
47 let blockchain = self.blockchain_lock.read().await;
48 let wallet = self.wallet_lock.read().await;
49 let public_key = wallet.public_key;
50 transaction.generate(&public_key, 0, 0);
51
52 if !transaction.validate(&blockchain.utxoset, &blockchain, true) {
54 debug!(
55 "transaction : {:?} not valid",
56 transaction.signature.to_hex()
57 );
58 self.processed_txs.increment();
59 return;
60 }
61
62 self.processed_txs.increment();
63 self.processed_msgs.increment();
64 self.sender_to_consensus
65 .send(ConsensusEvent::NewTransaction { transaction })
66 .await
67 .unwrap();
68 }
70 pub async fn verify_txs(&mut self, transactions: &mut VecDeque<Transaction>) {
71 self.processed_txs.increment_by(transactions.len() as u64);
72 self.processed_msgs.increment_by(transactions.len() as u64);
73 let prev_count = transactions.len();
74 let txs: Vec<Transaction>;
75 {
76 let blockchain = self.blockchain_lock.read().await;
78
79 let public_key;
80 let wallet = self.wallet_lock.read().await;
81 public_key = wallet.public_key;
82 txs = drain!(transactions, 10)
83 .filter_map(|mut transaction| {
84 transaction.generate(&public_key, 0, 0);
85
86 if !transaction.validate(&blockchain.utxoset, &blockchain, true) {
87 debug!(
88 "transaction : {:?} not valid",
89 transaction.signature.to_hex()
90 );
91
92 None
93 } else {
94 Some(transaction)
95 }
96 })
97 .collect();
98 }
99 let invalid_txs = prev_count - txs.len();
102 for transaction in txs {
103 self.sender_to_consensus
104 .send(ConsensusEvent::NewTransaction { transaction })
105 .await
106 .unwrap();
107 }
108 self.invalid_txs.increment_by(invalid_txs as u64);
109 }
110 pub async fn verify_block(
111 &mut self,
112 buffer: &[u8],
113 peer_index: PeerIndex,
114 block_hash: BlockHash,
115 block_id: BlockId,
116 ) {
117 let buffer_len = buffer.len();
119 let result = Block::deserialize_from_net(buffer);
120 if result.is_err() {
121 warn!(
122 "failed verifying block buffer with length : {:?}",
123 buffer_len
124 );
125 let mut peers = self.peer_lock.write().await;
126 if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
127 peer.invalid_block_limiter.increase();
129 }
130
131 return;
132 }
133
134 let mut block = result.unwrap();
135 block.routed_from_peer = Some(peer_index);
136
137 block.generate().unwrap();
138
139 if block.id != block_id || block.hash != block_hash {
140 warn!(
141 "block : {:?}-{:?} fetched. but deserialized block's hash is : {:?}-{:?}",
142 block.id,
143 block.hash.to_hex(),
144 block_id,
145 block_hash.to_hex()
146 );
147 let mut peers = self.peer_lock.write().await;
148 if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
149 peer.invalid_block_limiter.increase();
151 }
152 return;
153 }
154
155 debug!(
156 "block : {:?}-{:?} deserialized from buffer from peer : {:?}",
157 block.id,
158 block.hash.to_hex(),
159 peer_index
160 );
161
162 self.processed_blocks.increment();
163 self.processed_msgs.increment();
164
165 self.sender_to_consensus
166 .send(ConsensusEvent::BlockFetched { peer_index, block })
167 .await
168 .unwrap();
169 }
170}
171
172#[async_trait]
173impl ProcessEvent<VerifyRequest> for VerificationThread {
174 async fn process_network_event(&mut self, _event: NetworkEvent) -> Option<()> {
175 unreachable!();
176 }
177
178 async fn process_timer_event(&mut self, _duration: Duration) -> Option<()> {
179 None
180 }
181
182 async fn process_event(&mut self, request: VerifyRequest) -> Option<()> {
183 trace!(
184 "verification thread processing event : {:?}",
185 request.type_id()
186 );
187 match request {
188 VerifyRequest::Transaction(transaction) => {
189 self.verify_tx(transaction).await;
190 }
191 VerifyRequest::Block(block, peer_index, block_hash, block_id) => {
192 self.verify_block(block.as_slice(), peer_index, block_hash, block_id)
193 .await;
194 }
195 VerifyRequest::Transactions(mut txs) => {
196 self.verify_txs(&mut txs).await;
197 }
198 }
199
200 Some(())
201 }
202
203 async fn on_init(&mut self) {}
204
205 async fn on_stat_interval(&mut self, current_time: Timestamp) {
206 self.processed_msgs.calculate_stats(current_time).await;
207 self.invalid_txs.calculate_stats(current_time).await;
208 self.processed_txs.calculate_stats(current_time).await;
209 self.processed_blocks.calculate_stats(current_time).await;
210 }
211
212 fn is_ready_to_process(&self) -> bool {
213 self.sender_to_consensus.capacity() > CHANNEL_SAFE_BUFFER
214 }
215}