Make globals use sharedCAF
[ghc.git] / rts / Threads.c
index 76e844a..f5eb9d3 100644 (file)
@@ -53,15 +53,13 @@ static StgThreadID next_thread_id = 1;
 
    createGenThread() and createIOThread() (in SchedAPI.h) are
    convenient packaged versions of this function.
-
-   currently pri (priority) is only used in a GRAN setup -- HWL
    ------------------------------------------------------------------------ */
 StgTSO *
 createThread(Capability *cap, W_ size)
 {
     StgTSO *tso;
     StgStack *stack;
-    nat stack_size;
+    uint32_t stack_size;
 
     /* sched_mutex is *not* required */
 
@@ -110,6 +108,8 @@ createThread(Capability *cap, W_ size)
     tso->stackobj       = stack;
     tso->tot_stack_size = stack->stack_size;
 
+    ASSIGN_Int64((W_*)&(tso->alloc_limit), 0);
+
     tso->trec = NO_TREC;
 
 #ifdef PROFILING
@@ -164,12 +164,37 @@ rts_getThreadId(StgPtr tso)
   return ((StgTSO *)tso)->id;
 }
 
+/* ---------------------------------------------------------------------------
+ * Getting & setting the thread allocation limit
+ * ------------------------------------------------------------------------ */
+HsInt64 rts_getThreadAllocationCounter(StgPtr tso)
+{
+    // NB. doesn't take into account allocation in the current nursery
+    // block, so it might be off by up to 4k.
+    return PK_Int64((W_*)&(((StgTSO *)tso)->alloc_limit));
+}
+
+void rts_setThreadAllocationCounter(StgPtr tso, HsInt64 i)
+{
+    ASSIGN_Int64((W_*)&(((StgTSO *)tso)->alloc_limit), i);
+}
+
+void rts_enableThreadAllocationLimit(StgPtr tso)
+{
+    ((StgTSO *)tso)->flags |= TSO_ALLOC_LIMIT;
+}
+
+void rts_disableThreadAllocationLimit(StgPtr tso)
+{
+    ((StgTSO *)tso)->flags &= ~TSO_ALLOC_LIMIT;
+}
+
 /* -----------------------------------------------------------------------------
    Remove a thread from a queue.
    Fails fatally if the TSO is not on the queue.
    -------------------------------------------------------------------------- */
 
-rtsBool // returns True if we modified queue
+bool // returns true if we modified queue
 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
 {
     StgTSO *t, *prev;
@@ -180,33 +205,33 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
             if (prev) {
                 setTSOLink(cap,prev,t->_link);
                 t->_link = END_TSO_QUEUE;
-                return rtsFalse;
+                return false;
             } else {
                 *queue = t->_link;
                 t->_link = END_TSO_QUEUE;
-                return rtsTrue;
+                return true;
             }
         }
     }
     barf("removeThreadFromQueue: not found");
 }
 
