Split part of the Task struct into a separate struct InCall
[ghc.git] / rts / Threads.c
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
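/* The "+ 3" is exactly the three one-word items listed above: the closure
 * to enter, stg_ap_v_ret, and its spare slot. */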

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);

    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size     = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp             = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
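
/* Illustrative only: callers normally go through one of the wrappers
 * mentioned in the comment above rather than calling createThread()
 * directly.  Assuming createIOThread() and scheduleThread() (both defined
 * outside this file), spawning a thread for an IO closure looks roughly
 * like:
 *
 *   StgTSO *t = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *   scheduleThread(cap, t);
 *
 * createIOThread() itself calls createThread() and then uses pushClosure()
 * to set up the closure and the glue needed to run it on the new stack.
 */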

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}
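
/* The result follows the strcmp() convention: negative if tso1's id is the
 * smaller, zero if the ids are equal, positive otherwise, so the Haskell
 * side only has to compare it against 0. */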

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}
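
/* Note: ids are handed out from next_thread_id above, so a given id is not
 * reused during a run (until the counter wraps). */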

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

void
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *queue = t->_link;
            }
            return;
        }
    }
    barf("removeThreadFromQueue: not found");
}

void
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *head = t->_link;
            }
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
            }
            return;
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
}
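
/* Illustrative only: one expected caller is the asynchronous-exception
 * machinery, which has to unhook a thread from the MVar it is blocked on
 * before raising an exception in it.  Under that assumption, the call site
 * looks roughly like:
 *
 *   if (tso->why_blocked == BlockedOnMVar) {
 *       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
 *   }
 *
 * (block_info.closure holding the MVar is the same convention relied on by
 * printThreadBlockage() below.)
 */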

/* ----------------------------------------------------------------------------
   unblockOne()

   unblock a single thread.
   ------------------------------------------------------------------------- */

StgTSO *
unblockOne (Capability *cap, StgTSO *tso)
{
    return unblockOne_(cap,tso,rtsTrue); // allow migration
}

StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
{
    StgTSO *next;

    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);

    tso->why_blocked = NotBlocked;
    next = tso->_link;
    tso->_link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            allow_migrate &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
            ASSERT(tso->bound->task->cap == tso->cap);
            tso->bound->task->cap = cap;
        }

        tso->cap = cap;
        appendToRunQueue(cap,tso);

        // context-switch soonish so we can migrate the new thread if
        // necessary.  NB. not contextSwitchCapability(cap), which would
        // force a context switch immediately.
        cap->context_switch = 1;
    } else {
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    }
#else
    appendToRunQueue(cap,tso);

    // context-switch soonish so we can migrate the new thread if
    // necessary.  NB. not contextSwitchCapability(cap), which would
    // force a context switch immediately.
    cap->context_switch = 1;
#endif

    traceEventThreadWakeup (cap, tso, tso->cap->no);

    return next;
}
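
/* Note on the return value: unblockOne_() reads tso->_link before clearing
 * it, so the caller gets back the thread that was queued behind tso.  This
 * is what lets awakenBlockedQueue() below drain a whole queue with a simple
 * loop. */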

/* ----------------------------------------------------------------------------
   awakenBlockedQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

void
awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
    while (tso != END_TSO_QUEUE) {
        tso = unblockOne(cap,tso);
    }
}
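
/* Illustrative only: a plausible use is draining a thread's queue of
 * pending exception-throwers once it becomes interruptible again, roughly:
 *
 *   awakenBlockedQueue(cap, tso->blocked_exceptions);
 *   tso->blocked_exceptions = END_TSO_QUEUE;
 *
 * (blocked_exceptions is initialised to END_TSO_QUEUE in createThread()
 * above; the real throwTo/async-exception code lives outside this file.)
 */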

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return HS_BOOL_TRUE;
#else
    return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#endif
    return rtsFalse;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %lu",
                   (unsigned long)tso->block_info.tso->id);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %lu (%p)",
             tso->why_blocked, (unsigned long)tso->id, (void *)tso);
    }
}


void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->dirty) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, g;
    Capability *cap;

    debugBelch("all threads:\n");

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}
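
/* Note: the "other threads" loop walks the per-generation thread lists and
 * prints only the blocked ones; runnable threads have already been shown on
 * their capability's run queue above.  Relocated TSOs are followed via
 * _link, everything else via global_link. */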

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}
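
/* For example, from gdb (assuming a DEBUG build and a Capability in scope):
 *
 *   (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
 */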

#endif /* DEBUG */