/*
 * rts/Threads.c (from ghc.git; commit: "New implementation of BLACKHOLEs")
 */
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
/* Next thread ID to allocate.
 * LOCK: sched_mutex (createThread increments this while holding it).
 */
static StgThreadID next_thread_id = 1;
27
/* The smallest stack size (in words) that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (stg_ap_v_ret)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 *
 * (the three "+ 1" slots above are the "+ 3" below)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41 Create a new thread.
42
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
47
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
50
51 currently pri (priority) is only used in a GRAN setup -- HWL
52 ------------------------------------------------------------------------ */
/*
 * Allocate and initialise a new TSO.  'size' is the total size in
 * words, including the TSO struct itself; the usable stack is whatever
 * remains after the struct.  The thread is linked onto the global
 * thread list, but NOT put on any run queue -- the caller must first
 * push a closure (and arguments) on its stack and then schedule it.
 */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    nat stack_size;   // words of stack available after the TSO struct

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);

    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    // Not blocked on anything, and no pending blocked exceptions or
    // blocking queues.
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;   // new TSOs start dirty (presumably so the first
                      // mutation needs no write barrier -- NOTE(review):
                      // confirm against dirty_TSO())

    tso->saved_errno = 0;
    tso->bound = NULL;      // not bound to an OS thread
    tso->cap = cap;         // owned by the creating capability

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    // the stack grows downwards: sp starts one past the top
    tso->sp = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;    // no STM transaction in progress

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++; // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
121
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
124 *
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2)
131 {
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
134
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
137 return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
142 *
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso)
147 {
148 return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
155
rtsBool // returns True iff the head pointer (*queue) was modified;
        // unlinking from the middle of the queue returns False
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                // splice tso out of the middle; *queue is untouched
                setTSOLink(cap,prev,t->_link);
                return rtsFalse;
            } else {
                // tso was the first element: advance the head pointer
                *queue = t->_link;
                return rtsTrue;
            }
        }
    }
    // fatal: the caller guaranteed tso was on this queue
    barf("removeThreadFromQueue: not found");
}
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180 StgTSO *t, *prev;
181 rtsBool flag = rtsFalse;
182
183 prev = NULL;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185 if (t == tso) {
186 if (prev) {
187 setTSOLink(cap,prev,t->_link);
188 flag = rtsFalse;
189 } else {
190 *head = t->_link;
191 flag = rtsTrue;
192 }
193 if (*tail == tso) {
194 if (prev) {
195 *tail = prev;
196 } else {
197 *tail = END_TSO_QUEUE;
198 }
199 return rtsTrue;
200 } else {
201 return flag;
202 }
203 }
204 }
205 barf("removeThreadFromMVarQueue: not found");
206 }
207
/*
 * Remove 'tso' from the wait queue of 'mvar' and clear its link field.
 */
void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    // caller must do the write barrier, because replacing the info
    // pointer will unlock the MVar.
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
    tso->_link = END_TSO_QUEUE;   // no longer on any queue
}
216
217 /* ----------------------------------------------------------------------------
218 unblockOne()
219
220 unblock a single thread.
221 ------------------------------------------------------------------------- */
222
223 StgTSO *
224 unblockOne (Capability *cap, StgTSO *tso)
225 {
226 return unblockOne_(cap,tso,rtsTrue); // allow migration
227 }
228
/*
 * The worker behind unblockOne().  If 'allow_migrate' is set (and the
 * wakeupMigrate RTS flag is on, and the TSO is not locked to its
 * Capability), a thread last owned by another Capability may be
 * migrated onto 'cap'; otherwise it is woken up on the Capability it
 * was last on.  Returns the thread that followed 'tso' on whatever
 * queue it was linked into, so callers can walk a wakeup list.
 */
StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
{
    StgTSO *next;

    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);
    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
           tso->block_info.closure->header.info == &stg_IND_info);

    // unlink tso from its queue, remembering its successor for the
    // caller
    next = tso->_link;
    tso->_link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            allow_migrate &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
            // a bound thread's Task moves with it
            ASSERT(tso->bound->task->cap == tso->cap);
            tso->bound->task->cap = cap;
        }

        tso->cap = cap;
        // make the ownership change visible before clearing why_blocked
        write_barrier();
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);

        // context-switch soonish so we can migrate the new thread if
        // necessary.  NB. not contextSwitchCapability(cap), which would
        // force a context switch immediately.
        cap->context_switch = 1;
    } else {
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    }
#else
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // context-switch soonish so we can migrate the new thread if
    // necessary.  NB. not contextSwitchCapability(cap), which would
    // force a context switch immediately.
    cap->context_switch = 1;
