FIX #1623: disable the timer signal when the system is idle (threaded RTS only)
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77 * Global variables
78 * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
84
85 /*
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
90 -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
98 cheating). */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
111 */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
118 */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122 * Used for detecting garbage collected threads.
123 * LOCK: sched_mutex+capability, or all capabilities
124 */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128 * LOCK: none (just an advisory flag)
129 */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133 * LOCK: currently none, perhaps we should lock (but needs to be
134 * updated in the fast path of the scheduler).
135 */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139 * LOCK: none (changes once, from false->true)
140 */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
148 * exists - earlier gccs apparently didn't.
149 * -= chak
150 */
151 StgTSO dummy_tso;
152
153 /*
154 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155 * in an MT setting, needed to signal that a worker thread shouldn't hang around
156 * in the scheduler when it is out of work.
157 */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161 * This mutex protects most of the global scheduler data in
162 * the THREADED_RTS runtime.
163 */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179 * static function prototypes
180 * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
212 StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
214 nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217 StgTSO *t );
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);
236 #endif
237
238 #ifdef DEBUG
// Printable names for a TSO's what_next field, indexed by that field's
// value.  Only defined in DEBUG builds; within this file it is used in
// the scheduler's debugTrace() messages.
239 static char *whatNext_strs[] = {
240 "(unknown)",
241 "ThreadRunGHC",
242 "ThreadInterpret",
243 "ThreadKilled",
244 "ThreadRelocated",
245 "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250 * Putting a thread on the run queue: different scheduling policies
251 * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257 if (RtsFlags.ParFlags.doFairScheduling) {
258 // this does round-robin scheduling; good for concurrency
259 appendToRunQueue(cap,t);
260 } else {
261 // this does unfair scheduling; good for parallelism
262 pushOnRunQueue(cap,t);
263 }
264 #else
265 // this does round-robin scheduling; good for concurrency
266 appendToRunQueue(cap,t);
267 #endif
268 }
269
270 /* ---------------------------------------------------------------------------
271 Main scheduling loop.
272
273 We use round-robin scheduling, each thread returning to the
274 scheduler loop when one of these conditions is detected:
275
276 * out of heap space
277 * timer expires (thread yields)
278 * thread blocks
279 * thread ends
280 * stack overflow
281
282 GRAN version:
283 In a GranSim setup this loop iterates over the global event queue.
284 This revolves around the global event queue, which determines what
285 to do next. Therefore, it's more complicated than either the
286 concurrent or the parallel (GUM) setup.
287
288 GUM version:
289 GUM iterates over incoming messages.
290 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291 and sends out a fish whenever it has nothing to do; in-between
292 doing the actual reductions (shared code below) it processes the
293 incoming messages and deals with delayed operations
294 (see PendingFetches).
295 This is not the ugliest code you could imagine, but it's bloody close.
296
297 ------------------------------------------------------------------------ */
298
/*
 * schedule: run the scheduler loop for `task` on initialCapability.
 *
 * Each iteration takes a thread off the Capability's run queue, runs it
 * (through StgRun() or interpretBCO(), depending on its what_next field)
 * and then dispatches on the code the thread returned with.
 *
 * Returns the Capability held on exit.  The return points visible in
 * this function are: a task with no TSO finding an empty run queue while
 * sched_state is SCHED_SHUTTING_DOWN, and scheduleHandleThreadFinished()
 * returning true.
 */
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302 StgTSO *t;
303 Capability *cap;
304 StgThreadReturnCode ret;
305 #if defined(GRAN)
306 rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308 StgTSO *tso;
309 GlobalTaskId pe;
310 rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312 nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315 nat prev_what_next;
316 rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318 rtsBool first = rtsTrue;
319 #endif
320
321 cap = initialCapability;
322
323 // Pre-condition: this task owns initialCapability.
324 // The sched_mutex is *NOT* held
325 // NB. on return, we still hold a capability.
326
327 debugTrace (DEBUG_sched,
328 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329 task, initialCapability);
330
331 schedulePreLoop();
332
333 // -----------------------------------------------------------
334 // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
340 #else
341 #define TERMINATION_CONDITION rtsTrue
342 #endif
343
344 while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347 /* Choose the processor with the next event */
348 CurrentProc = event->proc;
349 CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353 if (first) {
354 // don't yield the first time, we want a chance to run this
355 // thread for a bit, even if there are others banging at the
356 // door.
357 first = rtsFalse;
358 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359 } else {
360 // Yield the capability to higher-priority tasks if necessary.
361 yieldCapability(&cap, task);
362 }
363 #endif
364
365 #if defined(THREADED_RTS)
366 schedulePushWork(cap,task);
367 #endif
368
369 // Check whether we have re-entered the RTS from Haskell without
370 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371 // call).
372 if (cap->in_haskell) {
373 errorBelch("schedule: re-entered unsafely.\n"
374 " Perhaps a 'foreign import unsafe' should be 'safe'?");
375 stg_exit(EXIT_FAILURE);
376 }
377
378 // The interruption / shutdown sequence.
379 //
380 // In order to cleanly shut down the runtime, we want to:
381 // * make sure that all main threads return to their callers
382 // with the state 'Interrupted'.
383 // * clean up all OS threads associated with the runtime
384 // * free all memory etc.
385 //
386 // So the sequence for ^C goes like this:
387 //
388 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
389 // arranges for some Capability to wake up
390 //
391 // * all threads in the system are halted, and the zombies are
392 // placed on the run queue for cleaning up. We acquire all
393 // the capabilities in order to delete the threads, this is
394 // done by scheduleDoGC() for convenience (because GC already
395 // needs to acquire all the capabilities). We can't kill
396 // threads involved in foreign calls.
397 //
398 // * somebody calls shutdownHaskell(), which calls exitScheduler()
399 //
400 // * sched_state := SCHED_SHUTTING_DOWN
401 //
402 // * all workers exit when the run queue on their capability
403 // drains. All main threads will also exit when their TSO
404 // reaches the head of the run queue and they can return.
405 //
406 // * eventually all Capabilities will shut down, and the RTS can
407 // exit.
408 //
409 // * We might be left with threads blocked in foreign calls,
410 // we should really attempt to kill these somehow (TODO);
411
412 switch (sched_state) {
413 case SCHED_RUNNING:
414 break;
415 case SCHED_INTERRUPTING:
416 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418 discardSparksCap(cap);
419 #endif
420 /* scheduleDoGC() deletes all the threads */
421 cap = scheduleDoGC(cap,task,rtsFalse);
422 break;
423 case SCHED_SHUTTING_DOWN:
424 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425 // If we are a worker, just exit. If we're a bound thread
426 // then we will exit below when we've removed our TSO from
427 // the run queue.
428 if (task->tso == NULL && emptyRunQueue(cap)) {
429 return cap;
430 }
431 break;
432 default:
433 barf("sched_state: %d", sched_state);
434 }
435
436 #if defined(THREADED_RTS)
437 // If the run queue is empty, take a spark and turn it into a thread.
438 {
439 if (emptyRunQueue(cap)) {
440 StgClosure *spark;
441 spark = findSpark(cap);
442 if (spark != NULL) {
443 debugTrace(DEBUG_sched,
444 "turning spark of closure %p into a thread",
445 (StgClosure *)spark);
446 createSparkThread(cap,spark);
447 }
448 }
449 }
450 #endif // THREADED_RTS
451
452 scheduleStartSignalHandlers(cap);
453
454 // Only check the black holes here if we've nothing else to do.
455 // During normal execution, the black hole list only gets checked
456 // at GC time, to avoid repeatedly traversing this possibly long
457 // list each time around the scheduler.
458 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460 scheduleCheckWakeupThreads(cap);
461
462 scheduleCheckBlockedThreads(cap);
463
464 scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466 cap = task->cap; // reload cap, it might have changed
467 #endif
468
469 // Normally, the only way we can get here with no threads to
470 // run is if a keyboard interrupt received during
471 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472 // Additionally, it is not fatal for the
473 // threaded RTS to reach here with no threads to run.
474 //
475 // win32: might be here due to awaitEvent() being abandoned
476 // as a result of a console event having been delivered.
477 if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479 ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481 continue; // nothing to do
482 }
483
484 #if defined(PARALLEL_HASKELL)
485 scheduleSendPendingMessages();
486 if (emptyRunQueue(cap) && scheduleActivateSpark())
487 continue;
488
489 #if defined(SPARKS)
490 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
491 #endif
492
493 /* If we still have no work we need to send a FISH to get a spark
494 from another PE */
495 if (emptyRunQueue(cap)) {
496 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497 ASSERT(rtsFalse); // should not happen at the moment
498 }
499 // from here: non-empty run queue.
500 // TODO: merge above case with this, only one call processMessages() !
501 if (PacketsWaiting()) { /* process incoming messages, if
502 any pending... only in else
503 because getRemoteWork waits for
504 messages as well */
505 receivedFinish = processMessages();
506 }
507 #endif
508
509 #if defined(GRAN)
510 scheduleProcessEvent(event);
511 #endif
512
513 //
514 // Get a thread to run
515 //
516 t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519 scheduleGranParReport(); // some kind of debugging output
520 #else
521 // Sanity check the thread we're about to run. This can be
522 // expensive if there is lots of thread switching going on...
523 IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527 // Check whether we can run this thread in the current task.
528 // If not, we have to pass our capability to the right task.
529 {
530 Task *bound = t->bound;
531
532 if (bound) {
533 if (bound == task) {
534 debugTrace(DEBUG_sched,
535 "### Running thread %lu in bound thread", (unsigned long)t->id);
536 // yes, the Haskell thread is bound to the current native thread
537 } else {
538 debugTrace(DEBUG_sched,
539 "### thread %lu bound to another OS thread", (unsigned long)t->id);
540 // no, bound to a different Haskell thread: pass to that thread
// (t goes back on the run queue and we restart the loop)
541 pushOnRunQueue(cap,t);
542 continue;
543 }
544 } else {
545 // The thread we want to run is unbound.
546 if (task->tso) {
547 debugTrace(DEBUG_sched,
548 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549 // no, the current native thread is bound to a different
550 // Haskell thread, so pass it to any worker thread
551 pushOnRunQueue(cap,t);
552 continue;
553 }
554 }
555 }
556 #endif
557
558 cap->r.rCurrentTSO = t;
559
560 /* context switches are initiated by the timer signal, unless
561 * the user specified "context switch as often as possible", with
562 * +RTS -C0
563 */
564 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565 && !emptyThreadQueues(cap)) {
566 context_switch = 1;
567 }
568
// The ThreadYielding case further down jumps back to this label when
// scheduleHandleYield() returns true, re-running t without going
// round the whole scheduler loop.
569 run_thread:
570
571 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
572 (long)t->id, whatNext_strs[t->what_next]);
573
574 startHeapProfTimer();
575
576 // Check for exceptions blocked on this thread
577 maybePerformBlockedException (cap, t);
578
579 // ----------------------------------------------------------------------
580 // Run the current thread
581
582 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583 ASSERT(t->cap == cap);
584
585 prev_what_next = t->what_next;
586
587 errno = t->saved_errno;
588 #if mingw32_HOST_OS
589 SetLastError(t->saved_winerror);
590 #endif
591
592 cap->in_haskell = rtsTrue;
593
594 dirtyTSO(t);
595
596 #if defined(THREADED_RTS)
597 if (recent_activity == ACTIVITY_DONE_GC) {
598 // ACTIVITY_DONE_GC means we turned off the timer signal to
599 // conserve power (see #1623). Re-enable it here.
// NOTE(review): xchg() presumably returns the previous value
// atomically, so that only the one task which actually replaces
// ACTIVITY_DONE_GC calls startTimer() -- confirm against its definition.
600 nat prev;
601 prev = xchg(&recent_activity, ACTIVITY_YES);
602 if (prev == ACTIVITY_DONE_GC) {
603 startTimer();
604 }
605 } else {
606 recent_activity = ACTIVITY_YES;
607 }
608 #endif
609
610 switch (prev_what_next) {
611
612 case ThreadKilled:
613 case ThreadComplete:
614 /* Thread already finished, return to scheduler. */
615 ret = ThreadFinished;
616 break;
617
618 case ThreadRunGHC:
619 {
620 StgRegTable *r;
621 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
// cap is recomputed from the register table that StgRun() handed back
622 cap = regTableToCapability(r);
623 ret = r->rRet;
624 break;
625 }
626
627 case ThreadInterpret:
628 cap = interpretBCO(cap);
629 ret = cap->r.rRet;
630 break;
631
632 default:
633 barf("schedule: invalid what_next field");
634 }
635
636 cap->in_haskell = rtsFalse;
637
638 // The TSO might have moved, eg. if it re-entered the RTS and a GC
639 // happened. So find the new location:
640 t = cap->r.rCurrentTSO;
641
642 // We have run some Haskell code: there might be blackhole-blocked
643 // threads to wake up now.
644 // Lock-free test here should be ok, we're just setting a flag.
645 if ( blackhole_queue != END_TSO_QUEUE ) {
646 blackholes_need_checking = rtsTrue;
647 }
648
649 // And save the current errno in this thread.
650 // XXX: possibly bogus for SMP because this thread might already
651 // be running again, see code below.
652 t->saved_errno = errno;
653 #if mingw32_HOST_OS
654 // Similarly for Windows error code
655 t->saved_winerror = GetLastError();
656 #endif
657
658 #if defined(THREADED_RTS)
659 // If ret is ThreadBlocked, and this Task is bound to the TSO that
660 // blocked, we are in limbo - the TSO is now owned by whatever it
661 // is blocked on, and may in fact already have been woken up,
662 // perhaps even on a different Capability. It may be the case
663 // that task->cap != cap. We better yield this Capability
664 // immediately and return to normality.
665 if (ret == ThreadBlocked) {
666 debugTrace(DEBUG_sched,
667 "--<< thread %lu (%s) stopped: blocked",
668 (unsigned long)t->id, whatNext_strs[t->what_next]);
669 continue;
670 }
671 #endif
672
673 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674 ASSERT(t->cap == cap);
675
676 // ----------------------------------------------------------------------
677
678 // Costs for the scheduler are assigned to CCS_SYSTEM
679 stopHeapProfTimer();
680 #if defined(PROFILING)
681 CCCS = CCS_SYSTEM;
682 #endif
683
684 schedulePostRunThread();
685
686 ready_to_gc = rtsFalse;
687
// Dispatch on the reason the thread returned to the scheduler.
688 switch (ret) {
689 case HeapOverflow:
690 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691 break;
692
693 case StackOverflow:
694 scheduleHandleStackOverflow(cap,task,t);
695 break;
696
697 case ThreadYielding:
698 if (scheduleHandleYield(cap, t, prev_what_next)) {
699 // shortcut for switching between compiler/interpreter:
700 goto run_thread;
701 }
702 break;
703
704 case ThreadBlocked:
705 scheduleHandleThreadBlocked(t);
706 break;
707
708 case ThreadFinished:
709 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711 break;
712
713 default:
714 barf("schedule: invalid thread return code %d", (int)ret);
715 }
716
717 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718 cap = scheduleDoGC(cap,task,rtsFalse);
719 }
720 } /* end of while() */
721
// Only reachable when TERMINATION_CONDITION can become false, i.e. in
// the PARALLEL_HASKELL and GRAN configurations.
722 debugTrace(PAR_DEBUG_verbose,
723 "== Leaving schedule() after having received Finish");
724 }
725
726 /* ----------------------------------------------------------------------------
727 * Setting up the scheduler loop
728 * ------------------------------------------------------------------------- */
729
/*
 * One-off setup performed by schedule() before it enters its loop.
 * Only the GRAN configuration has anything to do here; in every other
 * build the body is empty.
 */
static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* Seed the event queue with a first event to get things going. */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc,
              CurrentProc,
              CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO,
              (StgClosure*)NULL,
              (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
                "GRAN: Init CurrentTSO (in schedule) = %p",
                CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    /* GranSim Light only: remember the current time in the TSO */
    if (RtsFlags.GranFlags.Light)
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
#endif
}
751
752 /* -----------------------------------------------------------------------------
753 * schedulePushWork()
754 *
755 * Push work to other Capabilities if we have some.
756 * -------------------------------------------------------------------------- */
757
758 #if defined(THREADED_RTS)
759 static void
760 schedulePushWork(Capability *cap USED_IF_THREADS,
761 Task *task USED_IF_THREADS)
762 {
763 Capability *free_caps[n_capabilities], *cap0;
764 nat i, n_free_caps;
765
766 // migration can be turned off with +RTS -qg
767 if (!RtsFlags.ParFlags.migrate) return;
768
769 // Check whether we have more threads on our run queue, or sparks
770 // in our pool, that we could hand to another Capability.
771 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
772 && sparkPoolSizeCap(cap) < 2) {
773 return;
774 }
775
776 // First grab as many free Capabilities as we can.
777 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
778 cap0 = &capabilities[i];
779 if (cap != cap0 && tryGrabCapability(cap0,task)) {
780 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
781 // it already has some work, we just grabbed it at
782 // the wrong moment. Or maybe it's deadlocked!
783 releaseCapability(cap0);
784 } else {
785 free_caps[n_free_caps++] = cap0;
786 }
787 }
788 }
789
790 // we now have n_free_caps free capabilities stashed in
791 // free_caps[]. Share our run queue equally with them. This is
792 // probably the simplest thing we could do; improvements we might
793 // want to do include:
794 //
795 // - giving high priority to moving relatively new threads, on
796 // the gournds that they haven't had time to build up a
797 // working set in the cache on this CPU/Capability.
798 //
799 // - giving low priority to moving long-lived threads
800
801 if (n_free_caps > 0) {
802 StgTSO *prev, *t, *next;
803 rtsBool pushed_to_all;
804
805 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
806
807 i = 0;
808 pushed_to_all = rtsFalse;
809
810 if (cap->run_queue_hd != END_TSO_QUEUE) {
811 prev = cap->run_queue_hd;
812 t = prev->link;
813 prev->link = END_TSO_QUEUE;
814 for (; t != END_TSO_QUEUE; t = next) {
815 next = t->link;
816 t->link = END_TSO_QUEUE;
817 if (t->what_next == ThreadRelocated
818 || t->bound == task // don't move my bound thread
819 || tsoLocked(t)) { // don't move a locked thread
820 prev->link = t;
821 prev = t;
822 } else if (i == n_free_caps) {
823 pushed_to_all = rtsTrue;
824 i = 0;
825 // keep one for us
826 prev->link = t;
827 prev = t;
828 } else {
829 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
830 appendToRunQueue(free_caps[i],t);
831 if (t->bound) { t->bound->cap = free_caps[i]; }
832 t->cap = free_caps[i];
833 i++;
834 }
835 }
836 cap->run_queue_tl = prev;
837 }
838
839 // If there are some free capabilities that we didn't push any
840 // threads to, then try to push a spark to each one.
841 if (!pushed_to_all) {
842 StgClosure *spark;
843 // i is the next free capability to push to
844 for (; i < n_free_caps; i++) {
845 if (emptySparkPoolCap(free_caps[i])) {
846 spark = findSpark(cap);
847 if (spark != NULL) {
848 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
849 newSpark(&(free_caps[i]->r), spark);
850 }
851 }
852 }
853 }
854
855 // release the capabilities
856 for (i = 0; i < n_free_caps; i++) {
857 task->cap = free_caps[i];
858 releaseCapability(free_caps[i]);
859 }
860 }
861 task->cap = cap; // reset to point to our Capability.
862 }
863 #endif
864
865 /* ----------------------------------------------------------------------------
866 * Start any pending signal handlers
867 * ------------------------------------------------------------------------- */
868
869 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
870 static void
871 scheduleStartSignalHandlers(Capability *cap)
872 {
873 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
874 // safe outside the lock
875 startSignalHandlers(cap);
876 }
877 }
878 #else
879 static void
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 {
882 }
883 #endif
884
885 /* ----------------------------------------------------------------------------
886 * Check for blocked threads that can be woken up.
887 * ------------------------------------------------------------------------- */
888
889 static void
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
891 {
892 #if !defined(THREADED_RTS)
893 //
894 // Check whether any waiting threads need to be woken up. If the
895 // run queue is empty, and there are no other tasks running, we
896 // can wait indefinitely for something to happen.
897 //
898 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
899 {
900 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
901 }
902 #endif
903 }
904
905
906 /* ----------------------------------------------------------------------------
907 * Check for threads woken up by other Capabilities
908 * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
912 {
913 #if defined(THREADED_RTS)
914 // Any threads that were woken up by other Capabilities get
915 // appended to our run queue.
916 if (!emptyWakeupQueue(cap)) {
917 ACQUIRE_LOCK(&cap->lock);
918 if (emptyRunQueue(cap)) {
919 cap->run_queue_hd = cap->wakeup_queue_hd;
920 cap->run_queue_tl = cap->wakeup_queue_tl;
921 } else {
922 cap->run_queue_tl->link = cap->wakeup_queue_hd;
923 cap->run_queue_tl = cap->wakeup_queue_tl;
924 }
925 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926 RELEASE_LOCK(&cap->lock);
927 }
928 #endif
929 }
930
931 /* ----------------------------------------------------------------------------
932 * Check for threads blocked on BLACKHOLEs that can be woken up
933 * ------------------------------------------------------------------------- */
934 static void
935 scheduleCheckBlackHoles (Capability *cap)
936 {
937 if ( blackholes_need_checking ) // check without the lock first
938 {
939 ACQUIRE_LOCK(&sched_mutex);
940 if ( blackholes_need_checking ) {
941 checkBlackHoles(cap);
942 blackholes_need_checking = rtsFalse;
943 }
944 RELEASE_LOCK(&sched_mutex);
945 }
946 }
947
948 /* ----------------------------------------------------------------------------
949 * Detect deadlock conditions and attempt to resolve them.
950 * ------------------------------------------------------------------------- */
951
952 static void
953 scheduleDetectDeadlock (Capability *cap, Task *task)
954 {
955
956 #if defined(PARALLEL_HASKELL)
957 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958 return;
959 #endif
960
961 /*
962 * Detect deadlock: when we have no threads to run, there are no
963 * threads blocked, waiting for I/O, or sleeping, and all the
964 * other tasks are waiting for work, we must have a deadlock of
965 * some description.
966 */
967 if ( emptyThreadQueues(cap) )
968 {
969 #if defined(THREADED_RTS)
970 /*
971 * In the threaded RTS, we only check for deadlock if there
972 * has been no activity in a complete timeslice. This means
973 * we won't eagerly start a full GC just because we don't have
974 * any threads to run currently.
975 */
976 if (recent_activity != ACTIVITY_INACTIVE) return;
977 #endif
978
979 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
980
981 // Garbage collection can release some new threads due to
982 // either (a) finalizers or (b) threads resurrected because
983 // they are unreachable and will therefore be sent an
984 // exception. Any threads thus released will be immediately
985 // runnable.
986 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
987
988 recent_activity = ACTIVITY_DONE_GC;
989 // disable timer signals (see #1623)
990 stopTimer();
991
992 if ( !emptyRunQueue(cap) ) return;
993
994 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
995 /* If we have user-installed signal handlers, then wait
996 * for signals to arrive rather then bombing out with a
997 * deadlock.
998 */
999 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1000 debugTrace(DEBUG_sched,
1001 "still deadlocked, waiting for signals...");
1002
1003 awaitUserSignals();
1004
1005 if (signals_pending()) {
1006 startSignalHandlers(cap);
1007 }
1008
1009 // either we have threads to run, or we were interrupted:
1010 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1011 }
1012 #endif
1013
1014 #if !defined(THREADED_RTS)
1015 /* Probably a real deadlock. Send the current main thread the
1016 * Deadlock exception.
1017 */
1018 if (task->tso) {
1019 switch (task->tso->why_blocked) {
1020 case BlockedOnSTM:
1021 case BlockedOnBlackHole:
1022 case BlockedOnException:
1023 case BlockedOnMVar:
1024 throwToSingleThreaded(cap, task->tso,
1025 (StgClosure *)NonTermination_closure);
1026 return;
1027 default:
1028 barf("deadlock: main thread blocked in a strange way");
1029 }
1030 }
1031 return;
1032 #endif
1033 }
1034 }
1035
1036 /* ----------------------------------------------------------------------------
1037 * Process an event (GRAN only)
1038 * ------------------------------------------------------------------------- */
1039
1040 #if defined(GRAN)
/*
 * Process a single GranSim event (GRAN builds only).
 *
 * The simulated clock of the current PE is moved forward to the event's
 * time-stamp (except for ContinueThread events), idle PEs are serviced
 * unless running in "Light" mode, and the event is dispatched on
 * event->evttype.  Only a ContinueThread event that survives the checks
 * below falls out of the switch; in that case the time-slice bookkeeping
 * is updated, the current thread is popped off the run queue, the
 * context_switch flag is cleared and the PE is marked Busy.
 *
 * NOTE(review): the function is declared to return StgTSO* but contains
 * no return statement, and the `next_thread` label targeted by the gotos
 * is not defined inside this function.  The code looks as though it was
 * lifted out of the old scheduler loop -- confirm that it still builds
 * before enabling GRAN.
 */
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
        handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
    /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
        }
        /* Disabled: used to catch superfluous CONTINUEs -- should be
           unnecessary.
           new_event(proc, proc, CurrentTime[proc],
                     FindWork,
                     (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
           goto next_thread;
        */
        break; // leave the switch and actually run the thread

    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread; /* handle next event in event queue */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread; /* handle next event in event queue */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread; /* handle next event in event queue */

    case UnblockThread: /* move off the blocked queue (presumably to the
                           tail of the runnable queue -- original comment
                           was truncated) */
        do_the_unblock(event);
        goto next_thread; /* handle next event in event queue */

    case ResumeThread: /* Move from the blocked queue to the tail of */
                       /* the runnable queue */
        /* account for the time the thread spent blocked */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue */

    case StartThread:
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread; /* handle next event in event queue */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread; /* handle next event in event queue */

    case FindWork:
        do_the_findwork(event);
        goto next_thread; /* handle next event in event queue */

    default:
        barf("Illegal event type %u\n", event->evttype);
    } /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    /* the simulated time slice starts now */
    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
}
1181 #endif // GRAN
1182
1183 /* ----------------------------------------------------------------------------
1184 * Send pending messages (PARALLEL_HASKELL only)
1185 * ------------------------------------------------------------------------- */
1186
1187 #if defined(PARALLEL_HASKELL)
1188 static StgTSO *
1189 scheduleSendPendingMessages(void)
1190 {
1191 StgSparkPool *pool;
1192 rtsSpark spark;
1193 StgTSO *t;
1194
1195 # if defined(PAR) // global Mem.Mgmt., omit for now
1196 if (PendingFetches != END_BF_QUEUE) {
1197 processFetches();
1198 }
1199 # endif
1200
1201 if (RtsFlags.ParFlags.BufferTime) {
1202 // if we use message buffering, we must send away all message
1203 // packets which have become too old...
1204 sendOldBuffers();
1205 }
1206 }
1207 #endif
1208
1209 /* ----------------------------------------------------------------------------
1210 * Activate spark threads (PARALLEL_HASKELL only)
1211 * ------------------------------------------------------------------------- */
1212
1213 #if defined(PARALLEL_HASKELL)
/*
 * Try to turn a local spark into a runnable thread (PARALLEL_HASKELL only).
 *
 * With SPARKS defined: if the advisory thread limit has not been reached
 * and the spark pool is non-empty, a spark is fetched with findSpark()
 * and converted with createThreadFromSpark().  Without SPARKS the body
 * is a dummy.
 *
 * NOTE(review): this function is declared `void` but every path returns
 * rtsTrue/rtsFalse, and it uses `pool`, `spark`, `tso` and `cap` without
 * declaring them; emptyRunQueue() is also called without an argument.
 * It presumably predates a refactoring and does not compile as-is --
 * confirm before enabling PARALLEL_HASKELL.
 */
