Update Trac ticket URLs to point to GitLab
[ghc.git] / rts / posix / Select.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 1995-2002
4 *
5 * Support for concurrent non-blocking I/O and thread waiting in the
6 * non-threaded RTS. In the threaded RTS, this file is not used at
7 * all, instead we use the IO manager thread implemented in Haskell in
8 * the base package.
9 *
10 * ---------------------------------------------------------------------------*/
11
12 #include "PosixSource.h"
13 #include "Rts.h"
14
15 #include "Signals.h"
16 #include "Schedule.h"
17 #include "Prelude.h"
18 #include "RaiseAsync.h"
19 #include "RtsUtils.h"
20 #include "Capability.h"
21 #include "Select.h"
22 #include "AwaitEvent.h"
23 #include "Stats.h"
24 #include "GetTime.h"
25
26 # ifdef HAVE_SYS_SELECT_H
27 # include <sys/select.h>
28 # endif
29
30 # ifdef HAVE_SYS_TYPES_H
31 # include <sys/types.h>
32 # endif
33
34 #include <errno.h>
35 #include <string.h>
36
37 #include "Clock.h"
38
39 #if !defined(THREADED_RTS)
40
// The target time for a threadDelay is stored in a one-word quantity
// in the TSO (tso->block_info.target).  On a 32-bit machine we
// therefore can't afford to use nanosecond resolution because it
// would overflow too quickly, so instead we use millisecond
// resolution.
//
//   LowResTimeToTime(t)           LowResTime -> Time
//   TimeToLowResTimeRoundDown(t)  Time -> LowResTime, truncating
//   TimeToLowResTimeRoundUp(t)    Time -> LowResTime, rounding up
//
// In the 32-bit variant the argument of LowResTimeToTime is widened to
// Time *before* it is scaled by 1000.  Otherwise the multiplication is
// carried out in the width of the argument, and (assuming LowResTime is
// the one-word type described above) a millisecond count wraps once it
// exceeds 2^32 / 1000 ms, i.e. roughly 71 minutes.
// Both Time -> LowResTime macros cast their result explicitly so that
// they agree on the result type.

#if SIZEOF_VOID_P == 4
#define LowResTimeToTime(t)          (USToTime((Time)(t) * 1000))
#define TimeToLowResTimeRoundDown(t) ((LowResTime)(TimeToUS(t) / 1000))
#define TimeToLowResTimeRoundUp(t)   ((LowResTime)((TimeToUS(t) + 1000-1) / 1000))
#else
#define LowResTimeToTime(t)          (t)
#define TimeToLowResTimeRoundDown(t) (t)
#define TimeToLowResTimeRoundUp(t)   (t)
#endif
56
57 /*
58 * Return the time since the program started, in LowResTime,
59 * rounded down.
60 */
61 static LowResTime getLowResTimeOfDay(void)
62 {
63 return TimeToLowResTimeRoundDown(getProcessElapsedTime());
64 }
65
66 /*
67 * For a given microsecond delay, return the target time in LowResTime.
68 */
69 LowResTime getDelayTarget (HsInt us)
70 {
71 Time elapsed;
72 elapsed = getProcessElapsedTime();
73
74 // If the desired target would be larger than the maximum Time,
75 // default to the maximum Time. (#7087)
76 if (us > TimeToUS(TIME_MAX - elapsed)) {
77 return TimeToLowResTimeRoundDown(TIME_MAX);
78 } else {
79 // round up the target time, because we never want to sleep *less*
80 // than the desired amount.
81 return TimeToLowResTimeRoundUp(elapsed + USToTime(us));
82 }
83 }
84
85 /* There's a clever trick here to avoid problems when the time wraps
86 * around. Since our maximum delay is smaller than 31 bits of ticks
87 * (it's actually 31 bits of microseconds), we can safely check
88 * whether a timer has expired even if our timer will wrap around
89 * before the target is reached, using the following formula:
90 *
91 * (int)((uint)current_time - (uint)target_time) < 0
92 *
93 * if this is true, then our time has expired.
94 * (idea due to Andy Gill).
95 */
96 static bool wakeUpSleepingThreads (LowResTime now)
97 {
98 StgTSO *tso;
99 bool flag = false;
100
101 while (sleeping_queue != END_TSO_QUEUE) {
102 tso = sleeping_queue;
103 if (((long)now - (long)tso->block_info.target) < 0) {
104 break;
105 }
106 sleeping_queue = tso->_link;
107 tso->why_blocked = NotBlocked;
108 tso->_link = END_TSO_QUEUE;
109 IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %lu\n",
110 (unsigned long)tso->id));
111 // MainCapability: this code is !THREADED_RTS
112 pushOnRunQueue(&MainCapability,tso);
113 flag = true;
114 }
115 return flag;
116 }
117
118 static void GNUC3_ATTRIBUTE(__noreturn__)
119 fdOutOfRange (int fd)
120 {
121 errorBelch("file descriptor %d out of range for select (0--%d).\n"
122 "Recompile with -threaded to work around this.",
123 fd, (int)FD_SETSIZE);
124 stg_exit(EXIT_FAILURE);
125 }
126
/*
 * Outcome of probing a single file descriptor with a zero-timeout
 * 'select()'.
 */
