Tidy up tso->stackobj before calling threadStackUnderflow (#7636)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index cd704d2..5f48ef6 100644
@@ -27,6 +27,7 @@
 #include "ProfHeap.h"
 #include "Weak.h"
 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
+#include "sm/GCThread.h"
 #include "Sparks.h"
 #include "Capability.h"
 #include "Task.h"
@@ -40,6 +41,7 @@
 #include "Timer.h"
 #include "ThreadPaused.h"
 #include "Messages.h"
+#include "Stable.h"
 
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
@@ -96,13 +98,6 @@ volatile StgWord sched_state = SCHED_RUNNING;
 StgTSO dummy_tso;
 
 /*
- * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
- * in an MT setting, needed to signal that a worker thread shouldn't hang around
- * in the scheduler when it is out of work.
- */
-rtsBool shutting_down_scheduler = rtsFalse;
-
-/*
  * This mutex protects most of the global scheduler data in
  * the THREADED_RTS runtime.
  */
@@ -114,6 +109,11 @@ Mutex sched_mutex;
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif
 
+// Local stats
+#ifdef THREADED_RTS
+static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
+#endif
+
 /* -----------------------------------------------------------------------------
  * static function prototypes
  * -------------------------------------------------------------------------- */
@@ -121,19 +121,25 @@ Mutex sched_mutex;
 static Capability *schedule (Capability *initialCapability, Task *task);
 
 //
-// These function all encapsulate parts of the scheduler loop, and are
+// These functions all encapsulate parts of the scheduler loop, and are
 // abstracted only to make the structure and control flow of the
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-static void scheduleFindWork (Capability *cap);
+static void scheduleFindWork (Capability **pcap);
 #if defined(THREADED_RTS)
 static void scheduleYield (Capability **pcap, Task *task);
 #endif
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type);
+static void acquireAllCapabilities(Capability *cap, Task *task);
+static void releaseAllCapabilities(Capability *cap, Task *task);
+static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
+#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleProcessInbox(Capability *cap);
-static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void scheduleProcessInbox(Capability **cap);
+static void scheduleDetectDeadlock (Capability **pcap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
 #if defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
@@ -146,8 +152,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
-static Capability *scheduleDoGC(Capability *cap, Task *task,
-                               rtsBool force_major);
+static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteAllThreads (Capability *cap);
@@ -168,28 +173,6 @@ static void deleteThread_(Capability *cap, StgTSO *tso);
       * thread ends
       * stack overflow
 
-   GRAN version:
-     In a GranSim setup this loop iterates over the global event queue.
-     This revolves around the global event queue, which determines what 
-     to do next. Therefore, it's more complicated than either the 
-     concurrent or the parallel (GUM) setup.
-  This version has been entirely removed (JB 2008/08).
-
-   GUM version:
-     GUM iterates over incoming messages.
-     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
-     and sends out a fish whenever it has nothing to do; in-between
-     doing the actual reductions (shared code below) it processes the
-     incoming messages and deals with delayed operations 
-     (see PendingFetches).
-     This is not the ugliest code you could imagine, but it's bloody close.
-
-  (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
-  now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
-  as well as future GUM versions. This file has been refurbished to
-  only contain valid code, which is however incomplete, refers to
-  invalid includes etc.
-
    ------------------------------------------------------------------------ */
 
 static Capability *
@@ -268,7 +251,7 @@ schedule (Capability *initialCapability, Task *task)
     case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
         /* scheduleDoGC() deletes all the threads */
-       cap = scheduleDoGC(cap,task,rtsFalse);
+        scheduleDoGC(&cap,task,rtsFalse);
 
         // after scheduleDoGC(), we must be shutting down.  Either some
         // other Capability did the final GC, or we did it above,
@@ -290,17 +273,13 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-    scheduleFindWork(cap);
+    scheduleFindWork(&cap);
 
     /* work pushing, currently relevant only for THREADED_RTS:
        (pushes threads, wakes up idle capabilities for stealing) */
     schedulePushWork(cap,task);
 
-    scheduleDetectDeadlock(cap,task);
-
-#if defined(THREADED_RTS)
-    cap = task->cap;    // reload cap, it might have changed
-#endif
+    scheduleDetectDeadlock(&cap,task);
 
     // Normally, the only way we can get here with no threads to
     // run is if a keyboard interrupt received during 
@@ -383,6 +362,26 @@ schedule (Capability *initialCapability, Task *task)
         deleteThread(cap,t);
     }
 
+    // If this capability is disabled, migrate the thread away rather
+    // than running it.  NB. but not if the thread is bound: it is
+    // really hard for a bound thread to migrate itself.  Believe me,
+    // I tried several ways and couldn't find a way to do it.
+    // Instead, when everything is stopped for GC, we migrate all the
+    // threads on the run queue then (see scheduleDoGC()).
+    //
+    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
+    // when the number of capabilities drops, but we never migrate
+    // them back if it rises again.  Presumably we should, but after
+    // the thread has been migrated we no longer know what capability
+    // it was originally on.
+#ifdef THREADED_RTS
+    if (cap->disabled && !t->bound) {
+        Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
+        migrateThread(cap, t, dest_cap);
+        continue;
+    }
+#endif
+
     /* context switches are initiated by the timer signal, unless
      * the user specified "context switch as often as possible", with
      * +RTS -C0
@@ -419,27 +418,34 @@ run_thread:
     cap->interrupt = 0;
 
     cap->in_haskell = rtsTrue;
+    cap->idle = 0;
 
     dirty_TSO(cap,t);
     dirty_STACK(cap,t->stackobj);
 
-#if defined(THREADED_RTS)
-    if (recent_activity == ACTIVITY_DONE_GC) {
+    switch (recent_activity)
+    {
+    case ACTIVITY_DONE_GC: {
         // ACTIVITY_DONE_GC means we turned off the timer signal to
         // conserve power (see #1623).  Re-enable it here.
         nat prev;
         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
         if (prev == ACTIVITY_DONE_GC) {
+#ifndef PROFILING
             startTimer();
+#endif
         }
-    } else if (recent_activity != ACTIVITY_INACTIVE) {
+        break;
+    }
+    case ACTIVITY_INACTIVE:
         // If we reached ACTIVITY_INACTIVE, then don't reset it until
         // we've done the GC.  The thread running here might just be
         // the IO manager thread that handle_tick() woke up via
         // wakeUpRts().
+        break;
+    default:
         recent_activity = ACTIVITY_YES;
     }
-#endif
 
     traceEventRunThread(cap, t);
 
@@ -544,7 +550,7 @@ run_thread:
     }
 
     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
-      cap = scheduleDoGC(cap,task,rtsFalse);
+      scheduleDoGC(&cap,task,rtsFalse);
     }
   } /* end of while() */
 }
@@ -573,6 +579,13 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
     IF_DEBUG(sanity, checkRunQueue(cap));
 }
 
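+// Move a thread that is already somewhere on this Capability's run queue up
+// to the front, so that it is the next thread to be run.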
+void
+promoteInRunQueue (Capability *cap, StgTSO *tso)
+{
+    removeFromRunQueue(cap, tso);
+    pushOnRunQueue(cap, tso);
+}
+
 /* ----------------------------------------------------------------------------
  * Setting up the scheduler loop
  * ------------------------------------------------------------------------- */
@@ -582,7 +595,7 @@ schedulePreLoop(void)
 {
   // initialisation for scheduler - what cannot go into initScheduler()  
 
-#if defined(mingw32_HOST_OS)
+#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
     win32AllocStack();
 #endif
 }
@@ -594,34 +607,43 @@ schedulePreLoop(void)
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleFindWork (Capability *cap)
+scheduleFindWork (Capability **pcap)
 {
-    scheduleStartSignalHandlers(cap);
+    scheduleStartSignalHandlers(*pcap);
 
-    scheduleProcessInbox(cap);
+    scheduleProcessInbox(pcap);
 
-    scheduleCheckBlockedThreads(cap);
+    scheduleCheckBlockedThreads(*pcap);
 
 #if defined(THREADED_RTS)
-    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
 #endif
 }
 
 #if defined(THREADED_RTS)
 STATIC_INLINE rtsBool
-shouldYieldCapability (Capability *cap, Task *task)
+shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
 {
     // we need to yield this capability to someone else if..
-    //   - another thread is initiating a GC
+    //   - another thread is initiating a GC, and we didn't just do a GC
+    //     (see Note [GC livelock])
     //   - another Task is returning from a foreign call
     //   - the thread at the head of the run queue cannot be run
     //     by this Task (it is bound to another Task, or it is unbound
 //     and this task is bound).
-    return (waiting_for_gc || 
+    //
+    // Note [GC livelock]
+    //
+    // If we are interrupted to do a GC, then we do not immediately do
+    // another one.  This avoids a starvation situation where one
+    // Capability keeps forcing a GC and the other Capabilities make no
+    // progress at all.
+
+    return ((pending_sync && !didGcLast) ||
             cap->returning_tasks_hd != NULL ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
-                                     ? cap->run_queue_hd->bound != NULL
-                                     : cap->run_queue_hd->bound != task->incall)));
+                                     ? peekRunQueue(cap)->bound != NULL
+                                     : peekRunQueue(cap)->bound != task->incall)));
 }
 
 // This is the single place where a Task goes to sleep.  There are
@@ -638,20 +660,22 @@ static void
 scheduleYield (Capability **pcap, Task *task)
 {
     Capability *cap = *pcap;
+    int didGcLast = rtsFalse;
 
     // if we have work, and we don't need to give up the Capability, continue.
     //
-    if (!shouldYieldCapability(cap,task) && 
+    if (!shouldYieldCapability(cap,task,rtsFalse) && 
         (!emptyRunQueue(cap) ||
          !emptyInbox(cap) ||
-         sched_state >= SCHED_INTERRUPTING))
+         sched_state >= SCHED_INTERRUPTING)) {
         return;
+    }
 
     // otherwise yield (sleep), and keep yielding if necessary.
     do {
-        yieldCapability(&cap,task);
+        didGcLast = yieldCapability(&cap,task, !didGcLast);
     } 
-    while (shouldYieldCapability(cap,task));
+    while (shouldYieldCapability(cap,task,didGcLast));
 
     // note there may still be no threads on the run queue at this
     // point, the caller has to check.
@@ -683,20 +707,20 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
-    if (cap->run_queue_hd == END_TSO_QUEUE) {
+    if (emptyRunQueue(cap)) {
         if (sparkPoolSizeCap(cap) < 2) return;
     } else {
-        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
+        if (singletonRunQueue(cap) &&
             sparkPoolSizeCap(cap) < 1) return;
     }
 
     // First grab as many free Capabilities as we can.
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
-       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
-                || cap->returning_tasks_hd != NULL
-                || cap->inbox != (Message*)END_TSO_QUEUE) {
+                || cap0->returning_tasks_hd != NULL
+                || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at 
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
@@ -726,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        debugTrace(DEBUG_sched, 
                   "cap %d: %s and %d free capabilities, sharing...", 
                   cap->no, 
-                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);
 
@@ -855,9 +879,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleDetectDeadlock (Capability *cap, Task *task)
+scheduleDetectDeadlock (Capability **pcap, Task *task)
 {
-    /* 
+    Capability *cap = *pcap;
+    /*
      * Detect deadlock: when we have no threads to run, there are no
      * threads blocked, waiting for I/O, or sleeping, and all the
      * other tasks are waiting for work, we must have a deadlock of
@@ -882,7 +907,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
+        cap = *pcap;
         // when force_major == rtsTrue, scheduleDoGC sets
         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
         // signal.
@@ -962,16 +988,18 @@ scheduleSendPendingMessages(void)
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     Message *m, *next;
     int r;
+    Capability *cap = *pcap;
 
     while (!emptyInbox(cap)) {
         if (cap->r.rCurrentNursery->link == NULL ||
             g0->n_new_large_words >= large_alloc_lim) {
-            scheduleDoGC(cap, cap->running_task, rtsFalse);
+            scheduleDoGC(pcap, cap->running_task, rtsFalse);
+            cap = *pcap;
         }
 
         // don't use a blocking acquire; if the lock is held by
@@ -1009,7 +1037,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
-    if (anySparks())
+    if (anySparks() && !cap->disabled)
     {
         createSparkThread(cap);
         debugTrace(DEBUG_sched, "creating a spark thread");
@@ -1035,7 +1063,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
     // and a is never equal to b given a consistent view of memory.
     //
     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-        if (!stmValidateNestOfTransactions (t -> trec)) {
+        if (!stmValidateNestOfTransactions(cap, t -> trec)) {
             debugTrace(DEBUG_sched | DEBUG_stm,
                        "trec %p found wasting its time", t);
             
@@ -1063,9 +1091,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
     if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
-       lnat blocks;
+       W_ blocks;
        
-       blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
+       blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
        
         if (blocks > BLOCKS_PER_MBLOCK) {
             barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
@@ -1319,36 +1347,124 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 }
 
 /* -----------------------------------------------------------------------------
+ * Start a synchronisation of all capabilities
+ * -------------------------------------------------------------------------- */
+
+// Returns:
+//    0      if we successfully got a sync
+//    non-0  if there was another sync request in progress,
+//           and we yielded to it.  The value returned is the
+//           type of the other sync request.
+//
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type)
+{
+    nat prev_pending_sync;
+
+    prev_pending_sync = cas(&pending_sync, 0, sync_type);
+
+    if (prev_pending_sync)
+    {
+       do {
+            debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
+                       prev_pending_sync);
+            ASSERT(*pcap);
+            yieldCapability(pcap,task,rtsTrue);
+        } while (pending_sync);
+        return prev_pending_sync; // NOTE: task->cap might have changed now
+    }
+    else
+    {
+        return 0;
+    }
+}
+
+//
+// Grab all the capabilities except the one we already hold.  Used
+// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
+// before a fork (SYNC_OTHER).
+//
+// Only call this after requestSync(), otherwise a deadlock might
+// ensue if another thread is trying to synchronise.
+//
+static void acquireAllCapabilities(Capability *cap, Task *task)
+{
+    Capability *tmpcap;
+    nat i;
+
+    for (i=0; i < n_capabilities; i++) {
+        debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
+        tmpcap = &capabilities[i];
+        if (tmpcap != cap) {
+            // we better hope this task doesn't get migrated to
+            // another Capability while we're waiting for this one.
+            // It won't, because load balancing happens while we have
+            // all the Capabilities, but even so it's a slightly
+            // unsavoury invariant.
+            task->cap = tmpcap;
+            waitForReturnCapability(&tmpcap, task);
+            if (tmpcap->no != i) {
+                barf("acquireAllCapabilities: got the wrong capability");
+            }
+        }
+    }
+    task->cap = cap;
+}
+
+static void releaseAllCapabilities(Capability *cap, Task *task)
+{
+    nat i;
+
+    for (i = 0; i < n_capabilities; i++) {
+        if (cap->no != i) {
+            task->cap = &capabilities[i];
+            releaseCapability(&capabilities[i]);
+        }
+    }
+    task->cap = cap;
+}
+#endif
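
The three helpers above are always used together; the same request/acquire/release pattern recurs in scheduleDoGC(), forkProcess() and setNumCapabilities() below.  A condensed sketch of that pattern follows (the wrapper name stopTheWorldAnd() is hypothetical and for illustration only; it assumes THREADED_RTS):

    // Hypothetical helper: run 'action' while every Capability is stopped.
    static void stopTheWorldAnd (Capability **pcap, Task *task,
                                 void (*action)(void))
    {
        nat sync;

        // Retry until our sync request wins; while another sync is in
        // progress, requestSync() yields our Capability so it can finish.
        do {
            sync = requestSync(pcap, task, SYNC_OTHER);
        } while (sync);

        // Only safe after a successful requestSync(); otherwise two Tasks
        // could deadlock, each waiting for a Capability the other holds.
        acquireAllCapabilities(*pcap, task);

        // Every Capability is now held by this Task, so nothing can run:
        // it is safe to allow new sync requests before doing the work.
        pending_sync = 0;

        action();                          // the system is fully stopped here

        releaseAllCapabilities(*pcap, task);
    }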
+
+/* -----------------------------------------------------------------------------
  * Perform a garbage collection if necessary
  * -------------------------------------------------------------------------- */
 
-static Capability *
-scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
+static void
+scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
+              rtsBool force_major)
 {
+    Capability *cap = *pcap;
     rtsBool heap_census;
+    nat collect_gen;
 #ifdef THREADED_RTS
-    /* extern static volatile StgWord waiting_for_gc; 
-       lives inside capability.c */
-    rtsBool gc_type, prev_pending_gc;
-    nat i;
+    rtsBool idle_cap[n_capabilities];
+    rtsBool gc_type;
+    nat i, sync;
+    StgTSO *tso;
 #endif
 
     if (sched_state == SCHED_SHUTTING_DOWN) {
         // The final GC has already been done, and the system is
         // shutting down.  We'll probably deadlock if we try to GC
         // now.
-        return cap;
+        return;
     }
 
+    heap_census = scheduleNeedHeapProfile(rtsTrue);
+
+    // Figure out which generation we are collecting, so that we can
+    // decide whether this is a parallel GC or not.
+    collect_gen = calcNeeded(force_major || heap_census, NULL);
+
 #ifdef THREADED_RTS
     if (sched_state < SCHED_INTERRUPTING
         && RtsFlags.ParFlags.parGcEnabled
-        && N >= RtsFlags.ParFlags.parGcGen
+        && collect_gen >= RtsFlags.ParFlags.parGcGen
         && ! oldest_gen->mark)
     {
-        gc_type = PENDING_GC_PAR;
+        gc_type = SYNC_GC_PAR;
     } else {
-        gc_type = PENDING_GC_SEQ;
+        gc_type = SYNC_GC_SEQ;
     }
 
     // In order to GC, there must be no threads running Haskell code.
@@ -1363,26 +1479,32 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     //
 
     /*  Other capabilities are prevented from running yet more Haskell
-       threads if waiting_for_gc is set. Tested inside
+        threads if pending_sync is set. Tested inside
        yieldCapability() and releaseCapability() in Capability.c */
 
-    prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
-    if (prev_pending_gc) {
-       do {
-           debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
-                       prev_pending_gc);
-            ASSERT(cap);
-            yieldCapability(&cap,task);
-       } while (waiting_for_gc);
-       return cap;  // NOTE: task->cap might have changed here
-    }
+    do {
+        sync = requestSync(pcap, task, gc_type);
+        cap = *pcap;
+        if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
+            // someone else had a pending sync request for a GC, so
+            // let's assume GC has been done and we don't need to GC
+            // again.
+            return;
+        }
+        if (sched_state == SCHED_SHUTTING_DOWN) {
+            // The scheduler might now be shutting down.  We tested
+            // this above, but it might have become true since then as
+            // we yielded the capability in requestSync().
+            return;
+        }
+    } while (sync);
 
     interruptAllCapabilities();
 
     // The final shutdown GC is always single-threaded, because it's
     // possible that some of the Capabilities have no worker threads.
     
-    if (gc_type == PENDING_GC_SEQ)
+    if (gc_type == SYNC_GC_SEQ)
     {
         traceEventRequestSeqGc(cap);
     }
@@ -1392,30 +1514,65 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
     }
 
-    if (gc_type == PENDING_GC_SEQ)
+    if (gc_type == SYNC_GC_SEQ)
     {
         // single-threaded GC: grab all the capabilities
-        for (i=0; i < n_capabilities; i++) {
-            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
-            if (cap != &capabilities[i]) {
-                Capability *pcap = &capabilities[i];
-                // we better hope this task doesn't get migrated to
-                // another Capability while we're waiting for this one.
-                // It won't, because load balancing happens while we have
-                // all the Capabilities, but even so it's a slightly
-                // unsavoury invariant.
-                task->cap = pcap;
-                waitForReturnCapability(&pcap, task);
-                if (pcap != &capabilities[i]) {
-                    barf("scheduleDoGC: got the wrong capability");
-                }
-            }
-        }
+        acquireAllCapabilities(cap,task);
     }
     else
     {
-        // multi-threaded GC: make sure all the Capabilities donate one
-        // GC thread each.
+        // If we are load-balancing collections in this
+        // generation, then we require all GC threads to participate
+        // in the collection.  Otherwise, we only require active
+        // threads to participate, and we set gc_threads[i]->idle for
+        // any idle capabilities.  The rationale here is that waking
+        // up an idle Capability takes much longer than just doing any
+        // GC work on its behalf.
+
+        if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
+            || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
+                collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
+            for (i=0; i < n_capabilities; i++) {
+                if (capabilities[i].disabled) {
+                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+                } else {
+                    idle_cap[i] = rtsFalse;
+                }
+            }
+        } else {
+            for (i=0; i < n_capabilities; i++) {
+                if (capabilities[i].disabled) {
+                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+                } else if (i == cap->no ||
+                           capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
+                    idle_cap[i] = rtsFalse;
+                } else {
+                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+                    if (!idle_cap[i]) {
+                        n_failed_trygrab_idles++;
+                    } else {
+                        n_idle_caps++;
+                    }
+                }
+            }
+        }
+
+        // We set the gc_thread[i]->idle flag if that
+        // capability/thread is not participating in this collection.
+        // We also keep a local record of which capabilities are idle
+        // in idle_cap[], because scheduleDoGC() is re-entrant:
+        // another thread might start a GC as soon as we've finished
+        // this one, and thus the gc_thread[]->idle flags are invalid
+        // as soon as we release any threads after GC.  Getting this
+        // wrong leads to a rare and hard to debug deadlock!
+
+        for (i=0; i < n_capabilities; i++) {
+            gc_threads[i]->idle = idle_cap[i];
+            capabilities[i].idle++;
+        }
+
+        // For all capabilities participating in this GC, wait until
+        // they have stopped mutating and are standing by for GC.
         waitForGcThreads(cap);
         
 #if defined(THREADED_RTS)
@@ -1451,36 +1608,71 @@ delete_threads_and_gc:
         sched_state = SCHED_SHUTTING_DOWN;
     }
     
-    heap_census = scheduleNeedHeapProfile(rtsTrue);
+    /*
+     * When there are disabled capabilities, we want to migrate any
+     * threads away from them.  Normally this happens in the
+     * scheduler's loop, but only for unbound threads - it's really
+     * hard for a bound thread to migrate itself.  So we have another
+     * go here.
+     */
+#if defined(THREADED_RTS)
+    for (i = enabled_capabilities; i < n_capabilities; i++) {
+        Capability *tmp_cap, *dest_cap;
+        tmp_cap = &capabilities[i];
+        ASSERT(tmp_cap->disabled);
+        if (i != cap->no) {
+            dest_cap = &capabilities[i % enabled_capabilities];
+            while (!emptyRunQueue(tmp_cap)) {
+                tso = popRunQueue(tmp_cap);
+                migrateThread(tmp_cap, tso, dest_cap);
+                if (tso->bound) {
+                  traceTaskMigrate(tso->bound->task,
+                                   tso->bound->task->cap,
+                                   dest_cap);
+                  tso->bound->task->cap = dest_cap;
+                }
+            }
+        }
+    }
+#endif
 
-    traceEventGcStart(cap);
 #if defined(THREADED_RTS)
-    // reset waiting_for_gc *before* GC, so that when the GC threads
+    // reset pending_sync *before* GC, so that when the GC threads
     // emerge they don't immediately re-enter the GC.
-    waiting_for_gc = 0;
-    GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
+    pending_sync = 0;
+    GarbageCollect(collect_gen, heap_census, gc_type, cap);
 #else
-    GarbageCollect(force_major || heap_census, heap_census, 0, cap);
+    GarbageCollect(collect_gen, heap_census, 0, cap);
 #endif
-    traceEventGcEnd(cap);
 
     traceSparkCounters(cap);
 
-    if (recent_activity == ACTIVITY_INACTIVE && force_major)
-    {
-        // We are doing a GC because the system has been idle for a
-        // timeslice and we need to check for deadlock.  Record the
-        // fact that we've done a GC and turn off the timer signal;
-        // it will get re-enabled if we run any threads after the GC.
-        recent_activity = ACTIVITY_DONE_GC;
-        stopTimer();
-    }
-    else
-    {
+    switch (recent_activity) {
+    case ACTIVITY_INACTIVE:
+        if (force_major) {
+            // We are doing a GC because the system has been idle for a
+            // timeslice and we need to check for deadlock.  Record the
+            // fact that we've done a GC and turn off the timer signal;
+            // it will get re-enabled if we run any threads after the GC.
+            recent_activity = ACTIVITY_DONE_GC;
+#ifndef PROFILING
+            stopTimer();
+#endif
+            break;
+        }
+        // fall through...
+
+    case ACTIVITY_MAYBE_NO:
         // the GC might have taken long enough for the timer to set
-        // recent_activity = ACTIVITY_INACTIVE, but we aren't
-        // necessarily deadlocked:
+        // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
+        // but we aren't necessarily deadlocked:
         recent_activity = ACTIVITY_YES;
+        break;
+
+    case ACTIVITY_DONE_GC:
+        // If we are actually active, the scheduler will reset the
+        // recent_activity flag and re-enable the timer.
+        break;
     }
 
 #if defined(THREADED_RTS)
@@ -1494,9 +1686,21 @@ delete_threads_and_gc:
     }
 
 #if defined(THREADED_RTS)
-    if (gc_type == PENDING_GC_PAR)
+    if (gc_type == SYNC_GC_PAR)
     {
         releaseGCThreads(cap);
+        for (i = 0; i < n_capabilities; i++) {
+            if (i != cap->no) {
+                if (idle_cap[i]) {
+                    ASSERT(capabilities[i].running_task == task);
+                    task->cap = &capabilities[i];
+                    releaseCapability(&capabilities[i]);
+                } else {
+                    ASSERT(capabilities[i].running_task != task);
+                }
+            }
+        }
+        task->cap = cap;
     }
 #endif
 
@@ -1526,23 +1730,13 @@ delete_threads_and_gc:
 #endif
 
 #if defined(THREADED_RTS)
-    if (gc_type == PENDING_GC_SEQ) {
+    if (gc_type == SYNC_GC_SEQ) {
         // release our stash of capabilities.
-        for (i = 0; i < n_capabilities; i++) {
-            if (cap != &capabilities[i]) {
-                task->cap = &capabilities[i];
-                releaseCapability(&capabilities[i]);
-            }
-        }
-    }
-    if (cap) {
-       task->cap = cap;
-    } else {
-       task->cap = NULL;
+        releaseAllCapabilities(cap, task);
     }
 #endif
 
-    return cap;
+    return;
 }
 
 /* ---------------------------------------------------------------------------
@@ -1561,26 +1755,41 @@ forkProcess(HsStablePtr *entry
     StgTSO* t,*next;
     Capability *cap;
     nat g;
-    
-#if defined(THREADED_RTS)
-    if (RtsFlags.ParFlags.nNodes > 1) {
-       errorBelch("forking not supported with +RTS -N<n> greater than 1");
-       stg_exit(EXIT_FAILURE);
-    }
+    Task *task = NULL;
+    nat i;
+#ifdef THREADED_RTS
+    nat sync;
 #endif
 
     debugTrace(DEBUG_sched, "forking!");
     
-    // ToDo: for SMP, we should probably acquire *all* the capabilities
-    cap = rts_lock();
-    
+    task = newBoundTask();
+
+    cap = NULL;
+    waitForReturnCapability(&cap, task);
+
+#ifdef THREADED_RTS
+    do {
+        sync = requestSync(&cap, task, SYNC_OTHER);
+    } while (sync);
+
+    acquireAllCapabilities(cap,task);
+
+    pending_sync = 0;
+#endif
+
     // no funny business: hold locks while we fork, otherwise if some
     // other thread is holding a lock when the fork happens, the data
     // structure protected by the lock will forever be in an
     // inconsistent state in the child.  See also #1391.
     ACQUIRE_LOCK(&sched_mutex);
-    ACQUIRE_LOCK(&cap->lock);
-    ACQUIRE_LOCK(&cap->running_task->lock);
+    ACQUIRE_LOCK(&sm_mutex);
+    ACQUIRE_LOCK(&stable_mutex);
+    ACQUIRE_LOCK(&task->lock);
+
+    for (i=0; i < n_capabilities; i++) {
+        ACQUIRE_LOCK(&capabilities[i].lock);
+    }
 
     stopTimer(); // See #4074
 
@@ -1595,19 +1804,30 @@ forkProcess(HsStablePtr *entry
         startTimer(); // #4074
 
         RELEASE_LOCK(&sched_mutex);
-        RELEASE_LOCK(&cap->lock);
-        RELEASE_LOCK(&cap->running_task->lock);
+        RELEASE_LOCK(&sm_mutex);
+        RELEASE_LOCK(&stable_mutex);
+        RELEASE_LOCK(&task->lock);
+
+        for (i=0; i < n_capabilities; i++) {
+            releaseCapability_(&capabilities[i],rtsFalse);
+            RELEASE_LOCK(&capabilities[i].lock);
+        }
+        boundTaskExiting(task);
 
        // just return the pid
-       rts_unlock(cap);
-       return pid;
+        return pid;
        
     } else { // child
        
 #if defined(THREADED_RTS)
         initMutex(&sched_mutex);
-        initMutex(&cap->lock);
-        initMutex(&cap->running_task->lock);
+        initMutex(&sm_mutex);
+        initMutex(&stable_mutex);
+        initMutex(&task->lock);
+
+        for (i=0; i < n_capabilities; i++) {
+            initMutex(&capabilities[i].lock);
+        }
 #endif
 
 #ifdef TRACING
@@ -1626,7 +1846,7 @@ forkProcess(HsStablePtr *entry
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
-               deleteThread_(cap,t);
+                deleteThread_(t->cap,t);
 
                 // stop the GC from updating the InCall to point to
                 // the TSO.  This is only necessary because the
@@ -1637,44 +1857,61 @@ forkProcess(HsStablePtr *entry
           }
        }
        
-       // Empty the run queue.  It seems tempting to let all the
-       // killed threads stay on the run queue as zombies to be
-       // cleaned up later, but some of them correspond to bound
-       // threads for which the corresponding Task does not exist.
-       cap->run_queue_hd = END_TSO_QUEUE;
-       cap->run_queue_tl = END_TSO_QUEUE;
-
-       // Any suspended C-calling Tasks are no more, their OS threads
-       // don't exist now:
-       cap->suspended_ccalls = NULL;
-
-       // Empty the threads lists.  Otherwise, the garbage
+        discardTasksExcept(task);
+
+        for (i=0; i < n_capabilities; i++) {
+            cap = &capabilities[i];
+
+            // Empty the run queue.  It seems tempting to let all the
+            // killed threads stay on the run queue as zombies to be
+            // cleaned up later, but some of them may correspond to
+            // bound threads for which the corresponding Task does not
+            // exist.
+            truncateRunQueue(cap);
+
+            // Any suspended C-calling Tasks are no more, their OS threads
+            // don't exist now:
+            cap->suspended_ccalls = NULL;
+
+#if defined(THREADED_RTS)
+            // Wipe our spare workers list, they no longer exist.  New
+            // workers will be created if necessary.
+            cap->spare_workers = NULL;
+            cap->n_spare_workers = 0;
+            cap->returning_tasks_hd = NULL;
+            cap->returning_tasks_tl = NULL;
+#endif
+
+            // Release all caps except 0, we'll use that for starting
+            // the IO manager and running the client action below.
+            if (cap->no != 0) {
+                task->cap = cap;
+                releaseCapability(cap);
+            }
+        }
+        cap = &capabilities[0];
+        task->cap = cap;
+
+        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
         for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
             generations[g].threads = END_TSO_QUEUE;
         }
 
-        discardTasksExcept(cap->running_task);
-
-#if defined(THREADED_RTS)
-       // Wipe our spare workers list, they no longer exist.  New
-       // workers will be created if necessary.
-       cap->spare_workers = NULL;
-        cap->n_spare_workers = 0;
-        cap->returning_tasks_hd = NULL;
-       cap->returning_tasks_tl = NULL;
-#endif
-
         // On Unix, all timers are reset in the child, so we need to start
         // the timer again.
         initTimer();
         startTimer();
 
+        // TODO: need to trace various other things in the child
+        // like startup event, capabilities, process info etc
+        traceTaskCreate(task, cap);
+
 #if defined(THREADED_RTS)
-        cap = ioManagerStartCap(cap);
+        ioManagerStartCap(&cap);
 #endif
 
-       cap = rts_evalStableIO(cap, entry, NULL);  // run the action
+        rts_evalStableIO(&cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);
        
        rts_unlock(cap);
@@ -1687,6 +1924,157 @@ forkProcess(HsStablePtr *entry
 }
 
 /* ---------------------------------------------------------------------------
+ * Changing the number of Capabilities
+ *
+ * Changing the number of Capabilities is very tricky!  We can only do
+ * it with the system fully stopped, so we do a full sync with
+ * requestSync(SYNC_OTHER) and grab all the capabilities.
+ *
+ * Then we resize the appropriate data structures, and update all
+ * references to the old data structures which have now moved.
+ * Finally we release the Capabilities we are holding, and start
+ * worker Tasks on the new Capabilities we created.
+ *
+ * ------------------------------------------------------------------------- */
+   
+void
+setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
+{
+#if !defined(THREADED_RTS)
+    if (new_n_capabilities != 1) {
+        errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
+    }
+    return;
+#elif defined(NOSMP)
+    if (new_n_capabilities != 1) {
+        errorBelch("setNumCapabilities: not supported on this platform");
+    }
+    return;
+#else
+    Task *task;
+    Capability *cap;
+    nat sync;
+    StgTSO* t;
+    nat g, n;
+    Capability *old_capabilities = NULL;
+
+    if (new_n_capabilities == enabled_capabilities) return;
+
+    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
+               enabled_capabilities, new_n_capabilities);
+    
+    cap = rts_lock();
+    task = cap->running_task;
+
+    do {
+        sync = requestSync(&cap, task, SYNC_OTHER);
+    } while (sync);
+
+    acquireAllCapabilities(cap,task);
+
+    pending_sync = 0;
+
+    if (new_n_capabilities < enabled_capabilities)
+    {
+        // Reducing the number of capabilities: we do not actually
+        // remove the extra capabilities, we just mark them as
+        // "disabled". This has the following effects:
+        //
+        //   - threads on a disabled capability are migrated away by the
+        //     scheduler loop
+        //
+        //   - disabled capabilities do not participate in GC
+        //     (see scheduleDoGC())
+        //
+        //   - No spark threads are created on this capability
+        //     (see scheduleActivateSpark())
+        //
+        //   - We do not attempt to migrate threads *to* a disabled
+        //     capability (see schedulePushWork()).
+        //
+        // but in other respects, a disabled capability remains
+        // alive.  Threads may be woken up on a disabled capability,
+        // but they will be immediately migrated away.
+        //
+        // This approach is much easier than trying to actually remove
+        // the capability; we don't have to worry about GC data
+        // structures, the nursery, etc.
+        //
+        for (n = new_n_capabilities; n < enabled_capabilities; n++) {
+            capabilities[n].disabled = rtsTrue;
+            traceCapDisable(&capabilities[n]);
+        }
+        enabled_capabilities = new_n_capabilities;
+    }
+    else
+    {
+        // Increasing the number of enabled capabilities.
+        //
+        // enable any disabled capabilities, up to the required number
+        for (n = enabled_capabilities;
+             n < new_n_capabilities && n < n_capabilities; n++) {
+            capabilities[n].disabled = rtsFalse;
+            traceCapEnable(&capabilities[n]);
+        }
+        enabled_capabilities = n;
+
+        if (new_n_capabilities > n_capabilities) {
+#if defined(TRACING)
+            // Allocate eventlog buffers for the new capabilities.  Note this
+            // must be done before calling moreCapabilities(), because that
+            // will emit events about creating the new capabilities and adding
+            // them to existing capsets.
+            tracingAddCapapilities(n_capabilities, new_n_capabilities);
+#endif
+
+            // Resize the capabilities array
+            // NB. after this, capabilities points somewhere new.  Any pointers
+            // of type (Capability *) are now invalid.
+            old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+
+            // update our own cap pointer
+            cap = &capabilities[cap->no];
+
+            // Resize and update storage manager data structures
+            storageAddCapabilities(n_capabilities, new_n_capabilities);
+
+            // Update (Capability *) refs in the Task manager.
+            updateCapabilityRefs();
+
+            // Update (Capability *) refs from TSOs
+            for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+                for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+                    t->cap = &capabilities[t->cap->no];
+                }
+            }
+        }
+    }
+
+    // We're done: release the original Capabilities
+    releaseAllCapabilities(cap,task);
+
+    // Start worker tasks on the new Capabilities
+    startWorkerTasks(n_capabilities, new_n_capabilities);
+
+    // finally, update n_capabilities
+    if (new_n_capabilities > n_capabilities) {
+        n_capabilities = enabled_capabilities = new_n_capabilities;
+    }
+
+    // We can't free the old array until now, because we access it
+    // while updating pointers in updateCapabilityRefs().
+    if (old_capabilities) {
+        stgFree(old_capabilities);
+    }
+
+    rts_unlock(cap);
+
+#endif // THREADED_RTS
+}
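
As a usage note: setNumCapabilities() is the entry point that the Haskell-level setNumCapabilities is expected to reach via a foreign call, and it can also be called from C.  A minimal hypothetical caller, assuming the threaded RTS is already initialised and the declaration is visible from the RTS headers:

    #include "Rts.h"

    static void resize_capability_pool (void)
    {
        setNumCapabilities(2);   // caps 2..enabled_capabilities-1 are disabled
        setNumCapabilities(8);   // re-enables them, growing the array if needed
    }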
+
+
+
+/* ---------------------------------------------------------------------------
  * Delete all the threads in the system
  * ------------------------------------------------------------------------- */
    
@@ -1917,7 +2305,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
 #if defined(THREADED_RTS)
-    cpu %= RtsFlags.ParFlags.nNodes;
+    cpu %= enabled_capabilities;
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
@@ -1928,11 +2316,14 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
 #endif
 }
 
-Capability *
-scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
+void
+scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
 {
     Task *task;
     DEBUG_ONLY( StgThreadID id );
+    Capability *cap;
+
+    cap = *pcap;
 
     // We already created/initialised the Task
     task = cap->running_task;
@@ -1957,7 +2348,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
-    return cap;
+    *pcap = cap;
 }
 
 /* ----------------------------------------------------------------------------
@@ -1990,6 +2381,26 @@ void scheduleWorker (Capability *cap, Task *task)
 #endif
 
 /* ---------------------------------------------------------------------------
+ * Start new worker tasks on Capabilities from--to
+ * -------------------------------------------------------------------------- */
+
+static void
+startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    nat i;
+    Capability *cap;
+
+    for (i = from; i < to; i++) {
+        cap = &capabilities[i];
+        ACQUIRE_LOCK(&cap->lock);
+        startWorkerTask(cap);
+        RELEASE_LOCK(&cap->lock);
+    }
+#endif
+}
+
+/* ---------------------------------------------------------------------------
  * initScheduler()
  *
  * Initialise the scheduler.  This resets all the queues - if the
@@ -2026,26 +2437,16 @@ initScheduler(void)
 
   initTaskManager();
 
-  RELEASE_LOCK(&sched_mutex);
-
-#if defined(THREADED_RTS)
   /*
    * Eagerly start one worker to run each Capability, except for
    * Capability 0.  The idea is that we're probably going to start a
    * bound thread on Capability 0 pretty soon, so we don't want a
    * worker task hogging it.
    */
-  { 
-      nat i;
-      Capability *cap;
-      for (i = 1; i < n_capabilities; i++) {
-         cap = &capabilities[i];
-         ACQUIRE_LOCK(&cap->lock);
-         startWorkerTask(cap);
-         RELEASE_LOCK(&cap->lock);
-      }
-  }
-#endif
+  startWorkerTasks(1, n_capabilities);
+
+  RELEASE_LOCK(&sched_mutex);
+
 }
 
 void
@@ -2059,15 +2460,19 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
-        waitForReturnCapability(&task->cap,task);
-       scheduleDoGC(task->cap,task,rtsFalse);
+        Capability *cap = task->cap;
+        waitForReturnCapability(&cap,task);
+        scheduleDoGC(&cap,task,rtsTrue);
         ASSERT(task->incall->tso == NULL);
-        releaseCapability(task->cap);
+        releaseCapability(cap);
     }
     sched_state = SCHED_SHUTTING_DOWN;
 
     shutdownCapabilities(task, wait_foreign);
 
+    // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
+    //            n_failed_trygrab_idles, n_idle_caps);
+
     boundTaskExiting(task);
 }
 
@@ -2118,15 +2523,18 @@ static void
 performGC_(rtsBool force_major)
 {
     Task *task;
+    Capability *cap = NULL;
 
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the 
     // suspended_ccalls queue.
     task = newBoundTask();
+    
+    // TODO: do we need to traceTask*() here?
 
-    waitForReturnCapability(&task->cap,task);
-    scheduleDoGC(task->cap,task,force_major);
-    releaseCapability(task->cap);
+    waitForReturnCapability(&cap,task);
+    scheduleDoGC(&cap,task,force_major);
+    releaseCapability(cap);
     boundTaskExiting(task);
 }
 
@@ -2346,7 +2754,7 @@ findRetryFrameHelper (Capability *cap, StgTSO *tso)
       
     case CATCH_RETRY_FRAME:
        debugTrace(DEBUG_stm,
-                  "found CATCH_RETRY_FRAME at %p during retrry", p);
+                   "found CATCH_RETRY_FRAME at %p during retry", p);
         tso->stackobj->sp = p;
        return CATCH_RETRY_FRAME;
       
@@ -2364,6 +2772,7 @@ findRetryFrameHelper (Capability *cap, StgTSO *tso)
     }
       
     case UNDERFLOW_FRAME:
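+        // Tidy up tso->stackobj first: make sp point at the underflow frame
+        // so that threadStackUnderflow() sees a consistent stack (#7636).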
+        tso->stackobj->sp = p;
         threadStackUnderflow(cap,tso);
         p = tso->stackobj->sp;
         continue;