1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::Duration;
4
5use log::info;
6use saito_core::core::consensus::peers::peer::PeerStatus;
7use tokio::sync::mpsc::{Receiver, Sender};
8use tokio::sync::RwLock;
9
10use saito_core::core::consensus::blockchain::Blockchain;
11use saito_core::core::consensus::peers::peer_collection::PeerCollection;
12use saito_core::core::consensus::transaction::Transaction;
13use saito_core::core::consensus::wallet::Wallet;
14use saito_core::core::defs::Currency;
15use saito_core::core::io::network_event::NetworkEvent;
16use saito_core::core::msg::message::Message;
17use saito_rust::io_event::IoEvent;
18
19use crate::config_handler::SpammerConfigs;
20use crate::transaction_generator::{GeneratorState, TransactionGenerator};
21
22pub struct Spammer {
23 sender_to_network: Sender<IoEvent>,
24 config_lock: Arc<RwLock<SpammerConfigs>>,
26 bootstrap_done: bool,
27 tx_generator: TransactionGenerator,
29}
30
31impl Spammer {
32 pub async fn new(
33 wallet_lock: Arc<RwLock<Wallet>>,
34 peers_lock: Arc<RwLock<PeerCollection>>,
35 blockchain_lock: Arc<RwLock<Blockchain>>,
36 sender_to_network: Sender<IoEvent>,
37 sender: Sender<VecDeque<Transaction>>,
38 configs_lock: Arc<RwLock<SpammerConfigs>>,
39 ) -> Spammer {
40 let tx_payment;
41 let tx_fee;
42 {
43 let configs = configs_lock.read().await;
44 tx_payment = configs.get_spammer_configs().tx_payment;
45 tx_fee = configs.get_spammer_configs().tx_fee;
46 }
47 Spammer {
48 sender_to_network,
49 config_lock: configs_lock.clone(),
51 bootstrap_done: false,
52 tx_generator: TransactionGenerator::create(
54 wallet_lock.clone(),
55 peers_lock.clone(),
56 blockchain_lock.clone(),
57 configs_lock.clone(),
58 sender,
59 tx_payment as Currency,
60 tx_fee as Currency,
61 )
62 .await,
63 }
64 }
65
66 async fn run(&mut self, mut receiver: Receiver<VecDeque<Transaction>>) {
67 let mut work_done;
68 let timer_in_milli;
69 let burst_count;
70 let stop_after;
71
72 {
73 let configs = self.config_lock.read().await;
74
75 timer_in_milli = configs.get_spammer_configs().timer_in_milli;
76 burst_count = configs.get_spammer_configs().burst_count;
77 stop_after = configs.get_spammer_configs().stop_after;
78 }
79
80 let sender = self.sender_to_network.clone();
81 let peer_lock = self.tx_generator.peer_lock.clone();
82 tokio::spawn(async move {
83 let mut total_count = 0;
84 let mut count = burst_count;
85 loop {
86 {
87 let peers = peer_lock.read().await;
88 if let Some((index, peer)) = peers.index_to_peers.iter().next() {
89 if let PeerStatus::Connected = peer.peer_status {
90 } else {
93 info!("peer not connected. status : {:?}", peer.peer_status);
94 tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
95 continue;
96 }
97 }
98 }
99 if let Some(transactions) = receiver.recv().await {
100 info!(
101 "received {:?} txs to be sent. sender capacity : {:?} / {:?}",
102 transactions.len(),
103 sender.capacity(),
104 sender.max_capacity()
105 );
106 for tx in transactions {
107 count -= 1;
108 total_count += 1;
109 sender
110 .send(IoEvent {
111 event_processor_id: 0,
112 event_id: 0,
113 event: NetworkEvent::OutgoingNetworkMessageForAll {
114 buffer: Message::Transaction(tx).serialize(),
115 exceptions: vec![],
116 },
117 })
118 .await
119 .unwrap();
120
121 if count == 0 {
122 tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
123 count = burst_count;
124 }
125 if total_count == stop_after {
126 tokio::time::sleep(Duration::from_millis(10_000)).await;
127 info!("terminating spammer after sending : {:?} txs", total_count);
128 std::process::exit(0);
129 }
130 }
131 info!(
132 "sent all the txs received from generator : total count : {:?}",
133 total_count
134 );
135 }
136 }
137 });
138 tokio::task::yield_now().await;
139 loop {
140 work_done = false;
141
142 if !self.bootstrap_done {
143 {
147 let peers = self.tx_generator.peer_lock.read().await;
148 if let Some((index, peer)) = peers.index_to_peers.iter().next() {
149 if let PeerStatus::Connected = peer.peer_status {
150 } else {
154 info!("peer not connected. status : {:?}", peer.peer_status);
155 tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
156 continue;
157 }
158 }
159 }
160 self.tx_generator.on_new_block().await;
161 self.bootstrap_done = self.tx_generator.get_state() == GeneratorState::Done;
162 work_done = true;
163 }
164
165 if !work_done {
166 tokio::time::sleep(Duration::from_millis(timer_in_milli)).await;
167 }
168 }
169 }
170}
171
172pub async fn run_spammer(
173 wallet_lock: Arc<RwLock<Wallet>>,
174 peers_lock: Arc<RwLock<PeerCollection>>,
175 blockchain_lock: Arc<RwLock<Blockchain>>,
176 sender_to_network: Sender<IoEvent>,
177 configs_lock: Arc<RwLock<SpammerConfigs>>,
178) {
179 info!("starting the spammer");
180 tokio::spawn(async move {
181 let (sender, receiver) = tokio::sync::mpsc::channel::<VecDeque<Transaction>>(1000);
182 let mut spammer = Spammer::new(
183 wallet_lock,
184 peers_lock,
185 blockchain_lock,
186 sender_to_network,
187 sender,
188 configs_lock,
189 )
190 .await;
191 spammer.run(receiver).await;
192 });
193}