cancel_safe_futures/sink/
flush_reserve.rs

1use super::{Permit, Reserve};
2use core::{
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use futures_core::{ready, FusedFuture};
9use futures_sink::Sink;
10
11/// Future for the [`flush_reserve`](super::SinkExt::flush_reserve) method.
12#[derive(Debug)]
13#[must_use]
14pub struct FlushReserve<'a, Si: ?Sized, Item> {
15    reserve: Reserve<'a, Si, Item>,
16    state: FlushReserveState,
17    _phantom: PhantomData<fn(Item)>,
18}
19
20// By default, Unpin would be implemented for FlushReserve even if Si isn't Unpin. But we explicitly
21// only support Unpin sinks.
22impl<Si: Unpin + ?Sized, Item> Unpin for FlushReserve<'_, Si, Item> {}
23
24impl<'a, Item, Si: Sink<Item> + Unpin + ?Sized> FlushReserve<'a, Si, Item> {
25    pub(super) fn new(sink: &'a mut Si) -> Self {
26        Self {
27            reserve: Reserve::new(sink),
28            state: FlushReserveState::PollFlush,
29            _phantom: PhantomData,
30        }
31    }
32}
33
34impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Future for FlushReserve<'a, Si, Item> {
35    type Output = Result<Permit<'a, Si, Item>, Si::Error>;
36
37    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let this = &mut *self;
39        if this.state == FlushReserveState::PollFlush {
40            ready!(this
41                .reserve
42                .sink_pin_mut()
43                .expect("PollFlush => sink must be Some")
44                .poll_flush(cx))?;
45            // Move to the reserve state.
46            this.state = FlushReserveState::Reserve;
47        }
48
49        debug_assert_eq!(this.state, FlushReserveState::Reserve);
50
51        Pin::new(&mut this.reserve).poll(cx)
52    }
53}
54
55impl<Si: Sink<Item> + Unpin + ?Sized, Item> FusedFuture for FlushReserve<'_, Si, Item> {
56    fn is_terminated(&self) -> bool {
57        // Once a permit has been issued, the sink becomes None.
58        self.reserve.is_terminated()
59    }
60}
61
/// Phase that a [`FlushReserve`] future is currently in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FlushReserveState {
    /// The sink still has to be flushed; this is the initial phase.
    PollFlush,
    /// The flush has completed and the inner reserve future is being polled.
    Reserve,
}