removeThreadFromQueue: stub out the link field before returning (#4813)
[ghc.git] / rts / Threads.c
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Messages.h"
#include "sm/Storage.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (stg_ap_v_ret)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size. Before the
   scheduler can run it, however, the thread needs to have a closure
   (and possibly some arguments) pushed on its stack. See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.
   ------------------------------------------------------------------------ */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);

    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
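
/* As a point of reference, a packaged creator in the style of
 * createIOThread() (see SchedAPI.h) might be built on createThread()
 * roughly as follows. This is an illustrative sketch only, not the
 * authoritative definition; the exact stack layout is an assumption
 * here.
 */
#if 0 /* illustration, not compiled */
StgTSO *
exampleCreateIOThread (Capability *cap, nat stack_size, StgClosure *closure)
{
    StgTSO *t = createThread(cap, stack_size);
    pushClosure(t, (W_)&stg_ap_v_info);   // then apply the result to a void arg
    pushClosure(t, (W_)closure);          // the IO action itself
    pushClosure(t, (W_)&stg_enter_info);  // first enter the closure
    return t;
}
#endif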

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}
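
// For example, (==) on two ThreadIds boils down to
// cmp_thread(t1,t2) == 0, and compare to the sign of the result
// (the Haskell side of this lives in GHC.Conc).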

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.

   The removed TSO's link field is stubbed out (reset to END_TSO_QUEUE)
   before we return: tryWakeupThread() relies on tso->_link ==
   END_TSO_QUEUE to tell whether a thread blocked on an MVar is still
   on the MVar's queue (#4813).
   -------------------------------------------------------------------------- */

rtsBool // returns True if we modified the head of the queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                t->_link = END_TSO_QUEUE;
                return rtsFalse;
            } else {
                *queue = t->_link;
                t->_link = END_TSO_QUEUE;
                return rtsTrue;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}

rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    rtsBool flag = rtsFalse;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                flag = rtsFalse;
            } else {
                *head = t->_link;
                flag = rtsTrue;
            }
            t->_link = END_TSO_QUEUE;
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
                return rtsTrue;
            } else {
                return flag;
            }
        }
    }
    barf("removeThreadFromDeQueue: not found");
}
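
/* An illustrative sketch of how removeThreadFromDeQueue() might be
 * used on an MVar's blocked-thread queue, e.g. when a blocked thread
 * is killed. exampleRemoveFromMVar is a hypothetical helper, for
 * illustration only:
 */
#if 0 /* illustration, not compiled */
static void
exampleRemoveFromMVar (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    // MVars keep both a head and a tail pointer for their queue of
    // blocked threads, so the DeQueue variant is the one that applies.
    (void)removeThreadFromDeQueue(cap, &mvar->head, &mvar->tail, tso);
}
#endif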

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread. tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.
   ------------------------------------------------------------------------- */

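// tryWakeupThread() first calls deRefTSO(), which follows the _link
// chain of any ThreadRelocated TSOs to find the current incarnation
// of the thread; tryWakeupThread_() then does the real work.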
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    tryWakeupThread_(cap, deRefTSO(tso));
}

void
tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (lnat)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->sp += 3;
        goto unblock;
    }

    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now; if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block). The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs. On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}
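
/* An illustrative sketch of the idempotent-wakeup pattern: wake the
 * first thread blocked on an MVar once a value arrives. This is a
 * simplified, hypothetical fragment, not the RTS's real MVar code
 * (in particular it omits maintaining mvar->tail and the dirty bits):
 */
#if 0 /* illustration, not compiled */
static void
exampleWakeFirstBlocked (Capability *cap, StgMVar *mvar)
{
    StgTSO *first = mvar->head;
    if (first != END_TSO_QUEUE) {
        mvar->head = first->_link;     // unlink from the MVar's queue
        first->_link = END_TSO_QUEUE;  // stub the link (see #4813 above)
        tryWakeupThread(cap, first);   // idempotent, so safe to over-call
    }
}
#endif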

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}
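
/* An illustrative sketch of a caller: pushing surplus work to an idle
 * capability, in the spirit of the scheduler's work-sharing. This is
 * a hypothetical fragment ("idle_cap" is an assumption here), not the
 * scheduler's actual code:
 */
#if 0 /* illustration, not compiled */
static void
examplePushWork (Capability *cap, Capability *idle_cap)
{
    if (cap->run_queue_hd != END_TSO_QUEUE) {
        StgTSO *t = popRunQueue(cap);    // take a runnable thread
        if (t->bound == NULL) {          // bound threads must stay put
            migrateThread(cap, t, idle_cap);
        } else {
            appendToRunQueue(cap, t);    // put it back
        }
    }
}
#endif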

/* ----------------------------------------------------------------------------
   wakeBlockingQueue

   Wakes up all the threads blocked on the given BLOCKING_QUEUE.
   ------------------------------------------------------------------------- */

void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh = 0; bq->queue = 0; bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it. We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;

        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value. In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */

void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

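    // At this point the thunk is one of the black-hole flavours (or a
    // WHITEHOLE), and its indirectee tells us who is involved:
    //   - a TSO: that thread owns the thunk and nothing has blocked
    //     on it yet;
    //   - a BLOCKING_QUEUE: threads are blocked on the thunk, and the
    //     queue records its owner;
    //   - anything else: the thunk was updated by someone else in the
    //     meantime, so check our own blocking queues for orphans.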
    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    i = v->header.info;
    if (i == &stg_TSO_info) {
        owner = deRefTSO((StgTSO*)v);
        if (owner != tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    owner = deRefTSO(((StgBlockingQueue*)v)->owner);

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * Used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return HS_BOOL_TRUE;
#else
  return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#else
  return rtsFalse;
#endif
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if defined(DEBUG)
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole %p",
                   ((StgBlockingQueue*)tso->block_info.bh->bh));
        break;
    case BlockedOnMsgThrowTo:
        debugBelch("is blocked on a throwto message");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
    case ThreadMigrating:
        debugBelch("is runnable, but not on the run queue");
        break;
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_Interruptible:
        debugBelch("is blocked on an external call (but may be interrupted)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, (void *)tso);
    }
}

void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->dirty) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, g;
    Capability *cap;

    debugBelch("all threads:\n");

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%u threads on queue\n", i);
}
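
// Example (an assumed gdb session; the expressions are illustrative):
//   (gdb) call printAllThreads()
//   (gdb) call printThreadQueue(cap->run_queue_hd)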

#endif /* DEBUG */