testsuite: Add test for #13916
authorBen Gamari <ben@smart-cactus.org>
Thu, 20 Jul 2017 15:43:06 +0000 (11:43 -0400)
committerBen Gamari <ben@smart-cactus.org>
Thu, 20 Jul 2017 15:43:06 +0000 (11:43 -0400)
testsuite/tests/concurrent/should_run/T13916.hs [new file with mode: 0755]
testsuite/tests/concurrent/should_run/T13916_Bracket.hs [new file with mode: 0755]
testsuite/tests/concurrent/should_run/all.T

diff --git a/testsuite/tests/concurrent/should_run/T13916.hs b/testsuite/tests/concurrent/should_run/T13916.hs
new file mode 100755 (executable)
index 0000000..e81aabb
--- /dev/null
@@ -0,0 +1,33 @@
+module Main where
+
+import Data.IORef
+import System.IO.Unsafe
+import Control.Concurrent.STM
+import Control.Concurrent.Async
+import Control.Concurrent
+import System.IO
+import System.Directory
+import System.FilePath
+import T13916_Bracket
+
+type Thing = MVar Bool
+
+main :: IO ()
+main = do
+    withEnvCache limit spawner $ \cache ->
+        forConcurrently_ [1..1000 :: Int] $ \n -> withEnv cache (\handle -> put handle n)
+    where
+        limit :: Limit
+        limit = Hard 1
+
+        put handle n = return ()
+
+spawner :: Spawner Thing
+spawner = Spawner
+    { maker  = mkhandle
+    , killer = \thing -> takeMVar thing >> putMVar thing True
+    , isDead = \thing -> readMVar thing
+    }
+
+mkhandle :: IO Thing
+mkhandle = newMVar False
diff --git a/testsuite/tests/concurrent/should_run/T13916_Bracket.hs b/testsuite/tests/concurrent/should_run/T13916_Bracket.hs
new file mode 100755 (executable)
index 0000000..340cbb3
--- /dev/null
@@ -0,0 +1,135 @@
+{-# LANGUAGE RankNTypes #-}\r
+{-# LANGUAGE RecordWildCards #-}\r
+{- |\r
+Module      : Bracket\r
+Description : Handling multiple environments with bracket-like apis\r
+Maintainer  : robertkennedy@clearwateranalytics.com\r
+Stability   : stable\r
+\r
+This module is meant for ie Sql or mongo connections, where you may wish for some number of easy to grab\r
+environments. In particular, this assumes your connection has some initialization/release functions\r
+\r
+This module creates bugs with any optimizations enabled. The bugs do not occur if the program is in the same\r
+module.\r
+-}\r
+module Bracket (\r
+    -- * Data Types\r
+    Spawner(..), Limit(..), Cache,\r
+    -- * Usage\r
+    withEnvCache, withEnv\r
+    ) where\r
+\r
+import Control.Concurrent.STM\r
+import Control.Concurrent.STM.TSem\r
+import Control.Exception hiding (handle)\r
+import Control.Monad\r
+import Data.Vector (Vector)\r
+import qualified Data.Vector as Vector\r
+\r
+-- * Data Types\r
+-- | Tells the program how many environments it is allowed to spawn.\r
+-- A `Lax` limit will spawn extra connections if the `Cache` is empty,\r
+-- while a `Hard` limit will not spawn any more than the given number of connections simultaneously.\r
+--\r
+-- @since 0.3.7\r
+data Limit = Hard {getLimit :: {-# unpack #-} !Int}\r
+\r
+data Spawner env = Spawner\r
+    { maker  :: IO env\r
+    , killer :: env -> IO ()\r
+    , isDead :: env -> IO Bool\r
+    }\r
+\r
+type VCache env = Vector (TMVar env)\r
+data Cache env = Unlimited { spawner :: Spawner env\r
+                           , vcache :: !(VCache env)\r
+                           }\r
+               | Limited   { spawner :: Spawner env\r
+                           , vcache :: !(VCache env)\r
+                           , envsem :: TSem\r
+                           }\r
+\r
+-- ** Initialization\r
+withEnvCache :: Limit -> Spawner env -> (Cache env -> IO a) -> IO a\r
+withEnvCache limit spawner = bracket starter releaseCache\r
+    where starter = case limit of\r
+            Hard n -> Limited spawner <$> initializeEmptyCache n <*> atomically (newTSem n)\r
+\r
+-- ** Using a single value\r
+withEnv :: Cache env -> (env -> IO a) -> IO a\r
+withEnv cache = case cache of\r
+    Unlimited{..} -> withEnvUnlimited spawner vcache\r
+    Limited{..}   -> withEnvLimited   spawner vcache envsem\r
+\r
+-- *** Unlimited\r
+-- | Takes an env and returns it on completion of the function.\r
+-- If all envs are already taken or closed, this will spin up a new env.\r
+-- When the function finishes, this will attempt to put the env into the cache. If it cannot,\r
+-- it will kill the env. Note this can lead to many concurrent connections.\r
+--\r
+-- @since 0.3.5\r
+withEnvUnlimited :: Spawner env -> VCache env -> (env -> IO a) -> IO a\r
+withEnvUnlimited Spawner{..} cache = bracket taker putter\r
+  where\r
+    taker = do\r
+        mpipe <- atomically $ tryTakeEnv cache\r
+        case mpipe of\r
+            Nothing  -> maker\r
+            Just env -> isDead env >>= \b -> if not b then return env else killer env >> maker\r
+\r
+    putter env = do\r
+        accepted <- atomically $ tryPutEnv cache env\r
+        unless accepted $ killer env\r
+\r
+-- *** Limited\r
+-- | Takes an env and returns it on completion of the function.\r
+-- If all envs are already taken, this will wait. This should have a constant number of environments\r
+--\r
+-- @since 0.3.6\r
+withEnvLimited :: Spawner env -> VCache env -> TSem -> (env -> IO a) -> IO a\r
+withEnvLimited spawner vcache envsem = bracket taker putter\r
+  where\r
+    taker = limitMakeEnv spawner vcache envsem\r
+    putter env = atomically $ putEnv vcache env\r
+\r
+limitMakeEnv :: Spawner env -> VCache env -> TSem -> IO env\r
+limitMakeEnv Spawner{..} vcache envsem = go\r
+  where\r
+    go = do\r
+        eenvpermission <- atomically $ ( Left  <$> takeEnv  vcache )\r
+                              `orElse` ( Right <$> waitTSem envsem )\r
+        case eenvpermission of\r
+            Right () -> maker\r
+            Left env -> do\r
+                -- Given our env, we check if it's dead. If it's not, we are done and return it.\r
+                -- If it is dead, we release it, signal that a new env can be created, and then recurse\r
+                isdead <- isDead env\r
+                if not isdead then return env\r
+                    else do\r
+                         killer env\r
+                         atomically $ signalTSem envsem\r
+                         go\r
+\r
+-- * Low level\r
+initializeEmptyCache :: Int -> IO (VCache env)\r
+initializeEmptyCache n | n < 1     = return mempty\r
+                       | otherwise = Vector.replicateM n newEmptyTMVarIO\r
+\r
+takeEnv :: VCache env -> STM env\r
+takeEnv = Vector.foldl folding retry\r
+    where folding m stmenv = m `orElse` takeTMVar stmenv\r
+\r
+tryTakeEnv :: VCache env -> STM (Maybe env)\r
+tryTakeEnv cache = (Just <$> takeEnv cache) `orElse` pure Nothing\r
+\r
+putEnv :: VCache env -> env -> STM ()\r
+putEnv cache env = Vector.foldl folding retry cache\r
+    where folding m stmenv = m `orElse` putTMVar stmenv env\r
+\r
+tryPutEnv :: VCache env -> env -> STM Bool\r
+tryPutEnv cache env = (putEnv cache env *> return True) `orElse` pure False\r
+\r
+releaseCache :: Cache env -> IO ()\r
+releaseCache cache = Vector.mapM_ qkRelease (vcache cache)\r
+    where qkRelease tenv = atomically (tryTakeTMVar tenv)\r
+                       >>= maybe (return ()) (killer $ spawner cache)\r
index 69b8ad7..abac22a 100644 (file)
@@ -284,3 +284,4 @@ test('hs_try_putmvar003',
 
 # Check forkIO exception determinism under optimization
 test('T13330', normal, compile_and_run, ['-O'])
+test('T13916', normal, compile_and_run, ['-O2'])