/* Asynchronous exception support for SMP -- rts/Threads.c (ghc.git) */
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (stg_ap_v_ret)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
#if defined(GRAN)
/*   currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat size, StgInt pri)
#else
StgTSO *
createThread(Capability *cap, nat size)
#endif
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
  /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
    threadsIgnored++;
    debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
	       RtsFlags.ParFlags.maxThreads, advisory_thread_count);
    return END_TSO_QUEUE;
  }
  threadsCreated++;
#endif

#if defined(GRAN)
  ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif

  // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
	size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    // 'size' covers the TSO header plus the stack; the stack gets
    // whatever is left over after the TSO struct itself.
    stack_size = size - TSO_STRUCT_SIZEW;

    // NOTE(review): 'cap' is only a parameter in the non-GRAN build;
    // these unconditional uses of it would not compile under GRAN --
    // verify before resurrecting that configuration.
    tso = (StgTSO *)allocateLocal(cap, size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
    SET_GRAN_HDR(tso, ThisPE);
#endif

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = END_TSO_QUEUE;
    // freshly created TSOs are dirty until scanned by the GC
    tso->flags = TSO_DIRTY;
    
    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;
    
    tso->stack_size     = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
	                  - TSO_STRUCT_SIZEW;
    // the stack grows downwards from the top of the allocated block
    tso->sp             = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;
    
#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif
    
  /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->link = END_TSO_QUEUE;
    
  // ToDo: check this
#if defined(GRAN)
  /* uses more flexible routine in GranSim */
  insertThread(tso, CurrentProc);
#else
  /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
   * from its creation
   */
#endif
    
#if defined(GRAN) 
  if (RtsFlags.GranFlags.GranSimStats.Full) 
    DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
  if (RtsFlags.ParFlags.ParStats.Full) 
    DumpGranEvent(GR_STARTQ,tso);
  /* HACk to avoid SCHEDULE 
     LastTSO = tso; */
#endif

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = all_threads;
    all_threads = tso;
    RELEASE_LOCK(&sched_mutex);
    
#if defined(DIST)
  tso->dist.priority = MandatoryPriority; //by default that is...
#endif
    
#if defined(GRAN)
  tso->gran.pri = pri;
# if defined(DEBUG)
  tso->gran.magic = TSO_MAGIC; // debugging only
# endif
  tso->gran.sparkname   = 0;
  tso->gran.startedat   = CURRENT_TIME; 
  tso->gran.exported    = 0;
  tso->gran.basicblocks = 0;
  tso->gran.allocs      = 0;
  tso->gran.exectime    = 0;
  tso->gran.fetchtime   = 0;
  tso->gran.fetchcount  = 0;
  tso->gran.blocktime   = 0;
  tso->gran.blockcount  = 0;
  tso->gran.blockedat   = 0;
  tso->gran.globalsparks = 0;
  tso->gran.localsparks  = 0;
  if (RtsFlags.GranFlags.Light)
    tso->gran.clock  = Now; /* local clock */
  else
    tso->gran.clock  = 0;

  IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
  tso->par.magic = TSO_MAGIC; // debugging only
# endif
  tso->par.sparkname   = 0;
  tso->par.startedat   = CURRENT_TIME; 
  tso->par.exported    = 0;
  tso->par.basicblocks = 0;
  tso->par.allocs      = 0;
  tso->par.exectime    = 0;
  tso->par.fetchtime   = 0;
  tso->par.fetchcount  = 0;
  tso->par.blocktime   = 0;
  tso->par.blockcount  = 0;
  tso->par.blockedat   = 0;
  tso->par.globalsparks = 0;
  tso->par.localsparks  = 0;
#endif

#if defined(GRAN)
  globalGranStats.tot_threads_created++;
  globalGranStats.threads_created_on_PE[CurrentProc]++;
  globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
  globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
  // collect parallel global statistics (currently done together with GC stats)
  if (RtsFlags.ParFlags.ParStats.Global &&
      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
    //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
    globalParStats.tot_threads_created++;
  }
#endif 

#if defined(GRAN)
  // NOTE(review): format string has 2 conversion specifiers but 3
  // arguments are supplied (CurrentProc, tso, tso->id); GRAN-only dead
  // code -- confirm intended message before re-enabling.
  debugTrace(GRAN_DEBUG_pri,
	     "==__ schedule: Created TSO %d (%p);",
	     CurrentProc, tso, tso->id);
#elif defined(PARALLEL_HASKELL)
  debugTrace(PAR_DEBUG_verbose,
	     "==__ schedule: Created TSO %d (%p); %d threads active",
	     (long)tso->id, tso, advisory_thread_count);
#else
  debugTrace(DEBUG_sched,
	     "created thread %ld, stack size = %lx words",
	     (long)tso->id, (long)tso->stack_size);
#endif    
  return tso;
}
228
#if defined(PAR)
/* RFP:
   all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createThreadFromSpark(rtsSpark spark) 
{ StgTSO *tso;
  ASSERT(spark != (rtsSpark)NULL);
// JB: TAKE CARE OF THIS COUNTER! BUGGY
  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
  { threadsIgnored++;
    barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
	  RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
    return END_TSO_QUEUE;
  }
  else
  { threadsCreated++;
    // NOTE(review): createThread takes (cap, size) in non-GRAN builds;
    // this single-argument call matches no visible prototype -- PAR-only
    // dead code, verify before resurrecting.
    tso = createThread(RtsFlags.GcFlags.initialStkSize);
    if (tso==END_TSO_QUEUE)	
      barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
    tso->priority = AdvisoryPriority;
#endif
    pushClosure(tso,spark);
    addToRunQueue(tso);
    advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
  }
  return tso;
}
#endif
259
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
262 *
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
266
267 int
268 cmp_thread(StgPtr tso1, StgPtr tso2)
269 {
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
272
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
275 return 0;
276 }
277
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
280 *
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
283 int
284 rts_getThreadId(StgPtr tso)
285 {
286 return ((StgTSO *)tso)->id;
287 }
288
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
293
294 void
295 removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
296 {
297 StgTSO *t, *prev;
298
299 prev = NULL;
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
301 if (t == tso) {
302 if (prev) {
303 prev->link = t->link;
304 } else {
305 *queue = t->link;
306 }
307 return;
308 }
309 }
310 barf("removeThreadFromQueue: not found");
311 }
312
313 void
314 removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
315 {
316 StgTSO *t, *prev;
317
318 prev = NULL;
319 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
320 if (t == tso) {
321 if (prev) {
322 prev->link = t->link;
323 } else {
324 *head = t->link;
325 }
326 if (*tail == tso) {
327 if (prev) {
328 *tail = prev;
329 } else {
330 *tail = END_TSO_QUEUE;
331 }
332 }
333 return;
334 }
335 }
336 barf("removeThreadFromMVarQueue: not found");
337 }
338
/* Remove 'tso' from the blocking queue of 'mvar' (head/tail anchored);
 * fails fatally if the TSO is not on that queue. */
void
removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
{
    removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
}
344
345 /* ----------------------------------------------------------------------------
346 unblockOne()
347
348 unblock a single thread.
349 ------------------------------------------------------------------------- */
350
#if defined(GRAN)
/* GranSim: no statistics to record on unblock. */
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
}
#elif defined(PARALLEL_HASKELL)
/* Log a RESUME event for the unblocked thread and charge the time it
 * spent blocked to either fetchtime or blocktime, depending on the
 * closure it was blocked on. */
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
  /* write RESUME events to log file and
     update blocked and fetch time (depending on type of the orig closure) */
  if (RtsFlags.ParFlags.ParStats.Full) {
    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		     GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
		     0, 0 /* spark_queue_len(ADVISORY_POOL) */);
    if (emptyRunQueue())
      emitSchedule = rtsTrue;

    switch (get_itbl(node)->type) {
	case FETCH_ME_BQ:
	  ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
	  break;
	case RBH:
	case FETCH_ME:
	case BLACKHOLE_BQ:
	  ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
	  break;
#ifdef DIST
        case MVAR:
          break;
#endif	  
	default:
	  barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
	}
      }
}
#endif
388
#if defined(GRAN)
/* GranSim variant: schedule a Resume/Unblock event for the thread,
 * charging the appropriate simulated costs depending on whether the
 * TSO is local to the node's processor. */
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgTSO *tso;
    PEs node_loc, tso_loc;

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
      /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
      ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
      // insertThread(tso, node_loc);
      new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
		ResumeThread,
		tso, node, (rtsSpark*)NULL);
      tso->link = END_TSO_QUEUE; // overwrite link just to be sure
      // len_local++;
      // len++;
    } else { // TSO is remote (actually should be FMBQ)
      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                  RtsFlags.GranFlags.Costs.gunblocktime +
	                          RtsFlags.GranFlags.Costs.latency;
      new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
		UnblockThread,
		tso, node, (rtsSpark*)NULL);
      tso->link = END_TSO_QUEUE; // overwrite link just to be sure
      // len++;
    }
    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    IF_GRAN_DEBUG(bq,
		  debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
			  (node_loc==tso_loc ? "Local" : "Global"), 
			  tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    // NOTE(review): the call below carries an extra closing paren and
    // this non-void function never returns a value -- GRAN-only dead
    // code that does not compile as written; fix before resurrecting.
    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
	       tso->id, tso));
}
#elif defined(PARALLEL_HASKELL)
/* Parallel variant: a queue element may be a TSO (push onto the run
 * queue), a BLOCKED_FETCH (move to PendingFetches), or -- in debug
 * builds -- an RBHSave constructor.  Returns the next element. */
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
      ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
      /* if it's a TSO just push it onto the run_queue */
      next = bqe->link;
      ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
      APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
      threadRunnable();
      unblockCount(bqe, node);
      /* reset blocking status after dumping event */
      ((StgTSO *)bqe)->why_blocked = NotBlocked;
      break;

    case BLOCKED_FETCH:
      /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
      next = bqe->link;
      bqe->link = (StgBlockingQueueElement *)PendingFetches;
      PendingFetches = (StgBlockedFetch *)bqe;
      break;

