hirofa_utils/
eventloop.rs1use crate::auto_id_map::AutoIdMap;
2use futures::executor::{LocalPool, LocalSpawner};
3use futures::task::{LocalSpawnExt, SpawnExt};
4use lazy_static::lazy_static;
5use std::cell::RefCell;
6use std::fmt::Formatter;
7use std::future::Future;
8use std::ops::Add;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::mpsc::{channel, Sender};
12use std::thread::JoinHandle;
13use std::time::{Duration, Instant};
14
15lazy_static! {
16 static ref IDS: AtomicUsize = AtomicUsize::new(0);
17}
18
19fn next_id() -> usize {
20 IDS.fetch_add(1, Ordering::SeqCst)
21}
22
/// Handle to a dedicated worker thread that runs submitted jobs, futures,
/// timeouts and intervals.
pub struct EventLoop {
    /// Sending side of the job channel that the worker thread receives from.
    tx: Sender<Box<dyn FnOnce() + Send>>,
    /// Join handle of the worker thread; taken (set to `None`) when the loop is dropped.
    join_handle: Option<JoinHandle<()>>,
    /// Id of this loop, compared against the worker's thread-local id to detect
    /// "am I running on my own worker thread".
    id: usize,
}
29
/// A one-shot task scheduled to run once a point in time has passed.
struct Timeout {
    /// Moment after which the task becomes due.
    next_run: Instant,
    /// The job to run; consumed when it is invoked.
    task: Box<dyn FnOnce()>,
}
34
/// A repeating task together with its schedule.
struct Interval {
    /// Moment after which the task is due for its next run.
    next_run: Instant,
    /// Distance between two consecutive runs.
    interval: Duration,
    /// The job to run; reference counted so a due task can be cloned out of the
    /// registry and invoked without holding a borrow on it.
    task: Rc<dyn Fn()>,
}
40
41thread_local! {
42 static TIMEOUTS: RefCell<AutoIdMap<Timeout>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
43 static INTERVALS: RefCell<AutoIdMap<Interval>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
44 static POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
46 static SPAWNER: RefCell<Option<LocalSpawner>> = const { RefCell::new(None) };
47 static LOCAL_ID: RefCell<Option<usize>> = const { RefCell::new(None) };
48}
49
50impl EventLoop {
51 pub fn new() -> Self {
53 let (tx, rx) = channel();
55
56 let id = next_id();
57
58 let join_handle = std::thread::spawn(move || {
59 LOCAL_ID.with(|rc| {
60 rc.borrow_mut().replace(id);
61 });
62
63 POOL.with(|rc| {
64 let pool = &mut *rc.borrow_mut();
65
66 SPAWNER.with(|rc| {
67 let opt = &mut *rc.borrow_mut();
68 let _ = opt.replace(pool.spawner());
69 });
70 });
71
72 POOL.with(|rc| {
73 let pool = &mut *rc.borrow_mut();
74 let spawner = pool.spawner();
75 let mut next_deadline = Instant::now().add(Duration::from_secs(10));
76 loop {
77 let recv_res = rx.recv_timeout(next_deadline.duration_since(Instant::now()));
79 if recv_res.is_ok() {
80 let fut: Box<dyn FnOnce() + Send + 'static> = recv_res.ok().unwrap();
81 spawner.spawn(async move { fut() }).expect("spawn failed");
84 }
85
86 pool.run_until_stalled();
87
88 next_deadline = Self::run_timeouts_and_intervals();
90
91 if SPAWNER.with(|rc| rc.borrow().is_none()) {
93 log::debug!("EventLoop worker loop break");
94 TIMEOUTS.with(|rc| rc.borrow_mut().clear());
96 INTERVALS.with(|rc| rc.borrow_mut().clear());
97 pool.run_until_stalled();
99 break;
101 }
102 }
103 log::debug!("EventLoop worker loop done");
104 })
105 });
106
107 Self {
108 tx,
109 join_handle: Some(join_handle),
110 id,
111 }
112 }
113
114 fn run_timeouts_and_intervals() -> Instant {
116 let now = Instant::now();
118
119 let timeout_todos = TIMEOUTS.with(|rc| {
120 let timeouts = &mut rc.borrow_mut();
121 timeouts.remove_values(|timeout| timeout.next_run.lt(&now))
122 });
123
124 for timeout_todo in timeout_todos {
125 let task = timeout_todo.task;
126 task();
127 }
128
129 let interval_todos = INTERVALS.with(|rc| {
130 let intervals = &mut *rc.borrow_mut();
131 let mut todos = vec![];
132 for interval in intervals.map.values_mut() {
133 if interval.next_run.lt(&now) {
134 todos.push(interval.task.clone());
135 interval.next_run = now.add(interval.interval);
136 }
137 }
138 todos
139 });
140
141 for interval_todo in interval_todos {
142 interval_todo();
143 }
144
145 let next_deadline = TIMEOUTS.with(|rc| {
148 let timeouts = &mut rc.borrow();
149 let mut ret = now.add(Duration::from_secs(10));
150 for timeout in timeouts.map.values() {
151 if timeout.next_run.lt(&ret) {
152 ret = timeout.next_run;
153 }
154 }
155 ret
156 });
157
158 INTERVALS.with(|rc| {
159 let intervals = &*rc.borrow();
160 let mut ret = next_deadline;
161 for interval in intervals.map.values() {
162 if interval.next_run.lt(&ret) {
163 ret = interval.next_run;
164 }
165 }
166 ret
167 })
168 }
169
170 pub fn is_my_pool_thread(&self) -> bool {
172 LOCAL_ID.with(|rc| {
173 let opt = &*rc.borrow();
174 opt.is_some() && opt.unwrap() == self.id
175 })
176 }
177
178 pub fn is_a_pool_thread() -> bool {
180 SPAWNER.with(|rc| rc.borrow().is_some())
181 }
182
183 pub fn add_local_future_void<F: Future<Output = ()> + 'static>(fut: F) {
185 debug_assert!(EventLoop::is_a_pool_thread());
186 SPAWNER.with(move |rc| {
187 let spawner = &*rc.borrow();
188 spawner
189 .as_ref()
190 .unwrap()
191 .spawn_local(fut)
192 .expect("start fut failed");
193 });
194 }
195
196 pub fn add_local_future<R: Send + 'static, F: Future<Output = R> + 'static>(
198 fut: F,
199 ) -> impl Future<Output = R> {
200 debug_assert!(EventLoop::is_a_pool_thread());
201 SPAWNER.with(move |rc| {
202 let spawner = &*rc.borrow();
203 spawner
204 .as_ref()
205 .unwrap()
206 .spawn_local_with_handle(fut)
207 .expect("start fut failed")
208 })
209 }
210
211 pub fn add_local_void<T: FnOnce() + 'static>(task: T) {
213 debug_assert!(EventLoop::is_a_pool_thread());
214 Self::add_local_future_void(async move { task() });
215 }
216
217 pub fn add<T: FnOnce() -> R + Send + 'static, R: Send + 'static>(
219 &self,
220 task: T,
221 ) -> impl Future<Output = R> {
222 self.add_future(async move { task() })
223 }
224
225 pub fn exe<R: Send + 'static, T: FnOnce() -> R + Send + 'static>(&self, task: T) -> R {
227 if Self::is_my_pool_thread(self) {
228 task()
229 } else {
230 let (tx, rx) = channel();
231 self.add_void(move || tx.send(task()).expect("could not send"));
232 rx.recv().expect("could not recv")
233 }
234 }
235
236 pub fn add_future<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
250 &self,
251 fut: F,
252 ) -> impl Future<Output = R> {
253 let (tx, rx) = channel();
254 self.add_void(move || {
255 let res_fut = Self::add_local_future(fut);
256 tx.send(res_fut).expect("send failed");
257 });
258 rx.recv().expect("recv failed")
259 }
260
261 pub fn add_future_void<F: Future<Output = ()> + Send + 'static>(&self, fut: F) {
279 self.add_void(move || EventLoop::add_local_future_void(fut))
280 }
281
282 pub fn add_void<T: FnOnce() + Send + 'static>(&self, task: T) {
284 if self.is_my_pool_thread() {
285 Self::add_local_void(task);
286 } else {
287 let tx = self.tx.clone();
288 tx.send(Box::new(task)).expect("send failed");
289 }
290 }
291
292 pub fn add_timeout<F: FnOnce() + 'static>(task: F, delay: Duration) -> i32 {
294 debug_assert!(EventLoop::is_a_pool_thread());
295 let timeout = Timeout {
296 next_run: Instant::now().add(delay),
297 task: Box::new(task),
298 };
299 TIMEOUTS.with(|rc| rc.borrow_mut().insert(timeout) as i32)
300 }
301
302 pub fn add_interval<F: Fn() + 'static>(task: F, delay: Duration, interval: Duration) -> i32 {
304 debug_assert!(EventLoop::is_a_pool_thread());
305 let interval = Interval {
306 next_run: Instant::now().add(delay),
307 interval,
308 task: Rc::new(task),
309 };
310 INTERVALS.with(|rc| rc.borrow_mut().insert(interval) as i32)
311 }
312
313 pub fn clear_timeout(id: i32) {
315 debug_assert!(EventLoop::is_a_pool_thread());
316 TIMEOUTS.with(|rc| {
317 let map = &mut *rc.borrow_mut();
318 if map.contains_key(&(id as usize)) {
319 let _ = map.remove(&(id as usize));
320 }
321 });
322 }
323
324 pub fn clear_interval(id: i32) {
326 debug_assert!(EventLoop::is_a_pool_thread());
327 INTERVALS.with(|rc| {
328 let map = &mut *rc.borrow_mut();
329 if map.contains_key(&(id as usize)) {
330 let _ = map.remove(&(id as usize));
331 }
332 });
333 }
334}
335
336impl Default for EventLoop {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl Drop for EventLoop {
343 fn drop(&mut self) {
344 self.exe(|| {
345 SPAWNER.with(|rc| {
346 let spawner = &mut *rc.borrow_mut();
347 let _ = spawner.take();
348 })
349 });
350 if let Some(join_handle) = self.join_handle.take() {
351 let _ = join_handle.join();
352 }
353 }
354}
355impl std::fmt::Debug for EventLoop {
356 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
357 f.write_str("EventLoop")
358 }
359}
360
#[cfg(test)]
pub mod tests {
    use crate::eventloop::EventLoop;
    use futures::executor::block_on;
    use std::ops::Add;
    use std::sync::mpsc::channel;
    use std::time::{Duration, Instant};

