Implement atomicReadMVar, fixing #4001.
[ghc.git] / rts / PrimOps.cmm
index a227e77..63babd0 100644 (file)
@@ -1433,7 +1433,7 @@ loop:
         goto loop;
     }
 
-    // There are takeMVar(s) waiting: wake up the first one
+    // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
 
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
@@ -1441,8 +1441,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1458,6 +1461,15 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return ();
 }
@@ -1512,8 +1524,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1529,10 +1544,68 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return (1);
 }
 
+/* -----------------------------------------------------------------------------
+ * stg_atomicReadMVarzh: return the value held in an MVar without removing it.
+ *
+ * mvar :: MVar a
+ *
+ * Full MVar:  the stored value is returned immediately; the MVar itself is
+ *             not modified.
+ * Empty MVar: a new StgMVarTSOQueue cell for the current thread is pushed
+ *             onto the FRONT of the MVar's queue, the thread is marked
+ *             BlockedOnMVarRead, and control jumps to
+ *             stg_block_atomicreadmvar with the closure still locked.
+ * -------------------------------------------------------------------------- */
+stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+    W_ val, info, q;
+
+    // Take the closure lock (threaded RTS) or just read the header;
+    // either way 'info' holds the info pointer the MVar had on entry.
+#if defined(THREADED_RTS)
+    ("ptr" info) = ccall lockClosure(mvar "ptr");
+#else
+    info = GET_INFO(mvar);
+#endif
+
+    /* If the MVar is empty, put ourselves on the blocked readers
+     * list and wait until we're woken up.
+     */
+    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+        // Only this path writes to the MVar (it rewrites head/tail), so
+        // it is the only path that has to mark a clean MVar as dirty.
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+
+        // If the allocation fails, unlock the closure before handing
+        // control to GC_PRIM_P (presumably re-enters this primop after
+        // GC -- confirm against the macro definition).
+        ALLOC_PRIM_WITH_CUSTOM_FAILURE
+            (SIZEOF_StgMVarTSOQueue,
+             unlockClosure(mvar, stg_MVAR_DIRTY_info);
+             GC_PRIM_P(stg_atomicReadMVarzh, mvar));
+
+        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+        // readMVars are pushed to the front of the queue, so
+        // they get handled immediately
+        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+        StgMVarTSOQueue_tso(q)  = CurrentTSO;
+
+        StgTSO__link(CurrentTSO)       = q;
+        StgTSO_block_info(CurrentTSO)  = mvar;
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+        StgMVar_head(mvar) = q;
+
+        // The queue was empty, so the new cell is also its tail
+        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_tail(mvar) = q;
+        }
+
+        jump stg_block_atomicreadmvar(mvar);
+    }
+
+    // Full MVar: a pure read.  Nothing in the closure was modified, so
+    // unlock it with the info pointer we found instead of forcing it
+    // to stg_MVAR_DIRTY_info.
+    val = StgMVar_value(mvar);
+
+    unlockClosure(mvar, info);
+    return (val);
+}
 
 /* -----------------------------------------------------------------------------
    Stable pointer primitives