Implement atomicReadMVar, fixing #4001.
author: Edward Z. Yang <ezyang@mit.edu>
Mon, 8 Jul 2013 18:03:35 +0000 (11:03 -0700)
committer: Edward Z. Yang <ezyang@mit.edu>
Tue, 9 Jul 2013 18:29:11 +0000 (11:29 -0700)
We add the invariant to the MVar blocked threads queue that
threads blocked on an atomic read are always at the front of
the queue.  This invariant is easy to maintain, since takers
are only ever added to the end of the queue.

Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
15 files changed:
compiler/prelude/primops.txt.pp
includes/rts/Constants.h
includes/stg/MiscClosures.h
rts/HeapStackCheck.cmm
rts/Linker.c
rts/PrimOps.cmm
rts/RaiseAsync.c
rts/RaiseAsync.h
rts/RetainerProfile.c
rts/Schedule.c
rts/Threads.c
rts/Trace.c
rts/sm/Compact.c
rts/sm/Sanity.c
rts/sm/Scav.c

index 7203c11..739092d 100644 (file)
@@ -1717,6 +1717,15 @@ primop  TryPutMVarOp "tryPutMVar#" GenPrimOp
    out_of_line      = True
    has_side_effects = True
 
+primop  AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
+   MVar# s a -> State# s -> (# State# s, a #)
+   {If {\tt MVar\#} is empty, block until it becomes full.
+   Then read its contents without modifying the MVar, without possibility
+   of intervention from other threads.}
+   with
+   out_of_line      = True
+   has_side_effects = True
+
 primop  SameMVarOp "sameMVar#" GenPrimOp
    MVar# s a -> MVar# s a -> Bool
 
index 5ff4d4e..4739e3a 100644 (file)
  */
 #define NotBlocked          0
 #define BlockedOnMVar       1
-#define BlockedOnBlackHole  2
-#define BlockedOnRead       3
-#define BlockedOnWrite      4
-#define BlockedOnDelay      5
-#define BlockedOnSTM        6
+#define BlockedOnMVarRead   2
+#define BlockedOnBlackHole  3
+#define BlockedOnRead       4
+#define BlockedOnWrite      5
+#define BlockedOnDelay      6
+#define BlockedOnSTM        7
 
 /* Win32 only: */
-#define BlockedOnDoProc     7
+#define BlockedOnDoProc     8
 
 /* Only relevant for PAR: */
   /* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA         8
+#define BlockedOnGA         9
   /* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend  9
+#define BlockedOnGA_NoSend  10
 /* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall      10
-#define BlockedOnCCall_Interruptible 11
+#define BlockedOnCCall      11
+#define BlockedOnCCall_Interruptible 12
    /* same as above but permit killing the worker thread */
 
 /* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgThrowTo 12
+#define BlockedOnMsgThrowTo 13
 
 /* The thread is not on any run queues, but can be woken up
    by tryWakeupThread() */
-#define ThreadMigrating     13
+#define ThreadMigrating     14
 
 /*
  * These constants are returned to the scheduler by a thread that has
index 8717687..88cee59 100644 (file)
@@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs);
 RTS_FUN_DECL(stg_block_blackhole);
 RTS_FUN_DECL(stg_block_blackhole_finally);
 RTS_FUN_DECL(stg_block_takemvar);
+RTS_FUN_DECL(stg_block_atomicreadmvar);
 RTS_RET(stg_block_takemvar);
+RTS_RET(stg_block_atomicreadmvar);
 RTS_FUN_DECL(stg_block_putmvar);
 RTS_RET(stg_block_putmvar);
 #ifdef mingw32_HOST_OS
@@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh);
 RTS_FUN_DECL(stg_newMVarzh);
 RTS_FUN_DECL(stg_takeMVarzh);
 RTS_FUN_DECL(stg_putMVarzh);
+RTS_FUN_DECL(stg_atomicReadMVarzh);
 RTS_FUN_DECL(stg_tryTakeMVarzh);
 RTS_FUN_DECL(stg_tryPutMVarzh);
 
index fbceb76..20cd9df 100644 (file)
@@ -487,11 +487,11 @@ stg_block_noregs
 /* -----------------------------------------------------------------------------
  * takeMVar/putMVar-specific blocks
  *
- * Stack layout for a thread blocked in takeMVar:
+ * Stack layout for a thread blocked in takeMVar/atomicReadMVar:
  *      
  *       ret. addr
  *       ptr to MVar   (R1)
- *       stg_block_takemvar_info
+ *       stg_block_takemvar_info (or stg_block_atomicreadmvar_info)
  *
  * Stack layout for a thread blocked in putMVar:
  *      
@@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */
     BLOCK_BUT_FIRST(stg_block_takemvar_finally);
 }
 
+INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar ) // return frame pushed by stg_block_atomicreadmvar; its payload is the MVar
+    return ()
+{
+    jump stg_atomicReadMVarzh(mvar); // re-enter the primop with the MVar saved in this frame
+}
+
+// Code fragment executed just before we return to the scheduler; it unlocks the MVar that stg_block_atomicreadmvar passed in R3.
+stg_block_atomicreadmvar_finally
+{
+    W_ r1, r3;
+    r1 = R1; // R1 and R3 are saved here and restored after unlockClosure (presumably the call may clobber them -- TODO confirm)
+    r3 = R3;
+    unlockClosure(R3, stg_MVAR_DIRTY_info); // unlock the MVar held in R3
+    R1 = r1;
+    R3 = r3;
+    jump StgReturn [R1];
+}
+
+stg_block_atomicreadmvar /* mvar passed in R1 */
+{
+    Sp_adj(-2); // two new stack slots: frame header + MVar pointer
+    Sp(1) = R1; // frame payload: the MVar
+    Sp(0) = stg_block_atomicreadmvar_info; // frame header: the return frame that jumps to stg_atomicReadMVarzh
+    R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
+    BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally); // block; going by the macro name, the _finally fragment runs first -- TODO confirm against the macro definition
+}
+
 INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
                 P_ mvar, P_ val )
     return ()
index 43edde2..9129b46 100644 (file)
@@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal {
       SymI_HasProto(stg_yield_to_interpreter)                           \
       SymI_HasProto(stg_block_noregs)                                   \
       SymI_HasProto(stg_block_takemvar)                                 \
+      SymI_HasProto(stg_block_atomicreadmvar)                           \
       SymI_HasProto(stg_block_putmvar)                                  \
       MAIN_CAP_SYM                                                      \
       SymI_HasProto(MallocFailHook)                                     \
@@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal {
       SymI_HasProto(stg_bh_upd_frame_info)                              \
       SymI_HasProto(suspendThread)                                      \
       SymI_HasProto(stg_takeMVarzh)                                     \
+      SymI_HasProto(stg_atomicReadMVarzh)                               \
       SymI_HasProto(stg_threadStatuszh)                                 \
       SymI_HasProto(stg_tryPutMVarzh)                                   \
       SymI_HasProto(stg_tryTakeMVarzh)                                  \
index a227e77..63babd0 100644 (file)
@@ -1433,7 +1433,7 @@ loop:
         goto loop;
     }
 
-    // There are takeMVar(s) waiting: wake up the first one
+    // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
 
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
@@ -1441,8 +1441,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1458,6 +1461,15 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, the value has not been consumed, so
+    // loop back to serve the next waiter. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return ();
 }
@@ -1512,8 +1524,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1529,10 +1544,68 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, the value has not been consumed, so
+    // loop back to serve the next waiter. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return (1);
 }
 
+stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ ) // Return the MVar's value without emptying it; if the MVar is empty, block first.
+{
+    W_ val, info, tso, q; // NOTE(review): tso is declared but never referenced in this function
+
+#if defined(THREADED_RTS)
+    ("ptr" info) = ccall lockClosure(mvar "ptr");
+#else
+    info = GET_INFO(mvar);
+#endif
+
+    if (info == stg_MVAR_CLEAN_info) { // the MVar's info pointer says "clean": call dirty_MVAR before going further
+        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+    }
+
+    /* If the MVar is empty (its value field holds stg_END_TSO_QUEUE_closure),
+     * put ourselves on the blocked readers list and wait until we're woken up.
+     */
+    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+        ALLOC_PRIM_WITH_CUSTOM_FAILURE
+            (SIZEOF_StgMVarTSOQueue,
+             unlockClosure(mvar, stg_MVAR_DIRTY_info);
+             GC_PRIM_P(stg_atomicReadMVarzh, mvar));
+
+        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); // address of the queue node just allocated, computed back from Hp
+
+        // atomic readers are pushed onto the FRONT of the queue (the new node
+        // links to the old head), so they get handled before any waiting taker
+        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+        StgMVarTSOQueue_tso(q)  = CurrentTSO;
+
+        StgTSO__link(CurrentTSO)       = q; // record on the current TSO which queue node it is waiting in,
+        StgTSO_block_info(CurrentTSO)  = mvar; // which MVar it is blocked on,
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; // and that it is a reader (BlockedOnMVarRead), not BlockedOnMVar
+        StgMVar_head(mvar) = q;
+
+        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) { // queue was empty: the new node is also the tail
+            StgMVar_tail(mvar) = q;
+        }
+
+        jump stg_block_atomicreadmvar(mvar); // hand the MVar to the blocking entry point
+    }
+
+    val = StgMVar_value(mvar); // MVar is full: read the value and leave it in place
+
+    unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    return (val);
+}
 
 /* -----------------------------------------------------------------------------
    Stable pointer primitives
index 11f518a..edc4a91 100644 (file)
@@ -294,6 +294,7 @@ check_target:
     }
 
     case BlockedOnMVar:
+    case BlockedOnMVarRead:
     {
        /*
          To establish ownership of this TSO, we need to acquire a
@@ -318,7 +319,7 @@ check_target:
 
         // we have the MVar, let's check whether the thread
        // is still blocked on the same MVar.
-       if (target->why_blocked != BlockedOnMVar
+       if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
            || (StgMVar *)target->block_info.closure != mvar) {
            unlockClosure((StgClosure *)mvar, info);
            goto retry;
@@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
     goto done;
 
   case BlockedOnMVar:
+  case BlockedOnMVarRead:
       removeFromMVarBlockedQueue(tso);
       goto done;
 
index 336ab30..d804f6b 100644 (file)
@@ -49,6 +49,7 @@ interruptible(StgTSO *t)
 {
   switch (t->why_blocked) {
   case BlockedOnMVar:
+  case BlockedOnMVarRead:
   case BlockedOnMsgThrowTo:
   case BlockedOnRead:
   case BlockedOnWrite:
index 77dc77c..dc21149 100644 (file)
@@ -1672,6 +1672,7 @@ inner_loop:
         retainClosure(tso->bq,                 c, c_child_r);
         retainClosure(tso->trec,               c, c_child_r);
         if (   tso->why_blocked == BlockedOnMVar
+               || tso->why_blocked == BlockedOnMVarRead
                || tso->why_blocked == BlockedOnBlackHole
                || tso->why_blocked == BlockedOnMsgThrowTo
             ) {
index 88bfd8c..408146f 100644 (file)
@@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
+           case BlockedOnMVarRead:
                throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
                return;
@@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads)
        
        switch (tso->why_blocked) {
        case BlockedOnMVar:
+       case BlockedOnMVarRead:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
index 4c990f1..f2b8005 100644 (file)
@@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
     switch (tso->why_blocked)
     {
     case BlockedOnMVar:
+    case BlockedOnMVarRead:
     {
         if (tso->_link == END_TSO_QUEUE) {
             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
@@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
+  case BlockedOnMVarRead:
+    debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
+    break;
   case BlockedOnBlackHole:
       debugBelch("is blocked on a black hole %p", 
                  ((StgBlockingQueue*)tso->block_info.bh->bh));
index 78dfead..2190189 100644 (file)
@@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = {
     [ThreadFinished] = "finished",
     [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
     [6 + BlockedOnMVar]         = "blocked on an MVar",
+    [6 + BlockedOnMVarRead]     = "blocked on an atomic MVar read",
     [6 + BlockedOnBlackHole]    = "blocked on a black hole",
     [6 + BlockedOnRead]         = "blocked on a read operation",
     [6 + BlockedOnWrite]        = "blocked on a write operation",
index 9c98dc9..247f1a0 100644 (file)
@@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso)
     thread_(&tso->global_link);
 
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
        || tso->why_blocked == BlockedOnBlackHole
        || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked
index f0e1659..9b579ab 100644 (file)
@@ -519,6 +519,7 @@ checkTSO(StgTSO *tso)
            info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()
 
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
        || tso->why_blocked == BlockedOnBlackHole
        || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked
index 6137f6d..e0cc688 100644 (file)
@@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso)
 
     evacuate((StgClosure **)&tso->_link);
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
        || tso->why_blocked == BlockedOnBlackHole
        || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked