FIX #2164: check for ThreadRelocated in isAlive()
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77 * Global variables
78 * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
84
85 /*
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
90 -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
98 cheating). */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
111 */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
118 */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* flag set by signal handler to precipitate a context switch
122 * LOCK: none (just an advisory flag)
123 */
124 int context_switch = 0;
125
126 /* flag that tracks whether we have done any execution in this time slice.
127 * LOCK: currently none, perhaps we should lock (but needs to be
128 * updated in the fast path of the scheduler).
129 */
130 nat recent_activity = ACTIVITY_YES;
131
132 /* if this flag is set as well, give up execution
133 * LOCK: none (changes once, from false->true)
134 */
135 rtsBool sched_state = SCHED_RUNNING;
136
137 #if defined(GRAN)
138 StgTSO *CurrentTSO;
139 #endif
140
141 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
142 * exists - earlier gccs apparently didn't.
143 * -= chak
144 */
145 StgTSO dummy_tso;
146
147 /*
148 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149 * in an MT setting, needed to signal that a worker thread shouldn't hang around
150 * in the scheduler when it is out of work.
151 */
152 rtsBool shutting_down_scheduler = rtsFalse;
153
154 /*
155 * This mutex protects most of the global scheduler data in
156 * the THREADED_RTS runtime.
157 */
158 #if defined(THREADED_RTS)
159 Mutex sched_mutex;
160 #endif
161
162 #if defined(PARALLEL_HASKELL)
163 StgTSO *LastTSO;
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
166 #endif
167
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
170 #endif
171
172 /* -----------------------------------------------------------------------------
173 * static function prototypes
174 * -------------------------------------------------------------------------- */
175
176 static Capability *schedule (Capability *initialCapability, Task *task);
177
178 //
179 // These functions all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
182 //
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
186 #endif
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
192 #if defined(GRAN)
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
194 #endif
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
199 #endif
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
202 #endif
203 static void schedulePostRunThread(StgTSO *t);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
206 StgTSO *t);
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
208 nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
211 StgTSO *t );
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214 rtsBool force_major);
215
216 static rtsBool checkBlackHoles(Capability *cap);
217
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
220
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
223
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
226 #endif
227
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);
231 #endif
232
233 #ifdef DEBUG
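// Human-readable names for a TSO's what_next field, indexed by its value;
// used only by the debug tracing in this file.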
234 static char *whatNext_strs[] = {
235 "(unknown)",
236 "ThreadRunGHC",
237 "ThreadInterpret",
238 "ThreadKilled",
239 "ThreadRelocated",
240 "ThreadComplete"
241 };
242 #endif
243
244 /* -----------------------------------------------------------------------------
245 * Putting a thread on the run queue: different scheduling policies
246 * -------------------------------------------------------------------------- */
247
248 STATIC_INLINE void
249 addToRunQueue( Capability *cap, StgTSO *t )
250 {
251 #if defined(PARALLEL_HASKELL)
252 if (RtsFlags.ParFlags.doFairScheduling) {
253 // this does round-robin scheduling; good for concurrency
254 appendToRunQueue(cap,t);
255 } else {
256 // this does unfair scheduling; good for parallelism
257 pushOnRunQueue(cap,t);
258 }
259 #else
260 // this does round-robin scheduling; good for concurrency
261 appendToRunQueue(cap,t);
262 #endif
263 }
264
265 /* ---------------------------------------------------------------------------
266 Main scheduling loop.
267
268 We use round-robin scheduling, each thread returning to the
269 scheduler loop when one of these conditions is detected:
270
271 * out of heap space
272 * timer expires (thread yields)
273 * thread blocks
274 * thread ends
275 * stack overflow
276
277 GRAN version:
278 In a GranSim setup this loop iterates over the global event queue.
279 This revolves around the global event queue, which determines what
280 to do next. Therefore, it's more complicated than either the
281 concurrent or the parallel (GUM) setup.
282
283 GUM version:
284 GUM iterates over incoming messages.
285 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286 and sends out a fish whenever it has nothing to do; in-between
287 doing the actual reductions (shared code below) it processes the
288 incoming messages and deals with delayed operations
289 (see PendingFetches).
290 This is not the ugliest code you could imagine, but it's bloody close.
291
292 ------------------------------------------------------------------------ */
293
294 static Capability *
295 schedule (Capability *initialCapability, Task *task)
296 {
297 StgTSO *t;
298 Capability *cap;
299 StgThreadReturnCode ret;
300 #if defined(GRAN)
301 rtsEvent *event;
302 #elif defined(PARALLEL_HASKELL)
303 StgTSO *tso;
304 GlobalTaskId pe;
305 rtsBool receivedFinish = rtsFalse;
306 # if defined(DEBUG)
307 nat tp_size, sp_size; // stats only
308 # endif
309 #endif
310 nat prev_what_next;
311 rtsBool ready_to_gc;
312 #if defined(THREADED_RTS)
313 rtsBool first = rtsTrue;
314 #endif
315
316 cap = initialCapability;
317
318 // Pre-condition: this task owns initialCapability.
319 // The sched_mutex is *NOT* held
320 // NB. on return, we still hold a capability.
321
322 debugTrace (DEBUG_sched,
323 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324 task, initialCapability);
325
326 schedulePreLoop();
327
328 // -----------------------------------------------------------
329 // Scheduler loop starts here:
330
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION (!receivedFinish)
333 #elif defined(GRAN)
334 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
335 #else
336 #define TERMINATION_CONDITION rtsTrue
337 #endif
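// In the standard build TERMINATION_CONDITION is rtsTrue, so this loop only
// exits via the explicit returns in its body (shutdown, or a finished bound
// thread).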
338
339 while (TERMINATION_CONDITION) {
340
341 #if defined(GRAN)
342 /* Choose the processor with the next event */
343 CurrentProc = event->proc;
344 CurrentTSO = event->tso;
345 #endif
346
347 #if defined(THREADED_RTS)
348 if (first) {
349 // don't yield the first time, we want a chance to run this
350 // thread for a bit, even if there are others banging at the
351 // door.
352 first = rtsFalse;
353 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
354 } else {
355 // Yield the capability to higher-priority tasks if necessary.
356 yieldCapability(&cap, task);
357 }
358 #endif
359
360 #if defined(THREADED_RTS)
361 schedulePushWork(cap,task);
362 #endif
363
364 // Check whether we have re-entered the RTS from Haskell without
365 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
366 // call).
367 if (cap->in_haskell) {
368 errorBelch("schedule: re-entered unsafely.\n"
369 " Perhaps a 'foreign import unsafe' should be 'safe'?");
370 stg_exit(EXIT_FAILURE);
371 }
372
373 // The interruption / shutdown sequence.
374 //
375 // In order to cleanly shut down the runtime, we want to:
376 // * make sure that all main threads return to their callers
377 // with the state 'Interrupted'.
378 // * clean up all OS threads associated with the runtime
379 // * free all memory etc.
380 //
381 // So the sequence for ^C goes like this:
382 //
383 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
384 // arranges for some Capability to wake up
385 //
386 // * all threads in the system are halted, and the zombies are
387 // placed on the run queue for cleaning up. We acquire all
388 // the capabilities in order to delete the threads; this is
389 // done by scheduleDoGC() for convenience (because GC already
390 // needs to acquire all the capabilities). We can't kill
391 // threads involved in foreign calls.
392 //
393 // * somebody calls shutdownHaskell(), which calls exitScheduler()
394 //
395 // * sched_state := SCHED_SHUTTING_DOWN
396 //
397 // * all workers exit when the run queue on their capability
398 // drains. All main threads will also exit when their TSO
399 // reaches the head of the run queue and they can return.
400 //
401 // * eventually all Capabilities will shut down, and the RTS can
402 // exit.
403 //
404 // * We might be left with threads blocked in foreign calls;
405 // we should really attempt to kill these somehow (TODO).
406
407 switch (sched_state) {
408 case SCHED_RUNNING:
409 break;
410 case SCHED_INTERRUPTING:
411 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413 discardSparksCap(cap);
414 #endif
415 /* scheduleDoGC() deletes all the threads */
416 cap = scheduleDoGC(cap,task,rtsFalse);
417 break;
418 case SCHED_SHUTTING_DOWN:
419 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420 // If we are a worker, just exit. If we're a bound thread
421 // then we will exit below when we've removed our TSO from
422 // the run queue.
423 if (task->tso == NULL && emptyRunQueue(cap)) {
424 return cap;
425 }
426 break;
427 default:
428 barf("sched_state: %d", sched_state);
429 }
430
431 #if defined(THREADED_RTS)
432 // If the run queue is empty, take a spark and turn it into a thread.
433 {
434 if (emptyRunQueue(cap)) {
435 StgClosure *spark;
436 spark = findSpark(cap);
437 if (spark != NULL) {
438 debugTrace(DEBUG_sched,
439 "turning spark of closure %p into a thread",
440 (StgClosure *)spark);
441 createSparkThread(cap,spark);
442 }
443 }
444 }
445 #endif // THREADED_RTS
446
447 scheduleStartSignalHandlers(cap);
448
449 // Only check the black holes here if we've nothing else to do.
450 // During normal execution, the black hole list only gets checked
451 // at GC time, to avoid repeatedly traversing this possibly long
452 // list each time around the scheduler.
453 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
454
455 scheduleCheckWakeupThreads(cap);
456
457 scheduleCheckBlockedThreads(cap);
458
459 scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461 cap = task->cap; // reload cap, it might have changed
462 #endif
463
464 // Normally, the only way we can get here with no threads to
465 // run is if a keyboard interrupt was received during
466 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467 // Additionally, it is not fatal for the
468 // threaded RTS to reach here with no threads to run.
469 //
470 // win32: might be here due to awaitEvent() being abandoned
471 // as a result of a console event having been delivered.
472 if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474 ASSERT(sched_state >= SCHED_INTERRUPTING);
475 #endif
476 continue; // nothing to do
477 }
478
479 #if defined(PARALLEL_HASKELL)
480 scheduleSendPendingMessages();
481 if (emptyRunQueue(cap) && scheduleActivateSpark())
482 continue;
483
484 #if defined(SPARKS)
485 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
486 #endif
487
488 /* If we still have no work we need to send a FISH to get a spark
489 from another PE */
490 if (emptyRunQueue(cap)) {
491 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492 ASSERT(rtsFalse); // should not happen at the moment
493 }
494 // from here: non-empty run queue.
495 // TODO: merge above case with this, only one call processMessages() !
496 if (PacketsWaiting()) { /* process incoming messages, if
497 any pending... only in else
498 because getRemoteWork waits for
499 messages as well */
500 receivedFinish = processMessages();
501 }
502 #endif
503
504 #if defined(GRAN)
505 scheduleProcessEvent(event);
506 #endif
507
508 //
509 // Get a thread to run
510 //
511 t = popRunQueue(cap);
512
513 #if defined(GRAN) || defined(PAR)
514 scheduleGranParReport(); // some kind of debugging output
515 #else
516 // Sanity check the thread we're about to run. This can be
517 // expensive if there is lots of thread switching going on...
518 IF_DEBUG(sanity,checkTSO(t));
519 #endif
520
521 #if defined(THREADED_RTS)
522 // Check whether we can run this thread in the current task.
523 // If not, we have to pass our capability to the right task.
524 {
525 Task *bound = t->bound;
526
527 if (bound) {
528 if (bound == task) {
529 debugTrace(DEBUG_sched,
530 "### Running thread %lu in bound thread", (unsigned long)t->id);
531 // yes, the Haskell thread is bound to the current native thread
532 } else {
533 debugTrace(DEBUG_sched,
534 "### thread %lu bound to another OS thread", (unsigned long)t->id);
535 // no, bound to a different Haskell thread: pass to that thread
536 pushOnRunQueue(cap,t);
537 continue;
538 }
539 } else {
540 // The thread we want to run is unbound.
541 if (task->tso) {
542 debugTrace(DEBUG_sched,
543 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544 // no, the current native thread is bound to a different
545 // Haskell thread, so pass it to any worker thread
546 pushOnRunQueue(cap,t);
547 continue;
548 }
549 }
550 }
551 #endif
552
553 cap->r.rCurrentTSO = t;
554
555 /* context switches are initiated by the timer signal, unless
556 * the user specified "context switch as often as possible", with
557 * +RTS -C0
558 */
559 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560 && !emptyThreadQueues(cap)) {
561 context_switch = 1;
562 }
563
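// run_thread: we jump back here (see the ThreadYielding case below) when the
// thread is merely switching between the compiled and interpreted evaluators.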
564 run_thread:
565
566 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
567 (long)t->id, whatNext_strs[t->what_next]);
568
569 startHeapProfTimer();
570
571 // Check for exceptions blocked on this thread
572 maybePerformBlockedException (cap, t);
573
574 // ----------------------------------------------------------------------
575 // Run the current thread
576
577 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
578 ASSERT(t->cap == cap);
579
580 prev_what_next = t->what_next;
581
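// Each Haskell thread carries its own saved errno (and, on Windows, the last
// error code); restore it before running the thread. It is saved again when
// the thread stops, below.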
582 errno = t->saved_errno;
583 #if mingw32_HOST_OS
584 SetLastError(t->saved_winerror);
585 #endif
586
587 cap->in_haskell = rtsTrue;
588
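// We are about to mutate this thread's stack, so mark the TSO as dirty first.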
589 dirty_TSO(cap,t);
590
591 #if defined(THREADED_RTS)
592 if (recent_activity == ACTIVITY_DONE_GC) {
593 // ACTIVITY_DONE_GC means we turned off the timer signal to
594 // conserve power (see #1623). Re-enable it here.
595 nat prev;
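// The atomic xchg() ensures that at most one Capability sees the
// ACTIVITY_DONE_GC -> ACTIVITY_YES transition and restarts the timer.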
596 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
597 if (prev == ACTIVITY_DONE_GC) {
598 startTimer();
599 }
600 } else {
601 recent_activity = ACTIVITY_YES;
602 }
603 #endif
604
605 switch (prev_what_next) {
606
607 case ThreadKilled:
608 case ThreadComplete:
609 /* Thread already finished, return to scheduler. */
610 ret = ThreadFinished;
611 break;
612
613 case ThreadRunGHC:
614 {
615 StgRegTable *r;
616 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
617 cap = regTableToCapability(r);
618 ret = r->rRet;
619 break;
620 }
621
622 case ThreadInterpret:
623 cap = interpretBCO(cap);
624 ret = cap->r.rRet;
625 break;
626
627 default:
628 barf("schedule: invalid what_next field");
629 }
630
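// Control returns here once the thread has stopped running; 'ret' records
// why it stopped and is dispatched on below.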
631 cap->in_haskell = rtsFalse;
632
633 // The TSO might have moved, eg. if it re-entered the RTS and a GC
634 // happened. So find the new location:
635 t = cap->r.rCurrentTSO;
636
637 // We have run some Haskell code: there might be blackhole-blocked
638 // threads to wake up now.
639 // Lock-free test here should be ok, we're just setting a flag.
640 if ( blackhole_queue != END_TSO_QUEUE ) {
641 blackholes_need_checking = rtsTrue;
642 }
643
644 // And save the current errno in this thread.
645 // XXX: possibly bogus for SMP because this thread might already
646 // be running again, see code below.
647 t->saved_errno = errno;
648 #if mingw32_HOST_OS
649 // Similarly for Windows error code
650 t->saved_winerror = GetLastError();
651 #endif
652
653 #if defined(THREADED_RTS)
654 // If ret is ThreadBlocked, and this Task is bound to the TSO that
655 // blocked, we are in limbo - the TSO is now owned by whatever it
656 // is blocked on, and may in fact already have been woken up,
657 // perhaps even on a different Capability. It may be the case
658 // that task->cap != cap. We had better yield this Capability
659 // immediately and return to normality.
660 if (ret == ThreadBlocked) {
661 debugTrace(DEBUG_sched,
662 "--<< thread %lu (%s) stopped: blocked",
663 (unsigned long)t->id, whatNext_strs[t->what_next]);
664 continue;
665 }
666 #endif
667
668 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
669 ASSERT(t->cap == cap);
670
671 // ----------------------------------------------------------------------
672
673 // Costs for the scheduler are assigned to CCS_SYSTEM
674 stopHeapProfTimer();
675 #if defined(PROFILING)
676 CCCS = CCS_SYSTEM;
677 #endif
678
679 schedulePostRunThread(t);
680
681 t = threadStackUnderflow(task,t);
682
683 ready_to_gc = rtsFalse;
684
685 switch (ret) {
686 case HeapOverflow:
687 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
688 break;
689
690 case StackOverflow:
691 scheduleHandleStackOverflow(cap,task,t);
692 break;
693
694 case ThreadYielding:
695 if (scheduleHandleYield(cap, t, prev_what_next)) {
696 // shortcut for switching between compiler/interpreter:
697 goto run_thread;
698 }
699 break;
700
701 case ThreadBlocked:
702 scheduleHandleThreadBlocked(t);
703 break;
704
705 case ThreadFinished:
706 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
707 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
708 break;
709
710 default:
711 barf("schedule: invalid thread return code %d", (int)ret);
712 }
713
714 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
715 cap = scheduleDoGC(cap,task,rtsFalse);
716 }
717 } /* end of while() */
718 }
719
720 /* ----------------------------------------------------------------------------
721 * Setting up the scheduler loop
722 * ------------------------------------------------------------------------- */
723
724 static void
725 schedulePreLoop(void)
726 {
727 #if defined(GRAN)
728 /* set up first event to get things going */
729 /* ToDo: assign costs for system setup and init MainTSO ! */
730 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
731 ContinueThread,
732 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
733
734 debugTrace (DEBUG_gran,
735 "GRAN: Init CurrentTSO (in schedule) = %p",
736 CurrentTSO);
737 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
738
739 if (RtsFlags.GranFlags.Light) {
740 /* Save current time; GranSim Light only */
741 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
742 }
743 #endif
744 }
745
746 /* -----------------------------------------------------------------------------
747 * schedulePushWork()
748 *
749 * Push work to other Capabilities if we have some.
750 * -------------------------------------------------------------------------- */
751
752 #if defined(THREADED_RTS)
753 static void
754 schedulePushWork(Capability *cap USED_IF_THREADS,
755 Task *task USED_IF_THREADS)
756 {
757 Capability *free_caps[n_capabilities], *cap0;
758 nat i, n_free_caps;
759
760 // migration can be turned off with +RTS -qg
761 if (!RtsFlags.ParFlags.migrate) return;
762
763 // Check whether we have more threads on our run queue, or sparks
764 // in our pool, that we could hand to another Capability.
765 if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
766 && sparkPoolSizeCap(cap) < 2) {
767 return;
768 }
769
770 // First grab as many free Capabilities as we can.
771 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
772 cap0 = &capabilities[i];
773 if (cap != cap0 && tryGrabCapability(cap0,task)) {
774 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
775 // it already has some work, we just grabbed it at
776 // the wrong moment. Or maybe it's deadlocked!
777 releaseCapability(cap0);
778 } else {
779 free_caps[n_free_caps++] = cap0;
780 }
781 }
782 }
783
784 // we now have n_free_caps free capabilities stashed in
785 // free_caps[]. Share our run queue equally with them. This is
786 // probably the simplest thing we could do; improvements we might
787 // want to do include:
788 //
789 // - giving high priority to moving relatively new threads, on
790 // the grounds that they haven't had time to build up a
791 // working set in the cache on this CPU/Capability.
792 //
793 // - giving low priority to moving long-lived threads
794
795 if (n_free_caps > 0) {
796 StgTSO *prev, *t, *next;
797 rtsBool pushed_to_all;
798
799 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
800
801 i = 0;
802 pushed_to_all = rtsFalse;
803
804 if (cap->run_queue_hd != END_TSO_QUEUE) {
805 prev = cap->run_queue_hd;
806 t = prev->_link;
807 prev->_link = END_TSO_QUEUE;
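// Walk the rest of our run queue: keep threads that cannot be migrated
// (relocated, bound to this task, or locked to this Capability), and hand
// the others out to the free Capabilities in turn, keeping one for
// ourselves each time we have given one to every free Capability.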
808 for (; t != END_TSO_QUEUE; t = next) {
809 next = t->_link;
810 t->_link = END_TSO_QUEUE;
811 if (t->what_next == ThreadRelocated
812 || t->bound == task // don't move my bound thread
813 || tsoLocked(t)) { // don't move a locked thread
814 setTSOLink(cap, prev, t);
815 prev = t;
816 } else if (i == n_free_caps) {
817 pushed_to_all = rtsTrue;
818 i = 0;
819 // keep one for us
820 setTSOLink(cap, prev, t);
821 prev = t;
822 } else {
823 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
824 appendToRunQueue(free_caps[i],t);
825 if (t->bound) { t->bound->cap = free_caps[i]; }
826 t->cap = free_caps[i];
827 i++;
828 }
829 }
830 cap->run_queue_tl = prev;
831 }
832
833 // If there are some free capabilities that we didn't push any
834 // threads to, then try to push a spark to each one.
835 if (!pushed_to_all) {
836 StgClosure *spark;
837 // i is the next free capability to push to
838 for (; i < n_free_caps; i++) {
839 if (emptySparkPoolCap(free_caps[i])) {
840 spark = findSpark(cap);
841 if (spark != NULL) {
842 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
843 newSpark(&(free_caps[i]->r), spark);
844 }
845 }
846 }
847 }
848
849 // release the capabilities
850 for (i = 0; i < n_free_caps; i++) {
851 task->cap = free_caps[i];
852 releaseCapability(free_caps[i]);
853 }
854 }
855 task->cap = cap; // reset to point to our Capability.
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860 * Start any pending signal handlers
861 * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868 // safe outside the lock
869 startSignalHandlers(cap);
870 }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880 * Check for blocked threads that can be woken up.
881 * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887 //
888 // Check whether any waiting threads need to be woken up. If the
889 // run queue is empty, and there are no other tasks running, we
890 // can wait indefinitely for something to happen.
891 //
892 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893 {
894 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
895 }
896 #endif
897 }
898
899
900 /* ----------------------------------------------------------------------------
901 * Check for threads woken up by other Capabilities
902 * ------------------------------------------------------------------------- */
903
904 static void
905 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
906 {
907 #if defined(THREADED_RTS)
908 // Any threads that were woken up by other Capabilities get
909 // appended to our run queue.
910 if (!emptyWakeupQueue(cap)) {
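// The wakeup queue is filled in by other Capabilities, so splice it onto
// our run queue while holding cap->lock.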
911 ACQUIRE_LOCK(&cap->lock);
912 if (emptyRunQueue(cap)) {
913 cap->run_queue_hd = cap->wakeup_queue_hd;
914 cap->run_queue_tl = cap->wakeup_queue_tl;
915 } else {
916 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
917 cap->run_queue_tl = cap->wakeup_queue_tl;
918 }
919 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
920 RELEASE_LOCK(&cap->lock);
921 }
922 #endif
923 }
924
925 /* ----------------------------------------------------------------------------
926 * Check for threads blocked on BLACKHOLEs that can be woken up
927 * ------------------------------------------------------------------------- */
928 static void
929 scheduleCheckBlackHoles (Capability *cap)
930 {
931 if ( blackholes_need_checking ) // check without the lock first
932 {
933 ACQUIRE_LOCK(&sched_mutex);
934 if ( blackholes_need_checking ) {
935 checkBlackHoles(cap);
936 blackholes_need_checking = rtsFalse;
937 }
938 RELEASE_LOCK(&sched_mutex);
939 }
940 }
941
942 /* ----------------------------------------------------------------------------
943 * Detect deadlock conditions and attempt to resolve them.
944 * ------------------------------------------------------------------------- */
945
946 static void
947 scheduleDetectDeadlock (Capability *cap, Task *task)
948 {
949
950 #if defined(PARALLEL_HASKELL)
951 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
952 return;
953 #endif
954
955 /*
956 * Detect deadlock: when we have no threads to run, there are no
957 * threads blocked, waiting for I/O, or sleeping, and all the
958 * other tasks are waiting for work, we must have a deadlock of
959 * some description.
960 */
961 if ( emptyThreadQueues(cap) )
962 {
963 #if defined(THREADED_RTS)
964 /*
965 * In the threaded RTS, we only check for deadlock if there
966 * has been no activity in a complete timeslice. This means
967 * we won't eagerly start a full GC just because we don't have
968 * any threads to run currently.
969 */
970 if (recent_activity != ACTIVITY_INACTIVE) return;
971 #endif
972
973 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
974
975 // Garbage collection can release some new threads due to
976 // either (a) finalizers or (b) threads resurrected because
977 // they are unreachable and will therefore be sent an
978 // exception. Any threads thus released will be immediately
979 // runnable.
980 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
981
982 recent_activity = ACTIVITY_DONE_GC;
983 // disable timer signals (see #1623)
984 stopTimer();
985
986 if ( !emptyRunQueue(cap) ) return;
987
988 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
989 /* If we have user-installed signal handlers, then wait
990 * for signals to arrive rather than bombing out with a
991 * deadlock.
992 */
993 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
994 debugTrace(DEBUG_sched,
995 "still deadlocked, waiting for signals...");
996
997 awaitUserSignals();
998
999 if (signals_pending()) {
1000 startSignalHandlers(cap);
1001 }
1002
1003 // either we have threads to run, or we were interrupted:
1004 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1005 }
1006 #endif
1007
1008 #if !defined(THREADED_RTS)
1009 /* Probably a real deadlock. Send the current main thread the
1010 * Deadlock exception.
1011 */
1012 if (task->tso) {
1013 switch (task->tso->why_blocked) {
1014 case BlockedOnSTM:
1015 case BlockedOnBlackHole:
1016 case BlockedOnException:
1017 case BlockedOnMVar:
1018 throwToSingleThreaded(cap, task->tso,
1019 (StgClosure *)NonTermination_closure);
1020 return;
1021 default:
1022 barf("deadlock: main thread blocked in a strange way");
1023 }
1024 }
1025 return;
1026 #endif
1027 }
1028 }
1029
1030 /* ----------------------------------------------------------------------------
1031 * Process an event (GRAN only)
1032 * ------------------------------------------------------------------------- */
1033
1034 #if defined(GRAN)
1035 static StgTSO *
1036 scheduleProcessEvent(rtsEvent *event)
1037 {
1038 StgTSO *t;
1039
1040 if (RtsFlags.GranFlags.Light)
1041 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1042
1043 /* adjust time based on time-stamp */
1044 if (event->time > CurrentTime[CurrentProc] &&
1045 event->evttype != ContinueThread)
1046 CurrentTime[CurrentProc] = event->time;
1047
1048 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1049 if (!RtsFlags.GranFlags.Light)
1050 handleIdlePEs();
1051
1052 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1053
1054 /* main event dispatcher in GranSim */
1055 switch (event->evttype) {
1056 /* Should just be continuing execution */
1057 case ContinueThread:
1058 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1059 /* ToDo: check assertion
1060 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1061 run_queue_hd != END_TSO_QUEUE);
1062 */
1063 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1064 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1065 procStatus[CurrentProc]==Fetching) {
1066 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1067 CurrentTSO->id, CurrentTSO, CurrentProc);
1068 goto next_thread;
1069 }
1070 /* Ignore ContinueThreads for completed threads */
1071 if (CurrentTSO->what_next == ThreadComplete) {
1072 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1073 CurrentTSO->id, CurrentTSO, CurrentProc);
1074 goto next_thread;
1075 }
1076 /* Ignore ContinueThreads for threads that are being migrated */
1077 if (PROCS(CurrentTSO)==Nowhere) {
1078 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1079 CurrentTSO->id, CurrentTSO, CurrentProc);
1080 goto next_thread;
1081 }
1082 /* The thread should be at the beginning of the run queue */
1083 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1084 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1085 CurrentTSO->id, CurrentTSO, CurrentProc);
1086 break; // run the thread anyway
1087 }
1088 /*
1089 new_event(proc, proc, CurrentTime[proc],
1090 FindWork,
1091 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1092 goto next_thread;
1093 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1094 break; // now actually run the thread; DaH Qu'vam yImuHbej
1095
1096 case FetchNode:
1097 do_the_fetchnode(event);
1098 goto next_thread; /* handle next event in event queue */
1099
1100 case GlobalBlock:
1101 do_the_globalblock(event);
1102 goto next_thread; /* handle next event in event queue */
1103
1104 case FetchReply:
1105 do_the_fetchreply(event);
1106 goto next_thread; /* handle next event in event queue */
1107
1108 case UnblockThread: /* Move from the blocked queue to the tail of */
1109 do_the_unblock(event);
1110 goto next_thread; /* handle next event in event queue */
1111
1112 case ResumeThread: /* Move from the blocked queue to the tail of */
1113 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1114 event->tso->gran.blocktime +=
1115 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1116 do_the_startthread(event);
1117 goto next_thread; /* handle next event in event queue */
1118
1119 case StartThread:
1120 do_the_startthread(event);
1121 goto next_thread; /* handle next event in event queue */
1122
1123 case MoveThread:
1124 do_the_movethread(event);
1125 goto next_thread; /* handle next event in event queue */
1126
1127 case MoveSpark:
1128 do_the_movespark(event);
1129 goto next_thread; /* handle next event in event queue */
1130
1131 case FindWork:
1132 do_the_findwork(event);
1133 goto next_thread; /* handle next event in event queue */
1134
1135 default:
1136 barf("Illegal event type %u\n", event->evttype);
1137 } /* switch */
1138
1139 /* This point was scheduler_loop in the old RTS */
1140
1141 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1142
1143 TimeOfLastEvent = CurrentTime[CurrentProc];
1144 TimeOfNextEvent = get_time_of_next_event();
1145 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1146 // CurrentTSO = ThreadQueueHd;
1147
1148 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1149 TimeOfNextEvent));
1150
1151 if (RtsFlags.GranFlags.Light)
1152 GranSimLight_leave_system(event, &ActiveTSO);
1153
1154 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1155
1156 IF_DEBUG(gran,
1157 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1158
1159 /* in a GranSim setup the TSO stays on the run queue */
1160 t = CurrentTSO;
1161 /* Take a thread from the run queue. */
1162 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1163
1164 IF_DEBUG(gran,
1165 debugBelch("GRAN: About to run current thread, which is\n");
1166 G_TSO(t,5));
1167
1168 context_switch = 0; // turned on via GranYield, checking events and time slice
1169
1170 IF_DEBUG(gran,
1171 DumpGranEvent(GR_SCHEDULE, t));
1172
1173 procStatus[CurrentProc] = Busy;
1174 }
1175 #endif // GRAN
1176
1177 /* ----------------------------------------------------------------------------
1178 * Send pending messages (PARALLEL_HASKELL only)
1179 * ------------------------------------------------------------------------- */
1180
1181 #if defined(PARALLEL_HASKELL)
1182 static StgTSO *
1183 scheduleSendPendingMessages(void)
1184 {
1185 StgSparkPool *pool;
1186 rtsSpark spark;
1187 StgTSO *t;
1188
1189 # if defined(PAR) // global Mem.Mgmt., omit for now
1190 if (PendingFetches != END_BF_QUEUE) {
1191 processFetches();
1192 }
1193 # endif
1194
1195 if (RtsFlags.ParFlags.BufferTime) {
1196 // if we use message buffering, we must send away all message
1197 // packets which have become too old...
1198 sendOldBuffers();
1199 }
1200 }
1201 #endif
1202
1203 /* ----------------------------------------------------------------------------
1204 * Activate spark threads (PARALLEL_HASKELL only)
1205 * ------------------------------------------------------------------------- */
1206
1207 #if defined(PARALLEL_HASKELL)
1208 static void
1209 scheduleActivateSpark(void)
1210 {
1211 #if defined(SPARKS)
1212 ASSERT(emptyRunQueue());
1213 /* We get here if the run queue is empty and want some work.
1214 We try to turn a spark into a thread, and add it to the run queue,
1215 from where it will be picked up in the next iteration of the scheduler
1216 loop.
1217 */
1218
1219 /* :-[ no local threads => look out for local sparks */
1220 /* the spark pool for the current PE */
1221 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1222 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1223 pool->hd < pool->tl) {
1224 /*
1225 * ToDo: add GC code check that we really have enough heap afterwards!!
1226 * Old comment:
1227 * If we're here (no runnable threads) and we have pending
1228 * sparks, we must have a space problem. Get enough space
1229 * to turn one of those pending sparks into a
1230 * thread...
1231 */
1232
1233 spark = findSpark(rtsFalse); /* get a spark */
1234 if (spark != (rtsSpark) NULL) {
1235 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1236 IF_PAR_DEBUG(fish, // schedule,
1237 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1238 tso->id, tso, advisory_thread_count));
1239
1240 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1241 IF_PAR_DEBUG(fish, // schedule,
1242 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1243 spark));
1244 return rtsFalse; /* failed to generate a thread */
1245 } /* otherwise fall through & pick-up new tso */
1246 } else {
1247 IF_PAR_DEBUG(fish, // schedule,
1248 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1249 spark_queue_len(pool)));
1250 return rtsFalse; /* failed to generate a thread */
1251 }
1252 return rtsTrue; /* success in generating a thread */
1253 } else { /* no more threads permitted or pool empty */
1254 return rtsFalse; /* failed to generateThread */
1255 }
1256 #else
1257 tso = NULL; // avoid compiler warning only
1258 return rtsFalse; /* dummy in non-PAR setup */
1259 #endif // SPARKS
1260 }
1261 #endif // PARALLEL_HASKELL
1262
1263 /* ----------------------------------------------------------------------------
1264 * Get work from a remote node (PARALLEL_HASKELL only)
1265 * ------------------------------------------------------------------------- */
1266
1267 #if defined(PARALLEL_HASKELL)
1268 static rtsBool
1269 scheduleGetRemoteWork(rtsBool *receivedFinish)
1270 {
1271 ASSERT(emptyRunQueue());
1272
1273 if (RtsFlags.ParFlags.BufferTime) {
1274 IF_PAR_DEBUG(verbose,
1275 debugBelch("...send all pending data,"));
1276 {
1277 nat i;
1278 for (i=1; i<=nPEs; i++)
1279 sendImmediately(i); // send all messages away immediately
1280 }
1281 }
1282 # ifndef SPARKS
1283 //++EDEN++ idle() , i.e. send all buffers, wait for work
1284 // suppress fishing in EDEN... just look for incoming messages
1285 // (blocking receive)
1286 IF_PAR_DEBUG(verbose,
1287 debugBelch("...wait for incoming messages...\n"));
1288 *receivedFinish = processMessages(); // blocking receive...
1289
1290 // and reenter scheduling loop after having received something
1291 // (return rtsFalse below)
1292
1293 # else /* activate SPARKS machinery */
1294 /* We get here, if we have no work, tried to activate a local spark, but still
1295 have no work. We try to get a remote spark, by sending a FISH message.
1296 Thread migration should be added here, and triggered when a sequence of
1297 fishes returns without work. */
1298 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1299
1300 /* =8-[ no local sparks => look for work on other PEs */
1301 /*
1302 * We really have absolutely no work. Send out a fish
1303 * (there may be some out there already), and wait for
1304 * something to arrive. We clearly can't run any threads
1305 * until a SCHEDULE or RESUME arrives, and so that's what
1306 * we're hoping to see. (Of course, we still have to
1307 * respond to other types of messages.)
1308 */
1309 rtsTime now = msTime() /*CURRENT_TIME*/;
1310 IF_PAR_DEBUG(verbose,
1311 debugBelch("-- now=%ld\n", now));
1312 IF_PAR_DEBUG(fish, // verbose,
1313 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314 (last_fish_arrived_at!=0 &&
1315 last_fish_arrived_at+delay > now)) {
1316 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1317 now, last_fish_arrived_at+delay,
1318 last_fish_arrived_at,
1319 delay);
1320 });
1321
1322 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1323 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1324 if (last_fish_arrived_at==0 ||
1325 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1326 /* outstandingFishes is set in sendFish, processFish;
1327 avoid flooding system with fishes via delay */
1328 next_fish_to_send_at = 0;
1329 } else {
1330 /* ToDo: this should be done in the main scheduling loop to avoid the
1331 busy wait here; not so bad if fish delay is very small */
1332 int iq = 0; // DEBUGGING -- HWL
1333 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1334 /* send a fish when ready, but process messages that arrive in the meantime */
1335 do {
1336 if (PacketsWaiting()) {
1337 iq++; // DEBUGGING
1338 *receivedFinish = processMessages();
1339 }
1340 now = msTime();
1341 } while (!*receivedFinish || now<next_fish_to_send_at);
1342 // JB: This means the fish could become obsolete, if we receive
1343 // work. Better check for work again?
1344 // last line: while (!receivedFinish || !haveWork || now<...)
1345 // next line: if (receivedFinish || haveWork )
1346
1347 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1348 return rtsFalse; // NB: this will leave scheduler loop
1349 // immediately after return!
1350
1351 IF_PAR_DEBUG(fish, // verbose,
1352 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1353
1354 }
1355
1356 // JB: IMHO, this should all be hidden inside sendFish(...)
1357 /* pe = choosePE();
1358 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1359 NEW_FISH_HUNGER);
1360
1361 // Global statistics: count no. of fishes
1362 if (RtsFlags.ParFlags.ParStats.Global &&
1363 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1364 globalParStats.tot_fish_mess++;
1365 }
1366 */
1367
1368 /* delayed fishes must have been sent by now! */
1369 next_fish_to_send_at = 0;
1370 }
1371
1372 *receivedFinish = processMessages();
1373 # endif /* SPARKS */
1374
1375 return rtsFalse;
1376 /* NB: this function always returns rtsFalse, meaning the scheduler
1377 loop continues with the next iteration;
1378 rationale:
1379 return code means success in finding work; we enter this function
1380 if there is no local work, thus have to send a fish which takes
1381 time until it arrives with work; in the meantime we should process
1382 messages in the main loop;
1383 */
1384 }
1385 #endif // PARALLEL_HASKELL
1386
1387 /* ----------------------------------------------------------------------------
1388 * PAR/GRAN: Report stats & debugging info(?)
1389 * ------------------------------------------------------------------------- */
1390
1391 #if defined(PAR) || defined(GRAN)
1392 static void
1393 scheduleGranParReport(void)
1394 {
1395 ASSERT(run_queue_hd != END_TSO_QUEUE);
1396
1397 /* Take a thread from the run queue, if we have work */
1398 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1399
1400 /* If this TSO has got its outport closed in the meantime,
1401 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1402 * It has to be marked as TH_DEAD for this purpose.
1403 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1404
1405 JB: TODO: investigate whether state change field could be nuked
1406 entirely and replaced by the normal tso state (whatnext
1407 field). All we want to do is to kill tsos from outside.
1408 */
1409
1410 /* ToDo: write something to the log-file
1411 if (RTSflags.ParFlags.granSimStats && !sameThread)
1412 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1413
1414 CurrentTSO = t;
1415 */
1416 /* the spark pool for the current PE */
1417 pool = &(cap.r.rSparks); // cap = (old) MainCap
1418
1419 IF_DEBUG(scheduler,
1420 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1421 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1422
1423 IF_PAR_DEBUG(fish,
1424 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1425 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1426
1427 if (RtsFlags.ParFlags.ParStats.Full &&
1428 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1429 (emitSchedule || // forced emit
1430 (t && LastTSO && t->id != LastTSO->id))) {
1431 /*
1432 we are running a different TSO, so write a schedule event to log file
1433 NB: If we use fair scheduling we also have to write a deschedule
1434 event for LastTSO; with unfair scheduling we know that the
1435 previous tso has blocked whenever we switch to another tso, so
1436 we don't need it in GUM for now
1437 */
1438 IF_PAR_DEBUG(fish, // schedule,
1439 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1440
1441 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1442 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1443 emitSchedule = rtsFalse;
1444 }
1445 }
1446 #endif
1447
1448 /* ----------------------------------------------------------------------------
1449 * After running a thread...
1450 * ------------------------------------------------------------------------- */
1451
1452 static void
1453 schedulePostRunThread (StgTSO *t)
1454 {
1455 // We have to be able to catch transactions that are in an
1456 // infinite loop as a result of seeing an inconsistent view of
1457 // memory, e.g.
1458 //
1459 // atomically $ do
1460 // [a,b] <- mapM readTVar [ta,tb]
1461 // when (a == b) loop
1462 //
1463 // and a is never equal to b given a consistent view of memory.
1464 //
1465 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1466 if (!stmValidateNestOfTransactions (t -> trec)) {
1467 debugTrace(DEBUG_sched | DEBUG_stm,
1468 "trec %p found wasting its time", t);
1469
1470 // strip the stack back to the
1471 // ATOMICALLY_FRAME, aborting the (nested)
1472 // transaction, and saving the stack of any
1473 // partially-evaluated thunks on the heap.
1474 throwToSingleThreaded_(&capabilities[0], t,
1475 NULL, rtsTrue, NULL);
1476
1477 #ifdef REG_R1
1478 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1479 #endif
1480 }
1481 }
1482
1483 #if defined(PAR)
1484 /* HACK 675: if the last thread didn't yield, make sure to print a
1485 SCHEDULE event to the log file when StgRunning the next thread, even
1486 if it is the same one as before */
1487 LastTSO = t;
1488 TimeOfLastYield = CURRENT_TIME;
1489 #endif
1490
1491 /* some statistics gathering in the parallel case */
1492
1493 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1494 switch (ret) {
1495 case HeapOverflow:
1496 # if defined(GRAN)
1497 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1498 globalGranStats.tot_heapover++;
1499 # elif defined(PAR)
1500 globalParStats.tot_heapover++;
1501 # endif
1502 break;
1503
1504 case StackOverflow:
1505 # if defined(GRAN)
1506 IF_DEBUG(gran,
1507 DumpGranEvent(GR_DESCHEDULE, t));
1508 globalGranStats.tot_stackover++;
1509 # elif defined(PAR)
1510 // IF_DEBUG(par,
1511 // DumpGranEvent(GR_DESCHEDULE, t);
1512 globalParStats.tot_stackover++;
1513 # endif
1514 break;
1515
1516 case ThreadYielding:
1517 # if defined(GRAN)
1518 IF_DEBUG(gran,
1519 DumpGranEvent(GR_DESCHEDULE, t));
1520 globalGranStats.tot_yields++;
1521 # elif defined(PAR)
1522 // IF_DEBUG(par,
1523 // DumpGranEvent(GR_DESCHEDULE, t);
1524 globalParStats.tot_yields++;
1525 # endif
1526 break;
1527
1528 case ThreadBlocked:
1529 # if defined(GRAN)
1530 debugTrace(DEBUG_sched,
1531 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1532 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1533 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1534 if (t->block_info.closure!=(StgClosure*)NULL)
1535 print_bq(t->block_info.closure);
1536 debugBelch("\n"));
1537
1538 // ??? needed; should emit block before
1539 IF_DEBUG(gran,
1540 DumpGranEvent(GR_DESCHEDULE, t));
1541 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1542 /*
1543 ngoq Dogh!
1544 ASSERT(procStatus[CurrentProc]==Busy ||
1545 ((procStatus[CurrentProc]==Fetching) &&
1546 (t->block_info.closure!=(StgClosure*)NULL)));
1547 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1548 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1549 procStatus[CurrentProc]==Fetching))
1550 procStatus[CurrentProc] = Idle;
1551 */
1552 # elif defined(PAR)
1553 //++PAR++ blockThread() writes the event (change?)
1554 # endif
1555 break;
1556
1557 case ThreadFinished:
1558 break;
1559
1560 default:
1561 barf("parGlobalStats: unknown return code");
1562 break;
1563 }
1564 #endif
1565 }
1566
1567 /* -----------------------------------------------------------------------------
1568 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1569 * -------------------------------------------------------------------------- */
1570
1571 static rtsBool
1572 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1573 {
1574 // did the task ask for a large block?
1575 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1576 // if so, get one and push it on the front of the nursery.
1577 bdescr *bd;
1578 lnat blocks;
1579
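// Round the request up to a whole number of blocks.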
1580 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1581
1582 debugTrace(DEBUG_sched,
1583 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1584 (long)t->id, whatNext_strs[t->what_next], blocks);
1585
1586 // don't do this if the nursery is (nearly) full, we'll GC first.
1587 if (cap->r.rCurrentNursery->link != NULL ||
1588 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1589 // if the nursery has only one block.
1590
1591 ACQUIRE_SM_LOCK
1592 bd = allocGroup( blocks );
1593 RELEASE_SM_LOCK
1594 cap->r.rNursery->n_blocks += blocks;
1595
1596 // link the new group into the list
1597 bd->link = cap->r.rCurrentNursery;
1598 bd->u.back = cap->r.rCurrentNursery->u.back;
1599 if (cap->r.rCurrentNursery->u.back != NULL) {
1600 cap->r.rCurrentNursery->u.back->link = bd;
1601 } else {
1602 #if !defined(THREADED_RTS)
1603 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1604 g0s0 == cap->r.rNursery);
1605 #endif
1606 cap->r.rNursery->blocks = bd;
1607 }
1608 cap->r.rCurrentNursery->u.back = bd;
1609
1610 // initialise it as a nursery block. We initialise the
1611 // step, gen_no, and flags fields of *every* sub-block in
1612 // this large block, because this is easier than making
1613 // sure that we always find the block head of a large
1614 // block whenever we call Bdescr() (eg. evacuate() and
1615 // isAlive() in the GC would both have to do this, at
1616 // least).
1617 {
1618 bdescr *x;
1619 for (x = bd; x < bd + blocks; x++) {
1620 x->step = cap->r.rNursery;
1621 x->gen_no = 0;
1622 x->flags = 0;
1623 }
1624 }
1625
1626 // This assert can be a killer if the app is doing lots
1627 // of large block allocations.
1628 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1629
1630 // now update the nursery to point to the new block
1631 cap->r.rCurrentNursery = bd;
1632
1633 // we might be unlucky and have another thread get on the
1634 // run queue before us and steal the large block, but in that
1635 // case the thread will just end up requesting another large
1636 // block.
1637 pushOnRunQueue(cap,t);
1638 return rtsFalse; /* not actually GC'ing */
1639 }
1640 }
1641
1642 debugTrace(DEBUG_sched,
1643 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1644 (long)t->id, whatNext_strs[t->what_next]);
1645
1646 #if defined(GRAN)
1647 ASSERT(!is_on_queue(t,CurrentProc));
1648 #elif defined(PARALLEL_HASKELL)
1649 /* Currently we emit a DESCHEDULE event before GC in GUM.
1650 ToDo: either add separate event to distinguish SYSTEM time from rest
1651 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1652 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1653 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1654 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1655 emitSchedule = rtsTrue;
1656 }
1657 #endif
1658
1659 if (context_switch) {
1660 // Sometimes we miss a context switch, e.g. when calling
1661 // primitives in a tight loop, MAYBE_GC() doesn't check the
1662 // context switch flag, and we end up waiting for a GC.
1663 // See #1984, and concurrent/should_run/1984
1664 context_switch = 0;
1665 addToRunQueue(cap,t);
1666 } else {
1667 pushOnRunQueue(cap,t);
1668 }
1669 return rtsTrue;
1670 /* actual GC is done at the end of the while loop in schedule() */
1671 }
1672
1673 /* -----------------------------------------------------------------------------
1674 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1675 * -------------------------------------------------------------------------- */
1676
1677 static void
1678 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1679 {
1680 debugTrace (DEBUG_sched,
1681 "--<< thread %ld (%s) stopped, StackOverflow",
1682 (long)t->id, whatNext_strs[t->what_next]);
1683
1684 /* just adjust the stack for this thread, then pop it back
1685 * on the run queue.
1686 */
1687 {
1688 /* enlarge the stack */
1689 StgTSO *new_t = threadStackOverflow(cap, t);
1690
1691 /* The TSO attached to this Task may have moved, so update the
1692 * pointer to it.
1693 */
1694 if (task->tso == t) {
1695 task->tso = new_t;
1696 }
1697 pushOnRunQueue(cap,new_t);
1698 }
1699 }
1700
1701 /* -----------------------------------------------------------------------------
1702 * Handle a thread that returned to the scheduler with ThreadYielding
1703 * -------------------------------------------------------------------------- */
1704
1705 static rtsBool
1706 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1707 {
1708 // Reset the context switch flag. We don't do this just before
1709 // running the thread, because that would mean we would lose ticks
1710 // during GC, which can lead to unfair scheduling (a thread hogs
1711 // the CPU because the tick always arrives during GC). This way
1712 // penalises threads that do a lot of allocation, but that seems
1713 // better than the alternative.
1714 context_switch = 0;
1715
1716 /* put the thread back on the run queue. Then, if we're ready to
1717 * GC, check whether this is the last task to stop. If so, wake
1718 * up the GC thread. getThread will block during a GC until the
1719 * GC is finished.
1720 */
1721 #ifdef DEBUG
1722 if (t->what_next != prev_what_next) {
1723 debugTrace(DEBUG_sched,
1724 "--<< thread %ld (%s) stopped to switch evaluators",
1725 (long)t->id, whatNext_strs[t->what_next]);
1726 } else {
1727 debugTrace(DEBUG_sched,
1728 "--<< thread %ld (%s) stopped, yielding",
1729 (long)t->id, whatNext_strs[t->what_next]);
1730 }
1731 #endif
1732
1733 IF_DEBUG(sanity,
1734 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1735 checkTSO(t));
1736 ASSERT(t->_link == END_TSO_QUEUE);
1737
1738 // Shortcut if we're just switching evaluators: don't bother
1739 // doing stack squeezing (which can be expensive), just run the
1740 // thread.
1741 if (t->what_next != prev_what_next) {
1742 return rtsTrue;
1743 }
1744
1745 #if defined(GRAN)
1746 ASSERT(!is_on_queue(t,CurrentProc));
1747
1748 IF_DEBUG(sanity,
1749 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1750 checkThreadQsSanity(rtsTrue));
1751
1752 #endif
1753
1754 addToRunQueue(cap,t);
1755
1756 #if defined(GRAN)
1757 /* add a ContinueThread event to actually process the thread */
1758 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1759 ContinueThread,
1760 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1761 IF_GRAN_DEBUG(bq,
1762 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1763 G_EVENTQ(0);
1764 G_CURR_THREADQ(0));
1765 #endif
1766 return rtsFalse;
1767 }
1768
1769 /* -----------------------------------------------------------------------------
1770 * Handle a thread that returned to the scheduler with ThreadBlocked
1771 * -------------------------------------------------------------------------- */
1772
1773 static void
1774 scheduleHandleThreadBlocked( StgTSO *t
1775 #if !defined(GRAN) && !defined(DEBUG)
1776 STG_UNUSED
1777 #endif
1778 )
1779 {
1780 #if defined(GRAN)
1781 IF_DEBUG(scheduler,
1782 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1783 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1784 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1785
1786 // ??? needed; should emit block before
1787 IF_DEBUG(gran,
1788 DumpGranEvent(GR_DESCHEDULE, t));
1789 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1790 /*
1791 ngoq Dogh!
1792 ASSERT(procStatus[CurrentProc]==Busy ||
1793 ((procStatus[CurrentProc]==Fetching) &&
1794 (t->block_info.closure!=(StgClosure*)NULL)));
1795 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1796 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1797 procStatus[CurrentProc]==Fetching))
1798 procStatus[CurrentProc] = Idle;
1799 */
1800 #elif defined(PAR)
1801 IF_DEBUG(scheduler,
1802 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1803 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1804 IF_PAR_DEBUG(bq,
1805
1806 if (t->block_info.closure!=(StgClosure*)NULL)
1807 print_bq(t->block_info.closure));
1808
1809 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1810 blockThread(t);
1811
1812 /* whatever we schedule next, we must log that schedule */
1813 emitSchedule = rtsTrue;
1814
1815 #else /* !GRAN */
1816
1817 // We don't need to do anything. The thread is blocked, and it
1818 // has tidied up its stack and placed itself on whatever queue
1819 // it needs to be on.
1820
1821 // ASSERT(t->why_blocked != NotBlocked);
1822 // Not true: for example,
1823 // - in THREADED_RTS, the thread may already have been woken
1824 // up by another Capability. This actually happens: try
1825 // conc023 +RTS -N2.
1826 // - the thread may have woken itself up already, because
1827 // threadPaused() might have raised a blocked throwTo
1828 // exception, see maybePerformBlockedException().
1829
1830 #ifdef DEBUG
1831 if (traceClass(DEBUG_sched)) {
1832 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1833 (unsigned long)t->id, whatNext_strs[t->what_next]);
1834 printThreadBlockage(t);
1835 debugTraceEnd();
1836 }
1837 #endif
1838
1839 /* Only for dumping event to log file
1840 ToDo: do I need this in GranSim, too?
1841 blockThread(t);
1842 */
1843 #endif
1844 }
1845
1846 /* -----------------------------------------------------------------------------
1847 * Handle a thread that returned to the scheduler with ThreadFinished
1848 * -------------------------------------------------------------------------- */
1849
1850 static rtsBool
1851 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1852 {
1853 /* Need to check whether this was a main thread, and if so,
1854 * return with the return value.
1855 *
1856 * We also end up here if the thread kills itself with an
1857 * uncaught exception, see Exception.cmm.
1858 */
1859 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1860 (unsigned long)t->id, whatNext_strs[t->what_next]);
1861
1862 #if defined(GRAN)
1863 endThread(t, CurrentProc); // clean-up the thread
1864 #elif defined(PARALLEL_HASKELL)
1865 /* For now all are advisory -- HWL */
1866 //if(t->priority==AdvisoryPriority) ??
1867 advisory_thread_count--; // JB: Caution with this counter, buggy!
1868
1869 # if defined(DIST)
1870 if(t->dist.priority==RevalPriority)
1871 FinishReval(t);
1872 # endif
1873
1874 # if defined(EDENOLD)
1875 // the thread could still have an outport... (BUG)
1876 if (t->eden.outport != -1) {
1877 // delete the outport for the tso which has finished...
1878 IF_PAR_DEBUG(eden_ports,
1879 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1880 t->eden.outport, t->id));
1881 deleteOPT(t);
1882 }
1883 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1884 if (t->eden.epid != -1) {
1885 IF_PAR_DEBUG(eden_ports,
1886 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1887 t->id, t->eden.epid));
1888 removeTSOfromProcess(t);
1889 }
1890 # endif
1891
1892 # if defined(PAR)
1893 if (RtsFlags.ParFlags.ParStats.Full &&
1894 !RtsFlags.ParFlags.ParStats.Suppressed)
1895 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1896
1897 // t->par only contains statistics: left out for now...
1898 IF_PAR_DEBUG(fish,
1899 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1900 t->id,t,t->par.sparkname));
1901 # endif
1902 #endif // PARALLEL_HASKELL
1903
1904 //
1905 // Check whether the thread that just completed was a bound
1906 // thread, and if so return with the result.
1907 //
1908 // There is an assumption here that all thread completion goes
1909 // through this point; we need to make sure that if a thread
1910 // ends up in the ThreadKilled state, that it stays on the run
1911 // queue so it can be dealt with here.
1912 //
1913
1914 if (t->bound) {
1915
1916 if (t->bound != task) {
1917 #if !defined(THREADED_RTS)
1918 // Must be a bound thread that is not the topmost one. Leave
1919 // it on the run queue until the stack has unwound to the
1920 // point where we can deal with this. Leaving it on the run
1921 // queue also ensures that the garbage collector knows about
1922 // this thread and its return value (it gets dropped from the
1923 // step->threads list so there's no other way to find it).
1924 appendToRunQueue(cap,t);
1925 return rtsFalse;
1926 #else
1927 // this cannot happen in the threaded RTS, because a
1928 // bound thread can only be run by the appropriate Task.
1929 barf("finished bound thread that isn't mine");
1930 #endif
1931 }
1932
1933 ASSERT(task->tso == t);
1934
1935 if (t->what_next == ThreadComplete) {
1936 if (task->ret) {
1937 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1938 *(task->ret) = (StgClosure *)task->tso->sp[1];
1939 }
1940 task->stat = Success;
1941 } else {
1942 if (task->ret) {
1943 *(task->ret) = NULL;
1944 }
1945 if (sched_state >= SCHED_INTERRUPTING) {
1946 task->stat = Interrupted;
1947 } else {
1948 task->stat = Killed;
1949 }
1950 }
1951 #ifdef DEBUG
1952 removeThreadLabel((StgWord)task->tso->id);
1953 #endif
1954 return rtsTrue; // tells schedule() to return
1955 }
1956
1957 return rtsFalse;
1958 }
1959
1960 /* -----------------------------------------------------------------------------
1961 * Perform a heap census
1962 * -------------------------------------------------------------------------- */
1963
1964 static rtsBool
1965 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1966 {
1967 // When we have +RTS -i0 and we're heap profiling, do a census at
1968 // every GC. This lets us get repeatable runs for debugging.
1969 if (performHeapProfile ||
1970 (RtsFlags.ProfFlags.profileInterval==0 &&
1971 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1972 return rtsTrue;
1973 } else {
1974 return rtsFalse;
1975 }
1976 }
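/* For example (illustrative only): running a profiled binary with
 *     ./prog +RTS -hc -i0 -RTS
 * turns on RtsFlags.ProfFlags.doHeapProfile with a zero profileInterval,
 * so the test above requests a census at every GC rather than on a
 * timer tick.
 */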
1977
1978 /* -----------------------------------------------------------------------------
1979 * Perform a garbage collection if necessary
1980 * -------------------------------------------------------------------------- */
1981
1982 static Capability *
1983 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1984 {
1985 StgTSO *t;
1986 rtsBool heap_census;
1987 #ifdef THREADED_RTS
1988 static volatile StgWord waiting_for_gc;
1989 rtsBool was_waiting;
1990 nat i;
1991 #endif
1992
1993 #ifdef THREADED_RTS
1994 // In order to GC, there must be no threads running Haskell code.
1995 // Therefore, the GC thread needs to hold *all* the capabilities,
1996 // and release them after the GC has completed.
1997 //
1998 // This seems to be the simplest way: previous attempts involved
1999 // making all the threads with capabilities give up their
2000 // capabilities and sleep except for the *last* one, which
2001 // actually did the GC. But it's quite hard to arrange for all
2002 // the other tasks to sleep and stay asleep.
2003 //
2004
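    // cas() returns the previous value of waiting_for_gc, so exactly one
    // task sees 0 here and becomes the task that performs the GC; any
    // task that loses the race takes the branch below, yielding its
    // Capability instead.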
2005 was_waiting = cas(&waiting_for_gc, 0, 1);
2006 if (was_waiting) {
2007 do {
2008 debugTrace(DEBUG_sched, "someone else is trying to GC...");
2009 if (cap) yieldCapability(&cap,task);
2010 } while (waiting_for_gc);
2011 return cap; // NOTE: task->cap might have changed here
2012 }
2013
2014 for (i=0; i < n_capabilities; i++) {
2015 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
2016 if (cap != &capabilities[i]) {
2017 Capability *pcap = &capabilities[i];
2018 // we better hope this task doesn't get migrated to
2019 // another Capability while we're waiting for this one.
2020 // It won't, because load balancing happens while we have
2021 // all the Capabilities, but even so it's a slightly
2022 // unsavoury invariant.
2023 task->cap = pcap;
2024 context_switch = 1;
2025 waitForReturnCapability(&pcap, task);
2026 if (pcap != &capabilities[i]) {
2027 barf("scheduleDoGC: got the wrong capability");
2028 }
2029 }
2030 }
2031
2032 waiting_for_gc = rtsFalse;
2033 #endif
2034
2035 // so this happens periodically:
2036 if (cap) scheduleCheckBlackHoles(cap);
2037
2038 IF_DEBUG(scheduler, printAllThreads());
2039
2040 /*
2041 * We now have all the capabilities; if we're in an interrupting
2042 * state, then we should take the opportunity to delete all the
2043 * threads in the system.
2044 */
2045 if (sched_state >= SCHED_INTERRUPTING) {
2046 deleteAllThreads(&capabilities[0]);
2047 sched_state = SCHED_SHUTTING_DOWN;
2048 }
2049
2050 heap_census = scheduleNeedHeapProfile(rtsTrue);
2051
2052 /* everybody back, start the GC.
2053 * Could do it in this thread, or signal a condition var
2054 * to do it in another thread. Either way, we need to
2055 * broadcast on gc_pending_cond afterward.
2056 */
2057 #if defined(THREADED_RTS)
2058 debugTrace(DEBUG_sched, "doing GC");
2059 #endif
2060 GarbageCollect(force_major || heap_census);
2061
2062 if (heap_census) {
2063 debugTrace(DEBUG_sched, "performing heap census");
2064 heapCensus();
2065 performHeapProfile = rtsFalse;
2066 }
2067
2068 #if defined(THREADED_RTS)
2069 // release our stash of capabilities.
2070 for (i = 0; i < n_capabilities; i++) {
2071 if (cap != &capabilities[i]) {
2072 task->cap = &capabilities[i];
2073 releaseCapability(&capabilities[i]);
2074 }
2075 }
2076 if (cap) {
2077 task->cap = cap;
2078 } else {
2079 task->cap = NULL;
2080 }
2081 #endif
2082
2083 #if defined(GRAN)
2084 /* add a ContinueThread event to continue execution of current thread */
2085 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086 ContinueThread,
2087 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088 IF_GRAN_DEBUG(bq,
2089 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2090 G_EVENTQ(0);
2091 G_CURR_THREADQ(0));
2092 #endif /* GRAN */
2093
2094 return cap;
2095 }
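/* Note for callers: the Capability returned here may differ from the one
 * passed in (the task can migrate while waiting for another task's GC,
 * and cap may legitimately be NULL when we are called from
 * exitScheduler() or performGC_()), so always continue with the
 * returned value.
 */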
2096
2097 /* ---------------------------------------------------------------------------
2098 * Singleton fork(). Do not copy any running threads.
2099 * ------------------------------------------------------------------------- */
2100
2101 pid_t
2102 forkProcess(HsStablePtr *entry
2103 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2104 STG_UNUSED
2105 #endif
2106 )
2107 {
2108 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2109 Task *task;
2110 pid_t pid;
2111 StgTSO* t,*next;
2112 Capability *cap;
2113 nat s;
2114
2115 #if defined(THREADED_RTS)
2116 if (RtsFlags.ParFlags.nNodes > 1) {
2117 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2118 stg_exit(EXIT_FAILURE);
2119 }
2120 #endif
2121
2122 debugTrace(DEBUG_sched, "forking!");
2123
2124 // ToDo: for SMP, we should probably acquire *all* the capabilities
2125 cap = rts_lock();
2126
2127 // no funny business: hold locks while we fork, otherwise if some
2128 // other thread is holding a lock when the fork happens, the data
2129 // structure protected by the lock will forever be in an
2130 // inconsistent state in the child. See also #1391.
2131 ACQUIRE_LOCK(&sched_mutex);
2132 ACQUIRE_LOCK(&cap->lock);
2133 ACQUIRE_LOCK(&cap->running_task->lock);
2134
2135 pid = fork();
2136
2137 if (pid) { // parent
2138
2139 RELEASE_LOCK(&sched_mutex);
2140 RELEASE_LOCK(&cap->lock);
2141 RELEASE_LOCK(&cap->running_task->lock);
2142
2143 // just return the pid
2144 rts_unlock(cap);
2145 return pid;
2146
2147 } else { // child
2148
2149 #if defined(THREADED_RTS)
2150 initMutex(&sched_mutex);
2151 initMutex(&cap->lock);
2152 initMutex(&cap->running_task->lock);
2153 #endif
2154
2155 // Now, all OS threads except the thread that forked are
2156 // stopped. We need to stop all Haskell threads, including
2157 // those involved in foreign calls. Also we need to delete
2158 // all Tasks, because they correspond to OS threads that are
2159 // now gone.
2160
2161 for (s = 0; s < total_steps; s++) {
2162 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2163 if (t->what_next == ThreadRelocated) {
2164 next = t->_link;
2165 } else {
2166 next = t->global_link;
2167 // don't allow threads to catch the ThreadKilled
2168 // exception, but we do want to raiseAsync() because these
2169 // threads may be evaluating thunks that we need later.
2170 deleteThread_(cap,t);
2171 }
2172 }
2173 }
2174
2175 // Empty the run queue. It seems tempting to let all the
2176 // killed threads stay on the run queue as zombies to be
2177 // cleaned up later, but some of them correspond to bound
2178 // threads for which the corresponding Task does not exist.
2179 cap->run_queue_hd = END_TSO_QUEUE;
2180 cap->run_queue_tl = END_TSO_QUEUE;
2181
2182 // Any suspended C-calling Tasks are no more, their OS threads
2183 // don't exist now:
2184 cap->suspended_ccalling_tasks = NULL;
2185
2186 // Empty the threads lists. Otherwise, the garbage
2187 // collector may attempt to resurrect some of these threads.
2188 for (s = 0; s < total_steps; s++) {
2189 all_steps[s].threads = END_TSO_QUEUE;
2190 }
2191
2192 // Wipe the task list, except the current Task.
2193 ACQUIRE_LOCK(&sched_mutex);
2194 for (task = all_tasks; task != NULL; task=task->all_link) {
2195 if (task != cap->running_task) {
2196 #if defined(THREADED_RTS)
2197 initMutex(&task->lock); // see #1391
2198 #endif
2199 discardTask(task);
2200 }
2201 }
2202 RELEASE_LOCK(&sched_mutex);
2203
2204 #if defined(THREADED_RTS)
2205 // Wipe our spare workers list, they no longer exist. New
2206 // workers will be created if necessary.
2207 cap->spare_workers = NULL;
2208 cap->returning_tasks_hd = NULL;
2209 cap->returning_tasks_tl = NULL;
2210 #endif
2211
2212 // On Unix, all timers are reset in the child, so we need to start
2213 // the timer again.
2214 initTimer();
2215 startTimer();
2216
2217 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2218 rts_checkSchedStatus("forkProcess",cap);
2219
2220 rts_unlock(cap);
2221 hs_exit(); // clean up and exit
2222 stg_exit(EXIT_SUCCESS);
2223 }
2224 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2225 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2226 return -1;
2227 #endif
2228 }
2229
2230 /* ---------------------------------------------------------------------------
2231 * Delete all the threads in the system
2232 * ------------------------------------------------------------------------- */
2233
2234 static void
2235 deleteAllThreads ( Capability *cap )
2236 {
2237 // NOTE: only safe to call if we own all capabilities.
2238
2239 StgTSO* t, *next;
2240 nat s;
2241
2242 debugTrace(DEBUG_sched,"deleting all threads");
2243 for (s = 0; s < total_steps; s++) {
2244 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2245 if (t->what_next == ThreadRelocated) {
2246 next = t->_link;
2247 } else {
2248 next = t->global_link;
2249 deleteThread(cap,t);
2250 }
2251 }
2252 }
2253
2254 // The run queue now contains a bunch of ThreadKilled threads. We
2255 // must not throw these away: the main thread(s) will be in there
2256 // somewhere, and the main scheduler loop has to deal with it.
2257 // Also, the run queue is the only thing keeping these threads from
2258 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2259
2260 #if !defined(THREADED_RTS)
2261 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2262 ASSERT(sleeping_queue == END_TSO_QUEUE);
2263 #endif
2264 }
2265
2266 /* -----------------------------------------------------------------------------
2267 Managing the suspended_ccalling_tasks list.
2268 Locks required: sched_mutex
2269 -------------------------------------------------------------------------- */
2270
2271 STATIC_INLINE void
2272 suspendTask (Capability *cap, Task *task)
2273 {
2274 ASSERT(task->next == NULL && task->prev == NULL);
2275 task->next = cap->suspended_ccalling_tasks;
2276 task->prev = NULL;
2277 if (cap->suspended_ccalling_tasks) {
2278 cap->suspended_ccalling_tasks->prev = task;
2279 }
2280 cap->suspended_ccalling_tasks = task;
2281 }
2282
2283 STATIC_INLINE void
2284 recoverSuspendedTask (Capability *cap, Task *task)
2285 {
2286 if (task->prev) {
2287 task->prev->next = task->next;
2288 } else {
2289 ASSERT(cap->suspended_ccalling_tasks == task);
2290 cap->suspended_ccalling_tasks = task->next;
2291 }
2292 if (task->next) {
2293 task->next->prev = task->prev;
2294 }
2295 task->next = task->prev = NULL;
2296 }
2297
2298 /* ---------------------------------------------------------------------------
2299 * Suspending & resuming Haskell threads.
2300 *
2301 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2302 * its capability before calling the C function. This allows another
2303 * task to pick up the capability and carry on running Haskell
2304 * threads. It also means that if the C call blocks, it won't lock
2305 * the whole system.
2306 *
2307 * The Haskell thread making the C call is put to sleep for the
2308 * duration of the call, on the suspended_ccalling_tasks queue. We
2309 * give out a token to the task, which it can use to resume the thread
2310 * on return from the C function.
2311 * ------------------------------------------------------------------------- */
2312
2313 void *
2314 suspendThread (StgRegTable *reg)
2315 {
2316 Capability *cap;
2317 int saved_errno;
2318 StgTSO *tso;
2319 Task *task;
2320 #if mingw32_HOST_OS
2321 StgWord32 saved_winerror;
2322 #endif
2323
2324 saved_errno = errno;
2325 #if mingw32_HOST_OS
2326 saved_winerror = GetLastError();
2327 #endif
2328
2329 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2330 */
2331 cap = regTableToCapability(reg);
2332
2333 task = cap->running_task;
2334 tso = cap->r.rCurrentTSO;
2335
2336 debugTrace(DEBUG_sched,
2337 "thread %lu did a safe foreign call",
2338 (unsigned long)cap->r.rCurrentTSO->id);
2339
2340 // XXX this might not be necessary --SDM
2341 tso->what_next = ThreadRunGHC;
2342
2343 threadPaused(cap,tso);
2344
2345 if ((tso->flags & TSO_BLOCKEX) == 0) {
2346 tso->why_blocked = BlockedOnCCall;
2347 tso->flags |= TSO_BLOCKEX;
2348 tso->flags &= ~TSO_INTERRUPTIBLE;
2349 } else {
2350 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2351 }
2352
2353 // Hand back capability
2354 task->suspended_tso = tso;
2355
2356 ACQUIRE_LOCK(&cap->lock);
2357
2358 suspendTask(cap,task);
2359 cap->in_haskell = rtsFalse;
2360 releaseCapability_(cap);
2361
2362 RELEASE_LOCK(&cap->lock);
2363
2364 #if defined(THREADED_RTS)
2365 /* Preparing to leave the RTS, so ensure there's a native thread/task
2366 waiting to take over.
2367 */
2368 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2369 #endif
2370
2371 errno = saved_errno;
2372 #if mingw32_HOST_OS
2373 SetLastError(saved_winerror);
2374 #endif
2375 return task;
2376 }
2377
2378 StgRegTable *
2379 resumeThread (void *task_)
2380 {
2381 StgTSO *tso;
2382 Capability *cap;
2383 Task *task = task_;
2384 int saved_errno;
2385 #if mingw32_HOST_OS
2386 StgWord32 saved_winerror;
2387 #endif
2388
2389 saved_errno = errno;
2390 #if mingw32_HOST_OS
2391 saved_winerror = GetLastError();
2392 #endif
2393
2394 cap = task->cap;
2395 // Wait for permission to re-enter the RTS with the result.
2396 waitForReturnCapability(&cap,task);
2397 // we might be on a different capability now... but if so, our
2398 // entry on the suspended_ccalling_tasks list will also have been
2399 // migrated.
2400
2401 // Remove the thread from the suspended list
2402 recoverSuspendedTask(cap,task);
2403
2404 tso = task->suspended_tso;
2405 task->suspended_tso = NULL;
2406 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2407 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2408
2409 if (tso->why_blocked == BlockedOnCCall) {
2410 awakenBlockedExceptionQueue(cap,tso);
2411 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2412 }
2413
2414 /* Reset blocking status */
2415 tso->why_blocked = NotBlocked;
2416
2417 cap->r.rCurrentTSO = tso;
2418 cap->in_haskell = rtsTrue;
2419 errno = saved_errno;
2420 #if mingw32_HOST_OS
2421 SetLastError(saved_winerror);
2422 #endif
2423
2424 /* We might have GC'd, mark the TSO dirty again */
2425 dirty_TSO(cap,tso);
2426
2427 IF_DEBUG(sanity, checkTSO(tso));
2428
2429 return &cap->r;
2430 }
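/* A minimal sketch (illustrative, not code emitted by GHC itself) of how
 * a "safe" foreign call wrapper is expected to pair the two functions
 * above; some_blocking_c_function is a hypothetical foreign function:
 *
 *     StgRegTable *
 *     example_safe_call (StgRegTable *reg, int arg)
 *     {
 *         void *tok = suspendThread(reg);  // hand back the Capability
 *         some_blocking_c_function(arg);   // may block; other Haskell
 *                                          // threads keep running
 *         return resumeThread(tok);        // reacquire a Capability
 *     }
 *
 * The token is in fact the Task (see the return statement in
 * suspendThread() above), and it is all the wrapper needs to keep hold
 * of across the call.
 */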
2431
2432 /* ---------------------------------------------------------------------------
2433 * scheduleThread()
2434 *
2435 * scheduleThread puts a thread on the end of the runnable queue.
2436 * This will usually be done immediately after a thread is created.
2437 * The caller of scheduleThread must create the thread using e.g.
2438 * createThread and push an appropriate closure
2439 * on this thread's stack before the scheduler is invoked.
2440 * ------------------------------------------------------------------------ */
2441
2442 void
2443 scheduleThread(Capability *cap, StgTSO *tso)
2444 {
2445 // The thread goes at the *end* of the run-queue, to avoid possible
2446 // starvation of any threads already on the queue.
2447 appendToRunQueue(cap,tso);
2448 }
2449
2450 void
2451 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2452 {
2453 #if defined(THREADED_RTS)
2454 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2455 // move this thread from now on.
2456 cpu %= RtsFlags.ParFlags.nNodes;
2457 if (cpu == cap->no) {
2458 appendToRunQueue(cap,tso);
2459 } else {
2460 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2461 }
2462 #else
2463 appendToRunQueue(cap,tso);
2464 #endif
2465 }
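/* For example, with +RTS -N2 a request to run a thread on cpu 5 wraps
 * around to capability 1 (5 mod 2), and the TSO_LOCKED flag then keeps
 * the thread pinned to that capability.
 */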
2466
2467 Capability *
2468 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2469 {
2470 Task *task;
2471
2472 // We already created/initialised the Task
2473 task = cap->running_task;
2474
2475 // This TSO is now a bound thread; make the Task and TSO
2476 // point to each other.
2477 tso->bound = task;
2478 tso->cap = cap;
2479
2480 task->tso = tso;
2481 task->ret = ret;
2482 task->stat = NoStatus;
2483
2484 appendToRunQueue(cap,tso);
2485
2486 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2487
2488 #if defined(GRAN)
2489 /* GranSim specific init */
2490 CurrentTSO = m->tso; // the TSO to run
2491 procStatus[MainProc] = Busy; // status of main PE
2492 CurrentProc = MainProc; // PE to run it on
2493 #endif
2494
2495 cap = schedule(cap,task);
2496
2497 ASSERT(task->stat != NoStatus);
2498 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2499
2500 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2501 return cap;
2502 }
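/* Roughly how the RTS API drives scheduleWaitThread(); a sketch only,
 * see rts_evalIO() and friends in RtsAPI.c for the real code, which
 * also differs in how it sets up the thread:
 *
 *     Capability *
 *     rts_evalIO (Capability *cap, HaskellObj p, HaskellObj *ret)
 *     {
 *         StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *         return scheduleWaitThread(tso, ret, cap);
 *     }
 */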
2503
2504 /* ----------------------------------------------------------------------------
2505 * Starting Tasks
2506 * ------------------------------------------------------------------------- */
2507
2508 #if defined(THREADED_RTS)
2509 void
2510 workerStart(Task *task)
2511 {
2512 Capability *cap;
2513
2514 // See startWorkerTask().
2515 ACQUIRE_LOCK(&task->lock);
2516 cap = task->cap;
2517 RELEASE_LOCK(&task->lock);
2518
2519 // set the thread-local pointer to the Task:
2520 taskEnter(task);
2521
2522 // schedule() runs without a lock.
2523 cap = schedule(cap,task);
2524
2525 // On exit from schedule(), we have a Capability.
2526 releaseCapability(cap);
2527 workerTaskStop(task);
2528 }
2529 #endif
2530
2531 /* ---------------------------------------------------------------------------
2532 * initScheduler()
2533 *
2534 * Initialise the scheduler. This resets all the queues - if the
2535 * queues contained any threads, they'll be garbage collected at the
2536 * next pass.
2537 *
2538 * ------------------------------------------------------------------------ */
2539
2540 void
2541 initScheduler(void)
2542 {
2543 #if defined(GRAN)
2544 nat i;
2545 for (i=0; i<=MAX_PROC; i++) {
2546 run_queue_hds[i] = END_TSO_QUEUE;
2547 run_queue_tls[i] = END_TSO_QUEUE;
2548 blocked_queue_hds[i] = END_TSO_QUEUE;
2549 blocked_queue_tls[i] = END_TSO_QUEUE;
2550 ccalling_threadss[i] = END_TSO_QUEUE;
2551 blackhole_queue[i] = END_TSO_QUEUE;
2552 sleeping_queue = END_TSO_QUEUE;
2553 }
2554 #elif !defined(THREADED_RTS)
2555 blocked_queue_hd = END_TSO_QUEUE;
2556 blocked_queue_tl = END_TSO_QUEUE;
2557 sleeping_queue = END_TSO_QUEUE;
2558 #endif
2559
2560 blackhole_queue = END_TSO_QUEUE;
2561
2562 context_switch = 0;
2563 sched_state = SCHED_RUNNING;
2564 recent_activity = ACTIVITY_YES;
2565
2566 #if defined(THREADED_RTS)
2567 /* Initialise the mutex and condition variables used by
2568 * the scheduler. */
2569 initMutex(&sched_mutex);
2570 #endif
2571
2572 ACQUIRE_LOCK(&sched_mutex);
2573
2574 /* A capability holds the state a native thread needs in
2575 * order to execute STG code. At least one capability is
2576 * floating around (only THREADED_RTS builds have more than one).
2577 */
2578 initCapabilities();
2579
2580 initTaskManager();
2581
2582 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2583 initSparkPools();
2584 #endif
2585
2586 #if defined(THREADED_RTS)
2587 /*
2588 * Eagerly start one worker to run each Capability, except for
2589 * Capability 0. The idea is that we're probably going to start a
2590 * bound thread on Capability 0 pretty soon, so we don't want a
2591 * worker task hogging it.
2592 */
2593 {
2594 nat i;
2595 Capability *cap;
2596 for (i = 1; i < n_capabilities; i++) {
2597 cap = &capabilities[i];
2598 ACQUIRE_LOCK(&cap->lock);
2599 startWorkerTask(cap, workerStart);
2600 RELEASE_LOCK(&cap->lock);
2601 }
2602 }
2603 #endif
2604
2605 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2606
2607 RELEASE_LOCK(&sched_mutex);
2608 }
2609
2610 void
2611 exitScheduler(
2612 rtsBool wait_foreign
2613 #if !defined(THREADED_RTS)
2614 __attribute__((unused))
2615 #endif
2616 )
2617 /* see Capability.c, shutdownCapability() */
2618 {
2619 Task *task = NULL;
2620
2621 #if defined(THREADED_RTS)
2622 ACQUIRE_LOCK(&sched_mutex);
2623 task = newBoundTask();
2624 RELEASE_LOCK(&sched_mutex);
2625 #endif
2626
2627 // If we haven't killed all the threads yet, do it now.
2628 if (sched_state < SCHED_SHUTTING_DOWN) {
2629 sched_state = SCHED_INTERRUPTING;
2630 scheduleDoGC(NULL,task,rtsFalse);
2631 }
2632 sched_state = SCHED_SHUTTING_DOWN;
2633
2634 #if defined(THREADED_RTS)
2635 {
2636 nat i;
2637
2638 for (i = 0; i < n_capabilities; i++) {
2639 shutdownCapability(&capabilities[i], task, wait_foreign);
2640 }
2641 boundTaskExiting(task);
2642 stopTaskManager();
2643 }
2644 #else
2645 freeCapability(&MainCapability);
2646 #endif
2647 }
2648
2649 void
2650 freeScheduler( void )
2651 {
2652 freeTaskManager();
2653 if (n_capabilities != 1) {
2654 stgFree(capabilities);
2655 }
2656 #if defined(THREADED_RTS)
2657 closeMutex(&sched_mutex);
2658 #endif
2659 }
2660
2661 /* -----------------------------------------------------------------------------
2662 performGC
2663
2664 This is the interface to the garbage collector from Haskell land.
2665 We provide this so that external C code can allocate and garbage
2666 collect when called from Haskell via _ccall_GC.
2667 -------------------------------------------------------------------------- */
2668
2669 static void
2670 performGC_(rtsBool force_major)
2671 {
2672 Task *task;
2673 // We must grab a new Task here, because the existing Task may be
2674 // associated with a particular Capability, and chained onto the
2675 // suspended_ccalling_tasks queue.
2676 ACQUIRE_LOCK(&sched_mutex);
2677 task = newBoundTask();
2678 RELEASE_LOCK(&sched_mutex);
2679 scheduleDoGC(NULL,task,force_major);
2680 boundTaskExiting(task);
2681 }
2682
2683 void
2684 performGC(void)
2685 {
2686 performGC_(rtsFalse);
2687 }
2688
2689 void
2690 performMajorGC(void)
2691 {
2692 performGC_(rtsTrue);
2693 }
2694
2695 /* -----------------------------------------------------------------------------
2696 Stack overflow
2697
2698 If the thread has reached its maximum stack size, then raise the
2699 StackOverflow exception in the offending thread. Otherwise
2700 relocate the TSO into a larger chunk of memory and adjust its stack
2701 size appropriately.
2702 -------------------------------------------------------------------------- */
2703
2704 static StgTSO *
2705 threadStackOverflow(Capability *cap, StgTSO *tso)
2706 {
2707 nat new_stack_size, stack_words;
2708 lnat new_tso_size;
2709 StgPtr new_sp;
2710 StgTSO *dest;
2711
2712 IF_DEBUG(sanity,checkTSO(tso));
2713
2714 // don't allow throwTo() to modify the blocked_exceptions queue
2715 // while we are moving the TSO:
2716 lockClosure((StgClosure *)tso);
2717
2718 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2719 // NB. never raise a StackOverflow exception if the thread is
2720 // inside Control.Exception.block. It is impractical to protect
2721 // against stack overflow exceptions, since virtually anything
2722 // can raise one (even 'catch'), so this is the only sensible
2723 // thing to do here. See bug #767.
2724
2725 debugTrace(DEBUG_gc,
2726 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2727 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2728 IF_DEBUG(gc,
2729 /* If we're debugging, just print out the top of the stack */
2730 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2731 tso->sp+64)));
2732
2733 // Send this thread the StackOverflow exception
2734 unlockTSO(tso);
2735 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2736 return tso;
2737 }
2738
2739 /* Try to double the current stack size. If that takes us over the
2740 * maximum stack size for this thread, then use the maximum instead.
2741 * Finally round up so the TSO ends up as a whole number of blocks.
2742 */
2743 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2744 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2745 TSO_STRUCT_SIZE)/sizeof(W_);
2746 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2747 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2748
2749 debugTrace(DEBUG_sched,
2750 "increasing stack size from %ld words to %d.",
2751 (long)tso->stack_size, new_stack_size);
2752
2753 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2754 TICK_ALLOC_TSO(new_stack_size,0);
2755
2756 /* copy the TSO block and the old stack into the new area */
2757 memcpy(dest,tso,TSO_STRUCT_SIZE);
2758 stack_words = tso->stack + tso->stack_size - tso->sp;
2759 new_sp = (P_)dest + new_tso_size - stack_words;
2760 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2761
2762 /* relocate the stack pointers... */
2763 dest->sp = new_sp;
2764 dest->stack_size = new_stack_size;
2765
2766 /* Mark the old TSO as relocated. We have to check for relocated
2767 * TSOs in the garbage collector and any primops that deal with TSOs.
2768 *
2769 * It's important to set the sp value to just beyond the end
2770 * of the stack, so we don't attempt to scavenge any part of the
2771 * dead TSO's stack.
2772 */
2773 tso->what_next = ThreadRelocated;
2774 setTSOLink(cap,tso,dest);
2775 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2776 tso->why_blocked = NotBlocked;
2777
2778 IF_PAR_DEBUG(verbose,
2779 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2780 tso->id, tso, tso->stack_size);
2781 /* If we're debugging, just print out the top of the stack */
2782 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2783 tso->sp+64)));
2784
2785 unlockTSO(dest);
2786 unlockTSO(tso);
2787
2788 IF_DEBUG(sanity,checkTSO(dest));
2789 #if 0
2790 IF_DEBUG(scheduler,printTSO(dest));
2791 #endif
2792
2793 return dest;
2794 }
2795
2796 static StgTSO *
2797 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2798 {
2799 bdescr *bd, *new_bd;
2800 lnat new_tso_size_w, tso_size_w;
2801 StgTSO *new_tso;
2802
2803 tso_size_w = tso_sizeW(tso);
2804
2805 if (tso_size_w < MBLOCK_SIZE_W ||
2806 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2807 {
2808 return tso;
2809 }
2810
2811 // don't allow throwTo() to modify the blocked_exceptions queue
2812 // while we are moving the TSO:
2813 lockClosure((StgClosure *)tso);
2814
2815 new_tso_size_w = round_to_mblocks(tso_size_w/2);
2816
2817 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2818 tso->id, tso_size_w, new_tso_size_w);
2819
2820 bd = Bdescr((StgPtr)tso);
2821 new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2822
2823 new_tso = (StgTSO *)new_bd->start;
2824 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2825 new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2826
2827 tso->what_next = ThreadRelocated;
2828 tso->_link = new_tso; // no write barrier reqd: same generation
2829
2830 // The TSO attached to this Task may have moved, so update the
2831 // pointer to it.
2832 if (task->tso == tso) {
2833 task->tso = new_tso;
2834 }
2835
2836 unlockTSO(new_tso);
2837 unlockTSO(tso);
2838
2839 IF_DEBUG(sanity,checkTSO(new_tso));
2840
2841 return new_tso;
2842 }
2843
2844 /* ---------------------------------------------------------------------------
2845 Interrupt execution
2846 - usually called inside a signal handler so it mustn't do anything fancy.
2847 ------------------------------------------------------------------------ */
2848
2849 void
2850 interruptStgRts(void)
2851 {
2852 sched_state = SCHED_INTERRUPTING;
2853 context_switch = 1;
2854 wakeUpRts();
2855 }
2856
2857 /* -----------------------------------------------------------------------------
2858 Wake up the RTS
2859
2860 This function causes at least one OS thread to wake up and run the
2861 scheduler loop. It is invoked when the RTS might be deadlocked, or
2862 an external event has arrived that may need servicing (eg. a
2863 keyboard interrupt).
2864
2865 In the single-threaded RTS we don't do anything here; we only have
2866 one thread anyway, and the event that caused us to want to wake up
2867 will have interrupted any blocking system call in progress anyway.
2868 -------------------------------------------------------------------------- */
2869
2870 void
2871 wakeUpRts(void)
2872 {
2873 #if defined(THREADED_RTS)
2874 // This forces the IO Manager thread to wakeup, which will
2875 // in turn ensure that some OS thread wakes up and runs the
2876 // scheduler loop, which will cause a GC and deadlock check.
2877 ioManagerWakeup();
2878 #endif
2879 }
2880
2881 /* -----------------------------------------------------------------------------
2882 * checkBlackHoles()
2883 *
2884 * Check the blackhole_queue for threads that can be woken up. We do
2885 * this periodically: before every GC, and whenever the run queue is
2886 * empty.
2887 *
2888 * An elegant solution might be to just wake up all the blocked
2889 * threads with awakenBlockedQueue occasionally: they'll go back to
2890 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2891 * doesn't give us a way to tell whether we've actually managed to
2892 * wake up any threads, so we would be busy-waiting.
2893 *
2894 * -------------------------------------------------------------------------- */
2895
2896 static rtsBool
2897 checkBlackHoles (Capability *cap)
2898 {
2899 StgTSO **prev, *t;
2900 rtsBool any_woke_up = rtsFalse;
2901 StgHalfWord type;
2902
2903 // blackhole_queue is global:
2904 ASSERT_LOCK_HELD(&sched_mutex);
2905
2906 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2907
2908 // ASSUMES: sched_mutex
2909 prev = &blackhole_queue;
2910 t = blackhole_queue;
2911 while (t != END_TSO_QUEUE) {
2912 ASSERT(t->why_blocked == BlockedOnBlackHole);
2913 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2914 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2915 IF_DEBUG(sanity,checkTSO(t));
2916 t = unblockOne(cap, t);
2917 // urk, the threads migrate to the current capability
2918 // here, but we'd like to keep them on the original one.
2919 *prev = t;
2920 any_woke_up = rtsTrue;
2921 } else {
2922 prev = &t->_link;
2923 t = t->_link;
2924 }
2925 }
2926
2927 return any_woke_up;
2928 }
2929
2930 /* -----------------------------------------------------------------------------
2931 Deleting threads
2932
2933 This is used for interruption (^C) and forking, and corresponds to
2934 raising an exception but without letting the thread catch the
2935 exception.
2936 -------------------------------------------------------------------------- */
2937
2938 static void
2939 deleteThread (Capability *cap, StgTSO *tso)
2940 {
2941 // NOTE: must only be called on a TSO that we have exclusive
2942 // access to, because we will call throwToSingleThreaded() below.
2943 // The TSO must be on the run queue of the Capability we own, or
2944 // we must own all Capabilities.
2945
2946 if (tso->why_blocked != BlockedOnCCall &&
2947 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2948 throwToSingleThreaded(cap,tso,NULL);
2949 }
2950 }
2951
2952 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2953 static void
2954 deleteThread_(Capability *cap, StgTSO *tso)
2955 { // for forkProcess only:
2956 // like deleteThread(), but we delete threads in foreign calls, too.
2957
2958 if (tso->why_blocked == BlockedOnCCall ||
2959 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2960 unblockOne(cap,tso);
2961 tso->what_next = ThreadKilled;
2962 } else {
2963 deleteThread(cap,tso);
2964 }
2965 }
2966 #endif
2967
2968 /* -----------------------------------------------------------------------------
2969 raiseExceptionHelper
2970
2971 This function is called by the raise# primitive, just so that we can
2972 move some of the tricky bits of raising an exception from C-- into
2973 C. Who knows, it might be a useful reusable thing here too.
2974 -------------------------------------------------------------------------- */
2975
2976 StgWord
2977 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2978 {
2979 Capability *cap = regTableToCapability(reg);
2980 StgThunk *raise_closure = NULL;
2981 StgPtr p, next;
2982 StgRetInfoTable *info;
2983 //
2984 // This closure represents the expression 'raise# E' where E
2985 // is the exception raised. It is used to overwrite all the
2986 // thunks which are currently under evaluation.
2987 //
2988
2989 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2990 // LDV profiling: stg_raise_info has THUNK as its closure
2991 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2992 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2993 // 1 does not cause any problem unless profiling is performed.
2994 // However, when LDV profiling goes on, we need to linearly scan
2995 // small object pool, where raise_closure is stored, so we should
2996 // use MIN_UPD_SIZE.
2997 //
2998 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2999 // sizeofW(StgClosure)+1);
3000 //
3001
3002 //
3003 // Walk up the stack, looking for the catch frame. On the way,
3004 // we update any closures pointed to from update frames with the
3005 // raise closure that we just built.
3006 //
3007 p = tso->sp;
3008 while(1) {
3009 info = get_ret_itbl((StgClosure *)p);
3010 next = p + stack_frame_sizeW((StgClosure *)p);
3011 switch (info->i.type) {
3012
3013 case UPDATE_FRAME:
3014 // Only create raise_closure if we need to.
3015 if (raise_closure == NULL) {
3016 raise_closure =
3017 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3018 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3019 raise_closure->payload[0] = exception;
3020 }
3021 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3022 p = next;
3023 continue;
3024
3025 case ATOMICALLY_FRAME:
3026 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3027 tso->sp = p;
3028 return ATOMICALLY_FRAME;
3029
3030 case CATCH_FRAME:
3031 tso->sp = p;
3032 return CATCH_FRAME;
3033
3034 case CATCH_STM_FRAME:
3035 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3036 tso->sp = p;
3037 return CATCH_STM_FRAME;
3038
3039 case STOP_FRAME:
3040 tso->sp = p;
3041 return STOP_FRAME;
3042
3043 case CATCH_RETRY_FRAME:
3044 default:
3045 p = next;
3046 continue;
3047 }
3048 }
3049 }
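/* For example (illustrative), if the stack above the handler looks like
 *
 *     tso->sp -> ... | UPDATE_FRAME u1 | UPDATE_FRAME u2 | CATCH_FRAME c | ...
 *
 * then the updatees of u1 and u2 are both overwritten with the same
 * 'raise# E' thunk, tso->sp is left pointing at c, and CATCH_FRAME is
 * returned so that the C-- caller (see Exception.cmm) can run the
 * handler stored in that frame.
 */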
3050
3051
3052 /* -----------------------------------------------------------------------------
3053 findRetryFrameHelper
3054
3055 This function is called by the retry# primitive. It traverses the stack
3056 leaving tso->sp referring to the frame which should handle the retry.
3057
3058 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3059 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3060
3061 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3062 create) because retries are not considered to be exceptions, despite the
3063 similar implementation.
3064
3065 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3066 not be created within memory transactions.
3067 -------------------------------------------------------------------------- */
3068
3069 StgWord
3070 findRetryFrameHelper (StgTSO *tso)
3071 {
3072 StgPtr p, next;
3073 StgRetInfoTable *info;
3074
3075 p = tso -> sp;
3076 while (1) {
3077 info = get_ret_itbl((StgClosure *)p);
3078 next = p + stack_frame_sizeW((StgClosure *)p);
3079 switch (info->i.type) {
3080
3081 case ATOMICALLY_FRAME:
3082 debugTrace(DEBUG_stm,
3083 "found ATOMICALLY_FRAME at %p during retry", p);
3084 tso->sp = p;
3085 return ATOMICALLY_FRAME;
3086
3087 case CATCH_RETRY_FRAME:
3088 debugTrace(DEBUG_stm,
3089 "found CATCH_RETRY_FRAME at %p during retrry", p);
3090 tso->sp = p;
3091 return CATCH_RETRY_FRAME;
3092
3093 case CATCH_STM_FRAME: {
3094 StgTRecHeader *trec = tso -> trec;
3095 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3096 debugTrace(DEBUG_stm,
3097 "found CATCH_STM_FRAME at %p during retry", p);
3098 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3099 stmAbortTransaction(tso -> cap, trec);
3100 stmFreeAbortedTRec(tso -> cap, trec);
3101 tso -> trec = outer;
3102 p = next;
3103 continue;
3104 }
3105
3106
3107 default:
3108 ASSERT(info->i.type != CATCH_FRAME);
3109 ASSERT(info->i.type != STOP_FRAME);
3110 p = next;
3111 continue;
3112 }
3113 }
3114 }
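// In other words: a retry# inside orElse# lands on the CATCH_RETRY_FRAME
// and the alternative is run, while a retry# with no enclosing orElse#
// lands on the ATOMICALLY_FRAME, where the thread is blocked until one of
// the TVars it has read is written by another thread.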
3115
3116 /* -----------------------------------------------------------------------------
3117 resurrectThreads is called after garbage collection on the list of
3118 threads found to be garbage. Each of these threads will be woken
3119 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3120 on an MVar, or NonTermination if the thread was blocked on a Black
3121 Hole.
3122
3123 Locks: assumes we hold *all* the capabilities.
3124 -------------------------------------------------------------------------- */
3125
3126 void
3127 resurrectThreads (StgTSO *threads)
3128 {
3129 StgTSO *tso, *next;
3130 Capability *cap;
3131 step *step;
3132
3133 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3134 next = tso->global_link;
3135
3136 step = Bdescr((P_)tso)->step;
3137 tso->global_link = step->threads;
3138 step->threads = tso;
3139
3140 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3141
3142 // Wake up the thread on the Capability it was last on
3143 cap = tso->cap;
3144
3145 switch (tso->why_blocked) {
3146 case BlockedOnMVar:
3147 case BlockedOnException:
3148 /* Called by GC - sched_mutex lock is currently held. */
3149 throwToSingleThreaded(cap, tso,
3150 (StgClosure *)BlockedOnDeadMVar_closure);
3151 break;
3152 case BlockedOnBlackHole:
3153 throwToSingleThreaded(cap, tso,
3154 (StgClosure *)NonTermination_closure);
3155 break;
3156 case BlockedOnSTM:
3157 throwToSingleThreaded(cap, tso,
3158 (StgClosure *)BlockedIndefinitely_closure);
3159 break;
3160 case NotBlocked:
3161 /* This might happen if the thread was blocked on a black hole
3162 * belonging to a thread that we've just woken up (raiseAsync
3163 * can wake up threads, remember...).
3164 */
3165 continue;
3166 default:
3167 barf("resurrectThreads: thread blocked in a strange way");
3168 }
3169 }
3170 }
3171
3172 /* -----------------------------------------------------------------------------
3173 performPendingThrowTos is called after garbage collection, and
3174 passed a list of threads that were found to have pending throwTos
3175 (tso->blocked_exceptions was not empty), and were blocked.
3176 Normally this doesn't happen, because we would deliver the
3177 exception directly if the target thread is blocked, but there are
3178 small windows where it might occur on a multiprocessor (see
3179 throwTo()).
3180
3181 NB. we must be holding all the capabilities at this point, just
3182 like resurrectThreads().
3183 -------------------------------------------------------------------------- */
3184
3185 void
3186 performPendingThrowTos (StgTSO *threads)
3187 {
3188 StgTSO *tso, *next;
3189 Capability *cap;
3190 step *step;
3191
3192 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3193 next = tso->global_link;
3194
3195 step = Bdescr((P_)tso)->step;
3196 tso->global_link = step->threads;
3197 step->threads = tso;
3198
3199 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
3200
3201 cap = tso->cap;
3202 maybePerformBlockedException(cap, tso);
3203 }
3204 }