1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use log::{debug, info};
6use rayon::prelude::*;
7use saito_core::core::consensus::peers::peer::PeerStatus;
8use tokio::sync::mpsc::Sender;
9use tokio::sync::RwLock;
10
11use saito_core::core::consensus::blockchain::Blockchain;
12use saito_core::core::consensus::peers::peer_collection::PeerCollection;
13use saito_core::core::consensus::slip::{Slip, SLIP_SIZE};
14use saito_core::core::consensus::transaction::Transaction;
15use saito_core::core::consensus::wallet::Wallet;
16use saito_core::core::defs::{Currency, SaitoPrivateKey, SaitoPublicKey};
17use saito_core::core::process::keep_time::KeepTime;
18use saito_core::core::util::crypto::generate_random_bytes;
19use saito_core::drain;
20use saito_rust::time_keeper::TimeKeeper;
21use tracing_subscriber::field::debug;
22
23use crate::config_handler::SpammerConfigs;
24use saito_core::core::util::configuration::Configuration;
25
26#[derive(Clone, PartialEq)]
27pub enum GeneratorState {
28 CreatingSlips,
29 WaitingForBlockChainConfirmation,
30 Done,
31}
32
33pub struct TransactionGenerator {
34 pub state: GeneratorState,
35 wallet_lock: Arc<RwLock<Wallet>>,
36 blockchain_lock: Arc<RwLock<Blockchain>>,
37 expected_slip_count: u64,
38 tx_size: u64,
39 tx_count: u64,
40 time_keeper: Box<TimeKeeper>,
41 public_key: SaitoPublicKey,
42 private_key: SaitoPrivateKey,
43 sender: Sender<VecDeque<Transaction>>,
44 tx_payment: Currency,
45 tx_fee: Currency,
46 pub peer_lock: Arc<RwLock<PeerCollection>>,
47 configuration_lock: Arc<RwLock<SpammerConfigs>>,
48}
49
50impl TransactionGenerator {
51 pub async fn create(
52 wallet_lock: Arc<RwLock<Wallet>>,
53 peers_lock: Arc<RwLock<PeerCollection>>,
54 blockchain_lock: Arc<RwLock<Blockchain>>,
55 configuration_lock: Arc<RwLock<SpammerConfigs>>,
56 sender: Sender<VecDeque<Transaction>>,
57 tx_payment: Currency,
58 tx_fee: Currency,
59 ) -> Self {
60 let tx_size;
61 let tx_count;
62 {
63 let configs = configuration_lock.read().await;
64
65 tx_size = configs.get_spammer_configs().tx_size;
66 tx_count = configs.get_spammer_configs().tx_count;
67 }
68
69 let mut res = TransactionGenerator {
70 state: GeneratorState::CreatingSlips,
71 wallet_lock: wallet_lock.clone(),
72 blockchain_lock: blockchain_lock.clone(),
73 expected_slip_count: 1,
74 tx_size,
75 tx_count,
76 time_keeper: Box::new(TimeKeeper {}),
77 public_key: [0; 33],
78 private_key: [0; 32],
79 sender,
80 tx_payment,
81 tx_fee,
82 peer_lock: peers_lock.clone(),
83 configuration_lock,
84 };
85 {
86 let wallet = wallet_lock.read().await;
87 res.public_key = wallet.public_key;
88 res.private_key = wallet.private_key;
89 }
90 res
91 }
92
93 pub fn get_state(&self) -> GeneratorState {
94 self.state.clone()
95 }
96 pub async fn on_new_block(&mut self) {
97 match self.state {
98 GeneratorState::CreatingSlips => {
99 self.create_slips().await;
100 }
101 GeneratorState::WaitingForBlockChainConfirmation => {
102 if self.check_blockchain_for_confirmation().await {
103 self.create_test_transactions().await;
104 self.state = GeneratorState::Done;
105 } else {
106 tokio::time::sleep(Duration::from_millis(1_000)).await;
107 }
108 }
109 GeneratorState::Done => {}
110 }
111 }
112
113 async fn create_slips(&mut self) {
114 info!(
115 "creating slips for spammer. expect : {:?}",
116 self.expected_slip_count
117 );
118 let output_slips_per_input_slip: u8 = 100;
119 let unspent_slip_count;
120 let available_balance;
121
122 {
123 let wallet = self.wallet_lock.read().await;
124
125 unspent_slip_count = wallet.get_unspent_slip_count();
126 available_balance = wallet.get_available_balance();
127 }
128
129 if unspent_slip_count < self.tx_count && unspent_slip_count >= self.expected_slip_count {
130 info!(
131 "Creating new slips, current = {:?}, target = {:?} balance = {:?}",
132 unspent_slip_count, self.tx_count, available_balance
133 );
134
135 let total_nolans_requested_per_slip =
136 available_balance / unspent_slip_count as Currency;
137 let mut total_output_slips_created: u64 = 0;
138
139 let mut to_public_key = [0; 33];
140
141 {
142 let peers = self.peer_lock.read().await;
143
144 if peers.index_to_peers.is_empty() {
145 info!("not yet connected to a node");
146 return;
147 }
148
149 if let Some((index, peer)) = peers.index_to_peers.iter().next() {
150 if let PeerStatus::Connected = peer.peer_status {
151 to_public_key = peer.get_public_key().unwrap();
152 } else {
153 info!("peer not connected. status : {:?}", peer.peer_status);
154 return;
155 }
156 }
157 assert_eq!(peers.address_to_peers.len(), 1usize, "we have assumed connecting to a single node. move add_hop to correct place if not.");
158 assert_ne!(to_public_key, self.public_key);
159 }
160 let mut txs: VecDeque<Transaction> = Default::default();
161 for _i in 0..unspent_slip_count {
162 let transaction = self
163 .create_slip_transaction(
164 output_slips_per_input_slip,
165 total_nolans_requested_per_slip,
166 &mut total_output_slips_created,
167 &to_public_key,
168 )
169 .await;
170
171 txs.push_back(transaction);
173
174 if total_output_slips_created >= self.tx_count {
175 info!(
176 "Slip creation completed, current = {:?}, target = {:?}",
177 total_output_slips_created, self.tx_count
178 );
179 info!("changing state to 'WaitingForBlockChainConfirmation'");
180 self.state = GeneratorState::WaitingForBlockChainConfirmation;
181 break;
182 }
183 }
184 info!("{:?} slip creation txs generated", txs.len());
185 self.sender.send(txs).await.unwrap();
186
187 info!(
190 "New slips created, current = {:?}, target = {:?}",
191 total_output_slips_created, self.tx_count
192 );
193 } else {
194 if unspent_slip_count >= self.tx_count {
195 self.state = GeneratorState::WaitingForBlockChainConfirmation;
196 info!("changing state to 'WaitingForBlockChainConfirmation' since we have enough slips");
197 return;
198 }
199 info!(
200 "not enough slips. unspent slip count : {:?} tx count : {:?} expected slips : {:?}",
201 unspent_slip_count, self.tx_count, self.expected_slip_count
202 );
203 tokio::time::sleep(Duration::from_millis(1_000)).await;
204 }
205 }
206
207 async fn create_slip_transaction(
208 &mut self,
209 output_slips_per_input_slip: u8,
210 total_nolans_requested_per_slip: Currency,
211 total_output_slips_created: &mut u64,
212 to_public_key: &SaitoPublicKey,
213 ) -> Transaction {
214 let payment_amount =
215 total_nolans_requested_per_slip / output_slips_per_input_slip as Currency;
216
217 let genesis_period;
218 let latest_block_id;
219 {
220 genesis_period = self.get_genesis_period().await;
221 latest_block_id = self.get_latest_block_id().await;
222 }
223
224 let mut wallet = self.wallet_lock.write().await;
225
226 let mut transaction = Transaction::default();
227
228 let (input_slips, output_slips) = wallet.generate_slips(
229 total_nolans_requested_per_slip,
230 None,
231 latest_block_id,
232 genesis_period,
233 );
234
235 for slip in input_slips {
236 transaction.add_from_slip(slip);
237 }
238 for slip in output_slips {
239 transaction.add_to_slip(slip);
240 }
241
242 for _c in 0..output_slips_per_input_slip {
243 let mut output = Slip::default();
244 output.public_key = self.public_key;
245 output.amount = payment_amount;
246 transaction.add_to_slip(output);
247 *total_output_slips_created += 1;
248 }
249
250 let remaining_bytes: i64 =
251 self.tx_size as i64 - (*total_output_slips_created + 1) as i64 * SLIP_SIZE as i64;
252
253 if remaining_bytes > 0 {
254 transaction.data = generate_random_bytes(remaining_bytes as u64).await;
255 }
256
257 transaction.timestamp = self.time_keeper.get_timestamp_in_ms();
258 transaction.generate(&self.public_key, 0, 0);
259 transaction.sign(&self.private_key);
260 transaction.add_hop(&wallet.private_key, &wallet.public_key, to_public_key);
261
262 transaction
263 }
264
265 pub async fn check_blockchain_for_confirmation(&mut self) -> bool {
266 info!("checking for blockchain confirmation...");
267 let unspent_slip_count;
268 {
269 let wallet = self.wallet_lock.read().await;
270 unspent_slip_count = wallet.get_unspent_slip_count();
271 }
272
273 if unspent_slip_count >= self.tx_count {
274 info!(
275 "New slips detected on the blockchain, current = {:?}, target = {:?}",
276 unspent_slip_count, self.tx_count
277 );
278 info!("changing state to 'Generation Done'");
279 self.state = GeneratorState::Done;
280 return true;
281 }
282 info!(
283 "unspent slips : {:?} tx count : {:?}",
284 unspent_slip_count, self.tx_count
285 );
286 false
287 }
288
289 pub async fn create_test_transactions(&mut self) {
290 info!("creating test transactions : {:?}", self.tx_count);
291
292 let time_keeper = TimeKeeper {};
293 let wallet = self.wallet_lock.clone();
294 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
295 let public_key = self.public_key;
296 let count = self.tx_count;
297 let required_balance = (self.tx_payment + self.tx_fee) * count as Currency;
298 let payment = self.tx_payment;
299 let fee = self.tx_fee;
300
301 let genesis_period = self.get_genesis_period().await;
302 let latest_block_id = self.get_latest_block_id().await;
303
304 tokio::spawn(async move {
305 info!(
306 "creating test transactions from new thread : count = {:?}",
307 count
308 );
309 let sender = sender.clone();
310 loop {
311 let mut work_done = false;
312 {
313 let mut wallet = wallet.write().await;
315
316 if wallet.get_available_balance() >= required_balance {
317 let mut vec = VecDeque::with_capacity(count as usize);
319 for i in 0..count {
320 if i % 100_000 == 0 {
321 info!("creating test transactions : {:?}", i);
322 }
323 let transaction = Transaction::create(
324 &mut wallet,
325 public_key,
326 payment,
327 fee,
328 false,
329 None,
330 latest_block_id,
331 genesis_period,
332 );
333 if transaction.is_err() {
334 debug!("transaction creation failed. {:?}", transaction);
335 break;
336 }
337 let mut transaction = transaction.unwrap();
338 transaction.generate_total_fees(0, 0);
339 if (transaction.total_in == 0 || transaction.total_out == 0)
340 && (payment + fee != 0)
341 {
342 debug!("transaction not added since not enough funds. in : {:?} out : {:?}. current balance : {:?}, required : {:?}", transaction.total_in, transaction.total_out,wallet.get_available_balance(), required_balance);
343 break;
344 }
345 vec.push_back(transaction);
346 }
347 if !vec.is_empty() {
348 info!("sending generated {:?} txs to spammer. sender capacity : {:?} / {:?}",vec.len(),sender.capacity(),sender.max_capacity());
349 sender.send(vec).await.unwrap();
350 work_done = true;
351 }
352 } else {
353 info!("not enough balance in wallet to create spam txs");
354 }
355 }
356 if !work_done {
357 tokio::time::sleep(Duration::from_millis(1_000)).await;
358 }
359 }
360 });
361 tokio::task::yield_now().await;
362
363 let mut to_public_key = [0; 33];
364
365 {
366 let peers = self.peer_lock.read().await;
367
368 if let Some((index, peer)) = peers.index_to_peers.iter().next() {
369 info!("peer count : {}", peers.index_to_peers.len());
371 info!("peer status : {:?}", peer.peer_status);
372 to_public_key = peer.get_public_key().unwrap();
373 }
377 assert_ne!(to_public_key, self.public_key);
379 }
380
381 while let Some(mut transactions) = receiver.recv().await {
382 let sender = self.sender.clone();
383 let tx_size = self.tx_size;
384 info!(
385 "received {:?} unsigned txs from generator",
386 transactions.len()
387 );
388
389 let txs: VecDeque<Transaction> = drain!(transactions, 100)
390 .map(|mut transaction| {
391 transaction.data = vec![0; tx_size as usize]; transaction.timestamp = time_keeper.get_timestamp_in_ms();
393 transaction.generate(&public_key, 0, 0);
394 transaction.sign(&self.private_key);
395 transaction.add_hop(&self.private_key, &self.public_key, &to_public_key);
396
397 transaction
398 })
399 .collect();
400 info!("sending {:?} signed txs to spammer", txs.len());
401 sender.send(txs).await.unwrap();
402 }
403
404 }
406
407 async fn get_latest_block_id(&self) -> u64 {
408 let blockchain = self.blockchain_lock.read().await;
409 blockchain.blockring.get_latest_block_id()
410 }
411
412 async fn get_genesis_period(&self) -> u64 {
413 let config_guard = self.configuration_lock.read().await;
414
415 let config: &dyn Configuration = &*config_guard;
416
417 if let Some(consensus_config) = config.get_consensus_config() {
418 let period = consensus_config.genesis_period;
419 period
421 } else {
422 println!("No consensus config available.");
423 1000
425 }
426 }
427}