Raise exceptions when blocked in bad FDs (fixes Trac #4934)
author Sergei Trofimovich <slyfox@gentoo.org>
Fri, 23 May 2014 20:58:06 +0000 (23:58 +0300)
committer Simon Marlow <marlowsd@gmail.com>
Sun, 8 Jun 2014 10:21:11 +0000 (11:21 +0100)
Before the patch any call to 'select()' with 'bad_fd' led to:
- unblocking of all threads
- hiding exception for 'threadWaitRead bad_fd'

The patch fixes both cases in this way:
after 'select()' failure we iterate over each blocked descriptor
and poll individually to see its actual status, which is:
- READY (move to run queue)
- BLOCKED (leave in blocked queue)
- INVALID (send an IOError exception)

Signed-off-by: Sergei Trofimovich <slyfox@gentoo.org>
libraries/base/GHC/Event/Thread.hs
rts/Prelude.h
rts/RtsStartup.c
rts/package.conf.in
rts/posix/Select.c
rts/win32/libHSbase.def

index c599047..6e991bf 100644 (file)
@@ -12,9 +12,10 @@ module GHC.Event.Thread
     , closeFdWith
     , threadDelay
     , registerDelay
+    , blockedOnBadFD -- used by RTS
     ) where
 
-import Control.Exception (finally)
+import Control.Exception (finally, SomeException, toException)
 import Control.Monad (forM, forM_, sequence_, zipWithM, when)
 import Data.IORef (IORef, newIORef, readIORef, writeIORef)
 import Data.List (zipWith3)
@@ -115,6 +116,9 @@ threadWait evt fd = mask_ $ do
     then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
     else return ()
 
+-- used at least by RTS in 'select()' IO manager backend
+blockedOnBadFD :: SomeException
+blockedOnBadFD = toException $ errnoToIOError "awaitEvent" eBADF Nothing Nothing
 
 threadWaitSTM :: Event -> Fd -> IO (STM (), IO ())
 threadWaitSTM evt fd = mask_ $ do
index 89e80a0..0c54148 100644 (file)
@@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnMVar_closure);
 PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure);
 PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure);
 PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure);
+PRELUDE_CLOSURE(base_GHCziEventziThread_blockedOnBadFD_closure);
 
 PRELUDE_CLOSURE(base_GHCziConcziSync_runSparks_closure);
 PRELUDE_CLOSURE(base_GHCziConcziIO_ensureIOManagerIsRunning_closure);
@@ -104,6 +105,7 @@ PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
 #define blockedIndefinitelyOnSTM_closure DLL_IMPORT_DATA_REF(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure)
 #define nonTermination_closure    DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nonTermination_closure)
 #define nestedAtomically_closure  DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nestedAtomically_closure)
+#define blockedOnBadFD_closure    DLL_IMPORT_DATA_REF(base_GHCziEventziThread_blockedOnBadFD_closure)
 
 #define Czh_static_info           DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Czh_static_info)
 #define Fzh_static_info           DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Fzh_static_info)
index 06e888c..c9f5880 100644 (file)
@@ -209,6 +209,7 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config)
     getStablePtr((StgPtr)nonTermination_closure);
     getStablePtr((StgPtr)blockedIndefinitelyOnSTM_closure);
     getStablePtr((StgPtr)nestedAtomically_closure);
+    getStablePtr((StgPtr)blockedOnBadFD_closure);
 
     getStablePtr((StgPtr)runSparks_closure);
     getStablePtr((StgPtr)ensureIOManagerIsRunning_closure);
index 4c8686f..8250bc2 100644 (file)
@@ -99,6 +99,7 @@ ld-options:
          , "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnMVar_closure"
          , "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnSTM_closure"
          , "-Wl,-u,_base_ControlziExceptionziBase_nestedAtomically_closure"
+         , "-Wl,-u,_base_GHCziEventziThread_blockedOnBadFD_closure"
          , "-Wl,-u,_base_GHCziWeak_runFinalizzerBatch_closure"
          , "-Wl,-u,_base_GHCziTopHandler_flushStdHandles_closure"
          , "-Wl,-u,_base_GHCziTopHandler_runIO_closure"
