Questions about subscriptions

I posted a question on Discord about how to manage a serial connection in Iced. I got some pointers and was referred to the websocket example in the repo. This somewhat helped me to understand how to set this up but I have a couple more general questions about how Subscriptions work that I wasn’t able to find an answer to. I’ll post them here because they may be relevant to other people as well and it’s easier to find here than on Discord.

  1. What exactly is a subscription? The docs for Application::subscription say

A Subscription will be kept alive as long as you keep returning it, and the messages produced will be handled by update.

What does “keep returning it” mean here? That bring my to the following question:

  1. When is Application::subscription called? It returns a Subscription<Self::Message> so with the above note from the docs that leads to me to think this function is called repeatedly. But in the websocket example the connect function starts it’s own loop to listen for messages from the websocket so that implies subscription is only called once (otherwise you’d have multiple loops running at the same time).

  2. In my case I want to start and stop a connection when the user clicks “Connect” and “Disconnect” buttons. Does that mean I need to start and stop a subscription? Ho would I do that? subscription takes a reference to the struct that implements the Application trait but how do I use that here? Or should the subscription keep running (whatever that means) and communicate via messages that the connection should be opened and closed?

  3. I’m using the mavlink crate to communicate with a UAV and this library is not async. I can wrap everything in an async block but code that potentially blocks the thread should by run on a separate thread with tokio::task::spawn_blocking. It’s not clear to me which apart of the code should be ran like this and how this interacts with the Iced runtime. I included my code below.

main.rs:

use iced::{
    executor,
    widget::{column, container, scrollable, text, Button, Column},
    Alignment, Application, Color, Command, Element, Length, Renderer, Settings, Subscription,
    Theme,
};
use mavlink::ardupilotmega::MavMessage;

#[derive(Default)]
struct App {
    state: ConnectionState,
    messages: Vec<MavMessage>,
}

#[derive(Debug, Default)]
enum ConnectionState {
    #[default]
    Disconnected,
    Connected(connection::Connection),
}

impl ConnectionState {
    fn is_connected(&self) -> bool {
        match self {
            ConnectionState::Disconnected => false,
            ConnectionState::Connected(_) => true,
        }
    }
}

#[derive(Debug, Clone)]
enum Message {
    Connect,
    Disconnect,
    ConnectionEvent(connection::Event),
}

impl Application for App {
    type Executor = executor::Default;
    type Message = Message;
    type Theme = Theme;
    type Flags = ();

    fn new(_flags: Self::Flags) -> (Self, Command<Message>) {
        (Self::default(), Command::none())
    }

    fn title(&self) -> String {
        "Iced Mavlink Connection Question".to_string()
    }

    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            Message::Connect => todo!("Open the connection"),
            Message::Disconnect => todo!("Close the connection"),
            Message::ConnectionEvent(event) => match event {
                connection::Event::Connected(connection) => {
                    println!("Connection started");
                    self.state = ConnectionState::Connected(connection);
                }
                connection::Event::FailedToConnect => {
                    println!("Failed to create connection")
                }
                connection::Event::Disconnected => {
                    println!("Connection closed");
                    self.state = ConnectionState::Disconnected;
                }
                connection::Event::MavMessage(message) => {
                    self.messages.push(message);
                }
            },
        }
        Command::none()
    }

    fn view(&self) -> Element<Message, Renderer<Theme>> {
        let connect = Button::new("Connect")
            .on_press_maybe((!self.state.is_connected()).then(|| Message::Connect));
        let disconnect = Button::new("Disconnect")
            .on_press_maybe(self.state.is_connected().then(|| Message::Disconnect));

        let message_log: Element<_> = if self.messages.is_empty() {
            container(
                text("Your messages will appear here...").style(Color::from_rgb8(0x88, 0x88, 0x88)),
            )
            .width(Length::Fill)
            .height(Length::Fill)
            .center_x()
            .center_y()
            .into()
        } else {
            scrollable(
                Column::with_children(
                    self.messages
                        .iter()
                        .cloned()
                        .map(|m| text(format!("{:?}", m)))
                        .map(Element::from)
                        .collect(),
                )
                .width(Length::Fill)
                .spacing(10),
            )
            // .id(MESSAGE_LOG.clone())
            .height(Length::Fill)
            .into()
        };

        container(
            column![connect, disconnect, message_log]
                .spacing(10)
                .align_items(Alignment::Center),
        )
        .width(Length::Fill)
        .height(Length::Fill)
        .padding(20)
        .into()
    }

    fn subscription(&self) -> Subscription<Self::Message> {
        connection::connect().map(Message::ConnectionEvent)
    }
}

