rts/Task.c
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem. Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Task.h"
#include "Capability.h"
#include "Stats.h"
#include "Schedule.h"
#include "Hash.h"
#include "Trace.h"

#if HAVE_SIGNAL_H
#include <signal.h>
#endif

// Task lists and global counters.
// Locks required: all_tasks_mutex.
Task *all_tasks = NULL;

// taskCount: current number of bound tasks + total number of worker tasks.
uint32_t taskCount;
uint32_t workerCount;        // total number of worker tasks ever created
uint32_t currentWorkerCount; // number of worker tasks currently alive
uint32_t peakWorkerCount;    // high-water mark of currentWorkerCount

static int tasksInitialized = 0;

static void freeTask (Task *task);
static Task * newTask (bool);

#if defined(THREADED_RTS)
Mutex all_tasks_mutex;
#endif

/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// Thread-local storage for the current thread's Task: either a __thread
// variable or an explicit ThreadLocalKey, depending on whether
// MYTASK_USE_TLV is available on this platform.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
ThreadLocalKey currentTaskKey;
# endif
#else
Task *my_task;
#endif
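
// (The accessors myTask() and setMyTask(), defined in Task.h, read and
// write this storage; the code in this file always goes through them.)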

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */

void
initTaskManager (void)
{
    if (!tasksInitialized) {
        taskCount = 0;
        workerCount = 0;
        currentWorkerCount = 0;
        peakWorkerCount = 0;
        tasksInitialized = 1;
#if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
#endif
        initMutex(&all_tasks_mutex);
#endif
    }
}

uint32_t
freeTaskManager (void)
{
    Task *task, *next;
    uint32_t tasksRunning = 0;

    ACQUIRE_LOCK(&all_tasks_mutex);

    for (task = all_tasks; task != NULL; task = next) {
        next = task->all_next;
        if (task->stopped) {
            freeTask(task);
        } else {
            tasksRunning++;
        }
    }

    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
               tasksRunning);

    all_tasks = NULL;

    RELEASE_LOCK(&all_tasks_mutex);

#if defined(THREADED_RTS)
    closeMutex(&all_tasks_mutex);
#if !defined(MYTASK_USE_TLV)
    freeThreadLocalKey(&currentTaskKey);
#endif
#endif

    tasksInitialized = 0;

    return tasksRunning;
}

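// Return the Task belonging to the calling OS thread, allocating and
// registering a fresh one the first time this thread calls in.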
Task* getTask (void)
{
    Task *task;

    task = myTask();
    if (task != NULL) {
        return task;
    } else {
        task = newTask(false);
#if defined(THREADED_RTS)
        task->id = osThreadId();
#endif
        setMyTask(task);
        return task;
    }
}

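// Release the calling thread's Task.  This is only valid for a bound
// Task that has already stopped; worker Tasks free themselves via
// workerTaskStop() instead.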
void freeMyTask (void)
{
    Task *task;

    task = myTask();

    if (task == NULL) return;

    if (!task->stopped) {
        errorBelch(
            "freeMyTask() called, but the Task is not stopped; ignoring");
        return;
    }

    if (task->worker) {
        errorBelch("freeMyTask() called on a worker; ignoring");
        return;
    }

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    taskCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    freeTask(task);
    setMyTask(NULL);
}

static void
freeTask (Task *task)
{
    InCall *incall, *next;

    // The caller must ensure that the Task is no longer in use.  A Task
    // may still be in use if we have a Haskell thread in a foreign call
    // while we are attempting to shut down the RTS (see conc059);
    // freeTaskManager() leaves such Tasks alone.
#if defined(THREADED_RTS)
    closeCondition(&task->cond);
    closeMutex(&task->lock);
#endif

    for (incall = task->incall; incall != NULL; incall = next) {
        next = incall->prev_stack;
        stgFree(incall);
    }
    for (incall = task->spare_incalls; incall != NULL; incall = next) {
        next = incall->next;
        stgFree(incall);
    }

    stgFree(task);
}

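// Allocate and register a new Task.  The allocation is rounded up to a
// multiple of 64 bytes, presumably so that separate Task structures do
// not share a cache line (the macro below assumes 64-byte lines).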
static Task*
newTask (bool worker)
{
    Task *task;

#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");

    task->cap = NULL;
    task->worker = worker;
    task->stopped = true;
    task->running_finalizers = false;
    task->n_spare_incalls = 0;
    task->spare_incalls = NULL;
    task->incall = NULL;
    task->preferred_capability = -1;

#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = false;
    task->node = 0;
#endif

    task->next = NULL;

    ACQUIRE_LOCK(&all_tasks_mutex);

    task->all_prev = NULL;
    task->all_next = all_tasks;
    if (all_tasks != NULL) {
        all_tasks->all_prev = task;
    }
    all_tasks = task;

    taskCount++;
    if (worker) {
        workerCount++;
        currentWorkerCount++;
        if (currentWorkerCount > peakWorkerCount) {
            peakWorkerCount = currentWorkerCount;
        }
    }
    RELEASE_LOCK(&all_tasks_mutex);

    return task;
}

// avoid the spare_incalls list growing unboundedly
#define MAX_SPARE_INCALLS 8

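// Push a fresh InCall onto the Task's in-call stack, reusing one from
// the spare_incalls pool when possible.  endInCall() pops it again and
// returns it to the pool, keeping at most MAX_SPARE_INCALLS spares.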
static void
newInCall (Task *task)
{
    InCall *incall;

    if (task->spare_incalls != NULL) {
        incall = task->spare_incalls;
        task->spare_incalls = incall->next;
        task->n_spare_incalls--;
    } else {
        incall = stgMallocBytes((sizeof(InCall)), "newInCall");
    }

    incall->tso = NULL;
    incall->task = task;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    incall->rstat = NoStatus;
    incall->ret = NULL;
    incall->next = NULL;
    incall->prev = NULL;
    incall->prev_stack = task->incall;
    task->incall = incall;
}

static void
endInCall (Task *task)
{
    InCall *incall;

    incall = task->incall;
    incall->tso = NULL;
    task->incall = task->incall->prev_stack;

    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
        stgFree(incall);
    } else {
        incall->next = task->spare_incalls;
        task->spare_incalls = incall;
        task->n_spare_incalls++;
    }
}


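// Register the calling OS thread as a bound Task for the duration of an
// in-call into Haskell.  A minimal sketch of the intended bracket
// (assumed here for orientation; the real sequence lives in
// rts/RtsAPI.c and rts/Schedule.c):
//
//     Task *task = newBoundTask();     // register this OS thread
//     waitForCapability(&cap, task);   // acquire a Capability
//     /* ... run the Haskell computation ... */
//     releaseCapability(cap);
//     boundTaskExiting(task);          // pop the InCall; the Task is
//                                      // only marked stopped on the
//                                      // outermost in-call (#4850)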
Task *
newBoundTask (void)
{
    Task *task;

    if (!tasksInitialized) {
        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
        stg_exit(EXIT_FAILURE);
    }

    task = getTask();

    task->stopped = false;

    newInCall(task);

    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
    return task;
}

void
boundTaskExiting (Task *task)
{
#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);

    endInCall(task);

    // Set task->stopped, but only if this is the last call (#4850).
    // Remember that we might have a worker Task that makes a foreign
    // call and then a callback, so it can transform into a bound
    // Task for the duration of the callback.
    if (task->incall == NULL) {
        task->stopped = true;
    }

    debugTrace(DEBUG_sched, "task exiting");
}


#if defined(THREADED_RTS)
#define TASK_ID(t) (t)->id
#else
#define TASK_ID(t) (t)
#endif

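// Called in the child of a forkProcess(): every Task except `keep` (the
// one performing the fork) belongs to an OS thread that does not exist
// in this process, so unlink and free them all.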
void
discardTasksExcept (Task *keep)
{
    Task *task, *next;

    // Wipe the task list, except the current Task.
    ACQUIRE_LOCK(&all_tasks_mutex);
    for (task = all_tasks; task != NULL; task=next) {
        next = task->all_next;
        if (task != keep) {
            debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
#if defined(THREADED_RTS)
            // It is possible that some of these tasks are currently blocked
            // (in the parent process) either on their condition variable
            // `cond` or on their mutex `lock`. If they are we may deadlock
            // when `freeTask` attempts to call `closeCondition` or
            // `closeMutex` (the behaviour of these functions is documented to
            // be undefined in the case that there are threads blocked on
            // them). To avoid this, we re-initialize both the condition
            // variable and the mutex before calling `freeTask` (we do
            // precisely the same for all global locks in `forkProcess`).
            initCondition(&task->cond);
            initMutex(&task->lock);
#endif

            // Note that we do not traceTaskDelete here because
            // we are not really deleting a task.
            // The OS threads for all these tasks do not exist in
            // this process (since we're currently
            // in the child of a forkProcess).
            freeTask(task);
        }
    }
    all_tasks = keep;
    keep->all_next = NULL;
    keep->all_prev = NULL;
    RELEASE_LOCK(&all_tasks_mutex);
}

#if defined(THREADED_RTS)

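// Called by a worker that is shutting down: unlink its Task from
// all_tasks and free it.  Unlike bound Tasks, which go through
// freeMyTask(), a worker frees its own Task here.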
void
workerTaskStop (Task *task)
{
    DEBUG_ONLY( OSThreadId id );
    DEBUG_ONLY( id = osThreadId() );
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    currentWorkerCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    traceTaskDelete(task);

    freeTask(task);
}

#endif

#if defined(THREADED_RTS)

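// Entry point of a worker OS thread: pick up the Capability handed over
// by startWorkerTask(), apply any affinity/NUMA settings, and enter the
// scheduler via scheduleWorker().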
static void OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    if (RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(cap->no, n_capabilities);
    }
    if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
        setThreadNode(numa_map[task->node]);
    }

    // set the thread-local pointer to the Task:
    setMyTask(task);

    newInCall(task);

    // Everything set up; emit the event before the worker starts working.
    traceTaskCreate(task, cap);

    scheduleWorker(cap,task);
}

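// Create a new worker Task and OS thread for Capability `cap`, which
// must be held (locked) by the caller.  The Capability is handed
// directly to the new worker before its thread starts running.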
void
startWorkerTask (Capability *cap)
{
    int r;
    OSThreadId tid;
    Task *task;

    // A worker always gets a fresh Task structure.
    task = newTask(true);
    task->stopped = false;

    // The lock here is to synchronise with workerStart(), to make sure
    // that we have finished setting up the Task structure before the
    // worker thread reads it.
    ACQUIRE_LOCK(&task->lock);

    // We don't emit a task creation event here, but in workerStart,
    // where the kernel thread id is known.
    task->cap = cap;
    task->node = cap->node;

    // Give the capability directly to the worker; we can't let anyone
    // else get in, because the new worker Task has nowhere to go to
    // sleep so that it could be woken up again.
    ASSERT_LOCK_HELD(&cap->lock);
    cap->running_task = task;

    r = createOSThread(&tid, "ghc_worker", (OSThreadProc*)workerStart, task);
    if (r != 0) {
        sysErrorBelch("failed to create OS thread");
        stg_exit(EXIT_FAILURE);
    }

    debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

    task->id = tid;

    // ok, finished with the Task struct.
    RELEASE_LOCK(&task->lock);
}

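// Interrupt the OS thread behind a Task that is currently in a foreign
// call, e.g. so that a blocking system call can be abandoned (assumed
// usage; the asserts below only require that the Task has a suspended
// TSO, i.e. that it really is in an FFI call).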
void
interruptWorkerTask (Task *task)
{
    ASSERT(osThreadId() != task->id);    // seppuku not allowed
    ASSERT(task->incall->suspended_tso); // use this only for FFI calls
    interruptOSThread(task->id);
    debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
               serialisableTaskId(task));
}

#endif /* THREADED_RTS */

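// Record which Capability future in-calls from the calling OS thread
// should prefer; if `affinity` is nonzero and affinity support is
// enabled, also pin the thread's CPU affinity to match.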
void rts_setInCallCapability (
    int preferred_capability,
    int affinity USED_IF_THREADS)
{
    Task *task = getTask();
    task->preferred_capability = preferred_capability;

#if defined(THREADED_RTS)
    if (affinity) {
        if (RtsFlags.ParFlags.setAffinity) {
            setThreadAffinity(preferred_capability, n_capabilities);
        }
    }
#endif
}

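// Pin the calling OS thread to a NUMA node when NUMA support is enabled
// (RtsFlags.GcFlags.numa).  The choice is remembered in the Task's
// `node` field and applied via setThreadNode().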
void rts_pinThreadToNumaNode (
    int node USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    if (RtsFlags.GcFlags.numa) {
        Task *task = getTask();
        task->node = capNoToNumaNode(node);
        if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // skip if merely faking NUMA (debug)
            setThreadNode(numa_map[task->node]);
        }
    }
#endif
}

#if defined(DEBUG)

void printAllTasks(void);

void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_next) {
        debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
                   task->stopped ? "stopped" : "alive");
        if (!task->stopped) {
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
            if (task->incall->tso) {
                debugBelch("bound to thread %lu",
                           (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");
    }
}

#endif