From d7f97c5fbd59f160d4fa8b09010d3d854c8d7b45 Mon Sep 17 00:00:00 2001 From: nanoqsh Date: Sat, 4 May 2024 17:49:35 +0500 Subject: [PATCH 1/3] Impl `MapWhile` --- futures-util/src/stream/mod.rs | 6 +- futures-util/src/stream/stream/map_while.rs | 115 ++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 28 +++++ 3 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 futures-util/src/stream/stream/map_while.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad22..add5ad0da 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -19,9 +19,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, - Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, - SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, - TryFold, TryForEach, Unzip, Zip, + Fold, ForEach, Fuse, Inspect, Map, MapWhile, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, + Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, + Then, TryFold, TryForEach, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/map_while.rs b/futures-util/src/stream/stream/map_while.rs new file mode 100644 index 000000000..30421d501 --- /dev/null +++ b/futures-util/src/stream/stream/map_while.rs @@ -0,0 +1,115 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map_while`](super::StreamExt::map_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct MapWhile { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option, + done_mapping: bool, + } +} + +impl fmt::Debug for MapWhile +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapWhile") + .field("stream", &self.stream) + .field("pending_fut", &self.pending_fut) + .field("done_mapping", &self.done_mapping) + .finish() + } +} + +impl MapWhile +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, done_mapping: false } + } + + delegate_access_inner!(stream, St, ()); +} + +impl Stream for MapWhile +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done_mapping { + return Poll::Ready(None); + } + + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { + let mapped = ready!(fut.poll(cx)); + this.pending_fut.set(None); + + if mapped.is_none() { + *this.done_mapping = true; + } + + break mapped; + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.pending_fut.set(Some((this.f)(item))); + } else { + break None; + } + }) + } + + fn size_hint(&self) -> (usize, Option) { + if self.done_mapping { + return (0, Some(0)); + } + + let (_, upper) = self.stream.size_hint(); + (0, upper) // can't know a lower bound, due to the predicate + } +} + +impl FusedStream for MapWhile +where + St: FusedStream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, +{ + fn is_terminated(&self) -> bool { + self.done_mapping || self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for MapWhile +where + S: Stream + Sink, +{ + type Error = S::Error; + + delegate_sink!(stream, Item); +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..99fec1301 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -128,6 +128,9 @@ pub use self::take::Take; mod take_while; pub use self::take_while::TakeWhile; +mod map_while; +pub use self::map_while::MapWhile; + mod take_until; pub use self::take_until::TakeUntil; @@ -994,6 +997,31 @@ pub trait StreamExt: Stream { assert_stream::(TakeWhile::new(self, f)) } + /// todo + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(vec![1, 4, 0, 2]); + /// + /// let stream = stream.map_while(|x| future::ready(u32::checked_div(16, x))); + /// + /// assert_eq!(vec![16, 4], stream.collect::>().await); + /// # }); + /// ``` + fn map_while(self, f: F) -> MapWhile + where + F: FnMut(Self::Item) -> Fut, + Fut: Future>, + Self: Sized, + { + assert_stream::(MapWhile::new(self, f)) + } + /// Take elements from this stream until the provided future resolves. /// /// This function will take elements from the stream until the provided From b7842f0afe7b9d9b4ffb10dd96e9e7e07193e086 Mon Sep 17 00:00:00 2001 From: nanoqsh Date: Sat, 4 May 2024 18:02:28 +0500 Subject: [PATCH 2/3] Add documentation --- futures-util/src/stream/stream/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 99fec1301..8787afe85 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -997,7 +997,12 @@ pub trait StreamExt: Stream { assert_stream::(TakeWhile::new(self, f)) } - /// todo + /// Take elements from this stream while the provided asynchronous future + /// resolves to `Some(_)`. + /// + /// This function, like `Iterator::map_while`, will take elements from the + /// stream until the future `f` resolves to `None`. Once one element + /// returns `None`, it will always return that the stream is done. /// /// # Examples /// From 63895a129c3066eb0ab9e0724abdaea869def674 Mon Sep 17 00:00:00 2001 From: nanoqsh Date: Sat, 4 May 2024 18:58:02 +0500 Subject: [PATCH 3/3] Impl `TryMapWhile` --- futures-util/src/stream/mod.rs | 4 +- futures-util/src/stream/stream/mod.rs | 2 +- futures-util/src/stream/try_stream/mod.rs | 34 +++++ .../src/stream/try_stream/try_map_while.rs | 120 ++++++++++++++++++ 4 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 futures-util/src/stream/try_stream/try_map_while.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index add5ad0da..8421b5325 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -53,8 +53,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, - TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, - TryStreamExt, TryTakeWhile, TryUnfold, + TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryMapWhile, TryNext, + TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 8787afe85..1df53a6be 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -997,7 +997,7 @@ pub trait StreamExt: Stream { assert_stream::(TakeWhile::new(self, f)) } - /// Take elements from this stream while the provided asynchronous future + /// Maps elements from this stream while the provided asynchronous future /// resolves to `Some(_)`. /// /// This function, like `Iterator::map_while`, will take elements from the diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index fe9317d7a..782b6ea25 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -124,6 +124,9 @@ pub use self::try_skip_while::TrySkipWhile; mod try_take_while; pub use self::try_take_while::TryTakeWhile; +mod try_map_while; +pub use self::try_map_while::TryMapWhile; + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_buffer_unordered; @@ -467,6 +470,37 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(TryTakeWhile::new(self, f)) } + /// Maps elements from this stream while the provided asynchronous future + /// resolves to `Some(_)`. + /// + /// This function is similar to + /// [`StreamExt::map_while`](crate::stream::StreamExt::map_while) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::(1), Ok(4), Ok(0), Ok(2)]); + /// + /// let stream = stream.try_map_while(|x| future::ready(Ok(u32::checked_div(16, x)))); + /// + /// let output: Result, u32> = stream.try_collect().await; + /// assert_eq!(output, Ok(vec![16, 4])); + /// # }); + /// ``` + fn try_map_while(self, f: F) -> TryMapWhile + where + F: FnMut(Self::Ok) -> Fut, + Fut: TryFuture, Error = Self::Error>, + Self: Sized, + { + assert_stream::, _>(TryMapWhile::new(self, f)) + } + /// Attempt to transform a stream into a collection, /// returning a future representing the result of that computation. /// diff --git a/futures-util/src/stream/try_stream/try_map_while.rs b/futures-util/src/stream/try_stream/try_map_while.rs new file mode 100644 index 000000000..7f708bf92 --- /dev/null +++ b/futures-util/src/stream/try_stream/try_map_while.rs @@ -0,0 +1,120 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_map_while`](super::TryStreamExt::try_map_while) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct TryMapWhile + where + St: TryStream, + { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option, + done_mapping: bool, + } +} + +impl fmt::Debug for TryMapWhile +where + St: TryStream + fmt::Debug, + St::Ok: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryMapWhile") + .field("stream", &self.stream) + .field("pending_fut", &self.pending_fut) + .field("done_mapping", &self.done_mapping) + .finish() + } +} + +impl TryMapWhile +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture, Error = St::Error>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, done_mapping: false } + } + + delegate_access_inner!(stream, St, ()); +} + +impl Stream for TryMapWhile +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture, Error = St::Error>, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done_mapping { + return Poll::Ready(None); + } + + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { + let res = ready!(fut.try_poll(cx)); + this.pending_fut.set(None); + + let mapped = res?; + if mapped.is_none() { + *this.done_mapping = true; + } + + break mapped.map(Ok); + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.pending_fut.set(Some((this.f)(item))); + } else { + break None; + } + }) + } + + fn size_hint(&self) -> (usize, Option) { + if self.done_mapping { + return (0, Some(0)); + } + + let (_, upper) = self.stream.size_hint(); + (0, upper) // can't know a lower bound, due to the predicate + } +} + +impl FusedStream for TryMapWhile +where + St: TryStream + FusedStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture, Error = St::Error>, +{ + fn is_terminated(&self) -> bool { + self.done_mapping || self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryMapWhile +where + S: TryStream + Sink, +{ + type Error = E; + + delegate_sink!(stream, Item); +}