cancel_safe_futures/sink/
reserve.rs

1use super::Permit;
2use core::{
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use futures_core::{ready, FusedFuture};
9use futures_sink::Sink;
10
/// Future returned by the [`reserve`](super::SinkExt::reserve) method.
///
/// The future holds an exclusive borrow of the sink until a permit has been handed out, at which
/// point the borrow is moved into the permit.
#[derive(Debug)]
#[must_use]
pub struct Reserve<'a, Si, Item>
where
    Si: ?Sized,
{
    // Exclusive borrow of the sink. Set to `None` as soon as a permit has been issued.
    sink: Option<&'a mut Si>,
    // Ties the `Item` type parameter to this struct without storing an `Item`. Using `fn(Item)`
    // rather than `Item` makes the struct contravariant in `Item` and keeps `Item` from affecting
    // auto traits such as `Send`/`Sync`. This mirrors the futures in futures_util::SinkExt, which
    // all use PhantomData<fn(Item)>.
    _phantom: PhantomData<fn(Item)>,
}
22
23// By default, Unpin would be implemented for Reserve even if Si isn't Unpin. But we explicitly only
24// support Unpin sinks.
25impl<Si: Unpin + ?Sized, Item> Unpin for Reserve<'_, Si, Item> {}
26
27impl<'a, Item, Si: Sink<Item> + Unpin + ?Sized> Reserve<'a, Si, Item> {
28    pub(super) fn new(sink: &'a mut Si) -> Self {
29        Self {
30            sink: Some(sink),
31            _phantom: PhantomData,
32        }
33    }
34
35    pub(super) fn sink_pin_mut(&mut self) -> Option<Pin<&mut Si>> {
36        // Can't use Option::map here due to lifetime issues.
37        #[allow(clippy::manual_map)]
38        match &mut self.sink {
39            Some(sink) => Some(Pin::new(sink)),
40            None => None,
41        }
42    }
43}
44
45impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Future for Reserve<'a, Si, Item> {
46    type Output = Result<Permit<'a, Si, Item>, Si::Error>;
47
48    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49        if let Some(sink) = self.sink.as_mut() {
50            ready!(Pin::new(sink).poll_ready(cx))?;
51            let sink = self.sink.take().unwrap();
52            Poll::Ready(Ok(Permit::new(sink)))
53        } else {
54            Poll::Pending
55        }
56    }
57}
58
59impl<Si: Sink<Item> + Unpin + ?Sized, Item> FusedFuture for Reserve<'_, Si, Item> {
60    fn is_terminated(&self) -> bool {
61        // Once a permit has been issued, the sink becomes None.
62        self.sink.is_none()
63    }
64}