saito_core/core/io/
storage.rs

1use rayon::prelude::*;
2use std::fs::File;
3use std::io::{Error, ErrorKind, Write};
4use std::sync::Arc;
5
6use ahash::AHashMap;
7use bs58;
8use log::{debug, error, info, warn};
9use tokio::sync::RwLock;
10
11use crate::core::consensus::block::{Block, BlockType};
12use crate::core::consensus::mempool::Mempool;
13use crate::core::consensus::slip::{Slip, SlipType};
14use crate::core::defs::{
15    BlockId, PrintForLog, SaitoHash, SaitoPublicKey, SaitoUTXOSetKey, PROJECT_PUBLIC_KEY,
16};
17use crate::core::io::interface_io::InterfaceIO;
18
19#[derive(Debug)]
20pub struct Storage {
21    pub io_interface: Box<dyn InterfaceIO + Send + Sync>,
22}
23
24pub const ISSUANCE_FILE_PATH: &'static str = "./data/issuance/issuance";
25pub const EARLYBIRDS_FILE_PATH: &'static str = "./data/issuance/earlybirds";
26pub const DEFAULT_FILE_PATH: &'static str = "./data/issuance/default";
27pub const UTXOSTATE_FILE_PATH: &'static str = "./data/issuance/utxodata";
28
29pub struct StorageConfigurer {}
30
31pub fn configure_storage() -> String {
32    if cfg!(test) {
33        String::from("./data/test/blocks/")
34    } else {
35        String::from("./data/blocks/")
36    }
37}
38
39impl Storage {
40    pub fn new(io_interface: Box<dyn InterfaceIO + Send + Sync>) -> Storage {
41        Storage { io_interface }
42    }
43    // TODO : remove if not used
44    /// read from a path to a Vec<u8>
45    pub async fn read(&self, path: &str) -> std::io::Result<Vec<u8>> {
46        let buffer = self.io_interface.read_value(path).await;
47        if buffer.is_err() {
48            let err = buffer.err().unwrap();
49            error!("reading failed : {:?}", err);
50            return Err(err);
51        }
52        let buffer = buffer.unwrap();
53        Ok(buffer)
54    }
55
56    // TODO : remove if not used
57    pub async fn write(&mut self, data: &[u8], filename: &str) {
58        self.io_interface
59            .write_value(filename, data)
60            .await
61            .expect("writing to storage failed");
62    }
63
64    pub async fn file_exists(&self, filename: &str) -> bool {
65        self.io_interface.is_existing_file(filename).await
66    }
67
68    pub fn generate_block_filepath(&self, block: &Block) -> String {
69        self.io_interface.get_block_dir() + block.get_file_name().as_str()
70    }
71    pub async fn write_block_to_disk(&mut self, block: &Block) -> String {
72        let buffer = block.serialize_for_net(BlockType::Full);
73        let filename = self.generate_block_filepath(block);
74
75        let result = self
76            .io_interface
77            .write_value(filename.as_str(), buffer.as_slice())
78            .await;
79        if result.is_err() {
80            let err = result.err().unwrap();
81            // TODO : panicking currently to make sure we can serve any blocks for which we have propagated the header for
82            panic!("failed writing block to disk. {:?}", err);
83        }
84        filename
85    }
86
87    pub async fn load_block_name_list(&self) -> Result<Vec<String>, Error> {
88        let block_dir_path = self.io_interface.get_block_dir();
89
90        match self
91            .io_interface
92            .ensure_block_directory_exists(block_dir_path.as_str())
93        {
94            Ok(()) => debug!("Block directory created"),
95            Err(err) => {
96                error!("Error creating block directory {:?}", err);
97                return Err(err);
98            }
99        }
100
101        let mut list = self
102            .io_interface
103            .load_block_file_list()
104            .await
105            .map_err(|err| {
106                error!("failed loading block list from disk : {:?}", err);
107                Error::from(ErrorKind::InvalidData)
108            })?;
109
110        list.sort();
111        Ok(list)
112    }
113
114    pub async fn load_blocks_from_disk(
115        &mut self,
116        file_names: &[String],
117        mempool_lock: Arc<RwLock<Mempool>>,
118    ) {
119        debug!("loading  {:?} blocks from disk", file_names.len());
120
121        let mut mempool = mempool_lock.write().await;
122        for file_name in file_names.iter() {
123            let file_name = file_name.clone();
124            let result = self
125                .io_interface
126                .read_value((self.io_interface.get_block_dir() + file_name.as_str()).as_str())
127                .await;
128            if result.is_err() {
129                error!(
130                    "failed loading block from disk : {:?}",
131                    result.err().unwrap()
132                );
133                return;
134            }
135            debug!("file : {:?} loaded", file_name);
136            let buffer: Vec<u8> = result.unwrap();
137            let buffer_len = buffer.len();
138            let result = Block::deserialize_from_net(&buffer);
139            if result.is_err() {
140                // ideally this shouldn't happen since we only write blocks which are valid to disk
141                warn!(
142                    "failed deserializing block with buffer length : {:?}",
143                    buffer_len
144                );
145                return;
146            }
147            let mut block: Block = result.unwrap();
148            block.force_loaded = true;
149            block.generate().unwrap();
150            debug!("block : {:?} loaded from disk", block.hash.to_hex());
151            mempool.add_block(block);
152        }
153        // mempool.blocks_queue.shrink_to_fit();
154        // mempool.transactions.shrink_to_fit();
155        // mempool.golden_tickets.shrink_to_fit();
156
157        debug!("blocks loaded to mempool");
158    }
159
160    pub async fn load_block_from_disk(&self, file_name: &str) -> Result<Block, std::io::Error> {
161        debug!("loading block {:?} from disk", file_name);
162        let result = self.io_interface.read_value(file_name).await;
163        if result.is_err() {
164            error!(
165                "failed loading block from disk : {:?}",
166                result.err().unwrap()
167            );
168            return Err(Error::from(ErrorKind::NotFound));
169        }
170        let buffer = result.unwrap();
171        Block::deserialize_from_net(&buffer)
172    }
173
174    pub async fn delete_block_from_disk(&self, filename: &str) -> bool {
175        info!("deleting block from disk : {:?}", filename);
176        self.io_interface.remove_value(filename).await.is_ok()
177    }
178
179    /// Asynchronously retrieves token issuance slips from the provided file path.
180    ///
181    /// This function reads a file from disk that contains the token issuance slips
182    /// and returns these slips as a vector.
183    pub async fn get_token_supply_slips_from_disk_path(&self, issuance_file: &str) -> Vec<Slip> {
184        let mut v: Vec<Slip> = vec![];
185        let mut tokens_issued = 0;
186        //
187        if self.file_exists(issuance_file).await {
188            if let Ok(lines) = self.io_interface.read_value(issuance_file).await {
189                let mut contents = String::from_utf8(lines).unwrap();
190                contents = contents.trim_end_matches('\r').to_string();
191                let lines: Vec<&str> = contents.split('\n').collect();
192
193                for line in lines {
194                    let line = line.trim_end_matches('\r');
195                    if !line.is_empty() {
196                        if let Some(mut slip) = self.convert_issuance_into_slip(line) {
197                            slip.generate_utxoset_key();
198                            v.push(slip);
199                        }
200                    }
201                }
202
203                for i in 0..v.len() {
204                    tokens_issued += v[i].amount;
205                }
206
207                info!("{:?} tokens issued", tokens_issued);
208                return v;
209            }
210        } else {
211            error!("issuance file does not exist");
212        }
213
214        vec![]
215    }
216
217    /// get issuance slips from the standard file
218    pub async fn get_token_supply_slips_from_disk(&self) -> Vec<Slip> {
219        return self
220            .get_token_supply_slips_from_disk_path(ISSUANCE_FILE_PATH)
221            .await;
222    }
223
224    /// convert an issuance expression to slip
225    fn convert_issuance_into_slip(&self, line: &str) -> Option<Slip> {
226        let entries: Vec<&str> = line.split_whitespace().collect();
227
228        let result = entries[0].parse::<u64>();
229
230        if result.is_err() {
231            error!("couldn't parse line : {:?}", line);
232            error!("{:?}", result.err().unwrap());
233            return None;
234        }
235
236        let amount = result.unwrap();
237
238        // Check if amount is less than 25000 and set public key if so
239
240        let publickey_str = if amount < 25000 {
241            PROJECT_PUBLIC_KEY
242        } else {
243            entries[1]
244        };
245
246        let publickey_result = Self::decode_str(publickey_str);
247
248        match publickey_result {
249            Ok(val) => {
250                let mut publickey_array: SaitoPublicKey = [0u8; 33];
251                publickey_array.copy_from_slice(&val);
252
253                // VipOutput is deprecated on mainnet
254                let slip_type = match entries[2].trim_end_matches('\r') {
255                    "VipOutput" => SlipType::Normal,
256                    "Normal" => SlipType::Normal,
257                    _ => panic!("Invalid slip type"),
258                };
259
260                let mut slip = Slip::default();
261                slip.amount = amount;
262                slip.public_key = publickey_array;
263                slip.slip_type = slip_type;
264
265                Some(slip)
266            }
267            Err(err) => {
268                debug!("error reading issuance line {:?}", err);
269                None
270            }
271        }
272    }
273
274    pub fn decode_str(string: &str) -> Result<Vec<u8>, bs58::decode::Error> {
275        return bs58::decode(string).into_vec();
276    }
277
278    /// store the state of utxo balances given that map of balances and a treshold
279    pub async fn write_utxoset_to_disk_path(
280        &self,
281        balance_map: AHashMap<SaitoPublicKey, u64>,
282        threshold: u64,
283        path: &str,
284    ) -> Result<(), Box<dyn std::error::Error>> {
285        debug!("store to {}", path);
286        let file_path = format!("{}", path);
287        let mut file = File::create(&file_path)?;
288
289        //assume normal txtype
290        let txtype = "Normal";
291
292        for (key, value) in &balance_map {
293            if value > &threshold {
294                let key_base58 = key.to_base58();
295                let _ = writeln!(file, "{}\t{}\t{}", value, key_base58, txtype);
296            }
297        }
298        debug!("written {} records", balance_map.len());
299        Ok(())
300    }
301
302    /// store the state of utxo balances to standard file
303    pub async fn write_utxoset_to_disk(
304        &self,
305        balance_map: AHashMap<SaitoPublicKey, u64>,
306        threshold: u64,
307    ) {
308        let _ = self
309            .write_utxoset_to_disk_path(balance_map, threshold, UTXOSTATE_FILE_PATH)
310            .await;
311    }
312
313    pub async fn load_checkpoint_file(
314        &self,
315        block_hash: &SaitoHash,
316        block_id: BlockId,
317    ) -> Option<Vec<SaitoUTXOSetKey>> {
318        let file_path = self.io_interface.get_checkpoint_dir()
319            + format!("{}-{}.chk", block_id, block_hash.to_hex()).as_str();
320        if !self.io_interface.is_existing_file(&file_path).await {
321            debug!("no checkpoint file : {} exists for block", file_path,);
322            return None;
323        }
324        if let Ok(result) = self.io_interface.read_value(file_path.as_str()).await {
325            let mut contents = String::from_utf8(result).unwrap();
326            contents = contents.trim_end_matches('\r').to_string();
327            let lines: Vec<&str> = contents.split('\n').collect();
328            let mut keys: Vec<SaitoUTXOSetKey> = vec![];
329            for line in lines {
330                let line = line.trim_end_matches('\r');
331                if !line.is_empty() {
332                    if let Ok(key) = SaitoUTXOSetKey::from_hex(line) {
333                        keys.push(key);
334                    }
335                }
336            }
337            return Some(keys);
338        }
339        None
340    }
341}
342
343#[cfg(test)]
344mod test {
345    use log::trace;
346
347    use crate::core::consensus::block::Block;
348    use crate::core::defs::{PrintForLog, SaitoHash};
349    use crate::core::util::crypto::hash;
350    use crate::core::util::test::test_manager::test::{create_timestamp, TestManager};
351
352    // part is relative to it's cargo.toml
353
354    // tests if issuance file can be read
355    #[tokio::test]
356    #[serial_test::serial]
357    async fn read_issuance_file_test() {
358        let t = TestManager::default();
359        let read_result = t.storage.read(t.issuance_path).await;
360        assert!(read_result.is_ok(), "Failed to read issuance file.");
361    }
362
363    // test if issuance file utxo is equal to the resultant balance map on created blockchain
364    #[tokio::test]
365    #[serial_test::serial]
366    async fn issuance_hashmap_equals_balance_hashmap_test() {
367        let mut t = TestManager::default();
368
369        let issuance_hashmap = t.convert_issuance_to_hashmap(t.issuance_path).await;
370        let slips = t
371            .storage
372            .get_token_supply_slips_from_disk_path(t.issuance_path)
373            .await;
374
375        t.initialize_from_slips(slips).await;
376        let balance_map = t.balance_map().await;
377        assert_eq!(issuance_hashmap, balance_map);
378    }
379
380    // // check if issuance occurs on block one
381    // #[tokio::test]
382    // async fn issuance_occurs_only_on_block_one_test() {
383    //     let mut t = TestManager::new();
384    //     let issuance_hashmap = t.convert_issuance_to_hashmap(TEST_ISSUANCE_FILEPATH).await;
385    //     let slips = t
386    //         .storage
387    //         .get_token_supply_slips_from_disk_path(TEST_ISSUANCE_FILEPATH)
388    //         .await;
389    //     t.initialize_from_slips(slips).await;
390    //     dbg!();
391
392    //     assert_eq!(t.get_latest_block_id().await, 1);
393    // }
394
395    #[tokio::test]
396    #[serial_test::serial]
397    async fn write_read_block_to_file_test() {
398        let mut t = TestManager::default();
399        t.initialize(100, 100_000_000).await;
400
401        let current_timestamp = create_timestamp();
402
403        let mut block = Block::new();
404        block.timestamp = current_timestamp;
405
406        let filename = t.storage.write_block_to_disk(&mut block).await;
407        trace!("block written to file : {}", filename);
408        let retrieved_block = t.storage.load_block_from_disk(filename.as_str()).await;
409        let mut actual_retrieved_block = retrieved_block.unwrap();
410        actual_retrieved_block.generate().unwrap();
411
412        assert_eq!(block.timestamp, actual_retrieved_block.timestamp);
413    }
414
415    #[test]
416    fn hashing_test() {
417        // pretty_env_logger::init();
418        let h1: SaitoHash =
419            hex::decode("fa761296cdca6b5c0e587e8bdc75f86223072780533a8edeb90fa51aea597128")
420                .unwrap()
421                .try_into()
422                .unwrap();
423        let h2: SaitoHash =
424            hex::decode("8f1717d0f4a244f805436633897d48952c30cb35b3941e5d36cb371c68289d25")
425                .unwrap()
426                .try_into()
427                .unwrap();
428        let mut h3: Vec<u8> = vec![];
429        h3.extend(&h1);
430        h3.extend(&h2);
431
432        let hash = hash(&h3);
433        assert_eq!(
434            hash.to_hex(),
435            "de0cdde5db8fd4489f2038aca5224c18983f6676aebcb2561f5089e12ea2eedf"
436        );
437    }
438}