[project @ 2002-07-26 12:12:33 by stolz]
[packages/old-time.git] / Control / Concurrent.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module : Control.Concurrent
4 -- Copyright : (c) The University of Glasgow 2001
5 -- License : BSD-style (see the file libraries/base/LICENSE)
6 --
7 -- Maintainer : libraries@haskell.org
8 -- Stability : experimental
9 -- Portability : non-portable (concurrency)
10 --
11 -- A common interface to a collection of useful concurrency
12 -- abstractions.
13 --
14 -----------------------------------------------------------------------------
15
16 module Control.Concurrent (
17 -- * Concurrent Haskell
18
19 -- $conc_intro
20
21 -- * Basic concurrency operations
22
23 ThreadId,
24 myThreadId,
25
26 forkIO,
27 killThread,
28 throwTo,
29
30 -- * Scheduling
31
32 -- $conc_scheduling
33 yield, -- :: IO ()
34
35 -- ** Blocking
36
37 -- $blocking
38
39 #ifdef __GLASGOW_HASKELL__
40 -- ** Waiting
41 threadDelay, -- :: Int -> IO ()
42 threadWaitRead, -- :: Int -> IO ()
43 threadWaitWrite, -- :: Int -> IO ()
44 #endif
45
46 -- * Communication abstractions
47
48 module Control.Concurrent.MVar,
49 module Control.Concurrent.Chan,
50 module Control.Concurrent.QSem,
51 module Control.Concurrent.QSemN,
52 module Control.Concurrent.SampleVar,
53
54 -- * Merging of streams
55 mergeIO, -- :: [a] -> [a] -> IO [a]
56 nmergeIO, -- :: [[a]] -> IO [a]
57 -- $merge
58
59 -- * GHC's implementation of concurrency
60
61 -- |This section describes features specific to GHC's
62 -- implementation of Concurrent Haskell.
63
64 -- ** Terminating the program
65
66 -- $termination
67
68 -- ** Pre-emption
69
70 -- $preemption
71
72 ) where
73
74 import Prelude
75
76 import Control.Exception as Exception
77
78 #ifdef __GLASGOW_HASKELL__
79 import GHC.Conc
80 import GHC.TopHandler ( reportStackOverflow, reportError )
81 import GHC.IOBase ( IO(..) )
82 import GHC.IOBase ( unsafeInterleaveIO )
83 import GHC.Base
84 #endif
85
86 #ifdef __HUGS__
87 import IOExts ( unsafeInterleaveIO )
88 import ConcBase
89 #endif
90
91 import Control.Concurrent.MVar
92 import Control.Concurrent.Chan
93 import Control.Concurrent.QSem
94 import Control.Concurrent.QSemN
95 import Control.Concurrent.SampleVar
96
97 {- $conc_intro
98
99 The concurrency extension for Haskell is described in the paper
100 /Concurrent Haskell/
101 <http://www.haskell.org/ghc/docs/papers/concurrent-haskell.ps.gz>.
102
103 Concurrency is \"lightweight\", which means that both thread creation
104 and context switching overheads are extremely low. Scheduling of
105 Haskell threads is done internally in the Haskell runtime system, and
106 doesn't make use of any operating system-supplied thread packages.
107
108 Haskell threads can communicate via 'MVar's, a kind of synchronised
109 mutable variable (see "Control.Concurrent.MVar"). Several common
110 concurrency abstractions can be built from 'MVar's, and these are
111 provided by the "Concurrent" library. Threads may also communicate
112 via exceptions.
113 -}
114
115 {- $conc_scheduling
116
117 Scheduling may be either pre-emptive or co-operative,
118 depending on the implementation of Concurrent Haskell (see below
119 for imformation related to specific compilers). In a co-operative
120 system, context switches only occur when you use one of the
121 primitives defined in this module. This means that programs such
122 as:
123
124
125 > main = forkIO (write 'a') >> write 'b'
126 > where write c = putChar c >> write c
127
128 will print either @aaaaaaaaaaaaaa...@ or @bbbbbbbbbbbb...@,
129 instead of some random interleaving of @a@s and @b@s. In
130 practice, cooperative multitasking is sufficient for writing
131 simple graphical user interfaces.
132 -}
133
134 {- $blocking
135 Calling a foreign C procedure (such as @getchar@) that blocks waiting
136 for input will block /all/ threads, unless the @threadsafe@ attribute
137 is used on the foreign call (and your compiler \/ operating system
138 supports it). GHC's I\/O system uses non-blocking I\/O internally to
139 implement thread-friendly I\/O, so calling standard Haskell I\/O
140 functions blocks only the thread making the call.
141 -}
142
143 -- Thread Ids, specifically the instances of Eq and Ord for these things.
144 -- The ThreadId type itself is defined in std/PrelConc.lhs.
145
146 -- Rather than define a new primitve, we use a little helper function
147 -- cmp_thread in the RTS.
148
149 #ifdef __GLASGOW_HASKELL__
150 foreign import ccall unsafe "cmp_thread" cmp_thread :: Addr# -> Addr# -> Int
151 -- Returns -1, 0, 1
152
153 cmpThread :: ThreadId -> ThreadId -> Ordering
154 cmpThread (ThreadId t1) (ThreadId t2) =
155 case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
156 -1 -> LT
157 0 -> EQ
158 _ -> GT -- must be 1
159
160 instance Eq ThreadId where
161 t1 == t2 =
162 case t1 `cmpThread` t2 of
163 EQ -> True
164 _ -> False
165
166 instance Ord ThreadId where
167 compare = cmpThread
168
169 foreign import ccall unsafe "rts_getThreadId" getThreadId :: Addr# -> Int
170
171 instance Show ThreadId where
172 showsPrec d (ThreadId t) =
173 showString "ThreadId " .
174 showsPrec d (getThreadId (unsafeCoerce# t))
175
176 {- |
177 This sparks off a new thread to run the 'IO' computation passed as the
178 first argument, and returns the 'ThreadId' of the newly created
179 thread.
180 -}
181 forkIO :: IO () -> IO ThreadId
182 forkIO action = IO $ \ s ->
183 case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
184 where
185 action_plus = Exception.catch action childHandler
186
187 childHandler :: Exception -> IO ()
188 childHandler err = Exception.catch (real_handler err) childHandler
189
190 real_handler :: Exception -> IO ()
191 real_handler ex =
192 case ex of
193 -- ignore thread GC and killThread exceptions:
194 BlockedOnDeadMVar -> return ()
195 AsyncException ThreadKilled -> return ()
196
197 -- report all others:
198 AsyncException StackOverflow -> reportStackOverflow False
199 ErrorCall s -> reportError False s
200 other -> reportError False (showsPrec 0 other "\n")
201
202 #endif /* __GLASGOW_HASKELL__ */
203
204
205 max_buff_size :: Int
206 max_buff_size = 1
207
208 mergeIO :: [a] -> [a] -> IO [a]
209 nmergeIO :: [[a]] -> IO [a]
210
211 -- $merge
212 -- The 'mergeIO' and 'nmergeIO' functions fork one thread for each
213 -- input list that concurrently evaluates that list; the results are
214 -- merged into a single output list.
215 --
216 -- Note: Hugs does not provide these functions, since they require
217 -- preemptive multitasking.
218
219 mergeIO ls rs
220 = newEmptyMVar >>= \ tail_node ->
221 newMVar tail_node >>= \ tail_list ->
222 newQSem max_buff_size >>= \ e ->
223 newMVar 2 >>= \ branches_running ->
224 let
225 buff = (tail_list,e)
226 in
227 forkIO (suckIO branches_running buff ls) >>
228 forkIO (suckIO branches_running buff rs) >>
229 takeMVar tail_node >>= \ val ->
230 signalQSem e >>
231 return val
232
233 type Buffer a
234 = (MVar (MVar [a]), QSem)
235
236 suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
237
238 suckIO branches_running buff@(tail_list,e) vs
239 = case vs of
240 [] -> takeMVar branches_running >>= \ val ->
241 if val == 1 then
242 takeMVar tail_list >>= \ node ->
243 putMVar node [] >>
244 putMVar tail_list node
245 else
246 putMVar branches_running (val-1)
247 (x:xs) ->
248 waitQSem e >>
249 takeMVar tail_list >>= \ node ->
250 newEmptyMVar >>= \ next_node ->
251 unsafeInterleaveIO (
252 takeMVar next_node >>= \ y ->
253 signalQSem e >>
254 return y) >>= \ next_node_val ->
255 putMVar node (x:next_node_val) >>
256 putMVar tail_list next_node >>
257 suckIO branches_running buff xs
258
259 nmergeIO lss
260 = let
261 len = length lss
262 in
263 newEmptyMVar >>= \ tail_node ->
264 newMVar tail_node >>= \ tail_list ->
265 newQSem max_buff_size >>= \ e ->
266 newMVar len >>= \ branches_running ->
267 let
268 buff = (tail_list,e)
269 in
270 mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
271 takeMVar tail_node >>= \ val ->
272 signalQSem e >>
273 return val
274 where
275 mapIO f xs = sequence (map f xs)
276
277 -- ---------------------------------------------------------------------------
278 -- More docs
279
280 {- $termination
281
282 In a standalone GHC program, only the main thread is
283 required to terminate in order for the process to terminate.
284 Thus all other forked threads will simply terminate at the same
285 time as the main thread (the terminology for this kind of
286 behaviour is \"daemonic threads\").
287
288 If you want the program to wait for child threads to
289 finish before exiting, you need to program this yourself. A
290 simple mechanism is to have each child thread write to an
291 'MVar' when it completes, and have the main
292 thread wait on all the 'MVar's before
293 exiting:
294
295 > myForkIO :: IO () -> IO (MVar ())
296 > myForkIO io = do
297 > mvar \<- newEmptyMVar
298 > forkIO (io \`finally\` putMVar mvar ())
299 > return mvar
300
301 Note that we use 'finally' from the
302 "Exception" module to make sure that the
303 'MVar' is written to even if the thread dies or
304 is killed for some reason.
305
306 A better method is to keep a global list of all child
307 threads which we should wait for at the end of the program:
308
309 > children :: MVar [MVar ()]
310 > children = unsafePerformIO (newMVar [])
311 >
312 > waitForChildren :: IO ()
313 > waitForChildren = do
314 > (mvar:mvars) \<- takeMVar children
315 > putMVar children mvars
316 > takeMVar mvar
317 > waitForChildren
318 >
319 > forkChild :: IO () -> IO ()
320 > forkChild io = do
321 > mvar \<- newEmptyMVar
322 > forkIO (p \`finally\` putMVar mvar ())
323 > childs \<- takeMVar children
324 > putMVar children (mvar:childs)
325 >
326 > later = flip finally
327 >
328 > main =
329 > later waitForChildren $
330 > ...
331
332 The main thread principle also applies to calls to Haskell from
333 outside, using @foreign export@. When the @foreign export@ed
334 function is invoked, it starts a new main thread, and it returns
335 when this main thread terminates. If the call causes new
336 threads to be forked, they may remain in the system after the
337 @foreign export@ed function has returned.
338 -}
339
340 {- $preemption
341
342 GHC implements pre-emptive multitasking: the execution of
343 threads are interleaved in a random fashion. More specifically,
344 a thread may be pre-empted whenever it allocates some memory,
345 which unfortunately means that tight loops which do no
346 allocation tend to lock out other threads (this only seems to
347 happen with pathalogical benchmark-style code, however).
348
349 The rescheduling timer runs on a 20ms granularity by
350 default, but this may be altered using the
351 @-i<n>@ RTS option. After a rescheduling
352 \"tick\" the running thread is pre-empted as soon as
353 possible.
354
355 One final note: the
356 @aaaa@ @bbbb@ example may not
357 work too well on GHC (see Scheduling, above), due
358 to the locking on a 'Handle'. Only one thread
359 may hold the lock on a 'Handle' at any one
360 time, so if a reschedule happens while a thread is holding the
361 lock, the other thread won't be able to run. The upshot is that
362 the switch from @aaaa@ to
363 @bbbbb@ happens infrequently. It can be
364 improved by lowering the reschedule tick period. We also have a
365 patch that causes a reschedule whenever a thread waiting on a
366 lock is woken up, but haven't found it to be useful for anything
367 other than this example :-)
368 -}