1use std::fmt::Debug;
2use std::panic;
3use std::time::{Duration, Instant};
4
5use log::debug;
6use log::info;
7use tokio::select;
8use tokio::sync::mpsc::Receiver;
9use tokio::task::JoinHandle;
10
11use saito_core::core::io::network_event::NetworkEvent;
12use saito_core::core::process::keep_time::{KeepTime, Timer};
13use saito_core::core::process::process_event::ProcessEvent;
14
15pub async fn receive_event<T>(receiver: &mut Option<Receiver<T>>) -> Option<T> {
16 if let Some(receiver) = receiver.as_mut() {
17 return receiver.recv().await;
18 }
19 None
21}
22
23pub async fn run_thread<T>(
48 mut event_processor: Box<(dyn ProcessEvent<T> + Send + 'static)>,
49 mut network_event_receiver: Option<Receiver<NetworkEvent>>,
50 mut event_receiver: Option<Receiver<T>>,
51 stat_timer_in_ms: u64,
52 thread_name: &str,
53 thread_sleep_time_in_ms: u64,
54 time_keeper_origin: &Timer,
55) -> JoinHandle<()>
56where
57 T: Send + Debug + 'static,
58{
59 let time_keeper = time_keeper_origin.clone();
60 let t_name = thread_name.to_string();
61 tokio::task::Builder::new()
62 .name(thread_name)
63 .spawn(async move {
64 info!("new thread started");
65 let mut last_stat_time = Instant::now();
67 let time_keeper = time_keeper.clone();
68
69 event_processor.on_init().await;
70 let mut interval =
71 tokio::time::interval(Duration::from_millis(thread_sleep_time_in_ms));
72 let mut stat_interval = tokio::time::interval(Duration::from_millis(stat_timer_in_ms));
73
74 loop {
75 let ready = event_processor.is_ready_to_process();
76 if !ready{
77 debug!("event processor : {:?} not ready. channels are filled",t_name);
78 }
79 select! {
80 result = receive_event(&mut event_receiver), if event_receiver.is_some() && ready=>{
81 if result.is_some() {
82 let event = result.unwrap();
83 event_processor.process_event(event).await;
84 }
85 }
86 result = receive_event(&mut network_event_receiver), if network_event_receiver.is_some() && ready=>{
87 if result.is_some() {
88 let event: NetworkEvent = result.unwrap();
89 event_processor.process_network_event(event).await;
90 }
91 }
92 _ = interval.tick()=>{
93 event_processor
94 .process_timer_event(interval.period())
95 .await;
96 }
97 _ = stat_interval.tick()=>{
98 {
99 let current_instant = Instant::now();
100
101 let duration = current_instant.duration_since(last_stat_time);
102 if duration > Duration::from_millis(stat_timer_in_ms) {
103 last_stat_time = current_instant;
104 event_processor
105 .on_stat_interval(time_keeper.get_timestamp_in_ms())
106 .await;
107 }
108 }
109 }
110 }
111 }
112 })
113 .unwrap()
114}