Say I have a Task<T> and a Task<U>, and I want to get a Task<(T, U)>.
Currently I use channels to do this, but that only works for tasks that produce a single output. Implementing this correctly requires working with the streams inside Task.
pub fn zip<T, U>(left: Task<T>, right: Task<U>) -> Task<(T, U)>
where
    T: Send + 'static,
    U: Send + 'static,
{
    // Capacity-1 channels carry one value from each task to the final step.
    let (send_left, recv_left) = async_channel::bounded(1);
    let (send_right, recv_right) = async_channel::bounded(1);
    Task::chain(
        left.then(move |value| {
            let send = send_left.clone();
            Task::future(async move { send.try_send(value).ok() })
        }),
        right.then(move |value| {
            let send = send_right.clone();
            Task::future(async move { send.try_send(value).ok() })
        }),
    )
    .collect()
    // Once both forwarding tasks have finished, read one value from each side.
    .then(move |_| {
        let recv_left = recv_left.clone();
        let recv_right = recv_right.clone();
        Task::future(async move {
            let left = recv_left.recv().await.unwrap();
            let right = recv_right.recv().await.unwrap();
            (left, right)
        })
    })
}
I think you would have options if you made T and U variants of the same enum. In that case you could chain or batch the Tasks, collect their results, and create a new Task that zips those results into a tuple. With different types, I don’t see a way to do this.
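A rough sketch of the pairing step in that idea, using only the standard library so it does not depend on Task. `Either` and `zip_collected` are made-up names for illustration, and the input Vec stands in for whatever collecting the chained or batched tasks would produce.

```rust
/// Hypothetical wrapper so that both result types share one type.
#[derive(Debug)]
enum Either<T, U> {
    Left(T),
    Right(U),
}

/// Pairs the n-th `Left` value with the n-th `Right` value.
/// Surplus values on either side are dropped, like `Iterator::zip`.
fn zip_collected<T, U>(results: Vec<Either<T, U>>) -> Vec<(T, U)> {
    let mut lefts = Vec::new();
    let mut rights = Vec::new();
    // Split the mixed results by variant, keeping arrival order per side.
    for result in results {
        match result {
            Either::Left(value) => lefts.push(value),
            Either::Right(value) => rights.push(value),
        }
    }
    lefts.into_iter().zip(rights).collect()
}
```

The order in which left and right results arrive does not matter here, only the order within each side.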
I made a PoC of this, but it looks complicated. Maybe there’s a better way.
Stream::zip doesn’t seem to help, because it zips all actions rather than only the outputs.
/// Zips two tasks together. The zipped task waits for both tasks to produce an output,
/// and then produces that pair.
pub fn zip<U>(self, other: Task<U>) -> Task<(T, U)>
where
    T: MaybeSend + 'static,
    U: MaybeSend + 'static,
{
    let streams = match (self.stream, other.stream) {
        (None, None) => return Task::none(),
        (Some(stream), None) => return Task::stream(stream).discard(),
        (None, Some(stream)) => return Task::stream(stream).discard(),
        (Some(first), Some(second)) => (first.fuse(), second.fuse()),
    };
    let stream = stream::unfold(
        (streams, Some((None, None))),
        move |(mut streams, outputs)| async move {
            let Some((first, second)) = outputs else {
                // One of the streams has terminated.
                // Yield the remaining actions of the other stream.
                let action = select! {
                    action = streams.0.select_next_some() => action.output().err(),
                    action = streams.1.select_next_some() => action.output().err(),
                };
                return action.map(|action| {
                    (Some(action), (streams, Some((None, None))))
                });
            };
            // No output from the first stream yet: poll it.
            let Some(first) = first else {
                let Some(action) = streams.0.next().await else {
                    return Some((None, (streams, None)));
                };
                return match action.output() {
                    Ok(output) => {
                        Some((None, (streams, Some((Some(output), second)))))
                    }
                    Err(action) => Some((
                        Some(action),
                        (streams, Some((first, second))),
                    )),
                };
            };
            // The first output is ready; now wait for the second stream.
            let Some(second) = second else {
                let Some(action) = streams.1.next().await else {
                    return Some((None, (streams, None)));
                };
                return match action.output() {
                    Ok(output) => {
                        Some((None, (streams, Some((Some(first), Some(output))))))
                    }
                    Err(action) => Some((
                        Some(action),
                        (streams, Some((Some(first), second))),
                    )),
                };
            };
            // Both outputs are ready: emit the pair and reset the state.
            Some((
                Some(Action::Output((first, second))),
                (streams, Some((None, None))),
            ))
        },
    )
    .filter_map(future::ready);
    Task {
        stream: Some(boxed_stream(stream)),
        units: self.units + other.units,
    }
}
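To illustrate the Stream::zip remark, here is a small analogy with plain iterators instead of streams. The `Action` enum below is a made-up stand-in (not the real action type), and the action names are arbitrary. A positional zip pairs whatever items happen to line up, while what is wanted is to pair the n-th output of each side.

```rust
/// Hypothetical stand-in for an action: either a task output or
/// some other action that is not an output.
#[derive(Debug, Clone, PartialEq)]
enum Action<T> {
    Output(T),
    Other(&'static str),
}

/// Naive zip: pairs items by position, regardless of what they are.
fn zip_actions<T, U>(
    left: Vec<Action<T>>,
    right: Vec<Action<U>>,
) -> Vec<(Action<T>, Action<U>)> {
    left.into_iter().zip(right).collect()
}

/// What is actually wanted: pair the n-th output of each side.
fn zip_outputs<T, U>(left: Vec<Action<T>>, right: Vec<Action<U>>) -> Vec<(T, U)> {
    // Keeps only the outputs, in order.
    fn outputs<V>(actions: Vec<Action<V>>) -> impl Iterator<Item = V> {
        actions.into_iter().filter_map(|action| match action {
            Action::Output(value) => Some(value),
            Action::Other(_) => None,
        })
    }
    outputs(left).zip(outputs(right)).collect()
}
```

Note that this analogy simply drops the non-output actions, whereas the PoC above forwards them, which is where most of its complexity comes from.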
Also, here is a correct channel-based implementation that works with multiple outputs. It looks simpler, but it requires an mpmc channel, which the futures crate does not provide.
use futures::stream::unfold;

pub fn zip<T, U>(left: Task<T>, right: Task<U>) -> Task<(T, U)>
where
    T: Send + 'static,
    U: Send + 'static,
{
    let (send_left, recv_left) = async_channel::bounded(1);
    let (send_right, recv_right) = async_channel::bounded(1);
    Task::batch([
        // Forward every output of each task into its channel;
        // `send` waits while the channel is full.
        left.then(move |value| {
            let send = send_left.clone();
            Task::future(async move { send.send(value).await }).discard()
        }),
        right.then(move |value| {
            let send = send_right.clone();
            Task::future(async move { send.send(value).await }).discard()
        }),
        // Emit one pair per (left, right) received; stop once either channel is closed.
        Task::stream(unfold((), move |_| {
            let recv_left = recv_left.clone();
            let recv_right = recv_right.clone();
            async move {
                let left = recv_left.recv().await.ok();
                let right = recv_right.recv().await.ok();
                Option::zip(left, right).map(|pair| (pair, ()))
            }
        })),
    ])
}
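For anyone who wants to play with the channel idea outside of a GUI app, here is a minimal synchronous model of it using only the standard library. This is an assumption-laden sketch, not the Task API: threads stand in for the two tasks, `std::sync::mpsc::sync_channel(1)` stands in for `async_channel::bounded(1)`, and `zip_via_channels` is a made-up name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Pairs the n-th item of `left` with the n-th item of `right`.
/// Each side is produced on its own thread through a capacity-1 channel.
fn zip_via_channels<T, U>(left: Vec<T>, right: Vec<U>) -> Vec<(T, U)>
where
    T: Send + 'static,
    U: Send + 'static,
{
    let (send_left, recv_left) = sync_channel(1);
    let (send_right, recv_right) = sync_channel(1);

    // Each producer stops early if its receiver is gone (`send` returns Err).
    let left_producer = thread::spawn(move || {
        for value in left {
            if send_left.send(value).is_err() {
                break;
            }
        }
    });
    let right_producer = thread::spawn(move || {
        for value in right {
            if send_right.send(value).is_err() {
                break;
            }
        }
    });

    // `recv` fails once a sender is dropped and its channel is drained,
    // so the loop ends as soon as either side runs out of values.
    let mut pairs = Vec::new();
    while let (Ok(l), Ok(r)) = (recv_left.recv(), recv_right.recv()) {
        pairs.push((l, r));
    }

    // Dropping the receivers wakes any producer still blocked on a full channel.
    drop(recv_left);
    drop(recv_right);
    left_producer.join().unwrap();
    right_producer.join().unwrap();
    pairs
}
```

As with the version above, a value already received from the longer side is discarded once the shorter side has finished.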