#endif

    traceEventThreadWakeup (cap, tso, tso->cap->no);

    return next;
}
281
/*
 * Try to wake up 'tso'.  If it is owned by another Capability, send a
 * MSG_TRY_WAKEUP message so its owner does the work.  Otherwise,
 * threads blocked on a black hole or on STM are optimistically put
 * back on the run queue (if the resource is still unavailable they
 * simply block again); any other blocking reason is left alone.
 */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        // The thread belongs to another Capability; we must not touch
        // it directly, so hand over a wakeup message instead.
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnBlackHole:
    case BlockedOnSTM:
    {
        // just run the thread now, if the BH is not really available,
        // we'll block again.
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
        break;
    }
    default:
        // otherwise, do nothing
        break;
    }
}
313
314 /* ----------------------------------------------------------------------------
315 awakenBlockedQueue
316
317 wakes up all the threads on the specified queue.
318 ------------------------------------------------------------------------- */
319
/*
 * Wake up every thread blocked on the given BLOCKING_QUEUE, then
 * overwrite the queue itself with an indirection so the next GC can
 * collect it.
 */
void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        // skip messages already overwritten with an IND (presumably
        // dealt with elsewhere -- see checkBlockingQueues)
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}
348
349 // If we update a closure that we know we BLACKHOLE'd, and the closure
350 // no longer points to the current TSO as its owner, then there may be
351 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
352 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
353 // current TSO to see if any can now be woken up.
/*
 * Walk the BLOCKING_QUEUEs attached to 'tso' and wake any that have
 * become orphaned (their black hole was updated, or no longer points
 * back at the queue).  See the commentary above for when this happens.
 */
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;    // the black hole this queue is attached to

        // orphaned if the BH was updated, or it no longer points back
        // at this queue
        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}
382
383 /* ----------------------------------------------------------------------------
384 updateThunk
385
386 Update a thunk with a value. In order to do this, we need to know
387 which TSO owns (or is evaluating) the thunk, in case we need to
388 awaken any threads that are blocked on it.
389 ------------------------------------------------------------------------- */
390
void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        // the thunk was never black-holed, so no thread can be blocked
        // on it: a plain update suffices.
        updateWithIndirection(cap, thunk, val);
        return;
    }

    // remember what the black hole pointed at *before* updating it
    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    i = v->header.info;
    if (i == &stg_TSO_info) {
        // it pointed straight at a TSO, so no blocking queue was
        // attached yet.  If the owner isn't us, another thread claimed
        // the thunk and our own queues may now hold orphans.
        owner = deRefTSO((StgTSO*)v);
        if (owner != tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        // neither a TSO nor a blocking queue: treat as a collision
        checkBlockingQueues(cap, tso);
        return;
    }

    // it pointed at a BLOCKING_QUEUE: wake its threads if we own it,
    // otherwise go looking for our own orphaned queues.
    owner = deRefTSO(((StgBlockingQueue*)v)->owner);

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}
433
434 /* ----------------------------------------------------------------------------
435 * Wake up a thread on a Capability.
436 *
437 * This is used when the current Task is running on a Capability and
438 * wishes to wake up a thread on a different Capability.
439 * ------------------------------------------------------------------------- */
440
441 #ifdef THREADED_RTS
442
/*
 * Hand 'tso' over to 'other_cap' and send it a MSG_WAKEUP message so
 * that Capability actually wakes the thread up.
 */
