Fix to thread migration
author Simon Marlow <marlowsd@gmail.com>
Wed, 27 Jul 2016 15:00:08 +0000 (16:00 +0100)
committer Simon Marlow <marlowsd@gmail.com>
Wed, 3 Aug 2016 07:07:34 +0000 (08:07 +0100)
Summary:
If we had 2 threads on the run queue, say [A,B], and B is bound to the
current Task, then we would fail to migrate any threads.  This fixes it
so that we would migrate A in that case.

This will help parallelism a bit in programs that have lots of bound
threads.

Test Plan:
Test program in #12419, which is actually not a great program but it
does behave a bit better after this change.

Reviewers: ezyang, niteria, bgamari, austin, erikd

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2430

GHC Trac Issues: #12419

rts/Schedule.c

index ee2d7db..908acf2 100644 (file)
@@ -702,13 +702,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     uint32_t i, n_wanted_caps, n_free_caps;
 
+    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
+
     // migration can be turned off with +RTS -qm
-    if (!RtsFlags.ParFlags.migrate) return;
+    if (!RtsFlags.ParFlags.migrate) {
+        spare_threads = 0;
+    }
 
     // Figure out how many capabilities we want to wake up.  We need at least
     // sparkPoolSize(cap) plus the number of spare threads we have.
-    n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
-
+    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
     if (n_wanted_caps == 0) return;
 
     // First grab as many free Capabilities as we can.  ToDo: we should use
@@ -730,10 +733,22 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         }
     }
 
-    // we now have n_free_caps free capabilities stashed in
-    // free_caps[].  Share our run queue equally with them.  This is
-    // probably the simplest thing we could do; improvements we might
-    // want to do include:
+    // We now have n_free_caps free capabilities stashed in
+    // free_caps[].  Attempt to share our run queue equally with them.
+    // This is complicated slightly by the fact that we can't move
+    // some threads:
+    //
+    //  - threads that have TSO_LOCKED cannot migrate
+    //  - a thread that is bound to the current Task cannot be migrated
+    //
+    // So we walk through the run queue, migrating threads to
+    // free_caps[] round-robin, skipping over immovable threads.  Each
+    // time through free_caps[] we keep one thread for ourselves,
+    // provided we haven't encountered one or more immovable threads
+    // in this pass.
+    //
+    // This is about the simplest thing we could do; improvements we
+    // might want to do include:
     //
     //   - giving high priority to moving relatively new threads, on
     //     the gournds that they haven't had time to build up a
@@ -748,10 +763,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 #endif
 
         debugTrace(DEBUG_sched,
-                   "cap %d: %s and %d free capabilities, sharing...",
-                   cap->no,
-                   (cap->n_run_queue > 1)?
-                   "excess threads on run queue":"sparks to share (>=2)",
+                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
+                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
         i = 0;
@@ -759,27 +772,56 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         pushed_to_all = rtsFalse;
 #endif
 
-        if (cap->run_queue_hd != END_TSO_QUEUE) {
-            prev = cap->run_queue_hd;
-            t = prev->_link;
-            prev->_link = END_TSO_QUEUE;
+        // We want to share threads equally amongst free_caps[] and the
+        // current capability, but sometimes we encounter immovable
+        // threads.  This counter tracks the number of threads we have kept
+        // for the current capability minus the number of passes over
+// free_caps[]. If it is greater than zero (due to immovable
+        // threads), we should try to bring it back to zero again by not
+        // keeping any threads for the current capability.
+        uint32_t imbalance = 0;
+
+        // n_free_caps may be larger than the number of spare threads we have,
+        // if there were sparks in the spark pool. To avoid giving away all our
+        // threads in this case, we limit the number of caps that we give
+        // threads to, to the number of spare threads (n_run_queue-1).
+        uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
+
+        if (thread_recipients > 0) {
+            prev = END_TSO_QUEUE;
+            t = cap->run_queue_hd;
             for (; t != END_TSO_QUEUE; t = next) {
                 next = t->_link;
                 t->_link = END_TSO_QUEUE;
                 if (t->bound == task->incall // don't move my bound thread
                     || tsoLocked(t)) {  // don't move a locked thread
-                    setTSOLink(cap, prev, t);
+                    if (prev == END_TSO_QUEUE) {
+                        cap->run_queue_hd = t;
+                    } else {
+                        setTSOLink(cap, prev, t);
+                    }
                     setTSOPrev(cap, t, prev);
                     prev = t;
-                } else if (i == n_free_caps) {
+                    imbalance++;
+                } else if (i == thread_recipients) {
 #ifdef SPARK_PUSHING
                     pushed_to_all = rtsTrue;
 #endif
+                    // If we have not already kept any threads for this
+                    // capability during the current pass over free_caps[],
+                    // keep one now.
+                    if (imbalance == 0) {
+                        if (prev == END_TSO_QUEUE) {
+                            cap->run_queue_hd = t;
+                        } else {
+                            setTSOLink(cap, prev, t);
+                        }
+                        setTSOPrev(cap, t, prev);
+                        prev = t;
+                    } else {
+                        imbalance--;
+                    }
                     i = 0;
-                    // keep one for us
-                    setTSOLink(cap, prev, t);
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
                 } else {
                     appendToRunQueue(free_caps[i],t);
                     cap->n_run_queue--;
@@ -2194,9 +2236,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
         n_capabilities = enabled_capabilities = new_n_capabilities;
     }
 
-    // Start worker tasks on the new Capabilities
-    startWorkerTasks(old_n_capabilities, new_n_capabilities);
-
     // We're done: release the original Capabilities
     releaseAllCapabilities(old_n_capabilities, cap,task);