static void
scheduleActivateSpark(void)
{
#if defined(SPARKS)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler
       loop.
    */

    /* no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         * Old comment:
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem. Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse); /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark); /* turn the spark into a thread */
            /* NOTE(review): tso is dereferenced in this debug output
               before the END_TSO_QUEUE check below */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                        spark));
                return rtsFalse; /* failed to generate a thread */
            } /* otherwise fall through & pick-up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse; /* failed to generate a thread */
        }
        return rtsTrue; /* success in generating a thread */
    } else { /* no more threads permitted or pool empty */
        return rtsFalse; /* failed to generateThread */
    }
#else
    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
1267 #endif // PARALLEL_HASKELL
1268
1269 /* ----------------------------------------------------------------------------
1270 * Get work from a remote node (PARALLEL_HASKELL only)
1271 * ------------------------------------------------------------------------- */
1272
1273 #if defined(PARALLEL_HASKELL)
1274 static rtsBool
1275 scheduleGetRemoteWork(rtsBool *receivedFinish)
1276 {
1277 ASSERT(emptyRunQueue());
1278
1279 if (RtsFlags.ParFlags.BufferTime) {
1280 IF_PAR_DEBUG(verbose,
1281 debugBelch("...send all pending data,"));
1282 {
1283 nat i;
1284 for (i=1; i<=nPEs; i++)
1285 sendImmediately(i); // send all messages away immediately
1286 }
1287 }
1288 # ifndef SPARKS
1289 //++EDEN++ idle() , i.e. send all buffers, wait for work
1290 // suppress fishing in EDEN... just look for incoming messages
1291 // (blocking receive)
1292 IF_PAR_DEBUG(verbose,
1293 debugBelch("...wait for incoming messages...\n"));
1294 *receivedFinish = processMessages(); // blocking receive...
1295
1296 // and reenter scheduling loop after having received something
1297 // (return rtsFalse below)
1298
1299 # else /* activate SPARKS machinery */
1300 /* We get here, if we have no work, tried to activate a local spark, but still
1301 have no work. We try to get a remote spark, by sending a FISH message.
1302 Thread migration should be added here, and triggered when a sequence of
1303 fishes returns without work. */
1304 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1305
1306 /* =8-[ no local sparks => look for work on other PEs */
1307 /*
1308 * We really have absolutely no work. Send out a fish
1309 * (there may be some out there already), and wait for
1310 * something to arrive. We clearly can't run any threads
1311 * until a SCHEDULE or RESUME arrives, and so that's what
1312 * we're hoping to see. (Of course, we still have to
1313 * respond to other types of messages.)
1314 */
1315 rtsTime now = msTime() /*CURRENT_TIME*/;
1316 IF_PAR_DEBUG(verbose,
1317 debugBelch("-- now=%ld\n", now));
1318 IF_PAR_DEBUG(fish, // verbose,
1319 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1320 (last_fish_arrived_at!=0 &&
1321 last_fish_arrived_at+delay > now)) {
1322 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1323 now, last_fish_arrived_at+delay,
1324 last_fish_arrived_at,
1325 delay);
1326 });
1327
1328 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1329 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1330 if (last_fish_arrived_at==0 ||
1331 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1332 /* outstandingFishes is set in sendFish, processFish;
1333 avoid flooding system with fishes via delay */
1334 next_fish_to_send_at = 0;
1335 } else {
1336 /* ToDo: this should be done in the main scheduling loop to avoid the
1337 busy wait here; not so bad if fish delay is very small */
1338 int iq = 0; // DEBUGGING -- HWL
1339 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1340 /* send a fish when ready, but process messages that arrive in the meantime */
1341 do {
1342 if (PacketsWaiting()) {
1343 iq++; // DEBUGGING
1344 *receivedFinish = processMessages();
1345 }
1346 now = msTime();
1347 } while (!*receivedFinish || now<next_fish_to_send_at);
1348 // JB: This means the fish could become obsolete, if we receive
1349 // work. Better check for work again?
1350 // last line: while (!receivedFinish || !haveWork || now<...)
1351 // next line: if (receivedFinish || haveWork )
1352
1353 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1354 return rtsFalse; // NB: this will leave scheduler loop
1355 // immediately after return!
1356
1357 IF_PAR_DEBUG(fish, // verbose,
1358 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1359
1360 }
1361
1362 // JB: IMHO, this should all be hidden inside sendFish(...)
1363 /* pe = choosePE();
1364 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1365 NEW_FISH_HUNGER);
1366
1367 // Global statistics: count no. of fishes
1368 if (RtsFlags.ParFlags.ParStats.Global &&
1369 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1370 globalParStats.tot_fish_mess++;
1371 }
1372 */
1373
1374 /* delayed fishes must have been sent by now! */
1375 next_fish_to_send_at = 0;
1376 }
1377
1378 *receivedFinish = processMessages();
1379 # endif /* SPARKS */
1380
1381 return rtsFalse;
1382 /* NB: this function always returns rtsFalse, meaning the scheduler
1383 loop continues with the next iteration;
1384 rationale:
1385 return code means success in finding work; we enter this function
1386 if there is no local work, thus have to send a fish which takes
1387 time until it arrives with work; in the meantime we should process
1388 messages in the main loop;
1389 */
1390 }
1391 #endif // PARALLEL_HASKELL
1392
1393 /* ----------------------------------------------------------------------------
1394 * PAR/GRAN: Report stats & debugging info(?)
1395 * ------------------------------------------------------------------------- */
1396
1397 #if defined(PAR) || defined(GRAN)
/*
 * Report scheduling statistics / debugging output (PAR or GRAN builds).
 *
 * Pops a thread off the run queue, prints the number of runnable threads
 * and sparks when the relevant debug flags are set, and -- when full
 * parallel statistics are enabled -- writes a GR_SCHEDULE event to the
 * log for spark-generated threads if the thread differs from LastTSO or
 * an emit was forced via emitSchedule.
 *
 * NOTE(review): `t` and `pool` are used but not declared in this
 * function, and `cap` is accessed as a struct rather than a pointer;
 * presumably these were locals of the old scheduler loop -- confirm
 * before enabling PAR/GRAN.
 */
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     * it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

     JB: TODO: investigate whether state change field could be nuked
     entirely and replaced by the normal tso state (whatnext
     field). All we want to do is to kill tsos from outside.
    */

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
           DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

       CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // cap = (old) MainCap

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    IF_PAR_DEBUG(fish,
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    /* NOTE(review): t->par.sparkname is read before the `t &&` null test
       further down in the same condition */
    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
              event for LastTSO; with unfair scheduling we know that the
              previous tso has blocked whenever we switch to another tso, so
              we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
    }
}
1452 #endif
1453
1454 /* ----------------------------------------------------------------------------
1455 * After running a thread...
1456 * ------------------------------------------------------------------------- */
1457
/*
 * Bookkeeping performed right after a thread has returned to the
 * scheduler.
 *
 * In builds without PAR, GRAN or EDEN the body is empty.  In PAR builds
 * the last-run thread and the time of the last yield are recorded; in
 * GRAN/PAR/EDEN builds per-return-code statistics counters are updated
 * and, for GRAN, deschedule events are dumped.
 *
 * NOTE(review): the conditional code refers to `t` and `ret`, which are
 * neither parameters nor locals of this function, and the GRAN
 * ThreadBlocked branch has an unbalanced closing parenthesis after
 * debugBelch("\n").  It presumably does not compile as-is -- confirm
 * before enabling those configurations.
 */
