1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
//! An example to illustrate selecting by macro or manually.
use std::{future::Future, pin::pin};
use monoio::{io::Canceller, net::TcpListener, time::Duration};
use pin_project_lite::pin_project;
#[monoio::main(enable_timer = true)]
async fn main() {
// You can write poll by yourself
let fut1 = monoio::time::sleep(Duration::from_secs(1));
let fut2 = monoio::time::sleep(Duration::from_secs(20));
let dual_select = DualSelect { fut1, fut2 };
dual_select.await;
println!("manually select returned");
// Or you can use select macro
let fut1 = monoio::time::sleep(Duration::from_secs(1));
let fut2 = monoio::time::sleep(Duration::from_secs(20));
monoio::select! {
_ = fut1 => {
println!("select returned with macro");
}
_ = fut2 => {
println!("bug! something bad happened!");
}
}
// A very important thing is if you use select to do timeout io,
// you must cancel it manually.
// In epoll, dropping a read future has no side effect since we
// only wait for io ready, after waiting aborted, no syscall
// will be made.
// In io_uring, kernel may execute syscall at any time. So drop
// a read future(even a AsyncCancel op will be issued after
// dropping) does not mean it will be canceled instantly.
// It may canceled, success or fail. We encourage you use our
// cancelable io when you want to do io with timeout.
//
// Another option for timeout io is using readable/writable.
// In this case you can sense io readiness. But it is not
// a good way.
let canceller = Canceller::new();
let timeout_fut = monoio::time::sleep(Duration::from_millis(100));
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let mut io_fut = pin!(listener.cancelable_accept(canceller.handle()));
monoio::select! {
_ = timeout_fut => {
println!("accept timeout but not sure!");
canceller.cancel();
// Here we must check io_fut again because it may succeed between timeout and canceled.
// The io is canceled, so await for it is expected to return instantly.
let res = io_fut.await;
match res {
Ok(_) => println!("connection accepted even timeout!"),
Err(_) => println!("we are sure accept timeout!"),
}
}
_ = &mut io_fut => {
println!("connection accepted!");
}
}
}
pin_project! {
struct DualSelect<F> {
#[pin]
fut1: F,
#[pin]
fut2: F
}
}
impl<F> Future for DualSelect<F>
where
F: Future<Output = ()>,
{
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
match this.fut1.poll(cx) {
std::task::Poll::Ready(_) => return std::task::Poll::Ready(()),
std::task::Poll::Pending => {}
}
match this.fut2.poll(cx) {
std::task::Poll::Ready(_) => std::task::Poll::Ready(()),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
|