Context
I have an Iced powered GUI wrapping a backend library/CLI that is doing the heavy lifting of a long running CPU bound task.
This long running task provides an API to get progress updates out of it by providing the Send side of a synchronous channel.
Similar to the download_progress
example, I want to provide a progress/status bar to the user while this long running process takes place.
Current (Broken) Solution
In my GUI subscription
I do a check for if an internal state
field is ready to begin processing or not, if not it returns subscription::none()
otherwise it calls a helper that builds a Recipe
, kicks off the long running process in another thread providing the channel and creates a Stream
which polls the channel, and turns any progress updates.
Because the external API uses a synchronous channel I also have a sync → async thread that just pipes the messages from the synchronous channel into an async channel used by the GUI application.
EDIT: An important note I forgot! Because kickoff_process
requires copying/cloning a bunch of state, I also immediately change the application state so that the next subscription()
call does not try to kick off that long running process again. I later learned that so long as the long running process gets kicked off in the Future
and not as part of the helper this at least I wouldn’t have multiple CPU intensive tasks running at once (because Subscription
Recipes
are de-duplicated internally) but I’d still have all that copy/cloning happening which is still less than ideal. I only mention this because perhaps it’s important that subscription()
only returns a real Recipe
/Subscription
once?
Issue
The issue I’m running into is that the Receiver
side of the async channel (the one stored in the Stream
) is closing after the first message is recv()
ed which causes all other progress updates to fail to send.
Code
Here is some trimmed down code the represents what I’m seeing:
pub fn subscription(&self) -> Subscription<Message> {
match self.state {
State::Ready => kickoff_process(
/* Lots of state provided that is required by the long running process */
)
.map(Message::Progressed),
_ => Subscription::none(),
}
}
// ...
pub fn kickoff_process(/* State */) -> Subscription<ApiProgress> {
let (atx, arx) = mpsc::unbounded_channel();
thread::spawn(move || {
let (tx, rx) = crossbeam_channel::unbounded();
thread::spawn(|| external_task.progress(tx).run()); // <-- The Long Running Process
while let Ok(p) = rx.recv() {
let _res = atx.send(p.clone());
if p == ApiProgress::Finished {
break;
}
}
});
Subscription::from_recipe(AsyncProgress {
done: false,
recv: Box::pin(arx),
})
}
pub(crate) struct AsyncProgress {
done: bool,
recv: Pin<Box<UnboundedReceiver<ApiProgress>>>,
}
impl Recipe for AsyncProgress {
type Output = ApiProgress;
fn hash(&self, state: &mut Hasher) {
TypeId::of::<Self>().hash(state);
}
fn stream(self: Box<Self>, _input: EventStream) -> BoxStream<'static, Self::Output> {
self.boxed()
}
}
impl Stream for AsyncProgress {
type Item = ApiProgress;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.recv.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(progress)) => Poll::Ready(Some(progress)),
Poll::Ready(None) => {
if self.done {
Poll::Ready(None)
} else {
self.done = true;
Poll::Ready(Some(ApiProgress::Finished))
}
}
}
}
}
I’ve tried a few variations of this as well, such as futures::stream::unfold
but all to the same end, after the first message is recv()
ed, the AsyncProgress
gets polled again and then dropped.