saito_core/core/consensus/peers/
peer_collection.rs

1use crate::core::consensus::peers::peer::{Peer, PeerStatus};
2use crate::core::consensus::peers::peer_state_writer::PeerStateWriter;
3use crate::core::defs::{PeerIndex, PrintForLog, SaitoPublicKey, Timestamp};
4use log::{debug, info};
5use std::collections::HashMap;
6use std::time::Duration;
7
8const PEER_REMOVAL_WINDOW: Timestamp = Duration::from_secs(600).as_millis() as Timestamp;
9
10#[derive(Clone, Debug, Default)]
11pub struct PeerCounter {
12    counter: PeerIndex,
13}
14
15impl PeerCounter {
16    pub fn get_next_index(&mut self) -> PeerIndex {
17        self.counter += 1;
18        self.counter
19    }
20}
21#[derive(Debug, Clone, Default)]
22pub struct PeerCollection {
23    pub index_to_peers: HashMap<PeerIndex, Peer>,
24    pub address_to_peers: HashMap<SaitoPublicKey, PeerIndex>,
25    pub peer_counter: PeerCounter,
26    pub(crate) peer_state_writer: PeerStateWriter,
27}
28
29impl PeerCollection {
30    pub fn find_peer_by_address(&self, address: &SaitoPublicKey) -> Option<&Peer> {
31        let result = self.address_to_peers.get(address)?;
32
33        self.find_peer_by_index(*result)
34    }
35    pub fn find_peer_by_address_mut(&mut self, address: &SaitoPublicKey) -> Option<&mut Peer> {
36        let result = self.address_to_peers.get(address)?;
37
38        self.find_peer_by_index_mut(*result)
39    }
40
41    pub fn find_peer_by_index(&self, peer_index: u64) -> Option<&Peer> {
42        self.index_to_peers.get(&peer_index)
43    }
44    pub fn find_peer_by_index_mut(&mut self, peer_index: u64) -> Option<&mut Peer> {
45        self.index_to_peers.get_mut(&peer_index)
46    }
47
48    pub fn remove_reconnected_peer(&mut self, public_key: &SaitoPublicKey) -> Option<Peer> {
49        let mut peer_index = None;
50        {
51            for (index, peer) in self.index_to_peers.iter() {
52                if let Some(key) = &peer.public_key {
53                    if *key == *public_key {
54                        if let PeerStatus::Connected = peer.peer_status {
55                            debug!(
56                                "peer : {:?} with key : {:?} is already connected",
57                                peer.index,
58                                public_key.to_base58()
59                            );
60                            // since peer is already connected
61                            continue;
62                        }
63                        debug!("old peer found for key : {:?}", public_key.to_base58());
64                        peer_index = Some(*index);
65                        break;
66                    }
67                }
68            }
69            if peer_index.is_none() {
70                debug!("peer with key : {:?} not found", public_key.to_base58());
71                return None;
72            }
73        }
74
75        let peer = self.index_to_peers.remove(&peer_index.unwrap())?;
76        self.address_to_peers.remove(&peer.public_key?);
77        debug!(
78            "removed reconnected peer : {:?} with key : {:?}. current peer count : {:?}",
79            peer_index,
80            peer.public_key.unwrap().to_base58(),
81            self.index_to_peers.len()
82        );
83
84        Some(peer)
85    }
86
87    pub fn remove_disconnected_peers(&mut self, current_time: Timestamp) {
88        let peer_indices: Vec<PeerIndex> = self
89            .index_to_peers
90            .iter()
91            .filter_map(|(peer_index, peer)| {
92                if peer.static_peer_config.is_some() {
93                    // static peers always remain in memory
94                    return None;
95                }
96                if peer.is_stun_peer() {
97                    // stun peers remain unless explicity removed
98                    return None;
99                }
100                if peer.disconnected_at == Timestamp::MAX
101                    || peer.disconnected_at + PEER_REMOVAL_WINDOW > current_time
102                {
103                    return None;
104                }
105                info!(
106                    "removing peer : {:?} as peer hasn't connected for more than {:?} seconds",
107                    peer_index,
108                    Duration::from_millis(current_time - peer.disconnected_at).as_secs()
109                );
110                Some(*peer_index)
111            })
112            .collect();
113
114        for peer_index in peer_indices {
115            let peer = self.index_to_peers.remove(&peer_index).unwrap();
116            if let Some(public_key) = peer.get_public_key() {
117                self.address_to_peers.remove(&public_key);
118            }
119        }
120    }
121}