Add Neil Brown's pathalogical CML benchmark
authorSimon Marlow <marlowsd@gmail.com>
Tue, 23 Mar 2010 09:42:05 +0000 (09:42 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Tue, 23 Mar 2010 09:42:05 +0000 (09:42 +0000)
smp/threads007/Control/Concurrent/CML.hs [new file with mode: 0644]
smp/threads007/Main.hs [new file with mode: 0644]

diff --git a/smp/threads007/Control/Concurrent/CML.hs b/smp/threads007/Control/Concurrent/CML.hs
new file mode 100644 (file)
index 0000000..4c0a0e8
--- /dev/null
@@ -0,0 +1,261 @@
+--------------------------------------------------------------------------------
+-- |
+-- Module      :  Control.Concurrent.CML
+-- Copyright   :  Avik Chaudhuri 2009 (avik@cs.ucsc.edu)
+-- License     :  BSD3
+--
+-- Maintainer  :  ben.franksen@online.de
+-- Stability   :  provisional
+-- Portability :  portable
+--
+-- Events and Channels as in Concurrent ML (extended with communication guards)
+--
+-- See /A Concurrent ML Library in Concurrent Haskell/ by Avik Chaudhuri
+-- (avik\@cs.ucsc.edu). The original code as well as the papers can be
+-- found at <http://www.cs.umd.edu/~avik/projects/cmllch/>.
+--------------------------------------------------------------------------------
+module Control.Concurrent.CML (
+  -- * Channels
+  -- $channels
+  Channel,
+  channel,
+  receive,
+  transmit,
+  -- * Events
+  -- $events
+  Event,
+  sync,
+  choose,
+  wrap,
+  guard,
+  wrapabort,
+  spawn
+) where
+
+import Control.Concurrent.MVar(MVar, newEmptyMVar, putMVar, takeMVar)
+import Control.Concurrent(ThreadId, forkIO)
+import Control.Monad.Fix(fix)
+import Control.Monad(foldM, forever)
+import Data.Maybe(isJust)
+
+--------------------------------------------------------------------------------
+
+type Commit = MVar Bool
+type Decision = MVar (Maybe Commit)
+type Candidate = MVar (Maybe Decision)
+type In a = MVar (Candidate, a -> Bool, Synchronizer)
+type Out a = MVar (Candidate, a, Synchronizer)
+
+-- | Values of type @a@ can be transported over channels of type @Channel a@.
+data Channel a = Channel (In a) (Out a) (MVar a)
+
+instance Eq (Channel a) where
+    Channel _ _ m1 == Channel _ _ m2 = m1 == m2
+
+type Point = MVar ()
+type Name = MVar [Point]
+type Abort = MVar ([Point], IO ())
+type Synchronizer = MVar (Point, Decision)
+
+-- | Events return a value on synchronization.
+--
+-- Note that by construction, an event can synchronize at exactly one
+-- /commit point/, where a message is either sent or accepted on a
+-- channel. This commit point may be selected among several other,
+-- potential commit points. Some code may be run before
+-- synchronization, as specified by 'guard' functions throughout the
+-- event. Some more code may be run after synchronization, as specified
+-- by 'wrap' functions that surround the commit point, and by 'wrapabort'
+-- functions that do not surround the commit point.
+newtype Event a = Event (Synchronizer -> Abort -> Name -> IO a)
+
+--------------------------------------------------------------------------------
+
+atchan :: In a -> Out a -> IO ()
+atchan i o = do
+  (cand_i,patt,si) <- takeMVar i
+  (cand_o,y,so) <- takeMVar o
+  if (patt y && si /= so)
+    then do
+      dec_i <- newEmptyMVar
+      putMVar cand_i (Just dec_i)
+      ki <- takeMVar dec_i
+      dec_o <- newEmptyMVar
+      putMVar cand_o (Just dec_o)
+      ko <- takeMVar dec_o
+      maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki
+      maybe (return ()) (\co -> putMVar co (isJust ki)) ko
+    else do
+      putMVar cand_i Nothing
+      putMVar cand_o Nothing
+      atchan i o
+
+atsync :: Synchronizer -> Abort -> IO () -> IO ()
+atsync r a x = do
+  (t,s) <- takeMVar r
+  forkIO $ fix $ \z -> do
+    (_,s') <- takeMVar r
+    forkIO z
+    putMVar s' Nothing
+  c <- newEmptyMVar
+  putMVar s (Just c)
+  b <- takeMVar c
+  if b
+    then do
+      putMVar t ()
+      fix $ \z -> do
+        (tL,f) <- takeMVar a
+        forkIO z
+        if elem t tL
+          then return ()
+          else f
+    else x
+
+atpointI :: Synchronizer -> Point -> In a -> (a -> Bool) -> IO a -> IO a
+atpointI r t i patt x = do
+  e <- newEmptyMVar
+  putMVar i (e,patt,r)
+  ms <- takeMVar e
+  maybe
+    (atpointI r t i patt x)
+    (\s -> do
+       putMVar r (t,s)
+       takeMVar t
+       x
+    )
+    ms
+
+atpointO :: Synchronizer -> Point -> Out a -> a -> IO () -> IO ()
+atpointO r t o y x = do
+  e <- newEmptyMVar
+  putMVar o (e,y,r)
+  ms <- takeMVar e
+  maybe
+    (atpointO r t o y x)
+    (\s -> do
+       putMVar r (t,s)
+       takeMVar t
+       x
+    )
+    ms
+
+--------------------------------------------------------------------------------
+
+-- $channels
+-- Channels transport a single value at a time. The operations on channels are:
+-- creation, transmit, and receive. None of them block the calling thread, in
+-- fact transmit and receive are pure functions, not IO actions. Blocking may
+-- occur only when a thread explicitly synchronizes on the resulting event.
+
+-- | Create a new channel.
+channel :: IO (Channel a)
+channel = do
+  i <- newEmptyMVar
+  o <- newEmptyMVar
+  forkIO $ forever $ atchan i o
+  m <- newEmptyMVar
+  return (Channel i o m)
+
+-- | Receive a message from a channel.
+--
+-- More precisely, @receive c cond@ returns an event that, on synchronization,
+-- accepts a message @m@ on channel @c@ and returns @m@. The resulting
+-- event is eligible for synchronization with a @transmit c m@ only if @cond m@
+-- is true.
+receive :: Channel a -> (a -> Bool) -> Event a
+receive (Channel i _ m) patt = Event efun where
+  efun r _ n = do
+    t <- newEmptyMVar
+    forkIO (putMVar n [t])
+    atpointI r t i patt (takeMVar m)
+
+-- | Transmit a message over a channel.
+--
+-- More precisely, @transmit c m@ returns an event that, on synchronization,
+-- sends the message @m@ on channel @c@ and returns @()@. Such an event must
+-- synchronize with @receive c@.
+transmit :: Channel a -> a -> Event ()
+transmit (Channel _ o m) y = Event efun where
+  efun r _ n = do
+    t <- newEmptyMVar
+    forkIO (putMVar n [t])
+    atpointO r t o y (putMVar m y)
+
+-- $events
+-- Events encapsulate a potentially blocking point of synchronization between
+-- threads, together with possible pre- and post-synchronization code as well
+-- as code that is executed (in a separate thread) when an event is /not/
+-- selected (aborted).
+
+-- | Non-deterministically select an event from a list of events, so that
+-- the selected event can be synchronized. The other events in the list are
+-- /aborted/.
+choose :: [Event a] -> Event a
+choose vL = Event efun where
+  efun r a n = do
+    j <- newEmptyMVar
+    tL <- foldM (\tL -> \(Event v) -> do
+        n' <- newEmptyMVar
+        forkIO $ v r a n' >>= putMVar j
+        tL' <- takeMVar n'
+        putMVar n' tL'
+        return (tL' ++ tL)
+      ) [] vL
+    forkIO (putMVar n tL)
+    takeMVar j
+
+-- | Specify a post-synchronization action.
+--
+-- More precisely, @wrap v f@ returns an event that, on synchronization,
+-- synchronizes the event @v@ and then runs the action returned by @f@
+-- applied to the result.
+wrap :: Event a -> (a -> IO b) -> Event b
+wrap (Event v) f = Event efun where
+  efun r a n = v r a n >>= f
+
+-- | Specify a pre-synchronization action.
+--
+-- More precisely, @guard a@ returns an event that, on synchronization,
+-- synchronizes the event returned by the action @a@. Here, @a@ is run
+-- every time a thread /tries/ to synchronize @guard a@.
+guard :: IO (Event a) -> Event a
+guard vs = Event efun where
+  efun r a n = do
+    Event v <- vs
+    v r a n
+
+-- | Specify a post-synchronization action that is spawned if an event is
+-- /not/ selected by a 'choose'.
+--
+-- More precisely, @wrapabort a v@ returns an event that, on
+-- synchronization, synchronizes the event @v@, and on abortion, spawns a
+-- thread that runs the action @a@. Here, if @v@ itself is of the form
+-- @choose vs@ and one of the events in @vs@ is selected, then @v@ is
+-- considered selected, so @a@ is not spawned.
+wrapabort :: IO () -> Event a -> Event a
+wrapabort f (Event v) = Event efun where
+  efun r a n = do
+    forkIO $ do
+      tL <- takeMVar n
+      putMVar n tL
+      putMVar a (tL, f)
+    v r a n
+
+-- | Synchronize an event.
+--
+-- This blocks the calling thread until a matching event is available.
+sync :: Event a -> IO a
+sync (Event v) = do
+  j <- newEmptyMVar
+  forkIO $ fix $ \z -> do
+    r <- newEmptyMVar
+    a <- newEmptyMVar
+    n <- newEmptyMVar
+    forkIO $ atsync r a z
+    x <- v r a n
+    putMVar j x
+  takeMVar j
+
+-- | A synonym for 'forkIO'.
+spawn :: IO () -> IO ThreadId
+spawn = forkIO
diff --git a/smp/threads007/Main.hs b/smp/threads007/Main.hs
new file mode 100644 (file)
index 0000000..e3f42e2
--- /dev/null
@@ -0,0 +1,16 @@
+-- Program from Neil Brown on haskell-cafe.
+--
+-- It exposes the O(n^2) behaviour in removing threads from the queue
+-- on an MVar during shutdown - in GHC 6.12.1 the program takes 25s in
+-- the EXIT phase deleting threads.
+
+import Control.Concurrent
+import Control.Concurrent.CML
+import Control.Monad
+
+main :: IO ()
+main = do 
+  let numChoices = 2
+  cs <- replicateM numChoices channel
+  mapM_ forkIO [replicateM_ (50000 `div` numChoices) $ sync $ transmit c () | c <- cs]
+  replicateM_ 50000 $ sync $ choose [receive c (const True) | c <- cs]