hirofa_utils/
resolvable_future.rs1use crate::debug_mutex::DebugMutex;
2use futures::task::{Context, Poll, Waker};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::mpsc::{channel, Receiver, SendError, Sender};
6use std::sync::Arc;
7
8pub struct ResolvableFutureResolver<R> {
9 sender: Sender<R>,
10 waker: DebugMutex<Option<Waker>>,
11}
12
13impl<R> ResolvableFutureResolver<R> {
14 pub fn new(tx: Sender<R>) -> Self {
15 Self {
16 sender: tx,
17 waker: DebugMutex::new(None, "ResolvableFutureResolver::waker"),
18 }
19 }
20 pub fn resolve(&self, resolution: R) -> Result<(), SendError<R>> {
21 log::trace!("ResolvableFutureResolver.resolve");
22 let waker_opt = &mut *self.waker.lock("resolve").unwrap();
23
24 match self.sender.send(resolution) {
25 Ok(_) => {
26 if let Some(waker) = waker_opt.take() {
27 log::trace!("ResolvableFutureResolver.resolve has waker, waking");
28 waker.wake();
29 }
30 Ok(())
31 }
32 Err(se) => {
33 if let Some(waker) = waker_opt.take() {
34 log::error!(
35 "ResolvableFutureResolver::could not send response ({:?}), had waker so waking", se
36 );
37 waker.wake();
38 } else {
39 log::error!("ResolvableFutureResolver::could not send response ({:?}), had no waker so was possibly already resolved", se);
40 }
41 Err(se)
42 }
43 }
44 }
45}
46
47pub struct ResolvableFuture<R> {
48 result: Receiver<R>,
49 resolver: Arc<ResolvableFutureResolver<R>>,
50}
51impl<R> ResolvableFuture<R> {
52 pub fn new() -> Self {
53 let (tx, rx) = channel();
54
55 Self {
56 result: rx,
57 resolver: Arc::new(ResolvableFutureResolver::new(tx)),
58 }
59 }
60 pub fn get_resolver(&self) -> Arc<ResolvableFutureResolver<R>> {
61 self.resolver.clone()
62 }
63}
64impl<R> Default for ResolvableFuture<R> {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69impl<R> Future for ResolvableFuture<R> {
70 type Output = R;
71
72 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73 log::trace!("TaskFuture::poll");
74 match self.result.try_recv() {
75 Ok(res) => {
76 log::trace!("TaskFuture::poll -> Ready");
77 Poll::Ready(res)
78 }
79 Err(_) => {
80 log::trace!("TaskFuture::poll -> Pending");
81 let mtx = &self.resolver.waker;
82 let waker_opt = &mut *mtx.lock("poll").unwrap();
83 if let Ok(res) = self.result.try_recv() {
84 log::trace!("TaskFuture::poll -> Ready @ 2");
85 Poll::Ready(res)
86 } else {
87 let _ = waker_opt.replace(cx.waker().clone());
88 Poll::Pending
89 }
90 }
91 }
92 }
93}
94
95#[cfg(test)]
96pub mod tests {
97 use crate::resolvable_future::ResolvableFuture;
98
99 #[tokio::test]
100 async fn my_test() {
101 let fut = ResolvableFuture::new();
102 let r = fut.resolver.clone();
103 tokio::spawn(async move {
104 match r.resolve("hi".to_string()) {
105 Ok(_) => {}
106 Err(e) => {
107 println!("could not resolve {e}");
108 }
109 }
110 });
111
112 let res = fut.await;
113
114 println!("res={res}");
115 }
116}