Better abstraction over run queues.
authorEdward Z. Yang <ezyang@mit.edu>
Tue, 15 Jan 2013 23:04:08 +0000 (15:04 -0800)
committerEdward Z. Yang <ezyang@mit.edu>
Wed, 16 Jan 2013 21:49:01 +0000 (13:49 -0800)
This adds some new functions: peekRunQueue, promoteInRunQueue,
singletonRunQueue and truncateRunQueue which help abstract away
manual linked list manipulation, making it easier to swap in
a new queue implementation.

Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
rts/Capability.c
rts/Messages.c
rts/Schedule.c
rts/Schedule.h

index d02c34d..811df58 100644 (file)
@@ -472,13 +472,13 @@ releaseCapability_ (Capability* cap,
 
     // If the next thread on the run queue is a bound thread,
     // give this Capability to the appropriate Task.
-    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
+    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
         // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
-       task = cap->run_queue_hd->bound->task;
+       task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
     }
index 34dcbdf..c5988f8 100644 (file)
@@ -246,8 +246,7 @@ loop:
         // the current thread, since in that case it will not be on
         // the run queue.
         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
-            removeFromRunQueue(cap, owner);
-            pushOnRunQueue(cap,owner);
+            promoteInRunQueue(cap, owner);
         }
 
         // point to the BLOCKING_QUEUE from the BLACKHOLE
@@ -293,8 +292,7 @@ loop:
 
         // See above, #3838
         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
-            removeFromRunQueue(cap, owner);
-            pushOnRunQueue(cap,owner);
+            promoteInRunQueue(cap, owner);
         }
 
         return 1; // blocked
index bb45af9..a21b312 100644 (file)
@@ -579,6 +579,13 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
     IF_DEBUG(sanity, checkRunQueue(cap));
 }
 
+void
+promoteInRunQueue (Capability *cap, StgTSO *tso)
+{
+    removeFromRunQueue(cap, tso);
+    pushOnRunQueue(cap, tso);
+}
+
 /* ----------------------------------------------------------------------------
  * Setting up the scheduler loop
  * ------------------------------------------------------------------------- */
@@ -635,8 +642,8 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
     return ((pending_sync && !didGcLast) ||
             cap->returning_tasks_hd != NULL ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
-                                     ? cap->run_queue_hd->bound != NULL
-                                     : cap->run_queue_hd->bound != task->incall)));
+                                     ? peekRunQueue(cap)->bound != NULL
+                                     : peekRunQueue(cap)->bound != task->incall)));
 }
 
 // This is the single place where a Task goes to sleep.  There are
@@ -700,10 +707,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
-    if (cap->run_queue_hd == END_TSO_QUEUE) {
+    if (emptyRunQueue(cap)) {
         if (sparkPoolSizeCap(cap) < 2) return;
     } else {
-        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
+        if (singletonRunQueue(cap) &&
             sparkPoolSizeCap(cap) < 1) return;
     }
 
@@ -743,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        debugTrace(DEBUG_sched, 
                   "cap %d: %s and %d free capabilities, sharing...", 
                   cap->no, 
-                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);
 
@@ -1860,8 +1867,7 @@ forkProcess(HsStablePtr *entry
             // cleaned up later, but some of them may correspond to
             // bound threads for which the corresponding Task does not
             // exist.
-            cap->run_queue_hd = END_TSO_QUEUE;
-            cap->run_queue_tl = END_TSO_QUEUE;
+            truncateRunQueue(cap);
 
             // Any suspended C-calling Tasks are no more, their OS threads
             // don't exist now:
index a44949e..8b7caea 100644 (file)
@@ -183,7 +183,14 @@ popRunQueue (Capability *cap)
     return t;
 }
 
-extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
+INLINE_HEADER StgTSO *
+peekRunQueue (Capability *cap)
+{
+    return cap->run_queue_hd;
+}
+
+void removeFromRunQueue (Capability *cap, StgTSO *tso);
+extern void promoteInRunQueue (Capability *cap, StgTSO *tso);
 
 /* Add a thread to the end of the blocked queue.
  */
@@ -215,6 +222,22 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }
 
+/* assumes that the queue is not empty; so combine this with
+ * an emptyRunQueue check! */
+INLINE_HEADER rtsBool
+singletonRunQueue(Capability *cap)
+{
+    ASSERT(!emptyRunQueue(cap));
+    return cap->run_queue_hd->_link == END_TSO_QUEUE;
+}
+
+INLINE_HEADER void
+truncateRunQueue(Capability *cap)
+{
+    cap->run_queue_hd = END_TSO_QUEUE;
+    cap->run_queue_tl = END_TSO_QUEUE;
+}
+
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))