static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure);
        debugBelch("\n"));

        // ??? needed; should emit block before
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
        /*
          Disabled code:
          ASSERT(procStatus[CurrentProc]==Busy ||
                 ((procStatus[CurrentProc]==Fetching) &&
                  (t->block_info.closure!=(StgClosure*)NULL)));
          if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
              !(!RtsFlags.GranFlags.DoAsyncFetch &&
                procStatus[CurrentProc]==Fetching))
              procStatus[CurrentProc] = Idle;
        */
# elif defined(PAR)
        //++PAR++ blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
        break;
    }
#endif
}
1544
1545 /* -----------------------------------------------------------------------------
1546 * Handle a thread that returned to the scheduler with HeapOverflow
1547 * -------------------------------------------------------------------------- */
1548
1549 static rtsBool
1550 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1551 {
1552 // did the task ask for a large block?
1553 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1554 // if so, get one and push it on the front of the nursery.
1555 bdescr *bd;
1556 lnat blocks;
1557
1558 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1559
1560 debugTrace(DEBUG_sched,
1561 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1562 (long)t->id, whatNext_strs[t->what_next], blocks);
1563
1564 // don't do this if the nursery is (nearly) full, we'll GC first.
1565 if (cap->r.rCurrentNursery->link != NULL ||
1566 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1567 // if the nursery has only one block.
1568
1569 ACQUIRE_SM_LOCK
1570 bd = allocGroup( blocks );
1571 RELEASE_SM_LOCK
1572 cap->r.rNursery->n_blocks += blocks;
1573
1574 // link the new group into the list
1575 bd->link = cap->r.rCurrentNursery;
1576 bd->u.back = cap->r.rCurrentNursery->u.back;
1577 if (cap->r.rCurrentNursery->u.back != NULL) {
1578 cap->r.rCurrentNursery->u.back->link = bd;
1579 } else {
1580 #if !defined(THREADED_RTS)
1581 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1582 g0s0 == cap->r.rNursery);
1583 #endif
1584 cap->r.rNursery->blocks = bd;
1585 }
1586 cap->r.rCurrentNursery->u.back = bd;
1587
1588 // initialise it as a nursery block. We initialise the
1589 // step, gen_no, and flags field of *every* sub-block in
1590 // this large block, because this is easier than making
1591 // sure that we always find the block head of a large
1592 // block whenever we call Bdescr() (eg. evacuate() and
1593 // isAlive() in the GC would both have to do this, at
1594 // least).
1595 {
1596 bdescr *x;
1597 for (x = bd; x < bd + blocks; x++) {
1598 x->step = cap->r.rNursery;
1599 x->gen_no = 0;
1600 x->flags = 0;
1601 }
1602 }
1603
1604 // This assert can be a killer if the app is doing lots
1605 // of large block allocations.
1606 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1607
1608 // now update the nursery to point to the new block
1609 cap->r.rCurrentNursery = bd;
1610
1611 // we might be unlucky and have another thread get on the
1612 // run queue before us and steal the large block, but in that
1613 // case the thread will just end up requesting another large
1614 // block.
1615 pushOnRunQueue(cap,t);
1616 return rtsFalse; /* not actually GC'ing */
1617 }
1618 }
1619
1620 debugTrace(DEBUG_sched,
1621 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1622 (long)t->id, whatNext_strs[t->what_next]);
1623
1624 #if defined(GRAN)
1625 ASSERT(!is_on_queue(t,CurrentProc));
1626 #elif defined(PARALLEL_HASKELL)
1627 /* Currently we emit a DESCHEDULE event before GC in GUM.
1628 ToDo: either add separate event to distinguish SYSTEM time from rest
1629 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1630 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1631 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1632 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1633 emitSchedule = rtsTrue;
1634 }
1635 #endif
1636
1637 pushOnRunQueue(cap,t);
1638 return rtsTrue;
1639 /* actual GC is done at the end of the while loop in schedule() */
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1644 * -------------------------------------------------------------------------- */
1645
1646 static void
1647 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1648 {
1649 debugTrace (DEBUG_sched,
1650 "--<< thread %ld (%s) stopped, StackOverflow",
1651 (long)t->id, whatNext_strs[t->what_next]);
1652
1653 /* just adjust the stack for this thread, then pop it back
1654 * on the run queue.
1655 */
1656 {
1657 /* enlarge the stack */
1658 StgTSO *new_t = threadStackOverflow(cap, t);
1659
1660 /* The TSO attached to this Task may have moved, so update the
1661 * pointer to it.
1662 */
1663 if (task->tso == t) {
1664 task->tso = new_t;
1665 }
1666 pushOnRunQueue(cap,new_t);
1667 }
1668 }
1669
1670 /* -----------------------------------------------------------------------------
1671 * Handle a thread that returned to the scheduler with ThreadYielding
1672 * -------------------------------------------------------------------------- */
1673
1674 static rtsBool
1675 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1676 {
1677 // Reset the context switch flag. We don't do this just before
1678 // running the thread, because that would mean we would lose ticks
1679 // during GC, which can lead to unfair scheduling (a thread hogs
1680 // the CPU because the tick always arrives during GC). This way
1681 // penalises threads that do a lot of allocation, but that seems
1682 // better than the alternative.
1683 context_switch = 0;
1684
1685 /* put the thread back on the run queue. Then, if we're ready to
1686 * GC, check whether this is the last task to stop. If so, wake
1687 * up the GC thread. getThread will block during a GC until the
1688 * GC is finished.
1689 */
1690 #ifdef DEBUG
1691 if (t->what_next != prev_what_next) {
1692 debugTrace(DEBUG_sched,
1693 "--<< thread %ld (%s) stopped to switch evaluators",
1694 (long)t->id, whatNext_strs[t->what_next]);
1695 } else {
1696 debugTrace(DEBUG_sched,
1697 "--<< thread %ld (%s) stopped, yielding",
1698 (long)t->id, whatNext_strs[t->what_next]);
1699 }
1700 #endif
1701
1702 IF_DEBUG(sanity,
1703 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1704 checkTSO(t));
1705 ASSERT(t->link == END_TSO_QUEUE);
1706
1707 // Shortcut if we're just switching evaluators: don't bother
1708 // doing stack squeezing (which can be expensive), just run the
1709 // thread.
1710 if (t->what_next != prev_what_next) {
1711 return rtsTrue;
1712 }
1713
1714 #if defined(GRAN)
1715 ASSERT(!is_on_queue(t,CurrentProc));
1716
1717 IF_DEBUG(sanity,
1718 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1719 checkThreadQsSanity(rtsTrue));
1720
1721 #endif
1722
1723 addToRunQueue(cap,t);
1724
1725 #if defined(GRAN)
1726 /* add a ContinueThread event to actually process the thread */
1727 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1728 ContinueThread,
1729 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1730 IF_GRAN_DEBUG(bq,
1731 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1732 G_EVENTQ(0);
1733 G_CURR_THREADQ(0));
1734 #endif
1735 return rtsFalse;
1736 }
1737
1738 /* -----------------------------------------------------------------------------
1739 * Handle a thread that returned to the scheduler with ThreadBlocked
1740 * -------------------------------------------------------------------------- */
1741
/*
 * Handle a thread that returned to the scheduler with ThreadBlocked.
 *
 * In the default configuration (neither GRAN nor PAR) nothing has to be
 * done apart from tracing the blockage in DEBUG builds: the thread has
 * already tidied up its stack and placed itself on whatever queue it
 * needs to be on.  GRAN builds dump a deschedule event and prune the
 * thread's ContinueThread events; PAR builds call blockThread() and
 * request that the next schedule be logged.
 *
 * t: the thread that blocked (marked unused when neither GRAN nor DEBUG
 *    is defined)
 */
static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
    STG_UNUSED
#endif
    )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    /*
      Disabled code:
      ASSERT(procStatus[CurrentProc]==Busy ||
             ((procStatus[CurrentProc]==Fetching) &&
              (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
          !(!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching))
          procStatus[CurrentProc] = Idle;
    */
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,

                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything. The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    // - in THREADED_RTS, the thread may already have been woken
    //   up by another Capability. This actually happens: try
    //   conc023 +RTS -N2.
    // - the thread may have woken itself up already, because
    //   threadPaused() might have raised a blocked throwTo
    //   exception, see maybePerformBlockedException().

