saito_core/core/consensus/
blockchain_sync_state.rs1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::sync::Arc;
4
5use crate::core::consensus::blockchain::Blockchain;
6use crate::core::consensus::peers::peer_collection::PeerCollection;
7use ahash::HashMap;
8use log::{debug, error, info, trace, warn};
9use tokio::sync::RwLock;
10
11use crate::core::defs::{BlockHash, BlockId, PeerIndex, PrintForLog, SaitoHash};
12
13#[derive(Debug)]
14enum BlockStatus {
15 Queued,
16 Fetching,
17 Fetched,
18 Failed,
19}
20
21struct BlockData {
22 block_hash: BlockHash,
23 block_id: BlockId,
24 status: BlockStatus,
25 retry_count: u32,
26}
27
28const MAX_RETRIES_PER_BLOCK: u32 = 500;
30
31pub struct BlockchainSyncState {
34 received_block_picture: HashMap<PeerIndex, VecDeque<(BlockId, SaitoHash)>>,
36 blocks_to_fetch: HashMap<PeerIndex, VecDeque<BlockData>>,
38 batch_size: usize,
40}
41
42impl BlockchainSyncState {
43 pub fn new(batch_size: usize) -> BlockchainSyncState {
44 info!(
45 "max concurrent block fetches per peer is set as {:?}",
46 batch_size
47 );
48 BlockchainSyncState {
49 received_block_picture: Default::default(),
50 blocks_to_fetch: Default::default(),
51 batch_size,
52 }
53 }
54
55 pub(crate) fn build_peer_block_picture(&mut self, blockchain: &Blockchain) {
58 for (peer_index, received_picture_from_peer) in self.received_block_picture.iter_mut() {
61 received_picture_from_peer.make_contiguous().sort_by(
63 |(id_a, hash_a), (id_b, hash_b)| {
64 if id_a == id_b {
65 return hash_a.cmp(hash_b);
66 }
67 id_a.cmp(id_b)
68 },
69 );
70
71 let blocks_to_fetch_from_peer = self.blocks_to_fetch.entry(*peer_index).or_default();
72 let mut counter = 0;
73
74 loop {
75 if received_picture_from_peer.is_empty() {
76 break;
78 }
79
80 let (id, hash) = received_picture_from_peer
81 .pop_front()
82 .expect("failed popping front from received picture");
83
84 if blockchain.blocks.contains_key(&hash) {
85 continue;
87 }
88
89 let block_data = BlockData {
90 block_hash: hash,
91 block_id: id,
92 status: BlockStatus::Queued,
93 retry_count: 0,
94 };
95
96 let already_exists = blocks_to_fetch_from_peer.iter().any(|b| {
97 let exists =
98 b.block_hash == block_data.block_hash && b.block_id == block_data.block_id;
99 if exists {
100 trace!(
101 "block : {:?}-{:?} already in the queue to be fetched with status : {:?} / retry_count : {:?}",
102 b.block_id,
103 b.block_hash.to_hex(),
104 b.status,
105 b.retry_count
106 );
107 }
108 exists
109 });
110
111 if !already_exists {
112 counter += 1;
113 blocks_to_fetch_from_peer.push_back(block_data);
114 }
115 }
116 if counter > 0 {
117 trace!(
118 "{:?} blocks selected (total : {:?}) for peer : {:?}",
119 counter,
120 blocks_to_fetch_from_peer.len(),
121 peer_index
122 );
123 }
124 }
125 self.received_block_picture.retain(|_, map| !map.is_empty());
127 self.blocks_to_fetch.retain(|_, vec| !vec.is_empty());
128 }
129 pub fn get_fetching_block_count(&self) -> BlockId {
130 self.blocks_to_fetch
131 .values()
132 .map(|v| v.len() as BlockId)
133 .sum::<BlockId>()
134 }
135 pub fn get_blocks_to_fetch_per_peer(
137 &mut self,
138 ) -> HashMap<PeerIndex, Vec<(SaitoHash, BlockId)>> {
139 let mut selected_blocks_per_peer: HashMap<PeerIndex, Vec<(SaitoHash, BlockId)>> =
141 Default::default();
142
143 for (peer_index, deq) in self.blocks_to_fetch.iter_mut() {
145 assert_ne!(
148 *peer_index, 0,
149 "peer index 0 should not enter this list since we handle it at add_entry"
150 );
151
152 deq.make_contiguous().sort_by(|a, b| {
156 if a.block_id == b.block_id {
157 return a.block_hash.cmp(&b.block_hash);
158 }
159 a.block_id.cmp(&b.block_id)
160 });
161
162 let mut fetching_count = 0;
163
164 for block_data in deq.iter_mut() {
167 match block_data.status {
168 BlockStatus::Queued => {}
169 BlockStatus::Fetching => {
170 fetching_count += 1;
171 trace!("currently fetching : {:?}-{:?} from peer : {:?} with retry_count : {:?}",block_data.block_id,block_data.block_hash.to_hex(),peer_index, block_data.retry_count);
172 }
173 BlockStatus::Fetched => {}
174 BlockStatus::Failed => {}
175 }
176 }
177
178 let mut allowed_quota = self.batch_size - fetching_count;
179
180 for block_data in deq.iter_mut() {
181 if allowed_quota == 0 {
183 break;
185 }
186
187 match block_data.status {
188 BlockStatus::Queued => {
189 trace!(
190 "selecting entry : {:?}-{:?} for peer : {:?}",
191 block_data.block_id,
192 block_data.block_hash.to_hex(),
193 peer_index
194 );
195 allowed_quota -= 1;
196 selected_blocks_per_peer
197 .entry(*peer_index)
198 .or_default()
199 .push((block_data.block_hash, block_data.block_id));
200 block_data.status = BlockStatus::Fetching;
201 }
202 BlockStatus::Fetching => {}
203 BlockStatus::Fetched => {}
204 BlockStatus::Failed => {
205 match block_data.retry_count.cmp(&MAX_RETRIES_PER_BLOCK) {
206 Ordering::Less => {
207 block_data.retry_count += 1;
208 trace!(
209 "selecting failed entry : {:?}-{:?} for peer : {:?}",
210 block_data.block_id,
211 block_data.block_hash.to_hex(),
212 peer_index
213 );
214 allowed_quota -= 1;
215 block_data.status = BlockStatus::Queued;
216 }
217 Ordering::Equal => {
218 error!("ignoring block : {:?}-{:?} from peer : {:?} since we have repeatedly failed to fetch it",
219 block_data.block_id,
220 block_data.block_hash.to_hex(),
221 peer_index);
222
223 block_data.retry_count += 1;
225 }
226 Ordering::Greater => {}
227 }
228 }
229 }
230 }
231
232 trace!(
233 "peer : {:?} to be fetched {:?} blocks. first : {:?} last : {:?} fetching : {:?} failed : {:?} queued : {:?}",
234 peer_index,
235 deq.len(),
236 deq.front().unwrap().block_id,
237 deq.back().unwrap().block_id,
238 deq.iter()
239 .filter(|b| matches!(b.status, BlockStatus::Fetching))
240 .count(),
241 deq.iter()
242 .filter(|b| matches!(b.status, BlockStatus::Failed))
243 .count(),
244 deq.iter()
245 .filter(|b| matches!(b.status, BlockStatus::Queued))
246 .count()
247 );
248 }
249
250 selected_blocks_per_peer
251 }
252
253 pub fn mark_as_fetched(&mut self, hash: SaitoHash) {
268 debug!("marking block : {:?} as fetched", hash.to_hex());
269 for (peer_index, deq) in self.blocks_to_fetch.iter_mut() {
270 for block_data in deq {
271 if hash.eq(&block_data.block_hash) {
272 block_data.status = BlockStatus::Fetched;
273 trace!(
274 "block : {:?} marked as fetched from peer : {:?}",
275 block_data.block_hash.to_hex(),
276 peer_index
277 );
278 break;
279 }
280 }
281 }
282
283 self.remove_fetched_blocks();
284 }
285
286 fn remove_fetched_blocks(&mut self) {
300 let mut counter = 0;
301 self.blocks_to_fetch.retain(|_, res| {
302 res.retain(|b| {
303 if matches!(b.status, BlockStatus::Fetched) {
304 counter += 1;
305 return false;
306 }
307 true
308 });
309 !res.is_empty()
310 });
311 trace!("{:?} fetched blocks removed from sync state", counter);
312 }
313 pub async fn add_entry(
329 &mut self,
330 block_hash: SaitoHash,
331 block_id: BlockId,
332 peer_index: PeerIndex,
333 peer_lock: Arc<RwLock<PeerCollection>>,
334 ) {
335 trace!(
336 "add entry : {:?} - {:?} from {:?}",
337 block_hash.to_hex(),
338 block_id,
339 peer_index
340 );
341 if peer_index == 0 {
342 let peers = peer_lock.read().await;
344 debug!("block : {:?}-{:?} is requested without a peer. request the block from all the peers", block_id,block_hash.to_hex());
345
346 for (index, peer) in peers.index_to_peers.iter() {
347 if peer.block_fetch_url.is_empty() {
348 continue;
349 }
350 self.received_block_picture
351 .entry(*index)
352 .or_default()
353 .push_back((block_id, block_hash));
354 }
355 } else {
356 self.received_block_picture
357 .entry(peer_index)
358 .or_default()
359 .push_back((block_id, block_hash));
360 }
361 }
362 pub fn remove_entry(&mut self, block_hash: SaitoHash) {
377 trace!("removing entry : {:?} from peer", block_hash.to_hex());
378 for (_, deq) in self.blocks_to_fetch.iter_mut() {
379 deq.retain(|block_data| block_data.block_hash != block_hash);
380 }
381
382 self.blocks_to_fetch.retain(|_, deq| !deq.is_empty());
383 }
384
385 pub fn get_stats(&self) -> Vec<String> {
386 let mut stats = vec![];
387 for (peer_index, vec) in self.blocks_to_fetch.iter() {
388 let res = self.received_block_picture.get(peer_index);
389 let mut count = 0;
390 if let Some(deq) = res {
391 count = deq.len();
392 }
393 let mut highest_id = 0;
394 let last = vec.back();
395 if let Some(block_data) = last {
396 highest_id = block_data.block_id;
397 }
398 let mut lowest_id = 0;
399 let first = vec.front();
400 if first.is_some() {
401 lowest_id = first.unwrap().block_id;
402 }
403 let fetching_blocks_count = vec
404 .iter()
405 .filter(|block_data| matches!(block_data.status, BlockStatus::Fetching))
406 .count();
407 let stat = format!(
408 "{} - peer : {:?} lowest_id: {:?} fetching_count : {:?} ordered_till : {:?} unordered_block_ids : {:?}",
409 format!("{:width$}", "routing::sync_state", width = 40),
410 peer_index,
411 lowest_id,
412 fetching_blocks_count,
413 highest_id,
414 count
415 );
416 stats.push(stat);
417 }
418 stats
425 }
426
427 pub fn mark_as_failed(&mut self, id: BlockId, hash: BlockHash, peer_index: PeerIndex) {
443 warn!(
444 "failed to fetch block : {:?}-{:?} from peer : {:?}",
445 id,
446 hash.to_hex(),
447 peer_index
448 );
449
450 if let Some(deq) = self.blocks_to_fetch.get_mut(&peer_index) {
451 let data = deq
452 .iter_mut()
453 .find(|data| data.block_id == id && data.block_hash == hash);
454 match data {
455 None => {
456 debug!("we are marking a block {:?}-{:?} from peer : {:?} as failed to fetch. But we don't have such a block or it's already fetched",id,hash.to_hex(),peer_index);
457 }
458 Some(data) => {
459 data.status = BlockStatus::Failed;
460 }
461 }
462 } else {
463 debug!("we are marking a block {:?}-{:?} from peer : {:?} as failed to fetch. But we don't have such a peer",id,hash.to_hex(),peer_index);
464 }
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use crate::core::consensus::blockchain_sync_state::BlockchainSyncState;
471 use crate::core::defs::BlockId;
472 use crate::core::util::test::test_manager::test::TestManager;
473 use std::ops::Deref;
474
475 #[tokio::test]
476 #[serial_test::serial]
477 async fn multiple_forks_from_multiple_peers_test() {
478 let t = TestManager::default();
479 let mut state = BlockchainSyncState::new(10);
480 for i in 0..state.batch_size + 50 {
481 state
482 .add_entry(
483 [(i + 1) as u8; 32],
484 (i + 1) as BlockId,
485 1,
486 t.peer_lock.clone(),
487 )
488 .await;
489 }
490 for i in 4..state.batch_size + 50 {
491 state
492 .add_entry(
493 [(i + 101) as u8; 32],
494 (i + 1) as BlockId,
495 1,
496 t.peer_lock.clone(),
497 )
498 .await;
499 }
500
501 state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
502 let mut result = state.get_blocks_to_fetch_per_peer();
503 assert_eq!(result.len(), 1);
504 let vec = result.get_mut(&1);
505 assert!(vec.is_some());
506 let vec = vec.unwrap();
507 assert_eq!(vec.len(), state.batch_size);
508 assert_eq!(state.batch_size, 10);
509 let mut fetching = vec![];
510 for i in 0..4 {
511 let (entry, _) = vec.get(i).unwrap();
512 assert_eq!(*entry, [(i + 1) as u8; 32]);
513 fetching.push((1, [(i + 1) as u8; 32]));
514 }
515 let mut value = 4;
516 for index in (4..10).step_by(2) {
517 value += 1;
518 let (entry, _) = vec.get(index).unwrap();
519 assert_eq!(*entry, [(value) as u8; 32]);
520 fetching.push((1, [(value) as u8; 32]));
521
522 let (entry, _) = vec.get(index + 1).unwrap();
523 assert_eq!(*entry, [(value + 100) as u8; 32]);
524 fetching.push((1, [(value + 100) as u8; 32]));
525 }
526 state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
527 let result = state.get_blocks_to_fetch_per_peer();
528 assert_eq!(result.len(), 0);
529
530 state.remove_entry([1; 32]);
531 state.remove_entry([5; 32]);
532 state.remove_entry([106; 32]);
533 state.build_peer_block_picture(t.blockchain_lock.read().await.deref());
534 let mut result = state.get_blocks_to_fetch_per_peer();
535 assert_eq!(result.len(), 1);
536 let vec = result.get_mut(&1).unwrap();
537 assert_eq!(vec.len(), 3);
538 }
543}