hirofa_utils/
eventloop.rs

1use crate::auto_id_map::AutoIdMap;
2use futures::executor::{LocalPool, LocalSpawner};
3use futures::task::{LocalSpawnExt, SpawnExt};
4use lazy_static::lazy_static;
5use std::cell::RefCell;
6use std::fmt::Formatter;
7use std::future::Future;
8use std::ops::Add;
9use std::rc::Rc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::mpsc::{channel, Sender};
12use std::thread::JoinHandle;
13use std::time::{Duration, Instant};
14
15lazy_static! {
16    static ref IDS: AtomicUsize = AtomicUsize::new(0);
17}
18
19fn next_id() -> usize {
20    IDS.fetch_add(1, Ordering::SeqCst)
21}
22
23/// the EventLoop struct is a single thread event queue
24pub struct EventLoop {
25    tx: Sender<Box<dyn FnOnce() + Send + 'static>>,
26    join_handle: Option<JoinHandle<()>>,
27    id: usize,
28}
29
30struct Timeout {
31    next_run: Instant,
32    task: Box<dyn FnOnce()>,
33}
34
35struct Interval {
36    next_run: Instant,
37    interval: Duration,
38    task: Rc<dyn Fn()>,
39}
40
41thread_local! {
42    static TIMEOUTS: RefCell<AutoIdMap<Timeout>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
43    static INTERVALS: RefCell<AutoIdMap<Interval>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
44    // impl timeout and interval tasks as two separate thread_locals, add a single method to add jobs for timeouts and intervals which returns a next)runt instant, that may be used for recv on next loop
45    static POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
46    static SPAWNER: RefCell<Option<LocalSpawner>> = const { RefCell::new(None) };
47    static LOCAL_ID: RefCell<Option<usize>> = const { RefCell::new(None) };
48}
49
50impl EventLoop {
51    /// init a new EventLoop
52    pub fn new() -> Self {
53        // todo settable buffer size
54        let (tx, rx) = channel();
55
56        let id = next_id();
57
58        let join_handle = std::thread::spawn(move || {
59            LOCAL_ID.with(|rc| {
60                rc.borrow_mut().replace(id);
61            });
62
63            POOL.with(|rc| {
64                let pool = &mut *rc.borrow_mut();
65
66                SPAWNER.with(|rc| {
67                    let opt = &mut *rc.borrow_mut();
68                    let _ = opt.replace(pool.spawner());
69                });
70            });
71
72            POOL.with(|rc| {
73                let pool = &mut *rc.borrow_mut();
74                let spawner = pool.spawner();
75                let mut next_deadline = Instant::now().add(Duration::from_secs(10));
76                loop {
77                    // recv may fail on timeout
78                    let recv_res = rx.recv_timeout(next_deadline.duration_since(Instant::now()));
79                    if recv_res.is_ok() {
80                        let fut: Box<dyn FnOnce() + Send + 'static> = recv_res.ok().unwrap();
81                        // this seems redundant.. i could just run the task closure
82
83                        spawner.spawn(async move { fut() }).expect("spawn failed");
84                    }
85
86                    pool.run_until_stalled();
87
88                    // add jobs for timeout and interval here, recalc next timout deadline based on next pending timeout or interval
89                    next_deadline = Self::run_timeouts_and_intervals();
90
91                    // shutdown indicator
92                    if SPAWNER.with(|rc| rc.borrow().is_none()) {
93                        log::debug!("EventLoop worker loop break");
94                        // drop all timeouts and intervals here
95                        TIMEOUTS.with(|rc| rc.borrow_mut().clear());
96                        INTERVALS.with(|rc| rc.borrow_mut().clear());
97                        // then do run_until_stalled again so finalizers may run
98                        pool.run_until_stalled();
99                        // exit loop
100                        break;
101                    }
102                }
103                log::debug!("EventLoop worker loop done");
104            })
105        });
106
107        Self {
108            tx,
109            join_handle: Some(join_handle),
110            id,
111        }
112    }
113
114    /// run scheduled tasks and calculate next deadline for running other tasks
115    fn run_timeouts_and_intervals() -> Instant {
116        // this is probably not very efficient when there are lots of timeouts, could be optimized by sorting based on next_run and thus not looping over future jobs
117        let now = Instant::now();
118
119        let timeout_todos = TIMEOUTS.with(|rc| {
120            let timeouts = &mut rc.borrow_mut();
121            timeouts.remove_values(|timeout| timeout.next_run.lt(&now))
122        });
123
124        for timeout_todo in timeout_todos {
125            let task = timeout_todo.task;
126            task();
127        }
128
129        let interval_todos = INTERVALS.with(|rc| {
130            let intervals = &mut *rc.borrow_mut();
131            let mut todos = vec![];
132            for interval in intervals.map.values_mut() {
133                if interval.next_run.lt(&now) {
134                    todos.push(interval.task.clone());
135                    interval.next_run = now.add(interval.interval);
136                }
137            }
138            todos
139        });
140
141        for interval_todo in interval_todos {
142            interval_todo();
143        }
144
145        // next deadline is always calculated afterwards because timeouts may have been added from a timeout (or interval)
146
147        let next_deadline = TIMEOUTS.with(|rc| {
148            let timeouts = &mut rc.borrow();
149            let mut ret = now.add(Duration::from_secs(10));
150            for timeout in timeouts.map.values() {
151                if timeout.next_run.lt(&ret) {
152                    ret = timeout.next_run;
153                }
154            }
155            ret
156        });
157
158        INTERVALS.with(|rc| {
159            let intervals = &*rc.borrow();
160            let mut ret = next_deadline;
161            for interval in intervals.map.values() {
162                if interval.next_run.lt(&ret) {
163                    ret = interval.next_run;
164                }
165            }
166            ret
167        })
168    }
169
170    /// internal method to ensure a member is called from the worker thread
171    pub fn is_my_pool_thread(&self) -> bool {
172        LOCAL_ID.with(|rc| {
173            let opt = &*rc.borrow();
174            opt.is_some() && opt.unwrap() == self.id
175        })
176    }
177
178    /// internal method to ensure a member is called from the worker thread
179    pub fn is_a_pool_thread() -> bool {
180        SPAWNER.with(|rc| rc.borrow().is_some())
181    }
182
183    /// add a future to the EventLoop from within a running task
184    pub fn add_local_future_void<F: Future<Output = ()> + 'static>(fut: F) {
185        debug_assert!(EventLoop::is_a_pool_thread());
186        SPAWNER.with(move |rc| {
187            let spawner = &*rc.borrow();
188            spawner
189                .as_ref()
190                .unwrap()
191                .spawn_local(fut)
192                .expect("start fut failed");
193        });
194    }
195
196    /// add a future to the EventLoop from within a running task
197    pub fn add_local_future<R: Send + 'static, F: Future<Output = R> + 'static>(
198        fut: F,
199    ) -> impl Future<Output = R> {
200        debug_assert!(EventLoop::is_a_pool_thread());
201        SPAWNER.with(move |rc| {
202            let spawner = &*rc.borrow();
203            spawner
204                .as_ref()
205                .unwrap()
206                .spawn_local_with_handle(fut)
207                .expect("start fut failed")
208        })
209    }
210
211    /// add a task to the EventLoop from within a running task
212    pub fn add_local_void<T: FnOnce() + 'static>(task: T) {
213        debug_assert!(EventLoop::is_a_pool_thread());
214        Self::add_local_future_void(async move { task() });
215    }
216
217    /// add a task to the EventLoop
218    pub fn add<T: FnOnce() -> R + Send + 'static, R: Send + 'static>(
219        &self,
220        task: T,
221    ) -> impl Future<Output = R> {
222        self.add_future(async move { task() })
223    }
224
225    /// execute a task in the EventLoop and block until it completes
226    pub fn exe<R: Send + 'static, T: FnOnce() -> R + Send + 'static>(&self, task: T) -> R {
227        if Self::is_my_pool_thread(self) {
228            task()
229        } else {
230            let (tx, rx) = channel();
231            self.add_void(move || tx.send(task()).expect("could not send"));
232            rx.recv().expect("could not recv")
233        }
234    }
235
236    /// add an async block to the EventLoop
237    /// #Example
238    /// ```rust
239    /// use hirofa_utils::eventloop::EventLoop;
240    /// use futures::executor::block_on;
241    /// let test_loop = EventLoop::new();
242    /// let fut = test_loop.add_future(async move {
243    ///    // this is an async block, you can .await async functions here
244    ///    123
245    /// });
246    /// let res = block_on(fut); // get result
247    /// assert_eq!(res, 123);
248    /// ```
249    pub fn add_future<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
250        &self,
251        fut: F,
252    ) -> impl Future<Output = R> {
253        let (tx, rx) = channel();
254        self.add_void(move || {
255            let res_fut = Self::add_local_future(fut);
256            tx.send(res_fut).expect("send failed");
257        });
258        rx.recv().expect("recv failed")
259    }
260
261    /// add a Future to the pool, for when you don't need the result
262    /// #Example
263    /// ```rust
264    /// use hirofa_utils::eventloop::EventLoop;
265    /// use futures::executor::block_on;
266    /// use std::sync::mpsc::channel;
267    /// let test_loop = EventLoop::new();
268    /// let (tx, rx) = channel(); // just to see if it works
269    /// let fut = test_loop.add_future(async move {
270    ///    // this is an async block, you can .await async functions here
271    ///    println!("running async here");
272    ///    tx.send(1234);
273    /// });
274    ///
275    /// let res = rx.recv().expect("could not recv");
276    /// assert_eq!(res, 1234);
277    /// ```    
278    pub fn add_future_void<F: Future<Output = ()> + Send + 'static>(&self, fut: F) {
279        self.add_void(move || EventLoop::add_local_future_void(fut))
280    }
281
282    /// add a task to the pool
283    pub fn add_void<T: FnOnce() + Send + 'static>(&self, task: T) {
284        if self.is_my_pool_thread() {
285            Self::add_local_void(task);
286        } else {
287            let tx = self.tx.clone();
288            tx.send(Box::new(task)).expect("send failed");
289        }
290    }
291
292    /// add a timeout (delayed task) to the EventLoop
293    pub fn add_timeout<F: FnOnce() + 'static>(task: F, delay: Duration) -> i32 {
294        debug_assert!(EventLoop::is_a_pool_thread());
295        let timeout = Timeout {
296            next_run: Instant::now().add(delay),
297            task: Box::new(task),
298        };
299        TIMEOUTS.with(|rc| rc.borrow_mut().insert(timeout) as i32)
300    }
301
302    /// add an interval (repeated task) to the EventLoop
303    pub fn add_interval<F: Fn() + 'static>(task: F, delay: Duration, interval: Duration) -> i32 {
304        debug_assert!(EventLoop::is_a_pool_thread());
305        let interval = Interval {
306            next_run: Instant::now().add(delay),
307            interval,
308            task: Rc::new(task),
309        };
310        INTERVALS.with(|rc| rc.borrow_mut().insert(interval) as i32)
311    }
312
313    /// cancel a previously added timeout
314    pub fn clear_timeout(id: i32) {
315        debug_assert!(EventLoop::is_a_pool_thread());
316        TIMEOUTS.with(|rc| {
317            let map = &mut *rc.borrow_mut();
318            if map.contains_key(&(id as usize)) {
319                let _ = map.remove(&(id as usize));
320            }
321        });
322    }
323
324    /// cancel a previously added interval
325    pub fn clear_interval(id: i32) {
326        debug_assert!(EventLoop::is_a_pool_thread());
327        INTERVALS.with(|rc| {
328            let map = &mut *rc.borrow_mut();
329            if map.contains_key(&(id as usize)) {
330                let _ = map.remove(&(id as usize));
331            }
332        });
333    }
334}
335
336impl Default for EventLoop {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
342impl Drop for EventLoop {
343    fn drop(&mut self) {
344        self.exe(|| {
345            SPAWNER.with(|rc| {
346                let spawner = &mut *rc.borrow_mut();
347                let _ = spawner.take();
348            })
349        });
350        if let Some(join_handle) = self.join_handle.take() {
351            let _ = join_handle.join();
352        }
353    }
354}
355impl std::fmt::Debug for EventLoop {
356    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
357        f.write_str("EventLoop")
358    }
359}
360
361#[cfg(test)]
362pub mod tests {
363    use crate::eventloop::EventLoop;
364    use futures::executor::block_on;
365    use std::ops::Add;
366    use std::sync::mpsc::channel;
367    use std::time::{Duration, Instant};
368
369    async fn test_as(input: i32) -> i32 {
370        input * 12
371    }
372
373    #[test]
374    fn test() {
375        //
376
377        let test_loop = EventLoop::new();
378
379        let res = test_loop.exe(|| 123);
380        assert_eq!(res, 123);
381
382        let (tx, rx) = channel();
383        test_loop.add_void(move || {
384            tx.send("async".to_string()).ok().unwrap();
385        });
386
387        let res = rx.recv().ok().unwrap();
388        assert_eq!(res.as_str(), "async");
389
390        let i = 43;
391        let fut = test_loop.add_future(async move { test_as(i).await });
392        let out = block_on(fut);
393        assert_eq!(43 * 12, out);
394
395        let (tx, rx) = channel();
396        let start = Instant::now();
397        test_loop.add_void(move || {
398            EventLoop::add_timeout(
399                move || {
400                    tx.send(129).ok().expect("send failed");
401                },
402                Duration::from_secs(2),
403            );
404        });
405        let res = rx.recv();
406        let res_i32 = match res {
407            Ok(i) => i,
408            Err(e) => panic!("recv failed: {}", e),
409        };
410        assert_eq!(res_i32, 129);
411        // we should be at least 2 seconds further
412        assert!(Instant::now().gt(&start.add(Duration::from_millis(1999))));
413        // but certainly not 3
414        assert!(Instant::now().lt(&start.add(Duration::from_millis(2999))));
415
416        log::debug!("dropping loop");
417        drop(test_loop);
418        log::debug!("after loop dropped");
419    }
420
421    #[test]
422    fn test_sync() {
423        fn t<E: Send + Sync>(_s: E) {}
424        let event_loop = EventLoop::new();
425        t(event_loop);
426        println!("yup, EL is sync");
427    }
428}