ghc.git: rts/Threads.c  (commit: "Instead of a separate context-switch flag, set HpLim to zero")
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "Storage.h"
#include "Threads.h"
#include "RtsFlags.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
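/* A rough worked example (word counts are illustrative only; the real values
 * live in Constants.h and vary with the build and profiling way):
 *
 *      RESERVED_STACK_WORDS     21
 *    + sizeofW(StgStopFrame)     1
 *    + 3 slots                   3
 *    -----------------------------
 *      MIN_STACK_WORDS          25
 *
 * createThread() below clamps any smaller request up to
 * MIN_STACK_WORDS + TSO_STRUCT_SIZEW.
 */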

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
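
/* A sketch of typical use, modelled on createIOThread() in SchedAPI.h
 * (indicative only, not a verbatim copy of that wrapper):
 *
 *   StgTSO *t = createThread(cap, stack_size);
 *   pushClosure(t, (W_)&stg_ap_v_info);   // apply the action to RealWorld
 *   pushClosure(t, (W_)closure);          // the IO action to run
 *   pushClosure(t, (W_)&stg_enter_info);  // enter the closure
 *   scheduleThread(cap, t);               // make it runnable
 */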
#if defined(GRAN)
/*   currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat size, StgInt pri)
#else
StgTSO *
createThread(Capability *cap, nat size)
#endif
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
                   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    }
    threadsCreated++;
#endif

#if defined(GRAN)
    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocateLocal(cap, size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
    SET_GRAN_HDR(tso, ThisPE);
#endif

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = END_TSO_QUEUE;
    tso->flags = TSO_DIRTY;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size     = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp             = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    // ToDo: check this
#if defined(GRAN)
    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);
#else
    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     * from its creation
     */
#endif

#if defined(GRAN)
    if (RtsFlags.GranFlags.GranSimStats.Full)
        DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.ParStats.Full)
        DumpGranEvent(GR_STARTQ,tso);
    /* Hack to avoid SCHEDULE
       LastTSO = tso; */
#endif

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0s0->threads;
    g0s0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

#if defined(DIST)
    tso->dist.priority = MandatoryPriority; // by default that is...
#endif

#if defined(GRAN)
    tso->gran.pri = pri;
# if defined(DEBUG)
    tso->gran.magic = TSO_MAGIC; // debugging only
# endif
    tso->gran.sparkname   = 0;
    tso->gran.startedat   = CURRENT_TIME;
    tso->gran.exported    = 0;
    tso->gran.basicblocks = 0;
    tso->gran.allocs      = 0;
    tso->gran.exectime    = 0;
    tso->gran.fetchtime   = 0;
    tso->gran.fetchcount  = 0;
    tso->gran.blocktime   = 0;
    tso->gran.blockcount  = 0;
    tso->gran.blockedat   = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
        tso->gran.clock = Now; /* local clock */
    else
        tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
    tso->par.magic = TSO_MAGIC; // debugging only
# endif
    tso->par.sparkname   = 0;
    tso->par.startedat   = CURRENT_TIME;
    tso->par.exported    = 0;
    tso->par.basicblocks = 0;
    tso->par.allocs      = 0;
    tso->par.exectime    = 0;
    tso->par.fetchtime   = 0;
    tso->par.fetchcount  = 0;
    tso->par.blocktime   = 0;
    tso->par.blockcount  = 0;
    tso->par.blockedat   = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;
#endif

#if defined(GRAN)
    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
        globalParStats.tot_threads_created++;
    }
#endif

#if defined(GRAN)
    debugTrace(GRAN_DEBUG_pri,
               "==__ schedule: Created TSO %d (%p) on PE %d;",
               tso->id, tso, CurrentProc);
#elif defined(PARALLEL_HASKELL)
    debugTrace(PAR_DEBUG_verbose,
               "==__ schedule: Created TSO %ld (%p); %d threads active",
               (long)tso->id, tso, advisory_thread_count);
#else
    debugTrace(DEBUG_sched,
               "created thread %ld, stack size = %lx words",
               (long)tso->id, (long)tso->stack_size);
#endif
    return tso;
}

#if defined(PAR)
/* RFP:
   all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createThreadFromSpark(rtsSpark spark)
{
    StgTSO *tso;
    ASSERT(spark != (rtsSpark)NULL);
    // JB: TAKE CARE OF THIS COUNTER! BUGGY
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
             RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    } else {
        threadsCreated++;
        tso = createThread(RtsFlags.GcFlags.initialStkSize);
        if (tso==END_TSO_QUEUE)
            barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
        tso->priority = AdvisoryPriority;
#endif
        pushClosure(tso,spark);
        addToRunQueue(tso);
        advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
    }
    return tso;
}
#endif

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}
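
/* On the Haskell side this is imported roughly as (sketch, cf. GHC.Conc;
 * the exact text varies between versions):
 *
 *   foreign import ccall unsafe "cmp_thread"
 *       cmp_thread :: ThreadId# -> ThreadId# -> CInt
 *
 * and the Eq/Ord instances for ThreadId compare the result against 0.
 */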

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}
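
/* Haskell side (sketch): GHC.Conc imports this as, approximately,
 *
 *   foreign import ccall unsafe "rts_getThreadId"
 *       getThreadId :: ThreadId# -> CInt
 *
 * and the Show instance for ThreadId prints the resulting number.
 */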

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

void
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *queue = t->_link;
            }
            return;
        }
    }
    barf("removeThreadFromQueue: not found");
}

void
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *head = t->_link;
            }
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
            }
            return;
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
}
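
/* For example, delivering an asynchronous exception to a thread blocked on
 * an MVar must first unhook it from the MVar's queue, roughly (sketch,
 * cf. RaiseAsync.c):
 *
 *   case BlockedOnMVar:
 *       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
 *       ...
 */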

/* ----------------------------------------------------------------------------
   unblockOne()

   unblock a single thread.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
}
#elif defined(PARALLEL_HASKELL)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
    /* write RESUME events to log file and
       update blocked and fetch time (depending on type of the orig closure) */
    if (RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                         0, 0 /* spark_queue_len(ADVISORY_POOL) */);
        if (emptyRunQueue())
            emitSchedule = rtsTrue;

        switch (get_itbl(node)->type) {
        case FETCH_ME_BQ:
            ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
        case RBH:
        case FETCH_ME:
        case BLACKHOLE_BQ:
            ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
#ifdef DIST
        case MVAR:
            break;
#endif
        default:
            barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
        }
    }
}
#endif

#if defined(GRAN)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgTSO *tso;
    StgBlockingQueueElement *next;
    PEs node_loc, tso_loc;

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe;       // wastes an assignment to get the type right
    next = bqe->link;          // save the link before it is overwritten below
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
        /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
        ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
        // insertThread(tso, node_loc);
        new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
                  ResumeThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len_local++;
        // len++;
    } else { // TSO is remote (actually should be FMBQ)
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                    RtsFlags.GranFlags.Costs.gunblocktime +
                                    RtsFlags.GranFlags.Costs.latency;
        new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
                  UnblockThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len++;
    }
    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    IF_GRAN_DEBUG(bq,
                  debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
                             (node_loc==tso_loc ? "Local" : "Global"),
                             tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
               (long)tso->id, tso);
    return next; // the caller continues down the queue from here
}
#elif defined(PARALLEL_HASKELL)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
        ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
        /* if it's a TSO just push it onto the run_queue */
        next = bqe->link;
        ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
        APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
        threadRunnable();
        unblockCount(bqe, node);
        /* reset blocking status after dumping event */
        ((StgTSO *)bqe)->why_blocked = NotBlocked;
        break;

    case BLOCKED_FETCH:
        /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
        next = bqe->link;
        bqe->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bqe;
        break;

# if defined(DEBUG)
        /* can ignore this case in a non-debugging setup;
           see comments on RBHSave closures above */
    case CONSTR:
        /* check that the closure is an RBHSave closure */
        ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
        break;

    default:
        barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
             get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
             (StgClosure *)bqe);
# endif
    }
    IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
    return next;
}
#endif

StgTSO *
unblockOne (Capability *cap, StgTSO *tso)
{
    return unblockOne_(cap,tso,rtsTrue); // allow migration
}

StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
{
    StgTSO *next;

    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);

    tso->why_blocked = NotBlocked;
    next = tso->_link;
    tso->_link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            allow_migrate &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
            ASSERT(tso->bound->cap == tso->cap);
            tso->bound->cap = cap;
        }
        tso->cap = cap;
        appendToRunQueue(cap,tso);

        // context-switch soonish so we can migrate the new thread if
        // necessary.  NB. not contextSwitchCapability(cap), which would
        // force a context switch immediately.
        cap->context_switch = 1;
    } else {
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    }
#else
    appendToRunQueue(cap,tso);

    // context-switch soonish so we can migrate the new thread if
    // necessary.  NB. not contextSwitchCapability(cap), which would
    // force a context switch immediately.
    cap->context_switch = 1;
#endif

    debugTrace(DEBUG_sched,
               "waking up thread %ld on cap %d",
               (long)tso->id, tso->cap->no);

    return next;
}
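
/* Typical use (sketch): wake every thread on a queue one at a time,
 * following the link returned by unblockOne() -- this is exactly what the
 * non-parallel awakenBlockedQueue() below does:
 *
 *   while (tso != END_TSO_QUEUE) {
 *       tso = unblockOne(cap, tso);
 *   }
 */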

/* ----------------------------------------------------------------------------
   awakenBlockedQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc;
    nat len = 0;

    IF_GRAN_DEBUG(bq,
                  debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n",
                             node, CurrentProc, CurrentTime[CurrentProc],
                             CurrentTSO->id, CurrentTSO));

    node_loc = where_is(node);

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||    // q is either a TSO or an RBHSave
           get_itbl(q)->type == CONSTR);  // closure (type constructor)
    ASSERT(is_unique(node));

    /* FAKE FETCH: magically copy the node to the tso's proc;
       no Fetch necessary because in reality the node should not have been
       moved to the other PE in the first place
    */
    if (CurrentProc!=node_loc) {
        IF_GRAN_DEBUG(bq,
                      debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
                                 node, node_loc, CurrentProc, CurrentTSO->id,
                                 // CurrentTSO, where_is(CurrentTSO),
                                 node->header.gran.procs));
        node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
        IF_GRAN_DEBUG(bq,
                      debugBelch("## new bitmask of node %p is %#x\n",
                                 node, node->header.gran.procs));
        if (RtsFlags.GranFlags.GranSimStats.Global) {
            globalGranStats.tot_fake_fetches++;
        }
    }

    bqe = q;
    // ToDo: check: ASSERT(CurrentProc==node_loc);
    while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
        //next = bqe->link;
        /*
          bqe points to the current element in the queue
          next points to the next element in the queue
        */
        //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
        //tso_loc = where_is(tso);
        len++;
        bqe = unblockOne(bqe, node);
    }

    /* if this is the BQ of an RBH, we have to put back the info ripped out of
       the closure to make room for the anchor of the BQ */
    if (bqe!=END_BQ_QUEUE) {
        ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
        /*
          ASSERT((info_ptr==&RBH_Save_0_info) ||
                 (info_ptr==&RBH_Save_1_info) ||
                 (info_ptr==&RBH_Save_2_info));
        */
        /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
        ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
        ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

        IF_GRAN_DEBUG(bq,
                      debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
                                 node, info_type(node)));
    }

    /* statistics gathering */
    if (RtsFlags.GranFlags.GranSimStats.Global) {
        // globalGranStats.tot_bq_processing_time += bq_processing_time;
        globalGranStats.tot_bq_len += len;  // total length of all bqs awakened
        // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
        globalGranStats.tot_awbq++;         // total no. of bqs awakened
    }
    IF_GRAN_DEBUG(bq,
                  debugBelch("## BQ Stats of %p: [%d entries] %s\n",
                             node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PARALLEL_HASKELL)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;

    IF_PAR_DEBUG(verbose,
                 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
                            node, mytid));
#ifdef DIST
    //RFP
    if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
        IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
        return;
    }
#endif

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||
           get_itbl(q)->type == BLOCKED_FETCH ||
           get_itbl(q)->type == CONSTR);

    bqe = q;
    while (get_itbl(bqe)->type==TSO ||
           get_itbl(bqe)->type==BLOCKED_FETCH) {
        bqe = unblockOne(bqe, node);
    }
}

#else /* !GRAN && !PARALLEL_HASKELL */

void
awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
    while (tso != END_TSO_QUEUE) {
        tso = unblockOne(cap,tso);
    }
}
#endif


/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return HS_BOOL_TRUE;
#else
    return HS_BOOL_FALSE;
#endif
}
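
/* Control.Concurrent consumes this (roughly) as
 *
 *   foreign import ccall rtsSupportsBoundThreads :: Bool
 *
 * and uses it to make forkOS fail with a helpful error when the program is
 * linked against the non-threaded RTS.  (Sketch; see
 * libraries/base/Control/Concurrent.hs for the real code.)
 */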

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#endif
    return rtsFalse;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %lu",
                   (unsigned long)tso->block_info.tso->id);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
#if defined(PARALLEL_HASKELL)
    case BlockedOnGA:
        debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
    case BlockedOnGA_NoSend:
        debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
#endif
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, tso);
    }
}

void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->flags & TSO_DIRTY) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, s;
    Capability *cap;

# if defined(GRAN)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(TIME_ON_PROC(CurrentProc),
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# elif defined(PARALLEL_HASKELL)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(CURRENT_TIME,
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# else
    debugBelch("all threads:\n");
# endif

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (s = 0; s < total_steps; s++) {
        for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}
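
/* For example, from gdb (sketch; the exact symbols depend on the build):
 *
 *   (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
 *   (gdb) call printAllThreads()
 */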

/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    StgTSO *tso;
    rtsBool end;

    debugBelch("## BQ of closure %p (%s): ",
               node, info_type(node));

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH ||
           get_itbl(node)->type == MVAR);

    ASSERT(node!=(StgClosure*)NULL); // sanity check

    print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
    rtsBool end;

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
         bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == BLOCKED_FETCH ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %u (%p),",
                       ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
            break;
        case BLOCKED_FETCH:
            debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                       ((StgBlockedFetch *)bqe)->node,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                       ((StgBlockedFetch *)bqe)->ga.weight);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
                 info_type((StgClosure *)bqe)); // , node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc, tso_loc;
    rtsBool end;

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH);

    ASSERT(node!=(StgClosure*)NULL); // sanity check
    node_loc = where_is(node);

    debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
               node, info_type(node), node_loc);

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
         !end; // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
         bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        tso_loc = where_is((StgClosure *)bqe);
        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %d (%p) on [PE %d],",
                       ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue of %p (%s)",
                 info_type((StgClosure *)bqe), node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# endif

#if defined(PARALLEL_HASKELL)
nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
         tso != END_TSO_QUEUE;
         i++, tso=tso->link) {
        /* nothing */
    }

    return i;
}
#endif

#endif /* DEBUG */