Make start address of `osReserveHeapMemory` tunable via command line -xb
[ghc.git] / rts / Schedule.c
index c3911af..544b9c2 100644 (file)
@@ -574,6 +574,7 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
         setTSOPrev(cap, tso->_link, tso->block_info.prev);
     }
     tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+    cap->n_run_queue--;
 
     IF_DEBUG(sanity, checkRunQueue(cap));
 }
@@ -639,7 +640,7 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
     // progress at all.
 
     return ((pending_sync && !didGcLast) ||
-            cap->returning_tasks_hd != NULL ||
+            cap->n_returning_tasks != 0 ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                      ? peekRunQueue(cap)->bound != NULL
                                      : peekRunQueue(cap)->bound != task->incall)));
@@ -700,31 +701,18 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     Capability *free_caps[n_capabilities], *cap0;
     uint32_t i, n_wanted_caps, n_free_caps;
-    StgTSO *t;
 
-    // migration can be turned off with +RTS -qm
-    if (!RtsFlags.ParFlags.migrate) return;
+    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
 
-    // Check whether we have more threads on our run queue, or sparks
-    // in our pool, that we could hand to another Capability.
-    if (emptyRunQueue(cap)) {
-        if (sparkPoolSizeCap(cap) < 2) return;
-    } else {
-        if (singletonRunQueue(cap) &&
-            sparkPoolSizeCap(cap) < 1) return;
+    // migration can be turned off with +RTS -qm
+    if (!RtsFlags.ParFlags.migrate) {
+        spare_threads = 0;
     }
 
     // Figure out how many capabilities we want to wake up.  We need at least
     // sparkPoolSize(cap) plus the number of spare threads we have.
-    t = cap->run_queue_hd;
-    n_wanted_caps = sparkPoolSizeCap(cap);
-    if (t != END_TSO_QUEUE) {
-        do {
-            t = t->_link;
-            if (t == END_TSO_QUEUE) break;
-            n_wanted_caps++;
-        } while (n_wanted_caps < n_capabilities-1);
-    }
+    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
+    if (n_wanted_caps == 0) return;
 
     // First grab as many free Capabilities as we can.  ToDo: we should use
     // capabilities on the same NUMA node preferably, but not exclusively.
@@ -734,7 +722,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         cap0 = capabilities[i];
         if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
             if (!emptyRunQueue(cap0)
-                || cap0->returning_tasks_hd != NULL
+                || cap0->n_returning_tasks != 0
                 || cap0->inbox != (Message*)END_TSO_QUEUE) {
                 // it already has some work, we just grabbed it at
                 // the wrong moment.  Or maybe it's deadlocked!
@@ -745,10 +733,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         }
     }
 
-    // we now have n_free_caps free capabilities stashed in
-    // free_caps[].  Share our run queue equally with them.  This is
-    // probably the simplest thing we could do; improvements we might
-    // want to do include:
+    // We now have n_free_caps free capabilities stashed in
+    // free_caps[].  Attempt to share our run queue equally with them.
+    // This is complicated slightly by the fact that we can't move
+    // some threads:
+    //
+    //  - threads that have TSO_LOCKED cannot migrate
+    //  - a thread that is bound to the current Task cannot be migrated
+    //
+    // This is about the simplest thing we could do; improvements we
+    // might want to do include:
     //
     //   - giving high priority to moving relatively new threads, on
     //     the gournds that they haven't had time to build up a
@@ -758,84 +752,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
-                   "cap %d: %s and %d free capabilities, sharing...",
-                   cap->no,
-                   (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
-                   "excess threads on run queue":"sparks to share (>=2)",
+                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
+                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        if (cap->run_queue_hd != END_TSO_QUEUE) {
-            prev = cap->run_queue_hd;
-            t = prev->_link;
-            prev->_link = END_TSO_QUEUE;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) {  // don't move a locked thread
-                    setTSOLink(cap, prev, t);
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                } else if (i == n_free_caps) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    i = 0;
-                    // keep one for us
-                    setTSOLink(cap, prev, t);
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
+        // There are n_free_caps+1 caps in total.  We will share the threads
+        // evenly between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we add it
+        // to the current run queue and decrement keep_threads.
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-            traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer threads now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {
@@ -2039,10 +2030,12 @@ forkProcess(HsStablePtr *entry
             // bound threads for which the corresponding Task does not
             // exist.
             truncateRunQueue(cap);
+            cap->n_run_queue = 0;
 
             // Any suspended C-calling Tasks are no more, their OS threads
             // don't exist now:
             cap->suspended_ccalls = NULL;
+            cap->n_suspended_ccalls = 0;
 
 #if defined(THREADED_RTS)
             // Wipe our spare workers list, they no longer exist.  New
@@ -2051,6 +2044,7 @@ forkProcess(HsStablePtr *entry
             cap->n_spare_workers = 0;
             cap->returning_tasks_hd = NULL;
             cap->returning_tasks_tl = NULL;
+            cap->n_returning_tasks = 0;
 #endif
 
             // Release all caps except 0, we'll use that for starting
@@ -2205,9 +2199,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
         n_capabilities = enabled_capabilities = new_n_capabilities;
     }
 
-    // Start worker tasks on the new Capabilities
-    startWorkerTasks(old_n_capabilities, new_n_capabilities);
-
     // We're done: release the original Capabilities
     releaseAllCapabilities(old_n_capabilities, cap,task);
 
@@ -2277,6 +2268,7 @@ suspendTask (Capability *cap, Task *task)
         cap->suspended_ccalls->prev = incall;
     }
     cap->suspended_ccalls = incall;
+    cap->n_suspended_ccalls++;
 }
 
 STATIC_INLINE void
@@ -2295,6 +2287,7 @@ recoverSuspendedTask (Capability *cap, Task *task)
         incall->next->prev = incall->prev;
     }
     incall->next = incall->prev = NULL;
+    cap->n_suspended_ccalls--;
 }
 
 /* ---------------------------------------------------------------------------