Fix #3429: a tricky race condition
[ghc.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Threads.h"
13 #include "STM.h"
14 #include "Schedule.h"
15 #include "Trace.h"
16 #include "ThreadLabels.h"
17
18 /* Next thread ID to allocate.
19 * LOCK: sched_mutex
20 */
21 static StgThreadID next_thread_id = 1;
22
23 /* The smallest stack size that makes any sense is:
24 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
25 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
26 * + 1 (the closure to enter)
27 * + 1 (stg_ap_v_ret)
28 * + 1 (spare slot req'd by stg_ap_v_ret)
29 *
30 * A thread with this stack will bomb immediately with a stack
31 * overflow, which will increase its stack size.
32 */
33 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
34
35 /* ---------------------------------------------------------------------------
36 Create a new thread.
37
38 The new thread starts with the given stack size. Before the
39 scheduler can run, however, this thread needs to have a closure
40 (and possibly some arguments) pushed on its stack. See
41 pushClosure() in Schedule.h.
42
43 createGenThread() and createIOThread() (in SchedAPI.h) are
44 convenient packaged versions of this function.
45
46 currently pri (priority) is only used in a GRAN setup -- HWL
47 ------------------------------------------------------------------------ */
48 StgTSO *
49 createThread(Capability *cap, nat size)
50 {
51 StgTSO *tso;
52 nat stack_size;
53
54 /* sched_mutex is *not* required */
55
56 /* First check whether we should create a thread at all */
57
58 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
59
60 /* catch ridiculously small stack sizes */
61 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
62 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
63 }
64
65 size = round_to_mblocks(size);
66 tso = (StgTSO *)allocateLocal(cap, size);
67
68 stack_size = size - TSO_STRUCT_SIZEW;
69 TICK_ALLOC_TSO(stack_size, 0);
70
71 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
72
73 // Always start with the compiled code evaluator
74 tso->what_next = ThreadRunGHC;
75
76 tso->why_blocked = NotBlocked;
77 tso->blocked_exceptions = END_TSO_QUEUE;
78 tso->flags = 0;
79 tso->dirty = 1;
80
81 tso->saved_errno = 0;
82 tso->bound = NULL;
83 tso->cap = cap;
84
85 tso->stack_size = stack_size;
86 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
87 - TSO_STRUCT_SIZEW;
88 tso->sp = (P_)&(tso->stack) + stack_size;
89
90 tso->trec = NO_TREC;
91
92 #ifdef PROFILING
93 tso->prof.CCCS = CCS_MAIN;
94 #endif
95
96 /* put a stop frame on the stack */
97 tso->sp -= sizeofW(StgStopFrame);
98 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
99 tso->_link = END_TSO_QUEUE;
100
101 /* Link the new thread on the global thread list.
102 */
103 ACQUIRE_LOCK(&sched_mutex);
104 tso->id = next_thread_id++; // while we have the mutex
105 tso->global_link = g0s0->threads;
106 g0s0->threads = tso;
107 RELEASE_LOCK(&sched_mutex);
108
109 postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);
110
111 debugTrace(DEBUG_sched,
112 "created thread %ld, stack size = %lx words",
113 (long)tso->id, (long)tso->stack_size);
114 return tso;
115 }
116
117 /* ---------------------------------------------------------------------------
118 * Comparing Thread ids.
119 *
120 * This is used from STG land in the implementation of the
121 * instances of Eq/Ord for ThreadIds.
122 * ------------------------------------------------------------------------ */
123
124 int
125 cmp_thread(StgPtr tso1, StgPtr tso2)
126 {
127 StgThreadID id1 = ((StgTSO *)tso1)->id;
128 StgThreadID id2 = ((StgTSO *)tso2)->id;
129
130 if (id1 < id2) return (-1);
131 if (id1 > id2) return 1;
132 return 0;
133 }
134
135 /* ---------------------------------------------------------------------------
136 * Fetching the ThreadID from an StgTSO.
137 *
138 * This is used in the implementation of Show for ThreadIds.
139 * ------------------------------------------------------------------------ */
140 int
141 rts_getThreadId(StgPtr tso)
142 {
143 return ((StgTSO *)tso)->id;
144 }
145
146 /* -----------------------------------------------------------------------------
147 Remove a thread from a queue.
148 Fails fatally if the TSO is not on the queue.
149 -------------------------------------------------------------------------- */
150
151 void
152 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
153 {
154 StgTSO *t, *prev;
155
156 prev = NULL;
157 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
158 if (t == tso) {
159 if (prev) {
160 setTSOLink(cap,prev,t->_link);
161 } else {
162 *queue = t->_link;
163 }
164 return;
165 }
166 }
167 barf("removeThreadFromQueue: not found");
168 }
169
170 void
171 removeThreadFromDeQueue (Capability *cap,
172 StgTSO **head, StgTSO **tail, StgTSO *tso)
173 {
174 StgTSO *t, *prev;
175
176 prev = NULL;
177 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
178 if (t == tso) {
179 if (prev) {
180 setTSOLink(cap,prev,t->_link);
181 } else {
182 *head = t->_link;
183 }
184 if (*tail == tso) {
185 if (prev) {
186 *tail = prev;
187 } else {
188 *tail = END_TSO_QUEUE;
189 }
190 }
191 return;
192 }
193 }
194 barf("removeThreadFromMVarQueue: not found");
195 }
196
197 void
198 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
199 {
200 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
201 }
202
203 /* ----------------------------------------------------------------------------
204 unblockOne()
205
206 unblock a single thread.
207 ------------------------------------------------------------------------- */
208
209 StgTSO *
210 unblockOne (Capability *cap, StgTSO *tso)
211 {
212 return unblockOne_(cap,tso,rtsTrue); // allow migration
213 }
214
215 StgTSO *
216 unblockOne_ (Capability *cap, StgTSO *tso,
217 rtsBool allow_migrate USED_IF_THREADS)
218 {
219 StgTSO *next;
220
221 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
222 ASSERT(tso->why_blocked != NotBlocked);
223
224 tso->why_blocked = NotBlocked;
225 next = tso->_link;
226 tso->_link = END_TSO_QUEUE;
227
228 #if defined(THREADED_RTS)
229 if (tso->cap == cap || (!tsoLocked(tso) &&
230 allow_migrate &&
231 RtsFlags.ParFlags.wakeupMigrate)) {
232 // We are waking up this thread on the current Capability, which
233 // might involve migrating it from the Capability it was last on.
234 if (tso->bound) {
235 ASSERT(tso->bound->cap == tso->cap);
236 tso->bound->cap = cap;
237 }
238
239 tso->cap = cap;
240 appendToRunQueue(cap,tso);
241
242 // context-switch soonish so we can migrate the new thread if
243 // necessary. NB. not contextSwitchCapability(cap), which would
244 // force a context switch immediately.
245 cap->context_switch = 1;
246 } else {
247 // we'll try to wake it up on the Capability it was last on.
248 wakeupThreadOnCapability(cap, tso->cap, tso);
249 }
250 #else
251 appendToRunQueue(cap,tso);
252
253 // context-switch soonish so we can migrate the new thread if
254 // necessary. NB. not contextSwitchCapability(cap), which would
255 // force a context switch immediately.
256 cap->context_switch = 1;
257 #endif
258
259 postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);
260
261 debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
262 (long)tso->id, tso->cap->no);
263
264 return next;
265 }
266
267 /* ----------------------------------------------------------------------------
268 awakenBlockedQueue
269
270 wakes up all the threads on the specified queue.
271 ------------------------------------------------------------------------- */
272
273 void
274 awakenBlockedQueue(Capability *cap, StgTSO *tso)
275 {
276 while (tso != END_TSO_QUEUE) {
277 tso = unblockOne(cap,tso);
278 }
279 }
280
281 /* ---------------------------------------------------------------------------
282 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
283 * used by Control.Concurrent for error checking.
284 * ------------------------------------------------------------------------- */
285
286 HsBool
287 rtsSupportsBoundThreads(void)
288 {
289 #if defined(THREADED_RTS)
290 return HS_BOOL_TRUE;
291 #else
292 return HS_BOOL_FALSE;
293 #endif
294 }
295
296 /* ---------------------------------------------------------------------------
297 * isThreadBound(tso): check whether tso is bound to an OS thread.
298 * ------------------------------------------------------------------------- */
299
300 StgBool
301 isThreadBound(StgTSO* tso USED_IF_THREADS)
302 {
303 #if defined(THREADED_RTS)
304 return (tso->bound != NULL);
305 #endif
306 return rtsFalse;
307 }
308
309 /* ----------------------------------------------------------------------------
310 * Debugging: why is a thread blocked
311 * ------------------------------------------------------------------------- */
312
313 #if DEBUG
314 void
315 printThreadBlockage(StgTSO *tso)
316 {
317 switch (tso->why_blocked) {
318 case BlockedOnRead:
319 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
320 break;
321 case BlockedOnWrite:
322 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
323 break;
324 #if defined(mingw32_HOST_OS)
325 case BlockedOnDoProc:
326 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
327 break;
328 #endif
329 case BlockedOnDelay:
330 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
331 break;
332 case BlockedOnMVar:
333 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
334 break;
335 case BlockedOnException:
336 debugBelch("is blocked on delivering an exception to thread %lu",
337 (unsigned long)tso->block_info.tso->id);
338 break;
339 case BlockedOnBlackHole:
340 debugBelch("is blocked on a black hole");
341 break;
342 case NotBlocked:
343 debugBelch("is not blocked");
344 break;
345 case BlockedOnCCall:
346 debugBelch("is blocked on an external call");
347 break;
348 case BlockedOnCCall_NoUnblockExc:
349 debugBelch("is blocked on an external call (exceptions were already blocked)");
350 break;
351 case BlockedOnSTM:
352 debugBelch("is blocked on an STM operation");
353 break;
354 default:
355 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
356 tso->why_blocked, tso->id, tso);
357 }
358 }
359
360 void
361 printThreadStatus(StgTSO *t)
362 {
363 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
364 {
365 void *label = lookupThreadLabel(t->id);
366 if (label) debugBelch("[\"%s\"] ",(char *)label);
367 }
368 if (t->what_next == ThreadRelocated) {
369 debugBelch("has been relocated...\n");
370 } else {
371 switch (t->what_next) {
372 case ThreadKilled:
373 debugBelch("has been killed");
374 break;
375 case ThreadComplete:
376 debugBelch("has completed");
377 break;
378 default:
379 printThreadBlockage(t);
380 }
381 if (t->dirty) {
382 debugBelch(" (TSO_DIRTY)");
383 } else if (t->flags & TSO_LINK_DIRTY) {
384 debugBelch(" (TSO_LINK_DIRTY)");
385 }
386 debugBelch("\n");
387 }
388 }
389
390 void
391 printAllThreads(void)
392 {
393 StgTSO *t, *next;
394 nat i, s;
395 Capability *cap;
396
397 # if defined(GRAN)
398 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
399 ullong_format_string(TIME_ON_PROC(CurrentProc),
400 time_string, rtsFalse/*no commas!*/);
401
402 debugBelch("all threads at [%s]:\n", time_string);
403 # elif defined(PARALLEL_HASKELL)
404 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
405 ullong_format_string(CURRENT_TIME,
406 time_string, rtsFalse/*no commas!*/);
407
408 debugBelch("all threads at [%s]:\n", time_string);
409 # else
410 debugBelch("all threads:\n");
411 # endif
412
413 for (i = 0; i < n_capabilities; i++) {
414 cap = &capabilities[i];
415 debugBelch("threads on capability %d:\n", cap->no);
416 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
417 printThreadStatus(t);
418 }
419 }
420
421 debugBelch("other threads:\n");
422 for (s = 0; s < total_steps; s++) {
423 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
424 if (t->why_blocked != NotBlocked) {
425 printThreadStatus(t);
426 }
427 if (t->what_next == ThreadRelocated) {
428 next = t->_link;
429 } else {
430 next = t->global_link;
431 }
432 }
433 }
434 }
435
436 // useful from gdb
437 void
438 printThreadQueue(StgTSO *t)
439 {
440 nat i = 0;
441 for (; t != END_TSO_QUEUE; t = t->_link) {
442 printThreadStatus(t);
443 i++;
444 }
445 debugBelch("%d threads on queue\n", i);
446 }
447
448 #endif /* DEBUG */