saito_rust/
run_thread.rs

1use std::fmt::Debug;
2use std::panic;
3use std::time::{Duration, Instant};
4
5use log::debug;
6use log::info;
7use tokio::select;
8use tokio::sync::mpsc::Receiver;
9use tokio::task::JoinHandle;
10
11use saito_core::core::io::network_event::NetworkEvent;
12use saito_core::core::process::keep_time::{KeepTime, Timer};
13use saito_core::core::process::process_event::ProcessEvent;
14
15pub async fn receive_event<T>(receiver: &mut Option<Receiver<T>>) -> Option<T> {
16    if let Some(receiver) = receiver.as_mut() {
17        return receiver.recv().await;
18    }
19    // tokio::time::sleep(Duration::from_secs(1_000_000)).await;
20    None
21}
22
23/// Runs a permanent thread with an event loop
24///
25/// This thread will have,
26/// 1. an event loop which processes incoming events
27/// 2. a timer functionality which fires for each iteration of the event loop
28///
29/// If any work is done in the event loop, it will immediately begin the next iteration after this one.
30/// If no work is done in the current iteration, it will go to sleep **thread_sleep_time_in_ms** amount of time
31///
32/// # Arguments
33///
34/// * `event_processor`:
35/// * `network_event_receiver`:
36/// * `event_receiver`:
37/// * `stat_timer_in_ms`:
38/// * `thread_sleep_time_in_ms`:
39///
40/// returns: JoinHandle<()>
41///
42/// # Examples
43///
44/// ```
45///
46/// ```
47pub async fn run_thread<T>(
48    mut event_processor: Box<(dyn ProcessEvent<T> + Send + 'static)>,
49    mut network_event_receiver: Option<Receiver<NetworkEvent>>,
50    mut event_receiver: Option<Receiver<T>>,
51    stat_timer_in_ms: u64,
52    thread_name: &str,
53    thread_sleep_time_in_ms: u64,
54    time_keeper_origin: &Timer,
55) -> JoinHandle<()>
56where
57    T: Send + Debug + 'static,
58{
59    let time_keeper = time_keeper_origin.clone();
60    let t_name = thread_name.to_string();
61    tokio::task::Builder::new()
62        .name(thread_name)
63        .spawn(async move {
64            info!("new thread started");
65            // let mut work_done;
66            let mut last_stat_time = Instant::now();
67            let time_keeper = time_keeper.clone();
68
69            event_processor.on_init().await;
70            let mut interval =
71                tokio::time::interval(Duration::from_millis(thread_sleep_time_in_ms));
72            let mut stat_interval = tokio::time::interval(Duration::from_millis(stat_timer_in_ms));
73
74            loop {
75                let ready = event_processor.is_ready_to_process();
76                if !ready{
77                    debug!("event processor : {:?} not ready. channels are filled",t_name);
78                }
79                select! {
80                        result = receive_event(&mut event_receiver), if event_receiver.is_some() && ready=>{
81                            if result.is_some() {
82                                let event = result.unwrap();
83                                event_processor.process_event(event).await;
84                            }
85                        }
86                        result = receive_event(&mut network_event_receiver), if network_event_receiver.is_some() && ready=>{
87                            if result.is_some() {
88                                let event: NetworkEvent = result.unwrap();
89                                event_processor.process_network_event(event).await;
90                            }
91                        }
92                        _ = interval.tick()=>{
93                                event_processor
94                                   .process_timer_event(interval.period())
95                                   .await;
96                        }
97                        _ = stat_interval.tick()=>{
98                            {
99                                let current_instant = Instant::now();
100
101                                let duration = current_instant.duration_since(last_stat_time);
102                                if duration > Duration::from_millis(stat_timer_in_ms) {
103                                    last_stat_time = current_instant;
104                                    event_processor
105                                        .on_stat_interval(time_keeper.get_timestamp_in_ms())
106                                        .await;
107                                }
108                            }
109                        }
110                    }
111            }
112        })
113        .unwrap()
114}