/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "Storage.h"
#include "Threads.h"
#include "RtsFlags.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (stg_ap_v_ret)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
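
/* Note: the "size" argument to createThread() includes the TSO struct
 * itself (TSO_STRUCT_SIZEW), so requests smaller than
 * MIN_STACK_WORDS + TSO_STRUCT_SIZEW are bumped up to that minimum
 * before allocating. */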

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.
   ------------------------------------------------------------------------ */
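
/* Roughly how the packaged versions drive this function (a sketch only;
 * the real definitions live in SchedAPI.h):
 *
 *   StgTSO *t = createThread(cap, stack_size);
 *   pushClosure(t, (W_)&stg_ap_v_info);   // apply the closure to void
 *   pushClosure(t, (W_)closure);          // the IO action to run
 *   pushClosure(t, (W_)&stg_enter_info);  // enter frame for the closure
 */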
#if defined(GRAN)
/* currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat size, StgInt pri)
#else
StgTSO *
createThread(Capability *cap, nat size)
#endif
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
                   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    }
    threadsCreated++;
#endif

#if defined(GRAN)
    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    /* Round the stack size up to a whole number of megablocks and
       allocate the rounded size: stack_size must describe the stack
       we actually allocated. */
    size = round_to_mblocks(size);
    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocateLocal(cap, size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
    SET_GRAN_HDR(tso, ThisPE);
#endif

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = END_TSO_QUEUE;
    tso->flags = TSO_DIRTY;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;

    // ToDo: check this
#if defined(GRAN)
    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);
#else
    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     * from its creation
     */
#endif

#if defined(GRAN)
    if (RtsFlags.GranFlags.GranSimStats.Full)
        DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.ParStats.Full)
        DumpGranEvent(GR_STARTQ,tso);
    /* HACK to avoid SCHEDULE
       LastTSO = tso; */
#endif

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0s0->threads;
    g0s0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

#if defined(DIST)
    tso->dist.priority = MandatoryPriority; //by default that is...
#endif

#if defined(GRAN)
    tso->gran.pri = pri;
# if defined(DEBUG)
    tso->gran.magic = TSO_MAGIC; // debugging only
# endif
    tso->gran.sparkname    = 0;
    tso->gran.startedat    = CURRENT_TIME;
    tso->gran.exported     = 0;
    tso->gran.basicblocks  = 0;
    tso->gran.allocs       = 0;
    tso->gran.exectime     = 0;
    tso->gran.fetchtime    = 0;
    tso->gran.fetchcount   = 0;
    tso->gran.blocktime    = 0;
    tso->gran.blockcount   = 0;
    tso->gran.blockedat    = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
        tso->gran.clock = Now;  /* local clock */
    else
        tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
# if defined(DEBUG)
    tso->par.magic = TSO_MAGIC; // debugging only
# endif
    tso->par.sparkname    = 0;
    tso->par.startedat    = CURRENT_TIME;
    tso->par.exported     = 0;
    tso->par.basicblocks  = 0;
    tso->par.allocs       = 0;
    tso->par.exectime     = 0;
    tso->par.fetchtime    = 0;
    tso->par.fetchcount   = 0;
    tso->par.blocktime    = 0;
    tso->par.blockcount   = 0;
    tso->par.blockedat    = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;
#endif

#if defined(GRAN)
    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
        globalParStats.tot_threads_created++;
    }
#endif

    postEvent (cap, EVENT_CREATE_THREAD, tso->id, 0);

#if defined(GRAN)
    debugTrace(GRAN_DEBUG_pri,
               "==__ schedule: Created TSO %d (%p) on PE %d;",
               tso->id, tso, CurrentProc);
#elif defined(PARALLEL_HASKELL)
    debugTrace(PAR_DEBUG_verbose,
               "==__ schedule: Created TSO %ld (%p); %d threads active",
               (long)tso->id, tso, advisory_thread_count);
#else
    debugTrace(DEBUG_sched,
               "created thread %ld, stack size = %lx words",
               (long)tso->id, (long)tso->stack_size);
#endif
    return tso;
}

#if defined(PAR)
/* RFP:
   all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createThreadFromSpark(rtsSpark spark)
{
    StgTSO *tso;
    ASSERT(spark != (rtsSpark)NULL);
    // JB: TAKE CARE OF THIS COUNTER! BUGGY
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        threadsIgnored++;
        barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
             RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    } else {
        threadsCreated++;
        tso = createThread(RtsFlags.GcFlags.initialStkSize);
        if (tso==END_TSO_QUEUE)
            barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
        tso->priority = AdvisoryPriority;
#endif
        pushClosure(tso,spark);
        addToRunQueue(tso);
        advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
    }
    return tso;
}
#endif

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}
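
/* For example, a C-side caller can test for "same thread" the way the
 * Haskell Eq instance does (sketch):
 *
 *   if (cmp_thread((StgPtr)tso1, (StgPtr)tso2) == 0) { ... same thread ... }
 */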

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

void
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *queue = t->_link;
            }
            return;
        }
    }
    barf("removeThreadFromQueue: not found");
}

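/* As removeThreadFromQueue, but for a double-ended queue: the queue is
 * given by head and tail pointers, and the tail pointer is patched up
 * if the removed TSO was the last element.
 */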
void
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
            } else {
                *head = t->_link;
            }
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
            }
            return;
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
}

/* ----------------------------------------------------------------------------
   unblockOne()

   unblock a single thread.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
}
#elif defined(PARALLEL_HASKELL)
STATIC_INLINE void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
    /* write RESUME events to log file and
       update blocked and fetch time (depending on type of the orig closure) */
    if (RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                         0, 0 /* spark_queue_len(ADVISORY_POOL) */);
        if (emptyRunQueue())
            emitSchedule = rtsTrue;

        switch (get_itbl(node)->type) {
        case FETCH_ME_BQ:
            ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
        case RBH:
        case FETCH_ME:
        case BLACKHOLE_BQ:
            ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            break;
#ifdef DIST
        case MVAR:
            break;
#endif
        default:
            barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
        }
    }
}
#endif

#if defined(GRAN)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgTSO *tso;
    StgBlockingQueueElement *next;
    PEs node_loc, tso_loc;

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe;       // wastes an assignment to get the type right
    next = bqe->link;          // save the link: it is overwritten below
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) {   // TSO is local
        /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
        ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
        // insertThread(tso, node_loc);
        new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
                  ResumeThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len_local++;
        // len++;
    } else {   // TSO is remote (actually should be FMBQ)
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                    RtsFlags.GranFlags.Costs.gunblocktime +
                                    RtsFlags.GranFlags.Costs.latency;
        new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
                  UnblockThread,
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
        // len++;
    }
    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    IF_GRAN_DEBUG(bq,
                  debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p),",
                             (node_loc==tso_loc ? "Local" : "Global"),
                             tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
               tso->id, tso);

    return next;   // the saved link, so callers can walk the whole queue
}
#elif defined(PARALLEL_HASKELL)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
        ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
        /* if it's a TSO just push it onto the run_queue */
        next = bqe->link;
        ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
        APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
        threadRunnable();
        unblockCount(bqe, node);
        /* reset blocking status after dumping event */
        ((StgTSO *)bqe)->why_blocked = NotBlocked;
        break;

    case BLOCKED_FETCH:
        /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
        next = bqe->link;
        bqe->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bqe;
        break;

# if defined(DEBUG)
        /* can ignore this case in a non-debugging setup;
           see comments on RBHSave closures above */
    case CONSTR:
        /* check that the closure is an RBHSave closure */
        ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
        next = END_BQ_QUEUE;   // an RBHSave closure terminates the queue
        break;

    default:
        barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
             get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
             (StgClosure *)bqe);
# endif
    }
    IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
    return next;
}
#endif

StgTSO *
unblockOne (Capability *cap, StgTSO *tso)
{
    return unblockOne_(cap,tso,rtsTrue); // allow migration
}

StgTSO *
unblockOne_ (Capability *cap, StgTSO *tso,
             rtsBool allow_migrate USED_IF_THREADS)
{
    StgTSO *next;

    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);

    tso->why_blocked = NotBlocked;
    next = tso->_link;
    tso->_link = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    if (tso->cap == cap || (!tsoLocked(tso) &&
                            allow_migrate &&
                            RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
            ASSERT(tso->bound->cap == tso->cap);
            tso->bound->cap = cap;
        }

        tso->cap = cap;
        appendToRunQueue(cap,tso);

        // context-switch soonish so we can migrate the new thread if
        // necessary.  NB. not contextSwitchCapability(cap), which would
        // force a context switch immediately.
        cap->context_switch = 1;
    } else {
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(cap, tso->cap, tso);
    }
#else
    appendToRunQueue(cap,tso);

    // context-switch soonish so we can migrate the new thread if
    // necessary.  NB. not contextSwitchCapability(cap), which would
    // force a context switch immediately.
    cap->context_switch = 1;
#endif

    postEvent (cap, EVENT_THREAD_WAKEUP, tso->id, tso->cap->no);

    debugTrace(DEBUG_sched, "waking up thread %ld on cap %d",
               (long)tso->id, tso->cap->no);

    return next;
}
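
/* The caller gets back the old link field, so a whole queue can be
 * drained with a loop like the one in awakenBlockedQueue() below:
 *
 *   while (tso != END_TSO_QUEUE) {
 *       tso = unblockOne(cap, tso);
 *   }
 */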

/* ----------------------------------------------------------------------------
   awakenBlockedQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc;
    nat len = 0;

    IF_GRAN_DEBUG(bq,
                  debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n",
                             node, CurrentProc, CurrentTime[CurrentProc],
                             CurrentTSO->id, CurrentTSO));

    node_loc = where_is(node);

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||     // q is either a TSO or an RBHSave
           get_itbl(q)->type == CONSTR);   // closure (type constructor)
    ASSERT(is_unique(node));

    /* FAKE FETCH: magically copy the node to the tso's proc;
       no Fetch necessary because in reality the node should not have been
       moved to the other PE in the first place
    */
    if (CurrentProc!=node_loc) {
        IF_GRAN_DEBUG(bq,
                      debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
                                 node, node_loc, CurrentProc, CurrentTSO->id,
                                 // CurrentTSO, where_is(CurrentTSO),
                                 node->header.gran.procs));
        node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
        IF_GRAN_DEBUG(bq,
                      debugBelch("## new bitmask of node %p is %#x\n",
                                 node, node->header.gran.procs));
        if (RtsFlags.GranFlags.GranSimStats.Global) {
            globalGranStats.tot_fake_fetches++;
        }
    }

    bqe = q;
    // ToDo: check: ASSERT(CurrentProc==node_loc);
    while (get_itbl(bqe)->type==TSO) {   // i.e. bqe != END_BQ_QUEUE
        /*
          bqe points to the current element in the queue;
          unblockOne() returns the next element
        */
        len++;
        bqe = unblockOne(bqe, node);
    }

    /* if this is the BQ of an RBH, we have to put back the info ripped out of
       the closure to make room for the anchor of the BQ */
    if (bqe!=END_BQ_QUEUE) {
        ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
        /*
          ASSERT((info_ptr==&RBH_Save_0_info) ||
                 (info_ptr==&RBH_Save_1_info) ||
                 (info_ptr==&RBH_Save_2_info));
        */
        /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
        ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
        ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

        IF_GRAN_DEBUG(bq,
                      debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
                                 node, info_type(node)));
    }

    /* statistics gathering */
    if (RtsFlags.GranFlags.GranSimStats.Global) {
        // globalGranStats.tot_bq_processing_time += bq_processing_time;
        globalGranStats.tot_bq_len += len;   // total length of all bqs awakened
        // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
        globalGranStats.tot_awbq++;          // total no. of bqs awakened
    }
    IF_GRAN_DEBUG(bq,
                  debugBelch("## BQ Stats of %p: [%d entries] %s\n",
                             node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PARALLEL_HASKELL)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
    StgBlockingQueueElement *bqe;

    IF_PAR_DEBUG(verbose,
                 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
                            node, mytid));
#ifdef DIST
    //RFP
    if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
        IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
        return;
    }
#endif

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||
           get_itbl(q)->type == BLOCKED_FETCH ||
           get_itbl(q)->type == CONSTR);

    bqe = q;
    while (get_itbl(bqe)->type==TSO ||
           get_itbl(bqe)->type==BLOCKED_FETCH) {
        bqe = unblockOne(bqe, node);
    }
}

#else   /* !GRAN && !PARALLEL_HASKELL */

void
awakenBlockedQueue(Capability *cap, StgTSO *tso)
{
    while (tso != END_TSO_QUEUE) {
        tso = unblockOne(cap,tso);
    }
}
#endif


/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return HS_BOOL_TRUE;
#else
    return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#endif
    return rtsFalse;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %lu",
                   (unsigned long)tso->block_info.tso->id);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
#if defined(PARALLEL_HASKELL)
    case BlockedOnGA:
        debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
    case BlockedOnGA_NoSend:
        debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
        break;
#endif
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, tso);
    }
}

void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
    } else {
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->flags & TSO_DIRTY) {
            debugBelch(" (TSO_DIRTY)");
        } else if (t->flags & TSO_LINK_DIRTY) {
            debugBelch(" (TSO_LINK_DIRTY)");
        }
        debugBelch("\n");
    }
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    nat i, s;
    Capability *cap;

# if defined(GRAN)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(TIME_ON_PROC(CurrentProc),
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# elif defined(PARALLEL_HASKELL)
    char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
    ullong_format_string(CURRENT_TIME,
                         time_string, rtsFalse/*no commas!*/);

    debugBelch("all threads at [%s]:\n", time_string);
# else
    debugBelch("all threads:\n");
# endif

    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (s = 0; s < total_steps; s++) {
        for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
            }
        }
    }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}
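
/* e.g. from gdb, to dump the run queue of the first capability (a sketch;
 * assumes "capabilities" is visible to the debugger in this build):
 *
 *   (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
 */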

/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void
print_bq (StgClosure *node)
{
    ASSERT(node!=(StgClosure*)NULL);  // sanity check, before we dereference node

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH ||
           get_itbl(node)->type == MVAR);

    debugBelch("## BQ of closure %p (%s): ",
               node, info_type(node));

    print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
    rtsBool end;

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (end = (bqe==END_BQ_QUEUE);
         !end;  // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
             bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == BLOCKED_FETCH ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %u (%x),",
                       ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
            break;
        case BLOCKED_FETCH:
            debugBelch(" BF (node=%p, ga=(%x, %d, %x)),",
                       ((StgBlockedFetch *)bqe)->node,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                       ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                       ((StgBlockedFetch *)bqe)->ga.weight);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
                 info_type((StgClosure *)bqe)); // , node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
    StgBlockingQueueElement *bqe;
    PEs node_loc, tso_loc;
    rtsBool end;

    ASSERT(node!=(StgClosure*)NULL);  // sanity check, before we dereference node

    /* should cover all closures that may have a blocking queue */
    ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
           get_itbl(node)->type == FETCH_ME_BQ ||
           get_itbl(node)->type == RBH);

    node_loc = where_is(node);

    debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
               node, info_type(node), node_loc);

    /*
      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
    */
    for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
         !end;  // iterate until bqe points to a CONSTR
         end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
             bqe = end ? END_BQ_QUEUE : bqe->link) {
        ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
        ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
        /* types of closures that may appear in a blocking queue */
        ASSERT(get_itbl(bqe)->type == TSO ||
               get_itbl(bqe)->type == CONSTR);
        /* only BQs of an RBH end with an RBH_Save closure */
        ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

        tso_loc = where_is((StgClosure *)bqe);
        switch (get_itbl(bqe)->type) {
        case TSO:
            debugBelch(" TSO %d (%p) on [PE %d],",
                       ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
            break;
        case CONSTR:
            debugBelch(" %s (IP %p),",
                       (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                        get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                        get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                        "RBH_Save_?"), get_itbl(bqe));
            break;
        default:
            barf("Unexpected closure type %s in blocking queue of %p (%s)",
                 info_type((StgClosure *)bqe), node, info_type(node));
            break;
        }
    } /* for */
    debugBelch("\n");
}
# endif

#if defined(PARALLEL_HASKELL)
nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
         tso != END_TSO_QUEUE;
         i++, tso=tso->link) {
        /* nothing */
    }

    return i;
}
#endif

#endif /* DEBUG */