Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamExt::map_while #2856

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
TryFold, TryForEach, Unzip, Zip,
Fold, ForEach, Fuse, Inspect, Map, MapWhile, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable,
Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile,
Then, TryFold, TryForEach, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand Down Expand Up @@ -53,8 +53,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};
mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll,
TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile,
TryStreamExt, TryTakeWhile, TryUnfold,
TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryMapWhile, TryNext,
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
};

#[cfg(feature = "io")]
Expand Down
115 changes: 115 additions & 0 deletions futures-util/src/stream/stream/map_while.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`map_while`](super::StreamExt::map_while) method.
#[must_use = "streams do nothing unless polled"]
pub struct MapWhile<St: Stream, Fut, F> {
#[pin]
stream: St,
f: F,
#[pin]
pending_fut: Option<Fut>,
done_mapping: bool,
}
}

impl<St, Fut, F> fmt::Debug for MapWhile<St, Fut, F>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MapWhile")
.field("stream", &self.stream)
.field("pending_fut", &self.pending_fut)
.field("done_mapping", &self.done_mapping)
.finish()
}
}

impl<St, Fut, F, T> MapWhile<St, Fut, F>
where
St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = Option<T>>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f, pending_fut: None, done_mapping: false }
}

delegate_access_inner!(stream, St, ());
}

impl<St, Fut, F, T> Stream for MapWhile<St, Fut, F>
where
St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = Option<T>>,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
if self.done_mapping {
return Poll::Ready(None);
}

let mut this = self.project();

Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let mapped = ready!(fut.poll(cx));
this.pending_fut.set(None);

if mapped.is_none() {
*this.done_mapping = true;
}

break mapped;
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
this.pending_fut.set(Some((this.f)(item)));
} else {
break None;
}
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.done_mapping {
return (0, Some(0));
}

let (_, upper) = self.stream.size_hint();
(0, upper) // can't know a lower bound, due to the predicate
}
}

impl<St, Fut, F, T> FusedStream for MapWhile<St, Fut, F>
where
St: FusedStream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = Option<T>>,
{
fn is_terminated(&self) -> bool {
self.done_mapping || self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for MapWhile<S, Fut, F>
where
S: Stream + Sink<Item>,
{
type Error = S::Error;

delegate_sink!(stream, Item);
}
33 changes: 33 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub use self::take::Take;
mod take_while;
pub use self::take_while::TakeWhile;

mod map_while;
pub use self::map_while::MapWhile;

mod take_until;
pub use self::take_until::TakeUntil;

Expand Down Expand Up @@ -994,6 +997,36 @@ pub trait StreamExt: Stream {
assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
}

/// Maps elements from this stream while the provided asynchronous future
/// resolves to `Some(_)`.
///
/// This function, like `Iterator::map_while`, will take elements from the
/// stream until the future `f` resolves to `None`. Once one element
/// returns `None`, it will always return that the stream is done.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(vec![1, 4, 0, 2]);
///
/// let stream = stream.map_while(|x| future::ready(u32::checked_div(16, x)));
///
/// assert_eq!(vec![16, 4], stream.collect::<Vec<_>>().await);
/// # });
/// ```
fn map_while<Fut, F, T>(self, f: F) -> MapWhile<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>,
Self: Sized,
{
assert_stream::<T, _>(MapWhile::new(self, f))
}

/// Take elements from this stream until the provided future resolves.
///
/// This function will take elements from the stream until the provided
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub use self::try_skip_while::TrySkipWhile;
mod try_take_while;
pub use self::try_take_while::TryTakeWhile;

mod try_map_while;
pub use self::try_map_while::TryMapWhile;

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod try_buffer_unordered;
Expand Down Expand Up @@ -467,6 +470,37 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
}

/// Maps elements from this stream while the provided asynchronous future
/// resolves to `Some(_)`.
///
/// This function is similar to
/// [`StreamExt::map_while`](crate::stream::StreamExt::map_while) but exits
/// early if an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<u32, u32>(1), Ok(4), Ok(0), Ok(2)]);
///
/// let stream = stream.try_map_while(|x| future::ready(Ok(u32::checked_div(16, x))));
///
/// let output: Result<Vec<u32>, u32> = stream.try_collect().await;
/// assert_eq!(output, Ok(vec![16, 4]));
/// # });
/// ```
fn try_map_while<Fut, F, T>(self, f: F) -> TryMapWhile<Self, Fut, F>
where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
Self: Sized,
{
assert_stream::<Result<T, Self::Error>, _>(TryMapWhile::new(self, f))
}

/// Attempt to transform a stream into a collection,
/// returning a future representing the result of that computation.
///
Expand Down
120 changes: 120 additions & 0 deletions futures-util/src/stream/try_stream/try_map_while.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`try_map_while`](super::TryStreamExt::try_map_while)
/// method.
#[must_use = "streams do nothing unless polled"]
pub struct TryMapWhile<St, Fut, F>
where
St: TryStream,
{
#[pin]
stream: St,
f: F,
#[pin]
pending_fut: Option<Fut>,
done_mapping: bool,
}
}

impl<St, Fut, F> fmt::Debug for TryMapWhile<St, Fut, F>
where
St: TryStream + fmt::Debug,
St::Ok: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryMapWhile")
.field("stream", &self.stream)
.field("pending_fut", &self.pending_fut)
.field("done_mapping", &self.done_mapping)
.finish()
}
}

impl<St, Fut, F, T> TryMapWhile<St, Fut, F>
where
St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = St::Error>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f, pending_fut: None, done_mapping: false }
}

delegate_access_inner!(stream, St, ());
}

impl<St, Fut, F, T> Stream for TryMapWhile<St, Fut, F>
where
St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = St::Error>,
{
type Item = Result<T, St::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.done_mapping {
return Poll::Ready(None);
}

let mut this = self.project();

Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let res = ready!(fut.try_poll(cx));
this.pending_fut.set(None);

let mapped = res?;
if mapped.is_none() {
*this.done_mapping = true;
}

break mapped.map(Ok);
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
this.pending_fut.set(Some((this.f)(item)));
} else {
break None;
}
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.done_mapping {
return (0, Some(0));
}

let (_, upper) = self.stream.size_hint();
(0, upper) // can't know a lower bound, due to the predicate
}
}

impl<St, Fut, F, T> FusedStream for TryMapWhile<St, Fut, F>
where
St: TryStream + FusedStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = St::Error>,
{
fn is_terminated(&self) -> bool {
self.done_mapping || self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item, E> Sink<Item> for TryMapWhile<S, Fut, F>
where
S: TryStream + Sink<Item, Error = E>,
{
type Error = E;

delegate_sink!(stream, Item);
}