saito_core/core/consensus/peers/
peer.rs

1use crate::core::consensus::peers::peer_service::PeerService;
2use crate::core::consensus::peers::rate_limiter::RateLimiter;
3use crate::core::consensus::wallet::Wallet;
4use crate::core::defs::{
5    PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp, WS_KEEP_ALIVE_PERIOD,
6};
7use crate::core::io::interface_io::{InterfaceEvent, InterfaceIO};
8use crate::core::msg::handshake::{HandshakeChallenge, HandshakeResponse};
9use crate::core::msg::message::Message;
10use crate::core::process::version::Version;
11use crate::core::util;
12use crate::core::util::configuration::Configuration;
13use crate::core::util::crypto::{generate_random_bytes, sign, verify};
14use log::{debug, info, trace, warn};
15use std::cmp::Ordering;
16use std::io::{Error, ErrorKind};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::RwLock;
20
/// Kind of connection a [`Peer`] represents.
#[derive(Debug, Clone)]
pub enum PeerType {
    /// Regular peer; this is what `Peer::new` assigns.
    Default,
    /// Peer created through `Peer::new_stun`.
    Stun,
}
26
27#[derive(Clone, Debug)]
28pub enum PeerStatus {
29    Disconnected(
30        Timestamp, /*next connection time*/
31        Timestamp, /*reconnection period*/
32    ),
33    Connecting,
34    Connected,
35}
36
37#[derive(Debug, Clone)]
38pub struct Peer {
39    pub index: PeerIndex,
40    pub peer_status: PeerStatus,
41    pub block_fetch_url: String,
42    // if this is None(), it means an incoming connection. else a connection which we started from the data from config file
43    pub static_peer_config: Option<util::configuration::PeerConfig>,
44    pub challenge_for_peer: Option<SaitoHash>,
45    pub key_list: Vec<SaitoPublicKey>,
46    pub services: Vec<PeerService>,
47    pub last_msg_at: Timestamp,
48    pub disconnected_at: Timestamp,
49    pub wallet_version: Version,
50    pub core_version: Version,
51    // NOTE: we are currently mapping 1 peer = 1 socket = 1 public key. But in the future we need to support multiple peers per public key
52    // so some of these limiters might have to be handled from a different place than the peer. (Eg : Account struct?)
53    pub key_list_limiter: RateLimiter,
54    pub handshake_limiter: RateLimiter,
55    pub message_limiter: RateLimiter,
56    pub invalid_block_limiter: RateLimiter,
57    pub public_key: Option<SaitoPublicKey>,
58    pub peer_type: PeerType,
59    pub ip_address: Option<String>,
60}
61
62impl Peer {
63    pub fn new(peer_index: PeerIndex) -> Peer {
64        Peer {
65            index: peer_index,
66            peer_status: PeerStatus::Disconnected(0, 1_000),
67            block_fetch_url: "".to_string(),
68            static_peer_config: None,
69            challenge_for_peer: None,
70            key_list: vec![],
71            services: vec![],
72            last_msg_at: 0,
73            disconnected_at: Timestamp::MAX,
74            wallet_version: Default::default(),
75            core_version: Default::default(),
76            key_list_limiter: RateLimiter::builder(100, Duration::from_secs(60)),
77            handshake_limiter: RateLimiter::builder(100, Duration::from_secs(60)),
78            message_limiter: RateLimiter::builder(100_000, Duration::from_secs(1)),
79            invalid_block_limiter: RateLimiter::builder(10, Duration::from_secs(3600)),
80            public_key: None,
81            peer_type: PeerType::Default,
82            ip_address: None,
83        }
84    }
85
86    pub fn new_stun(
87        peer_index: PeerIndex,
88        public_key: SaitoPublicKey,
89        io_handler: &(dyn InterfaceIO + Send + Sync),
90    ) -> Peer {
91        let mut peer = Peer::new(peer_index);
92        peer.peer_type = PeerType::Stun;
93        peer.public_key = Some(public_key);
94        peer.peer_status = PeerStatus::Connected;
95        peer.services = io_handler.get_my_services();
96        peer
97    }
98
99    pub fn is_stun_peer(&self) -> bool {
100        matches!(self.peer_type, PeerType::Stun)
101    }
102
103    pub fn has_key_list_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
104        self.key_list_limiter.has_limit_exceeded(current_time)
105    }
106    pub fn has_handshake_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
107        self.handshake_limiter.has_limit_exceeded(current_time)
108    }
109
110    pub fn has_message_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
111        self.message_limiter.has_limit_exceeded(current_time)
112    }
113
114    pub fn has_invalid_block_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
115        self.invalid_block_limiter.has_limit_exceeded(current_time)
116    }
117    pub fn get_limited_till(&mut self, current_time: Timestamp) -> Option<Timestamp> {
118        let result = None;
119
120        if self.has_key_list_limit_exceeded(current_time) {
121            if self.key_list_limiter.has_limit_exceeded(current_time) {}
122        }
123
124        result
125    }
126
127    pub fn get_url(&self) -> String {
128        if let Some(config) = self.static_peer_config.as_ref() {
129            let mut protocol: String = String::from("ws");
130            if config.protocol == "https" {
131                protocol = String::from("wss");
132            }
133            protocol
134                + "://"
135                + config.host.as_str()
136                + ":"
137                + config.port.to_string().as_str()
138                + "/wsopen"
139        } else {
140            "".to_string()
141        }
142    }
143    pub async fn initiate_handshake(
144        &mut self,
145        io_handler: &(dyn InterfaceIO + Send + Sync),
146    ) -> Result<(), Error> {
147        debug!("initiating handshake : {:?}", self.index);
148
149        let challenge = HandshakeChallenge {
150            challenge: generate_random_bytes(32).await.try_into().unwrap(),
151        };
152        debug!(
153            "generated challenge : {:?} for peer : {:?}",
154            challenge.challenge.to_hex(),
155            self.index
156        );
157        self.challenge_for_peer = Some(challenge.challenge);
158        let message = Message::HandshakeChallenge(challenge);
159        io_handler
160            .send_message(self.index, message.serialize().as_slice())
161            .await?;
162        debug!("handshake challenge sent for peer: {:?}", self.index);
163
164        Ok(())
165    }
166
167    pub async fn handle_handshake_challenge(
168        &mut self,
169        challenge: HandshakeChallenge,
170        io_handler: &(dyn InterfaceIO + Send + Sync),
171        wallet_lock: Arc<RwLock<Wallet>>,
172        configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
173    ) -> Result<(), Error> {
174        debug!(
175            "handling handshake challenge : {:?} for peer : {:?}",
176            challenge.challenge.to_hex(),
177            self.index,
178        );
179        let block_fetch_url;
180        let is_lite;
181        {
182            let configs = configs_lock.read().await;
183
184            is_lite = configs.is_spv_mode();
185            if is_lite {
186                block_fetch_url = "".to_string();
187            } else {
188                block_fetch_url = configs.get_block_fetch_url();
189            }
190        }
191
192        let wallet = wallet_lock.read().await;
193        let response = HandshakeResponse {
194            public_key: wallet.public_key,
195            signature: sign(challenge.challenge.as_slice(), &wallet.private_key),
196            challenge: generate_random_bytes(32).await.try_into().unwrap(),
197            is_lite,
198            block_fetch_url,
199            services: io_handler.get_my_services(),
200            wallet_version: wallet.wallet_version,
201            core_version: wallet.core_version,
202        };
203        debug!(
204            "handshake challenge : {:?} generated for peer : {:?}",
205            response.challenge.to_hex(),
206            self.index
207        );
208
209        self.challenge_for_peer = Some(response.challenge);
210        io_handler
211            .send_message(
212                self.index,
213                Message::HandshakeResponse(response).serialize().as_slice(),
214            )
215            .await?;
216        debug!("first handshake response sent for peer: {:?}", self.index);
217
218        Ok(())
219    }
220    pub async fn handle_handshake_response(
221        &mut self,
222        response: HandshakeResponse,
223        io_handler: &(dyn InterfaceIO + Send + Sync),
224        wallet_lock: Arc<RwLock<Wallet>>,
225        configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
226        current_time: Timestamp,
227    ) -> Result<(), Error> {
228        debug!(
229            "handling handshake response :{:?} for peer : {:?} with address : {:?}",
230            response.challenge.to_hex(),
231            self.index,
232            response.public_key.to_base58()
233        );
234        if !response.core_version.is_set() {
235            debug!(
236                "core version is not set in handshake response. expected : {:?}",
237                wallet_lock.read().await.core_version
238            );
239            self.mark_as_disconnected(current_time);
240            io_handler.disconnect_from_peer(self.index).await?;
241            return Err(Error::from(ErrorKind::InvalidInput));
242        }
243        if self.challenge_for_peer.is_none() {
244            warn!(
245                "we don't have a challenge to verify for peer : {:?}",
246                self.index
247            );
248            self.mark_as_disconnected(current_time);
249            io_handler.disconnect_from_peer(self.index).await?;
250            return Err(Error::from(ErrorKind::InvalidInput));
251        }
252        // TODO : validate block fetch URL
253        let sent_challenge = self.challenge_for_peer.unwrap();
254        let result = verify(&sent_challenge, &response.signature, &response.public_key);
255        if !result {
256            warn!(
257                "handshake failed. signature is not valid. sig : {:?} challenge : {:?} key : {:?}",
258                response.signature.to_hex(),
259                sent_challenge.to_hex(),
260                response.public_key.to_base58()
261            );
262            self.mark_as_disconnected(current_time);
263            io_handler.disconnect_from_peer(self.index).await?;
264            return Err(Error::from(ErrorKind::InvalidInput));
265        }
266
267        let block_fetch_url;
268        let is_lite;
269        {
270            let configs = configs_lock.read().await;
271
272            is_lite = configs.is_spv_mode();
273            if is_lite {
274                block_fetch_url = "".to_string();
275            } else {
276                block_fetch_url = configs.get_block_fetch_url();
277            }
278        }
279        let wallet = wallet_lock.read().await;
280
281        if !wallet
282            .core_version
283            .is_same_minor_version(&response.core_version)
284        {
285            warn!("peer : {:?} core version is not compatible. current core version : {:?} peer core version : {:?}",
286                self.index, wallet.core_version, response.core_version);
287            io_handler.send_interface_event(InterfaceEvent::NewVersionDetected(
288                self.index,
289                response.wallet_version,
290            ));
291            self.mark_as_disconnected(current_time);
292            io_handler.disconnect_from_peer(self.index).await?;
293            return Err(Error::from(ErrorKind::InvalidInput));
294        }
295
296        if self.public_key.is_some() {
297            assert_eq!(
298                response.public_key,
299                self.public_key.unwrap(),
300                "This peer instance is to handle a peer with a different public key"
301            );
302        }
303
304        self.block_fetch_url = response.block_fetch_url;
305        self.services = response.services;
306        self.wallet_version = response.wallet_version;
307        self.core_version = response.core_version;
308        self.peer_status = PeerStatus::Connected;
309        self.public_key = Some(response.public_key);
310
311        debug!(
312            "my version : {:?} peer version : {:?}",
313            wallet.wallet_version, response.wallet_version
314        );
315        if wallet.wallet_version < response.wallet_version {
316            io_handler.send_interface_event(InterfaceEvent::NewVersionDetected(
317                self.index,
318                response.wallet_version,
319            ));
320        }
321
322        if self.static_peer_config.is_none() {
323            // this is only called in initiator's side.
324            // [1. A:challenge -> 2. B:response -> 3. A : response|B verified -> 4. B: A verified]
325            // we only need to send a response for response is in above stage 3 (meaning the challenger).
326
327            let response = HandshakeResponse {
328                public_key: wallet.public_key,
329                signature: sign(&response.challenge, &wallet.private_key),
330                is_lite,
331                block_fetch_url: block_fetch_url.to_string(),
332                challenge: [0; 32],
333                services: io_handler.get_my_services(),
334                wallet_version: wallet.wallet_version,
335                core_version: wallet.core_version,
336            };
337            io_handler
338                .send_message(
339                    self.index,
340                    Message::HandshakeResponse(response).serialize().as_slice(),
341                )
342                .await?;
343            debug!("second handshake response sent for peer: {:?}", self.index);
344        } else {
345            info!(
346                "handshake completed for peer : {:?}",
347                self.get_public_key().unwrap().to_base58()
348            );
349        }
350        self.challenge_for_peer = None;
351
352        io_handler.send_interface_event(InterfaceEvent::PeerHandshakeComplete(self.index));
353
354        Ok(())
355    }
356    /// Since each peer have a different url for a block to be fetched, this function will generate the correct url from a given block hash
357    ///
358    /// # Arguments
359    ///
360    /// * `block_hash`: hash of the block to be fetched
361    ///
362    /// returns: String
363    ///
364    /// # Examples
365    ///
366    /// ```
367    ///
368    /// ```
369    pub fn get_block_fetch_url(
370        &self,
371        block_hash: SaitoHash,
372        lite: bool,
373        my_public_key: SaitoPublicKey,
374    ) -> String {
375        // TODO : generate the url with proper / escapes,etc...
376        if lite {
377            self.block_fetch_url.to_string()
378                + "/lite-block/"
379                + block_hash.to_hex().as_str()
380                + "/"
381                + my_public_key.to_base58().as_str()
382        } else {
383            self.block_fetch_url.to_string() + "/block/" + block_hash.to_hex().as_str()
384        }
385    }
386    pub async fn send_ping(
387        &mut self,
388        current_time: Timestamp,
389        io_handler: &(dyn InterfaceIO + Send + Sync),
390    ) {
391        if self.last_msg_at + WS_KEEP_ALIVE_PERIOD < current_time {
392            self.last_msg_at = current_time;
393            trace!("sending ping to peer : {:?}", self.index);
394            io_handler
395                .send_message(self.index, Message::Ping().serialize().as_slice())
396                .await
397                .unwrap();
398        }
399    }
400    pub fn has_service(&self, service: String) -> bool {
401        self.services.iter().any(|s| s.service == service)
402    }
403
404    pub fn compare_version(&self, version: &Version) -> Option<Ordering> {
405        // for peer versions, if the version is not set we still consider it as a valid peer
406        // TODO : this could lead to an attack. need to provide different versions for different layer components
407        if !version.is_set() || !self.wallet_version.is_set() {
408            return Some(Ordering::Equal);
409        }
410        self.wallet_version.partial_cmp(version)
411    }
412
413    pub fn is_static_peer(&self) -> bool {
414        self.static_peer_config.is_some()
415    }
416    pub fn get_public_key(&self) -> Option<SaitoPublicKey> {
417        self.public_key
418    }
419
420    pub fn mark_as_disconnected(&mut self, disconnected_at: Timestamp) {
421        self.challenge_for_peer = None;
422        self.services = vec![];
423        info!(
424            "marking peer : {:?} as disconnected. at : {:?}",
425            self.index, disconnected_at
426        );
427        self.disconnected_at = disconnected_at;
428
429        if let PeerStatus::Disconnected(_, _) = self.peer_status {
430        } else {
431            self.peer_status = PeerStatus::Disconnected(0, 1_000);
432        }
433    }
434
435    /// Copies data from an old peer instance to a new reconnected peer
436    ///
437    /// # Arguments
438    ///
439    /// * `peer`:
440    ///
441    /// returns: ()
442    ///
443    /// # Examples
444    ///
445    /// ```
446    ///
447    /// ```
448    pub(crate) fn join_as_reconnection(&mut self, peer: Peer) {
449        assert!(
450            !matches!(peer.peer_status, PeerStatus::Connected),
451            "Old peer should not be already connected"
452        );
453        info!(
454            "joining peer : {:?} to peer : {:?} as a reconnection",
455            peer.index, self.index
456        );
457
458        self.message_limiter = peer.message_limiter;
459        self.handshake_limiter = peer.handshake_limiter;
460        self.key_list_limiter = peer.key_list_limiter;
461        self.disconnected_at = Timestamp::MAX;
462
463        self.static_peer_config = peer.static_peer_config;
464    }
465}
466
#[cfg(test)]
mod tests {
    use crate::core::consensus::peers::peer::{Peer, PeerStatus};
    use crate::core::process::version::Version;
    use std::cmp::Ordering;

