saito_core/core/
consensus_thread.rs

1use std::ops::Deref;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use log::{debug, info, trace};
7use tokio::sync::mpsc::Sender;
8use tokio::sync::RwLock;
9
10use crate::core::consensus::block::{Block, BlockType};
11use crate::core::consensus::blockchain::Blockchain;
12use crate::core::consensus::golden_ticket::GoldenTicket;
13use crate::core::consensus::mempool::Mempool;
14use crate::core::consensus::transaction::{Transaction, TransactionType};
15use crate::core::consensus::wallet::Wallet;
16use crate::core::defs::{
17    PrintForLog, SaitoHash, StatVariable, Timestamp, CHANNEL_SAFE_BUFFER, STAT_BIN_COUNT,
18};
19use crate::core::io::network::Network;
20use crate::core::io::network_event::NetworkEvent;
21use crate::core::io::storage::Storage;
22use crate::core::mining_thread::MiningEvent;
23use crate::core::process::keep_time::Timer;
24use crate::core::process::process_event::ProcessEvent;
25use crate::core::routing_thread::RoutingEvent;
26use crate::core::util::configuration::Configuration;
27use crate::core::util::crypto::hash;
28
/// Number of milliseconds that must accumulate in `ConsensusThread::block_producing_timer`
/// before a timer tick attempts to bundle a block.
pub const BLOCK_PRODUCING_TIMER: u64 = 1_000;
30
31#[derive(Debug)]
32pub enum ConsensusEvent {
33    NewGoldenTicket { golden_ticket: GoldenTicket },
34    BlockFetched { peer_index: u64, block: Block },
35    NewTransaction { transaction: Transaction },
36    NewTransactions { transactions: Vec<Transaction> },
37}
38
39pub struct ConsensusStats {
40    pub blocks_fetched: StatVariable,
41    pub blocks_created: StatVariable,
42    pub received_tx: StatVariable,
43    pub received_gts: StatVariable,
44}
45
46impl ConsensusStats {
47    pub fn new(sender: Sender<String>) -> Self {
48        ConsensusStats {
49            blocks_fetched: StatVariable::new(
50                "consensus::blocks_fetched".to_string(),
51                STAT_BIN_COUNT,
52                sender.clone(),
53            ),
54            blocks_created: StatVariable::new(
55                "consensus::blocks_created".to_string(),
56                STAT_BIN_COUNT,
57                sender.clone(),
58            ),
59            received_tx: StatVariable::new(
60                "consensus::received_tx".to_string(),
61                STAT_BIN_COUNT,
62                sender.clone(),
63            ),
64            received_gts: StatVariable::new(
65                "consensus::received_gts".to_string(),
66                STAT_BIN_COUNT,
67                sender.clone(),
68            ),
69        }
70    }
71}
72
73/// Manages blockchain and the mempool
74pub struct ConsensusThread {
75    pub mempool_lock: Arc<RwLock<Mempool>>,
76    pub blockchain_lock: Arc<RwLock<Blockchain>>,
77    pub wallet_lock: Arc<RwLock<Wallet>>,
78    pub generate_genesis_block: bool,
79    pub sender_to_router: Sender<RoutingEvent>,
80    pub sender_to_miner: Sender<MiningEvent>,
81    pub block_producing_timer: Timestamp,
82    pub timer: Timer,
83    pub network: Network,
84    pub storage: Storage,
85    pub stats: ConsensusStats,
86    pub txs_for_mempool: Vec<Transaction>,
87    pub stat_sender: Sender<String>,
88    pub config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
89    pub produce_blocks_by_timer: bool,
90    pub delete_old_blocks: bool,
91}
92
93impl ConsensusThread {
94    async fn generate_issuance_tx(
95        &mut self,
96        mempool_lock: Arc<RwLock<Mempool>>,
97        blockchain_lock: Arc<RwLock<Blockchain>>,
98    ) {
99        info!("generating issuance init transaction");
100
101        let slips = self.storage.get_token_supply_slips_from_disk().await;
102        let private_key;
103        let public_key;
104        {
105            let wallet = self.wallet_lock.read().await;
106            private_key = wallet.private_key;
107            public_key = wallet.public_key;
108        }
109        let mut txs: Vec<Transaction> = vec![];
110        let mut initial_token_supply = 0;
111        let slip_count = slips.len();
112        for slip in slips {
113            debug!("{:?} slip public key", slip.public_key.to_base58());
114            initial_token_supply += slip.amount;
115            let mut tx = Transaction::create_issuance_transaction(slip.public_key, slip.amount);
116            tx.generate(&public_key, 0, 0);
117            tx.sign(&private_key);
118            txs.push(tx);
119        }
120
121        assert_eq!(
122            slip_count,
123            txs.len(),
124            "issuanace slips and txs counts should be equal"
125        );
126
127        let mut blockchain = blockchain_lock.write().await;
128        // setting the initial token supply to the blockchain here if we are generating the genesis block
129        blockchain.initial_token_supply = initial_token_supply;
130        info!("initial token supply : {:?} set", initial_token_supply);
131        let mut mempool = mempool_lock.write().await;
132
133        for tx in txs {
134            mempool
135                .add_transaction_if_validates(tx.clone(), &blockchain)
136                .await;
137            info!("added issuance init tx for : {:?}", tx.signature.to_hex());
138        }
139        assert_eq!(
140            mempool.transactions.len(),
141            slip_count,
142            "mempool txs count should be equal to issuance slips count"
143        );
144    }
145    pub async fn produce_block(
146        &mut self,
147        timestamp: Timestamp,
148        gt_result: Option<&Transaction>,
149        mempool: &mut Mempool,
150        blockchain: &Blockchain,
151        configs: &(dyn Configuration + Send + Sync),
152    ) -> Option<Block> {
153        // trace!("locking blockchain 3");
154
155        self.block_producing_timer = 0;
156
157        let block = mempool
158            .bundle_block(
159                blockchain,
160                timestamp,
161                gt_result.cloned(),
162                configs,
163                &self.storage,
164            )
165            .await;
166        if let Some(block) = block {
167            trace!(
168                "mempool size after creating block : {:?}",
169                mempool.transactions.len()
170            );
171
172            self.txs_for_mempool.clear();
173
174            return Some(block);
175        }
176        None
177    }
178    pub async fn bundle_block(
179        &mut self,
180        timestamp: Timestamp,
181        produce_without_limits: bool,
182    ) -> bool {
183        let config_lock = self.config_lock.clone();
184        let configs = config_lock.read().await;
185
186        // trace!("locking blockchain 3");
187        let blockchain_lock = self.blockchain_lock.clone();
188        let mempool_lock = self.mempool_lock.clone();
189        let mut blockchain = blockchain_lock.write().await;
190        let mut mempool = mempool_lock.write().await;
191
192        for tx in self.txs_for_mempool.iter() {
193            if let TransactionType::GoldenTicket = tx.transaction_type {
194                unreachable!("golden tickets shouldn't be here");
195            } else {
196                mempool
197                    .add_transaction_if_validates(tx.clone(), &blockchain)
198                    .await;
199            }
200        }
201
202        self.block_producing_timer = 0;
203
204        let mut gt_result = None;
205        let mut gt_propagated = false;
206        {
207            let result: Option<&(Transaction, bool)> = mempool
208                .golden_tickets
209                .get(&blockchain.get_latest_block_hash());
210            if let Some((tx, propagated)) = result {
211                gt_result = Some(tx.clone());
212                gt_propagated = *propagated;
213            }
214        }
215        let mut block = None;
216        if (produce_without_limits || (!configs.is_browser() && !configs.is_spv_mode()))
217            && !blockchain.blocks.is_empty()
218        {
219            block = self
220                .produce_block(
221                    timestamp,
222                    gt_result.as_ref(),
223                    &mut mempool,
224                    &blockchain,
225                    configs.deref(),
226                )
227                .await;
228        } else {
229            // debug!("skipped bundling block. : produce_without_limits = {:?}, is_browser : {:?} block_count : {:?}",
230            //     produce_without_limits,
231            //     configs.is_browser() || configs.is_spv_mode(),
232            //     blockchain.blocks.len());
233        }
234        if let Some(block) = block {
235            debug!(
236                "adding bundled block : {:?} with id : {:?} to mempool",
237                block.hash.to_hex(),
238                block.id
239            );
240            trace!(
241                "mempool size after bundling : {:?}",
242                mempool.transactions.len()
243            );
244
245            mempool.add_block(block);
246            // dropping the lock here since blockchain needs the write lock to add blocks
247            drop(mempool);
248            self.stats.blocks_created.increment();
249            let _updated = blockchain
250                .add_blocks_from_mempool(
251                    self.mempool_lock.clone(),
252                    Some(&self.network),
253                    &mut self.storage,
254                    Some(self.sender_to_miner.clone()),
255                    Some(self.sender_to_router.clone()),
256                    configs.deref(),
257                )
258                .await;
259
260            debug!("blocks added to blockchain");
261            return true;
262        } else {
263            // route messages to peers
264            if !self.txs_for_mempool.is_empty() {
265                trace!(
266                    "since a block was not produced, propagating {:?} txs to peers",
267                    self.txs_for_mempool.len()
268                );
269                for tx in self.txs_for_mempool.drain(..) {
270                    self.network.propagate_transaction(&tx).await;
271                }
272            }
273            // route golden tickets to peers
274            if gt_result.is_some() && !gt_propagated {
275                self.network
276                    .propagate_transaction(gt_result.as_ref().unwrap())
277                    .await;
278                debug!(
279                    "propagating gt : {:?} to peers",
280                    hash(&gt_result.unwrap().serialize_for_net()).to_hex()
281                );
282                let (_, propagated) = mempool
283                    .golden_tickets
284                    .get_mut(&blockchain.get_latest_block_hash())
285                    .unwrap();
286                *propagated = true;
287            }
288            return true;
289        }
290        // trace!("releasing blockchain 3");
291        false
292    }
293
294    async fn produce_genesis_block(&mut self, timestamp: Timestamp) {
295        info!("producing genesis block");
296        self.generate_issuance_tx(self.mempool_lock.clone(), self.blockchain_lock.clone())
297            .await;
298
299        let configs = self.config_lock.read().await;
300        let mut blockchain = self.blockchain_lock.write().await;
301        if blockchain.blocks.is_empty() && blockchain.genesis_block_id == 0 {
302            let mut mempool = self.mempool_lock.write().await;
303
304            let block = mempool
305                .bundle_genesis_block(&mut blockchain, timestamp, configs.deref(), &self.storage)
306                .await;
307            info!(
308                "produced genesis block : {:?} with id : {:?}",
309                block.hash.to_hex(),
310                block.id
311            );
312
313            let _res = blockchain
314                .add_block(block, &mut self.storage, &mut mempool, configs.deref())
315                .await;
316        }
317
318        self.generate_genesis_block = false;
319    }
320
321    pub async fn add_gt_to_mempool(&mut self, golden_ticket: GoldenTicket) {
322        debug!(
323            "adding received new golden ticket : {:?} to mempool",
324            golden_ticket.target.to_hex()
325        );
326        let mut mempool = self.mempool_lock.write().await;
327        let public_key;
328        let private_key;
329        {
330            let wallet = self.wallet_lock.read().await;
331
332            public_key = wallet.public_key;
333            private_key = wallet.private_key;
334        }
335        let transaction =
336            Wallet::create_golden_ticket_transaction(golden_ticket, &public_key, &private_key)
337                .await;
338        self.stats.received_gts.increment();
339        mempool.add_golden_ticket(transaction).await;
340    }
341}
342
343#[async_trait]
344impl ProcessEvent<ConsensusEvent> for ConsensusThread {
345    async fn process_network_event(&mut self, _event: NetworkEvent) -> Option<()> {
346        unreachable!();
347    }
348
349    async fn process_timer_event(&mut self, duration: Duration) -> Option<()> {
350        let mut work_done = false;
351        let timestamp = self.timer.get_timestamp_in_ms();
352        let duration_value = duration.as_millis() as u64;
353
354        if self.generate_genesis_block {
355            self.produce_genesis_block(timestamp).await;
356            return Some(());
357        }
358
359        // generate blocks
360        self.block_producing_timer += duration_value;
361        if self.produce_blocks_by_timer && self.block_producing_timer >= BLOCK_PRODUCING_TIMER {
362            self.bundle_block(timestamp, false).await;
363
364            work_done = true;
365        }
366
367        // let config = self.config_lock.read().await;
368        // if config.get_blockchain_configs().issuance_file_write_interval > 0 {
369        //     // generate issuance file periodically
370        //     self.issuance_writing_timer += duration_value;
371
372        //     if self.issuance_writing_timer
373        //         >= config.get_blockchain_configs().issuance_file_write_interval
374        //     {
375        //         let mut mempool = self.mempool_lock.write().await;
376        //         let mut blockchain = self.blockchain_lock.write().await;
377        //         blockchain.write_issuance_file(0, &mut self.storage).await;
378        //         self.issuance_writing_timer = 0;
379        //         work_done = true;
380        //     }
381        // }
382
383        if work_done {
384            return Some(());
385        }
386        None
387    }
388
389    async fn process_event(&mut self, event: ConsensusEvent) -> Option<()> {
390        match event {
391            ConsensusEvent::NewGoldenTicket { golden_ticket } => {
392                self.add_gt_to_mempool(golden_ticket).await;
393                Some(())
394            }
395            ConsensusEvent::BlockFetched { block, .. } => {
396                let configs = self.config_lock.read().await;
397                // trace!("locking blockchain 4");
398                let mut blockchain = self.blockchain_lock.write().await;
399
400                {
401                    debug!("block : {:?} fetched from peer", block.hash.to_hex());
402
403                    if blockchain.blocks.contains_key(&block.hash) {
404                        debug!(
405                            "fetched block : {:?} already in blockchain",
406                            block.hash.to_hex()
407                        );
408                        return Some(());
409                    }
410                    debug!(
411                        "adding fetched block : {:?}-{:?} to mempool",
412                        block.id,
413                        block.hash.to_hex()
414                    );
415                    let mut mempool = self.mempool_lock.write().await;
416                    mempool.add_block(block);
417                }
418                self.stats.blocks_fetched.increment();
419                blockchain
420                    .add_blocks_from_mempool(
421                        self.mempool_lock.clone(),
422                        Some(&self.network),
423                        &mut self.storage,
424                        Some(self.sender_to_miner.clone()),
425                        Some(self.sender_to_router.clone()),
426                        configs.deref(),
427                    )
428                    .await;
429                // trace!("releasing blockchain 4");
430
431                Some(())
432            }
433            ConsensusEvent::NewTransaction { transaction } => {
434                self.stats.received_tx.increment();
435
436                if let TransactionType::GoldenTicket = transaction.transaction_type {
437                    let mut mempool = self.mempool_lock.write().await;
438
439                    self.stats.received_gts.increment();
440                    trace!("adding golden ticket to mempool");
441                    mempool.add_golden_ticket(transaction).await;
442                } else {
443                    trace!("adding transaction to mempool");
444                    self.txs_for_mempool.push(transaction);
445                }
446
447                Some(())
448            }
449            ConsensusEvent::NewTransactions { mut transactions } => {
450                self.stats
451                    .received_tx
452                    .increment_by(transactions.len() as u64);
453
454                self.txs_for_mempool.reserve(transactions.len());
455                for transaction in transactions.drain(..) {
456                    if let TransactionType::GoldenTicket = transaction.transaction_type {
457                        let mut mempool = self.mempool_lock.write().await;
458
459                        self.stats.received_gts.increment();
460                        mempool.add_golden_ticket(transaction).await;
461                    } else {
462                        self.txs_for_mempool.push(transaction);
463                    }
464                }
465                Some(())
466            }
467        }
468    }
469
470    async fn on_init(&mut self) {
471        debug!("on_init");
472
473        {
474            let configs = self.config_lock.read().await;
475            info!(
476                "genesis_period : {:?}",
477                configs.get_consensus_config().unwrap().genesis_period
478            );
479            info!(
480                "default_social_stake : {:?}",
481                configs.get_consensus_config().unwrap().default_social_stake
482            );
483            info!(
484                "default_social_stake_period : {:?}",
485                configs
486                    .get_consensus_config()
487                    .unwrap()
488                    .default_social_stake_period
489            );
490            let mut blockchain = self.blockchain_lock.write().await;
491            let blockchain_configs = configs.get_blockchain_configs();
492            info!(
493                "loading blockchain state from configs : {:?}",
494                blockchain_configs
495            );
496            blockchain.last_block_hash =
497                SaitoHash::from_hex(blockchain_configs.last_block_hash.as_str()).unwrap_or([0; 32]);
498            blockchain.last_block_id = blockchain_configs.last_block_id;
499            blockchain.last_timestamp = blockchain_configs.last_timestamp;
500            blockchain.genesis_block_id = blockchain_configs.genesis_block_id;
501            blockchain.genesis_timestamp = blockchain_configs.genesis_timestamp;
502            blockchain.lowest_acceptable_timestamp = blockchain_configs.lowest_acceptable_timestamp;
503            blockchain.lowest_acceptable_block_hash =
504                SaitoHash::from_hex(blockchain_configs.lowest_acceptable_block_hash.as_str())
505                    .unwrap_or([0; 32]);
506            blockchain.lowest_acceptable_block_id = blockchain_configs.lowest_acceptable_block_id;
507            blockchain.fork_id =
508                Some(SaitoHash::from_hex(blockchain_configs.fork_id.as_str()).unwrap_or([0; 32]));
509        }
510
511        let configs = self.config_lock.read().await;
512        let mut blockchain = self.blockchain_lock.write().await;
513        if !configs.is_browser() {
514            let mut list = self
515                .storage
516                .load_block_name_list()
517                .await
518                .expect("cannot load block file list");
519            if configs.get_peer_configs().is_empty() && list.is_empty() {
520                self.generate_genesis_block = true;
521            }
522            let start_time = self.timer.get_timestamp_in_ms();
523
524            info!(
525                "loading {:?} blocks from disk. Timestamp : {:?}",
526                list.len(),
527                StatVariable::format_timestamp(start_time)
528            );
529            let mut files_to_delete = list.clone();
530
531            while !list.is_empty() {
532                let file_names: Vec<String> =
533                    list.drain(..std::cmp::min(1000, list.len())).collect();
534                self.storage
535                    .load_blocks_from_disk(file_names.as_slice(), self.mempool_lock.clone())
536                    .await;
537
538                blockchain
539                    .add_blocks_from_mempool(
540                        self.mempool_lock.clone(),
541                        None,
542                        &mut self.storage,
543                        None,
544                        None,
545                        configs.deref(),
546                    )
547                    .await;
548
549                info!(
550                    "{:?} blocks remaining to be loaded. Timestamp : {:?}",
551                    list.len(),
552                    StatVariable::format_timestamp(self.timer.get_timestamp_in_ms())
553                );
554            }
555            {
556                let mut mempool = self.mempool_lock.write().await;
557                info!("removing {} failed blocks from mempool so new blocks can be bundled after node loadup", mempool.blocks_queue.len());
558                mempool.blocks_queue.clear();
559            }
560            if self.delete_old_blocks {
561                let purge_id = blockchain
562                    .get_latest_block_id()
563                    .saturating_sub(blockchain.genesis_period * 2);
564                let retained_file_names: Vec<String> = blockchain
565                    .blocks
566                    .iter()
567                    .filter_map(|(_, block)| {
568                        if block.id < purge_id {
569                            return None;
570                        }
571                        Some(block.get_file_name())
572                    })
573                    .collect();
574                files_to_delete.retain(|name| !retained_file_names.contains(name));
575                info!(
576                    "removing {} blocks from disk which were not loaded to blockchain or older than genesis block : {}",
577                    files_to_delete.len(),
578                    blockchain.genesis_block_id
579                );
580                for file_name in files_to_delete {
581                    self.storage
582                        .delete_block_from_disk(
583                            (self.storage.io_interface.get_block_dir() + file_name.as_str())
584                                .as_str(),
585                        )
586                        .await;
587                }
588            }
589            info!(
590                "{:?} total blocks in blockchain. Timestamp : {:?}, elapsed_time : {:?}",
591                blockchain.blocks.len(),
592                StatVariable::format_timestamp(self.timer.get_timestamp_in_ms()),
593                self.timer.get_timestamp_in_ms() - start_time
594            );
595            {
596                if let Some(latest_block) = blockchain.get_latest_block() {
597                    self.sender_to_miner
598                        .send(MiningEvent::LongestChainBlockAdded {
599                            hash: latest_block.hash,
600                            difficulty: latest_block.difficulty,
601                            block_id: latest_block.id,
602                        })
603                        .await
604                        .unwrap();
605                }
606            }
607        }
608
609        debug!(
610            "sending block id update as : {:?}",
611            blockchain.last_block_id
612        );
613    }
614
615    async fn on_stat_interval(&mut self, current_time: Timestamp) {
616        // println!("on_stat_interval : {:?}", current_time);
617
618        self.stats
619            .blocks_fetched
620            .calculate_stats(current_time)
621            .await;
622        self.stats
623            .blocks_created
624            .calculate_stats(current_time)
625            .await;
626        self.stats.received_tx.calculate_stats(current_time).await;
627        self.stats.received_gts.calculate_stats(current_time).await;
628
629        {
630            let wallet = self.wallet_lock.read().await;
631
632            let stat = format!(
633                "{} - {} - total_slips : {:?}, unspent_slips : {:?}, current_balance : {:?}",
634                StatVariable::format_timestamp(current_time),
635                format!("{:width$}", "wallet::state", width = 40),
636                wallet.slips.len(),
637                wallet.get_unspent_slip_count(),
638                wallet.get_available_balance()
639            );
640            self.stat_sender.send(stat).await.unwrap();
641        }
642        {
643            let stat;
644            {
645                // trace!("locking blockchain 5");
646                let blockchain = self.blockchain_lock.read().await;
647
648                stat = format!(
649                    "{} - {} - utxo_size : {:?}, block_count : {:?}, longest_chain_len : {:?} full_block_count : {:?} txs_in_blocks : {:?}",
650                    StatVariable::format_timestamp(current_time),
651                    format!("{:width$}", "blockchain::state", width = 40),
652                    blockchain.utxoset.len(),
653                    blockchain.blocks.len(),
654                    blockchain.get_latest_block_id(),
655                    blockchain.blocks.iter().filter(|(_hash, block)| { block.block_type == BlockType::Full }).count(),
656                    blockchain.blocks.iter().map(|(_hash, block)| { block.transactions.len() }).sum::<usize>()
657                );
658            }
659            // trace!("releasing blockchain 5");
660            self.stat_sender.send(stat).await.unwrap();
661        }
662        {
663            let stat;
664            {
665                let mempool = self.mempool_lock.read().await;
666
667                stat = format!(
668                    "{} - {} - blocks_queue : {:?}, transactions : {:?}",
669                    StatVariable::format_timestamp(current_time),
670                    format!("{:width$}", "mempool:state", width = 40),
671                    mempool.blocks_queue.len(),
672                    mempool.transactions.len(),
673                );
674            }
675
676            self.stat_sender.send(stat).await.unwrap();
677        }
678        {
679            let stat = format!(
680                "{} - {} - capacity : {:?} / {:?}",
681                StatVariable::format_timestamp(current_time),
682                format!("{:width$}", "router::channel", width = 40),
683                self.sender_to_router.capacity(),
684                self.sender_to_router.max_capacity()
685            );
686            self.stat_sender.send(stat).await.unwrap();
687        }
688    }
689
690    fn is_ready_to_process(&self) -> bool {
691        self.sender_to_miner.capacity() > CHANNEL_SAFE_BUFFER
692            && self.sender_to_router.capacity() > CHANNEL_SAFE_BUFFER
693    }
694}
695
696#[cfg(test)]
697mod tests {
698    use log::info;
699    use tracing_subscriber::filter::Directive;
700    use tracing_subscriber::layer::SubscriberExt;
701    use tracing_subscriber::util::SubscriberInitExt;
702    use tracing_subscriber::Layer;
703
704    use crate::core::consensus::block::Block;
705    use crate::core::consensus::slip::SlipType;
706    use crate::core::consensus_thread::ConsensusEvent;
707    use crate::core::defs::{PrintForLog, SaitoHash, NOLAN_PER_SAITO, UTXO_KEY_LENGTH};
708
709    use crate::core::process::keep_time::KeepTime;
710    use crate::core::process::process_event::ProcessEvent;
711    use crate::core::util::crypto::generate_keys;
712    use crate::core::util::test::node_tester::test::{NodeTester, TestTimeKeeper};
713    use std::fs;
714    use std::str::FromStr;
715
716    #[tokio::test]
717    #[serial_test::serial]
718    async fn total_supply_test() {
719        // pretty_env_logger::init();
720        NodeTester::delete_data().await.unwrap();
721        let peer_public_key = generate_keys().0;
722        let mut tester = NodeTester::default();
723        let public_key = tester.get_public_key().await;
724        // tester
725        //     .set_staking_requirement(2_000_000 * NOLAN_PER_SAITO, 60)
726        //     .await;
727        let issuance = vec![
728            // (public_key.to_base58(), 100 * 2_000_000 * NOLAN_PER_SAITO),
729            (public_key.to_base58(), 100_000 * NOLAN_PER_SAITO),
730            (
731                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
732                50_000 * NOLAN_PER_SAITO,
733            ),
734        ];
735        tester.set_issuance(issuance).await.unwrap();
736        tester.set_staking_enabled(false).await;
737        tester.init().await.unwrap();
738        tester.wait_till_block_id(1).await.unwrap();
739        tester
740            .check_total_supply()
741            .await
742            .expect("total supply should not change");
743
744        let tx = tester
745            .create_transaction(10_000, 1000, public_key)
746            .await
747            .unwrap();
748        tester.add_transaction(tx).await;
749        tester.wait_till_block_id(2).await.unwrap();
750        tester
751            .check_total_supply()
752            .await
753            .expect("total supply should not change");
754
755        let tx = tester
756            .create_transaction(10_000, 1000, public_key)
757            .await
758            .unwrap();
759        tester.add_transaction(tx).await;
760        tester.wait_till_block_id(3).await.unwrap();
761        tester
762            .check_total_supply()
763            .await
764            .expect("total supply should not change");
765
766        let tx = tester
767            .create_transaction(10_000, 1000, public_key)
768            .await
769            .unwrap();
770        tester.add_transaction(tx).await;
771        tester.wait_till_block_id(4).await.unwrap();
772        tester
773            .check_total_supply()
774            .await
775            .expect("total supply should not change");
776
777        let tx = tester
778            .create_transaction(10_000, 1000, public_key)
779            .await
780            .unwrap();
781        tester.add_transaction(tx).await;
782        tester.wait_till_block_id(5).await.unwrap();
783        tester
784            .check_total_supply()
785            .await
786            .expect("total supply should not change");
787    }
788    #[tokio::test]
789    #[serial_test::serial]
790    async fn total_supply_test_with_atr() {
791        // pretty_env_logger::init();
792        NodeTester::delete_data().await.unwrap();
793        let peer_public_key = generate_keys().0;
794        let mut tester = NodeTester::default();
795        let public_key = tester.get_public_key().await;
796        // tester
797        //     .set_staking_requirement(2_000_000 * NOLAN_PER_SAITO, 60)
798        //     .await;
799        let issuance = vec![
800            // (public_key.to_base58(), 100 * 2_000_000 * NOLAN_PER_SAITO),
801            (public_key.to_base58(), 100_000 * NOLAN_PER_SAITO),
802            (
803                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
804                50_000 * NOLAN_PER_SAITO,
805            ),
806        ];
807        tester.set_issuance(issuance).await.unwrap();
808        tester.set_staking_enabled(false).await;
809        tester.init().await.unwrap();
810        tester.wait_till_block_id(1).await.unwrap();
811        tester
812            .check_total_supply()
813            .await
814            .expect("total supply should not change");
815
816        let genesis_period = tester
817            .consensus_thread
818            .config_lock
819            .read()
820            .await
821            .get_consensus_config()
822            .unwrap()
823            .genesis_period;
824        for i in 2..2 * genesis_period + 2 {
825            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
826            tester.add_transaction(tx).await;
827            tester.wait_till_block_id(i).await.unwrap();
828            tester
829                .check_total_supply()
830                .await
831                .expect("total supply should not change");
832        }
833    }
834
835    #[tokio::test]
836    #[serial_test::serial]
837    async fn total_supply_test_with_with_restarts_over_atr() {
838        // pretty_env_logger::init();
839        NodeTester::delete_data().await.unwrap();
840        let mut tester = NodeTester::new(10, None, None);
841        let public_key = tester.get_public_key().await;
842        let private_key = tester.get_private_key().await;
843
844        let issuance = vec![
845            (public_key.to_base58(), 100_000 * NOLAN_PER_SAITO),
846            (
847                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
848                50_000 * NOLAN_PER_SAITO,
849            ),
850        ];
851        tester.set_issuance(issuance).await.unwrap();
852        tester.set_staking_enabled(false).await;
853        tester.init().await.unwrap();
854        tester.wait_till_block_id(1).await.unwrap();
855        tester
856            .check_total_supply()
857            .await
858            .expect("total supply should not change");
859
860        let genesis_period = tester
861            .consensus_thread
862            .config_lock
863            .read()
864            .await
865            .get_consensus_config()
866            .unwrap()
867            .genesis_period;
868
869        assert_eq!(genesis_period, 10, "genesis period should be 10");
870
871        let max_blocks = genesis_period + 2;
872        for i in 2..=max_blocks {
873            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
874            tester.add_transaction(tx).await;
875            tester.wait_till_block_id(i).await.unwrap();
876            tester
877                .check_total_supply()
878                .await
879                .expect("total supply should not change");
880            let wallet = tester.consensus_thread.wallet_lock.read().await;
881            assert_eq!(
882                wallet
883                    .slips
884                    .iter()
885                    .filter(|(_, slip)| slip.slip_type == SlipType::ATR)
886                    .count(),
887                0
888            );
889        }
890
891        let mut last_block_id = tester
892            .consensus_thread
893            .blockchain_lock
894            .read()
895            .await
896            .get_latest_block_id();
897        let timer = tester.consensus_thread.timer.clone();
898        drop(tester);
899
900        for _ in 0..10 {
901            info!("----------- ------------- ------------- loading the node again : last_block_id : {}", last_block_id);
902
903            let mut tester =
904                NodeTester::new(genesis_period, Some(private_key), Some(timer.clone()));
905            tester.set_staking_enabled(false).await;
906            tester.init().await.unwrap();
907            let loaded_last_block_id = tester
908                .consensus_thread
909                .blockchain_lock
910                .read()
911                .await
912                .get_latest_block_id();
913            assert_eq!(last_block_id, loaded_last_block_id);
914
915            for i in last_block_id + 1..=last_block_id + (genesis_period as f64 * 1.5f64) as u64 {
916                {
917                    let wallet = tester.consensus_thread.wallet_lock.read().await;
918                    info!(
919                    "current wallet balance : {:?} slip_count : {:?} unspent_slips : {}, i : {}",
920                        wallet.get_available_balance(),
921                        wallet.slips.len(),
922                        wallet.get_unspent_slip_count(),
923                        i
924                    );
925                    wallet.slips.iter().for_each(|(_, slip)| {
926                        info!(
927                            "slip : {}-{}-{} amount : {:?} type : {:?} spent : {:?}",
928                            slip.block_id,
929                            slip.tx_ordinal,
930                            slip.slip_index,
931                            slip.amount,
932                            slip.slip_type,
933                            slip.spent
934                        );
935                    });
936                    // since we keep reusing the same slip, there shouldn't be old ATR slips in the wallet
937                    assert_eq!(
938                        wallet
939                            .slips
940                            .iter()
941                            .filter(|(_, slip)| slip.slip_type == SlipType::ATR)
942                            .count(),
943                        0
944                    );
945                }
946
947                let tx = tester
948                    .create_transaction(10, 10, public_key)
949                    .await
950                    .unwrap_or_else(|_| panic!("couldn't create tx. i : {}", i));
951                tester.add_transaction(tx).await;
952                tester.wait_till_block_id(i).await.unwrap();
953                tester
954                    .check_total_supply()
955                    .await
956                    .expect("total supply should not change");
957            }
958            last_block_id = tester
959                .consensus_thread
960                .blockchain_lock
961                .read()
962                .await
963                .get_latest_block_id();
964        }
965    }
966
967    #[tokio::test]
968    #[serial_test::serial]
969    async fn total_supply_test_with_staking_for_slip_count() {
970        // pretty_env_logger::init();
971        NodeTester::delete_data().await.unwrap();
972        let peer_public_key = generate_keys().0;
973        let mut tester = NodeTester::default();
974        let public_key = tester.get_public_key().await;
975        tester
976            .set_staking_requirement(2 * NOLAN_PER_SAITO, 60)
977            .await;
978        let issuance = vec![
979            (public_key.to_base58(), 60 * 2 * NOLAN_PER_SAITO),
980            (public_key.to_base58(), 100 * NOLAN_PER_SAITO),
981            (
982                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
983                50 * NOLAN_PER_SAITO,
984            ),
985        ];
986        tester.set_issuance(issuance).await.unwrap();
987        tester.init().await.unwrap();
988        tester.wait_till_block_id(1).await.unwrap();
989        tester
990            .check_total_supply()
991            .await
992            .expect("total supply should not change");
993
994        let genesis_period = tester
995            .consensus_thread
996            .config_lock
997            .read()
998            .await
999            .get_consensus_config()
1000            .unwrap()
1001            .genesis_period;
1002        for i in 2..2 * genesis_period + 2 {
1003            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1004            tester.add_transaction(tx).await;
1005            tester.wait_till_block_id(i).await.unwrap();
1006            tester
1007                .check_total_supply()
1008                .await
1009                .expect("total supply should not change");
1010            assert_eq!(
1011                tester
1012                    .consensus_thread
1013                    .wallet_lock
1014                    .read()
1015                    .await
1016                    .staking_slips
1017                    .len() as u64,
1018                std::cmp::min(60, i - 1)
1019            );
1020            let available_balance = tester
1021                .consensus_thread
1022                .wallet_lock
1023                .read()
1024                .await
1025                .get_available_balance();
1026            assert!(
1027                available_balance + std::cmp::min(60, i - 1) * 2 * NOLAN_PER_SAITO
1028                    <= tester.initial_token_supply
1029            );
1030        }
1031    }
1032
1033    #[tokio::test]
1034    #[serial_test::serial]
1035    async fn blockchain_state_over_atr() {
1036        // pretty_env_logger::init();
1037        NodeTester::delete_data().await.unwrap();
1038        let peer_public_key = generate_keys().0;
1039        let mut tester = NodeTester::new(3, None, None);
1040        let public_key = tester.get_public_key().await;
1041        // tester
1042        //     .set_staking_requirement(2_000_000 * NOLAN_PER_SAITO, 60)
1043        //     .await;
1044        let issuance = vec![
1045            // (public_key.to_base58(), 100 * 2_000_000 * NOLAN_PER_SAITO),
1046            (public_key.to_base58(), 100_000 * NOLAN_PER_SAITO),
1047            // (
1048            //     "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
1049            //     50_000 * NOLAN_PER_SAITO,
1050            // ),
1051        ];
1052        tester.set_issuance(issuance).await.unwrap();
1053        tester.set_staking_enabled(false).await;
1054        tester.init().await.unwrap();
1055        tester.wait_till_block_id(1).await.unwrap();
1056        tester
1057            .check_total_supply()
1058            .await
1059            .expect("total supply should not change");
1060
1061        let genesis_period = tester
1062            .consensus_thread
1063            .config_lock
1064            .read()
1065            .await
1066            .get_consensus_config()
1067            .unwrap()
1068            .genesis_period;
1069        assert_eq!(genesis_period, 3, "genesis period should be 3");
1070
1071        let tx = tester
1072            .create_transaction(1000, 1000, public_key)
1073            .await
1074            .unwrap();
1075        tester.add_transaction(tx).await;
1076        tester.wait_till_block_id(2).await.unwrap();
1077        tester
1078            .check_total_supply()
1079            .await
1080            .expect("total supply should not change");
1081
1082        let tx = tester
1083            .create_transaction(1000, 1000, public_key)
1084            .await
1085            .unwrap();
1086        tester.add_transaction(tx).await;
1087        tester.wait_till_block_id(3).await.unwrap();
1088        tester
1089            .check_total_supply()
1090            .await
1091            .expect("total supply should not change");
1092        {
1093            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1094            let block = blockchain.get_latest_block().expect("block should exist");
1095            assert_eq!(block.total_fees_atr, 0);
1096        }
1097        let tx = tester
1098            .create_transaction(1000, 1000, public_key)
1099            .await
1100            .unwrap();
1101        tester.add_transaction(tx).await;
1102        tester.wait_till_block_id(4).await.unwrap();
1103        tester
1104            .check_total_supply()
1105            .await
1106            .expect("total supply should not change");
1107        {
1108            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1109            let block = blockchain.get_latest_block().expect("block should exist");
1110            // assert!(block.total_fees_atr > 0);
1111        }
1112
1113        let tx = tester
1114            .create_transaction(1000, 1000, public_key)
1115            .await
1116            .unwrap();
1117        tester.add_transaction(tx).await;
1118        tester.wait_till_block_id(5).await.unwrap();
1119        tester
1120            .check_total_supply()
1121            .await
1122            .expect("total supply should not change");
1123        {
1124            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1125            let block = blockchain.get_latest_block().expect("block should exist");
1126            // assert!(block.total_fees_atr > 0);
1127        }
1128
1129        let tx = tester
1130            .create_transaction(1000, 1000, public_key)
1131            .await
1132            .unwrap();
1133        tester.add_transaction(tx).await;
1134        tester.wait_till_block_id(6).await.unwrap();
1135        tester
1136            .check_total_supply()
1137            .await
1138            .expect("total supply should not change");
1139        {
1140            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1141            let block = blockchain.get_latest_block().expect("block should exist");
1142            // assert!(block.total_fees_atr > 0);
1143        }
1144
1145        let tx = tester
1146            .create_transaction(1000, 1000, public_key)
1147            .await
1148            .unwrap();
1149        tester.add_transaction(tx).await;
1150        tester.wait_till_block_id(7).await.unwrap();
1151        tester
1152            .check_total_supply()
1153            .await
1154            .expect("total supply should not change");
1155        {
1156            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1157            let block = blockchain.get_latest_block().expect("block should exist");
1158            // assert!(block.total_fees_atr > 0);
1159        }
1160    }
1161    #[tokio::test]
1162    #[serial_test::serial]
1163    async fn checkpoints_test() {
1164        // pretty_env_logger::init();
1165        NodeTester::delete_data().await.unwrap();
1166        let mut tester = NodeTester::new(10, None, None);
1167        let public_key = tester.get_public_key().await;
1168        let private_key = tester.get_private_key().await;
1169        let issuance = vec![(public_key.to_base58(), 100_000 * NOLAN_PER_SAITO)];
1170        tester.set_issuance(issuance).await.unwrap();
1171        tester.set_staking_enabled(false).await;
1172        tester.init().await.unwrap();
1173        tester.wait_till_block_id(1).await.unwrap();
1174        tester
1175            .check_total_supply()
1176            .await
1177            .expect("total supply should not change");
1178
1179        let mut utxokey = [0; UTXO_KEY_LENGTH];
1180        // create a main fork first
1181        for i in 2..=10 {
1182            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1183
1184            if i == 6 {
1185                utxokey = tx.from[0].utxoset_key;
1186            }
1187            tester.add_transaction(tx).await;
1188            tester.wait_till_block_id(i).await.unwrap();
1189            tester
1190                .check_total_supply()
1191                .await
1192                .expect("total supply should not change");
1193        }
1194
1195        // create a checkpoint file for block 5
1196        {
1197            let io = tester.consensus_thread.storage.io_interface.as_ref();
1198            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1199            let block_hash: SaitoHash = blockchain.blockring.get_block_hash_by_block_id(5).unwrap();
1200            let file_path = format!("./data/checkpoints/{}-{}.chk", 5, block_hash.to_hex());
1201            io.write_value(&file_path, utxokey.to_hex().as_bytes())
1202                .await
1203                .unwrap();
1204        }
1205        drop(tester);
1206
1207        // reload the node
1208        let mut tester = NodeTester::new(10, Some(private_key), None);
1209        tester.set_staking_enabled(false).await;
1210        tester.init().await.unwrap();
1211        tester.wait_till_block_id(5).await.unwrap();
1212
1213        // check if the latest block is 5
1214        tester
1215            .check_total_supply()
1216            .await
1217            .expect("total supply should not change");
1218        let latest_block_id = tester
1219            .consensus_thread
1220            .blockchain_lock
1221            .read()
1222            .await
1223            .get_latest_block_id();
1224        assert_eq!(latest_block_id, 5);
1225
1226        NodeTester::delete_checkpoints().await.unwrap();
1227    }
1228
1229    #[tokio::test]
1230    #[serial_test::serial]
1231    async fn reorg_over_checkpoints() {
1232        // pretty_env_logger::init();
1233        NodeTester::delete_data().await.unwrap();
1234        let mut tester = NodeTester::new(10, None, None);
1235        let public_key = tester.get_public_key().await;
1236        let private_key = tester.get_private_key().await;
1237        let issuance = vec![
1238            (public_key.to_base58(), 100_000 * NOLAN_PER_SAITO),
1239            (public_key.to_base58(), 10_000 * NOLAN_PER_SAITO),
1240        ];
1241        tester.set_issuance(issuance).await.unwrap();
1242        tester.set_staking_enabled(false).await;
1243        tester.init().await.unwrap();
1244        tester.wait_till_block_id(1).await.unwrap();
1245        tester
1246            .check_total_supply()
1247            .await
1248            .expect("total supply should not change");
1249        let timer = tester.consensus_thread.timer.clone();
1250
1251        let mut utxokey = [0; UTXO_KEY_LENGTH];
1252        let mut block_3_hash = [0; 32];
1253        let alternate_block_4;
1254        // create a main fork first
1255        for i in 2..=4 {
1256            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1257
1258            tester.add_transaction(tx).await;
1259            tester.wait_till_block_id(i).await.unwrap();
1260            tester
1261                .check_total_supply()
1262                .await
1263                .expect("total supply should not change");
1264
1265            if i == 3 {
1266                let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1267                block_3_hash = blockchain.blockring.get_block_hash_by_block_id(3).unwrap();
1268            }
1269        }
1270
1271        {
1272            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1273            let block_hash = blockchain.blockring.get_block_hash_by_block_id(4).unwrap();
1274            alternate_block_4 = blockchain.get_block(&block_hash).unwrap().clone();
1275            // need to remove this from disk to make sure it won't be loaded from disk when the node is restarted
1276            std::fs::remove_file(format!(
1277                "./data/blocks/{}",
1278                alternate_block_4.get_file_name()
1279            ))
1280            .unwrap();
1281        }
1282        drop(tester);
1283
1284        let mut tester = NodeTester::new(10, Some(private_key), Some(timer));
1285        tester.set_staking_enabled(false).await;
1286        tester.init().await.unwrap();
1287        tester.wait_till_block_id(3).await.unwrap();
1288        tester
1289            .check_total_supply()
1290            .await
1291            .expect("total supply should not change");
1292        let timer = tester.consensus_thread.timer.clone();
1293
1294        for i in 4..=10 {
1295            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1296
1297            if i == 6 {
1298                utxokey = tx.from[0].utxoset_key;
1299            }
1300            tester.add_transaction(tx).await;
1301            tester.wait_till_block_id(i).await.unwrap();
1302            tester
1303                .check_total_supply()
1304                .await
1305                .expect("total supply should not change");
1306        }
1307
1308        // create a checkpoint file for block 5
1309        {
1310            info!("creating checkpoint for block 5");
1311            let io = tester.consensus_thread.storage.io_interface.as_ref();
1312            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1313            let block_hash: SaitoHash = blockchain.blockring.get_block_hash_by_block_id(5).unwrap();
1314            let file_path = format!("./data/checkpoints/{}-{}.chk", 5, block_hash.to_hex());
1315            io.write_value(&file_path, utxokey.to_hex().as_bytes())
1316                .await
1317                .unwrap();
1318        }
1319        drop(tester);
1320
1321        // reload the node
1322        info!("-------------- reloading the node -----------------\n\n\n\n\n");
1323        let mut tester = NodeTester::new(10, Some(private_key), Some(timer));
1324        tester.set_staking_enabled(false).await;
1325        tester.init().await.unwrap();
1326        tester
1327            .run_until(TestTimeKeeper {}.get_timestamp_in_ms() + 5)
1328            .await
1329            .unwrap();
1330        tester.wait_till_block_id(5).await.unwrap();
1331
1332        // check if the latest block is 5
1333        tester
1334            .check_total_supply()
1335            .await
1336            .expect("total supply should not change");
1337        let latest_block_id = tester
1338            .consensus_thread
1339            .blockchain_lock
1340            .read()
1341            .await
1342            .get_latest_block_id();
1343        assert_eq!(latest_block_id, 5);
1344
1345        info!("adding new txs to increase the chain length");
1346        for i in 6..=10 {
1347            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1348
1349            tester.add_transaction(tx).await;
1350            info!("added tx : {}", i);
1351            let latest_block_id = tester
1352                .consensus_thread
1353                .blockchain_lock
1354                .read()
1355                .await
1356                .get_latest_block_id();
1357            info!("latest block id : {}", latest_block_id);
1358            tester.wait_till_block_id(i).await.unwrap();
1359            tester
1360                .check_total_supply()
1361                .await
1362                .expect("total supply should not change");
1363        }
1364
1365        // create a fork starting from block 3
1366        {
1367            info!("adding alternate block 4");
1368            tester.add_block(alternate_block_4.clone()).await;
1369        }
1370
1371        // check if the latest block is 10 still
1372        let latest_block_id = tester
1373            .consensus_thread
1374            .blockchain_lock
1375            .read()
1376            .await
1377            .get_latest_block_id();
1378        assert_eq!(latest_block_id, 10);
1379
1380        tester
1381            .run_until(TestTimeKeeper {}.get_timestamp_in_ms() + 5)
1382            .await
1383            .unwrap();
1384
1385        // check that blockchain doesn't have the alternate block
1386        {
1387            let blockchain = tester.consensus_thread.blockchain_lock.read().await;
1388            let block = blockchain.get_block(&alternate_block_4.hash);
1389            assert!(block.is_none());
1390        }
1391    }
1392
1393    #[tokio::test]
1394    #[serial_test::serial]
1395    async fn loading_isolated_forks_test() {
1396        // pretty_env_logger::init();
1397        NodeTester::delete_data().await.unwrap();
1398        let mut tester = NodeTester::new(10, None, None);
1399        let public_key = tester.get_public_key().await;
1400        let private_key = tester.get_private_key().await;
1401        let issuance = vec![(public_key.to_base58(), 100_000 * NOLAN_PER_SAITO)];
1402        tester.set_issuance(issuance).await.unwrap();
1403        tester.set_staking_enabled(false).await;
1404        tester.init().await.unwrap();
1405        tester.wait_till_block_id(1).await.unwrap();
1406        tester
1407            .check_total_supply()
1408            .await
1409            .expect("total supply should not change");
1410
1411        let mut old_block: Option<Block> = None;
1412        // create a main fork first
1413        for i in 2..=100 {
1414            let tx = tester.create_transaction(10, 10, public_key).await.unwrap();
1415
1416            if i == 25 {
1417                old_block = tester
1418                    .consensus_thread
1419                    .blockchain_lock
1420                    .read()
1421                    .await
1422                    .get_latest_block()
1423                    .cloned();
1424                assert_eq!(old_block.as_ref().unwrap().id, 24);
1425            }
1426            tester.add_transaction(tx).await;
1427            tester.wait_till_block_id(i).await.unwrap();
1428            tester
1429                .check_total_supply()
1430                .await
1431                .expect("total supply should not change");
1432        }
1433
1434        // copy block 24 back to blocks folder
1435        {
1436            tester
1437                .consensus_thread
1438                .storage
1439                .write_block_to_disk(&old_block.unwrap())
1440                .await;
1441        }
1442        let timer = tester.consensus_thread.timer.clone();
1443        drop(tester);
1444
1445        // Check if the file count inside the data directory is 21
1446        {
1447            let data_dir = "./data/blocks";
1448            let file_count = fs::read_dir(data_dir)
1449                .unwrap_or_else(|_| panic!("Failed to read directory: {}", data_dir))
1450                .count();
1451
1452            assert_eq!(
1453                file_count, 21,
1454                "Expected 21 files in the data directory, found {}",
1455                file_count
1456            );
1457        }
1458
1459        // reload the node
1460        let mut tester = NodeTester::new(10, Some(private_key), Some(timer));
1461        tester.set_staking_enabled(false).await;
1462        tester.init().await.unwrap();
1463        tester.wait_till_block_id(100).await.unwrap();
1464
1465        // check if the latest block is 5
1466        tester
1467            .check_total_supply()
1468            .await
1469            .expect("total supply should not change");
1470        let latest_block_id = tester
1471            .consensus_thread
1472            .blockchain_lock
1473            .read()
1474            .await
1475            .get_latest_block_id();
1476        assert_eq!(latest_block_id, 100);
1477
1478        // Check if the file count inside the data directory is 20
1479        {
1480            let data_dir = "./data/blocks";
1481            let file_count = fs::read_dir(data_dir)
1482                .unwrap_or_else(|_| panic!("Failed to read directory: {}", data_dir))
1483                .count();
1484
1485            assert_eq!(
1486                file_count, 20,
1487                "Expected 20 files in the data directory, found {}",
1488                file_count
1489            );
1490        }
1491    }
1492
1493    #[tokio::test]
1494    #[serial_test::serial]
1495    async fn receiving_old_blocks_again_test() {
1496        // pretty_env_logger::init();
1497        NodeTester::delete_data().await.unwrap();
1498        let mut tester = NodeTester::new(10, None, None);
1499        let public_key = tester.get_public_key().await;
1500        let private_key = tester.get_private_key().await;
1501        tester.set_staking_requirement(2 * NOLAN_PER_SAITO, 8).await;
1502        let issuance = vec![
1503            (public_key.to_base58(), 8 * 2 * NOLAN_PER_SAITO),
1504            (public_key.to_base58(), 100 * NOLAN_PER_SAITO),
1505            (
1506                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
1507                50 * NOLAN_PER_SAITO,
1508            ),
1509        ];
1510        tester.set_issuance(issuance).await.unwrap();
1511        tester.init().await.unwrap();
1512        tester.wait_till_block_id(1).await.unwrap();
1513        tester
1514            .check_total_supply()
1515            .await
1516            .expect("total supply should not change");
1517
1518        let mut blocks = vec![];
1519        // create a main fork first
1520        for i in 2..=100 {
1521            let tx = tester
1522                .create_transaction(NOLAN_PER_SAITO, NOLAN_PER_SAITO, public_key)
1523                .await
1524                .unwrap();
1525
1526            tester.add_transaction(tx).await;
1527            tester.wait_till_block_id(i).await.unwrap();
1528
1529            if i >= 30 && i < 50 {
1530                let block = tester
1531                    .consensus_thread
1532                    .blockchain_lock
1533                    .read()
1534                    .await
1535                    .get_latest_block()
1536                    .cloned();
1537                blocks.push(block.clone().unwrap());
1538            }
1539            tester
1540                .check_total_supply()
1541                .await
1542                .expect("total supply should not change");
1543        }
1544
1545        assert_eq!(blocks.len(), 20, "blocks length should be 20");
1546        let latest_block_id = tester
1547            .consensus_thread
1548            .blockchain_lock
1549            .read()
1550            .await
1551            .get_latest_block_id();
1552        assert_eq!(latest_block_id, 100);
1553
1554        // NodeTester::delete_data().await.unwrap();
1555        // let mut tester = NodeTester::new(10, Some(private_key), None);
1556        // tester.set_staking_requirement(2 * NOLAN_PER_SAITO, 8).await;
1557        // tester.init().await.unwrap();
1558        // tester.wait_till_block_id(100).await.unwrap();
1559        // tester
1560        //     .check_total_supply()
1561        //     .await
1562        //     .expect("total supply should not change");
1563
1564        {
1565            for mut block in blocks {
1566                block.in_longest_chain = false;
1567                block.transaction_map.clear();
1568                block.created_hashmap_of_slips_spent_this_block = false;
1569                block.safe_to_prune_transactions = false;
1570                block.slips_spent_this_block.clear();
1571
1572                tester
1573                    .consensus_thread
1574                    .process_event(ConsensusEvent::BlockFetched {
1575                        block: block,
1576                        peer_index: 0,
1577                    })
1578                    .await;
1579            }
1580        }
1581
1582        let latest_block_id_new = tester
1583            .consensus_thread
1584            .blockchain_lock
1585            .read()
1586            .await
1587            .get_latest_block_id();
1588        assert_eq!(latest_block_id_new, 100);
1589        tester
1590            .check_total_supply()
1591            .await
1592            .expect("total supply should not change");
1593    }
1594    fn setup_log() {
1595        // switch to this for instrumentation
1596        // console_subscriber::init();
1597
1598        let filter = tracing_subscriber::EnvFilter::builder()
1599            .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
1600            .from_env_lossy();
1601        let filter = filter.add_directive(Directive::from_str("tokio_tungstenite=info").unwrap());
1602        let filter = filter.add_directive(Directive::from_str("tungstenite=info").unwrap());
1603        let filter = filter.add_directive(Directive::from_str("mio::poll=info").unwrap());
1604        let filter = filter.add_directive(Directive::from_str("hyper::proto=info").unwrap());
1605        let filter = filter.add_directive(Directive::from_str("hyper::client=info").unwrap());
1606        let filter = filter.add_directive(Directive::from_str("want=info").unwrap());
1607        let filter = filter.add_directive(Directive::from_str("reqwest::async_impl=info").unwrap());
1608        let filter = filter.add_directive(Directive::from_str("reqwest::connect=info").unwrap());
1609        let filter = filter.add_directive(Directive::from_str("warp::filters=info").unwrap());
1610        let filter = filter.add_directive(Directive::from_str("tokio::task=info").unwrap());
1611        let filter = filter.add_directive(Directive::from_str("runtime::resource=info").unwrap());
1612
1613        // let filter = filter.add_directive(Directive::from_str("saito_stats=info").unwrap());
1614
1615        let fmt_layer = tracing_subscriber::fmt::Layer::default().with_filter(filter);
1616        // let fmt_layer = fmt_layer.with_filter(FilterFn::new(|meta| {
1617        //     !meta.target().contains("waker.clone") && !meta.target().contains("waker.drop") &&
1618        // }));
1619
1620        tracing_subscriber::registry().with(fmt_layer).init();
1621    }
1622    #[tokio::test]
1623    #[serial_test::serial]
1624    async fn receiving_out_of_order_blocks_test() {
1625        // setup_log();
1626        // pretty_env_logger::init();
1627        NodeTester::delete_data().await.unwrap();
1628        let mut tester = NodeTester::new(100, None, None);
1629        let public_key = tester.get_public_key().await;
1630        let private_key = tester.get_private_key().await;
1631        tester.set_staking_requirement(2 * NOLAN_PER_SAITO, 8).await;
1632        let issuance = vec![
1633            (public_key.to_base58(), 8 * 2 * NOLAN_PER_SAITO),
1634            (public_key.to_base58(), 100 * NOLAN_PER_SAITO),
1635            (
1636                "27UK2MuBTdeARhYp97XBnCovGkEquJjkrQntCgYoqj6GC".to_string(),
1637                50 * NOLAN_PER_SAITO,
1638            ),
1639        ];
1640        tester.set_issuance(issuance.clone()).await.unwrap();
1641        tester.init().await.unwrap();
1642        tester.wait_till_block_id(1).await.unwrap();
1643        tester
1644            .check_total_supply()
1645            .await
1646            .expect("total supply should not change");
1647
1648        let mut blocks = vec![];
1649        blocks.push(
1650            tester
1651                .consensus_thread
1652                .blockchain_lock
1653                .read()
1654                .await
1655                .get_latest_block()
1656                .cloned()
1657                .unwrap(),
1658        );
1659        assert_eq!(
1660            tester
1661                .consensus_thread
1662                .blockchain_lock
1663                .read()
1664                .await
1665                .get_latest_block_id(),
1666            1
1667        );
1668        // create a main fork first
1669        for i in 2..=100 {
1670            let tx = tester
1671                .create_transaction(NOLAN_PER_SAITO, 0, public_key)
1672                .await
1673                .unwrap();
1674
1675            tester.add_transaction(tx).await;
1676            tester.wait_till_block_id(i).await.unwrap();
1677
1678            // if i >= 30 && i < 50 {
1679            let block = tester
1680                .consensus_thread
1681                .blockchain_lock
1682                .read()
1683                .await
1684                .get_latest_block()
1685                .cloned();
1686            blocks.push(block.clone().unwrap());
1687            // }
1688            tester
1689                .check_total_supply()
1690                .await
1691                .expect("total supply should not change");
1692        }
1693
1694        assert_eq!(blocks.len(), 100, "blocks length should be 100");
1695        let latest_block_id = tester
1696            .consensus_thread
1697            .blockchain_lock
1698            .read()
1699            .await
1700            .get_latest_block_id();
1701        assert_eq!(latest_block_id, 100);
1702        let timer = tester.consensus_thread.timer.clone();
1703
1704        NodeTester::delete_data().await.unwrap();
1705        let mut tester = NodeTester::new(100, Some(private_key), Some(timer));
1706        tester
1707            .consensus_thread
1708            .storage
1709            .write_block_to_disk(&blocks.remove(0))
1710            .await;
1711        tester.set_staking_requirement(2 * NOLAN_PER_SAITO, 8).await;
1712        tester.init().await.unwrap();
1713
1714        for mut block in blocks.drain(0..19) {
1715            block.in_longest_chain = false;
1716            block.transaction_map.clear();
1717            block.created_hashmap_of_slips_spent_this_block = false;
1718            block.safe_to_prune_transactions = false;
1719            block.slips_spent_this_block.clear();
1720
1721            let block_id = block.id;
1722            tester
1723                .consensus_thread
1724                .process_event(ConsensusEvent::BlockFetched {
1725                    block: block,
1726                    peer_index: 0,
1727                })
1728                .await;
1729            tester.wait_till_block_id(block_id).await.unwrap();
1730
1731            if block_id == 20 {
1732                break;
1733            }
1734        }
1735        blocks.drain(0..10);
1736
1737        let latest_block_id = tester
1738            .consensus_thread
1739            .blockchain_lock
1740            .read()
1741            .await
1742            .get_latest_block_id();
1743        assert_eq!(latest_block_id, 20);
1744
1745        {
1746            for mut block in blocks {
1747                block.in_longest_chain = false;
1748                block.transaction_map.clear();
1749                block.created_hashmap_of_slips_spent_this_block = false;
1750                block.safe_to_prune_transactions = false;
1751                block.slips_spent_this_block.clear();
1752
1753                let block_id = block.id;
1754                let block_hash = block.hash;
1755
1756                tester
1757                    .consensus_thread
1758                    .process_event(ConsensusEvent::BlockFetched {
1759                        block: block,
1760                        peer_index: 0,
1761                    })
1762                    .await;
1763                tester
1764                    .wait_till_block_id_with_hash(block_id, block_hash)
1765                    .await
1766                    .unwrap();
1767                if block_id == 40 {
1768                    break;
1769                }
1770            }
1771        }
1772
1773        let latest_block_id_new = tester
1774            .consensus_thread
1775            .blockchain_lock
1776            .read()
1777            .await
1778            .get_latest_block_id();
1779        assert_eq!(latest_block_id_new, 20);
1780        tester
1781            .check_total_supply()
1782            .await
1783            .expect("total supply should not change");
1784    }
1785}