Add hs_try_putmvar()
authorSimon Marlow <marlowsd@gmail.com>
Tue, 30 Aug 2016 19:55:10 +0000 (20:55 +0100)
committerSimon Marlow <marlowsd@gmail.com>
Mon, 12 Sep 2016 07:33:24 +0000 (08:33 +0100)
Summary:
This is a fast, non-blocking, asynchronous, interface to tryPutMVar that
can be called from C/C++.

It's useful for callback-based C/C++ APIs: the idea is that the callback
invokes hs_try_putmvar(), and the Haskell code waits for the callback to
run by blocking in takeMVar.

The callback doesn't block - this is often a requirement of
callback-based APIs.  The callback wakes up the Haskell thread with
minimal overhead and no unnecessary context-switches.

There are a couple of benchmarks in
testsuite/tests/concurrent/should_run.  Some example results comparing
hs_try_putmvar() with using a standard foreign export:

    ./hs_try_putmvar003 1 64 16 100 +RTS -s -N4     0.49s
    ./hs_try_putmvar003 2 64 16 100 +RTS -s -N4     2.30s

hs_try_putmvar() is 4x faster for this workload (see the source for
hs_try_putmvar003.hs for details of the workload).

An alternative solution is to use the IO Manager for this.  We've tried
it, but there are problems with that approach:
* Need to create a new file descriptor for each callback
* The IO Manger thread(s) become a bottleneck
* More potential for things to go wrong, e.g. throwing an exception in
  an IO Manager callback kills the IO Manager thread.

Test Plan: validate; new unit tests

Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2501

24 files changed:
docs/users_guide/ffi-chap.rst
includes/HsFFI.h
libraries/base/GHC/Conc.hs
libraries/base/GHC/Conc/Sync.hs
rts/Capability.c
rts/Capability.h
rts/Prelude.h
rts/PrimOps.cmm
rts/RtsAPI.c
rts/Schedule.c
rts/Task.c
rts/Task.h
rts/Threads.c
rts/Threads.h
rts/package.conf.in
testsuite/tests/concurrent/should_run/Makefile
testsuite/tests/concurrent/should_run/all.T
testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs [new file with mode: 0644]
testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c [new file with mode: 0644]

index 63324d1..f46d902 100644 (file)
@@ -616,6 +616,125 @@ the threads have exited first. (Unofficially, if you want to use this
 fast and loose version of ``hs_exit()``, then call
 ``shutdownHaskellAndExit()`` instead).
 
