992360988464b957e07961e89db41f96a2d166f7
[ghc.git] / rts / Task.c
1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "Rts.h"
12 #include "RtsUtils.h"
13 #include "OSThreads.h"
14 #include "Task.h"
15 #include "Capability.h"
16 #include "Stats.h"
17 #include "RtsFlags.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20
21 #if HAVE_SIGNAL_H
22 #include <signal.h>
23 #endif
24
25 // Task lists and global counters.
26 // Locks required: sched_mutex.
27 Task *all_tasks = NULL;
28 static Task *task_free_list = NULL; // singly-linked
29 static nat taskCount;
30 #define DEFAULT_MAX_WORKERS 64
31 static nat maxWorkers; // we won't create more workers than this
32 static nat tasksRunning;
33 static nat workerCount;
34
35 /* -----------------------------------------------------------------------------
36 * Remembering the current thread's Task
37 * -------------------------------------------------------------------------- */
38
39 // A thread-local-storage key that we can use to get access to the
40 // current thread's Task structure.
41 #if defined(THREADED_RTS)
42 ThreadLocalKey currentTaskKey;
43 #else
44 Task *my_task;
45 #endif
46
47 /* -----------------------------------------------------------------------------
48 * Rest of the Task API
49 * -------------------------------------------------------------------------- */
50
51 void
52 initTaskManager (void)
53 {
54 static int initialized = 0;
55
56 if (!initialized) {
57 taskCount = 0;
58 workerCount = 0;
59 tasksRunning = 0;
60 maxWorkers = DEFAULT_MAX_WORKERS;
61 initialized = 1;
62 #if defined(THREADED_RTS)
63 newThreadLocalKey(&currentTaskKey);
64 #endif
65 }
66 }
67
68
69 void
70 stopTaskManager (void)
71 {
72 IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
73 }
74
75
76 static Task*
77 newTask (void)
78 {
79 #if defined(THREADED_RTS)
80 Ticks currentElapsedTime, currentUserTime;
81 #endif
82 Task *task;
83
84 task = stgMallocBytes(sizeof(Task), "newTask");
85
86 task->cap = NULL;
87 task->stopped = rtsFalse;
88 task->suspended_tso = NULL;
89 task->tso = NULL;
90 task->stat = NoStatus;
91 task->ret = NULL;
92
93 #if defined(THREADED_RTS)
94 initCondition(&task->cond);
95 initMutex(&task->lock);
96 task->wakeup = rtsFalse;
97 #endif
98
99 #if defined(THREADED_RTS)
100 currentUserTime = getThreadCPUTime();
101 currentElapsedTime = getProcessElapsedTime();
102 task->mut_time = 0;
103 task->mut_etime = 0;
104 task->gc_time = 0;
105 task->gc_etime = 0;
106 task->muttimestart = currentUserTime;
107 task->elapsedtimestart = currentElapsedTime;
108 #endif
109
110 task->prev = NULL;
111 task->next = NULL;
112 task->return_link = NULL;
113
114 task->all_link = all_tasks;
115 all_tasks = task;
116
117 taskCount++;
118 workerCount++;
119
120 return task;
121 }
122
123 Task *
124 newBoundTask (void)
125 {
126 Task *task;
127
128 ASSERT_LOCK_HELD(&sched_mutex);
129 if (task_free_list == NULL) {
130 task = newTask();
131 } else {
132 task = task_free_list;
133 task_free_list = task->next;
134 task->next = NULL;
135 task->prev = NULL;
136 task->stopped = rtsFalse;
137 }
138 #if defined(THREADED_RTS)
139 task->id = osThreadId();
140 #endif
141 ASSERT(task->cap == NULL);
142
143 tasksRunning++;
144
145 taskEnter(task);
146
147 IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
148 return task;
149 }
150
151 void
152 boundTaskExiting (Task *task)
153 {
154 task->stopped = rtsTrue;
155 task->cap = NULL;
156
157 #if defined(THREADED_RTS)
158 ASSERT(osThreadId() == task->id);
159 #endif
160 ASSERT(myTask() == task);
161 setMyTask(task->prev_stack);
162
163 tasksRunning--;
164
165 // sadly, we need a lock around the free task list. Todo: eliminate.
166 ACQUIRE_LOCK(&sched_mutex);
167 task->next = task_free_list;
168 task_free_list = task;
169 RELEASE_LOCK(&sched_mutex);
170
171 IF_DEBUG(scheduler,sched_belch("task exiting"));
172 }
173
// Identifier used for a Task in debug output: its thread id in the
// threaded RTS, otherwise the Task pointer itself.
#if defined(THREADED_RTS)
#define TASK_ID(t) ((t)->id)
#else
#define TASK_ID(t) (t)
#endif
179
180 void
181 discardTask (Task *task)
182 {
183 ASSERT_LOCK_HELD(&sched_mutex);
184 if (!task->stopped) {
185 IF_DEBUG(scheduler,sched_belch("discarding task %p", TASK_ID(task)));
186 task->cap = NULL;
187 task->tso = NULL;
188 task->stopped = rtsTrue;
189 tasksRunning--;
190 task->next = task_free_list;
191 task_free_list = task;
192 }
193 }
194
195 void
196 taskTimeStamp (Task *task USED_IF_THREADS)
197 {
198 #if defined(THREADED_RTS)
199 Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
200
201 currentUserTime = getThreadCPUTime();
202 currentElapsedTime = getProcessElapsedTime();
203
204 // XXX this is wrong; we want elapsed GC time since the
205 // Task started.
206 elapsedGCTime = stat_getElapsedGCTime();
207
208 task->mut_time =
209 currentUserTime - task->muttimestart - task->gc_time;
210 task->mut_etime =
211 currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
212
213 if (task->mut_time < 0) { task->mut_time = 0; }
214 if (task->mut_etime < 0) { task->mut_etime = 0; }
215 #endif
216 }
217
218 void
219 workerTaskStop (Task *task)
220 {
221 #if defined(THREADED_RTS)
222 OSThreadId id;
223 id = osThreadId();
224 ASSERT(task->id == id);
225 ASSERT(myTask() == task);
226 #endif
227
228 taskTimeStamp(task);
229 task->stopped = rtsTrue;
230 tasksRunning--;
231 }
232
233 void
234 resetTaskManagerAfterFork (void)
235 {
236 #warning TODO!
237 taskCount = 0;
238 }
239
240 #if defined(THREADED_RTS)
241
242 void
243 startWorkerTask (Capability *cap,
244 void OSThreadProcAttr (*taskStart)(Task *task))
245 {
246 int r;
247 OSThreadId tid;
248 Task *task;
249
250 if (workerCount >= maxWorkers) {
251 barf("too many workers; runaway worker creation?");
252 }
253 workerCount++;
254
255 // A worker always gets a fresh Task structure.
256 task = newTask();
257
258 tasksRunning++;
259
260 // The lock here is to synchronise with taskStart(), to make sure
261 // that we have finished setting up the Task structure before the
262 // worker thread reads it.
263 ACQUIRE_LOCK(&task->lock);
264
265 task->cap = cap;
266
267 // Give the capability directly to the worker; we can't let anyone
268 // else get in, because the new worker Task has nowhere to go to
269 // sleep so that it could be woken up again.
270 ASSERT_LOCK_HELD(&cap->lock);
271 cap->running_task = task;
272
273 r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
274 if (r != 0) {
275 barf("startTask: Can't create new task");
276 }
277
278 IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););
279
280 task->id = tid;
281
282 // ok, finished with the Task struct.
283 RELEASE_LOCK(&task->lock);
284 }
285
286 #endif /* THREADED_RTS */
287
288 #ifdef DEBUG
289
290 static void *taskId(Task *task)
291 {
292 #ifdef THREADED_RTS
293 return (void *)task->id;
294 #else
295 return (void *)task;
296 #endif
297 }
298
299 void printAllTasks(void);
300
301 void
302 printAllTasks(void)
303 {
304 Task *task;
305 for (task = all_tasks; task != NULL; task = task->all_link) {
306 debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
307 if (!task->stopped) {
308 if (task->cap) {
309 debugBelch("on capability %d, ", task->cap->no);
310 }
311 if (task->tso) {
312 debugBelch("bound to thread %d", task->tso->id);
313 } else {
314 debugBelch("worker");
315 }
316 }
317 debugBelch("\n");
318 }
319 }
320
321 #endif
322