# if defined(DEBUG)
      /* can ignore this case in a non-debugging setup; 
	 see comments on RBHSave closures above */
    case CONSTR:
      /* check that the closure is an RBHSave closure */
      ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
	     get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
	     get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
      break;

    default:
      barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
	   get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
	   (StgClosure *)bqe);
# endif
    }
  IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
  return next;
}
#endif
475
/* Wake up a single blocked thread, allowing it to migrate to the
 * current Capability; returns the next thread that was on the same
 * blocking queue (via the old tso->link).  See unblockOne_. */
StgTSO *
unblockOne (Capability *cap, StgTSO *tso)
{
    return unblockOne_(cap,tso,rtsTrue); // allow migration
}
481
/* Wake up 'tso' (which must currently be blocked) and return the next
 * thread on the same blocking queue.
 *
 * In THREADED_RTS the thread is either queued on the current
 * Capability's run queue -- migrating it here when it was last on
 * another Capability, is unlocked, migration is allowed, and the
 * +RTS wakeupMigrate flag is set -- or handed back to the Capability
 * it was last running on.
 */
StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso, 
	     rtsBool allow_migrate USED_IF_THREADS)
{
  StgTSO *next;

  ASSERT(get_itbl(tso)->type == TSO);
  ASSERT(tso->why_blocked != NotBlocked);

  tso->why_blocked = NotBlocked;
  next = tso->link;
  tso->link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
  if (tso->cap == cap || (!tsoLocked(tso) && 
			  allow_migrate && 
			  RtsFlags.ParFlags.wakeupMigrate)) {
      // We are waking up this thread on the current Capability, which
      // might involve migrating it from the Capability it was last on.
      if (tso->bound) {
	  ASSERT(tso->bound->cap == tso->cap);
	  tso->bound->cap = cap;
      }
      tso->cap = cap;
      appendToRunQueue(cap,tso);
      // we're holding a newly woken thread, make sure we context switch
      // quickly so we can migrate it if necessary.
      context_switch = 1;
  } else {
      // we'll try to wake it up on the Capability it was last on.
      wakeupThreadOnCapability_lock(tso->cap, tso);
  }
#else
  appendToRunQueue(cap,tso);
  context_switch = 1;
#endif
  
  debugTrace(DEBUG_sched,
	     "waking up thread %ld on cap %d",
	     (long)tso->id, tso->cap->no);

  return next;
}
525
526 /* ----------------------------------------------------------------------------
527 awakenBlockedQueue
528
529 wakes up all the threads on the specified queue.
530 ------------------------------------------------------------------------- */
531
#if defined(GRAN)
/* GranSim variant: wake every TSO on the blocking queue of 'node',
 * faking a fetch if the node is on a different PE, and restore the
 * RBHSave payload when the queue belonged to an RBH. */
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0; 

  IF_GRAN_DEBUG(bq, 
		debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
		      node, CurrentProc, CurrentTime[CurrentProc], 
		      CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(q == END_BQ_QUEUE ||
	 get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
	 get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been 
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq, 
		  debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
			node, node_loc, CurrentProc, CurrentTSO->id, 
			// CurrentTSO, where_is(CurrentTSO),
			node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq, 
		  debugBelch("## new bitmask of node %p is %#x\n",
			node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    //next = bqe->link;
    /* 
       bqe points to the current element in the queue
       next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOne(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    /*
    ASSERT((info_ptr==&RBH_Save_0_info) ||
	   (info_ptr==&RBH_Save_1_info) ||
	   (info_ptr==&RBH_Save_2_info));
    */
    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
		  debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
			node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
    globalGranStats.tot_awbq++;             // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
		debugBelch("## BQ Stats of %p: [%d entries] %s\n",
			node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PARALLEL_HASKELL)
/* Parallel variant: wake every TSO and BLOCKED_FETCH on the queue;
 * iteration stops at the first CONSTR (RBHSave) or END_BQ_QUEUE. */
void 
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  IF_PAR_DEBUG(verbose, 
	       debugBelch("##-_ AwBQ for node %p on [%x]: \n",
		     node, mytid));
#ifdef DIST  
  //RFP
  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
    IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
    return;
  }
#endif
  
  ASSERT(q == END_BQ_QUEUE ||
	 get_itbl(q)->type == TSO ||           
	 get_itbl(q)->type == BLOCKED_FETCH || 
	 get_itbl(q)->type == CONSTR); 

  bqe = q;
  while (get_itbl(bqe)->type==TSO || 
	 get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOne(bqe, node);
  }
}

643 #else /* !GRAN && !PARALLEL_HASKELL */
644
645 void
646 awakenBlockedQueue(Capability *cap, StgTSO *tso)
647 {
648 while (tso != END_TSO_QUEUE) {
649 tso = unblockOne(cap,tso);
650 }
651 }
652 #endif
653
654
655 /* ---------------------------------------------------------------------------
656 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
657 * used by Control.Concurrent for error checking.
658 * ------------------------------------------------------------------------- */
659
660 StgBool
661 rtsSupportsBoundThreads(void)
662 {
663 #if defined(THREADED_RTS)
664 return rtsTrue;
665 #else
666 return rtsFalse;
667 #endif
668 }
669
670 /* ---------------------------------------------------------------------------
671 * isThreadBound(tso): check whether tso is bound to an OS thread.
672 * ------------------------------------------------------------------------- */
673
674 StgBool
675 isThreadBound(StgTSO* tso USED_IF_THREADS)
676 {
677 #if defined(THREADED_RTS)
678 return (tso->bound != NULL);
679 #endif
680 return rtsFalse;
681 }
682
683 /* ----------------------------------------------------------------------------
684 * Debugging: why is a thread blocked
685 * ------------------------------------------------------------------------- */
686
687 #if DEBUG
688 void
689 printThreadBlockage(StgTSO *tso)
690 {
691 switch (tso->why_blocked) {
692 case BlockedOnRead:
693 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
694 break;
695 case BlockedOnWrite:
696 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
697 break;
698 #if defined(mingw32_HOST_OS)
699 case BlockedOnDoProc:
700 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
701 break;
702 #endif
703 case BlockedOnDelay:
704 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
705 break;
706 case BlockedOnMVar:
707 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
708 break;
709 case BlockedOnException:
710 debugBelch("is blocked on delivering an exception to thread %d",
711 tso->block_info.tso->id);
712 break;
713 case BlockedOnBlackHole:
714 debugBelch("is blocked on a black hole");
715 break;
716 case NotBlocked:
717 debugBelch("is not blocked");
718 break;
719 #if defined(PARALLEL_HASKELL)
720 case BlockedOnGA:
721 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
722 tso->block_info.closure, info_type(tso->block_info.closure));
723 break;
724 case BlockedOnGA_NoSend:
725 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
726 tso->block_info.closure, info_type(tso->block_info.closure));
727 break;
728 #endif
729 case BlockedOnCCall:
730 debugBelch("is blocked on an external call");
731 break;
732 case BlockedOnCCall_NoUnblockExc:
733 debugBelch("is blocked on an external call (exceptions were already blocked)");
734 break;
735 case BlockedOnSTM:
736 debugBelch("is blocked on an STM operation");
737 break;
738 default:
739 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
740 tso->why_blocked, tso->id, tso);
741 }
742 }
743
744 void
745 printThreadStatus(StgTSO *t)
746 {
747 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
748 {
749 void *label = lookupThreadLabel(t->id);
750 if (label) debugBelch("[\"%s\"] ",(char *)label);
751 }
752 if (t->what_next == ThreadRelocated) {
753 debugBelch("has been relocated...\n");
754 } else {
755 switch (t->what_next) {
756 case ThreadKilled:
757 debugBelch("has been killed");
758 break;
759 case ThreadComplete:
760 debugBelch("has completed");
761 break;
762 default:
763 printThreadBlockage(t);
764 }
765 debugBelch("\n");
766 }
767 }
768
/* Print the status of every thread known to the RTS: first each
 * Capability's run queue, then all blocked threads on the global
 * all_threads list.  Debugging only. */
