RTS: Add setInCallCapability()
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29
30 nat taskCount; // current number of bound tasks + total number of worker tasks.
31 nat workerCount;
32 nat currentWorkerCount;
33 nat peakWorkerCount;
34
35 static int tasksInitialized = 0;
36
37 static void freeTask (Task *task);
38 static Task * allocTask (void);
39 static Task * newTask (rtsBool);
40
41 #if defined(THREADED_RTS)
42 Mutex all_tasks_mutex;
43 #endif
44
45 /* -----------------------------------------------------------------------------
46 * Remembering the current thread's Task
47 * -------------------------------------------------------------------------- */
48
49 // A thread-local-storage key that we can use to get access to the
50 // current thread's Task structure.
51 #if defined(THREADED_RTS)
52 # if defined(MYTASK_USE_TLV)
53 __thread Task *my_task;
54 # else
55 ThreadLocalKey currentTaskKey;
56 # endif
57 #else
58 Task *my_task;
59 #endif
60
61 /* -----------------------------------------------------------------------------
62 * Rest of the Task API
63 * -------------------------------------------------------------------------- */
64
65 void
66 initTaskManager (void)
67 {
68 if (!tasksInitialized) {
69 taskCount = 0;
70 workerCount = 0;
71 currentWorkerCount = 0;
72 peakWorkerCount = 0;
73 tasksInitialized = 1;
74 #if defined(THREADED_RTS)
75 #if !defined(MYTASK_USE_TLV)
76 newThreadLocalKey(&currentTaskKey);
77 #endif
78 initMutex(&all_tasks_mutex);
79 #endif
80 }
81 }
82
83 nat
84 freeTaskManager (void)
85 {
86 Task *task, *next;
87 nat tasksRunning = 0;
88
89 ACQUIRE_LOCK(&all_tasks_mutex);
90
91 for (task = all_tasks; task != NULL; task = next) {
92 next = task->all_next;
93 if (task->stopped) {
94 freeTask(task);
95 } else {
96 tasksRunning++;
97 }
98 }
99
100 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
101 tasksRunning);
102
103 all_tasks = NULL;
104
105 RELEASE_LOCK(&all_tasks_mutex);
106
107 #if defined(THREADED_RTS)
108 closeMutex(&all_tasks_mutex);
109 #if !defined(MYTASK_USE_TLV)
110 freeThreadLocalKey(&currentTaskKey);
111 #endif
112 #endif
113
114 tasksInitialized = 0;
115
116 return tasksRunning;
117 }
118
119 static Task *
120 allocTask (void)
121 {
122 Task *task;
123
124 task = myTask();
125 if (task != NULL) {
126 return task;
127 } else {
128 task = newTask(rtsFalse);
129 #if defined(THREADED_RTS)
130 task->id = osThreadId();
131 #endif
132 setMyTask(task);
133 return task;
134 }
135 }
136
137 void freeMyTask (void)
138 {
139 Task *task;
140
141 task = myTask();
142
143 if (task == NULL) return;
144
145 if (!task->stopped) {
146 errorBelch(
147 "freeMyTask() called, but the Task is not stopped; ignoring");
148 return;
149 }
150
151 if (task->worker) {
152 errorBelch("freeMyTask() called on a worker; ignoring");
153 return;
154 }
155
156 ACQUIRE_LOCK(&all_tasks_mutex);
157
158 if (task->all_prev) {
159 task->all_prev->all_next = task->all_next;
160 } else {
161 all_tasks = task->all_next;
162 }
163 if (task->all_next) {
164 task->all_next->all_prev = task->all_prev;
165 }
166
167 taskCount--;
168
169 RELEASE_LOCK(&all_tasks_mutex);
170
171 freeTask(task);
172 setMyTask(NULL);
173 }
174
175 static void
176 freeTask (Task *task)
177 {
178 InCall *incall, *next;
179
180 // We only free resources if the Task is not in use. A
181 // Task may still be in use if we have a Haskell thread in
182 // a foreign call while we are attempting to shut down the
183 // RTS (see conc059).
184 #if defined(THREADED_RTS)
185 closeCondition(&task->cond);
186 closeMutex(&task->lock);
187 #endif
188
189 for (incall = task->incall; incall != NULL; incall = next) {
190 next = incall->prev_stack;
191 stgFree(incall);
192 }
193 for (incall = task->spare_incalls; incall != NULL; incall = next) {
194 next = incall->next;
195 stgFree(incall);
196 }
197
198 stgFree(task);
199 }
200
201 static Task*
202 newTask (rtsBool worker)
203 {
204 Task *task;
205
206 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
207 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
208
209 task->cap = NULL;
210 task->worker = worker;
211 task->stopped = rtsFalse;
212 task->running_finalizers = rtsFalse;
213 task->n_spare_incalls = 0;
214 task->spare_incalls = NULL;
215 task->incall = NULL;
216 task->preferred_capability = -1;
217
218 #if defined(THREADED_RTS)
219 initCondition(&task->cond);
220 initMutex(&task->lock);
221 task->wakeup = rtsFalse;
222 #endif
223
224 task->next = NULL;
225
226 ACQUIRE_LOCK(&all_tasks_mutex);
227
228 task->all_prev = NULL;
229 task->all_next = all_tasks;
230 if (all_tasks != NULL) {
231 all_tasks->all_prev = task;
232 }
233 all_tasks = task;
234
235 taskCount++;
236 if (worker) {
237 workerCount++;
238 currentWorkerCount++;
239 if (currentWorkerCount > peakWorkerCount) {
240 peakWorkerCount = currentWorkerCount;
241 }
242 }
243 RELEASE_LOCK(&all_tasks_mutex);
244
245 return task;
246 }
247
248 // avoid the spare_incalls list growing unboundedly
249 #define MAX_SPARE_INCALLS 8
250
251 static void
252 newInCall (Task *task)
253 {
254 InCall *incall;
255
256 if (task->spare_incalls != NULL) {
257 incall = task->spare_incalls;
258 task->spare_incalls = incall->next;
259 task->n_spare_incalls--;
260 } else {
261 incall = stgMallocBytes((sizeof(InCall)), "newInCall");
262 }
263
264 incall->tso = NULL;
265 incall->task = task;
266 incall->suspended_tso = NULL;
267 incall->suspended_cap = NULL;
268 incall->rstat = NoStatus;
269 incall->ret = NULL;
270 incall->next = NULL;
271 incall->prev = NULL;
272 incall->prev_stack = task->incall;
273 task->incall = incall;
274 }
275
276 static void
277 endInCall (Task *task)
278 {
279 InCall *incall;
280
281 incall = task->incall;
282 incall->tso = NULL;
283 task->incall = task->incall->prev_stack;
284
285 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
286 stgFree(incall);
287 } else {
288 incall->next = task->spare_incalls;
289 task->spare_incalls = incall;
290 task->n_spare_incalls++;
291 }
292 }
293
294
295 Task *
296 newBoundTask (void)
297 {
298 Task *task;
299
300 if (!tasksInitialized) {
301 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
302 stg_exit(EXIT_FAILURE);
303 }
304
305 task = allocTask();
306
307 task->stopped = rtsFalse;
308
309 newInCall(task);
310
311 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
312 return task;
313 }
314
315 void
316 boundTaskExiting (Task *task)
317 {
318 #if defined(THREADED_RTS)
319 ASSERT(osThreadId() == task->id);
320 #endif
321 ASSERT(myTask() == task);
322
323 endInCall(task);
324
325 // Set task->stopped, but only if this is the last call (#4850).
326 // Remember that we might have a worker Task that makes a foreign
327 // call and then a callback, so it can transform into a bound
328 // Task for the duration of the callback.
329 if (task->incall == NULL) {
330 task->stopped = rtsTrue;
331 }
332
333 debugTrace(DEBUG_sched, "task exiting");
334 }
335
336
337 #ifdef THREADED_RTS
338 #define TASK_ID(t) (t)->id
339 #else
340 #define TASK_ID(t) (t)
341 #endif
342
343 void
344 discardTasksExcept (Task *keep)
345 {
346 Task *task, *next;
347
348 // Wipe the task list, except the current Task.
349 ACQUIRE_LOCK(&all_tasks_mutex);
350 for (task = all_tasks; task != NULL; task=next) {
351 next = task->all_next;
352 if (task != keep) {
353 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
354 #if defined(THREADED_RTS)
355 // It is possible that some of these tasks are currently blocked
356 // (in the parent process) either on their condition variable
357 // `cond` or on their mutex `lock`. If they are we may deadlock
358 // when `freeTask` attempts to call `closeCondition` or
359 // `closeMutex` (the behaviour of these functions is documented to
360 // be undefined in the case that there are threads blocked on
361 // them). To avoid this, we re-initialize both the condition
362 // variable and the mutex before calling `freeTask` (we do
363 // precisely the same for all global locks in `forkProcess`).
364 initCondition(&task->cond);
365 initMutex(&task->lock);
366 #endif
367
368 // Note that we do not traceTaskDelete here because
369 // we are not really deleting a task.
370 // The OS threads for all these tasks do not exist in
371 // this process (since we're currently
372 // in the child of a forkProcess).
373 freeTask(task);
374 }
375 }
376 all_tasks = keep;
377 keep->all_next = NULL;
378 keep->all_prev = NULL;
379 RELEASE_LOCK(&all_tasks_mutex);
380 }
381
382 #if defined(THREADED_RTS)
383
384 void
385 workerTaskStop (Task *task)
386 {
387 DEBUG_ONLY( OSThreadId id );
388 DEBUG_ONLY( id = osThreadId() );
389 ASSERT(task->id == id);
390 ASSERT(myTask() == task);
391
392 ACQUIRE_LOCK(&all_tasks_mutex);
393
394 if (task->all_prev) {
395 task->all_prev->all_next = task->all_next;
396 } else {
397 all_tasks = task->all_next;
398 }
399 if (task->all_next) {
400 task->all_next->all_prev = task->all_prev;
401 }
402
403 currentWorkerCount--;
404
405 RELEASE_LOCK(&all_tasks_mutex);
406
407 traceTaskDelete(task);
408
409 freeTask(task);
410 }
411
412 #endif
413
414 #if defined(THREADED_RTS)
415
416 static void OSThreadProcAttr
417 workerStart(Task *task)
418 {
419 Capability *cap;
420
421 // See startWorkerTask().
422 ACQUIRE_LOCK(&task->lock);
423 cap = task->cap;
424 RELEASE_LOCK(&task->lock);
425
426 if (RtsFlags.ParFlags.setAffinity) {
427 setThreadAffinity(cap->no, n_capabilities);
428 }
429
430 // set the thread-local pointer to the Task:
431 setMyTask(task);
432
433 newInCall(task);
434
435 // Everything set up; emit the event before the worker starts working.
436 traceTaskCreate(task, cap);
437
438 scheduleWorker(cap,task);
439 }
440
441 void
442 startWorkerTask (Capability *cap)
443 {
444 int r;
445 OSThreadId tid;
446 Task *task;
447
448 // A worker always gets a fresh Task structure.
449 task = newTask(rtsTrue);
450
451 // The lock here is to synchronise with taskStart(), to make sure
452 // that we have finished setting up the Task structure before the
453 // worker thread reads it.
454 ACQUIRE_LOCK(&task->lock);
455
456 // We don't emit a task creation event here, but in workerStart,
457 // where the kernel thread id is known.
458 task->cap = cap;
459
460 // Give the capability directly to the worker; we can't let anyone
461 // else get in, because the new worker Task has nowhere to go to
462 // sleep so that it could be woken up again.
463 ASSERT_LOCK_HELD(&cap->lock);
464 cap->running_task = task;
465
466 r = createOSThread(&tid, "ghc_worker", (OSThreadProc*)workerStart, task);
467 if (r != 0) {
468 sysErrorBelch("failed to create OS thread");
469 stg_exit(EXIT_FAILURE);
470 }
471
472 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
473
474 task->id = tid;
475
476 // ok, finished with the Task struct.
477 RELEASE_LOCK(&task->lock);
478 }
479
480 void
481 interruptWorkerTask (Task *task)
482 {
483 ASSERT(osThreadId() != task->id); // seppuku not allowed
484 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
485 interruptOSThread(task->id);
486 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
487 serialisableTaskId(task));
488 }
489
490 #endif /* THREADED_RTS */
491
492 void
493 setInCallCapability (int preferred_capability)
494 {
495 Task *task = allocTask();
496 task->preferred_capability = preferred_capability;
497 }
498
499
500 #ifdef DEBUG
501
502 void printAllTasks(void);
503
504 void
505 printAllTasks(void)
506 {
507 Task *task;
508 for (task = all_tasks; task != NULL; task = task->all_next) {
509 debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
510 task->stopped ? "stopped" : "alive");
511 if (!task->stopped) {
512 if (task->cap) {
513 debugBelch("on capability %d, ", task->cap->no);
514 }
515 if (task->incall->tso) {
516 debugBelch("bound to thread %lu",
517 (unsigned long)task->incall->tso->id);
518 } else {
519 debugBelch("worker");
520 }
521 }
522 debugBelch("\n");
523 }
524 }
525
526 #endif
527