fn main() -> iced::Result {
    App::run(Settings::default())
}

mod connection {

    use std::time::Duration;

    use iced::{
        futures::{channel::mpsc, SinkExt},
        subscription, Subscription,
    };
    use mavlink::{ardupilotmega::MavMessage, MavConnection};

    enum State {
        Disconnected,
        Connected {
            mavlink_connection: Box<dyn MavConnection<MavMessage> + Sync + Send>,
            input_receiver: mpsc::Receiver<InputMessage>,
        },
    }

    #[derive(Debug, Clone)]
    pub enum Event {
        Connected(Connection),
        FailedToConnect,
        Disconnected,
        MavMessage(MavMessage),
    }

    enum InputMessage {
        GetParameter(String),
    }

    #[derive(Debug, Clone)]
    pub struct Connection(mpsc::Sender<InputMessage>);

    pub fn connect() -> Subscription<Event> {
        struct Connect;
        subscription::channel(
            std::any::TypeId::of::<Connect>(),
            100,
            |mut output| async move {
                let mut state = State::Disconnected;

                loop {
                    match &mut state {
                        State::Disconnected => {
                            const ADDRESS: &str = "serial:/dev/ttyACM0:115200";
                            match mavlink::connect::<MavMessage>(ADDRESS) {
                                Ok(conn) => {
                                    let (sender, receiver) = mpsc::channel(100);
                                    let _ = output.send(Event::Connected(Connection(sender))).await;
                                    state = State::Connected {
                                        mavlink_connection: conn,
                                        input_receiver: receiver,
                                    };
                                }
                                Err(_) => {
                                    tokio::time::sleep(Duration::from_secs(1)).await;
                                    let _ = output.send(Event::FailedToConnect).await;
                                }
                            }
                        }
                        State::Connected {
                            mavlink_connection,
                            input_receiver: _,
                        } => match mavlink_connection.recv() {
                            Ok((_header, msg)) => {
                                let _ = output.send(Event::MavMessage(msg)).await;
                            }
                            Err(_) => {
                                let _ = output.send(Event::Disconnected).await;
                            }
                        },
                    }
                }
            },
        )
    }
}

Cargo.toml:

[package]
name = "iced-connection-question"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.75"
mavlink = "0.12.0"
iced = { version = "0.10.0", features = ["tokio"] }
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["time"] }

A list of “recipes” that can be used to create Streams that produce messages.

Creating a Subscription on its own doesn’t do anything, just like creating a Command doesn’t execute it.

Same as every other method in Application! view is called repeatedly, and yet widgets do not necessarily lose their internal state between calls.

Subscriptions have internal identifiers that are used to ensure continuity. A subscription::Tracker takes care of this.

You should be able to stop returning it and the stream that the Subscription produced initially will be dropped by the iced runtime.

Every call that can potentially block. mavlink::connect and mavlink_connection.recv() would be the main contenders here.

On Discord I was advised to do something like this:

fn subscription(&self) -> Subscription<Self::Message> {
    if self.running {
        connection::connect().map(Message::ConnectionEvent)
    } else {
        Subscription::none()
    }
    
}

Is that what you mean by “stop returning it”? My alternative approach is to manage the mavlink connection over the same channel that’s used to send messages into the future. Not sure if that’s better.

use connection::InputMessage;
use iced::{
    executor,
    futures::channel::mpsc,
    widget::{column, container, scrollable, text, Button, Column},
    Alignment, Application, Color, Command, Element, Length, Renderer, Settings, Subscription,
    Theme,
};
use mavlink::ardupilotmega::MavMessage;

#[derive(Default)]
struct App {
    sender: Option<mpsc::Sender<InputMessage>>,
    state: ConnectionState,
    messages: Vec<MavMessage>,
}

#[derive(Debug, Default)]
enum ConnectionState {
    #[default]
    Startup,
    Pending,
    Connected,
}

impl ConnectionState {
    fn is_connected(&self) -> bool {
        match self {
            ConnectionState::Connected => true,
            _ => false,
        }
    }
}

#[derive(Debug, Clone)]
enum Message {
    Connect,
    Disconnect,
    ConnectionEvent(connection::Event),
}