+.. _hs_try_putmvar:
+
+Waking up Haskell threads from C
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sometimes we want to be able to wake up a Haskell thread from some C
+code.  For example, when using a callback-based C API, we register a C
+callback and then we need to wait for the callback to run.
+
+One way to do this is to create a ``foreign export`` that will do
+whatever needs to be done to wake up the Haskell thread - perhaps
+``putMVar`` - and then call this from our C callback.  There are a
+couple of problems with this:
+
+1. Calling a foreign export has a lot of overhead: it creates a
+   complete new Haskell thread, for example.
+2. The call may block for a long time if a GC is in progress.  We
+   can't use this method if the C API we're calling doesn't allow
+   blocking in the callback.
+
+For these reasons GHC provides an external API to ``tryPutMVar``,
+``hs_try_putmvar``, which you can use to cheaply and asynchronously
+wake up a Haskell thread from C/C++.
+
+.. code-block:: c
+
+  void hs_try_putmvar (int capability, HsStablePtr sp);
+
+The C call ``hs_try_putmvar(cap, mvar)`` is equivalent to the Haskell
+call ``tryPutMVar mvar ()``, except that it is
+
+* non-blocking: takes a bounded, short, amount of time
+
+* asynchronous: the actual putMVar may be performed after the call
+  returns (for example, if the RTS is currently garbage collecting).
+  That's why ``hs_try_putmvar()`` doesn't return a result to say
+  whether the put succeeded.  It is your responsibility to ensure that
+  the ``MVar`` is empty; if it is full, ``hs_try_putmvar()`` will have
+  no effect.
+
+**Example**. Suppose we have a C/C++ function to call that will return and then
+invoke a callback at some point in the future, passing us some data.
+We want to wait in Haskell for the callback to be called, and retrieve
+the data.  We can do it like this:
+
+.. code-block:: haskell
+
+     import GHC.Conc (newStablePtrPrimMVar, PrimMVar)
+
+     makeExternalCall = mask_ $ do
+       mvar <- newEmptyMVar
+       sp <- newStablePtrPrimMVar mvar
+       fp <- mallocForeignPtr
+       withForeignPtr fp $ \presult -> do
+         cap <- threadCapability =<< myThreadId
+         scheduleCallback sp cap presult
+         takeMVar mvar `onException`
+           forkIO (do takeMVar mvar; touchForeignPtr fp)
+         peek presult
+
+     foreign import ccall "scheduleCallback"
+         scheduleCallback :: StablePtr PrimMVar
+                          -> Int
+                          -> Ptr Result
+                          -> IO ()
+
+And inside ``scheduleCallback``, we create a callback that will in due
+course store the result data in the ``Ptr Result``, and then call
+``hs_try_putmvar()``.
+
+There are a few things to note here.
+
+* There's a special function to create the ``StablePtr``:
+  ``newStablePtrPrimMVar``, because the RTS needs a ``StablePtr`` to
+  the primitive ``MVar#`` object, and we can't create that directly.
+  Do *not* just use ``newStablePtr`` on the ``MVar``: your program
+  will crash.
+
+* The ``StablePtr`` is freed by ``hs_try_putmvar()``.  This is because
+  it would otherwise be difficult to arrange to free the ``StablePtr``
+  reliably: we can't free it in Haskell, because if the ``takeMVar``
+  is interrupted by an asynchronous exception, then the callback will
+  fire at a later time.  We can't free it in C, because we don't know
+  when to free it (not when ``hs_try_putmvar()`` returns, because that
+  is an async call that uses the ``StablePtr`` at some time in the
+  future).
+
+* The ``mask_`` is to avoid asynchronous exceptions before the
+  ``scheduleCallback`` call, which would leak the ``StablePtr``.
+
+* We find out the current capability number and pass it to C.  This is
+  passed back to ``hs_try_putmvar``, and helps the RTS to know which
+  capability it should try to perform the ``tryPutMVar`` on.  If you
+  don't care, you can pass ``-1`` for the capability to
+  ``hs_try_putmvar``, and it will pick an arbitrary one.
+
+  Picking the right capability will help avoid unnecessary context
+  switches.  Ideally you should pass the capability that the thread
+  that will be woken up last ran on, which you can find by calling
+  ``threadCapability`` in Haskell.
+
+* If you want to also pass some data back from the C callback to
+  Haskell, this is best done by first allocating some memory in
+  Haskell to receive the data, and passing the address to C, as we did
+  in the above example.
+
+* ``takeMVar`` can be interrupted by an asynchronous exception.  If
+  this happens, the callback in C will still run at some point in the
+  future, will still write the result, and will still call
+  ``hs_try_putmvar()``.  Therefore we have to arrange that the memory
+  for the result stays alive until the callback has run, so if an
+  exception is thrown during ``takeMVar`` we fork another thread to
+  wait for the callback and hold the memory alive using
+  ``touchForeignPtr``.
+
+For a fully working example, see
+``testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs`` in the
+GHC source tree.
+
 .. _ffi-floating-point:
 
 Floating point and the FFI
index 4f015b6..cdf4510 100644 (file)
@@ -113,8 +113,12 @@ extern StgPtr hs_spt_lookup(StgWord64 key[2]);
 extern int hs_spt_keys(StgPtr keys[], int szKeys);
 extern int hs_spt_key_count (void);
 
+extern void hs_try_putmvar (int capability, HsStablePtr sp);
+
 /* -------------------------------------------------------------------------- */
 
+
+
 #ifdef __cplusplus
 }
 #endif
index 38fac43..afc0a97 100644 (file)
@@ -50,6 +50,8 @@ module GHC.Conc
         , threadStatus
         , threadCapability
 
+        , newStablePtrPrimMVar, PrimMVar
+
         -- * Waiting
         , threadDelay
         , registerDelay
index 5476950..5986379 100644 (file)
@@ -59,6 +59,8 @@ module GHC.Conc.Sync
         , threadStatus
         , threadCapability
 
+        , newStablePtrPrimMVar, PrimMVar
+
         -- * Allocation counter and quota
         , setAllocationCounter
         , getAllocationCounter
@@ -117,6 +119,7 @@ import GHC.MVar
 import GHC.Ptr
 import GHC.Real         ( fromIntegral )
 import GHC.Show         ( Show(..), showString )
+import GHC.Stable       ( StablePtr(..) )
 import GHC.Weak
 
 infixr 0 `par`, `pseq`