void
printAllThreads(void)
{
  StgTSO *t, *next;
  nat i;
  Capability *cap;

# if defined(GRAN)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(TIME_ON_PROC(CurrentProc), 
		       time_string, rtsFalse/*no commas!*/);

  debugBelch("all threads at [%s]:\n", time_string);
# elif defined(PARALLEL_HASKELL)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(CURRENT_TIME,
		       time_string, rtsFalse/*no commas!*/);

  debugBelch("all threads at [%s]:\n", time_string);
# else
  debugBelch("all threads:\n");
# endif

  for (i = 0; i < n_capabilities; i++) {
      cap = &capabilities[i];
      debugBelch("threads on capability %d:\n", cap->no);
      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
	  printThreadStatus(t);
      }
  }

  debugBelch("other threads:\n");
  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
	  printThreadStatus(t);
      }
      // a relocated TSO chains to its new incarnation via t->link,
      // not t->global_link
      if (t->what_next == ThreadRelocated) {
	  next = t->link;
      } else {
	  next = t->global_link;
      }
  }
}
812
813 // useful from gdb
814 void
815 printThreadQueue(StgTSO *t)
816 {
817 nat i = 0;
818 for (; t != END_TSO_QUEUE; t = t->link) {
819 printThreadStatus(t);
820 i++;
821 }
822 debugBelch("%d threads on queue\n", i);
823 }
824
825 /*
826 Print a whole blocking queue attached to node (debugging only).
827 */
828 # if defined(PARALLEL_HASKELL)
/* Print the whole blocking queue attached to 'node' (debugging only);
 * delegates the element-by-element walk to print_bqe. */
