[project @ 2002-04-24 16:31:37 by simonmar]
[packages/old-time.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module : Control.Concurrent
4 -- Copyright : (c) The University of Glasgow 2001
5 -- License : BSD-style (see the file libraries/core/LICENSE)
6 --
7 -- Maintainer : libraries@haskell.org
8 -- Stability : experimental
9 -- Portability : non-portable
10 --
11 -- $Id: Concurrent.hs,v 1.6 2002/04/24 16:31:37 simonmar Exp $
12 --
13 -- A common interface to a collection of useful concurrency
14 -- abstractions.
15 --
16 -----------------------------------------------------------------------------
17
18 module Control.Concurrent (
19 module Control.Concurrent.Chan,
20 module Control.Concurrent.CVar,
21 module Control.Concurrent.MVar,
22 module Control.Concurrent.QSem,
23 module Control.Concurrent.QSemN,
24 module Control.Concurrent.SampleVar,
25
26 forkIO, -- :: IO () -> IO ()
27 yield, -- :: IO ()
28
29 #ifdef __GLASGOW_HASKELL__
30 ThreadId,
31
32 -- Forking and suchlike
33 myThreadId, -- :: IO ThreadId
34 killThread, -- :: ThreadId -> IO ()
35 throwTo, -- :: ThreadId -> Exception -> IO ()
36
37 threadDelay, -- :: Int -> IO ()
38 threadWaitRead, -- :: Int -> IO ()
39 threadWaitWrite, -- :: Int -> IO ()
40 #endif
41
42 -- merging of streams
43 mergeIO, -- :: [a] -> [a] -> IO [a]
44 nmergeIO -- :: [[a]] -> IO [a]
45 ) where
46
47 import Prelude
48
49 import Control.Exception as Exception
50
51 #ifdef __GLASGOW_HASKELL__
52 import GHC.Conc
53 import GHC.TopHandler ( reportStackOverflow, reportError )
54 import GHC.IOBase ( IO(..) )
55 import GHC.IOBase ( unsafeInterleaveIO )
56 import GHC.Base
57 #endif
58
59 #ifdef __HUGS__
60 import IOExts ( unsafeInterleaveIO )
61 import ConcBase
62 #endif
63
64 import Control.Concurrent.MVar
65 import Control.Concurrent.CVar
66 import Control.Concurrent.Chan
67 import Control.Concurrent.QSem
68 import Control.Concurrent.QSemN
69 import Control.Concurrent.SampleVar
70
71 -- Thread Ids, specifically the instances of Eq and Ord for these things.
72 -- The ThreadId type itself is defined in std/PrelConc.lhs.
73
74 -- Rather than define a new primitve, we use a little helper function
75 -- cmp_thread in the RTS.
76
77 #ifdef __GLASGOW_HASKELL__
78 foreign import ccall unsafe "cmp_thread" cmp_thread :: Addr# -> Addr# -> Int
79 -- Returns -1, 0, 1
80
81 cmpThread :: ThreadId -> ThreadId -> Ordering
82 cmpThread (ThreadId t1) (ThreadId t2) =
83 case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
84 -1 -> LT
85 0 -> EQ
86 _ -> GT -- must be 1
87
88 instance Eq ThreadId where
89 t1 == t2 =
90 case t1 `cmpThread` t2 of
91 EQ -> True
92 _ -> False
93
94 instance Ord ThreadId where
95 compare = cmpThread
96
97 foreign import ccall unsafe "rts_getThreadId" getThreadId :: Addr# -> Int
98
99 instance Show ThreadId where
100 showsPrec d (ThreadId t) =
101 showString "ThreadId " .
102 showsPrec d (getThreadId (unsafeCoerce# t))
103
104 forkIO :: IO () -> IO ThreadId
105 forkIO action = IO $ \ s ->
106 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
107 where
108 action_plus = Exception.catch action childHandler
109
110 childHandler :: Exception -> IO ()
111 childHandler err = Exception.catch (real_handler err) childHandler
112
113 real_handler :: Exception -> IO ()
114 real_handler ex =
115 case ex of
116 -- ignore thread GC and killThread exceptions:
117 BlockedOnDeadMVar -> return ()
118 AsyncException ThreadKilled -> return ()
119
120 -- report all others:
121 AsyncException StackOverflow -> reportStackOverflow False
122 ErrorCall s -> reportError False s
123 other -> reportError False (showsPrec 0 other "\n")
124
125 #endif /* __GLASGOW_HASKELL__ */
126
127
128 max_buff_size :: Int
129 max_buff_size = 1
130
131 mergeIO :: [a] -> [a] -> IO [a]
132 nmergeIO :: [[a]] -> IO [a]
133
134 mergeIO ls rs
135 = newEmptyMVar >>= \ tail_node ->
136 newMVar tail_node >>= \ tail_list ->
137 newQSem max_buff_size >>= \ e ->
138 newMVar 2 >>= \ branches_running ->
139 let
140 buff = (tail_list,e)
141 in
142 forkIO (suckIO branches_running buff ls) >>
143 forkIO (suckIO branches_running buff rs) >>
144 takeMVar tail_node >>= \ val ->
145 signalQSem e >>
146 return val
147
148 type Buffer a
149 = (MVar (MVar [a]), QSem)
150
151 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
152
153 suckIO branches_running buff@(tail_list,e) vs
154 = case vs of
155 [] -> takeMVar branches_running >>= \ val ->
156 if val == 1 then
157 takeMVar tail_list >>= \ node ->
158 putMVar node [] >>
159 putMVar tail_list node
160 else
161 putMVar branches_running (val-1)
162 (x:xs) ->
163 waitQSem e >>
164 takeMVar tail_list >>= \ node ->
165 newEmptyMVar >>= \ next_node ->
166 unsafeInterleaveIO (
167 takeMVar next_node >>= \ y ->
168 signalQSem e >>
169 return y) >>= \ next_node_val ->
170 putMVar node (x:next_node_val) >>
171 putMVar tail_list next_node >>
172 suckIO branches_running buff xs
173
174 nmergeIO lss
175 = let
176 len = length lss
177 in
178 newEmptyMVar >>= \ tail_node ->
179 newMVar tail_node >>= \ tail_list ->
180 newQSem max_buff_size >>= \ e ->
181 newMVar len >>= \ branches_running ->
182 let
183 buff = (tail_list,e)
184 in
185 mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
186 takeMVar tail_node >>= \ val ->
187 signalQSem e >>
188 return val
189 where
190 mapIO f xs = sequence (map f xs)