@@ -615,6 +618,17 @@ mkWeakThreadId t@(ThreadId t#) = IO $ \s ->
       (# s1, w #) -> (# s1, Weak w #)
 
 
+data PrimMVar
+
+-- | Make a StablePtr that can be passed to the C function
+-- @hs_try_putmvar()@.  The RTS wants a 'StablePtr' to the underlying
+-- 'MVar#', but a 'StablePtr#' can only refer to lifted types, so we
+-- have to cheat by coercing.
+newStablePtrPrimMVar :: MVar () -> IO (StablePtr PrimMVar)
+newStablePtrPrimMVar (MVar m) = IO $ \s0 ->
+  case makeStablePtr# (unsafeCoerce# m :: PrimMVar) s0 of
+    (# s1, sp #) -> (# s1, StablePtr sp #)
+
 -----------------------------------------------------------------------------
 -- Transactional heap operations
 -----------------------------------------------------------------------------
index 681797b..6979c63 100644 (file)
@@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i)
     cap->returning_tasks_tl = NULL;
     cap->n_returning_tasks  = 0;
     cap->inbox              = (Message*)END_TSO_QUEUE;
+    cap->putMVars           = NULL;
     cap->sparks             = allocSparkPool();
     cap->spark_stats.created    = 0;
     cap->spark_stats.dud        = 0;
index 8e0288b..bbf0262 100644 (file)
@@ -123,6 +123,7 @@ struct Capability_ {
     //    returning_tasks_{hd,tl}
     //    wakeup_queue
     //    inbox
+    //    putMVars
     Mutex lock;
 
     // Tasks waiting to return from a foreign call, or waiting to make
@@ -138,6 +139,10 @@ struct Capability_ {
     // Locks required: cap->lock
     Message *inbox;
 
+    // putMVars are really messages, but they're allocated with malloc() so they
+    // can't go on the inbox queue: the GC would get confused.
+    struct PutMVar_ *putMVars;
+
     SparkPool *sparks;
 
     // Stats on spark creation/conversion
@@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES];
    Messages
    -------------------------------------------------------------------------- */
 
+typedef struct PutMVar_ {
+    StgStablePtr mvar;
+    struct PutMVar_ *link;
+} PutMVar;
+
 #ifdef THREADED_RTS
 
 INLINE_HEADER rtsBool emptyInbox(Capability *cap);
@@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap)
 
 INLINE_HEADER rtsBool emptyInbox(Capability *cap)
 {
-    return (cap->inbox == (Message*)END_TSO_QUEUE);
+    return (cap->inbox == (Message*)END_TSO_QUEUE &&
+            cap->putMVars == NULL);
 }
 
 #endif
index ae1e9cb..58de230 100644 (file)
@@ -24,6 +24,7 @@
  * modules these names are defined in.
  */
 
+PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure);
 PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure);
 PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure);
 PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure);
@@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info);
 PRELUDE_INFO(base_GHCziStable_StablePtr_static_info);
 PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
 
+#define Unit_closure              DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure)
 #define True_closure              DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure)
 #define False_closure             DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure)
 #define unpackCString_closure     DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure)
index b468c33..02a7daf 100644 (file)
@@ -1739,6 +1739,13 @@ loop:
 }
 
 
+// NOTE: there is another implementation of this function in
+// Threads.c:performTryPutMVar().  Keep them in sync!  It was
+// measurably slower to call the C function from here (70% for a
+// tight loop doing tryPutMVar#).
+//
+// TODO: we could kill the duplication by making tryPutMVar# into an
+// inline primop that expands into a C call to performTryPutMVar().
 stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
                    P_ val,  /* :: a */ )
 {
@@ -1812,6 +1819,7 @@ loop:
     return (1);
 }
 
