1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Unforgettable-types-friendly rendezvous channel.
//!
//! # Examples
//!
//! ```rust,compile_fail
//! use std::{future::Future, pin::Pin, task};
//! use noop_waker::noop_waker;
//! use leak_playground_flume::*;
//! use leak_playground_std::marker::{Forget, Unforget};
//!
//! let i = 37;
//! {
//!    let d = Unforget::<'static, &i32>::new(&i);
//!    let (tx, rx) = rendezvous();
//!
//!    let waker = noop_waker();
//!    let mut cx = task::Context::from_waker(&waker);
//!
//!    let mut rx = Box::pin(rx.into_recv_async());
//!    assert!(rx.as_mut().poll(&mut cx).is_pending());
//!
//!    tx.try_send((rx as Pin<Box<dyn Forget + '_>>, d)).unwrap();
//! }
//! ```

use std::{
    future::Future,
    pin::{self, Pin},
    task,
};

use leak_playground_std::marker::Forget;

/// Create a rendezvous channel.
pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = flume::bounded(0);
    (Sender { inner: tx }, Receiver { inner: rx })
}

pub struct Sender<T> {
    inner: flume::Sender<T>,
}

unsafe impl<T> Forget for Sender<T> {}

impl<T> Sender<T> {
    pub fn send(&self, msg: T) -> Result<(), flume::SendError<T>> {
        self.inner.send(msg)
    }

    pub fn try_send(&self, msg: T) -> Result<(), flume::TrySendError<T>> {
        self.inner.try_send(msg)
    }

    /// Asynchronously send an item.
    pub fn send_async(&self, item: T) -> flume::r#async::SendFut<T> {
        self.inner.send_async(item)
    }

    /// Asynchronously send an item and consume the sender.
    pub fn into_send_async<'a>(self, item: T) -> flume::r#async::SendFut<'a, T> {
        self.inner.into_send_async(item)
    }
}

pub struct Receiver<T> {
    inner: flume::Receiver<T>,
}

unsafe impl<T> Forget for Receiver<T> {}

impl<T> Receiver<T> {
    pub fn recv(&self) -> Result<T, flume::RecvError> {
        self.inner.recv()
    }

    pub fn try_recv(&self) -> Result<T, flume::TryRecvError> {
        self.inner.try_recv()
    }

    /// Asynchronously receive an item.
    pub fn recv_async(&self) -> RecvFut<'_, T>
    where
        T: Forget,
    {
        RecvFut {
            inner: self.inner.recv_async(),
        }
    }

    /// Asynchronously receive an unforgettable item.
    ///
    /// # Safety
    ///
    /// `T` must not take ownership over itself.
    pub unsafe fn recv_async_unchecked(&self) -> RecvFut<'_, T> {
        RecvFut {
            inner: self.inner.recv_async(),
        }
    }

    /// Asynchronously receive an item and consume the receiver.
    pub fn into_recv_async<'a>(self) -> RecvFut<'a, T>
    where
        T: Forget,
    {
        RecvFut {
            inner: self.inner.into_recv_async(),
        }
    }

    /// Asynchronously receive an unforgettable item and consume the receiver.
    ///
    /// # Safety
    ///
    /// `T` must not take ownership over itself.
    pub unsafe fn into_recv_async_unchecked<'a>(self) -> RecvFut<'a, T> {
        RecvFut {
            inner: self.inner.into_recv_async(),
        }
    }
}

pub struct RecvFut<'a, T> {
    inner: flume::r#async::RecvFut<'a, T>,
}

impl<'a, T> Future for RecvFut<'a, T> {
    type Output = Result<T, flume::RecvError>;

    fn poll(self: pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx)
    }
}

unsafe impl<T: Forget> Forget for RecvFut<'_, T> {}