saito_core/core/consensus/blockchain_sync_state.rs

1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::sync::Arc;
4
5use crate::core::consensus::blockchain::Blockchain;
6use crate::core::consensus::peers::peer_collection::PeerCollection;
7use ahash::HashMap;
8use log::{debug, error, info, trace, warn};
9use tokio::sync::RwLock;
10
11use crate::core::defs::{BlockHash, BlockId, PeerIndex, PrintForLog, SaitoHash};
12
/// Fetch status of a single entry in a peer's fetch queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockStatus {
    /// Waiting to be handed out for fetching.
    Queued,
    /// Handed out for fetching and not yet marked as fetched or failed.
    Fetching,
    /// Received; such entries are dropped from the queue right after being marked.
    Fetched,
    /// The fetch failed; the entry is re-queued up to `MAX_RETRIES_PER_BLOCK` times.
    Failed,
}
20
21struct BlockData {
22    block_hash: BlockHash,
23    block_id: BlockId,
24    status: BlockStatus,
25    retry_count: u32,
26}
27
/// Number of retries allowed for a failed block from a single peer. Once the limit is reached,
/// that peer is no longer asked for the block.
const MAX_RETRIES_PER_BLOCK: u32 = 500;
30
31/// Maintains the state for fetching blocks from other peers into this peer.
32/// Tries to fetch the blocks in the most resource efficient way possible.
33pub struct BlockchainSyncState {
34    /// These are the blocks we have received from each of our peers
35    received_block_picture: HashMap<PeerIndex, VecDeque<(BlockId, SaitoHash)>>,
36    /// These are the blocks which we have to fetch from each of our peers
37    blocks_to_fetch: HashMap<PeerIndex, VecDeque<BlockData>>,
38    /// Maximum amount of blocks which can be fetched concurrently from a peer. If this number is too high, the peer's performance might get affected or the requests might be rejected
39    batch_size: usize,
40}
41
42impl BlockchainSyncState {
43    pub fn new(batch_size: usize) -> BlockchainSyncState {
44        info!(
45            "max concurrent block fetches per peer is set as {:?}",
46            batch_size
47        );
48        BlockchainSyncState {
49            received_block_picture: Default::default(),
50            blocks_to_fetch: Default::default(),
51            batch_size,
52        }
53    }
54
55    /// Builds the list of blocks to be fetched from each peer. Blocks fetched are in order if in the same fork,
56    /// or at the same level for multiple forks to make sure the blocks fetched can be processed most efficiently
57    pub(crate) fn build_peer_block_picture(&mut self, blockchain: &Blockchain) {
58        // trace!("building peer block picture");
59        // for every block picture received from a peer, we sort and create a list of sequential hashes to fetch from peers
60        for (peer_index, received_picture_from_peer) in self.received_block_picture.iter_mut() {
61            // need to sort before sequencing
62            received_picture_from_peer.make_contiguous().sort_by(
63                |(id_a, hash_a), (id_b, hash_b)| {
64                    if id_a == id_b {
65                        return hash_a.cmp(hash_b);
66                    }
67                    id_a.cmp(id_b)
68                },
69            );
70
71            let blocks_to_fetch_from_peer = self.blocks_to_fetch.entry(*peer_index).or_default();
72            let mut counter = 0;
73
74            loop {
75                if received_picture_from_peer.is_empty() {
76                    // have added all the received block hashes to the fetching list
77                    break;
78                }
79
80                let (id, hash) = received_picture_from_peer
81                    .pop_front()
82                    .expect("failed popping front from received picture");
83
84                if blockchain.blocks.contains_key(&hash) {
85                    // not fetching blocks we already have
86                    continue;
87                }
88
89                let block_data = BlockData {
90                    block_hash: hash,
91                    block_id: id,
92                    status: BlockStatus::Queued,
93                    retry_count: 0,
94                };
95
96                let already_exists = blocks_to_fetch_from_peer.iter().any(|b| {
97                    let exists =
98                        b.block_hash == block_data.block_hash && b.block_id == block_data.block_id;
99                    if exists {
100                        trace!(
101                            "block : {:?}-{:?} already in the queue to be fetched with status : {:?} / retry_count : {:?}",
102                            b.block_id,
103                            b.block_hash.to_hex(),
104                            b.status,
105                            b.retry_count
106                        );
107                    }
108                    exists
109                });
110
111                if !already_exists {
112                    counter += 1;
113                    blocks_to_fetch_from_peer.push_back(block_data);
114                }
115            }
116            if counter > 0 {
117                trace!(
118                    "{:?} blocks selected (total : {:?}) for peer : {:?}",
119                    counter,
120                    blocks_to_fetch_from_peer.len(),
121                    peer_index
122                );
123            }
124        }
125        // removing empty lists from memory
126        self.received_block_picture.retain(|_, map| !map.is_empty());
127        self.blocks_to_fetch.retain(|_, vec| !vec.is_empty());
128    }
129    pub fn get_fetching_block_count(&self) -> BlockId {
130        self.blocks_to_fetch
131            .values()
132            .map(|v| v.len() as BlockId)
133            .sum::<BlockId>()
134    }
135    /// Generates the list of blocks which needs to be fetched next. A list is generated per each peer since we can fetch from multiple peers concurrently.
136    pub fn get_blocks_to_fetch_per_peer(
137        &mut self,
138    ) -> HashMap<PeerIndex, Vec<(SaitoHash, BlockId)>> {
139        // trace!("getting block to be fetched per each peer",);
140        let mut selected_blocks_per_peer: HashMap<PeerIndex, Vec<(SaitoHash, BlockId)>> =
141            Default::default();
142
143        // for each peer check if we can fetch block
144        for (peer_index, deq) in self.blocks_to_fetch.iter_mut() {
145            // TODO : sorting this array can be a performance hit. need to check
146
147            assert_ne!(
148                *peer_index, 0,
149                "peer index 0 should not enter this list since we handle it at add_entry"
150            );
151
152            // we need to sort the list to make sure we are fetching the next in sequence blocks.
153            // otherwise our memory will grow since we need to keep those fetched blocks in memory.
154            // we need to sort this here because some previous block hashes can be received out of sequence
155            deq.make_contiguous().sort_by(|a, b| {
156                if a.block_id == b.block_id {
157                    return a.block_hash.cmp(&b.block_hash);
158                }
159                a.block_id.cmp(&b.block_id)
160            });
161
162            let mut fetching_count = 0;
163
164            // TODO : we don't need to iterate through this list multiple times. refactor !!!
165            //  (can collect more than required and drop larger block ids if there are too many)
166            for block_data in deq.iter_mut() {
167                match block_data.status {
168                    BlockStatus::Queued => {}
169                    BlockStatus::Fetching => {
170                        fetching_count += 1;
171                        trace!("currently fetching : {:?}-{:?} from peer : {:?} with retry_count : {:?}",block_data.block_id,block_data.block_hash.to_hex(),peer_index, block_data.retry_count);
172                    }
173                    BlockStatus::Fetched => {}
174                    BlockStatus::Failed => {}
175                }
176            }
177
178            let mut allowed_quota = self.batch_size - fetching_count;
179
180            for block_data in deq.iter_mut() {
181                // we peers concurrent fetches to this amount
182                if allowed_quota == 0 {
183                    // we have reached allowed concurrent fetches quota.
184                    break;
185                }
186
187                match block_data.status {
188                    BlockStatus::Queued => {
189                        trace!(
190                            "selecting entry : {:?}-{:?} for peer : {:?}",
191                            block_data.block_id,
192                            block_data.block_hash.to_hex(),
193                            peer_index
194                        );
195                        allowed_quota -= 1;
196                        selected_blocks_per_peer
197                            .entry(*peer_index)
198                            .or_default()
199                            .push((block_data.block_hash, block_data.block_id));
200                        block_data.status = BlockStatus::Fetching;
201                    }
202                    BlockStatus::Fetching => {}
203                    BlockStatus::Fetched => {}
204                    BlockStatus::Failed => {
205                        match block_data.retry_count.cmp(&MAX_RETRIES_PER_BLOCK) {
206                            Ordering::Less => {
207                                block_data.retry_count += 1;
208                                trace!(
209                                    "selecting failed entry : {:?}-{:?} for peer : {:?}",
210                                    block_data.block_id,
211                                    block_data.block_hash.to_hex(),
212                                    peer_index
213                                );
214                                allowed_quota -= 1;
215                                block_data.status = BlockStatus::Queued;
216                            }
217                            Ordering::Equal => {
218                                error!("ignoring block : {:?}-{:?} from peer : {:?} since we have repeatedly failed to fetch it",
219                                block_data.block_id,
220                                block_data.block_hash.to_hex(),
221                                peer_index);
222
223                                // increasing this so the error is only printed once per block per peer
224                                block_data.retry_count += 1;
225                            }
226                            Ordering::Greater => {}
227                        }
228                    }
229                }
230            }
231
232            trace!(
233                "peer : {:?} to be fetched {:?} blocks. first : {:?} last : {:?} fetching : {:?} failed : {:?} queued : {:?}",
234                peer_index,
235                deq.len(),
236                deq.front().unwrap().block_id,
237                deq.back().unwrap().block_id,
238                deq.iter()
239                    .filter(|b| matches!(b.status, BlockStatus::Fetching))
240                    .count(),
241                deq.iter()
242                    .filter(|b| matches!(b.status, BlockStatus::Failed))
243                    .count(),
244                deq.iter()
245                    .filter(|b| matches!(b.status, BlockStatus::Queued))
246                    .count()
247            );
248        }
249
250        selected_blocks_per_peer
251    }
252
253    /// Mark the block state as "fetched"
254    ///
255    /// # Arguments
256    ///
257    /// * `peer_index`:
258    /// * `hash`:
259    ///
260    /// returns: ()
261    ///
262    /// # Examples
263    ///
264    /// ```
265    ///
266    /// ```
267    pub fn mark_as_fetched(&mut self, hash: SaitoHash) {
268        debug!("marking block : {:?} as fetched", hash.to_hex());
269        for (peer_index, deq) in self.blocks_to_fetch.iter_mut() {
270            for block_data in deq {
271                if hash.eq(&block_data.block_hash) {
272                    block_data.status = BlockStatus::Fetched;
273                    trace!(
274                        "block : {:?} marked as fetched from peer : {:?}",
275                        block_data.block_hash.to_hex(),
276                        peer_index
277                    );
278                    break;
279                }
280            }
281        }
282
283        self.remove_fetched_blocks();
284    }
285
286    /// Removes all the entries related to fetched blocks and removes any empty collections from memory
287    ///
288    /// # Arguments
289    ///
290    /// * `peer_index`:
291    ///
292    /// returns: ()
293    ///
294    /// # Examples
295    ///
296    /// ```
297    ///
298    /// ```
299    fn remove_fetched_blocks(&mut self) {
300        let mut counter = 0;
301        self.blocks_to_fetch.retain(|_, res| {
302            res.retain(|b| {
303                if matches!(b.status, BlockStatus::Fetched) {
304                    counter += 1;
305                    return false;
306                }
307                true
308            });
309            !res.is_empty()
310        });
311        trace!("{:?} fetched blocks removed from sync state", counter);
312    }
313    /// Adds an entry to this data structure which will be fetched later after prioritizing.
314    ///
315    /// # Arguments
316    ///
317    /// * `block_hash`:
318    /// * `block_id`:
319    /// * `peer_index`:
320    ///
321    /// returns: ()
322    ///
323    /// # Examples
324    ///
325    /// ```
326    ///
327    /// ```
328    pub async fn add_entry(
329        &mut self,
330        block_hash: SaitoHash,
331        block_id: BlockId,
332        peer_index: PeerIndex,
333        peer_lock: Arc<RwLock<PeerCollection>>,
334    ) {
335        trace!(
336            "add entry : {:?} - {:?} from {:?}",
337            block_hash.to_hex(),
338            block_id,
339            peer_index
340        );
341        if peer_index == 0 {
342            // this means we don't have which peer to request this block from
343            let peers = peer_lock.read().await;
344            debug!("block : {:?}-{:?} is requested without a peer. request the block from all the peers", block_id,block_hash.to_hex());
345
346            for (index, peer) in peers.index_to_peers.iter() {
347                if peer.block_fetch_url.is_empty() {
348                    continue;
349                }
350                self.received_block_picture
351                    .entry(*index)
352                    .or_default()
353                    .push_back((block_id, block_hash));
354            }
355        } else {
356            self.received_block_picture
357                .entry(peer_index)
358                .or_default()
359                .push_back((block_id, block_hash));
360        }
361    }
362    /// Removes entry when the hash is added to the blockchain. If so we can move the block ceiling up.
363    ///
364    /// # Arguments
365    ///
366    /// * `block_hash`:
367    /// * `peer_index`:
368    ///
369    /// returns: ()
370    ///
371    /// # Examples
372    ///
373    /// ```
374    ///
375    /// ```
376    pub fn remove_entry(&mut self, block_hash: SaitoHash) {
377        trace!("removing entry : {:?} from peer", block_hash.to_hex());
378        for (_, deq) in self.blocks_to_fetch.iter_mut() {
379            deq.retain(|block_data| block_data.block_hash != block_hash);
380        }
381
382        self.blocks_to_fetch.retain(|_, deq| !deq.is_empty());
383    }
384
385    pub fn get_stats(&self) -> Vec<String> {
386        let mut stats = vec![];
387        for (peer_index, vec) in self.blocks_to_fetch.iter() {
388            let res = self.received_block_picture.get(peer_index);
389            let mut count = 0;
390            if let Some(deq) = res {
391                count = deq.len();
392            }
393            let mut highest_id = 0;
394            let last = vec.back();
395            if let Some(block_data) = last {
396                highest_id = block_data.block_id;
397            }
398            let mut lowest_id = 0;
399            let first = vec.front();
400            if first.is_some() {
401                lowest_id = first.unwrap().block_id;
402            }
403            let fetching_blocks_count = vec
404                .iter()
405                .filter(|block_data| matches!(block_data.status, BlockStatus::Fetching))
406                .count();
407            let stat = format!(
408                "{} - peer : {:?} lowest_id: {:?} fetching_count : {:?} ordered_till : {:?} unordered_block_ids : {:?}",
409                format!("{:width$}", "routing::sync_state", width = 40),
410                peer_index,
411                lowest_id,
412                fetching_blocks_count,
413                highest_id,
414                count
415            );
416            stats.push(stat);
417        }
418        // let stat = format!(
419        //     "{} - block_fetch_ceiling : {:?}",
420        //     format!("{:width$}", "routing::sync_state", width = 40),
421        //     self.block_fetch_ceiling
422        // );
423        // stats.push(stat);
424        stats
425    }
426
427    /// Mark the blocks which we couldn't fetch from the peer. After a sevaral retries we will stop fetching the block until we fetch it from another peer.
428    ///
429    /// # Arguments
430    ///
431    /// * `id`:
432    /// * `hash`:
433    /// * `peer_index`:
434    ///
435    /// returns: ()
436    ///
437    /// # Examples
438    ///
439    /// ```
440    ///
441    /// ```
442    pub fn mark_as_failed(&mut self, id: BlockId, hash: BlockHash, peer_index: PeerIndex) {
443        warn!(
444            "failed to fetch block : {:?}-{:?} from peer : {:?}",
445            id,
446            hash.to_hex(),
447            peer_index
448        );
449
450        if let Some(deq) = self.blocks_to_fetch.get_mut(&peer_index) {
451            let data = deq
452                .iter_mut()
453                .find(|data| data.block_id == id && data.block_hash == hash);
454            match data {
455                None => {
456                    debug!("we are marking a block {:?}-{:?} from peer : {:?} as failed to fetch. But we don't have such a block or it's already fetched",id,hash.to_hex(),peer_index);
457                }
458                Some(data) => {
459                    data.status = BlockStatus::Failed;
460                }
461            }
462        } else {
463            debug!("we are marking a block {:?}-{:?} from peer : {:?} as failed to fetch. But we don't have such a peer",id,hash.to_hex(),peer_index);
464        }
465    }
466}
467
#[cfg(test)]
mod tests {
    use crate::core::consensus::blockchain_sync_state::BlockchainSyncState;
    use crate::core::defs::BlockId;
    use crate::core::util::test::test_manager::test::TestManager;
    use std::ops::Deref;