    /// A freshly created peer starts disconnected with no handshake state.
    #[test]
    fn peer_new_test() {
        let fresh = Peer::new(1);

        assert_eq!(fresh.index, 1);
        assert_eq!(fresh.get_public_key(), None);
        assert!(matches!(
            fresh.peer_status,
            PeerStatus::Disconnected(0, 1_000)
        ));
        assert_eq!(fresh.block_fetch_url, "".to_string());
        assert_eq!(fresh.static_peer_config, None);
        assert_eq!(fresh.challenge_for_peer, None);
    }

    /// Version comparison treats an unset version as equal to anything and otherwise orders
    /// peers by their wallet version.
    #[test]
    fn peer_compare_test() {
        let unset = Peer::new(1);
        let mut patch = Peer::new(2);
        let mut minor = Peer::new(3);
        let mut major = Peer::new(4);

        assert_eq!(unset.wallet_version, Version::new(0, 0, 0));

        patch.wallet_version = Version::new(0, 0, 1);
        minor.wallet_version = Version::new(0, 1, 0);
        major.wallet_version = Version::new(1, 0, 0);

        // (peer doing the comparison, peer whose version is compared against, expected result)
        let cases = [
            (&unset, &patch, Ordering::Equal),
            (&patch, &unset, Ordering::Equal),
            (&minor, &patch, Ordering::Greater),
            (&patch, &minor, Ordering::Less),
            (&minor, &major, Ordering::Less),
            (&major, &minor, Ordering::Greater),
            (&minor, &minor, Ordering::Equal),
            (&unset, &unset, Ordering::Equal),
        ];

        for &(lhs, rhs, expected) in &cases {
            assert_eq!(lhs.compare_version(&rhs.wallet_version), Some(expected));
        }
    }
}