/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem. Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/

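/* A rough sketch of how these entry points are typically used (illustrative
 * only; the actual call sites live elsewhere in the RTS, e.g. the Scheduler
 * and the RTS API layer):
 *
 *     initTaskManager();            // once, during RTS startup
 *
 *     Task *task = newBoundTask();  // a thread entering Haskell via an in-call
 *     ... run the in-call via the Scheduler ...
 *     boundTaskExiting(task);       // leaving Haskell again
 *     freeMyTask();                 // when the thread will make no more in-calls
 *
 *     freeTaskManager();            // once, during RTS shutdown
 *
 * Worker Tasks are instead created with startWorkerTask() and retire
 * themselves via workerTaskStop().
 */
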
#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Task.h"
#include "Capability.h"
#include "Stats.h"
#include "Schedule.h"
#include "Hash.h"
#include "Trace.h"

#include <string.h>

#if HAVE_SIGNAL_H
#include <signal.h>
#endif

// Task lists and global counters.
// Locks required: all_tasks_mutex.
Task *all_tasks = NULL;

// taskCount: current number of bound tasks + total number of worker tasks.
uint32_t taskCount;
// workerCount: total number of worker tasks created so far.
uint32_t workerCount;
// currentWorkerCount: number of worker tasks currently alive.
uint32_t currentWorkerCount;
// peakWorkerCount: high-water mark of currentWorkerCount.
uint32_t peakWorkerCount;

static int tasksInitialized = 0;

static void freeTask (Task *task);
static Task * newTask (bool);

#if defined(THREADED_RTS)
Mutex all_tasks_mutex;
#endif

/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
ThreadLocalKey currentTaskKey;
# endif
#else
Task *my_task;
#endif
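
// myTask() and setMyTask() (see Task.h) read and write this thread-local
// slot; getTask() below uses them to allocate a Task lazily the first time
// a given OS thread needs one.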

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */

void
initTaskManager (void)
{
    if (!tasksInitialized) {
        taskCount = 0;
        workerCount = 0;
        currentWorkerCount = 0;
        peakWorkerCount = 0;
        tasksInitialized = 1;
#if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
#endif
        initMutex(&all_tasks_mutex);
#endif
    }
}

uint32_t
freeTaskManager (void)
{
    Task *task, *next;
    uint32_t tasksRunning = 0;

    ACQUIRE_LOCK(&all_tasks_mutex);

    for (task = all_tasks; task != NULL; task = next) {
        next = task->all_next;
        if (task->stopped) {
            freeTask(task);
        } else {
            tasksRunning++;
        }
    }

    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
               tasksRunning);

    all_tasks = NULL;

    RELEASE_LOCK(&all_tasks_mutex);

#if defined(THREADED_RTS)
    closeMutex(&all_tasks_mutex);
#if !defined(MYTASK_USE_TLV)
    freeThreadLocalKey(&currentTaskKey);
#endif
#endif

    tasksInitialized = 0;

    return tasksRunning;
}

Task* getTask (void)
{
    Task *task;

    task = myTask();
    if (task != NULL) {
        return task;
    } else {
        task = newTask(false);
#if defined(THREADED_RTS)
        task->id = osThreadId();
#endif
        setMyTask(task);
        return task;
    }
}

void freeMyTask (void)
{
    Task *task;

    task = myTask();

    if (task == NULL) return;

    if (!task->stopped) {
        errorBelch(
            "freeMyTask() called, but the Task is not stopped; ignoring");
        return;
    }

    if (task->worker) {
        errorBelch("freeMyTask() called on a worker; ignoring");
        return;
    }

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    taskCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    freeTask(task);
    setMyTask(NULL);
}

static void
freeTask (Task *task)
{
    InCall *incall, *next;

    // We only free resources if the Task is not in use. A
    // Task may still be in use if we have a Haskell thread in
    // a foreign call while we are attempting to shut down the
    // RTS (see conc059).
#if defined(THREADED_RTS)
    closeCondition(&task->cond);
    closeMutex(&task->lock);
#endif

    for (incall = task->incall; incall != NULL; incall = next) {
        next = incall->prev_stack;
        stgFree(incall);
    }
    for (incall = task->spare_incalls; incall != NULL; incall = next) {
        next = incall->next;
        stgFree(incall);
    }

    stgFree(task);
}

/* Must take all_tasks_mutex */
static Task*
newTask (bool worker)
{
    Task *task;

#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
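    // For example (illustration only): if sizeof(Task) happened to be 200
    // bytes, ROUND_TO_CACHE_LINE would give ((200+63)/64)*64 = 256, i.e. the
    // allocation is padded to a whole number of 64-byte cache lines, which
    // helps keep distinct Tasks from sharing a cache line (false sharing).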
    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");

    task->cap = NULL;
    task->worker = worker;
    task->stopped = true;
    task->running_finalizers = false;
    task->n_spare_incalls = 0;
    task->spare_incalls = NULL;
    task->incall = NULL;
    task->preferred_capability = -1;

#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->id = 0;
    task->wakeup = false;
    task->node = 0;
#endif

    task->next = NULL;

    ACQUIRE_LOCK(&all_tasks_mutex);

    task->all_prev = NULL;
    task->all_next = all_tasks;
    if (all_tasks != NULL) {
        all_tasks->all_prev = task;
    }
    all_tasks = task;

    taskCount++;
    if (worker) {
        workerCount++;
        currentWorkerCount++;
        if (currentWorkerCount > peakWorkerCount) {
            peakWorkerCount = currentWorkerCount;
        }
    }
    RELEASE_LOCK(&all_tasks_mutex);

    return task;
}

// avoid the spare_incalls list growing unboundedly
#define MAX_SPARE_INCALLS 8
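
// newInCall() pushes a fresh InCall on the Task's incall stack, reusing an
// entry from spare_incalls when one is available; endInCall() pops the top
// entry and returns it to spare_incalls, keeping at most MAX_SPARE_INCALLS
// cached entries per Task before falling back to stgFree().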

static void
newInCall (Task *task)
{
    InCall *incall;

    if (task->spare_incalls != NULL) {
        incall = task->spare_incalls;
        task->spare_incalls = incall->next;
        task->n_spare_incalls--;
    } else {
        incall = stgMallocBytes((sizeof(InCall)), "newInCall");
    }

    incall->tso = NULL;
    incall->task = task;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    incall->rstat = NoStatus;
    incall->ret = NULL;
    incall->next = NULL;
    incall->prev = NULL;
    incall->prev_stack = task->incall;
    task->incall = incall;
}

static void
endInCall (Task *task)
{
    InCall *incall;

    incall = task->incall;
    incall->tso = NULL;
    task->incall = task->incall->prev_stack;

    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
        stgFree(incall);
    } else {
        incall->next = task->spare_incalls;
        task->spare_incalls = incall;
        task->n_spare_incalls++;
    }
}


Task *
newBoundTask (void)
{
    Task *task;

    if (!tasksInitialized) {
        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
        stg_exit(EXIT_FAILURE);
    }

    task = getTask();

    task->stopped = false;

    newInCall(task);

    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
    return task;
}

void
boundTaskExiting (Task *task)
{
#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);

    endInCall(task);

    // Set task->stopped, but only if this is the last call (#4850).
    // Remember that we might have a worker Task that makes a foreign
    // call and then a callback, so it can transform into a bound
    // Task for the duration of the callback.
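    //
    // A concrete (illustrative) sequence: a worker runs Haskell code that
    // makes a safe foreign call; the foreign code calls back into Haskell,
    // which enters via newBoundTask() and pushes a second InCall on this
    // Task.  When the callback returns, boundTaskExiting() pops only that
    // inner InCall; task->incall is still non-NULL because the outer call
    // is live, so the Task must not be marked stopped yet.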
    if (task->incall == NULL) {
        task->stopped = true;
    }

    debugTrace(DEBUG_sched, "task exiting");
}


#if defined(THREADED_RTS)
#define TASK_ID(t) (t)->id
#else
#define TASK_ID(t) (t)
#endif

void
discardTasksExcept (Task *keep)
{
    Task *task, *next;

    // Wipe the task list, except the current Task.
    ACQUIRE_LOCK(&all_tasks_mutex);
    for (task = all_tasks; task != NULL; task=next) {
        next = task->all_next;
        if (task != keep) {
            debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
#if defined(THREADED_RTS)
            // It is possible that some of these tasks are currently blocked
            // (in the parent process) either on their condition variable
            // `cond` or on their mutex `lock`. If they are we may deadlock
            // when `freeTask` attempts to call `closeCondition` or
            // `closeMutex` (the behaviour of these functions is documented to
            // be undefined in the case that there are threads blocked on
            // them). To avoid this, we re-initialize both the condition
            // variable and the mutex before calling `freeTask` (we do
            // precisely the same for all global locks in `forkProcess`).
            initCondition(&task->cond);
            initMutex(&task->lock);
#endif

            // Note that we do not traceTaskDelete here because
            // we are not really deleting a task.
            // The OS threads for all these tasks do not exist in
            // this process (since we're currently
            // in the child of a forkProcess).
            freeTask(task);
        }
    }
    all_tasks = keep;
    keep->all_next = NULL;
    keep->all_prev = NULL;
    RELEASE_LOCK(&all_tasks_mutex);
}

#if defined(THREADED_RTS)

void
workerTaskStop (Task *task)
{
    DEBUG_ONLY( OSThreadId id );
    DEBUG_ONLY( id = osThreadId() );
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    currentWorkerCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    traceTaskDelete(task);

    freeTask(task);
}

#endif

#if defined(THREADED_RTS)

static void* OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    if (RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(cap->no, n_capabilities);
    }
    if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
        setThreadNode(numa_map[task->node]);
    }

    // set the thread-local pointer to the Task:
    setMyTask(task);

    newInCall(task);

    // Everything set up; emit the event before the worker starts working.
    traceTaskCreate(task, cap);

    scheduleWorker(cap,task);

    return NULL;
}

/* N.B. must take all_tasks_mutex */
void
startWorkerTask (Capability *cap)
{
    int r;
    OSThreadId tid;
    Task *task;

    // A worker always gets a fresh Task structure.
    task = newTask(true);
    task->stopped = false;

    // The lock here is to synchronise with workerStart(), to make sure
    // that we have finished setting up the Task structure before the
    // worker thread reads it.
    ACQUIRE_LOCK(&task->lock);

    // We don't emit a task creation event here, but in workerStart,
    // where the kernel thread id is known.
    task->cap = cap;
    task->node = cap->node;

    // Give the capability directly to the worker; we can't let anyone
    // else get in, because the new worker Task has nowhere to go to
    // sleep so that it could be woken up again.
    ASSERT_LOCK_HELD(&cap->lock);
    cap->running_task = task;

    // Set the name of the worker thread to the original process name followed by
    // ":w", but only if we're on Linux where the program_invocation_short_name
    // global is available.
#if defined(linux_HOST_OS)
    size_t procname_len = strlen(program_invocation_short_name);
    char worker_name[16];
    // The kernel only allocates 16 bytes for thread names, so we truncate if the
    // original name is too long. Process names go in another table that has more
    // capacity.
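    // For example, a 20-character program name is cut down to its first 13
    // characters, ":w" is appended, and the terminating NUL brings the total
    // to exactly 16 bytes.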
    if (procname_len >= 13) {
        strncpy(worker_name, program_invocation_short_name, 13);
        strcpy(worker_name + 13, ":w");
    } else {
        strcpy(worker_name, program_invocation_short_name);
        strcpy(worker_name + procname_len, ":w");
    }
#else
    char * worker_name = "ghc_worker";
#endif
    r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
    if (r != 0) {
        sysErrorBelch("failed to create OS thread");
        stg_exit(EXIT_FAILURE);
    }

    debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

    task->id = tid;

    // ok, finished with the Task struct.
    RELEASE_LOCK(&task->lock);
}

void
interruptWorkerTask (Task *task)
{
    ASSERT(osThreadId() != task->id);    // seppuku not allowed
    ASSERT(task->incall->suspended_tso); // use this only for FFI calls
    interruptOSThread(task->id);
    debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
               serialisableTaskId(task));
}

#endif /* THREADED_RTS */

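/* A sketch of how a foreign client might use rts_setInCallCapability()
 * (illustrative only; only the RTS entry points named here are real):
 *
 *     hs_init(&argc, &argv);
 *     rts_setInCallCapability(0, 1); // subsequent in-calls from this thread
 *                                    // prefer capability 0; with the threaded
 *                                    // RTS and affinity enabled, the OS
 *                                    // thread's affinity is set accordingly
 *     ... call foreign-exported Haskell functions from this thread ...
 */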
void rts_setInCallCapability (
    int preferred_capability,
    int affinity USED_IF_THREADS)
{
    Task *task = getTask();
    task->preferred_capability = preferred_capability;

#if defined(THREADED_RTS)
    if (affinity) {
        if (RtsFlags.ParFlags.setAffinity) {
            setThreadAffinity(preferred_capability, n_capabilities);
        }
    }
#endif
}

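// When NUMA support is enabled (+RTS --numa), record the calling Task's
// preferred NUMA node and, unless NUMA is only being faked for debugging,
// bind the calling OS thread to that node as well.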
void rts_pinThreadToNumaNode (
    int node USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    if (RtsFlags.GcFlags.numa) {
        Task *task = getTask();
        task->node = capNoToNumaNode(node);
        if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // faking NUMA
            setThreadNode(numa_map[task->node]);
        }
    }
#endif
}

#if defined(DEBUG)

void printAllTasks(void);

void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_next) {
        debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
                   task->stopped ? "stopped" : "alive");
        if (!task->stopped) {
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
            if (task->incall->tso) {
                debugBelch("bound to thread %lu",
                           (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");
    }
}

#endif