    #[tokio::test]
    #[serial_test::serial]
    async fn multiple_forks_from_multiple_peers_test() {
        let t = TestManager::default();
        let mut state = BlockchainSyncState::new(10);
        // main chain : ids 1..=60 with hash [id; 32]
        for i in 0..state.batch_size + 50 {
            state
                .add_entry(
                    [(i + 1) as u8; 32],
                    (i + 1) as BlockId,
                    1,
                    t.peer_lock.clone(),
                )
                .await;
        }
        // fork : ids 5..=60 with hash [id + 100; 32]
        for i in 4..state.batch_size + 50 {
            state
                .add_entry(
                    [(i + 101) as u8; 32],
                    (i + 1) as BlockId,
                    1,
                    t.peer_lock.clone(),
                )
                .await;
        }

        state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
        let mut result = state.get_blocks_to_fetch_per_peer();
        assert_eq!(result.len(), 1);
        let vec = result.get_mut(&1);
        assert!(vec.is_some());
        let vec = vec.unwrap();
        assert_eq!(vec.len(), state.batch_size);
        assert_eq!(state.batch_size, 10);
        // ids 1..=4 only exist on the main chain
        for i in 0..4 {
            let (entry, _) = vec.get(i).unwrap();
            assert_eq!(*entry, [(i + 1) as u8; 32]);
        }
        // from id 5 onwards both forks are fetched side by side (main hash sorts first)
        let mut value = 4;
        for index in (4..10).step_by(2) {
            value += 1;
            let (entry, _) = vec.get(index).unwrap();
            assert_eq!(*entry, [(value) as u8; 32]);

            let (entry, _) = vec.get(index + 1).unwrap();
            assert_eq!(*entry, [(value + 100) as u8; 32]);
        }
        // the quota is fully used, so nothing new is selected
        state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
        let result = state.get_blocks_to_fetch_per_peer();
        assert_eq!(result.len(), 0);

        // freeing three slots lets the next three in-sequence blocks through
        state.remove_entry([1; 32]);
        state.remove_entry([5; 32]);
        state.remove_entry([106; 32]);
        state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
        let mut result = state.get_blocks_to_fetch_per_peer();
        assert_eq!(result.len(), 1);
        let vec = result.get_mut(&1).unwrap();
        assert_eq!(vec.len(), 3);
        assert!(vec.contains(&([8; 32], 8)));
        assert!(vec.contains(&([108; 32], 8)));
        assert!(vec.contains(&([9; 32], 9)));
    }
}
543}