Problem:
My application runs several subscriptions, including the events subscription, which tells me through window::Event::CloseRequested that the user wants to close the app. I use it to shut down a child process before the app exits.
pub fn subscription(&self) -> Subscription<Message> {
Subscription::batch(vec![
iced_native::subscription::events().map(Message::Event),
time::every(Duration::from_secs(5)).map(|_| Message::Tick),
self.state.subscription(),
])
}
Strangely, after the application window has been left untouched for a long time, moving the mouse over it suddenly produces a flood of log messages, and I can no longer close the app.
[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...][iced_futures::subscription::tracker][WARN] Error sending event to subscription: TrySendError { kind: Full }
[...
I tried using the events_with method to listen only to the event I am interested in.
},
- iced_native::subscription::events().map(Self::Message::Event),
+ iced_native::subscription::events_with(|event, _status| {
+ if matches!(
+ event,
+ iced::Event::Window(iced_native::window::Event::CloseRequested)
+ ) {
+ Some(event)
+ } else {
+ None
+ }
+ })
+ .map(Self::Message::Event),
])
}
But it does not solve the problem. After reading through the iced_futures crate, it seems that:
- every recipe has an mpsc channel of size 100 to receive system events.
- for each system event, the tracker clones the event and sends it to each recipe future through that recipe's channel.
It seems, then, that too many system events can clog a recipe's channel before the recipe future is able to consume them.
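To illustrate the suspected failure mode, here is a minimal sketch of a bounded channel that nobody drains. It uses std::sync::mpsc::sync_channel as a stand-in for the futures channel used by the tracker (an assumption: the capacity rules of the two differ slightly), and count_rejected is a hypothetical helper, not part of iced_futures.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes `events` items into a bounded channel of size `capacity` while
/// nothing consumes them, and returns how many sends were rejected as `Full`.
/// Hypothetical helper: std's `sync_channel` stands in for the tracker's channel.
fn count_rejected(capacity: usize, events: usize) -> usize {
    // Keep the receiver alive so that failures are `Full`, not `Disconnected`.
    let (sender, _receiver) = sync_channel::<usize>(capacity);
    let mut rejected = 0;

    for event in 0..events {
        // `try_send` never blocks: once the buffer is full it returns an error.
        if let Err(TrySendError::Full(_)) = sender.try_send(event) {
            rejected += 1;
        }
    }

    rejected
}
```

With a capacity of 100, every event after the first 100 is rejected until the consumer catches up, which would match the repeated warnings in the log.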
Suggested solution:
Recipes provide the tracker with a filter that rejects events the recipe is not interested in before they are sent into the recipe's channel.
I know that iced_futures may change in the future to use an async equivalent of the bus crate.
But for now, I found a small API addition to the iced_futures crate.
- Change the Recipe trait to have some sort of concerned_by method:
+ /// Returns a predicate that tells whether the recipe is concerned by an event.
+ fn concerned_by(&self) -> fn(&Event) -> bool {
+ |_| true
+ }
+
Like With or Map, we could provide a sort of Filtered wrapper:
struct Filtered<Hasher, Event, A> {
recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
filter: fn(&Event) -> bool,
}
impl<H, E, A> Filtered<H, E, A> {
fn new(
recipe: Box<dyn Recipe<H, E, Output = A>>,
filter: fn(&E) -> bool,
) -> Self {
Filtered { recipe, filter }
}
}
impl<H, E, A> Recipe<H, E> for Filtered<H, E, A>
where
A: 'static,
H: std::hash::Hasher,
{
type Output = A;
fn hash(&self, state: &mut H) {
...
}
fn concerned_by(&self) -> fn(&E) -> bool {
self.filter
}
fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> {
...
}
}
Give Subscription this method to apply a filter to a whole group of recipes:
/// Adds a filter to the [`Subscription`] context.
///
/// The value will be part of the identity of a [`Subscription`].
pub fn with_filter(
mut self,
filter: fn(&E) -> bool,
) -> Subscription<H, E, O>
where
H: 'static,
E: 'static,
O: 'static,
{
Subscription {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
Box::new(Filtered::new(recipe, filter))
as Box<dyn Recipe<H, E, Output = O>>
})
.collect(),
}
}
- Then use the concerned_by method in the tracker to check whether a subscription recipe is concerned by an event before sending it:
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
index 9fe110b0..99884aa3 100644
--- a/futures/src/subscription/tracker.rs
+++ b/futures/src/subscription/tracker.rs
pub struct Execution<Event> {
_cancel: futures::channel::oneshot::Sender<()>,
listener: Option<futures::channel::mpsc::Sender<Event>>,
+ concerned_by: fn(&Event) -> bool,
+}
impl<Hasher, Event> Tracker<Hasher, Event>
@@ -88,6 +94,8 @@ where
continue;
}
+ let concerned_by = recipe.concerned_by();
+
let (cancel, mut canceled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async
@@ -121,6 +129,7 @@ where
} else {
Some(event_sender)
},
+ concerned_by,
},
);
@@ -145,7 +154,15 @@ where
pub fn broadcast(&mut self, event: Event) {
self.subscriptions
.values_mut()
- .filter_map(|connection| connection.listener.as_mut())
+ .filter_map(|connection| {
+ if connection.listener.is_some()
+ && (connection.concerned_by)(&event)
+ {
+ connection.listener.as_mut()
+ } else {
+ None
+ }
+ })
.for_each(|listener| {
if let Err(error) = listener.try_send(event.clone()) {
In my opinion, this fix is not the most elegant one, because it relies on users being aware of the problem and changing their subscriptions to specify which system events they listen to,
but it is compatible with the current API, does not introduce a breaking change, and increases performance.
Please tell me what you think of it.
Should I create a PR?