1use std::collections::vec_deque::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ahash::AHashMap;
6use log::{debug, info, trace, warn};
7use primitive_types::U256;
8use rayon::prelude::*;
9use tokio::sync::RwLock;
10
11use crate::core::consensus::block::Block;
12use crate::core::consensus::blockchain::Blockchain;
13use crate::core::consensus::burnfee::BurnFee;
14use crate::core::consensus::golden_ticket::GoldenTicket;
15use crate::core::consensus::transaction::{Transaction, TransactionType};
16use crate::core::consensus::wallet::Wallet;
17use crate::core::defs::SaitoUTXOSetKey;
18use crate::core::defs::{
19 Currency, PrintForLog, SaitoHash, SaitoPublicKey, SaitoSignature, StatVariable, Timestamp,
20};
21use crate::core::io::storage::Storage;
22use crate::core::util::configuration::Configuration;
23use crate::core::util::crypto::hash;
24use crate::iterate;
25
26#[derive(Clone, Debug)]
33pub enum MempoolMessage {
34 LocalTryBundleBlock,
35 LocalNewBlock,
36}
37
38#[derive(Debug)]
44pub struct Mempool {
45 pub blocks_queue: VecDeque<Block>,
46 pub transactions: AHashMap<SaitoSignature, Transaction>,
47 pub golden_tickets: AHashMap<SaitoHash, (Transaction, bool)>,
48 routing_work_in_mempool: Currency,
50 pub new_tx_added: bool,
51 pub wallet_lock: Arc<RwLock<Wallet>>,
52 pub utxo_map: AHashMap<SaitoUTXOSetKey, u64>,
53}
54
55impl Mempool {
56 #[allow(clippy::new_without_default)]
57 pub fn new(wallet_lock: Arc<RwLock<Wallet>>) -> Self {
58 Mempool {
59 blocks_queue: VecDeque::new(),
60 transactions: Default::default(),
61 golden_tickets: Default::default(),
62 routing_work_in_mempool: 0,
63 new_tx_added: false,
64 wallet_lock,
65 utxo_map: AHashMap::new(),
66 }
67 }
68
69 pub fn add_block(&mut self, block: Block) {
70 debug!(
71 "mempool add block : {:?}-{:?}",
72 block.id,
73 block.hash.to_hex()
74 );
75 let hash_to_insert = block.hash;
76 if !iterate!(self.blocks_queue, 100).any(|block| block.hash == hash_to_insert) {
77 self.blocks_queue.push_back(block);
78 } else {
79 debug!("block not added to mempool as it was already there");
80 }
81 }
82 pub async fn add_golden_ticket(&mut self, golden_ticket: Transaction) {
83 let gt = GoldenTicket::deserialize_from_net(&golden_ticket.data);
84 debug!(
85 "adding golden ticket : {:?} target : {:?} public_key : {:?}",
86 hash(&golden_ticket.serialize_for_net()).to_hex(),
87 gt.target.to_hex(),
88 gt.public_key.to_base58()
89 );
90 if self.golden_tickets.contains_key(>.target) {
92 debug!(
93 "similar golden ticket already exists : {:?}",
94 gt.target.to_hex()
95 );
96 return;
97 }
98 self.golden_tickets
99 .insert(gt.target, (golden_ticket, false));
100
101 debug!("golden ticket added to mempool");
102 }
103 pub async fn add_transaction_if_validates(
104 &mut self,
105 mut transaction: Transaction,
106 blockchain: &Blockchain,
107 ) {
108 trace!(
109 "add transaction if validates : {:?}",
110 transaction.signature.to_hex()
111 );
112 let public_key;
113 let tx_valid;
114 {
115 let wallet = self.wallet_lock.read().await;
116 public_key = wallet.public_key;
117 transaction.generate(&public_key, 0, 0);
118
119 tx_valid = transaction.validate(&blockchain.utxoset, blockchain, true);
120 }
121
122 if tx_valid {
124 self.add_transaction(transaction).await;
125 } else {
126 debug!(
127 "transaction not valid : {:?}",
128 transaction.signature.to_hex()
129 );
130 }
131 }
132 pub async fn add_transaction(&mut self, transaction: Transaction) {
133 debug_assert!(transaction.hash_for_signature.is_some());
140
141 for input in transaction.from.iter() {
142 let utxo_key = input.utxoset_key;
143 if self.utxo_map.contains_key(&utxo_key) && input.amount > 0 {
144 warn!(
146 "duplicate input : \n{} found in transaction : \n{}",
147 input, transaction
148 );
149 return;
150 }
151 }
152
153 if !self.transactions.contains_key(&transaction.signature) {
162 self.routing_work_in_mempool += transaction.total_work_for_me;
163 if let TransactionType::GoldenTicket = transaction.transaction_type {
168 panic!("golden tickets should be in gt collection");
169 } else {
170 self.transactions
171 .insert(transaction.signature, transaction.clone());
172 self.new_tx_added = true;
173
174 for input in transaction.from.iter() {
175 let utxo_key = input.utxoset_key;
176 self.utxo_map.insert(utxo_key, 1);
177 }
178 }
179 }
180 }
181
182 pub async fn bundle_block(
183 &mut self,
184 blockchain: &Blockchain,
185 current_timestamp: Timestamp,
186 gt_tx: Option<Transaction>,
187 configs: &(dyn Configuration + Send + Sync),
188 storage: &Storage,
189 ) -> Option<Block> {
190 let previous_block_hash: SaitoHash;
191 let public_key;
192 let private_key;
193 let block_timestamp_gap;
194 {
195 let wallet = self.wallet_lock.read().await;
196 previous_block_hash = blockchain.get_latest_block_hash();
197 let previous_block_timestamp = match blockchain.get_latest_block() {
198 None => 0,
199 Some(block) => block.timestamp,
200 };
201
202 assert!(
203 current_timestamp > previous_block_timestamp,
204 "current timestamp = {:?} should be larger than previous block timestamp : {:?}",
205 StatVariable::format_timestamp(current_timestamp),
206 StatVariable::format_timestamp(previous_block_timestamp)
207 );
208 block_timestamp_gap =
209 Duration::from_millis(current_timestamp - previous_block_timestamp).as_secs();
210 public_key = wallet.public_key;
211 private_key = wallet.private_key;
212 }
213 let mempool_work = self
214 .can_bundle_block(blockchain, current_timestamp, >_tx, configs, &public_key)
215 .await?;
216 info!(
217 "bundling block with {:?} txs with work : {:?} with a gap of {:?} seconds. timestamp : {:?}",
218 self.transactions.len(),
219 mempool_work,
220 block_timestamp_gap,
221 current_timestamp
222 );
223
224 let staking_tx;
225 {
226 let mut wallet = self.wallet_lock.write().await;
227
228 staking_tx = wallet
229 .create_staking_transaction(
230 blockchain.social_stake_requirement,
231 blockchain.get_latest_unlocked_stake_block_id(),
232 (blockchain.get_latest_block_id() + 1)
233 .saturating_sub(configs.get_consensus_config().unwrap().genesis_period),
234 )
235 .ok()?;
236 }
237 self.add_transaction_if_validates(staking_tx, blockchain)
238 .await;
239
240 let mut block = Block::create(
241 &mut self.transactions,
242 previous_block_hash,
243 blockchain,
244 current_timestamp,
245 &public_key,
246 &private_key,
247 gt_tx,
248 configs,
249 storage,
250 )
251 .await
252 .ok()?;
253 block.generate().ok()?;
254 debug!(
255 "block generated with work : {:?} and burnfee : {:?} gts : {:?}",
256 block.total_work,
257 block.burnfee,
258 block
259 .transactions
260 .iter()
261 .filter(|tx| matches!(tx.transaction_type, TransactionType::GoldenTicket))
262 .count()
263 );
264 self.new_tx_added = false;
266 self.routing_work_in_mempool = 0;
267
268 for tx in &block.transactions {
269 for input in &tx.from {
270 let utxo_key = input.utxoset_key;
271 self.utxo_map.remove(&utxo_key);
272 }
273 }
274
275 Some(block)
276 }
277
278 pub async fn bundle_genesis_block(
279 &mut self,
280 blockchain: &mut Blockchain,
281 current_timestamp: Timestamp,
282 configs: &(dyn Configuration + Send + Sync),
283 storage: &Storage,
284 ) -> Block {
285 debug!("bundling genesis block...");
286 let public_key;
287 let private_key;
288
289 let wallet = self.wallet_lock.read().await;
290 public_key = wallet.public_key;
291 private_key = wallet.private_key;
292
293 let mut block = Block::create(
294 &mut self.transactions,
295 [0; 32],
296 blockchain,
297 current_timestamp,
298 &public_key,
299 &private_key,
300 None,
301 configs,
302 storage,
303 )
304 .await
305 .unwrap();
306 block.generate().unwrap();
307 self.new_tx_added = false;
308 self.routing_work_in_mempool = 0;
309
310 block
311 }
312
313 pub async fn can_bundle_block(
314 &self,
315 blockchain: &Blockchain,
316 current_timestamp: Timestamp,
317 gt_tx: &Option<Transaction>,
318 configs: &(dyn Configuration + Send + Sync),
319 public_key: &SaitoPublicKey,
320 ) -> Option<Currency> {
321 if blockchain.blocks.is_empty() {
322 warn!("Not generating #1 block. Waiting for blocks from peers");
323 return None;
324 }
325 if !self.blocks_queue.is_empty() {
326 trace!("there are blocks in queue. so cannot bundle new block");
327 return None;
328 }
329 if self.transactions.is_empty() || !self.new_tx_added {
330 trace!("there are no transactions in queue to bundle new block");
331 return None;
332 }
333 if !blockchain.is_golden_ticket_count_valid(
334 blockchain.get_latest_block_hash(),
335 gt_tx.is_some(),
336 configs.is_browser(),
337 configs.is_spv_mode(),
338 ) {
339 debug!("waiting till more golden tickets come in");
340 return None;
341 }
342
343 if let Some(previous_block) = blockchain.get_latest_block() {
344 let work_available = self.get_routing_work_available();
345 let work_needed = BurnFee::return_routing_work_needed_to_produce_block_in_nolan(
346 previous_block.burnfee,
347 current_timestamp,
348 previous_block.timestamp,
349 configs.get_consensus_config().unwrap().heartbeat_interval,
350 );
351 let time_elapsed = current_timestamp - previous_block.timestamp;
352
353 let mut h: Vec<u8> = vec![];
354 h.append(&mut public_key.to_vec());
355 h.append(&mut previous_block.hash.to_vec());
356 let h = hash(h.as_slice());
357 let value = U256::from(h);
358 let value: Timestamp =
359 (value.low_u128() % Duration::from_secs(5).as_millis()) as Timestamp;
360 if current_timestamp < previous_block.timestamp + value {
362 return None;
363 }
364 let result = work_available >= work_needed;
367 if result {
368 info!(
369 "last ts: {:?}, this ts: {:?}, work available: {:?}, work needed: {:?}, time_elapsed : {:?} can_bundle : {:?}",
370 previous_block.timestamp, current_timestamp, work_available, work_needed, time_elapsed, true
371 );
372 } else {
373 info!(
374 "last ts: {:?}, this ts: {:?}, work available: {:?}, work needed: {:?}, time_elapsed : {:?} can_bundle : {:?}",
375 previous_block.timestamp, current_timestamp, work_available, work_needed, time_elapsed, false
376 );
377 }
378 if result {
379 return Some(work_available);
380 }
381 None
382 } else {
383 Some(0)
384 }
385 }
386
387 pub fn delete_block(&mut self, block_hash: &SaitoHash) {
388 debug!("deleting block from mempool : {:?}", block_hash.to_hex());
389
390 self.golden_tickets.remove(block_hash);
391 }
393
394 pub fn delete_transactions(&mut self, transactions: &Vec<Transaction>) {
395 for transaction in transactions {
396 if let TransactionType::GoldenTicket = transaction.transaction_type {
397 let gt = GoldenTicket::deserialize_from_net(&transaction.data);
398 self.golden_tickets.remove(>.target);
399 } else {
400 self.transactions.remove(&transaction.signature);
401 }
402 }
403
404 self.routing_work_in_mempool = 0;
405
406 for (_, transaction) in &self.transactions {
408 self.routing_work_in_mempool += transaction.total_work_for_me;
409 }
410 }
411
412 pub fn get_routing_work_available(&self) -> Currency {
416 self.routing_work_in_mempool
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use std::ops::Deref;
423 use std::sync::Arc;
424
425 use tokio::sync::RwLock;
426
427 use crate::core::consensus::wallet::Wallet;
428 use crate::core::defs::{SaitoPrivateKey, SaitoPublicKey};
429 use crate::core::util::test::test_manager::test::{create_timestamp, TestManager};
430
431 use super::*;
432
433 #[tokio::test]
448 #[serial_test::serial]
449 async fn mempool_bundle_blocks_test() {
450 let mempool_lock: Arc<RwLock<Mempool>>;
451 let wallet_lock: Arc<RwLock<Wallet>>;
452 let blockchain_lock: Arc<RwLock<Blockchain>>;
453 let public_key: SaitoPublicKey;
454 let private_key: SaitoPrivateKey;
455 let mut t = TestManager::default();
456
457 {
458 t.initialize(100, 720_000).await;
459 wallet_lock = t.get_wallet_lock();
462 mempool_lock = t.get_mempool_lock();
463 blockchain_lock = t.get_blockchain_lock();
464 }
465
466 {
467 let wallet = wallet_lock.read().await;
468
469 public_key = wallet.public_key;
470 private_key = wallet.private_key;
471 }
472
473 let ts = create_timestamp();
474
475 let configs = t.config_lock.read().await;
476 let blockchain = blockchain_lock.read().await;
477 let genesis_period = configs.get_consensus_config().unwrap().genesis_period;
478 let latest_block_id = blockchain.get_latest_block_id();
479
480 let mut mempool = mempool_lock.write().await;
481
482 let _txs = Vec::<Transaction>::new();
483
484 assert_eq!(mempool.get_routing_work_available(), 0);
485
486 for _i in 0..5 {
487 let mut tx = Transaction::default();
488
489 {
490 let mut wallet = wallet_lock.write().await;
491
492 let (inputs, outputs) =
493 wallet.generate_slips(720_000, None, latest_block_id, genesis_period);
494 tx.from = inputs;
495 tx.to = outputs;
496 tx.timestamp = ts + 120000 + _i;
499 tx.generate(&public_key, 0, 0);
500 tx.sign(&private_key);
501 }
502 let wallet = wallet_lock.read().await;
503 tx.add_hop(&wallet.private_key, &wallet.public_key, &[1; 33]);
504 tx.generate(&public_key, 0, 0);
505 mempool.add_transaction(tx).await;
506 }
507
508 assert_eq!(mempool.transactions.len(), 5);
509 assert_eq!(mempool.get_routing_work_available(), 0);
510
511 assert!(mempool
518 .can_bundle_block(
519 &blockchain,
520 ts + 120000,
521 &None,
522 configs.deref(),
523 &public_key,
524 )
525 .await
526 .is_some());
527 }
528}