Another try to get thread migration right
authorSimon Marlow <marlowsd@gmail.com>
Thu, 4 Aug 2016 14:59:43 +0000 (15:59 +0100)
committerSimon Marlow <marlowsd@gmail.com>
Fri, 5 Aug 2016 09:13:28 +0000 (10:13 +0100)
Summary:
This is surprisingly tricky.  There were linked list bugs in the
previous version (D2430) that showed up as a test failure in
setnumcapabilities001 (that's a great stress test!).

This new version uses a different strategy that doesn't suffer from
the problem that @ezyang pointed out in D2430.  We now pre-calculate
how many threads to keep for this capability, and then migrate any
surplus threads off the front of the queue, taking care to account for
threads that can't be migrated.

Test Plan:
1. setnumcapabilities001 stress test with sanity checking (+RTS -DS) turned on:

```
cd testsuite/tests/concurrent/should_run
make TEST=setnumcapabilities001 WAY=threaded1 EXTRA_HC_OPTS=-with-rtsopts=-DS CLEANUP=0
while true; do ./setnumcapabilities001.run/setnumcapabilities001 4 9 2000 || break; done
```

2. The test case from #12419

Reviewers: niteria, ezyang, rwbarton, austin, bgamari, erikd

Subscribers: thomie, ezyang

Differential Revision: https://phabricator.haskell.org/D2441

GHC Trac Issues: #12419

rts/Schedule.c

index 908acf2..544b9c2 100644 (file)
@@ -741,12 +741,6 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     //  - threads that have TSO_LOCKED cannot migrate
     //  - a thread that is bound to the current Task cannot be migrated
     //
-    // So we walk through the run queue, migrating threads to
-    // free_caps[] round-robin, skipping over immovable threads.  Each
-    // time through free_caps[] we keep one thread for ourselves,
-    // provided we haven't encountered one or more immovable threads
-    // in this pass.
-    //
     // This is about the simplest thing we could do; improvements we
     // might want to do include:
     //
@@ -758,112 +752,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
                    "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                    cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        // We want to share threads equally amongst free_caps[] and the
-        // current capability, but sometimes we encounter immovable
-        // threads.  This counter tracks the number of threads we have kept
-        // for the current capability minus the number of passes over
-        // free_caps[]. If it is great than zero (due to immovable
-        // threads), we should try to bring it back to zero again by not
-        // keeping any threads for the current capability.
-        uint32_t imbalance = 0;
-
-        // n_free_caps may be larger than the number of spare threads we have,
-        // if there were sparks in the spark pool. To avoid giving away all our
-        // threads in this case, we limit the number of caps that we give
-        // threads to, to the number of spare threads (n_run_queue-1).
-        uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
-
-        if (thread_recipients > 0) {
-            prev = END_TSO_QUEUE;
-            t = cap->run_queue_hd;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) {  // don't move a locked thread
-                    if (prev == END_TSO_QUEUE) {
-                        cap->run_queue_hd = t;
-                    } else {
-                        setTSOLink(cap, prev, t);
-                    }
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                    imbalance++;
-                } else if (i == thread_recipients) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    // If we have not already kept any threads for this
-                    // capability during the current pass over free_caps[],
-                    // keep one now.
-                    if (imbalance == 0) {
-                        if (prev == END_TSO_QUEUE) {
-                            cap->run_queue_hd = t;
-                        } else {
-                            setTSOLink(cap, prev, t);
-                        }
-                        setTSOPrev(cap, t, prev);
-                        prev = t;
-                    } else {
-                        imbalance--;
-                    }
-                    i = 0;
+        // There are n_free_caps+1 caps in total.  We will share the threads
+        // evenly between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we add it
+        // to the current run queue and decrement keep_threads.
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-                    cap->n_run_queue--;
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-            traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer threads now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {