Make start address of `osReserveHeapMemory` tunable via command line -xb
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 8bbc9cf..544b9c2 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -574,6 +574,7 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
         setTSOPrev(cap, tso->_link, tso->block_info.prev);
     }
     tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+    cap->n_run_queue--;
 
     IF_DEBUG(sanity, checkRunQueue(cap));
 }
@@ -639,7 +640,7 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
     // progress at all.
 
     return ((pending_sync && !didGcLast) ||
-            cap->returning_tasks_hd != NULL ||
+            cap->n_returning_tasks != 0 ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                      ? peekRunQueue(cap)->bound != NULL
                                      : peekRunQueue(cap)->bound != task->incall)));
@@ -700,40 +701,28 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     Capability *free_caps[n_capabilities], *cap0;
     uint32_t i, n_wanted_caps, n_free_caps;
-    StgTSO *t;
 
-    // migration can be turned off with +RTS -qm
-    if (!RtsFlags.ParFlags.migrate) return;
+    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
 
-    // Check whether we have more threads on our run queue, or sparks
-    // in our pool, that we could hand to another Capability.
-    if (emptyRunQueue(cap)) {
-        if (sparkPoolSizeCap(cap) < 2) return;
-    } else {
-        if (singletonRunQueue(cap) &&
-            sparkPoolSizeCap(cap) < 1) return;
+    // migration can be turned off with +RTS -qm
+    if (!RtsFlags.ParFlags.migrate) {
+        spare_threads = 0;
     }
 
     // Figure out how many capabilities we want to wake up.  We need at least
     // sparkPoolSize(cap) plus the number of spare threads we have.
-    t = cap->run_queue_hd;
-    n_wanted_caps = sparkPoolSizeCap(cap);
-    if (t != END_TSO_QUEUE) {
-        do {
-            t = t->_link;
-            if (t == END_TSO_QUEUE) break;
-            n_wanted_caps++;
-        } while (n_wanted_caps < n_capabilities-1);
-    }
+    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
+    if (n_wanted_caps == 0) return;
 
-    // Grab free capabilities, starting from cap->no+1.
+    // First grab as many free Capabilities as we can.  ToDo: we should
+    // prefer capabilities on the same NUMA node, but not require it.
     for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
          n_free_caps < n_wanted_caps && i != cap->no;
          i = (i + 1) % n_capabilities) {
         cap0 = capabilities[i];
         if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
             if (!emptyRunQueue(cap0)
-                || cap0->returning_tasks_hd != NULL
+                || cap0->n_returning_tasks != 0
                 || cap0->inbox != (Message*)END_TSO_QUEUE) {
                 // it already has some work, we just grabbed it at
                 // the wrong moment.  Or maybe it's deadlocked!
@@ -744,10 +733,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         }
     }
 
-    // we now have n_free_caps free capabilities stashed in
-    // free_caps[].  Share our run queue equally with them.  This is
-    // probably the simplest thing we could do; improvements we might
-    // want to do include:
+    // We now have n_free_caps free capabilities stashed in
+    // free_caps[].  Attempt to share our run queue equally with them.
+    // This is complicated slightly by the fact that we can't move
+    // some threads:
+    //
+    //  - threads that have TSO_LOCKED cannot migrate
+    //  - a thread that is bound to the current Task cannot be migrated
+    //
+    // This is about the simplest thing we could do; improvements we
+    // might want to do include:
     //
     //   - giving high priority to moving relatively new threads, on
 //     the grounds that they haven't had time to build up a
@@ -757,84 +752,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
-                   "cap %d: %s and %d free capabilities, sharing...",
-                   cap->no,
-                   (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
-                   "excess threads on run queue":"sparks to share (>=2)",
+                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
+                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        if (cap->run_queue_hd != END_TSO_QUEUE) {
-            prev = cap->run_queue_hd;
-            t = prev->_link;
-            prev->_link = END_TSO_QUEUE;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) {  // don't move a locked thread
-                    setTSOLink(cap, prev, t);
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                } else if (i == n_free_caps) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    i = 0;
-                    // keep one for us
-                    setTSOLink(cap, prev, t);
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
+        // There are n_free_caps+1 caps in total.  We will share the threads
+        // evenly between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we add it
+        // to the current run queue and decrement keep_threads.
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-            traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer thread now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {
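
To make the sharing arithmetic in the hunk above concrete, here is a small stand-alone sketch (hypothetical helper and figures, not part of the patch) of how keep_threads splits the run queue between the pushing capability and the free capabilities:

    #include <stdint.h>
    #include <stdio.h>

    /* Illustration of the schedulePushWork split:
     * keep_threads = (n_run_queue + n_free_caps) / (n_free_caps + 1),
     * which rounds in favour of the pushing capability. */
    static void share(uint32_t n_run_queue, uint32_t n_free_caps)
    {
        uint32_t keep = (n_run_queue + n_free_caps) / (n_free_caps + 1);
        uint32_t push = n_run_queue - keep;
        printf("n_run_queue=%u n_free_caps=%u -> keep %u, push %u round-robin\n",
               n_run_queue, n_free_caps, keep, push);
    }

    int main(void)
    {
        share(4, 2);  /* keeps 2, pushes 1+1: the example in the comment  */
        share(7, 2);  /* keeps 3, pushes 2+2                              */
        share(1, 3);  /* keeps 1, pushes 0                                */
        return 0;
    }

The last case shows why the bias matters: with a single runnable thread the formula keeps it, so a capability never pushes away all of its work.
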
@@ -1134,7 +1126,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
                                                // nursery has only one
                                                // block.
 
-            bd = allocGroup_lock(blocks);
+            bd = allocGroupOnNode_lock(cap->node,blocks);
             cap->r.rNursery->n_blocks += blocks;
 
             // link the new group after CurrentNursery
@@ -1416,25 +1408,30 @@ static void stopAllCapabilities (Capability **pCap, Task *task)
 
 #if defined(THREADED_RTS)
 static rtsBool requestSync (
-    Capability **pcap, Task *task, PendingSync *sync,
+    Capability **pcap, Task *task, PendingSync *new_sync,
     SyncType *prev_sync_type)
 {
-    PendingSync *prev_sync;
+    PendingSync *sync;
 
-    prev_sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
-                                  (StgWord)NULL,
-                                  (StgWord)sync);
+    sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
+                             (StgWord)NULL,
+                             (StgWord)new_sync);
 
-    if (prev_sync)
+    if (sync != NULL)
     {
+        // sync is valid until we have called yieldCapability().
+        // After the sync is completed, we cannot read that struct any
+        // more because it has been freed.
+        *prev_sync_type = sync->type;
         do {
             debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
-                       prev_sync->type);
+                       sync->type);
             ASSERT(*pcap);
             yieldCapability(pcap,task,rtsTrue);
-        } while (pending_sync);
+            sync = pending_sync;
+        } while (sync != NULL);
+
         // NOTE: task->cap might have changed now
-        *prev_sync_type = prev_sync->type;
         return rtsTrue;
     }
     else
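
The requestSync rewrite above copies sync->type into *prev_sync_type before the yield loop because the task that owns the sync may free the PendingSync as soon as the sync completes. The claim-or-wait shape of that function could look roughly like the stand-alone sketch below, using C11 atomics and hypothetical names (Sync, request_sync) instead of the RTS types; the structs here are static, so unlike the real PendingSync they are never freed:

    #include <stdatomic.h>
    #include <stdint.h>
    #include <pthread.h>
    #include <sched.h>
    #include <stdio.h>

    typedef struct { int type; } Sync;          /* stand-in for PendingSync  */

    static _Atomic(Sync *) pending = NULL;      /* stand-in for pending_sync */

    /* Claim the sync slot, or wait for whoever already holds it.  Returns 1
     * if we now own the sync, 0 if somebody else's sync ran instead; in the
     * latter case *prev_type is filled in before we start waiting, mirroring
     * why requestSync reads sync->type before yieldCapability(). */
    static int request_sync(Sync *new_sync, int *prev_type)
    {
        Sync *other = NULL;
        if (atomic_compare_exchange_strong(&pending, &other, new_sync)) {
            return 1;
        }
        *prev_type = other->type;               /* copy out first */
        while (atomic_load(&pending) != NULL) {
            sched_yield();                      /* stand-in for yieldCapability() */
        }
        return 0;
    }

    static Sync syncs[4];

    static void *worker(void *arg)
    {
        int id = (int)(intptr_t)arg;
        int prev = -1;
        syncs[id].type = id;
        if (request_sync(&syncs[id], &prev)) {
            printf("thread %d won the sync\n", id);
            atomic_store(&pending, NULL);       /* sync finished, release */
        } else {
            printf("thread %d waited on a sync of type %d\n", id, prev);
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t t[4];
        for (intptr_t i = 0; i < 4; i++)
            pthread_create(&t[i], NULL, worker, (void *)i);
        for (int i = 0; i < 4; i++)
            pthread_join(t[i], NULL);
        return 0;
    }
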
@@ -1621,11 +1618,14 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
             if (was_syncing) {
                 stgFree(idle_cap);
             }
-            if (was_syncing && (prev_sync == SYNC_GC_SEQ ||
-                                prev_sync == SYNC_GC_PAR)) {
+            if (was_syncing &&
+                (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
+                !(sched_state == SCHED_INTERRUPTING && force_major)) {
                 // someone else had a pending sync request for a GC, so
                 // let's assume GC has been done and we don't need to GC
                 // again.
+                // Exception to this: if SCHED_INTERRUPTING, then we still
+                // need to do the final GC.
                 return;
             }
             if (sched_state == SCHED_SHUTTING_DOWN) {
@@ -1637,6 +1637,8 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
         } while (was_syncing);
     }
 
+    stat_startGCSync(gc_threads[cap->no]);
+
 #ifdef DEBUG
     unsigned int old_n_capabilities = n_capabilities;
 #endif
@@ -1858,8 +1860,6 @@ delete_threads_and_gc:
         }
         task->cap = cap;
     }
-
-    stgFree(idle_cap);
 #endif
 
     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
@@ -1888,6 +1888,8 @@ delete_threads_and_gc:
 #endif
 
 #if defined(THREADED_RTS)
+    stgFree(idle_cap);
+
     if (gc_type == SYNC_GC_SEQ) {
         // release our stash of capabilities.
         releaseAllCapabilities(n_capabilities, cap, task);
@@ -2028,10 +2030,12 @@ forkProcess(HsStablePtr *entry
             // bound threads for which the corresponding Task does not
             // exist.
             truncateRunQueue(cap);
+            cap->n_run_queue = 0;
 
             // Any suspended C-calling Tasks are no more, their OS threads
             // don't exist now:
             cap->suspended_ccalls = NULL;
+            cap->n_suspended_ccalls = 0;
 
 #if defined(THREADED_RTS)
             // Wipe our spare workers list, they no longer exist.  New
@@ -2040,6 +2044,7 @@ forkProcess(HsStablePtr *entry
             cap->n_spare_workers = 0;
             cap->returning_tasks_hd = NULL;
             cap->returning_tasks_tl = NULL;
+            cap->n_returning_tasks = 0;
 #endif
 
             // Release all caps except 0, we'll use that for starting
@@ -2194,9 +2199,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
         n_capabilities = enabled_capabilities = new_n_capabilities;
     }
 
-    // Start worker tasks on the new Capabilities
-    startWorkerTasks(old_n_capabilities, new_n_capabilities);
-
     // We're done: release the original Capabilities
     releaseAllCapabilities(old_n_capabilities, cap,task);
 
@@ -2266,6 +2268,7 @@ suspendTask (Capability *cap, Task *task)
         cap->suspended_ccalls->prev = incall;
     }
     cap->suspended_ccalls = incall;
+    cap->n_suspended_ccalls++;
 }
 
 STATIC_INLINE void
@@ -2284,6 +2287,7 @@ recoverSuspendedTask (Capability *cap, Task *task)
         incall->next->prev = incall->prev;
     }
     incall->next = incall->prev = NULL;
+    cap->n_suspended_ccalls--;
 }
 
 /* ---------------------------------------------------------------------------
@@ -2782,7 +2786,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
     Capability *cap = regTableToCapability(reg);
     StgThunk *raise_closure = NULL;
     StgPtr p, next;
-    StgRetInfoTable *info;
+    const StgRetInfoTable *info;
     //
     // This closure represents the expression 'raise# E' where E
     // is the exception raise.  It is used to overwrite all the
@@ -2891,12 +2895,12 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
 StgWord
 findRetryFrameHelper (Capability *cap, StgTSO *tso)
 {
-  StgPtr           p, next;
-  StgRetInfoTable *info;
+  const StgRetInfoTable *info;
+  StgPtr    p, next;
 
   p = tso->stackobj->sp;
   while (1) {
-    info = get_ret_itbl((StgClosure *)p);
+    info = get_ret_itbl((const StgClosure *)p);
     next = p + stack_frame_sizeW((StgClosure *)p);
     switch (info->i.type) {