Fix segfault with STM; fixes #8035. Patch from errge.
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29
30 nat taskCount;
31 nat workerCount;
32 nat currentWorkerCount;
33 nat peakWorkerCount;
34
35 static int tasksInitialized = 0;
36
37 static void freeTask (Task *task);
38 static Task * allocTask (void);
39 static Task * newTask (rtsBool);
40
41 #if defined(THREADED_RTS)
42 static Mutex all_tasks_mutex;
43 #endif
44
45 /* -----------------------------------------------------------------------------
46 * Remembering the current thread's Task
47 * -------------------------------------------------------------------------- */
48
49 // A thread-local-storage key that we can use to get access to the
50 // current thread's Task structure.
51 #if defined(THREADED_RTS)
52 # if defined(MYTASK_USE_TLV)
53 __thread Task *my_task;
54 # else
55 ThreadLocalKey currentTaskKey;
56 # endif
57 #else
58 Task *my_task;
59 #endif
60
61 /* -----------------------------------------------------------------------------
62 * Rest of the Task API
63 * -------------------------------------------------------------------------- */
64
65 void
66 initTaskManager (void)
67 {
68 if (!tasksInitialized) {
69 taskCount = 0;
70 workerCount = 0;
71 currentWorkerCount = 0;
72 peakWorkerCount = 0;
73 tasksInitialized = 1;
74 #if defined(THREADED_RTS)
75 #if !defined(MYTASK_USE_TLV)
76 newThreadLocalKey(&currentTaskKey);
77 #endif
78 initMutex(&all_tasks_mutex);
79 #endif
80 }
81 }
82
83 nat
84 freeTaskManager (void)
85 {
86 Task *task, *next;
87 nat tasksRunning = 0;
88
89 ACQUIRE_LOCK(&all_tasks_mutex);
90
91 for (task = all_tasks; task != NULL; task = next) {
92 next = task->all_next;
93 if (task->stopped) {
94 freeTask(task);
95 } else {
96 tasksRunning++;
97 }
98 }
99
100 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
101 tasksRunning);
102
103 all_tasks = NULL;
104
105 RELEASE_LOCK(&all_tasks_mutex);
106
107 #if defined(THREADED_RTS)
108 closeMutex(&all_tasks_mutex);
109 #if !defined(MYTASK_USE_TLV)
110 freeThreadLocalKey(&currentTaskKey);
111 #endif
112 #endif
113
114 tasksInitialized = 0;
115
116 return tasksRunning;
117 }
118
119 static Task *
120 allocTask (void)
121 {
122 Task *task;
123
124 task = myTask();
125 if (task != NULL) {
126 return task;
127 } else {
128 task = newTask(rtsFalse);
129 #if defined(THREADED_RTS)
130 task->id = osThreadId();
131 #endif
132 setMyTask(task);
133 return task;
134 }
135 }
136
137 static void
138 freeTask (Task *task)
139 {
140 InCall *incall, *next;
141
142 // We only free resources if the Task is not in use. A
143 // Task may still be in use if we have a Haskell thread in
144 // a foreign call while we are attempting to shut down the
145 // RTS (see conc059).
146 #if defined(THREADED_RTS)
147 closeCondition(&task->cond);
148 closeMutex(&task->lock);
149 #endif
150
151 for (incall = task->incall; incall != NULL; incall = next) {
152 next = incall->prev_stack;
153 stgFree(incall);
154 }
155 for (incall = task->spare_incalls; incall != NULL; incall = next) {
156 next = incall->next;
157 stgFree(incall);
158 }
159
160 stgFree(task);
161 }
162
163 static Task*
164 newTask (rtsBool worker)
165 {
166 Task *task;
167
168 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
169 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
170
171 task->cap = NULL;
172 task->worker = worker;
173 task->stopped = rtsFalse;
174 task->running_finalizers = rtsFalse;
175 task->n_spare_incalls = 0;
176 task->spare_incalls = NULL;
177 task->incall = NULL;
178
179 #if defined(THREADED_RTS)
180 initCondition(&task->cond);
181 initMutex(&task->lock);
182 task->wakeup = rtsFalse;
183 #endif
184
185 task->next = NULL;
186
187 ACQUIRE_LOCK(&all_tasks_mutex);
188
189 task->all_prev = NULL;
190 task->all_next = all_tasks;
191 if (all_tasks != NULL) {
192 all_tasks->all_prev = task;
193 }
194 all_tasks = task;
195
196 taskCount++;
197 if (worker) {
198 workerCount++;
199 currentWorkerCount++;
200 if (currentWorkerCount > peakWorkerCount) {
201 peakWorkerCount = currentWorkerCount;
202 }
203 }
204 RELEASE_LOCK(&all_tasks_mutex);
205
206 return task;
207 }
208
209 // avoid the spare_incalls list growing unboundedly
210 #define MAX_SPARE_INCALLS 8
211
212 static void
213 newInCall (Task *task)
214 {
215 InCall *incall;
216
217 if (task->spare_incalls != NULL) {
218 incall = task->spare_incalls;
219 task->spare_incalls = incall->next;
220 task->n_spare_incalls--;
221 } else {
222 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
223 }
224
225 incall->tso = NULL;
226 incall->task = task;
227 incall->suspended_tso = NULL;
228 incall->suspended_cap = NULL;
229 incall->stat = NoStatus;
230 incall->ret = NULL;
231 incall->next = NULL;
232 incall->prev = NULL;
233 incall->prev_stack = task->incall;
234 task->incall = incall;
235 }
236
237 static void
238 endInCall (Task *task)
239 {
240 InCall *incall;
241
242 incall = task->incall;
243 incall->tso = NULL;
244 task->incall = task->incall->prev_stack;
245
246 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
247 stgFree(incall);
248 } else {
249 incall->next = task->spare_incalls;
250 task->spare_incalls = incall;
251 task->n_spare_incalls++;
252 }
253 }
254
255
256 Task *
257 newBoundTask (void)
258 {
259 Task *task;
260
261 if (!tasksInitialized) {
262 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
263 stg_exit(EXIT_FAILURE);
264 }
265
266 task = allocTask();
267
268 task->stopped = rtsFalse;
269
270 newInCall(task);
271
272 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
273 return task;
274 }
275
276 void
277 boundTaskExiting (Task *task)
278 {
279 #if defined(THREADED_RTS)
280 ASSERT(osThreadId() == task->id);
281 #endif
282 ASSERT(myTask() == task);
283
284 endInCall(task);
285
286 // Set task->stopped, but only if this is the last call (#4850).
287 // Remember that we might have a worker Task that makes a foreign
288 // call and then a callback, so it can transform into a bound
289 // Task for the duration of the callback.
290 if (task->incall == NULL) {
291 task->stopped = rtsTrue;
292 }
293
294 debugTrace(DEBUG_sched, "task exiting");
295 }
296
297
298 #ifdef THREADED_RTS
299 #define TASK_ID(t) (t)->id
300 #else
301 #define TASK_ID(t) (t)
302 #endif
303
304 void
305 discardTasksExcept (Task *keep)
306 {
307 Task *task, *next;
308
309 // Wipe the task list, except the current Task.
310 ACQUIRE_LOCK(&all_tasks_mutex);
311 for (task = all_tasks; task != NULL; task=next) {
312 next = task->all_next;
313 if (task != keep) {
314 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
315 // Note that we do not traceTaskDelete here because
316 // we are not really deleting a task.
317 // The OS threads for all these tasks do not exist in
318 // this process (since we're currently
319 // in the child of a forkProcess).
320 freeTask(task);
321 }
322 }
323 all_tasks = keep;
324 keep->all_next = NULL;
325 keep->all_prev = NULL;
326 RELEASE_LOCK(&all_tasks_mutex);
327 }
328
329 //
330 // After the capabilities[] array has moved, we have to adjust all
331 // (Capability *) pointers to point to the new array. The old array
332 // is still valid at this point.
333 //
334 void updateCapabilityRefs (void)
335 {
336 Task *task;
337 InCall *incall;
338
339 ACQUIRE_LOCK(&all_tasks_mutex);
340
341 for (task = all_tasks; task != NULL; task=task->all_next) {
342 if (task->cap != NULL) {
343 task->cap = &capabilities[task->cap->no];
344 }
345
346 for (incall = task->incall; incall != NULL; incall = incall->prev_stack) {
347 if (incall->suspended_cap != NULL) {
348 incall->suspended_cap = &capabilities[incall->suspended_cap->no];
349 }
350 }
351 }
352
353 RELEASE_LOCK(&all_tasks_mutex);
354 }
355
356
357 #if defined(THREADED_RTS)
358
359 void
360 workerTaskStop (Task *task)
361 {
362 DEBUG_ONLY( OSThreadId id );
363 DEBUG_ONLY( id = osThreadId() );
364 ASSERT(task->id == id);
365 ASSERT(myTask() == task);
366
367 ACQUIRE_LOCK(&all_tasks_mutex);
368
369 if (task->all_prev) {
370 task->all_prev->all_next = task->all_next;
371 } else {
372 all_tasks = task->all_next;
373 }
374 if (task->all_next) {
375 task->all_next->all_prev = task->all_prev;
376 }
377
378 currentWorkerCount--;
379
380 RELEASE_LOCK(&all_tasks_mutex);
381
382 traceTaskDelete(task);
383
384 freeTask(task);
385 }
386
387 #endif
388
389 #if defined(THREADED_RTS)
390
391 static void OSThreadProcAttr
392 workerStart(Task *task)
393 {
394 Capability *cap;
395
396 // See startWorkerTask().
397 ACQUIRE_LOCK(&task->lock);
398 cap = task->cap;
399 RELEASE_LOCK(&task->lock);
400
401 if (RtsFlags.ParFlags.setAffinity) {
402 setThreadAffinity(cap->no, n_capabilities);
403 }
404
405 // set the thread-local pointer to the Task:
406 setMyTask(task);
407
408 newInCall(task);
409
410 // Everything set up; emit the event before the worker starts working.
411 traceTaskCreate(task, cap);
412
413 scheduleWorker(cap,task);
414 }
415
416 void
417 startWorkerTask (Capability *cap)
418 {
419 int r;
420 OSThreadId tid;
421 Task *task;
422
423 // A worker always gets a fresh Task structure.
424 task = newTask(rtsTrue);
425
426 // The lock here is to synchronise with taskStart(), to make sure
427 // that we have finished setting up the Task structure before the
428 // worker thread reads it.
429 ACQUIRE_LOCK(&task->lock);
430
431 // We don't emit a task creation event here, but in workerStart,
432 // where the kernel thread id is known.
433 task->cap = cap;
434
435 // Give the capability directly to the worker; we can't let anyone
436 // else get in, because the new worker Task has nowhere to go to
437 // sleep so that it could be woken up again.
438 ASSERT_LOCK_HELD(&cap->lock);
439 cap->running_task = task;
440
441 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
442 if (r != 0) {
443 sysErrorBelch("failed to create OS thread");
444 stg_exit(EXIT_FAILURE);
445 }
446
447 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
448
449 task->id = tid;
450
451 // ok, finished with the Task struct.
452 RELEASE_LOCK(&task->lock);
453 }
454
455 void
456 interruptWorkerTask (Task *task)
457 {
458 ASSERT(osThreadId() != task->id); // seppuku not allowed
459 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
460 interruptOSThread(task->id);
461 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
462 serialisableTaskId(task));
463 }
464
465 #endif /* THREADED_RTS */
466
467 #ifdef DEBUG
468
469 void printAllTasks(void);
470
471 void
472 printAllTasks(void)
473 {
474 Task *task;
475 for (task = all_tasks; task != NULL; task = task->all_next) {
476 debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
477 task->stopped ? "stopped" : "alive");
478 if (!task->stopped) {
479 if (task->cap) {
480 debugBelch("on capability %d, ", task->cap->no);
481 }
482 if (task->incall->tso) {
483 debugBelch("bound to thread %lu",
484 (unsigned long)task->incall->tso->id);
485 } else {
486 debugBelch("worker");
487 }
488 }
489 debugBelch("\n");
490 }
491 }
492
493 #endif
494