Split part of the Task struct into a separate struct InCall
author Simon Marlow <marlowsd@gmail.com>
Tue, 9 Mar 2010 14:31:11 +0000 (14:31 +0000)
committer Simon Marlow <marlowsd@gmail.com>
Tue, 9 Mar 2010 14:31:11 +0000 (14:31 +0000)
The idea is that this leaves Tasks and OSThreads in one-to-one
correspondence.  The part of a Task that represents a call into
Haskell from C is split into a separate struct InCall, pointed to by
the Task and the TSO bound to it.  A given OSThread/Task thus always
uses the same mutex and condition variable, rather than getting a new
one for each callback.  Conceptually it is simpler, although there are
more types and indirections in a few places now.

This improves callback performance by removing some of the locks that
we had to take when making in-calls.  Now we also keep the current Task
in a thread-local variable if supported by the OS and gcc (currently
only Linux).

includes/rts/storage/TSO.h
rts/Capability.c
rts/Capability.h
rts/Schedule.c
rts/Schedule.h
rts/Stats.c
rts/Task.c
rts/Task.h
rts/Threads.c
rts/sm/Compact.c

index b00f5d4..e8d97c5 100644 (file)
@@ -114,7 +114,7 @@ typedef struct StgTSO_ {
     StgTSOBlockInfo         block_info;
     StgThreadID             id;
     int                     saved_errno;
-    struct Task_*           bound;
+    struct InCall_*         bound;
     struct Capability_*     cap;
     struct StgTRecHeader_ * trec;       /* STM transaction record */
 
index cf85372..ce6eceb 100644 (file)
@@ -173,10 +173,10 @@ STATIC_INLINE void
 newReturningTask (Capability *cap, Task *task)
 {
     ASSERT_LOCK_HELD(&cap->lock);
-    ASSERT(task->return_link == NULL);
+    ASSERT(task->next == NULL);
     if (cap->returning_tasks_hd) {
-       ASSERT(cap->returning_tasks_tl->return_link == NULL);
-       cap->returning_tasks_tl->return_link = task;
+       ASSERT(cap->returning_tasks_tl->next == NULL);
+       cap->returning_tasks_tl->next = task;
     } else {
        cap->returning_tasks_hd = task;
     }
@@ -190,11 +190,11 @@ popReturningTask (Capability *cap)
     Task *task;
     task = cap->returning_tasks_hd;
     ASSERT(task);
-    cap->returning_tasks_hd = task->return_link;
+    cap->returning_tasks_hd = task->next;
     if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
     }
-    task->return_link = NULL;
+    task->next = NULL;
     return task;
 }
 #endif
@@ -220,7 +220,7 @@ initCapability( Capability *cap, nat i )
     initMutex(&cap->lock);
     cap->running_task      = NULL; // indicates cap is free
     cap->spare_workers     = NULL;
-    cap->suspended_ccalling_tasks = NULL;
+    cap->suspended_ccalls = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
     cap->wakeup_queue_hd    = END_TSO_QUEUE;
@@ -342,7 +342,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
     ASSERT_LOCK_HELD(&cap->lock);
     ASSERT(task->cap == cap);
     debugTrace(DEBUG_sched, "passing capability %d to %s %p",
-               cap->no, task->tso ? "bound task" : "worker",
+               cap->no, task->incall->tso ? "bound task" : "worker",
                (void *)task->id);
     ACQUIRE_LOCK(&task->lock);
     task->wakeup = rtsTrue;
@@ -398,7 +398,7 @@ releaseCapability_ (Capability* cap,
         // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
-       task = cap->run_queue_hd->bound;
+       task = cap->run_queue_hd->bound->task;
        giveCapabilityToTask(cap,task);
        return;
     }
@@ -411,7 +411,7 @@ releaseCapability_ (Capability* cap,
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
-           startWorkerTask(cap, workerStart);
+           startWorkerTask(cap);
            return;
        }
     }
@@ -462,9 +462,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     // in which case it is not replaced on the spare_worker queue.
     // This happens when the system is shutting down (see
     // Schedule.c:workerStart()).
-    // Also, be careful to check that this task hasn't just exited
-    // Haskell to do a foreign call (task->suspended_tso).
-    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+    if (!isBoundTask(task) && !task->stopped) {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
     }
@@ -612,7 +610,7 @@ yieldCapability (Capability** pCap, Task *task)
                continue;
            }
 
