A problem and a possible solution for clogged mpsc channels in iced_futures

Problem:

My application has multiple subscriptions running, such as the events subscription that gives me the signal from the user to close the app through window::Event::CloseRequested. I am using it to close a child process before closing the app.

pub fn subscription(&self) -> Subscription<Message> {
        Subscription::batch(vec![
            iced_native::subscription::events().map(Message::Event),
            time::every(Duration::from_secs(5)).map(|_| Message::Tick),
            self.state.subscription(),
        ])
    }

Oddly, after a long time without touching the application window, I move my mouse over it and it suddenly produces a flood of logs, and I am not able to close the app.

[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...

I tried using the events_with method in order to listen only to the event I am interested in:

             },
-            iced_native::subscription::events().map(Self::Message::Event),
+            iced_native::subscription::events_with(|event, _status| {
+                if matches!(
+                    event,
+                    iced::Event::Window(iced_native::window::Event::CloseRequested)
+                ) {
+                    Some(event)
+                } else {
+                    None
+                }
+            })
+            .map(Self::Message::Event),
         ])
     }

But it does not solve the problem. After reading the iced_futures crate and understanding how it works, it seems that:

  • every recipe has an mpsc channel with a capacity of 100 to receive system events;
  • for each system event, the tracker clones the event and sends it to each recipe future through that recipe's respective channel.

It seems, then, that too many system events can clog a recipe's channel before the recipe future is able to consume them.
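
To make the failure mode concrete, here is a minimal standalone sketch using a plain futures bounded channel (not iced_futures itself): when the receiving side is never polled, the buffer fills up and try_send starts returning the same kind of error as the warnings above. The capacity of 100 mirrors the one described above.

// A sketch only: plain futures::channel::mpsc, not iced_futures code.
use futures::channel::mpsc;

fn main() {
    // One bounded channel, like the one the tracker creates per recipe.
    let (mut sender, _receiver) = mpsc::channel::<u32>(100);

    // Nothing ever polls `_receiver` here, so once the buffer is full,
    // `try_send` fails with a "full" error, which is what the
    // `TrySendError { kind: Full }` warnings in the logs correspond to.
    for event in 0..200u32 {
        if let Err(error) = sender.try_send(event) {
            eprintln!("Error sending event to subscription: {:?}", error);
        }
    }
}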

Suggested solution:

The recipe provides the tracker with a filter that can reject events the recipe is not interested in, before they are sent into the recipe's channel.

I know that iced_futures may change in the future to use an async equivalent of the bus crate.
But for now, I came up with a small API addition to the iced_futures crate.

  1. Change the Recipe trait to have some sort of concerned_by method:
+    /// Returns a filter; when it returns true for an event, the recipe is concerned by that event.
+    fn concerned_by(&self) -> fn(&Event) -> bool {
+        |_| true
+    }
+

Like the With or Map wrappers, we could provide a Filtered wrapper:

struct Filtered<Hasher, Event, A> {
    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
    filter: fn(&Event) -> bool,
}

impl<H, E, A> Filtered<H, E, A> {
    fn new(
        recipe: Box<dyn Recipe<H, E, Output = A>>,
        filter: fn(&E) -> bool,
    ) -> Self {
        Filtered { recipe, filter }
    }
}

impl<H, E, A> Recipe<H, E> for Filtered<H, E, A>
where
    A: 'static,
    H: std::hash::Hasher,
{
    type Output = A;

    fn hash(&self, state: &mut H) {
        self.recipe.hash(state);
        // Hash the filter's address too, so it is part of the identity.
        std::hash::Hash::hash(&(self.filter as usize), state);
    }

    fn concerned_by(&self) -> fn(&E) -> bool {
        self.filter
    }

    fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> {
        // The filtering happens in the tracker (see below), so the
        // stream itself simply delegates to the wrapped recipe.
        self.recipe.stream(input)
    }
}

Provide the Subscription with this method so that a filter can be applied to a whole batch of recipes (a usage sketch follows after the tracker diff below):

    /// Adds an event filter to the [`Subscription`].
    ///
    /// The filter will be part of the identity of the [`Subscription`].
    pub fn with_filter(
        mut self,
        filter: fn(&E) -> bool,
    ) -> Subscription<H, E, O>
    where
        H: 'static,
        E: 'static,
        O: 'static,
    {
        Subscription {
            recipes: self
                .recipes
                .drain(..)
                .map(|recipe| {
                    Box::new(Filtered::new(recipe, filter))
                        as Box<dyn Recipe<H, E, Output = O>>
                })
                .collect(),
        }
    }
  2. Then use the concerned_by method in the tracker to check whether a subscription recipe is concerned by an event before sending it:
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
index 9fe110b0..99884aa3 100644
--- a/futures/src/subscription/tracker.rs
+++ b/futures/src/subscription/tracker.rs
 
 pub struct Execution<Event> {
     _cancel: futures::channel::oneshot::Sender<()>,
     listener: Option<futures::channel::mpsc::Sender<Event>>,
+    concerned_by: fn(&Event) -> bool,
 }
 
 impl<Hasher, Event> Tracker<Hasher, Event>
@@ -88,6 +94,8 @@ where
                 continue;
             }
 
+            let concerned_by = recipe.concerned_by();
+
             let (cancel, mut canceled) = futures::channel::oneshot::channel();
 
             // TODO: Use bus if/when it supports async
@@ -121,6 +129,7 @@ where
                     } else {
                         Some(event_sender)
                     },
+                    concerned_by,
                 },
             );
 
@@ -145,7 +154,15 @@ where
     pub fn broadcast(&mut self, event: Event) {
         self.subscriptions
             .values_mut()
-            .filter_map(|connection| connection.listener.as_mut())
+            .filter_map(|connection| {
+                if connection.listener.is_some()
+                    && (connection.concerned_by)(&event)
+                {
+                    connection.listener.as_mut()
+                } else {
+                    None
+                }
+            })
             .for_each(|listener| {
                 if let Err(error) = listener.try_send(event.clone()) {
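
Putting the pieces together, here is a hypothetical sketch of how my subscription method from the top of this issue could opt in. with_filter is the addition proposed above (it does not exist in iced today), and the (event, _status) pattern assumes the filter sees the same (Event, Status) pair that events_with already exposes; if the runtime event type turns out to be the bare Event, the tuple pattern would simply be dropped.

// Hypothetical usage of the proposed `with_filter`; everything else
// mirrors the `subscription` method shown at the top of this issue.
pub fn subscription(&self) -> Subscription<Message> {
    Subscription::batch(vec![
        iced_native::subscription::events()
            // Assumption: the filter receives the (Event, Status) pair
            // used by `events_with`.
            .with_filter(|(event, _status)| {
                // Only window close requests are pushed into this
                // recipe's channel by the tracker.
                matches!(
                    event,
                    iced::Event::Window(
                        iced_native::window::Event::CloseRequested
                    )
                )
            })
            .map(Message::Event),
        time::every(Duration::from_secs(5)).map(|_| Message::Tick),
        self.state.subscription(),
    ])
}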

In my opinion, the fix is not the most elegant one, because it relies on the user being aware of the problem and changing their subscriptions to specify which system events they are listening to.
However, it is compatible with the current API, introduces no breaking change, and improves performance.

Please tell me what you think of it.
Should I create a PR?

Thanks for the detailed post!

Your solution sounds reasonable, but I believe it is worth investigating the root cause of the problem.

  • Why is the application suddenly receiving a lot of events in the first place? What kind of events are occurring? Is winit buffering all the events while the application has lost focus? Is it possible for us to simply dismiss these events?
  • How come you cannot close the application afterwards? The burst of events should eventually end and the subscription channels should have new space.

Once we have more answers, we can think about a proper solution! Breaking changes are not a problem. I break the API all the time; iced is experimental after all!