/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2003-2006
+ * (c) The GHC Team, 2003-2012
*
* Capabilities
*
- * A Capability represent the token required to execute STG code,
+ * A Capability represents the token required to execute STG code,
* and all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
* STG execution, a pointer to the capability is kept in a
#include "STM.h"
#include "RtsUtils.h"
+#if !defined(mingw32_HOST_OS)
+#include "rts/IOManager.h" // for setIOManagerControlFd()
+#endif
+
+#include <string.h>
+
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
nat n_capabilities = 0;
-Capability *capabilities = NULL;
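+// The number of Capabilities that are currently enabled; this may be
+// less than n_capabilities, since individual Capabilities can be
+// disabled (see the cap->disabled flag below).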
+nat enabled_capabilities = 0;
+
+// The array of Capabilities. It's important that when we need
+// to allocate more Capabilities we don't have to move the existing
+// Capabilities, because there may be pointers to them in use
+// (e.g. threads in waitForReturnCapability(), see #8209), so this is
+// an array of Capability* rather than an array of Capability.
+Capability **capabilities = NULL;
// Holds the Capability which last became free. This is used so that
// an in-call has a chance of quickly finding a free Capability.
// locking, so we don't do that.
Capability *last_free_capability = NULL;
-/* GC indicator, in scope for the scheduler, init'ed to false */
-volatile StgWord waiting_for_gc = 0;
+/*
+ * Indicates that the RTS wants to synchronise all the Capabilities
+ * for some reason. All Capabilities should stop and return to the
+ * scheduler.
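+ *
+ * A non-zero value identifies the kind of sync requested; SYNC_GC_PAR,
+ * for example, is treated specially in releaseCapability_() and
+ * yieldCapability() below.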
+ */
+volatile StgWord pending_sync = 0;
/* Let foreign code get the current Capability -- assuming there is one!
* This is useful for unsafe foreign calls because they are called with
* the current Capability held, but they are not passed it. For example,
- * see see the integer-gmp package which calls allocateLocal() in its
+ * see the integer-gmp package which calls allocate() in its
* stgAllocForGMP() function (which gets called by gmp functions).
* */
Capability * rts_unsafeGetMyCapability (void)
if (n_capabilities == 1) { return NULL; } // makes no sense...
debugTrace(DEBUG_sched,
- "cap %d: Trying to steal work from other capabilities",
+ "cap %d: Trying to steal work from other capabilities",
cap->no);
/* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
start at a random place instead of 0 as well. */
for ( i=0 ; i < n_capabilities ; i++ ) {
- robbed = &capabilities[i];
+ robbed = capabilities[i];
if (cap == robbed) // ourselves...
continue;
if (spark != NULL) {
cap->spark_stats.converted++;
traceEventSparkSteal(cap, robbed->no);
-
+
return spark;
}
// otherwise: no success, try next one
nat i;
for (i=0; i < n_capabilities; i++) {
- if (!emptySparkPoolCap(&capabilities[i])) {
+ if (!emptySparkPoolCap(capabilities[i])) {
return rtsTrue;
}
}
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->next == NULL);
if (cap->returning_tasks_hd) {
- ASSERT(cap->returning_tasks_tl->next == NULL);
- cap->returning_tasks_tl->next = task;
+ ASSERT(cap->returning_tasks_tl->next == NULL);
+ cap->returning_tasks_tl->next = task;
} else {
- cap->returning_tasks_hd = task;
+ cap->returning_tasks_hd = task;
}
cap->returning_tasks_tl = task;
}
ASSERT(task);
cap->returning_tasks_hd = task->next;
if (!cap->returning_tasks_hd) {
- cap->returning_tasks_tl = NULL;
+ cap->returning_tasks_tl = NULL;
}
task->next = NULL;
return task;
cap->no = i;
cap->in_haskell = rtsFalse;
+ cap->idle = 0;
+ cap->disabled = rtsFalse;
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
cap->spark_stats.converted = 0;
cap->spark_stats.gcd = 0;
cap->spark_stats.fizzled = 0;
+#if !defined(mingw32_HOST_OS)
+ cap->io_manager_control_wr_fd = -1;
+#endif
#endif
+ cap->total_allocated = 0;
cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
- RtsFlags.GcFlags.generations,
- "initCapability");
+ RtsFlags.GcFlags.generations,
+ "initCapability");
cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
RtsFlags.GcFlags.generations,
"initCapability");
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
- cap->mut_lists[g] = NULL;
+ cap->mut_lists[g] = NULL;
}
+ cap->weak_ptr_list_hd = NULL;
+ cap->weak_ptr_list_tl = NULL;
cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
cap->free_trec_chunks = END_STM_CHUNK_LIST;
cap->transaction_tokens = 0;
cap->context_switch = 0;
cap->pinned_object_block = NULL;
+ cap->pinned_object_blocks = NULL;
+
+#ifdef PROFILING
+ cap->r.rCCCS = CCS_SYSTEM;
+#else
+ cap->r.rCCCS = NULL;
+#endif
+ // cap->r.rCurrentTSO is charged for calls to allocate(), so we
+ // don't want it set when not running a Haskell thread.
+ cap->r.rCurrentTSO = NULL;
+
+ traceCapCreate(cap);
traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
+ traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
#if defined(THREADED_RTS)
traceSparkCounters(cap);
#endif
void
initCapabilities( void )
{
- /* Declare a single capability set representing the process.
- Each capability will get added to this capset. */
+ /* Declare a couple of capability sets representing the process and
+ clock domain. Each capability will get added to these capsets. */
traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
+ traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
#if defined(THREADED_RTS)
- nat i;
#ifndef REG_Base
// We can't support multiple CPUs if BaseReg is not a register
if (RtsFlags.ParFlags.nNodes > 1) {
- errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
- RtsFlags.ParFlags.nNodes = 1;
+ errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
+ RtsFlags.ParFlags.nNodes = 1;
}
#endif
+ n_capabilities = 0;
+ moreCapabilities(0, RtsFlags.ParFlags.nNodes);
n_capabilities = RtsFlags.ParFlags.nNodes;
- if (n_capabilities == 1) {
- capabilities = &MainCapability;
- // THREADED_RTS must work on builds that don't have a mutable
- // BaseReg (eg. unregisterised), so in this case
- // capabilities[0] must coincide with &MainCapability.
- } else {
- capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
- "initCapabilities");
- }
-
- for (i = 0; i < n_capabilities; i++) {
- initCapability(&capabilities[i], i);
- }
-
- debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
-
#else /* !THREADED_RTS */
n_capabilities = 1;
- capabilities = &MainCapability;
+ capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
+ capabilities[0] = &MainCapability;
initCapability(&MainCapability, 0);
#endif
+ enabled_capabilities = n_capabilities;
+
// There are no free capabilities to begin with. We will start
// a worker Task to each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
- last_free_capability = &capabilities[0];
+ last_free_capability = capabilities[0];
+}
+
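+/*
+ * Grow the capabilities array so that it holds 'to' Capabilities.
+ * Entries [0..from-1] are carried over from the old array; entries
+ * [from..to-1] are freshly allocated and initialised.  The caller is
+ * expected to update n_capabilities itself, as initCapabilities()
+ * does above.  In the non-threaded RTS this is a no-op.
+ */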
+void
+moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ Capability **old_capabilities = capabilities;
+
+ capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
+
+ if (to == 1) {
+ // THREADED_RTS must work on builds that don't have a mutable
+ // BaseReg (eg. unregisterised), so in this case
+ // capabilities[0] must coincide with &MainCapability.
+ capabilities[0] = &MainCapability;
+ initCapability(&MainCapability, 0);
+ }
+ else
+ {
+ for (i = 0; i < to; i++) {
+ if (i < from) {
+ capabilities[i] = old_capabilities[i];
+ } else {
+ capabilities[i] = stgMallocBytes(sizeof(Capability),
+ "moreCapabilities");
+ initCapability(capabilities[i], i);
+ }
+ }
+ }
+
+ debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
+
+ if (old_capabilities != NULL) {
+ stgFree(old_capabilities);
+ }
+#endif
}
/* ----------------------------------------------------------------------------
* soon as possible.
* ------------------------------------------------------------------------- */
-void setContextSwitches(void)
+void contextSwitchAllCapabilities(void)
{
nat i;
for (i=0; i < n_capabilities; i++) {
- contextSwitchCapability(&capabilities[i]);
+ contextSwitchCapability(capabilities[i]);
+ }
+}
+
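+/*
+ * Interrupt every Capability: interruptCapability() is called on each
+ * one, causing any Haskell computation it is currently running to
+ * return to the scheduler as soon as possible.
+ */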
+void interruptAllCapabilities(void)
+{
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ interruptCapability(capabilities[i]);
}
}
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- debugTrace(DEBUG_sched, "passing capability %d to %s %p",
+ debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
cap->no, task->incall->tso ? "bound task" : "worker",
- (void *)task->id);
+ serialisableTaskId(task));
ACQUIRE_LOCK(&task->lock);
- task->wakeup = rtsTrue;
- // the wakeup flag is needed because signalCondition() doesn't
- // flag the condition if the thread is already runniing, but we want
- // it to be sticky.
- signalCondition(&task->cond);
+ if (task->wakeup == rtsFalse) {
+ task->wakeup = rtsTrue;
+ // the wakeup flag is needed because signalCondition() doesn't
+ // flag the condition if the thread is already running, but we want
+ // it to be sticky.
+ signalCondition(&task->cond);
+ }
RELEASE_LOCK(&task->lock);
}
#endif
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap,
+releaseCapability_ (Capability* cap,
rtsBool always_wakeup)
{
Task *task;
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
if (cap->returning_tasks_hd != NULL) {
- giveCapabilityToTask(cap,cap->returning_tasks_hd);
- // The Task pops itself from the queue (see waitForReturnCapability())
- return;
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ // The Task pops itself from the queue (see waitForReturnCapability())
+ return;
}
- if (waiting_for_gc == PENDING_GC_SEQ) {
+ // If there is a pending sync, then we should just leave the
+ // Capability free. The thread trying to sync will be about to
+ // call waitForReturnCapability().
+ if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
last_free_capability = cap; // needed?
- debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
+ debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
return;
- }
-
+ }
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
- if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
- // Make sure we're not about to try to wake ourselves up
- // ASSERT(task != cap->run_queue_hd->bound);
+ if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
+ // Make sure we're not about to try to wake ourselves up
+ // ASSERT(task != cap->run_queue_hd->bound);
// assertion is false: in schedule() we force a yield after
- // ThreadBlocked, but the thread may be back on the run queue
- // by now.
- task = cap->run_queue_hd->bound->task;
- giveCapabilityToTask(cap,task);
- return;
+ // ThreadBlocked, but the thread may be back on the run queue
+ // by now.
+ task = peekRunQueue(cap)->bound->task;
+ giveCapabilityToTask(cap, task);
+ return;
}
if (!cap->spare_workers) {
- // Create a worker thread if we don't have one. If the system
- // is interrupted, we only create a worker task if there
- // are threads that need to be completed. If the system is
- // shutting down, we never create a new worker.
- if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
- debugTrace(DEBUG_sched,
- "starting new worker on capability %d", cap->no);
- startWorkerTask(cap);
- return;
- }
+ // Create a worker thread if we don't have one. If the system
+ // is interrupted, we only create a worker task if there
+ // are threads that need to be completed. If the system is
+ // shutting down, we never create a new worker.
+ if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
+ debugTrace(DEBUG_sched,
+ "starting new worker on capability %d", cap->no);
+ startWorkerTask(cap);
+ return;
+ }
}
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (always_wakeup ||
+ if (always_wakeup ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
- !emptySparkPoolCap(cap) || globalWorkToDo()) {
- if (cap->spare_workers) {
- giveCapabilityToTask(cap,cap->spare_workers);
- // The worker Task pops itself from the queue;
- return;
- }
+ (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
+ if (cap->spare_workers) {
+ giveCapabilityToTask(cap, cap->spare_workers);
+ // The worker Task pops itself from the queue;
+ return;
+ }
}
+#ifdef PROFILING
+ cap->r.rCCCS = CCS_IDLE;
+#endif
last_free_capability = cap;
debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
#endif
/* ----------------------------------------------------------------------------
- * waitForReturnCapability( Task *task )
+ * waitForReturnCapability (Capability **pCap, Task *task)
*
* Purpose: when an OS thread returns from an external call,
* it calls waitForReturnCapability() (via Schedule.resumeThread())
Capability *cap = *pCap;
if (cap == NULL) {
- // Try last_free_capability first
- cap = last_free_capability;
- if (cap->running_task) {
- nat i;
- // otherwise, search for a free capability
+ // Try last_free_capability first
+ cap = last_free_capability;
+ if (cap->running_task) {
+ nat i;
+ // otherwise, search for a free capability
cap = NULL;
- for (i = 0; i < n_capabilities; i++) {
- if (!capabilities[i].running_task) {
- cap = &capabilities[i];
- break;
- }
- }
+ for (i = 0; i < n_capabilities; i++) {
+ if (!capabilities[i]->running_task) {
+ cap = capabilities[i];
+ break;
+ }
+ }
if (cap == NULL) {
// Can't find a free one, use last_free_capability.
cap = last_free_capability;
}
- }
+ }
- // record the Capability as the one this Task is now assocated with.
- task->cap = cap;
+ // record the Capability as the one this Task is now associated with.
+ task->cap = cap;
} else {
- ASSERT(task->cap == cap);
+ ASSERT(task->cap == cap);
}
ACQUIRE_LOCK(&cap->lock);
debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
if (!cap->running_task) {
- // It's free; just grab it
- cap->running_task = task;
- RELEASE_LOCK(&cap->lock);
+ // It's free; just grab it
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
} else {
- newReturningTask(cap,task);
- RELEASE_LOCK(&cap->lock);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- // now check whether we should wake up...
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task == NULL) {
- if (cap->returning_tasks_hd != task) {
- giveCapabilityToTask(cap,cap->returning_tasks_hd);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->running_task = task;
- popReturningTask(cap);
- RELEASE_LOCK(&cap->lock);
- break;
- }
- RELEASE_LOCK(&cap->lock);
- }
+ newReturningTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ // now check whether we should wake up...
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ if (cap->returning_tasks_hd != task) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->running_task = task;
+ popReturningTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
}
- ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+#ifdef PROFILING
+ cap->r.rCCCS = CCS_SYSTEM;
+#endif
+
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
* yieldCapability
* ------------------------------------------------------------------------- */
-void
-yieldCapability (Capability** pCap, Task *task)
+/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
+ and return the rtsBool */
+rtsBool /* Did we GC? */
+yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
{
Capability *cap = *pCap;
- if (waiting_for_gc == PENDING_GC_PAR) {
+ if ((pending_sync == SYNC_GC_PAR) && gcAllowed) {
traceEventGcStart(cap);
gcWorkerThread(cap);
traceEventGcEnd(cap);
traceSparkCounters(cap);
- return;
+ // See Note [migrated bound threads 2]
+ if (task->cap == cap) {
+ return rtsTrue;
+ }
}
- debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
-
- // We must now release the capability and wait to be woken up
- // again.
- task->wakeup = rtsFalse;
- releaseCapabilityAndQueueWorker(cap);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
-
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task != NULL) {
- debugTrace(DEBUG_sched,
- "capability %d is owned by another task", cap->no);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
-
- if (task->incall->tso == NULL) {
- ASSERT(cap->spare_workers != NULL);
- // if we're not at the front of the queue, release it
- // again. This is unlikely to happen.
- if (cap->spare_workers != task) {
- giveCapabilityToTask(cap,cap->spare_workers);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->spare_workers = task->next;
- task->next = NULL;
+ debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
+
+ // We must now release the capability and wait to be woken up
+ // again.
+ task->wakeup = rtsFalse;
+ releaseCapabilityAndQueueWorker(cap);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
+
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ debugTrace(DEBUG_sched,
+ "capability %d is owned by another task", cap->no);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+
+ if (task->cap != cap) {
+ // see Note [migrated bound threads]
+ debugTrace(DEBUG_sched,
+ "task has been migrated to cap %d", task->cap->no);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+
+ if (task->incall->tso == NULL) {
+ ASSERT(cap->spare_workers != NULL);
+ // if we're not at the front of the queue, release it
+ // again. This is unlikely to happen.
+ if (cap->spare_workers != task) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->spare_workers = task->next;
+ task->next = NULL;
cap->n_spare_workers--;
}
- cap->running_task = task;
- RELEASE_LOCK(&cap->lock);
- break;
- }
- debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
- ASSERT(cap->running_task == task);
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
+ ASSERT(cap->running_task == task);
+
+#ifdef PROFILING
+ cap->r.rCCCS = CCS_SYSTEM;
+#endif
*pCap = cap;
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- return;
+ return rtsFalse;
}
+// Note [migrated bound threads]
+//
+// There's a tricky case where:
+// - cap A is running an unbound thread T1
+// - there is a bound thread T2 at the head of the run queue on cap A
+// - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
+// - T1 returns quickly grabbing A again (T2 is still waking up on A)
+// - T1 blocks, the scheduler migrates T2 to cap B
+// - the task bound to T2 wakes up on cap B
+//
+// We take advantage of the following invariant:
+//
+// - A bound thread can only be migrated by the holder of the
+// Capability on which the bound thread currently lives. So, if we
+// hold Capability C, and task->cap == C, then task cannot be
+// migrated under our feet.
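+//
+// This is why yieldCapability() above re-checks task->cap after waking
+// up and taking cap->lock: if the task has been migrated to another
+// Capability in the meantime, it releases the lock and retries.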
+
+// Note [migrated bound threads 2]
+//
+// Second tricky case:
+// - A bound Task becomes a GC thread
+// - scheduleDoGC() migrates the thread belonging to this Task,
+// because the Capability it is on is disabled
+// - after GC, gcWorkerThread() returns, but now we are
+// holding a Capability that is not the same as task->cap
+// - Hence we must check for this case and immediately give up the
+// cap we hold.
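+//
+// This is the purpose of the task->cap == cap test after
+// gcWorkerThread() returns in yieldCapability() above.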
+
/* ----------------------------------------------------------------------------
* prodCapability
*
- * If a Capability is currently idle, wake up a Task on it. Used to
+ * If a Capability is currently idle, wake up a Task on it. Used to
* get every Capability into the GC.
* ------------------------------------------------------------------------- */
if (cap->running_task != NULL) return rtsFalse;
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task != NULL) {
- RELEASE_LOCK(&cap->lock);
- return rtsFalse;
+ RELEASE_LOCK(&cap->lock);
+ return rtsFalse;
}
task->cap = cap;
cap->running_task = task;
* allow the workers to stop.
*
* This function should be called when interrupted and
- * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
+ * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
* will exit the scheduler and call taskStop(), and any bound thread
* that wakes up will return to its caller. Runnable threads are
* killed.
* ------------------------------------------------------------------------- */
void
-shutdownCapability (Capability *cap,
+shutdownCapability (Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS,
rtsBool safe USED_IF_THREADS)
{
for (i = 0; /* i < 50 */; i++) {
ASSERT(sched_state == SCHED_SHUTTING_DOWN);
- debugTrace(DEBUG_sched,
- "shutting down capability %d, attempt %d", cap->no, i);
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task) {
- RELEASE_LOCK(&cap->lock);
- debugTrace(DEBUG_sched, "not owner, yielding");
- yieldThread();
- continue;
- }
- cap->running_task = task;
+ debugTrace(DEBUG_sched,
+ "shutting down capability %d, attempt %d", cap->no, i);
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task) {
+ RELEASE_LOCK(&cap->lock);
+ debugTrace(DEBUG_sched, "not owner, yielding");
+ yieldThread();
+ continue;
+ }
+ cap->running_task = task;
if (cap->spare_workers) {
// Look for workers that have died without removing
prev = NULL;
for (t = cap->spare_workers; t != NULL; t = t->next) {
if (!osThreadIsAlive(t->id)) {
- debugTrace(DEBUG_sched,
- "worker thread %p has died unexpectedly", (void *)t->id);
+ debugTrace(DEBUG_sched,
+ "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
cap->n_spare_workers--;
if (!prev) {
cap->spare_workers = t->next;
}
}
- if (!emptyRunQueue(cap) || cap->spare_workers) {
- debugTrace(DEBUG_sched,
- "runnable threads or workers still alive, yielding");
- releaseCapability_(cap,rtsFalse); // this will wake up a worker
- RELEASE_LOCK(&cap->lock);
- yieldThread();
- continue;
- }
+ if (!emptyRunQueue(cap) || cap->spare_workers) {
+ debugTrace(DEBUG_sched,
+ "runnable threads or workers still alive, yielding");
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
+ RELEASE_LOCK(&cap->lock);
+ yieldThread();
+ continue;
+ }
// If "safe", then busy-wait for any threads currently doing
// foreign calls. If we're about to unload this DLL, for
// We can be a bit more relaxed when this is a standalone
// program that is about to terminate, and let safe=false.
if (cap->suspended_ccalls && safe) {
- debugTrace(DEBUG_sched,
- "thread(s) are involved in foreign calls, yielding");
+ debugTrace(DEBUG_sched,
+ "thread(s) are involved in foreign calls, yielding");
cap->running_task = NULL;
- RELEASE_LOCK(&cap->lock);
+ RELEASE_LOCK(&cap->lock);
// The IO manager thread might have been slow to start up,
// so the first attempt to kill it might not have
// succeeded. Just in case, try again - the kill message
continue;
}
- traceEventShutdown(cap);
- RELEASE_LOCK(&cap->lock);
- break;
+ traceSparkCounters(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
}
// we now have the Capability, its run queue and spare workers
// list are both empty.
// ToDo: we can't drop this mutex, because there might still be
- // threads performing foreign calls that will eventually try to
+ // threads performing foreign calls that will eventually try to
// return via resumeThread() and attempt to grab cap->lock.
// closeMutex(&cap->lock);
-
- traceSparkCounters(cap);
-
-#endif /* THREADED_RTS */
-
- traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
+#endif
}
void
nat i;
for (i=0; i < n_capabilities; i++) {
ASSERT(task->incall->tso == NULL);
- shutdownCapability(&capabilities[i], task, safe);
+ shutdownCapability(capabilities[i], task, safe);
}
- traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
-
#if defined(THREADED_RTS)
ASSERT(checkSparkCountInvariant());
#endif
#if defined(THREADED_RTS)
freeSparkPool(cap->sparks);
#endif
+ traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
+ traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
+ traceCapDelete(cap);
}
void
#if defined(THREADED_RTS)
nat i;
for (i=0; i < n_capabilities; i++) {
- freeCapability(&capabilities[i]);
+ freeCapability(capabilities[i]);
+ if (capabilities[i] != &MainCapability)
+ stgFree(capabilities[i]);
}
#else
freeCapability(&MainCapability);
#endif
+ stgFree(capabilities);
+ traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
+ traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
}
/* ---------------------------------------------------------------------------
{
nat n;
for (n = 0; n < n_capabilities; n++) {
- markCapability(evac, user, &capabilities[n], rtsFalse);
+ markCapability(evac, user, capabilities[n], rtsFalse);
}
}
nat i;
for (i = 0; i < n_capabilities; i++) {
- sparks.created += capabilities[i].spark_stats.created;
- sparks.dud += capabilities[i].spark_stats.dud;
- sparks.overflowed+= capabilities[i].spark_stats.overflowed;
- sparks.converted += capabilities[i].spark_stats.converted;
- sparks.gcd += capabilities[i].spark_stats.gcd;
- sparks.fizzled += capabilities[i].spark_stats.fizzled;
- remaining += sparkPoolSize(capabilities[i].sparks);
+ sparks.created += capabilities[i]->spark_stats.created;
+ sparks.dud += capabilities[i]->spark_stats.dud;
+ sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
+ sparks.converted += capabilities[i]->spark_stats.converted;
+ sparks.gcd += capabilities[i]->spark_stats.gcd;
+ sparks.fizzled += capabilities[i]->spark_stats.fizzled;
+ remaining += sparkPoolSize(capabilities[i]->sparks);
}
-
+
/* The invariant is
* created = converted + remaining + gcd + fizzled
*/
}
#endif
+
+#if !defined(mingw32_HOST_OS)
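+/*
+ * Record, for the given Capability, the file descriptor used to send
+ * control messages to the IO manager (the write side of its control
+ * channel).  Only meaningful in the threaded RTS; otherwise a no-op.
+ */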
+void setIOManagerControlFd(nat cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
+#if defined(THREADED_RTS)
+ if (cap_no < n_capabilities) {
+ capabilities[cap_no]->io_manager_control_wr_fd = fd;
+ } else {
+ errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
+ }
+#endif
+}
+#endif