@@ -139,6 +140,7 @@ ld-options:
          , "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnMVar_closure"
          , "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnSTM_closure"
          , "-Wl,-u,base_ControlziExceptionziBase_nestedAtomically_closure"
+         , "-Wl,-u,base_GHCziEventziThread_blockedOnBadFD_closure"
          , "-Wl,-u,base_GHCziWeak_runFinalizzerBatch_closure"
          , "-Wl,-u,base_GHCziTopHandler_flushStdHandles_closure"
          , "-Wl,-u,base_GHCziTopHandler_runIO_closure"
index 3d92a46..a101f03 100644 (file)
@@ -14,6 +14,8 @@
 
 #include "Signals.h"
 #include "Schedule.h"
+#include "Prelude.h"
+#include "RaiseAsync.h"
 #include "RtsUtils.h"
 #include "Itimer.h"
 #include "Capability.h"
@@ -120,6 +122,85 @@ fdOutOfRange (int fd)
     stg_exit(EXIT_FAILURE);
 }
 
+/*
+ * State of individual file descriptor after a 'select()' poll.
+ */
+enum FdState {
+    RTS_FD_IS_READY = 0,
+    RTS_FD_IS_BLOCKING,
+    RTS_FD_IS_INVALID,
+};
+
+static enum FdState fdPollReadState (int fd)
+{
+    int r;
+    fd_set rfd;
+    struct timeval now;
+
+    FD_ZERO(&rfd);
+    FD_SET(fd, &rfd);
+
+    /* only poll */
+    now.tv_sec  = 0;
+    now.tv_usec = 0;
+    for (;;)
+    {
+        r = select(fd+1, &rfd, NULL, NULL, &now);
+        /* the descriptor is sane */
+        if (r != -1)
+            break;
+
+        switch (errno)
+        {
+            case EBADF: return RTS_FD_IS_INVALID;
+            case EINTR: continue;
+            default:
+                sysErrorBelch("select");
+                stg_exit(EXIT_FAILURE);
+        }
+    }
+
+    if (r == 0)
+        return RTS_FD_IS_BLOCKING;
+    else
+        return RTS_FD_IS_READY;
+}
+
+static enum FdState fdPollWriteState (int fd)
+{
+    int r;
+    fd_set wfd;
+    struct timeval now;
+
+    FD_ZERO(&wfd);
+    FD_SET(fd, &wfd);
+
+    /* only poll */
+    now.tv_sec  = 0;
+    now.tv_usec = 0;
+    for (;;)
+    {
+        r = select(fd+1, NULL, &wfd, NULL, &now);
+        /* the descriptor is sane */
+        if (r != -1)
+            break;
+
+        switch (errno)
+        {
+            case EBADF: return RTS_FD_IS_INVALID;
+            case EINTR: continue;
+            default:
+                sysErrorBelch("select");
+                stg_exit(EXIT_FAILURE);
+        }
+    }
+
+    if (r == 0)
+        return RTS_FD_IS_BLOCKING;
+    else
+        return RTS_FD_IS_READY;
+}
+
 /* Argument 'wait' says whether to wait for I/O to become available,
  * or whether to just check and return immediately.  If there are
  * other threads ready to run, we normally do the non-waiting variety,
@@ -137,12 +218,10 @@ void
 awaitEvent(rtsBool wait)
 {
     StgTSO *tso, *prev, *next;
-    rtsBool ready;
     fd_set rfd,wfd;
     int numFound;
     int maxfd = -1;
-    rtsBool select_succeeded = rtsTrue;
-    rtsBool unblock_all = rtsFalse;
+    rtsBool seen_bad_fd = rtsFalse;
     struct timeval tv, *ptv;
     LowResTime now;
 
@@ -225,25 +304,8 @@ awaitEvent(rtsBool wait)
       
       while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, ptv)) < 0) {
          if (errno != EINTR) {
-           /* Handle bad file descriptors by unblocking all the
-              waiting threads. Why? Because a thread might have been
-              a bit naughty and closed a file descriptor while another
-              was blocked waiting. This is less-than-good programming
-              practice, but having the RTS as a result fall over isn't
-              acceptable, so we simply unblock all the waiting threads
-              should we see a bad file descriptor & give the threads
-              a chance to clean up their act. 
-              
-              Note: assume here that threads becoming unblocked
-              will try to read/write the file descriptor before trying
-              to issue a threadWaitRead/threadWaitWrite again (==> an
-              IOError will result for the thread that's got the bad
-              file descriptor.) Hence, there's no danger of a bad
-              file descriptor being repeatedly select()'ed on, so
-              the RTS won't loop.
-           */
            if ( errno == EBADF ) {
-                unblock_all = rtsTrue;
+                seen_bad_fd = rtsTrue;
                 break;
            } else {
                 sysErrorBelch("select");
@@ -286,33 +348,58 @@ awaitEvent(rtsBool wait)
        */
 
       prev = NULL;
-      if (select_succeeded || unblock_all) {
-         for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
-             next = tso->_link;
+      {
+          for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
+              next = tso->_link;
+              int fd;
+              enum FdState fd_state = RTS_FD_IS_BLOCKING;
 
               switch (tso->why_blocked) {
-             case BlockedOnRead:
-                 ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
-                 break;
-             case BlockedOnWrite:
-                 ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
-                 break;
-             default:
-                 barf("awaitEvent");
-             }
-      
-             if (ready) {
-               IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
-                 tso->why_blocked = NotBlocked;
-                 tso->_link = END_TSO_QUEUE;
-                 pushOnRunQueue(&MainCapability,tso);
-             } else {
-                 if (prev == NULL)
-                     blocked_queue_hd = tso;
-                 else
-                     setTSOLink(&MainCapability, prev, tso);
-                 prev = tso;
-             }
+              case BlockedOnRead:
+                  fd = tso->block_info.fd;
+
+                  if (seen_bad_fd) {
+                      fd_state = fdPollReadState (fd);
+                  } else if (FD_ISSET(fd, &rfd)) {
+                      fd_state = RTS_FD_IS_READY;
+                  }
+                  break;
+              case BlockedOnWrite:
+                  fd = tso->block_info.fd;
+
+                  if (seen_bad_fd) {
+                      fd_state = fdPollWriteState (fd);
+                  } else if (FD_ISSET(fd, &wfd)) {
+                      fd_state = RTS_FD_IS_READY;
+                  }
+                  break;
+              default:
+                  barf("awaitEvent");
+              }
+
+              switch (fd_state) {
+              case RTS_FD_IS_INVALID:
+                  /*
+                   * Don't let RTS loop on such descriptors,
+                   * pass an IOError to blocked threads (Trac #4934)
+                   */
+                  IF_DEBUG(scheduler,debugBelch("Killing blocked thread %lu on bad fd=%i\n", (unsigned long)tso->id, fd));
+                  throwToSingleThreaded(&MainCapability, tso, (StgClosure *)blockedOnBadFD_closure);
+                  break;
+              case RTS_FD_IS_READY:
+                  IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
+                  tso->why_blocked = NotBlocked;
+                  tso->_link = END_TSO_QUEUE;
+                  pushOnRunQueue(&MainCapability,tso);
+                  break;
+              case RTS_FD_IS_BLOCKING:
+                  if (prev == NULL)
+                      blocked_queue_hd = tso;
+                  else
+                      setTSOLink(&MainCapability, prev, tso);
+                  prev = tso;
+                  break;
+              }
          }
 
          if (prev == NULL)
index 119237b..8140528 100644 (file)
@@ -40,5 +40,4 @@ EXPORTS
 
        base_ControlziExceptionziBase_nonTermination_closure
        base_ControlziExceptionziBase_nestedAtomically_closure
-
-
+       base_GHCziEventziThread_blockedOnBadFD_closure