Use https links in user-facing startup and error messages
[ghc.git] / rts / Task.c
index f4a37bf..11ba5f1 100644 (file)
@@ -5,7 +5,7 @@
  * The task manager subsystem.  Tasks execute STG code, with this
  * module providing the API which the Scheduler uses to control their
  * creation and destruction.
- * 
+ *
  * -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
@@ -19,6 +19,8 @@
 #include "Hash.h"
 #include "Trace.h"
 
+#include <string.h>
+
 #if HAVE_SIGNAL_H
 #include <signal.h>
 #endif
 // Locks required: all_tasks_mutex.
 Task *all_tasks = NULL;
 
-nat taskCount;
-nat workerCount;
-nat currentWorkerCount;
-nat peakWorkerCount;
+// current number of bound tasks + total number of worker tasks.
+uint32_t taskCount;
+uint32_t workerCount;
+uint32_t currentWorkerCount;
+uint32_t peakWorkerCount;
 
 static int tasksInitialized = 0;
 
 static void   freeTask  (Task *task);
-static Task * allocTask (void);
-static Task * newTask   (rtsBool);
+static Task * newTask   (bool);
 
 #if defined(THREADED_RTS)
-static Mutex all_tasks_mutex;
+Mutex all_tasks_mutex;
 #endif
 
 /* -----------------------------------------------------------------------------
@@ -54,9 +56,6 @@ __thread Task *my_task;
 # else
 ThreadLocalKey currentTaskKey;
 # endif
-#ifdef llvm_CC_FLAVOR
-ThreadLocalKey gctKey;
-#endif
 #else
 Task *my_task;
 #endif
@@ -76,21 +75,18 @@ initTaskManager (void)
         tasksInitialized = 1;
 #if defined(THREADED_RTS)
 #if !defined(MYTASK_USE_TLV)
-       newThreadLocalKey(&currentTaskKey);
-#endif
-#if defined(llvm_CC_FLAVOR)
-       newThreadLocalKey(&gctKey);
+        newThreadLocalKey(&currentTaskKey);
 #endif
         initMutex(&all_tasks_mutex);
 #endif
     }
 }
 
-nat
+uint32_t
 freeTaskManager (void)
 {
     Task *task, *next;
-    nat tasksRunning = 0;
+    uint32_t tasksRunning = 0;
 
     ACQUIRE_LOCK(&all_tasks_mutex);
 
@@ -111,13 +107,10 @@ freeTaskManager (void)
     RELEASE_LOCK(&all_tasks_mutex);
 
 #if defined(THREADED_RTS)
-    closeMutex(&all_tasks_mutex); 
+    closeMutex(&all_tasks_mutex);
 #if !defined(MYTASK_USE_TLV)
     freeThreadLocalKey(&currentTaskKey);
 #endif
-#if defined(llvm_CC_FLAVOR)
-    freeThreadLocalKey(&gctKey);
-#endif
 #endif
 
     tasksInitialized = 0;
@@ -125,8 +118,7 @@ freeTaskManager (void)
     return tasksRunning;
 }
 
-static Task *
-allocTask (void)
+Task* getTask (void)
 {
     Task *task;
 
@@ -134,7 +126,7 @@ allocTask (void)
     if (task != NULL) {
         return task;
     } else {
-        task = newTask(rtsFalse);
+        task = newTask(false);
 #if defined(THREADED_RTS)
         task->id = osThreadId();
 #endif
@@ -143,6 +135,44 @@ allocTask (void)
     }
 }
 
+void freeMyTask (void)  // Unlink the calling thread's Task from all_tasks and free it.
+{
+    Task *task;
+
+    task = myTask();  // the Task currently recorded for this thread, if any
+
+    if (task == NULL) return;  // no Task recorded: nothing to free
+
+    if (!task->stopped) {  // refuse to free a Task that is not marked stopped
+        errorBelch(
+            "freeMyTask() called, but the Task is not stopped; ignoring");
+        return;
+    }
+
+    if (task->worker) {  // worker Tasks are rejected by this entry point
+        errorBelch("freeMyTask() called on a worker; ignoring");
+        return;
+    }
+
+    ACQUIRE_LOCK(&all_tasks_mutex);  // the all_tasks list and taskCount are updated under this lock
+
+    if (task->all_prev) {  // splice the Task out of the doubly-linked all_tasks list
+        task->all_prev->all_next = task->all_next;
+    } else {
+        all_tasks = task->all_next;  // Task was the list head
+    }
+    if (task->all_next) {
+        task->all_next->all_prev = task->all_prev;
+    }
+
+    taskCount--;
+
+    RELEASE_LOCK(&all_tasks_mutex);
+
+    freeTask(task);
+    setMyTask(NULL);  // drop this thread's pointer to the Task that was just freed
+}
+
 static void
 freeTask (Task *task)
 {
@@ -169,26 +199,30 @@ freeTask (Task *task)
     stgFree(task);
 }
 
+/* Must take all_tasks_mutex */
 static Task*
