saito_core/core/consensus/
mempool.rs

1use std::collections::vec_deque::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ahash::AHashMap;
6use log::{debug, info, trace, warn};
7use primitive_types::U256;
8use rayon::prelude::*;
9use tokio::sync::RwLock;
10
11use crate::core::consensus::block::Block;
12use crate::core::consensus::blockchain::Blockchain;
13use crate::core::consensus::burnfee::BurnFee;
14use crate::core::consensus::golden_ticket::GoldenTicket;
15use crate::core::consensus::transaction::{Transaction, TransactionType};
16use crate::core::consensus::wallet::Wallet;
17use crate::core::defs::SaitoUTXOSetKey;
18use crate::core::defs::{
19    Currency, PrintForLog, SaitoHash, SaitoPublicKey, SaitoSignature, StatVariable, Timestamp,
20};
21use crate::core::io::storage::Storage;
22use crate::core::util::configuration::Configuration;
23use crate::core::util::crypto::hash;
24use crate::iterate;
25
/// Messages the mempool sends to itself over its local broadcast channel.
///
/// In addition to responding to global broadcast messages, the mempool uses
/// this channel to coordinate attempts to bundle blocks and to notify itself
/// when a block has been produced.
#[derive(Clone, Debug)]
pub enum MempoolMessage {
    /// Presumably a request to attempt bundling a block (inferred from the name).
    LocalTryBundleBlock,
    /// Presumably a notification that a block has been produced (inferred from the name).
    LocalNewBlock,
}
37
38/// The `Mempool` holds unprocessed blocks and transactions and is in control of
39/// discerning when the node is allowed to create a block. It bundles the block and
40/// sends it to the `Blockchain` to be added to the longest-chain. New `Block`s
41/// received over the network are queued in the `Mempool` before being added to
42/// the `Blockchain`
43#[derive(Debug)]
44pub struct Mempool {
45    pub blocks_queue: VecDeque<Block>,
46    pub transactions: AHashMap<SaitoSignature, Transaction>,
47    pub golden_tickets: AHashMap<SaitoHash, (Transaction, bool)>,
48    // vector so we just copy it over
49    routing_work_in_mempool: Currency,
50    pub new_tx_added: bool,
51    pub wallet_lock: Arc<RwLock<Wallet>>,
52    pub utxo_map: AHashMap<SaitoUTXOSetKey, u64>,
53}
54
55impl Mempool {
56    #[allow(clippy::new_without_default)]
57    pub fn new(wallet_lock: Arc<RwLock<Wallet>>) -> Self {
58        Mempool {
59            blocks_queue: VecDeque::new(),
60            transactions: Default::default(),
61            golden_tickets: Default::default(),
62            routing_work_in_mempool: 0,
63            new_tx_added: false,
64            wallet_lock,
65            utxo_map: AHashMap::new(),
66        }
67    }
68
69    pub fn add_block(&mut self, block: Block) {
70        debug!(
71            "mempool add block : {:?}-{:?}",
72            block.id,
73            block.hash.to_hex()
74        );
75        let hash_to_insert = block.hash;
76        if !iterate!(self.blocks_queue, 100).any(|block| block.hash == hash_to_insert) {
77            self.blocks_queue.push_back(block);
78        } else {
79            debug!("block not added to mempool as it was already there");
80        }
81    }
82    pub async fn add_golden_ticket(&mut self, golden_ticket: Transaction) {
83        let gt = GoldenTicket::deserialize_from_net(&golden_ticket.data);
84        debug!(
85            "adding golden ticket : {:?} target : {:?} public_key : {:?}",
86            hash(&golden_ticket.serialize_for_net()).to_hex(),
87            gt.target.to_hex(),
88            gt.public_key.to_base58()
89        );
90        // TODO : should we replace others' GT with our GT if targets are similar ?
91        if self.golden_tickets.contains_key(&gt.target) {
92            debug!(
93                "similar golden ticket already exists : {:?}",
94                gt.target.to_hex()
95            );
96            return;
97        }
98        self.golden_tickets
99            .insert(gt.target, (golden_ticket, false));
100
101        debug!("golden ticket added to mempool");
102    }
103    pub async fn add_transaction_if_validates(
104        &mut self,
105        mut transaction: Transaction,
106        blockchain: &Blockchain,
107    ) {
108        trace!(
109            "add transaction if validates : {:?}",
110            transaction.signature.to_hex()
111        );
112        let public_key;
113        let tx_valid;
114        {
115            let wallet = self.wallet_lock.read().await;
116            public_key = wallet.public_key;
117            transaction.generate(&public_key, 0, 0);
118
119            tx_valid = transaction.validate(&blockchain.utxoset, blockchain, true);
120        }
121
122        // validate
123        if tx_valid {
124            self.add_transaction(transaction).await;
125        } else {
126            debug!(
127                "transaction not valid : {:?}",
128                transaction.signature.to_hex()
129            );
130        }
131    }
132    pub async fn add_transaction(&mut self, transaction: Transaction) {
133        // trace!(
134        //     "add_transaction {:?} : type = {:?}",
135        //     transaction.signature.to_hex(),
136        //     transaction.transaction_type
137        // );
138
139        debug_assert!(transaction.hash_for_signature.is_some());
140
141        for input in transaction.from.iter() {
142            let utxo_key = input.utxoset_key;
143            if self.utxo_map.contains_key(&utxo_key) && input.amount > 0 {
144                // Duplicate input found, reject transaction
145                warn!(
146                    "duplicate input : \n{} found in transaction : \n{}",
147                    input, transaction
148                );
149                return;
150            }
151        }
152
153        // this assigns the amount of routing work that this transaction
154        // contains to us, which is why we need to provide our public_key
155        // so that we can calculate routing work.
156        //
157
158        // generates hashes, total fees, routing work for me, etc.
159        // transaction.generate(&self.public_key, 0, 0);
160
161        if !self.transactions.contains_key(&transaction.signature) {
162            self.routing_work_in_mempool += transaction.total_work_for_me;
163            // trace!(
164            //     "routing work available in mempool : {:?} after adding work : {:?} from tx with fees : {:?}",
165            //     self.routing_work_in_mempool, transaction.total_work_for_me, transaction.total_fees
166            // );
167            if let TransactionType::GoldenTicket = transaction.transaction_type {
168                panic!("golden tickets should be in gt collection");
169            } else {
170                self.transactions
171                    .insert(transaction.signature, transaction.clone());
172                self.new_tx_added = true;
173
174                for input in transaction.from.iter() {
175                    let utxo_key = input.utxoset_key;
176                    self.utxo_map.insert(utxo_key, 1);
177                }
178            }
179        }
180    }
181
182    pub async fn bundle_block(
183        &mut self,
184        blockchain: &Blockchain,
185        current_timestamp: Timestamp,
186        gt_tx: Option<Transaction>,
187        configs: &(dyn Configuration + Send + Sync),
188        storage: &Storage,
189    ) -> Option<Block> {
190        let previous_block_hash: SaitoHash;
191        let public_key;
192        let private_key;
193        let block_timestamp_gap;
194        {
195            let wallet = self.wallet_lock.read().await;
196            previous_block_hash = blockchain.get_latest_block_hash();
197            let previous_block_timestamp = match blockchain.get_latest_block() {
198                None => 0,
199                Some(block) => block.timestamp,
200            };
201
202            assert!(
203                current_timestamp > previous_block_timestamp,
204                "current timestamp = {:?} should be larger than previous block timestamp : {:?}",
205                StatVariable::format_timestamp(current_timestamp),
206                StatVariable::format_timestamp(previous_block_timestamp)
207            );
208            block_timestamp_gap =
209                Duration::from_millis(current_timestamp - previous_block_timestamp).as_secs();
210            public_key = wallet.public_key;
211            private_key = wallet.private_key;
212        }
213        let mempool_work = self
214            .can_bundle_block(blockchain, current_timestamp, &gt_tx, configs, &public_key)
215            .await?;
216        info!(
217            "bundling block with {:?} txs with work : {:?} with a gap of {:?} seconds. timestamp : {:?}",
218            self.transactions.len(),
219            mempool_work,
220            block_timestamp_gap,
221            current_timestamp
222        );
223
224        let staking_tx;
225        {
226            let mut wallet = self.wallet_lock.write().await;
227
228            staking_tx = wallet
229                .create_staking_transaction(
230                    blockchain.social_stake_requirement,
231                    blockchain.get_latest_unlocked_stake_block_id(),
232                    (blockchain.get_latest_block_id() + 1)
233                        .saturating_sub(configs.get_consensus_config().unwrap().genesis_period),
234                )
235                .ok()?;
236        }
237        self.add_transaction_if_validates(staking_tx, blockchain)
238            .await;
239
240        let mut block = Block::create(
241            &mut self.transactions,
242            previous_block_hash,
243            blockchain,
244            current_timestamp,
245            &public_key,
246            &private_key,
247            gt_tx,
248            configs,
249            storage,
250        )
251        .await
252        .ok()?;
253        block.generate().ok()?;
254        debug!(
255            "block generated with work : {:?} and burnfee : {:?} gts : {:?}",
256            block.total_work,
257            block.burnfee,
258            block
259                .transactions
260                .iter()
261                .filter(|tx| matches!(tx.transaction_type, TransactionType::GoldenTicket))
262                .count()
263        );
264        // assert_eq!(block.total_work, mempool_work);
265        self.new_tx_added = false;
266        self.routing_work_in_mempool = 0;
267
268        for tx in &block.transactions {
269            for input in &tx.from {
270                let utxo_key = input.utxoset_key;
271                self.utxo_map.remove(&utxo_key);
272            }
273        }
274
275        Some(block)
276    }
277
278    pub async fn bundle_genesis_block(
279        &mut self,
280        blockchain: &mut Blockchain,
281        current_timestamp: Timestamp,
282        configs: &(dyn Configuration + Send + Sync),
283        storage: &Storage,
284    ) -> Block {
285        debug!("bundling genesis block...");
286        let public_key;
287        let private_key;
288
289        let wallet = self.wallet_lock.read().await;
290        public_key = wallet.public_key;
291        private_key = wallet.private_key;
292
293        let mut block = Block::create(
294            &mut self.transactions,
295            [0; 32],
296            blockchain,
297            current_timestamp,
298            &public_key,
299            &private_key,
300            None,
301            configs,
302            storage,
303        )
304        .await
305        .unwrap();
306        block.generate().unwrap();
307        self.new_tx_added = false;
308        self.routing_work_in_mempool = 0;
309
310        block
311    }
312
313    pub async fn can_bundle_block(
314        &self,
315        blockchain: &Blockchain,
316        current_timestamp: Timestamp,
317        gt_tx: &Option<Transaction>,
318        configs: &(dyn Configuration + Send + Sync),
319        public_key: &SaitoPublicKey,
320    ) -> Option<Currency> {
321        if blockchain.blocks.is_empty() {
322            warn!("Not generating #1 block. Waiting for blocks from peers");
323            return None;
324        }
325        if !self.blocks_queue.is_empty() {
326            trace!("there are blocks in queue. so cannot bundle new block");
327            return None;
328        }
329        if self.transactions.is_empty() || !self.new_tx_added {
330            trace!("there are no transactions in queue to bundle new block");
331            return None;
332        }
333        if !blockchain.is_golden_ticket_count_valid(
334            blockchain.get_latest_block_hash(),
335            gt_tx.is_some(),
336            configs.is_browser(),
337            configs.is_spv_mode(),
338        ) {
339            debug!("waiting till more golden tickets come in");
340            return None;
341        }
342
343        if let Some(previous_block) = blockchain.get_latest_block() {
344            let work_available = self.get_routing_work_available();
345            let work_needed = BurnFee::return_routing_work_needed_to_produce_block_in_nolan(
346                previous_block.burnfee,
347                current_timestamp,
348                previous_block.timestamp,
349                configs.get_consensus_config().unwrap().heartbeat_interval,
350            );
351            let time_elapsed = current_timestamp - previous_block.timestamp;
352
353            let mut h: Vec<u8> = vec![];
354            h.append(&mut public_key.to_vec());
355            h.append(&mut previous_block.hash.to_vec());
356            let h = hash(h.as_slice());
357            let value = U256::from(h);
358            let value: Timestamp =
359                (value.low_u128() % Duration::from_secs(5).as_millis()) as Timestamp;
360            // random hack to make sure nodes are not generating forks when fees are zero
361            if current_timestamp < previous_block.timestamp + value {
362                return None;
363            }
364            // info!("aaaa value = {:?}", value);
365
366            let result = work_available >= work_needed;
367            if result {
368                info!(
369                "last ts: {:?}, this ts: {:?}, work available: {:?}, work needed: {:?}, time_elapsed : {:?} can_bundle : {:?}",
370                previous_block.timestamp, current_timestamp, work_available, work_needed, time_elapsed, true
371                );
372            } else {
373                info!(
374                "last ts: {:?}, this ts: {:?}, work available: {:?}, work needed: {:?}, time_elapsed : {:?} can_bundle : {:?}",
375                previous_block.timestamp, current_timestamp, work_available, work_needed, time_elapsed, false
376                );
377            }
378            if result {
379                return Some(work_available);
380            }
381            None
382        } else {
383            Some(0)
384        }
385    }
386
387    pub fn delete_block(&mut self, block_hash: &SaitoHash) {
388        debug!("deleting block from mempool : {:?}", block_hash.to_hex());
389
390        self.golden_tickets.remove(block_hash);
391        // self.blocks_queue.retain(|block| !block.hash.eq(block_hash));
392    }
393
394    pub fn delete_transactions(&mut self, transactions: &Vec<Transaction>) {
395        for transaction in transactions {
396            if let TransactionType::GoldenTicket = transaction.transaction_type {
397                let gt = GoldenTicket::deserialize_from_net(&transaction.data);
398                self.golden_tickets.remove(&gt.target);
399            } else {
400                self.transactions.remove(&transaction.signature);
401            }
402        }
403
404        self.routing_work_in_mempool = 0;
405
406        // add routing work from remaining tx
407        for (_, transaction) in &self.transactions {
408            self.routing_work_in_mempool += transaction.total_work_for_me;
409        }
410    }
411
412    ///
413    /// Calculates the work available in mempool to produce a block
414    ///
415    pub fn get_routing_work_available(&self) -> Currency {
416        self.routing_work_in_mempool
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use std::ops::Deref;
423    use std::sync::Arc;
424
425    use tokio::sync::RwLock;
426
427    use crate::core::consensus::wallet::Wallet;
428    use crate::core::defs::{SaitoPrivateKey, SaitoPublicKey};
429    use crate::core::util::test::test_manager::test::{create_timestamp, TestManager};
430
431    use super::*;
432
433    // #[test]
434    // fn mempool_new_test() {
435    //     let mempool = Mempool::new([0; 33], [0; 32]);
436    //     assert_eq!(mempool.blocks_queue, VecDeque::new());
437    // }
438    //
439    // #[test]
440    // fn mempool_add_block_test() {
441    //     let mut mempool = Mempool::new([0; 33], [0; 32]);
442    //     let block = Block::new();
443    //     mempool.add_block(block.clone());
444    //     assert_eq!(Some(block), mempool.blocks_queue.pop_front())
445    // }
446
447    #[tokio::test]
448    #[serial_test::serial]
449    async fn mempool_bundle_blocks_test() {
450        let mempool_lock: Arc<RwLock<Mempool>>;
451        let wallet_lock: Arc<RwLock<Wallet>>;
452        let blockchain_lock: Arc<RwLock<Blockchain>>;
453        let public_key: SaitoPublicKey;
454        let private_key: SaitoPrivateKey;
455        let mut t = TestManager::default();
456
457        {
458            t.initialize(100, 720_000).await;
459            // t.wait_for_mining_event().await;
460
461            wallet_lock = t.get_wallet_lock();
462            mempool_lock = t.get_mempool_lock();
463            blockchain_lock = t.get_blockchain_lock();
464        }
465
466        {
467            let wallet = wallet_lock.read().await;
468
469            public_key = wallet.public_key;
470            private_key = wallet.private_key;
471        }
472
473        let ts = create_timestamp();
474
475        let configs = t.config_lock.read().await;
476        let blockchain = blockchain_lock.read().await;
477        let genesis_period = configs.get_consensus_config().unwrap().genesis_period;
478        let latest_block_id = blockchain.get_latest_block_id();
479
480        let mut mempool = mempool_lock.write().await;
481
482        let _txs = Vec::<Transaction>::new();
483
484        assert_eq!(mempool.get_routing_work_available(), 0);
485
486        for _i in 0..5 {
487            let mut tx = Transaction::default();
488
489            {
490                let mut wallet = wallet_lock.write().await;
491
492                let (inputs, outputs) =
493                    wallet.generate_slips(720_000, None, latest_block_id, genesis_period);
494                tx.from = inputs;
495                tx.to = outputs;
496                // _i prevents sig from being identical during test
497                // and thus from being auto-rejected from mempool
498                tx.timestamp = ts + 120000 + _i;
499                tx.generate(&public_key, 0, 0);
500                tx.sign(&private_key);
501            }
502            let wallet = wallet_lock.read().await;
503            tx.add_hop(&wallet.private_key, &wallet.public_key, &[1; 33]);
504            tx.generate(&public_key, 0, 0);
505            mempool.add_transaction(tx).await;
506        }
507
508        assert_eq!(mempool.transactions.len(), 5);
509        assert_eq!(mempool.get_routing_work_available(), 0);
510
511        // TODO : FIX THIS TEST
512        // assert_eq!(
513        //     mempool.can_bundle_block(blockchain_lock.clone(), ts).await,
514        //     false
515        // );
516
517        assert!(mempool
518            .can_bundle_block(
519                &blockchain,
520                ts + 120000,
521                &None,
522                configs.deref(),
523                &public_key,
524            )
525            .await
526            .is_some());
527    }
528}