1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use future::{Future, IntoFuture};
use stream::Stream;
use poll::Poll;
use Async;
use stack::{Stack, Drain};
use std::sync::Arc;
use task::{self, UnparkEvent};
use std::prelude::v1::*;
/// A stream that owns a set of futures and yields each future's result as
/// that future completes.
///
/// Instances are built by the `futures_unordered` function. The stream ends
/// (yields `None`) once every managed future has completed.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F>
    where F: Future
{
    /// One slot per managed future, addressed by the ids that travel through
    /// `stack`. A slot is reset to `None` after its future has produced a
    /// value or an error.
    futures: Vec<Option<F>>,
    /// Shared collection of ids of futures that should be polled next. A
    /// clone of this handle is passed to every `UnparkEvent` created while
    /// polling.
    stack: Arc<Stack<usize>>,
    /// Remainder of a drained batch of ids that was left unprocessed because
    /// a future completed part-way through it; resumed on the next poll.
    pending: Option<Drain<usize>>,
    /// Number of futures that have not completed yet.
    active: usize,
}
/// Builds a `FuturesUnordered` stream from an iterator of items convertible
/// into futures.
///
/// Every item is converted with `IntoFuture::into_future` and stored in its
/// own slot. The index of each slot is pushed onto the shared stack up front,
/// so that all futures are visited the first time the stream is polled.
///
/// An empty iterator produces a stream with no active futures, which reports
/// completion on its first poll.
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
    where I: IntoIterator,
          I::Item: IntoFuture
{
    // Wrap each future in `Some` so its slot can later be emptied in place.
    let slots: Vec<_> = futures.into_iter()
        .map(|item| Some(item.into_future()))
        .collect();
    let count = slots.len();

    // Queue every slot index, in ascending order, for an initial poll.
    let queue = Arc::new(Stack::new());
    for index in 0..count {
        queue.push(index);
    }

    FuturesUnordered {
        futures: slots,
        stack: queue,
        pending: None,
        active: count,
    }
}
impl<F> FuturesUnordered<F>
    where F: Future
{
    /// Polls the futures whose ids are yielded by `drain` until one of them
    /// completes.
    ///
    /// Returns `Some(result)` as soon as a polled future resolves, either
    /// with a value (`Ok(Async::Ready(Some(val)))`) or with an error
    /// (`Err(e)`). In that case the unconsumed remainder of `drain` is saved
    /// in `self.pending`, the future's slot is cleared and `self.active` is
    /// decremented.
    ///
    /// Returns `None` when `drain` is exhausted without any future
    /// completing.
    fn poll_pending(&mut self, mut drain: Drain<usize>)
                    -> Option<Poll<Option<F::Item>, F::Error>> {
        while let Some(id) = drain.next() {
            // The same id can show up again after its future has already
            // completed (e.g. a duplicate or late notification). Its slot is
            // `None` by then, so skip it instead of unwrapping an empty slot
            // and panicking.
            if self.futures[id].is_none() {
                continue
            }

            // NOTE(review): presumably this event makes `id` get pushed back
            // onto `self.stack` when the future's task is unparked; confirm
            // against `task::UnparkEvent`.
            let event = UnparkEvent::new(self.stack.clone(), id);
            let ret = match task::with_unpark_event(event, || {
                self.futures[id]
                    .as_mut()
                    .expect("slot was checked to be occupied above")
                    .poll()
            }) {
                // Not finished yet: move on to the next queued id.
                Ok(Async::NotReady) => continue,
                Ok(Async::Ready(val)) => Ok(Async::Ready(Some(val))),
                Err(e) => Err(e),
            };

            // The future is done (value or error): keep the rest of the batch
            // for the next call and retire the slot.
            self.pending = Some(drain);
            self.active -= 1;
            self.futures[id] = None;
            return Some(ret)
        }
        None
    }
}
impl<F> Stream for FuturesUnordered<F>
    where F: Future
{
    type Item = F::Item;
    type Error = F::Error;

    /// Yields the result of the next managed future to complete.
    ///
    /// * `Ok(Async::Ready(None))` when no active futures remain.
    /// * `Ok(Async::Ready(Some(item)))` or `Err(error)` when a polled future
    ///   resolves.
    /// * `Ok(Async::NotReady)` when none of the queued futures completed.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // Every future has completed: the stream is finished.
        if self.active == 0 {
            return Ok(Async::Ready(None));
        }

        // First resume the batch that an earlier call left partially
        // processed, if there is one.
        let leftover = self.pending.take();
        if let Some(batch) = leftover {
            match self.poll_pending(batch) {
                Some(outcome) => return outcome,
                None => {}
            }
        }

        // Then take a fresh batch of ids from the shared stack.
        let fresh = self.stack.drain();
        match self.poll_pending(fresh) {
            Some(outcome) => outcome,
            None => {
                // Nothing completed, so there must still be work outstanding.
                assert!(self.active > 0);
                Ok(Async::NotReady)
            }
        }
    }
}