Merge branch 'master' of http://darcs.haskell.org/ghc
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29 static nat taskCount;
30 static int tasksInitialized = 0;
31
32 static void freeTask (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask (rtsBool);
35
36 #if defined(THREADED_RTS)
37 static Mutex all_tasks_mutex;
38 #endif
39
40 /* -----------------------------------------------------------------------------
41 * Remembering the current thread's Task
42 * -------------------------------------------------------------------------- */
43
44 // A thread-local-storage key that we can use to get access to the
45 // current thread's Task structure.
46 #if defined(THREADED_RTS)
47 # if defined(MYTASK_USE_TLV)
48 __thread Task *my_task;
49 # else
50 ThreadLocalKey currentTaskKey;
51 # endif
52 #else
53 Task *my_task;
54 #endif
55
56 /* -----------------------------------------------------------------------------
57 * Rest of the Task API
58 * -------------------------------------------------------------------------- */
59
60 void
61 initTaskManager (void)
62 {
63 if (!tasksInitialized) {
64 taskCount = 0;
65 tasksInitialized = 1;
66 #if defined(THREADED_RTS)
67 #if !defined(MYTASK_USE_TLV)
68 newThreadLocalKey(&currentTaskKey);
69 #endif
70 initMutex(&all_tasks_mutex);
71 #endif
72 }
73 }
74
75 nat
76 freeTaskManager (void)
77 {
78 Task *task, *next;
79 nat tasksRunning = 0;
80
81 ACQUIRE_LOCK(&all_tasks_mutex);
82
83 for (task = all_tasks; task != NULL; task = next) {
84 next = task->all_link;
85 if (task->stopped) {
86 freeTask(task);
87 } else {
88 tasksRunning++;
89 }
90 }
91
92 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
93 tasksRunning);
94
95 all_tasks = NULL;
96
97 RELEASE_LOCK(&all_tasks_mutex);
98
99 #if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
100 closeMutex(&all_tasks_mutex);
101 freeThreadLocalKey(&currentTaskKey);
102 #endif
103
104 tasksInitialized = 0;
105
106 return tasksRunning;
107 }
108
109 static Task *
110 allocTask (void)
111 {
112 Task *task;
113
114 task = myTask();
115 if (task != NULL) {
116 return task;
117 } else {
118 task = newTask(rtsFalse);
119 #if defined(THREADED_RTS)
120 task->id = osThreadId();
121 #endif
122 setMyTask(task);
123 return task;
124 }
125 }
126
127 static void
128 freeTask (Task *task)
129 {
130 InCall *incall, *next;
131
132 // We only free resources if the Task is not in use. A
133 // Task may still be in use if we have a Haskell thread in
134 // a foreign call while we are attempting to shut down the
135 // RTS (see conc059).
136 #if defined(THREADED_RTS)
137 closeCondition(&task->cond);
138 closeMutex(&task->lock);
139 #endif
140
141 for (incall = task->incall; incall != NULL; incall = next) {
142 next = incall->prev_stack;
143 stgFree(incall);
144 }
145 for (incall = task->spare_incalls; incall != NULL; incall = next) {
146 next = incall->next;
147 stgFree(incall);
148 }
149
150 stgFree(task);
151 }
152
153 static Task*
154 newTask (rtsBool worker)
155 {
156 #if defined(THREADED_RTS)
157 Ticks currentElapsedTime, currentUserTime;
158 #endif
159 Task *task;
160
161 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
162 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
163
164 task->cap = NULL;
165 task->worker = worker;
166 task->stopped = rtsFalse;
167 task->running_finalizers = rtsFalse;
168 task->n_spare_incalls = 0;
169 task->spare_incalls = NULL;
170 task->incall = NULL;
171
172 #if defined(THREADED_RTS)
173 initCondition(&task->cond);
174 initMutex(&task->lock);
175 task->wakeup = rtsFalse;
176 #endif
177
178 #if defined(THREADED_RTS)
179 currentUserTime = getThreadCPUTime();
180 currentElapsedTime = getProcessElapsedTime();
181 task->mut_time = 0;
182 task->mut_etime = 0;
183 task->gc_time = 0;
184 task->gc_etime = 0;
185 task->muttimestart = currentUserTime;
186 task->elapsedtimestart = currentElapsedTime;
187 #endif
188
189 task->next = NULL;
190
191 ACQUIRE_LOCK(&all_tasks_mutex);
192
193 task->all_link = all_tasks;
194 all_tasks = task;
195
196 taskCount++;
197
198 RELEASE_LOCK(&all_tasks_mutex);
199
200 return task;
201 }
202
203 // avoid the spare_incalls list growing unboundedly
204 #define MAX_SPARE_INCALLS 8
205
206 static void
207 newInCall (Task *task)
208 {
209 InCall *incall;
210
211 if (task->spare_incalls != NULL) {
212 incall = task->spare_incalls;
213 task->spare_incalls = incall->next;
214 task->n_spare_incalls--;
215 } else {
216 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
217 }
218
219 incall->tso = NULL;
220 incall->task = task;
221 incall->suspended_tso = NULL;
222 incall->suspended_cap = NULL;
223 incall->stat = NoStatus;
224 incall->ret = NULL;
225 incall->next = NULL;
226 incall->prev = NULL;
227 incall->prev_stack = task->incall;
228 task->incall = incall;
229 }
230
231 static void
232 endInCall (Task *task)
233 {
234 InCall *incall;
235
236 incall = task->incall;
237 incall->tso = NULL;
238 task->incall = task->incall->prev_stack;
239
240 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
241 stgFree(incall);
242 } else {
243 incall->next = task->spare_incalls;
244 task->spare_incalls = incall;
245 task->n_spare_incalls++;
246 }
247 }
248
249
250 Task *
251 newBoundTask (void)
252 {
253 Task *task;
254
255 if (!tasksInitialized) {
256 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
257 stg_exit(EXIT_FAILURE);
258 }
259
260 task = allocTask();
261
262 task->stopped = rtsFalse;
263
264 newInCall(task);
265
266 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
267 return task;
268 }
269
270 void
271 boundTaskExiting (Task *task)
272 {
273 #if defined(THREADED_RTS)
274 ASSERT(osThreadId() == task->id);
275 #endif
276 ASSERT(myTask() == task);
277
278 endInCall(task);
279
280 // Set task->stopped, but only if this is the last call (#4850).
281 // Remember that we might have a worker Task that makes a foreign
282 // call and then a callback, so it can transform into a bound
283 // Task for the duration of the callback.
284 if (task->incall == NULL) {
285 task->stopped = rtsTrue;
286 }
287
288 debugTrace(DEBUG_sched, "task exiting");
289 }
290
291
292 #ifdef THREADED_RTS
293 #define TASK_ID(t) (t)->id
294 #else
295 #define TASK_ID(t) (t)
296 #endif
297
298 void
299 discardTasksExcept (Task *keep)
300 {
301 Task *task, *next;
302
303 // Wipe the task list, except the current Task.
304 ACQUIRE_LOCK(&all_tasks_mutex);
305 for (task = all_tasks; task != NULL; task=next) {
306 next = task->all_link;
307 if (task != keep) {
308 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
309 freeTask(task);
310 }
311 }
312 all_tasks = keep;
313 keep->all_link = NULL;
314 RELEASE_LOCK(&all_tasks_mutex);
315 }
316
317 void
318 taskTimeStamp (Task *task USED_IF_THREADS)
319 {
320 #if defined(THREADED_RTS)
321 Ticks currentElapsedTime, currentUserTime;
322
323 currentUserTime = getThreadCPUTime();
324 currentElapsedTime = getProcessElapsedTime();
325
326 task->mut_time =
327 currentUserTime - task->muttimestart - task->gc_time;
328 task->mut_etime =
329 currentElapsedTime - task->elapsedtimestart - task->gc_etime;
330
331 if (task->gc_time < 0) { task->gc_time = 0; }
332 if (task->gc_etime < 0) { task->gc_etime = 0; }
333 if (task->mut_time < 0) { task->mut_time = 0; }
334 if (task->mut_etime < 0) { task->mut_etime = 0; }
335 #endif
336 }
337
338 void
339 taskDoneGC (Task *task, Ticks cpu_time, Ticks elapsed_time)
340 {
341 task->gc_time += cpu_time;
342 task->gc_etime += elapsed_time;
343 }
344
345 #if defined(THREADED_RTS)
346
347 void
348 workerTaskStop (Task *task)
349 {
350 DEBUG_ONLY( OSThreadId id );
351 DEBUG_ONLY( id = osThreadId() );
352 ASSERT(task->id == id);
353 ASSERT(myTask() == task);
354
355 task->cap = NULL;
356 taskTimeStamp(task);
357 task->stopped = rtsTrue;
358 }
359
360 #endif
361
362 #ifdef DEBUG
363
364 static void *taskId(Task *task)
365 {
366 #ifdef THREADED_RTS
367 return (void *)task->id;
368 #else
369 return (void *)task;
370 #endif
371 }
372
373 #endif
374
375 #if defined(THREADED_RTS)
376
377 static void OSThreadProcAttr
378 workerStart(Task *task)
379 {
380 Capability *cap;
381
382 // See startWorkerTask().
383 ACQUIRE_LOCK(&task->lock);
384 cap = task->cap;
385 RELEASE_LOCK(&task->lock);
386
387 if (RtsFlags.ParFlags.setAffinity) {
388 setThreadAffinity(cap->no, n_capabilities);
389 }
390
391 // set the thread-local pointer to the Task:
392 setMyTask(task);
393
394 newInCall(task);
395
396 scheduleWorker(cap,task);
397 }
398
399 void
400 startWorkerTask (Capability *cap)
401 {
402 int r;
403 OSThreadId tid;
404 Task *task;
405
406 // A worker always gets a fresh Task structure.
407 task = newTask(rtsTrue);
408
409 // The lock here is to synchronise with taskStart(), to make sure
410 // that we have finished setting up the Task structure before the
411 // worker thread reads it.
412 ACQUIRE_LOCK(&task->lock);
413
414 task->cap = cap;
415
416 // Give the capability directly to the worker; we can't let anyone
417 // else get in, because the new worker Task has nowhere to go to
418 // sleep so that it could be woken up again.
419 ASSERT_LOCK_HELD(&cap->lock);
420 cap->running_task = task;
421
422 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
423 if (r != 0) {
424 sysErrorBelch("failed to create OS thread");
425 stg_exit(EXIT_FAILURE);
426 }
427
428 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
429
430 task->id = tid;
431
432 // ok, finished with the Task struct.
433 RELEASE_LOCK(&task->lock);
434 }
435
436 void
437 interruptWorkerTask (Task *task)
438 {
439 ASSERT(osThreadId() != task->id); // seppuku not allowed
440 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
441 interruptOSThread(task->id);
442 debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
443 }
444
445 #endif /* THREADED_RTS */
446
447 #ifdef DEBUG
448
449 void printAllTasks(void);
450
451 void
452 printAllTasks(void)
453 {
454 Task *task;
455 for (task = all_tasks; task != NULL; task = task->all_link) {
456 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
457 if (!task->stopped) {
458 if (task->cap) {
459 debugBelch("on capability %d, ", task->cap->no);
460 }
461 if (task->incall->tso) {
462 debugBelch("bound to thread %lu",
463 (unsigned long)task->incall->tso->id);
464 } else {
465 debugBelch("worker");
466 }
467 }
468 debugBelch("\n");
469 }
470 }
471
472 #endif
473