impl Application for App {
    type Executor = executor::Default;
    type Message = Message;
    type Theme = Theme;
    type Flags = ();

    fn new(_flags: Self::Flags) -> (Self, Command<Message>) {
        (Self::default(), Command::none())
    }

    fn title(&self) -> String {
        "Iced Mavlink Connection Question".to_string()
    }

    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            Message::Connect => self
                .sender
                .as_mut()
                .unwrap()
                .try_send(InputMessage::Connect)
                .unwrap(),
            Message::Disconnect => self
                .sender
                .as_mut()
                .unwrap()
                .try_send(InputMessage::Disconnect)
                .unwrap(),

            Message::ConnectionEvent(event) => match event {
                connection::Event::ConnectionPending(sender) => {
                    self.sender = Some(sender);
                }
                connection::Event::Connected => {
                    self.state = ConnectionState::Connected;
                    println!("Connection started");
                }
                connection::Event::FailedToConnect => {
                    println!("Failed to create connection")
                }
                connection::Event::Disconnected => {
                    self.state = ConnectionState::Pending;
                    println!("Connection closed");
                }
                connection::Event::MavMessage(message) => {
                    self.messages.push(message);
                }
            },
        }
        Command::none()
    }

    fn view(&self) -> Element<Message, Renderer<Theme>> {
        let connect = Button::new("Connect")
            .on_press_maybe((!self.state.is_connected()).then(|| Message::Connect));
        let disconnect = Button::new("Disconnect")
            .on_press_maybe(self.state.is_connected().then(|| Message::Disconnect));

        let message_log: Element<_> = if self.messages.is_empty() {
            container(
                text("Your messages will appear here...").style(Color::from_rgb8(0x88, 0x88, 0x88)),
            )
            .width(Length::Fill)
            .height(Length::Fill)
            .center_x()
            .center_y()
            .into()
        } else {
            scrollable(
                Column::with_children(
                    self.messages
                        .iter()
                        .cloned()
                        .map(|m| text(format!("{:?}", m)))
                        .map(Element::from)
                        .collect(),
                )
                .width(Length::Fill)
                .spacing(10),
            )
            // .id(MESSAGE_LOG.clone())
            .height(Length::Fill)
            .into()
        };

        container(
            column![connect, disconnect, message_log]
                .spacing(10)
                .align_items(Alignment::Center),
        )
        .width(Length::Fill)
        .height(Length::Fill)
        .padding(20)
        .into()
    }

    fn subscription(&self) -> Subscription<Self::Message> {
        connection::connect().map(Message::ConnectionEvent)
    }
}

fn main() -> iced::Result {
    App::run(Settings::default())
}

mod connection {

    use std::time::Duration;

    use iced::{
        futures::{channel::mpsc, SinkExt},
        subscription, Subscription,
    };
    use mavlink::{ardupilotmega::MavMessage, MavConnection};

    enum State {
        Disconnected,
        Connected {
            mavlink_connection: Box<dyn MavConnection<MavMessage> + Sync + Send>,
        },
    }

    #[derive(Debug, Clone)]
    pub enum Event {
        ConnectionPending(mpsc::Sender<InputMessage>),
        Connected,
        FailedToConnect,
        Disconnected,
        MavMessage(MavMessage),
    }

    pub enum InputMessage {
        Connect,
        Disconnect,
    }

