Fix for derefing ThreadRelocated TSOs in MVar operations
[ghc.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24 * LOCK: sched_mutex
25 */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
32 * + 1 (stg_ap_v_ret)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
34 *
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
37 */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41 Create a new thread.
42
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
47
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
50
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
/*
 * Allocate and initialise a new TSO with a stack of (at least) the
 * requested size, and link it onto the global thread list.  The new
 * thread is NOT placed on any run queue: the caller must first push a
 * closure (and possibly arguments) on its stack -- see pushClosure()
 * in Schedule.h -- and then schedule it.
 *
 * cap:  capability whose heap the TSO is allocated on; also recorded
 *       as the TSO's owning capability.
 * size: requested total size in words (TSO struct + stack).
 *
 * Returns the freshly initialised TSO.
 */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);

    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;   // freshly allocated: must be scanned by the next GC

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp = (P_)&(tso->stack) + stack_size;  // stack grows downwards

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    /* Link the new thread on the global thread list.
     * sched_mutex protects both next_thread_id and g0->threads.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++; // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
121
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
124 *
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2)
131 {
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
134
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
137 return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
142 *
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso)
147 {
148 return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
155
/*
 * Remove tso from a singly-linked TSO queue.
 *
 * Return value: rtsTrue only when the HEAD of the queue (*queue) was
 * modified, i.e. tso was the first element.  Unlinking from the middle
 * goes through setTSOLink() (which applies the generational write
 * barrier to prev) and returns rtsFalse.
 *
 * Fails fatally if tso is not on the queue.
 */
rtsBool // returns True if we modified the head of the queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                // middle of the queue: splice out with the write barrier
                setTSOLink(cap,prev,t->_link);
                return rtsFalse;
            } else {
                // tso was the head
                *queue = t->_link;
                return rtsTrue;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180 StgTSO *t, *prev;
181 rtsBool flag = rtsFalse;
182
183 prev = NULL;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185 if (t == tso) {
186 if (prev) {
187 setTSOLink(cap,prev,t->_link);
188 flag = rtsFalse;
189 } else {
190 *head = t->_link;
191 flag = rtsTrue;
192 }
193 if (*tail == tso) {
194 if (prev) {
195 *tail = prev;
196 } else {
197 *tail = END_TSO_QUEUE;
198 }
199 return rtsTrue;
200 } else {
201 return flag;
202 }
203 }
204 }
205 barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209 tryWakeupThread()
210
211 Attempt to wake up a thread. tryWakeupThread is idempotent: it is
212 always safe to call it too many times, but it is not safe in
213 general to omit a call.
214
215 ------------------------------------------------------------------------- */
216
/*
 * Public entry point for waking a thread: first follow any
 * ThreadRelocated indirections (the TSO may have been moved, e.g. by a
 * stack enlargement) via deRefTSO, then do the wakeup proper.
 */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    tryWakeupThread_(cap, deRefTSO(tso));
}
222
/*
 * The wakeup proper.  tso must already have been deRefTSO'd (no
 * ThreadRelocated indirection).  Idempotent: calling it on a thread
 * that is not actually ready to run is harmless.
 *
 * If the thread is owned by another capability we do not touch it
 * directly: instead a MSG_TRY_WAKEUP message is sent to the owning
 * capability (THREADED_RTS only).
 */
void
tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        // not ours: ship a wakeup message to the owning capability
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (lnat)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        // _link == END_TSO_QUEUE means the thread has already been
        // removed from the MVar's blocked queue, so it is genuinely
        // ready to run; otherwise it is still queued and must stay
        // blocked.
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        // lock/unlock the message to synchronise with a concurrent
        // revoke; if the info table is now MSG_NULL the throwto was
        // revoked and the thread can run again.
        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->sp += 3;
        goto unblock;
    }

    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);
}
288
289 /* ----------------------------------------------------------------------------
290 migrateThread
291 ------------------------------------------------------------------------- */
292
/*
 * Hand tso over from capability `from` to capability `to`.  The actual
 * enqueueing on the target's run queue happens when the target
 * capability processes the MSG_TRY_WAKEUP sent via tryWakeupThread().
 *
 * NOTE: the why_blocked and cap fields must be set BEFORE the wakeup
 * message is sent, so the target sees a consistent state.
 */
void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}
303
304 /* ----------------------------------------------------------------------------
305 awakenBlockedQueue
306
307 wakes up all the threads on the specified queue.
308 ------------------------------------------------------------------------- */
309
/*
 * Wake up every thread on the given BLOCKING_QUEUE, then retire the
 * queue by overwriting it with an indirection (so the next GC collects
 * it).  Messages whose info table is already stg_IND have been
 * processed/revoked and are skipped.
 */
void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}
338
339 // If we update a closure that we know we BLACKHOLE'd, and the closure
340 // no longer points to the current TSO as its owner, then there may be
341 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
342 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
343 // current TSO to see if any can now be woken up.
// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it. We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // this BQ has already been retired (see wakeBlockingQueue)
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;

        // The BQ is orphaned when the blackhole it hangs off either is
        // no longer a BLACKHOLE or no longer points back at this BQ --
        // in either case the blocked threads can be released.
        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}