-newTask (rtsBool worker)
+newTask (bool worker)
 {
     Task *task;
 
 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
-    
+
     task->cap           = NULL;
     task->worker        = worker;
-    task->stopped       = rtsFalse;
-    task->running_finalizers = rtsFalse;
+    task->stopped       = true;
+    task->running_finalizers = false;
     task->n_spare_incalls = 0;
     task->spare_incalls = NULL;
     task->incall        = NULL;
-    
+    task->preferred_capability = -1;
+
 #if defined(THREADED_RTS)
     initCondition(&task->cond);
     initMutex(&task->lock);
-    task->wakeup = rtsFalse;
+    task->id = 0;
+    task->wakeup = false;
+    task->node = 0;
 #endif
 
     task->next = NULL;
@@ -222,20 +256,20 @@ static void
 newInCall (Task *task)
 {
     InCall *incall;
-    
+
     if (task->spare_incalls != NULL) {
         incall = task->spare_incalls;
         task->spare_incalls = incall->next;
         task->n_spare_incalls--;
     } else {
-        incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
+        incall = stgMallocBytes((sizeof(InCall)), "newInCall");
     }
 
     incall->tso = NULL;
     incall->task = task;
     incall->suspended_tso = NULL;
     incall->suspended_cap = NULL;
-    incall->stat          = NoStatus;
+    incall->rstat         = NoStatus;
     incall->ret           = NULL;
     incall->next = NULL;
     incall->prev = NULL;
@@ -272,9 +306,9 @@ newBoundTask (void)
         stg_exit(EXIT_FAILURE);
     }
 
-    task = allocTask();
+    task = getTask();
 
-    task->stopped = rtsFalse;
+    task->stopped = false;
 
     newInCall(task);
 
@@ -297,14 +331,14 @@ boundTaskExiting (Task *task)
     // call and then a callback, so it can transform into a bound
     // Task for the duration of the callback.
     if (task->incall == NULL) {
-        task->stopped = rtsTrue;
+        task->stopped = true;
     }
 
     debugTrace(DEBUG_sched, "task exiting");
 }
 
 
-#ifdef THREADED_RTS
+#if defined(THREADED_RTS)
 #define TASK_ID(t) (t)->id
 #else
 #define TASK_ID(t) (t)
@@ -320,7 +354,26 @@ discardTasksExcept (Task *keep)
     for (task = all_tasks; task != NULL; task=next) {
         next = task->all_next;
         if (task != keep) {
-            debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
+            debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
+#if defined(THREADED_RTS)
+            // It is possible that some of these tasks are currently blocked
+            // (in the parent process) either on their condition variable
+            // `cond` or on their mutex `lock`. If they are we may deadlock
+            // when `freeTask` attempts to call `closeCondition` or
+            // `closeMutex` (the behaviour of these functions is documented to
+            // be undefined in the case that there are threads blocked on
+            // them). To avoid this, we re-initialize both the condition
+            // variable and the mutex before calling `freeTask` (we do
+            // precisely the same for all global locks in `forkProcess`).
+            initCondition(&task->cond);
+            initMutex(&task->lock);
+#endif
+
+            // Note that we do not traceTaskDelete here because
+            // we are not really deleting a task.
+            // The OS threads for all these tasks do not exist in
+            // this process (since we're currently
+            // in the child of a forkProcess).
             freeTask(task);
         }
     }
@@ -330,34 +383,6 @@ discardTasksExcept (Task *keep)
     RELEASE_LOCK(&all_tasks_mutex);
 }
 
-//
-// After the capabilities[] array has moved, we have to adjust all
-// (Capability *) pointers to point to the new array.  The old array
-// is still valid at this point.
-//
-void updateCapabilityRefs (void)
-{
-    Task *task;
-    InCall *incall;
-
-    ACQUIRE_LOCK(&all_tasks_mutex);
-
-    for (task = all_tasks; task != NULL; task=task->all_next) {
-        if (task->cap != NULL) {
-            task->cap = &capabilities[task->cap->no];
-        }
-
-        for (incall = task->incall; incall != NULL; incall = incall->prev_stack) {
-            if (incall->suspended_cap != NULL) {
-                incall->suspended_cap = &capabilities[incall->suspended_cap->no];
-            }
-        }
-    }
-
-    RELEASE_LOCK(&all_tasks_mutex);
-}
-
-
 #if defined(THREADED_RTS)
 
 void
@@ -383,27 +408,16 @@ workerTaskStop (Task *task)
 
     RELEASE_LOCK(&all_tasks_mutex);
 
-    freeTask(task);
-}
+    traceTaskDelete(task);
 
-#endif
-
-#ifdef DEBUG
-
-static void *taskId(Task *task)
-{
-#ifdef THREADED_RTS
-    return (void *)task->id;
-#else
-    return (void *)task;
-#endif
+    freeTask(task);
 }
 
 #endif
 
 #if defined(THREADED_RTS)
 
-static void OSThreadProcAttr
+static void* OSThreadProcAttr
 workerStart(Task *task)
 {
     Capability *cap;
@@ -416,15 +430,24 @@ workerStart(Task *task)
     if (RtsFlags.ParFlags.setAffinity) {
         setThreadAffinity(cap->no, n_capabilities);
     }
+    if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
+        setThreadNode(numa_map[task->node]);
+    }
 
     // set the thread-local pointer to the Task:
     setMyTask(task);
 
     newInCall(task);
 
+    // Everything set up; emit the event before the worker starts working.
+    traceTaskCreate(task, cap);
+
     scheduleWorker(cap,task);
+
+    return NULL;
 }
 
+/* N.B. must take all_tasks_mutex */
 void
 startWorkerTask (Capability *cap)
 {
@@ -433,14 +456,18 @@ startWorkerTask (Capability *cap)
   Task *task;
 
   // A worker always gets a fresh Task structure.
-  task = newTask(rtsTrue);
+  task = newTask(true);
+  task->stopped = false;
 
   // The lock here is to synchronise with taskStart(), to make sure
   // that we have finished setting up the Task structure before the
   // worker thread reads it.
   ACQUIRE_LOCK(&task->lock);
 
+  // We don't emit a task creation event here, but in workerStart,
+  // where the kernel thread id is known.
   task->cap = cap;
+  task->node = cap->node;
 
   // Give the capability directly to the worker; we can't let anyone
   // else get in, because the new worker Task has nowhere to go to
@@ -448,7 +475,26 @@ startWorkerTask (Capability *cap)
   ASSERT_LOCK_HELD(&cap->lock);
   cap->running_task = task;
 
-  r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
+  // Set the name of the worker thread to the original process name followed by
+  // ":w", but only if we're on Linux where the program_invocation_short_name
+  // global is available.
+#if defined(linux_HOST_OS)
+  size_t procname_len = strlen(program_invocation_short_name);
+  char worker_name[16];
+  // The kernel only allocates 16 bytes for thread names, so we truncate if the
+  // original name is too long. Process names go in another table that has more
+  // capacity.
+  if (procname_len >= 13) {
+      strncpy(worker_name, program_invocation_short_name, 13);
+      strcpy(worker_name + 13, ":w");
+  } else {
+      strcpy(worker_name, program_invocation_short_name);
+      strcpy(worker_name + procname_len, ":w");
+  }
+#else
+  char * worker_name = "ghc_worker";
+#endif
+  r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
   if (r != 0) {
     sysErrorBelch("failed to create OS thread");
     stg_exit(EXIT_FAILURE);
@@ -468,12 +514,43 @@ interruptWorkerTask (Task *task)
   ASSERT(osThreadId() != task->id);    // seppuku not allowed
   ASSERT(task->incall->suspended_tso); // use this only for FFI calls
   interruptOSThread(task->id);
-  debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
+  debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
+             serialisableTaskId(task));
 }
 
 #endif /* THREADED_RTS */
 
-#ifdef DEBUG
+void rts_setInCallCapability (  // Record a preferred capability on the calling thread's Task.
+    int preferred_capability,
+    int affinity USED_IF_THREADS)  // only read inside the THREADED_RTS section below
+{
+    Task *task = getTask();  // existing Task for this thread, or a fresh one from newTask(false)
+    task->preferred_capability = preferred_capability;
+
+#if defined(THREADED_RTS)
+    if (affinity) {  // non-zero: caller also asks for the OS thread's affinity to be set
+        if (RtsFlags.ParFlags.setAffinity) {  // honoured only when the setAffinity RTS flag is on
+            setThreadAffinity(preferred_capability, n_capabilities);
+        }
+    }
+#endif
+}
+
+void rts_pinThreadToNumaNode (  // Set the calling thread's Task node; does nothing outside THREADED_RTS.
+    int node USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    if (RtsFlags.GcFlags.numa) {  // no-op unless the numa GC flag is set
+        Task *task = getTask();
+        task->node = capNoToNumaNode(node);  // NOTE(review): node is mapped via capNoToNumaNode, i.e. treated like a capability number - confirm
+        if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // skip the OS call when the debug numa flag is set (presumably NUMA is being faked)
+            setThreadNode(numa_map[task->node]);
+        }
+    }
+#endif
+}
+
+#if defined(DEBUG)
 
 void printAllTasks(void);
 
@@ -482,21 +559,21 @@ printAllTasks(void)
 {
     Task *task;
     for (task = all_tasks; task != NULL; task = task->all_next) {
-       debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
-       if (!task->stopped) {
-           if (task->cap) {
-               debugBelch("on capability %d, ", task->cap->no);
-           }
-           if (task->incall->tso) {
-             debugBelch("bound to thread %lu",
+        debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
+                   task->stopped ? "stopped" : "alive");
+        if (!task->stopped) {
+            if (task->cap) {
+                debugBelch("on capability %d, ", task->cap->no);
+            }
+            if (task->incall->tso) {
+              debugBelch("bound to thread %lu",
                          (unsigned long)task->incall->tso->id);
-           } else {
-               debugBelch("worker");
-           }
-       }
-       debugBelch("\n");
+            } else {
+                debugBelch("worker");
+            }
+        }
+        debugBelch("\n");
     }
-}                     
+}
 
 #endif
-