saito_rust/
rust_io_handler.rs

1use ahash::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::fs;
4use std::io::{Error, ErrorKind};
5use std::path::Path;
6
7use async_trait::async_trait;
8use lazy_static::lazy_static;
9use log::{debug, error, trace};
10use tokio::fs::{File, OpenOptions};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::mpsc::Sender;
13
14use saito_core::core::consensus::peers::peer_service::PeerService;
15use saito_core::core::consensus::wallet::Wallet;
16use saito_core::core::defs::{BlockId, PeerIndex, SaitoHash, BLOCK_FILE_EXTENSION};
17use saito_core::core::io::interface_io::{InterfaceEvent, InterfaceIO};
18use saito_core::core::io::network_event::NetworkEvent;
19
20use crate::io_event::IoEvent;
21
22lazy_static! {
23    pub static ref BLOCKS_DIR_PATH: String = configure_storage();
24    pub static ref WALLET_DIR_PATH: String = String::from("./data/wallet");
25    pub static ref CHECKPOINT_DIR_PATH: String = String::from("./data/checkpoints/");
26}
/// Returns the directory used for block files.
///
/// Test builds (`cfg!(test)`) get a separate directory so they do not share files with
/// a regular build.
pub fn configure_storage() -> String {
    let blocks_dir = if cfg!(test) {
        "./data/test/blocks/"
    } else {
        "./data/blocks/"
    };
    blocks_dir.to_string()
}
34
35pub struct RustIOHandler {
36    sender: Sender<IoEvent>,
37    handler_id: u8,
38    open_files: HashMap<String, File>,
39}
40
41impl RustIOHandler {
42    pub fn new(sender: Sender<IoEvent>, handler_id: u8) -> RustIOHandler {
43        RustIOHandler {
44            sender,
45            handler_id,
46            open_files: Default::default(),
47        }
48    }
49}
50
51impl Debug for RustIOHandler {
52    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("RustIoHandler")
54            .field("handler_id", &self.handler_id)
55            .finish()
56    }
57}
58
59#[async_trait]
60impl InterfaceIO for RustIOHandler {
61    async fn send_message(&self, peer_index: u64, buffer: &[u8]) -> Result<(), Error> {
62        // TODO : refactor to combine event and the future
63        let event = IoEvent::new(NetworkEvent::OutgoingNetworkMessage {
64            peer_index,
65            buffer: buffer.to_vec(),
66        });
67
68        self.sender.send(event).await.unwrap();
69
70        Ok(())
71    }
72
73    async fn send_message_to_all(
74        &self,
75        buffer: &[u8],
76        peer_exceptions: Vec<u64>,
77    ) -> Result<(), Error> {
78        // debug!("send message to all");
79
80        let event = IoEvent::new(NetworkEvent::OutgoingNetworkMessageForAll {
81            buffer: buffer.to_vec(),
82            exceptions: peer_exceptions,
83        });
84
85        self.sender.send(event).await.unwrap();
86
87        Ok(())
88    }
89
90    async fn connect_to_peer(&mut self, url: String, peer_index: PeerIndex) -> Result<(), Error> {
91        debug!("connecting to peer : {:?} with url : {:?}", peer_index, url);
92        let event = IoEvent::new(NetworkEvent::ConnectToPeer { url, peer_index });
93
94        self.sender.send(event).await.unwrap();
95
96        Ok(())
97    }
98
99    async fn disconnect_from_peer(&self, peer_index: u64) -> Result<(), Error> {
100        self.sender
101            .send(IoEvent::new(NetworkEvent::DisconnectFromPeer {
102                peer_index,
103            }))
104            .await
105            .unwrap();
106        Ok(())
107    }
108
109    async fn fetch_block_from_peer(
110        &self,
111        block_hash: SaitoHash,
112        peer_index: u64,
113        url: &str,
114        block_id: BlockId,
115    ) -> Result<(), Error> {
116        if block_hash == [0; 32] {
117            return Ok(());
118        }
119
120        debug!("fetching block : {:?} from peer : {:?}", block_id, url);
121        let event = IoEvent::new(NetworkEvent::BlockFetchRequest {
122            block_hash,
123            peer_index,
124            block_id,
125            url: url.to_string(),
126        });
127
128        self.sender
129            .send(event)
130            .await
131            .expect("failed sending to io controller");
132
133        Ok(())
134    }
135
136    async fn write_value(&self, key: &str, value: &[u8]) -> Result<(), Error> {
137        // trace!("writing value to disk : {:?}", key);
138        let filename = key;
139        let path = Path::new(filename);
140        if path.parent().is_some() {
141            tokio::fs::create_dir_all(path.parent().unwrap())
142                .await
143                .expect("creating directory structure failed");
144        }
145        let mut file = File::create(filename).await?;
146
147        file.write_all(value).await?;
148
149        // TODO : write the file to a temp file and move to avoid file corruptions
150
151        Ok(())
152    }
153
154    async fn append_value(&mut self, key: &str, value: &[u8]) -> Result<(), Error> {
155        // trace!("appending value to disk : {:?}", key);
156
157        if !self.open_files.contains_key(key) {
158            debug!("file is not yet opened to append. opening file : {:?}", key);
159            let file = OpenOptions::new()
160                .create(true)
161                .append(true)
162                .open(key)
163                .await?;
164            self.open_files.insert(key.to_string(), file);
165        }
166
167        let file = self.open_files.get_mut(key).unwrap();
168
169        // TODO : write the file to a temp file and move to avoid file corruptions
170        let result = file.write_all(value).await;
171        if result.is_err() {
172            return Err(result.err().unwrap());
173        }
174        Ok(())
175    }
176
177    async fn flush_data(&mut self, key: &str) -> Result<(), Error> {
178        trace!("flushing values to disk : {:?}", key);
179
180        if !self.open_files.contains_key(key) {
181            debug!("file : {:?} is not yet opened so cannot be flushed.", key);
182            return Err(Error::from(ErrorKind::Unsupported));
183        }
184
185        let file = self.open_files.get_mut(key).unwrap();
186
187        let result = file.flush().await;
188        if result.is_err() {
189            return Err(result.err().unwrap());
190        }
191        Ok(())
192    }
193
194    async fn read_value(&self, key: &str) -> Result<Vec<u8>, Error> {
195        let result = File::open(key).await;
196        if result.is_err() {
197            let err = result.err().unwrap();
198            error!("couldn't open file for : {:?}. {:?}", key, err);
199            return Err(err);
200        }
201        let mut file = result.unwrap();
202        let mut encoded = Vec::<u8>::new();
203
204        let result = file.read_to_end(&mut encoded).await;
205        if result.is_err() {
206            let err = result.err().unwrap();
207            error!("couldn't read file : {:?}. {:?}", key, err);
208            return Err(err);
209        }
210        Ok(encoded)
211    }
212
213    async fn load_block_file_list(&self) -> Result<Vec<String>, Error> {
214        debug!(
215            "loading blocks from dir : {:?}",
216            self.get_block_dir().to_string(),
217        );
218        let result = fs::read_dir(self.get_block_dir());
219        if result.is_err() {
220            debug!("no blocks found");
221            return Err(result.err().unwrap());
222        }
223        let mut paths: Vec<_> = result
224            .unwrap()
225            .map(|r| r.unwrap())
226            .filter(|r| {
227                r.file_name()
228                    .into_string()
229                    .unwrap()
230                    .contains(BLOCK_FILE_EXTENSION)
231            })
232            .collect();
233        paths.sort_by(|a, b| {
234            let a_metadata = fs::metadata(a.path()).unwrap();
235            let b_metadata = fs::metadata(b.path()).unwrap();
236            a_metadata
237                .modified()
238                .unwrap()
239                .partial_cmp(&b_metadata.modified().unwrap())
240                .unwrap()
241        });
242        let mut filenames = vec![];
243        for entry in paths {
244            filenames.push(entry.file_name().into_string().unwrap());
245        }
246
247        Ok(filenames)
248    }
249
250    async fn is_existing_file(&self, key: &str) -> bool {
251        return Path::new(&key).exists();
252    }
253
254    async fn remove_value(&self, key: &str) -> Result<(), Error> {
255        let result = tokio::fs::remove_file(key).await;
256        return result;
257    }
258
259    fn get_block_dir(&self) -> String {
260        BLOCKS_DIR_PATH.to_string()
261    }
262
263    fn get_checkpoint_dir(&self) -> String {
264        CHECKPOINT_DIR_PATH.to_string()
265    }
266
267    fn ensure_block_directory_exists(&self, block_dir_path: &str) -> Result<(), Error> {
268        if !Path::new(&block_dir_path).exists() {
269            fs::create_dir_all(BLOCKS_DIR_PATH.to_string())?;
270        }
271        Ok(())
272    }
273
274    async fn process_api_call(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {}
275
276    async fn process_api_success(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {
277    }
278
279    async fn process_api_error(&self, _buffer: Vec<u8>, _msg_index: u32, _peer_index: PeerIndex) {}
280
281    fn send_interface_event(&self, _event: InterfaceEvent) {
282        // no one is listening to these events in rust node
283    }
284
285    async fn save_wallet(&self, wallet: &mut Wallet) -> Result<(), Error> {
286        let buffer = wallet.serialize_for_disk();
287        self.write_value(WALLET_DIR_PATH.as_str(), buffer.as_slice())
288            .await
289    }
290
291    async fn load_wallet(&self, wallet: &mut Wallet) -> Result<(), Error> {
292        if !self.is_existing_file(WALLET_DIR_PATH.as_str()).await {
293            return Ok(());
294        }
295        let buffer = self.read_value(WALLET_DIR_PATH.as_str()).await?;
296        wallet.deserialize_from_disk(&buffer);
297        Ok(())
298    }
299
300    fn get_my_services(&self) -> Vec<PeerService> {
301        vec![]
302    }
303}
304
#[cfg(test)]
mod tests {
    use saito_core::core::io::interface_io::InterfaceIO;

    use crate::rust_io_handler::RustIOHandler;

    /// Builds a handler with id 0. The receiving half of the channel is dropped, which is
    /// fine for these tests because they only exercise file-system methods.
    fn handler_for_test() -> RustIOHandler {
        let (sender, _receiver) = tokio::sync::mpsc::channel(10);
        RustIOHandler::new(sender, 0)
    }

    /// A value written with `write_value` is returned unchanged by `read_value`.
    #[tokio::test]
    async fn test_write_value() {
        let io_handler = handler_for_test();
        let key = "./data/test/KEY";
        let payload = [1, 2, 3, 4];

        if let Err(err) = io_handler.write_value(key, payload.as_slice()).await {
            panic!("{:?}", err.to_string());
        }

        let stored = io_handler.read_value(key).await;
        assert!(stored.is_ok());
        assert_eq!(stored.unwrap(), payload);
    }

    /// `is_existing_file` reports `true` for a file that exists.
    #[tokio::test]
    async fn file_exists_success() {
        let io_handler = handler_for_test();

        let exists = io_handler
            .is_existing_file("src/test/data/config_handler_tests.json")
            .await;
        assert!(exists);
    }

    /// `is_existing_file` reports `false` for a path that does not exist.
    #[tokio::test]
    async fn file_exists_fail() {
        let io_handler = handler_for_test();

        let exists = io_handler.is_existing_file("badfilename.json").await;
        assert!(!exists);
    }
}