saito_core/core/io/
network.rs

1use std::io::{Error, ErrorKind};
2use std::sync::Arc;
3
4use log::{debug, error, info, trace, warn};
5use tokio::sync::RwLock;
6
7use crate::core::consensus::block::Block;
8use crate::core::consensus::blockchain::Blockchain;
9use crate::core::consensus::mempool::Mempool;
10use crate::core::consensus::peers::peer::{Peer, PeerStatus};
11use crate::core::consensus::peers::peer_collection::PeerCollection;
12use crate::core::consensus::transaction::{Transaction, TransactionType};
13use crate::core::consensus::wallet::Wallet;
14use crate::core::defs::{BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp};
15use crate::core::io::interface_io::{InterfaceEvent, InterfaceIO};
16use crate::core::msg::block_request::BlockchainRequest;
17use crate::core::msg::handshake::{HandshakeChallenge, HandshakeResponse};
18use crate::core::msg::message::Message;
19use crate::core::process::keep_time::Timer;
20use crate::core::process::version::Version;
21
/// Describes which side ended a peer connection.
///
/// The type is a plain, field-less enum, so it is cheap to copy and can be
/// compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerDisconnectType {
    /// The peer was disconnected without our intervention.
    ExternalDisconnect,
    /// We disconnected the peer ourselves.
    InternalDisconnect,
}
29use crate::core::util::configuration::Configuration;
30
31// #[derive(Debug)]
32pub struct Network {
33    // TODO : manage peers from network
34    pub peer_lock: Arc<RwLock<PeerCollection>>,
35    pub io_interface: Box<dyn InterfaceIO + Send + Sync>,
36    pub wallet_lock: Arc<RwLock<Wallet>>,
37    pub config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
38    pub timer: Timer,
39}
40
41impl Network {
42    pub fn new(
43        io_handler: Box<dyn InterfaceIO + Send + Sync>,
44        peer_lock: Arc<RwLock<PeerCollection>>,
45        wallet_lock: Arc<RwLock<Wallet>>,
46        config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
47        timer: Timer,
48    ) -> Network {
49        Network {
50            peer_lock,
51            io_interface: io_handler,
52            wallet_lock,
53            config_lock,
54            timer,
55        }
56    }
57    pub async fn propagate_block(&self, block: &Block) {
58        debug!("propagating block : {:?}", block.hash.to_hex());
59
60        let mut excluded_peers = vec![];
61        // finding block sender to avoid resending the block to that node
62        if let Some(index) = block.routed_from_peer.as_ref() {
63            excluded_peers.push(*index);
64        }
65
66        {
67            let peers = self.peer_lock.read().await;
68            for (index, peer) in peers.index_to_peers.iter() {
69                if peer.get_public_key().is_none() {
70                    excluded_peers.push(*index);
71                    continue;
72                }
73            }
74        }
75
76        debug!("sending block : {:?} to peers", block.hash.to_hex());
77        let message = Message::BlockHeaderHash(block.hash, block.id);
78        self.io_interface
79            .send_message_to_all(message.serialize().as_slice(), excluded_peers)
80            .await
81            .unwrap();
82    }
83
84    pub async fn propagate_transaction(&self, transaction: &Transaction) {
85        // TODO : return if tx is not valid
86
87        let peers = self.peer_lock.read().await;
88        let mut wallet = self.wallet_lock.write().await;
89
90        let public_key = wallet.public_key;
91
92        if transaction
93            .from
94            .first()
95            .expect("from slip should exist")
96            .public_key
97            == public_key
98        {
99            if let TransactionType::GoldenTicket = transaction.transaction_type {
100            } else {
101                wallet.add_to_pending(transaction.clone());
102            }
103        }
104
105        for (index, peer) in peers.index_to_peers.iter() {
106            if peer.get_public_key().is_none() {
107                continue;
108            }
109            let public_key = peer.get_public_key().unwrap();
110            if transaction.is_in_path(&public_key) {
111                continue;
112            }
113            let mut transaction = transaction.clone();
114            transaction.add_hop(&wallet.private_key, &wallet.public_key, &public_key);
115            let message = Message::Transaction(transaction);
116            self.io_interface
117                .send_message(*index, message.serialize().as_slice())
118                .await
119                .unwrap();
120        }
121    }
122
123    pub async fn handle_peer_disconnect(
124        &mut self,
125        peer_index: u64,
126        disconnect_type: PeerDisconnectType,
127    ) {
128        debug!("handling peer disconnect, peer_index = {}", peer_index);
129
130        if let PeerDisconnectType::ExternalDisconnect = disconnect_type {
131            self.io_interface
132                .disconnect_from_peer(peer_index)
133                .await
134                .unwrap();
135        }
136
137        let mut peers = self.peer_lock.write().await;
138        if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
139            if peer.get_public_key().is_some() {
140                // calling here before removing the peer from collections
141                self.io_interface
142                    .send_interface_event(InterfaceEvent::PeerConnectionDropped(
143                        peer_index,
144                        peer.get_public_key().unwrap(),
145                    ));
146            }
147
148            peer.mark_as_disconnected(self.timer.get_timestamp_in_ms());
149        } else {
150            error!("unknown peer : {:?} disconnected", peer_index);
151        }
152    }
153    pub async fn handle_new_peer(&mut self, peer_index: u64, ip_addr: Option<String>) {
154        // TODO : if an incoming peer is same as static peer, handle the scenario;
155        let mut peers = self.peer_lock.write().await;
156        if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
157            debug!("static peer : {:?} connected", peer_index);
158            peer.peer_status = PeerStatus::Connecting;
159            peer.ip_address = ip_addr;
160        } else {
161            debug!("new peer added : {:?}", peer_index);
162            let mut peer = Peer::new(peer_index);
163            peer.peer_status = PeerStatus::Connecting;
164            peer.ip_address = ip_addr;
165            peers.index_to_peers.insert(peer_index, peer);
166        }
167
168        if let Some(peer) = peers.find_peer_by_index_mut(peer_index) {
169            if peer.static_peer_config.is_none() {
170                peer.initiate_handshake(self.io_interface.as_ref())
171                    .await
172                    .unwrap();
173            }
174        }
175
176        debug!("current peer count = {:?}", peers.index_to_peers.len());
177    }
178
179    pub async fn handle_handshake_challenge(
180        &self,
181        peer_index: u64,
182        challenge: HandshakeChallenge,
183        wallet_lock: Arc<RwLock<Wallet>>,
184        config_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
185    ) {
186        let mut peers = self.peer_lock.write().await;
187
188        let peer = peers.index_to_peers.get_mut(&peer_index);
189        if peer.is_none() {
190            error!(
191                "peer not found for index : {:?}. cannot handle handshake challenge",
192                peer_index
193            );
194            return;
195        }
196        let peer = peer.unwrap();
197        let current_time = self.timer.get_timestamp_in_ms();
198
199        // TODO : this rate check is done after a lock is acquired which is not ideal
200        peer.handshake_limiter.increase();
201        if peer.has_handshake_limit_exceeded(current_time) {
202            warn!(
203                "peer {:?} exceeded rate peers for handshake challenge",
204                peer_index
205            );
206            return;
207        }
208
209        peer.handle_handshake_challenge(
210            challenge,
211            self.io_interface.as_ref(),
212            wallet_lock.clone(),
213            config_lock,
214        )
215        .await
216        .unwrap();
217    }
218    pub async fn handle_handshake_response(
219        &mut self,
220        peer_index: u64,
221        response: HandshakeResponse,
222        wallet_lock: Arc<RwLock<Wallet>>,
223        blockchain_lock: Arc<RwLock<Blockchain>>,
224        configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
225    ) {
226        let mut peers = self.peer_lock.write().await;
227        let public_key;
228        {
229            let peer = peers.index_to_peers.get_mut(&peer_index);
230            if peer.is_none() {
231                error!(
232                    "peer not found for index : {:?}. cannot handle handshake response",
233                    peer_index
234                );
235                return;
236            }
237            let peer: &mut Peer = peer.unwrap();
238            let current_time = self.timer.get_timestamp_in_ms();
239            peer.handshake_limiter.increase();
240            if peer.has_handshake_limit_exceeded(current_time) {
241                warn!(
242                    "peer {:?} exceeded rate peers for handshake challenge",
243                    peer_index
244                );
245                return;
246            }
247            let result = peer
248                .handle_handshake_response(
249                    response,
250                    self.io_interface.as_ref(),
251                    wallet_lock.clone(),
252                    configs_lock.clone(),
253                    current_time,
254                )
255                .await;
256            if result.is_err() || peer.get_public_key().is_none() {
257                info!(
258                    "disconnecting peer : {:?} as handshake response was not handled",
259                    peer_index
260                );
261                self.io_interface
262                    .disconnect_from_peer(peer_index)
263                    .await
264                    .unwrap();
265                return;
266            }
267            public_key = peer.get_public_key().unwrap();
268            debug!(
269                "peer : {:?} handshake successful for peer : {:?}",
270                peer.index,
271                public_key.to_base58()
272            );
273        }
274        if let Some(old_peer) = peers.remove_reconnected_peer(&public_key) {
275            // if we already have the public key, and it's disconnected, we will consider this as a reconnection.
276            // so we will remove the old peer and add those data into new peer
277            // else we will reject the new connection
278            let peer = peers
279                .find_peer_by_index_mut(peer_index)
280                .expect("peer should exist here since it was accessed previously");
281            peer.join_as_reconnection(old_peer);
282        } else {
283            peers.address_to_peers.insert(public_key, peer_index);
284        }
285
286        for (index, peer) in &peers.index_to_peers {
287            if peer.public_key.is_none() {
288                continue;
289            }
290            debug!(
291                "peer : {:?} with key : {:?} is currently connected : {:?}",
292                index,
293                peer.public_key.unwrap().to_base58(),
294                peer.peer_status
295            );
296        }
297
298        self.io_interface
299            .send_interface_event(InterfaceEvent::PeerConnected(peer_index));
300        // start block syncing here
301        self.request_blockchain_from_peer(peer_index, blockchain_lock.clone())
302            .await;
303    }
304    pub async fn handle_received_key_list(
305        &mut self,
306        peer_index: PeerIndex,
307        key_list: Vec<SaitoPublicKey>,
308    ) -> Result<(), Error> {
309        trace!(
310            "handler received key list of length : {:?} from peer : {:?}",
311            key_list.len(),
312            peer_index
313        );
314
315        let current_time = self.timer.get_timestamp_in_ms();
316        // Lock peers to write
317        let mut peers = self.peer_lock.write().await;
318        let peer = peers.index_to_peers.get_mut(&peer_index);
319
320        if let Some(peer) = peer {
321            // Check rate peers
322            peer.key_list_limiter.increase();
323            if peer.has_key_list_limit_exceeded(current_time) {
324                debug!(
325                    "peer {} - {} exceeded the rate for key list",
326                    peer_index,
327                    peer.public_key.unwrap().to_base58()
328                );
329                return Err(Error::from(ErrorKind::Other));
330            }
331
332            trace!(
333                "handling received keylist of length : {:?} from peer : {:?}",
334                key_list.len(),
335                peer_index
336            );
337            peer.key_list = key_list;
338            Ok(())
339        } else {
340            error!(
341                "peer not found for index : {:?}. cannot handle received key list",
342                peer_index
343            );
344            Err(Error::from(ErrorKind::NotFound))
345        }
346    }
347
348    pub async fn send_key_list(&self, key_list: &[SaitoPublicKey]) {
349        debug!(
350            "sending key list to all the peers {:?}",
351            key_list
352                .iter()
353                .map(|key| key.to_base58())
354                .collect::<Vec<String>>()
355        );
356
357        self.io_interface
358            .send_message_to_all(
359                Message::KeyListUpdate(key_list.to_vec())
360                    .serialize()
361                    .as_slice(),
362                vec![],
363            )
364            .await
365            .unwrap();
366    }
367
368    pub(crate) async fn request_blockchain_from_peer(
369        &self,
370        peer_index: u64,
371        blockchain_lock: Arc<RwLock<Blockchain>>,
372    ) {
373        let configs = self.config_lock.read().await;
374        // trace!("locking blockchain 1");
375        let blockchain = blockchain_lock.read().await;
376        let buffer: Vec<u8>;
377        debug!(
378            "requesting blockchain from peer : {:?} latest_block_id : {:?}, last_block_id : {:?}",
379            peer_index,
380            blockchain.get_latest_block_id(),
381            blockchain.last_block_id
382        );
383
384        if configs.is_spv_mode() {
385            let request;
386            {
387                debug!(
388                    "blockchain last block id : {:?}, latest block id : {:?}",
389                    blockchain.last_block_id,
390                    blockchain.get_latest_block_id()
391                );
392                if blockchain.last_block_id >= blockchain.get_latest_block_id() {
393                    let fork_id = blockchain.fork_id.unwrap_or([0; 32]);
394                    debug!(
395                        "blockchain request 1 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
396                        blockchain.last_block_id,
397                        blockchain.last_block_hash.to_hex(),
398                        fork_id.to_hex()
399                    );
400                    request = BlockchainRequest {
401                        latest_block_id: blockchain.last_block_id,
402                        latest_block_hash: blockchain.last_block_hash,
403                        fork_id,
404                    };
405                } else if let Some(fork_id) =
406                    blockchain.generate_fork_id(blockchain.get_latest_block_id())
407                {
408                    debug!(
409                        "blockchain request 2 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
410                        blockchain.get_latest_block_id(),
411                        blockchain.get_latest_block_hash().to_hex(),
412                        fork_id.to_hex()
413                    );
414                    request = BlockchainRequest {
415                        latest_block_id: blockchain.get_latest_block_id(),
416                        latest_block_hash: blockchain.get_latest_block_hash(),
417                        fork_id,
418                    };
419                } else {
420                    debug!(
421                        "blockchain request 3 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
422                        blockchain.get_latest_block_id(),
423                        blockchain.get_latest_block_hash().to_hex(),
424                        [0; 32]
425                    );
426                    request = BlockchainRequest {
427                        latest_block_id: blockchain.get_latest_block_id(),
428                        latest_block_hash: blockchain.get_latest_block_hash(),
429                        fork_id: [0; 32],
430                    };
431                }
432            }
433            debug!("sending ghost chain request to peer : {:?}", peer_index);
434            buffer = Message::GhostChainRequest(
435                request.latest_block_id,
436                request.latest_block_hash,
437                request.fork_id,
438            )
439            .serialize();
440        } else {
441            let request;
442            if let Some(fork_id) = blockchain.generate_fork_id(blockchain.get_latest_block_id()) {
443                request = BlockchainRequest {
444                    latest_block_id: blockchain.get_latest_block_id(),
445                    latest_block_hash: blockchain.get_latest_block_hash(),
446                    fork_id,
447                };
448                debug!(
449                    "blockchain request 4 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
450                    blockchain.get_latest_block_id(),
451                    blockchain.get_latest_block_hash().to_hex(),
452                    fork_id.to_hex()
453                );
454            } else {
455                request = BlockchainRequest {
456                    latest_block_id: blockchain.get_latest_block_id(),
457                    latest_block_hash: blockchain.get_latest_block_hash(),
458                    fork_id: [0; 32],
459                };
460                debug!(
461                    "blockchain request 5 : latest_id: {:?} latest_hash: {:?} fork_id: {:?}",
462                    blockchain.get_latest_block_id(),
463                    blockchain.get_latest_block_hash().to_hex(),
464                    [0; 32]
465                );
466            }
467            debug!("sending blockchain request to peer : {:?}", peer_index);
468            buffer = Message::BlockchainRequest(request).serialize();
469        }
470        // need to drop the reference here to avoid deadlocks.
471        // We need blockchain lock till here to avoid integrity issues
472        drop(blockchain);
473        drop(configs);
474        // trace!("releasing blockchain 1");
475
476        self.io_interface
477            .send_message(peer_index, buffer.as_slice())
478            .await
479            .unwrap();
480        trace!("blockchain request sent to peer : {:?}", peer_index);
481    }
482    pub async fn process_incoming_block_hash(
483        &mut self,
484        block_hash: SaitoHash,
485        block_id: BlockId,
486        peer_index: PeerIndex,
487        blockchain_lock: Arc<RwLock<Blockchain>>,
488        mempool_lock: Arc<RwLock<Mempool>>,
489    ) -> Option<()> {
490        trace!(
491            "fetching block : {:?}-{:?} from peer : {:?}",
492            block_id,
493            block_hash.to_hex(),
494            peer_index
495        );
496        let block_exists;
497        let my_public_key;
498
499        {
500            // trace!("locking blockchain 2");
501            let blockchain = blockchain_lock.read().await;
502            if blockchain.is_block_indexed(block_hash) {
503                block_exists = true;
504            } else {
505                let mempool = mempool_lock.read().await;
506                block_exists = mempool.blocks_queue.iter().any(|b| b.hash == block_hash);
507            }
508        }
509        // trace!("releasing blockchain 2");
510        {
511            let wallet = self.wallet_lock.read().await;
512            my_public_key = wallet.public_key;
513        }
514        if block_exists {
515            debug!(
516                "block : {:?}-{:?} already exists in chain. not fetching",
517                block_id,
518                block_hash.to_hex()
519            );
520            return None;
521        }
522        let url;
523        {
524            let configs = self.config_lock.read().await;
525            let peers = self.peer_lock.read().await;
526            let wallet = self.wallet_lock.read().await;
527
528            if let Some(peer) = peers.index_to_peers.get(&peer_index) {
529                if wallet.wallet_version > peer.wallet_version
530                    && peer.wallet_version != Version::new(0, 0, 0)
531                {
532                    warn!(
533                    "Not Fetching Block: {:?} from peer :{:?} since peer version is old. expected: {:?} actual {:?} ",
534                    block_hash.to_hex(), peer.index, wallet.wallet_version, peer.wallet_version
535                );
536                    return None;
537                }
538
539                if peer.block_fetch_url.is_empty() {
540                    debug!(
541                        "won't fetch block : {:?} from peer : {:?} since no url found",
542                        block_hash.to_hex(),
543                        peer_index
544                    );
545                    return None;
546                }
547                url = peer.get_block_fetch_url(block_hash, configs.is_spv_mode(), my_public_key);
548            } else {
549                warn!(
550                    "peer : {:?} is not in peer list. cannot generate the block fetch url",
551                    peer_index
552                );
553                return None;
554            }
555        }
556
557        debug!(
558            "fetching block for incoming hash : {:?}-{:?}",
559            block_id,
560            block_hash.to_hex()
561        );
562
563        if self
564            .io_interface
565            .fetch_block_from_peer(block_hash, peer_index, url.as_str(), block_id)
566            .await
567            .is_err()
568        {
569            // failed fetching block from peer
570            warn!(
571                "failed fetching block : {:?} for block hash. so unmarking block as fetching",
572                block_hash.to_hex()
573            );
574        }
575        Some(())
576    }
577
578    pub async fn initialize_static_peers(
579        &mut self,
580        configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
581    ) {
582        let configs = configs_lock.read().await;
583        let mut peers = self.peer_lock.write().await;
584
585        // TODO : can create a new disconnected peer with a is_static flag set. so we don't need to keep the static peers separately
586        configs
587            .get_peer_configs()
588            .clone()
589            .drain(..)
590            .for_each(|config| {
591                let mut peer = Peer::new(peers.peer_counter.get_next_index());
592
593                peer.static_peer_config = Some(config);
594
595                peers.index_to_peers.insert(peer.index, peer);
596            });
597
598        info!("added {:?} static peers", peers.index_to_peers.len());
599    }
600
601    pub async fn handle_new_stun_peer(
602        &mut self,
603        peer_index: PeerIndex,
604        public_key: SaitoPublicKey,
605    ) {
606        debug!(
607            "Adding STUN peer with index: {} and public key: {}",
608            peer_index,
609            public_key.to_base58()
610        );
611        let mut peers = self.peer_lock.write().await;
612        if peers.index_to_peers.contains_key(&peer_index) {
613            error!(
614                "Failed to add STUN peer: Peer with index {} already exists",
615                peer_index
616            );
617            return;
618        }
619        let peer = Peer::new_stun(peer_index, public_key, self.io_interface.as_ref());
620        peers.index_to_peers.insert(peer_index, peer);
621        peers.address_to_peers.insert(public_key, peer_index);
622        debug!("STUN peer added successfully");
623        self.io_interface
624            .send_interface_event(InterfaceEvent::StunPeerConnected(peer_index));
625    }
626
627    pub async fn remove_stun_peer(&mut self, peer_index: PeerIndex) {
628        debug!("Removing STUN peer with index: {}", peer_index);
629        let mut peers = self.peer_lock.write().await;
630        let peer_public_key: SaitoPublicKey;
631        if let Some(peer) = peers.index_to_peers.remove(&peer_index) {
632            if let Some(public_key) = peer.get_public_key() {
633                peer_public_key = public_key;
634                peers.address_to_peers.remove(&public_key);
635                debug!("STUN peer removed from network successfully");
636                self.io_interface
637                    .send_interface_event(InterfaceEvent::StunPeerDisconnected(
638                        peer_index,
639                        peer_public_key,
640                    ));
641            }
642        } else {
643            error!(
644                "Failed to remove STUN peer: Peer with index {} not found",
645                peer_index
646            );
647        }
648    }
649
650    pub async fn connect_to_static_peers(&mut self, current_time: Timestamp) {
651        // trace!("connecting to static peers...");
652        let mut peers = self.peer_lock.write().await;
653        for (peer_index, peer) in &mut peers.index_to_peers {
654            let url = peer.get_url();
655            if let PeerStatus::Disconnected(connect_time, period) = &mut peer.peer_status {
656                if current_time < *connect_time {
657                    continue;
658                }
659                if let Some(config) = peer.static_peer_config.as_ref() {
660                    info!(
661                        "trying to connect to static peer : {:?} with {:?}",
662                        peer_index, config
663                    );
664                    self.io_interface
665                        .connect_to_peer(url, peer.index)
666                        .await
667                        .unwrap();
668                    *period *= 2;
669                    *connect_time = current_time + *period;
670                }
671            }
672        }
673    }
674
675    pub async fn send_pings(&mut self) {
676        let current_time = self.timer.get_timestamp_in_ms();
677        let mut peers = self.peer_lock.write().await;
678        for (_, peer) in peers.index_to_peers.iter_mut() {
679            if peer.get_public_key().is_some() {
680                peer.send_ping(current_time, self.io_interface.as_ref())
681                    .await;
682            }
683        }
684    }
685
686    pub async fn update_peer_timer(&mut self, peer_index: PeerIndex) {
687        let mut peers = self.peer_lock.write().await;
688        let peer = peers.index_to_peers.get_mut(&peer_index);
689        if peer.is_none() {
690            return;
691        }
692        let peer = peer.unwrap();
693        peer.last_msg_at = self.timer.get_timestamp_in_ms();
694    }
695}
696
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::util::{configuration::PeerConfig, test::test_manager};
    use rand::Rng;

    /// Key lists from a single peer are accepted until the peer's key list
    /// limit is reached and rejected afterwards.
    #[serial_test::serial]
    #[tokio::test]
    async fn test_keylist_rate_limiter() {
        let mut t1 = test_manager::test::TestManager::default();
        let limit: u64 = 10;
        let peer2_index: u64 = 0;

        {
            let mut peers = t1.network.peer_lock.write().await;

            let mut peer2 = Peer::new(peer2_index);
            peer2.key_list_limiter.set_limit(limit);
            peer2.static_peer_config = Some(PeerConfig {
                host: String::from(""),
                port: 8080,
                protocol: String::from(""),
                synctype: String::from(""),
                // is_main: true,
            });
            peers.index_to_peers.insert(peer2_index, peer2);
        }

        for i in 0..40 {
            // a fresh list of random keys for every request
            let key_list: Vec<SaitoPublicKey> = (0..11)
                .map(|_| {
                    let mut key = [0u8; 33];
                    rand::thread_rng().fill(&mut key[..]);
                    key
                })
                .collect();

            let result = t1
                .network
                .handle_received_key_list(peer2_index, key_list)
                .await;

            if i < limit {
                assert!(result.is_ok(), "Expected Ok, got {:?}", result);
            } else {
                assert!(result.is_err(), "Expected Err, got {:?}", result);
            }
        }
    }
}