void 
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  StgTSO *tso;
  rtsBool end;

  debugBelch("## BQ of closure %p (%s): ",
	  node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH ||
	 get_itbl(node)->type == MVAR);
	 
  ASSERT(node!=(StgClosure*)NULL);  // sanity check

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
849
/*
  Print a whole blocking queue starting with the element bqe.
*/
/* Walk the chain from 'bqe', printing each TSO, BLOCKED_FETCH, or
 * terminating RBHSave constructor.  NOTE(review): the TSO case prints
 * a pointer with %x -- works only where pointers fit in unsigned int;
 * PAR-only debug code, verify before resurrecting. */
void 
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /* 
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||           
	   get_itbl(bqe)->type == BLOCKED_FETCH || 
	   get_itbl(bqe)->type == CONSTR); 
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%x),",
	      ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
	      ((StgBlockedFetch *)bqe)->node, 
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
	      ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
	      (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
	   info_type((StgClosure *)bqe)); // , node, info_type(node));
      break;
    }
  } /* for */
  debugBelch("\n");
}
901 # elif defined(GRAN)
/* GranSim variant: print the whole blocking queue attached to 'node',
 * including which PE each TSO lives on.  Debugging only. */
void 
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);
	 
  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
	  node, info_type(node), node_loc);

  /* 
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);             // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||           
	   get_itbl(bqe)->type == CONSTR); 
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
	      ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
	      (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type((StgClosure *)bqe), node, info_type(node));
      break;
    }
  } /* for */
  debugBelch("\n");
}
955 # endif
956
957 #if defined(PARALLEL_HASKELL)
958 nat
959 run_queue_len(void)
960 {
961 nat i;
962 StgTSO *tso;
963
964 for (i=0, tso=run_queue_hd;
965 tso != END_TSO_QUEUE;
966 i++, tso=tso->link) {
967 /* nothing */
968 }
969
970 return i;
971 }
972 #endif
973
974 #endif /* DEBUG */