    pub fn connect() -> Subscription<Event> {
        struct Connect;
        subscription::channel(
            std::any::TypeId::of::<Connect>(),
            100,
            |mut output| async move {
                // Start the subscription future and return the sender to send messages into the subscription
                let (sender, mut receiver) = mpsc::channel(100);
                let _ = output.send(Event::ConnectionPending(sender)).await;

                let mut state = State::Disconnected;

                loop {
                    match &mut state {
                        State::Disconnected => {
                            if let Ok(Some(InputMessage::Connect)) = receiver.try_next() {
                                println!("Connecting");
                                const ADDRESS: &str = "serial:/dev/ttyACM0:115200";
                                match tokio::task::block_in_place(|| {
                                    mavlink::connect::<MavMessage>(ADDRESS)
                                }) {
                                    Ok(conn) => {
                                        state = State::Connected {
                                            mavlink_connection: conn,
                                        };
                                        let _ = output.send(Event::Connected).await;
                                    }
                                    Err(_) => {
                                        tokio::time::sleep(Duration::from_secs(1)).await;
                                        let _ = output.send(Event::FailedToConnect).await;
                                    }
                                }
                            }
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }

                        State::Connected { mavlink_connection } => {
                            match tokio::task::block_in_place(|| mavlink_connection.recv()) {
                                Ok((_header, msg)) => {
                                    let _ = output.send(Event::MavMessage(msg)).await;
                                    tokio::time::sleep(Duration::from_millis(1)).await;
                                }
                                Err(_) => {
                                    state = State::Disconnected;
                                    tokio::time::sleep(Duration::from_secs(1)).await;
                                    let _ = output.send(Event::Disconnected).await;
                                }
                            }
                            if let Ok(Some(InputMessage::Disconnect)) = receiver.try_next() {
                                state = State::Disconnected;
                                let _ = output.send(Event::Disconnected).await;
                            }
                        }
                    }
                }
            },
        )
    }
}

I’ve wrapped these in tokio::task::block_in_place. Is that okay within the iced runtime? The tokio docs mention this will still block code running in the same task but I’m not sure if that applies here.

I personally would use spawn_blocking in tokio::task - Rust for the reasons state on here block_in_place in tokio::task - Rust.
" Be aware that although this function avoids starving other independently spawned tasks, any other code running concurrently in the same task will be suspended during the call to block_in_place. This can happen e.g. when using the join! macro. To avoid this issue, use spawn_blocking instead of block_in_place."

Does it make a difference here? I ask because switching to spawn_blocking and awaiting the JoinHandlecreates lifetime issues because I’m taking a mutable borrow to state in the match and spawn_blocking requires state is borrowed with static lifetime because it’s accessed from another thread.

EDIT: never mind that. It was an easy fix.

i think it will be fine in this case. just something to remember

I made some more progress but I have a follow-up question. After a connection has been established I want to fetch some data from the connection. This can take up to a minute and I want to display a progress bar and provide the option to cancel the operation. This should illustrate what I’m trying to achieve (I removed dependencies on crates other than iced and tokio so you can run this without specific hardware).

main.rs:

use std::fmt::Display;

use connection::InputMessage;
use iced::{
    executor,
    futures::channel::mpsc,
    widget::{button, column, container, progress_bar, row, text},
    Application, Command, Element, Length, Renderer, Settings, Subscription, Theme,
};

#[derive(Default)]
struct App {
    sender: Option<mpsc::Sender<InputMessage>>,
    state: ConnectionState,
    result: Option<String>,
    progress: Option<u32>,
}

#[derive(Debug, Default)]
enum ConnectionState {
    #[default]
    Pending,
    Connected,
    Disconnected,
}

impl ConnectionState {
    fn is_connected(&self) -> bool {
        match self {
            ConnectionState::Connected => true,
            _ => false,
        }
    }
}

impl Display for ConnectionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ConnectionState::Pending => write!(f, "Pending"),
            ConnectionState::Connected => write!(f, "Connected"),
            ConnectionState::Disconnected => write!(f, "Disconnected"),
        }
    }
}

#[derive(Debug, Clone)]
enum Message {
    Connect,
    Disconnect,
    GetParams,
    Cancel,
    ConnectionEvent(connection::Event),
}

impl Application for App {
    type Executor = executor::Default;
    type Message = Message;
    type Theme = Theme;
    type Flags = ();

    fn new(_flags: Self::Flags) -> (Self, Command<Message>) {
        (Self::default(), Command::none())
    }

    fn title(&self) -> String {
        "Iced Mavlink Connection Question".to_string()
    }

    fn theme(&self) -> Theme {
        Theme::Dark
    }

    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            Message::Connect => self.send_message_to_subscription(InputMessage::Connect),
            Message::Disconnect => self.send_message_to_subscription(InputMessage::Disconnect),
            Message::GetParams => self.send_message_to_subscription(InputMessage::GetParameters),
            Message::Cancel => self.send_message_to_subscription(InputMessage::Cancel),

