cancel_safe_futures/sink/
flush_reserve.rs1use super::{Permit, Reserve};
2use core::{
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use futures_core::{ready, FusedFuture};
9use futures_sink::Sink;
10
11#[derive(Debug)]
13#[must_use]
14pub struct FlushReserve<'a, Si: ?Sized, Item> {
15 reserve: Reserve<'a, Si, Item>,
16 state: FlushReserveState,
17 _phantom: PhantomData<fn(Item)>,
18}
19
20impl<Si: Unpin + ?Sized, Item> Unpin for FlushReserve<'_, Si, Item> {}
23
24impl<'a, Item, Si: Sink<Item> + Unpin + ?Sized> FlushReserve<'a, Si, Item> {
25 pub(super) fn new(sink: &'a mut Si) -> Self {
26 Self {
27 reserve: Reserve::new(sink),
28 state: FlushReserveState::PollFlush,
29 _phantom: PhantomData,
30 }
31 }
32}
33
34impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Future for FlushReserve<'a, Si, Item> {
35 type Output = Result<Permit<'a, Si, Item>, Si::Error>;
36
37 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 let this = &mut *self;
39 if this.state == FlushReserveState::PollFlush {
40 ready!(this
41 .reserve
42 .sink_pin_mut()
43 .expect("PollFlush => sink must be Some")
44 .poll_flush(cx))?;
45 this.state = FlushReserveState::Reserve;
47 }
48
49 debug_assert_eq!(this.state, FlushReserveState::Reserve);
50
51 Pin::new(&mut this.reserve).poll(cx)
52 }
53}
54
55impl<Si: Sink<Item> + Unpin + ?Sized, Item> FusedFuture for FlushReserve<'_, Si, Item> {
56 fn is_terminated(&self) -> bool {
57 self.reserve.is_terminated()
59 }
60}
61
/// Phase tracker for [`FlushReserve`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushReserveState {
    /// The sink still has to be flushed; this is the initial state.
    PollFlush,
    /// The flush has completed and the inner reserve future is being polled.
    Reserve,
}