Move `Maybe`-typedef into GHC.Base
[ghc.git] / libraries / base / GHC / Event / Thread.hs
1 {-# LANGUAGE Trustworthy #-}
2 {-# LANGUAGE BangPatterns, NoImplicitPrelude #-}
3 module GHC.Event.Thread
4 ( getSystemEventManager
5 , getSystemTimerManager
6 , ensureIOManagerIsRunning
7 , ioManagerCapabilitiesChanged
8 , threadWaitRead
9 , threadWaitWrite
10 , threadWaitReadSTM
11 , threadWaitWriteSTM
12 , closeFdWith
13 , threadDelay
14 , registerDelay
15 , blockedOnBadFD -- used by RTS
16 ) where
17
import Control.Exception (finally, SomeException, toException)
import Control.Monad (forM, forM_, sequence_, zipWithM, when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (zipWith3)
import Data.Tuple (snd)
import Foreign.C.Error (eBADF, errnoToIOError)
import Foreign.C.Types (CInt(..), CUInt(..))
import Foreign.Ptr (Ptr)
import GHC.Base
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
                      labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,
                      getNumCapabilities, threadCapability, myThreadId, forkOn,
                      threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM)
import GHC.IO (mask_, onException, uninterruptibleMask_)
import GHC.IO.Exception (ioError)
import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray,
                    boundsIOArray)
import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import GHC.Event.Control (controlWriteFd)
import GHC.Event.Internal (eventIs, evtClose)
import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
                             new, registerFd, unregisterFd_)
import qualified GHC.Event.Manager as M
import qualified GHC.Event.TimerManager as TM
import GHC.Num ((-), (+))
import GHC.Real (fromIntegral)
import GHC.Show (showSignedInt)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
47
-- | Put the calling thread to sleep for the given number of
-- microseconds (GHC only).
--
-- The delay is a lower bound: the thread never resumes /earlier/ than
-- requested, but nothing guarantees that it is rescheduled promptly
-- once the delay has elapsed.
threadDelay :: Int -> IO ()
threadDelay usecs = mask_ $ do
  timerMgr <- getSystemTimerManager
  wakeup   <- newEmptyMVar
  -- The timeout callback fills 'wakeup', which releases the 'takeMVar' below.
  timeout  <- TM.registerTimeout timerMgr usecs $ putMVar wakeup ()
  -- If the wait is aborted by an exception, withdraw the pending timeout.
  onException (takeMVar wakeup) (TM.unregisterTimeout timerMgr timeout)
60
-- | Create a 'TVar' holding 'False' and arrange for it to be set to
-- 'True' once the given number of microseconds has passed.  The
-- caveats described for 'threadDelay' apply here as well.
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs = do
  expired  <- newTVarIO False
  timerMgr <- getSystemTimerManager
  -- The registration key is discarded: the timeout is never cancelled here.
  _ <- TM.registerTimeout timerMgr usecs (atomically (writeTVar expired True))
  return expired
