Rewrite of the IO library, including Unicode support
[packages/base.git] / GHC / Conc.lhs
1 \begin{code}
2 {-# OPTIONS_GHC -XNoImplicitPrelude #-}
3 {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
4 {-# OPTIONS_HADDOCK not-home #-}
5 -----------------------------------------------------------------------------
6 -- |
7 -- Module      :  GHC.Conc
8 -- Copyright   :  (c) The University of Glasgow, 1994-2002
9 -- License     :  see libraries/base/LICENSE
10 -- 
11 -- Maintainer  :  cvs-ghc@haskell.org
12 -- Stability   :  internal
13 -- Portability :  non-portable (GHC extensions)
14 --
15 -- Basic concurrency stuff.
16 -- 
17 -----------------------------------------------------------------------------
18
19 -- No: #hide, because bits of this module are exposed by the stm package.
20 -- However, we don't want this module to be the home location for the
21 -- bits it exports, we'd rather have Control.Concurrent and the other
22 -- higher level modules be the home.  Hence:
23
24 #include "Typeable.h"
25
26 -- #not-home
27 module GHC.Conc
28         ( ThreadId(..)
29
30         -- * Forking and suchlike
31         , forkIO        -- :: IO a -> IO ThreadId
32         , forkOnIO      -- :: Int -> IO a -> IO ThreadId
33         , numCapabilities -- :: Int
34         , childHandler  -- :: Exception -> IO ()
35         , myThreadId    -- :: IO ThreadId
36         , killThread    -- :: ThreadId -> IO ()
37         , throwTo       -- :: ThreadId -> Exception -> IO ()
38         , par           -- :: a -> b -> b
39         , pseq          -- :: a -> b -> b
40         , runSparks
41         , yield         -- :: IO ()
42         , labelThread   -- :: ThreadId -> String -> IO ()
43
44         , ThreadStatus(..), BlockReason(..)
45         , threadStatus  -- :: ThreadId -> IO ThreadStatus
46
47         -- * Waiting
48         , threadDelay           -- :: Int -> IO ()
49         , registerDelay         -- :: Int -> IO (TVar Bool)
50         , threadWaitRead        -- :: Int -> IO ()
51         , threadWaitWrite       -- :: Int -> IO ()
52
53         -- * TVars
54         , STM(..)
55         , atomically    -- :: STM a -> IO a
56         , retry         -- :: STM a
57         , orElse        -- :: STM a -> STM a -> STM a
58         , catchSTM      -- :: STM a -> (Exception -> STM a) -> STM a
59         , alwaysSucceeds -- :: STM a -> STM ()
60         , always        -- :: STM Bool -> STM ()
61         , TVar(..)
62         , newTVar       -- :: a -> STM (TVar a)
63         , newTVarIO     -- :: a -> STM (TVar a)
64         , readTVar      -- :: TVar a -> STM a
65         , readTVarIO    -- :: TVar a -> IO a
66         , writeTVar     -- :: a -> TVar a -> STM ()
67         , unsafeIOToSTM -- :: IO a -> STM a
68
69         -- * Miscellaneous
70         , withMVar
71 #ifdef mingw32_HOST_OS
72         , asyncRead     -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
73         , asyncWrite    -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
74         , asyncDoProc   -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
75
76         , asyncReadBA   -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
77         , asyncWriteBA  -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
78 #endif
79
80 #ifndef mingw32_HOST_OS
81         , Signal, HandlerFun, setHandler, runHandlers
82 #endif
83
84         , ensureIOManagerIsRunning
85 #ifndef mingw32_HOST_OS
86         , syncIOManager
87 #endif
88
89 #ifdef mingw32_HOST_OS
90         , ConsoleEvent(..)
91         , win32ConsoleHandler
92         , toWin32ConsoleEvent
93 #endif
94         , setUncaughtExceptionHandler      -- :: (Exception -> IO ()) -> IO ()
95         , getUncaughtExceptionHandler      -- :: IO (Exception -> IO ())
96
97         , reportError, reportStackOverflow
98         ) where
99
100 import System.Posix.Types
101 #ifndef mingw32_HOST_OS
102 import System.Posix.Internals
103 #endif
104 import Foreign
105 import Foreign.C
106
107 #ifndef mingw32_HOST_OS
108 import Data.Dynamic
109 import Control.Monad
110 #endif
111 import Data.Maybe
112
113 import GHC.Base
114 import {-# SOURCE #-} GHC.IO.Handle ( hFlush )
115 import {-# SOURCE #-} GHC.IO.Handle.FD ( stdout )
116 import GHC.IO
117 import GHC.IO.Exception
118 import GHC.Exception
119 import GHC.IORef
120 import GHC.MVar
121 import GHC.Num          ( Num(..) )
122 import GHC.Real         ( fromIntegral )
123 #ifndef mingw32_HOST_OS
124 import GHC.IOArray
125 import GHC.Arr          ( inRange )
126 #endif
127 #ifdef mingw32_HOST_OS
128 import GHC.Real         ( div )
129 import GHC.Ptr          ( plusPtr, FunPtr(..) )
130 #endif
131 #ifdef mingw32_HOST_OS
132 import GHC.Read         ( Read )
133 import GHC.Enum         ( Enum )
134 #endif
135 import GHC.Pack         ( packCString# )
136 import GHC.Ptr          ( Ptr(..) )
137 import GHC.Show         ( Show(..), showString )
138 import Data.Typeable
139 import GHC.Err
140
141 infixr 0 `par`, `pseq`
142 \end{code}
143
144 %************************************************************************
145 %*                                                                      *
146 \subsection{@ThreadId@, @par@, and @fork@}
147 %*                                                                      *
148 %************************************************************************
149
150 \begin{code}
151 data ThreadId = ThreadId ThreadId# deriving( Typeable )
152 -- ToDo: data ThreadId = ThreadId (Weak ThreadId#)
153 -- But since ThreadId# is unlifted, the Weak type must use open
154 -- type variables.
155 {- ^
156 A 'ThreadId' is an abstract type representing a handle to a thread.
157 'ThreadId' is an instance of 'Eq', 'Ord' and 'Show', where
158 the 'Ord' instance implements an arbitrary total ordering over
159 'ThreadId's. The 'Show' instance lets you convert an arbitrary-valued
160 'ThreadId' to string form; showing a 'ThreadId' value is occasionally
161 useful when debugging or diagnosing the behaviour of a concurrent
162 program.
163
164 /Note/: in GHC, if you have a 'ThreadId', you essentially have
165 a pointer to the thread itself.  This means the thread itself can\'t be
166 garbage collected until you drop the 'ThreadId'.
167 This misfeature will hopefully be corrected at a later date.
168
169 /Note/: Hugs does not provide any operations on other threads;
170 it defines 'ThreadId' as a synonym for ().
171 -}
172
173 instance Show ThreadId where
174    showsPrec d t = 
175         showString "ThreadId " . 
176         showsPrec d (getThreadId (id2TSO t))
177
178 foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
179
180 id2TSO :: ThreadId -> ThreadId#
181 id2TSO (ThreadId t) = t
182
183 foreign import ccall unsafe "cmp_thread" cmp_thread :: ThreadId# -> ThreadId# -> CInt
184 -- Returns -1, 0, 1
185
186 cmpThread :: ThreadId -> ThreadId -> Ordering
187 cmpThread t1 t2 = 
188    case cmp_thread (id2TSO t1) (id2TSO t2) of
189       -1 -> LT
190       0  -> EQ
191       _  -> GT -- must be 1
192
193 instance Eq ThreadId where
194    t1 == t2 = 
195       case t1 `cmpThread` t2 of
196          EQ -> True
197          _  -> False
198
199 instance Ord ThreadId where
200    compare = cmpThread
201
202 {- |
203 Sparks off a new thread to run the 'IO' computation passed as the
204 first argument, and returns the 'ThreadId' of the newly created
205 thread.
206
207 The new thread will be a lightweight thread; if you want to use a foreign
208 library that uses thread-local storage, use 'Control.Concurrent.forkOS' instead.
209
210 GHC note: the new thread inherits the /blocked/ state of the parent 
211 (see 'Control.Exception.block').
212
213 The newly created thread has an exception handler that discards the
214 exceptions 'BlockedOnDeadMVar', 'BlockedIndefinitely', and
215 'ThreadKilled', and passes all other exceptions to the uncaught
216 exception handler (see 'setUncaughtExceptionHandler').
217 -}
218 forkIO :: IO () -> IO ThreadId
219 forkIO action = IO $ \ s -> 
220    case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
221  where
222   action_plus = catchException action childHandler
223
224 {- |
225 Like 'forkIO', but lets you specify on which CPU the thread is
226 created.  Unlike a `forkIO` thread, a thread created by `forkOnIO`
227 will stay on the same CPU for its entire lifetime (`forkIO` threads
228 can migrate between CPUs according to the scheduling policy).
229 `forkOnIO` is useful for overriding the scheduling policy when you
230 know in advance how best to distribute the threads.
231
232 The `Int` argument specifies the CPU number; it is interpreted modulo
233 'numCapabilities' (note that it actually specifies a capability number
234 rather than a CPU number, but to a first approximation the two are
235 equivalent).
236 -}
237 forkOnIO :: Int -> IO () -> IO ThreadId
238 forkOnIO (I# cpu) action = IO $ \ s -> 
239    case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
240  where
241   action_plus = catchException action childHandler
242
243 -- | the value passed to the @+RTS -N@ flag.  This is the number of
244 -- Haskell threads that can run truly simultaneously at any given
245 -- time, and is typically set to the number of physical CPU cores on
246 -- the machine.
247 numCapabilities :: Int
248 numCapabilities = unsafePerformIO $  do 
249                     n <- peek n_capabilities
250                     return (fromIntegral n)
251
252 #if defined(mingw32_HOST_OS) && defined(__PIC__)
253 foreign import ccall "_imp__n_capabilities" n_capabilities :: Ptr CInt
254 #else
255 foreign import ccall "&n_capabilities" n_capabilities :: Ptr CInt
256 #endif
257 childHandler :: SomeException -> IO ()
258 childHandler err = catchException (real_handler err) childHandler
259
260 real_handler :: SomeException -> IO ()
261 real_handler se@(SomeException ex) =
262   -- ignore thread GC and killThread exceptions:
263   case cast ex of
264   Just BlockedOnDeadMVar                -> return ()
265   _ -> case cast ex of
266        Just BlockedIndefinitely         -> return ()
267        _ -> case cast ex of
268             Just ThreadKilled           -> return ()
269             _ -> case cast ex of
270                  -- report all others:
271                  Just StackOverflow     -> reportStackOverflow
272                  _                      -> reportError se
273
274 {- | 'killThread' terminates the given thread (GHC only).
275 Any work already done by the thread isn\'t
276 lost: the computation is suspended until required by another thread.
277 The memory used by the thread will be garbage collected if it isn\'t
278 referenced from anywhere.  The 'killThread' function is defined in
279 terms of 'throwTo':
280
281 > killThread tid = throwTo tid ThreadKilled
282
283 Killthread is a no-op if the target thread has already completed.
284 -}
285 killThread :: ThreadId -> IO ()
286 killThread tid = throwTo tid ThreadKilled
287
288 {- | 'throwTo' raises an arbitrary exception in the target thread (GHC only).
289
290 'throwTo' does not return until the exception has been raised in the
291 target thread. 
292 The calling thread can thus be certain that the target
293 thread has received the exception.  This is a useful property to know
294 when dealing with race conditions: eg. if there are two threads that
295 can kill each other, it is guaranteed that only one of the threads
296 will get to kill the other.
297
298 If the target thread is currently making a foreign call, then the
299 exception will not be raised (and hence 'throwTo' will not return)
300 until the call has completed.  This is the case regardless of whether
301 the call is inside a 'block' or not.
302
303 Important note: the behaviour of 'throwTo' differs from that described in
304 the paper \"Asynchronous exceptions in Haskell\"
305 (<http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm>).
306 In the paper, 'throwTo' is non-blocking; but the library implementation adopts
307 a more synchronous design in which 'throwTo' does not return until the exception
308 is received by the target thread.  The trade-off is discussed in Section 9 of the paper.
309 Like any blocking operation, 'throwTo' is therefore interruptible (see Section 5.3 of
310 the paper).
311
312 There is currently no guarantee that the exception delivered by 'throwTo' will be
313 delivered at the first possible opportunity.  In particular, a thread may 
314 unblock and then re-block exceptions (using 'unblock' and 'block') without receiving
315 a pending 'throwTo'.  This is arguably undesirable behaviour.
316
317  -}
318 throwTo :: Exception e => ThreadId -> e -> IO ()
319 throwTo (ThreadId tid) ex = IO $ \ s ->
320    case (killThread# tid (toException ex) s) of s1 -> (# s1, () #)
321
322 -- | Returns the 'ThreadId' of the calling thread (GHC only).
323 myThreadId :: IO ThreadId
324 myThreadId = IO $ \s ->
325    case (myThreadId# s) of (# s1, tid #) -> (# s1, ThreadId tid #)
326
327
328 -- |The 'yield' action allows (forces, in a co-operative multitasking
329 -- implementation) a context-switch to any other currently runnable
330 -- threads (if any), and is occasionally useful when implementing
331 -- concurrency abstractions.
332 yield :: IO ()
333 yield = IO $ \s -> 
334    case (yield# s) of s1 -> (# s1, () #)
335
336 {- | 'labelThread' stores a string as identifier for this thread if
337 you built a RTS with debugging support. This identifier will be used in
338 the debugging output to make distinction of different threads easier
339 (otherwise you only have the thread state object\'s address in the heap).
340
341 Other applications like the graphical Concurrent Haskell Debugger
342 (<http://www.informatik.uni-kiel.de/~fhu/chd/>) may choose to overload
343 'labelThread' for their purposes as well.
344 -}
345
346 labelThread :: ThreadId -> String -> IO ()
347 labelThread (ThreadId t) str = IO $ \ s ->
348    let !ps  = packCString# str
349        !adr = byteArrayContents# ps in
350      case (labelThread# t adr s) of s1 -> (# s1, () #)
351
352 --      Nota Bene: 'pseq' used to be 'seq'
353 --                 but 'seq' is now defined in PrelGHC
354 --
355 -- "pseq" is defined a bit weirdly (see below)
356 --
357 -- The reason for the strange "lazy" call is that
358 -- it fools the compiler into thinking that pseq  and par are non-strict in
359 -- their second argument (even if it inlines pseq at the call site).
360 -- If it thinks pseq is strict in "y", then it often evaluates
361 -- "y" before "x", which is totally wrong.  
362
363 {-# INLINE pseq  #-}
364 pseq :: a -> b -> b
365 pseq  x y = x `seq` lazy y
366
367 {-# INLINE par  #-}
368 par :: a -> b -> b
369 par  x y = case (par# x) of { _ -> lazy y }
370
371 -- | Internal function used by the RTS to run sparks.
372 runSparks :: IO ()
373 runSparks = IO loop
374   where loop s = case getSpark# s of
375                    (# s', n, p #) ->
376                       if n ==# 0# then (# s', () #)
377                                   else p `seq` loop s'
378
379 data BlockReason
380   = BlockedOnMVar
381         -- ^blocked on on 'MVar'
382   | BlockedOnBlackHole
383         -- ^blocked on a computation in progress by another thread
384   | BlockedOnException
385         -- ^blocked in 'throwTo'
386   | BlockedOnSTM
387         -- ^blocked in 'retry' in an STM transaction
388   | BlockedOnForeignCall
389         -- ^currently in a foreign call
390   | BlockedOnOther
391         -- ^blocked on some other resource.  Without @-threaded@,
392         -- I\/O and 'threadDelay' show up as 'BlockedOnOther', with @-threaded@
393         -- they show up as 'BlockedOnMVar'.
394   deriving (Eq,Ord,Show)
395
396 -- | The current status of a thread
397 data ThreadStatus
398   = ThreadRunning
399         -- ^the thread is currently runnable or running
400   | ThreadFinished
401         -- ^the thread has finished
402   | ThreadBlocked  BlockReason
403         -- ^the thread is blocked on some resource
404   | ThreadDied
405         -- ^the thread received an uncaught exception
406   deriving (Eq,Ord,Show)
407
408 threadStatus :: ThreadId -> IO ThreadStatus
409 threadStatus (ThreadId t) = IO $ \s ->
410    case threadStatus# t s of
411      (# s', stat #) -> (# s', mk_stat (I# stat) #)
412    where
413         -- NB. keep these in sync with includes/Constants.h
414      mk_stat 0  = ThreadRunning
415      mk_stat 1  = ThreadBlocked BlockedOnMVar
416      mk_stat 2  = ThreadBlocked BlockedOnBlackHole
417      mk_stat 3  = ThreadBlocked BlockedOnException
418      mk_stat 7  = ThreadBlocked BlockedOnSTM
419      mk_stat 11 = ThreadBlocked BlockedOnForeignCall
420      mk_stat 12 = ThreadBlocked BlockedOnForeignCall
421      mk_stat 16 = ThreadFinished
422      mk_stat 17 = ThreadDied
423      mk_stat _  = ThreadBlocked BlockedOnOther
424 \end{code}
425
426
427 %************************************************************************
428 %*                                                                      *
429 \subsection[stm]{Transactional heap operations}
430 %*                                                                      *
431 %************************************************************************
432
433 TVars are shared memory locations which support atomic memory
434 transactions.
435
436 \begin{code}
437 -- |A monad supporting atomic memory transactions.
438 newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
439
440 unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
441 unSTM (STM a) = a
442
443 INSTANCE_TYPEABLE1(STM,stmTc,"STM")
444
445 instance  Functor STM where
446    fmap f x = x >>= (return . f)
447
448 instance  Monad STM  where
449     {-# INLINE return #-}
450     {-# INLINE (>>)   #-}
451     {-# INLINE (>>=)  #-}
452     m >> k      = thenSTM m k
453     return x    = returnSTM x
454     m >>= k     = bindSTM m k
455
456 bindSTM :: STM a -> (a -> STM b) -> STM b
457 bindSTM (STM m) k = STM ( \s ->
458   case m s of 
459     (# new_s, a #) -> unSTM (k a) new_s
460   )
461
462 thenSTM :: STM a -> STM b -> STM b
463 thenSTM (STM m) k = STM ( \s ->
464   case m s of 
465     (# new_s, _ #) -> unSTM k new_s
466   )
467
468 returnSTM :: a -> STM a
469 returnSTM x = STM (\s -> (# s, x #))
470
471 -- | Unsafely performs IO in the STM monad.  Beware: this is a highly
472 -- dangerous thing to do.  
473 --
474 --   * The STM implementation will often run transactions multiple
475 --     times, so you need to be prepared for this if your IO has any
476 --     side effects.
477 --
478 --   * The STM implementation will abort transactions that are known to
479 --     be invalid and need to be restarted.  This may happen in the middle
480 --     of `unsafeIOToSTM`, so make sure you don't acquire any resources
481 --     that need releasing (exception handlers are ignored when aborting
482 --     the transaction).  That includes doing any IO using Handles, for
483 --     example.  Getting this wrong will probably lead to random deadlocks.
484 --
485 --   * The transaction may have seen an inconsistent view of memory when
486 --     the IO runs.  Invariants that you expect to be true throughout
487 --     your program may not be true inside a transaction, due to the
488 --     way transactions are implemented.  Normally this wouldn't be visible
489 --     to the programmer, but using `unsafeIOToSTM` can expose it.
490 --
491 unsafeIOToSTM :: IO a -> STM a
492 unsafeIOToSTM (IO m) = STM m
493
494 -- |Perform a series of STM actions atomically.
495 --
496 -- You cannot use 'atomically' inside an 'unsafePerformIO' or 'unsafeInterleaveIO'. 
497 -- Any attempt to do so will result in a runtime error.  (Reason: allowing
498 -- this would effectively allow a transaction inside a transaction, depending
499 -- on exactly when the thunk is evaluated.)
500 --
501 -- However, see 'newTVarIO', which can be called inside 'unsafePerformIO',
502 -- and which allows top-level TVars to be allocated.
503
504 atomically :: STM a -> IO a
505 atomically (STM m) = IO (\s -> (atomically# m) s )
506
507 -- |Retry execution of the current memory transaction because it has seen
508 -- values in TVars which mean that it should not continue (e.g. the TVars
509 -- represent a shared buffer that is now empty).  The implementation may
510 -- block the thread until one of the TVars that it has read from has been
511 -- udpated. (GHC only)
512 retry :: STM a
513 retry = STM $ \s# -> retry# s#
514
515 -- |Compose two alternative STM actions (GHC only).  If the first action
516 -- completes without retrying then it forms the result of the orElse.
517 -- Otherwise, if the first action retries, then the second action is
518 -- tried in its place.  If both actions retry then the orElse as a
519 -- whole retries.
520 orElse :: STM a -> STM a -> STM a
521 orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
522
523 -- |Exception handling within STM actions.
524 catchSTM :: STM a -> (SomeException -> STM a) -> STM a
525 catchSTM (STM m) k = STM $ \s -> catchSTM# m (\ex -> unSTM (k ex)) s
526
527 -- | Low-level primitive on which always and alwaysSucceeds are built.
528 -- checkInv differs form these in that (i) the invariant is not 
529 -- checked when checkInv is called, only at the end of this and
530 -- subsequent transcations, (ii) the invariant failure is indicated
531 -- by raising an exception.
532 checkInv :: STM a -> STM ()
533 checkInv (STM m) = STM (\s -> (check# m) s)
534
535 -- | alwaysSucceeds adds a new invariant that must be true when passed
536 -- to alwaysSucceeds, at the end of the current transaction, and at
537 -- the end of every subsequent transaction.  If it fails at any
538 -- of those points then the transaction violating it is aborted
539 -- and the exception raised by the invariant is propagated.
540 alwaysSucceeds :: STM a -> STM ()
541 alwaysSucceeds i = do ( do i ; retry ) `orElse` ( return () ) 
542                       checkInv i
543
544 -- | always is a variant of alwaysSucceeds in which the invariant is
545 -- expressed as an STM Bool action that must return True.  Returning
546 -- False or raising an exception are both treated as invariant failures.
547 always :: STM Bool -> STM ()
548 always i = alwaysSucceeds ( do v <- i
549                                if (v) then return () else ( error "Transacional invariant violation" ) )
550
551 -- |Shared memory locations that support atomic memory transactions.
552 data TVar a = TVar (TVar# RealWorld a)
553
554 INSTANCE_TYPEABLE1(TVar,tvarTc,"TVar")
555
556 instance Eq (TVar a) where
557         (TVar tvar1#) == (TVar tvar2#) = sameTVar# tvar1# tvar2#
558
559 -- |Create a new TVar holding a value supplied
560 newTVar :: a -> STM (TVar a)
561 newTVar val = STM $ \s1# ->
562     case newTVar# val s1# of
563          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
564
565 -- |@IO@ version of 'newTVar'.  This is useful for creating top-level
566 -- 'TVar's using 'System.IO.Unsafe.unsafePerformIO', because using
567 -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
568 -- possible.
569 newTVarIO :: a -> IO (TVar a)
570 newTVarIO val = IO $ \s1# ->
571     case newTVar# val s1# of
572          (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
573
574 -- |Return the current value stored in a TVar.
575 -- This is equivalent to
576 --
577 -- >  readTVarIO = atomically . readTVar
578 --
579 -- but works much faster, because it doesn't perform a complete
580 -- transaction, it just reads the current value of the 'TVar'.
581 readTVarIO :: TVar a -> IO a
582 readTVarIO (TVar tvar#) = IO $ \s# -> readTVarIO# tvar# s#
583
584 -- |Return the current value stored in a TVar
585 readTVar :: TVar a -> STM a
586 readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
587
588 -- |Write the supplied value into a TVar
589 writeTVar :: TVar a -> a -> STM ()
590 writeTVar (TVar tvar#) val = STM $ \s1# ->
591     case writeTVar# tvar# val s1# of
592          s2# -> (# s2#, () #)
593   
594 \end{code}
595
596 MVar utilities
597
598 \begin{code}
599 withMVar :: MVar a -> (a -> IO b) -> IO b
600 withMVar m io = 
601   block $ do
602     a <- takeMVar m
603     b <- catchAny (unblock (io a))
604             (\e -> do putMVar m a; throw e)
605     putMVar m a
606     return b
607 \end{code}
608
609 %************************************************************************
610 %*                                                                      *
611 \subsection{Thread waiting}
612 %*                                                                      *
613 %************************************************************************
614
615 \begin{code}
616 #ifdef mingw32_HOST_OS
617
618 -- Note: threadWaitRead and threadWaitWrite aren't really functional
619 -- on Win32, but left in there because lib code (still) uses them (the manner
620 -- in which they're used doesn't cause problems on a Win32 platform though.)
621
622 asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
623 asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
624   IO $ \s -> case asyncRead# fd isSock len buf s of 
625                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
626
627 asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
628 asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
629   IO $ \s -> case asyncWrite# fd isSock len buf s of 
630                (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
631
632 asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
633 asyncDoProc (FunPtr proc) (Ptr param) = 
634     -- the 'length' value is ignored; simplifies implementation of
635     -- the async*# primops to have them all return the same result.
636   IO $ \s -> case asyncDoProc# proc param s  of 
637                (# s', _len#, err# #) -> (# s', I# err# #)
638
639 -- to aid the use of these primops by the IO Handle implementation,
640 -- provide the following convenience funs:
641
642 -- this better be a pinned byte array!
643 asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
644 asyncReadBA fd isSock len off bufB = 
645   asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
646   
647 asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
648 asyncWriteBA fd isSock len off bufB = 
649   asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
650
651 #endif
652
653 -- -----------------------------------------------------------------------------
654 -- Thread IO API
655
656 -- | Block the current thread until data is available to read on the
657 -- given file descriptor (GHC only).
658 threadWaitRead :: Fd -> IO ()
659 threadWaitRead fd
660 #ifndef mingw32_HOST_OS
661   | threaded  = waitForReadEvent fd
662 #endif
663   | otherwise = IO $ \s -> 
664         case fromIntegral fd of { I# fd# ->
665         case waitRead# fd# s of { s' -> (# s', () #)
666         }}
667
668 -- | Block the current thread until data can be written to the
669 -- given file descriptor (GHC only).
670 threadWaitWrite :: Fd -> IO ()
671 threadWaitWrite fd
672 #ifndef mingw32_HOST_OS
673   | threaded  = waitForWriteEvent fd
674 #endif
675   | otherwise = IO $ \s -> 
676         case fromIntegral fd of { I# fd# ->
677         case waitWrite# fd# s of { s' -> (# s', () #)
678         }}
679
680 -- | Suspends the current thread for a given number of microseconds
681 -- (GHC only).
682 --
683 -- There is no guarantee that the thread will be rescheduled promptly
684 -- when the delay has expired, but the thread will never continue to
685 -- run /earlier/ than specified.
686 --
687 threadDelay :: Int -> IO ()
688 threadDelay time
689   | threaded  = waitForDelayEvent time
690   | otherwise = IO $ \s -> 
691         case fromIntegral time of { I# time# ->
692         case delay# time# s of { s' -> (# s', () #)
693         }}
694
695
696 -- | Set the value of returned TVar to True after a given number of
697 -- microseconds. The caveats associated with threadDelay also apply.
698 --
699 registerDelay :: Int -> IO (TVar Bool)
700 registerDelay usecs 
701   | threaded = waitForDelayEventSTM usecs
702   | otherwise = error "registerDelay: requires -threaded"
703
704 foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
705
706 waitForDelayEvent :: Int -> IO ()
707 waitForDelayEvent usecs = do
708   m <- newEmptyMVar
709   target <- calculateTarget usecs
710   atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
711   prodServiceThread
712   takeMVar m
713
714 -- Delays for use in STM
715 waitForDelayEventSTM :: Int -> IO (TVar Bool)
716 waitForDelayEventSTM usecs = do
717    t <- atomically $ newTVar False
718    target <- calculateTarget usecs
719    atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
720    prodServiceThread
721    return t  
722     
723 calculateTarget :: Int -> IO USecs
724 calculateTarget usecs = do
725     now <- getUSecOfDay
726     return $ now + (fromIntegral usecs)
727
728
729 -- ----------------------------------------------------------------------------
730 -- Threaded RTS implementation of threadWaitRead, threadWaitWrite, threadDelay
731
732 -- In the threaded RTS, we employ a single IO Manager thread to wait
733 -- for all outstanding IO requests (threadWaitRead,threadWaitWrite)
734 -- and delays (threadDelay).  
735 --
736 -- We can do this because in the threaded RTS the IO Manager can make
737 -- a non-blocking call to select(), so we don't have to do select() in
738 -- the scheduler as we have to in the non-threaded RTS.  We get performance
739 -- benefits from doing it this way, because we only have to restart the select()
740 -- when a new request arrives, rather than doing one select() each time
741 -- around the scheduler loop.  Furthermore, the scheduler can be simplified
742 -- by not having to check for completed IO requests.
743
744 -- Issues, possible problems:
745 --
746 --      - we might want bound threads to just do the blocking
747 --        operation rather than communicating with the IO manager
748 --        thread.  This would prevent simgle-threaded programs which do
749 --        IO from requiring multiple OS threads.  However, it would also
750 --        prevent bound threads waiting on IO from being killed or sent
751 --        exceptions.
752 --
753 --      - Apprently exec() doesn't work on Linux in a multithreaded program.
754 --        I couldn't repeat this.
755 --
756 --      - How do we handle signal delivery in the multithreaded RTS?
757 --
758 --      - forkProcess will kill the IO manager thread.  Let's just
759 --        hope we don't need to do any blocking IO between fork & exec.
760
761 #ifndef mingw32_HOST_OS
762 data IOReq
763   = Read   {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
764   | Write  {-# UNPACK #-} !Fd {-# UNPACK #-} !(MVar ())
765 #endif
766
767 data DelayReq
768   = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
769   | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
770
771 #ifndef mingw32_HOST_OS
772 pendingEvents :: IORef [IOReq]
773 #endif
774 pendingDelays :: IORef [DelayReq]
775         -- could use a strict list or array here
776 {-# NOINLINE pendingEvents #-}
777 {-# NOINLINE pendingDelays #-}
778 (pendingEvents,pendingDelays) = unsafePerformIO $ do
779   startIOManagerThread
780   reqs <- newIORef []
781   dels <- newIORef []
782   return (reqs, dels)
783         -- the first time we schedule an IO request, the service thread
784         -- will be created (cool, huh?)
785
786 ensureIOManagerIsRunning :: IO ()
787 ensureIOManagerIsRunning 
788   | threaded  = seq pendingEvents $ return ()
789   | otherwise = return ()
790
791 insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
792 insertDelay d [] = [d]
793 insertDelay d1 ds@(d2 : rest)
794   | delayTime d1 <= delayTime d2 = d1 : ds
795   | otherwise                    = d2 : insertDelay d1 rest
796
797 delayTime :: DelayReq -> USecs
798 delayTime (Delay t _) = t
799 delayTime (DelaySTM t _) = t
800
801 type USecs = Word64
802
803 foreign import ccall unsafe "getUSecOfDay" 
804   getUSecOfDay :: IO USecs
805
806 prodding :: IORef Bool
807 {-# NOINLINE prodding #-}
808 prodding = unsafePerformIO (newIORef False)
809
810 prodServiceThread :: IO ()
811 prodServiceThread = do
812   was_set <- atomicModifyIORef prodding (\a -> (True,a))
813   if (not (was_set)) then wakeupIOManager else return ()
814
815 #ifdef mingw32_HOST_OS
816 -- ----------------------------------------------------------------------------
817 -- Windows IO manager thread
818
819 startIOManagerThread :: IO ()
820 startIOManagerThread = do
821   wakeup <- c_getIOManagerEvent
822   forkIO $ service_loop wakeup []
823   return ()
824
825 service_loop :: HANDLE          -- read end of pipe
826              -> [DelayReq]      -- current delay requests
827              -> IO ()
828
829 service_loop wakeup old_delays = do
830   -- pick up new delay requests
831   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
832   let  delays = foldr insertDelay old_delays new_delays
833
834   now <- getUSecOfDay
835   (delays', timeout) <- getDelay now delays
836
837   r <- c_WaitForSingleObject wakeup timeout
838   case r of
839     0xffffffff -> do c_maperrno; throwErrno "service_loop"
840     0 -> do
841         r2 <- c_readIOManagerEvent
842         exit <- 
843               case r2 of
844                 _ | r2 == io_MANAGER_WAKEUP -> return False
845                 _ | r2 == io_MANAGER_DIE    -> return True
846                 0 -> return False -- spurious wakeup
847                 _ -> do start_console_handler (r2 `shiftR` 1); return False
848         if exit
849           then return ()
850           else service_cont wakeup delays'
851
852     _other -> service_cont wakeup delays' -- probably timeout        
853
854 service_cont :: HANDLE -> [DelayReq] -> IO ()
855 service_cont wakeup delays = do
856   r <- atomicModifyIORef prodding (\_ -> (False,False))
857   r `seq` return () -- avoid space leak
858   service_loop wakeup delays
859
860 -- must agree with rts/win32/ThrIOManager.c
861 io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
862 io_MANAGER_WAKEUP = 0xffffffff
863 io_MANAGER_DIE    = 0xfffffffe
864
865 data ConsoleEvent
866  = ControlC
867  | Break
868  | Close
869     -- these are sent to Services only.
870  | Logoff
871  | Shutdown
872  deriving (Eq, Ord, Enum, Show, Read, Typeable)
873
874 start_console_handler :: Word32 -> IO ()
875 start_console_handler r =
876   case toWin32ConsoleEvent r of
877      Just x  -> withMVar win32ConsoleHandler $ \handler -> do
878                     forkIO (handler x)
879                     return ()
880      Nothing -> return ()
881
882 toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
883 toWin32ConsoleEvent ev = 
884    case ev of
885        0 {- CTRL_C_EVENT-}        -> Just ControlC
886        1 {- CTRL_BREAK_EVENT-}    -> Just Break
887        2 {- CTRL_CLOSE_EVENT-}    -> Just Close
888        5 {- CTRL_LOGOFF_EVENT-}   -> Just Logoff
889        6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
890        _ -> Nothing
891
892 win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
893 win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
894
895 -- XXX Is this actually needed?
896 stick :: IORef HANDLE
897 {-# NOINLINE stick #-}
898 stick = unsafePerformIO (newIORef nullPtr)
899
900 wakeupIOManager :: IO ()
901 wakeupIOManager = do 
902   _hdl <- readIORef stick
903   c_sendIOManagerEvent io_MANAGER_WAKEUP
904
905 -- Walk the queue of pending delays, waking up any that have passed
906 -- and return the smallest delay to wait for.  The queue of pending
907 -- delays is kept ordered.
908 getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
909 getDelay _   [] = return ([], iNFINITE)
910 getDelay now all@(d : rest) 
911   = case d of
912      Delay time m | now >= time -> do
913         putMVar m ()
914         getDelay now rest
915      DelaySTM time t | now >= time -> do
916         atomically $ writeTVar t True
917         getDelay now rest
918      _otherwise ->
919         -- delay is in millisecs for WaitForSingleObject
920         let micro_seconds = delayTime d - now
921             milli_seconds = (micro_seconds + 999) `div` 1000
922         in return (all, fromIntegral milli_seconds)
923
924 -- ToDo: this just duplicates part of System.Win32.Types, which isn't
925 -- available yet.  We should move some Win32 functionality down here,
926 -- maybe as part of the grand reorganisation of the base package...
927 type HANDLE       = Ptr ()
928 type DWORD        = Word32
929
930 iNFINITE :: DWORD
931 iNFINITE = 0xFFFFFFFF -- urgh
932
933 foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
934   c_getIOManagerEvent :: IO HANDLE
935
936 foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
937   c_readIOManagerEvent :: IO Word32
938
939 foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
940   c_sendIOManagerEvent :: Word32 -> IO ()
941
942 foreign import ccall unsafe "maperrno"             -- in Win32Utils.c
943    c_maperrno :: IO ()
944
945 foreign import stdcall "WaitForSingleObject"
946    c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
947
948 #else
949 -- ----------------------------------------------------------------------------
950 -- Unix IO manager thread, using select()
951
952 startIOManagerThread :: IO ()
953 startIOManagerThread = do
954         allocaArray 2 $ \fds -> do
955         throwErrnoIfMinus1 "startIOManagerThread" (c_pipe fds)
956         rd_end <- peekElemOff fds 0
957         wr_end <- peekElemOff fds 1
958         setNonBlockingFD wr_end -- writes happen in a signal handler, we
959                                 -- don't want them to block.
960         setCloseOnExec rd_end
961         setCloseOnExec wr_end
962         writeIORef stick (fromIntegral wr_end)
963         c_setIOManagerPipe wr_end
964         forkIO $ do
965             allocaBytes sizeofFdSet   $ \readfds -> do
966             allocaBytes sizeofFdSet   $ \writefds -> do 
967             allocaBytes sizeofTimeVal $ \timeval -> do
968             service_loop (fromIntegral rd_end) readfds writefds timeval [] []
969         return ()
970
971 service_loop
972    :: Fd                -- listen to this for wakeup calls
973    -> Ptr CFdSet
974    -> Ptr CFdSet
975    -> Ptr CTimeVal
976    -> [IOReq]
977    -> [DelayReq]
978    -> IO ()
979 service_loop wakeup readfds writefds ptimeval old_reqs old_delays = do
980
981   -- pick up new IO requests
982   new_reqs <- atomicModifyIORef pendingEvents (\a -> ([],a))
983   let reqs = new_reqs ++ old_reqs
984
985   -- pick up new delay requests
986   new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
987   let  delays0 = foldr insertDelay old_delays new_delays
988
989   -- build the FDSets for select()
990   fdZero readfds
991   fdZero writefds
992   fdSet wakeup readfds
993   maxfd <- buildFdSets 0 readfds writefds reqs
994
995   -- perform the select()
996   let do_select delays = do
997           -- check the current time and wake up any thread in
998           -- threadDelay whose timeout has expired.  Also find the
999           -- timeout value for the select() call.
1000           now <- getUSecOfDay
1001           (delays', timeout) <- getDelay now ptimeval delays
1002
1003           res <- c_select (fromIntegral ((max wakeup maxfd)+1)) readfds writefds 
1004                         nullPtr timeout
1005           if (res == -1)
1006              then do
1007                 err <- getErrno
1008                 case err of
1009                   _ | err == eINTR ->  do_select delays'
1010                         -- EINTR: just redo the select()
1011                   _ | err == eBADF ->  return (True, delays)
1012                         -- EBADF: one of the file descriptors is closed or bad,
1013                         -- we don't know which one, so wake everyone up.
1014                   _ | otherwise    ->  throwErrno "select"
1015                         -- otherwise (ENOMEM or EINVAL) something has gone
1016                         -- wrong; report the error.
1017              else
1018                 return (False,delays')
1019
1020   (wakeup_all,delays') <- do_select delays0
1021
1022   exit <-
1023     if wakeup_all then return False
1024       else do
1025         b <- fdIsSet wakeup readfds
1026         if b == 0 
1027           then return False
1028           else alloca $ \p -> do 
1029                  c_read (fromIntegral wakeup) p 1
1030                  s <- peek p            
1031                  case s of
1032                   _ | s == io_MANAGER_WAKEUP -> return False
1033                   _ | s == io_MANAGER_DIE    -> return True
1034                   _ | s == io_MANAGER_SYNC   -> do
1035                        mvars <- readIORef sync
1036                        mapM_ (flip putMVar ()) mvars
1037                        return False
1038                   _ -> do
1039                        fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1040                        withForeignPtr fp $ \p_siginfo -> do
1041                          r <- c_read (fromIntegral wakeup) (castPtr p_siginfo)
1042                                  sizeof_siginfo_t
1043                          when (r /= fromIntegral sizeof_siginfo_t) $
1044                             error "failed to read siginfo_t"
1045                        runHandlers' fp (fromIntegral s)
1046                        return False
1047
1048   if exit then return () else do
1049
1050   atomicModifyIORef prodding (\_ -> (False,False))
1051
1052   reqs' <- if wakeup_all then do wakeupAll reqs; return []
1053                          else completeRequests reqs readfds writefds []
1054
1055   service_loop wakeup readfds writefds ptimeval reqs' delays'
1056
1057 io_MANAGER_WAKEUP, io_MANAGER_DIE, io_MANAGER_SYNC :: CChar
1058 io_MANAGER_WAKEUP = 0xff
1059 io_MANAGER_DIE    = 0xfe
1060 io_MANAGER_SYNC   = 0xfd
1061
1062 -- | the stick is for poking the IO manager with
1063 stick :: IORef Fd
1064 {-# NOINLINE stick #-}
1065 stick = unsafePerformIO (newIORef 0)
1066
1067 {-# NOINLINE sync #-}
1068 sync :: IORef [MVar ()]
1069 sync = unsafePerformIO (newIORef [])
1070
1071 -- waits for the IO manager to drain the pipe
1072 syncIOManager :: IO ()
1073 syncIOManager = do
1074   m <- newEmptyMVar
1075   atomicModifyIORef sync (\old -> (m:old,()))
1076   fd <- readIORef stick
1077   with io_MANAGER_SYNC $ \pbuf -> do 
1078     c_write (fromIntegral fd) pbuf 1; return ()
1079   takeMVar m
1080
1081 wakeupIOManager :: IO ()
1082 wakeupIOManager = do
1083   fd <- readIORef stick
1084   with io_MANAGER_WAKEUP $ \pbuf -> do 
1085     c_write (fromIntegral fd) pbuf 1; return ()
1086
1087 -- For the non-threaded RTS
1088 runHandlers :: Ptr Word8 -> Int -> IO ()
1089 runHandlers p_info sig = do
1090   fp <- mallocForeignPtrBytes (fromIntegral sizeof_siginfo_t)
1091   withForeignPtr fp $ \p -> do
1092     copyBytes p p_info (fromIntegral sizeof_siginfo_t)
1093     free p_info
1094   runHandlers' fp (fromIntegral sig)
1095
1096 runHandlers' :: ForeignPtr Word8 -> Signal -> IO ()
1097 runHandlers' p_info sig = do
1098   let int = fromIntegral sig
1099   withMVar signal_handlers $ \arr ->
1100       if not (inRange (boundsIOArray arr) int)
1101          then return ()
1102          else do handler <- unsafeReadIOArray arr int
1103                  case handler of
1104                     Nothing -> return ()
1105                     Just (f,_)  -> do forkIO (f p_info); return ()
1106
1107 foreign import ccall "setIOManagerPipe"
1108   c_setIOManagerPipe :: CInt -> IO ()
1109
1110 foreign import ccall "__hscore_sizeof_siginfo_t"
1111   sizeof_siginfo_t :: CSize
1112
1113 type Signal = CInt
1114
1115 maxSig = 64 :: Int
1116
1117 type HandlerFun = ForeignPtr Word8 -> IO ()
1118
1119 -- Lock used to protect concurrent access to signal_handlers.  Symptom of
1120 -- this race condition is #1922, although that bug was on Windows a similar
1121 -- bug also exists on Unix.
1122 {-# NOINLINE signal_handlers #-}
1123 signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
1124 signal_handlers = unsafePerformIO $ do
1125    arr <- newIOArray (0,maxSig) Nothing
1126    m <- newMVar arr
1127    block $ do
1128      stable_ref <- newStablePtr m
1129      let ref = castStablePtrToPtr stable_ref
1130      ref2 <- getOrSetSignalHandlerStore ref
1131      if ref==ref2
1132         then return m
1133         else do freeStablePtr stable_ref
1134                 deRefStablePtr (castPtrToStablePtr ref2)
1135
1136 foreign import ccall unsafe "getOrSetSignalHandlerStore"
1137     getOrSetSignalHandlerStore :: Ptr a -> IO (Ptr a)
1138
1139 setHandler :: Signal -> Maybe (HandlerFun,Dynamic) -> IO (Maybe (HandlerFun,Dynamic))
1140 setHandler sig handler = do
1141   let int = fromIntegral sig
1142   withMVar signal_handlers $ \arr -> 
1143      if not (inRange (boundsIOArray arr) int)
1144         then error "GHC.Conc.setHandler: signal out of range"
1145         else do old <- unsafeReadIOArray arr int
1146                 unsafeWriteIOArray arr int handler
1147                 return old
1148
1149 -- -----------------------------------------------------------------------------
1150 -- IO requests
1151
1152 buildFdSets :: Fd -> Ptr CFdSet -> Ptr CFdSet -> [IOReq] -> IO Fd
1153 buildFdSets maxfd _       _        [] = return maxfd
1154 buildFdSets maxfd readfds writefds (Read fd _ : reqs)
1155   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1156   | otherwise        =  do
1157         fdSet fd readfds
1158         buildFdSets (max maxfd fd) readfds writefds reqs
1159 buildFdSets maxfd readfds writefds (Write fd _ : reqs)
1160   | fd >= fD_SETSIZE =  error "buildFdSets: file descriptor out of range"
1161   | otherwise        =  do
1162         fdSet fd writefds
1163         buildFdSets (max maxfd fd) readfds writefds reqs
1164
1165 completeRequests :: [IOReq] -> Ptr CFdSet -> Ptr CFdSet -> [IOReq]
1166                  -> IO [IOReq]
1167 completeRequests [] _ _ reqs' = return reqs'
1168 completeRequests (Read fd m : reqs) readfds writefds reqs' = do
1169   b <- fdIsSet fd readfds
1170   if b /= 0
1171     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1172     else completeRequests reqs readfds writefds (Read fd m : reqs')
1173 completeRequests (Write fd m : reqs) readfds writefds reqs' = do
1174   b <- fdIsSet fd writefds
1175   if b /= 0
1176     then do putMVar m (); completeRequests reqs readfds writefds reqs'
1177     else completeRequests reqs readfds writefds (Write fd m : reqs')
1178
1179 wakeupAll :: [IOReq] -> IO ()
1180 wakeupAll [] = return ()
1181 wakeupAll (Read  _ m : reqs) = do putMVar m (); wakeupAll reqs
1182 wakeupAll (Write _ m : reqs) = do putMVar m (); wakeupAll reqs
1183
1184 waitForReadEvent :: Fd -> IO ()
1185 waitForReadEvent fd = do
1186   m <- newEmptyMVar
1187   atomicModifyIORef pendingEvents (\xs -> (Read fd m : xs, ()))
1188   prodServiceThread
1189   takeMVar m
1190
1191 waitForWriteEvent :: Fd -> IO ()
1192 waitForWriteEvent fd = do
1193   m <- newEmptyMVar
1194   atomicModifyIORef pendingEvents (\xs -> (Write fd m : xs, ()))
1195   prodServiceThread
1196   takeMVar m
1197
1198 -- -----------------------------------------------------------------------------
1199 -- Delays
1200
1201 -- Walk the queue of pending delays, waking up any that have passed
1202 -- and return the smallest delay to wait for.  The queue of pending
1203 -- delays is kept ordered.
1204 getDelay :: USecs -> Ptr CTimeVal -> [DelayReq] -> IO ([DelayReq], Ptr CTimeVal)
1205 getDelay _   _        [] = return ([],nullPtr)
1206 getDelay now ptimeval all@(d : rest) 
1207   = case d of
1208      Delay time m | now >= time -> do
1209         putMVar m ()
1210         getDelay now ptimeval rest
1211      DelaySTM time t | now >= time -> do
1212         atomically $ writeTVar t True
1213         getDelay now ptimeval rest
1214      _otherwise -> do
1215         setTimevalTicks ptimeval (delayTime d - now)
1216         return (all,ptimeval)
1217
1218 data CTimeVal
1219
1220 foreign import ccall unsafe "sizeofTimeVal"
1221   sizeofTimeVal :: Int
1222
1223 foreign import ccall unsafe "setTimevalTicks" 
1224   setTimevalTicks :: Ptr CTimeVal -> USecs -> IO ()
1225
1226 {- 
1227   On Win32 we're going to have a single Pipe, and a
1228   waitForSingleObject with the delay time.  For signals, we send a
1229   byte down the pipe just like on Unix.
1230 -}
1231
1232 -- ----------------------------------------------------------------------------
1233 -- select() interface
1234
1235 -- ToDo: move to System.Posix.Internals?
1236
1237 data CFdSet
1238
1239 foreign import ccall safe "select"
1240   c_select :: CInt -> Ptr CFdSet -> Ptr CFdSet -> Ptr CFdSet -> Ptr CTimeVal
1241            -> IO CInt
1242
1243 foreign import ccall unsafe "hsFD_SETSIZE"
1244   c_fD_SETSIZE :: CInt
1245
1246 fD_SETSIZE :: Fd
1247 fD_SETSIZE = fromIntegral c_fD_SETSIZE
1248
1249 foreign import ccall unsafe "hsFD_ISSET"
1250   c_fdIsSet :: CInt -> Ptr CFdSet -> IO CInt
1251
1252 fdIsSet :: Fd -> Ptr CFdSet -> IO CInt
1253 fdIsSet (Fd fd) fdset = c_fdIsSet fd fdset
1254
1255 foreign import ccall unsafe "hsFD_SET"
1256   c_fdSet :: CInt -> Ptr CFdSet -> IO ()
1257
1258 fdSet :: Fd -> Ptr CFdSet -> IO ()
1259 fdSet (Fd fd) fdset = c_fdSet fd fdset
1260
1261 foreign import ccall unsafe "hsFD_ZERO"
1262   fdZero :: Ptr CFdSet -> IO ()
1263
1264 foreign import ccall unsafe "sizeof_fd_set"
1265   sizeofFdSet :: Int
1266
1267 #endif
1268
1269 reportStackOverflow :: IO a
1270 reportStackOverflow = do callStackOverflowHook; return undefined
1271
1272 reportError :: SomeException -> IO a
1273 reportError ex = do
1274    handler <- getUncaughtExceptionHandler
1275    handler ex
1276    return undefined
1277
1278 -- SUP: Are the hooks allowed to re-enter Haskell land?  If so, remove
1279 -- the unsafe below.
1280 foreign import ccall unsafe "stackOverflow"
1281         callStackOverflowHook :: IO ()
1282
1283 {-# NOINLINE uncaughtExceptionHandler #-}
1284 uncaughtExceptionHandler :: IORef (SomeException -> IO ())
1285 uncaughtExceptionHandler = unsafePerformIO (newIORef defaultHandler)
1286    where
1287       defaultHandler :: SomeException -> IO ()
1288       defaultHandler se@(SomeException ex) = do
1289          (hFlush stdout) `catchAny` (\ _ -> return ())
1290          let msg = case cast ex of
1291                Just Deadlock -> "no threads to run:  infinite loop or deadlock?"
1292                _ -> case cast ex of
1293                     Just (ErrorCall s) -> s
1294                     _                  -> showsPrec 0 se ""
1295          withCString "%s" $ \cfmt ->
1296           withCString msg $ \cmsg ->
1297             errorBelch cfmt cmsg
1298
1299 -- don't use errorBelch() directly, because we cannot call varargs functions
1300 -- using the FFI.
1301 foreign import ccall unsafe "HsBase.h errorBelch2"
1302    errorBelch :: CString -> CString -> IO ()
1303
1304 setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()
1305 setUncaughtExceptionHandler = writeIORef uncaughtExceptionHandler
1306
1307 getUncaughtExceptionHandler :: IO (SomeException -> IO ())
1308 getUncaughtExceptionHandler = readIORef uncaughtExceptionHandler
1309 \end{code}