/* Commit: "Change the representation of the MVar blocked queue"
 * File: rts/Threads.c (ghc.git)
 */
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "Capability.h"
13 #include "Updates.h"
14 #include "Threads.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19 #include "Updates.h"
20 #include "Messages.h"
21 #include "sm/Storage.h"
22
23 /* Next thread ID to allocate.
24 * LOCK: sched_mutex
25 */
26 static StgThreadID next_thread_id = 1;
27
28 /* The smallest stack size that makes any sense is:
29 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
30 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
31 * + 1 (the closure to enter)
32 * + 1 (stg_ap_v_ret)
33 * + 1 (spare slot req'd by stg_ap_v_ret)
34 *
35 * A thread with this stack will bomb immediately with a stack
36 * overflow, which will increase its stack size.
37 */
38 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
39
40 /* ---------------------------------------------------------------------------
41 Create a new thread.
42
43 The new thread starts with the given stack size. Before the
44 scheduler can run, however, this thread needs to have a closure
45 (and possibly some arguments) pushed on its stack. See
46 pushClosure() in Schedule.h.
47
48 createGenThread() and createIOThread() (in SchedAPI.h) are
49 convenient packaged versions of this function.
50
   Note: the old `pri' (priority) argument was only meaningful in a
   GRAN setup and is no longer part of this function's signature. -- HWL
52 ------------------------------------------------------------------------ */
53 StgTSO *
54 createThread(Capability *cap, nat size)
55 {
56 StgTSO *tso;
57 nat stack_size;
58
59 /* sched_mutex is *not* required */
60
61 /* First check whether we should create a thread at all */
62
63 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
64
65 /* catch ridiculously small stack sizes */
66 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
67 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
68 }
69
70 size = round_to_mblocks(size);
71 tso = (StgTSO *)allocate(cap, size);
72
73 stack_size = size - TSO_STRUCT_SIZEW;
74 TICK_ALLOC_TSO(stack_size, 0);
75
76 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
77
78 // Always start with the compiled code evaluator
79 tso->what_next = ThreadRunGHC;
80
81 tso->why_blocked = NotBlocked;
82 tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
83 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
84 tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
85 tso->flags = 0;
86 tso->dirty = 1;
87
88 tso->saved_errno = 0;
89 tso->bound = NULL;
90 tso->cap = cap;
91
92 tso->stack_size = stack_size;
93 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
94 - TSO_STRUCT_SIZEW;
95 tso->sp = (P_)&(tso->stack) + stack_size;
96
97 tso->trec = NO_TREC;
98
99 #ifdef PROFILING
100 tso->prof.CCCS = CCS_MAIN;
101 #endif
102
103 /* put a stop frame on the stack */
104 tso->sp -= sizeofW(StgStopFrame);
105 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
106 tso->_link = END_TSO_QUEUE;
107
108 /* Link the new thread on the global thread list.
109 */
110 ACQUIRE_LOCK(&sched_mutex);
111 tso->id = next_thread_id++; // while we have the mutex
112 tso->global_link = g0->threads;
113 g0->threads = tso;
114 RELEASE_LOCK(&sched_mutex);
115
116 // ToDo: report the stack size in the event?
117 traceEventCreateThread(cap, tso);
118
119 return tso;
120 }
121
122 /* ---------------------------------------------------------------------------
123 * Comparing Thread ids.
124 *
125 * This is used from STG land in the implementation of the
126 * instances of Eq/Ord for ThreadIds.
127 * ------------------------------------------------------------------------ */
128
129 int
130 cmp_thread(StgPtr tso1, StgPtr tso2)
131 {
132 StgThreadID id1 = ((StgTSO *)tso1)->id;
133 StgThreadID id2 = ((StgTSO *)tso2)->id;
134
135 if (id1 < id2) return (-1);
136 if (id1 > id2) return 1;
137 return 0;
138 }
139
140 /* ---------------------------------------------------------------------------
141 * Fetching the ThreadID from an StgTSO.
142 *
143 * This is used in the implementation of Show for ThreadIds.
144 * ------------------------------------------------------------------------ */
145 int
146 rts_getThreadId(StgPtr tso)
147 {
148 return ((StgTSO *)tso)->id;
149 }
150
151 /* -----------------------------------------------------------------------------
152 Remove a thread from a queue.
153 Fails fatally if the TSO is not on the queue.
154 -------------------------------------------------------------------------- */
155
156 rtsBool // returns True if we modified queue
157 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
158 {
159 StgTSO *t, *prev;
160
161 prev = NULL;
162 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
163 if (t == tso) {
164 if (prev) {
165 setTSOLink(cap,prev,t->_link);
166 return rtsFalse;
167 } else {
168 *queue = t->_link;
169 return rtsTrue;
170 }
171 }
172 }
173 barf("removeThreadFromQueue: not found");
174 }
175
176 rtsBool // returns True if we modified head or tail
177 removeThreadFromDeQueue (Capability *cap,
178 StgTSO **head, StgTSO **tail, StgTSO *tso)
179 {
180 StgTSO *t, *prev;
181 rtsBool flag = rtsFalse;
182
183 prev = NULL;
184 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
185 if (t == tso) {
186 if (prev) {
187 setTSOLink(cap,prev,t->_link);
188 flag = rtsFalse;
189 } else {
190 *head = t->_link;
191 flag = rtsTrue;
192 }
193 if (*tail == tso) {
194 if (prev) {
195 *tail = prev;
196 } else {
197 *tail = END_TSO_QUEUE;
198 }
199 return rtsTrue;
200 } else {
201 return flag;
202 }
203 }
204 }
205 barf("removeThreadFromMVarQueue: not found");
206 }
207
208 /* ----------------------------------------------------------------------------
209 tryWakeupThread()
210
211 Attempt to wake up a thread. tryWakeupThread is idempotent: it is
212 always safe to call it too many times, but it is not safe in
213 general to omit a call.
214
215 ------------------------------------------------------------------------- */
216
217 void
218 tryWakeupThread (Capability *cap, StgTSO *tso)
219 {
220
221 traceEventThreadWakeup (cap, tso, tso->cap->no);
222
223 #ifdef THREADED_RTS
224 if (tso->cap != cap)
225 {
226 MessageWakeup *msg;
227 msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
228 SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
229 msg->tso = tso;
230 sendMessage(cap, tso->cap, (Message*)msg);
231 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
232 (lnat)tso->id, tso->cap->no);
233 return;
234 }
235 #endif
236
237 switch (tso->why_blocked)
238 {
239 case BlockedOnMVar:
240 {
241 if (tso->_link == END_TSO_QUEUE) {
242 tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
243 goto unblock;
244 } else {
245 return;
246 }
247 }
248
249 case BlockedOnMsgThrowTo:
250 {
251 const StgInfoTable *i;
252
253 i = lockClosure(tso->block_info.closure);
254 unlockClosure(tso->block_info.closure, i);
255 if (i != &stg_MSG_NULL_info) {
256 debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
257 (lnat)tso->id, tso->block_info.throwto->header.info);
258 return;
259 }
260
261 // remove the block frame from the stack
262 ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
263 tso->sp += 3;
264 goto unblock;
265 }
266
267 case BlockedOnBlackHole:
268 case BlockedOnSTM:
269 case ThreadMigrating:
270 goto unblock;
271
272 default:
273 // otherwise, do nothing
274 return;
275 }
276
277 unblock:
278 // just run the thread now, if the BH is not really available,
279 // we'll block again.
280 tso->why_blocked = NotBlocked;
281 appendToRunQueue(cap,tso);
282 }
283
284 /* ----------------------------------------------------------------------------
285 migrateThread
286 ------------------------------------------------------------------------- */
287
288 void
289 migrateThread (Capability *from, StgTSO *tso, Capability *to)
290 {
291 traceEventMigrateThread (from, tso, to->no);
292 // ThreadMigrating tells the target cap that it needs to be added to
293 // the run queue when it receives the MSG_TRY_WAKEUP.
294 tso->what_next = ThreadMigrating;
295 tso->cap = to;
296 tryWakeupThread(from, tso);
297 }
298
299 /* ----------------------------------------------------------------------------
   wakeBlockingQueue

   wakes up all the threads on the specified blocking queue.
303 ------------------------------------------------------------------------- */
304
305 void
306 wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
307 {
308 MessageBlackHole *msg;
309 const StgInfoTable *i;
310
311 ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
312 bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
313
314 for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
315 msg = msg->link) {
316 i = msg->header.info;
317 if (i != &stg_IND_info) {
318 ASSERT(i == &stg_MSG_BLACKHOLE_info);
319 tryWakeupThread(cap,msg->tso);
320 }
321 }
322
323 // overwrite the BQ with an indirection so it will be
324 // collected at the next GC.
325 #if defined(DEBUG) && !defined(THREADED_RTS)
326 // XXX FILL_SLOP, but not if THREADED_RTS because in that case
327 // another thread might be looking at this BLOCKING_QUEUE and
328 // checking the owner field at the same time.
329 bq->bh = 0; bq->queue = 0; bq->owner = 0;
330 #endif
331 OVERWRITE_INFO(bq, &stg_IND_info);
332 }
333
334 // If we update a closure that we know we BLACKHOLE'd, and the closure
335 // no longer points to the current TSO as its owner, then there may be
336 // an orphaned BLOCKING_QUEUE closure with blocked threads attached to
337 // it. We therefore traverse the BLOCKING_QUEUEs attached to the
338 // current TSO to see if any can now be woken up.
339 void
340 checkBlockingQueues (Capability *cap, StgTSO *tso)
341 {
342 StgBlockingQueue *bq, *next;
343 StgClosure *p;
344
345 debugTraceCap(DEBUG_sched, cap,
346 "collision occurred; checking blocking queues for thread %ld",
347 (lnat)tso->id);
348
349 for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
350 next = bq->link;
351
352 if (bq->header.info == &stg_IND_info) {
353 // ToDo: could short it out right here, to avoid
354 // traversing this IND multiple times.
355 continue;
356 }
357
358 p = bq->bh;
359
360 if (p->header.info != &stg_BLACKHOLE_info ||
361 ((StgInd *)p)->indirectee != (StgClosure*)bq)
362 {
363 wakeBlockingQueue(cap,bq);
364 }
365 }
366 }
367
368 /* ----------------------------------------------------------------------------
369 updateThunk
370
371 Update a thunk with a value. In order to do this, we need to know
372 which TSO owns (or is evaluating) the thunk, in case we need to
373 awaken any threads that are blocked on it.
374 ------------------------------------------------------------------------- */
375
376 void
377 updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
378 {
379 StgClosure *v;
380 StgTSO *owner;
381 const StgInfoTable *i;
382
383 i = thunk->header.info;
384 if (i != &stg_BLACKHOLE_info &&
385 i != &stg_CAF_BLACKHOLE_info &&
386 i != &stg_WHITEHOLE_info) {
387 updateWithIndirection(cap, thunk, val);
388 return;
389 }
390
391 v = ((StgInd*)thunk)->indirectee;
392
393 updateWithIndirection(cap, thunk, val);
394
395 i = v->header.info;
396 if (i == &stg_TSO_info) {
397 owner = deRefTSO((StgTSO*)v);
398 if (owner != tso) {
399 checkBlockingQueues(cap, tso);
400 }
401 return;
402 }
403
404 if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
405 i != &stg_BLOCKING_QUEUE_DIRTY_info) {
406 checkBlockingQueues(cap, tso);
407 return;
408 }
409
410 owner = deRefTSO(((StgBlockingQueue*)v)->owner);
411
412 if (owner != tso) {
413 checkBlockingQueues(cap, tso);
414 } else {
415 wakeBlockingQueue(cap, (StgBlockingQueue*)v);
416 }
417 }
418
419 /* ---------------------------------------------------------------------------
420 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
421 * used by Control.Concurrent for error checking.
422 * ------------------------------------------------------------------------- */
423
424 HsBool
425 rtsSupportsBoundThreads(void)
426 {
427 #if defined(THREADED_RTS)
428 return HS_BOOL_TRUE;
429 #else
430 return HS_BOOL_FALSE;
431 #endif
432 }
433
434 /* ---------------------------------------------------------------------------
435 * isThreadBound(tso): check whether tso is bound to an OS thread.
436 * ------------------------------------------------------------------------- */
437
438 StgBool
439 isThreadBound(StgTSO* tso USED_IF_THREADS)
440 {
441 #if defined(THREADED_RTS)
442 return (tso->bound != NULL);
443 #endif
444 return rtsFalse;
445 }
446
447 /* ----------------------------------------------------------------------------
448 * Debugging: why is a thread blocked
449 * ------------------------------------------------------------------------- */
450
451 #if DEBUG
452 void
453 printThreadBlockage(StgTSO *tso)
454 {
455 switch (tso->why_blocked) {
456 case BlockedOnRead:
457 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
458 break;
459 case BlockedOnWrite:
460 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
461 break;
462 #if defined(mingw32_HOST_OS)
463 case BlockedOnDoProc:
464 debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
465 break;
466 #endif
467 case BlockedOnDelay:
468 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
469 break;
470 case BlockedOnMVar:
471 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
472 break;
473 case BlockedOnBlackHole:
474 debugBelch("is blocked on a black hole %p",
475 ((StgBlockingQueue*)tso->block_info.bh->bh));
476 break;
477 case BlockedOnMsgThrowTo:
478 debugBelch("is blocked on a throwto message");
479 break;
480 case NotBlocked:
481 debugBelch("is not blocked");
482 break;
483 case ThreadMigrating:
484 debugBelch("is runnable, but not on the run queue");
485 break;
486 case BlockedOnCCall:
487 debugBelch("is blocked on an external call");
488 break;
489 case BlockedOnCCall_NoUnblockExc:
490 debugBelch("is blocked on an external call (exceptions were already blocked)");
491 break;
492 case BlockedOnSTM:
493 debugBelch("is blocked on an STM operation");
494 break;
495 default:
496 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
497 tso->why_blocked, tso->id, tso);
498 }
499 }
500
501
502 void
503 printThreadStatus(StgTSO *t)
504 {
505 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
506 {
507 void *label = lookupThreadLabel(t->id);
508 if (label) debugBelch("[\"%s\"] ",(char *)label);
509 }
510 if (t->what_next == ThreadRelocated) {
511 debugBelch("has been relocated...\n");
512 } else {
513 switch (t->what_next) {
514 case ThreadKilled:
515 debugBelch("has been killed");
516 break;
517 case ThreadComplete:
518 debugBelch("has completed");
519 break;
520 default:
521 printThreadBlockage(t);
522 }
523 if (t->dirty) {
524 debugBelch(" (TSO_DIRTY)");
525 } else if (t->flags & TSO_LINK_DIRTY) {
526 debugBelch(" (TSO_LINK_DIRTY)");
527 }
528 debugBelch("\n");
529 }
530 }
531
532 void
533 printAllThreads(void)
534 {
535 StgTSO *t, *next;
536 nat i, g;
537 Capability *cap;
538
539 debugBelch("all threads:\n");
540
541 for (i = 0; i < n_capabilities; i++) {
542 cap = &capabilities[i];
543 debugBelch("threads on capability %d:\n", cap->no);
544 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
545 printThreadStatus(t);
546 }
547 }
548
549 debugBelch("other threads:\n");
550 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
551 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
552 if (t->why_blocked != NotBlocked) {
553 printThreadStatus(t);
554 }
555 if (t->what_next == ThreadRelocated) {
556 next = t->_link;
557 } else {
558 next = t->global_link;
559 }
560 }
561 }
562 }
563
564 // useful from gdb
565 void
566 printThreadQueue(StgTSO *t)
567 {
568 nat i = 0;
569 for (; t != END_TSO_QUEUE; t = t->_link) {
570 printThreadStatus(t);
571 i++;
572 }
573 debugBelch("%d threads on queue\n", i);
574 }
575
576 #endif /* DEBUG */