saito_spammer/
spammer.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use log::info;
6use saito_core::core::consensus::peers::peer::PeerStatus;
7use tokio::sync::mpsc::{Receiver, Sender};
8use tokio::sync::RwLock;
9
10use saito_core::core::consensus::blockchain::Blockchain;
11use saito_core::core::consensus::peers::peer_collection::PeerCollection;
12use saito_core::core::consensus::transaction::Transaction;
13use saito_core::core::consensus::wallet::Wallet;
14use saito_core::core::defs::Currency;
15use saito_core::core::io::network_event::NetworkEvent;
16use saito_core::core::msg::message::Message;
17use saito_rust::io_event::IoEvent;
18
19use crate::config_handler::SpammerConfigs;
20use crate::transaction_generator::{GeneratorState, TransactionGenerator};
21
22pub struct Spammer {
23    sender_to_network: Sender<IoEvent>,
24    // peer_lock: Arc<RwLock<PeerCollection>>,
25    config_lock: Arc<RwLock<SpammerConfigs>>,
26    bootstrap_done: bool,
27    // sent_tx_count: u64,
28    tx_generator: TransactionGenerator,
29}
30
31impl Spammer {
32    pub async fn new(
33        wallet_lock: Arc<RwLock<Wallet>>,
34        peers_lock: Arc<RwLock<PeerCollection>>,
35        blockchain_lock: Arc<RwLock<Blockchain>>,
36        sender_to_network: Sender<IoEvent>,
37        sender: Sender<VecDeque<Transaction>>,
38        configs_lock: Arc<RwLock<SpammerConfigs>>,
39    ) -> Spammer {
40        let tx_payment;
41        let tx_fee;
42        {
43            let configs = configs_lock.read().await;
44            tx_payment = configs.get_spammer_configs().tx_payment;
45            tx_fee = configs.get_spammer_configs().tx_fee;
46        }
47        Spammer {
48            sender_to_network,
49            // peer_lock: peers_lock.clone(),
50            config_lock: configs_lock.clone(),
51            bootstrap_done: false,
52            // sent_tx_count: 0,
53            tx_generator: TransactionGenerator::create(
54                wallet_lock.clone(),
55                peers_lock.clone(),
56                blockchain_lock.clone(),
57                configs_lock.clone(),
58                sender,
59                tx_payment as Currency,
60                tx_fee as Currency,
61            )
62            .await,
63        }
64    }
65
66    async fn run(&mut self, mut receiver: Receiver<VecDeque<Transaction>>) {
67        let mut work_done;
68        let timer_in_milli;
69        let burst_count;
70        let stop_after;
71
72        {
73            let configs = self.config_lock.read().await;
74
75            timer_in_milli = configs.get_spammer_configs().timer_in_milli;
76            burst_count = configs.get_spammer_configs().burst_count;
77            stop_after = configs.get_spammer_configs().stop_after;
78        }
79
80        let sender = self.sender_to_network.clone();
81        let peer_lock = self.tx_generator.peer_lock.clone();
82        tokio::spawn(async move {
83            let mut total_count = 0;
84            let mut count = burst_count;
85            loop {
86                {
87                    let peers = peer_lock.read().await;
88                    if let Some((index, peer)) = peers.index_to_peers.iter().next() {
89                        if let PeerStatus::Connected = peer.peer_status {
90                            // info!("peer count : {}", peers.index_to_peers.len());
91                            // info!("peer status : {:?}", peer.peer_status);
92                        } else {
93                            info!("peer not connected. status : {:?}", peer.peer_status);
94                            tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
95                            continue;
96                        }
97                    }
98                }
99                if let Some(transactions) = receiver.recv().await {
100                    info!(
101                        "received {:?} txs to be sent. sender capacity : {:?} / {:?}",
102                        transactions.len(),
103                        sender.capacity(),
104                        sender.max_capacity()
105                    );
106                    for tx in transactions {
107                        count -= 1;
108                        total_count += 1;
109                        sender
110                            .send(IoEvent {
111                                event_processor_id: 0,
112                                event_id: 0,
113                                event: NetworkEvent::OutgoingNetworkMessageForAll {
114                                    buffer: Message::Transaction(tx).serialize(),
115                                    exceptions: vec![],
116                                },
117                            })
118                            .await
119                            .unwrap();
120
121                        if count == 0 {
122                            tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
123                            count = burst_count;
124                        }
125                        if total_count == stop_after {
126                            tokio::time::sleep(Duration::from_millis(10_000)).await;
127                            info!("terminating spammer after sending : {:?} txs", total_count);
128                            std::process::exit(0);
129                        }
130                    }
131                    info!(
132                        "sent all the txs received from generator : total count : {:?}",
133                        total_count
134                    );
135                }
136            }
137        });
138        tokio::task::yield_now().await;
139        loop {
140            work_done = false;
141
142            if !self.bootstrap_done {
143                // if self.tx_generator.get_state() != GeneratorState::Done  {
144                //     self.tx_generator.check_blockchain_for_confirmation().await ;
145                // }
146                {
147                    let peers = self.tx_generator.peer_lock.read().await;
148                    if let Some((index, peer)) = peers.index_to_peers.iter().next() {
149                        if let PeerStatus::Connected = peer.peer_status {
150                            // info!("peer count : {}", peers.index_to_peers.len());
151                            // info!("peer status : {:?}", peer.peer_status);
152                            // to_public_key = peer.get_public_key().unwrap();
153                        } else {
154                            info!("peer not connected. status : {:?}", peer.peer_status);
155                            tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
156                            continue;
157                        }
158                    }
159                }
160                self.tx_generator.on_new_block().await;
161                self.bootstrap_done = self.tx_generator.get_state() == GeneratorState::Done;
162                work_done = true;
163            }
164
165            if !work_done {
166                tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
167            }
168        }
169    }
170}
171
172pub async fn run_spammer(
173    wallet_lock: Arc<RwLock<Wallet>>,
174    peers_lock: Arc<RwLock<PeerCollection>>,
175    blockchain_lock: Arc<RwLock<Blockchain>>,
176    sender_to_network: Sender<IoEvent>,
177    configs_lock: Arc<RwLock<SpammerConfigs>>,
178) {
179    info!("starting the spammer");
180    tokio::spawn(async move {
181        let (sender, receiver) = tokio::sync::mpsc::channel::<VecDeque<Transaction>>(1000);
182        let mut spammer = Spammer::new(
183            wallet_lock,
184            peers_lock,
185            blockchain_lock,
186            sender_to_network,
187            sender,
188            configs_lock,
189        )
190        .await;
191        spammer.run(receiver).await;
192    });
193}