1use ahash::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::fs;
4use std::io::{Error, ErrorKind};
5use std::path::Path;
6
7use async_trait::async_trait;
8use lazy_static::lazy_static;
9use log::{debug, error, trace};
10use tokio::fs::{File, OpenOptions};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::mpsc::Sender;
13
14use saito_core::core::consensus::peers::peer_service::PeerService;
15use saito_core::core::consensus::wallet::Wallet;
16use saito_core::core::defs::{BlockId, PeerIndex, SaitoHash, BLOCK_FILE_EXTENSION};
17use saito_core::core::io::interface_io::{InterfaceEvent, InterfaceIO};
18use saito_core::core::io::network_event::NetworkEvent;
19
20use crate::io_event::IoEvent;
21
22lazy_static! {
23 pub static ref BLOCKS_DIR_PATH: String = configure_storage();
24 pub static ref WALLET_DIR_PATH: String = String::from("./data/wallet");
25 pub static ref CHECKPOINT_DIR_PATH: String = String::from("./data/checkpoints/");
26}
27pub fn configure_storage() -> String {
28 if cfg!(test) {
29 String::from("./data/test/blocks/")
30 } else {
31 String::from("./data/blocks/")
32 }
33}
34
35pub struct RustIOHandler {
36 sender: Sender<IoEvent>,
37 handler_id: u8,
38 open_files: HashMap<String, File>,
39}
40
41impl RustIOHandler {
42 pub fn new(sender: Sender<IoEvent>, handler_id: u8) -> RustIOHandler {
43 RustIOHandler {
44 sender,
45 handler_id,
46 open_files: Default::default(),
47 }
48 }
49}
50
51impl Debug for RustIOHandler {
52 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53 f.debug_struct("RustIoHandler")
54 .field("handler_id", &self.handler_id)
55 .finish()
56 }
57}
58
59#[async_trait]
60impl InterfaceIO for RustIOHandler {
61 async fn send_message(&self, peer_index: u64, buffer: &[u8]) -> Result<(), Error> {
62 let event = IoEvent::new(NetworkEvent::OutgoingNetworkMessage {
64 peer_index,
65 buffer: buffer.to_vec(),
66 });
67
68 self.sender.send(event).await.unwrap();
69
70 Ok(())
71 }
72
73 async fn send_message_to_all(
74 &self,
75 buffer: &[u8],
76 peer_exceptions: Vec<u64>,
77 ) -> Result<(), Error> {
78 let event = IoEvent::new(NetworkEvent::OutgoingNetworkMessageForAll {
81 buffer: buffer.to_vec(),
82 exceptions: peer_exceptions,
83 });
84
85 self.sender.send(event).await.unwrap();
86
87 Ok(())
88 }
89
90 async fn connect_to_peer(&mut self, url: String, peer_index: PeerIndex) -> Result<(), Error> {
91 debug!("connecting to peer : {:?} with url : {:?}", peer_index, url);
92 let event = IoEvent::new(NetworkEvent::ConnectToPeer { url, peer_index });
93
94 self.sender.send(event).await.unwrap();
95
96 Ok(())
97 }
98
99 async fn disconnect_from_peer(&self, peer_index: u64) -> Result<(), Error> {
100 self.sender
101 .send(IoEvent::new(NetworkEvent::DisconnectFromPeer {
102 peer_index,
103 }))
104 .await
105 .unwrap();
106 Ok(())
107 }
108
109 async fn fetch_block_from_peer(
110 &self,
111 block_hash: SaitoHash,
112 peer_index: u64,
113 url: &str,
114 block_id: BlockId,
115 ) -> Result<(), Error> {
116 if block_hash == [0; 32] {
117 return Ok(());
118 }
119
120 debug!("fetching block : {:?} from peer : {:?}", block_id, url);
121 let event = IoEvent::new(NetworkEvent::BlockFetchRequest {
122 block_hash,
123 peer_index,
124 block_id,
125 url: url.to_string(),
126 });
127
128 self.sender
129 .send(event)
130 .await
131 .expect("failed sending to io controller");
132
133 Ok(())
134 }
135
136 async fn write_value(&self, key: &str, value: &[u8]) -> Result<(), Error> {
137 let filename = key;
139 let path = Path::new(filename);
140 if path.parent().is_some() {
141 tokio::fs::create_dir_all(path.parent().unwrap())
142 .await
143 .expect("creating directory structure failed");
144 }
145 let mut file = File::create(filename).await?;
146
147 file.write_all(value).await?;
148
149 Ok(())
152 }
153
154 async fn append_value(&mut self, key: &str, value: &[u8]) -> Result<(), Error> {
155 if !self.open_files.contains_key(key) {
158 debug!("file is not yet opened to append. opening file : {:?}", key);
159 let file = OpenOptions::new()
160 .create(true)
161 .append(true)
162 .open(key)
163 .await?;
164 self.open_files.insert(key.to_string(), file);
165 }
166
167 let file = self.open_files.get_mut(key).unwrap();
168
169 let result = file.write_all(value).await;
171 if result.is_err() {
172 return Err(result.err().unwrap());
173 }
174 Ok(())
175 }
176
177 async fn flush_data(&mut self, key: &str) -> Result<(), Error> {
178 trace!("flushing values to disk : {:?}", key);
179
180 if !self.open_files.contains_key(key) {
181 debug!("file : {:?} is not yet opened so cannot be flushed.", key);
182 return Err(Error::from(ErrorKind::Unsupported));
183 }
184
185 let file = self.open_files.get_mut(key).unwrap();
186
187 let result = file.flush().await;
188 if result.is_err() {
189 return Err(result.err().unwrap());
190 }
191 Ok(())
192 }
193
194 async fn read_value(&self, key: &str) -> Result<Vec<u8>, Error> {
195 let result = File::open(key).await;
196 if result.is_err() {
197 let err = result.err().unwrap();
198 error!("couldn't open file for : {:?}. {:?}", key, err);
199 return Err(err);
200 }
201 let mut file = result.unwrap();
202 let mut encoded = Vec::<u8>::new();
203
204 let result = file.read_to_end(&mut encoded).await;
205 if result.is_err() {
206 let err = result.err().unwrap();
207 error!("couldn't read file : {:?}. {:?}", key, err);
208 return Err(err);
209 }
210 Ok(encoded)
211 }
212
213 async fn load_block_file_list(&self) -> Result<Vec<String>, Error> {
214 debug!(
215 "loading blocks from dir : {:?}",
216 self.get_block_dir().to_string(),
217 );
218 let result = fs::read_dir(self.get_block_dir());
219 if result.is_err() {
220 debug!("no blocks found");
221 return Err(result.err().unwrap());
222 }
223 let mut paths: Vec<_> = result
224 .unwrap()
225 .map(|r| r.unwrap())
226 .filter(|r| {
227 r.file_name()
228 .into_string()
229 .unwrap()
230 .contains(BLOCK_FILE_EXTENSION)
231 })
232 .collect();
233 paths.sort_by(|a, b| {
234 let a_metadata = fs::metadata(a.path()).unwrap();
235 let b_metadata = fs::metadata(b.path()).unwrap();
236 a_metadata
237 .modified()
238 .unwrap()
239 .partial_cmp(&b_metadata.modified().unwrap())
240 .unwrap()
241 });
242 let mut filenames = vec![];
243 for entry in paths {
244 filenames.push(entry.file_name().into_string().unwrap());
245 }
246
247 Ok(filenames)
248 }
249
250 async fn is_existing_file(&self, key: &str) -> bool {
251 return Path::new(&key).exists();
252 }
253
254 async fn remove_value(&self, key: &str) -> Result<(), Error> {
255 let result = tokio::fs::remove_file(key).await;
256 return result;
257 }
258
259 fn get_block_dir(&self) -> String {
260 BLOCKS_DIR_PATH.to_string()
261 }
262
263 fn get_checkpoint_dir(&self) -> String {
264 CHECKPOINT_DIR_PATH.to_string()
265 }
266
267 fn ensure_block_directory_exists(&self, block_dir_path: &str) -> Result<(), Error> {
268 if !Path::new(&block_dir_path).exists() {
269 fs::create_dir_all(BLOCKS_DIR_PATH.to_string())?;
270 }
271 Ok(())
272 }
273
274 async fn process_api_call(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {}
275
276 async fn process_api_success(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {
277 }
278
279 async fn process_api_error(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {}
280
281 fn send_interface_event(&self, _event: InterfaceEvent) {
282 }
284
285 async fn save_wallet(&self, wallet: &mut Wallet) -> Result<(), Error> {
286 let buffer = wallet.serialize_for_disk();
287 self.write_value(WALLET_DIR_PATH.as_str(), buffer.as_slice())
288 .await
289 }
290
291 async fn load_wallet(&self, wallet: &mut Wallet) -> Result<(), Error> {
292 if !self.is_existing_file(WALLET_DIR_PATH.as_str()).await {
293 return Ok(());
294 }
295 let buffer = self.read_value(WALLET_DIR_PATH.as_str()).await?;
296 wallet.deserialize_from_disk(&buffer);
297 Ok(())
298 }
299
300 fn get_my_services(&self) -> Vec<PeerService> {
301 vec![]
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use saito_core::core::io::interface_io::InterfaceIO;
308
309 use crate::rust_io_handler::RustIOHandler;
310
311 #[tokio::test]
312 async fn test_write_value() {
313 let (sender, mut _receiver) = tokio::sync::mpsc::channel(10);
314 let io_handler = RustIOHandler::new(sender, 0);
315
316 let result = io_handler
317 .write_value("./data/test/KEY", [1, 2, 3, 4].as_slice())
318 .await;
319 assert!(result.is_ok(), "{:?}", result.err().unwrap().to_string());
320 let result = io_handler.read_value("./data/test/KEY").await;
321 assert!(result.is_ok());
322 let result = result.unwrap();
323 assert_eq!(result, [1, 2, 3, 4]);
324 }
325
326 #[tokio::test]
327 async fn file_exists_success() {
328 let (sender, mut _receiver) = tokio::sync::mpsc::channel(10);
329 let io_handler = RustIOHandler::new(sender, 0);
330 let path = String::from("src/test/data/config_handler_tests.json");
331
332 let result = io_handler.is_existing_file(path.as_str()).await;
333 assert!(result);
334 }
335
336 #[tokio::test]
337 async fn file_exists_fail() {
338 let (sender, mut _receiver) = tokio::sync::mpsc::channel(10);
339 let io_handler = RustIOHandler::new(sender, 0);
340 let path = String::from("badfilename.json");
341
342 let result = io_handler.is_existing_file(path.as_str()).await;
343 assert!(!result);
344 }
345}