372
373 /* ----------------------------------------------------------------------------
374 updateThunk
375
376 Update a thunk with a value. In order to do this, we need to know
377 which TSO owns (or is evaluating) the thunk, in case we need to
378 awaken any threads that are blocked on it.
379 ------------------------------------------------------------------------- */
380
/*
 * Update `thunk` with the value `val`.  If the thunk was BLACKHOLE'd,
 * other threads may be blocked on it (directly or via a
 * BLOCKING_QUEUE), so after the update we may have to wake them, or
 * scan this thread's own blocking queues for orphans.
 *
 * tso: the thread performing the update (the evaluator of thunk).
 */
void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    // Fast path: not any kind of blackhole, so no thread can be
    // blocked on it -- just overwrite with an indirection and return.
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

    // remember what the blackhole pointed at before we clobber it
    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    i = v->header.info;
    if (i == &stg_TSO_info) {
        // indirectee is a bare TSO: no BLOCKING_QUEUE was attached yet.
        // If the owner isn't us, somebody else claimed the blackhole;
        // check whether any of our own BQs are now orphaned.
        owner = deRefTSO((StgTSO*)v);
        if (owner != tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        // unexpected indirectee: be conservative and scan our queues
        checkBlockingQueues(cap, tso);
        return;
    }

    // indirectee is a BLOCKING_QUEUE; deRef the owner in case its TSO
    // was relocated.
    owner = deRefTSO(((StgBlockingQueue*)v)->owner);

    if (owner != tso) {
        // not our queue: it may be orphaned now, scan ours
        checkBlockingQueues(cap, tso);
    } else {
        // our queue: release everybody blocked on this thunk
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}
423
424 /* ---------------------------------------------------------------------------
425 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
426 * used by Control.Concurrent for error checking.
427 * ------------------------------------------------------------------------- */
428
429 HsBool
430 rtsSupportsBoundThreads(void)
431 {
432 #if defined(THREADED_RTS)
433 return HS_BOOL_TRUE;
434 #else
435 return HS_BOOL_FALSE;
436 #endif
437 }
438
439 /* ---------------------------------------------------------------------------
440 * isThreadBound(tso): check whether tso is bound to an OS thread.
441 * ------------------------------------------------------------------------- */
442
443 StgBool
444 isThreadBound(StgTSO* tso USED_IF_THREADS)
445 {
446 #if defined(THREADED_RTS)
447 return (tso->bound != NULL);
448 #endif
449 return rtsFalse;
450 }
451
452 /* ----------------------------------------------------------------------------
453 * Debugging: why is a thread blocked
454 * ------------------------------------------------------------------------- */
455
456 #if DEBUG
457 void
458 printThreadBlockage(StgTSO *tso)
459 {
460 switch (tso->why_blocked) {
461 case BlockedOnRead:
462 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
463 break;
464 case BlockedOnWrite:
465 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
466 break;
467 #if defined(mingw32_HOST_OS)
468 case BlockedOnDoProc:
469 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
470 break;
471 #endif
472 case BlockedOnDelay:
473 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
474 break;
475 case BlockedOnMVar:
476 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
477 break;
478 case BlockedOnBlackHole:
479 debugBelch("is blocked on a black hole %p",
480 ((StgBlockingQueue*)tso->block_info.bh->bh));
481 break;
482 case BlockedOnMsgThrowTo:
483 debugBelch("is blocked on a throwto message");
484 break;
485 case NotBlocked:
486 debugBelch("is not blocked");
487 break;
488 case ThreadMigrating:
489 debugBelch("is runnable, but not on the run queue");
490 break;
491 case BlockedOnCCall:
492 debugBelch("is blocked on an external call");
493 break;
494 case BlockedOnCCall_NoUnblockExc:
495 debugBelch("is blocked on an external call (exceptions were already blocked)");
496 break;
497 case BlockedOnSTM:
498 debugBelch("is blocked on an STM operation");
499 break;
500 default:
501 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
502 tso->why_blocked, tso->id, tso);
503 }
504 }
505
506
/*
 * Debug helper: print a one-line status summary for a single thread:
 * id, address, optional label, and either its relocation status or its
 * run/blocked state plus dirtiness flags.  DEBUG builds only.
 */
void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        // this TSO is a forwarding pointer; the real one is elsewhere
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->dirty) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}
536
/*
 * Debug helper: dump the status of every thread -- first the run queue
 * of each capability, then all remaining (blocked) threads found on
 * the per-generation global thread lists.  DEBUG builds only.
 */
void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, g;
    Capability *cap;

    debugBelch("all threads:\n");

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            // a relocated TSO's forwarding pointer lives in _link, not
            // global_link, so follow the right field to keep walking
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}
568
569 // useful from gdb
570 void
571 printThreadQueue(StgTSO *t)
572 {
573 nat i = 0;
574 for (; t != END_TSO_QUEUE; t = t->_link) {
575 printThreadStatus(t);
576 i++;
577 }
578 debugBelch("%d threads on queue\n", i);
579 }
580
581 #endif /* DEBUG */