Context
I have an Iced powered GUI wrapping a backend library/CLI that is doing the heavy lifting of a long running CPU bound task.
This long running task provides an API to get progress updates out of it by providing the Send side of a synchronous channel.
Similar to the download_progress example, I want to provide a progress/status bar to the user while this long running process takes place.
Current (Broken) Solution
In my GUI subscription I do a check for if an internal state field is ready to begin processing or not, if not it returns subscription::none() otherwise it calls a helper that builds a Recipe, kicks off the long running process in another thread providing the channel and creates a Stream which polls the channel, and turns any progress updates.
Because the external API uses a synchronous channel I also have a sync → async thread that just pipes the messages from the synchronous channel into an async channel used by the GUI application.
EDIT: An important note I forgot! Because kickoff_process requires copying/cloning a bunch of state, I also immediately change the application state so that the next subscription() call does not try to kick off that long running process again. I later learned that so long as the long running process gets kicked off in the Future and not as part of the helper this at least I wouldn’t have multiple CPU intensive tasks running at once (because Subscription Recipes are de-duplicated internally) but I’d still have all that copy/cloning happening which is still less than ideal. I only mention this because perhaps it’s important that subscription() only returns a real Recipe/Subscription once?
Issue
The issue I’m running into is that the Receiver side of the async channel (the one stored in the Stream) is closing after the first message is recv()ed which causes all other progress updates to fail to send.
Code
Here is some trimmed down code the represents what I’m seeing:
pub fn subscription(&self) -> Subscription<Message> {
match self.state {
State::Ready => kickoff_process(
/* Lots of state provided that is required by the long running process */
)
.map(Message::Progressed),
_ => Subscription::none(),
}
}
// ...
pub fn kickoff_process(/* State */) -> Subscription<ApiProgress> {
let (atx, arx) = mpsc::unbounded_channel();
thread::spawn(move || {
let (tx, rx) = crossbeam_channel::unbounded();
thread::spawn(|| external_task.progress(tx).run()); // <-- The Long Running Process
while let Ok(p) = rx.recv() {
let _res = atx.send(p.clone());
if p == ApiProgress::Finished {
break;
}
}
});
Subscription::from_recipe(AsyncProgress {
done: false,
recv: Box::pin(arx),
})
}
pub(crate) struct AsyncProgress {
done: bool,
recv: Pin<Box<UnboundedReceiver<ApiProgress>>>,
}
impl Recipe for AsyncProgress {
type Output = ApiProgress;
fn hash(&self, state: &mut Hasher) {
TypeId::of::<Self>().hash(state);
}
fn stream(self: Box<Self>, _input: EventStream) -> BoxStream<'static, Self::Output> {
self.boxed()
}
}
impl Stream for AsyncProgress {
type Item = ApiProgress;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.recv.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(progress)) => Poll::Ready(Some(progress)),
Poll::Ready(None) => {
if self.done {
Poll::Ready(None)
} else {
self.done = true;
Poll::Ready(Some(ApiProgress::Finished))
}
}
}
}
}
I’ve tried a few variations of this as well, such as futures::stream::unfold but all to the same end, after the first message is recv()ed, the AsyncProgress gets polled again and then dropped.