void
wakeupThreadOnCapability (Capability *cap,
                          Capability *other_cap,
                          StgTSO *tso)
{
    MessageWakeup *msg;

    // ASSUMES: the caller holds the appropriate Capability lock.
    // NOTE(review): the original comment said "asserted in
    // wakeupThreadOnCapability" -- i.e. this function itself, which
    // cannot be right; confirm whether cap->lock or other_cap->lock is
    // meant.
    if (tso->bound) {
        // a bound thread's Task migrates with it
        ASSERT(tso->bound->task->cap == tso->cap);
        tso->bound->task->cap = other_cap;
    }
    tso->cap = other_cap;   // transfer ownership of the TSO

    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
           tso->block_info.closure->header.info == &stg_IND_info);

    // there must not already be a wakeup message in flight for this TSO
    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);

    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
    msg->tso = tso;
    tso->block_info.closure = (StgClosure *)msg;
    dirty_TSO(cap, tso);
    // publish the message before flipping why_blocked
    write_barrier();
    tso->why_blocked = BlockedOnMsgWakeup;

    sendMessage(cap, other_cap, (Message*)msg);
}
472
473 #endif /* THREADED_RTS */
474
475 /* ---------------------------------------------------------------------------
476 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
477 * used by Control.Concurrent for error checking.
478 * ------------------------------------------------------------------------- */
479
480 HsBool
481 rtsSupportsBoundThreads(void)
482 {
483 #if defined(THREADED_RTS)
484 return HS_BOOL_TRUE;
485 #else
486 return HS_BOOL_FALSE;
487 #endif
488 }
489
490 /* ---------------------------------------------------------------------------
491 * isThreadBound(tso): check whether tso is bound to an OS thread.
492 * ------------------------------------------------------------------------- */
493
494 StgBool
495 isThreadBound(StgTSO* tso USED_IF_THREADS)
496 {
497 #if defined(THREADED_RTS)
498 return (tso->bound != NULL);
499 #endif
500 return rtsFalse;
501 }
502
503 /* ----------------------------------------------------------------------------
504 * Debugging: why is a thread blocked
505 * ------------------------------------------------------------------------- */
506
507 #if DEBUG
508 void
509 printThreadBlockage(StgTSO *tso)
510 {
511 switch (tso->why_blocked) {
512 case BlockedOnRead:
513 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
514 break;
515 case BlockedOnWrite:
516 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
517 break;
518 #if defined(mingw32_HOST_OS)
519 case BlockedOnDoProc:
520 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
521 break;
522 #endif
523 case BlockedOnDelay:
524 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
525 break;
526 case BlockedOnMVar:
527 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
528 break;
529 case BlockedOnBlackHole:
530 debugBelch("is blocked on a black hole %p",
531 ((StgBlockingQueue*)tso->block_info.bh->bh));
532 break;
533 case BlockedOnMsgWakeup:
534 debugBelch("is blocked on a wakeup message");
535 break;
536 case BlockedOnMsgThrowTo:
537 debugBelch("is blocked on a throwto message");
538 break;
539 case NotBlocked:
540 debugBelch("is not blocked");
541 break;
542 case BlockedOnCCall:
543 debugBelch("is blocked on an external call");
544 break;
545 case BlockedOnCCall_NoUnblockExc:
546 debugBelch("is blocked on an external call (exceptions were already blocked)");
547 break;
548 case BlockedOnSTM:
549 debugBelch("is blocked on an STM operation");
550 break;
551 default:
552 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
553 tso->why_blocked, tso->id, tso);
554 }
555 }
556
557
/*
 * Debugging: print a one-line status summary for thread 't' (label,
 * what it is doing or blocked on, and dirty flags).
 */
void
printThreadStatus(StgTSO *t)
{
  debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
      void *label = lookupThreadLabel(t->id);
      if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
  if (t->what_next == ThreadRelocated) {
    // the TSO has been moved; its _link is the forwarding pointer
    debugBelch("has been relocated...\n");
  } else {
    switch (t->what_next) {
    case ThreadKilled:
      debugBelch("has been killed");
      break;
    case ThreadComplete:
      debugBelch("has completed");
      break;
    default:
      // still runnable/blocked: say what it is waiting on
      printThreadBlockage(t);
    }
    if (t->dirty) {
      debugBelch(" (TSO_DIRTY)");
    } else if (t->flags & TSO_LINK_DIRTY) {
      debugBelch(" (TSO_LINK_DIRTY)");
    }
    debugBelch("\n");
  }
}
587
/*
 * Debugging: dump every thread known to the RTS -- first the run queue
 * of each capability, then any still-blocked threads found on the
 * per-generation thread lists.
 */
void
printAllThreads(void)
{
  StgTSO *t, *next;
  nat i, g;
  Capability *cap;

  debugBelch("all threads:\n");

  for (i = 0; i < n_capabilities; i++) {
      cap = &capabilities[i];
      debugBelch("threads on capability %d:\n", cap->no);
      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
          printThreadStatus(t);
      }
  }

  debugBelch("other threads:\n");
  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
    for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
      }
      // a relocated TSO's forwarding pointer lives in _link, not
      // global_link
      if (t->what_next == ThreadRelocated) {
          next = t->_link;
      } else {
          next = t->global_link;
      }
    }
  }
}
619
620 // useful from gdb
621 void
622 printThreadQueue(StgTSO *t)
623 {
624 nat i = 0;
625 for (; t != END_TSO_QUEUE; t = t->_link) {
626 printThreadStatus(t);
627 i++;
628 }
629 debugBelch("%d threads on queue\n", i);
630 }
631
632 #endif /* DEBUG */