Channels within a Subscription (Receiver being dropped)

Context

I have an Iced powered GUI wrapping a backend library/CLI that is doing the heavy lifting of a long running CPU bound task.

This long running task provides an API to get progress updates out of it by providing the Send side of a synchronous channel.

Similar to the download_progress example, I want to provide a progress/status bar to the user while this long running process takes place.

Current (Broken) Solution

In my GUI subscription I do a check for if an internal state field is ready to begin processing or not, if not it returns subscription::none() otherwise it calls a helper that builds a Recipe, kicks off the long running process in another thread providing the channel and creates a Stream which polls the channel, and turns any progress updates.

Because the external API uses a synchronous channel I also have a sync → async thread that just pipes the messages from the synchronous channel into an async channel used by the GUI application.

EDIT: An important note I forgot! Because kickoff_process requires copying/cloning a bunch of state, I also immediately change the application state so that the next subscription() call does not try to kick off that long running process again. I later learned that so long as the long running process gets kicked off in the Future and not as part of the helper this at least I wouldn’t have multiple CPU intensive tasks running at once (because Subscription Recipes are de-duplicated internally) but I’d still have all that copy/cloning happening which is still less than ideal. I only mention this because perhaps it’s important that subscription() only returns a real Recipe/Subscription once?

Issue

The issue I’m running into is that the Receiver side of the async channel (the one stored in the Stream) is closing after the first message is recv()ed which causes all other progress updates to fail to send.

Code

Here is some trimmed down code the represents what I’m seeing:

    pub fn subscription(&self) -> Subscription<Message> {
        match self.state {
            State::Ready => kickoff_process(
                /* Lots of state provided that is required by the long running process */
            )
            .map(Message::Progressed),
            _ => Subscription::none(),
        }
    }

// ...

pub fn kickoff_process(/* State */) -> Subscription<ApiProgress> {
    let (atx, arx) = mpsc::unbounded_channel();

    thread::spawn(move || {
        let (tx, rx) = crossbeam_channel::unbounded();
        thread::spawn(|| external_task.progress(tx).run()); // <-- The Long Running Process

        while let Ok(p) = rx.recv() {
            let _res = atx.send(p.clone());
            if p == ApiProgress::Finished {
                break;
            }
        }
    });

    Subscription::from_recipe(AsyncProgress {
        done: false,
        recv: Box::pin(arx),
    })
}

pub(crate) struct AsyncProgress {
    done: bool,
    recv: Pin<Box<UnboundedReceiver<ApiProgress>>>,
}

impl Recipe for AsyncProgress {
    type Output = ApiProgress;

    fn hash(&self, state: &mut Hasher) {
        TypeId::of::<Self>().hash(state);
    }

    fn stream(self: Box<Self>, _input: EventStream) -> BoxStream<'static, Self::Output> {
        self.boxed()
    }
}

impl Stream for AsyncProgress {
    type Item = ApiProgress;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.recv.poll_recv(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(progress)) => Poll::Ready(Some(progress)),
            Poll::Ready(None) => {
                if self.done {
                    Poll::Ready(None)
                } else {
                    self.done = true;
                    Poll::Ready(Some(ApiProgress::Finished))
                }
            }
        }
    }
}

I’ve tried a few variations of this as well, such as futures::stream::unfold but all to the same end, after the first message is recv()ed, the AsyncProgress gets polled again and then dropped.

Looks like I solved this, so in case anyone else runs into something similar it turned out the edit I made above was exactly the problem. I’ll quote myself so you don’t have to scroll up:

EDIT: An important note I forgot! Because kickoff_process requires copying/cloning a bunch of state, I also immediately change the application state so that the next subscription() call does not try to kick off that long running process again. I later learned that so long as the long running process gets kicked off in the Future and not as part of the helper this at least I wouldn’t have multiple CPU intensive tasks running at once (because Subscription Recipes are de-duplicated internally) but I’d still have all that copy/cloning happening which is still less than ideal. I only mention this because perhaps it’s important that subscription() only returns a real Recipe/Subscription once?

Perhaps older versions of the docs say that if you stop returning a subscription with the ID in question (e.g. you start returning subscription::none()) your Stream will get canceled.

Current versions of the docs say:

A Subscription is normally provided to some runtime, like a Command, and it will generate events as long as the user keeps requesting it.

Which I’d seen…but I didn’t really know what “as long as the user keeps requesting it” meant.

I refactored the code so that the copy/cloning nonsense happens once, which also creates the channels and stores the Receive side in the application state. Then on subscription() the Receiver is cloned (which lives inside an Option<Arc<Mutex<T>>>) and attempts to create the Recipe. Because of how the iced_futures::Tracker works, that ID is deduplicated and no additional Stream is created. The only overhead is the cloning of an Arc which is fine (although if I could eliminate it, that’d be cool too).

Ideally, you’d be able to create the channel and move the Receiver into the async task inside of Recipe::stream, however in my case creating that channel required having access to all that state required to call the long running task, so cloning all that state on each subscription() call was worse than just cloning an Arc.

You may find the new command::channel API useful. It’s only available in master currently, however.