    /// Small async helper: multiplies its input by twelve.
    async fn test_as(input: i32) -> i32 {
        input * 12
    }

    /// Exercises blocking execution, fire-and-forget jobs, futures and timeouts.
    #[test]
    fn test() {
        let test_loop = EventLoop::new();

        // Blocking execution returns the closure's value.
        assert_eq!(test_loop.exe(|| 123), 123);

        // Fire-and-forget job reporting back through a channel.
        let (msg_tx, msg_rx) = channel();
        test_loop.add_void(move || {
            msg_tx.send("async".to_string()).ok().unwrap();
        });
        let msg = msg_rx.recv().ok().unwrap();
        assert_eq!(msg.as_str(), "async");

        // A future spawned on the loop, awaited from this thread.
        let i = 43;
        let out = block_on(test_loop.add_future(async move { test_as(i).await }));
        assert_eq!(43 * 12, out);

        // A timeout registered from inside the loop fires after its delay.
        let (timer_tx, timer_rx) = channel();
        let start = Instant::now();
        test_loop.add_void(move || {
            EventLoop::add_timeout(
                move || {
                    timer_tx.send(129).ok().expect("send failed");
                },
                Duration::from_secs(2),
            );
        });
        let res_i32 = timer_rx
            .recv()
            .unwrap_or_else(|e| panic!("recv failed: {}", e));
        assert_eq!(res_i32, 129);

        // The timeout fired no earlier than ~2s and well before 3s after scheduling.
        let earliest = start.add(Duration::from_millis(1999));
        assert!(Instant::now() > earliest);
        let latest = start.add(Duration::from_millis(2999));
        assert!(Instant::now() < latest);

        log::debug!("dropping loop");
        drop(test_loop);
        log::debug!("after loop dropped");
    }

    /// Compile-time check that `EventLoop` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn t<E: Send + Sync>(_s: E) {}
        let event_loop = EventLoop::new();
        t(event_loop);
        println!("yup, EL is sync");
    }
}