+
 stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
 {
     W_ val, info, tso, q;
index 34d68c7..e724307 100644 (file)
@@ -16,6 +16,7 @@
 #include "Schedule.h"
 #include "Capability.h"
 #include "Stable.h"
+#include "Threads.h"
 #include "Weak.h"
 
 /* ----------------------------------------------------------------------------
@@ -620,3 +621,77 @@ void rts_done (void)
     freeMyTask();
 }
 
+/* -----------------------------------------------------------------------------
+   tryPutMVar from outside Haskell
+
+   The C call
+
+      hs_try_putmvar(cap, mvar)
+
+   is equivalent to the Haskell call
+
+      tryPutMVar mvar ()
+
+   but it is
+
+     * non-blocking: takes a bounded, short, amount of time
+     * asynchronous: the actual putMVar may be performed after the
+       call returns.  That's why hs_try_putmvar() doesn't return a
+       result to say whether the put succeeded.
+
+   NOTE: this call transfers ownership of the StablePtr to the RTS, which will
+   free it after the tryPutMVar has taken place.  The reason is that otherwise,
+   it would be very difficult for the caller to arrange to free the StablePtr
+   in all circumstances.
+
+   For more details, see the section "Waking up Haskell threads from C" in the
+   User's Guide.
+   -------------------------------------------------------------------------- */
+
+void hs_try_putmvar (/* in */ int capability,
+                     /* in */ HsStablePtr mvar)
+{
+    Task *task = getTask();
+    Capability *cap;
+
+    if (capability < 0) {
+        capability = task->preferred_capability;
+        if (capability < 0) {
+            capability = 0;
+        }
+    }
+    cap = capabilities[capability % enabled_capabilities];
+
+#if !defined(THREADED_RTS)
+
+    performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+    freeStablePtr(mvar);
+
+#else
+
+    ACQUIRE_LOCK(&cap->lock);
+    // If the capability is free, we can perform the tryPutMVar immediately
+    if (cap->running_task == NULL) {
+        cap->running_task = task;
+        task->cap = cap;
+        RELEASE_LOCK(&cap->lock);
+
+        performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
+
+        freeStablePtr(mvar);
+
+        // Wake up the capability, which will start running the thread that we
+        // just awoke (if there was one).
+        releaseCapability(cap);
+    } else {
+        PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar");
+        // We cannot deref the StablePtr if we don't have a capability,
+        // so we have to store it and deref it later.
+        p->mvar = mvar;
+        p->link = cap->putMVars;
+        cap->putMVars = p;
+        RELEASE_LOCK(&cap->lock);
+    }
+
+#endif
+}
index 544b9c2..611d704 100644 (file)
@@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
             if (!emptyRunQueue(cap0)
                 || cap0->n_returning_tasks != 0
-                || cap0->inbox != (Message*)END_TSO_QUEUE) {
+                || !emptyInbox(cap0)) {
                 // it already has some work, we just grabbed it at
                 // the wrong moment.  Or maybe it's deadlocked!
                 releaseCapability(cap0);
@@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     Message *m, *next;
+    PutMVar *p, *pnext;
     int r;
     Capability *cap = *pcap;
 
@@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
         if (r != 0) return;
 
         m = cap->inbox;
+        p = cap->putMVars;
         cap->inbox = (Message*)END_TSO_QUEUE;
+        cap->putMVars = NULL;
 
         RELEASE_LOCK(&cap->lock);
 
@@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
             executeMessage(cap, m);
             m = next;
         }
+
+        while (p != NULL) {
+            pnext = p->link;
+            performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
+                              Unit_closure);
+            freeStablePtr(p->mvar);
+            stgFree(p);
+            p = pnext;
+        }
     }
 #endif
 }
 
+
 /* ----------------------------------------------------------------------------
  * Activate spark threads (THREADED_RTS)
  * ------------------------------------------------------------------------- */
index 9a658e0..253520f 100644 (file)
@@ -36,7 +36,6 @@ uint32_t peakWorkerCount;
 static int tasksInitialized = 0;
 
 static void   freeTask  (Task *task);
-static Task * allocTask (void);
 static Task * newTask   (rtsBool);
 
 #if defined(THREADED_RTS)
@@ -117,8 +116,7 @@ freeTaskManager (void)
     return tasksRunning;
 }
 
-static Task *
-allocTask (void)
+Task* getTask (void)
 {
     Task *task;
 
@@ -209,7 +207,7 @@ newTask (rtsBool worker)
 
     task->cap           = NULL;
     task->worker        = worker;
-    task->stopped       = rtsFalse;
+    task->stopped       = rtsTrue;
     task->running_finalizers = rtsFalse;
     task->n_spare_incalls = 0;
     task->spare_incalls = NULL;
@@ -304,7 +302,7 @@ newBoundTask (void)
         stg_exit(EXIT_FAILURE);
     }
 
-    task = allocTask();
+    task = getTask();
 
     task->stopped = rtsFalse;
 
@@ -452,6 +450,7 @@ startWorkerTask (Capability *cap)
 
   // A worker always gets a fresh Task structure.
   task = newTask(rtsTrue);
+  task->stopped = rtsFalse;
 
   // The lock here is to synchronise with taskStart(), to make sure
   // that we have finished setting up the Task structure before the
