Pull recent Hadrian changes from upstream
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #include <string.h>
23
24 #if HAVE_SIGNAL_H
25 #include <signal.h>
26 #endif
27
28 // Task lists and global counters.
29 // Locks required: all_tasks_mutex.
30 Task *all_tasks = NULL;
31
32 // current number of bound tasks + total number of worker tasks.
33 uint32_t taskCount;
34 uint32_t workerCount;
35 uint32_t currentWorkerCount;
36 uint32_t peakWorkerCount;
37
38 static int tasksInitialized = 0;
39
40 static void freeTask (Task *task);
41 static Task * newTask (bool);
42
43 #if defined(THREADED_RTS)
44 Mutex all_tasks_mutex;
45 #endif
46
47 /* -----------------------------------------------------------------------------
48 * Remembering the current thread's Task
49 * -------------------------------------------------------------------------- */
50
51 // A thread-local-storage key that we can use to get access to the
52 // current thread's Task structure.
53 #if defined(THREADED_RTS)
54 # if defined(MYTASK_USE_TLV)
55 __thread Task *my_task;
56 # else
57 ThreadLocalKey currentTaskKey;
58 # endif
59 #else
60 Task *my_task;
61 #endif
62
63 /* -----------------------------------------------------------------------------
64 * Rest of the Task API
65 * -------------------------------------------------------------------------- */
66
67 void
68 initTaskManager (void)
69 {
70 if (!tasksInitialized) {
71 taskCount = 0;
72 workerCount = 0;
73 currentWorkerCount = 0;
74 peakWorkerCount = 0;
75 tasksInitialized = 1;
76 #if defined(THREADED_RTS)
77 #if !defined(MYTASK_USE_TLV)
78 newThreadLocalKey(&currentTaskKey);
79 #endif
80 initMutex(&all_tasks_mutex);
81 #endif
82 }
83 }
84
85 uint32_t
86 freeTaskManager (void)
87 {
88 Task *task, *next;
89 uint32_t tasksRunning = 0;
90
91 ACQUIRE_LOCK(&all_tasks_mutex);
92
93 for (task = all_tasks; task != NULL; task = next) {
94 next = task->all_next;
95 if (task->stopped) {
96 freeTask(task);
97 } else {
98 tasksRunning++;
99 }
100 }
101
102 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
103 tasksRunning);
104
105 all_tasks = NULL;
106
107 RELEASE_LOCK(&all_tasks_mutex);
108
109 #if defined(THREADED_RTS)
110 closeMutex(&all_tasks_mutex);
111 #if !defined(MYTASK_USE_TLV)
112 freeThreadLocalKey(&currentTaskKey);
113 #endif
114 #endif
115
116 tasksInitialized = 0;
117
118 return tasksRunning;
119 }
120
121 Task* getTask (void)
122 {
123 Task *task;
124
125 task = myTask();
126 if (task != NULL) {
127 return task;
128 } else {
129 task = newTask(false);
130 #if defined(THREADED_RTS)
131 task->id = osThreadId();
132 #endif
133 setMyTask(task);
134 return task;
135 }
136 }
137
138 void freeMyTask (void)
139 {
140 Task *task;
141
142 task = myTask();
143
144 if (task == NULL) return;
145
146 if (!task->stopped) {
147 errorBelch(
148 "freeMyTask() called, but the Task is not stopped; ignoring");
149 return;
150 }
151
152 if (task->worker) {
153 errorBelch("freeMyTask() called on a worker; ignoring");
154 return;
155 }
156
157 ACQUIRE_LOCK(&all_tasks_mutex);
158
159 if (task->all_prev) {
160 task->all_prev->all_next = task->all_next;
161 } else {
162 all_tasks = task->all_next;
163 }
164 if (task->all_next) {
165 task->all_next->all_prev = task->all_prev;
166 }
167
168 taskCount--;
169
170 RELEASE_LOCK(&all_tasks_mutex);
171
172 freeTask(task);
173 setMyTask(NULL);
174 }
175
176 static void
177 freeTask (Task *task)
178 {
179 InCall *incall, *next;
180
181 // We only free resources if the Task is not in use. A
182 // Task may still be in use if we have a Haskell thread in
183 // a foreign call while we are attempting to shut down the
184 // RTS (see conc059).
185 #if defined(THREADED_RTS)
186 closeCondition(&task->cond);
187 closeMutex(&task->lock);
188 #endif
189
190 for (incall = task->incall; incall != NULL; incall = next) {
191 next = incall->prev_stack;
192 stgFree(incall);
193 }
194 for (incall = task->spare_incalls; incall != NULL; incall = next) {
195 next = incall->next;
196 stgFree(incall);
197 }
198
199 stgFree(task);
200 }
201
202 static Task*
203 newTask (bool worker)
204 {
205 Task *task;
206
207 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
208 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
209
210 task->cap = NULL;
211 task->worker = worker;
212 task->stopped = true;
213 task->running_finalizers = false;
214 task->n_spare_incalls = 0;
215 task->spare_incalls = NULL;
216 task->incall = NULL;
217 task->preferred_capability = -1;
218
219 #if defined(THREADED_RTS)
220 initCondition(&task->cond);
221 initMutex(&task->lock);
222 task->wakeup = false;
223 task->node = 0;
224 #endif
225
226 task->next = NULL;
227
228 ACQUIRE_LOCK(&all_tasks_mutex);
229
230 task->all_prev = NULL;
231 task->all_next = all_tasks;
232 if (all_tasks != NULL) {
233 all_tasks->all_prev = task;
234 }
235 all_tasks = task;
236
237 taskCount++;
238 if (worker) {
239 workerCount++;
240 currentWorkerCount++;
241 if (currentWorkerCount > peakWorkerCount) {
242 peakWorkerCount = currentWorkerCount;
243 }
244 }
245 RELEASE_LOCK(&all_tasks_mutex);
246
247 return task;
248 }
249
250 // avoid the spare_incalls list growing unboundedly
251 #define MAX_SPARE_INCALLS 8
252
253 static void
254 newInCall (Task *task)
255 {
256 InCall *incall;
257
258 if (task->spare_incalls != NULL) {
259 incall = task->spare_incalls;
260 task->spare_incalls = incall->next;
261 task->n_spare_incalls--;
262 } else {
263 incall = stgMallocBytes((sizeof(InCall)), "newInCall");
264 }
265
266 incall->tso = NULL;
267 incall->task = task;
268 incall->suspended_tso = NULL;
269 incall->suspended_cap = NULL;
270 incall->rstat = NoStatus;
271 incall->ret = NULL;
272 incall->next = NULL;
273 incall->prev = NULL;
274 incall->prev_stack = task->incall;
275 task->incall = incall;
276 }
277
278 static void
279 endInCall (Task *task)
280 {
281 InCall *incall;
282
283 incall = task->incall;
284 incall->tso = NULL;
285 task->incall = task->incall->prev_stack;
286
287 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
288 stgFree(incall);
289 } else {
290 incall->next = task->spare_incalls;
291 task->spare_incalls = incall;
292 task->n_spare_incalls++;
293 }
294 }
295
296
297 Task *
298 newBoundTask (void)
299 {
300 Task *task;
301
302 if (!tasksInitialized) {
303 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
304 stg_exit(EXIT_FAILURE);
305 }
306
307 task = getTask();
308
309 task->stopped = false;
310
311 newInCall(task);
312
313 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
314 return task;
315 }
316
317 void
318 boundTaskExiting (Task *task)
319 {
320 #if defined(THREADED_RTS)
321 ASSERT(osThreadId() == task->id);
322 #endif
323 ASSERT(myTask() == task);
324
325 endInCall(task);
326
327 // Set task->stopped, but only if this is the last call (#4850).
328 // Remember that we might have a worker Task that makes a foreign
329 // call and then a callback, so it can transform into a bound
330 // Task for the duration of the callback.
331 if (task->incall == NULL) {
332 task->stopped = true;
333 }
334
335 debugTrace(DEBUG_sched, "task exiting");
336 }
337
338
339 #if defined(THREADED_RTS)
340 #define TASK_ID(t) (t)->id
341 #else
342 #define TASK_ID(t) (t)
343 #endif
344
345 void
346 discardTasksExcept (Task *keep)
347 {
348 Task *task, *next;
349
350 // Wipe the task list, except the current Task.
351 ACQUIRE_LOCK(&all_tasks_mutex);
352 for (task = all_tasks; task != NULL; task=next) {
353 next = task->all_next;
354 if (task != keep) {
355 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
356 #if defined(THREADED_RTS)
357 // It is possible that some of these tasks are currently blocked
358 // (in the parent process) either on their condition variable
359 // `cond` or on their mutex `lock`. If they are we may deadlock
360 // when `freeTask` attempts to call `closeCondition` or
361 // `closeMutex` (the behaviour of these functions is documented to
362 // be undefined in the case that there are threads blocked on
363 // them). To avoid this, we re-initialize both the condition
364 // variable and the mutex before calling `freeTask` (we do
365 // precisely the same for all global locks in `forkProcess`).
366 initCondition(&task->cond);
367 initMutex(&task->lock);
368 #endif
369
370 // Note that we do not traceTaskDelete here because
371 // we are not really deleting a task.
372 // The OS threads for all these tasks do not exist in
373 // this process (since we're currently
374 // in the child of a forkProcess).
375 freeTask(task);
376 }
377 }
378 all_tasks = keep;
379 keep->all_next = NULL;
380 keep->all_prev = NULL;
381 RELEASE_LOCK(&all_tasks_mutex);
382 }
383
384 #if defined(THREADED_RTS)
385
386 void
387 workerTaskStop (Task *task)
388 {
389 DEBUG_ONLY( OSThreadId id );
390 DEBUG_ONLY( id = osThreadId() );
391 ASSERT(task->id == id);
392 ASSERT(myTask() == task);
393
394 ACQUIRE_LOCK(&all_tasks_mutex);
395
396 if (task->all_prev) {
397 task->all_prev->all_next = task->all_next;
398 } else {
399 all_tasks = task->all_next;
400 }
401 if (task->all_next) {
402 task->all_next->all_prev = task->all_prev;
403 }
404
405 currentWorkerCount--;
406
407 RELEASE_LOCK(&all_tasks_mutex);
408
409 traceTaskDelete(task);
410
411 freeTask(task);
412 }
413
414 #endif
415
416 #if defined(THREADED_RTS)
417
418 static void OSThreadProcAttr
419 workerStart(Task *task)
420 {
421 Capability *cap;
422
423 // See startWorkerTask().
424 ACQUIRE_LOCK(&task->lock);
425 cap = task->cap;
426 RELEASE_LOCK(&task->lock);
427
428 if (RtsFlags.ParFlags.setAffinity) {
429 setThreadAffinity(cap->no, n_capabilities);
430 }
431 if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
432 setThreadNode(numa_map[task->node]);
433 }
434
435 // set the thread-local pointer to the Task:
436 setMyTask(task);
437
438 newInCall(task);
439
440 // Everything set up; emit the event before the worker starts working.
441 traceTaskCreate(task, cap);
442
443 scheduleWorker(cap,task);
444 }
445
446 void
447 startWorkerTask (Capability *cap)
448 {
449 int r;
450 OSThreadId tid;
451 Task *task;
452
453 // A worker always gets a fresh Task structure.
454 task = newTask(true);
455 task->stopped = false;
456
457 // The lock here is to synchronise with taskStart(), to make sure
458 // that we have finished setting up the Task structure before the
459 // worker thread reads it.
460 ACQUIRE_LOCK(&task->lock);
461
462 // We don't emit a task creation event here, but in workerStart,
463 // where the kernel thread id is known.
464 task->cap = cap;
465 task->node = cap->node;
466
467 // Give the capability directly to the worker; we can't let anyone
468 // else get in, because the new worker Task has nowhere to go to
469 // sleep so that it could be woken up again.
470 ASSERT_LOCK_HELD(&cap->lock);
471 cap->running_task = task;
472
473 // Set the name of the worker thread to the original process name followed by
474 // ":w", but only if we're on Linux where the program_invocation_short_name
475 // global is available.
476 #if defined(linux_HOST_OS)
477 size_t procname_len = strlen(program_invocation_short_name);
478 char worker_name[16];
479 // The kernel only allocates 16 bytes for thread names, so we truncate if the
480 // original name is too long. Process names go in another table that has more
481 // capacity.
482 if (procname_len >= 13) {
483 strncpy(worker_name, program_invocation_short_name, 13);
484 strcpy(worker_name + 13, ":w");
485 } else {
486 strcpy(worker_name, program_invocation_short_name);
487 strcpy(worker_name + procname_len, ":w");
488 }
489 #else
490 char * worker_name = "ghc_worker";
491 #endif
492 r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
493 if (r != 0) {
494 sysErrorBelch("failed to create OS thread");
495 stg_exit(EXIT_FAILURE);
496 }
497
498 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
499
500 task->id = tid;
501
502 // ok, finished with the Task struct.
503 RELEASE_LOCK(&task->lock);
504 }
505
506 void
507 interruptWorkerTask (Task *task)
508 {
509 ASSERT(osThreadId() != task->id); // seppuku not allowed
510 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
511 interruptOSThread(task->id);
512 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
513 serialisableTaskId(task));
514 }
515
516 #endif /* THREADED_RTS */
517
518 void rts_setInCallCapability (
519 int preferred_capability,
520 int affinity USED_IF_THREADS)
521 {
522 Task *task = getTask();
523 task->preferred_capability = preferred_capability;
524
525 #if defined(THREADED_RTS)
526 if (affinity) {
527 if (RtsFlags.ParFlags.setAffinity) {
528 setThreadAffinity(preferred_capability, n_capabilities);
529 }
530 }
531 #endif
532 }
533
534 void rts_pinThreadToNumaNode (
535 int node USED_IF_THREADS)
536 {
537 #if defined(THREADED_RTS)
538 if (RtsFlags.GcFlags.numa) {
539 Task *task = getTask();
540 task->node = capNoToNumaNode(node);
541 if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // faking NUMA
542 setThreadNode(numa_map[task->node]);
543 }
544 }
545 #endif
546 }
547
548 #if defined(DEBUG)
549
550 void printAllTasks(void);
551
552 void
553 printAllTasks(void)
554 {
555 Task *task;
556 for (task = all_tasks; task != NULL; task = task->all_next) {
557 debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
558 task->stopped ? "stopped" : "alive");
559 if (!task->stopped) {
560 if (task->cap) {
561 debugBelch("on capability %d, ", task->cap->no);
562 }
563 if (task->incall->tso) {
564 debugBelch("bound to thread %lu",
565 (unsigned long)task->incall->tso->id);
566 } else {
567 debugBelch("worker");
568 }
569 }
570 debugBelch("\n");
571 }
572 }
573
574 #endif