b53bf54b91a843c544ebfef5415265e83f641131
[packages/base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * MVars
54         , MVar(..)
55         , newMVar       -- :: a -> IO (MVar a)
56         , newEmptyMVar  -- :: IO (MVar a)
57         , takeMVar      -- :: MVar a -> IO a
58         , putMVar       -- :: MVar a -> a -> IO ()
59         , tryTakeMVar   -- :: MVar a -> IO (Maybe a)
60         , tryPutMVar    -- :: MVar a -> a -> IO Bool
61         , isEmptyMVar   -- :: MVar a -> IO Bool
62         , addMVarFinalizer -- :: MVar a -> IO () -> IO ()
63
64         -- * TVars
65         , STM(..)
66         , atomically    -- :: STM a -> IO a
67         , retry         -- :: STM a
68         , orElse        -- :: STM a -> STM a -> STM a
69         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
70         , alwaysSucceeds -- :: STM a -> STM ()
71         , always        -- :: STM Bool -> STM ()
72         , TVar(..)
73         , newTVar       -- :: a -> STM (TVar a)
74         , newTVarIO     -- :: a -> STM (TVar a)
75         , readTVar      -- :: TVar a -> STM a
76         , readTVarIO    -- :: TVar a -> IO a
77         , writeTVar     -- :: a -> TVar a -> STM ()
78         , unsafeIOToSTM -- :: IO a -> STM a
79
80         -- * Miscellaneous
81 #ifdef mingw32_HOST_OS
82         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
83         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
84         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
85
86         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
87         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
88 #endif
89
90 #ifndef mingw32_HOST_OS
91         , Signal, HandlerFun, setHandler, runHandlers
92 #endif
93
94         , ensureIOManagerIsRunning
95 #ifndef mingw32_HOST_OS
96         , syncIOManager
97 #endif
98
99 #ifdef mingw32_HOST_OS
100         , ConsoleEvent(..)
101         , win32ConsoleHandler
102         , toWin32ConsoleEvent
103 #endif
104         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
105         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
106
107         , reportError, reportStackOverflow
108         ) where
109
110 import System.Posix.Types
111 #ifndef mingw32_HOST_OS
112 import System.Posix.Internals
113 #endif
114 import Foreign
115 import Foreign.C
116
117 #ifndef mingw32_HOST_OS
118 import Data.Dynamic
119 import Control.Monad
120 #endif
121 import Data.Maybe
122
123 import GHC.Base
124 import {-# SOURCE #-} GHC.Handle
125 import GHC.IOBase
126 import GHC.Num          ( Num(..) )
127 import GHC.Real         ( fromIntegral )
128 #ifndef mingw32_HOST_OS
129 import GHC.Arr          ( inRange )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Real         ( div )
133 import GHC.Ptr          ( plusPtr, FunPtr(..) )
134 #endif
135 #ifdef mingw32_HOST_OS
136 import GHC.Read         ( Read )
137 import GHC.Enum         ( Enum )
138 #endif
139 import GHC.Exception    ( SomeException(..), throw )
140 import GHC.Pack         ( packCString# )
141 import GHC.Ptr          ( Ptr(..) )
142 import GHC.STRef
143 import GHC.Show         ( Show(..), showString )
144 import Data.Typeable
145 import GHC.Err
146
147 infixr 0 `par`, `pseq`
148 \end{code}
149
150 %************************************************************************
151 %*                                                                      *
152 \subsection{@ThreadId@, @par@, and @fork@}
153 %*                                                                      *
154 %************************************************************************
155
156 \begin{code}
157 data ThreadId = ThreadId ThreadId# deriving( Typeable )
158 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
159 -- But since ThreadId# is unlifted, the Weak type must use open
160 -- type variables.
161 {- ^
162 A 'ThreadId' is an abstract type representing a handle to a thread.
163 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
164 the 'Ord' instance implements an arbitrary total ordering over
165 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
166 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
167 useful when debugging or diagnosing the behaviour of a concurrent
168 program.
169
170 /Note/: in GHC, if you have a 'ThreadId', you essentially have
171 a pointer to the thread itself.  This means the thread itself can\'t be
172 garbage collected until you drop the 'ThreadId'.
173 This misfeature will hopefully be corrected at a later date.
174
175 /Note/: Hugs does not provide any operations on other threads;
176 it defines 'ThreadId' as a synonym for ().
177 -}
178
179 instance Show ThreadId where
180    showsPrec d t = 
181         showString "ThreadId " . 
182         showsPrec d (getThreadId (id2TSO t))
183
184 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
185
186 id2TSO :: ThreadId -> ThreadId#
187 id2TSO (ThreadId t) = t
188
189 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
190 -- Returns -1, 0, 1
191
192 cmpThread :: ThreadId -> ThreadId -> Ordering
193 cmpThread t1 t2 = 
194    case cmp_thread (id2TSO t1) (id2TSO t2) of
195       -1 -> LT
196       0  -> EQ
197       _  -> GT -- must be 1
198
199 instance Eq ThreadId where
200    t1 == t2 = 
201       case t1 `cmpThread` t2 of
202          EQ -> True
203          _  -> False
204
205 instance Ord ThreadId where
206    compare = cmpThread
207
208 {- |
209 Sparks off a new thread to run the 'IO' computation passed as the
210 first argument, and returns the 'ThreadId' of the newly created
211 thread.
212
213 The new thread will be a lightweight thread; if you want to use a foreign
214 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
215
216 GHC note: the new thread inherits the /blocked/ state of the parent 
217 (see 'Control.Exception.block').
218
219 The newly created thread has an exception handler that discards the
220 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
221 'ThreadKilled', and passes all other exceptions to the uncaught
222 exception handler (see 'setUncaughtExceptionHandler').
223 -}
224 forkIO :: IO () -> IO ThreadId
225 forkIO action = IO $ \ s -> 
226    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
227  where
228   action_plus = catchException action childHandler
229
230 {- |
231 Like 'forkIO', but lets you specify on which CPU the thread is
232 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
233 will stay on the same CPU for its entire lifetime (`forkIO` threads
234 can migrate between CPUs according to the scheduling policy).
235 `forkOnIO` is useful for overriding the scheduling policy when you
236 know in advance how best to distribute the threads.
237
238 The `Int` argument specifies the CPU number; it is interpreted modulo
239 'numCapabilities' (note that it actually specifies a capability number
240 rather than a CPU number, but to a first approximation the two are
241 equivalent).
242 -}
243 forkOnIO :: Int -> IO () -> IO ThreadId
244 forkOnIO (I# cpu) action = IO $ \ s -> 
245    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
246  where
247   action_plus = catchException action childHandler
248
249 -- | the value passed to the @+RTS -N@ flag.  This is the number of
250 -- Haskell threads that can run truly simultaneously at any given
251 -- time, and is typically set to the number of physical CPU cores on
252 -- the machine.
253 numCapabilities :: Int
254 numCapabilities = unsafePerformIO $  do 
255                     n <- peek n_capabilities
256                     return (fromIntegral n)
257
258 #if defined(mingw32_HOST_OS) && defined(__PIC__)
259 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
260 #else
261 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
262 #endif
263 childHandler :: SomeException -> IO ()
264 childHandler err = catchException (real_handler err) childHandler
265
266 real_handler :: SomeException -> IO ()
267 real_handler se@(SomeException ex) =
268   -- ignore thread GC and killThread exceptions:
269   case cast ex of
270   Just BlockedOnDeadMVar                -> return ()
271   _ -> case cast ex of
272        Just BlockedIndefinitely         -> return ()
273        _ -> case cast ex of
274             Just ThreadKilled           -> return ()
275             _ -> case cast ex of
276                  -- report all others:
277                  Just StackOverflow     -> reportStackOverflow
278                  _                      -> reportError se
279
280 {- | 'killThread' terminates the given thread (GHC only).
281 Any work already done by the thread isn\'t
282 lost: the computation is suspended until required by another thread.
283 The memory used by the thread will be garbage collected if it isn\'t
284 referenced from anywhere.  The 'killThread' function is defined in
285 terms of 'throwTo':
286
287 > killThread tid = throwTo tid ThreadKilled
288
289 Killthread is a no-op if the target thread has already completed.
290 -}
291 killThread :: ThreadId -> IO ()
292 killThread tid = throwTo tid ThreadKilled
293
294 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
295
296 'throwTo' does not return until the exception has been raised in the
297 target thread. 
298 The calling thread can thus be certain that the target
299 thread has received the exception.  This is a useful property to know
300 when dealing with race conditions: eg. if there are two threads that
301 can kill each other, it is guaranteed that only one of the threads
302 will get to kill the other.
303
304 If the target thread is currently making a foreign call, then the
305 exception will not be raised (and hence 'throwTo' will not return)
306 until the call has completed.  This is the case regardless of whether
307 the call is inside a 'block' or not.
308
309 Important note: the behaviour of 'throwTo' differs from that described in
310 the paper \"Asynchronous exceptions in Haskell\"
311 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
312 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
313 a more synchronous design in which 'throwTo' does not return until the exception
314 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
315 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
316 the paper).
317
318 There is currently no guarantee that the exception delivered by 'throwTo' will be
319 delivered at the first possible opportunity.  In particular, a thread may 
320 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
321 a pending 'throwTo'.  This is arguably undesirable behaviour.
322
323  -}
324 throwTo :: Exception e => ThreadId -> e -> IO ()
325 throwTo (ThreadId tid) ex = IO $ \ s ->
326    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
327
328 -- | Returns the 'ThreadId' of the calling thread (GHC only).
329 myThreadId :: IO ThreadId
330 myThreadId = IO $ \s ->
331    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
332
333
334 -- |The 'yield' action allows (forces, in a co-operative multitasking
335 -- implementation) a context-switch to any other currently runnable
336 -- threads (if any), and is occasionally useful when implementing
337 -- concurrency abstractions.
338 yield :: IO ()
339 yield = IO $ \s -> 
340    case (yield# s) of s1 -> (# s1, () #)
341
342 {- | 'labelThread' stores a string as identifier for this thread if
343 you built a RTS with debugging support. This identifier will be used in
344 the debugging output to make distinction of different threads easier
345 (otherwise you only have the thread state object\'s address in the heap).
346
347 Other applications like the graphical Concurrent Haskell Debugger
348 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
349 'labelThread' for their purposes as well.
350 -}
351
352 labelThread :: ThreadId -> String -> IO ()
353 labelThread (ThreadId t) str = IO $ \ s ->
354    let !ps  = packCString# str
355        !adr = byteArrayContents# ps in
356      case (labelThread# t adr s) of s1 -> (# s1, () #)
357
358 --      Nota Bene: 'pseq' used to be 'seq'
359 --                 but 'seq' is now defined in PrelGHC
360 --
361 -- "pseq" is defined a bit weirdly (see below)
362 --
363 -- The reason for the strange "lazy" call is that
364 -- it fools the compiler into thinking that pseq  and par are non-strict in
365 -- their second argument (even if it inlines pseq at the call site).
366 -- If it thinks pseq is strict in "y", then it often evaluates
367 -- "y" before "x", which is totally wrong.  
368
369 {-# INLINE pseq  #-}
370 pseq :: a -> b -> b
371 pseq  x y = x `seq` lazy y
372
373 {-# INLINE par  #-}
374 par :: a -> b -> b
375 par  x y = case (par# x) of { _ -> lazy y }
376
377 -- | Internal function used by the RTS to run sparks.
378 runSparks :: IO ()
379 runSparks = IO loop
380   where loop s = case getSpark# s of
381                    (# s', n, p #) ->
382                       if n ==# 0# then (# s', () #)
383                                   else p `seq` loop s'
384
385 data BlockReason
386   = BlockedOnMVar
387         -- ^blocked on on 'MVar'
388   | BlockedOnBlackHole
389         -- ^blocked on a computation in progress by another thread
390   | BlockedOnException
391         -- ^blocked in 'throwTo'
392   | BlockedOnSTM
393         -- ^blocked in 'retry' in an STM transaction
394   | BlockedOnForeignCall
395         -- ^currently in a foreign call
396   | BlockedOnOther
397         -- ^blocked on some other resource.  Without @-threaded@,
398         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
399         -- they show up as 'BlockedOnMVar'.
400   deriving (Eq,Ord,Show)
401
402 -- | The current status of a thread
403 data ThreadStatus
404   = ThreadRunning
405         -- ^the thread is currently runnable or running
406   | ThreadFinished
407         -- ^the thread has finished
408   | ThreadBlocked  BlockReason
409         -- ^the thread is blocked on some resource
410   | ThreadDied
411         -- ^the thread received an uncaught exception
412   deriving (Eq,Ord,Show)
413
414 threadStatus :: ThreadId -> IO ThreadStatus
415 threadStatus (ThreadId t) = IO $ \s ->
416    case threadStatus# t s of
417      (# s', stat #) -> (# s', mk_stat (I# stat) #)
418    where
419         -- NB. keep these in sync with includes/Constants.h
420      mk_stat 0  = ThreadRunning
421      mk_stat 1  = ThreadBlocked BlockedOnMVar
422      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
423      mk_stat 3  = ThreadBlocked BlockedOnException
424      mk_stat 7  = ThreadBlocked BlockedOnSTM
425      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
426      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
427      mk_stat 16 = ThreadFinished
428      mk_stat 17 = ThreadDied
429      mk_stat _  = ThreadBlocked BlockedOnOther
430 \end{code}
431
432
433 %************************************************************************
434 %*                                                                      *
435 \subsection[stm]{Transactional heap operations}
436 %*                                                                      *
437 %************************************************************************
438
439 TVars are shared memory locations which support atomic memory
440 transactions.
441
442 \begin{code}
443 -- |A monad supporting atomic memory transactions.
444 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
445
446 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
447 unSTM (STM a) = a
448
449 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
450
451 instance  Functor STM where
452    fmap f x = x >>= (return . f)
453
454 instance  Monad STM  where
455     {-# INLINE return #-}
456     {-# INLINE (>>)   #-}
457     {-# INLINE (>>=)  #-}
458     m >> k      = thenSTM m k
459     return x    = returnSTM x
460     m >>= k     = bindSTM m k
461
462 bindSTM :: STM a -> (a -> STM b) -> STM b
463 bindSTM (STM m) k = STM ( \s ->
464   case m s of 
465     (# new_s, a #) -> unSTM (k a) new_s
466   )
467
468 thenSTM :: STM a -> STM b -> STM b
469 thenSTM (STM m) k = STM ( \s ->
470   case m s of 
471     (# new_s, _ #) -> unSTM k new_s
472   )
473
474 returnSTM :: a -> STM a
475 returnSTM x = STM (\s -> (# s, x #))
476
477 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
478 -- dangerous thing to do.  
479 --
480 --   * The STM implementation will often run transactions multiple
481 --     times, so you need to be prepared for this if your IO has any
482 --     side effects.
483 --
484 --   * The STM implementation will abort transactions that are known to
485 --     be invalid and need to be restarted.  This may happen in the middle
486 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
487 --     that need releasing (exception handlers are ignored when aborting
488 --     the transaction).  That includes doing any IO using Handles, for
489 --     example.  Getting this wrong will probably lead to random deadlocks.
490 --
491 --   * The transaction may have seen an inconsistent view of memory when
492 --     the IO runs.  Invariants that you expect to be true throughout
493 --     your program may not be true inside a transaction, due to the
494 --     way transactions are implemented.  Normally this wouldn't be visible
495 --     to the programmer, but using `unsafeIOToSTM` can expose it.
496 --
497 unsafeIOToSTM :: IO a -> STM a
498 unsafeIOToSTM (IO m) = STM m
499
500 -- |Perform a series of STM actions atomically.
501 --
502 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
503 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
504 -- this would effectively allow a transaction inside a transaction, depending
505 -- on exactly when the thunk is evaluated.)
506 --
507 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
508 -- and which allows top-level TVars to be allocated.
509
510 atomically :: STM a -> IO a
511 atomically (STM m) = IO (\s -> (atomically# m) s )
512
513 -- |Retry execution of the current memory transaction because it has seen
514 -- values in TVars which mean that it should not continue (e.g. the TVars
515 -- represent a shared buffer that is now empty).  The implementation may
516 -- block the thread until one of the TVars that it has read from has been
517 -- udpated. (GHC only)
518 retry :: STM a
519 retry = STM $ \s# -> retry# s#
520
521 -- |Compose two alternative STM actions (GHC only).  If the first action
522 -- completes without retrying then it forms the result of the orElse.
523 -- Otherwise, if the first action retries, then the second action is
524 -- tried in its place.  If both actions retry then the orElse as a
525 -- whole retries.
526 orElse :: STM a -> STM a -> STM a
527 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
528
529 -- |Exception handling within STM actions.
530 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
531 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
532
533 -- | Low-level primitive on which always and alwaysSucceeds are built.
534 -- checkInv differs form these in that (i) the invariant is not 
535 -- checked when checkInv is called, only at the end of this and
536 -- subsequent transcations, (ii) the invariant failure is indicated
537 -- by raising an exception.
538 checkInv :: STM a -> STM ()
539 checkInv (STM m) = STM (\s -> (check# m) s)
540
541 -- | alwaysSucceeds adds a new invariant that must be true when passed
542 -- to alwaysSucceeds, at the end of the current transaction, and at
543 -- the end of every subsequent transaction.  If it fails at any
544 -- of those points then the transaction violating it is aborted
545 -- and the exception raised by the invariant is propagated.
546 alwaysSucceeds :: STM a -> STM ()
547 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
548                       checkInv i
549
550 -- | always is a variant of alwaysSucceeds in which the invariant is
551 -- expressed as an STM Bool action that must return True.  Returning
552 -- False or raising an exception are both treated as invariant failures.
553 always :: STM Bool -> STM ()
554 always i = alwaysSucceeds ( do v <- i
555                                if (v) then return () else ( error "Transacional invariant violation" ) )
556
557 -- |Shared memory locations that support atomic memory transactions.
558 data TVar a = TVar (TVar# RealWorld a)
559
560 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
561
562 instance Eq (TVar a) where
563         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
564
565 -- |Create a new TVar holding a value supplied
566 newTVar :: a -> STM (TVar a)
567 newTVar val = STM $ \s1# ->
568     case newTVar# val s1# of
569          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
570
571 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
572 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
573 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
574 -- possible.
575 newTVarIO :: a -> IO (TVar a)
576 newTVarIO val = IO $ \s1# ->
577     case newTVar# val s1# of
578          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
579
580 -- |Return the current value stored in a TVar.
581 -- This is equivalent to
582 --
583 -- >  readTVarIO = atomically . readTVar
584 --
585 -- but works much faster, because it doesn't perform a complete
586 -- transaction, it just reads the current value of the 'TVar'.
587 readTVarIO :: TVar a -> IO a
588 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
589
590 -- |Return the current value stored in a TVar
591 readTVar :: TVar a -> STM a
592 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
593
594 -- |Write the supplied value into a TVar
595 writeTVar :: TVar a -> a -> STM ()
596 writeTVar (TVar tvar#) val = STM $ \s1# ->
597     case writeTVar# tvar# val s1# of
598          s2# -> (# s2#, () #)
599   
600 \end{code}
601
602 %************************************************************************
603 %*                                                                      *
604 \subsection[mvars]{M-Structures}
605 %*                                                                      *
606 %************************************************************************
607
608 M-Vars are rendezvous points for concurrent threads.  They begin
609 empty, and any attempt to read an empty M-Var blocks.  When an M-Var
610 is written, a single blocked thread may be freed.  Reading an M-Var
611 toggles its state from full back to empty.  Therefore, any value
612 written to an M-Var may only be read once.  Multiple reads and writes
613 are allowed, but there must be at least one read between any two
614 writes.
615
616 \begin{code}
617 --Defined in IOBase to avoid cycle: data MVar a = MVar (SynchVar# RealWorld a)
618
619 -- |Create an 'MVar' which is initially empty.
620 newEmptyMVar  :: IO (MVar a)
621 newEmptyMVar = IO $ \ s# ->
622     case newMVar# s# of
623          (# s2#, svar# #) -> (# s2#, MVar svar# #)
624
625 -- |Create an 'MVar' which contains the supplied value.
626 newMVar :: a -> IO (MVar a)
627 newMVar value =
628     newEmptyMVar        >>= \ mvar ->
629     putMVar mvar value  >>
630     return mvar
631
632 -- |Return the contents of the 'MVar'.  If the 'MVar' is currently
633 -- empty, 'takeMVar' will wait until it is full.  After a 'takeMVar', 
634 -- the 'MVar' is left empty.
635 -- 
636 -- There are two further important properties of 'takeMVar':
637 --
638 --   * 'takeMVar' is single-wakeup.  That is, if there are multiple
639 --     threads blocked in 'takeMVar', and the 'MVar' becomes full,
640 --     only one thread will be woken up.  The runtime guarantees that
641 --     the woken thread completes its 'takeMVar' operation.
642 --
643 --   * When multiple threads are blocked on an 'MVar', they are
644 --     woken up in FIFO order.  This is useful for providing
645 --     fairness properties of abstractions built using 'MVar's.
646 --
647 takeMVar :: MVar a -> IO a
648 takeMVar (MVar mvar#) = IO $ \ s# -> takeMVar# mvar# s#
649
650 -- |Put a value into an 'MVar'.  If the 'MVar' is currently full,
651 -- 'putMVar' will wait until it becomes empty.
652 --
653 -- There are two further important properties of 'putMVar':
654 --
655 --   * 'putMVar' is single-wakeup.  That is, if there are multiple
656 --     threads blocked in 'putMVar', and the 'MVar' becomes empty,
657 --     only one thread will be woken up.  The runtime guarantees that
658 --     the woken thread completes its 'putMVar' operation.
659 --
660 --   * When multiple threads are blocked on an 'MVar', they are
661 --     woken up in FIFO order.  This is useful for providing
662 --     fairness properties of abstractions built using 'MVar's.
663 --
664 putMVar  :: MVar a -> a -> IO ()
665 putMVar (MVar mvar#) x = IO $ \ s# ->
666     case putMVar# mvar# x s# of
667         s2# -> (# s2#, () #)
668
669 -- |A non-blocking version of 'takeMVar'.  The 'tryTakeMVar' function
670 -- returns immediately, with 'Nothing' if the 'MVar' was empty, or
671 -- @'Just' a@ if the 'MVar' was full with contents @a@.  After 'tryTakeMVar',
672 -- the 'MVar' is left empty.
673 tryTakeMVar :: MVar a -> IO (Maybe a)
674 tryTakeMVar (MVar m) = IO $ \ s ->
675     case tryTakeMVar# m s of
676         (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
677         (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
678
679 -- |A non-blocking version of 'putMVar'.  The 'tryPutMVar' function
680 -- attempts to put the value @a@ into the 'MVar', returning 'True' if
681 -- it was successful, or 'False' otherwise.
682 tryPutMVar  :: MVar a -> a -> IO Bool
683 tryPutMVar (MVar mvar#) x = IO $ \ s# ->
684     case tryPutMVar# mvar# x s# of
685         (# s, 0# #) -> (# s, False #)
686         (# s, _  #) -> (# s, True #)
687
688 -- |Check whether a given 'MVar' is empty.
689 --
690 -- Notice that the boolean value returned  is just a snapshot of
691 -- the state of the MVar. By the time you get to react on its result,
692 -- the MVar may have been filled (or emptied) - so be extremely
693 -- careful when using this operation.   Use 'tryTakeMVar' instead if possible.
694 isEmptyMVar :: MVar a -> IO Bool
695 isEmptyMVar (MVar mv#) = IO $ \ s# -> 
696     case isEmptyMVar# mv# s# of
697         (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)
698
699 -- |Add a finalizer to an 'MVar' (GHC only).  See "Foreign.ForeignPtr" and
700 -- "System.Mem.Weak" for more about finalizers.
701 addMVarFinalizer :: MVar a -> IO () -> IO ()
702 addMVarFinalizer (MVar m) finalizer = 
703   IO $ \s -> case mkWeak# m () finalizer s of { (# s1, _ #) -> (# s1, () #) }
704 \end{code}
705
706
707 %************************************************************************
708 %*                                                                      *
709 \subsection{Thread waiting}
710 %*                                                                      *
711 %************************************************************************
712
713 \begin{code}
714 #ifdef mingw32_HOST_OS
715
716 -- Note: threadWaitRead and threadWaitWrite aren't really functional
717 -- on Win32, but left in there because lib code (still) uses them (the manner
718 -- in which they're used doesn't cause problems on a Win32 platform though.)
719
720 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
721 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
722   IO $ \s -> case asyncRead# fd isSock len buf s of 
723                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
724
725 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
726 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
727   IO $ \s -> case asyncWrite# fd isSock len buf s of 
728                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
729
730 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
731 asyncDoProc (FunPtr proc) (Ptr param) = 
732     -- the 'length' value is ignored; simplifies implementation of
733     -- the async*# primops to have them all return the same result.
734   IO $ \s -> case asyncDoProc# proc param s  of 
735                (# s', _len#, err# #) -> (# s', I# err# #)
736
737 -- to aid the use of these primops by the IO Handle implementation,
738 -- provide the following convenience funs:
739
740 -- this better be a pinned byte array!
741 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
742 asyncReadBA fd isSock len off bufB = 
743   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
744   
745 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
746 asyncWriteBA fd isSock len off bufB = 
747   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
748
749 #endif
750
751 -- -----------------------------------------------------------------------------
752 -- Thread IO API
753
754 -- | Block the current thread until data is available to read on the
755 -- given file descriptor (GHC only).
756 threadWaitRead :: Fd -> IO ()
757 threadWaitRead fd
758 #ifndef mingw32_HOST_OS
759   | threaded  = waitForReadEvent fd
760 #endif
761   | otherwise = IO $ \s -> 
762         case fromIntegral fd of { I# fd# ->
763         case waitRead# fd# s of { s' -> (# s', () #)
764         }}
765
766 -- | Block the current thread until data can be written to the
767 -- given file descriptor (GHC only).
768 threadWaitWrite :: Fd -> IO ()
769 threadWaitWrite fd
770 #ifndef mingw32_HOST_OS
771   | threaded  = waitForWriteEvent fd
772 #endif
773   | otherwise = IO $ \s -> 
774         case fromIntegral fd of { I# fd# ->
775         case waitWrite# fd# s of { s' -> (# s', () #)
776         }}
777
778 -- | Suspends the current thread for a given number of microseconds
779 -- (GHC only).
780 --
781 -- There is no guarantee that the thread will be rescheduled promptly
782 -- when the delay has expired, but the thread will never continue to
783 -- run /earlier/ than specified.
784 --
785 threadDelay :: Int -> IO ()
786 threadDelay time
787   | threaded  = waitForDelayEvent time
788   | otherwise = IO $ \s -> 
789         case fromIntegral time of { I# time# ->
790         case delay# time# s of { s' -> (# s', () #)
791         }}
792
793
794 -- | Set the value of returned TVar to True after a given number of
795 -- microseconds. The caveats associated with threadDelay also apply.
796 --
797 registerDelay :: Int -> IO (TVar Bool)
798 registerDelay usecs 
799   | threaded = waitForDelayEventSTM usecs
800   | otherwise = error "registerDelay: requires -threaded"
801
802 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
803
804 waitForDelayEvent :: Int -> IO ()
805 waitForDelayEvent usecs = do
806   m <- newEmptyMVar
807   target <- calculateTarget usecs
808   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
809   prodServiceThread
810   takeMVar m
811
812 -- Delays for use in STM
813 waitForDelayEventSTM :: Int -> IO (TVar Bool)
814 waitForDelayEventSTM usecs = do
815    t <- atomically $ newTVar False
816    target <- calculateTarget usecs
817    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
818    prodServiceThread
819    return t  
820     
821 calculateTarget :: Int -> IO USecs
822 calculateTarget usecs = do
823     now <- getUSecOfDay
824     return $ now + (fromIntegral usecs)
825
826
827 -- ----------------------------------------------------------------------------
828 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
829
830 -- In the threaded RTS, we employ a single IO Manager thread to wait
831 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
832 -- and delays (threadDelay).  
833 --
834 -- We can do this because in the threaded RTS the IO Manager can make
835 -- a non-blocking call to select(), so we don't have to do select() in
836 -- the scheduler as we have to in the non-threaded RTS.  We get performance
837 -- benefits from doing it this way, because we only have to restart the select()
838 -- when a new request arrives, rather than doing one select() each time
839 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
840 -- by not having to check for completed IO requests.
841
842 -- Issues, possible problems:
843 --
844 --      - we might want bound threads to just do the blocking
845 --        operation rather than communicating with the IO manager
846 --        thread.  This would prevent simgle-threaded programs which do
847 --        IO from requiring multiple OS threads.  However, it would also
848 --        prevent bound threads waiting on IO from being killed or sent
849 --        exceptions.
850 --
851 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
852 --        I couldn't repeat this.
853 --
854 --      - How do we handle signal delivery in the multithreaded RTS?
855 --
856 --      - forkProcess will kill the IO manager thread.  Let's just
857 --        hope we don't need to do any blocking IO between fork & exec.
858
859 #ifndef mingw32_HOST_OS
860 data IOReq
861   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
862   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
863 #endif
864
865 data DelayReq
866   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
867   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
868
869 #ifndef mingw32_HOST_OS
870 pendingEvents :: IORef [IOReq]
871 #endif
872 pendingDelays :: IORef [DelayReq]
873         -- could use a strict list or array here
874 {-# NOINLINE pendingEvents #-}
875 {-# NOINLINE pendingDelays #-}
876 (pendingEvents,pendingDelays) = unsafePerformIO $ do
877   startIOManagerThread
878   reqs <- newIORef []
879   dels <- newIORef []
880   return (reqs, dels)
881         -- the first time we schedule an IO request, the service thread
882         -- will be created (cool, huh?)
883
884 ensureIOManagerIsRunning :: IO ()
885 ensureIOManagerIsRunning 
886   | threaded  = seq pendingEvents $ return ()
887   | otherwise = return ()
888
889 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
890 insertDelay d [] = [d]
891 insertDelay d1 ds@(d2 : rest)
892   | delayTime d1 <= delayTime d2 = d1 : ds
893   | otherwise                    = d2 : insertDelay d1 rest
894
895 delayTime :: DelayReq -> USecs
896 delayTime (Delay t _) = t
897 delayTime (DelaySTM t _) = t
898
899 type USecs = Word64
900
901 -- XXX: move into GHC.IOBase from Data.IORef?
902 atomicModifyIORef :: IORef a -> (a -> (a,b)) -> IO b
903 atomicModifyIORef (IORef (STRef r#)) f = IO $ \s -> atomicModifyMutVar# r# f s
904
905 foreign import ccall unsafe "getUSecOfDay" 
906   getUSecOfDay :: IO USecs
907
908 prodding :: IORef Bool
909 {-# NOINLINE prodding #-}
910 prodding = unsafePerformIO (newIORef False)
911
912 prodServiceThread :: IO ()
913 prodServiceThread = do
914   was_set <- atomicModifyIORef prodding (\a -> (True,a))
915   if (not (was_set)) then wakeupIOManager else return ()
916
917 #ifdef mingw32_HOST_OS
918 -- ----------------------------------------------------------------------------
919 -- Windows IO manager thread
920
921 startIOManagerThread :: IO ()
922 startIOManagerThread = do
923   wakeup <- c_getIOManagerEvent
924   forkIO $ service_loop wakeup []
925   return ()
926
927 service_loop :: HANDLE          -- read end of pipe
928              -> [DelayReq]      -- current delay requests
929              -> IO ()
930
931 service_loop wakeup old_delays = do
932   -- pick up new delay requests
933   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
934   let  delays = foldr insertDelay old_delays new_delays
935
936   now <- getUSecOfDay
937   (delays', timeout) <- getDelay now delays
938
939   r <- c_WaitForSingleObject wakeup timeout
940   case r of
941     0xffffffff -> do c_maperrno; throwErrno "service_loop"
942     0 -> do
943         r2 <- c_readIOManagerEvent
944         exit <- 
945               case r2 of
946                 _ | r2 == io_MANAGER_WAKEUP -> return False
947                 _ | r2 == io_MANAGER_DIE    -> return True
948                 0 -> return False -- spurious wakeup
949                 _ -> do start_console_handler (r2 `shiftR` 1); return False
950         if exit
951           then return ()
952           else service_cont wakeup delays'
953
954     _other -> service_cont wakeup delays' -- probably timeout        
955
956 service_cont :: HANDLE -> [DelayReq] -> IO ()
957 service_cont wakeup delays = do
958   r <- atomicModifyIORef prodding (\_ -> (False,False))
959   r `seq` return () -- avoid space leak
960   service_loop wakeup delays
961
962 -- must agree with rts/win32/ThrIOManager.c
963 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
964 io_MANAGER_WAKEUP = 0xffffffff
965 io_MANAGER_DIE    = 0xfffffffe
966
967 data ConsoleEvent
968  = ControlC
969  | Break
970  | Close
971     -- these are sent to Services only.
972  | Logoff
973  | Shutdown
974  deriving (Eq, Ord, Enum, Show, Read, Typeable)
975
976 start_console_handler :: Word32 -> IO ()
977 start_console_handler r =
978   case toWin32ConsoleEvent r of
979      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
980                     forkIO (handler x)
981                     return ()
982      Nothing -> return ()
983
984 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
985 toWin32ConsoleEvent ev = 
986    case ev of
987        0 {- CTRL_C_EVENT-}        -> Just ControlC
988        1 {- CTRL_BREAK_EVENT-}    -> Just Break
989        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
990        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
991        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
992        _ -> Nothing
993
994 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
995 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
996
997 -- XXX Is this actually needed?
998 stick :: IORef HANDLE
999 {-# NOINLINE stick #-}
1000 stick = unsafePerformIO (newIORef nullPtr)
1001
1002 wakeupIOManager :: IO ()
1003 wakeupIOManager = do 
1004   _hdl <- readIORef stick
1005   c_sendIOManagerEvent io_MANAGER_WAKEUP
1006
1007 -- Walk the queue of pending delays, waking up any that have passed
1008 -- and return the smallest delay to wait for.  The queue of pending
1009 -- delays is kept ordered.
1010 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
1011 getDelay _   [] = return ([], iNFINITE)
1012 getDelay now all@(d : rest) 
1013   = case d of
1014      Delay time m | now >= time -> do
1015         putMVar m ()
1016         getDelay now rest
1017      DelaySTM time t | now >= time -> do
1018         atomically $ writeTVar t True
1019         getDelay now rest
1020      _otherwise ->
1021         -- delay is in millisecs for WaitForSingleObject
1022         let micro_seconds = delayTime d - now
1023             milli_seconds = (micro_seconds + 999) `div` 1000
1024         in return (all, fromIntegral milli_seconds)
1025
1026 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
1027 -- available yet.  We should move some Win32 functionality down here,
1028 -- maybe as part of the grand reorganisation of the base package...
1029 type HANDLE       = Ptr ()
1030 type DWORD        = Word32
1031
1032 iNFINITE :: DWORD
1033 iNFINITE = 0xFFFFFFFF -- urgh
1034
1035 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
1036   c_getIOManagerEvent :: IO HANDLE
1037
1038 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
1039   c_readIOManagerEvent :: IO Word32
1040
1041 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
1042   c_sendIOManagerEvent :: Word32 -> IO ()
1043
1044 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
1045    c_maperrno :: IO ()
1046
1047 foreign import stdcall "WaitForSingleObject"
1048    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
1049
1050 #else
1051 -- ----------------------------------------------------------------------------
1052 -- Unix IO manager thread, using select()
1053
1054 startIOManagerThread :: IO ()
1055 startIOManagerThread = do
1056         allocaArray 2 $ \fds -> do
1057         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
1058         rd_end <- peekElemOff fds 0
1059         wr_end <- peekElemOff fds 1
1060         setNonBlockingFD wr_end -- writes happen in a signal handler, we
1061                                 -- don't want them to block.
1062         setCloseOnExec rd_end
1063         setCloseOnExec wr_end
1064         writeIORef stick (fromIntegral wr_end)
1065         c_setIOManagerPipe wr_end
1066         forkIO $ do
1067             allocaBytes sizeofFdSet   $ \readfds -> do
1068             allocaBytes sizeofFdSet   $ \writefds -> do 
1069             allocaBytes sizeofTimeVal $ \timeval -> do
1070             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
1071         return ()
1072
1073 service_loop
1074    :: Fd                -- listen to this for wakeup calls
1075    -> Ptr CFdSet
1076    -> Ptr CFdSet
1077    -> Ptr CTimeVal
1078    -> [IOReq]
1079    -> [DelayReq]
1080    -> IO ()
1081 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
1082
1083   -- pick up new IO requests
1084   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
1085   let reqs = new_reqs ++ old_reqs
1086
1087   -- pick up new delay requests
1088   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
1089   let  delays0 = foldr insertDelay old_delays new_delays
1090
1091   -- build the FDSets for select()
1092   fdZero readfds
1093   fdZero writefds
1094   fdSet wakeup readfds
1095   maxfd <- buildFdSets 0 readfds writefds reqs
1096
1097   -- perform the select()
1098   let do_select delays = do
1099           -- check the current time and wake up any thread in
1100           -- threadDelay whose timeout has expired.  Also find the
1101           -- timeout value for the select() call.
1102           now <- getUSecOfDay
1103           (delays', timeout) <- getDelay now ptimeval delays
1104
1105           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1106                         nullPtr timeout
1107           if (res == -1)
1108              then do
1109                 err <- getErrno
1110                 case err of
1111                   _ | err == eINTR ->  do_select delays'
1112                         -- EINTR: just redo the select()
1113                   _ | err == eBADF ->  return (True, delays)
1114                         -- EBADF: one of the file descriptors is closed or bad,
1115                         -- we don't know which one, so wake everyone up.
1116                   _ | otherwise    ->  throwErrno "select"
1117                         -- otherwise (ENOMEM or EINVAL) something has gone
1118                         -- wrong; report the error.
1119              else
1120                 return (False,delays')
1121
1122   (wakeup_all,delays') <- do_select delays0
1123
1124   exit <-
1125     if wakeup_all then return False
1126       else do
1127         b <- fdIsSet wakeup readfds
1128         if b == 0 
1129           then return False
1130           else alloca $ \p -> do 
1131                  c_read (fromIntegral wakeup) p 1
1132                  s <- peek p            
1133                  case s of
1134                   _ | s == io_MANAGER_WAKEUP -> return False
1135                   _ | s == io_MANAGER_DIE    -> return True
1136                   _ | s == io_MANAGER_SYNC   -> do
1137                        mvars <- readIORef sync
1138                        mapM_ (flip putMVar ()) mvars
1139                        return False
1140                   _ -> do
1141                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1142                        withForeignPtr fp $ \p_siginfo -> do
1143                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1144                                  sizeof_siginfo_t
1145                          when (r /= fromIntegral sizeof_siginfo_t) $
1146                             error "failed to read siginfo_t"
1147                        runHandlers' fp (fromIntegral s)
1148                        return False
1149
1150   if exit then return () else do
1151
1152   atomicModifyIORef prodding (\_ -> (False,False))
1153
1154   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1155                          else completeRequests reqs readfds writefds []
1156
1157   service_loop wakeup readfds writefds ptimeval reqs' delays'
1158
1159 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar
1160 io_MANAGER_WAKEUP = 0xff
1161 io_MANAGER_DIE    = 0xfe
1162 io_MANAGER_SYNC   = 0xfd
1163
1164 -- | the stick is for poking the IO manager with
1165 stick :: IORef Fd
1166 {-# NOINLINE stick #-}
1167 stick = unsafePerformIO (newIORef 0)
1168
1169 {-# NOINLINE sync #-}
1170 sync :: IORef [MVar ()]
1171 sync = unsafePerformIO (newIORef [])
1172
1173 -- waits for the IO manager to drain the pipe
1174 syncIOManager :: IO ()
1175 syncIOManager = do
1176   m <- newEmptyMVar
1177   atomicModifyIORef sync (\old -> (m:old,()))
1178   fd <- readIORef stick
1179   with io_MANAGER_SYNC $ \pbuf -> do 
1180     c_write (fromIntegral fd) pbuf 1; return ()
1181   takeMVar m
1182
1183 wakeupIOManager :: IO ()
1184 wakeupIOManager = do
1185   fd <- readIORef stick
1186   with io_MANAGER_WAKEUP $ \pbuf -> do 
1187     c_write (fromIntegral fd) pbuf 1; return ()
1188
1189 -- For the non-threaded RTS
1190 runHandlers :: Ptr Word8 -> Int -> IO ()
1191 runHandlers p_info sig = do
1192   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1193   withForeignPtr fp $ \p -> do
1194     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1195     free p_info
1196   runHandlers' fp (fromIntegral sig)
1197
1198 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1199 runHandlers' p_info sig = do
1200   let int = fromIntegral sig
1201   withMVar signal_handlers $ \arr ->
1202       if not (inRange (boundsIOArray arr) int)
1203          then return ()
1204          else do handler <- unsafeReadIOArray arr int
1205                  case handler of
1206                     Nothing -> return ()
1207                     Just (f,_)  -> do forkIO (f p_info); return ()
1208
1209 foreign import ccall "setIOManagerPipe"
1210   c_setIOManagerPipe :: CInt -> IO ()
1211
1212 foreign import ccall "__hscore_sizeof_siginfo_t"
1213   sizeof_siginfo_t :: CSize
1214
1215 type Signal = CInt
1216
1217 maxSig = 64 :: Int
1218
1219 type HandlerFun = ForeignPtr Word8 -> IO ()
1220
1221 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1222 -- this race condition is #1922, although that bug was on Windows a similar
1223 -- bug also exists on Unix.
1224 {-# NOINLINE signal_handlers #-}
1225 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1226 signal_handlers = unsafePerformIO $ do
1227    arr <- newIOArray (0,maxSig) Nothing
1228    m <- newMVar arr
1229    block $ do
1230      stable_ref <- newStablePtr m
1231      let ref = castStablePtrToPtr stable_ref
1232      ref2 <- getOrSetSignalHandlerStore ref
1233      if ref==ref2
1234         then return m
1235         else do freeStablePtr stable_ref
1236                 deRefStablePtr (castPtrToStablePtr ref2)
1237
1238 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1239     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1240
1241 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1242 setHandler sig handler = do
1243   let int = fromIntegral sig
1244   withMVar signal_handlers $ \arr -> 
1245      if not (inRange (boundsIOArray arr) int)
1246         then error "GHC.Conc.setHandler: signal out of range"
1247         else do old <- unsafeReadIOArray arr int
1248                 unsafeWriteIOArray arr int handler
1249                 return old
1250
1251 -- -----------------------------------------------------------------------------
1252 -- IO requests
1253
1254 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1255 buildFdSets maxfd _       _        [] = return maxfd
1256 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1257   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1258   | otherwise        =  do
1259         fdSet fd readfds
1260         buildFdSets (max maxfd fd) readfds writefds reqs
1261 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1262   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1263   | otherwise        =  do
1264         fdSet fd writefds
1265         buildFdSets (max maxfd fd) readfds writefds reqs
1266
1267 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1268                  -> IO [IOReq]
1269 completeRequests [] _ _ reqs' = return reqs'
1270 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1271   b <- fdIsSet fd readfds
1272   if b /= 0
1273     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1274     else completeRequests reqs readfds writefds (Read fd m : reqs')
1275 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1276   b <- fdIsSet fd writefds
1277   if b /= 0
1278     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1279     else completeRequests reqs readfds writefds (Write fd m : reqs')
1280
1281 wakeupAll :: [IOReq] -> IO ()
1282 wakeupAll [] = return ()
1283 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1284 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1285
1286 waitForReadEvent :: Fd -> IO ()
1287 waitForReadEvent fd = do
1288   m <- newEmptyMVar
1289   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1290   prodServiceThread
1291   takeMVar m
1292
1293 waitForWriteEvent :: Fd -> IO ()
1294 waitForWriteEvent fd = do
1295   m <- newEmptyMVar
1296   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1297   prodServiceThread
1298   takeMVar m
1299
1300 -- -----------------------------------------------------------------------------
1301 -- Delays
1302
1303 -- Walk the queue of pending delays, waking up any that have passed
1304 -- and return the smallest delay to wait for.  The queue of pending
1305 -- delays is kept ordered.
1306 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1307 getDelay _   _        [] = return ([],nullPtr)
1308 getDelay now ptimeval all@(d : rest) 
1309   = case d of
1310      Delay time m | now >= time -> do
1311         putMVar m ()
1312         getDelay now ptimeval rest
1313      DelaySTM time t | now >= time -> do
1314         atomically $ writeTVar t True
1315         getDelay now ptimeval rest
1316      _otherwise -> do
1317         setTimevalTicks ptimeval (delayTime d - now)
1318         return (all,ptimeval)
1319
1320 data CTimeVal
1321
1322 foreign import ccall unsafe "sizeofTimeVal"
1323   sizeofTimeVal :: Int
1324
1325 foreign import ccall unsafe "setTimevalTicks" 
1326   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1327
1328 {- 
1329   On Win32 we're going to have a single Pipe, and a
1330   waitForSingleObject with the delay time.  For signals, we send a
1331   byte down the pipe just like on Unix.
1332 -}
1333
1334 -- ----------------------------------------------------------------------------
1335 -- select() interface
1336
1337 -- ToDo: move to System.Posix.Internals?
1338
1339 data CFdSet
1340
1341 foreign import ccall safe "select"
1342   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1343            -> IO CInt
1344
1345 foreign import ccall unsafe "hsFD_SETSIZE"
1346   c_fD_SETSIZE :: CInt
1347
1348 fD_SETSIZE :: Fd
1349 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1350
1351 foreign import ccall unsafe "hsFD_ISSET"
1352   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1353
1354 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1355 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1356
1357 foreign import ccall unsafe "hsFD_SET"
1358   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1359
1360 fdSet :: Fd -> Ptr CFdSet -> IO ()
1361 fdSet (Fd fd) fdset = c_fdSet fd fdset
1362
1363 foreign import ccall unsafe "hsFD_ZERO"
1364   fdZero :: Ptr CFdSet -> IO ()
1365
1366 foreign import ccall unsafe "sizeof_fd_set"
1367   sizeofFdSet :: Int
1368
1369 #endif
1370
1371 reportStackOverflow :: IO a
1372 reportStackOverflow = do callStackOverflowHook; return undefined
1373
1374 reportError :: SomeException -> IO a
1375 reportError ex = do
1376    handler <- getUncaughtExceptionHandler
1377    handler ex
1378    return undefined
1379
1380 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1381 -- the unsafe below.
1382 foreign import ccall unsafe "stackOverflow"
1383         callStackOverflowHook :: IO ()
1384
1385 {-# NOINLINE uncaughtExceptionHandler #-}
1386 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1387 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1388    where
1389       defaultHandler :: SomeException -> IO ()
1390       defaultHandler se@(SomeException ex) = do
1391          (hFlush stdout) `catchAny` (\ _ -> return ())
1392          let msg = case cast ex of
1393                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1394                _ -> case cast ex of
1395                     Just (ErrorCall s) -> s
1396                     _                  -> showsPrec 0 se ""
1397          withCString "%s" $ \cfmt ->
1398           withCString msg $ \cmsg ->
1399             errorBelch cfmt cmsg
1400
1401 -- don't use errorBelch() directly, because we cannot call varargs functions
1402 -- using the FFI.
1403 foreign import ccall unsafe "HsBase.h errorBelch2"
1404    errorBelch :: CString -> CString -> IO ()
1405
1406 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1407 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1408
1409 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1410 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1411
1412
1413 withMVar :: MVar a -> (a -> IO b) -> IO b
1414 withMVar m io = 
1415   block $ do
1416     a <- takeMVar m
1417     b <- catchAny (unblock (io a))
1418             (\e -> do putMVar m a; throw e)
1419     putMVar m a
1420     return b
1421 \end{code}