@@ -499,7 +498,7 @@ void rts_setInCallCapability (
     int preferred_capability,
     int affinity USED_IF_THREADS)
 {
-    Task *task = allocTask();
+    Task *task = getTask();
     task->preferred_capability = preferred_capability;
 
 #ifdef THREADED_RTS
index 558f543..9323459 100644 (file)
@@ -150,7 +150,8 @@ typedef struct Task_ {
     struct InCall_ *spare_incalls;
 
     rtsBool    worker;          // == rtsTrue if this is a worker Task
-    rtsBool    stopped;         // this task has stopped or exited Haskell
+    rtsBool    stopped;         // == rtsTrue between newBoundTask and
+                                // boundTaskExiting, or in a worker Task.
 
     // So that we can detect when a finalizer illegally calls back into Haskell
     rtsBool running_finalizers;
@@ -205,7 +206,12 @@ uint32_t  freeTaskManager (void);
 // thread-local storage and will remain even after boundTaskExiting()
 // has been called; to free the memory, see freeMyTask().
 //
-Task *newBoundTask (void);
+Task* newBoundTask (void);
+
+// Return the current OS thread's Task, which is created if it doesn't already
+// exist.  After you have finished using RTS APIs, you should call freeMyTask()
+// to release this thread's Task.
+Task* getTask (void);
 
 // The current task is a bound task that is exiting.
 //
index 7317249..1782da6 100644 (file)
@@ -744,6 +744,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
 }
 
 /* ----------------------------------------------------------------------------
+   Implementation of tryPutMVar#
+
+   NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
+   ------------------------------------------------------------------------- */
+
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
+{
+    const StgInfoTable *info;
+    StgMVarTSOQueue *q;
+    StgTSO *tso;
+
+    info = lockClosure((StgClosure*)mvar);
+
+    if (mvar->value != &stg_END_TSO_QUEUE_closure) {
+#if defined(THREADED_RTS)
+        unlockClosure((StgClosure*)mvar, info);
+#endif
+        return rtsFalse;
+    }
+
+    q = mvar->head;
+loop:
+    if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+        /* No further takes, the MVar is now full. */
+        if (info == &stg_MVAR_CLEAN_info) {
+            dirty_MVAR(&cap->r, (StgClosure*)mvar);
+        }
+
+        mvar->value = value;
+        unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
+        return rtsTrue;
+    }
+    if (q->header.info == &stg_IND_info ||
+        q->header.info == &stg_MSG_NULL_info) {
+        q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
+        goto loop;
+    }
+
+    // There are takeMVar(s) waiting: wake up the first one
+    tso = q->tso;
+    mvar->head = q->link;
+    if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
+        mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
+    }
+
+    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    StgWord why_blocked = tso->why_blocked;
+
+    // actually perform the takeMVar
+    StgStack* stack = tso->stackobj;
+    stack->sp[1] = (W_)value;
+    stack->sp[0] = (W_)&stg_ret_p_info;
+
+    // indicate that the MVar operation has now completed.
+    tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+
+    if (stack->dirty == 0) {
+        dirty_STACK(cap, stack);
+    }
+
+    tryWakeupThread(cap, tso);
+
+    // If it was an readMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = ((StgMVarTSOQueue*)q)->link;
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
+    unlockClosure((StgClosure*)mvar, info);
+
+    return rtsTrue;
+}
+
+/* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * ------------------------------------------------------------------------- */
 
index 01c493e..4588008 100644 (file)
@@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso);
 void threadStackOverflow  (Capability *cap, StgTSO *tso);
 W_   threadStackUnderflow (Capability *cap, StgTSO *tso);
 
+rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value);
+
 #ifdef DEBUG
 void printThreadBlockage (StgTSO *tso);
 void printThreadStatus (StgTSO *t);
index 65aa5c3..03848c4 100644 (file)
@@ -103,6 +103,7 @@ ld-options:
          , "-Wl,-u,_base_GHCziPtr_Ptr_con_info"
          , "-Wl,-u,_base_GHCziPtr_FunPtr_con_info"
          , "-Wl,-u,_base_GHCziStable_StablePtr_con_info"
+         , "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure"
          , "-Wl,-u,_ghczmprim_GHCziTypes_False_closure"
          , "-Wl,-u,_ghczmprim_GHCziTypes_True_closure"
          , "-Wl,-u,_base_GHCziPack_unpackCString_closure"
@@ -199,6 +200,7 @@ ld-options:
          , "-Wl,-u,base_GHCziPtr_Ptr_con_info"
          , "-Wl,-u,base_GHCziPtr_FunPtr_con_info"
          , "-Wl,-u,base_GHCziStable_StablePtr_con_info"
+         , "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure"
          , "-Wl,-u,ghczmprim_GHCziTypes_False_closure"
          , "-Wl,-u,ghczmprim_GHCziTypes_True_closure"
          , "-Wl,-u,base_GHCziPack_unpackCString_closure"
