1
1
//! A single-producer, single-consumer (oneshot) channel.
2
+ //!
3
+ //! This is an experimental module, so the API will likely change.
2
4
3
5
use crate :: sync:: mpmc;
4
6
use crate :: sync:: mpsc:: { RecvError , SendError } ;
5
7
use crate :: time:: { Duration , Instant } ;
6
8
use crate :: { error, fmt} ;
7
9
8
10
/// Creates a new oneshot channel, returning the sender/receiver halves.
11
+ ///
12
+ /// # Examples
13
+ ///
14
+ /// ```
15
+ /// # #![feature(oneshot_channel)]
16
+ /// # use std::sync::oneshot;
17
+ /// # use std::thread;
18
+ /// #
19
+ /// let (sender, receiver) = oneshot::channel();
20
+ ///
21
+ /// // Spawn off an expensive computation.
22
+ /// thread::spawn(move || {
23
+ /// # fn expensive_computation() -> i32 { 42 }
24
+ /// sender.send(expensive_computation()).unwrap();
25
+ /// // `sender` is consumed by `send`, so we cannot use it anymore.
26
+ /// });
27
+ ///
28
+ /// # fn do_other_work() -> i32 { 42 }
29
+ /// do_other_work();
30
+ ///
31
+ /// // Let's see what that answer was...
32
+ /// println!("{:?}", receiver.recv().unwrap());
33
+ /// // `receiver` is consumed by `recv`, so we cannot use it anymore.
34
+ /// ```
9
35
#[ must_use]
10
36
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
11
37
pub fn channel < T > ( ) -> ( Sender < T > , Receiver < T > ) {
12
38
// Using a `sync_channel` with capacity 1 means that the internal implementation will use the
13
- // `Array`-flavored channel implementtion .
14
- let ( tx , rx ) = mpmc:: sync_channel ( 1 ) ;
15
- ( Sender { inner : tx } , Receiver { inner : rx } )
39
+ // `Array`-flavored channel implementation .
40
+ let ( sender , receiver ) = mpmc:: sync_channel ( 1 ) ;
41
+ ( Sender { inner : sender } , Receiver { inner : receiver } )
16
42
}
17
43
18
44
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -23,17 +49,33 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
23
49
///
24
50
/// # Examples
25
51
///
26
- /// (more examples to come)
52
+ /// ```
53
+ /// # #![feature(oneshot_channel)]
54
+ /// # use std::sync::oneshot;
55
+ /// # use std::thread;
56
+ /// #
57
+ /// let (sender, receiver) = oneshot::channel();
58
+ ///
59
+ /// thread::spawn(move || {
60
+ /// sender.send("Hello from thread!").unwrap();
61
+ /// });
62
+ ///
63
+ /// assert_eq!(receiver.recv().unwrap(), "Hello from thread!");
64
+ /// ```
65
+ ///
66
+ /// `Sender` cannot be sent between threads if it is sending non-`Send` types.
27
67
///
28
68
/// ```compile_fail
29
69
/// # #![feature(oneshot_channel)]
30
70
/// # use std::sync::oneshot;
71
+ /// # use std::thread;
72
+ /// # use std::ptr;
31
73
/// #
32
74
/// let (sender, receiver) = oneshot::channel();
33
75
///
34
76
/// struct NotSend(*mut ());
35
- /// std:: thread::spawn(move || {
36
- /// sender.send(NotSend(std:: ptr::null_mut()));
77
+ /// thread::spawn(move || {
78
+ /// sender.send(NotSend(ptr::null_mut()));
37
79
/// });
38
80
///
39
81
/// let reply = receiver.try_recv().unwrap();
@@ -57,6 +99,24 @@ impl<T> Sender<T> {
57
99
/// [`Receiver<T>`] has been dropped.
58
100
///
59
101
/// This method is non-blocking (wait-free).
102
+ ///
103
+ /// # Examples
104
+ ///
105
+ /// ```
106
+ /// # #![feature(oneshot_channel)]
107
+ /// # use std::sync::oneshot;
108
+ /// # use std::thread;
109
+ /// #
110
+ /// let (tx, rx) = oneshot::channel();
111
+ ///
112
+ /// thread::spawn(move || {
113
+ /// // Perform some computation.
114
+ /// let result = 2 + 2;
115
+ /// tx.send(result).unwrap();
116
+ /// });
117
+ ///
118
+ /// assert_eq!(rx.recv().unwrap(), 4);
119
+ /// ```
60
120
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
61
121
pub fn send ( self , t : T ) -> Result < ( ) , SendError < T > > {
62
122
self . inner . send ( t)
@@ -78,18 +138,37 @@ impl<T> fmt::Debug for Sender<T> {
78
138
///
79
139
/// # Examples
80
140
///
81
- /// (more examples to come)
141
+ /// ```
142
+ /// # #![feature(oneshot_channel)]
143
+ /// # use std::sync::oneshot;
144
+ /// # use std::thread;
145
+ /// # use std::time::Duration;
146
+ /// #
147
+ /// let (sender, receiver) = oneshot::channel();
148
+ ///
149
+ /// thread::spawn(move || {
150
+ /// thread::sleep(Duration::from_millis(100));
151
+ /// sender.send("Hello after delay!").unwrap();
152
+ /// });
153
+ ///
154
+ /// println!("Waiting for message...");
155
+ /// println!("{}", receiver.recv().unwrap());
156
+ /// ```
157
+ ///
158
+ /// `Receiver` cannot be sent between threads if it is receiving non-`Send` types.
82
159
///
83
160
/// ```compile_fail
84
161
/// # #![feature(oneshot_channel)]
85
162
/// # use std::sync::oneshot;
163
+ /// # use std::thread;
164
+ /// # use std::ptr;
86
165
/// #
87
166
/// let (sender, receiver) = oneshot::channel();
88
167
///
89
168
/// struct NotSend(*mut ());
90
- /// sender.send(NotSend(std:: ptr::null_mut()));
169
+ /// sender.send(NotSend(ptr::null_mut()));
91
170
///
92
- /// std:: thread::spawn(move || {
171
+ /// thread::spawn(move || {
93
172
/// let reply = receiver.try_recv().unwrap();
94
173
/// });
95
174
/// ```
@@ -111,6 +190,25 @@ impl<T> Receiver<T> {
111
190
/// Receives the value from the sending end, blocking the calling thread until it gets it.
112
191
///
113
192
/// Can only fail if the corresponding [`Sender<T>`] has been dropped.
193
+ ///
194
+ /// # Examples
195
+ ///
196
+ /// ```
197
+ /// # #![feature(oneshot_channel)]
198
+ /// # use std::sync::oneshot;
199
+ /// # use std::thread;
200
+ /// # use std::time::Duration;
201
+ /// #
202
+ /// let (tx, rx) = oneshot::channel();
203
+ ///
204
+ /// thread::spawn(move || {
205
+ /// thread::sleep(Duration::from_millis(500));
206
+ /// tx.send("Done!").unwrap();
207
+ /// });
208
+ ///
209
+ /// // This will block until the message arrives.
210
+ /// println!("{}", rx.recv().unwrap());
211
+ /// ```
114
212
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
115
213
pub fn recv ( self ) -> Result < T , RecvError > {
116
214
self . inner . recv ( )
@@ -119,6 +217,39 @@ impl<T> Receiver<T> {
119
217
// Fallible methods.
120
218
121
219
/// Attempts to return a pending value on this receiver without blocking.
220
+ ///
221
+ /// # Examples
222
+ ///
223
+ /// ```
224
+ /// # #![feature(oneshot_channel)]
225
+ /// # use std::sync::oneshot;
226
+ /// # use std::thread;
227
+ /// # use std::time::Duration;
228
+ /// #
229
+ /// let (sender, mut receiver) = oneshot::channel();
230
+ ///
231
+ /// thread::spawn(move || {
232
+ /// thread::sleep(Duration::from_millis(100));
233
+ /// sender.send(42).unwrap();
234
+ /// });
235
+ ///
236
+ /// // Keep trying until we get the message, doing other work in the process.
237
+ /// loop {
238
+ /// match receiver.try_recv() {
239
+ /// Ok(value) => {
240
+ /// assert_eq!(value, 42);
241
+ /// break;
242
+ /// }
243
+ /// Err(oneshot::TryRecvError::Empty(rx)) => {
244
+ /// // Retake ownership of the receiver.
245
+ /// receiver = rx;
246
+ /// # fn do_other_work() { thread::sleep(Duration::from_millis(25)); }
247
+ /// do_other_work();
248
+ /// }
249
+ /// Err(oneshot::TryRecvError::Disconnected) => panic!("Sender disconnected"),
250
+ /// }
251
+ /// }
252
+ /// ```
122
253
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
123
254
pub fn try_recv ( self ) -> Result < T , TryRecvError < T > > {
124
255
self . inner . try_recv ( ) . map_err ( |err| match err {
@@ -129,6 +260,29 @@ impl<T> Receiver<T> {
129
260
130
261
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
131
262
/// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`.
263
+ ///
264
+ /// # Examples
265
+ ///
266
+ /// ```
267
+ /// # #![feature(oneshot_channel)]
268
+ /// # use std::sync::oneshot;
269
+ /// # use std::thread;
270
+ /// # use std::time::Duration;
271
+ /// #
272
+ /// let (sender, receiver) = oneshot::channel();
273
+ ///
274
+ /// thread::spawn(move || {
275
+ /// thread::sleep(Duration::from_millis(500));
276
+ /// sender.send("Success!").unwrap();
277
+ /// });
278
+ ///
279
+ /// // Wait up to 1 second for the message
280
+ /// match receiver.recv_timeout(Duration::from_secs(1)) {
281
+ /// Ok(msg) => println!("Received: {}", msg),
282
+ /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Timed out!"),
283
+ /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
284
+ /// }
285
+ /// ```
132
286
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
133
287
pub fn recv_timeout ( self , timeout : Duration ) -> Result < T , RecvTimeoutError < T > > {
134
288
self . inner . recv_timeout ( timeout) . map_err ( |err| match err {
@@ -139,6 +293,29 @@ impl<T> Receiver<T> {
139
293
140
294
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
141
295
/// [`Sender`] half of this channel has been dropped, or if `deadline` is reached.
296
+ ///
297
+ /// # Examples
298
+ ///
299
+ /// ```
300
+ /// # #![feature(oneshot_channel)]
301
+ /// # use std::sync::oneshot;
302
+ /// # use std::thread;
303
+ /// # use std::time::{Duration, Instant};
304
+ /// #
305
+ /// let (sender, receiver) = oneshot::channel();
306
+ ///
307
+ /// thread::spawn(move || {
308
+ /// thread::sleep(Duration::from_millis(100));
309
+ /// sender.send("Just in time!").unwrap();
310
+ /// });
311
+ ///
312
+ /// let deadline = Instant::now() + Duration::from_millis(500);
313
+ /// match receiver.recv_deadline(deadline) {
314
+ /// Ok(msg) => println!("Received: {}", msg),
315
+ /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Missed deadline!"),
316
+ /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
317
+ /// }
318
+ /// ```
142
319
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
143
320
pub fn recv_deadline ( self , deadline : Instant ) -> Result < T , RecvTimeoutError < T > > {
144
321
self . inner . recv_deadline ( deadline) . map_err ( |err| match err {
@@ -160,6 +337,10 @@ impl<T> fmt::Debug for Receiver<T> {
160
337
////////////////////////////////////////////////////////////////////////////////////////////////////
161
338
162
339
/// An error returned from the [`try_recv`](Receiver::try_recv) method.
340
+ ///
341
+ /// See the documentation for [`try_recv`] for more information on how to use this error.
342
+ ///
343
+ /// [`try_recv`]: Receiver::try_recv
163
344
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
164
345
pub enum TryRecvError < T > {
165
346
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
@@ -173,6 +354,36 @@ pub enum TryRecvError<T> {
173
354
174
355
/// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or
175
356
/// [`recv_deadline`](Receiver::recv_deadline) methods.
357
+ ///
358
+ /// # Examples
359
+ ///
360
+ /// Usage of this error is similar to [`TryRecvError`].
361
+ ///
362
+ /// ```
363
+ /// # #![feature(oneshot_channel)]
364
+ /// # use std::sync::oneshot::{self, RecvTimeoutError};
365
+ /// # use std::thread;
366
+ /// # use std::time::Duration;
367
+ /// #
368
+ /// let (sender, receiver) = oneshot::channel();
369
+ ///
370
+ /// thread::spawn(move || {
371
+ /// // Simulate a long computation that takes longer than our timeout.
372
+ /// thread::sleep(Duration::from_millis(500));
373
+ /// sender.send("Too late!".to_string()).unwrap();
374
+ /// });
375
+ ///
376
+ /// // Try to receive the message with a short timeout.
377
+ /// match receiver.recv_timeout(Duration::from_millis(100)) {
378
+ /// Ok(msg) => println!("Received: {}", msg),
379
+ /// Err(RecvTimeoutError::Timeout(rx)) => {
380
+ /// println!("Timed out waiting for message!");
381
+ /// // You can reuse the receiver if needed.
382
+ /// drop(rx);
383
+ /// }
384
+ /// Err(RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
385
+ /// }
386
+ /// ```
176
387
#[ unstable( feature = "oneshot_channel" , issue = "143674" ) ]
177
388
pub enum RecvTimeoutError < T > {
178
389
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
0 commit comments