diff --git a/src/libcore/future.rs b/src/libcore/future.rs
index cd0d2b25e95d9..122f671447287 100644
--- a/src/libcore/future.rs
+++ b/src/libcore/future.rs
@@ -34,12 +34,16 @@ export future_pipe;
#[doc = "The future type"]
struct Future {
/*priv*/ mut state: FutureState,
+
+ // FIXME(#2829) -- futures should not be copyable, because they close
+ // over fn~'s that have pipes and so forth within!
+ drop {}
}
priv enum FutureState {
- Pending(fn@() -> A),
+ Pending(fn~() -> A),
Evaluating,
- Forced(A)
+ Forced(~A)
}
/// Methods on the `future` type
@@ -71,7 +75,7 @@ fn from_value(+val: A) -> Future {
* not block.
*/
- Future {state: Forced(val)}
+ Future {state: Forced(~val)}
}
fn from_port(+port: future_pipe::client::waiting) -> Future {
@@ -88,12 +92,12 @@ fn from_port(+port: future_pipe::client::waiting) -> Future {
port_ <-> *port;
let port = option::unwrap(port_);
match recv(port) {
- future_pipe::completed(move data) => data
+ future_pipe::completed(move data) => data
}
}
}
-fn from_fn(+f: @fn() -> A) -> Future {
+fn from_fn(+f: ~fn() -> A) -> Future {
/*!
* Create a future from a function.
*
@@ -135,7 +139,7 @@ fn get_ref(future: &r/Future) -> &r/A {
match future.state {
Forced(ref v) => { // v here has type &A, but with a shorter lifetime.
- return unsafe{ copy_lifetime(future, v) }; // ...extend it.
+ return unsafe{ copy_lifetime(future, &**v) }; // ...extend it.
}
Evaluating => {
fail ~"Recursive forcing of future!";
@@ -150,7 +154,7 @@ fn get_ref(future: &r/Future) -> &r/A {
fail ~"Logic error.";
}
Pending(move f) => {
- future.state = Forced(f());
+ future.state = Forced(~f());
return get_ref(future);
}
}
@@ -239,4 +243,14 @@ mod test {
let f = spawn(|| fail);
let _x: ~str = get(&f);
}
-}
\ No newline at end of file
+
+ #[test]
+ fn test_sendable_future() {
+ let expected = ~"schlorf";
+ let f = do spawn |copy expected| { expected };
+ do task::spawn {
+ let actual = get(&f);
+ assert actual == expected;
+ }
+ }
+}
diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index 6db50291b95f3..e7101f587177e 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -29,6 +29,7 @@
use cmp::Eq;
use result::Result;
+use pipes::{stream, Chan, Port};
export Task;
export TaskResult;
@@ -77,6 +78,10 @@ export ThreadPerTask;
export ManualThreads;
export PlatformThread;
+macro_rules! move_it (
+ { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
+)
+
/* Data types */
/// A handle to a task
@@ -203,7 +208,7 @@ type SchedOpts = {
type TaskOpts = {
linked: bool,
supervised: bool,
- notify_chan: Option>,
+ mut notify_chan: Option>,
sched: Option,
};
@@ -214,7 +219,7 @@ type TaskOpts = {
*/
// NB: Builders are designed to be single-use because they do stateful
// things that get weird when reusing - e.g. if you create a result future
-// it only applies to a single task, so then you have to maintain some
+// it only applies to a single task, so then you have to maintain Some
// potentially tricky state to ensure that everything behaves correctly
// when you try to reuse the builder to spawn a new task. We'll just
// sidestep that whole issue by making builders uncopyable and making
@@ -241,14 +246,28 @@ fn task() -> TaskBuilder {
mut consumed: false,
})
}
-
priv impl TaskBuilder {
fn consume() -> TaskBuilder {
if self.consumed {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
- TaskBuilder({ can_not_copy: None, mut consumed: false,.. *self })
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
+ TaskBuilder({
+ opts: {
+ linked: self.opts.linked,
+ supervised: self.opts.supervised,
+ mut notify_chan: notify_chan,
+ sched: self.opts.sched
+ },
+ gen_body: self.gen_body,
+ can_not_copy: None,
+ mut consumed: false
+ })
}
}
@@ -258,8 +277,18 @@ impl TaskBuilder {
* the other will not be killed.
*/
fn unlinked() -> TaskBuilder {
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
TaskBuilder({
- opts: { linked: false,.. self.opts },
+ opts: {
+ linked: false,
+ supervised: self.opts.supervised,
+ mut notify_chan: notify_chan,
+ sched: self.opts.sched
+ },
can_not_copy: None,
.. *self.consume()
})
@@ -270,8 +299,18 @@ impl TaskBuilder {
* the child.
*/
fn supervised() -> TaskBuilder {
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
TaskBuilder({
- opts: { linked: false, supervised: true,.. self.opts },
+ opts: {
+ linked: false,
+ supervised: true,
+ mut notify_chan: notify_chan,
+ sched: self.opts.sched
+ },
can_not_copy: None,
.. *self.consume()
})
@@ -281,8 +320,18 @@ impl TaskBuilder {
* other will be killed.
*/
fn linked() -> TaskBuilder {
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
TaskBuilder({
- opts: { linked: true, supervised: false,.. self.opts },
+ opts: {
+ linked: true,
+ supervised: false,
+ mut notify_chan: notify_chan,
+ sched: self.opts.sched
+ },
can_not_copy: None,
.. *self.consume()
})
@@ -316,27 +365,40 @@ impl TaskBuilder {
}
// Construct the future and give it to the caller.
- let po = comm::Port::();
- let ch = comm::Chan(po);
+ let (notify_pipe_ch, notify_pipe_po) = stream::();
blk(do future::from_fn {
- match comm::recv(po) {
+ match notify_pipe_po.recv() {
Exit(_, result) => result
}
});
// Reconfigure self to use a notify channel.
TaskBuilder({
- opts: { notify_chan: Some(ch),.. self.opts },
+ opts: {
+ linked: self.opts.linked,
+ supervised: self.opts.supervised,
+ mut notify_chan: Some(notify_pipe_ch),
+ sched: self.opts.sched
+ },
can_not_copy: None,
.. *self.consume()
})
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
TaskBuilder({
- opts: { sched: Some({ mode: mode, foreign_stack_size: None}),
- .. self.opts },
+ opts: {
+ linked: self.opts.linked,
+ supervised: self.opts.supervised,
+ mut notify_chan: notify_chan,
+ sched: Some({ mode: mode, foreign_stack_size: None})
+ },
can_not_copy: None,
.. *self.consume()
})
@@ -356,7 +418,18 @@ impl TaskBuilder {
*/
fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> TaskBuilder {
let prev_gen_body = self.gen_body;
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut self.opts.notify_chan))
+ };
TaskBuilder({
+ opts: {
+ linked: self.opts.linked,
+ supervised: self.opts.supervised,
+ mut notify_chan: notify_chan,
+ sched: self.opts.sched
+ },
gen_body: |body| { wrapper(prev_gen_body(body)) },
can_not_copy: None,
.. *self.consume()
@@ -376,8 +449,21 @@ impl TaskBuilder {
* must be greater than zero.
*/
fn spawn(+f: fn~()) {
+ let notify_chan = if self.opts.notify_chan.is_none() {
+ None
+ } else {
+ let swapped_notify_chan =
+ option::swap_unwrap(&mut self.opts.notify_chan);
+ Some(swapped_notify_chan)
+ };
let x = self.consume();
- spawn_raw(x.opts, x.gen_body(f));
+ let opts = {
+ linked: x.opts.linked,
+ supervised: x.opts.supervised,
+ mut notify_chan: notify_chan,
+ sched: x.opts.sched
+ };
+ spawn_raw(opts, x.gen_body(f));
}
/// Runs a task, while transfering ownership of one argument to the child.
fn spawn_with(+arg: A, +f: fn~(+A)) {
@@ -394,7 +480,7 @@ impl TaskBuilder {
* child task, passes the port to child's body, and returns a channel
* linked to the port to the parent.
*
- * This encapsulates some boilerplate handshaking logic that would
+ * This encapsulates Some boilerplate handshaking logic that would
* otherwise be required to establish communication from the parent
* to the child.
*/
@@ -442,7 +528,8 @@ impl TaskBuilder {
let ch = comm::Chan(po);
let mut result = None;
- do self.future_result(|+r| { result = Some(r); }).spawn {
+ let fr_task_builder = self.future_result(|+r| { result = Some(r); });
+ do fr_task_builder.spawn {
comm::send(ch, f());
}
match future::get(&option::unwrap(result)) {
@@ -466,7 +553,7 @@ fn default_task_opts() -> TaskOpts {
{
linked: true,
supervised: false,
- notify_chan: None,
+ mut notify_chan: None,
sched: None
}
}
@@ -872,7 +959,7 @@ fn each_ancestor(list: &mut AncestorList,
// 'do_continue' - Did the forward_blk succeed at this point? (i.e.,
// should we recurse? or should our callers unwind?)
- // The map defaults to none, because if ancestors is none, we're at
+ // The map defaults to None, because if ancestors is None, we're at
// the end of the list, which doesn't make sense to coalesce.
return do (**ancestors).map_default((None,false)) |ancestor_arc| {
// NB: Takes a lock! (this ancestor node)
@@ -995,15 +1082,15 @@ fn TCB(me: *rust_task, +tasks: TaskGroupArc, +ancestors: AncestorList,
}
struct AutoNotify {
- notify_chan: comm::Chan,
+ notify_chan: Chan,
mut failed: bool,
drop {
let result = if self.failed { Failure } else { Success };
- comm::send(self.notify_chan, Exit(get_task(), result));
+ self.notify_chan.send(Exit(get_task(), result));
}
}
-fn AutoNotify(chan: comm::Chan) -> AutoNotify {
+fn AutoNotify(+chan: Chan) -> AutoNotify {
AutoNotify {
notify_chan: chan,
failed: true // Un-set above when taskgroup successfully made.
@@ -1013,7 +1100,7 @@ fn AutoNotify(chan: comm::Chan) -> AutoNotify {
fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
is_member: bool) -> bool {
let newstate = util::replace(state, None);
- // If 'none', the group was failing. Can't enlist.
+ // If 'None', the group was failing. Can't enlist.
if newstate.is_some() {
let group = option::unwrap(newstate);
taskset_insert(if is_member { &mut group.members }
@@ -1028,7 +1115,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) {
let newstate = util::replace(state, None);
- // If 'none', already failing and we've already gotten a kill signal.
+ // If 'None', already failing and we've already gotten a kill signal.
if newstate.is_some() {
let group = option::unwrap(newstate);
taskset_remove(if is_member { &mut group.members }
@@ -1048,9 +1135,9 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
// To do it differently, we'd have to use the runtime's task refcounting,
// but that could leave task structs around long after their task exited.
let newstate = util::replace(state, None);
- // Might already be none, if somebody is failing simultaneously.
+ // Might already be None, if Somebody is failing simultaneously.
// That's ok; only one task needs to do the dirty work. (Might also
- // see 'none' if somebody already failed and we got a kill signal.)
+ // see 'None' if Somebody already failed and we got a kill signal.)
if newstate.is_some() {
let group = option::unwrap(newstate);
for taskset_each(&group.members) |+sibling| {
@@ -1067,7 +1154,7 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
if is_main {
rustrt::rust_task_kill_all(me);
}
- // Do NOT restore state to Some(..)! It stays none to indicate
+ // Do NOT restore state to Some(..)! It stays None to indicate
// that the whole taskgroup is failing, to forbid new spawns.
}
// (note: multiple tasks may reach this point)
@@ -1145,7 +1232,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
// Appease the borrow-checker. Really this wants to be written as:
// match ancestors
// Some(ancestor_arc) { ancestor_list(Some(ancestor_arc.clone())) }
- // none { ancestor_list(none) }
+ // None { ancestor_list(None) }
let tmp = util::replace(&mut **ancestors, None);
if tmp.is_some() {
let ancestor_arc = option::unwrap(tmp);
@@ -1175,10 +1262,15 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
};
assert !new_task.is_null();
// Getting killed after here would leak the task.
+ let mut notify_chan = if opts.notify_chan.is_none() {
+ None
+ } else {
+ Some(option::swap_unwrap(&mut opts.notify_chan))
+ };
let child_wrapper =
make_child_wrapper(new_task, child_tg, ancestors, is_main,
- opts.notify_chan, f);
+ notify_chan, f);
let fptr = ptr::addr_of(child_wrapper);
let closure: *rust_closure = unsafe::reinterpret_cast(&fptr);
@@ -1198,17 +1290,25 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
// (4) ...and runs the provided body function.
fn make_child_wrapper(child: *rust_task, +child_arc: TaskGroupArc,
+ancestors: AncestorList, is_main: bool,
- notify_chan: Option>,
+ +notify_chan: Option>,
+f: fn~()) -> fn~() {
let child_data = ~mut Some((child_arc, ancestors));
- return fn~() {
+ return fn~(move notify_chan) {
// Agh. Get move-mode items into the closure. FIXME (#2829)
let mut (child_arc, ancestors) = option::swap_unwrap(child_data);
// Child task runs this code.
// Even if the below code fails to kick the child off, we must
- // send something on the notify channel.
- let notifier = notify_chan.map(|c| AutoNotify(c));
+ // send Something on the notify channel.
+
+ //let mut notifier = None;//notify_chan.map(|c| AutoNotify(c));
+ let notifier = match notify_chan {
+ Some(notify_chan_value) => {
+ let moved_ncv = move_it!(notify_chan_value);
+ Some(AutoNotify(moved_ncv))
+ }
+ _ => None
+ };
if enlist_many(child, &child_arc, &mut ancestors) {
let group = @TCB(child, child_arc, ancestors,
@@ -1221,7 +1321,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
};
// Set up membership in taskgroup and descendantship in all ancestor
- // groups. If any enlistment fails, some task was already failing, so
+ // groups. If any enlistment fails, Some task was already failing, so
// don't let the child task run, and undo every successful enlistment.
fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc,
ancestors: &mut AncestorList) -> bool {
@@ -1387,7 +1487,7 @@ unsafe fn local_data_lookup(
}
);
do map_pos.map |index| {
- // .get() is guaranteed because of "none { false }" above.
+ // .get() is guaranteed because of "None { false }" above.
let (_, data_ptr, _) = (*map)[index].get();
(index, data_ptr)
}
@@ -1500,7 +1600,7 @@ unsafe fn local_data_set(
local_set(rustrt::rust_get_task(), key, data)
}
/**
- * Modify a task-local data value. If the function returns 'none', the
+ * Modify a task-local data value. If the function returns 'None', the
* data is removed (and its reference dropped).
*/
unsafe fn local_data_modify(
@@ -1563,6 +1663,7 @@ fn test_spawn_raw_simple() {
fn test_spawn_raw_unsupervise() {
let opts = {
linked: false,
+ mut notify_chan: None,
.. default_task_opts()
};
do spawn_raw(opts) {
@@ -1583,7 +1684,7 @@ fn test_cant_dup_task_builder() {
// The following 8 tests test the following 2^3 combinations:
// {un,}linked {un,}supervised failure propagation {up,down}wards.
-// !!! These tests are dangerous. If something is buggy, they will hang, !!!
+// !!! These tests are dangerous. If Something is buggy, they will hang, !!!
// !!! instead of exiting cleanly. This might wedge the buildbots. !!!
#[test] #[ignore(cfg(windows))]
@@ -1623,9 +1724,16 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
+ let opts = {
+ let mut opts = default_task_opts();
+ opts.linked = true;
+ opts.supervised = true;
+ move opts
+ };
+
let b0 = task();
let b1 = TaskBuilder({
- opts: { linked: true, supervised: true,.. b0.opts },
+ opts: move opts,
can_not_copy: None,
.. *b0
});
@@ -1636,9 +1744,16 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
+ let opts = {
+ let mut opts = default_task_opts();
+ opts.linked = true;
+ opts.supervised = true;
+ move opts
+ };
+
let b0 = task();
let b1 = TaskBuilder({
- opts: { linked: true, supervised: true,.. b0.opts },
+ opts: move opts,
can_not_copy: None,
.. *b0
});
@@ -1719,21 +1834,27 @@ fn test_spawn_linked_sup_propagate_sibling() {
#[test]
#[ignore(cfg(windows))]
-fn test_spawn_raw_notify() {
- let task_po = comm::Port();
- let task_ch = comm::Chan(task_po);
- let notify_po = comm::Port();
- let notify_ch = comm::Chan(notify_po);
+fn test_spawn_raw_notify_success() {
+ let (task_ch, task_po) = pipes::stream();
+ let (notify_ch, notify_po) = pipes::stream();
let opts = {
- notify_chan: Some(notify_ch),
+ notify_chan: Some(move notify_ch),
.. default_task_opts()
};
- do spawn_raw(opts) {
- comm::send(task_ch, get_task());
+ do spawn_raw(opts) |move task_ch| {
+ task_ch.send(get_task());
}
- let task_ = comm::recv(task_po);
- assert comm::recv(notify_po) == Exit(task_, Success);
+ let task_ = task_po.recv();
+ assert notify_po.recv() == Exit(task_, Success);
+}
+
+#[test]
+#[ignore(cfg(windows))]
+fn test_spawn_raw_notify_failure() {
+ // New bindings for these
+ let (task_ch, task_po) = pipes::stream();
+ let (notify_ch, notify_po) = pipes::stream();
let opts = {
linked: false,
@@ -1741,11 +1862,11 @@ fn test_spawn_raw_notify() {
.. default_task_opts()
};
do spawn_raw(opts) {
- comm::send(task_ch, get_task());
+ task_ch.send(get_task());
fail;
}
- let task_ = comm::recv(task_po);
- assert comm::recv(notify_po) == Exit(task_, Failure);
+ let task_ = task_po.recv();
+ assert notify_po.recv() == Exit(task_, Failure);
}
#[test]
@@ -2043,8 +2164,13 @@ fn test_unkillable() {
let po = comm::Port();
let ch = po.chan();
+ let opts = {
+ let mut opts = default_task_opts();
+ opts.linked = false;
+ move opts
+ };
// We want to do this after failing
- do spawn_raw({ linked: false,.. default_task_opts() }) {
+ do spawn_raw(opts) {
for iter::repeat(10u) { yield() }
ch.send(());
}
@@ -2076,11 +2202,15 @@ fn test_unkillable() {
#[ignore(cfg(windows))]
#[should_fail]
fn test_unkillable_nested() {
- let po = comm::Port();
- let ch = po.chan();
+ let (ch, po) = pipes::stream();
// We want to do this after failing
- do spawn_raw({ linked: false,.. default_task_opts() }) {
+ let opts = {
+ let mut opts = default_task_opts();
+ opts.linked = false;
+ move opts
+ };
+ do spawn_raw(opts) {
for iter::repeat(10u) { yield() }
ch.send(());
}
@@ -2198,7 +2328,7 @@ fn test_tls_crust_automorestack_memorial_bug() unsafe {
// This might result in a stack-canary clobber if the runtime fails to set
// sp_limit to 0 when calling the cleanup extern - it might automatically
// jump over to the rust stack, which causes next_c_sp to get recorded as
- // something within a rust stack segment. Then a subsequent upcall (esp.
+ // Something within a rust stack segment. Then a subsequent upcall (esp.
// for logging, think vsnprintf) would run on a stack smaller than 1 MB.
fn my_key(+_x: @~str) { }
do task::spawn {
diff --git a/src/libcore/unsafe.rs b/src/libcore/unsafe.rs
index 2b57d694cbf30..9a984d5fcc630 100644
--- a/src/libcore/unsafe.rs
+++ b/src/libcore/unsafe.rs
@@ -415,13 +415,13 @@ mod tests {
for uint::range(0u, num_tasks) |_i| {
let total = total.clone();
- futures += ~[future::spawn(|| {
+ vec::push(futures, future::spawn(|| {
for uint::range(0u, count) |_i| {
do total.with |count| {
**count += 1u;
}
}
- })];
+ }));
};
for futures.each |f| { f.get() }
diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs
index 45ab8d4c4274a..1f62fd63ad168 100644
--- a/src/libstd/sync.rs
+++ b/src/libstd/sync.rs
@@ -96,11 +96,11 @@ impl &Sem {
state.count -= 1;
if state.count < 0 {
// Create waiter nobe.
- let (signal_end, wait_end) = pipes::oneshot();
+ let (SignalEnd, WaitEnd) = pipes::oneshot();
// Tell outer scope we need to block.
- waiter_nobe = Some(wait_end);
+ waiter_nobe = Some(WaitEnd);
// Enqueue ourself.
- state.waiters.tail.send(signal_end);
+ state.waiters.tail.send(SignalEnd);
}
}
}
@@ -202,9 +202,9 @@ impl &Condvar {
*/
fn wait_on(condvar_id: uint) {
// Create waiter nobe.
- let (signal_end, wait_end) = pipes::oneshot();
- let mut wait_end = Some(wait_end);
- let mut signal_end = Some(signal_end);
+ let (SignalEnd, WaitEnd) = pipes::oneshot();
+ let mut WaitEnd = Some(WaitEnd);
+ let mut SignalEnd = Some(SignalEnd);
let mut reacquire = None;
let mut out_of_bounds = None;
unsafe {
@@ -218,8 +218,8 @@ impl &Condvar {
signal_waitqueue(&state.waiters);
}
// Enqueue ourself to be woken up by a signaller.
- let signal_end = option::swap_unwrap(&mut signal_end);
- state.blocked[condvar_id].tail.send(signal_end);
+ let SignalEnd = option::swap_unwrap(&mut SignalEnd);
+ state.blocked[condvar_id].tail.send(SignalEnd);
} else {
out_of_bounds = Some(vec::len(state.blocked));
}
@@ -238,7 +238,7 @@ impl &Condvar {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
- let _ = pipes::recv_one(option::swap_unwrap(&mut wait_end));
+ let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd));
}
// This is needed for a failing condition variable to reacquire the
diff --git a/src/rustdoc/markdown_writer.rs b/src/rustdoc/markdown_writer.rs
index c9d7fe0adb79d..00c80366084a4 100644
--- a/src/rustdoc/markdown_writer.rs
+++ b/src/rustdoc/markdown_writer.rs
@@ -285,15 +285,14 @@ fn future_writer_factory(
}
fn future_writer() -> (writer, future::Future<~str>) {
- let port = comm::Port();
- let chan = comm::Chan(port);
+ let (chan, port) = pipes::stream();
let writer = fn~(+instr: writeinstr) {
- comm::send(chan, copy instr);
+ chan.send(copy instr);
};
let future = do future::from_fn {
let mut res = ~"";
loop {
- match comm::recv(port) {
+ match port.recv() {
write(s) => res += s,
done => break
}
diff --git a/src/test/bench/msgsend-ring-mutex-arcs.rs b/src/test/bench/msgsend-ring-mutex-arcs.rs
index 81babc00f0954..6cc9be29a22fa 100644
--- a/src/test/bench/msgsend-ring-mutex-arcs.rs
+++ b/src/test/bench/msgsend-ring-mutex-arcs.rs
@@ -82,7 +82,7 @@ fn main(args: ~[~str]) {
let num_chan2 = ~mut None;
*num_chan2 <-> num_chan;
let num_port = ~mut Some(num_port);
- futures += ~[future::spawn(|move num_chan2, move num_port| {
+ let new_future = future::spawn(|move num_chan2, move num_port| {
let mut num_chan = None;
num_chan <-> *num_chan2;
let mut num_port1 = None;
@@ -90,7 +90,8 @@ fn main(args: ~[~str]) {
thread_ring(i, msg_per_task,
option::unwrap(num_chan),
option::unwrap(num_port1))
- })];
+ });
+ vec::push(futures, new_future);
num_chan = Some(new_chan);
};
diff --git a/src/test/bench/msgsend-ring-pipes.rs b/src/test/bench/msgsend-ring-pipes.rs
index 63ac80536afc2..8c21db3d2f505 100644
--- a/src/test/bench/msgsend-ring-pipes.rs
+++ b/src/test/bench/msgsend-ring-pipes.rs
@@ -78,7 +78,8 @@ fn main(args: ~[~str]) {
let num_chan2 = ~mut None;
*num_chan2 <-> num_chan;
let num_port = ~mut Some(num_port);
- futures += ~[future::spawn(|move num_chan2, move num_port| {
+ let new_future = do future::spawn
+ |move num_chan2, move num_port| {
let mut num_chan = None;
num_chan <-> *num_chan2;
let mut num_port1 = None;
@@ -86,7 +87,8 @@ fn main(args: ~[~str]) {
thread_ring(i, msg_per_task,
option::unwrap(num_chan),
option::unwrap(num_port1))
- })];
+ };
+ vec::push(futures, new_future);
num_chan = Some(new_chan);
};
diff --git a/src/test/bench/msgsend-ring-rw-arcs.rs b/src/test/bench/msgsend-ring-rw-arcs.rs
index 745e1e4e758eb..82621c2f7e912 100644
--- a/src/test/bench/msgsend-ring-rw-arcs.rs
+++ b/src/test/bench/msgsend-ring-rw-arcs.rs
@@ -82,7 +82,8 @@ fn main(args: ~[~str]) {
let num_chan2 = ~mut None;
*num_chan2 <-> num_chan;
let num_port = ~mut Some(num_port);
- futures += ~[future::spawn(|move num_chan2, move num_port| {
+ let new_future = do future::spawn
+ |move num_chan2, move num_port| {
let mut num_chan = None;
num_chan <-> *num_chan2;
let mut num_port1 = None;
@@ -90,7 +91,8 @@ fn main(args: ~[~str]) {
thread_ring(i, msg_per_task,
option::unwrap(num_chan),
option::unwrap(num_port1))
- })];
+ };
+ vec::push(futures, new_future);
num_chan = Some(new_chan);
};
diff --git a/src/test/bench/msgsend-ring.rs b/src/test/bench/msgsend-ring.rs
index d972dde4c4aef..15a18725abf65 100644
--- a/src/test/bench/msgsend-ring.rs
+++ b/src/test/bench/msgsend-ring.rs
@@ -45,11 +45,13 @@ fn main(args: ~[~str]) {
let get_chan = Port();
let get_chan_chan = Chan(get_chan);
- futures += ~[do future::spawn |copy num_chan, move get_chan_chan| {
+ let new_future = do future::spawn
+ |copy num_chan, move get_chan_chan| {
let p = Port();
get_chan_chan.send(Chan(p));
thread_ring(i, msg_per_task, num_chan, p)
- }];
+ };
+ vec::push(futures, new_future);
num_chan = get_chan.recv();
};
diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs
index 5e7648b5f85c8..17ecf3fb332ee 100644
--- a/src/test/bench/shootout-pfib.rs
+++ b/src/test/bench/shootout-pfib.rs
@@ -71,7 +71,7 @@ fn stress(num_tasks: int) {
let mut results = ~[];
for range(0, num_tasks) |i| {
do task::task().future_result(|+r| {
- results += ~[r];
+ vec::push(results, r);
}).spawn {
stress_task(i);
}
diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs
index 2dc33fa52bd88..e47daceb6e6f4 100644
--- a/src/test/run-pass/task-comm-3.rs
+++ b/src/test/run-pass/task-comm-3.rs
@@ -31,7 +31,7 @@ fn test00() {
while i < number_of_tasks {
let ch = po.chan();
do task::task().future_result(|+r| {
- results += ~[r];
+ vec::push(results, r);
}).spawn |copy i| {
test00_start(ch, i, number_of_messages)
}
diff --git a/src/test/run-pass/task-comm.rs b/src/test/run-pass/task-comm.rs
index 5320ced981d57..ca34d42423fa6 100644
--- a/src/test/run-pass/task-comm.rs
+++ b/src/test/run-pass/task-comm.rs
@@ -40,7 +40,7 @@ fn test00() {
while i < number_of_tasks {
i = i + 1;
do task::task().future_result(|+r| {
- results += ~[r];
+ vec::push(results, r);
}).spawn |copy i| {
test00_start(ch, i, number_of_messages);
}
@@ -127,7 +127,7 @@ fn test06() {
while i < number_of_tasks {
i = i + 1;
do task::task().future_result(|+r| {
- results += ~[r];
+ vec::push(results, r);
}).spawn |copy i| {
test06_start(i);
};