index c6bef49..26f13c5 100644 (file)
@@ -4,3 +4,9 @@ include $(TOP)/mk/test.mk
 
 conc059_setup :
        '$(TEST_HC)' $(TEST_HC_OPTS) -c conc059.hs
+
+hs_try_putmvar002_setup :
+       '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar002.hs
+
+hs_try_putmvar003_setup :
+       '$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar003.hs
index 9f2ed28..6eda000 100644 (file)
@@ -255,3 +255,34 @@ test('setnumcapabilities001',
 
 # omit ghci, which can't handle unboxed tuples:
 test('compareAndSwap', [omit_ways(['ghci','hpc']), reqlib('primitive')], compile_and_run, [''])
+
+test('hs_try_putmvar001',
+     [
+     when(opsys('mingw32'),skip), # uses pthread APIs in the C code
+     only_ways(['threaded1','threaded2']),
+      extra_clean(['hs_try_putmvar001_c.o'])],
+     compile_and_run,
+     ['hs_try_putmvar001_c.c'])
+
+# A benchmark for hs_try_putmvar() vs. foreign export
+# This one should work for both threaded and non-threaded RTS
+test('hs_try_putmvar002',
+     [
+     pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar002_setup'),
+     extra_clean(['hs_try_putmvar002_c.o']),
+     extra_run_opts('1 8 10000')
+     ],
+     compile_and_run,
+     ['hs_try_putmvar002_c.c'])
+
+# Another benchmark for hs_try_putmvar() vs. foreign export
+test('hs_try_putmvar003',
+     [
+     when(opsys('mingw32'),skip), # uses pthread APIs in the C code
+     pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar003_setup'),
+     only_ways(['threaded1','threaded2']),
+     extra_clean(['hs_try_putmvar003_c.o']),
+     extra_run_opts('1 16 32 100')
+     ],
+     compile_and_run,
+     ['hs_try_putmvar003_c.c'])
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs
new file mode 100644 (file)
index 0000000..af4eabb
--- /dev/null
@@ -0,0 +1,34 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Foreign
+import Foreign.C
+import GHC.Conc
+import GHC.Prim
+
+-- Sample code demonstrating proper use of hs_try_putmvar()
+
+main = do
+   makeExternalCall >>= print
+   threadDelay 100000
+
+makeExternalCall :: IO CInt
+makeExternalCall = mask_ $ do
+  mvar <- newEmptyMVar
+  sp <- newStablePtrPrimMVar mvar -- freed by hs_try_takemvar()
+  fp <- mallocForeignPtr
+  withForeignPtr fp $ \presult -> do
+    (cap,_) <- threadCapability =<< myThreadId
+    scheduleCallback sp cap presult
+    takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp)
+      -- the C callback will still run if takeMVar is interrupted, so the
+      -- exception handler keeps the result memory alive long enough.
+    peek presult
+
+foreign import ccall "scheduleCallback"
+  scheduleCallback :: StablePtr PrimMVar
+                   -> Int
+                   -> Ptr CInt
+                   -> IO ()
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout b/testsuite/tests/concurrent/should_run/hs_try_putmvar001.stdout
new file mode 100644 (file)
index 0000000..d81cc07
--- /dev/null
@@ -0,0 +1 @@
+42
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar001_c.c
new file mode 100644 (file)
index 0000000..f214c5c
--- /dev/null
@@ -0,0 +1,31 @@
+#include "HsFFI.h"
+#include "Rts.h"
+#include "RtsAPI.h"
+#include <unistd.h>
+#include <pthread.h>
+
+struct callback {
+    HsStablePtr mvar;
+    int cap;
+    int *presult;
+};
+
+void* callback(struct callback *p)
+{
+    usleep(200);
+    *p->presult = 42;
+    hs_try_putmvar(p->cap,p->mvar);
+    free(p);
+    hs_thread_done();
+    return NULL;
+}
+
+void scheduleCallback(HsStablePtr mvar, HsInt cap, int *presult)
+{
+    pthread_t t;
+    struct callback *p = malloc(sizeof(struct callback));
+    p->mvar = mvar;
+    p->cap = cap;
+    p->presult = presult;
+    pthread_create(&t, NULL, callback, p);
+}
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar002.hs
new file mode 100644 (file)
index 0000000..a8eac42
--- /dev/null
@@ -0,0 +1,66 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+import Foreign hiding (void)
+import Foreign.C
+import GHC.Conc
+import GHC.Prim
+import System.Environment
+
+-- Measure raw throughput, for M threads that each do N calls to C
+-- that call back to hs_try_putmvar() or the foreign export equivalent
+
+main = do
+   args <- getArgs
+   case args of
+     ["1",n,m] -> experiment2 (read m) (experiment1 (read n))
+     ["2",n,m] -> experiment2 (read m) (experiment1FE (read n))
+
+-- -----------------------------------------------------------------------------
+
+experiment1 :: Int -> IO ()
+experiment1 n = mask_ $ do
+  mvar <- newEmptyMVar
+  (cap,_) <- threadCapability =<< myThreadId
+  replicateM_ n $ do
+    sp <- newStablePtrPrimMVar mvar
+    externalPutMVar sp cap
+    takeMVar mvar
+
+foreign import ccall "externalPutMVar"
+  externalPutMVar :: StablePtr PrimMVar
+                  -> Int
+                  -> IO ()
+
+experiment1FE :: Int -> IO ()
+experiment1FE n = do
+  mvar <- newEmptyMVar
+  (cap,_) <- threadCapability =<< myThreadId
+  bracket (newStablePtr mvar) freeStablePtr $ \sp -> do
+    replicateM_ n $ do externalPutMVarFE sp cap; takeMVar mvar
+
+foreign import ccall "externalPutMVarFE"
+  externalPutMVarFE :: StablePtr (MVar ())
+                    -> Int
+                   -> IO ()
+
+callbackPutMVar :: StablePtr (MVar ()) -> IO ()
+callbackPutMVar sp = do
+  mvar <- deRefStablePtr sp
+  void $ tryPutMVar mvar ()
+
+foreign export ccall callbackPutMVar :: StablePtr (MVar ()) -> IO ()
+
+-- -----------------------------------------------------------------------------
+-- Perform M copies of experiment1 concurrently
+
+experiment2 :: Int -> IO () -> IO ()
+experiment2 m exp = do
+  mvars <- replicateM m $ do
+    m <- newEmptyMVar
+    forkFinally exp (\_ -> putMVar m ())
+    return m
+  mapM_ takeMVar mvars
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar002_c.c
new file mode 100644 (file)
index 0000000..34a339d
--- /dev/null
@@ -0,0 +1,28 @@
+#include "HsFFI.h"
+#include <unistd.h>
+#include <pthread.h>
+#include "hs_try_putmvar002_stub.h"
+
+void externalPutMVar(HsStablePtr mvar, HsInt cap)
+{
+    hs_try_putmvar(cap,mvar);
+}
+
+void externalPutMVarFE(HsStablePtr mvar, HsInt cap)
+{
+    callbackPutMVar(mvar);
+}
+
+void externalManyPutMVars(HsStablePtr mvar, HsInt n, HsInt cap)
+{
+    for (int i = 0; i < n; i++) {
+        hs_try_putmvar(cap,mvar);
+    }
+}
+
+void externalManyPutMVarsFE(HsStablePtr mvar, HsInt n, HsInt cap)
+{
+    for (int i = 0; i < n; i++) {
+        callbackPutMVar(mvar);
+    }
+}
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs b/testsuite/tests/concurrent/should_run/hs_try_putmvar003.hs
new file mode 100644 (file)
index 0000000..d74a9cb
--- /dev/null
@@ -0,0 +1,88 @@
+{-# LANGUAGE MagicHash #-}
+module Main where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+import Foreign hiding (void)
+import Foreign.C
+import GHC.Conc
+import GHC.MVar (MVar(..))
+import GHC.Prim
+import System.Environment
+
+-- Measure C to Haskell callback throughput under a workload with
+-- several dimensions:
+--
+--  * X callback queues (each managed by an OS thread in C)
+--  * each queue has Y Haskell threads, each making Z requests
+--
+-- And we can run the whole thing in two ways:
+--  * With the callbacks calling into a foreign export
+--  * With the callbacks using hs_try_putmvar()
+--
+-- Example results (using WAY=threaded2)
+--
+--  hs_try_putmvar003 1 64 16 500 +RTS -s -N4    1.10s
+--  hs_try_putmvar003 2 64 16 500 +RTS -s -N4    9.88s
+--
+-- hs_try_putmvar() is 9x faster with these parameters.
+
+main = do
+   args <- getArgs
+   case args of
+     ["1",x,y,z] -> experiment False (read x) (read y) (read z)
+     ["2",x,y,z] -> experiment True (read x) (read y) (read z)
+
+makeExternalCall :: Ptr CallbackQueue -> IO CInt
+makeExternalCall q = mask_ $ do
+  mvar <- newEmptyMVar
+  sp <- newStablePtrPrimMVar mvar
+  fp <- mallocForeignPtr
+  (cap,_) <- threadCapability =<< myThreadId
+  withForeignPtr fp $ \presult -> do
+    scheduleCallback q sp cap presult
+    takeMVar mvar `onException` forkIO (do takeMVar mvar; touchForeignPtr fp)
+    peek presult
+
+data CallbackQueue
+
+foreign import ccall "mkCallbackQueue"
+  mkCallbackQueue :: Int -> IO (Ptr CallbackQueue)
+
+foreign import ccall "destroyCallbackQueue"
+  destroyCallbackQueue :: Ptr CallbackQueue -> IO ()
+
+foreign import ccall "scheduleCallback"
+  scheduleCallback :: Ptr CallbackQueue
+                   -> StablePtr PrimMVar
+                   -> Int
+                   -> Ptr CInt
+                   -> IO ()
+
+callbackPutMVar :: StablePtr PrimMVar -> IO ()
+callbackPutMVar sp = do
+  mvar <- deRefStablePtr sp
+  void $ tryPutMVar (MVar (unsafeCoerce# mvar)) ()
+
+foreign export ccall callbackPutMVar :: StablePtr PrimMVar -> IO ()
+
+-- Make
+--   * x callback queues, each with
+--   * y threads, doing
+--   * z requests each
+experiment :: Bool -> Int -> Int -> Int -> IO ()
+experiment use_foreign_export x y z = do
+  mvars <- replicateM x $ async $ do
+    bracket (mkCallbackQueue (fromEnum use_foreign_export))
+            destroyCallbackQueue $ \q -> do
+      mvars <- replicateM y $ async $
+        replicateM_ z $ void $ makeExternalCall q
+      mapM_ takeMVar mvars
+  mapM_ takeMVar mvars
+
+async :: IO () -> IO (MVar ())
+async io = do
+  m <- newEmptyMVar
+  forkFinally io (\_ -> putMVar m ())
+  return m
diff --git a/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c b/testsuite/tests/concurrent/should_run/hs_try_putmvar003_c.c
new file mode 100644 (file)
index 0000000..c499b72
--- /dev/null
@@ -0,0 +1,83 @@
+#include "HsFFI.h"
+#include "Rts.h"
+#include "RtsAPI.h"
+#include <unistd.h>
+#include <pthread.h>
+#include "hs_try_putmvar003_stub.h"
+
+struct callback_queue {
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+    int use_foreign_export;
+    struct callback *pending;
+};
+
+struct callback {
+    HsStablePtr mvar;
+    int cap;
+    int *presult;
+    struct callback *next;
+};
+
+void* callback(struct callback_queue *q)
+{
+    struct callback *cb;
+
+    pthread_mutex_lock(&q->lock);
+    do {
+        if (q->pending == NULL) {
+            pthread_cond_wait(&q->cond,&q->lock);
+        }
+        if (q->pending != NULL) {
+            cb = q->pending;
+            q->pending = cb->next;
+            *cb->presult = 42;
+            if (q->use_foreign_export) {
+                callbackPutMVar(cb->mvar);
+            } else {
+                hs_try_putmvar(cb->cap,cb->mvar);
+            }
+            free(cb);
+        }
+    } while (1);
+    pthread_mutex_unlock(&q->lock);
+
+    hs_thread_done();
+    return NULL;
+}
+
+typedef void* threadfunc(void *);
+
+struct callback_queue* mkCallbackQueue(int use_foreign_export)
+{
+    struct callback_queue *q = malloc(sizeof(struct callback_queue));
+    pthread_t t;
+    pthread_mutex_init(&q->lock, NULL);
+    pthread_cond_init(&q->cond, NULL);
+    pthread_create(&t, NULL, (threadfunc*)callback, q);
+    q->pending = NULL;
+    q->use_foreign_export = use_foreign_export;
+    return q;
+}
+
+void destroyCallbackQueue(struct callback_queue *q)
+{
+    pthread_mutex_destroy(&q->lock);
+    pthread_cond_destroy(&q->cond);
+    free(q);
+}
+
+void scheduleCallback(struct callback_queue *q,
+                      HsStablePtr mvar,
+                      HsInt cap, int *presult)
+{
+    struct callback *p = malloc(sizeof(struct callback));
+    p->mvar = mvar;
+    p->cap = cap;
+    p->presult = presult;
+    pthread_mutex_lock(&q->lock);
+    p->next = q->pending;
+    q->pending = p;
+    pthread_cond_signal(&q->cond);
+    pthread_mutex_unlock(&q->lock);
+}