/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "Storage.h"
#include "Threads.h"
#include "RtsFlags.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
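
/* A hedged worked example of the sizing logic in createThread() below: if a
 * caller asks for fewer than MIN_STACK_WORDS + TSO_STRUCT_SIZEW words, the
 * request is rounded up.  Assuming, purely for illustration, that
 * RESERVED_STACK_WORDS is 21 and sizeofW(StgStopFrame) is 1 (both values are
 * platform- and version-dependent), we would get:
 *
 *     MIN_STACK_WORDS = 21 + 1 + 3 = 25
 *     size            = MIN_STACK_WORDS + TSO_STRUCT_SIZEW   // minimum accepted
 *     stack_size      = size - TSO_STRUCT_SIZEW = 25 words
 *
 * so the usable stack is exactly MIN_STACK_WORDS: just enough to push the
 * stop frame and take the immediate stack-overflow path described above.
 */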

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
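/* A sketch of how the packaged versions are expected to drive this function
 * (see SchedAPI.h; the exact pushClosure() sequence below is an assumption
 * for illustration, not a verbatim copy of createIOThread()):
 *
 *     StgTSO *t = createThread(cap, stack_size);
 *     pushClosure(t, (W_)&stg_ap_v_info);   // assumed: return frame for an IO action
 *     pushClosure(t, (W_)closure);          // the closure to run
 *     pushClosure(t, (W_)&stg_enter_info);  // assumed: enter it when scheduled
 *     scheduleThread(cap, t);               // make it runnable
 *
 * createThread() itself only allocates and initialises the TSO; making it
 * runnable is deliberately left to the caller.
 */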
#if defined(GRAN)
/*   currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat size, StgInt pri)
#else
StgTSO *
createThread(Capability *cap, nat size)
#endif
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
                   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    }
    threadsCreated++;
#endif

#if defined(GRAN)
    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocateLocal(cap, size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
    SET_GRAN_HDR(tso, ThisPE);
#endif

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = END_TSO_QUEUE;
    tso->flags = TSO_DIRTY;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    // ToDo: check this
#if defined(GRAN)
    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);
#else
    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     * from its creation
     */
#endif

#if defined(GRAN)
    if (RtsFlags.GranFlags.GranSimStats.Full)
        DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.ParStats.Full)
        DumpGranEvent(GR_STARTQ,tso);
    /* Hack to avoid SCHEDULE
       LastTSO = tso; */
#endif

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0s0->threads;
    g0s0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

#if defined(DIST)
    tso->dist.priority = MandatoryPriority; //by default that is...
#endif

#if defined(GRAN)
    tso->gran.pri = pri;
# if defined(DEBUG)
    tso->gran.magic = TSO_MAGIC; // debugging only
# endif
    tso->gran.sparkname    = 0;
    tso->gran.startedat    = CURRENT_TIME;
    tso->gran.exported     = 0;
    tso->gran.basicblocks  = 0;
    tso->gran.allocs       = 0;
    tso->gran.exectime     = 0;
    tso->gran.fetchtime    = 0;
    tso->gran.fetchcount   = 0;
    tso->gran.blocktime    = 0;
    tso->gran.blockcount   = 0;
    tso->gran.blockedat    = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
        tso->gran.clock = Now; /* local clock */
    else
        tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
    tso->par.magic = TSO_MAGIC; // debugging only
# endif
    tso->par.sparkname    = 0;
    tso->par.startedat    = CURRENT_TIME;
    tso->par.exported     = 0;
    tso->par.basicblocks  = 0;
    tso->par.allocs       = 0;
    tso->par.exectime     = 0;
    tso->par.fetchtime    = 0;
    tso->par.fetchcount   = 0;
    tso->par.blocktime    = 0;
    tso->par.blockcount   = 0;
    tso->par.blockedat    = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;
#endif

#if defined(GRAN)
    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
        globalParStats.tot_threads_created++;
    }
#endif

#if defined(GRAN)
    debugTrace(GRAN_DEBUG_pri,
               "==__ schedule: Created TSO %d (%p) on PE %d;",
               tso->id, tso, CurrentProc);
#elif defined(PARALLEL_HASKELL)
    debugTrace(PAR_DEBUG_verbose,
               "==__ schedule: Created TSO %ld (%p); %d threads active",
               (long)tso->id, tso, advisory_thread_count);
#else
    debugTrace(DEBUG_sched,
               "created thread %ld, stack size = %lx words",
               (long)tso->id, (long)tso->stack_size);
#endif
    return tso;
}

#if defined(PAR)
/* RFP:
   all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createThreadFromSpark(rtsSpark spark)
{
    StgTSO *tso;
    ASSERT(spark != (rtsSpark)NULL);
    // JB: TAKE CARE OF THIS COUNTER! BUGGY
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
             RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    } else {
        threadsCreated++;
        tso = createThread(RtsFlags.GcFlags.initialStkSize);
        if (tso==END_TSO_QUEUE)
            barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
        tso->priority = AdvisoryPriority;
#endif
        pushClosure(tso,spark);
        addToRunQueue(tso);
        advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
    }
    return tso;
}
#endif

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}
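
/* Returns -1, 0 or 1, strcmp-style; callers only need to test the sign.
 * A hedged sketch of the intended use (illustrative, not a copy of the
 * STG-land call site):
 *
 *     int r = cmp_thread((StgPtr)tso1, (StgPtr)tso2);
 *     // r < 0: tso1 < tso2,  r == 0: same thread,  r > 0: tso1 > tso2
 */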

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

void
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *queue = t->_link;
            }
            return;
        }
    }
    barf("removeThreadFromQueue: not found");
}

void
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *head = t->_link;
            }
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
            }
            return;
        }
    }
337 barf("removeThreadFromMVarQueue: not found");
}

void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
}
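
/* A hedged usage sketch: this is the sort of call exception-delivery code
 * is expected to make when a thread blocked on an MVar has to be pulled off
 * that MVar's wait queue (the surrounding control flow is illustrative, not
 * a copy of the real caller):
 *
 *     if (tso->why_blocked == BlockedOnMVar) {
 *         StgMVar *mvar = (StgMVar *)tso->block_info.closure;
 *         removeThreadFromMVarQueue(cap, mvar, tso);
 *     }
 *
 * Note that the deque variant above must also patch mvar->tail, because
 * MVar queues keep a tail pointer for O(1) append.
 */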

/* ----------------------------------------------------------------------------
   unblockOne()

   unblock a single thread.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
}
#elif defined(PARALLEL_HASKELL)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
    /* write RESUME events to log file and
       update blocked and fetch time (depending on type of the orig closure) */
    if (RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                         0, 0 /* spark_queue_len(ADVISORY_POOL) */);
        if (emptyRunQueue())
            emitSchedule = rtsTrue;

        switch (get_itbl(node)->type) {
        case FETCH_ME_BQ:
            ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
        case RBH:
        case FETCH_ME:
        case BLACKHOLE_BQ:
            ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
#ifdef DIST
        case MVAR:
            break;
#endif
        default:
            barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
        }
    }
}
#endif

#if defined(GRAN)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgTSO *tso;
    StgBlockingQueueElement *next;
    PEs node_loc, tso_loc;

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe; // wastes an assignment to get the type right
    next = bqe->link;    // save the successor now; the link field is overwritten below
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
        /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
        ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
        // insertThread(tso, node_loc);
        new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
                  ResumeThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len_local++;
        // len++;
    } else { // TSO is remote (actually should be FMBQ)
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                    RtsFlags.GranFlags.Costs.gunblocktime +
                                    RtsFlags.GranFlags.Costs.latency;
        new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
                  UnblockThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len++;
    }
    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    IF_GRAN_DEBUG(bq,
                  debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
                             (node_loc==tso_loc ? "Local" : "Global"),
                             tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
               tso->id, tso);
    return next;
}
#elif defined(PARALLEL_HASKELL)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
        ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
        /* if it's a TSO just push it onto the run_queue */
        next = bqe->link;
        ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
        APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
        threadRunnable();
        unblockCount(bqe, node);
        /* reset blocking status after dumping event */
        ((StgTSO *)bqe)->why_blocked = NotBlocked;
        break;

    case BLOCKED_FETCH:
        /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
        next = bqe->link;
        bqe->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bqe;
        break;

# if defined(DEBUG)
        /* can ignore this case in a non-debugging setup;
           see comments on RBHSave closures above */
    case CONSTR:
        /* check that the closure is an RBHSave closure */
        ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
        break;

    default:
        barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
             get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
             (StgClosure *)bqe);
# endif
    }
    IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
    return next;
}
#endif

StgTSO *
unblockOne (Capability *cap, StgTSO *tso)
{
    return unblockOne_(cap,tso,rtsTrue); // allow migration
}

StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
{
    StgTSO *next;

    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);

    tso->why_blocked = NotBlocked;
    next = tso->_link;
    tso->_link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            allow_migrate &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
            ASSERT(tso->bound->cap == tso->cap);
            tso->bound->cap = cap;
        }
        tso->cap = cap;
        appendToRunQueue(cap,tso);
        // we're holding a newly woken thread, make sure we context switch
        // quickly so we can migrate it if necessary.
        cap->context_switch = 1;
    } else {
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    }
#else
    appendToRunQueue(cap,tso);
    cap->context_switch = 1;
#endif

    debugTrace(DEBUG_sched,
               "waking up thread %ld on cap %d",
               (long)tso->id, tso->cap->no);

    return next;
}

/* ----------------------------------------------------------------------------
   awakenBlockedQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc;
    nat len = 0;

    IF_GRAN_DEBUG(bq,
                  debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n",
                             node, CurrentProc, CurrentTime[CurrentProc],
                             CurrentTSO->id, CurrentTSO));

    node_loc = where_is(node);

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
           get_itbl(q)->type == CONSTR); // closure (type constructor)
    ASSERT(is_unique(node));

    /* FAKE FETCH: magically copy the node to the tso's proc;
       no Fetch necessary because in reality the node should not have been
       moved to the other PE in the first place
    */
    if (CurrentProc!=node_loc) {
        IF_GRAN_DEBUG(bq,
                      debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
                                 node, node_loc, CurrentProc, CurrentTSO->id,
                                 // CurrentTSO, where_is(CurrentTSO),
                                 node->header.gran.procs));
        node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
        IF_GRAN_DEBUG(bq,
                      debugBelch("## new bitmask of node %p is %#x\n",
                                 node, node->header.gran.procs));
        if (RtsFlags.GranFlags.GranSimStats.Global) {
            globalGranStats.tot_fake_fetches++;
        }
    }

    bqe = q;
    // ToDo: check: ASSERT(CurrentProc==node_loc);
    while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
        //next = bqe->link;
        /*
          bqe points to the current element in the queue
          next points to the next element in the queue
        */
        //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
        //tso_loc = where_is(tso);
        len++;
        bqe = unblockOne(bqe, node);
    }

    /* if this is the BQ of an RBH, we have to put back the info ripped out of
       the closure to make room for the anchor of the BQ */
    if (bqe!=END_BQ_QUEUE) {
        ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
        /*
          ASSERT((info_ptr==&RBH_Save_0_info) ||
                 (info_ptr==&RBH_Save_1_info) ||
                 (info_ptr==&RBH_Save_2_info));
        */
        /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
        ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
        ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

        IF_GRAN_DEBUG(bq,
                      debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
                                 node, info_type(node)));
    }

    /* statistics gathering */
    if (RtsFlags.GranFlags.GranSimStats.Global) {
        // globalGranStats.tot_bq_processing_time += bq_processing_time;
        globalGranStats.tot_bq_len += len; // total length of all bqs awakened
        // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
        globalGranStats.tot_awbq++;        // total no. of bqs awakened
    }
    IF_GRAN_DEBUG(bq,
                  debugBelch("## BQ Stats of %p: [%d entries] %s\n",
                             node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PARALLEL_HASKELL)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;

    IF_PAR_DEBUG(verbose,
                 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
                            node, mytid));
#ifdef DIST
    //RFP
    if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
        IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
        return;
    }
#endif

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||
           get_itbl(q)->type == BLOCKED_FETCH ||
           get_itbl(q)->type == CONSTR);

    bqe = q;
    while (get_itbl(bqe)->type==TSO ||
           get_itbl(bqe)->type==BLOCKED_FETCH) {
        bqe = unblockOne(bqe, node);
    }
}

#else /* !GRAN && !PARALLEL_HASKELL */

void
awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
    while (tso != END_TSO_QUEUE) {
        tso = unblockOne(cap,tso);
    }
}
#endif
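
/* A hedged usage sketch for the standard (non-GRAN, non-PAR) case above: any
 * END_TSO_QUEUE-terminated chain of blocked TSOs, such as a queue like
 * tso->blocked_exceptions (initialised in createThread above), can be
 * drained in one call -- illustrative only, not a copy of a real call site:
 *
 *     awakenBlockedQueue(cap, tso->blocked_exceptions);
 *     tso->blocked_exceptions = END_TSO_QUEUE;
 *
 * unblockOne() returns each TSO's _link successor, so the loop consumes the
 * chain front to back while relinking every woken TSO onto cap's run queue.
 */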


/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return HS_BOOL_TRUE;
#else
    return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#endif
    return rtsFalse;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %lu",
                   (unsigned long)tso->block_info.tso->id);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
#if defined(PARALLEL_HASKELL)
    case BlockedOnGA:
        debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
    case BlockedOnGA_NoSend:
        debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
#endif
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
740 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
741 tso->why_blocked, tso->id, tso);
    }
}

void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->flags & TSO_DIRTY) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, s;
    Capability *cap;

# if defined(GRAN)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(TIME_ON_PROC(CurrentProc),
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# elif defined(PARALLEL_HASKELL)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(CURRENT_TIME,
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# else
    debugBelch("all threads:\n");
# endif

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (s = 0; s < total_steps; s++) {
        for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}
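
/* A hedged example of driving the debug printers above from gdb (the session
 * text is illustrative; symbol availability depends on how the RTS was
 * built):
 *
 *     (gdb) call printAllThreads()
 *     (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
 *
 * Each line printed by printThreadStatus() ends with " (TSO_DIRTY)" or
 * " (TSO_LINK_DIRTY)" when the corresponding flag is set, which is how dirty
 * TSOs show up in the printAllThreads() output.
 */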

/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    StgTSO *tso;
    rtsBool end;

    debugBelch("## BQ of closure %p (%s): ",
               node, info_type(node));

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH ||
           get_itbl(node)->type == MVAR);

    ASSERT(node!=(StgClosure*)NULL); // sanity check

    print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
    rtsBool end;

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
         bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == BLOCKED_FETCH ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %u (%x),",
                       ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
            break;
        case BLOCKED_FETCH:
            debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                       ((StgBlockedFetch *)bqe)->node,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                       ((StgBlockedFetch *)bqe)->ga.weight);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
                 info_type((StgClosure *)bqe)); // , node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc, tso_loc;
    rtsBool end;

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH);

    ASSERT(node!=(StgClosure*)NULL); // sanity check
    node_loc = where_is(node);

    debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
               node, info_type(node), node_loc);

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
         bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        tso_loc = where_is((StgClosure *)bqe);
        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %d (%p) on [PE %d],",
                       ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue of %p (%s)",
                 info_type((StgClosure *)bqe), node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# endif

#if defined(PARALLEL_HASKELL)
nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
         tso != END_TSO_QUEUE;
         i++, tso=tso->link) {
        /* nothing */
    }

    return i;
}
#endif

#endif /* DEBUG */