#ifdef DEBUG
    // trace why the thread stopped, but only if scheduler tracing is on
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}
1814
1815 /* -----------------------------------------------------------------------------
1816 * Handle a thread that returned to the scheduler with ThreadFinished
1817 * -------------------------------------------------------------------------- */
1818
1819 static rtsBool
1820 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1821 {
1822 /* Need to check whether this was a main thread, and if so,
1823 * return with the return value.
1824 *
1825 * We also end up here if the thread kills itself with an
1826 * uncaught exception, see Exception.cmm.
1827 */
1828 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1829 (unsigned long)t->id, whatNext_strs[t->what_next]);
1830
1831 #if defined(GRAN)
1832 endThread(t, CurrentProc); // clean-up the thread
1833 #elif defined(PARALLEL_HASKELL)
1834 /* For now all are advisory -- HWL */
1835 //if(t->priority==AdvisoryPriority) ??
1836 advisory_thread_count--; // JB: Caution with this counter, buggy!
1837
1838 # if defined(DIST)
1839 if(t->dist.priority==RevalPriority)
1840 FinishReval(t);
1841 # endif
1842
1843 # if defined(EDENOLD)
1844 // the thread could still have an outport... (BUG)
1845 if (t->eden.outport != -1) {
1846 // delete the outport for the tso which has finished...
1847 IF_PAR_DEBUG(eden_ports,
1848 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1849 t->eden.outport, t->id));
1850 deleteOPT(t);
1851 }
1852 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1853 if (t->eden.epid != -1) {
1854 IF_PAR_DEBUG(eden_ports,
1855 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1856 t->id, t->eden.epid));
1857 removeTSOfromProcess(t);
1858 }
1859 # endif
1860
1861 # if defined(PAR)
1862 if (RtsFlags.ParFlags.ParStats.Full &&
1863 !RtsFlags.ParFlags.ParStats.Suppressed)
1864 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1865
1866 // t->par only contains statistics: left out for now...
1867 IF_PAR_DEBUG(fish,
1868 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1869 t->id,t,t->par.sparkname));
1870 # endif
1871 #endif // PARALLEL_HASKELL
1872
1873 //
1874 // Check whether the thread that just completed was a bound
1875 // thread, and if so return with the result.
1876 //
1877 // There is an assumption here that all thread completion goes
1878 // through this point; we need to make sure that if a thread
1879 // ends up in the ThreadKilled state, that it stays on the run
1880 // queue so it can be dealt with here.
1881 //
1882
1883 if (t->bound) {
1884
1885 if (t->bound != task) {
1886 #if !defined(THREADED_RTS)
1887 // Must be a bound thread that is not the topmost one. Leave
1888 // it on the run queue until the stack has unwound to the
1889 // point where we can deal with this. Leaving it on the run
1890 // queue also ensures that the garbage collector knows about
1891 // this thread and its return value (it gets dropped from the
1892 // all_threads list so there's no other way to find it).
1893 appendToRunQueue(cap,t);
1894 return rtsFalse;
1895 #else
1896 // this cannot happen in the threaded RTS, because a
1897 // bound thread can only be run by the appropriate Task.
1898 barf("finished bound thread that isn't mine");
1899 #endif
1900 }
1901
1902 ASSERT(task->tso == t);
1903
1904 if (t->what_next == ThreadComplete) {
1905 if (task->ret) {
1906 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1907 *(task->ret) = (StgClosure *)task->tso->sp[1];
1908 }
1909 task->stat = Success;
1910 } else {
1911 if (task->ret) {
1912 *(task->ret) = NULL;
1913 }
1914 if (sched_state >= SCHED_INTERRUPTING) {
1915 task->stat = Interrupted;
1916 } else {
1917 task->stat = Killed;
1918 }
1919 }
1920 #ifdef DEBUG
1921 removeThreadLabel((StgWord)task->tso->id);
1922 #endif
1923 return rtsTrue; // tells schedule() to return
1924 }
1925
1926 return rtsFalse;
1927 }
1928
1929 /* -----------------------------------------------------------------------------
1930 * Perform a heap census
1931 * -------------------------------------------------------------------------- */
1932
1933 static rtsBool
1934 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1935 {
1936 // When we have +RTS -i0 and we're heap profiling, do a census at
1937 // every GC. This lets us get repeatable runs for debugging.
1938 if (performHeapProfile ||
1939 (RtsFlags.ProfFlags.profileInterval==0 &&
1940 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1941 return rtsTrue;
1942 } else {
1943 return rtsFalse;
1944 }
1945 }
1946
1947 /* -----------------------------------------------------------------------------
1948 * Perform a garbage collection if necessary
1949 * -------------------------------------------------------------------------- */
1950
1951 static Capability *
1952 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1953 {
1954 StgTSO *t;
1955 rtsBool heap_census;
1956 #ifdef THREADED_RTS
1957 static volatile StgWord waiting_for_gc;
1958 rtsBool was_waiting;
1959 nat i;
1960 #endif
1961
1962 #ifdef THREADED_RTS
1963 // In order to GC, there must be no threads running Haskell code.
1964 // Therefore, the GC thread needs to hold *all* the capabilities,
1965 // and release them after the GC has completed.
1966 //
1967 // This seems to be the simplest way: previous attempts involved
1968 // making all the threads with capabilities give up their
1969 // capabilities and sleep except for the *last* one, which
1970 // actually did the GC. But it's quite hard to arrange for all
1971 // the other tasks to sleep and stay asleep.
1972 //
1973
1974 was_waiting = cas(&waiting_for_gc, 0, 1);
1975 if (was_waiting) {
1976 do {
1977 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1978 if (cap) yieldCapability(&cap,task);
1979 } while (waiting_for_gc);
1980 return cap; // NOTE: task->cap might have changed here
1981 }
1982
1983 for (i=0; i < n_capabilities; i++) {
1984 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1985 if (cap != &capabilities[i]) {
1986 Capability *pcap = &capabilities[i];
1987 // we better hope this task doesn't get migrated to
1988 // another Capability while we're waiting for this one.
1989 // It won't, because load balancing happens while we have
1990 // all the Capabilities, but even so it's a slightly
1991 // unsavoury invariant.
1992 task->cap = pcap;
1993 context_switch = 1;
1994 waitForReturnCapability(&pcap, task);
1995 if (pcap != &capabilities[i]) {
1996 barf("scheduleDoGC: got the wrong capability");
1997 }
1998 }
1999 }
2000
2001 waiting_for_gc = rtsFalse;
2002 #endif
2003
2004 /* Kick any transactions which are invalid back to their
2005 * atomically frames. When next scheduled they will try to
2006 * commit, this commit will fail and they will retry.
2007 */
2008 {
2009 StgTSO *next;
2010
2011 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2012 if (t->what_next == ThreadRelocated) {
2013 next = t->link;
2014 } else {
2015 next = t->global_link;
2016
2017 // This is a good place to check for blocked
2018 // exceptions. It might be the case that a thread is
2019 // blocked on delivering an exception to a thread that
2020 // is also blocked - we try to ensure that this
2021 // doesn't happen in throwTo(), but it's too hard (or
2022 // impossible) to close all the race holes, so we
2023 // accept that some might get through and deal with
2024 // them here. A GC will always happen at some point,
2025 // even if the system is otherwise deadlocked.
2026 maybePerformBlockedException (&capabilities[0], t);
2027
2028 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2029 if (!stmValidateNestOfTransactions (t -> trec)) {
2030 debugTrace(DEBUG_sched | DEBUG_stm,
2031 "trec %p found wasting its time", t);
2032
2033 // strip the stack back to the
2034 // ATOMICALLY_FRAME, aborting the (nested)
2035 // transaction, and saving the stack of any
2036 // partially-evaluated thunks on the heap.
2037 throwToSingleThreaded_(&capabilities[0], t,
2038 NULL, rtsTrue, NULL);
2039
2040 #ifdef REG_R1
2041 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2042 #endif
2043 }
2044 }
2045 }
2046 }
2047 }
2048
2049 // so this happens periodically:
2050 if (cap) scheduleCheckBlackHoles(cap);
2051
2052 IF_DEBUG(scheduler, printAllThreads());
2053
2054 /*
2055 * We now have all the capabilities; if we're in an interrupting
2056 * state, then we should take the opportunity to delete all the
2057 * threads in the system.
2058 */
2059 if (sched_state >= SCHED_INTERRUPTING) {
2060 deleteAllThreads(&capabilities[0]);
2061 sched_state = SCHED_SHUTTING_DOWN;
2062 }
2063
2064 heap_census = scheduleNeedHeapProfile(rtsTrue);
2065
2066 /* everybody back, start the GC.
2067 * Could do it in this thread, or signal a condition var
2068 * to do it in another thread. Either way, we need to
2069 * broadcast on gc_pending_cond afterward.
2070 */
2071 #if defined(THREADED_RTS)
2072 debugTrace(DEBUG_sched, "doing GC");
2073 #endif
2074 GarbageCollect(force_major || heap_census);
2075
2076 if (heap_census) {
2077 debugTrace(DEBUG_sched, "performing heap census");
2078 heapCensus();
2079 performHeapProfile = rtsFalse;
2080 }
2081
2082 #if defined(THREADED_RTS)
2083 // release our stash of capabilities.
2084 for (i = 0; i < n_capabilities; i++) {
2085 if (cap != &capabilities[i]) {
2086 task->cap = &capabilities[i];
2087 releaseCapability(&capabilities[i]);
2088 }
2089 }
2090 if (cap) {
2091 task->cap = cap;
2092 } else {
2093 task->cap = NULL;
2094 }
2095 #endif
2096
2097 #if defined(GRAN)
2098 /* add a ContinueThread event to continue execution of current thread */
2099 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2100 ContinueThread,
2101 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2102 IF_GRAN_DEBUG(bq,
2103 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2104 G_EVENTQ(0);
2105 G_CURR_THREADQ(0));
2106 #endif /* GRAN */
2107
2108 return cap;
2109 }
2110
2111 /* ---------------------------------------------------------------------------
2112 * Singleton fork(). Do not copy any running threads.
2113 * ------------------------------------------------------------------------- */
2114
2115 pid_t
2116 forkProcess(HsStablePtr *entry
2117 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2118 STG_UNUSED
2119 #endif
2120 )
2121 {
2122 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2123 Task *task;
2124 pid_t pid;
2125 StgTSO* t,*next;
2126 Capability *cap;
2127
2128 #if defined(THREADED_RTS)
2129 if (RtsFlags.ParFlags.nNodes > 1) {
2130 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2131 stg_exit(EXIT_FAILURE);
2132 }
2133 #endif
2134
2135 debugTrace(DEBUG_sched, "forking!");
2136
2137 // ToDo: for SMP, we should probably acquire *all* the capabilities
2138 cap = rts_lock();
2139
2140 pid = fork();
2141
2142 if (pid) { // parent
2143
2144 // just return the pid
2145 rts_unlock(cap);
2146 return pid;
2147
2148 } else { // child
2149
2150 // Now, all OS threads except the thread that forked are
2151 // stopped. We need to stop all Haskell threads, including
2152 // those involved in foreign calls. Also we need to delete
2153 // all Tasks, because they correspond to OS threads that are
2154 // now gone.
2155
2156 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2157 if (t->what_next == ThreadRelocated) {
2158 next = t->link;
2159 } else {
2160 next = t->global_link;
2161 // don't allow threads to catch the ThreadKilled
2162 // exception, but we do want to raiseAsync() because these
2163 // threads may be evaluating thunks that we need later.
2164 deleteThread_(cap,t);
2165 }
2166 }
2167
2168 // Empty the run queue. It seems tempting to let all the
2169 // killed threads stay on the run queue as zombies to be
2170 // cleaned up later, but some of them correspond to bound
2171 // threads for which the corresponding Task does not exist.
2172 cap->run_queue_hd = END_TSO_QUEUE;
2173 cap->run_queue_tl = END_TSO_QUEUE;
2174
2175 // Any suspended C-calling Tasks are no more, their OS threads
2176 // don't exist now:
2177 cap->suspended_ccalling_tasks = NULL;
2178
2179 // Empty the all_threads list. Otherwise, the garbage
2180 // collector may attempt to resurrect some of these threads.
2181 all_threads = END_TSO_QUEUE;
2182
2183 // Wipe the task list, except the current Task.
2184 ACQUIRE_LOCK(&sched_mutex);
2185 for (task = all_tasks; task != NULL; task=task->all_link) {
2186 if (task != cap->running_task) {
2187 discardTask(task);
2188 }
2189 }
2190 RELEASE_LOCK(&sched_mutex);
2191
2192 #if defined(THREADED_RTS)
2193 // Wipe our spare workers list, they no longer exist. New
2194 // workers will be created if necessary.
2195 cap->spare_workers = NULL;
2196 cap->returning_tasks_hd = NULL;
2197 cap->returning_tasks_tl = NULL;
2198 #endif
2199
2200 // On Unix, all timers are reset in the child, so we need to start
2201 // the timer again.
2202 initTimer();
2203 startTimer();
2204
2205 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2206 rts_checkSchedStatus("forkProcess",cap);
2207
2208 rts_unlock(cap);
2209 hs_exit(); // clean up and exit
2210 stg_exit(EXIT_SUCCESS);
2211 }
2212 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2213 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2214 return -1;
2215 #endif
2216 }
2217
2218 /* ---------------------------------------------------------------------------
2219 * Delete all the threads in the system
2220 * ------------------------------------------------------------------------- */
2221
2222 static void
2223 deleteAllThreads ( Capability *cap )
2224 {
2225 // NOTE: only safe to call if we own all capabilities.
2226
2227 StgTSO* t, *next;
2228 debugTrace(DEBUG_sched,"deleting all threads");
2229 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2230 if (t->what_next == ThreadRelocated) {
2231 next = t->link;
2232 } else {
2233 next = t->global_link;
2234 deleteThread(cap,t);
2235 }
2236 }
2237
2238 // The run queue now contains a bunch of ThreadKilled threads. We
2239 // must not throw these away: the main thread(s) will be in there
2240 // somewhere, and the main scheduler loop has to deal with it.
2241 // Also, the run queue is the only thing keeping these threads from
2242 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2243
2244 #if !defined(THREADED_RTS)
2245 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2246 ASSERT(sleeping_queue == END_TSO_QUEUE);
2247 #endif
2248 }
2249
2250 /* -----------------------------------------------------------------------------
2251 Managing the suspended_ccalling_tasks list.
2252 Locks required: sched_mutex
2253 -------------------------------------------------------------------------- */
2254
2255 STATIC_INLINE void
2256 suspendTask (Capability *cap, Task *task)
2257 {
2258 ASSERT(task->next == NULL && task->prev == NULL);
2259 task->next = cap->suspended_ccalling_tasks;
2260 task->prev = NULL;
2261 if (cap->suspended_ccalling_tasks) {
2262 cap->suspended_ccalling_tasks->prev = task;
2263 }
2264 cap->suspended_ccalling_tasks = task;
2265 }
2266
2267 STATIC_INLINE void
2268 recoverSuspendedTask (Capability *cap, Task *task)
2269 {
2270 if (task->prev) {
2271 task->prev->next = task->next;
2272 } else {
2273 ASSERT(cap->suspended_ccalling_tasks == task);
2274 cap->suspended_ccalling_tasks = task->next;
2275 }
2276 if (task->next) {
2277 task->next->prev = task->prev;
2278 }
2279 task->next = task->prev = NULL;
2280 }
2281
2282 /* ---------------------------------------------------------------------------
2283 * Suspending & resuming Haskell threads.
2284 *
2285 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2286 * its capability before calling the C function. This allows another
2287 * task to pick up the capability and carry on running Haskell
2288 * threads. It also means that if the C call blocks, it won't lock
2289 * the whole system.
2290 *
2291 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_tasks list.  We
2293 * give out a token to the task, which it can use to resume the thread
2294 * on return from the C function.
2295 * ------------------------------------------------------------------------- */
2296
2297 void *
2298 suspendThread (StgRegTable *reg)
2299 {
2300 Capability *cap;
2301 int saved_errno;
2302 StgTSO *tso;
2303 Task *task;
2304 #if mingw32_HOST_OS
2305 StgWord32 saved_winerror;
2306 #endif
2307
2308 saved_errno = errno;
2309 #if mingw32_HOST_OS
2310 saved_winerror = GetLastError();
2311 #endif
2312
2313 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2314 */
2315 cap = regTableToCapability(reg);
2316
2317 task = cap->running_task;
2318 tso = cap->r.rCurrentTSO;
2319
2320 debugTrace(DEBUG_sched,
2321 "thread %lu did a safe foreign call",
2322 (unsigned long)cap->r.rCurrentTSO->id);
2323
2324 // XXX this might not be necessary --SDM
2325 tso->what_next = ThreadRunGHC;
2326
2327 threadPaused(cap,tso);
2328
2329 if ((tso->flags & TSO_BLOCKEX) == 0) {
2330 tso->why_blocked = BlockedOnCCall;
2331 tso->flags |= TSO_BLOCKEX;
2332 tso->flags &= ~TSO_INTERRUPTIBLE;
2333 } else {
2334 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2335 }
2336
2337 // Hand back capability
2338 task->suspended_tso = tso;
2339
2340 ACQUIRE_LOCK(&cap->lock);
2341
2342 suspendTask(cap,task);
2343 cap->in_haskell = rtsFalse;
2344 releaseCapability_(cap);
2345
2346 RELEASE_LOCK(&cap->lock);
2347
2348 #if defined(THREADED_RTS)
2349 /* Preparing to leave the RTS, so ensure there's a native thread/task
2350 waiting to take over.
2351 */
2352 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2353 #endif
2354
2355 errno = saved_errno;
2356 #if mingw32_HOST_OS
2357 SetLastError(saved_winerror);
2358 #endif
2359 return task;
2360 }
2361
2362 StgRegTable *
2363 resumeThread (void *task_)
2364 {
2365 StgTSO *tso;
2366 Capability *cap;
2367 Task *task = task_;
2368 int saved_errno;
2369 #if mingw32_HOST_OS
2370 StgWord32 saved_winerror;
2371 #endif
2372
2373 saved_errno = errno;
2374 #if mingw32_HOST_OS
2375 saved_winerror = GetLastError();
2376 #endif
2377
2378 cap = task->cap;
2379 // Wait for permission to re-enter the RTS with the result.
2380 waitForReturnCapability(&cap,task);
2381 // we might be on a different capability now... but if so, our
2382 // entry on the suspended_ccalling_tasks list will also have been
2383 // migrated.
2384
2385 // Remove the thread from the suspended list
2386 recoverSuspendedTask(cap,task);
2387
2388 tso = task->suspended_tso;
2389 task->suspended_tso = NULL;
2390 tso->link = END_TSO_QUEUE;
2391 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2392
2393 if (tso->why_blocked == BlockedOnCCall) {
2394 awakenBlockedExceptionQueue(cap,tso);
2395 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2396 }
2397
2398 /* Reset blocking status */
2399 tso->why_blocked = NotBlocked;
2400
2401 cap->r.rCurrentTSO = tso;
2402 cap->in_haskell = rtsTrue;
2403 errno = saved_errno;
2404 #if mingw32_HOST_OS
2405 SetLastError(saved_winerror);
2406 #endif
2407
2408 /* We might have GC'd, mark the TSO dirty again */
2409 dirtyTSO(tso);
2410
2411 IF_DEBUG(sanity, checkTSO(tso));
2412
2413 return &cap->r;
2414 }
2415
2416 /* ---------------------------------------------------------------------------
2417 * scheduleThread()
2418 *
2419 * scheduleThread puts a thread on the end of the runnable queue.
2420 * This will usually be done immediately after a thread is created.
2421 * The caller of scheduleThread must create the thread using e.g.
2422 * createThread and push an appropriate closure
2423 * on this thread's stack before the scheduler is invoked.
2424 * ------------------------------------------------------------------------ */
2425
2426 void
2427 scheduleThread(Capability *cap, StgTSO *tso)
2428 {
2429 // The thread goes at the *end* of the run-queue, to avoid possible
2430 // starvation of any threads already on the queue.
2431 appendToRunQueue(cap,tso);
2432 }
2433
2434 void
2435 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2436 {
2437 #if defined(THREADED_RTS)
2438 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2439 // move this thread from now on.
2440 cpu %= RtsFlags.ParFlags.nNodes;
2441 if (cpu == cap->no) {
2442 appendToRunQueue(cap,tso);
2443 } else {
2444 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2445 }
2446 #else
2447 appendToRunQueue(cap,tso);
2448 #endif
2449 }
2450
2451 Capability *
2452 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2453 {
2454 Task *task;
2455
2456 // We already created/initialised the Task
2457 task = cap->running_task;
2458
2459 // This TSO is now a bound thread; make the Task and TSO
2460 // point to each other.
2461 tso->bound = task;
2462 tso->cap = cap;
2463
2464 task->tso = tso;
2465 task->ret = ret;
2466 task->stat = NoStatus;
2467
2468 appendToRunQueue(cap,tso);
2469
2470 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2471
2472 #if defined(GRAN)
2473 /* GranSim specific init */
2474 CurrentTSO = m->tso; // the TSO to run
2475 procStatus[MainProc] = Busy; // status of main PE
2476 CurrentProc = MainProc; // PE to run it on
2477 #endif
2478
2479 cap = schedule(cap,task);
2480
2481 ASSERT(task->stat != NoStatus);
2482 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2483
2484 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2485 return cap;
2486 }
2487
2488 /* ----------------------------------------------------------------------------
2489 * Starting Tasks
2490 * ------------------------------------------------------------------------- */
2491
#if defined(THREADED_RTS)
void
workerStart(Task *task)
{
    Capability *my_cap;

    // Read task->cap under the task's lock.  See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    my_cap = task->cap;
    RELEASE_LOCK(&task->lock);

    // Install the thread-local pointer to this Task.
    taskEnter(task);

    // schedule() runs without a lock.
    my_cap = schedule(my_cap,task);

    // schedule() hands back a Capability when it returns; release it
    // and then shut this worker down.
    releaseCapability(my_cap);
    workerTaskStop(task);
}
#endif
2514
2515 /* ---------------------------------------------------------------------------
2516 * initScheduler()
2517 *
2518 * Initialise the scheduler. This resets all the queues - if the
2519 * queues contained any threads, they'll be garbage collected at the
2520 * next pass.
2521 *
2522 * ------------------------------------------------------------------------ */
2523
2524 void
2525 initScheduler(void)
2526 {
2527 #if defined(GRAN)
2528 nat i;
2529 for (i=0; i<=MAX_PROC; i++) {
2530 run_queue_hds[i] = END_TSO_QUEUE;
2531 run_queue_tls[i] = END_TSO_QUEUE;
2532 blocked_queue_hds[i] = END_TSO_QUEUE;
2533 blocked_queue_tls[i] = END_TSO_QUEUE;
2534 ccalling_threadss[i] = END_TSO_QUEUE;
2535 blackhole_queue[i] = END_TSO_QUEUE;
2536 sleeping_queue = END_TSO_QUEUE;
2537 }
2538 #elif !defined(THREADED_RTS)
2539 blocked_queue_hd = END_TSO_QUEUE;
2540 blocked_queue_tl = END_TSO_QUEUE;
2541 sleeping_queue = END_TSO_QUEUE;
2542 #endif
2543
2544 blackhole_queue = END_TSO_QUEUE;
2545 all_threads = END_TSO_QUEUE;
2546
2547 context_switch = 0;
2548 sched_state = SCHED_RUNNING;
2549 recent_activity = ACTIVITY_YES;
2550
2551 #if defined(THREADED_RTS)
2552 /* Initialise the mutex and condition variables used by
2553 * the scheduler. */
2554 initMutex(&sched_mutex);
2555 #endif
2556
2557 ACQUIRE_LOCK(&sched_mutex);
2558
2559 /* A capability holds the state a native thread needs in
2560 * order to execute STG code. At least one capability is
2561 * floating around (only THREADED_RTS builds have more than one).
2562 */
2563 initCapabilities();
2564
2565 initTaskManager();
2566
2567 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2568 initSparkPools();
2569 #endif
2570
2571 #if defined(THREADED_RTS)
2572 /*
2573 * Eagerly start one worker to run each Capability, except for
2574 * Capability 0. The idea is that we're probably going to start a
2575 * bound thread on Capability 0 pretty soon, so we don't want a
2576 * worker task hogging it.
2577 */
2578 {
2579 nat i;
2580 Capability *cap;
2581 for (i = 1; i < n_capabilities; i++) {
2582 cap = &capabilities[i];
2583 ACQUIRE_LOCK(&cap->lock);
2584 startWorkerTask(cap, workerStart);
2585 RELEASE_LOCK(&cap->lock);
2586 }
2587 }
2588 #endif
2589
2590 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2591
2592 RELEASE_LOCK(&sched_mutex);
2593 }
2594
2595 void
2596 exitScheduler(
2597 rtsBool wait_foreign
2598 #if !defined(THREADED_RTS)
2599 __attribute__((unused))
2600 #endif
2601 )
2602 /* see Capability.c, shutdownCapability() */
2603 {
2604 Task *task = NULL;
2605
2606 #if defined(THREADED_RTS)
2607 ACQUIRE_LOCK(&sched_mutex);
2608 task = newBoundTask();
2609 RELEASE_LOCK(&sched_mutex);
2610 #endif
2611
2612 // If we haven't killed all the threads yet, do it now.
2613 if (sched_state < SCHED_SHUTTING_DOWN) {
2614 sched_state = SCHED_INTERRUPTING;
2615 scheduleDoGC(NULL,task,rtsFalse);
2616 }
2617 sched_state = SCHED_SHUTTING_DOWN;
2618
2619 #if defined(THREADED_RTS)
2620 {
2621 nat i;
2622
2623 for (i = 0; i < n_capabilities; i++) {
2624 shutdownCapability(&capabilities[i], task, wait_foreign);
2625 }
2626 boundTaskExiting(task);
2627 stopTaskManager();
2628 }
2629 #else
2630 freeCapability(&MainCapability);
2631 #endif
2632 }
2633
2634 void
2635 freeScheduler( void )
2636 {
2637 freeTaskManager();
2638 if (n_capabilities != 1) {
2639 stgFree(capabilities);
2640 }
2641 #if defined(THREADED_RTS)
2642 closeMutex(&sched_mutex);
2643 #endif
2644 }
2645
2646 /* ---------------------------------------------------------------------------
2647 Where are the roots that we know about?
2648
2649 - all the threads on the runnable queue
2650 - all the threads on the blocked queue
2651 - all the threads on the sleeping queue
     - all the threads currently executing a _ccall_GC
2653 - all the "main threads"
2654
2655 ------------------------------------------------------------------------ */
2656
2657 /* This has to be protected either by the scheduler monitor, or by the
2658 garbage collection monitor (probably the latter).
2659 KH @ 25/10/99
2660 */
2661
2662 void
2663 GetRoots( evac_fn evac )
2664 {
2665 nat i;
2666 Capability *cap;
2667 Task *task;
2668
2669 #if defined(GRAN)
2670 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2671 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2672 evac((StgClosure **)&run_queue_hds[i]);
2673 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2674 evac((StgClosure **)&run_queue_tls[i]);
2675
2676 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2677 evac((StgClosure **)&blocked_queue_hds[i]);
2678 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2679 evac((StgClosure **)&blocked_queue_tls[i]);
2680 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2681 evac((StgClosure **)&ccalling_threads[i]);
2682 }
2683
2684 markEventQueue();
2685
2686 #else /* !GRAN */
2687
2688 for (i = 0; i < n_capabilities; i++) {
2689 cap = &capabilities[i];
2690 evac((StgClosure **)(void *)&cap->run_queue_hd);
2691 evac((StgClosure **)(void *)&cap->run_queue_tl);
2692 #if defined(THREADED_RTS)
2693 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2694 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2695 #endif
2696 for (task = cap->suspended_ccalling_tasks; task != NULL;
2697 task=task->next) {
2698 debugTrace(DEBUG_sched,
2699 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2700 evac((StgClosure **)(void *)&task->suspended_tso);
2701 }
2702
2703 }
2704
2705
2706 #if !defined(THREADED_RTS)
2707 evac((StgClosure **)(void *)&blocked_queue_hd);
2708 evac((StgClosure **)(void *)&blocked_queue_tl);
2709 evac((StgClosure **)(void *)&sleeping_queue);
2710 #endif
2711 #endif
2712
2713 // evac((StgClosure **)&blackhole_queue);
2714
2715 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2716 markSparkQueue(evac);
2717 #endif
2718
2719 #if defined(RTS_USER_SIGNALS)
2720 // mark the signal handlers (signals should be already blocked)
2721 if (RtsFlags.MiscFlags.install_signal_handlers) {
2722 markSignalHandlers(evac);
2723 }
2724 #endif
2725 }
2726
2727 /* -----------------------------------------------------------------------------
2728 performGC
2729
2730 This is the interface to the garbage collector from Haskell land.
2731 We provide this so that external C code can allocate and garbage
2732 collect when called from Haskell via _ccall_GC.
2733 -------------------------------------------------------------------------- */
2734
2735 static void
2736 performGC_(rtsBool force_major)
2737 {
2738 Task *task;
2739 // We must grab a new Task here, because the existing Task may be
2740 // associated with a particular Capability, and chained onto the
2741 // suspended_ccalling_tasks queue.
2742 ACQUIRE_LOCK(&sched_mutex);
2743 task = newBoundTask();
2744 RELEASE_LOCK(&sched_mutex);
2745 scheduleDoGC(NULL,task,force_major);
2746 boundTaskExiting(task);
2747 }
2748
2749 void
2750 performGC(void)
2751 {
2752 performGC_(rtsFalse);
2753 }
2754
2755 void
2756 performMajorGC(void)
2757 {
2758 performGC_(rtsTrue);
2759 }
2760
2761 /* -----------------------------------------------------------------------------
2762 Stack overflow
2763
2764 If the thread has reached its maximum stack size, then raise the
2765 StackOverflow exception in the offending thread. Otherwise
2766 relocate the TSO into a larger chunk of memory and adjust its stack
2767 size appropriately.
2768 -------------------------------------------------------------------------- */
2769
2770 static StgTSO *
2771 threadStackOverflow(Capability *cap, StgTSO *tso)
2772 {
2773 nat new_stack_size, stack_words;
2774 lnat new_tso_size;
2775 StgPtr new_sp;
2776 StgTSO *dest;
2777
2778 IF_DEBUG(sanity,checkTSO(tso));
2779
2780 // don't allow throwTo() to modify the blocked_exceptions queue
2781 // while we are moving the TSO:
2782 lockClosure((StgClosure *)tso);
2783
2784 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2785 // NB. never raise a StackOverflow exception if the thread is
2786 // inside Control.Exceptino.block. It is impractical to protect
2787 // against stack overflow exceptions, since virtually anything
2788 // can raise one (even 'catch'), so this is the only sensible
2789 // thing to do here. See bug #767.
2790
2791 debugTrace(DEBUG_gc,
2792 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2793 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2794 IF_DEBUG(gc,
2795 /* If we're debugging, just print out the top of the stack */
2796 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2797 tso->sp+64)));
2798
2799 // Send this thread the StackOverflow exception
2800 unlockTSO(tso);
2801 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2802 return tso;
2803 }
2804
2805 /* Try to double the current stack size. If that takes us over the
2806 * maximum stack size for this thread, then use the maximum instead.
2807 * Finally round up so the TSO ends up as a whole number of blocks.
2808 */
2809 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2810 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2811 TSO_STRUCT_SIZE)/sizeof(W_);
2812 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2813 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2814
2815 debugTrace(DEBUG_sched,
2816 "increasing stack size from %ld words to %d.",
2817 (long)tso->stack_size, new_stack_size);
2818
2819 dest = (StgTSO *)allocate(new_tso_size);
2820 TICK_ALLOC_TSO(new_stack_size,0);
2821
2822 /* copy the TSO block and the old stack into the new area */
2823 memcpy(dest,tso,TSO_STRUCT_SIZE);
2824 stack_words = tso->stack + tso->stack_size - tso->sp;
2825 new_sp = (P_)dest + new_tso_size - stack_words;
2826 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2827
2828 /* relocate the stack pointers... */
2829 dest->sp = new_sp;
2830 dest->stack_size = new_stack_size;
2831
2832 /* Mark the old TSO as relocated. We have to check for relocated
2833 * TSOs in the garbage collector and any primops that deal with TSOs.
2834 *
2835 * It's important to set the sp value to just beyond the end
2836 * of the stack, so we don't attempt to scavenge any part of the
2837 * dead TSO's stack.
2838 */
2839 tso->what_next = ThreadRelocated;
2840 tso->link = dest;
2841 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2842 tso->why_blocked = NotBlocked;
2843
2844 IF_PAR_DEBUG(verbose,
2845 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2846 tso->id, tso, tso->stack_size);
2847 /* If we're debugging, just print out the top of the stack */
2848 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2849 tso->sp+64)));
2850
2851 unlockTSO(dest);
2852 unlockTSO(tso);
2853
2854 IF_DEBUG(sanity,checkTSO(dest));
2855 #if 0
2856 IF_DEBUG(scheduler,printTSO(dest));
2857 #endif
2858
2859 return dest;
2860 }
2861
2862 /* ---------------------------------------------------------------------------
2863 Interrupt execution
2864 - usually called inside a signal handler so it mustn't do anything fancy.
2865 ------------------------------------------------------------------------ */
2866
2867 void
2868 interruptStgRts(void)
2869 {
2870 sched_state = SCHED_INTERRUPTING;
2871 context_switch = 1;
2872 wakeUpRts();
2873 }
2874
2875 /* -----------------------------------------------------------------------------
2876 Wake up the RTS
2877
2878 This function causes at least one OS thread to wake up and run the
2879 scheduler loop. It is invoked when the RTS might be deadlocked, or
2880 an external event has arrived that may need servicing (eg. a
2881 keyboard interrupt).
2882
2883 In the single-threaded RTS we don't do anything here; we only have
2884 one thread anyway, and the event that caused us to want to wake up
2885 will have interrupted any blocking system call in progress anyway.
2886 -------------------------------------------------------------------------- */
2887
void
wakeUpRts(void)
{
#if defined(THREADED_RTS)
    // Waking the IO Manager thread ensures in turn that some OS thread
    // wakes up and runs the scheduler loop, which leads to a GC and a
    // deadlock check.
    ioManagerWakeup();
#endif
    // Nothing to do in the non-threaded RTS.
}
2898
2899 /* -----------------------------------------------------------------------------
2900 * checkBlackHoles()
2901 *
2902 * Check the blackhole_queue for threads that can be woken up. We do
2903 * this periodically: before every GC, and whenever the run queue is
2904 * empty.
2905 *
2906 * An elegant solution might be to just wake up all the blocked
2907 * threads with awakenBlockedQueue occasionally: they'll go back to
2908 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2909 * doesn't give us a way to tell whether we've actually managed to
2910 * wake up any threads, so we would be busy-waiting.
2911 *
2912 * -------------------------------------------------------------------------- */
2913
2914 static rtsBool
2915 checkBlackHoles (Capability *cap)
2916 {
2917 StgTSO **prev, *t;
2918 rtsBool any_woke_up = rtsFalse;
2919 StgHalfWord type;
2920
2921 // blackhole_queue is global:
2922 ASSERT_LOCK_HELD(&sched_mutex);
2923
2924 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2925
2926 // ASSUMES: sched_mutex
2927 prev = &blackhole_queue;
2928 t = blackhole_queue;
2929 while (t != END_TSO_QUEUE) {
2930 ASSERT(t->why_blocked == BlockedOnBlackHole);
2931 type = get_itbl(t->block_info.closure)->type;
2932 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2933 IF_DEBUG(sanity,checkTSO(t));
2934 t = unblockOne(cap, t);
2935 // urk, the threads migrate to the current capability
2936 // here, but we'd like to keep them on the original one.
2937 *prev = t;
2938 any_woke_up = rtsTrue;
2939 } else {
2940 prev = &t->link;
2941 t = t->link;
2942 }
2943 }
2944
2945 return any_woke_up;
2946 }
2947
2948 /* -----------------------------------------------------------------------------
2949 Deleting threads
2950
2951 This is used for interruption (^C) and forking, and corresponds to
2952 raising an exception but without letting the thread catch the
2953 exception.
2954 -------------------------------------------------------------------------- */
2955
2956 static void
2957 deleteThread (Capability *cap, StgTSO *tso)
2958 {
2959 // NOTE: must only be called on a TSO that we have exclusive
2960 // access to, because we will call throwToSingleThreaded() below.
2961 // The TSO must be on the run queue of the Capability we own, or
2962 // we must own all Capabilities.
2963
2964 if (tso->why_blocked != BlockedOnCCall &&
2965 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2966 throwToSingleThreaded(cap,tso,NULL);
2967 }
2968 }
2969
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
// For forkProcess only: behaves like deleteThread(), except that
// threads blocked in foreign calls are deleted as well.
static void
deleteThread_ (Capability *cap, StgTSO *tso)
{
    // Anything not blocked in a C call is handled by deleteThread().
    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        deleteThread(cap,tso);
        return;
    }

    // Blocked in a foreign call: unblock the thread and mark it killed.
    unblockOne(cap,tso);
    tso->what_next = ThreadKilled;
}
#endif
2985
2986 /* -----------------------------------------------------------------------------
2987 raiseExceptionHelper
2988
2989 This function is called by the raise# primitve, just so that we can
2990 move some of the tricky bits of raising an exception from C-- into
2991 C. Who knows, it might be a useful re-useable thing here too.
2992 -------------------------------------------------------------------------- */
2993
2994 StgWord
2995 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2996 {
2997 Capability *cap = regTableToCapability(reg);
2998 StgThunk *raise_closure = NULL;
2999 StgPtr p, next;
3000 StgRetInfoTable *info;
3001 //
3002 // This closure represents the expression 'raise# E' where E
3003 // is the exception raise. It is used to overwrite all the
3004 // thunks which are currently under evaluataion.
3005 //
3006
3007 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3008 // LDV profiling: stg_raise_info has THUNK as its closure
3009 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3010 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
3011 // 1 does not cause any problem unless profiling is performed.
3012 // However, when LDV profiling goes on, we need to linearly scan
3013 // small object pool, where raise_closure is stored, so we should
3014 // use MIN_UPD_SIZE.
3015 //
3016 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3017 // sizeofW(StgClosure)+1);
3018 //
3019
3020 //
3021 // Walk up the stack, looking for the catch frame. On the way,
3022 // we update any closures pointed to from update frames with the
3023 // raise closure that we just built.
3024 //
3025 p = tso->sp;
3026 while(1) {
3027 info = get_ret_itbl((StgClosure *)p);
3028 next = p + stack_frame_sizeW((StgClosure *)p);
3029 switch (info->i.type) {
3030
3031 case UPDATE_FRAME:
3032 // Only create raise_closure if we need to.
3033 if (raise_closure == NULL) {
3034 raise_closure =
3035 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3036 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3037 raise_closure->payload[0] = exception;
3038 }
3039 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3040 p = next;
3041 continue;
3042
3043 case ATOMICALLY_FRAME:
3044 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3045 tso->sp = p;
3046 return ATOMICALLY_FRAME;
3047
3048 case CATCH_FRAME:
3049 tso->sp = p;
3050 return CATCH_FRAME;
3051
3052 case CATCH_STM_FRAME:
3053 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3054 tso->sp = p;
3055 return CATCH_STM_FRAME;
3056
3057 case STOP_FRAME:
3058 tso->sp = p;
3059 return STOP_FRAME;
3060
3061 case CATCH_RETRY_FRAME:
3062 default:
3063 p = next;
3064 continue;
3065 }
3066 }
3067 }
3068
3069
3070 /* -----------------------------------------------------------------------------
3071 findRetryFrameHelper
3072
3073 This function is called by the retry# primitive. It traverses the stack
3074 leaving tso->sp referring to the frame which should handle the retry.
3075
3076 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3077 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3078
3079 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3080 create) because retries are not considered to be exceptions, despite the
3081 similar implementation.
3082
3083 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3084 not be created within memory transactions.
3085 -------------------------------------------------------------------------- */
3086
3087 StgWord
3088 findRetryFrameHelper (StgTSO *tso)
3089 {
3090 StgPtr p, next;
3091 StgRetInfoTable *info;
3092
3093 p = tso -> sp;
3094 while (1) {
3095 info = get_ret_itbl((StgClosure *)p);
3096 next = p + stack_frame_sizeW((StgClosure *)p);
3097 switch (info->i.type) {
3098
3099 case ATOMICALLY_FRAME:
3100 debugTrace(DEBUG_stm,
3101 "found ATOMICALLY_FRAME at %p during retry", p);
3102 tso->sp = p;
3103 return ATOMICALLY_FRAME;
3104
3105 case CATCH_RETRY_FRAME:
3106 debugTrace(DEBUG_stm,
3107 "found CATCH_RETRY_FRAME at %p during retrry", p);
3108 tso->sp = p;
3109 return CATCH_RETRY_FRAME;
3110
3111 case CATCH_STM_FRAME: {
3112 debugTrace(DEBUG_stm,
3113 "found CATCH_STM_FRAME at %p during retry", p);
3114 StgTRecHeader *trec = tso -> trec;
3115 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3116 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3117 stmAbortTransaction(tso -> cap, trec);
3118 stmFreeAbortedTRec(tso -> cap, trec);
3119 tso -> trec = outer;
3120 p = next;
3121 continue;
3122 }
3123
3124
3125 default:
3126 ASSERT(info->i.type != CATCH_FRAME);
3127 ASSERT(info->i.type != STOP_FRAME);
3128 p = next;
3129 continue;
3130 }
3131 }
3132 }
3133
3134 /* -----------------------------------------------------------------------------
3135 resurrectThreads is called after garbage collection on the list of
3136 threads found to be garbage. Each of these threads will be woken
3137 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3138 on an MVar, or NonTermination if the thread was blocked on a Black
3139 Hole.
3140
3141 Locks: assumes we hold *all* the capabilities.
3142 -------------------------------------------------------------------------- */
3143
3144 void
3145 resurrectThreads (StgTSO *threads)
3146 {
3147 StgTSO *tso, *next;
3148 Capability *cap;
3149
3150 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3151 next = tso->global_link;
3152 tso->global_link = all_threads;
3153 all_threads = tso;
3154 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3155
3156 // Wake up the thread on the Capability it was last on
3157 cap = tso->cap;
3158
3159 switch (tso->why_blocked) {
3160 case BlockedOnMVar:
3161 case BlockedOnException:
3162 /* Called by GC - sched_mutex lock is currently held. */
3163 throwToSingleThreaded(cap, tso,
3164 (StgClosure *)BlockedOnDeadMVar_closure);
3165 break;
3166 case BlockedOnBlackHole:
3167 throwToSingleThreaded(cap, tso,
3168 (StgClosure *)NonTermination_closure);
3169 break;
3170 case BlockedOnSTM:
3171 throwToSingleThreaded(cap, tso,
3172 (StgClosure *)BlockedIndefinitely_closure);
3173 break;
3174 case NotBlocked:
3175 /* This might happen if the thread was blocked on a black hole
3176 * belonging to a thread that we've just woken up (raiseAsync
3177 * can wake up threads, remember...).
3178 */
3179 continue;
3180 default:
3181 barf("resurrectThreads: thread blocked in a strange way");
3182 }
3183 }
3184 }