-rtsBool // returns True if we modified head or tail
+bool // returns true if we modified head or tail
 removeThreadFromDeQueue (Capability *cap,
                          StgTSO **head, StgTSO **tail, StgTSO *tso)
 {
     StgTSO *t, *prev;
-    rtsBool flag = rtsFalse;
+    bool flag = false;
 
     prev = NULL;
     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
         if (t == tso) {
             if (prev) {
                 setTSOLink(cap,prev,t->_link);
-                flag = rtsFalse;
+                flag = false;
             } else {
                 *head = t->_link;
-                flag = rtsTrue;
+                flag = true;
             }
             t->_link = END_TSO_QUEUE;
             if (*tail == tso) {
@@ -215,7 +240,7 @@ removeThreadFromDeQueue (Capability *cap,
                 } else {
                     *tail = END_TSO_QUEUE;
                 }
-                return rtsTrue;
+                return true;
             } else {
                 return flag;
             }
@@ -334,7 +359,7 @@ migrateThread (Capability *from, StgTSO *tso, Capability *to)
    wakes up all the threads on the specified queue.
    ------------------------------------------------------------------------- */
 
-void
+static void
 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
 {
     MessageBlackHole *msg;
@@ -478,7 +503,7 @@ isThreadBound(StgTSO* tso USED_IF_THREADS)
 #if defined(THREADED_RTS)
   return (tso->bound != NULL);
 #endif
-  return rtsFalse;
+  return false;
 }
 
 /* -----------------------------------------------------------------------------
@@ -524,21 +549,9 @@ threadStackOverflow (Capability *cap, StgTSO *tso)
                                  stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
                                          tso->stackobj->sp+64)));
 
-        if (tso->flags & TSO_BLOCKEX) {
-            // NB. StackOverflow exceptions must be deferred if the thread is
-            // inside Control.Exception.mask.  See bug #767 and bug #8303.
-            // This implementation is a minor hack, see Note [Throw to self when masked]
-            MessageThrowTo *msg = (MessageThrowTo*)allocate(cap, sizeofW(MessageThrowTo));
-            SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
-            msg->source = tso;
-            msg->target = tso;
-            msg->exception = (StgClosure *)stackOverflow_closure;
-            blockedThrowTo(cap, tso, msg);
-        } else {
-            // Send this thread the StackOverflow exception
-            throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
-            return;
-        }
+        // Note [Throw to self when masked], also #767 and #8303.
+        throwToSelf(cap, tso, (StgClosure *)stackOverflow_closure);
+        return;
     }
 
 
@@ -589,7 +602,15 @@ threadStackOverflow (Capability *cap, StgTSO *tso)
                   "allocating new stack chunk of size %d bytes",
                   chunk_size * sizeof(W_));
 
+    // Charge the current thread for allocating stack.  Stack usage is
+    // non-deterministic, because the chunk boundaries might vary from
+    // run to run, but accounting for this is better than not
+    // accounting for it, since a deep recursion will otherwise not be
+    // subject to allocation limits.
+    cap->r.rCurrentTSO = tso;
     new_stack = (StgStack*) allocate(cap, chunk_size);
+    cap->r.rCurrentTSO = NULL;
+
     SET_HDR(new_stack, &stg_STACK_info, old_stack->header.prof.ccs);
     TICK_ALLOC_STACK(chunk_size);
 
@@ -669,39 +690,6 @@ threadStackOverflow (Capability *cap, StgTSO *tso)
     // IF_DEBUG(scheduler,printTSO(new_tso));
 }
 
-/* Note [Throw to self when masked]
- *
- * When a StackOverflow occurs when the thread is masked, we want to
- * defer the exception to when the thread becomes unmasked/hits an
- * interruptible point.  We already have a mechanism for doing this,
- * the blocked_exceptions list, but the use here is a bit unusual,
- * because an exception is normally only added to this list upon
- * an asynchronous 'throwTo' call (with all of the relevant
- * multithreaded nonsense). Morally, a stack overflow should be an
- * asynchronous exception sent by a thread to itself, and it should
- * have the same semantics.  But there are a few key differences:
- *
- * - If you actually tried to send an asynchronous exception to
- *   yourself using throwTo, the exception would actually immediately
- *   be delivered.  This is because throwTo itself is considered an
- *   interruptible point, so the exception is always deliverable. Thus,
- *   ordinarily, we never end up with a message to onesself in the
- *   blocked_exceptions queue.
- *
- * - In the case of a StackOverflow, we don't actually care about the
- *   wakeup semantics; when an exception is delivered, the thread that
- *   originally threw the exception should be woken up, since throwTo
- *   blocks until the exception is successfully thrown.  Fortunately,
- *   it is harmless to wakeup a thread that doesn't actually need waking
- *   up, e.g. ourselves.
- *
- * - No synchronization is necessary, because we own the TSO and the
- *   capability.  You can observe this by tracing through the execution
- *   of throwTo.  We skip synchronizing the message and inter-capability
- *   communication.
- *
- * We think this doesn't break any invariants, but do be careful!
- */
 
 
 /* ---------------------------------------------------------------------------
@@ -713,7 +701,7 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
 {
     StgStack *new_stack, *old_stack;
     StgUnderflowFrame *frame;
-    nat retvals;
+    uint32_t retvals;
 
     debugTraceCap(DEBUG_sched, cap, "stack underflow");
 
@@ -756,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
 }
 
 /* ----------------------------------------------------------------------------
+   Implementation of tryPutMVar#
+
+   NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
+   ------------------------------------------------------------------------- */
+
+bool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
+{
+    const StgInfoTable *info;
+    StgMVarTSOQueue *q;
+    StgTSO *tso;
+
+    info = lockClosure((StgClosure*)mvar);
+
+    if (mvar->value != &stg_END_TSO_QUEUE_closure) {
+#if defined(THREADED_RTS)
+        unlockClosure((StgClosure*)mvar, info);
+#endif
+        return false;
+    }
+
+    q = mvar->head;
+loop:
+    if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+        /* No further takes, the MVar is now full. */
+        if (info == &stg_MVAR_CLEAN_info) {
+            dirty_MVAR(&cap->r, (StgClosure*)mvar);
+        }
+
+        mvar->value = value;
+        unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
+        return true;
+    }
+    if (q->header.info == &stg_IND_info ||
+        q->header.info == &stg_MSG_NULL_info) {
+        q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
+        goto loop;
+    }
+
+    // There are takeMVar(s) waiting: wake up the first one
+    tso = q->tso;
+    mvar->head = q->link;
+    if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+        mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
+    }
+
+    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    StgWord why_blocked = tso->why_blocked;
+
+    // actually perform the takeMVar
+    StgStack* stack = tso->stackobj;
+    stack->sp[1] = (W_)value;
+    stack->sp[0] = (W_)&stg_ret_p_info;
+
+    // indicate that the MVar operation has now completed.
+    tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+
+    if (stack->dirty == 0) {
+        dirty_STACK(cap, stack);
+    }
+
+    tryWakeupThread(cap, tso);
+
+    // If it was an readMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = ((StgMVarTSOQueue*)q)->link;
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
+    unlockClosure((StgClosure*)mvar, info);
+
+    return true;
+}
+
+/* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * ------------------------------------------------------------------------- */
 
@@ -843,7 +910,7 @@ void
 printAllThreads(void)
 {
   StgTSO *t, *next;
-  nat i, g;
+  uint32_t i, g;
   Capability *cap;
 
   debugBelch("all threads:\n");
@@ -871,7 +938,7 @@ printAllThreads(void)
 void
 printThreadQueue(StgTSO *t)
 {
-    nat i = 0;
+    uint32_t i = 0;
     for (; t != END_TSO_QUEUE; t = t->_link) {
         printThreadStatus(t);
         i++;