            Message::ConnectionEvent(event) => match event {
                connection::Event::ConnectionPending(sender) => {
                    self.state = ConnectionState::Disconnected;
                    self.sender = Some(sender);
                }
                connection::Event::Connected => {
                    self.state = ConnectionState::Connected;
                    println!("Connection started");
                }
                connection::Event::Disconnected => {
                    self.state = ConnectionState::Disconnected;
                    println!("Connection closed");
                }
                connection::Event::Progress(progress) => self.progress = Some(progress),
                connection::Event::Result(value) => self.result = Some(value),
            },
        }
        Command::none()
    }

    fn view(&self) -> Element<Message, Renderer<Theme>> {
        let controls = {
            let connect = button("Connect")
                .on_press_maybe((!self.state.is_connected()).then(|| Message::Connect));
            let disconnect = button("Disconnect")
                .on_press_maybe(self.state.is_connected().then(|| Message::Disconnect));

            let get_params = button("Get params")
                .on_press_maybe(self.state.is_connected().then(|| Message::GetParams));

            let cancel = button("Cancel").on_press(Message::Cancel);

            row![connect, disconnect, get_params, cancel].spacing(10)
        };

        let status = {
            let state = text(self.state.to_string());
            let progress = progress_bar(0.0..=9.0, self.progress.unwrap_or(0) as f32).width(200);
            let result = text(self.result.as_ref().unwrap_or(&"None yet".to_string()));
            row![state, progress, result].spacing(10)
        };

        container(column![controls, status].spacing(10))
            .width(Length::Fill)
            .height(Length::Fill)
            .padding(10)
            .into()
    }

    fn subscription(&self) -> Subscription<Self::Message> {
        connection::connection().map(Message::ConnectionEvent)
    }
}

impl App {
    fn send_message_to_subscription(&mut self, message: InputMessage) {
        self.sender.as_mut().unwrap().try_send(message).unwrap()
    }
}

fn main() -> iced::Result {
    App::run(Settings::default())
}

mod connection {

    use std::{sync::Arc, thread, time::Duration};

    use iced::{
        futures::{channel::mpsc, SinkExt, StreamExt},
        subscription, Subscription,
    };
    use tokio::time::timeout;

    type Connection = Arc<String>;

    enum State {
        Disconnected,
        Connected { connection: Connection },
    }

    #[derive(Debug, Clone)]
    pub enum Event {
        ConnectionPending(mpsc::Sender<InputMessage>),
        Connected,
        Disconnected,
        Progress(u32),
        Result(String),
    }

    pub enum InputMessage {
        Connect,
        GetParameters,
        Cancel,
        Disconnect,
        None,
    }

    pub fn connection() -> Subscription<Event> {
        struct Connect;
        subscription::channel(
            std::any::TypeId::of::<Connect>(),
            100,
            |mut output| async move {
                // Start the subscription future and return the sender to send messages into the subscription back to the application
                let (sender, mut receiver) = mpsc::channel(100);
                let _ = output.send(Event::ConnectionPending(sender)).await;

                let mut state = State::Disconnected;

                loop {
                    match &mut state {
                        State::Disconnected => {
                            if let InputMessage::Connect = receiver.select_next_some().await {
                                state = State::Connected {
                                    connection: Arc::new("Connection".to_string()),
                                };
                                let _ = output.send(Event::Connected).await;
                            }
                        }

                        State::Connected { connection } => {
                            match receive_input_message(&mut receiver).await {
                                InputMessage::Connect => panic!("Already connected"),
                                InputMessage::Disconnect => {
                                    state = State::Disconnected;
                                    let _ = output.send(Event::Disconnected).await;
                                }
                                InputMessage::GetParameters => {
                                    let result =
                                        get_parameters(connection.clone(), output.clone()).await;
                                    let _ = output.send(Event::Result(result)).await;
                                }
                                InputMessage::None => {
                                    // Here I poll the connection and report status back to application
                                }
                                InputMessage::Cancel => {
                                    // How do a cancel the `get_parameters` task?
                                }
                            }
                        }
                    }
                }
            },
        )
    }

    async fn receive_input_message(rx: &mut mpsc::Receiver<InputMessage>) -> InputMessage {
        // Try to receive a message. If there isn't one, every second default to a `None` message which triggers polling the connection
        timeout(Duration::from_secs(1), rx.select_next_some())
            .await
            .unwrap_or(InputMessage::None)
    }

    async fn get_parameters(connection: Connection, mut output: mpsc::Sender<Event>) -> String {
        tokio::task::spawn_blocking({
            let conn = connection.clone();
            move || {
                for i in 0..10 {
                    println!("Using connection {}", conn);
                    thread::sleep(Duration::from_secs(1));
                    let _ = output.send(Event::Progress(i)); // Cannot await here because the block is not async
                }
                "Result that took a long time to download or compute or whatever".to_string()
            }
        })
        .await
        .unwrap()
    }
}