70
-- | Suspend the calling thread until the given file descriptor has
-- data available to read.
--
-- An 'IOError' is thrown if the file descriptor is closed while the
-- thread is blocked.  Use 'closeFdWith' to close a descriptor that has
-- been passed to 'threadWaitRead'.
{-# INLINE threadWaitRead #-}
threadWaitRead :: Fd -> IO ()
threadWaitRead = threadWait evtRead
80
-- | Suspend the calling thread until the given file descriptor is able
-- to accept data for writing.
--
-- An 'IOError' is thrown if the file descriptor is closed while the
-- thread is blocked.  Use 'closeFdWith' to close a descriptor that has
-- been passed to 'threadWaitWrite'.
{-# INLINE threadWaitWrite #-}
threadWaitWrite :: Fd -> IO ()
threadWaitWrite = threadWait evtWrite
90
-- | Close a file descriptor in a concurrency-safe way.
--
-- Any threads that are blocked on the file descriptor via
-- 'threadWaitRead' or 'threadWaitWrite' will be unblocked by having
-- IO exceptions thrown.
--
-- The callback table of every event manager is held (its 'MVar' is
-- taken) for the duration of the close, and handed back afterwards even
-- if the close action throws.  Note that the close action itself runs
-- with asynchronous exceptions uninterruptibly masked, so it should not
-- block for long.
closeFdWith :: (Fd -> IO ())        -- ^ Action that performs the close.
            -> Fd                   -- ^ File descriptor to close.
            -> IO ()
closeFdWith close fd = do
  eventManagerArray <- readIORef eventManager
  let (low, high) = boundsIOArray eventManagerArray
  -- Collect the manager of every capability; each slot is expected to
  -- be populated by the time a descriptor is being closed.
  mgrs <- forM [low..high] $ \i -> do
    Just (_,!mgr) <- readIOArray eventManagerArray i
    return mgr
  -- 'takeMVar' (and possibly 'M.closeFd_') can block, and blocking
  -- operations remain interruptible under a plain 'mask_'.  An
  -- asynchronous exception delivered after some of the tables have been
  -- taken, but before 'finally' is installed, would leave those MVars
  -- empty forever.  Hence the whole section is made uninterruptible.
  uninterruptibleMask_ $ do
    tables <- forM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
    cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
    close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
  where
    -- Hand the table back to its manager, then run the action that
    -- 'M.closeFd_' produced for it.
    finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp
111
-- | Register interest in the given event on the file descriptor with
-- the current capability's event manager and block until the manager
-- invokes the callback.  Throws an 'IOError' built from 'eBADF' when the
-- delivered event is a close event.
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = mask_ $ do
  result <- newEmptyMVar
  mgr    <- getSystemEventManager_
  reg    <- registerFd mgr (\_ fired -> putMVar result fired) fd evt
  -- If the wait is aborted by an exception, drop the registration.
  fired  <- onException (takeMVar result) (unregisterFd_ mgr reg)
  when (fired `eventIs` evtClose) $
    ioError (errnoToIOError "threadWait" eBADF Nothing Nothing)
121
-- | The 'eBADF' 'IOError' (location @\"awaitEvent\"@) wrapped as a
-- 'SomeException'.  Used at least by the RTS in the @select()@ IO
-- manager backend.
blockedOnBadFD :: SomeException
blockedOnBadFD = toException badFd
  where
    badFd = errnoToIOError "awaitEvent" eBADF Nothing Nothing
125
-- | STM flavour of 'threadWait'.  Registers interest in the event and
-- returns a pair of
--
--   * an 'STM' action that retries until the callback has fired, and
--     throws an 'IOError' built from 'eBADF' if the delivered event is
--     a close event;
--
--   * an 'IO' action that removes the registration again.
threadWaitSTM :: Event -> Fd -> IO (STM (), IO ())
threadWaitSTM evt fd = mask_ $ do
  slot <- newTVarIO Nothing
  mgr  <- getSystemEventManager_
  -- The callback publishes the delivered event through 'slot'.
  reg  <- registerFd mgr (\_ fired -> atomically (writeTVar slot (Just fired))) fd evt
  let waitAction = do
        outcome <- readTVar slot
        case outcome of
          Nothing -> retry
          Just fired
            | fired `eventIs` evtClose ->
                throwSTM $ errnoToIOError "threadWaitSTM" eBADF Nothing Nothing
            | otherwise -> return ()
      deregister = unregisterFd_ mgr reg >> return ()
  return (waitAction, deregister)
140
-- | Lets a thread wait, via an 'STM' action, for a file descriptor to
-- become readable.  The 'STM' action retries until the file descriptor
-- has data ready.  The second component of the returned pair is an 'IO'
-- action that deregisters interest in the file descriptor.
--
-- The 'STM' action throws an 'IOError' if the file descriptor was
-- closed while the action is being executed.  Use 'closeFdWith' to
-- close a descriptor that has been passed to 'threadWaitReadSTM'.
{-# INLINE threadWaitReadSTM #-}
threadWaitReadSTM :: Fd -> IO (STM (), IO ())
threadWaitReadSTM = threadWaitSTM evtRead
152
-- | Lets a thread wait, via an 'STM' action, until a file descriptor
-- can accept a write.  The 'STM' action retries until the given file
-- descriptor can accept a write.  The second component of the returned
-- pair is an 'IO' action that deregisters interest in the file
-- descriptor.
--
-- The 'STM' action throws an 'IOError' if the file descriptor was
-- closed while the action is being executed.  Use 'closeFdWith' to
-- close a descriptor that has been passed to 'threadWaitWriteSTM'.
{-# INLINE threadWaitWriteSTM #-}
threadWaitWriteSTM :: Fd -> IO (STM (), IO ())
threadWaitWriteSTM = threadWaitSTM evtWrite
164
165
-- | Look up the system event manager belonging to the capability on
-- which the calling thread is currently running.
--
-- The result is 'Just' the manager when the threaded RTS is in use and
-- 'Nothing' otherwise.
getSystemEventManager :: IO (Maybe EventManager)
getSystemEventManager = do
  (cap, _) <- myThreadId >>= threadCapability
  managers <- readIORef eventManager
  -- Each slot pairs the manager with the thread running its loop; only
  -- the manager is handed out.
  slot <- readIOArray managers cap
  return (fmap snd slot)
178
-- | Like 'getSystemEventManager', but for callers that require a
-- manager to exist: when none is available the pattern match in the
-- @do@ block fails.
{-# INLINE getSystemEventManager_ #-}
getSystemEventManager_ :: IO EventManager
getSystemEventManager_ = do
  Just manager <- getSystemEventManager
  return manager
184
-- RTS-side accessor handed to 'sharedCAF' for the 'eventManager' CAF.
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
    getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)

-- | Global reference to the array of per-capability event managers.
-- The array is indexed by capability number and starts out with one
-- empty ('Nothing') slot per capability; each filled slot holds the
-- thread running the manager's loop together with the manager itself.
--
-- NOTE(review): the value is routed through 'sharedCAF', presumably so
-- that a single instance is shared process-wide -- confirm against
-- 'GHC.Conc.Sync.sharedCAF'.
{-# NOINLINE eventManager #-}
eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))
eventManager = unsafePerformIO $ do
    capCount <- getNumCapabilities
    slots    <- newIOArray (0, capCount - 1) Nothing
    ref      <- newIORef slots
    sharedCAF ref getOrSetSystemEventThreadEventManagerStore
195
-- | Number of event managers currently enabled.  Starts at zero; it is
-- written by 'startIOManagerThreads' and 'ioManagerCapabilitiesChanged'.
{-# NOINLINE numEnabledEventManagers #-}
numEnabledEventManagers :: IORef Int
numEnabledEventManagers = unsafePerformIO (newIORef 0)
200
-- RTS-side accessor handed to 'sharedCAF' for the 'ioManagerLock' CAF.
foreign import ccall unsafe "getOrSetSystemEventThreadIOManagerThreadStore"
    getOrSetSystemEventThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a)

-- | Lock guarding the 'eventManager' value: at most one thread at a
-- time may start or shut down event managers.
ioManagerLock :: MVar ()
ioManagerLock = unsafePerformIO $
    newMVar () >>= \lock ->
      sharedCAF lock getOrSetSystemEventThreadIOManagerThreadStore
{-# NOINLINE ioManagerLock #-}
211
-- | Fetch the system timer manager stored in 'timerManager'.  The
-- pattern match in the @do@ block fails if no timer manager has been
-- stored yet (see 'startTimerManagerThread', which stores one).
getSystemTimerManager :: IO TM.TimerManager
getSystemTimerManager = do
  Just timerMgr <- readIORef timerManager
  return timerMgr
216
-- RTS-side accessor handed to 'sharedCAF' for the 'timerManager' CAF.
foreign import ccall unsafe "getOrSetSystemTimerThreadEventManagerStore"
    getOrSetSystemTimerThreadEventManagerStore :: Ptr a -> IO (Ptr a)

-- | Global reference to the timer manager; holds 'Nothing' until
-- 'startTimerManagerThread' stores a manager in it.
{-# NOINLINE timerManager #-}
timerManager :: IORef (Maybe TM.TimerManager)
timerManager = unsafePerformIO $
    newIORef Nothing >>= \ref ->
      sharedCAF ref getOrSetSystemTimerThreadEventManagerStore
225
-- RTS-side accessor handed to 'sharedCAF' for 'timerManagerThreadVar'.
foreign import ccall unsafe "getOrSetSystemTimerThreadIOManagerThreadStore"
    getOrSetSystemTimerThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a)

-- | Holds the 'ThreadId' of the thread running the timer manager loop,
-- or 'Nothing' while no such thread has been started.  Updated by
-- 'startTimerManagerThread'.
timerManagerThreadVar :: MVar (Maybe ThreadId)
timerManagerThreadVar = unsafePerformIO $
    newMVar Nothing >>= \var ->
      sharedCAF var getOrSetSystemTimerThreadIOManagerThreadStore
{-# NOINLINE timerManagerThreadVar #-}
234
-- | Start the per-capability IO manager threads and then the timer
-- manager thread.  Does nothing at all when 'threaded' is 'False'.
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning =
  when threaded $
    startIOManagerThreads >> startTimerManagerThread
241
-- | While holding 'ioManagerLock', make sure an IO manager is running
-- for every slot of the current 'eventManager' array, then record that
-- all of them are enabled.
startIOManagerThreads :: IO ()
startIOManagerThreads = withMVar ioManagerLock $ \_ -> do
  managers <- readIORef eventManager
  let lastCap = snd (boundsIOArray managers)
  forM_ [0..lastCap] $ \cap ->
    startIOManagerThread managers cap
  writeIORef numEnabledEventManagers (lastCap + 1)
249
-- | Render an 'Int' as a decimal string via 'showSignedInt' at
-- precedence 0, so negative numbers are not parenthesised.
show_int :: Int -> String
show_int n = render ""
  where
    render = showSignedInt 0 n
252
-- | Call 'M.release' on the manager, then fork a fresh thread running
-- its 'loop' on the given capability.  The new thread is labelled with
-- the capability number and its 'ThreadId' is returned.
restartPollLoop :: EventManager -> Int -> IO ThreadId
restartPollLoop mgr i = do
  M.release mgr
  !tid <- forkOn i (loop mgr)
  labelThread tid ("IOManager on cap " ++ show_int i)
  return tid
259
-- | Make sure an IO manager thread is running for capability @i@.
--
-- A new manager (and thread) is created when the slot is empty or when
-- the recorded thread has finished or died; any other thread status
-- leaves the slot untouched.
startIOManagerThread :: IOArray Int (Maybe (ThreadId, EventManager))
                     -> Int
                     -> IO ()
startIOManagerThread eventManagerArray i = do
  slot <- readIOArray eventManagerArray i
  case slot of
    Nothing -> spawn
    Just (tid, staleMgr) -> do
      status <- threadStatus tid
      case status of
        ThreadFinished -> spawn
        ThreadDied -> do
          -- Sanity check: if the thread has died, there is a chance
          -- that the event manager is still alive.  This could happen
          -- during a fork, for example.  In this case we should clean
          -- up open pipes and everything else related to the event
          -- manager.  See #4449
          c_setIOManagerControlFd (fromIntegral i) (-1)
          M.cleanup staleMgr
          spawn
        _other -> return ()
  where
    -- Create a manager, run its loop on capability @i@ (after telling
    -- the RTS which control fd to use), and record both in the array.
    spawn = do
      !mgr <- new True
      !tid <- forkOn i $ do
        c_setIOManagerControlFd
          (fromIntegral i)
          (fromIntegral $ controlWriteFd $ M.emControl mgr)
        loop mgr
      labelThread tid ("IOManager on cap " ++ show_int i)
      writeIOArray eventManagerArray i (Just (tid, mgr))
290
-- | Make sure the timer manager thread is running, updating
-- 'timerManagerThreadVar' accordingly.
--
-- A new timer manager (and thread) is created when no thread is
-- recorded or when the recorded thread has finished or died; any other
-- thread status keeps the recorded thread.
startTimerManagerThread :: IO ()
startTimerManagerThread = modifyMVar_ timerManagerThreadVar ensure
  where
    ensure Nothing = spawn
    ensure current@(Just tid) = do
      status <- threadStatus tid
      case status of
        ThreadFinished -> spawn
        ThreadDied -> do
          -- Sanity check: if the thread has died, there is a chance
          -- that the event manager is still alive.  This could happen
          -- during a fork, for example.  In this case we should clean
          -- up open pipes and everything else related to the event
          -- manager.  See #4449
          stale <- readIORef timerManager
          case stale of
            Nothing -> return ()
            Just staleMgr -> do
              c_setTimerManagerControlFd (-1)
              TM.cleanup staleMgr
          spawn
        _other -> return current

    -- Create a timer manager, tell the RTS which control fd to use,
    -- publish the manager, and fork the thread running its loop.
    spawn = do
      !mgr <- TM.new
      c_setTimerManagerControlFd
        (fromIntegral $ controlWriteFd $ TM.emControl mgr)
      writeIORef timerManager (Just mgr)
      !tid <- forkIO (TM.loop mgr)
      labelThread tid "TimerManager"
      return (Just tid)
320
-- | Result of the RTS function @rtsSupportsBoundThreads@; consulted by
-- 'ensureIOManagerIsRunning' to decide whether any manager threads
-- should be started.
foreign import ccall unsafe "rtsSupportsBoundThreads"
    threaded :: Bool
322
-- | Bring the set of IO managers in line with the current number of
-- capabilities.  Runs entirely under 'ioManagerLock'.
--
-- * If there are more capabilities than array slots, a larger array is
--   allocated: existing entries are copied over (restarting the poll
--   loop of those that were not enabled), new managers are started for
--   the additional capabilities, and 'eventManager' is pointed at the
--   new array.
--
-- * Otherwise, if more managers are needed than are currently enabled,
--   the poll loops of the additional ones are restarted in place.
ioManagerCapabilitiesChanged :: IO ()
ioManagerCapabilitiesChanged = withMVar ioManagerLock $ \_ -> do
  newCaps  <- getNumCapabilities
  enabled  <- readIORef numEnabledEventManagers
  writeIORef numEnabledEventManagers newCaps
  managers <- readIORef eventManager
  let lastOld = snd (boundsIOArray managers)
      oldCaps = lastOld + 1
  if newCaps > oldCaps
    then do
      grown <- newIOArray (0, newCaps - 1) Nothing

      -- copy the existing values into the new array, restarting the
      -- poll loop of every manager that was not enabled:
      forM_ [0..lastOld] $ \i -> do
        Just (tid, mgr) <- readIOArray managers i
        tid' <- if i < enabled
                  then return tid
                  else restartPollLoop mgr i
        writeIOArray grown i (Just (tid', mgr))

      -- create new IO managers for the new caps:
      forM_ [oldCaps..newCaps-1] (startIOManagerThread grown)

      -- update the event manager array reference:
      writeIORef eventManager grown
    else when (newCaps > enabled) $
      forM_ [enabled..newCaps-1] $ \i -> do
        Just (_, mgr) <- readIOArray managers i
        tid <- restartPollLoop mgr i
        writeIOArray managers i (Just (tid, mgr))
354
-- Used to tell the RTS how it can send messages to the I/O manager.
-- In this module the first argument is always a capability number and
-- the second a control write fd, or -1 when the manager is torn down.
foreign import ccall unsafe "setIOManagerControlFd"
    c_setIOManagerControlFd :: CUInt -> CInt -> IO ()

-- Timer manager counterpart of the above: passes the RTS the timer
-- manager's control write fd, or -1 when the manager is torn down.
foreign import ccall unsafe "setTimerManagerControlFd"
    c_setTimerManagerControlFd :: CInt -> IO ()