Merge branch 'master' of http://darcs.haskell.org/ghc
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #if HAVE_SIGNAL_H
23 #include <signal.h>
24 #endif
25
26 // Task lists and global counters.
27 // Locks required: all_tasks_mutex.
28 Task *all_tasks = NULL;
29 static nat taskCount;
30 static int tasksInitialized = 0;
31
32 static void freeTask (Task *task);
33 static Task * allocTask (void);
34 static Task * newTask (rtsBool);
35
36 #if defined(THREADED_RTS)
37 static Mutex all_tasks_mutex;
38 #endif
39
40 /* -----------------------------------------------------------------------------
41 * Remembering the current thread's Task
42 * -------------------------------------------------------------------------- */
43
44 // A thread-local-storage key that we can use to get access to the
45 // current thread's Task structure.
46 #if defined(THREADED_RTS)
47 # if defined(MYTASK_USE_TLV)
48 __thread Task *my_task;
49 # else
50 ThreadLocalKey currentTaskKey;
51 # endif
52 #ifdef llvm_CC_FLAVOR
53 ThreadLocalKey gctKey;
54 #endif
55 #else
56 Task *my_task;
57 #endif
58
59 /* -----------------------------------------------------------------------------
60 * Rest of the Task API
61 * -------------------------------------------------------------------------- */
62
63 void
64 initTaskManager (void)
65 {
66 if (!tasksInitialized) {
67 taskCount = 0;
68 tasksInitialized = 1;
69 #if defined(THREADED_RTS)
70 #if !defined(MYTASK_USE_TLV)
71 newThreadLocalKey(&currentTaskKey);
72 #endif
73 #if defined(llvm_CC_FLAVOR)
74 newThreadLocalKey(&gctKey);
75 #endif
76 initMutex(&all_tasks_mutex);
77 #endif
78 }
79 }
80
81 nat
82 freeTaskManager (void)
83 {
84 Task *task, *next;
85 nat tasksRunning = 0;
86
87 ACQUIRE_LOCK(&all_tasks_mutex);
88
89 for (task = all_tasks; task != NULL; task = next) {
90 next = task->all_link;
91 if (task->stopped) {
92 freeTask(task);
93 } else {
94 tasksRunning++;
95 }
96 }
97
98 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
99 tasksRunning);
100
101 all_tasks = NULL;
102
103 RELEASE_LOCK(&all_tasks_mutex);
104
105 #if defined(THREADED_RTS)
106 closeMutex(&all_tasks_mutex);
107 #if !defined(MYTASK_USE_TLV)
108 freeThreadLocalKey(&currentTaskKey);
109 #endif
110 #if defined(llvm_CC_FLAVOR)
111 freeThreadLocalKey(&gctKey);
112 #endif
113 #endif
114
115 tasksInitialized = 0;
116
117 return tasksRunning;
118 }
119
120 static Task *
121 allocTask (void)
122 {
123 Task *task;
124
125 task = myTask();
126 if (task != NULL) {
127 return task;
128 } else {
129 task = newTask(rtsFalse);
130 #if defined(THREADED_RTS)
131 task->id = osThreadId();
132 #endif
133 setMyTask(task);
134 return task;
135 }
136 }
137
138 static void
139 freeTask (Task *task)
140 {
141 InCall *incall, *next;
142
143 // We only free resources if the Task is not in use. A
144 // Task may still be in use if we have a Haskell thread in
145 // a foreign call while we are attempting to shut down the
146 // RTS (see conc059).
147 #if defined(THREADED_RTS)
148 closeCondition(&task->cond);
149 closeMutex(&task->lock);
150 #endif
151
152 for (incall = task->incall; incall != NULL; incall = next) {
153 next = incall->prev_stack;
154 stgFree(incall);
155 }
156 for (incall = task->spare_incalls; incall != NULL; incall = next) {
157 next = incall->next;
158 stgFree(incall);
159 }
160
161 stgFree(task);
162 }
163
164 static Task*
165 newTask (rtsBool worker)
166 {
167 #if defined(THREADED_RTS)
168 Time currentElapsedTime, currentUserTime;
169 #endif
170 Task *task;
171
172 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
173 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
174
175 task->cap = NULL;
176 task->worker = worker;
177 task->stopped = rtsFalse;
178 task->running_finalizers = rtsFalse;
179 task->n_spare_incalls = 0;
180 task->spare_incalls = NULL;
181 task->incall = NULL;
182
183 #if defined(THREADED_RTS)
184 initCondition(&task->cond);
185 initMutex(&task->lock);
186 task->wakeup = rtsFalse;
187 #endif
188
189 #if defined(THREADED_RTS)
190 currentUserTime = getThreadCPUTime();
191 currentElapsedTime = getProcessElapsedTime();
192 task->mut_time = 0;
193 task->mut_etime = 0;
194 task->gc_time = 0;
195 task->gc_etime = 0;
196 task->muttimestart = currentUserTime;
197 task->elapsedtimestart = currentElapsedTime;
198 #endif
199
200 task->next = NULL;
201
202 ACQUIRE_LOCK(&all_tasks_mutex);
203
204 task->all_link = all_tasks;
205 all_tasks = task;
206
207 taskCount++;
208
209 RELEASE_LOCK(&all_tasks_mutex);
210
211 return task;
212 }
213
214 // avoid the spare_incalls list growing unboundedly
215 #define MAX_SPARE_INCALLS 8
216
217 static void
218 newInCall (Task *task)
219 {
220 InCall *incall;
221
222 if (task->spare_incalls != NULL) {
223 incall = task->spare_incalls;
224 task->spare_incalls = incall->next;
225 task->n_spare_incalls--;
226 } else {
227 incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
228 }
229
230 incall->tso = NULL;
231 incall->task = task;
232 incall->suspended_tso = NULL;
233 incall->suspended_cap = NULL;
234 incall->stat = NoStatus;
235 incall->ret = NULL;
236 incall->next = NULL;
237 incall->prev = NULL;
238 incall->prev_stack = task->incall;
239 task->incall = incall;
240 }
241
242 static void
243 endInCall (Task *task)
244 {
245 InCall *incall;
246
247 incall = task->incall;
248 incall->tso = NULL;
249 task->incall = task->incall->prev_stack;
250
251 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
252 stgFree(incall);
253 } else {
254 incall->next = task->spare_incalls;
255 task->spare_incalls = incall;
256 task->n_spare_incalls++;
257 }
258 }
259
260
261 Task *
262 newBoundTask (void)
263 {
264 Task *task;
265
266 if (!tasksInitialized) {
267 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
268 stg_exit(EXIT_FAILURE);
269 }
270
271 task = allocTask();
272
273 task->stopped = rtsFalse;
274
275 newInCall(task);
276
277 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
278 return task;
279 }
280
281 void
282 boundTaskExiting (Task *task)
283 {
284 #if defined(THREADED_RTS)
285 ASSERT(osThreadId() == task->id);
286 #endif
287 ASSERT(myTask() == task);
288
289 endInCall(task);
290
291 // Set task->stopped, but only if this is the last call (#4850).
292 // Remember that we might have a worker Task that makes a foreign
293 // call and then a callback, so it can transform into a bound
294 // Task for the duration of the callback.
295 if (task->incall == NULL) {
296 task->stopped = rtsTrue;
297 }
298
299 debugTrace(DEBUG_sched, "task exiting");
300 }
301
302
303 #ifdef THREADED_RTS
304 #define TASK_ID(t) (t)->id
305 #else
306 #define TASK_ID(t) (t)
307 #endif
308
309 void
310 discardTasksExcept (Task *keep)
311 {
312 Task *task, *next;
313
314 // Wipe the task list, except the current Task.
315 ACQUIRE_LOCK(&all_tasks_mutex);
316 for (task = all_tasks; task != NULL; task=next) {
317 next = task->all_link;
318 if (task != keep) {
319 debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
320 freeTask(task);
321 }
322 }
323 all_tasks = keep;
324 keep->all_link = NULL;
325 RELEASE_LOCK(&all_tasks_mutex);
326 }
327
328 void
329 taskTimeStamp (Task *task USED_IF_THREADS)
330 {
331 #if defined(THREADED_RTS)
332 Time currentElapsedTime, currentUserTime;
333
334 currentUserTime = getThreadCPUTime();
335 currentElapsedTime = getProcessElapsedTime();
336
337 task->mut_time =
338 currentUserTime - task->muttimestart - task->gc_time;
339 task->mut_etime =
340 currentElapsedTime - task->elapsedtimestart - task->gc_etime;
341
342 if (task->gc_time < 0) { task->gc_time = 0; }
343 if (task->gc_etime < 0) { task->gc_etime = 0; }
344 if (task->mut_time < 0) { task->mut_time = 0; }
345 if (task->mut_etime < 0) { task->mut_etime = 0; }
346 #endif
347 }
348
349 void
350 taskDoneGC (Task *task, Time cpu_time, Time elapsed_time)
351 {
352 task->gc_time += cpu_time;
353 task->gc_etime += elapsed_time;
354 }
355
356 #if defined(THREADED_RTS)
357
358 void
359 workerTaskStop (Task *task)
360 {
361 DEBUG_ONLY( OSThreadId id );
362 DEBUG_ONLY( id = osThreadId() );
363 ASSERT(task->id == id);
364 ASSERT(myTask() == task);
365
366 task->cap = NULL;
367 taskTimeStamp(task);
368 task->stopped = rtsTrue;
369 }
370
371 #endif
372
373 #ifdef DEBUG
374
375 static void *taskId(Task *task)
376 {
377 #ifdef THREADED_RTS
378 return (void *)task->id;
379 #else
380 return (void *)task;
381 #endif
382 }
383
384 #endif
385
386 #if defined(THREADED_RTS)
387
388 static void OSThreadProcAttr
389 workerStart(Task *task)
390 {
391 Capability *cap;
392
393 // See startWorkerTask().
394 ACQUIRE_LOCK(&task->lock);
395 cap = task->cap;
396 RELEASE_LOCK(&task->lock);
397
398 if (RtsFlags.ParFlags.setAffinity) {
399 setThreadAffinity(cap->no, n_capabilities);
400 }
401
402 // set the thread-local pointer to the Task:
403 setMyTask(task);
404
405 newInCall(task);
406
407 scheduleWorker(cap,task);
408 }
409
410 void
411 startWorkerTask (Capability *cap)
412 {
413 int r;
414 OSThreadId tid;
415 Task *task;
416
417 // A worker always gets a fresh Task structure.
418 task = newTask(rtsTrue);
419
420 // The lock here is to synchronise with taskStart(), to make sure
421 // that we have finished setting up the Task structure before the
422 // worker thread reads it.
423 ACQUIRE_LOCK(&task->lock);
424
425 task->cap = cap;
426
427 // Give the capability directly to the worker; we can't let anyone
428 // else get in, because the new worker Task has nowhere to go to
429 // sleep so that it could be woken up again.
430 ASSERT_LOCK_HELD(&cap->lock);
431 cap->running_task = task;
432
433 r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
434 if (r != 0) {
435 sysErrorBelch("failed to create OS thread");
436 stg_exit(EXIT_FAILURE);
437 }
438
439 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
440
441 task->id = tid;
442
443 // ok, finished with the Task struct.
444 RELEASE_LOCK(&task->lock);
445 }
446
447 void
448 interruptWorkerTask (Task *task)
449 {
450 ASSERT(osThreadId() != task->id); // seppuku not allowed
451 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
452 interruptOSThread(task->id);
453 debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
454 }
455
456 #endif /* THREADED_RTS */
457
458 #ifdef DEBUG
459
460 void printAllTasks(void);
461
462 void
463 printAllTasks(void)
464 {
465 Task *task;
466 for (task = all_tasks; task != NULL; task = task->all_link) {
467 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
468 if (!task->stopped) {
469 if (task->cap) {
470 debugBelch("on capability %d, ", task->cap->no);
471 }
472 if (task->incall->tso) {
473 debugBelch("bound to thread %lu",
474 (unsigned long)task->incall->tso->id);
475 } else {
476 debugBelch("worker");
477 }
478 }
479 debugBelch("\n");
480 }
481 }
482
483 #endif
484