089532cafbba340e747988def5ecd612480fdd1d
[packages/base.git] / GHC / Event / Manager.hs
1 {-# LANGUAGE Trustworthy #-}
2 {-# LANGUAGE BangPatterns
3 , CPP
4 , ExistentialQuantification
5 , NoImplicitPrelude
6 , RecordWildCards
7 , TypeSynonymInstances
8 , FlexibleInstances
9 #-}
10
11 module GHC.Event.Manager
12 ( -- * Types
13 EventManager
14
15 -- * Creation
16 , new
17 , newWith
18 , newDefaultBackend
19
20 -- * Running
21 , finished
22 , loop
23 , step
24 , shutdown
25 , cleanup
26 , wakeManager
27
28 -- * Registering interest in I/O events
29 , Event
30 , evtRead
31 , evtWrite
32 , IOCallback
33 , FdKey(keyFd)
34 , registerFd_
35 , registerFd
36 , unregisterFd_
37 , unregisterFd
38 , closeFd
39
40 -- * Registering interest in timeout events
41 , TimeoutCallback
42 , TimeoutKey
43 , registerTimeout
44 , updateTimeout
45 , unregisterTimeout
46 ) where
47
48 #include "EventConfig.h"
49
50 ------------------------------------------------------------------------
51 -- Imports
52
53 import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
54 import Control.Exception (finally)
55 import Control.Monad ((=<<), forM_, liftM, sequence_, when)
56 import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
57 writeIORef)
58 import Data.Maybe (Maybe(..))
59 import Data.Monoid (mappend, mconcat, mempty)
60 import GHC.Base
61 import GHC.Conc.Signal (runHandlers)
62 import GHC.List (filter)
63 import GHC.Num (Num(..))
64 import GHC.Real ((/), fromIntegral )
65 import GHC.Show (Show(..))
66 import GHC.Event.Clock (getCurrentTime)
67 import GHC.Event.Control
68 import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
69 Timeout(..))
70 import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
71 import System.Posix.Types (Fd)
72
73 import qualified GHC.Event.IntMap as IM
74 import qualified GHC.Event.Internal as I
75 import qualified GHC.Event.PSQ as Q
76
77 #if defined(HAVE_KQUEUE)
78 import qualified GHC.Event.KQueue as KQueue
79 #elif defined(HAVE_EPOLL)
80 import qualified GHC.Event.EPoll as EPoll
81 #elif defined(HAVE_POLL)
82 import qualified GHC.Event.Poll as Poll
83 #else
84 # error not implemented for this operating system
85 #endif
86
87 ------------------------------------------------------------------------
88 -- Types
89
-- | Bookkeeping for a single registration on a file descriptor.
data FdData = FdData {
      fdKey       :: {-# UNPACK #-} !FdKey
      -- ^ The cookie identifying this registration.
    , fdEvents    :: {-# UNPACK #-} !Event
      -- ^ The events this registration is interested in.
    , _fdCallback :: !IOCallback
      -- ^ The callback to invoke when a matching event occurs.
    }
95
-- | A file descriptor registration cookie.
data FdKey = FdKey {
      keyFd     :: {-# UNPACK #-} !Fd
      -- ^ The file descriptor that was registered.
    , keyUnique :: {-# UNPACK #-} !Unique
      -- ^ Tells this registration apart from others on the same
      -- file descriptor; 'unregisterFd_' removes entries by this value.
    } deriving (Eq, Show)
101
-- | Callback invoked on I/O events.  It receives the registration
-- cookie it was registered under and the events being delivered.
type IOCallback = FdKey -> Event -> IO ()
104
-- | A timeout registration cookie.  It wraps the 'Unique' under which
-- the timeout is stored in the 'TimeoutQueue'.
newtype TimeoutKey   = TK Unique
    deriving (Eq)
108
-- | Callback invoked on timeout events.  It takes no arguments and is
-- run by the event loop once the timeout has expired.
type TimeoutCallback = IO ()
111
-- | Lifecycle state of an event manager.
data State = Created   -- ^ Initial state set by 'newWith'; 'loop' has not run yet.
           | Running   -- ^ Set by 'loop' when it starts processing events.
           | Dying     -- ^ Set by 'shutdown'.
           | Finished  -- ^ Set by 'cleanup', or on receipt of a die message.
             deriving (Eq, Show)
117
-- | A priority search queue, with timeouts as priorities.  The value
-- stored for each key is the callback to run when the timeout expires.
type TimeoutQueue = Q.PSQ TimeoutCallback
120
{-
Instead of directly modifying the 'TimeoutQueue' in
e.g. 'registerTimeout' we keep a list of edits to perform, in the form
of a chain of function closures, and have the I/O manager thread
perform the edits later.  This exists to address the following GC
problem:

Since e.g. 'registerTimeout' doesn't force the evaluation of the
thunks inside the 'emTimeouts' IORef a number of thunks build up
inside the IORef.  If the I/O manager thread doesn't evaluate these
thunks soon enough they'll get promoted to the old generation and
become roots for all subsequent minor GCs.

When the thunks eventually get evaluated they will each create a new
intermediate 'TimeoutQueue' that immediately becomes garbage.  Since
the thunks serve as roots until the next major GC these intermediate
'TimeoutQueue's will get copied unnecessarily in the next minor GC,
increasing GC time.  This problem is known as "floating garbage".

Keeping a list of edits doesn't stop this from happening but makes the
amount of data that gets copied smaller.

TODO: Evaluate the content of the IORef to WHNF on each insert once
this bug is resolved: http://hackage.haskell.org/trac/ghc/ticket/3838
-}
146
-- | An edit to apply to a 'TimeoutQueue'.  Edits are composed into a
-- single function stored in 'emTimeouts' and applied by the event loop.
type TimeoutEdit = TimeoutQueue -> TimeoutQueue
149
-- | The event manager state.
data EventManager = EventManager
    { emBackend      :: !Backend
      -- ^ The backend used to poll for I/O events.
    , emFds          :: {-# UNPACK #-} !(MVar (IM.IntMap [FdData]))
      -- ^ Active registrations, keyed by file descriptor number.
    , emTimeouts     :: {-# UNPACK #-} !(IORef TimeoutEdit)
      -- ^ Pending edits to the timeout queue, applied by 'step'.
    , emState        :: {-# UNPACK #-} !(IORef State)
      -- ^ Current lifecycle state of the manager.
    , emUniqueSource :: {-# UNPACK #-} !UniqueSource
      -- ^ Supply of 'Unique's for 'FdKey's and 'TimeoutKey's.
    , emControl      :: {-# UNPACK #-} !Control
      -- ^ Control channel used to send wakeup and die messages.
    }
159
160 ------------------------------------------------------------------------
161 -- Creation
162
-- | Callback installed on the manager's own control descriptors: read
-- one control message from the descriptor that fired and act on it.
handleControlEvent :: EventManager -> FdKey -> Event -> IO ()
handleControlEvent mgr reg _evt =
    readControlMessage (emControl mgr) (keyFd reg) >>= dispatch
  where
    -- Nothing to do for a wakeup message.
    dispatch CMsgWakeup              = return ()
    -- Record that the manager is finished; 'step' reads this state
    -- after polling.
    dispatch CMsgDie                 = writeIORef (emState mgr) Finished
    dispatch (CMsgSignal ptr sigNum) = runHandlers ptr sigNum
170
-- | Create the backend selected at compile time by the @HAVE_KQUEUE@,
-- @HAVE_EPOLL@ and @HAVE_POLL@ CPP macros, tried in that order.  If
-- none of them is defined the result is an 'error' call.
newDefaultBackend :: IO Backend
#if defined(HAVE_KQUEUE)
newDefaultBackend = KQueue.new
#elif defined(HAVE_EPOLL)
newDefaultBackend = EPoll.new
#elif defined(HAVE_POLL)
newDefaultBackend = Poll.new
#else
newDefaultBackend = error "no back end for this platform"
#endif
181
-- | Create a new event manager on top of the default backend
-- (see 'newDefaultBackend').
new :: IO EventManager
new = newDefaultBackend >>= newWith
185
-- | Create a new event manager that polls with the given backend.
--
-- A finalizer is attached to the state 'IORef': if it runs while the
-- manager has not yet reached 'Finished', it deletes the backend and
-- closes the control channel.  The manager's two control descriptors
-- are registered for reading before the manager is returned.
newWith :: Backend -> IO EventManager
newWith be = do
    fdTable  <- newMVar IM.empty
    editsRef <- newIORef id
    control  <- newControl
    stateRef <- newIORef Created
    uniques  <- newSource
    let release = do
            previous <- atomicModifyIORef stateRef (\s -> (Finished, s))
            when (previous /= Finished) (I.delete be >> closeControl control)
    _ <- mkWeakIORef stateRef release
    let mgr = EventManager { emBackend      = be
                           , emFds          = fdTable
                           , emTimeouts     = editsRef
                           , emState        = stateRef
                           , emUniqueSource = uniques
                           , emControl      = control
                           }
        listenOn fd = registerFd_ mgr (handleControlEvent mgr) fd evtRead
    _ <- listenOn (controlReadFd control)
    _ <- listenOn (wakeupReadFd control)
    return mgr
208
-- | Asynchronously shuts down the event manager, if running.
--
-- The state is moved to 'Dying', and a die message is sent over the
-- control channel only if the manager was 'Running'.  A manager that
-- is already 'Finished' is left in that state: moving it back to
-- 'Dying' would make 'finished' report 'False' again and would let the
-- finalizer installed by 'newWith' release the backend and control
-- channel a second time.
shutdown :: EventManager -> IO ()
shutdown mgr = do
  previous <- atomicModifyIORef (emState mgr) $ \s -> case s of
    Finished -> (Finished, s)
    _        -> (Dying, s)
  when (previous == Running) $ sendDie (emControl mgr)
214
-- | Report whether the manager is currently in the 'Finished' state.
finished :: EventManager -> IO Bool
finished mgr = do
    current <- readIORef (emState mgr)
    return (current == Finished)
217
-- | Mark the manager as 'Finished', then delete its backend and close
-- its control channel.
cleanup :: EventManager -> IO ()
cleanup EventManager{ emState = stateRef, emBackend = backend, emControl = control } =
    writeIORef stateRef Finished
      >> I.delete backend
      >> closeControl control
223
224 ------------------------------------------------------------------------
225 -- Event loop
226
-- | Start handling events.  This function loops until told to stop,
-- using 'shutdown'.
--
-- /Note/: This loop can only be run once per 'EventManager', as it
-- closes all of its control resources when it finishes.
--
-- A manager found in state 'Dying' is cleaned up without running; any
-- state other than 'Created' or 'Dying' is cleaned up and then
-- reported with 'error'.
loop :: EventManager -> IO ()
loop mgr@EventManager{ emState = stateRef } = do
    previous <- atomicModifyIORef stateRef claim
    case previous of
      Created -> run Q.empty `finally` cleanup mgr
      Dying   -> cleanup mgr
      _       -> cleanup mgr >>
                 error ("GHC.Event.Manager.loop: state is already " ++
                        show previous)
  where
    -- Only a freshly created manager may move to 'Running'; the
    -- previous state is always handed back to the caller.
    claim Created = (Running, Created)
    claim other   = (other, other)

    -- Keep stepping for as long as 'step' reports the manager running.
    run queue = do
        (alive, queue') <- step mgr queue
        when alive (run queue')
246
-- | Run a single iteration of the event loop: apply pending timeout
-- edits and fire expired timeouts, poll the backend for I/O events,
-- and return whether the manager is still 'Running' together with the
-- updated timeout queue.
step :: EventManager -> TimeoutQueue -> IO (Bool, TimeoutQueue)
step mgr@EventManager{ emBackend  = backend
                     , emState    = stateRef
                     , emTimeouts = editsRef } queue = do
    (wait, queue') <- fireExpired queue
    I.poll backend wait (onFdEvent mgr)
    current <- readIORef stateRef
    current `seq` return (current == Running, queue')
  where
    -- | Call all expired timer callbacks and return the time to the
    -- next timeout.
    fireExpired :: TimeoutQueue -> IO (Timeout, TimeoutQueue)
    fireExpired q0 = do
        now     <- getCurrentTime
        -- Take the accumulated edits and reset the reference to 'id'.
        pending <- atomicModifyIORef editsRef (\edits -> (id, edits))
        let edited      = pending q0
            (due, rest) = edited `seq` Q.atMost now edited
        sequence_ (map Q.value due)
        return (nextWait now rest, rest)

    nextWait now q = case Q.minView q of
        Nothing                    -> Forever
        Just (Q.E _ deadline _, _) ->
            -- This value will always be positive since the call
            -- to 'atMost' above removed any timeouts <= 'now'
            let remaining = deadline - now
            in remaining `seq` Timeout remaining
270
271 ------------------------------------------------------------------------
272 -- Registering interest in I/O events
273
-- | Register interest in the given events, without waking the event
-- manager thread.  The 'Bool' return value indicates whether the
-- event manager ought to be woken.
--
-- The new registration is added to the table under the file
-- descriptor.  If that changes the combined set of events for the
-- descriptor, the backend is updated and the returned flag is 'True'.
registerFd_ :: EventManager -> IOCallback -> Fd -> Event
            -> IO (FdKey, Bool)
registerFd_ EventManager{ emUniqueSource = uniques
                        , emFds          = fdTable
                        , emBackend      = backend } cb fd evs = do
    uniq <- newUnique uniques
    modifyMVar fdTable $ \table -> do
      let key    = fromIntegral fd
          reg    = FdKey fd uniq
          !entry = FdData reg evs cb
      case IM.insertWith (++) key [entry] table of
        (earlier, !table') -> do
          let (before, after) = case earlier of
                  Nothing   -> (mempty, evs)
                  Just prev -> pairEvents prev table' key
              changed = before /= after
          when changed $ I.modifyFd backend fd before after
          return (table', (reg, changed))
{-# INLINE registerFd_ #-}
293
-- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
-- on the file descriptor @fd@.  @cb@ is called for each event that
-- occurs.  Returns a cookie that can be handed to 'unregisterFd'.
--
-- The manager is woken when 'registerFd_' reports that it should be.
registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO FdKey
registerFd mgr cb fd evs =
    registerFd_ mgr cb fd evs >>= \(key, needsWake) ->
      when needsWake (wakeManager mgr) >> return key
{-# INLINE registerFd #-}
303
-- | Wake up the event manager by sending a wakeup message over its
-- control channel.
wakeManager :: EventManager -> IO ()
wakeManager = sendWakeup . emControl
307
-- | The combination of the events that the given registrations are
-- interested in.
eventsOf :: [FdData] -> Event
eventsOf regs = mconcat (map fdEvents regs)
310
-- | Pair the combined events of a previous list of registrations with
-- the combined events currently stored in the map for the given file
-- descriptor ('mempty' if the descriptor has no entry).
pairEvents :: [FdData] -> IM.IntMap [FdData] -> Int -> (Event, Event)
pairEvents prev m fd = (eventsOf prev, current)
  where
    current = case IM.lookup fd m of
        Just regs -> eventsOf regs
        Nothing   -> mempty
317
-- | Drop a previous file descriptor registration, without waking the
-- event manager thread.  The return value indicates whether the event
-- manager ought to be woken.
--
-- If removing the registration changes the combined set of events for
-- the descriptor, the backend is updated and the result is 'True'.
-- When the descriptor has no entry the table is left as it was and
-- the result is 'False'.
unregisterFd_ :: EventManager -> FdKey -> IO Bool
unregisterFd_ EventManager{ emFds = fdTable, emBackend = backend } (FdKey fd u) =
    modifyMVar fdTable $ \table ->
      case IM.updateWith prune key table of
        (Nothing, _)         -> return (table, False)
        (Just prev, !table') -> do
          let (before, after) = pairEvents prev table' key
              changed         = before /= after
          when changed $ I.modifyFd backend fd before after
          return (table', changed)
  where
    key = fromIntegral fd

    -- Remove the entry carrying this key's unique; drop the whole map
    -- entry when no registrations remain.
    prune regs = case filter ((/= u) . keyUnique . fdKey) regs of
        []   -> Nothing
        kept -> Just kept
335
-- | Drop a previous file descriptor registration, waking the event
-- manager when 'unregisterFd_' reports that it should be.
unregisterFd :: EventManager -> FdKey -> IO ()
unregisterFd mgr reg =
    unregisterFd_ mgr reg >>= \needsWake ->
      when needsWake (wakeManager mgr)
341
-- | Close a file descriptor in a race-safe way.
--
-- The supplied close action runs while the registration table is held,
-- and all registrations for the descriptor are removed from the table.
-- Afterwards each removed registration's callback is invoked with its
-- events combined with 'evtClose'.
closeFd :: EventManager -> (Fd -> IO ()) -> Fd -> IO ()
closeFd mgr close fd = do
    dropped <- modifyMVar (emFds mgr) closeAndForget
    forM_ dropped notifyClosed
  where
    closeAndForget table = do
        close fd
        case IM.delete (fromIntegral fd) table of
          (Just regs, !table') -> do
              -- Only wake the manager if someone was actually
              -- interested in events on this descriptor.
              when (eventsOf regs /= mempty) (wakeManager mgr)
              return (table', regs)
          (Nothing, _) -> return (table, [])

    notifyClosed (FdData key interest callback) =
        callback key (interest `mappend` evtClose)
353
354 ------------------------------------------------------------------------
355 -- Registering interest in timeout events
356
-- | Register a timeout in the given number of microseconds.  The
-- returned 'TimeoutKey' can be used to later unregister or update the
-- timeout.  The timeout is automatically unregistered after the given
-- time has passed.
--
-- When the number of microseconds is zero or negative the callback is
-- run immediately by the caller and nothing is queued.
registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
registerTimeout mgr us cb = do
    !key <- newUnique (emUniqueSource mgr)
    if us > 0 then schedule key else cb
    return (TK key)
  where
    schedule key = do
        now <- getCurrentTime
        let deadline = fromIntegral us / 1000000.0 + now

        -- We intentionally do not evaluate the modified map to WHNF here.
        -- Instead, we leave a thunk inside the IORef and defer its
        -- evaluation until mkTimeout in the event loop.  This is a
        -- workaround for a nasty IORef contention problem that causes the
        -- thread-delay benchmark to take 20 seconds instead of 0.2.
        atomicModifyIORef (emTimeouts mgr) $ \edits ->
            (Q.insert key deadline cb . edits, ())
        wakeManager mgr
378
-- | Unregister an active timeout.  The removal is queued as an edit
-- to the timeout queue and the manager is woken.
unregisterTimeout :: EventManager -> TimeoutKey -> IO ()
unregisterTimeout mgr (TK key) =
    atomicModifyIORef (emTimeouts mgr) dropKey >> wakeManager mgr
  where
    dropKey edits = (Q.delete key . edits, ())
385
-- | Update an active timeout to fire in the given number of
-- microseconds, counted from the moment of this call.  The change is
-- queued as an edit to the timeout queue and the manager is woken.
updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
updateTimeout mgr (TK key) us = do
    now <- getCurrentTime
    let deadline   = fromIntegral us / 1000000.0 + now
        reschedule = Q.adjust (const deadline) key
    atomicModifyIORef (emTimeouts mgr) $ \edits -> (reschedule . edits, ())
    wakeManager mgr
396
397 ------------------------------------------------------------------------
398 -- Utilities
399
-- | Call the callbacks corresponding to the given file descriptor.
-- Only registrations whose events match the delivered events
-- (according to 'I.eventIs') are invoked.
onFdEvent :: EventManager -> Fd -> Event -> IO ()
onFdEvent mgr fd evs =
    readMVar (emFds mgr) >>= \table ->
      case IM.lookup (fromIntegral fd) table of
        Nothing   -> return ()
        Just regs -> forM_ regs dispatch
  where
    dispatch (FdData key interest callback) =
        when (evs `I.eventIs` interest) (callback key evs)