Fix a bug introduced with allocation counters
[ghc.git] / rts / Task.c
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem. Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/
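
/* A rough sketch of how this API fits together (for orientation only;
 * the real call sites live in Schedule.c and the foreign-call-in code,
 * so the exact sequence below is an assumption, not a spec):
 *
 *   External OS thread calling into Haskell (a "bound" Task):
 *       Task *task = newBoundTask();   // attach a Task + InCall to this thread
 *       ... run Haskell code via the scheduler (e.g. scheduleWaitThread) ...
 *       boundTaskExiting(task);        // pop the InCall; stop the Task if last
 *
 *   Worker OS thread created by the RTS itself:
 *       startWorkerTask(cap);          // newTask(rtsTrue) + createOSThread
 *         -> workerStart(task)         // entry point in the new thread
 *              -> scheduleWorker(cap, task)
 *              -> workerTaskStop(task) // when the scheduler lets it exit
 */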

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "Task.h"
#include "Capability.h"
#include "Stats.h"
#include "Schedule.h"
#include "Hash.h"
#include "Trace.h"

#if HAVE_SIGNAL_H
#include <signal.h>
#endif

// Task lists and global counters.
// Locks required: all_tasks_mutex.
Task *all_tasks = NULL;

nat taskCount;
nat workerCount;
nat currentWorkerCount;
nat peakWorkerCount;

static int tasksInitialized = 0;

static void freeTask (Task *task);
static Task * allocTask (void);
static Task * newTask (rtsBool);

#if defined(THREADED_RTS)
Mutex all_tasks_mutex;
#endif

/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// Thread-local storage that gives access to the current thread's Task
// structure: a __thread variable where available, otherwise an explicit
// ThreadLocalKey. In the non-threaded RTS a plain global suffices.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
ThreadLocalKey currentTaskKey;
# endif
#else
Task *my_task;
#endif
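
/* The accessors myTask() and setMyTask() live in Task.h.  Roughly (a
 * sketch under the assumption that the header follows the usual
 * pattern; consult Task.h for the authoritative definitions):
 *
 *     INLINE_HEADER Task *myTask (void)
 *     {
 *     #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
 *         return getThreadLocalVar(&currentTaskKey);
 *     #else
 *         return my_task;
 *     #endif
 *     }
 *
 *     INLINE_HEADER void setMyTask (Task *task)
 *     {
 *     #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
 *         setThreadLocalVar(&currentTaskKey, task);
 *     #else
 *         my_task = task;
 *     #endif
 *     }
 */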

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */

// One-time initialisation of the task manager's global state.
void
initTaskManager (void)
{
    if (!tasksInitialized) {
        taskCount = 0;
        workerCount = 0;
        currentWorkerCount = 0;
        peakWorkerCount = 0;
        tasksInitialized = 1;
#if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV)
        newThreadLocalKey(&currentTaskKey);
#endif
        initMutex(&all_tasks_mutex);
#endif
    }
}

// Shut the task manager down: free every stopped Task, and return the
// number of Tasks still running (e.g. still out in foreign calls).
nat
freeTaskManager (void)
{
    Task *task, *next;
    nat tasksRunning = 0;

    ACQUIRE_LOCK(&all_tasks_mutex);

    for (task = all_tasks; task != NULL; task = next) {
        next = task->all_next;
        if (task->stopped) {
            freeTask(task);
        } else {
            tasksRunning++;
        }
    }

    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
               tasksRunning);

    all_tasks = NULL;

    RELEASE_LOCK(&all_tasks_mutex);

#if defined(THREADED_RTS)
    closeMutex(&all_tasks_mutex);
#if !defined(MYTASK_USE_TLV)
    freeThreadLocalKey(&currentTaskKey);
#endif
#endif

    tasksInitialized = 0;

    return tasksRunning;
}

// Return the calling OS thread's Task, creating a fresh (non-worker)
// Task and installing it in thread-local storage if the thread does not
// have one yet.
static Task *
allocTask (void)
{
    Task *task;

    task = myTask();
    if (task != NULL) {
        return task;
    } else {
        task = newTask(rtsFalse);
#if defined(THREADED_RTS)
        task->id = osThreadId();
#endif
        setMyTask(task);
        return task;
    }
}

// Free the calling thread's Task. Only valid for a bound Task that has
// already stopped; worker Tasks are freed by workerTaskStop() instead.
void freeMyTask (void)
{
    Task *task;

    task = myTask();

    if (task == NULL) return;

    if (!task->stopped) {
        errorBelch(
            "freeMyTask() called, but the Task is not stopped; ignoring");
        return;
    }

    if (task->worker) {
        errorBelch("freeMyTask() called on a worker; ignoring");
        return;
    }

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    taskCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    freeTask(task);
    setMyTask(NULL);
}

static void
freeTask (Task *task)
{
    InCall *incall, *next;

    // We only free resources if the Task is not in use. A
    // Task may still be in use if we have a Haskell thread in
    // a foreign call while we are attempting to shut down the
    // RTS (see conc059).
#if defined(THREADED_RTS)
    closeCondition(&task->cond);
    closeMutex(&task->lock);
#endif

    for (incall = task->incall; incall != NULL; incall = next) {
        next = incall->prev_stack;
        stgFree(incall);
    }
    for (incall = task->spare_incalls; incall != NULL; incall = next) {
        next = incall->next;
        stgFree(incall);
    }

    stgFree(task);
}

// Allocate a fresh Task, initialise it, and link it onto the front of
// the global all_tasks list (updating the counters).
static Task*
newTask (rtsBool worker)
{
    Task *task;

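    /* The allocation below is rounded up to a whole number of 64-byte
     * cache lines so that two Tasks allocated back to back do not end up
     * sharing a line (the macro only rounds the size; alignment itself is
     * left to the allocator).  Worked example with an illustrative
     * sizeof(Task) of 200 bytes: ((200 + 63) / 64) * 64 = 256.
     */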
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");

    task->cap = NULL;
    task->worker = worker;
    task->stopped = rtsFalse;
    task->running_finalizers = rtsFalse;
    task->n_spare_incalls = 0;
    task->spare_incalls = NULL;
    task->incall = NULL;

#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

    task->next = NULL;

    ACQUIRE_LOCK(&all_tasks_mutex);

    task->all_prev = NULL;
    task->all_next = all_tasks;
    if (all_tasks != NULL) {
        all_tasks->all_prev = task;
    }
    all_tasks = task;

    taskCount++;
    if (worker) {
        workerCount++;
        currentWorkerCount++;
        if (currentWorkerCount > peakWorkerCount) {
            peakWorkerCount = currentWorkerCount;
        }
    }
    RELEASE_LOCK(&all_tasks_mutex);

    return task;
}

// avoid the spare_incalls list growing unboundedly
#define MAX_SPARE_INCALLS 8

// Push a fresh InCall onto the Task's in-call stack, reusing an entry
// from the spare pool when one is available.
static void
newInCall (Task *task)
{
    InCall *incall;

    if (task->spare_incalls != NULL) {
        incall = task->spare_incalls;
        task->spare_incalls = incall->next;
        task->n_spare_incalls--;
    } else {
        incall = stgMallocBytes((sizeof(InCall)), "newInCall");
    }

    incall->tso = NULL;
    incall->task = task;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    incall->stat = NoStatus;
    incall->ret = NULL;
    incall->next = NULL;
    incall->prev = NULL;
    incall->prev_stack = task->incall;
    task->incall = incall;
}

// Pop the current InCall, returning it to the spare pool unless the
// pool is already full.
static void
endInCall (Task *task)
{
    InCall *incall;

    incall = task->incall;
    incall->tso = NULL;
    task->incall = task->incall->prev_stack;

    if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
        stgFree(incall);
    } else {
        incall->next = task->spare_incalls;
        task->spare_incalls = incall;
        task->n_spare_incalls++;
    }
}
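
/* The InCalls of a Task form a stack linked through prev_stack, and
 * retired InCalls are kept on a small free list.  A hypothetical
 * sequence illustrating the behaviour (not taken from real RTS code):
 *
 *     newInCall(task);   // depth 1: pool empty, so an InCall is malloc'd
 *     newInCall(task);   // depth 2: another malloc
 *     endInCall(task);   // depth 1: the InCall goes onto spare_incalls
 *     newInCall(task);   // depth 2: the pooled InCall is reused
 *
 * The pool is capped at MAX_SPARE_INCALLS entries; beyond that,
 * endInCall() simply frees the InCall.
 */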


// Called when an OS thread enters the RTS (for example via a foreign
// call-in): attach a Task to the thread (reusing one if it already has
// one) and push a fresh InCall for the duration of the call.
Task *
newBoundTask (void)
{
    Task *task;

    if (!tasksInitialized) {
        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
        stg_exit(EXIT_FAILURE);
    }

    task = allocTask();

    task->stopped = rtsFalse;

    newInCall(task);

    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
    return task;
}

// The counterpart of newBoundTask(): pop the InCall, and mark the Task
// stopped if this was the outermost in-call.
void
boundTaskExiting (Task *task)
{
#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);

    endInCall(task);

    // Set task->stopped, but only if this is the last call (#4850).
    // Remember that we might have a worker Task that makes a foreign
    // call and then a callback, so it can transform into a bound
    // Task for the duration of the callback.
    if (task->incall == NULL) {
        task->stopped = rtsTrue;
    }

    debugTrace(DEBUG_sched, "task exiting");
}
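
/* Call-ins nest.  A hypothetical sequence illustrating the check above
 * (in the spirit of #4850):
 *
 *     C thread calls into Haskell         -> newBoundTask()     (in-call depth 1)
 *     Haskell makes a safe foreign call   -> the InCall is suspended
 *     that foreign code calls back in     -> newBoundTask()     (depth 2)
 *     the callback returns                -> boundTaskExiting() (depth 1, NOT stopped)
 *     the original call-in returns        -> boundTaskExiting() (depth 0, stopped)
 *
 * Only when the InCall stack is empty is the Task marked stopped, so a
 * worker that temporarily becomes bound for a callback is not torn down
 * underneath its original in-call.
 */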


#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
#define TASK_ID(t) (t)
#endif

void
discardTasksExcept (Task *keep)
{
    Task *task, *next;

    // Wipe the task list, except the current Task.
    ACQUIRE_LOCK(&all_tasks_mutex);
    for (task = all_tasks; task != NULL; task=next) {
        next = task->all_next;
        if (task != keep) {
            debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
#if defined(THREADED_RTS)
            // It is possible that some of these tasks are currently blocked
            // (in the parent process) either on their condition variable
            // `cond` or on their mutex `lock`. If they are we may deadlock
            // when `freeTask` attempts to call `closeCondition` or
            // `closeMutex` (the behaviour of these functions is documented to
            // be undefined in the case that there are threads blocked on
            // them). To avoid this, we re-initialize both the condition
            // variable and the mutex before calling `freeTask` (we do
            // precisely the same for all global locks in `forkProcess`).
            initCondition(&task->cond);
            initMutex(&task->lock);
#endif

            // Note that we do not traceTaskDelete here because
            // we are not really deleting a task.
            // The OS threads for all these tasks do not exist in
            // this process (since we're currently
            // in the child of a forkProcess).
            freeTask(task);
        }
    }
    all_tasks = keep;
    keep->all_next = NULL;
    keep->all_prev = NULL;
    RELEASE_LOCK(&all_tasks_mutex);
}
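
/* discardTasksExcept() is meant for the child of a fork, where only the
 * forking OS thread survives.  A sketch of the expected call pattern
 * (the real call site is forkProcess() in Schedule.c; the exact sequence
 * here is an assumption):
 *
 *     pid = fork();
 *     if (pid == 0) {                // in the child
 *         // global locks are re-initialised first, since they may have
 *         // been held by threads that no longer exist ...
 *         discardTasksExcept(task);  // 'task' is the forking thread's Task
 *         // ... then the child continues with just this one Task.
 *     }
 */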

#if defined(THREADED_RTS)

// Called by a worker thread when it is about to exit: unlink its Task
// from all_tasks and free it.
void
workerTaskStop (Task *task)
{
    DEBUG_ONLY( OSThreadId id );
    DEBUG_ONLY( id = osThreadId() );
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    ACQUIRE_LOCK(&all_tasks_mutex);

    if (task->all_prev) {
        task->all_prev->all_next = task->all_next;
    } else {
        all_tasks = task->all_next;
    }
    if (task->all_next) {
        task->all_next->all_prev = task->all_prev;
    }

    currentWorkerCount--;

    RELEASE_LOCK(&all_tasks_mutex);

    traceTaskDelete(task);

    freeTask(task);
}

#endif

#if defined(THREADED_RTS)

static void OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    if (RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(cap->no, n_capabilities);
    }

    // set the thread-local pointer to the Task:
    setMyTask(task);

    newInCall(task);

    // Everything set up; emit the event before the worker starts working.
    traceTaskCreate(task, cap);

    scheduleWorker(cap,task);
}

void
startWorkerTask (Capability *cap)
{
    int r;
    OSThreadId tid;
    Task *task;

    // A worker always gets a fresh Task structure.
    task = newTask(rtsTrue);

    // The lock here is to synchronise with workerStart(), to make sure
    // that we have finished setting up the Task structure before the
    // worker thread reads it.
    ACQUIRE_LOCK(&task->lock);

    // We don't emit a task creation event here, but in workerStart,
    // where the kernel thread id is known.
    task->cap = cap;

    // Give the capability directly to the worker; we can't let anyone
    // else get in, because the new worker Task has nowhere to go to
    // sleep so that it could be woken up again.
    ASSERT_LOCK_HELD(&cap->lock);
    cap->running_task = task;

    r = createOSThread(&tid, "ghc_worker", (OSThreadProc*)workerStart, task);
    if (r != 0) {
        sysErrorBelch("failed to create OS thread");
        stg_exit(EXIT_FAILURE);
    }

    debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

    task->id = tid;

    // ok, finished with the Task struct.
    RELEASE_LOCK(&task->lock);
}
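
/* The startWorkerTask() / workerStart() handshake, spelled out as an
 * interleaving (a descriptive sketch of the code above, not extra
 * behaviour):
 *
 *     creating thread                          new worker thread
 *     ---------------                          -----------------
 *     ACQUIRE_LOCK(&task->lock)
 *     task->cap = cap
 *     cap->running_task = task
 *     createOSThread(..., workerStart, task)
 *                                              workerStart(task):
 *                                                ACQUIRE_LOCK(&task->lock)  -- blocks
 *     task->id = tid
 *     RELEASE_LOCK(&task->lock)
 *                                                -- lock acquired
 *                                                cap = task->cap
 *                                                RELEASE_LOCK(&task->lock)
 *                                                scheduleWorker(cap, task)
 *
 * Without the lock, the worker could start running before the creating
 * thread had finished filling in the Task (in particular task->id).
 */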

void
interruptWorkerTask (Task *task)
{
    ASSERT(osThreadId() != task->id);    // seppuku not allowed
    ASSERT(task->incall->suspended_tso); // use this only for FFI calls
    interruptOSThread(task->id);
    debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
               serialisableTaskId(task));
}

#endif /* THREADED_RTS */

#ifdef DEBUG

void printAllTasks(void);

void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_next) {
        debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
                   task->stopped ? "stopped" : "alive");
        if (!task->stopped) {
            if (task->cap) {
                debugBelch("on capability %d, ", task->cap->no);
            }
            if (task->incall->tso) {
                debugBelch("bound to thread %lu",
                           (unsigned long)task->incall->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");
    }
}

#endif
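
/* printAllTasks() is handy from a debugger attached to a DEBUG build of
 * the RTS.  For example (an illustrative gdb session; the exact output
 * depends on which Tasks happen to exist):
 *
 *     (gdb) call printAllTasks()
 *     task 0x1 is alive, on capability 0, bound to thread 1
 *     task 0x2 is alive, on capability 1, worker
 */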
518