saito_spammer/
transaction_generator.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use log::{debug, info};
6use rayon::prelude::*;
7use saito_core::core::consensus::peers::peer::PeerStatus;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::RwLock;
10
11use saito_core::core::consensus::blockchain::Blockchain;
12use saito_core::core::consensus::peers::peer_collection::PeerCollection;
13use saito_core::core::consensus::slip::{Slip, SLIP_SIZE};
14use saito_core::core::consensus::transaction::Transaction;
15use saito_core::core::consensus::wallet::Wallet;
16use saito_core::core::defs::{Currency, SaitoPrivateKey, SaitoPublicKey};
17use saito_core::core::process::keep_time::KeepTime;
18use saito_core::core::util::crypto::generate_random_bytes;
19use saito_core::drain;
20use saito_rust::time_keeper::TimeKeeper;
21use tracing_subscriber::field::debug;
22
23use crate::config_handler::SpammerConfigs;
24use saito_core::core::util::configuration::Configuration;
25
/// Phases the spammer's [`TransactionGenerator`] moves through.
///
/// The generator advances from `CreatingSlips` to
/// `WaitingForBlockChainConfirmation` and finally to `Done`; it never moves
/// backwards.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum GeneratorState {
    /// The wallet does not yet hold enough unspent slips; each new block
    /// triggers another attempt to split existing slips into more slips.
    CreatingSlips,
    /// Slip-creating transactions were generated (or the wallet already holds
    /// enough slips); the generator polls the wallet until the target number
    /// of unspent slips is visible.
    WaitingForBlockChainConfirmation,
    /// Enough slips were confirmed and test-transaction generation was
    /// started; new blocks are ignored from now on.
    Done,
}
32
/// Generates spam transactions for load testing a node.
///
/// The generator first splits the wallet's slips until it holds at least
/// `tx_count` unspent slips, waits for those slips to show up in the wallet,
/// and then produces batches of signed test transactions which are pushed
/// through `sender`.
pub struct TransactionGenerator {
    /// Current phase of the generator.
    pub state: GeneratorState,
    /// Shared wallet; read for balance / slip counts and written when slips
    /// are consumed to build transactions.
    wallet_lock: Arc<RwLock<Wallet>>,
    /// Shared blockchain; read to obtain the latest block id.
    blockchain_lock: Arc<RwLock<Blockchain>>,
    /// Minimum number of unspent slips the wallet must hold before slip
    /// creation is attempted (initialised to 1 in `create`).
    expected_slip_count: u64,
    /// Transaction size taken from the spammer configuration; used to size
    /// the data payload of generated transactions.
    tx_size: u64,
    /// Number of test transactions to create, taken from the spammer
    /// configuration; also the target number of unspent slips.
    tx_count: u64,
    /// Source of the timestamps written into generated transactions.
    time_keeper: Box<TimeKeeper>,
    /// Public key copied from the wallet when the generator is created.
    public_key: SaitoPublicKey,
    /// Private key copied from the wallet when the generator is created.
    private_key: SaitoPrivateKey,
    /// Channel on which finished batches of transactions are delivered.
    sender: Sender<VecDeque<Transaction>>,
    /// Payment amount of every test transaction.
    tx_payment: Currency,
    /// Fee of every test transaction.
    tx_fee: Currency,
    /// Shared peer collection; the first peer's public key is used as the
    /// routing-hop destination.
    pub peer_lock: Arc<RwLock<PeerCollection>>,
    /// Shared spammer configuration; read for the consensus genesis period.
    configuration_lock: Arc<RwLock<SpammerConfigs>>,
}
49
50impl TransactionGenerator {
51    pub async fn create(
52        wallet_lock: Arc<RwLock<Wallet>>,
53        peers_lock: Arc<RwLock<PeerCollection>>,
54        blockchain_lock: Arc<RwLock<Blockchain>>,
55        configuration_lock: Arc<RwLock<SpammerConfigs>>,
56        sender: Sender<VecDeque<Transaction>>,
57        tx_payment: Currency,
58        tx_fee: Currency,
59    ) -> Self {
60        let tx_size;
61        let tx_count;
62        {
63            let configs = configuration_lock.read().await;
64
65            tx_size = configs.get_spammer_configs().tx_size;
66            tx_count = configs.get_spammer_configs().tx_count;
67        }
68
69        let mut res = TransactionGenerator {
70            state: GeneratorState::CreatingSlips,
71            wallet_lock: wallet_lock.clone(),
72            blockchain_lock: blockchain_lock.clone(),
73            expected_slip_count: 1,
74            tx_size,
75            tx_count,
76            time_keeper: Box::new(TimeKeeper {}),
77            public_key: [0; 33],
78            private_key: [0; 32],
79            sender,
80            tx_payment,
81            tx_fee,
82            peer_lock: peers_lock.clone(),
83            configuration_lock,
84        };
85        {
86            let wallet = wallet_lock.read().await;
87            res.public_key = wallet.public_key;
88            res.private_key = wallet.private_key;
89        }
90        res
91    }
92
93    pub fn get_state(&self) -> GeneratorState {
94        self.state.clone()
95    }
96    pub async fn on_new_block(&mut self) {
97        match self.state {
98            GeneratorState::CreatingSlips => {
99                self.create_slips().await;
100            }
101            GeneratorState::WaitingForBlockChainConfirmation => {
102                if self.check_blockchain_for_confirmation().await {
103                    self.create_test_transactions().await;
104                    self.state = GeneratorState::Done;
105                } else {
106                    tokio::time::sleep(Duration::from_millis(1_000)).await;
107                }
108            }
109            GeneratorState::Done => {}
110        }
111    }
112
113    async fn create_slips(&mut self) {
114        info!(
115            "creating slips for spammer. expect : {:?}",
116            self.expected_slip_count
117        );
118        let output_slips_per_input_slip: u8 = 100;
119        let unspent_slip_count;
120        let available_balance;
121
122        {
123            let wallet = self.wallet_lock.read().await;
124
125            unspent_slip_count = wallet.get_unspent_slip_count();
126            available_balance = wallet.get_available_balance();
127        }
128
129        if unspent_slip_count < self.tx_count && unspent_slip_count >= self.expected_slip_count {
130            info!(
131                "Creating new slips, current = {:?}, target = {:?} balance = {:?}",
132                unspent_slip_count, self.tx_count, available_balance
133            );
134
135            let total_nolans_requested_per_slip =
136                available_balance / unspent_slip_count as Currency;
137            let mut total_output_slips_created: u64 = 0;
138
139            let mut to_public_key = [0; 33];
140
141            {
142                let peers = self.peer_lock.read().await;
143
144                if peers.index_to_peers.is_empty() {
145                    info!("not yet connected to a node");
146                    return;
147                }
148
149                if let Some((index, peer)) = peers.index_to_peers.iter().next() {
150                    if let PeerStatus::Connected = peer.peer_status {
151                        to_public_key = peer.get_public_key().unwrap();
152                    } else {
153                        info!("peer not connected. status : {:?}", peer.peer_status);
154                        return;
155                    }
156                }
157                assert_eq!(peers.address_to_peers.len(), 1usize, "we have assumed connecting to a single node. move add_hop to correct place if not.");
158                assert_ne!(to_public_key, self.public_key);
159            }
160            let mut txs: VecDeque<Transaction> = Default::default();
161            for _i in 0..unspent_slip_count {
162                let transaction = self
163                    .create_slip_transaction(
164                        output_slips_per_input_slip,
165                        total_nolans_requested_per_slip,
166                        &mut total_output_slips_created,
167                        &to_public_key,
168                    )
169                    .await;
170
171                // txs.push_back(transaction);
172                txs.push_back(transaction);
173
174                if total_output_slips_created >= self.tx_count {
175                    info!(
176                        "Slip creation completed, current = {:?}, target = {:?}",
177                        total_output_slips_created, self.tx_count
178                    );
179                    info!("changing state to 'WaitingForBlockChainConfirmation'");
180                    self.state = GeneratorState::WaitingForBlockChainConfirmation;
181                    break;
182                }
183            }
184            info!("{:?} slip creation txs generated", txs.len());
185            self.sender.send(txs).await.unwrap();
186
187            // self.expected_slip_count = total_output_slips_created;
188
189            info!(
190                "New slips created, current = {:?}, target = {:?}",
191                total_output_slips_created, self.tx_count
192            );
193        } else {
194            if unspent_slip_count >= self.tx_count {
195                self.state = GeneratorState::WaitingForBlockChainConfirmation;
196                info!("changing state to 'WaitingForBlockChainConfirmation' since we have enough slips");
197                return;
198            }
199            info!(
200                "not enough slips. unspent slip count : {:?} tx count : {:?} expected slips : {:?}",
201                unspent_slip_count, self.tx_count, self.expected_slip_count
202            );
203            tokio::time::sleep(Duration::from_millis(1_000)).await;
204        }
205    }
206
207    async fn create_slip_transaction(
208        &mut self,
209        output_slips_per_input_slip: u8,
210        total_nolans_requested_per_slip: Currency,
211        total_output_slips_created: &mut u64,
212        to_public_key: &SaitoPublicKey,
213    ) -> Transaction {
214        let payment_amount =
215            total_nolans_requested_per_slip / output_slips_per_input_slip as Currency;
216
217        let genesis_period;
218        let latest_block_id;
219        {
220            genesis_period = self.get_genesis_period().await;
221            latest_block_id = self.get_latest_block_id().await;
222        }
223
224        let mut wallet = self.wallet_lock.write().await;
225
226        let mut transaction = Transaction::default();
227
228        let (input_slips, output_slips) = wallet.generate_slips(
229            total_nolans_requested_per_slip,
230            None,
231            latest_block_id,
232            genesis_period,
233        );
234
235        for slip in input_slips {
236            transaction.add_from_slip(slip);
237        }
238        for slip in output_slips {
239            transaction.add_to_slip(slip);
240        }
241
242        for _c in 0..output_slips_per_input_slip {
243            let mut output = Slip::default();
244            output.public_key = self.public_key;
245            output.amount = payment_amount;
246            transaction.add_to_slip(output);
247            *total_output_slips_created += 1;
248        }
249
250        let remaining_bytes: i64 =
251            self.tx_size as i64 - (*total_output_slips_created + 1) as i64 * SLIP_SIZE as i64;
252
253        if remaining_bytes > 0 {
254            transaction.data = generate_random_bytes(remaining_bytes as u64).await;
255        }
256
257        transaction.timestamp = self.time_keeper.get_timestamp_in_ms();
258        transaction.generate(&self.public_key, 0, 0);
259        transaction.sign(&self.private_key);
260        transaction.add_hop(&wallet.private_key, &wallet.public_key, to_public_key);
261
262        transaction
263    }
264
265    pub async fn check_blockchain_for_confirmation(&mut self) -> bool {
266        info!("checking for blockchain confirmation...");
267        let unspent_slip_count;
268        {
269            let wallet = self.wallet_lock.read().await;
270            unspent_slip_count = wallet.get_unspent_slip_count();
271        }
272
273        if unspent_slip_count >= self.tx_count {
274            info!(
275                "New slips detected on the blockchain, current = {:?}, target = {:?}",
276                unspent_slip_count, self.tx_count
277            );
278            info!("changing state to 'Generation Done'");
279            self.state = GeneratorState::Done;
280            return true;
281        }
282        info!(
283            "unspent slips : {:?} tx count : {:?}",
284            unspent_slip_count, self.tx_count
285        );
286        false
287    }
288
289    pub async fn create_test_transactions(&mut self) {
290        info!("creating test transactions : {:?}", self.tx_count);
291
292        let time_keeper = TimeKeeper {};
293        let wallet = self.wallet_lock.clone();
294        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
295        let public_key = self.public_key;
296        let count = self.tx_count;
297        let required_balance = (self.tx_payment + self.tx_fee) * count as Currency;
298        let payment = self.tx_payment;
299        let fee = self.tx_fee;
300
301        let genesis_period = self.get_genesis_period().await;
302        let latest_block_id = self.get_latest_block_id().await;
303
304        tokio::spawn(async move {
305            info!(
306                "creating test transactions from new thread : count = {:?}",
307                count
308            );
309            let sender = sender.clone();
310            loop {
311                let mut work_done = false;
312                {
313                    // let blockchain = lock_for_write!(blockchain, LOCK_ORDER_BLOCKCHAIN);
314                    let mut wallet = wallet.write().await;
315
316                    if wallet.get_available_balance() >= required_balance {
317                        // assert_ne!(blockchain.utxoset.len(), 0);
318                        let mut vec = VecDeque::with_capacity(count as usize);
319                        for i in 0..count {
320                            if i % 100_000 == 0 {
321                                info!("creating test transactions : {:?}", i);
322                            }
323                            let transaction = Transaction::create(
324                                &mut wallet,
325                                public_key,
326                                payment,
327                                fee,
328                                false,
329                                None,
330                                latest_block_id,
331                                genesis_period,
332                            );
333                            if transaction.is_err() {
334                                debug!("transaction creation failed. {:?}", transaction);
335                                break;
336                            }
337                            let mut transaction = transaction.unwrap();
338                            transaction.generate_total_fees(0, 0);
339                            if (transaction.total_in == 0 || transaction.total_out == 0)
340                                && (payment + fee != 0)
341                            {
342                                debug!("transaction not added since not enough funds. in : {:?} out : {:?}. current balance : {:?}, required : {:?}", transaction.total_in, transaction.total_out,wallet.get_available_balance(), required_balance);
343                                break;
344                            }
345                            vec.push_back(transaction);
346                        }
347                        if !vec.is_empty() {
348                            info!("sending generated {:?} txs to spammer. sender capacity : {:?} / {:?}",vec.len(),sender.capacity(),sender.max_capacity());
349                            sender.send(vec).await.unwrap();
350                            work_done = true;
351                        }
352                    } else {
353                        info!("not enough balance in wallet to create spam txs");
354                    }
355                }
356                if !work_done {
357                    tokio::time::sleep(Duration::from_millis(1_000)).await;
358                }
359            }
360        });
361        tokio::task::yield_now().await;
362
363        let mut to_public_key = [0; 33];
364
365        {
366            let peers = self.peer_lock.read().await;
367
368            if let Some((index, peer)) = peers.index_to_peers.iter().next() {
369                // if let PeerStatus::Connected = peer.peer_status {
370                info!("peer count : {}", peers.index_to_peers.len());
371                info!("peer status : {:?}", peer.peer_status);
372                to_public_key = peer.get_public_key().unwrap();
373                // } else {
374                //     info!("peer not connected. status : {:?}", peer.peer_status);
375                // }
376            }
377            // assert_eq!(peers.address_to_peers.len(), 1 as usize, "we have assumed connecting to a single node. move add_hop to correct place if not.");
378            assert_ne!(to_public_key, self.public_key);
379        }
380
381        while let Some(mut transactions) = receiver.recv().await {
382            let sender = self.sender.clone();
383            let tx_size = self.tx_size;
384            info!(
385                "received {:?} unsigned txs from generator",
386                transactions.len()
387            );
388
389            let txs: VecDeque<Transaction> = drain!(transactions, 100)
390                .map(|mut transaction| {
391                    transaction.data = vec![0; tx_size as usize]; //;generate_random_bytes(tx_size as u64);
392                    transaction.timestamp = time_keeper.get_timestamp_in_ms();
393                    transaction.generate(&public_key, 0, 0);
394                    transaction.sign(&self.private_key);
395                    transaction.add_hop(&self.private_key, &self.public_key, &to_public_key);
396
397                    transaction
398                })
399                .collect();
400            info!("sending {:?} signed txs to spammer", txs.len());
401            sender.send(txs).await.unwrap();
402        }
403
404        // info!("Test transactions created, count : {:?}", txs.len());
405    }
406
407    async fn get_latest_block_id(&self) -> u64 {
408        let blockchain = self.blockchain_lock.read().await;
409        blockchain.blockring.get_latest_block_id()
410    }
411
412    async fn get_genesis_period(&self) -> u64 {
413        let config_guard = self.configuration_lock.read().await;
414
415        let config: &dyn Configuration = &*config_guard;
416
417        if let Some(consensus_config) = config.get_consensus_config() {
418            let period = consensus_config.genesis_period;
419            // println!("genesis_period: {:?}", period);
420            period
421        } else {
422            println!("No consensus config available.");
423            // Default value if consensus config is not available
424            1000
425        }
426    }
427}