1use crate::core::consensus::peers::peer_service::PeerService;
2use crate::core::consensus::peers::rate_limiter::RateLimiter;
3use crate::core::consensus::wallet::Wallet;
4use crate::core::defs::{
5 PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp, WS_KEEP_ALIVE_PERIOD,
6};
7use crate::core::io::interface_io::{InterfaceEvent, InterfaceIO};
8use crate::core::msg::handshake::{HandshakeChallenge, HandshakeResponse};
9use crate::core::msg::message::Message;
10use crate::core::process::version::Version;
11use crate::core::util;
12use crate::core::util::configuration::Configuration;
13use crate::core::util::crypto::{generate_random_bytes, sign, verify};
14use log::{debug, info, trace, warn};
15use std::cmp::Ordering;
16use std::io::{Error, ErrorKind};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::RwLock;
20
21#[derive(Clone, Debug)]
22pub enum PeerType {
23 Default,
24 Stun,
25}
26
27#[derive(Clone, Debug)]
28pub enum PeerStatus {
29 Disconnected(
30 Timestamp, Timestamp, ),
33 Connecting,
34 Connected,
35}
36
37#[derive(Debug, Clone)]
38pub struct Peer {
39 pub index: PeerIndex,
40 pub peer_status: PeerStatus,
41 pub block_fetch_url: String,
42 pub static_peer_config: Option<util::configuration::PeerConfig>,
44 pub challenge_for_peer: Option<SaitoHash>,
45 pub key_list: Vec<SaitoPublicKey>,
46 pub services: Vec<PeerService>,
47 pub last_msg_at: Timestamp,
48 pub disconnected_at: Timestamp,
49 pub wallet_version: Version,
50 pub core_version: Version,
51 pub key_list_limiter: RateLimiter,
54 pub handshake_limiter: RateLimiter,
55 pub message_limiter: RateLimiter,
56 pub invalid_block_limiter: RateLimiter,
57 pub public_key: Option<SaitoPublicKey>,
58 pub peer_type: PeerType,
59 pub ip_address: Option<String>,
60}
61
62impl Peer {
63 pub fn new(peer_index: PeerIndex) -> Peer {
64 Peer {
65 index: peer_index,
66 peer_status: PeerStatus::Disconnected(0, 1_000),
67 block_fetch_url: "".to_string(),
68 static_peer_config: None,
69 challenge_for_peer: None,
70 key_list: vec![],
71 services: vec![],
72 last_msg_at: 0,
73 disconnected_at: Timestamp::MAX,
74 wallet_version: Default::default(),
75 core_version: Default::default(),
76 key_list_limiter: RateLimiter::builder(100, Duration::from_secs(60)),
77 handshake_limiter: RateLimiter::builder(100, Duration::from_secs(60)),
78 message_limiter: RateLimiter::builder(100_000, Duration::from_secs(1)),
79 invalid_block_limiter: RateLimiter::builder(10, Duration::from_secs(3600)),
80 public_key: None,
81 peer_type: PeerType::Default,
82 ip_address: None,
83 }
84 }
85
86 pub fn new_stun(
87 peer_index: PeerIndex,
88 public_key: SaitoPublicKey,
89 io_handler: &(dyn InterfaceIO + Send + Sync),
90 ) -> Peer {
91 let mut peer = Peer::new(peer_index);
92 peer.peer_type = PeerType::Stun;
93 peer.public_key = Some(public_key);
94 peer.peer_status = PeerStatus::Connected;
95 peer.services = io_handler.get_my_services();
96 peer
97 }
98
99 pub fn is_stun_peer(&self) -> bool {
100 matches!(self.peer_type, PeerType::Stun)
101 }
102
103 pub fn has_key_list_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
104 self.key_list_limiter.has_limit_exceeded(current_time)
105 }
106 pub fn has_handshake_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
107 self.handshake_limiter.has_limit_exceeded(current_time)
108 }
109
110 pub fn has_message_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
111 self.message_limiter.has_limit_exceeded(current_time)
112 }
113
114 pub fn has_invalid_block_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
115 self.invalid_block_limiter.has_limit_exceeded(current_time)
116 }
117 pub fn get_limited_till(&mut self, current_time: Timestamp) -> Option<Timestamp> {
118 let result = None;
119
120 if self.has_key_list_limit_exceeded(current_time) {
121 if self.key_list_limiter.has_limit_exceeded(current_time) {}
122 }
123
124 result
125 }
126
127 pub fn get_url(&self) -> String {
128 if let Some(config) = self.static_peer_config.as_ref() {
129 let mut protocol: String = String::from("ws");
130 if config.protocol == "https" {
131 protocol = String::from("wss");
132 }
133 protocol
134 + "://"
135 + config.host.as_str()
136 + ":"
137 + config.port.to_string().as_str()
138 + "/wsopen"
139 } else {
140 "".to_string()
141 }
142 }
143 pub async fn initiate_handshake(
144 &mut self,
145 io_handler: &(dyn InterfaceIO + Send + Sync),
146 ) -> Result<(), Error> {
147 debug!("initiating handshake : {:?}", self.index);
148
149 let challenge = HandshakeChallenge {
150 challenge: generate_random_bytes(32).await.try_into().unwrap(),
151 };
152 debug!(
153 "generated challenge : {:?} for peer : {:?}",
154 challenge.challenge.to_hex(),
155 self.index
156 );
157 self.challenge_for_peer = Some(challenge.challenge);
158 let message = Message::HandshakeChallenge(challenge);
159 io_handler
160 .send_message(self.index, message.serialize().as_slice())
161 .await?;
162 debug!("handshake challenge sent for peer: {:?}", self.index);
163
164 Ok(())
165 }
166
167 pub async fn handle_handshake_challenge(
168 &mut self,
169 challenge: HandshakeChallenge,
170 io_handler: &(dyn InterfaceIO + Send + Sync),
171 wallet_lock: Arc<RwLock<Wallet>>,
172 configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
173 ) -> Result<(), Error> {
174 debug!(
175 "handling handshake challenge : {:?} for peer : {:?}",
176 challenge.challenge.to_hex(),
177 self.index,
178 );
179 let block_fetch_url;
180 let is_lite;
181 {
182 let configs = configs_lock.read().await;
183
184 is_lite = configs.is_spv_mode();
185 if is_lite {
186 block_fetch_url = "".to_string();
187 } else {
188 block_fetch_url = configs.get_block_fetch_url();
189 }
190 }
191
192 let wallet = wallet_lock.read().await;
193 let response = HandshakeResponse {
194 public_key: wallet.public_key,
195 signature: sign(challenge.challenge.as_slice(), &wallet.private_key),
196 challenge: generate_random_bytes(32).await.try_into().unwrap(),
197 is_lite,
198 block_fetch_url,
199 services: io_handler.get_my_services(),
200 wallet_version: wallet.wallet_version,
201 core_version: wallet.core_version,
202 };
203 debug!(
204 "handshake challenge : {:?} generated for peer : {:?}",
205 response.challenge.to_hex(),
206 self.index
207 );
208
209 self.challenge_for_peer = Some(response.challenge);
210 io_handler
211 .send_message(
212 self.index,
213 Message::HandshakeResponse(response).serialize().as_slice(),
214 )
215 .await?;
216 debug!("first handshake response sent for peer: {:?}", self.index);
217
218 Ok(())
219 }
220 pub async fn handle_handshake_response(
221 &mut self,
222 response: HandshakeResponse,
223 io_handler: &(dyn InterfaceIO + Send + Sync),
224 wallet_lock: Arc<RwLock<Wallet>>,
225 configs_lock: Arc<RwLock<dyn Configuration + Send + Sync>>,
226 current_time: Timestamp,
227 ) -> Result<(), Error> {
228 debug!(
229 "handling handshake response :{:?} for peer : {:?} with address : {:?}",
230 response.challenge.to_hex(),
231 self.index,
232 response.public_key.to_base58()
233 );
234 if !response.core_version.is_set() {
235 debug!(
236 "core version is not set in handshake response. expected : {:?}",
237 wallet_lock.read().await.core_version
238 );
239 self.mark_as_disconnected(current_time);
240 io_handler.disconnect_from_peer(self.index).await?;
241 return Err(Error::from(ErrorKind::InvalidInput));
242 }
243 if self.challenge_for_peer.is_none() {
244 warn!(
245 "we don't have a challenge to verify for peer : {:?}",
246 self.index
247 );
248 self.mark_as_disconnected(current_time);
249 io_handler.disconnect_from_peer(self.index).await?;
250 return Err(Error::from(ErrorKind::InvalidInput));
251 }
252 let sent_challenge = self.challenge_for_peer.unwrap();
254 let result = verify(&sent_challenge, &response.signature, &response.public_key);
255 if !result {
256 warn!(
257 "handshake failed. signature is not valid. sig : {:?} challenge : {:?} key : {:?}",
258 response.signature.to_hex(),
259 sent_challenge.to_hex(),
260 response.public_key.to_base58()
261 );
262 self.mark_as_disconnected(current_time);
263 io_handler.disconnect_from_peer(self.index).await?;
264 return Err(Error::from(ErrorKind::InvalidInput));
265 }
266
267 let block_fetch_url;
268 let is_lite;
269 {
270 let configs = configs_lock.read().await;
271
272 is_lite = configs.is_spv_mode();
273 if is_lite {
274 block_fetch_url = "".to_string();
275 } else {
276 block_fetch_url = configs.get_block_fetch_url();
277 }
278 }
279 let wallet = wallet_lock.read().await;
280
281 if !wallet
282 .core_version
283 .is_same_minor_version(&response.core_version)
284 {
285 warn!("peer : {:?} core version is not compatible. current core version : {:?} peer core version : {:?}",
286 self.index, wallet.core_version, response.core_version);
287 io_handler.send_interface_event(InterfaceEvent::NewVersionDetected(
288 self.index,
289 response.wallet_version,
290 ));
291 self.mark_as_disconnected(current_time);
292 io_handler.disconnect_from_peer(self.index).await?;
293 return Err(Error::from(ErrorKind::InvalidInput));
294 }
295
296 if self.public_key.is_some() {
297 assert_eq!(
298 response.public_key,
299 self.public_key.unwrap(),
300 "This peer instance is to handle a peer with a different public key"
301 );
302 }
303
304 self.block_fetch_url = response.block_fetch_url;
305 self.services = response.services;
306 self.wallet_version = response.wallet_version;
307 self.core_version = response.core_version;
308 self.peer_status = PeerStatus::Connected;
309 self.public_key = Some(response.public_key);
310
311 debug!(
312 "my version : {:?} peer version : {:?}",
313 wallet.wallet_version, response.wallet_version
314 );
315 if wallet.wallet_version < response.wallet_version {
316 io_handler.send_interface_event(InterfaceEvent::NewVersionDetected(
317 self.index,
318 response.wallet_version,
319 ));
320 }
321
322 if self.static_peer_config.is_none() {
323 let response = HandshakeResponse {
328 public_key: wallet.public_key,
329 signature: sign(&response.challenge, &wallet.private_key),
330 is_lite,
331 block_fetch_url: block_fetch_url.to_string(),
332 challenge: [0; 32],
333 services: io_handler.get_my_services(),
334 wallet_version: wallet.wallet_version,
335 core_version: wallet.core_version,
336 };
337 io_handler
338 .send_message(
339 self.index,
340 Message::HandshakeResponse(response).serialize().as_slice(),
341 )
342 .await?;
343 debug!("second handshake response sent for peer: {:?}", self.index);
344 } else {
345 info!(
346 "handshake completed for peer : {:?}",
347 self.get_public_key().unwrap().to_base58()
348 );
349 }
350 self.challenge_for_peer = None;
351
352 io_handler.send_interface_event(InterfaceEvent::PeerHandshakeComplete(self.index));
353
354 Ok(())
355 }
356 pub fn get_block_fetch_url(
370 &self,
371 block_hash: SaitoHash,
372 lite: bool,
373 my_public_key: SaitoPublicKey,
374 ) -> String {
375 if lite {
377 self.block_fetch_url.to_string()
378 + "/lite-block/"
379 + block_hash.to_hex().as_str()
380 + "/"
381 + my_public_key.to_base58().as_str()
382 } else {
383 self.block_fetch_url.to_string() + "/block/" + block_hash.to_hex().as_str()
384 }
385 }
386 pub async fn send_ping(
387 &mut self,
388 current_time: Timestamp,
389 io_handler: &(dyn InterfaceIO + Send + Sync),
390 ) {
391 if self.last_msg_at + WS_KEEP_ALIVE_PERIOD < current_time {
392 self.last_msg_at = current_time;
393 trace!("sending ping to peer : {:?}", self.index);
394 io_handler
395 .send_message(self.index, Message::Ping().serialize().as_slice())
396 .await
397 .unwrap();
398 }
399 }
400 pub fn has_service(&self, service: String) -> bool {
401 self.services.iter().any(|s| s.service == service)
402 }
403
404 pub fn compare_version(&self, version: &Version) -> Option<Ordering> {
405 if !version.is_set() || !self.wallet_version.is_set() {
408 return Some(Ordering::Equal);
409 }
410 self.wallet_version.partial_cmp(version)
411 }
412
413 pub fn is_static_peer(&self) -> bool {
414 self.static_peer_config.is_some()
415 }
416 pub fn get_public_key(&self) -> Option<SaitoPublicKey> {
417 self.public_key
418 }
419
420 pub fn mark_as_disconnected(&mut self, disconnected_at: Timestamp) {
421 self.challenge_for_peer = None;
422 self.services = vec![];
423 info!(
424 "marking peer : {:?} as disconnected. at : {:?}",
425 self.index, disconnected_at
426 );
427 self.disconnected_at = disconnected_at;
428
429 if let PeerStatus::Disconnected(_, _) = self.peer_status {
430 } else {
431 self.peer_status = PeerStatus::Disconnected(0, 1_000);
432 }
433 }
434
435 pub(crate) fn join_as_reconnection(&mut self, peer: Peer) {
449 assert!(
450 !matches!(peer.peer_status, PeerStatus::Connected),
451 "Old peer should not be already connected"
452 );
453 info!(
454 "joining peer : {:?} to peer : {:?} as a reconnection",
455 peer.index, self.index
456 );
457
458 self.message_limiter = peer.message_limiter;
459 self.handshake_limiter = peer.handshake_limiter;
460 self.key_list_limiter = peer.key_list_limiter;
461 self.disconnected_at = Timestamp::MAX;
462
463 self.static_peer_config = peer.static_peer_config;
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use crate::core::consensus::peers::peer::{Peer, PeerStatus};
470 use crate::core::process::version::Version;
471 use std::cmp::Ordering;
472
473 #[test]
474 fn peer_new_test() {
475 let peer = Peer::new(1);
476
477 assert_eq!(peer.index, 1);
478 assert_eq!(peer.get_public_key(), None);
479 assert!(matches!(
480 peer.peer_status,
481 PeerStatus::Disconnected(0, 1_000)
482 ));
483 assert_eq!(peer.block_fetch_url, "".to_string());
484 assert_eq!(peer.static_peer_config, None);
485 assert_eq!(peer.challenge_for_peer, None);
486 }
487
488 #[test]
489 fn peer_compare_test() {
490 let peer_1 = Peer::new(1);
491 let mut peer_2 = Peer::new(2);
492 let mut peer_3 = Peer::new(3);
493 let mut peer_4 = Peer::new(4);
494
495 assert_eq!(peer_1.wallet_version, Version::new(0, 0, 0));
496
497 peer_2.wallet_version = Version::new(0, 0, 1);
498
499 peer_3.wallet_version = Version::new(0, 1, 0);
500
501 peer_4.wallet_version = Version::new(1, 0, 0);
502
503 assert_eq!(
504 peer_1.compare_version(&peer_2.wallet_version),
505 Some(Ordering::Equal)
506 );
507 assert_eq!(
508 peer_2.compare_version(&peer_1.wallet_version),
509 Some(Ordering::Equal)
510 );
511
512 assert_eq!(
513 peer_3.compare_version(&peer_2.wallet_version),
514 Some(Ordering::Greater)
515 );
516 assert_eq!(
517 peer_2.compare_version(&peer_3.wallet_version),
518 Some(Ordering::Less)
519 );
520
521 assert_eq!(
522 peer_3.compare_version(&peer_4.wallet_version),
523 Some(Ordering::Less)
524 );
525 assert_eq!(
526 peer_4.compare_version(&peer_3.wallet_version),
527 Some(Ordering::Greater)
528 );
529
530 assert_eq!(
531 peer_3.compare_version(&peer_3.wallet_version),
532 Some(Ordering::Equal)
533 );
534 assert_eq!(
535 peer_1.compare_version(&peer_1.wallet_version),
536 Some(Ordering::Equal)
537 );
538 }
539}