Another try to get thread migration right
[ghc.git] / rts / Schedule.c
index 1f3fa36..74859af 100644
@@ -736,12 +736,6 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     //  - threads that have TSO_LOCKED cannot migrate
     //  - a thread that is bound to the current Task cannot be migrated
     //
-    // So we walk through the run queue, migrating threads to
-    // free_caps[] round-robin, skipping over immovable threads.  Each
-    // time through free_caps[] we keep one thread for ourselves,
-    // provided we haven't encountered one or more immovable threads
-    // in this pass.
-    //
     // This is about the simplest thing we could do; improvements we
     // might want to do include:
     //
@@ -753,112 +747,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
                    "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                    cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        // We want to share threads equally amongst free_caps[] and the
-        // current capability, but sometimes we encounter immovable
-        // threads.  This counter tracks the number of threads we have kept
-        // for the current capability minus the number of passes over
-        // free_caps[]. If it is greater than zero (due to immovable
-        // threads), we should try to bring it back to zero again by not
-        // keeping any threads for the current capability.
-        uint32_t imbalance = 0;
-
-        // n_free_caps may be larger than the number of spare threads we have,
-        // if there were sparks in the spark pool. To avoid giving away all our
-        // threads in this case, we limit the number of caps that we give
-        // threads to, to the number of spare threads (n_run_queue-1).
-        uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
-
-        if (thread_recipients > 0) {
-            prev = END_TSO_QUEUE;
-            t = cap->run_queue_hd;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) {  // don't move a locked thread
-                    if (prev == END_TSO_QUEUE) {
-                        cap->run_queue_hd = t;
-                    } else {
-                        setTSOLink(cap, prev, t);
-                    }
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                    imbalance++;
-                } else if (i == thread_recipients) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    // If we have not already kept any threads for this
-                    // capability during the current pass over free_caps[],
-                    // keep one now.
-                    if (imbalance == 0) {
-                        if (prev == END_TSO_QUEUE) {
-                            cap->run_queue_hd = t;
-                        } else {
-                            setTSOLink(cap, prev, t);
-                        }
-                        setTSOPrev(cap, t, prev);
-                        prev = t;
-                    } else {
-                        imbalance--;
-                    }
-                    i = 0;
+        // There are n_free_caps+1 caps in total.  We will share the threads
+        // evenly between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
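+        // e.g. with n_run_queue=1, n_free_caps=3: (1 + 3) / (3 + 1) = 1,
+        // so we never migrate away our last runnable thread.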
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we keep it
+        // on the current run queue and decrement keep_threads.
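+        // For example: with n_run_queue=5 and n_free_caps=2, keep_threads =
+        // (5 + 2) / 3 = 2, so if no thread is immovable we migrate three
+        // threads (to free_caps[0], free_caps[1], free_caps[0]) and keep two.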
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
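+            // t is now unlinked; below we either re-link it onto our own
+            // queue or append it to free_caps[i]'s run queue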
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-                    cap->n_run_queue--;
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
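+                // lowering keep_threads lowers the loop's stopping point, so
+                // each immovable thread we are forced to keep means one more
+                // movable thread (if any remain) is migrated further down
+                // the queue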
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
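+                // hand it to the next free capability, round-robin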
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-            traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
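+                // point the thread, and its bound task if it has one, at the
+                // new capability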
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer thread now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
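+        // n now counts only the threads that stayed on this capability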
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {