hadrian: eliminate most of the remaining big rule enumerations
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #include <string.h>
23
24 #if HAVE_SIGNAL_H
25 #include <signal.h>
26 #endif
27
28 // Task lists and global counters.
29 // Locks required: all_tasks_mutex.
30 Task *all_tasks = NULL;
31
32 // current number of bound tasks + total number of worker tasks.
33 uint32_t taskCount;
34 uint32_t workerCount;
35 uint32_t currentWorkerCount;
36 uint32_t peakWorkerCount;
37
38 static int tasksInitialized = 0;
39
40 static void freeTask (Task *task);
41 static Task * newTask (bool);
42
43 #if defined(THREADED_RTS)
44 Mutex all_tasks_mutex;
45 #endif
46
47 /* -----------------------------------------------------------------------------
48 * Remembering the current thread's Task
49 * -------------------------------------------------------------------------- */
50
51 // A thread-local-storage key that we can use to get access to the
52 // current thread's Task structure.
53 #if defined(THREADED_RTS)
54 # if defined(MYTASK_USE_TLV)
55 __thread Task *my_task;
56 # else
57 ThreadLocalKey currentTaskKey;
58 # endif
59 #else
60 Task *my_task;
61 #endif
62
63 /* -----------------------------------------------------------------------------
64 * Rest of the Task API
65 * -------------------------------------------------------------------------- */
66
67 void
68 initTaskManager (void)
69 {
70 if (!tasksInitialized) {
71 taskCount = 0;
72 workerCount = 0;
73 currentWorkerCount = 0;
74 peakWorkerCount = 0;
75 tasksInitialized = 1;
76 #if defined(THREADED_RTS)
77 #if !defined(MYTASK_USE_TLV)
78 newThreadLocalKey(&currentTaskKey);
79 #endif
80 initMutex(&all_tasks_mutex);
81 #endif
82 }
83 }
84
85 uint32_t
86 freeTaskManager (void)
87 {
88 Task *task, *next;
89 uint32_t tasksRunning = 0;
90
91 ACQUIRE_LOCK(&all_tasks_mutex);
92
93 for (task = all_tasks; task != NULL; task = next) {
94 next = task->all_next;
95 if (task->stopped) {
96 freeTask(task);
97 } else {
98 tasksRunning++;
99 }
100 }
101
102 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
103 tasksRunning);
104
105 all_tasks = NULL;
106
107 RELEASE_LOCK(&all_tasks_mutex);
108
109 #if defined(THREADED_RTS)
110 closeMutex(&all_tasks_mutex);
111 #if !defined(MYTASK_USE_TLV)
112 freeThreadLocalKey(&currentTaskKey);
113 #endif
114 #endif
115
116 tasksInitialized = 0;
117
118 return tasksRunning;
119 }
120
121 Task* getTask (void)
122 {
123 Task *task;
124
125 task = myTask();
126 if (task != NULL) {
127 return task;
128 } else {
129 task = newTask(false);
130 #if defined(THREADED_RTS)
131 task->id = osThreadId();
132 #endif
133 setMyTask(task);
134 return task;
135 }
136 }
137
138 void freeMyTask (void)
139 {
140 Task *task;
141
142 task = myTask();
143
144 if (task == NULL) return;
145
146 if (!task->stopped) {
147 errorBelch(
148 "freeMyTask() called, but the Task is not stopped; ignoring");
149 return;
150 }
151
152 if (task->worker) {
153 errorBelch("freeMyTask() called on a worker; ignoring");
154 return;
155 }
156
157 ACQUIRE_LOCK(&all_tasks_mutex);
158
159 if (task->all_prev) {
160 task->all_prev->all_next = task->all_next;
161 } else {
162 all_tasks = task->all_next;
163 }
164 if (task->all_next) {
165 task->all_next->all_prev = task->all_prev;
166 }
167
168 taskCount--;
169
170 RELEASE_LOCK(&all_tasks_mutex);
171
172 freeTask(task);
173 setMyTask(NULL);
174 }
175
176 static void
177 freeTask (Task *task)
178 {
179 InCall *incall, *next;
180
181 // We only free resources if the Task is not in use. A
182 // Task may still be in use if we have a Haskell thread in
183 // a foreign call while we are attempting to shut down the
184 // RTS (see conc059).
185 #if defined(THREADED_RTS)
186 closeCondition(&task->cond);
187 closeMutex(&task->lock);
188 #endif
189
190 for (incall = task->incall; incall != NULL; incall = next) {
191 next = incall->prev_stack;
192 stgFree(incall);
193 }
194 for (incall = task->spare_incalls; incall != NULL; incall = next) {
195 next = incall->next;
196 stgFree(incall);
197 }
198
199 stgFree(task);
200 }
201
202 /* Must take all_tasks_mutex */
203 static Task*
204 newTask (bool worker)
205 {
206 Task *task;
207
208 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
209 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
210
211 task->cap = NULL;
212 task->worker = worker;
213 task->stopped = true;
214 task->running_finalizers = false;
215 task->n_spare_incalls = 0;
216 task->spare_incalls = NULL;
217 task->incall = NULL;
218 task->preferred_capability = -1;
219
220 #if defined(THREADED_RTS)
221 initCondition(&task->cond);
222 initMutex(&task->lock);
223 task->wakeup = false;
224 task->node = 0;
225 #endif
226
227 task->next = NULL;
228
229 ACQUIRE_LOCK(&all_tasks_mutex);
230
231 task->all_prev = NULL;
232 task->all_next = all_tasks;
233 if (all_tasks != NULL) {
234 all_tasks->all_prev = task;
235 }
236 all_tasks = task;
237
238 taskCount++;
239 if (worker) {
240 workerCount++;
241 currentWorkerCount++;
242 if (currentWorkerCount > peakWorkerCount) {
243 peakWorkerCount = currentWorkerCount;
244 }
245 }
246 RELEASE_LOCK(&all_tasks_mutex);
247
248 return task;
249 }
250
251 // avoid the spare_incalls list growing unboundedly
252 #define MAX_SPARE_INCALLS 8
253
254 static void
255 newInCall (Task *task)
256 {
257 InCall *incall;
258
259 if (task->spare_incalls != NULL) {
260 incall = task->spare_incalls;
261 task->spare_incalls = incall->next;
262 task->n_spare_incalls--;
263 } else {
264 incall = stgMallocBytes((sizeof(InCall)), "newInCall");
265 }
266
267 incall->tso = NULL;
268 incall->task = task;
269 incall->suspended_tso = NULL;
270 incall->suspended_cap = NULL;
271 incall->rstat = NoStatus;
272 incall->ret = NULL;
273 incall->next = NULL;
274 incall->prev = NULL;
275 incall->prev_stack = task->incall;
276 task->incall = incall;
277 }
278
279 static void
280 endInCall (Task *task)
281 {
282 InCall *incall;
283
284 incall = task->incall;
285 incall->tso = NULL;
286 task->incall = task->incall->prev_stack;
287
288 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
289 stgFree(incall);
290 } else {
291 incall->next = task->spare_incalls;
292 task->spare_incalls = incall;
293 task->n_spare_incalls++;
294 }
295 }
296
297
298 Task *
299 newBoundTask (void)
300 {
301 Task *task;
302
303 if (!tasksInitialized) {
304 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
305 stg_exit(EXIT_FAILURE);
306 }
307
308 task = getTask();
309
310 task->stopped = false;
311
312 newInCall(task);
313
314 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
315 return task;
316 }
317
318 void
319 boundTaskExiting (Task *task)
320 {
321 #if defined(THREADED_RTS)
322 ASSERT(osThreadId() == task->id);
323 #endif
324 ASSERT(myTask() == task);
325
326 endInCall(task);
327
328 // Set task->stopped, but only if this is the last call (#4850).
329 // Remember that we might have a worker Task that makes a foreign
330 // call and then a callback, so it can transform into a bound
331 // Task for the duration of the callback.
332 if (task->incall == NULL) {
333 task->stopped = true;
334 }
335
336 debugTrace(DEBUG_sched, "task exiting");
337 }
338
339
340 #if defined(THREADED_RTS)
341 #define TASK_ID(t) (t)->id
342 #else
343 #define TASK_ID(t) (t)
344 #endif
345
346 void
347 discardTasksExcept (Task *keep)
348 {
349 Task *task, *next;
350
351 // Wipe the task list, except the current Task.
352 ACQUIRE_LOCK(&all_tasks_mutex);
353 for (task = all_tasks; task != NULL; task=next) {
354 next = task->all_next;
355 if (task != keep) {
356 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
357 #if defined(THREADED_RTS)
358 // It is possible that some of these tasks are currently blocked
359 // (in the parent process) either on their condition variable
360 // `cond` or on their mutex `lock`. If they are we may deadlock
361 // when `freeTask` attempts to call `closeCondition` or
362 // `closeMutex` (the behaviour of these functions is documented to
363 // be undefined in the case that there are threads blocked on
364 // them). To avoid this, we re-initialize both the condition
365 // variable and the mutex before calling `freeTask` (we do
366 // precisely the same for all global locks in `forkProcess`).
367 initCondition(&task->cond);
368 initMutex(&task->lock);
369 #endif
370
371 // Note that we do not traceTaskDelete here because
372 // we are not really deleting a task.
373 // The OS threads for all these tasks do not exist in
374 // this process (since we're currently
375 // in the child of a forkProcess).
376 freeTask(task);
377 }
378 }
379 all_tasks = keep;
380 keep->all_next = NULL;
381 keep->all_prev = NULL;
382 RELEASE_LOCK(&all_tasks_mutex);
383 }
384
#if defined(THREADED_RTS)

/*
 * Retire a worker Task: unlink it from all_tasks, decrement
 * currentWorkerCount, emit the task-delete trace event and free it.
 *
 * Must be called on the worker's own OS thread (asserted).
 */
void
workerTaskStop (Task *task)
{
    Task *before, *after;

    DEBUG_ONLY( OSThreadId id );
    DEBUG_ONLY( id = osThreadId() );
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    ACQUIRE_LOCK(&all_tasks_mutex);

    // Unlink from the doubly-linked all_tasks list.
    before = task->all_prev;
    after  = task->all_next;
    if (before != NULL) {
        before->all_next = after;
    } else {
        all_tasks = after;
    }
    if (after != NULL) {
        after->all_prev = before;
    }

    currentWorkerCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    traceTaskDelete(task);

    freeTask(task);
}

#endif
416
417 #if defined(THREADED_RTS)
418
419 static void* OSThreadProcAttr
420 workerStart(Task *task)
421 {
422 Capability *cap;
423
424 // See startWorkerTask().
425 ACQUIRE_LOCK(&task->lock);
426 cap = task->cap;
427 RELEASE_LOCK(&task->lock);
428
429 if (RtsFlags.ParFlags.setAffinity) {
430 setThreadAffinity(cap->no, n_capabilities);
431 }
432 if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
433 setThreadNode(numa_map[task->node]);
434 }
435
436 // set the thread-local pointer to the Task:
437 setMyTask(task);
438
439 newInCall(task);
440
441 // Everything set up; emit the event before the worker starts working.
442 traceTaskCreate(task, cap);
443
444 scheduleWorker(cap,task);
445
446 return NULL;
447 }
448
449 /* N.B. must take all_tasks_mutex */
450 void
451 startWorkerTask (Capability *cap)
452 {
453 int r;
454 OSThreadId tid;
455 Task *task;
456
457 // A worker always gets a fresh Task structure.
458 task = newTask(true);
459 task->stopped = false;
460
461 // The lock here is to synchronise with taskStart(), to make sure
462 // that we have finished setting up the Task structure before the
463 // worker thread reads it.
464 ACQUIRE_LOCK(&task->lock);
465
466 // We don't emit a task creation event here, but in workerStart,
467 // where the kernel thread id is known.
468 task->cap = cap;
469 task->node = cap->node;
470
471 // Give the capability directly to the worker; we can't let anyone
472 // else get in, because the new worker Task has nowhere to go to
473 // sleep so that it could be woken up again.
474 ASSERT_LOCK_HELD(&cap->lock);
475 cap->running_task = task;
476
477 // Set the name of the worker thread to the original process name followed by
478 // ":w", but only if we're on Linux where the program_invocation_short_name
479 // global is available.
480 #if defined(linux_HOST_OS)
481 size_t procname_len = strlen(program_invocation_short_name);
482 char worker_name[16];
483 // The kernel only allocates 16 bytes for thread names, so we truncate if the
484 // original name is too long. Process names go in another table that has more
485 // capacity.
486 if (procname_len >= 13) {
487 strncpy(worker_name, program_invocation_short_name, 13);
488 strcpy(worker_name + 13, ":w");
489 } else {
490 strcpy(worker_name, program_invocation_short_name);
491 strcpy(worker_name + procname_len, ":w");
492 }
493 #else
494 char * worker_name = "ghc_worker";
495 #endif
496 r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
497 if (r != 0) {
498 sysErrorBelch("failed to create OS thread");
499 stg_exit(EXIT_FAILURE);
500 }
501
502 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
503
504 task->id = tid;
505
506 // ok, finished with the Task struct.
507 RELEASE_LOCK(&task->lock);
508 }
509
510 void
511 interruptWorkerTask (Task *task)
512 {
513 ASSERT(osThreadId() != task->id); // seppuku not allowed
514 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
515 interruptOSThread(task->id);
516 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
517 serialisableTaskId(task));
518 }
519
520 #endif /* THREADED_RTS */
521
522 void rts_setInCallCapability (
523 int preferred_capability,
524 int affinity USED_IF_THREADS)
525 {
526 Task *task = getTask();
527 task->preferred_capability = preferred_capability;
528
529 #if defined(THREADED_RTS)
530 if (affinity) {
531 if (RtsFlags.ParFlags.setAffinity) {
532 setThreadAffinity(preferred_capability, n_capabilities);
533 }
534 }
535 #endif
536 }
537
538 void rts_pinThreadToNumaNode (
539 int node USED_IF_THREADS)
540 {
541 #if defined(THREADED_RTS)
542 if (RtsFlags.GcFlags.numa) {
543 Task *task = getTask();
544 task->node = capNoToNumaNode(node);
545 if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // faking NUMA
546 setThreadNode(numa_map[task->node]);
547 }
548 }
549 #endif
550 }
551
#if defined(DEBUG)

void printAllTasks(void);

/*
 * Debugging aid: print one line per Task on all_tasks via debugBelch,
 * giving its id, whether it is stopped or alive and, for live Tasks, the
 * capability it is on (if any) and either the thread it is bound to or
 * "worker".
 *
 * The list is walked without taking all_tasks_mutex.
 */
void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_next) {
        debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
                   task->stopped ? "stopped" : "alive");
        if (!task->stopped) {
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
            // startWorkerTask() clears task->stopped before workerStart()
            // has pushed the worker's first InCall, so a live Task can
            // briefly have no InCall at all; don't dereference NULL then.
            if (task->incall != NULL && task->incall->tso) {
                debugBelch("bound to thread %lu",
                           (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");
    }
}

#endif