diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index a7c794fb5f142..e29c30ba0334a 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
 pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
+pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
 
 pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
     fn callback_ms(&mut self, ms: u64, ~fn());
     fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
     /// The asynchronous I/O services. Not all event loops may provide one
@@ -35,11 +37,12 @@ pub trait EventLoop {
 }
 
 pub trait RemoteCallback {
-    /// Trigger the remote callback. Note that the number of times the callback
-    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
-    /// the callback will be called at least once, but multiple callbacks may be coalesced
-    /// and callbacks may be called more often requested. Destruction also triggers the
-    /// callback.
+    /// Trigger the remote callback. Note that the number of times the
+    /// callback is run is not guaranteed. All that is guaranteed is
+    /// that, after calling 'fire', the callback will be called at
+    /// least once, but multiple callbacks may be coalesced and
+    /// callbacks may be called more often than requested. Destruction
+    /// also triggers the callback.
     fn fire(&mut self);
 }
 
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index ce4e64c47d2ef..a50618ba0ad6d 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -23,7 +23,7 @@ use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::local_ptr;
 use rt::local::Local;
-use rt::rtio::RemoteCallback;
+use rt::rtio::{RemoteCallback, PausibleIdleCallback};
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
@@ -31,10 +31,11 @@ use rand::{XorShiftRng, RngUtil};
 use iterator::{range};
 use vec::{OwnedVector};
 
-/// The Scheduler is responsible for coordinating execution of Coroutines
-/// on a single thread. When the scheduler is running it is owned by
-/// thread local storage and the running task is owned by the
-/// scheduler.
+/// A scheduler is responsible for coordinating the execution of Tasks
+/// on a single thread. The scheduler runs inside a slightly modified
+/// Rust Task. When not running, this task is stored in the scheduler
+/// struct. The scheduler struct acts like a baton: all scheduling
+/// actions are transfers of the baton.
 ///
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
@@ -64,11 +65,12 @@ pub struct Scheduler {
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
-    /// The scheduler runs on a special task.
+    /// The scheduler runs on a special task. When it is not running
+    /// it is stored here instead of the work queue.
     sched_task: Option<~Task>,
     /// An action performed after a context switch on behalf of the
    /// code running before the context switch
-    priv cleanup_job: Option<CleanupJob>,
+    cleanup_job: Option<CleanupJob>,
     metrics: SchedMetrics,
     /// Should this scheduler run any task, or only pinned tasks?
     run_anything: bool,
@@ -76,31 +78,14 @@ pub struct Scheduler {
     /// them to.
     friend_handle: Option<SchedHandle>,
     /// A fast XorShift rng for scheduler use
-    rng: XorShiftRng
-
-}
-
-pub struct SchedHandle {
-    priv remote: ~RemoteCallbackObject,
-    priv queue: MessageQueue<SchedMessage>,
-    sched_id: uint
-}
-
-pub enum SchedMessage {
-    Wake,
-    Shutdown,
-    PinnedTask(~Task),
-    TaskFromFriend(~Task)
-}
-
-enum CleanupJob {
-    DoNothing,
-    GiveTask(~Task, UnsafeTaskReceiver)
+    rng: XorShiftRng,
+    /// A toggleable idle callback
+    idle_callback: ~PausibleIdleCallback
 }
 
 impl Scheduler {
 
-    pub fn sched_id(&self) -> uint { to_uint(self) }
+    // * Initialization Functions
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
@@ -114,8 +99,6 @@ impl Scheduler {
 
     }
 
-    // When you create a scheduler it isn't yet "in" a task, so the
-    // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
                        work_queues: ~[WorkQueue<~Task>],
@@ -124,6 +107,9 @@ impl Scheduler {
                        friend: Option<SchedHandle>)
         -> Scheduler {
 
+        let mut event_loop = event_loop;
+        let idle_callback = event_loop.pausible_idle_callback();
+
         Scheduler {
             sleeper_list: sleeper_list,
             message_queue: MessageQueue::new(),
@@ -138,7 +124,8 @@ impl Scheduler {
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
             friend_handle: friend,
-            rng: XorShiftRng::new()
+            rng: XorShiftRng::new(),
+            idle_callback: idle_callback
         }
     }
 
@@ -151,6 +138,8 @@ impl Scheduler {
     // scheduler task and bootstrap into it.
     pub fn bootstrap(~self, task: ~Task) {
 
+        let mut this = self;
+
         // Initialize the TLS key.
         local_ptr::init_tls_key();
 
@@ -161,10 +150,15 @@ impl Scheduler {
         // task, put it in TLS.
         Local::put::<Task>(sched_task);
 
+        // Before starting our first task, make sure the idle callback
+        // is active. As we do not start in the sleep state, this is
+        // important.
+        this.idle_callback.start(Scheduler::run_sched_once);
+
         // Now, as far as all the scheduler state is concerned, we are
         // inside the "scheduler" context. So we can act like the
         // scheduler and resume the provided task.
-        self.resume_task_immediately(task);
+        this.resume_task_immediately(task);
 
         // Now we are back in the scheduler context, having
         // successfully run the input task. Start by running the
@@ -173,7 +167,6 @@ impl Scheduler {
         let sched = Local::take::<Scheduler>();
 
         rtdebug!("starting scheduler %u", sched.sched_id());
-
         sched.run();
 
         // Now that we are done with the scheduler, clean up the
@@ -189,6 +182,9 @@ impl Scheduler {
         let message = stask.sched.get_mut_ref().message_queue.pop();
         assert!(message.is_none());
 
+        // Close the idle callback.
+        stask.sched.get_mut_ref().idle_callback.close();
+
         stask.destroyed = true;
     }
 
@@ -198,11 +194,6 @@ impl Scheduler {
 
         let mut self_sched = self;
 
-        // Always run through the scheduler loop at least once so that
-        // we enter the sleep state and can then be woken up by other
-        // schedulers.
-        self_sched.event_loop.callback(Scheduler::run_sched_once);
-
         // This is unsafe because we need to place the scheduler, with
         // the event_loop inside, inside our task. But we still need a
         // mutable reference to the event_loop to give it the "run"
@@ -221,11 +212,11 @@ impl Scheduler {
         }
     }
 
-    // One iteration of the scheduler loop, always run at least once.
+    // * Execution Functions - Core Loop Logic
 
     // The model for this function is that you continue through it
     // until you either use the scheduler while performing a schedule
-    // action, in which case you give it away and do not return, or
+    // action, in which case you give it away and return early, or
     // you reach the end and sleep.
     // In the case that a scheduler
     // action is performed the loop is evented such that this function
     // is called again.
@@ -235,41 +226,24 @@ impl Scheduler {
         // already have a scheduler stored in our local task, so we
         // start off by taking it. This is the only path through the
        // scheduler where we get the scheduler this way.
-        let sched = Local::take::<Scheduler>();
+        let mut sched = Local::take::<Scheduler>();
 
-        // Our first task is to read mail to see if we have important
-        // messages.
-
-        // 1) A wake message is easy, mutate sched struct and return
-        //    it.
-        // 2) A shutdown is also easy, shutdown.
-        // 3) A pinned task - we resume immediately and do not return
-        //    here.
-        // 4) A message from another scheduler with a non-homed task
-        //    to run here.
-
-        let result = sched.interpret_message_queue();
-        let sched = match result {
-            Some(sched) => {
-                // We did not resume a task, so we returned.
-                sched
-            }
-            None => {
-                return;
-            }
-        };
+        // Assume that we need to continue idling unless we reach the
+        // end of this function without performing an action.
+        sched.idle_callback.resume();
 
-        // Second activity is to try resuming a task from the queue.
+        // First we check for scheduler messages, these are higher
+        // priority than regular tasks.
+        let sched = match sched.interpret_message_queue() {
+            Some(sched) => sched,
+            None => return
+        };
 
-        let result = sched.do_work();
-        let mut sched = match result {
-            Some(sched) => {
-                // Failed to dequeue a task, so we return.
-                sched
-            }
-            None => {
-                return;
-            }
+        // This helper will use a randomized work-stealing algorithm
+        // to find work.
+        let mut sched = match sched.do_work() {
+            Some(sched) => sched,
+            None => return
         };
 
         // If we got here then there was no work to do.
@@ -282,8 +256,13 @@ impl Scheduler {
             sched.sleepy = true;
             let handle = sched.make_handle();
             sched.sleeper_list.push(handle);
+            // Since we are sleeping, deactivate the idle callback.
+            sched.idle_callback.pause();
         } else {
             rtdebug!("not sleeping, already doing so or no_sleep set");
+            // We may not be sleeping, but we still need to deactivate
+            // the idle callback.
+            sched.idle_callback.pause();
         }
 
         // Finished a cycle without using the Scheduler. Place it back
@@ -291,85 +270,33 @@ impl Scheduler {
         Local::put(sched);
     }
 
-    pub fn make_handle(&mut self) -> SchedHandle {
-        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
-
-        return SchedHandle {
-            remote: remote,
-            queue: self.message_queue.clone(),
-            sched_id: self.sched_id()
-        };
-    }
-
-    /// Schedule a task to be executed later.
-    ///
-    /// Pushes the task onto the work stealing queue and tells the
-    /// event loop to run it later. Always use this instead of pushing
-    /// to the work queue directly.
-    pub fn enqueue_task(&mut self, task: ~Task) {
-
-        let this = self;
-
-        // We push the task onto our local queue clone.
-        this.work_queue.push(task);
-        this.event_loop.callback(Scheduler::run_sched_once);
-
-        // We've made work available. Notify a
-        // sleeping scheduler.
-
-        // XXX: perf. Check for a sleeper without
-        // synchronizing memory. It's not critical
-        // that we always find it.
-
-        // XXX: perf. If there's a sleeper then we
-        // might as well just send it the task
-        // directly instead of pushing it to the
-        // queue. That is essentially the intent here
-        // and it is less work.
-        match this.sleeper_list.pop() {
-            Some(handle) => {
-                let mut handle = handle;
-                handle.send(Wake)
-            }
-            None => { (/* pass */) }
-        };
-    }
-
-    /// As enqueue_task, but with the possibility for the blocked task to
-    /// already have been killed.
-    pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
-        do blocked_task.wake().map_move |task| {
-            self.enqueue_task(task);
-        };
-    }
-
-    // * Scheduler-context operations
-
     // This function returns None if the scheduler is "used", or it
-    // returns the still-available scheduler.
+    // returns the still-available scheduler. At this point all
+    // message handling counts as a turn of work, and as a result
+    // returns None.
     fn interpret_message_queue(~self) -> Option<~Scheduler> {
         let mut this = self;
         match this.message_queue.pop() {
             Some(PinnedTask(task)) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 let mut task = task;
                 task.give_home(Sched(this.make_handle()));
                 this.resume_task_immediately(task);
                 return None;
             }
             Some(TaskFromFriend(task)) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 rtdebug!("got a task from a friend. lovely!");
-                return this.sched_schedule_task(task);
+                this.process_task(task,
+                    Scheduler::resume_task_immediately_cl).map_move(Local::put);
+                return None;
             }
             Some(Wake) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
                 this.sleepy = false;
-                return Some(this);
+                Local::put(this);
+                return None;
             }
             Some(Shutdown) => {
-                this.event_loop.callback(Scheduler::run_sched_once);
+                rtdebug!("shutting down");
                 if this.sleepy {
                     // There may be an outstanding handle on the
                     // sleeper list. Pop them all to make sure that's
@@ -388,11 +315,8 @@ impl Scheduler {
                 // event loop references we will shut down.
                 this.no_sleep = true;
                 this.sleepy = false;
-                // YYY: Does a shutdown count as a "use" of the
-                // scheduler? This seems to work - so I'm leaving it
-                // this way despite not having a solid rational for
-                // why I should return the scheduler here.
-                return Some(this);
+                Local::put(this);
+                return None;
             }
             None => {
                 return Some(this);
@@ -400,30 +324,19 @@ impl Scheduler {
         }
     }
 
-    /// Given an input Coroutine sends it back to its home scheduler.
-    fn send_task_home(task: ~Task) {
-        let mut task = task;
-        let mut home = task.take_unwrap_home();
-        match home {
-            Sched(ref mut home_handle) => {
-                home_handle.send(PinnedTask(task));
-            }
-            AnySched => {
-                rtabort!("error: cannot send anysched task home");
-            }
-        }
-    }
+    fn do_work(~self) -> Option<~Scheduler> {
+        let mut this = self;
 
-    /// Take a non-homed task we aren't allowed to run here and send
-    /// it to the designated friend scheduler to execute.
-    fn send_to_friend(&mut self, task: ~Task) {
-        rtdebug!("sending a task to friend");
-        match self.friend_handle {
-            Some(ref mut handle) => {
-                handle.send(TaskFromFriend(task));
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
+            Some(task) => {
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task,
+                    Scheduler::resume_task_immediately_cl);
             }
             None => {
-                rtabort!("tried to send task to a friend but scheduler has no friends");
+                rtdebug!("no work was found, returning the scheduler struct");
+                return Some(this);
             }
         }
     }
@@ -447,8 +360,8 @@ impl Scheduler {
             None => {
                 // Our naive stealing, try kinda hard.
rtdebug!("scheduler trying to steal"); - let _len = self.work_queues.len(); - return self.try_steals(2); + let len = self.work_queues.len(); + return self.try_steals(len/2); } } } @@ -462,7 +375,8 @@ impl Scheduler { let work_queues = &mut self.work_queues; match work_queues[index].steal() { Some(task) => { - rtdebug!("found task by stealing"); return Some(task) + rtdebug!("found task by stealing"); + return Some(task) } None => () } @@ -471,8 +385,11 @@ impl Scheduler { return None; } - // Given a task, execute it correctly. - fn process_task(~self, task: ~Task) -> Option<~Scheduler> { + // * Task Routing Functions - Make sure tasks send up in the right + // place. + + fn process_task(~self, task: ~Task, + schedule_fn: SchedulingFn) -> Option<~Scheduler> { let mut this = self; let mut task = task; @@ -489,15 +406,13 @@ impl Scheduler { } else { rtdebug!("running task here"); task.give_home(Sched(home_handle)); - this.resume_task_immediately(task); - return None; + return schedule_fn(this, task); } } AnySched if this.run_anything => { rtdebug!("running anysched task here"); task.give_home(AnySched); - this.resume_task_immediately(task); - return None; + return schedule_fn(this, task); } AnySched => { rtdebug!("sending task to friend"); @@ -508,98 +423,71 @@ impl Scheduler { } } - // Bundle the helpers together. - fn do_work(~self) -> Option<~Scheduler> { - let mut this = self; - - rtdebug!("scheduler calling do work"); - match this.find_work() { - Some(task) => { - rtdebug!("found some work! processing the task"); - return this.process_task(task); + fn send_task_home(task: ~Task) { + let mut task = task; + let mut home = task.take_unwrap_home(); + match home { + Sched(ref mut home_handle) => { + home_handle.send(PinnedTask(task)); } - None => { - rtdebug!("no work was found, returning the scheduler struct"); - return Some(this); + AnySched => { + rtabort!("error: cannot send anysched task home"); } } } - /// Called by a running task to end execution, after which it will - /// be recycled by the scheduler for reuse in a new task. - pub fn terminate_current_task(~self) { - // Similar to deschedule running task and then, but cannot go through - // the task-blocking path. The task is already dying. - let mut this = self; - let stask = this.sched_task.take_unwrap(); - do this.change_task_context(stask) |sched, mut dead_task| { - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); + /// Take a non-homed task we aren't allowed to run here and send + /// it to the designated friend scheduler to execute. + fn send_to_friend(&mut self, task: ~Task) { + rtdebug!("sending a task to friend"); + match self.friend_handle { + Some(ref mut handle) => { + handle.send(TaskFromFriend(task)); + } + None => { + rtabort!("tried to send task to a friend but scheduler has no friends"); + } } } - // Scheduling a task requires a few checks to make sure the task - // ends up in the appropriate location. The run_anything flag on - // the scheduler and the home on the task need to be checked. This - // helper performs that check. It takes a function that specifies - // how to queue the the provided task if that is the correct - // action. This is a "core" function that requires handling the - // returned Option correctly. - - pub fn schedule_task(~self, task: ~Task, - schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) - -> Option<~Scheduler> { - - // is the task home? - let is_home = task.is_home_no_tls(&self); + /// Schedule a task to be executed later. 
+    ///
+    /// Pushes the task onto the work stealing queue and tells the
+    /// event loop to run it later. Always use this instead of pushing
+    /// to the work queue directly.
+    pub fn enqueue_task(&mut self, task: ~Task) {
 
-        // does the task have a home?
-        let homed = task.homed();
+        let this = self;
 
-        let mut this = self;
+        // We push the task onto our local queue clone.
+        this.work_queue.push(task);
+        this.idle_callback.resume();
 
-        if is_home || (!homed && this.run_anything) {
-            // here we know we are home, execute now OR we know we
-            // aren't homed, and that this sched doesn't care
-            rtdebug!("task: %u is on ok sched, executing", to_uint(task));
-            schedule_fn(this, task);
-            return None;
-        } else if !homed && !this.run_anything {
-            // the task isn't homed, but it can't be run here
-            this.send_to_friend(task);
-            return Some(this);
-        } else {
-            // task isn't home, so don't run it here, send it home
-            Scheduler::send_task_home(task);
-            return Some(this);
-        }
-    }
+        // We've made work available. Notify a
+        // sleeping scheduler.
 
-    // There are two contexts in which schedule_task can be called:
-    // inside the scheduler, and inside a task. These contexts handle
-    // executing the task slightly differently. In the scheduler
-    // context case we want to receive the scheduler as an input, and
-    // manually deal with the option. In the task context case we want
-    // to use TLS to find the scheduler, and deal with the option
-    // inside the helper.
-
-    pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
-        do self.schedule_task(task) |sched, next_task| {
-            sched.resume_task_immediately(next_task);
-        }
+        // XXX: perf. Check for a sleeper without
+        // synchronizing memory. It's not critical
+        // that we always find it.
+        match this.sleeper_list.pop() {
+            Some(handle) => {
+                let mut handle = handle;
+                handle.send(Wake)
+            }
+            None => { (/* pass */) }
+        };
     }
 
-    // Task context case - use TLS.
-    pub fn run_task(task: ~Task) {
-        let sched = Local::take::<Scheduler>();
-        let opt = do sched.schedule_task(task) |sched, next_task| {
-            do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
-                sched.enqueue_blocked_task(last_task);
-            }
+    /// As enqueue_task, but with the possibility for the blocked task to
+    /// already have been killed.
+    pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
+        do blocked_task.wake().map_move |task| {
+            self.enqueue_task(task);
         };
-        opt.map_move(Local::put);
     }
 
+    // * Core Context Switching Functions
+
     // The primary function for changing contexts. In the current
     // design the scheduler is just a slightly modified GreenTask, so
     // all context swaps are from Task to Task. The only difference
@@ -629,7 +517,7 @@ impl Scheduler {
 
         // The current task is placed inside an enum with the cleanup
         // function. This enum is then placed inside the scheduler.
-        this.enqueue_cleanup_job(GiveTask(current_task, f_opaque));
+        this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
 
         // The scheduler is then placed inside the next task.
         let mut next_task = next_task;
@@ -645,12 +533,9 @@ impl Scheduler {
                 transmute_mut_region(*next_task.sched.get_mut_ref());
 
             let current_task: &mut Task = match sched.cleanup_job {
-                Some(GiveTask(ref task, _)) => {
+                Some(CleanupJob { task: ref task, _ }) => {
                     transmute_mut_region(*transmute_mut_unsafe(task))
                 }
-                Some(DoNothing) => {
-                    rtabort!("no next task");
-                }
                 None => {
                     rtabort!("no cleanup job");
                 }
@@ -684,19 +569,42 @@ impl Scheduler {
         }
     }
 
-    // Old API for task manipulation implemented over the new core
-    // function.
+    // Returns a mutable reference to both contexts involved in this
+    // swap. This is unsafe - we are getting mutable internal
+    // references to keep even when we don't own the tasks. It looks
+    // kinda safe because we are doing transmutes before passing in
+    // the arguments.
+    pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
+        (&'a mut Context, &'a mut Context) {
+        let current_task_context =
+            &mut current_task.coroutine.get_mut_ref().saved_context;
+        let next_task_context =
+            &mut next_task.coroutine.get_mut_ref().saved_context;
+        unsafe {
+            (transmute_mut_region(current_task_context),
+             transmute_mut_region(next_task_context))
+        }
+    }
+
+    // * Context Swapping Helpers - Here be ugliness!
 
-    pub fn resume_task_immediately(~self, task: ~Task) {
+    pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
         do self.change_task_context(task) |sched, stask| {
             sched.sched_task = Some(stask);
         }
+        return None;
     }
 
+    fn resume_task_immediately_cl(sched: ~Scheduler,
+                                  task: ~Task) -> Option<~Scheduler> {
+        sched.resume_task_immediately(task)
+    }
+
+
     pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
         match blocked_task.wake() {
-            Some(task) => self.resume_task_immediately(task),
-            None => Local::put(self),
+            Some(task) => { self.resume_task_immediately(task); }
+            None => Local::put(self)
         };
     }
 
@@ -735,54 +643,75 @@ impl Scheduler {
         }
     }
 
-    // A helper that looks up the scheduler and runs a task later by
-    // enqueuing it.
+    fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
+        do sched.switch_running_tasks_and_then(task) |sched, last_task| {
+            sched.enqueue_blocked_task(last_task);
+        };
+        return None;
+    }
+
+    // * Task Context Helpers
+
+    /// Called by a running task to end execution, after which it will
+    /// be recycled by the scheduler for reuse in a new task.
+    pub fn terminate_current_task(~self) {
+        // Similar to deschedule running task and then, but cannot go through
+        // the task-blocking path. The task is already dying.
+        let mut this = self;
+        let stask = this.sched_task.take_unwrap();
+        do this.change_task_context(stask) |sched, mut dead_task| {
+            let coroutine = dead_task.coroutine.take_unwrap();
+            coroutine.recycle(&mut sched.stack_pool);
+        }
+    }
+
+    pub fn run_task(task: ~Task) {
+        let sched = Local::take::<Scheduler>();
+        sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
+    }
+
     pub fn run_task_later(next_task: ~Task) {
-        // We aren't performing a scheduler operation, so we want to
-        // put the Scheduler back when we finish.
         let next_task = Cell::new(next_task);
         do Local::borrow::<Scheduler,()> |sched| {
             sched.enqueue_task(next_task.take());
         };
     }
 
-    // Returns a mutable reference to both contexts involved in this
-    // swap. This is unsafe - we are getting mutable internal
-    // references to keep even when we don't own the tasks. It looks
-    // kinda safe because we are doing transmutes before passing in
-    // the arguments.
-    pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
-        (&'a mut Context, &'a mut Context) {
-        let current_task_context =
-            &mut current_task.coroutine.get_mut_ref().saved_context;
-        let next_task_context =
-            &mut next_task.coroutine.get_mut_ref().saved_context;
-        unsafe {
-            (transmute_mut_region(current_task_context),
-             transmute_mut_region(next_task_context))
-        }
-    }
+    // * Utility Functions
 
-    pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
-        self.cleanup_job = Some(job);
-    }
+    pub fn sched_id(&self) -> uint { to_uint(self) }
 
     pub fn run_cleanup_job(&mut self) {
-        rtdebug!("running cleanup job");
         let cleanup_job = self.cleanup_job.take_unwrap();
-        match cleanup_job {
-            DoNothing => { }
-            GiveTask(task, f) => f.to_fn()(self, task)
-        }
+        cleanup_job.run(self);
+    }
+
+    pub fn make_handle(&mut self) -> SchedHandle {
+        let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
+
+        return SchedHandle {
+            remote: remote,
+            queue: self.message_queue.clone(),
+            sched_id: self.sched_id()
+        };
     }
 }
 
-// The cases for the below function.
-enum ResumeAction {
-    SendHome,
-    Requeue,
-    ResumeNow,
-    Homeless
+// Supporting types
+
+type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
+
+pub enum SchedMessage {
+    Wake,
+    Shutdown,
+    PinnedTask(~Task),
+    TaskFromFriend(~Task)
+}
+
+pub struct SchedHandle {
+    priv remote: ~RemoteCallbackObject,
+    priv queue: MessageQueue<SchedMessage>,
+    sched_id: uint
 }
 
 impl SchedHandle {
@@ -792,6 +721,25 @@ impl SchedHandle {
     }
 }
 
+struct CleanupJob {
+    task: ~Task,
+    f: UnsafeTaskReceiver
+}
+
+impl CleanupJob {
+    pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
+        CleanupJob {
+            task: task,
+            f: f
+        }
+    }
+
+    pub fn run(self, sched: &mut Scheduler) {
+        let CleanupJob { task: task, f: f } = self;
+        f.to_fn()(sched, task)
+    }
+}
+
 // XXX: Some hacks to put a &fn in Scheduler without borrowck
 // complaining
 type UnsafeTaskReceiver = raw::Closure;
@@ -1096,6 +1044,51 @@ mod test {
         }
     }
 
+    // A regression test that the final message is always handled.
+    // Used to deadlock because Shutdown was never received.
+    #[test]
+    fn no_missed_messages() {
+        use rt::work_queue::WorkQueue;
+        use rt::sleeper_list::SleeperList;
+        use rt::stack::StackPool;
+        use rt::uv::uvio::UvEventLoop;
+        use rt::sched::{Shutdown, TaskFromFriend};
+        use util;
+
+        do run_in_bare_thread {
+            do stress_factor().times {
+                let sleepers = SleeperList::new();
+                let queue = WorkQueue::new();
+                let queues = ~[queue.clone()];
+
+                let mut sched = ~Scheduler::new(
+                    ~UvEventLoop::new(),
+                    queue,
+                    queues.clone(),
+                    sleepers.clone());
+
+                let mut handle = sched.make_handle();
+
+                let sched = Cell::new(sched);
+
+                let thread = do Thread::start {
+                    let mut sched = sched.take();
+                    let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
+                    sched.bootstrap(bootstrap_task);
+                };
+
+                let mut stack_pool = StackPool::new();
+                let task = ~Task::new_root(&mut stack_pool, None, ||());
+                handle.send(TaskFromFriend(task));
+
+                handle.send(Shutdown);
+                util::ignore(handle);
+
+                thread.join();
+            }
+        }
+    }
+
     #[test]
     fn multithreading() {
         use rt::comm::*;
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 708166518bb89..698c59805a4cf 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -360,7 +360,7 @@ impl Coroutine {
 
         // Again - might work while safe, or it might not.
         do Local::borrow::<Scheduler,()> |sched| {
-            (sched).run_cleanup_job();
+            sched.run_cleanup_job();
         }
 
         // To call the run method on a task we need a direct
diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs
index 6280b64ecf51c..8a2541e73b384 100644
--- a/src/libstd/rt/util.rs
+++ b/src/libstd/rt/util.rs
@@ -38,8 +38,7 @@ pub fn default_sched_threads() -> uint {
 pub fn dumb_println(s: &str) {
     use io::WriterUtil;
     let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
-    dbg.write_str(s);
-    dbg.write_str("\n");
+    dbg.write_str(s + "\n");
 }
 
 pub fn abort(msg: &str) -> ! {
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index b73be9f7250db..a21146620ca82 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -48,6 +48,20 @@ impl IdleWatcher {
         }
     }
 
+    pub fn restart(&mut self) {
+        unsafe {
+            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
+        };
+
+        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
+            let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            let data = idle_watcher.get_watcher_data();
+            let cb: &IdleCallback = data.idle_cb.get_ref();
+            let status = status_to_maybe_uv_error(idle_watcher, status);
+            (*cb)(idle_watcher, status);
+        }
+    }
+
     pub fn stop(&mut self) {
         // NB: Not resetting the Rust idle_cb to None here because `stop` is
         // likely called from *within* the idle callback, causing a use after
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index a26b8a3ad594d..290e57942f47a 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -116,6 +116,15 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+        let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
+        return ~UvPausibleIdleCallback {
+            watcher: idle_watcher,
+            idle_flag: false,
+            closed: false
+        };
+    }
+
     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
         let mut timer = TimerWatcher::new(self.uvio.uv_loop());
         do timer.start(ms, 0) |timer, status| {
@@ -134,6 +143,44 @@ impl EventLoop for UvEventLoop {
     }
 }
 
+pub struct UvPausibleIdleCallback {
+    watcher: IdleWatcher,
+    idle_flag: bool,
+    closed: bool
+}
+
+impl UvPausibleIdleCallback {
+    #[inline]
+    pub fn start(&mut self, f: ~fn()) {
+        do self.watcher.start |_idle_watcher, _status| {
+            f();
+        };
+        self.idle_flag = true;
+    }
+    #[inline]
+    pub fn pause(&mut self) {
+        if self.idle_flag == true {
+            self.watcher.stop();
+            self.idle_flag = false;
+        }
+    }
+    #[inline]
+    pub fn resume(&mut self) {
+        if self.idle_flag == false {
+            self.watcher.restart();
+            self.idle_flag = true;
+        }
+    }
+    #[inline]
+    pub fn close(&mut self) {
+        self.pause();
+        if !self.closed {
+            self.closed = true;
+            self.watcher.close(||());
+        }
+    }
+}
+
 #[test]
 fn test_callback_run_once() {
     do run_in_bare_thread {
@@ -162,14 +209,39 @@ impl UvRemoteCallback {
         let exit_flag_clone = exit_flag.clone();
         let async = do AsyncWatcher::new(loop_) |watcher, status| {
             assert!(status.is_none());
+
+            // The synchronization logic here is subtle. To review,
+            // the uv async handle type promises that, after it is
+            // triggered, the remote callback is definitely called at
+            // least once. UvRemoteCallback needs to maintain those
+            // semantics while also shutting down cleanly from the
+            // dtor. In our case that means that, when the
+            // UvRemoteCallback dtor calls `async.send()`, here `f` is
+            // always called later.
+
+            // In the dtor, both setting the exit flag and firing the
+            // async callback happen under a lock. Here, before
+            // calling `f`, we take the lock and check the flag.
+            // Because we check the flag before calling `f`, and the
+            // flag is set under the same lock as the send, if the
+            // flag is set then we are guaranteed to call `f` after
+            // the final send.
+
+            // If the check were done after `f()` then there would be
+            // a period between that call and the check where the dtor
+            // could be called in the other thread, missing the final
+            // callback while still destroying the handle.
+
+            let should_exit = unsafe {
+                exit_flag_clone.with_imm(|&should_exit| should_exit)
+            };
+
             f();
-            unsafe {
-                do exit_flag_clone.with_imm |&should_exit| {
-                    if should_exit {
-                        watcher.close(||());
-                    }
-                }
+
+            if should_exit {
+                watcher.close(||());
             }
+
         };
         UvRemoteCallback {
             async: async,
@@ -218,7 +290,10 @@ mod test_remote {
                 let tube_clone = tube_clone.clone();
                 let tube_clone_cell = Cell::new(tube_clone);
                 let remote = do sched.event_loop.remote_callback {
-                    tube_clone_cell.take().send(1);
+                    // This could be called multiple times
+                    if !tube_clone_cell.is_empty() {
+                        tube_clone_cell.take().send(1);
+                    }
                };
                 remote_cell.put_back(remote);
             }
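
Notes on the mechanisms above. The sketches below are written in present-day Rust syntax with illustrative names; they model the logic of the patch rather than reproduce the old runtime API (which used ~ pointers and do-blocks), so treat them as hedged approximations, not the implementation.

1) The pausible idle callback. UvPausibleIdleCallback wraps a libuv idle watcher in four operations: start installs the callback and begins idling, pause/resume consult idle_flag so redundant idle_stop/idle_start calls are skipped, and close is an idempotent stop-then-close. A minimal sketch of that state machine, where the IdleWatcher trait and NoopWatcher are invented stand-ins for the uv binding:

    // Stand-in for the uvll idle_start/idle_stop/uv_close calls.
    trait IdleWatcher {
        fn start(&mut self);
        fn stop(&mut self);
        fn close(&mut self);
    }

    struct NoopWatcher;
    impl IdleWatcher for NoopWatcher {
        fn start(&mut self) {}
        fn stop(&mut self) {}
        fn close(&mut self) {}
    }

    struct PausableIdle<W: IdleWatcher> {
        watcher: W,
        idle_flag: bool, // true while the underlying watcher is running
        closed: bool,
    }

    impl<W: IdleWatcher> PausableIdle<W> {
        fn pause(&mut self) {
            if self.idle_flag {
                self.watcher.stop();
                self.idle_flag = false;
            }
        }

        fn resume(&mut self) {
            if !self.idle_flag {
                self.watcher.start();
                self.idle_flag = true;
            }
        }

        fn close(&mut self) {
            // Stop first, then close the handle exactly once; safe to
            // call repeatedly, which is what shutdown paths want.
            self.pause();
            if !self.closed {
                self.closed = true;
                self.watcher.close();
            }
        }
    }

    fn main() {
        let mut idle = PausableIdle { watcher: NoopWatcher, idle_flag: false, closed: false };
        idle.resume(); // scheduler has work: keep the loop spinning
        idle.pause();  // scheduler sleeps: let the loop go quiet
        idle.close();  // scheduler shutdown
    }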
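2) The reworked run_sched_once. The loop now assumes another turn will be needed (resume), treats messages as higher priority than stolen work, and pauses the idle callback on the no-work path whether or not the scheduler actually goes to sleep. A toy skeleton of the control flow, with message handling and work finding stubbed out:

    struct Scheduler {
        sleepy: bool,   // a handle has been pushed onto the sleeper list
        no_sleep: bool, // sleeping disabled, e.g. during shutdown
    }

    impl Scheduler {
        fn idle_resume(&mut self) { /* idle_callback.resume() */ }
        fn idle_pause(&mut self) { /* idle_callback.pause() */ }

        // True if a message was handled; handling one counts as a
        // turn of work, so the caller returns early.
        fn interpret_message_queue(&mut self) -> bool { false }

        // True if a task was found (locally or by stealing) and run.
        fn do_work(&mut self) -> bool { false }

        fn run_sched_once(&mut self) {
            // Assume we will keep idling unless this turn ends with
            // nothing to do.
            self.idle_resume();

            // Messages are higher priority than regular tasks.
            if self.interpret_message_queue() { return; }

            // Randomized work-stealing.
            if self.do_work() { return; }

            // No work. Enqueue a sleeper handle if allowed, and stop
            // idling either way so the event loop can block.
            if !self.sleepy && !self.no_sleep {
                self.sleepy = true;
                // push make_handle() onto the sleeper list here
            }
            self.idle_pause();
        }
    }

    fn main() {
        let mut sched = Scheduler { sleepy: false, no_sleep: false };
        sched.run_sched_once();
        assert!(sched.sleepy); // the empty loop parked itself
    }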
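3) Work stealing. find_work tries the local queue first and then falls back to try_steals, which now makes len/2 randomized attempts instead of a constant 2. A self-contained sketch of that loop; Vec<T> stands in for the deque-based WorkQueue, and a tiny xorshift generator for the scheduler's XorShiftRng:

    // Minimal xorshift64, standing in for XorShiftRng.
    struct XorShift(u64);
    impl XorShift {
        fn next(&mut self) -> u64 {
            let mut x = self.0;
            x ^= x << 13;
            x ^= x >> 7;
            x ^= x << 17;
            self.0 = x;
            x
        }
    }

    // Try a bounded number of random victims; the patch raises the
    // bound from a constant 2 to half the number of queues.
    fn try_steals<T>(queues: &mut [Vec<T>], rng: &mut XorShift) -> Option<T> {
        let attempts = queues.len() / 2;
        for _ in 0..attempts {
            let victim = (rng.next() as usize) % queues.len();
            if let Some(task) = queues[victim].pop() {
                return Some(task);
            }
        }
        None
    }

    fn main() {
        let mut queues = vec![vec![], vec!["task"], vec![], vec![]];
        let mut rng = XorShift(0x9e3779b97f4a7c15);
        // Stealing may fail even when work exists; callers fall
        // through to the sleep path and retry on the next wakeup.
        let _maybe = try_steals(&mut queues, &mut rng);
    }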
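4) Task routing. process_task now separates where a task runs from how it is run: the routing decision (run here, send home, or hand to the friend scheduler) is unchanged, while the injected SchedulingFn picks between resume_task_immediately_cl (scheduler context) and switch_task (task context). The routing predicate, reduced to a pure function with hypothetical types:

    enum Home {
        AnySched,
        Sched(usize), // usize stands in for a SchedHandle's sched_id
    }

    struct Task { home: Home }

    struct Scheduler {
        id: usize,
        run_anything: bool, // false for pinned-task-only schedulers
    }

    #[derive(Debug, PartialEq)]
    enum Route { RunHere, SendHome, SendToFriend }

    fn route(sched: &Scheduler, task: &Task) -> Route {
        match task.home {
            // Homed here: run it now.
            Home::Sched(home) if home == sched.id => Route::RunHere,
            // Homed elsewhere: send it to its home scheduler.
            Home::Sched(_) => Route::SendHome,
            // Homeless, and we take anything: run it.
            Home::AnySched if sched.run_anything => Route::RunHere,
            // Homeless, but we only run pinned tasks: hand it off.
            Home::AnySched => Route::SendToFriend,
        }
    }

    fn main() {
        let sched = Scheduler { id: 0, run_anything: false };
        let pinned_elsewhere = Task { home: Home::Sched(1) };
        let homeless = Task { home: Home::AnySched };
        assert_eq!(route(&sched, &pinned_elsewhere), Route::SendHome);
        assert_eq!(route(&sched, &homeless), Route::SendToFriend);
    }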
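5) CleanupJob. The DoNothing variant was dead weight: every real context switch hands over a task plus a closure to run on its behalf, so the enum becomes a struct that is consumed exactly once by run_cleanup_job. The shape of the refactor, with a plain fn pointer standing in for UnsafeTaskReceiver:

    struct Task(&'static str);
    struct Scheduler;

    struct CleanupJob {
        task: Task,
        f: fn(&mut Scheduler, Task), // UnsafeTaskReceiver in the patch
    }

    impl CleanupJob {
        fn new(task: Task, f: fn(&mut Scheduler, Task)) -> CleanupJob {
            CleanupJob { task, f }
        }

        // Consumes self: a cleanup job runs at most once, and the old
        // `DoNothing => { }` match arm disappears entirely.
        fn run(self, sched: &mut Scheduler) {
            let CleanupJob { task, f } = self;
            f(sched, task)
        }
    }

    fn main() {
        let mut sched = Scheduler;
        let job = CleanupJob::new(Task("blocked task"), |_sched, task| {
            println!("cleaning up after {}", task.0);
        });
        job.run(&mut sched); // what run_cleanup_job does after a switch
    }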
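6) The UvRemoteCallback shutdown ordering. The handler reads the exit flag under the lock before running f, and the dtor sets the flag and sends under the same lock; that ordering is what guarantees one final run of f after the last send, and it is why checking after f() would race. A reduced model of the two sides, with std::sync primitives standing in for the old exclusive lock (the threads here only illustrate the ordering argument):

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        let exit_flag = Arc::new(Mutex::new(false));

        // Dtor side: set the flag and "send" while holding the lock.
        let flag = exit_flag.clone();
        let dtor = thread::spawn(move || {
            let mut should_exit = flag.lock().unwrap();
            *should_exit = true;
            // async.send() happens here, before the lock is released.
        });
        dtor.join().unwrap();

        // Event-loop side, on the wakeup that send triggered:
        let should_exit = *exit_flag.lock().unwrap(); // 1. check the flag
        println!("f() runs");                         // 2. then fire f
        if should_exit {
            println!("watcher.close()");              // 3. close only after f
        }
    }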