hirofa_utils/
resolvable_future.rs

1use crate::debug_mutex::DebugMutex;
2use futures::task::{Context, Poll, Waker};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::mpsc::{channel, Receiver, SendError, Sender};
6use std::sync::Arc;
7
8pub struct ResolvableFutureResolver<R> {
9    sender: Sender<R>,
10    waker: DebugMutex<Option<Waker>>,
11}
12
13impl<R> ResolvableFutureResolver<R> {
14    pub fn new(tx: Sender<R>) -> Self {
15        Self {
16            sender: tx,
17            waker: DebugMutex::new(None, "ResolvableFutureResolver::waker"),
18        }
19    }
20    pub fn resolve(&self, resolution: R) -> Result<(), SendError<R>> {
21        log::trace!("ResolvableFutureResolver.resolve");
22        let waker_opt = &mut *self.waker.lock("resolve").unwrap();
23
24        match self.sender.send(resolution) {
25            Ok(_) => {
26                if let Some(waker) = waker_opt.take() {
27                    log::trace!("ResolvableFutureResolver.resolve has waker, waking");
28                    waker.wake();
29                }
30                Ok(())
31            }
32            Err(se) => {
33                if let Some(waker) = waker_opt.take() {
34                    log::error!(
35                        "ResolvableFutureResolver::could not send response ({:?}), had waker so waking", se
36                    );
37                    waker.wake();
38                } else {
39                    log::error!("ResolvableFutureResolver::could not send response ({:?}), had no waker so was possibly already resolved", se);
40                }
41                Err(se)
42            }
43        }
44    }
45}
46
47pub struct ResolvableFuture<R> {
48    result: Receiver<R>,
49    resolver: Arc<ResolvableFutureResolver<R>>,
50}
51impl<R> ResolvableFuture<R> {
52    pub fn new() -> Self {
53        let (tx, rx) = channel();
54
55        Self {
56            result: rx,
57            resolver: Arc::new(ResolvableFutureResolver::new(tx)),
58        }
59    }
60    pub fn get_resolver(&self) -> Arc<ResolvableFutureResolver<R>> {
61        self.resolver.clone()
62    }
63}
64impl<R> Default for ResolvableFuture<R> {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69impl<R> Future for ResolvableFuture<R> {
70    type Output = R;
71
72    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73        log::trace!("TaskFuture::poll");
74        match self.result.try_recv() {
75            Ok(res) => {
76                log::trace!("TaskFuture::poll -> Ready");
77                Poll::Ready(res)
78            }
79            Err(_) => {
80                log::trace!("TaskFuture::poll -> Pending");
81                let mtx = &self.resolver.waker;
82                let waker_opt = &mut *mtx.lock("poll").unwrap();
83                if let Ok(res) = self.result.try_recv() {
84                    log::trace!("TaskFuture::poll -> Ready @ 2");
85                    Poll::Ready(res)
86                } else {
87                    let _ = waker_opt.replace(cx.waker().clone());
88                    Poll::Pending
89                }
90            }
91        }
92    }
93}
94
95#[cfg(test)]
96pub mod tests {
97    use crate::resolvable_future::ResolvableFuture;
98
99    #[tokio::test]
100    async fn my_test() {
101        let fut = ResolvableFuture::new();
102        let r = fut.resolver.clone();
103        tokio::spawn(async move {
104            match r.resolve("hi".to_string()) {
105                Ok(_) => {}
106                Err(e) => {
107                    println!("could not resolve {e}");
108                }
109            }
110        });
111
112        let res = fut.await;
113
114        println!("res={res}");
115    }
116}