Add TQueue, a faster TChan without dup/clone operations
authorSimon Marlow <marlowsd@gmail.com>
Fri, 15 Jun 2012 08:46:31 +0000 (09:46 +0100)
committerSimon Marlow <marlowsd@gmail.com>
Fri, 15 Jun 2012 09:09:25 +0000 (10:09 +0100)
Control/Concurrent/STM/TQueue.hs [new file with mode: 0644]
stm.cabal

diff --git a/Control/Concurrent/STM/TQueue.hs b/Control/Concurrent/STM/TQueue.hs
new file mode 100644 (file)
index 0000000..0df1052
--- /dev/null
@@ -0,0 +1,134 @@
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+{-# LANGUAGE CPP, DeriveDataTypeable #-}
+
+#if __GLASGOW_HASKELL__ >= 701
+{-# LANGUAGE Trustworthy #-}
+#endif
+
+-----------------------------------------------------------------------------
+-- |
+-- Module      :  Control.Concurrent.STM.TQueue
+-- Copyright   :  (c) The University of Glasgow 2012
+-- License     :  BSD-style (see the file libraries/base/LICENSE)
+-- 
+-- Maintainer  :  libraries@haskell.org
+-- Stability   :  experimental
+-- Portability :  non-portable (requires STM)
+--
+-- A 'TQueue' is like a 'TChan', with two important differences:
+--
+--  * it has faster throughput than both 'TChan' and 'Chan' (although
+--    the costs are amortised, so the cost of individual operations
+--    can vary a lot).
+--
+--  * it does /not/ provide equivalents of the 'dupTChan' and
+--    'cloneTChan' operations.
+--
+-- The implementation is based on the traditional purely-functional
+-- queue representation that uses two lists to obtain amortised /O(1)/
+-- enqueue and dequeue operations.
+--
+-----------------------------------------------------------------------------
+
+module Control.Concurrent.STM.TQueue (
+        -- * TChans
+       TQueue,
+       newTQueue,
+       newTQueueIO,
+       readTQueue,
+       tryReadTQueue,
+       peekTQueue,
+       tryPeekTQueue,
+       writeTQueue,
+        unGetTQueue,
+        isEmptyTQueue,
+  ) where
+
+import GHC.Conc
+
+import Data.Typeable (Typeable)
+
+-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
+data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
+                       {-# UNPACK #-} !(TVar [a])
+  deriving Typeable
+
+-- |Build and returns a new instance of 'TQueue'
+newTQueue :: STM (TQueue a)
+newTQueue = do
+  read  <- newTVar []
+  write <- newTVar []
+  return (TQueue read write)
+
+-- |@IO@ version of 'newTQueue'.  This is useful for creating top-level
+-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTQueueIO :: IO (TQueue a)
+newTQueueIO = do
+  read  <- newTVarIO []
+  write <- newTVarIO []
+  return (TQueue read write)
+
+-- |Write a value to a 'TQueue'.
+writeTQueue :: TQueue a -> a -> STM ()
+writeTQueue (TQueue _read write) a = do
+  listend <- readTVar write
+  writeTVar write (a:listend)
+
+-- |Read the next value from the 'TQueue'.
+readTQueue :: TQueue a -> STM a
+readTQueue (TQueue read write) = do
+  xs <- readTVar read
+  case xs of
+    (x:xs') -> do writeTVar read xs'
+                  return x
+    [] -> do ys <- readTVar write
+             case ys of
+               [] -> retry
+               _  -> case reverse ys of
+                       [] -> error "readTQueue"
+                       (z:zs) -> do writeTVar write []
+                                    writeTVar read zs
+                                    return z
+
+-- | A version of 'readTQueue' which does not retry. Instead it
+-- returns @Nothing@ if no value is available.
+tryReadTQueue :: TQueue a -> STM (Maybe a)
+tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
+
+-- | Get the next value from the @TQueue@ without removing it,
+-- retrying if the channel is empty.
+peekTQueue :: TQueue a -> STM a
+peekTQueue c = do
+  x <- readTQueue c
+  unGetTQueue c x
+  return x
+
+-- | A version of 'peekTQueue' which does not retry. Instead it
+-- returns @Nothing@ if no value is available.
+tryPeekTQueue :: TQueue a -> STM (Maybe a)
+tryPeekTQueue c = do
+  m <- tryReadTQueue c
+  case m of
+    Nothing -> return Nothing
+    Just x  -> do
+      unGetTQueue c x
+      return m
+
+-- |Put a data item back onto a channel, where it will be the next item read.
+unGetTQueue :: TQueue a -> a -> STM ()
+unGetTQueue (TQueue read _write) a = do
+  xs <- readTVar read
+  writeTVar read (a:xs)
+
+-- |Returns 'True' if the supplied 'TQueue' is empty.
+isEmptyTQueue :: TQueue a -> STM Bool
+isEmptyTQueue (TQueue read write) = do
+  xs <- readTVar read
+  case xs of
+    (_:_) -> return False
+    [] -> do ys <- readTVar write
+             case ys of
+               [] -> return True
+               _  -> return False
index 97e817d..7c9366b 100644 (file)
--- a/stm.cabal
+++ b/stm.cabal
@@ -22,6 +22,7 @@ library
     Control.Concurrent.STM.TVar
     Control.Concurrent.STM.TChan
     Control.Concurrent.STM.TMVar
+    Control.Concurrent.STM.TQueue
     Control.Monad.STM
   other-modules:
     Control.Sequential.STM