[project @ 2001-08-07 15:25:04 by simonmar]
[packages/old-time.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 --
3 -- Module : Control.Concurrent
4 -- Copyright : (c) The University of Glasgow 2001
5 -- License : BSD-style (see the file libraries/core/LICENSE)
6 --
7 -- Maintainer : libraries@haskell.org
8 -- Stability : experimental
9 -- Portability : non-portable
10 --
11 -- $Id: Concurrent.hs,v 1.2 2001/08/07 15:25:04 simonmar Exp $
12 --
13 -- A common interface to a collection of useful concurrency
14 -- abstractions.
15 --
16 -----------------------------------------------------------------------------
17
18 module Control.Concurrent (
19 module Control.Concurrent.Chan,
20 module Control.Concurrent.CVar,
21 module Control.Concurrent.MVar,
22 module Control.Concurrent.QSem,
23 module Control.Concurrent.QSemN,
24 module Control.Concurrent.SampleVar,
25
26 forkIO, -- :: IO () -> IO ()
27 yield, -- :: IO ()
28
29 #ifdef __GLASGOW_HASKELL__
30 ThreadId,
31
32 -- Forking and suchlike
33 myThreadId, -- :: IO ThreadId
34 killThread, -- :: ThreadId -> IO ()
35 throwTo, -- :: ThreadId -> Exception -> IO ()
36
37 threadDelay, -- :: Int -> IO ()
38 threadWaitRead, -- :: Int -> IO ()
39 threadWaitWrite, -- :: Int -> IO ()
40 #endif
41
42 -- merging of streams
43 mergeIO, -- :: [a] -> [a] -> IO [a]
44 nmergeIO -- :: [[a]] -> IO [a]
45 ) where
46
47 import Prelude
48
49 import Control.Exception as Exception
50
51 #ifdef __GLASGOW_HASKELL__
52 import GHC.Conc
53 import GHC.TopHandler ( reportStackOverflow, reportError )
54 import GHC.IOBase ( IO(..) )
55 import GHC.IOBase ( unsafeInterleaveIO )
56 import GHC.Base ( fork# )
57 import GHC.Prim ( Addr#, unsafeCoerce# )
58 #endif
59
60 #ifdef __HUGS__
61 import IOExts ( unsafeInterleaveIO )
62 import ConcBase
63 #endif
64
65 import Control.Concurrent.MVar
66 import Control.Concurrent.CVar
67 import Control.Concurrent.Chan
68 import Control.Concurrent.QSem
69 import Control.Concurrent.QSemN
70 import Control.Concurrent.SampleVar
71
72 -- Thread Ids, specifically the instances of Eq and Ord for these things.
73 -- The ThreadId type itself is defined in std/PrelConc.lhs.
74
75 -- Rather than define a new primitve, we use a little helper function
76 -- cmp_thread in the RTS.
77
78 #ifdef __GLASGOW_HASKELL__
79 foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
80 -- Returns -1, 0, 1
81
82 cmpThread :: ThreadId -> ThreadId -> Ordering
83 cmpThread (ThreadId t1) (ThreadId t2) =
84 case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
85 -1 -> LT
86 0 -> EQ
87 _ -> GT -- must be 1
88
89 instance Eq ThreadId where
90 t1 == t2 =
91 case t1 `cmpThread` t2 of
92 EQ -> True
93 _ -> False
94
95 instance Ord ThreadId where
96 compare = cmpThread
97
98 forkIO :: IO () -> IO ThreadId
99 forkIO action = IO $ \ s ->
100 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
101 where
102 action_plus = Exception.catch action childHandler
103
104 childHandler :: Exception -> IO ()
105 childHandler err = Exception.catch (real_handler err) childHandler
106
107 real_handler :: Exception -> IO ()
108 real_handler ex =
109 case ex of
110 -- ignore thread GC and killThread exceptions:
111 BlockedOnDeadMVar -> return ()
112 AsyncException ThreadKilled -> return ()
113
114 -- report all others:
115 AsyncException StackOverflow -> reportStackOverflow False
116 ErrorCall s -> reportError False s
117 other -> reportError False (showsPrec 0 other "\n")
118
119 #endif /* __GLASGOW_HASKELL__ */
120
121
122 max_buff_size :: Int
123 max_buff_size = 1
124
125 mergeIO :: [a] -> [a] -> IO [a]
126 nmergeIO :: [[a]] -> IO [a]
127
128 mergeIO ls rs
129 = newEmptyMVar >>= \ tail_node ->
130 newMVar tail_node >>= \ tail_list ->
131 newQSem max_buff_size >>= \ e ->
132 newMVar 2 >>= \ branches_running ->
133 let
134 buff = (tail_list,e)
135 in
136 forkIO (suckIO branches_running buff ls) >>
137 forkIO (suckIO branches_running buff rs) >>
138 takeMVar tail_node >>= \ val ->
139 signalQSem e >>
140 return val
141
142 type Buffer a
143 = (MVar (MVar [a]), QSem)
144
145 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
146
147 suckIO branches_running buff@(tail_list,e) vs
148 = case vs of
149 [] -> takeMVar branches_running >>= \ val ->
150 if val == 1 then
151 takeMVar tail_list >>= \ node ->
152 putMVar node [] >>
153 putMVar tail_list node
154 else
155 putMVar branches_running (val-1)
156 (x:xs) ->
157 waitQSem e >>
158 takeMVar tail_list >>= \ node ->
159 newEmptyMVar >>= \ next_node ->
160 unsafeInterleaveIO (
161 takeMVar next_node >>= \ y ->
162 signalQSem e >>
163 return y) >>= \ next_node_val ->
164 putMVar node (x:next_node_val) >>
165 putMVar tail_list next_node >>
166 suckIO branches_running buff xs
167
168 nmergeIO lss
169 = let
170 len = length lss
171 in
172 newEmptyMVar >>= \ tail_node ->
173 newMVar tail_node >>= \ tail_list ->
174 newQSem max_buff_size >>= \ e ->
175 newMVar len >>= \ branches_running ->
176 let
177 buff = (tail_list,e)
178 in
179 mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
180 takeMVar tail_node >>= \ val ->
181 signalQSem e >>
182 return val
183 where
184 mapIO f xs = sequence (map f xs)