45ef77ba0bcd573d51c77a77a6ecd2dbd4ea0df7
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29
30 // current number of bound tasks + total number of worker tasks.
31 uint32_t taskCount;
32 uint32_t workerCount;
33 uint32_t currentWorkerCount;
34 uint32_t peakWorkerCount;
35
36 static int tasksInitialized = 0;
37
38 static void freeTask (Task *task);
39 static Task * allocTask (void);
40 static Task * newTask (rtsBool);
41
42 #if defined(THREADED_RTS)
43 Mutex all_tasks_mutex;
44 #endif
45
46 /* -----------------------------------------------------------------------------
47 * Remembering the current thread's Task
48 * -------------------------------------------------------------------------- */
49
50 // A thread-local-storage key that we can use to get access to the
51 // current thread's Task structure.
52 #if defined(THREADED_RTS)
53 # if defined(MYTASK_USE_TLV)
54 __thread Task *my_task;
55 # else
56 ThreadLocalKey currentTaskKey;
57 # endif
58 #else
59 Task *my_task;
60 #endif
61
62 /* -----------------------------------------------------------------------------
63 * Rest of the Task API
64 * -------------------------------------------------------------------------- */
65
66 void
67 initTaskManager (void)
68 {
69 if (!tasksInitialized) {
70 taskCount = 0;
71 workerCount = 0;
72 currentWorkerCount = 0;
73 peakWorkerCount = 0;
74 tasksInitialized = 1;
75 #if defined(THREADED_RTS)
76 #if !defined(MYTASK_USE_TLV)
77 newThreadLocalKey(&currentTaskKey);
78 #endif
79 initMutex(&all_tasks_mutex);
80 #endif
81 }
82 }
83
84 uint32_t
85 freeTaskManager (void)
86 {
87 Task *task, *next;
88 uint32_t tasksRunning = 0;
89
90 ACQUIRE_LOCK(&all_tasks_mutex);
91
92 for (task = all_tasks; task != NULL; task = next) {
93 next = task->all_next;
94 if (task->stopped) {
95 freeTask(task);
96 } else {
97 tasksRunning++;
98 }
99 }
100
101 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
102 tasksRunning);
103
104 all_tasks = NULL;
105
106 RELEASE_LOCK(&all_tasks_mutex);
107
108 #if defined(THREADED_RTS)
109 closeMutex(&all_tasks_mutex);
110 #if !defined(MYTASK_USE_TLV)
111 freeThreadLocalKey(&currentTaskKey);
112 #endif
113 #endif
114
115 tasksInitialized = 0;
116
117 return tasksRunning;
118 }
119
120 static Task *
121 allocTask (void)
122 {
123 Task *task;
124
125 task = myTask();
126 if (task != NULL) {
127 return task;
128 } else {
129 task = newTask(rtsFalse);
130 #if defined(THREADED_RTS)
131 task->id = osThreadId();
132 #endif
133 setMyTask(task);
134 return task;
135 }
136 }
137
138 void freeMyTask (void)
139 {
140 Task *task;
141
142 task = myTask();
143
144 if (task == NULL) return;
145
146 if (!task->stopped) {
147 errorBelch(
148 "freeMyTask() called, but the Task is not stopped; ignoring");
149 return;
150 }
151
152 if (task->worker) {
153 errorBelch("freeMyTask() called on a worker; ignoring");
154 return;
155 }
156
157 ACQUIRE_LOCK(&all_tasks_mutex);
158
159 if (task->all_prev) {
160 task->all_prev->all_next = task->all_next;
161 } else {
162 all_tasks = task->all_next;
163 }
164 if (task->all_next) {
165 task->all_next->all_prev = task->all_prev;
166 }
167
168 taskCount--;
169
170 RELEASE_LOCK(&all_tasks_mutex);
171
172 freeTask(task);
173 setMyTask(NULL);
174 }
175
176 static void
177 freeTask (Task *task)
178 {
179 InCall *incall, *next;
180
181 // We only free resources if the Task is not in use. A
182 // Task may still be in use if we have a Haskell thread in
183 // a foreign call while we are attempting to shut down the
184 // RTS (see conc059).
185 #if defined(THREADED_RTS)
186 closeCondition(&task->cond);
187 closeMutex(&task->lock);
188 #endif
189
190 for (incall = task->incall; incall != NULL; incall = next) {
191 next = incall->prev_stack;
192 stgFree(incall);
193 }
194 for (incall = task->spare_incalls; incall != NULL; incall = next) {
195 next = incall->next;
196 stgFree(incall);
197 }
198
199 stgFree(task);
200 }
201
202 static Task*
203 newTask (rtsBool worker)
204 {
205 Task *task;
206
207 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
208 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
209
210 task->cap = NULL;
211 task->worker = worker;
212 task->stopped = rtsFalse;
213 task->running_finalizers = rtsFalse;
214 task->n_spare_incalls = 0;
215 task->spare_incalls = NULL;
216 task->incall = NULL;
217 task->preferred_capability = -1;
218
219 #if defined(THREADED_RTS)
220 initCondition(&task->cond);
221 initMutex(&task->lock);
222 task->wakeup = rtsFalse;
223 #endif
224
225 task->next = NULL;
226
227 ACQUIRE_LOCK(&all_tasks_mutex);
228
229 task->all_prev = NULL;
230 task->all_next = all_tasks;
231 if (all_tasks != NULL) {
232 all_tasks->all_prev = task;
233 }
234 all_tasks = task;
235
236 taskCount++;
237 if (worker) {
238 workerCount++;
239 currentWorkerCount++;
240 if (currentWorkerCount > peakWorkerCount) {
241 peakWorkerCount = currentWorkerCount;
242 }
243 }
244 RELEASE_LOCK(&all_tasks_mutex);
245
246 return task;
247 }
248
249 // avoid the spare_incalls list growing unboundedly
250 #define MAX_SPARE_INCALLS 8
251
252 static void
253 newInCall (Task *task)
254 {
255 InCall *incall;
256
257 if (task->spare_incalls != NULL) {
258 incall = task->spare_incalls;
259 task->spare_incalls = incall->next;
260 task->n_spare_incalls--;
261 } else {
262 incall = stgMallocBytes((sizeof(InCall)), "newInCall");
263 }
264
265 incall->tso = NULL;
266 incall->task = task;
267 incall->suspended_tso = NULL;
268 incall->suspended_cap = NULL;
269 incall->rstat = NoStatus;
270 incall->ret = NULL;
271 incall->next = NULL;
272 incall->prev = NULL;
273 incall->prev_stack = task->incall;
274 task->incall = incall;
275 }
276
277 static void
278 endInCall (Task *task)
279 {
280 InCall *incall;
281
282 incall = task->incall;
283 incall->tso = NULL;
284 task->incall = task->incall->prev_stack;
285
286 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
287 stgFree(incall);
288 } else {
289 incall->next = task->spare_incalls;
290 task->spare_incalls = incall;
291 task->n_spare_incalls++;
292 }
293 }
294
295
296 Task *
297 newBoundTask (void)
298 {
299 Task *task;
300
301 if (!tasksInitialized) {
302 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
303 stg_exit(EXIT_FAILURE);
304 }
305
306 task = allocTask();
307
308 task->stopped = rtsFalse;
309
310 newInCall(task);
311
312 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
313 return task;
314 }
315
316 void
317 boundTaskExiting (Task *task)
318 {
319 #if defined(THREADED_RTS)
320 ASSERT(osThreadId() == task->id);
321 #endif
322 ASSERT(myTask() == task);
323
324 endInCall(task);
325
326 // Set task->stopped, but only if this is the last call (#4850).
327 // Remember that we might have a worker Task that makes a foreign
328 // call and then a callback, so it can transform into a bound
329 // Task for the duration of the callback.
330 if (task->incall == NULL) {
331 task->stopped = rtsTrue;
332 }
333
334 debugTrace(DEBUG_sched, "task exiting");
335 }
336
337
338 #ifdef THREADED_RTS
339 #define TASK_ID(t) (t)->id
340 #else
341 #define TASK_ID(t) (t)
342 #endif
343
344 void
345 discardTasksExcept (Task *keep)
346 {
347 Task *task, *next;
348
349 // Wipe the task list, except the current Task.
350 ACQUIRE_LOCK(&all_tasks_mutex);
351 for (task = all_tasks; task != NULL; task=next) {
352 next = task->all_next;
353 if (task != keep) {
354 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
355 #if defined(THREADED_RTS)
356 // It is possible that some of these tasks are currently blocked
357 // (in the parent process) either on their condition variable
358 // `cond` or on their mutex `lock`. If they are we may deadlock
359 // when `freeTask` attempts to call `closeCondition` or
360 // `closeMutex` (the behaviour of these functions is documented to
361 // be undefined in the case that there are threads blocked on
362 // them). To avoid this, we re-initialize both the condition
363 // variable and the mutex before calling `freeTask` (we do
364 // precisely the same for all global locks in `forkProcess`).
365 initCondition(&task->cond);
366 initMutex(&task->lock);
367 #endif
368
369 // Note that we do not traceTaskDelete here because
370 // we are not really deleting a task.
371 // The OS threads for all these tasks do not exist in
372 // this process (since we're currently
373 // in the child of a forkProcess).
374 freeTask(task);
375 }
376 }
377 all_tasks = keep;
378 keep->all_next = NULL;
379 keep->all_prev = NULL;
380 RELEASE_LOCK(&all_tasks_mutex);
381 }
382
383 #if defined(THREADED_RTS)
384
385 void
386 workerTaskStop (Task *task)
387 {
388 DEBUG_ONLY( OSThreadId id );
389 DEBUG_ONLY( id = osThreadId() );
390 ASSERT(task->id == id);
391 ASSERT(myTask() == task);
392
393 ACQUIRE_LOCK(&all_tasks_mutex);
394
395 if (task->all_prev) {
396 task->all_prev->all_next = task->all_next;
397 } else {
398 all_tasks = task->all_next;
399 }
400 if (task->all_next) {
401 task->all_next->all_prev = task->all_prev;
402 }
403
404 currentWorkerCount--;
405
406 RELEASE_LOCK(&all_tasks_mutex);
407
408 traceTaskDelete(task);
409
410 freeTask(task);
411 }
412
413 #endif
414
415 #if defined(THREADED_RTS)
416
417 static void OSThreadProcAttr
418 workerStart(Task *task)
419 {
420 Capability *cap;
421
422 // See startWorkerTask().
423 ACQUIRE_LOCK(&task->lock);
424 cap = task->cap;
425 RELEASE_LOCK(&task->lock);
426
427 if (RtsFlags.ParFlags.setAffinity) {
428 setThreadAffinity(cap->no, n_capabilities);
429 }
430
431 // set the thread-local pointer to the Task:
432 setMyTask(task);
433
434 newInCall(task);
435
436 // Everything set up; emit the event before the worker starts working.
437 traceTaskCreate(task, cap);
438
439 scheduleWorker(cap,task);
440 }
441
442 void
443 startWorkerTask (Capability *cap)
444 {
445 int r;
446 OSThreadId tid;
447 Task *task;
448
449 // A worker always gets a fresh Task structure.
450 task = newTask(rtsTrue);
451
452 // The lock here is to synchronise with taskStart(), to make sure
453 // that we have finished setting up the Task structure before the
454 // worker thread reads it.
455 ACQUIRE_LOCK(&task->lock);
456
457 // We don't emit a task creation event here, but in workerStart,
458 // where the kernel thread id is known.
459 task->cap = cap;
460
461 // Give the capability directly to the worker; we can't let anyone
462 // else get in, because the new worker Task has nowhere to go to
463 // sleep so that it could be woken up again.
464 ASSERT_LOCK_HELD(&cap->lock);
465 cap->running_task = task;
466
467 r = createOSThread(&tid, "ghc_worker", (OSThreadProc*)workerStart, task);
468 if (r != 0) {
469 sysErrorBelch("failed to create OS thread");
470 stg_exit(EXIT_FAILURE);
471 }
472
473 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
474
475 task->id = tid;
476
477 // ok, finished with the Task struct.
478 RELEASE_LOCK(&task->lock);
479 }
480
481 void
482 interruptWorkerTask (Task *task)
483 {
484 ASSERT(osThreadId() != task->id); // seppuku not allowed
485 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
486 interruptOSThread(task->id);
487 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
488 serialisableTaskId(task));
489 }
490
491 #endif /* THREADED_RTS */
492
493 void
494 setInCallCapability (int preferred_capability)
495 {
496 Task *task = allocTask();
497 task->preferred_capability = preferred_capability;
498 }
499
500
501 #ifdef DEBUG
502
503 void printAllTasks(void);
504
505 void
506 printAllTasks(void)
507 {
508 Task *task;
509 for (task = all_tasks; task != NULL; task = task->all_next) {
510 debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
511 task->stopped ? "stopped" : "alive");
512 if (!task->stopped) {
513 if (task->cap) {
514 debugBelch("on capability %d, ", task->cap->no);
515 }
516 if (task->incall->tso) {
517 debugBelch("bound to thread %lu",
518 (unsigned long)task->incall->tso->id);
519 } else {
520 debugBelch("worker");
521 }
522 }
523 debugBelch("\n");
524 }
525 }
526
527 #endif
528