use crate::auto_id_map::AutoIdMap;
use futures::executor::{LocalPool, LocalSpawner};
use futures::task::{LocalSpawnExt, SpawnExt};
use lazy_static::lazy_static;
use std::cell::RefCell;
use std::fmt::Formatter;
use std::future::Future;
use std::ops::Add;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
lazy_static! {
static ref IDS: AtomicUsize = AtomicUsize::new(0);
}
fn next_id() -> usize {
IDS.fetch_add(1, Ordering::SeqCst)
}
pub struct EventLoop {
tx: Sender<Box<dyn FnOnce() + Send + 'static>>,
join_handle: Option<JoinHandle<()>>,
id: usize,
}
struct Timeout {
next_run: Instant,
task: Box<dyn FnOnce()>,
}
struct Interval {
next_run: Instant,
interval: Duration,
task: Rc<dyn Fn()>,
}
thread_local! {
static TIMEOUTS: RefCell<AutoIdMap<Timeout>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
static INTERVALS: RefCell<AutoIdMap<Interval>> = RefCell::new(AutoIdMap::new_with_max_size(i32::MAX as usize));
static POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
static SPAWNER: RefCell<Option<LocalSpawner>> = const { RefCell::new(None) };
static LOCAL_ID: RefCell<Option<usize>> = const { RefCell::new(None) };
}
impl EventLoop {
pub fn new() -> Self {
let (tx, rx) = channel();
let id = next_id();
let join_handle = std::thread::spawn(move || {
LOCAL_ID.with(|rc| {
rc.borrow_mut().replace(id);
});
POOL.with(|rc| {
let pool = &mut *rc.borrow_mut();
SPAWNER.with(|rc| {
let opt = &mut *rc.borrow_mut();
let _ = opt.replace(pool.spawner());
});
});
POOL.with(|rc| {
let pool = &mut *rc.borrow_mut();
let spawner = pool.spawner();
let mut next_deadline = Instant::now().add(Duration::from_secs(10));
loop {
let recv_res = rx.recv_timeout(next_deadline.duration_since(Instant::now()));
if recv_res.is_ok() {
let fut: Box<dyn FnOnce() + Send + 'static> = recv_res.ok().unwrap();
spawner.spawn(async move { fut() }).expect("spawn failed");
}
pool.run_until_stalled();
next_deadline = Self::run_timeouts_and_intervals();
if SPAWNER.with(|rc| rc.borrow().is_none()) {
log::debug!("EventLoop worker loop break");
TIMEOUTS.with(|rc| rc.borrow_mut().clear());
INTERVALS.with(|rc| rc.borrow_mut().clear());
pool.run_until_stalled();
break;
}
}
log::debug!("EventLoop worker loop done");
})
});
Self {
tx,
join_handle: Some(join_handle),
id,
}
}
fn run_timeouts_and_intervals() -> Instant {
let now = Instant::now();
let timeout_todos = TIMEOUTS.with(|rc| {
let timeouts = &mut rc.borrow_mut();
timeouts.remove_values(|timeout| timeout.next_run.lt(&now))
});
for timeout_todo in timeout_todos {
let task = timeout_todo.task;
task();
}
let interval_todos = INTERVALS.with(|rc| {
let intervals = &mut *rc.borrow_mut();
let mut todos = vec![];
for interval in intervals.map.values_mut() {
if interval.next_run.lt(&now) {
todos.push(interval.task.clone());
interval.next_run = now.add(interval.interval);
}
}
todos
});
for interval_todo in interval_todos {
interval_todo();
}
let next_deadline = TIMEOUTS.with(|rc| {
let timeouts = &mut rc.borrow();
let mut ret = now.add(Duration::from_secs(10));
for timeout in timeouts.map.values() {
if timeout.next_run.lt(&ret) {
ret = timeout.next_run;
}
}
ret
});
INTERVALS.with(|rc| {
let intervals = &*rc.borrow();
let mut ret = next_deadline;
for interval in intervals.map.values() {
if interval.next_run.lt(&ret) {
ret = interval.next_run;
}
}
ret
})
}
pub fn is_my_pool_thread(&self) -> bool {
LOCAL_ID.with(|rc| {
let opt = &*rc.borrow();
opt.is_some() && opt.unwrap() == self.id
})
}
pub fn is_a_pool_thread() -> bool {
SPAWNER.with(|rc| rc.borrow().is_some())
}
pub fn add_local_future_void<F: Future<Output = ()> + 'static>(fut: F) {
debug_assert!(EventLoop::is_a_pool_thread());
SPAWNER.with(move |rc| {
let spawner = &*rc.borrow();
spawner
.as_ref()
.unwrap()
.spawn_local(fut)
.expect("start fut failed");
});
}
pub fn add_local_future<R: Send + 'static, F: Future<Output = R> + 'static>(
fut: F,
) -> impl Future<Output = R> {
debug_assert!(EventLoop::is_a_pool_thread());
SPAWNER.with(move |rc| {
let spawner = &*rc.borrow();
spawner
.as_ref()
.unwrap()
.spawn_local_with_handle(fut)
.expect("start fut failed")
})
}
pub fn add_local_void<T: FnOnce() + 'static>(task: T) {
debug_assert!(EventLoop::is_a_pool_thread());
Self::add_local_future_void(async move { task() });
}
pub fn add<T: FnOnce() -> R + Send + 'static, R: Send + 'static>(
&self,
task: T,
) -> impl Future<Output = R> {
self.add_future(async move { task() })
}
pub fn exe<R: Send + 'static, T: FnOnce() -> R + Send + 'static>(&self, task: T) -> R {
if Self::is_my_pool_thread(self) {
task()
} else {
let (tx, rx) = channel();
self.add_void(move || tx.send(task()).expect("could not send"));
rx.recv().expect("could not recv")
}
}
pub fn add_future<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
&self,
fut: F,
) -> impl Future<Output = R> {
let (tx, rx) = channel();
self.add_void(move || {
let res_fut = Self::add_local_future(fut);
tx.send(res_fut).expect("send failed");
});
rx.recv().expect("recv failed")
}
pub fn add_future_void<F: Future<Output = ()> + Send + 'static>(&self, fut: F) {
self.add_void(move || EventLoop::add_local_future_void(fut))
}
pub fn add_void<T: FnOnce() + Send + 'static>(&self, task: T) {
if self.is_my_pool_thread() {
Self::add_local_void(task);
} else {
let tx = self.tx.clone();
tx.send(Box::new(task)).expect("send failed");
}
}
pub fn add_timeout<F: FnOnce() + 'static>(task: F, delay: Duration) -> i32 {
debug_assert!(EventLoop::is_a_pool_thread());
let timeout = Timeout {
next_run: Instant::now().add(delay),
task: Box::new(task),
};
TIMEOUTS.with(|rc| rc.borrow_mut().insert(timeout) as i32)
}
pub fn add_interval<F: Fn() + 'static>(task: F, delay: Duration, interval: Duration) -> i32 {
debug_assert!(EventLoop::is_a_pool_thread());
let interval = Interval {
next_run: Instant::now().add(delay),
interval,
task: Rc::new(task),
};
INTERVALS.with(|rc| rc.borrow_mut().insert(interval) as i32)
}
pub fn clear_timeout(id: i32) {
debug_assert!(EventLoop::is_a_pool_thread());
TIMEOUTS.with(|rc| {
let map = &mut *rc.borrow_mut();
if map.contains_key(&(id as usize)) {
let _ = map.remove(&(id as usize));
}
});
}
pub fn clear_interval(id: i32) {
debug_assert!(EventLoop::is_a_pool_thread());
INTERVALS.with(|rc| {
let map = &mut *rc.borrow_mut();
if map.contains_key(&(id as usize)) {
let _ = map.remove(&(id as usize));
}
});
}
}
impl Default for EventLoop {
fn default() -> Self {
Self::new()
}
}
impl Drop for EventLoop {
fn drop(&mut self) {
self.exe(|| {
SPAWNER.with(|rc| {
let spawner = &mut *rc.borrow_mut();
let _ = spawner.take();
})
});
if let Some(join_handle) = self.join_handle.take() {
let _ = join_handle.join();
}
}
}
impl std::fmt::Debug for EventLoop {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("EventLoop")
}
}
#[cfg(test)]
pub mod tests {
use crate::eventloop::EventLoop;
use futures::executor::block_on;
use std::ops::Add;
use std::sync::mpsc::channel;
use std::time::{Duration, Instant};
async fn test_as(input: i32) -> i32 {
input * 12
}
#[test]
fn test() {
let test_loop = EventLoop::new();
let res = test_loop.exe(|| 123);
assert_eq!(res, 123);
let (tx, rx) = channel();
test_loop.add_void(move || {
tx.send("async".to_string()).ok().unwrap();
});
let res = rx.recv().ok().unwrap();
assert_eq!(res.as_str(), "async");
let i = 43;
let fut = test_loop.add_future(async move { test_as(i).await });
let out = block_on(fut);
assert_eq!(43 * 12, out);
let (tx, rx) = channel();
let start = Instant::now();
test_loop.add_void(move || {
EventLoop::add_timeout(
move || {
tx.send(129).ok().expect("send failed");
},
Duration::from_secs(2),
);
});
let res = rx.recv();
let res_i32 = match res {
Ok(i) => i,
Err(e) => panic!("recv failed: {}", e),
};
assert_eq!(res_i32, 129);
assert!(Instant::now().gt(&start.add(Duration::from_millis(1999))));
assert!(Instant::now().lt(&start.add(Duration::from_millis(2999))));
log::debug!("dropping loop");
drop(test_loop);
log::debug!("after loop dropped");
}
#[test]
fn test_sync() {
fn t<E: Send + Sync>(_s: E) {}
let event_loop = EventLoop::new();
t(event_loop);
println!("yup, EL is sync");
}
}