Don't traverse the entire list of threads on every GC (phase 1)
[ghc.git] / rts / Threads.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2006
4 *
5 * Thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "Storage.h"
13 #include "Threads.h"
14 #include "RtsFlags.h"
15 #include "STM.h"
16 #include "Schedule.h"
17 #include "Trace.h"
18 #include "ThreadLabels.h"
19
20 /* Next thread ID to allocate.
21 * LOCK: sched_mutex
22 */
23 static StgThreadID next_thread_id = 1;
24
25 /* The smallest stack size that makes any sense is:
26 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
27 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
28 * + 1 (the closure to enter)
29 * + 1 (stg_ap_v_ret)
30 * + 1 (spare slot req'd by stg_ap_v_ret)
31 *
32 * A thread with this stack will bomb immediately with a stack
33 * overflow, which will increase its stack size.
34 */
35 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
36
37 /* ---------------------------------------------------------------------------
38 Create a new thread.
39
40 The new thread starts with the given stack size. Before the
41 scheduler can run, however, this thread needs to have a closure
42 (and possibly some arguments) pushed on its stack. See
43 pushClosure() in Schedule.h.
44
45 createGenThread() and createIOThread() (in SchedAPI.h) are
46 convenient packaged versions of this function.
47
48 currently pri (priority) is only used in a GRAN setup -- HWL
49 ------------------------------------------------------------------------ */
50 #if defined(GRAN)
51 /* currently pri (priority) is only used in a GRAN setup -- HWL */
52 StgTSO *
53 createThread(nat size, StgInt pri)
54 #else
55 StgTSO *
56 createThread(Capability *cap, nat size)
57 #endif
58 {
59 StgTSO *tso;
60 nat stack_size;
61
62 /* sched_mutex is *not* required */
63
64 /* First check whether we should create a thread at all */
65 #if defined(PARALLEL_HASKELL)
66 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
67 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
68 threadsIgnored++;
69 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
70 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
71 return END_TSO_QUEUE;
72 }
73 threadsCreated++;
74 #endif
75
76 #if defined(GRAN)
77 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
78 #endif
79
80 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
81
82 /* catch ridiculously small stack sizes */
83 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
84 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
85 }
86
87 stack_size = size - TSO_STRUCT_SIZEW;
88
89 tso = (StgTSO *)allocateLocal(cap, size);
90 TICK_ALLOC_TSO(stack_size, 0);
91
92 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
93 #if defined(GRAN)
94 SET_GRAN_HDR(tso, ThisPE);
95 #endif
96
97 // Always start with the compiled code evaluator
98 tso->what_next = ThreadRunGHC;
99
100 tso->why_blocked = NotBlocked;
101 tso->blocked_exceptions = END_TSO_QUEUE;
102 tso->flags = TSO_DIRTY;
103
104 tso->saved_errno = 0;
105 tso->bound = NULL;
106 tso->cap = cap;
107
108 tso->stack_size = stack_size;
109 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
110 - TSO_STRUCT_SIZEW;
111 tso->sp = (P_)&(tso->stack) + stack_size;
112
113 tso->trec = NO_TREC;
114
115 #ifdef PROFILING
116 tso->prof.CCCS = CCS_MAIN;
117 #endif
118
119 /* put a stop frame on the stack */
120 tso->sp -= sizeofW(StgStopFrame);
121 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
122 tso->_link = END_TSO_QUEUE;
123
124 // ToDo: check this
125 #if defined(GRAN)
126 /* uses more flexible routine in GranSim */
127 insertThread(tso, CurrentProc);
128 #else
129 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
130 * from its creation
131 */
132 #endif
133
134 #if defined(GRAN)
135 if (RtsFlags.GranFlags.GranSimStats.Full)
136 DumpGranEvent(GR_START,tso);
137 #elif defined(PARALLEL_HASKELL)
138 if (RtsFlags.ParFlags.ParStats.Full)
139 DumpGranEvent(GR_STARTQ,tso);
140 /* HACk to avoid SCHEDULE
141 LastTSO = tso; */
142 #endif
143
144 /* Link the new thread on the global thread list.
145 */
146 ACQUIRE_LOCK(&sched_mutex);
147 tso->id = next_thread_id++; // while we have the mutex
148 tso->global_link = g0s0->threads;
149 g0s0->threads = tso;
150 RELEASE_LOCK(&sched_mutex);
151
152 #if defined(DIST)
153 tso->dist.priority = MandatoryPriority; //by default that is...
154 #endif
155
156 #if defined(GRAN)
157 tso->gran.pri = pri;
158 # if defined(DEBUG)
159 tso->gran.magic = TSO_MAGIC; // debugging only
160 # endif
161 tso->gran.sparkname = 0;
162 tso->gran.startedat = CURRENT_TIME;
163 tso->gran.exported = 0;
164 tso->gran.basicblocks = 0;
165 tso->gran.allocs = 0;
166 tso->gran.exectime = 0;
167 tso->gran.fetchtime = 0;
168 tso->gran.fetchcount = 0;
169 tso->gran.blocktime = 0;
170 tso->gran.blockcount = 0;
171 tso->gran.blockedat = 0;
172 tso->gran.globalsparks = 0;
173 tso->gran.localsparks = 0;
174 if (RtsFlags.GranFlags.Light)
175 tso->gran.clock = Now; /* local clock */
176 else
177 tso->gran.clock = 0;
178
179 IF_DEBUG(gran,printTSO(tso));
180 #elif defined(PARALLEL_HASKELL)
181 # if defined(DEBUG)
182 tso->par.magic = TSO_MAGIC; // debugging only
183 # endif
184 tso->par.sparkname = 0;
185 tso->par.startedat = CURRENT_TIME;
186 tso->par.exported = 0;
187 tso->par.basicblocks = 0;
188 tso->par.allocs = 0;
189 tso->par.exectime = 0;
190 tso->par.fetchtime = 0;
191 tso->par.fetchcount = 0;
192 tso->par.blocktime = 0;
193 tso->par.blockcount = 0;
194 tso->par.blockedat = 0;
195 tso->par.globalsparks = 0;
196 tso->par.localsparks = 0;
197 #endif
198
199 #if defined(GRAN)
200 globalGranStats.tot_threads_created++;
201 globalGranStats.threads_created_on_PE[CurrentProc]++;
202 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
203 globalGranStats.tot_sq_probes++;
204 #elif defined(PARALLEL_HASKELL)
205 // collect parallel global statistics (currently done together with GC stats)
206 if (RtsFlags.ParFlags.ParStats.Global &&
207 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
208 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
209 globalParStats.tot_threads_created++;
210 }
211 #endif
212
213 #if defined(GRAN)
214 debugTrace(GRAN_DEBUG_pri,
215 "==__ schedule: Created TSO %d (%p);",
216 CurrentProc, tso, tso->id);
217 #elif defined(PARALLEL_HASKELL)
218 debugTrace(PAR_DEBUG_verbose,
219 "==__ schedule: Created TSO %d (%p); %d threads active",
220 (long)tso->id, tso, advisory_thread_count);
221 #else
222 debugTrace(DEBUG_sched,
223 "created thread %ld, stack size = %lx words",
224 (long)tso->id, (long)tso->stack_size);
225 #endif
226 return tso;
227 }
228
229 #if defined(PAR)
230 /* RFP:
231 all parallel thread creation calls should fall through the following routine.
232 */
233 StgTSO *
234 createThreadFromSpark(rtsSpark spark)
235 { StgTSO *tso;
236 ASSERT(spark != (rtsSpark)NULL);
237 // JB: TAKE CARE OF THIS COUNTER! BUGGY
238 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
239 { threadsIgnored++;
240 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
241 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
242 return END_TSO_QUEUE;
243 }
244 else
245 { threadsCreated++;
246 tso = createThread(RtsFlags.GcFlags.initialStkSize);
247 if (tso==END_TSO_QUEUE)
248 barf("createSparkThread: Cannot create TSO");
249 #if defined(DIST)
250 tso->priority = AdvisoryPriority;
251 #endif
252 pushClosure(tso,spark);
253 addToRunQueue(tso);
254 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
255 }
256 return tso;
257 }
258 #endif
259
260 /* ---------------------------------------------------------------------------
261 * Comparing Thread ids.
262 *
263 * This is used from STG land in the implementation of the
264 * instances of Eq/Ord for ThreadIds.
265 * ------------------------------------------------------------------------ */
266
267 int
268 cmp_thread(StgPtr tso1, StgPtr tso2)
269 {
270 StgThreadID id1 = ((StgTSO *)tso1)->id;
271 StgThreadID id2 = ((StgTSO *)tso2)->id;
272
273 if (id1 < id2) return (-1);
274 if (id1 > id2) return 1;
275 return 0;
276 }
277
278 /* ---------------------------------------------------------------------------
279 * Fetching the ThreadID from an StgTSO.
280 *
281 * This is used in the implementation of Show for ThreadIds.
282 * ------------------------------------------------------------------------ */
283 int
284 rts_getThreadId(StgPtr tso)
285 {
286 return ((StgTSO *)tso)->id;
287 }
288
289 /* -----------------------------------------------------------------------------
290 Remove a thread from a queue.
291 Fails fatally if the TSO is not on the queue.
292 -------------------------------------------------------------------------- */
293
294 void
295 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
296 {
297 StgTSO *t, *prev;
298
299 prev = NULL;
300 for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
301 if (t == tso) {
302 if (prev) {
303 setTSOLink(cap,prev,t->_link);
304 } else {
305 *queue = t->_link;
306 }
307 return;
308 }
309 }
310 barf("removeThreadFromQueue: not found");
311 }
312
313 void
314 removeThreadFromDeQueue (Capability *cap,
315 StgTSO **head, StgTSO **tail, StgTSO *tso)
316 {
317 StgTSO *t, *prev;
318
319 prev = NULL;
320 for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
321 if (t == tso) {
322 if (prev) {
323 setTSOLink(cap,prev,t->_link);
324 } else {
325 *head = t->_link;
326 }
327 if (*tail == tso) {
328 if (prev) {
329 *tail = prev;
330 } else {
331 *tail = END_TSO_QUEUE;
332 }
333 }
334 return;
335 }
336 }
337 barf("removeThreadFromMVarQueue: not found");
338 }
339
340 void
341 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
342 {
343 removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
344 }
345
346 /* ----------------------------------------------------------------------------
347 unblockOne()
348
349 unblock a single thread.
350 ------------------------------------------------------------------------- */
351
352 #if defined(GRAN)
353 STATIC_INLINE void
354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
355 {
356 }
357 #elif defined(PARALLEL_HASKELL)
358 STATIC_INLINE void
359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
360 {
361 /* write RESUME events to log file and
362 update blocked and fetch time (depending on type of the orig closure) */
363 if (RtsFlags.ParFlags.ParStats.Full) {
364 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
365 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
366 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
367 if (emptyRunQueue())
368 emitSchedule = rtsTrue;
369
370 switch (get_itbl(node)->type) {
371 case FETCH_ME_BQ:
372 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
373 break;
374 case RBH:
375 case FETCH_ME:
376 case BLACKHOLE_BQ:
377 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
378 break;
379 #ifdef DIST
380 case MVAR:
381 break;
382 #endif
383 default:
384 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
385 }
386 }
387 }
388 #endif
389
390 #if defined(GRAN)
391 StgBlockingQueueElement *
392 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
393 {
394 StgTSO *tso;
395 PEs node_loc, tso_loc;
396
397 node_loc = where_is(node); // should be lifted out of loop
398 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
399 tso_loc = where_is((StgClosure *)tso);
400 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
401 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
402 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
403 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
404 // insertThread(tso, node_loc);
405 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
406 ResumeThread,
407 tso, node, (rtsSpark*)NULL);
408 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
409 // len_local++;
410 // len++;
411 } else { // TSO is remote (actually should be FMBQ)
412 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
413 RtsFlags.GranFlags.Costs.gunblocktime +
414 RtsFlags.GranFlags.Costs.latency;
415 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
416 UnblockThread,
417 tso, node, (rtsSpark*)NULL);
418 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
419 // len++;
420 }
421 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
422 IF_GRAN_DEBUG(bq,
423 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
424 (node_loc==tso_loc ? "Local" : "Global"),
425 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
426 tso->block_info.closure = NULL;
427 debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
428 tso->id, tso));
429 }
430 #elif defined(PARALLEL_HASKELL)
431 StgBlockingQueueElement *
432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
433 {
434 StgBlockingQueueElement *next;
435
436 switch (get_itbl(bqe)->type) {
437 case TSO:
438 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
439 /* if it's a TSO just push it onto the run_queue */
440 next = bqe->link;
441 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
442 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
443 threadRunnable();
444 unblockCount(bqe, node);
445 /* reset blocking status after dumping event */
446 ((StgTSO *)bqe)->why_blocked = NotBlocked;
447 break;
448
449 case BLOCKED_FETCH:
450 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
451 next = bqe->link;
452 bqe->link = (StgBlockingQueueElement *)PendingFetches;
453 PendingFetches = (StgBlockedFetch *)bqe;
454 break;
455
456 # if defined(DEBUG)
457 /* can ignore this case in a non-debugging setup;
458 see comments on RBHSave closures above */
459 case CONSTR:
460 /* check that the closure is an RBHSave closure */
461 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
462 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
463 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
464 break;
465
466 default:
467 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
468 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
469 (StgClosure *)bqe);
470 # endif
471 }
472 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
473 return next;
474 }
475 #endif
476
477 StgTSO *
478 unblockOne (Capability *cap, StgTSO *tso)
479 {
480 return unblockOne_(cap,tso,rtsTrue); // allow migration
481 }
482
483 StgTSO *
484 unblockOne_ (Capability *cap, StgTSO *tso,
485 rtsBool allow_migrate USED_IF_THREADS)
486 {
487 StgTSO *next;
488
489 // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
490 ASSERT(tso->why_blocked != NotBlocked);
491
492 tso->why_blocked = NotBlocked;
493 next = tso->_link;
494 tso->_link = END_TSO_QUEUE;
495
496 #if defined(THREADED_RTS)
497 if (tso->cap == cap || (!tsoLocked(tso) &&
498 allow_migrate &&
499 RtsFlags.ParFlags.wakeupMigrate)) {
500 // We are waking up this thread on the current Capability, which
501 // might involve migrating it from the Capability it was last on.
502 if (tso->bound) {
503 ASSERT(tso->bound->cap == tso->cap);
504 tso->bound->cap = cap;
505 }
506 tso->cap = cap;
507 appendToRunQueue(cap,tso);
508 // we're holding a newly woken thread, make sure we context switch
509 // quickly so we can migrate it if necessary.
510 context_switch = 1;
511 } else {
512 // we'll try to wake it up on the Capability it was last on.
513 wakeupThreadOnCapability_lock(tso->cap, tso);
514 }
515 #else
516 appendToRunQueue(cap,tso);
517 context_switch = 1;
518 #endif
519
520 debugTrace(DEBUG_sched,
521 "waking up thread %ld on cap %d",
522 (long)tso->id, tso->cap->no);
523
524 return next;
525 }
526
527 /* ----------------------------------------------------------------------------
528 awakenBlockedQueue
529
530 wakes up all the threads on the specified queue.
531 ------------------------------------------------------------------------- */
532
533 #if defined(GRAN)
534 void
535 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
536 {
537 StgBlockingQueueElement *bqe;
538 PEs node_loc;
539 nat len = 0;
540
541 IF_GRAN_DEBUG(bq,
542 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
543 node, CurrentProc, CurrentTime[CurrentProc],
544 CurrentTSO->id, CurrentTSO));
545
546 node_loc = where_is(node);
547
548 ASSERT(q == END_BQ_QUEUE ||
549 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
550 get_itbl(q)->type == CONSTR); // closure (type constructor)
551 ASSERT(is_unique(node));
552
553 /* FAKE FETCH: magically copy the node to the tso's proc;
554 no Fetch necessary because in reality the node should not have been
555 moved to the other PE in the first place
556 */
557 if (CurrentProc!=node_loc) {
558 IF_GRAN_DEBUG(bq,
559 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
560 node, node_loc, CurrentProc, CurrentTSO->id,
561 // CurrentTSO, where_is(CurrentTSO),
562 node->header.gran.procs));
563 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
564 IF_GRAN_DEBUG(bq,
565 debugBelch("## new bitmask of node %p is %#x\n",
566 node, node->header.gran.procs));
567 if (RtsFlags.GranFlags.GranSimStats.Global) {
568 globalGranStats.tot_fake_fetches++;
569 }
570 }
571
572 bqe = q;
573 // ToDo: check: ASSERT(CurrentProc==node_loc);
574 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
575 //next = bqe->link;
576 /*
577 bqe points to the current element in the queue
578 next points to the next element in the queue
579 */
580 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
581 //tso_loc = where_is(tso);
582 len++;
583 bqe = unblockOne(bqe, node);
584 }
585
586 /* if this is the BQ of an RBH, we have to put back the info ripped out of
587 the closure to make room for the anchor of the BQ */
588 if (bqe!=END_BQ_QUEUE) {
589 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
590 /*
591 ASSERT((info_ptr==&RBH_Save_0_info) ||
592 (info_ptr==&RBH_Save_1_info) ||
593 (info_ptr==&RBH_Save_2_info));
594 */
595 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
596 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
597 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
598
599 IF_GRAN_DEBUG(bq,
600 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
601 node, info_type(node)));
602 }
603
604 /* statistics gathering */
605 if (RtsFlags.GranFlags.GranSimStats.Global) {
606 // globalGranStats.tot_bq_processing_time += bq_processing_time;
607 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
608 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
609 globalGranStats.tot_awbq++; // total no. of bqs awakened
610 }
611 IF_GRAN_DEBUG(bq,
612 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
613 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
614 }
615 #elif defined(PARALLEL_HASKELL)
616 void
617 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
618 {
619 StgBlockingQueueElement *bqe;
620
621 IF_PAR_DEBUG(verbose,
622 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
623 node, mytid));
624 #ifdef DIST
625 //RFP
626 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
627 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
628 return;
629 }
630 #endif
631
632 ASSERT(q == END_BQ_QUEUE ||
633 get_itbl(q)->type == TSO ||
634 get_itbl(q)->type == BLOCKED_FETCH ||
635 get_itbl(q)->type == CONSTR);
636
637 bqe = q;
638 while (get_itbl(bqe)->type==TSO ||
639 get_itbl(bqe)->type==BLOCKED_FETCH) {
640 bqe = unblockOne(bqe, node);
641 }
642 }
643
644 #else /* !GRAN && !PARALLEL_HASKELL */
645
646 void
647 awakenBlockedQueue(Capability *cap, StgTSO *tso)
648 {
649 while (tso != END_TSO_QUEUE) {
650 tso = unblockOne(cap,tso);
651 }
652 }
653 #endif
654
655
656 /* ---------------------------------------------------------------------------
657 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
658 * used by Control.Concurrent for error checking.
659 * ------------------------------------------------------------------------- */
660
661 HsBool
662 rtsSupportsBoundThreads(void)
663 {
664 #if defined(THREADED_RTS)
665 return HS_BOOL_TRUE;
666 #else
667 return HS_BOOL_FALSE;
668 #endif
669 }
670
671 /* ---------------------------------------------------------------------------
672 * isThreadBound(tso): check whether tso is bound to an OS thread.
673 * ------------------------------------------------------------------------- */
674
675 StgBool
676 isThreadBound(StgTSO* tso USED_IF_THREADS)
677 {
678 #if defined(THREADED_RTS)
679 return (tso->bound != NULL);
680 #endif
681 return rtsFalse;
682 }
683
684 /* ----------------------------------------------------------------------------
685 * Debugging: why is a thread blocked
686 * ------------------------------------------------------------------------- */
687
688 #if DEBUG
689 void
690 printThreadBlockage(StgTSO *tso)
691 {
692 switch (tso->why_blocked) {
693 case BlockedOnRead:
694 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
695 break;
696 case BlockedOnWrite:
697 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
698 break;
699 #if defined(mingw32_HOST_OS)
700 case BlockedOnDoProc:
701 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
702 break;
703 #endif
704 case BlockedOnDelay:
705 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
706 break;
707 case BlockedOnMVar:
708 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
709 break;
710 case BlockedOnException:
711 debugBelch("is blocked on delivering an exception to thread %lu",
712 (unsigned long)tso->block_info.tso->id);
713 break;
714 case BlockedOnBlackHole:
715 debugBelch("is blocked on a black hole");
716 break;
717 case NotBlocked:
718 debugBelch("is not blocked");
719 break;
720 #if defined(PARALLEL_HASKELL)
721 case BlockedOnGA:
722 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
723 tso->block_info.closure, info_type(tso->block_info.closure));
724 break;
725 case BlockedOnGA_NoSend:
726 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
727 tso->block_info.closure, info_type(tso->block_info.closure));
728 break;
729 #endif
730 case BlockedOnCCall:
731 debugBelch("is blocked on an external call");
732 break;
733 case BlockedOnCCall_NoUnblockExc:
734 debugBelch("is blocked on an external call (exceptions were already blocked)");
735 break;
736 case BlockedOnSTM:
737 debugBelch("is blocked on an STM operation");
738 break;
739 default:
740 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
741 tso->why_blocked, tso->id, tso);
742 }
743 }
744
745 void
746 printThreadStatus(StgTSO *t)
747 {
748 debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
749 {
750 void *label = lookupThreadLabel(t->id);
751 if (label) debugBelch("[\"%s\"] ",(char *)label);
752 }
753 if (t->what_next == ThreadRelocated) {
754 debugBelch("has been relocated...\n");
755 } else {
756 switch (t->what_next) {
757 case ThreadKilled:
758 debugBelch("has been killed");
759 break;
760 case ThreadComplete:
761 debugBelch("has completed");
762 break;
763 default:
764 printThreadBlockage(t);
765 }
766 debugBelch("\n");
767 }
768 }
769
770 void
771 printAllThreads(void)
772 {
773 StgTSO *t, *next;
774 nat i, s;
775 Capability *cap;
776
777 # if defined(GRAN)
778 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
779 ullong_format_string(TIME_ON_PROC(CurrentProc),
780 time_string, rtsFalse/*no commas!*/);
781
782 debugBelch("all threads at [%s]:\n", time_string);
783 # elif defined(PARALLEL_HASKELL)
784 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
785 ullong_format_string(CURRENT_TIME,
786 time_string, rtsFalse/*no commas!*/);
787
788 debugBelch("all threads at [%s]:\n", time_string);
789 # else
790 debugBelch("all threads:\n");
791 # endif
792
793 for (i = 0; i < n_capabilities; i++) {
794 cap = &capabilities[i];
795 debugBelch("threads on capability %d:\n", cap->no);
796 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
797 printThreadStatus(t);
798 }
799 }
800
801 debugBelch("other threads:\n");
802 for (s = 0; s < total_steps; s++) {
803 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
804 if (t->why_blocked != NotBlocked) {
805 printThreadStatus(t);
806 }
807 if (t->what_next == ThreadRelocated) {
808 next = t->_link;
809 } else {
810 next = t->global_link;
811 }
812 }
813 }
814 }
815
816 // useful from gdb
817 void
818 printThreadQueue(StgTSO *t)
819 {
820 nat i = 0;
821 for (; t != END_TSO_QUEUE; t = t->_link) {
822 printThreadStatus(t);
823 i++;
824 }
825 debugBelch("%d threads on queue\n", i);
826 }
827
828 /*
829 Print a whole blocking queue attached to node (debugging only).
830 */
831 # if defined(PARALLEL_HASKELL)
832 void
833 print_bq (StgClosure *node)
834 {
835 StgBlockingQueueElement *bqe;
836 StgTSO *tso;
837 rtsBool end;
838
839 debugBelch("## BQ of closure %p (%s): ",
840 node, info_type(node));
841
842 /* should cover all closures that may have a blocking queue */
843 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
844 get_itbl(node)->type == FETCH_ME_BQ ||
845 get_itbl(node)->type == RBH ||
846 get_itbl(node)->type == MVAR);
847
848 ASSERT(node!=(StgClosure*)NULL); // sanity check
849
850 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
851 }
852
853 /*
854 Print a whole blocking queue starting with the element bqe.
855 */
856 void
857 print_bqe (StgBlockingQueueElement *bqe)
858 {
859 rtsBool end;
860
861 /*
862 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
863 */
864 for (end = (bqe==END_BQ_QUEUE);
865 !end; // iterate until bqe points to a CONSTR
866 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
867 bqe = end ? END_BQ_QUEUE : bqe->link) {
868 ASSERT(bqe != END_BQ_QUEUE); // sanity check
869 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
870 /* types of closures that may appear in a blocking queue */
871 ASSERT(get_itbl(bqe)->type == TSO ||
872 get_itbl(bqe)->type == BLOCKED_FETCH ||
873 get_itbl(bqe)->type == CONSTR);
874 /* only BQs of an RBH end with an RBH_Save closure */
875 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
876
877 switch (get_itbl(bqe)->type) {
878 case TSO:
879 debugBelch(" TSO %u (%x),",
880 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
881 break;
882 case BLOCKED_FETCH:
883 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
884 ((StgBlockedFetch *)bqe)->node,
885 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
886 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
887 ((StgBlockedFetch *)bqe)->ga.weight);
888 break;
889 case CONSTR:
890 debugBelch(" %s (IP %p),",
891 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
892 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
893 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
894 "RBH_Save_?"), get_itbl(bqe));
895 break;
896 default:
897 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
898 info_type((StgClosure *)bqe)); // , node, info_type(node));
899 break;
900 }
901 } /* for */
902 debugBelch("\n");
903 }
904 # elif defined(GRAN)
905 void
906 print_bq (StgClosure *node)
907 {
908 StgBlockingQueueElement *bqe;
909 PEs node_loc, tso_loc;
910 rtsBool end;
911
912 /* should cover all closures that may have a blocking queue */
913 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
914 get_itbl(node)->type == FETCH_ME_BQ ||
915 get_itbl(node)->type == RBH);
916
917 ASSERT(node!=(StgClosure*)NULL); // sanity check
918 node_loc = where_is(node);
919
920 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
921 node, info_type(node), node_loc);
922
923 /*
924 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
925 */
926 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
927 !end; // iterate until bqe points to a CONSTR
928 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
929 ASSERT(bqe != END_BQ_QUEUE); // sanity check
930 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
931 /* types of closures that may appear in a blocking queue */
932 ASSERT(get_itbl(bqe)->type == TSO ||
933 get_itbl(bqe)->type == CONSTR);
934 /* only BQs of an RBH end with an RBH_Save closure */
935 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
936
937 tso_loc = where_is((StgClosure *)bqe);
938 switch (get_itbl(bqe)->type) {
939 case TSO:
940 debugBelch(" TSO %d (%p) on [PE %d],",
941 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
942 break;
943 case CONSTR:
944 debugBelch(" %s (IP %p),",
945 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
946 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
947 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
948 "RBH_Save_?"), get_itbl(bqe));
949 break;
950 default:
951 barf("Unexpected closure type %s in blocking queue of %p (%s)",
952 info_type((StgClosure *)bqe), node, info_type(node));
953 break;
954 }
955 } /* for */
956 debugBelch("\n");
957 }
958 # endif
959
960 #if defined(PARALLEL_HASKELL)
961 nat
962 run_queue_len(void)
963 {
964 nat i;
965 StgTSO *tso;
966
967 for (i=0, tso=run_queue_hd;
968 tso != END_TSO_QUEUE;
969 i++, tso=tso->link) {
970 /* nothing */
971 }
972
973 return i;
974 }
975 #endif
976
977 #endif /* DEBUG */