From b447e0562262275de6826d65e9ca65eee8f1f0d6 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sat, 18 Aug 2012 08:05:27 -0700 Subject: [PATCH 01/15] core: port task.rs to comm::Chan/Port to pipes::Chan/Port --- src/libcore/task.rs | 173 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 135 insertions(+), 38 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 6db50291b95f3..f84e0ac07605a 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -29,6 +29,7 @@ use cmp::Eq; use result::Result; +use pipes::{stream, Chan, Port}; export Task; export TaskResult; @@ -77,6 +78,10 @@ export ThreadPerTask; export ManualThreads; export PlatformThread; +macro_rules! move_it { + { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } +} + /* Data types */ /// A handle to a task @@ -203,8 +208,8 @@ type SchedOpts = { type TaskOpts = { linked: bool, supervised: bool, - notify_chan: Option>, - sched: Option, + mut notify_chan: Option>, + sched: option, }; /** @@ -214,7 +219,7 @@ type TaskOpts = { */ // NB: Builders are designed to be single-use because they do stateful // things that get weird when reusing - e.g. if you create a result future -// it only applies to a single task, so then you have to maintain some +// it only applies to a single task, so then you have to maintain Some // potentially tricky state to ensure that everything behaves correctly // when you try to reuse the builder to spawn a new task. We'll just // sidestep that whole issue by making builders uncopyable and making @@ -248,7 +253,22 @@ priv impl TaskBuilder { fail ~"Cannot copy a task_builder"; // Fake move mode on self } self.consumed = true; - TaskBuilder({ can_not_copy: None, mut consumed: false,.. *self }) + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; + TaskBuilder({ + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, + gen_body: self.gen_body, + can_not_copy: None, + mut consumed: false + }) } } @@ -258,8 +278,18 @@ impl TaskBuilder { * the other will not be killed. */ fn unlinked() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: false,.. self.opts }, + opts: { + linked: false, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -270,8 +300,18 @@ impl TaskBuilder { * the child. */ fn supervised() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: false, supervised: true,.. self.opts }, + opts: { + linked: false, + supervised: true, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -281,8 +321,18 @@ impl TaskBuilder { * other will be killed. */ fn linked() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: true, supervised: false,.. self.opts }, + opts: { + linked: true, + supervised: false, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -316,11 +366,10 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let po = comm::Port::(); - let ch = comm::Chan(po); + let (ch, po) = stream::(); blk(do future::from_fn { - match comm::recv(po) { + match po.recv() { Exit(_, result) => result } }); @@ -334,9 +383,18 @@ impl TaskBuilder { } /// Configure a custom scheduler mode for the task. fn sched_mode(mode: SchedMode) -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { sched: Some({ mode: mode, foreign_stack_size: None}), - .. self.opts }, + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: Some({ mode: mode, foreign_stack_size: None}) + }, can_not_copy: None, .. *self.consume() }) @@ -356,7 +414,18 @@ impl TaskBuilder { */ fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> TaskBuilder { let prev_gen_body = self.gen_body; + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, gen_body: |body| { wrapper(prev_gen_body(body)) }, can_not_copy: None, .. *self.consume() @@ -377,7 +446,18 @@ impl TaskBuilder { */ fn spawn(+f: fn~()) { let x = self.consume(); - spawn_raw(x.opts, x.gen_body(f)); + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; + let opts = { + linked: x.opts.linked, + supervised: x.opts.supervised, + mut notify_chan: notify_chan, + sched: x.opts.sched + }; + spawn_raw(opts, x.gen_body(f)); } /// Runs a task, while transfering ownership of one argument to the child. fn spawn_with(+arg: A, +f: fn~(+A)) { @@ -394,7 +474,7 @@ impl TaskBuilder { * child task, passes the port to child's body, and returns a channel * linked to the port to the parent. * - * This encapsulates some boilerplate handshaking logic that would + * This encapsulates Some boilerplate handshaking logic that would * otherwise be required to establish communication from the parent * to the child. */ @@ -466,7 +546,7 @@ fn default_task_opts() -> TaskOpts { { linked: true, supervised: false, - notify_chan: None, + mut notify_chan: None, sched: None } } @@ -872,7 +952,7 @@ fn each_ancestor(list: &mut AncestorList, // 'do_continue' - Did the forward_blk succeed at this point? (i.e., // should we recurse? or should our callers unwind?) - // The map defaults to none, because if ancestors is none, we're at + // The map defaults to None, because if ancestors is None, we're at // the end of the list, which doesn't make sense to coalesce. return do (**ancestors).map_default((None,false)) |ancestor_arc| { // NB: Takes a lock! (this ancestor node) @@ -950,7 +1030,11 @@ fn each_ancestor(list: &mut AncestorList, struct TCB { me: *rust_task, // List of tasks with whose fates this one's is intertwined. +<<<<<<< HEAD tasks: TaskGroupArc, // 'none' means the group has failed. +======= + let tasks: TaskGroupArc; // 'None' means the group has failed. +>>>>>>> core: port task.rs to comm::Chan/Port to pipes::Chan/Port // Lists of tasks who will kill us if they fail, but whom we won't kill. mut ancestors: AncestorList, is_main: bool, @@ -995,15 +1079,15 @@ fn TCB(me: *rust_task, +tasks: TaskGroupArc, +ancestors: AncestorList, } struct AutoNotify { - notify_chan: comm::Chan, + notify_chan: Chan, mut failed: bool, drop { let result = if self.failed { Failure } else { Success }; - comm::send(self.notify_chan, Exit(get_task(), result)); + self.notify_chan.send(Exit(get_task(), result)); } } -fn AutoNotify(chan: comm::Chan) -> AutoNotify { +fn AutoNotify(chan: Chan) -> AutoNotify { AutoNotify { notify_chan: chan, failed: true // Un-set above when taskgroup successfully made. @@ -1013,7 +1097,7 @@ fn AutoNotify(chan: comm::Chan) -> AutoNotify { fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) -> bool { let newstate = util::replace(state, None); - // If 'none', the group was failing. Can't enlist. + // If 'None', the group was failing. Can't enlist. if newstate.is_some() { let group = option::unwrap(newstate); taskset_insert(if is_member { &mut group.members } @@ -1028,7 +1112,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, // NB: Runs in destructor/post-exit context. Can't 'fail'. fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) { let newstate = util::replace(state, None); - // If 'none', already failing and we've already gotten a kill signal. + // If 'None', already failing and we've already gotten a kill signal. if newstate.is_some() { let group = option::unwrap(newstate); taskset_remove(if is_member { &mut group.members } @@ -1048,9 +1132,9 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { // To do it differently, we'd have to use the runtime's task refcounting, // but that could leave task structs around long after their task exited. let newstate = util::replace(state, None); - // Might already be none, if somebody is failing simultaneously. + // Might already be None, if Somebody is failing simultaneously. // That's ok; only one task needs to do the dirty work. (Might also - // see 'none' if somebody already failed and we got a kill signal.) + // see 'None' if Somebody already failed and we got a kill signal.) if newstate.is_some() { let group = option::unwrap(newstate); for taskset_each(&group.members) |+sibling| { @@ -1067,7 +1151,7 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { if is_main { rustrt::rust_task_kill_all(me); } - // Do NOT restore state to Some(..)! It stays none to indicate + // Do NOT restore state to Some(..)! It stays None to indicate // that the whole taskgroup is failing, to forbid new spawns. } // (note: multiple tasks may reach this point) @@ -1145,7 +1229,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) // Appease the borrow-checker. Really this wants to be written as: // match ancestors // Some(ancestor_arc) { ancestor_list(Some(ancestor_arc.clone())) } - // none { ancestor_list(none) } + // None { ancestor_list(None) } let tmp = util::replace(&mut **ancestors, None); if tmp.is_some() { let ancestor_arc = option::unwrap(tmp); @@ -1175,10 +1259,15 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { }; assert !new_task.is_null(); // Getting killed after here would leak the task. + let mut notify_chan = if opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut opts.notify_chan)) + }; let child_wrapper = make_child_wrapper(new_task, child_tg, ancestors, is_main, - opts.notify_chan, f); + notify_chan, f); let fptr = ptr::addr_of(child_wrapper); let closure: *rust_closure = unsafe::reinterpret_cast(&fptr); @@ -1198,17 +1287,25 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { // (4) ...and runs the provided body function. fn make_child_wrapper(child: *rust_task, +child_arc: TaskGroupArc, +ancestors: AncestorList, is_main: bool, - notify_chan: Option>, + +notify_chan: Option>, +f: fn~()) -> fn~() { let child_data = ~mut Some((child_arc, ancestors)); - return fn~() { + return fn~(move notify_chan) { // Agh. Get move-mode items into the closure. FIXME (#2829) let mut (child_arc, ancestors) = option::swap_unwrap(child_data); // Child task runs this code. // Even if the below code fails to kick the child off, we must - // send something on the notify channel. - let notifier = notify_chan.map(|c| AutoNotify(c)); + // send Something on the notify channel. + + //let mut notifier = None;//notify_chan.map(|c| AutoNotify(c)); + let notifier = match notify_chan { + Some(notify_chan_value) => { + let moved_ncv = move_it!{notify_chan_value}; + Some(AutoNotify(moved_ncv)) + } + _ => None + }; if enlist_many(child, &child_arc, &mut ancestors) { let group = @TCB(child, child_arc, ancestors, @@ -1221,7 +1318,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { }; // Set up membership in taskgroup and descendantship in all ancestor - // groups. If any enlistment fails, some task was already failing, so + // groups. If any enlistment fails, Some task was already failing, so // don't let the child task run, and undo every successful enlistment. fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc, ancestors: &mut AncestorList) -> bool { @@ -1387,7 +1484,7 @@ unsafe fn local_data_lookup( } ); do map_pos.map |index| { - // .get() is guaranteed because of "none { false }" above. + // .get() is guaranteed because of "None { false }" above. let (_, data_ptr, _) = (*map)[index].get(); (index, data_ptr) } @@ -1452,7 +1549,7 @@ unsafe fn local_set( } None => { // Find an empty slot. If not, grow the vector. - match (*map).position(|x| x.is_none()) { + match (*map).position(|x| x.is_None()) { Some(empty_index) => (*map).set_elt(empty_index, new_entry), None => (*map).push(new_entry) } @@ -1500,7 +1597,7 @@ unsafe fn local_data_set( local_set(rustrt::rust_get_task(), key, data) } /** - * Modify a task-local data value. If the function returns 'none', the + * Modify a task-local data value. If the function returns 'None', the * data is removed (and its reference dropped). */ unsafe fn local_data_modify( @@ -1583,7 +1680,7 @@ fn test_cant_dup_task_builder() { // The following 8 tests test the following 2^3 combinations: // {un,}linked {un,}supervised failure propagation {up,down}wards. -// !!! These tests are dangerous. If something is buggy, they will hang, !!! +// !!! These tests are dangerous. If Something is buggy, they will hang, !!! // !!! instead of exiting cleanly. This might wedge the buildbots. !!! #[test] #[ignore(cfg(windows))] @@ -2146,7 +2243,7 @@ fn test_tls_multitask() unsafe { fn my_key(+_x: @~str) { } local_data_set(my_key, @~"parent data"); do task::spawn unsafe { - assert local_data_get(my_key).is_none(); // TLS shouldn't carry over. + assert local_data_get(my_key).is_None(); // TLS shouldn't carry over. local_data_set(my_key, @~"child data"); assert *(local_data_get(my_key).get()) == ~"child data"; // should be cleaned up for us @@ -2171,7 +2268,7 @@ fn test_tls_pop() unsafe { local_data_set(my_key, @~"weasel"); assert *(local_data_pop(my_key).get()) == ~"weasel"; // Pop must remove the data from the map. - assert local_data_pop(my_key).is_none(); + assert local_data_pop(my_key).is_None(); } #[test] @@ -2198,7 +2295,7 @@ fn test_tls_crust_automorestack_memorial_bug() unsafe { // This might result in a stack-canary clobber if the runtime fails to set // sp_limit to 0 when calling the cleanup extern - it might automatically // jump over to the rust stack, which causes next_c_sp to get recorded as - // something within a rust stack segment. Then a subsequent upcall (esp. + // Something within a rust stack segment. Then a subsequent upcall (esp. // for logging, think vsnprintf) would run on a stack smaller than 1 MB. fn my_key(+_x: @~str) { } do task::spawn { From ecda18f5f55b3c107b9569493720ba00b5e666c0 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 18 Aug 2012 20:00:12 -0700 Subject: [PATCH 02/15] core: Fix stage0 build errors --- src/libcore/task.rs | 67 ++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index f84e0ac07605a..6c140637f4fbc 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1720,9 +1720,16 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). + let opts = { + let mut opts = default_task_opts(); + opts.linked = true; + opts.supervised = true; + move opts + }; + let b0 = task(); let b1 = TaskBuilder({ - opts: { linked: true, supervised: true,.. b0.opts }, + opts: move opts, can_not_copy: None, .. *b0 }); @@ -1733,9 +1740,16 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails fn test_spawn_linked_sup_fail_down() { // parent fails; child fails // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). + let opts = { + let mut opts = default_task_opts(); + opts.linked = true; + opts.supervised = true; + move opts + }; + let b0 = task(); let b1 = TaskBuilder({ - opts: { linked: true, supervised: true,.. b0.opts }, + opts: move opts, can_not_copy: None, .. *b0 }); @@ -1816,21 +1830,27 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] #[ignore(cfg(windows))] -fn test_spawn_raw_notify() { - let task_po = comm::Port(); - let task_ch = comm::Chan(task_po); - let notify_po = comm::Port(); - let notify_ch = comm::Chan(notify_po); +fn test_spawn_raw_notify_success() { + let (task_ch, task_po) = pipes::stream(); + let (notify_ch, notify_po) = pipes::stream(); let opts = { - notify_chan: Some(notify_ch), + notify_chan: Some(move notify_ch) .. default_task_opts() }; - do spawn_raw(opts) { - comm::send(task_ch, get_task()); + do spawn_raw(opts) |move task_ch| { + task_ch.send(get_task()); } - let task_ = comm::recv(task_po); - assert comm::recv(notify_po) == Exit(task_, Success); + let task_ = task_po.recv(); + assert notify_po.recv() == Exit(task_, Success); +} + +#[test] +#[ignore(cfg(windows))] +fn test_spawn_raw_notify_failure() { + // New bindings for these + let (task_ch, task_po) = pipes::stream(); + let (notify_ch, notify_po) = pipes::stream(); let opts = { linked: false, @@ -1838,11 +1858,11 @@ fn test_spawn_raw_notify() { .. default_task_opts() }; do spawn_raw(opts) { - comm::send(task_ch, get_task()); + task_ch.send(get_task()); fail; } - let task_ = comm::recv(task_po); - assert comm::recv(notify_po) == Exit(task_, Failure); + let task_ = task_po.recv(); + assert notify_po.recv() == Exit(task_, Failure); } #[test] @@ -2140,8 +2160,13 @@ fn test_unkillable() { let po = comm::Port(); let ch = po.chan(); + let opts = { + let mut opts = default_task_opts(); + opts.linked = false; + move opts + }; // We want to do this after failing - do spawn_raw({ linked: false,.. default_task_opts() }) { + do spawn_raw(opts) { for iter::repeat(10u) { yield() } ch.send(()); } @@ -2173,11 +2198,15 @@ fn test_unkillable() { #[ignore(cfg(windows))] #[should_fail] fn test_unkillable_nested() { - let po = comm::Port(); - let ch = po.chan(); + let (ch, po) = pipes::stream(); // We want to do this after failing - do spawn_raw({ linked: false,.. default_task_opts() }) { + let opts = { + let mut opts = default_task_opts(); + opts.linked = false; + move opts + }; + do spawn_raw(opts) { for iter::repeat(10u) { yield() } ch.send(()); } From e48841bf441426ba787bb3da544dbe15d2e6e89f Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 21 Aug 2012 15:20:10 -0700 Subject: [PATCH 03/15] core: changing pipes::port/chan_one to Port/ChanOne in unsafe.rs --- src/libcore/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 6c140637f4fbc..0d7e4fde5c35d 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -254,7 +254,7 @@ priv impl TaskBuilder { } self.consumed = true; let notify_chan = if self.opts.notify_chan == None { - None + none } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) }; From 5a7647beca16019079fb88da60d5637ff1fdb162 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Thu, 23 Aug 2012 22:49:09 -0700 Subject: [PATCH 04/15] core: fix breakage in TaskBuilder.future_result the actual "fix" in this change is the chunk that moves `let x = self.consume()` to after the option dance that results in the `notify_chan` in TaskBuilder.try() the rest is cleanup/sense-making of what some of this code is doing (I'm looking at you, future_result) --- src/libcore/task.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 0d7e4fde5c35d..6b5fac563360f 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -366,17 +366,22 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let (ch, po) = stream::(); + let (notify_pipe_ch, notify_pipe_po) = stream::(); blk(do future::from_fn { - match po.recv() { + match notify_pipe_po.recv() { Exit(_, result) => result } }); // Reconfigure self to use a notify channel. TaskBuilder({ - opts: { notify_chan: Some(ch),.. self.opts }, + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: Some(notify_pipe_ch), + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -445,12 +450,14 @@ impl TaskBuilder { * must be greater than zero. */ fn spawn(+f: fn~()) { - let x = self.consume(); - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan == none { None } else { - Some(option::swap_unwrap(&mut self.opts.notify_chan)) + let swapped_notify_chan = + option::swap_unwrap(&mut self.opts.notify_chan); + some(swapped_notify_chan) }; + let x = self.consume(); let opts = { linked: x.opts.linked, supervised: x.opts.supervised, @@ -522,7 +529,8 @@ impl TaskBuilder { let ch = comm::Chan(po); let mut result = None; - do self.future_result(|+r| { result = Some(r); }).spawn { + let fr_task_builder = self.future_result(|+r| { result = Some(r); }); + do fr_task_builder.spawn { comm::send(ch, f()); } match future::get(&option::unwrap(result)) { From 057056ce838833c6e6cc2fee76ff781feeb3cc0e Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 27 Aug 2012 23:03:04 -0700 Subject: [PATCH 05/15] core: cleanup in task.rs for things missed in last rebase --- src/libcore/task.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 6b5fac563360f..3c6fa72c47c75 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -78,9 +78,9 @@ export ThreadPerTask; export ManualThreads; export PlatformThread; -macro_rules! move_it { +macro_rules! move_it ( { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } -} +) /* Data types */ @@ -209,7 +209,7 @@ type TaskOpts = { linked: bool, supervised: bool, mut notify_chan: Option>, - sched: option, + sched: Option, }; /** @@ -254,7 +254,7 @@ priv impl TaskBuilder { } self.consumed = true; let notify_chan = if self.opts.notify_chan == None { - none + None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) }; @@ -450,12 +450,12 @@ impl TaskBuilder { * must be greater than zero. */ fn spawn(+f: fn~()) { - let notify_chan = if self.opts.notify_chan == none { + let notify_chan = if self.opts.notify_chan == None { None } else { let swapped_notify_chan = option::swap_unwrap(&mut self.opts.notify_chan); - some(swapped_notify_chan) + Some(swapped_notify_chan) }; let x = self.consume(); let opts = { @@ -1309,7 +1309,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { //let mut notifier = None;//notify_chan.map(|c| AutoNotify(c)); let notifier = match notify_chan { Some(notify_chan_value) => { - let moved_ncv = move_it!{notify_chan_value}; + let moved_ncv = move_it!(notify_chan_value); Some(AutoNotify(moved_ncv)) } _ => None From c23e1a68be8fc500a327ffabaafb6140be94219b Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 27 Aug 2012 23:03:24 -0700 Subject: [PATCH 06/15] std: cleanup in sync.rs for things missed in last rebase --- src/libstd/sync.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index 45ab8d4c4274a..1f62fd63ad168 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -96,11 +96,11 @@ impl &Sem { state.count -= 1; if state.count < 0 { // Create waiter nobe. - let (signal_end, wait_end) = pipes::oneshot(); + let (SignalEnd, WaitEnd) = pipes::oneshot(); // Tell outer scope we need to block. - waiter_nobe = Some(wait_end); + waiter_nobe = Some(WaitEnd); // Enqueue ourself. - state.waiters.tail.send(signal_end); + state.waiters.tail.send(SignalEnd); } } } @@ -202,9 +202,9 @@ impl &Condvar { */ fn wait_on(condvar_id: uint) { // Create waiter nobe. - let (signal_end, wait_end) = pipes::oneshot(); - let mut wait_end = Some(wait_end); - let mut signal_end = Some(signal_end); + let (SignalEnd, WaitEnd) = pipes::oneshot(); + let mut WaitEnd = Some(WaitEnd); + let mut SignalEnd = Some(SignalEnd); let mut reacquire = None; let mut out_of_bounds = None; unsafe { @@ -218,8 +218,8 @@ impl &Condvar { signal_waitqueue(&state.waiters); } // Enqueue ourself to be woken up by a signaller. - let signal_end = option::swap_unwrap(&mut signal_end); - state.blocked[condvar_id].tail.send(signal_end); + let SignalEnd = option::swap_unwrap(&mut SignalEnd); + state.blocked[condvar_id].tail.send(SignalEnd); } else { out_of_bounds = Some(vec::len(state.blocked)); } @@ -238,7 +238,7 @@ impl &Condvar { // Unconditionally "block". (Might not actually block if a // signaller already sent -- I mean 'unconditionally' in contrast // with acquire().) - let _ = pipes::recv_one(option::swap_unwrap(&mut wait_end)); + let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd)); } // This is needed for a failing condition variable to reacquire the From 9581c93eff20540ec85728f82f95f2670b781156 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 27 Aug 2012 23:26:31 -0700 Subject: [PATCH 07/15] core/std: an unending parade of minor tweaks due to renaming Option et al --- src/libcore/task.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 3c6fa72c47c75..fde9756d86f8b 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1557,7 +1557,7 @@ unsafe fn local_set( } None => { // Find an empty slot. If not, grow the vector. - match (*map).position(|x| x.is_None()) { + match (*map).position(|x| x.is_none()) { Some(empty_index) => (*map).set_elt(empty_index, new_entry), None => (*map).push(new_entry) } @@ -2280,7 +2280,7 @@ fn test_tls_multitask() unsafe { fn my_key(+_x: @~str) { } local_data_set(my_key, @~"parent data"); do task::spawn unsafe { - assert local_data_get(my_key).is_None(); // TLS shouldn't carry over. + assert local_data_get(my_key).is_none(); // TLS shouldn't carry over. local_data_set(my_key, @~"child data"); assert *(local_data_get(my_key).get()) == ~"child data"; // should be cleaned up for us @@ -2305,7 +2305,7 @@ fn test_tls_pop() unsafe { local_data_set(my_key, @~"weasel"); assert *(local_data_pop(my_key).get()) == ~"weasel"; // Pop must remove the data from the map. - assert local_data_pop(my_key).is_None(); + assert local_data_pop(my_key).is_none(); } #[test] From 44c0788696badf231bcd33408d12e2e4ec375356 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 28 Aug 2012 06:43:58 -0700 Subject: [PATCH 08/15] core/std: finish making futures sendable + test.. still issues --- src/libcore/future.rs | 16 +++++++++++++--- src/libcore/task.rs | 2 +- src/rustdoc/markdown_writer.rs | 7 +++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/libcore/future.rs b/src/libcore/future.rs index cd0d2b25e95d9..9e7fd4d7d8c0b 100644 --- a/src/libcore/future.rs +++ b/src/libcore/future.rs @@ -37,7 +37,7 @@ struct Future { } priv enum FutureState { - Pending(fn@() -> A), + Pending(fn~() -> A), Evaluating, Forced(A) } @@ -93,7 +93,7 @@ fn from_port(+port: future_pipe::client::waiting) -> Future { } } -fn from_fn(+f: @fn() -> A) -> Future { +fn from_fn(+f: ~fn() -> A) -> Future { /*! * Create a future from a function. * @@ -239,4 +239,14 @@ mod test { let f = spawn(|| fail); let _x: ~str = get(&f); } -} \ No newline at end of file + + #[test] + fn test_sendable_future() { + let expected = ~"schlorf"; + let f = do spawn |copy expected| { expected }; + do task::spawn { + let actual = get(&f); + assert actual == expected; + } + } +} diff --git a/src/libcore/task.rs b/src/libcore/task.rs index fde9756d86f8b..9b741154d444b 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1754,7 +1754,7 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails opts.supervised = true; move opts }; - + let b0 = task(); let b1 = TaskBuilder({ opts: move opts, diff --git a/src/rustdoc/markdown_writer.rs b/src/rustdoc/markdown_writer.rs index c9d7fe0adb79d..00c80366084a4 100644 --- a/src/rustdoc/markdown_writer.rs +++ b/src/rustdoc/markdown_writer.rs @@ -285,15 +285,14 @@ fn future_writer_factory( } fn future_writer() -> (writer, future::Future<~str>) { - let port = comm::Port(); - let chan = comm::Chan(port); + let (chan, port) = pipes::stream(); let writer = fn~(+instr: writeinstr) { - comm::send(chan, copy instr); + chan.send(copy instr); }; let future = do future::from_fn { let mut res = ~""; loop { - match comm::recv(port) { + match port.recv() { write(s) => res += s, done => break } From cf3a7087d22e3a95cee18343a3c74d9cae06b6ab Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 28 Aug 2012 21:28:25 -0700 Subject: [PATCH 09/15] core: patch from nmatsakis to make futures non-copyable --- src/libcore/future.rs | 8 ++++++-- src/libcore/unsafe.rs | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/libcore/future.rs b/src/libcore/future.rs index 9e7fd4d7d8c0b..b77bb1971a1cc 100644 --- a/src/libcore/future.rs +++ b/src/libcore/future.rs @@ -33,7 +33,11 @@ export future_pipe; #[doc = "The future type"] struct Future { - /*priv*/ mut state: FutureState, + /*priv*/ mut state: FutureState; + + // FIXME(#2829) -- futures should not be copyable, because they close + // over fn~'s that have pipes and so forth within! + drop {} } priv enum FutureState { @@ -88,7 +92,7 @@ fn from_port(+port: future_pipe::client::waiting) -> Future { port_ <-> *port; let port = option::unwrap(port_); match recv(port) { - future_pipe::completed(move data) => data + future_pipe::completed(move data) => data } } } diff --git a/src/libcore/unsafe.rs b/src/libcore/unsafe.rs index 2b57d694cbf30..9a984d5fcc630 100644 --- a/src/libcore/unsafe.rs +++ b/src/libcore/unsafe.rs @@ -415,13 +415,13 @@ mod tests { for uint::range(0u, num_tasks) |_i| { let total = total.clone(); - futures += ~[future::spawn(|| { + vec::push(futures, future::spawn(|| { for uint::range(0u, count) |_i| { do total.with |count| { **count += 1u; } } - })]; + })); }; for futures.each |f| { f.get() } From c7d5320a0dd264848865dd304eaf051209dd1bbf Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 5 Sep 2012 15:39:51 -0700 Subject: [PATCH 10/15] core: change notify_chan eq checks to is_none(), instead --- src/libcore/task.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 9b741154d444b..b9c65434938ce 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -246,14 +246,13 @@ fn task() -> TaskBuilder { mut consumed: false, }) } - priv impl TaskBuilder { fn consume() -> TaskBuilder { if self.consumed { fail ~"Cannot copy a task_builder"; // Fake move mode on self } self.consumed = true; - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -278,7 +277,7 @@ impl TaskBuilder { * the other will not be killed. */ fn unlinked() -> TaskBuilder { - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -300,7 +299,7 @@ impl TaskBuilder { * the child. */ fn supervised() -> TaskBuilder { - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -321,7 +320,7 @@ impl TaskBuilder { * other will be killed. */ fn linked() -> TaskBuilder { - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -388,7 +387,7 @@ impl TaskBuilder { } /// Configure a custom scheduler mode for the task. fn sched_mode(mode: SchedMode) -> TaskBuilder { - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -419,7 +418,7 @@ impl TaskBuilder { */ fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> TaskBuilder { let prev_gen_body = self.gen_body; - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut self.opts.notify_chan)) @@ -450,7 +449,7 @@ impl TaskBuilder { * must be greater than zero. */ fn spawn(+f: fn~()) { - let notify_chan = if self.opts.notify_chan == None { + let notify_chan = if self.opts.notify_chan.is_none() { None } else { let swapped_notify_chan = @@ -1267,7 +1266,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { }; assert !new_task.is_null(); // Getting killed after here would leak the task. - let mut notify_chan = if opts.notify_chan == None { + let mut notify_chan = if opts.notify_chan.is_none() { None } else { Some(option::swap_unwrap(&mut opts.notify_chan)) From 689462d8b66f2a716fda4756d3af56389df1a24b Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 7 Sep 2012 13:11:03 -0700 Subject: [PATCH 11/15] core: change FutureState Forced(A) to Forced(~A) --- src/libcore/future.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libcore/future.rs b/src/libcore/future.rs index b77bb1971a1cc..8d23da957e61e 100644 --- a/src/libcore/future.rs +++ b/src/libcore/future.rs @@ -43,7 +43,7 @@ struct Future { priv enum FutureState { Pending(fn~() -> A), Evaluating, - Forced(A) + Forced(~A) } /// Methods on the `future` type @@ -75,7 +75,7 @@ fn from_value(+val: A) -> Future { * not block. */ - Future {state: Forced(val)} + Future {state: Forced(~val)} } fn from_port(+port: future_pipe::client::waiting) -> Future { @@ -139,7 +139,7 @@ fn get_ref(future: &r/Future) -> &r/A { match future.state { Forced(ref v) => { // v here has type &A, but with a shorter lifetime. - return unsafe{ copy_lifetime(future, v) }; // ...extend it. + return unsafe{ copy_lifetime(future, &**v) }; // ...extend it. } Evaluating => { fail ~"Recursive forcing of future!"; @@ -154,7 +154,7 @@ fn get_ref(future: &r/Future) -> &r/A { fail ~"Logic error."; } Pending(move f) => { - future.state = Forced(f()); + future.state = Forced(~f()); return get_ref(future); } } From 17c8657f436e469f0836dae1e2600eef8fa1129f Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 7 Sep 2012 13:11:15 -0700 Subject: [PATCH 12/15] core: most rebase cruft cleanup --- src/libcore/task.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index b9c65434938ce..6b905e372d52b 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1094,7 +1094,7 @@ struct AutoNotify { } } -fn AutoNotify(chan: Chan) -> AutoNotify { +fn AutoNotify(+chan: Chan) -> AutoNotify { AutoNotify { notify_chan: chan, failed: true // Un-set above when taskgroup successfully made. @@ -1667,6 +1667,7 @@ fn test_spawn_raw_simple() { fn test_spawn_raw_unsupervise() { let opts = { linked: false, + mut notify_chan: None, .. default_task_opts() }; do spawn_raw(opts) { @@ -1842,7 +1843,7 @@ fn test_spawn_raw_notify_success() { let (notify_ch, notify_po) = pipes::stream(); let opts = { - notify_chan: Some(move notify_ch) + notify_chan: Some(move notify_ch), .. default_task_opts() }; do spawn_raw(opts) |move task_ch| { From 75d0a0cfb5a5d0e39af76514f90ec2add5d94763 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 7 Sep 2012 15:58:45 -0700 Subject: [PATCH 13/15] core: missed rebase cruft cleanup --- src/libcore/task.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 6b905e372d52b..e7101f587177e 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1037,11 +1037,7 @@ fn each_ancestor(list: &mut AncestorList, struct TCB { me: *rust_task, // List of tasks with whose fates this one's is intertwined. -<<<<<<< HEAD tasks: TaskGroupArc, // 'none' means the group has failed. -======= - let tasks: TaskGroupArc; // 'None' means the group has failed. ->>>>>>> core: port task.rs to comm::Chan/Port to pipes::Chan/Port // Lists of tasks who will kill us if they fail, but whom we won't kill. mut ancestors: AncestorList, is_main: bool, From d3fda84005672c93c88b2c74500e0f87ba2e2026 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 7 Sep 2012 15:59:14 -0700 Subject: [PATCH 14/15] test: bunch of test cleanup from sendable_futures change --- src/test/bench/msgsend-ring-mutex-arcs.rs | 5 +++-- src/test/bench/msgsend-ring-pipes.rs | 6 ++++-- src/test/bench/msgsend-ring-rw-arcs.rs | 6 ++++-- src/test/bench/msgsend-ring.rs | 6 ++++-- src/test/bench/shootout-pfib.rs | 2 +- src/test/run-pass/task-comm-3.rs | 2 +- src/test/run-pass/task-comm.rs | 4 ++-- 7 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/test/bench/msgsend-ring-mutex-arcs.rs b/src/test/bench/msgsend-ring-mutex-arcs.rs index 81babc00f0954..6cc9be29a22fa 100644 --- a/src/test/bench/msgsend-ring-mutex-arcs.rs +++ b/src/test/bench/msgsend-ring-mutex-arcs.rs @@ -82,7 +82,7 @@ fn main(args: ~[~str]) { let num_chan2 = ~mut None; *num_chan2 <-> num_chan; let num_port = ~mut Some(num_port); - futures += ~[future::spawn(|move num_chan2, move num_port| { + let new_future = future::spawn(|move num_chan2, move num_port| { let mut num_chan = None; num_chan <-> *num_chan2; let mut num_port1 = None; @@ -90,7 +90,8 @@ fn main(args: ~[~str]) { thread_ring(i, msg_per_task, option::unwrap(num_chan), option::unwrap(num_port1)) - })]; + }); + vec::push(futures, new_future); num_chan = Some(new_chan); }; diff --git a/src/test/bench/msgsend-ring-pipes.rs b/src/test/bench/msgsend-ring-pipes.rs index 63ac80536afc2..8c21db3d2f505 100644 --- a/src/test/bench/msgsend-ring-pipes.rs +++ b/src/test/bench/msgsend-ring-pipes.rs @@ -78,7 +78,8 @@ fn main(args: ~[~str]) { let num_chan2 = ~mut None; *num_chan2 <-> num_chan; let num_port = ~mut Some(num_port); - futures += ~[future::spawn(|move num_chan2, move num_port| { + let new_future = do future::spawn + |move num_chan2, move num_port| { let mut num_chan = None; num_chan <-> *num_chan2; let mut num_port1 = None; @@ -86,7 +87,8 @@ fn main(args: ~[~str]) { thread_ring(i, msg_per_task, option::unwrap(num_chan), option::unwrap(num_port1)) - })]; + }; + vec::push(futures, new_future); num_chan = Some(new_chan); }; diff --git a/src/test/bench/msgsend-ring-rw-arcs.rs b/src/test/bench/msgsend-ring-rw-arcs.rs index 745e1e4e758eb..82621c2f7e912 100644 --- a/src/test/bench/msgsend-ring-rw-arcs.rs +++ b/src/test/bench/msgsend-ring-rw-arcs.rs @@ -82,7 +82,8 @@ fn main(args: ~[~str]) { let num_chan2 = ~mut None; *num_chan2 <-> num_chan; let num_port = ~mut Some(num_port); - futures += ~[future::spawn(|move num_chan2, move num_port| { + let new_future = do future::spawn + |move num_chan2, move num_port| { let mut num_chan = None; num_chan <-> *num_chan2; let mut num_port1 = None; @@ -90,7 +91,8 @@ fn main(args: ~[~str]) { thread_ring(i, msg_per_task, option::unwrap(num_chan), option::unwrap(num_port1)) - })]; + }; + vec::push(futures, new_future); num_chan = Some(new_chan); }; diff --git a/src/test/bench/msgsend-ring.rs b/src/test/bench/msgsend-ring.rs index d972dde4c4aef..15a18725abf65 100644 --- a/src/test/bench/msgsend-ring.rs +++ b/src/test/bench/msgsend-ring.rs @@ -45,11 +45,13 @@ fn main(args: ~[~str]) { let get_chan = Port(); let get_chan_chan = Chan(get_chan); - futures += ~[do future::spawn |copy num_chan, move get_chan_chan| { + let new_future = do future::spawn + |copy num_chan, move get_chan_chan| { let p = Port(); get_chan_chan.send(Chan(p)); thread_ring(i, msg_per_task, num_chan, p) - }]; + }; + vec::push(futures, new_future); num_chan = get_chan.recv(); }; diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 5e7648b5f85c8..17ecf3fb332ee 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -71,7 +71,7 @@ fn stress(num_tasks: int) { let mut results = ~[]; for range(0, num_tasks) |i| { do task::task().future_result(|+r| { - results += ~[r]; + vec::push(results, r); }).spawn { stress_task(i); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 2dc33fa52bd88..e47daceb6e6f4 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -31,7 +31,7 @@ fn test00() { while i < number_of_tasks { let ch = po.chan(); do task::task().future_result(|+r| { - results += ~[r]; + vec::push(results, r); }).spawn |copy i| { test00_start(ch, i, number_of_messages) } diff --git a/src/test/run-pass/task-comm.rs b/src/test/run-pass/task-comm.rs index 5320ced981d57..ca34d42423fa6 100644 --- a/src/test/run-pass/task-comm.rs +++ b/src/test/run-pass/task-comm.rs @@ -40,7 +40,7 @@ fn test00() { while i < number_of_tasks { i = i + 1; do task::task().future_result(|+r| { - results += ~[r]; + vec::push(results, r); }).spawn |copy i| { test00_start(ch, i, number_of_messages); } @@ -127,7 +127,7 @@ fn test06() { while i < number_of_tasks { i = i + 1; do task::task().future_result(|+r| { - results += ~[r]; + vec::push(results, r); }).spawn |copy i| { test06_start(i); }; From c3757ac05273e3cb0a5ef9b2aabf142300112b1e Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 7 Sep 2012 18:52:35 -0700 Subject: [PATCH 15/15] core: fix separator for Future property --- src/libcore/future.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libcore/future.rs b/src/libcore/future.rs index 8d23da957e61e..122f671447287 100644 --- a/src/libcore/future.rs +++ b/src/libcore/future.rs @@ -33,7 +33,7 @@ export future_pipe; #[doc = "The future type"] struct Future { - /*priv*/ mut state: FutureState; + /*priv*/ mut state: FutureState, // FIXME(#2829) -- futures should not be copyable, because they close // over fn~'s that have pipes and so forth within!