[project @ 2001-06-28 14:15:04 by simonmar]
[packages/old-time.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 --
3 -- Module : Control.Concurrent
4 -- Copyright : (c) The University of Glasgow 2001
5 -- License : BSD-style (see the file libraries/core/LICENSE)
6 --
7 -- Maintainer : libraries@haskell.org
8 -- Stability : experimental
9 -- Portability : non-portable
10 --
11 -- $Id: Concurrent.hs,v 1.1 2001/06/28 14:15:01 simonmar Exp $
12 --
13 -- A common interface to a collection of useful concurrency
14 -- abstractions.
15 --
16 -----------------------------------------------------------------------------
17
-- Export list.  The per-compiler sections are selected by CPP:
--   * Hugs gets its own forkIO (result type IO ()).
--   * GHC gets ThreadId plus the thread-manipulation primitives, including
--     the forkIO defined in this module (result type IO ThreadId).
module Control.Concurrent
  ( module Control.Concurrent.Chan
  , module Control.Concurrent.CVar
  , module Control.Concurrent.MVar
  , module Control.Concurrent.QSem
  , module Control.Concurrent.QSemN
  , module Control.Concurrent.SampleVar

#ifdef __HUGS__
  , forkIO            -- :: IO () -> IO ()
#elif defined(__GLASGOW_HASKELL__)
  , ThreadId

    -- Forking and suchlike
  , forkIO            -- :: IO () -> IO ThreadId
  , myThreadId        -- :: IO ThreadId
  , killThread        -- :: ThreadId -> IO ()
  , throwTo           -- :: ThreadId -> Exception -> IO ()
#endif
  , par               -- :: a -> b -> b
  , seq               -- :: a -> b -> b
#ifdef __GLASGOW_HASKELL__
  , fork              -- :: a -> b -> b
#endif
  , yield             -- :: IO ()

#ifdef __GLASGOW_HASKELL__
  , threadDelay       -- :: Int -> IO ()
  , threadWaitRead    -- :: Int -> IO ()
  , threadWaitWrite   -- :: Int -> IO ()
#endif

    -- merging of streams
  , mergeIO           -- :: [a] -> [a] -> IO [a]
  , nmergeIO          -- :: [[a]] -> IO [a]
  ) where
53
54 import Prelude
55
56 import Control.Exception as Exception
57
58 #ifdef __GLASGOW_HASKELL__
59 import GHC.Conc
60 import GHC.TopHandler ( reportStackOverflow, reportError )
61 import GHC.IOBase ( IO(..) )
62 import GHC.IOBase ( unsafePerformIO , unsafeInterleaveIO )
63 import GHC.Base ( fork# )
64 import GHC.Prim ( Addr#, unsafeCoerce# )
65 #endif
66
67 #ifdef __HUGS__
68 import IOExts ( unsafeInterleaveIO, unsafePerformIO )
69 import ConcBase
70 #endif
71
72 import Control.Concurrent.MVar
73 import Control.Concurrent.CVar
74 import Control.Concurrent.Chan
75 import Control.Concurrent.QSem
76 import Control.Concurrent.QSemN
77 import Control.Concurrent.SampleVar
78
79 #ifdef __GLASGOW_HASKELL__
80 infixr 0 `fork`
81 #endif
82
83 -- Thread Ids, specifically the instances of Eq and Ord for these things.
84 -- The ThreadId type itself is defined in std/PrelConc.lhs.
85
86 -- Rather than define a new primitive, we use a little helper function
87 -- cmp_thread in the RTS.
88
89 #ifdef __GLASGOW_HASKELL__
-- Helper exported by the RTS that compares two thread objects.
foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
-- Returns -1, 0, 1

-- Three-way comparison of two ThreadIds, obtained by handing both
-- underlying thread objects to cmp_thread and translating its Int result
-- into an Ordering.
cmpThread :: ThreadId -> ThreadId -> Ordering
cmpThread (ThreadId lhs) (ThreadId rhs)
  | verdict == (-1) = LT
  | verdict == 0    = EQ
  | otherwise       = GT   -- must be 1
  where
    verdict = cmp_thread (unsafeCoerce# lhs) (unsafeCoerce# rhs)
99
-- Two ThreadIds are equal exactly when cmpThread reports EQ for them.
instance Eq ThreadId where
  lhs == rhs = cmpThread lhs rhs == EQ
105
-- Ordering on ThreadIds is whatever cmpThread reports.
instance Ord ThreadId where
  compare lhs rhs = lhs `cmpThread` rhs
108
-- Start a new thread running the given action and return its ThreadId.
-- The action is first wrapped in Exception.catch with childHandler as the
-- handler; the wrapped action is what gets passed to the fork# primitive.
forkIO :: IO () -> IO ThreadId
forkIO action = IO spawn
  where
    guarded = action `Exception.catch` childHandler

    -- Thread the state token through fork# and box the raw thread id.
    spawn st =
      case fork# guarded st of
        (# st', tid #) -> (# st', ThreadId tid #)
114
-- Exception handler used for forked threads.  It delegates to real_handler,
-- and installs itself again around that call so that an exception raised
-- while handling is dealt with in the same way.
childHandler :: Exception -> IO ()
childHandler err = real_handler err `Exception.catch` childHandler
117
-- Decide what to do with an exception caught in a forked thread: two kinds
-- are dropped silently, everything else is reported.
real_handler :: Exception -> IO ()
-- ignore thread GC and killThread exceptions:
real_handler BlockedOnDeadMVar              = return ()
real_handler (AsyncException ThreadKilled)  = return ()
-- report all others:
real_handler (AsyncException StackOverflow) = reportStackOverflow False
real_handler (ErrorCall msg)                = reportError False msg
real_handler other                          = reportError False (showsPrec 0 other "\n")
129
-- fork x y: forcing the result first forces a forkIO call whose thread
-- evaluates x (via seq), and then yields y.  The forkIO call is run through
-- unsafePerformIO so that fork can be used from pure code.
{-# INLINE fork #-}
fork :: a -> b -> b
fork x y = spawned `seq` y
  where
    spawned = unsafePerformIO (forkIO (x `seq` return ()))
133
134 #endif /* __GLASGOW_HASKELL__ */
135
136
-- Initial value of the semaphore created by mergeIO and nmergeIO.
max_buff_size :: Int
max_buff_size = 1

mergeIO :: [a] -> [a] -> IO [a]
nmergeIO :: [[a]] -> IO [a]

-- Merge two lists into one.  Each input list is consumed by its own thread
-- running suckIO; both threads append to a shared output list whose first
-- cell is read from the head MVar below.
mergeIO ls rs = do
  headCell <- newEmptyMVar              -- will receive the merged list
  tailRef  <- newMVar headCell          -- where the next cell gets written
  sem      <- newQSem max_buff_size
  running  <- newMVar 2                 -- number of producers still active
  let shared = (tailRef, sem)
  _ <- forkIO (suckIO running shared ls)
  _ <- forkIO (suckIO running shared rs)
  merged <- takeMVar headCell
  signalQSem sem
  return merged
156
-- State shared by the producer threads of a merge: an MVar holding the
-- (still empty) MVar into which the next list cell will be written, and
-- the semaphore the producers wait on before writing.
type Buffer a = (MVar (MVar [a]), QSem)

-- Producer loop: feed the elements of one input list into the shared
-- output list, one cell at a time.
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()

-- Input exhausted.  The last producer to finish (counter == 1) closes the
-- output list with []; any other producer just decrements the counter.
suckIO running (tailRef, _) [] = do
  remaining <- takeMVar running
  if remaining == 1
    then do
      cell <- takeMVar tailRef
      putMVar cell []
      putMVar tailRef cell
    else putMVar running (remaining - 1)

-- One more element: wait on the semaphore, then write a cons cell whose
-- tail is a deferred read of a fresh MVar.  The semaphore is signalled only
-- when that deferred read is actually performed.
suckIO running shared@(tailRef, sem) (x : xs) = do
  waitQSem sem
  cell     <- takeMVar tailRef
  nextCell <- newEmptyMVar
  rest     <- unsafeInterleaveIO $ do
    y <- takeMVar nextCell
    signalQSem sem
    return y
  putMVar cell (x : rest)
  putMVar tailRef nextCell
  suckIO running shared xs
182
-- Merge any number of lists into one.  Each input list is consumed by its
-- own thread running suckIO; all threads append to a shared output list.
--
-- With no input lists there is no producer thread to ever fill in the
-- first cell, so waiting on it would block forever; the merge of zero
-- lists is therefore answered directly with the empty list.
nmergeIO [] = return []
nmergeIO lss = do
  tail_node        <- newEmptyMVar       -- will receive the merged list
  tail_list        <- newMVar tail_node  -- where the next cell gets written
  e                <- newQSem max_buff_size
  branches_running <- newMVar (length lss)  -- producers still active
  let buff = (tail_list, e)
  -- Only the effect of forking matters; the results are discarded.
  mapM_ (\ xs -> forkIO (suckIO branches_running buff xs)) lss
  val <- takeMVar tail_node
  signalQSem e
  return val