-           if (task->tso == NULL) {
+           if (task->incall->tso == NULL) {
                ASSERT(cap->spare_workers != NULL);
                // if we're not at the front of the queue, release it
                // again.  This is unlikely to happen.
@@ -655,12 +653,12 @@ wakeupThreadOnCapability (Capability *my_cap,
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
-       ASSERT(tso->bound->cap == tso->cap);
-       tso->bound->cap = other_cap;
+       ASSERT(tso->bound->task->cap == tso->cap);
+       tso->bound->task->cap = other_cap;
     }
     tso->cap = other_cap;
 
-    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
+    ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
 
     if (other_cap->running_task == NULL) {
        // nobody is running this Capability, we can add our thread
@@ -781,7 +779,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
         // that will try to return to code that has been unloaded.
         // We can be a bit more relaxed when this is a standalone
         // program that is about to terminate, and let safe=false.
-        if (cap->suspended_ccalling_tasks && safe) {
+        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched, 
                       "thread(s) are involved in foreign calls, yielding");
             cap->running_task = NULL;
@@ -871,7 +869,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
 {
     nat i;
     Capability *cap;
-    Task *task;
+    InCall *incall;
 
     // Each GC thread is responsible for following roots from the
     // Capability of the same number.  There will usually be the same
@@ -886,9 +884,9 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
        evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
 #endif
-       for (task = cap->suspended_ccalling_tasks; task != NULL; 
-            task=task->next) {
-           evac(user, (StgClosure **)(void *)&task->suspended_tso);
+       for (incall = cap->suspended_ccalls; incall != NULL; 
+            incall=incall->next) {
+           evac(user, (StgClosure **)(void *)&incall->suspended_tso);
        }
 
 #if defined(THREADED_RTS)
index 4b51548..41974dc 100644 (file)
@@ -56,7 +56,7 @@ struct Capability_ {
     // the suspended TSOs easily.  Hence, when migrating a Task from
     // the returning_tasks list, we must also migrate its entry from
     // this list.
-    Task *suspended_ccalling_tasks;
+    InCall *suspended_ccalls;
 
     // One mutable list per generation, so we don't need to take any
     // locks when updating an old-generation thunk.  This also lets us
index 66af8be..4cca469 100644 (file)
@@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task)
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
-       if (task->tso == NULL && emptyRunQueue(cap)) {
+       if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
@@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task)
     // Check whether we can run this thread in the current task.
     // If not, we have to pass our capability to the right task.
     {
-       Task *bound = t->bound;
+        InCall *bound = t->bound;
       
        if (bound) {
-           if (bound == task) {
+           if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
@@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task)
            }
        } else {
            // The thread we want to run is unbound.
-           if (task->tso) { 
+           if (task->incall->tso) { 
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                            (unsigned long)t->id);
@@ -441,7 +441,7 @@ run_thread:
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     ASSERT(t->cap == cap);
-    ASSERT(t->bound ? t->bound->cap == cap : 1);
+    ASSERT(t->bound ? t->bound->task->cap == cap : 1);
 
     prev_what_next = t->what_next;
 
@@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task)
     //     and this task it bound).
     return (waiting_for_gc || 
             cap->returning_tasks_hd != NULL ||
-            (!emptyRunQueue(cap) && (task->tso == NULL
+            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                      ? cap->run_queue_hd->bound != NULL
-                                     : cap->run_queue_hd->bound != task)));
+                                     : cap->run_queue_hd->bound != task->incall)));
 }
 
 // This is the single place where a Task goes to sleep.  There are
@@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
-                   || t->bound == task // don't move my bound thread
+                   || t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    prev = t;
@@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                } else {
                    appendToRunQueue(free_caps[i],t);
 
-            traceEventMigrateThread (cap, t, free_caps[i]->no);
+                    traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-                   if (t->bound) { t->bound->cap = free_caps[i]; }
+                   if (t->bound) { t->bound->task->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
@@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
-       if (task->tso) {
-           switch (task->tso->why_blocked) {
+       if (task->incall->tso) {
+           switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
-               throwToSingleThreaded(cap, task->tso, 
+               throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
@@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
-       if (task->tso == t) {
-           task->tso = new_t;
+       if (task->incall->tso == t) {
+           task->incall->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
     }
@@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
 
       if (t->bound) {
 
-         if (t->bound != task) {
+         if (t->bound != task->incall) {
 #if !defined(THREADED_RTS)
              // Must be a bound thread that is not the topmost one.  Leave
              // it on the run queue until the stack has unwound to the
@@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
 #endif
          }
 
-         ASSERT(task->tso == t);
+         ASSERT(task->incall->tso == t);
 
          if (t->what_next == ThreadComplete) {
              if (task->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-                 *(task->ret) = (StgClosure *)task->tso->sp[1]; 
+                 *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; 
              }
              task->stat = Success;
          } else {
@@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
              }
          }
 #ifdef DEBUG
-         removeThreadLabel((StgWord)task->tso->id);
+         removeThreadLabel((StgWord)task->incall->tso->id);
 #endif
 
           // We no longer consider this thread and task to be bound to
@@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
           // re-used). This was a real bug: the GC updated
           // tso->bound->tso which lead to a deadlock.
           t->bound = NULL;
-          task->tso = NULL;
+          task->incall->tso = NULL;
 
          return rtsTrue; // tells schedule() to return
       }
@@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry
            )
 {
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
-    Task *task;
     pid_t pid;
     StgTSO* t,*next;
     Capability *cap;
@@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry
 
        // Any suspended C-calling Tasks are no more, their OS threads
        // don't exist now:
-       cap->suspended_ccalling_tasks = NULL;
+       cap->suspended_ccalls = NULL;
 
        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
@@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry
             generations[g].threads = END_TSO_QUEUE;
         }
 
-       // Wipe the task list, except the current Task.
-       ACQUIRE_LOCK(&sched_mutex);
-       for (task = all_tasks; task != NULL; task=task->all_link) {
-           if (task != cap->running_task) {
-#if defined(THREADED_RTS)
-                initMutex(&task->lock); // see #1391
-#endif
-               discardTask(task);
-           }
-       }
-       RELEASE_LOCK(&sched_mutex);
+        discardTasksExcept(cap->running_task);
 
 #if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
@@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap )
 }
 
 /* -----------------------------------------------------------------------------
-   Managing the suspended_ccalling_tasks list.
+   Managing the suspended_ccalls list.
    Locks required: sched_mutex
    -------------------------------------------------------------------------- */
 
 STATIC_INLINE void
 suspendTask (Capability *cap, Task *task)
 {
-    ASSERT(task->next == NULL && task->prev == NULL);
-    task->next = cap->suspended_ccalling_tasks;
-    task->prev = NULL;
-    if (cap->suspended_ccalling_tasks) {
-       cap->suspended_ccalling_tasks->prev = task;
-    }
-    cap->suspended_ccalling_tasks = task;
+    InCall *incall;
+    
+    incall = task->incall;
+    ASSERT(incall->next == NULL && incall->prev == NULL);
+    incall->next = cap->suspended_ccalls;
+    incall->prev = NULL;
+    if (cap->suspended_ccalls) {
+       cap->suspended_ccalls->prev = incall;
+    }
+    cap->suspended_ccalls = incall;
 }
 
 STATIC_INLINE void
 recoverSuspendedTask (Capability *cap, Task *task)
 {
-    if (task->prev) {
-       task->prev->next = task->next;
+    InCall *incall;
+
+    incall = task->incall;
+    if (incall->prev) {
+       incall->prev->next = incall->next;
     } else {
-       ASSERT(cap->suspended_ccalling_tasks == task);
-       cap->suspended_ccalling_tasks = task->next;
+       ASSERT(cap->suspended_ccalls == incall);
+       cap->suspended_ccalls = incall->next;
     }
-    if (task->next) {
-       task->next->prev = task->prev;
+    if (incall->next) {
+       incall->next->prev = incall->prev;
     }
-    task->next = task->prev = NULL;
+    incall->next = incall->prev = NULL;
 }
 
 /* ---------------------------------------------------------------------------
@@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg)
   }
 
   // Hand back capability
-  task->suspended_tso = tso;
+  task->incall->suspended_tso = tso;
+  task->incall->suspended_cap = cap;
 
   ACQUIRE_LOCK(&cap->lock);
 
@@ -1853,6 +1849,7 @@ StgRegTable *
 resumeThread (void *task_)
 {
     StgTSO *tso;
+    InCall *incall;
     Capability *cap;
     Task *task = task_;
     int saved_errno;
@@ -1865,18 +1862,22 @@ resumeThread (void *task_)
     saved_winerror = GetLastError();
 #endif
 
-    cap = task->cap;
+    incall = task->incall;
+    cap = incall->suspended_cap;
+    task->cap = cap;
+
     // Wait for permission to re-enter the RTS with the result.
     waitForReturnCapability(&cap,task);
     // we might be on a different capability now... but if so, our
-    // entry on the suspended_ccalling_tasks list will also have been
+    // entry on the suspended_ccalls list will also have been
     // migrated.
 
     // Remove the thread from the suspended list
     recoverSuspendedTask(cap,task);
 
-    tso = task->suspended_tso;
-    task->suspended_tso = NULL;
+    tso = incall->suspended_tso;
+    incall->suspended_tso = NULL;
+    incall->suspended_cap = NULL;
     tso->_link = END_TSO_QUEUE; // no write barrier reqd
 
     traceEventRunThread(cap, tso);
@@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
-    tso->bound = task;
+    tso->bound = task->incall;
     tso->cap = cap;
 
-    task->tso = tso;
+    task->incall->tso = tso;
     task->ret = ret;
     task->stat = NoStatus;
 
@@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
  * ------------------------------------------------------------------------- */
 
 #if defined(THREADED_RTS)
-void OSThreadProcAttr
-workerStart(Task *task)
+void scheduleWorker (Capability *cap, Task *task)
 {
-    Capability *cap;
-
-    // See startWorkerTask().
-    ACQUIRE_LOCK(&task->lock);
-    cap = task->cap;
-    RELEASE_LOCK(&task->lock);
-
-    if (RtsFlags.ParFlags.setAffinity) {
-        setThreadAffinity(cap->no, n_capabilities);
-    }
-
-    // set the thread-local pointer to the Task:
-    taskEnter(task);
-
     // schedule() runs without a lock.
     cap = schedule(cap,task);
 
@@ -2062,6 +2048,8 @@ initScheduler(void)
   initSparkPools();
 #endif
 
+  RELEASE_LOCK(&sched_mutex);
+
 #if defined(THREADED_RTS)
   /*
    * Eagerly start one worker to run each Capability, except for
@@ -2075,13 +2063,11 @@ initScheduler(void)
       for (i = 1; i < n_capabilities; i++) {
          cap = &capabilities[i];
          ACQUIRE_LOCK(&cap->lock);
-         startWorkerTask(cap, workerStart);
+         startWorkerTask(cap);
          RELEASE_LOCK(&cap->lock);
       }
   }
 #endif
-
-  RELEASE_LOCK(&sched_mutex);
 }
 
 void
@@ -2102,7 +2088,7 @@ exitScheduler(
        sched_state = SCHED_INTERRUPTING;
         waitForReturnCapability(&task->cap,task);
        scheduleDoGC(task->cap,task,rtsFalse);
-        ASSERT(task->tso == NULL);
+        ASSERT(task->incall->tso == NULL);
         releaseCapability(task->cap);
     }
     sched_state = SCHED_SHUTTING_DOWN;
@@ -2112,7 +2098,7 @@ exitScheduler(
        nat i;
        
        for (i = 0; i < n_capabilities; i++) {
-            ASSERT(task->tso == NULL);
+            ASSERT(task->incall->tso == NULL);
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
     }
@@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major)
 
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the 
-    // suspended_ccalling_tasks queue.
+    // suspended_ccalls queue.
     task = newBoundTask();
 
     waitForReturnCapability(&task->cap,task);
@@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
 
     // The TSO attached to this Task may have moved, so update the
     // pointer to it.
-    if (task->tso == tso) {
-        task->tso = new_tso;
+    if (task->incall->tso == tso) {
+        task->incall->tso = new_tso;
     }
 
     unlockTSO(new_tso);
index 6751144..af322d8 100644 (file)
@@ -46,15 +46,8 @@ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *excepti
 /* findRetryFrameHelper */
 StgWord findRetryFrameHelper (StgTSO *tso);
 
-/* workerStart()
- * 
- * Entry point for a new worker task.
- * Called from STG :  NO
- * Locks assumed   :  none
- */
-#if defined(THREADED_RTS)
-void OSThreadProcAttr workerStart(Task *task);
-#endif
+/* Entry point for a new worker */
+void scheduleWorker (Capability *cap, Task *task);
 
 /* The state of the scheduler.  This is used to control the sequence
  * of events during shutdown, and when the runtime is interrupted
index 20de32a..58b113d 100644 (file)
@@ -623,7 +623,7 @@ stat_exit(int alloc)
                     i++, task = task->all_link) {
                    statsPrintf("  Task %2d %-8s :  %6.2fs    (%6.2fs)     %6.2fs    (%6.2fs)\n",
                                i,
-                               (task->tso == NULL) ? "(worker)" : "(bound)",
+                               (task->worker) ? "(worker)" : "(bound)",
                                TICK_TO_DBL(task->mut_time),
                                TICK_TO_DBL(task->mut_etime),
                                TICK_TO_DBL(task->gc_time),
index 9a8ebd6..2921e9e 100644 (file)
 // Task lists and global counters.
 // Locks required: sched_mutex.
 Task *all_tasks = NULL;
-static Task *task_free_list = NULL; // singly-linked
 static nat taskCount;
-static nat tasksRunning;
-static nat workerCount;
 static int tasksInitialized = 0;
 
+static void   freeTask  (Task *task);
+static Task * allocTask (void);
+static Task * newTask   (rtsBool);
+
 /* -----------------------------------------------------------------------------
  * Remembering the current thread's Task
  * -------------------------------------------------------------------------- */
@@ -39,7 +40,11 @@ static int tasksInitialized = 0;
 // A thread-local-storage key that we can use to get access to the
 // current thread's Task structure.
 #if defined(THREADED_RTS)
+# if defined(MYTASK_USE_TLV)
+__thread Task *my_task;
+# else
 ThreadLocalKey currentTaskKey;
+# endif
 #else
 Task *my_task;
 #endif
@@ -53,10 +58,8 @@ initTaskManager (void)
 {
     if (!tasksInitialized) {
        taskCount = 0;
-       workerCount = 0;
-       tasksRunning = 0;
        tasksInitialized = 1;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
 #endif
     }
@@ -66,29 +69,24 @@ nat
 freeTaskManager (void)
 {
     Task *task, *next;
+    nat tasksRunning = 0;
 
     ASSERT_LOCK_HELD(&sched_mutex);
 
-    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
-               tasksRunning);
-
     for (task = all_tasks; task != NULL; task = next) {
         next = task->all_link;
         if (task->stopped) {
-            // We only free resources if the Task is not in use.  A
-            // Task may still be in use if we have a Haskell thread in
-            // a foreign call while we are attempting to shut down the
-            // RTS (see conc059).
-#if defined(THREADED_RTS)
-            closeCondition(&task->cond);
-            closeMutex(&task->lock);
-#endif
-            stgFree(task);
+            freeTask(task);
+        } else {
+            tasksRunning++;
         }
     }
+
+    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
+               tasksRunning);
+
     all_tasks = NULL;
-    task_free_list = NULL;
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
     freeThreadLocalKey(&currentTaskKey);
 #endif
 
@@ -97,9 +95,52 @@ freeTaskManager (void)
     return tasksRunning;
 }
 
+static Task *
+allocTask (void)
+{
+    Task *task;
+
+    task = myTask();
+    if (task != NULL) {
+        return task;
+    } else {
+        task = newTask(rtsFalse);
+#if defined(THREADED_RTS)
+        task->id = osThreadId();
+#endif
+        setMyTask(task);
+        return task;
+    }
+}
+
+static void
+freeTask (Task *task)
+{
+    InCall *incall, *next;
+
+    // We only free resources if the Task is not in use.  A
+    // Task may still be in use if we have a Haskell thread in
+    // a foreign call while we are attempting to shut down the
+    // RTS (see conc059).
+#if defined(THREADED_RTS)
+    closeCondition(&task->cond);
+    closeMutex(&task->lock);
+#endif
+
+    for (incall = task->incall; incall != NULL; incall = next) {
+        next = incall->prev_stack;
+        stgFree(incall);
+    }
+    for (incall = task->spare_incalls; incall != NULL; incall = next) {
+        next = incall->next;
+        stgFree(incall);
+    }
+
+    stgFree(task);
+}
 
 static Task*
-newTask (void)
+newTask (rtsBool worker)
 {
 #if defined(THREADED_RTS)
     Ticks currentElapsedTime, currentUserTime;
@@ -109,12 +150,14 @@ newTask (void)
 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
     
-    task->cap  = NULL;
-    task->stopped = rtsFalse;
-    task->suspended_tso = NULL;
-    task->tso  = NULL;
-    task->stat = NoStatus;
-    task->ret  = NULL;
+    task->cap           = NULL;
+    task->worker        = worker;
+    task->stopped       = rtsFalse;
+    task->stat          = NoStatus;
+    task->ret           = NULL;
+    task->n_spare_incalls = 0;
+    task->spare_incalls = NULL;
+    task->incall        = NULL;
     
 #if defined(THREADED_RTS)
     initCondition(&task->cond);
@@ -133,18 +176,65 @@ newTask (void)
     task->elapsedtimestart = currentElapsedTime;
 #endif
 
-    task->prev = NULL;
     task->next = NULL;
-    task->return_link = NULL;
+
+    ACQUIRE_LOCK(&sched_mutex);
 
     task->all_link = all_tasks;
     all_tasks = task;
 
     taskCount++;
 
+    RELEASE_LOCK(&sched_mutex);
+
     return task;
 }
 
+// avoid the spare_incalls list growing unboundedly
+#define MAX_SPARE_INCALLS 8
+
+static void
+newInCall (Task *task)
+{
+    InCall *incall;
+    
+    if (task->spare_incalls != NULL) {
+        incall = task->spare_incalls;
+        task->spare_incalls = incall->next;
+        task->n_spare_incalls--;
+    } else {
+        incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+    }
+
+    incall->tso = NULL;
+    incall->task = task;
+    incall->suspended_tso = NULL;
+    incall->suspended_cap = NULL;
+    incall->next = NULL;
+    incall->prev = NULL;
+    incall->prev_stack = task->incall;
+    task->incall = incall;
+}
+
+static void
+endInCall (Task *task)
+{
+    InCall *incall;
+
+    incall = task->incall;
+    incall->tso = NULL;
+    task->incall = task->incall->prev_stack;
+
+    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
+        stgFree(incall);
+    } else {
+        incall->next = task->spare_incalls;
+        task->spare_incalls = incall;
+        task->n_spare_incalls++;
+    }
+}
+
+
 Task *
 newBoundTask (void)
 {
@@ -155,31 +245,11 @@ newBoundTask (void)
         stg_exit(EXIT_FAILURE);
     }
 
-    // ToDo: get rid of this lock in the common case.  We could store
-    // a free Task in thread-local storage, for example.  That would
-    // leave just one lock on the path into the RTS: cap->lock when
-    // acquiring the Capability.
-    ACQUIRE_LOCK(&sched_mutex);
-
-    if (task_free_list == NULL) {
-       task = newTask();
-    } else {
-       task = task_free_list;
-       task_free_list = task->next;
-       task->next = NULL;
-       task->prev = NULL;
-       task->stopped = rtsFalse;
-    }
-#if defined(THREADED_RTS)
-    task->id = osThreadId();
-#endif
-    ASSERT(task->cap == NULL);
-
-    tasksRunning++;
+    task = allocTask();
 
-    taskEnter(task);
+    task->stopped = rtsFalse;
 
-    RELEASE_LOCK(&sched_mutex);
+    newInCall(task);
 
     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
     return task;
@@ -188,27 +258,19 @@ newBoundTask (void)
 void
 boundTaskExiting (Task *task)
 {
-    task->tso = NULL;
     task->stopped = rtsTrue;
-    task->cap = NULL;
 
 #if defined(THREADED_RTS)
     ASSERT(osThreadId() == task->id);
 #endif
     ASSERT(myTask() == task);
-    setMyTask(task->prev_stack);
-
-    tasksRunning--;
 
-    // sadly, we need a lock around the free task list. Todo: eliminate.
-    ACQUIRE_LOCK(&sched_mutex);
-    task->next = task_free_list;
-    task_free_list = task;
-    RELEASE_LOCK(&sched_mutex);
+    endInCall(task);
 
     debugTrace(DEBUG_sched, "task exiting");
 }
 
+
 #ifdef THREADED_RTS
 #define TASK_ID(t) (t)->id
 #else
@@ -216,22 +278,20 @@ boundTaskExiting (Task *task)
 #endif
 
 void
-discardTask (Task *task)
+discardTasksExcept (Task *keep)
 {
-    ASSERT_LOCK_HELD(&sched_mutex);
-    if (!task->stopped) {
-       debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
-       task->cap = NULL;
-        if (task->tso == NULL) {
-            workerCount--;
-        } else {
-            task->tso = NULL;
+    Task *task;
+
+    // Wipe the task list, except the current Task.
+    ACQUIRE_LOCK(&sched_mutex);
+    for (task = all_tasks; task != NULL; task=task->all_link) {
+        if (task != keep) {
+            debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+            freeTask(task);
         }
-       task->stopped = rtsTrue;
-       tasksRunning--;
-       task->next = task_free_list;
-       task_free_list = task;
     }
+    all_tasks = keep;
+    RELEASE_LOCK(&sched_mutex);
 }
 
 void
@@ -270,33 +330,43 @@ workerTaskStop (Task *task)
     task->cap = NULL;
     taskTimeStamp(task);
     task->stopped = rtsTrue;
-    tasksRunning--;
-    workerCount--;
-
-    ACQUIRE_LOCK(&sched_mutex);
-    task->next = task_free_list;
-    task_free_list = task;
-    RELEASE_LOCK(&sched_mutex);
 }
 
 #endif
 
 #if defined(THREADED_RTS)
 
+static void OSThreadProcAttr
+workerStart(Task *task)
+{
+    Capability *cap;
+
+    // See startWorkerTask().
+    ACQUIRE_LOCK(&task->lock);
+    cap = task->cap;
+    RELEASE_LOCK(&task->lock);
+
+    if (RtsFlags.ParFlags.setAffinity) {
+        setThreadAffinity(cap->no, n_capabilities);
+    }
+
+    // set the thread-local pointer to the Task:
+    setMyTask(task);
+
+    newInCall(task);
+
+    scheduleWorker(cap,task);
+}
+
 void
-startWorkerTask (Capability *cap, 
-                void OSThreadProcAttr (*taskStart)(Task *task))
+startWorkerTask (Capability *cap)
 {
   int r;
   OSThreadId tid;
   Task *task;
 
-  workerCount++;
-
   // A worker always gets a fresh Task structure.
-  task = newTask();
-
-  tasksRunning++;
+  task = newTask(rtsTrue);
 
   // The lock here is to synchronise with taskStart(), to make sure
   // that we have finished setting up the Task structure before the
@@ -311,7 +381,7 @@ startWorkerTask (Capability *cap,
   ASSERT_LOCK_HELD(&cap->lock);
   cap->running_task = task;
 
-  r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
+  r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
   if (r != 0) {
     sysErrorBelch("failed to create OS thread");
     stg_exit(EXIT_FAILURE);
@@ -350,8 +420,9 @@ printAllTasks(void)
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
-           if (task->tso) {
-             debugBelch("bound to thread %lu", (unsigned long)task->tso->id);
+           if (task->incall->tso) {
+             debugBelch("bound to thread %lu",
+                         (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
index 9b5f025..c2b58f2 100644 (file)
@@ -17,24 +17,20 @@ BEGIN_RTS_PRIVATE
    Definition of a Task
    --------------------
  
-   A task is an OSThread that runs Haskell code.  Every OSThread
-   created by the RTS for the purposes of running Haskell code is a
-   Task, and OS threads that enter the Haskell RTS for the purposes of
-   making a call-in are also Tasks.
+   A task is an OSThread that runs Haskell code.  Every OSThread that
+   runs inside the RTS, whether as a worker created by the RTS or via
+   an in-call from C to Haskell, has an associated Task.  The first
+   time an OS thread calls into Haskell it is allocated a Task, which
+   remains until the RTS is shut down.
+
+   There is a one-to-one relationship between OSThreads and Tasks.
+   The Task for an OSThread is kept in thread-local storage, and can
+   be retrieved at any time using myTask().
    
    In the THREADED_RTS build, multiple Tasks may all be running
    Haskell code simultaneously. A task relinquishes its Capability
    when it is asked to evaluate an external (C) call.
 
-   In general, there may be multiple Tasks associated with a given OS
-   thread.  A second Task is created when one Task makes a foreign
-   call from Haskell, and subsequently calls back in to Haskell,
-   creating a new bound thread.
-
-   A particular Task structure can belong to more than one OS thread
-   over its lifetime.  This is to avoid creating an unbounded number
-   of Task structures.  The stats just accumulate.
-
    Ownership of Task
    -----------------
 
@@ -59,8 +55,8 @@ BEGIN_RTS_PRIVATE
          (1) a bound Task, the TSO will be on a queue somewhere
         (2) a worker task, on the spare_workers queue of task->cap.
 
-     (b) making a foreign call.  The Task will be on the
-         suspended_ccalling_tasks list.
+     (b) making a foreign call.  The InCall will be on the
+         suspended_ccalls list.
 
    We re-establish ownership in each case by respectively
 
@@ -73,9 +69,46 @@ BEGIN_RTS_PRIVATE
           ownership of the Task and a Capability.
 */
 
+// The InCall structure represents either a single in-call from C to
+// Haskell, or a worker thread.
+typedef struct InCall_ {
+    StgTSO *   tso;             // the bound TSO (or NULL for a worker)
+
+    StgTSO *   suspended_tso;   // the TSO is stashed here when we
+                               // make a foreign call (NULL otherwise);
+
+    Capability *suspended_cap;  // The capability that the
+                                // suspended_tso is on, because
+                                // we can't read this from the TSO
+                                // without owning a Capability in the
+                                // first place.
+
+    struct Task_ *task;
+
+    // When a Haskell thread makes a foreign call that re-enters
+    // Haskell, we end up with another InCall associated with the
+    // current Task.  We have to remember the whole stack of InCalls
+    // associated with the current Task so that we can correctly
+    // save & restore the InCall on entry to and exit from Haskell.
+    struct InCall_ *prev_stack;
+
+    // Links InCalls onto suspended_ccalls, spare_incalls
+    struct InCall_ *prev;
+    struct InCall_ *next;
+} InCall;
+
 typedef struct Task_ {
 #if defined(THREADED_RTS)
     OSThreadId id;             // The OS Thread ID of this task
+
+    Condition cond;             // used for sleeping & waking up this task
+    Mutex lock;                        // lock for the condition variable
+
+    // this flag tells the task whether it should wait on task->cond
+    // or just continue immediately.  It's a workaround for the fact
+    // that signalling a condition variable doesn't do anything if the
+    // thread is already running, but we want it to be sticky.
+    rtsBool wakeup;
 #endif
 
     // This points to the Capability that the Task "belongs" to.  If
@@ -92,26 +125,18 @@ typedef struct Task_ {
     // must be held when modifying task->cap.
     struct Capability_ *cap;
 
+    // The current top-of-stack InCall
+    struct InCall_ *incall;
+
+    nat n_spare_incalls;
+    struct InCall_ *spare_incalls;
+
+    rtsBool    worker;          // == rtsTrue if this is a worker Task
     rtsBool    stopped;         // this task has stopped or exited Haskell
-    StgTSO *   suspended_tso;   // the TSO is stashed here when we
-                               // make a foreign call (NULL otherwise);
 
-    // The following 3 fields are used by bound threads:
-    StgTSO *   tso;             // the bound TSO (or NULL)
     SchedulerStatus  stat;      // return status
     StgClosure **    ret;       // return value
 
-#if defined(THREADED_RTS)
-    Condition cond;             // used for sleeping & waking up this task
-    Mutex lock;                        // lock for the condition variable
-
-    // this flag tells the task whether it should wait on task->cond
-    // or just continue immediately.  It's a workaround for the fact
-    // that signalling a condition variable doesn't do anything if the
-    // thread is already running, but we want it to be sticky.
-    rtsBool wakeup;
-#endif
-
     // Stats that we collect about this task
     // ToDo: we probably want to put this in a separate TaskStats
     // structure, so we can share it between multiple Tasks.  We don't
@@ -125,29 +150,19 @@ typedef struct Task_ {
     Ticks       gc_time;
     Ticks       gc_etime;
 
-    // Links tasks onto various lists. (ToDo: do we need double
-    // linking now?)
-    struct Task_ *prev;
+    // Links tasks on the returning_tasks queue of a Capability, and
+    // on spare_workers.
     struct Task_ *next;
 
-    // Links tasks on the returning_tasks queue of a Capability.
-    struct Task_ *return_link;
-
     // Links tasks on the all_tasks list
     struct Task_ *all_link;
 
-    // When a Haskell thread makes a foreign call that re-enters
-    // Haskell, we end up with another Task associated with the
-    // current thread.  We have to remember the whole stack of Tasks
-    // associated with the current thread so that we can correctly
-    // save & restore the thread-local current task pointer.
-    struct Task_ *prev_stack;
 } Task;
 
 INLINE_HEADER rtsBool
 isBoundTask (Task *task) 
 {
-    return (task->tso != NULL);
+    return (task->incall->tso != NULL);
 }
 
 
@@ -171,11 +186,6 @@ Task *newBoundTask (void);
 //
 void boundTaskExiting (Task *task);
 
-// This must be called when a new Task is associated with the current
-// thread.  It sets up the thread-local current task pointer so that
-// myTask() can work.
-INLINE_HEADER void taskEnter (Task *task);
-
 // Notify the task manager that a task has stopped.  This is used
 // mainly for stats-gathering purposes.
 // Requires: sched_mutex.
@@ -194,7 +204,7 @@ void taskTimeStamp (Task *task);
 // Put the task back on the free list, mark it stopped.  Used by
 // forkProcess().
 //
-void discardTask (Task *task);
+void discardTasksExcept (Task *keep);
 
 // Get the Task associated with the current OS thread (or NULL if none).
 //
@@ -207,8 +217,7 @@ INLINE_HEADER Task *myTask (void);
 // will become the running_task for that Capability.
 // Requires: sched_mutex.
 //
-void startWorkerTask  (struct Capability_ *cap, 
-                      void OSThreadProcAttr (*taskStart)(Task *task));
+void startWorkerTask (Capability *cap);
 
 #endif /* THREADED_RTS */
 
@@ -218,7 +227,13 @@ void startWorkerTask  (struct Capability_ *cap,
 // A thread-local-storage key that we can use to get access to the
 // current thread's Task structure.
 #if defined(THREADED_RTS)
+#if defined(linux_HOST_OS) && \
+    (defined(i386_HOST_ARCH) || defined(x86_64_HOST_ARCH))
+#define MYTASK_USE_TLV
+extern __thread Task *my_task;
+#else
 extern ThreadLocalKey currentTaskKey;
+#endif
 #else
 extern Task *my_task;
 #endif
@@ -232,7 +247,7 @@ extern Task *my_task;
 INLINE_HEADER Task *
 myTask (void)
 {
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
     return getThreadLocalVar(&currentTaskKey);
 #else
     return my_task;
@@ -242,25 +257,13 @@ myTask (void)
 INLINE_HEADER void
 setMyTask (Task *task)
 {
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
     setThreadLocalVar(&currentTaskKey,task);
 #else
     my_task = task;
 #endif
 }
 
-// This must be called when a new Task is associated with the current
-// thread.  It sets up the thread-local current task pointer so that
-// myTask() can work.
-INLINE_HEADER void
-taskEnter (Task *task)
-{
-    // save the current value, just in case this Task has been created
-    // as a result of re-entering the RTS (defaults to NULL):
-    task->prev_stack = myTask();
-    setMyTask(task);
-}
-
 END_RTS_PRIVATE
 
 #endif /* TASK_H */
index 4f9560c..08b7aab 100644 (file)
@@ -230,8 +230,8 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       // We are waking up this thread on the current Capability, which
       // might involve migrating it from the Capability it was last on.
       if (tso->bound) {
-         ASSERT(tso->bound->cap == tso->cap);
-         tso->bound->cap = cap;
+         ASSERT(tso->bound->task->cap == tso->cap);
+         tso->bound->task->cap = cap;
       }
 
       tso->cap = cap;
index 315eda7..e55ae2b 100644 (file)
@@ -1014,10 +1014,14 @@ compact(StgClosure *static_objects)
     // the task list
     {
        Task *task;
+        InCall *incall;
        for (task = all_tasks; task != NULL; task = task->all_link) {
-           if (task->tso) {
-               thread_(&task->tso);
-           }
+            for (incall = task->incall; incall != NULL; 
+                 incall = incall->prev_stack) {
+                if (incall->tso) {
+                    thread_(&incall->tso);
+                }
+            }
        }
     }