enum FdState {
    RTS_FD_IS_READY,      /* select() reported the descriptor as ready */
    RTS_FD_IS_BLOCKING,   /* select() reported nothing: an operation would block */
    RTS_FD_IS_INVALID     /* select() failed with EBADF */
};
135
136 static enum FdState fdPollReadState (int fd)
137 {
138 int r;
139 fd_set rfd;
140 struct timeval now;
141
142 FD_ZERO(&rfd);
143 FD_SET(fd, &rfd);
144
145 /* only poll */
146 now.tv_sec = 0;
147 now.tv_usec = 0;
148 for (;;)
149 {
150 r = select(fd+1, &rfd, NULL, NULL, &now);
151 /* the descriptor is sane */
152 if (r != -1)
153 break;
154
155 switch (errno)
156 {
157 case EBADF: return RTS_FD_IS_INVALID;
158 case EINTR: continue;
159 default:
160 sysErrorBelch("select");
161 stg_exit(EXIT_FAILURE);
162 }
163 }
164
165 if (r == 0)
166 return RTS_FD_IS_BLOCKING;
167 else
168 return RTS_FD_IS_READY;
169 }
170
171 static enum FdState fdPollWriteState (int fd)
172 {
173 int r;
174 fd_set wfd;
175 struct timeval now;
176
177 FD_ZERO(&wfd);
178 FD_SET(fd, &wfd);
179
180 /* only poll */
181 now.tv_sec = 0;
182 now.tv_usec = 0;
183 for (;;)
184 {
185 r = select(fd+1, NULL, &wfd, NULL, &now);
186 /* the descriptor is sane */
187 if (r != -1)
188 break;
189
190 switch (errno)
191 {
192 case EBADF: return RTS_FD_IS_INVALID;
193 case EINTR: continue;
194 default:
195 sysErrorBelch("select");
196 stg_exit(EXIT_FAILURE);
197 }
198 }
199
200 if (r == 0)
201 return RTS_FD_IS_BLOCKING;
202 else
203 return RTS_FD_IS_READY;
204 }
205
206 /* Argument 'wait' says whether to wait for I/O to become available,
207 * or whether to just check and return immediately. If there are
208 * other threads ready to run, we normally do the non-waiting variety,
209 * otherwise we wait (see Schedule.c).
210 *
211 * SMP note: must be called with sched_mutex locked.
212 *
213 * Windows: select only works on sockets, so this doesn't really work,
214 * though it makes things better than before. MsgWaitForMultipleObjects
215 * should really be used, though it only seems to work for read handles,
216 * not write handles.
217 *
218 */
219 void
220 awaitEvent(bool wait)
221 {
222 StgTSO *tso, *prev, *next;
223 fd_set rfd,wfd;
224 int numFound;
225 int maxfd = -1;
226 bool seen_bad_fd = false;
227 struct timeval tv, *ptv;
228 LowResTime now;
229
230 IF_DEBUG(scheduler,
231 debugBelch("scheduler: checking for threads blocked on I/O");
232 if (wait) {
233 debugBelch(" (waiting)");
234 }
235 debugBelch("\n");
236 );
237
238 /* loop until we've woken up some threads. This loop is needed
239 * because the select timing isn't accurate, we sometimes sleep
240 * for a while but not long enough to wake up a thread in
241 * a threadDelay.
242 */
243 do {
244
245 now = getLowResTimeOfDay();
246 if (wakeUpSleepingThreads(now)) {
247 return;
248 }
249
250 /*
251 * Collect all of the fd's that we're interested in
252 */
253 FD_ZERO(&rfd);
254 FD_ZERO(&wfd);
255
256 for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
257 next = tso->_link;
258
259 /* On older FreeBSDs, FD_SETSIZE is unsigned. Cast it to signed int
260 * in order to switch off the 'comparison between signed and
261 * unsigned error message
262 * Newer versions of FreeBSD have switched to unsigned int:
263 * https://github.com/freebsd/freebsd/commit/12ae7f74a071f0439763986026525094a7032dfd
264 * http://fa.freebsd.cvs-all.narkive.com/bCWNHbaC/svn-commit-r265051-head-sys-sys
265 * So the (int) cast should be removed across the code base once
266 * GHC requires a version of FreeBSD that has that change in it.
267 */
268 switch (tso->why_blocked) {
269 case BlockedOnRead:
270 {
271 int fd = tso->block_info.fd;
272 if ((fd >= (int)FD_SETSIZE) || (fd < 0)) {
273 fdOutOfRange(fd);
274 }
275 maxfd = (fd > maxfd) ? fd : maxfd;
276 FD_SET(fd, &rfd);
277 continue;
278 }
279
280 case BlockedOnWrite:
281 {
282 int fd = tso->block_info.fd;
283 if ((fd >= (int)FD_SETSIZE) || (fd < 0)) {
284 fdOutOfRange(fd);
285 }
286 maxfd = (fd > maxfd) ? fd : maxfd;
287 FD_SET(fd, &wfd);
288 continue;
289 }
290
291 default:
292 barf("AwaitEvent");
293 }
294 }
295
296 if (!wait) {
297 // just poll
298 tv.tv_sec = 0;
299 tv.tv_usec = 0;
300 ptv = &tv;
301 } else if (sleeping_queue != END_TSO_QUEUE) {
302 /* SUSv2 allows implementations to have an implementation defined
303 * maximum timeout for select(2). The standard requires
304 * implementations to silently truncate values exceeding this maximum
305 * to the maximum. Unfortunately, OSX and the BSD don't comply with
306 * SUSv2, instead opting to return EINVAL for values exceeding a
307 * timeout of 1e8.
308 *
309 * Select returning an error crashes the runtime in a bad way. To
310 * play it safe we truncate any timeout to 31 days, as SUSv2 requires
311 * any implementations maximum timeout to be larger than this.
312 *
313 * Truncating the timeout is not an issue, because if nothing
314 * interesting happens when the timeout expires, we'll see that the
315 * thread still wants to be blocked longer and simply block on a new
316 * iteration of select(2).
317 */
318 const time_t max_seconds = 2678400; // 31 * 24 * 60 * 60
319
320 Time min = LowResTimeToTime(sleeping_queue->block_info.target - now);
321 tv.tv_sec = TimeToSeconds(min);
322 if (tv.tv_sec < max_seconds) {
323 tv.tv_usec = TimeToUS(min) % 1000000;
324 } else {
325 tv.tv_sec = max_seconds;
326 tv.tv_usec = 0;
327 }
328 ptv = &tv;
329 } else {
330 ptv = NULL;
331 }
332
333 /* Check for any interesting events */
334
335 while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, ptv)) < 0) {
336 if (errno != EINTR) {
337 if ( errno == EBADF ) {
338 seen_bad_fd = true;
339 break;
340 } else {
341 sysErrorBelch("select");
342 stg_exit(EXIT_FAILURE);
343 }
344 }
345
346 /* We got a signal; could be one of ours. If so, we need
347 * to start up the signal handler straight away, otherwise
348 * we could block for a long time before the signal is
349 * serviced.
350 */
351 #if defined(RTS_USER_SIGNALS)
352 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
353 startSignalHandlers(&MainCapability);
354 return; /* still hold the lock */
355 }
356 #endif
357
358 /* we were interrupted, return to the scheduler immediately.
359 */
360 if (sched_state >= SCHED_INTERRUPTING) {
361 return; /* still hold the lock */
362 }
363
364 /* check for threads that need waking up
365 */
366 wakeUpSleepingThreads(getLowResTimeOfDay());
367
368 /* If new runnable threads have arrived, stop waiting for
369 * I/O and run them.
370 */
371 if (!emptyRunQueue(&MainCapability)) {
372 return; /* still hold the lock */
373 }
374 }
375
376 /* Step through the waiting queue, unblocking every thread that now has
377 * a file descriptor in a ready state.
378 */
379
380 prev = NULL;
381 {
382 /*
383 * The queue is being rebuilt in this loop:
384 * 'blocked_queue_hd' will contain already
385 * traversed blocked TSOs. As a result you
386 * can't use functions accessing 'blocked_queue_hd'.
387 */
388 for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
389 next = tso->_link;
390 int fd;
391 enum FdState fd_state = RTS_FD_IS_BLOCKING;
392
393 switch (tso->why_blocked) {
394 case BlockedOnRead:
395 fd = tso->block_info.fd;
396
397 if (seen_bad_fd) {
398 fd_state = fdPollReadState (fd);
399 } else if (FD_ISSET(fd, &rfd)) {
400 fd_state = RTS_FD_IS_READY;
401 }
402 break;
403 case BlockedOnWrite:
404 fd = tso->block_info.fd;
405
406 if (seen_bad_fd) {
407 fd_state = fdPollWriteState (fd);
408 } else if (FD_ISSET(fd, &wfd)) {
409 fd_state = RTS_FD_IS_READY;
410 }
411 break;
412 default:
413 barf("awaitEvent");
414 }
415
416 switch (fd_state) {
417 case RTS_FD_IS_INVALID:
418 /*
419 * Don't let RTS loop on such descriptors,
420 * pass an IOError to blocked threads (#4934)
421 */
422 IF_DEBUG(scheduler,
423 debugBelch("Killing blocked thread %lu on bad fd=%i\n",
424 (unsigned long)tso->id, fd));
425 raiseAsync(&MainCapability, tso,
426 (StgClosure *)blockedOnBadFD_closure, false, NULL);
427 break;
428 case RTS_FD_IS_READY:
429 IF_DEBUG(scheduler,
430 debugBelch("Waking up blocked thread %lu\n",
431 (unsigned long)tso->id));
432 tso->why_blocked = NotBlocked;
433 tso->_link = END_TSO_QUEUE;
434 pushOnRunQueue(&MainCapability,tso);
435 break;
436 case RTS_FD_IS_BLOCKING:
437 if (prev == NULL)
438 blocked_queue_hd = tso;
439 else
440 setTSOLink(&MainCapability, prev, tso);
441 prev = tso;
442 break;
443 }
444 }
445
446 if (prev == NULL)
447 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
448 else {
449 prev->_link = END_TSO_QUEUE;
450 blocked_queue_tl = prev;
451 }
452 }
453
454 } while (wait && sched_state == SCHED_RUNNING
455 && emptyRunQueue(&MainCapability));
456 }
457
458 #endif /* THREADED_RTS */