Cargo.toml:

[package]
name = "iced-connection-question"
version = "0.1.0"
edition = "2021"

[dependencies]
iced = { version = "0.10.0", features = ["tokio"] }
tokio = { version = "1.32.0", features = ["time", "rt-multi-thread"] }

My questions:

  1. I’m passing the mpsc::Sender<Event> into get_parameters to send the progress updates to the application but the closure in spawn_blocking is not async so I’m not able to await sending the event. This causes the event to not be send. How can I solve this?
  2. While get_parameters is running the loop is blocked so a message to cancel the task won’t be received. My first idea was to spawn the task instead of awaiting it immediately and adding a Downloading variant to State that holds the JoinHandle (and the Connection so it doesn’t get dropped). But this just shifts the problem to the State::Downloading match arm:
loop {
    match &mut state {
        // ...
        State::Connected { connection } => {
            match receive_input_message(&mut receiver).await {
                // ...
                InputMessage::GetParameters => {
                    let handle = tokio::task::spawn(get_parameters(
                        connection.clone(),
                        output.clone(),
                    ));
                    state = State::Downloading {
                        connection: connection.clone(),
                        handle,
                    };
                }
                // ...
            }
        }
        State::Downloading { connection, handle } => {
            let result = handle.await.unwrap(); // Also can't listen for `InputMessage::Cancel` here.
            state = State::Connected {
                connection: connection.clone(),
            };
            let _ = output.send(Event::Result(result)).await;
        }
    }
}

An alternative would be to listen for the cancel instruction from within get_parameters and returning early when the message is that happens. The problem is that receive_input_message needs to be awaited so this creates the same problem as in 1.

Both approaches feel rather clunky because I’m manually managing tasks instead of letting a runtime do it for me. I found tokio_util::sync::CancellationToken which exists specifically to cancel tasks but I don’t see from where I would call token.cancel().

Is there a nicer way to do this?

Well, first thing that comes to my mind, about not being able to cancel a download, just tokio::select! with your handle.await and receive_input_message

I gave it a try but it’s not working as expected.

match &mut state {
    // ...
    State::Connected { connection } => {
        match receive_input_message(&mut receiver).await {
            // ...
            InputMessage::GetParameters => {
                let result = select! {
                    result = get_parameters(connection.clone(), output.clone()) => Some(result),
                    _ = receive_cancel_message(&mut receiver) => {
                        println!("Cancelled future");
                        None
                    }
                };
                match result {
                    Some(result) => {
                        let _ = output.send(Event::Result(result)).await;
                    }
                    None => {
                        let _ = output.send(Event::Cancelled).await;
                    }
                }
            }
            // ...
        }
    }
}
async fn receive_cancel_message(rx: &mut mpsc::Receiver<InputMessage>) {
    loop {
        if let InputMessage::Cancel = rx.select_next_some().await {}
        {
            return;
        }
    }
}

async fn get_parameters(connection: Connection, mut output: mpsc::Sender<Event>) -> String {
    tokio::task::spawn_blocking({
        println!("Spawned task");
        let _conn = connection.clone();
        move || {
            for i in 0..10 {
                println!("Task progress {}", i);
                thread::sleep(Duration::from_secs(1));
                let _ = output.send(Event::Progress(i)); // Cannot await here because the block is not async
            }
            println!("Task completed");
            "Result that took a long time to download or compute or whatever".to_string()
        }
    })
    .await
    .unwrap()
}

When clicking the cancel button it does print “Cancelled future” but the spawned task doesn’t get cancelled. That wouldn’t be the worst thing but when you repeatedly click the “Get parameters” button it starts the task multiple times. This doesn’t make sense to me because it implies the loop continues after select!while I expected it to wait for either one of the futures to resolve.

can’t you just cancel it by dropping it?
what is blocking in get_parameters?

Not blocking the thread but I’m awaiting the task that I spawn so it should block the execution of the loop.

I’ve since changed to using async-mavlink which allowed me to refactor this code so for now the problem is solved.

Just in case, there is an example showing exactly that https://github.com/iced-rs/iced/tree/master/examples/download_progress

This example doesn’t show how to cancel a download.