async_std/stream/stream/
inspect.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::stream::Stream;
6use crate::task::{Context, Poll};
7
8pin_project! {
9 #[derive(Debug)]
17 pub struct Inspect<S, F> {
18 #[pin]
19 stream: S,
20 f: F,
21 }
22}
23
24impl<S, F> Inspect<S, F> {
25 pub(super) fn new(stream: S, f: F) -> Self {
26 Self {
27 stream,
28 f,
29 }
30 }
31}
32
33impl<S, F> Stream for Inspect<S, F>
34where
35 S: Stream,
36 F: FnMut(&S::Item),
37{
38 type Item = S::Item;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 let mut this = self.project();
42 let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
43
44 Poll::Ready(next.map(|x| {
45 (this.f)(&x);
46 x
47 }))
48 }
49}