8bd42414b52f25a8342913738063f6d32c5fd6ab
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77 * Global variables
78 * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
84
85 /*
86 In GranSim we have a runnable and a blocked queue for each processor.
87 In order to minimise code changes, new arrays run_queue_hds/tls
88 are created. run_queue_hd is then a short cut (macro) for
89 run_queue_hds[CurrentProc] (see GranSim.h).
90 -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96 the std RTS (i.e. we are cheating). However, we don't use this list in
97 the GranSim specific code at the moment (so we are only potentially
98 cheating). */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping threads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110 * LOCK: sched_mutex+capability, or all capabilities
111 */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up. See
116 * Schedule.h for a more thorough comment.
117 * LOCK: none (doesn't matter if we miss an update)
118 */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122 * Used for detecting garbage collected threads.
123 * LOCK: sched_mutex+capability, or all capabilities
124 */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128 * LOCK: none (just an advisory flag)
129 */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133 * LOCK: currently none, perhaps we should lock (but needs to be
134 * updated in the fast path of the scheduler).
135 */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* records the overall scheduler state; once it leaves SCHED_RUNNING we are giving up execution
139 * LOCK: none (it only advances: SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN)
140 */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
148 * exists - earlier gccs apparently didn't.
149 * -= chak
150 */
151 StgTSO dummy_tso;
152
153 /*
154 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155 * in an MT setting, needed to signal that a worker thread shouldn't hang around
156 * in the scheduler when it is out of work.
157 */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161 * This mutex protects most of the global scheduler data in
162 * the THREADED_RTS runtime.
163 */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179 * static function prototypes
180 * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These functions all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static void scheduleSendPendingMessages(void);
203 static rtsBool scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
212 StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
214 nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217 StgTSO *t );
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);
236 #endif
237
238 #ifdef DEBUG
239 static char *whatNext_strs[] = {
240 "(unknown)",
241 "ThreadRunGHC",
242 "ThreadInterpret",
243 "ThreadKilled",
244 "ThreadRelocated",
245 "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250 * Putting a thread on the run queue: different scheduling policies
251 * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257 if (RtsFlags.ParFlags.doFairScheduling) {
258 // this does round-robin scheduling; good for concurrency
259 appendToRunQueue(cap,t);
260 } else {
261 // this does unfair scheduling; good for parallelism
262 pushOnRunQueue(cap,t);
263 }
264 #else
265 // this does round-robin scheduling; good for concurrency
266 appendToRunQueue(cap,t);
267 #endif
268 }
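/* Illustrative sketch (hedged): appendToRunQueue and pushOnRunQueue are
 * defined in Schedule.h, not in this file.  The reconstruction below is
 * only meant to make the two policies above concrete; it is inferred from
 * the way cap->run_queue_hd/tl and tso->link are used elsewhere in this
 * file, so treat the exact bodies as assumptions.
 *
 *   // FIFO: add at the tail, so older threads run first (round-robin)
 *   appendToRunQueue (Capability *cap, StgTSO *tso) {
 *       ASSERT(tso->link == END_TSO_QUEUE);
 *       if (cap->run_queue_hd == END_TSO_QUEUE) {
 *           cap->run_queue_hd = tso;
 *       } else {
 *           cap->run_queue_tl->link = tso;
 *       }
 *       cap->run_queue_tl = tso;
 *   }
 *
 *   // LIFO: add at the head, so this thread runs next (unfair; good for
 *   // parallelism, as noted above)
 *   pushOnRunQueue (Capability *cap, StgTSO *tso) {
 *       tso->link = cap->run_queue_hd;
 *       cap->run_queue_hd = tso;
 *       if (cap->run_queue_tl == END_TSO_QUEUE) {
 *           cap->run_queue_tl = tso;
 *       }
 *   }
 */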
269
270 /* ---------------------------------------------------------------------------
271 Main scheduling loop.
272
273 We use round-robin scheduling, each thread returning to the
274 scheduler loop when one of these conditions is detected:
275
276 * out of heap space
277 * timer expires (thread yields)
278 * thread blocks
279 * thread ends
280 * stack overflow
281
282 GRAN version:
283 In a GranSim setup this loop iterates over the global event queue.
284 This revolves around the global event queue, which determines what
285 to do next. Therefore, it's more complicated than either the
286 concurrent or the parallel (GUM) setup.
287
288 GUM version:
289 GUM iterates over incoming messages.
290 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291 and sends out a fish whenever it has nothing to do; in-between
292 doing the actual reductions (shared code below) it processes the
293 incoming messages and deals with delayed operations
294 (see PendingFetches).
295 This is not the ugliest code you could imagine, but it's bloody close.
296
297 ------------------------------------------------------------------------ */
298
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302 StgTSO *t;
303 Capability *cap;
304 StgThreadReturnCode ret;
305 #if defined(GRAN)
306 rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308 StgTSO *tso;
309 GlobalTaskId pe;
310 rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312 nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315 nat prev_what_next;
316 rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318 rtsBool first = rtsTrue;
319 #endif
320
321 cap = initialCapability;
322
323 // Pre-condition: this task owns initialCapability.
324 // The sched_mutex is *NOT* held
325 // NB. on return, we still hold a capability.
326
327 debugTrace (DEBUG_sched,
328 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329 task, initialCapability);
330
331 schedulePreLoop();
332
333 // -----------------------------------------------------------
334 // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
340 #else
341 #define TERMINATION_CONDITION rtsTrue
342 #endif
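// Note: in the standard build (neither GRAN nor PARALLEL_HASKELL defined)
// the condition above is simply rtsTrue, so the loop below never falls out
// of the bottom; it terminates only via a return, either in the
// SCHED_SHUTTING_DOWN case or when scheduleHandleThreadFinished() reports
// that the thread this task was waiting for has finished.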
343
344 while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347 /* Choose the processor with the next event */
348 CurrentProc = event->proc;
349 CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353 if (first) {
354 // don't yield the first time, we want a chance to run this
355 // thread for a bit, even if there are others banging at the
356 // door.
357 first = rtsFalse;
358 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359 } else {
360 // Yield the capability to higher-priority tasks if necessary.
361 yieldCapability(&cap, task);
362 }
363 #endif
364
365 #if defined(THREADED_RTS)
366 schedulePushWork(cap,task);
367 #endif
368
369 // Check whether we have re-entered the RTS from Haskell without
370 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371 // call).
372 if (cap->in_haskell) {
373 errorBelch("schedule: re-entered unsafely.\n"
374 " Perhaps a 'foreign import unsafe' should be 'safe'?");
375 stg_exit(EXIT_FAILURE);
376 }
377
378 // The interruption / shutdown sequence.
379 //
380 // In order to cleanly shut down the runtime, we want to:
381 // * make sure that all main threads return to their callers
382 // with the state 'Interrupted'.
383 // * clean up all OS threads associated with the runtime
384 // * free all memory etc.
385 //
386 // So the sequence for ^C goes like this:
387 //
388 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
389 // arranges for some Capability to wake up
390 //
391 // * all threads in the system are halted, and the zombies are
392 // placed on the run queue for cleaning up. We acquire all
393 // the capabilities in order to delete the threads, this is
394 // done by scheduleDoGC() for convenience (because GC already
395 // needs to acquire all the capabilities). We can't kill
396 // threads involved in foreign calls.
397 //
398 // * somebody calls shutdownHaskell(), which calls exitScheduler()
399 //
400 // * sched_state := SCHED_SHUTTING_DOWN
401 //
402 // * all workers exit when the run queue on their capability
403 // drains. All main threads will also exit when their TSO
404 // reaches the head of the run queue and they can return.
405 //
406 // * eventually all Capabilities will shut down, and the RTS can
407 // exit.
408 //
409 // * We might be left with threads blocked in foreign calls,
410 // we should really attempt to kill these somehow (TODO);
411
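// Hedged sketch of the other half of this protocol (the real interrupt
// handler lives in the RTS signal-handling code, so treat the details
// below as an assumption drawn from the comment above):
//
//     sched_state = SCHED_INTERRUPTING;  // step one of the ^C sequence
//     context_switch = 1;                // running threads come back to
//                                        // the scheduler at the next
//                                        // opportunity
//     // ...then prod a Capability so an idle scheduler notices the change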
412 switch (sched_state) {
413 case SCHED_RUNNING:
414 break;
415 case SCHED_INTERRUPTING:
416 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418 discardSparksCap(cap);
419 #endif
420 /* scheduleDoGC() deletes all the threads */
421 cap = scheduleDoGC(cap,task,rtsFalse);
422 break;
423 case SCHED_SHUTTING_DOWN:
424 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425 // If we are a worker, just exit. If we're a bound thread
426 // then we will exit below when we've removed our TSO from
427 // the run queue.
428 if (task->tso == NULL && emptyRunQueue(cap)) {
429 return cap;
430 }
431 break;
432 default:
433 barf("sched_state: %d", sched_state);
434 }
435
436 #if defined(THREADED_RTS)
437 // If the run queue is empty, take a spark and turn it into a thread.
438 {
439 if (emptyRunQueue(cap)) {
440 StgClosure *spark;
441 spark = findSpark(cap);
442 if (spark != NULL) {
443 debugTrace(DEBUG_sched,
444 "turning spark of closure %p into a thread",
445 (StgClosure *)spark);
446 createSparkThread(cap,spark);
447 }
448 }
449 }
450 #endif // THREADED_RTS
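// Background note (hedged): a "spark" is a pointer to an unevaluated
// closure that the program offered for parallel evaluation (e.g. via the
// `par` combinator).  It waits in the Capability's spark pool
// (cap->r.rSparks) until either the owning computation evaluates the
// closure itself or, as above, an otherwise idle Capability turns it into
// a fully-fledged thread with createSparkThread().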
451
452 scheduleStartSignalHandlers(cap);
453
454 // Only check the black holes here if we've nothing else to do.
455 // During normal execution, the black hole list only gets checked
456 // at GC time, to avoid repeatedly traversing this possibly long
457 // list each time around the scheduler.
458 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460 scheduleCheckWakeupThreads(cap);
461
462 scheduleCheckBlockedThreads(cap);
463
464 scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466 cap = task->cap; // reload cap, it might have changed
467 #endif
468
469 // Normally, the only way we can get here with no threads to
470 // run is if a keyboard interrupt was received during
471 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472 // Additionally, it is not fatal for the
473 // threaded RTS to reach here with no threads to run.
474 //
475 // win32: might be here due to awaitEvent() being abandoned
476 // as a result of a console event having been delivered.
477 if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479 ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481 continue; // nothing to do
482 }
483
484 #if defined(PARALLEL_HASKELL)
485 scheduleSendPendingMessages();
486 if (emptyRunQueue(cap) && scheduleActivateSpark())
487 continue;
488
489 #if defined(SPARKS)
490 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
491 #endif
492
493 /* If we still have no work we need to send a FISH to get a spark
494 from another PE */
495 if (emptyRunQueue(cap)) {
496 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497 ASSERT(rtsFalse); // should not happen at the moment
498 }
499 // from here: non-empty run queue.
500 // TODO: merge above case with this, so that there is only one call to processMessages()!
501 if (PacketsWaiting()) { /* process incoming messages, if
502 any pending... only in else
503 because getRemoteWork waits for
504 messages as well */
505 receivedFinish = processMessages();
506 }
507 #endif
508
509 #if defined(GRAN)
510 scheduleProcessEvent(event);
511 #endif
512
513 //
514 // Get a thread to run
515 //
516 t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519 scheduleGranParReport(); // some kind of debugging output
520 #else
521 // Sanity check the thread we're about to run. This can be
522 // expensive if there is lots of thread switching going on...
523 IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527 // Check whether we can run this thread in the current task.
528 // If not, we have to pass our capability to the right task.
529 {
530 Task *bound = t->bound;
531
532 if (bound) {
533 if (bound == task) {
534 debugTrace(DEBUG_sched,
535 "### Running thread %lu in bound thread", (unsigned long)t->id);
536 // yes, the Haskell thread is bound to the current native thread
537 } else {
538 debugTrace(DEBUG_sched,
539 "### thread %lu bound to another OS thread", (unsigned long)t->id);
540 // no, bound to a different Haskell thread: pass to that thread
541 pushOnRunQueue(cap,t);
542 continue;
543 }
544 } else {
545 // The thread we want to run is unbound.
546 if (task->tso) {
547 debugTrace(DEBUG_sched,
548 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549 // no, the current native thread is bound to a different
550 // Haskell thread, so pass it to any worker thread
551 pushOnRunQueue(cap,t);
552 continue;
553 }
554 }
555 }
556 #endif
557
558 cap->r.rCurrentTSO = t;
559
560 /* context switches are initiated by the timer signal, unless
561 * the user specified "context switch as often as possible", with
562 * +RTS -C0
563 */
564 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565 && !emptyThreadQueues(cap)) {
566 context_switch = 1;
567 }
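// Note: with +RTS -C0, ctxtSwitchTicks is 0, so the flag is set here every
// time a thread is scheduled while other runnable threads exist, giving a
// context switch as often as possible (as the comment above says) instead
// of only on timer ticks.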
568
569 run_thread:
570
571 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
572 (long)t->id, whatNext_strs[t->what_next]);
573
574 startHeapProfTimer();
575
576 // Check for exceptions blocked on this thread
577 maybePerformBlockedException (cap, t);
578
579 // ----------------------------------------------------------------------
580 // Run the current thread
581
582 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583 ASSERT(t->cap == cap);
584
585 prev_what_next = t->what_next;
586
587 errno = t->saved_errno;
588 #if mingw32_HOST_OS
589 SetLastError(t->saved_winerror);
590 #endif
591
592 cap->in_haskell = rtsTrue;
593
594 dirtyTSO(t);
595
596 recent_activity = ACTIVITY_YES;
597
598 switch (prev_what_next) {
599
600 case ThreadKilled:
601 case ThreadComplete:
602 /* Thread already finished, return to scheduler. */
603 ret = ThreadFinished;
604 break;
605
606 case ThreadRunGHC:
607 {
608 StgRegTable *r;
609 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
610 cap = regTableToCapability(r);
611 ret = r->rRet;
612 break;
613 }
614
615 case ThreadInterpret:
616 cap = interpretBCO(cap);
617 ret = cap->r.rRet;
618 break;
619
620 default:
621 barf("schedule: invalid what_next field");
622 }
623
624 cap->in_haskell = rtsFalse;
625
626 // The TSO might have moved, eg. if it re-entered the RTS and a GC
627 // happened. So find the new location:
628 t = cap->r.rCurrentTSO;
629
630 // We have run some Haskell code: there might be blackhole-blocked
631 // threads to wake up now.
632 // Lock-free test here should be ok, we're just setting a flag.
633 if ( blackhole_queue != END_TSO_QUEUE ) {
634 blackholes_need_checking = rtsTrue;
635 }
636
637 // And save the current errno in this thread.
638 // XXX: possibly bogus for SMP because this thread might already
639 // be running again, see code below.
640 t->saved_errno = errno;
641 #if mingw32_HOST_OS
642 // Similarly for Windows error code
643 t->saved_winerror = GetLastError();
644 #endif
645
646 #if defined(THREADED_RTS)
647 // If ret is ThreadBlocked, and this Task is bound to the TSO that
648 // blocked, we are in limbo - the TSO is now owned by whatever it
649 // is blocked on, and may in fact already have been woken up,
650 // perhaps even on a different Capability. It may be the case
651 // that task->cap != cap. We had better yield this Capability
652 // immediately and return to normality.
653 if (ret == ThreadBlocked) {
654 debugTrace(DEBUG_sched,
655 "--<< thread %lu (%s) stopped: blocked",
656 (unsigned long)t->id, whatNext_strs[t->what_next]);
657 continue;
658 }
659 #endif
660
661 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
662 ASSERT(t->cap == cap);
663
664 // ----------------------------------------------------------------------
665
666 // Costs for the scheduler are assigned to CCS_SYSTEM
667 stopHeapProfTimer();
668 #if defined(PROFILING)
669 CCCS = CCS_SYSTEM;
670 #endif
671
672 schedulePostRunThread();
673
674 ready_to_gc = rtsFalse;
675
676 switch (ret) {
677 case HeapOverflow:
678 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
679 break;
680
681 case StackOverflow:
682 scheduleHandleStackOverflow(cap,task,t);
683 break;
684
685 case ThreadYielding:
686 if (scheduleHandleYield(cap, t, prev_what_next)) {
687 // shortcut for switching between compiler/interpreter:
688 goto run_thread;
689 }
690 break;
691
692 case ThreadBlocked:
693 scheduleHandleThreadBlocked(t);
694 break;
695
696 case ThreadFinished:
697 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
698 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
699 break;
700
701 default:
702 barf("schedule: invalid thread return code %d", (int)ret);
703 }
704
705 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
706 cap = scheduleDoGC(cap,task,rtsFalse);
707 }
708 } /* end of while() */
709
710 debugTrace(PAR_DEBUG_verbose,
711 "== Leaving schedule() after having received Finish");
712 }
713
714 /* ----------------------------------------------------------------------------
715 * Setting up the scheduler loop
716 * ------------------------------------------------------------------------- */
717
718 static void
719 schedulePreLoop(void)
720 {
721 #if defined(GRAN)
722 /* set up first event to get things going */
723 /* ToDo: assign costs for system setup and init MainTSO ! */
724 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
725 ContinueThread,
726 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
727
728 debugTrace (DEBUG_gran,
729 "GRAN: Init CurrentTSO (in schedule) = %p",
730 CurrentTSO);
731 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
732
733 if (RtsFlags.GranFlags.Light) {
734 /* Save current time; GranSim Light only */
735 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
736 }
737 #endif
738 }
739
740 /* -----------------------------------------------------------------------------
741 * schedulePushWork()
742 *
743 * Push work to other Capabilities if we have some.
744 * -------------------------------------------------------------------------- */
745
746 #if defined(THREADED_RTS)
747 static void
748 schedulePushWork(Capability *cap USED_IF_THREADS,
749 Task *task USED_IF_THREADS)
750 {
751 Capability *free_caps[n_capabilities], *cap0;
752 nat i, n_free_caps;
753
754 // migration can be turned off with +RTS -qg
755 if (!RtsFlags.ParFlags.migrate) return;
756
757 // Check whether we have more threads on our run queue, or sparks
758 // in our pool, that we could hand to another Capability.
759 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
760 && sparkPoolSizeCap(cap) < 2) {
761 return;
762 }
763
764 // First grab as many free Capabilities as we can.
765 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
766 cap0 = &capabilities[i];
767 if (cap != cap0 && tryGrabCapability(cap0,task)) {
768 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
769 // it already has some work, we just grabbed it at
770 // the wrong moment. Or maybe it's deadlocked!
771 releaseCapability(cap0);
772 } else {
773 free_caps[n_free_caps++] = cap0;
774 }
775 }
776 }
777
778 // we now have n_free_caps free capabilities stashed in
779 // free_caps[]. Share our run queue equally with them. This is
780 // probably the simplest thing we could do; improvements we might
781 // want to do include:
782 //
783 // - giving high priority to moving relatively new threads, on
784 // the grounds that they haven't had time to build up a
785 // working set in the cache on this CPU/Capability.
786 //
787 // - giving low priority to moving long-lived threads
788
789 if (n_free_caps > 0) {
790 StgTSO *prev, *t, *next;
791 rtsBool pushed_to_all;
792
793 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
794
795 i = 0;
796 pushed_to_all = rtsFalse;
797
798 if (cap->run_queue_hd != END_TSO_QUEUE) {
799 prev = cap->run_queue_hd;
800 t = prev->link;
801 prev->link = END_TSO_QUEUE;
802 for (; t != END_TSO_QUEUE; t = next) {
803 next = t->link;
804 t->link = END_TSO_QUEUE;
805 if (t->what_next == ThreadRelocated
806 || t->bound == task // don't move my bound thread
807 || tsoLocked(t)) { // don't move a locked thread
808 prev->link = t;
809 prev = t;
810 } else if (i == n_free_caps) {
811 pushed_to_all = rtsTrue;
812 i = 0;
813 // keep one for us
814 prev->link = t;
815 prev = t;
816 } else {
817 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
818 appendToRunQueue(free_caps[i],t);
819 if (t->bound) { t->bound->cap = free_caps[i]; }
820 t->cap = free_caps[i];
821 i++;
822 }
823 }
824 cap->run_queue_tl = prev;
825 }
826
827 // If there are some free capabilities that we didn't push any
828 // threads to, then try to push a spark to each one.
829 if (!pushed_to_all) {
830 StgClosure *spark;
831 // i is the next free capability to push to
832 for (; i < n_free_caps; i++) {
833 if (emptySparkPoolCap(free_caps[i])) {
834 spark = findSpark(cap);
835 if (spark != NULL) {
836 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
837 newSpark(&(free_caps[i]->r), spark);
838 }
839 }
840 }
841 }
842
843 // release the capabilities
844 for (i = 0; i < n_free_caps; i++) {
845 task->cap = free_caps[i];
846 releaseCapability(free_caps[i]);
847 }
848 }
849 task->cap = cap; // reset to point to our Capability.
850 }
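// Worked example of the sharing loop above: suppose our run queue holds
// T1,T2,T3,T4 (all unbound and unlocked) and we grabbed two free
// capabilities.  T1 is the queue head and stays with us; T2 goes to
// free_caps[0] and T3 to free_caps[1]; at T4 the index wraps
// (i == n_free_caps), so pushed_to_all is set and T4 is also kept locally.
// Because pushed_to_all is set, no sparks are pushed afterwards.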
851 #endif
852
853 /* ----------------------------------------------------------------------------
854 * Start any pending signal handlers
855 * ------------------------------------------------------------------------- */
856
857 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
858 static void
859 scheduleStartSignalHandlers(Capability *cap)
860 {
861 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
862 // safe outside the lock
863 startSignalHandlers(cap);
864 }
865 }
866 #else
867 static void
868 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
869 {
870 }
871 #endif
872
873 /* ----------------------------------------------------------------------------
874 * Check for blocked threads that can be woken up.
875 * ------------------------------------------------------------------------- */
876
877 static void
878 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
879 {
880 #if !defined(THREADED_RTS)
881 //
882 // Check whether any waiting threads need to be woken up. If the
883 // run queue is empty, and there are no other tasks running, we
884 // can wait indefinitely for something to happen.
885 //
886 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
887 {
888 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
889 }
890 #endif
891 }
892
893
894 /* ----------------------------------------------------------------------------
895 * Check for threads woken up by other Capabilities
896 * ------------------------------------------------------------------------- */
897
898 static void
899 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
900 {
901 #if defined(THREADED_RTS)
902 // Any threads that were woken up by other Capabilities get
903 // appended to our run queue.
904 if (!emptyWakeupQueue(cap)) {
905 ACQUIRE_LOCK(&cap->lock);
906 if (emptyRunQueue(cap)) {
907 cap->run_queue_hd = cap->wakeup_queue_hd;
908 cap->run_queue_tl = cap->wakeup_queue_tl;
909 } else {
910 cap->run_queue_tl->link = cap->wakeup_queue_hd;
911 cap->run_queue_tl = cap->wakeup_queue_tl;
912 }
913 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
914 RELEASE_LOCK(&cap->lock);
915 }
916 #endif
917 }
918
919 /* ----------------------------------------------------------------------------
920 * Check for threads blocked on BLACKHOLEs that can be woken up
921 * ------------------------------------------------------------------------- */
922 static void
923 scheduleCheckBlackHoles (Capability *cap)
924 {
925 if ( blackholes_need_checking ) // check without the lock first
926 {
927 ACQUIRE_LOCK(&sched_mutex);
928 if ( blackholes_need_checking ) {
929 checkBlackHoles(cap);
930 blackholes_need_checking = rtsFalse;
931 }
932 RELEASE_LOCK(&sched_mutex);
933 }
934 }
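// Note on the test above: this is a double-checked pattern.  Reading
// blackholes_need_checking without sched_mutex is safe because the flag is
// only advisory (see its declaration near the top of this file: missing an
// update merely delays the check), and it is re-tested under the lock
// before any real work is done.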
935
936 /* ----------------------------------------------------------------------------
937 * Detect deadlock conditions and attempt to resolve them.
938 * ------------------------------------------------------------------------- */
939
940 static void
941 scheduleDetectDeadlock (Capability *cap, Task *task)
942 {
943
944 #if defined(PARALLEL_HASKELL)
945 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
946 return;
947 #endif
948
949 /*
950 * Detect deadlock: when we have no threads to run, there are no
951 * threads blocked, waiting for I/O, or sleeping, and all the
952 * other tasks are waiting for work, we must have a deadlock of
953 * some description.
954 */
955 if ( emptyThreadQueues(cap) )
956 {
957 #if defined(THREADED_RTS)
958 /*
959 * In the threaded RTS, we only check for deadlock if there
960 * has been no activity in a complete timeslice. This means
961 * we won't eagerly start a full GC just because we don't have
962 * any threads to run currently.
963 */
964 if (recent_activity != ACTIVITY_INACTIVE) return;
965 #endif
966
967 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
968
969 // Garbage collection can release some new threads due to
970 // either (a) finalizers or (b) threads resurrected because
971 // they are unreachable and will therefore be sent an
972 // exception. Any threads thus released will be immediately
973 // runnable.
974 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
975
976 recent_activity = ACTIVITY_DONE_GC;
977
978 if ( !emptyRunQueue(cap) ) return;
979
980 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
981 /* If we have user-installed signal handlers, then wait
982 * for signals to arrive rather than bombing out with a
983 * deadlock.
984 */
985 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
986 debugTrace(DEBUG_sched,
987 "still deadlocked, waiting for signals...");
988
989 awaitUserSignals();
990
991 if (signals_pending()) {
992 startSignalHandlers(cap);
993 }
994
995 // either we have threads to run, or we were interrupted:
996 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
997 }
998 #endif
999
1000 #if !defined(THREADED_RTS)
1001 /* Probably a real deadlock. Send the current main thread the
1002 * Deadlock exception.
1003 */
1004 if (task->tso) {
1005 switch (task->tso->why_blocked) {
1006 case BlockedOnSTM:
1007 case BlockedOnBlackHole:
1008 case BlockedOnException:
1009 case BlockedOnMVar:
1010 throwToSingleThreaded(cap, task->tso,
1011 (StgClosure *)NonTermination_closure);
1012 return;
1013 default:
1014 barf("deadlock: main thread blocked in a strange way");
1015 }
1016 }
1017 return;
1018 #endif
1019 }
1020 }
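// Concrete example of the non-threaded case above: if the main thread does
// a takeMVar on an MVar that nothing else will ever fill, the run queue
// eventually drains and we end up here.  The forced major GC gives
// finalizers and resurrected (unreachable) blocked threads a chance to make
// something runnable; failing that, and with no user signal handlers to
// wait on, the main thread has NonTermination_closure thrown at it rather
// than the program hanging silently.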
1021
1022 /* ----------------------------------------------------------------------------
1023 * Process an event (GRAN only)
1024 * ------------------------------------------------------------------------- */
1025
1026 #if defined(GRAN)
1027 static StgTSO *
1028 scheduleProcessEvent(rtsEvent *event)
1029 {
1030 StgTSO *t;
1031
1032 if (RtsFlags.GranFlags.Light)
1033 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1034
1035 /* adjust time based on time-stamp */
1036 if (event->time > CurrentTime[CurrentProc] &&
1037 event->evttype != ContinueThread)
1038 CurrentTime[CurrentProc] = event->time;
1039
1040 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1041 if (!RtsFlags.GranFlags.Light)
1042 handleIdlePEs();
1043
1044 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1045
1046 /* main event dispatcher in GranSim */
1047 switch (event->evttype) {
1048 /* Should just be continuing execution */
1049 case ContinueThread:
1050 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1051 /* ToDo: check assertion
1052 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1053 run_queue_hd != END_TSO_QUEUE);
1054 */
1055 /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
1056 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1057 procStatus[CurrentProc]==Fetching) {
1058 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1059 CurrentTSO->id, CurrentTSO, CurrentProc);
1060 goto next_thread;
1061 }
1062 /* Ignore ContinueThreads for completed threads */
1063 if (CurrentTSO->what_next == ThreadComplete) {
1064 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1065 CurrentTSO->id, CurrentTSO, CurrentProc);
1066 goto next_thread;
1067 }
1068 /* Ignore ContinueThreads for threads that are being migrated */
1069 if (PROCS(CurrentTSO)==Nowhere) {
1070 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1071 CurrentTSO->id, CurrentTSO, CurrentProc);
1072 goto next_thread;
1073 }
1074 /* The thread should be at the beginning of the run queue */
1075 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1076 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1077 CurrentTSO->id, CurrentTSO, CurrentProc);
1078 break; // run the thread anyway
1079 }
1080 /*
1081 new_event(proc, proc, CurrentTime[proc],
1082 FindWork,
1083 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1084 goto next_thread;
1085 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1086 break; // now actually run the thread; DaH Qu'vam yImuHbej
1087
1088 case FetchNode:
1089 do_the_fetchnode(event);
1090 goto next_thread; /* handle next event in event queue */
1091
1092 case GlobalBlock:
1093 do_the_globalblock(event);
1094 goto next_thread; /* handle next event in event queue */
1095
1096 case FetchReply:
1097 do_the_fetchreply(event);
1098 goto next_thread; /* handle next event in event queue */
1099
1100 case UnblockThread: /* Move from the blocked queue to the tail of */
1101 do_the_unblock(event);
1102 goto next_thread; /* handle next event in event queue */
1103
1104 case ResumeThread: /* Move from the blocked queue to the tail of */
1105 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1106 event->tso->gran.blocktime +=
1107 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1108 do_the_startthread(event);
1109 goto next_thread; /* handle next event in event queue */
1110
1111 case StartThread:
1112 do_the_startthread(event);
1113 goto next_thread; /* handle next event in event queue */
1114
1115 case MoveThread:
1116 do_the_movethread(event);
1117 goto next_thread; /* handle next event in event queue */
1118
1119 case MoveSpark:
1120 do_the_movespark(event);
1121 goto next_thread; /* handle next event in event queue */
1122
1123 case FindWork:
1124 do_the_findwork(event);
1125 goto next_thread; /* handle next event in event queue */
1126
1127 default:
1128 barf("Illegal event type %u\n", event->evttype);
1129 } /* switch */
1130
1131 /* This point was scheduler_loop in the old RTS */
1132
1133 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1134
1135 TimeOfLastEvent = CurrentTime[CurrentProc];
1136 TimeOfNextEvent = get_time_of_next_event();
1137 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1138 // CurrentTSO = ThreadQueueHd;
1139
1140 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1141 TimeOfNextEvent));
1142
1143 if (RtsFlags.GranFlags.Light)
1144 GranSimLight_leave_system(event, &ActiveTSO);
1145
1146 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1147
1148 IF_DEBUG(gran,
1149 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1150
1151 /* in a GranSim setup the TSO stays on the run queue */
1152 t = CurrentTSO;
1153 /* Take a thread from the run queue. */
1154 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1155
1156 IF_DEBUG(gran,
1157 debugBelch("GRAN: About to run current thread, which is\n");
1158 G_TSO(t,5));
1159
1160 context_switch = 0; // turned on via GranYield, checking events and time slice
1161
1162 IF_DEBUG(gran,
1163 DumpGranEvent(GR_SCHEDULE, t));
1164
1165 procStatus[CurrentProc] = Busy;
1166 }
1167 #endif // GRAN
1168
1169 /* ----------------------------------------------------------------------------
1170 * Send pending messages (PARALLEL_HASKELL only)
1171 * ------------------------------------------------------------------------- */
1172
1173 #if defined(PARALLEL_HASKELL)
1174 static void
1175 scheduleSendPendingMessages(void)
1176 {
1177 StgSparkPool *pool;
1178 rtsSpark spark;
1179 StgTSO *t;
1180
1181 # if defined(PAR) // global Mem.Mgmt., omit for now
1182 if (PendingFetches != END_BF_QUEUE) {
1183 processFetches();
1184 }
1185 # endif
1186
1187 if (RtsFlags.ParFlags.BufferTime) {
1188 // if we use message buffering, we must send away all message
1189 // packets which have become too old...
1190 sendOldBuffers();
1191 }
1192 }
1193 #endif
1194
1195 /* ----------------------------------------------------------------------------
1196 * Activate spark threads (PARALLEL_HASKELL only)
1197 * ------------------------------------------------------------------------- */
1198
1199 #if defined(PARALLEL_HASKELL)
1200 static rtsBool
1201 scheduleActivateSpark(void)
1202 {
1203 #if defined(SPARKS)
1204 ASSERT(emptyRunQueue());
1205 /* We get here if the run queue is empty and we want some work.
1206 We try to turn a spark into a thread, and add it to the run queue,
1207 from where it will be picked up in the next iteration of the scheduler
1208 loop.
1209 */
1210
1211 /* :-[ no local threads => look out for local sparks */
1212 /* the spark pool for the current PE */
1213 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1214 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1215 pool->hd < pool->tl) {
1216 /*
1217 * ToDo: add GC code check that we really have enough heap afterwards!!
1218 * Old comment:
1219 * If we're here (no runnable threads) and we have pending
1220 * sparks, we must have a space problem. Get enough space
1221 * to turn one of those pending sparks into a
1222 * thread...
1223 */
1224
1225 spark = findSpark(rtsFalse); /* get a spark */
1226 if (spark != (rtsSpark) NULL) {
1227 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1228 IF_PAR_DEBUG(fish, // schedule,
1229 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1230 tso->id, tso, advisory_thread_count));
1231
1232 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1233 IF_PAR_DEBUG(fish, // schedule,
1234 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1235 spark));
1236 return rtsFalse; /* failed to generate a thread */
1237 } /* otherwise fall through & pick-up new tso */
1238 } else {
1239 IF_PAR_DEBUG(fish, // schedule,
1240 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1241 spark_queue_len(pool)));
1242 return rtsFalse; /* failed to generate a thread */
1243 }
1244 return rtsTrue; /* success in generating a thread */
1245 } else { /* no more threads permitted or pool empty */
1246 return rtsFalse; /* failed to generateThread */
1247 }
1248 #else
1249 tso = NULL; // avoid compiler warning only
1250 return rtsFalse; /* dummy in non-PAR setup */
1251 #endif // SPARKS
1252 }
1253 #endif // PARALLEL_HASKELL
1254
1255 /* ----------------------------------------------------------------------------
1256 * Get work from a remote node (PARALLEL_HASKELL only)
1257 * ------------------------------------------------------------------------- */
1258
1259 #if defined(PARALLEL_HASKELL)
1260 static rtsBool
1261 scheduleGetRemoteWork(rtsBool *receivedFinish)
1262 {
1263 ASSERT(emptyRunQueue());
1264
1265 if (RtsFlags.ParFlags.BufferTime) {
1266 IF_PAR_DEBUG(verbose,
1267 debugBelch("...send all pending data,"));
1268 {
1269 nat i;
1270 for (i=1; i<=nPEs; i++)
1271 sendImmediately(i); // send all messages away immediately
1272 }
1273 }
1274 # ifndef SPARKS
1275 //++EDEN++ idle() , i.e. send all buffers, wait for work
1276 // suppress fishing in EDEN... just look for incoming messages
1277 // (blocking receive)
1278 IF_PAR_DEBUG(verbose,
1279 debugBelch("...wait for incoming messages...\n"));
1280 *receivedFinish = processMessages(); // blocking receive...
1281
1282 // and reenter scheduling loop after having received something
1283 // (return rtsFalse below)
1284
1285 # else /* activate SPARKS machinery */
1286 /* We get here if we have no work and have tried to activate a local spark, but still
1287 have no work. We try to get a remote spark by sending a FISH message.
1288 Thread migration should be added here, and triggered when a sequence of
1289 fishes returns without work. */
1290 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1291
1292 /* =8-[ no local sparks => look for work on other PEs */
1293 /*
1294 * We really have absolutely no work. Send out a fish
1295 * (there may be some out there already), and wait for
1296 * something to arrive. We clearly can't run any threads
1297 * until a SCHEDULE or RESUME arrives, and so that's what
1298 * we're hoping to see. (Of course, we still have to
1299 * respond to other types of messages.)
1300 */
1301 rtsTime now = msTime() /*CURRENT_TIME*/;
1302 IF_PAR_DEBUG(verbose,
1303 debugBelch("-- now=%ld\n", now));
1304 IF_PAR_DEBUG(fish, // verbose,
1305 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1306 (last_fish_arrived_at!=0 &&
1307 last_fish_arrived_at+delay > now)) {
1308 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1309 now, last_fish_arrived_at+delay,
1310 last_fish_arrived_at,
1311 delay);
1312 });
1313
1314 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1315 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1316 if (last_fish_arrived_at==0 ||
1317 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1318 /* outstandingFishes is set in sendFish, processFish;
1319 avoid flooding system with fishes via delay */
1320 next_fish_to_send_at = 0;
1321 } else {
1322 /* ToDo: this should be done in the main scheduling loop to avoid the
1323 busy wait here; not so bad if fish delay is very small */
1324 int iq = 0; // DEBUGGING -- HWL
1325 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1326 /* send a fish when ready, but process messages that arrive in the meantime */
1327 do {
1328 if (PacketsWaiting()) {
1329 iq++; // DEBUGGING
1330 *receivedFinish = processMessages();
1331 }
1332 now = msTime();
1333 } while (!*receivedFinish || now<next_fish_to_send_at);
1334 // JB: This means the fish could become obsolete, if we receive
1335 // work. Better check for work again?
1336 // last line: while (!receivedFinish || !haveWork || now<...)
1337 // next line: if (receivedFinish || haveWork )
1338
1339 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1340 return rtsFalse; // NB: this will leave scheduler loop
1341 // immediately after return!
1342
1343 IF_PAR_DEBUG(fish, // verbose,
1344 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1345
1346 }
1347
1348 // JB: IMHO, this should all be hidden inside sendFish(...)
1349 /* pe = choosePE();
1350 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1351 NEW_FISH_HUNGER);
1352
1353 // Global statistics: count no. of fishes
1354 if (RtsFlags.ParFlags.ParStats.Global &&
1355 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1356 globalParStats.tot_fish_mess++;
1357 }
1358 */
1359
1360 /* delayed fishes must have been sent by now! */
1361 next_fish_to_send_at = 0;
1362 }
1363
1364 *receivedFinish = processMessages();
1365 # endif /* SPARKS */
1366
1367 return rtsFalse;
1368 /* NB: this function always returns rtsFalse, meaning the scheduler
1369 loop continues with the next iteration;
1370 rationale:
1371 return code means success in finding work; we enter this function
1372 if there is no local work, thus have to send a fish which takes
1373 time until it arrives with work; in the meantime we should process
1374 messages in the main loop;
1375 */
1376 }
1377 #endif // PARALLEL_HASKELL
1378
1379 /* ----------------------------------------------------------------------------
1380 * PAR/GRAN: Report stats & debugging info(?)
1381 * ------------------------------------------------------------------------- */
1382
1383 #if defined(PAR) || defined(GRAN)
1384 static void
1385 scheduleGranParReport(void)
1386 {
1387 ASSERT(run_queue_hd != END_TSO_QUEUE);
1388
1389 /* Take a thread from the run queue, if we have work */
1390 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1391
1392 /* If this TSO has got its outport closed in the meantime,
1393 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1394 * It has to be marked as TH_DEAD for this purpose.
1395 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1396
1397 JB: TODO: investigate whether state change field could be nuked
1398 entirely and replaced by the normal tso state (whatnext
1399 field). All we want to do is to kill tsos from outside.
1400 */
1401
1402 /* ToDo: write something to the log-file
1403 if (RTSflags.ParFlags.granSimStats && !sameThread)
1404 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1405
1406 CurrentTSO = t;
1407 */
1408 /* the spark pool for the current PE */
1409 pool = &(cap.r.rSparks); // cap = (old) MainCap
1410
1411 IF_DEBUG(scheduler,
1412 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1413 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1414
1415 IF_PAR_DEBUG(fish,
1416 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1417 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1418
1419 if (RtsFlags.ParFlags.ParStats.Full &&
1420 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1421 (emitSchedule || // forced emit
1422 (t && LastTSO && t->id != LastTSO->id))) {
1423 /*
1424 we are running a different TSO, so write a schedule event to log file
1425 NB: If we use fair scheduling we also have to write a deschedule
1426 event for LastTSO; with unfair scheduling we know that the
1427 previous tso has blocked whenever we switch to another tso, so
1428 we don't need it in GUM for now
1429 */
1430 IF_PAR_DEBUG(fish, // schedule,
1431 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1432
1433 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1434 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1435 emitSchedule = rtsFalse;
1436 }
1437 }
1438 #endif
1439
1440 /* ----------------------------------------------------------------------------
1441 * After running a thread...
1442 * ------------------------------------------------------------------------- */
1443
1444 static void
1445 schedulePostRunThread(void)
1446 {
1447 #if defined(PAR)
1448 /* HACK 675: if the last thread didn't yield, make sure to print a
1449 SCHEDULE event to the log file when StgRunning the next thread, even
1450 if it is the same one as before */
1451 LastTSO = t;
1452 TimeOfLastYield = CURRENT_TIME;
1453 #endif
1454
1455 /* some statistics gathering in the parallel case */
1456
1457 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1458 switch (ret) {
1459 case HeapOverflow:
1460 # if defined(GRAN)
1461 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1462 globalGranStats.tot_heapover++;
1463 # elif defined(PAR)
1464 globalParStats.tot_heapover++;
1465 # endif
1466 break;
1467
1468 case StackOverflow:
1469 # if defined(GRAN)
1470 IF_DEBUG(gran,
1471 DumpGranEvent(GR_DESCHEDULE, t));
1472 globalGranStats.tot_stackover++;
1473 # elif defined(PAR)
1474 // IF_DEBUG(par,
1475 // DumpGranEvent(GR_DESCHEDULE, t);
1476 globalParStats.tot_stackover++;
1477 # endif
1478 break;
1479
1480 case ThreadYielding:
1481 # if defined(GRAN)
1482 IF_DEBUG(gran,
1483 DumpGranEvent(GR_DESCHEDULE, t));
1484 globalGranStats.tot_yields++;
1485 # elif defined(PAR)
1486 // IF_DEBUG(par,
1487 // DumpGranEvent(GR_DESCHEDULE, t);
1488 globalParStats.tot_yields++;
1489 # endif
1490 break;
1491
1492 case ThreadBlocked:
1493 # if defined(GRAN)
1494 debugTrace(DEBUG_sched,
1495 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1496 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1497 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1498 if (t->block_info.closure!=(StgClosure*)NULL)
1499 print_bq(t->block_info.closure);
1500 debugBelch("\n"));
1501
1502 // ??? needed; should emit block before
1503 IF_DEBUG(gran,
1504 DumpGranEvent(GR_DESCHEDULE, t));
1505 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1506 /*
1507 ngoq Dogh!
1508 ASSERT(procStatus[CurrentProc]==Busy ||
1509 ((procStatus[CurrentProc]==Fetching) &&
1510 (t->block_info.closure!=(StgClosure*)NULL)));
1511 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1512 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1513 procStatus[CurrentProc]==Fetching))
1514 procStatus[CurrentProc] = Idle;
1515 */
1516 # elif defined(PAR)
1517 //++PAR++ blockThread() writes the event (change?)
1518 # endif
1519 break;
1520
1521 case ThreadFinished:
1522 break;
1523
1524 default:
1525 barf("parGlobalStats: unknown return code");
1526 break;
1527 }
1528 #endif
1529 }
1530
1531 /* -----------------------------------------------------------------------------
1532 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1533 * -------------------------------------------------------------------------- */
1534
1535 static rtsBool
1536 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1537 {
1538 // did the task ask for a large block?
1539 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1540 // if so, get one and push it on the front of the nursery.
1541 bdescr *bd;
1542 lnat blocks;
1543
1544 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
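/* Worked example (assuming the usual 4k block size): a request of
 * rHpAlloc = 100000 bytes rounds up to 25 blocks, i.e. 102400 bytes,
 * which are allocated and linked into the nursery as one group below. */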
1545
1546 debugTrace(DEBUG_sched,
1547 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1548 (long)t->id, whatNext_strs[t->what_next], blocks);
1549
1550 // don't do this if the nursery is (nearly) full, we'll GC first.
1551 if (cap->r.rCurrentNursery->link != NULL ||
1552 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1553 // if the nursery has only one block.
1554
1555 ACQUIRE_SM_LOCK
1556 bd = allocGroup( blocks );
1557 RELEASE_SM_LOCK
1558 cap->r.rNursery->n_blocks += blocks;
1559
1560 // link the new group into the list
1561 bd->link = cap->r.rCurrentNursery;
1562 bd->u.back = cap->r.rCurrentNursery->u.back;
1563 if (cap->r.rCurrentNursery->u.back != NULL) {
1564 cap->r.rCurrentNursery->u.back->link = bd;
1565 } else {
1566 #if !defined(THREADED_RTS)
1567 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1568 g0s0 == cap->r.rNursery);
1569 #endif
1570 cap->r.rNursery->blocks = bd;
1571 }
1572 cap->r.rCurrentNursery->u.back = bd;
1573
1574 // initialise it as a nursery block. We initialise the
1575 // step, gen_no, and flags field of *every* sub-block in
1576 // this large block, because this is easier than making
1577 // sure that we always find the block head of a large
1578 // block whenever we call Bdescr() (eg. evacuate() and
1579 // isAlive() in the GC would both have to do this, at
1580 // least).
1581 {
1582 bdescr *x;
1583 for (x = bd; x < bd + blocks; x++) {
1584 x->step = cap->r.rNursery;
1585 x->gen_no = 0;
1586 x->flags = 0;
1587 }
1588 }
1589
1590 // This assert can be a killer if the app is doing lots
1591 // of large block allocations.
1592 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1593
1594 // now update the nursery to point to the new block
1595 cap->r.rCurrentNursery = bd;
1596
1597 // we might be unlucky and have another thread get on the
1598 // run queue before us and steal the large block, but in that
1599 // case the thread will just end up requesting another large
1600 // block.
1601 pushOnRunQueue(cap,t);
1602 return rtsFalse; /* not actually GC'ing */
1603 }
1604 }
1605
1606 debugTrace(DEBUG_sched,
1607 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1608 (long)t->id, whatNext_strs[t->what_next]);
1609
1610 #if defined(GRAN)
1611 ASSERT(!is_on_queue(t,CurrentProc));
1612 #elif defined(PARALLEL_HASKELL)
1613 /* Currently we emit a DESCHEDULE event before GC in GUM.
1614 ToDo: either add separate event to distinguish SYSTEM time from rest
1615 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1616 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1617 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1618 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1619 emitSchedule = rtsTrue;
1620 }
1621 #endif
1622
1623 pushOnRunQueue(cap,t);
1624 return rtsTrue;
1625 /* actual GC is done at the end of the while loop in schedule() */
1626 }
1627
1628 /* -----------------------------------------------------------------------------
1629 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1630 * -------------------------------------------------------------------------- */
1631
1632 static void
1633 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1634 {
1635 debugTrace (DEBUG_sched,
1636 "--<< thread %ld (%s) stopped, StackOverflow",
1637 (long)t->id, whatNext_strs[t->what_next]);
1638
1639 /* just adjust the stack for this thread, then pop it back
1640 * on the run queue.
1641 */
1642 {
1643 /* enlarge the stack */
1644 StgTSO *new_t = threadStackOverflow(cap, t);
1645
1646 /* The TSO attached to this Task may have moved, so update the
1647 * pointer to it.
1648 */
1649 if (task->tso == t) {
1650 task->tso = new_t;
1651 }
1652 pushOnRunQueue(cap,new_t);
1653 }
1654 }
1655
1656 /* -----------------------------------------------------------------------------
1657 * Handle a thread that returned to the scheduler with ThreadYielding
1658 * -------------------------------------------------------------------------- */
1659
1660 static rtsBool
1661 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1662 {
1663 // Reset the context switch flag. We don't do this just before
1664 // running the thread, because that would mean we would lose ticks
1665 // during GC, which can lead to unfair scheduling (a thread hogs
1666 // the CPU because the tick always arrives during GC). This way
1667 // penalises threads that do a lot of allocation, but that seems
1668 // better than the alternative.
1669 context_switch = 0;
1670
1671 /* put the thread back on the run queue. Then, if we're ready to
1672 * GC, check whether this is the last task to stop. If so, wake
1673 * up the GC thread. getThread will block during a GC until the
1674 * GC is finished.
1675 */
1676 #ifdef DEBUG
1677 if (t->what_next != prev_what_next) {
1678 debugTrace(DEBUG_sched,
1679 "--<< thread %ld (%s) stopped to switch evaluators",
1680 (long)t->id, whatNext_strs[t->what_next]);
1681 } else {
1682 debugTrace(DEBUG_sched,
1683 "--<< thread %ld (%s) stopped, yielding",
1684 (long)t->id, whatNext_strs[t->what_next]);
1685 }
1686 #endif
1687
1688 IF_DEBUG(sanity,
1689 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1690 checkTSO(t));
1691 ASSERT(t->link == END_TSO_QUEUE);
1692
1693 // Shortcut if we're just switching evaluators: don't bother
1694 // doing stack squeezing (which can be expensive), just run the
1695 // thread.
1696 if (t->what_next != prev_what_next) {
1697 return rtsTrue;
1698 }
1699
1700 #if defined(GRAN)
1701 ASSERT(!is_on_queue(t,CurrentProc));
1702
1703 IF_DEBUG(sanity,
1704 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1705 checkThreadQsSanity(rtsTrue));
1706
1707 #endif
1708
1709 addToRunQueue(cap,t);
1710
1711 #if defined(GRAN)
1712 /* add a ContinueThread event to actually process the thread */
1713 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1714 ContinueThread,
1715 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1716 IF_GRAN_DEBUG(bq,
1717 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1718 G_EVENTQ(0);
1719 G_CURR_THREADQ(0));
1720 #endif
1721 return rtsFalse;
1722 }
1723
1724 /* -----------------------------------------------------------------------------
1725 * Handle a thread that returned to the scheduler with ThreadBlocked
1726 * -------------------------------------------------------------------------- */
1727
1728 static void
1729 scheduleHandleThreadBlocked( StgTSO *t
1730 #if !defined(GRAN) && !defined(DEBUG)
1731 STG_UNUSED
1732 #endif
1733 )
1734 {
1735 #if defined(GRAN)
1736 IF_DEBUG(scheduler,
1737 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1738 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1739 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1740
1741 // ??? needed; should emit block before
1742 IF_DEBUG(gran,
1743 DumpGranEvent(GR_DESCHEDULE, t));
1744 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1745 /*
1746 ngoq Dogh!
1747 ASSERT(procStatus[CurrentProc]==Busy ||
1748 ((procStatus[CurrentProc]==Fetching) &&
1749 (t->block_info.closure!=(StgClosure*)NULL)));
1750 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1751 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1752 procStatus[CurrentProc]==Fetching))
1753 procStatus[CurrentProc] = Idle;
1754 */
1755 #elif defined(PAR)
1756 IF_DEBUG(scheduler,
1757 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1758 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1759 IF_PAR_DEBUG(bq,
1760
1761 if (t->block_info.closure!=(StgClosure*)NULL)
1762 print_bq(t->block_info.closure));
1763
1764 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1765 blockThread(t);
1766
1767 /* whatever we schedule next, we must log that schedule */
1768 emitSchedule = rtsTrue;
1769
1770 #else /* !GRAN */
1771
1772 // We don't need to do anything. The thread is blocked, and it
1773 // has tidied up its stack and placed itself on whatever queue
1774 // it needs to be on.
1775
1776 // ASSERT(t->why_blocked != NotBlocked);
1777 // Not true: for example,
1778 // - in THREADED_RTS, the thread may already have been woken
1779 // up by another Capability. This actually happens: try
1780 // conc023 +RTS -N2.
1781 // - the thread may have woken itself up already, because
1782 // threadPaused() might have raised a blocked throwTo
1783 // exception, see maybePerformBlockedException().
1784
1785 #ifdef DEBUG
1786 if (traceClass(DEBUG_sched)) {
1787 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1788 (unsigned long)t->id, whatNext_strs[t->what_next]);
1789 printThreadBlockage(t);
1790 debugTraceEnd();
1791 }
1792 #endif
1793
1794 /* Only for dumping event to log file
1795 ToDo: do I need this in GranSim, too?
1796 blockThread(t);
1797 */
1798 #endif
1799 }
1800
1801 /* -----------------------------------------------------------------------------
1802 * Handle a thread that returned to the scheduler with ThreadFinished
1803 * -------------------------------------------------------------------------- */
1804
1805 static rtsBool
1806 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1807 {
1808 /* Need to check whether this was a main thread, and if so,
1809 * return with the return value.
1810 *
1811 * We also end up here if the thread kills itself with an
1812 * uncaught exception, see Exception.cmm.
1813 */
1814 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1815 (unsigned long)t->id, whatNext_strs[t->what_next]);
1816
1817 #if defined(GRAN)
1818 endThread(t, CurrentProc); // clean-up the thread
1819 #elif defined(PARALLEL_HASKELL)
1820 /* For now all are advisory -- HWL */
1821 //if(t->priority==AdvisoryPriority) ??
1822 advisory_thread_count--; // JB: Caution with this counter, buggy!
1823
1824 # if defined(DIST)
1825 if(t->dist.priority==RevalPriority)
1826 FinishReval(t);
1827 # endif
1828
1829 # if defined(EDENOLD)
1830 // the thread could still have an outport... (BUG)
1831 if (t->eden.outport != -1) {
1832 // delete the outport for the tso which has finished...
1833 IF_PAR_DEBUG(eden_ports,
1834 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1835 t->eden.outport, t->id));
1836 deleteOPT(t);
1837 }
1838 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1839 if (t->eden.epid != -1) {
1840 IF_PAR_DEBUG(eden_ports,
1841 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1842 t->id, t->eden.epid));
1843 removeTSOfromProcess(t);
1844 }
1845 # endif
1846
1847 # if defined(PAR)
1848 if (RtsFlags.ParFlags.ParStats.Full &&
1849 !RtsFlags.ParFlags.ParStats.Suppressed)
1850 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1851
1852 // t->par only contains statistics: left out for now...
1853 IF_PAR_DEBUG(fish,
1854 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1855 t->id,t,t->par.sparkname));
1856 # endif
1857 #endif // PARALLEL_HASKELL
1858
1859 //
1860 // Check whether the thread that just completed was a bound
1861 // thread, and if so return with the result.
1862 //
1863 // There is an assumption here that all thread completion goes
1864 // through this point; we need to make sure that if a thread
1865 // ends up in the ThreadKilled state, that it stays on the run
1866 // queue so it can be dealt with here.
1867 //
1868
1869 if (t->bound) {
1870
1871 if (t->bound != task) {
1872 #if !defined(THREADED_RTS)
1873 // Must be a bound thread that is not the topmost one. Leave
1874 // it on the run queue until the stack has unwound to the
1875 // point where we can deal with this. Leaving it on the run
1876 // queue also ensures that the garbage collector knows about
1877 // this thread and its return value (it gets dropped from the
1878 // all_threads list so there's no other way to find it).
1879 appendToRunQueue(cap,t);
1880 return rtsFalse;
1881 #else
1882 // this cannot happen in the threaded RTS, because a
1883 // bound thread can only be run by the appropriate Task.
1884 barf("finished bound thread that isn't mine");
1885 #endif
1886 }
1887
1888 ASSERT(task->tso == t);
1889
1890 if (t->what_next == ThreadComplete) {
1891 if (task->ret) {
1892 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1893 *(task->ret) = (StgClosure *)task->tso->sp[1];
1894 }
1895 task->stat = Success;
1896 } else {
1897 if (task->ret) {
1898 *(task->ret) = NULL;
1899 }
1900 if (sched_state >= SCHED_INTERRUPTING) {
1901 task->stat = Interrupted;
1902 } else {
1903 task->stat = Killed;
1904 }
1905 }
1906 #ifdef DEBUG
1907 removeThreadLabel((StgWord)task->tso->id);
1908 #endif
1909 return rtsTrue; // tells schedule() to return
1910 }
1911
1912 return rtsFalse;
1913 }
1914
1915 /* -----------------------------------------------------------------------------
1916 * Perform a heap census
1917 * -------------------------------------------------------------------------- */
1918
1919 static rtsBool
1920 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1921 {
1922 // When we have +RTS -i0 and we're heap profiling, do a census at
1923 // every GC. This lets us get repeatable runs for debugging.
1924 if (performHeapProfile ||
1925 (RtsFlags.ProfFlags.profileInterval==0 &&
1926 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1927 return rtsTrue;
1928 } else {
1929 return rtsFalse;
1930 }
1931 }
1932
1933 /* -----------------------------------------------------------------------------
1934 * Perform a garbage collection if necessary
1935 * -------------------------------------------------------------------------- */
1936
1937 static Capability *
1938 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1939 {
1940 StgTSO *t;
1941 rtsBool heap_census;
1942 #ifdef THREADED_RTS
1943 static volatile StgWord waiting_for_gc;
1944 rtsBool was_waiting;
1945 nat i;
1946 #endif
1947
1948 #ifdef THREADED_RTS
1949 // In order to GC, there must be no threads running Haskell code.
1950 // Therefore, the GC thread needs to hold *all* the capabilities,
1951 // and release them after the GC has completed.
1952 //
1953 // This seems to be the simplest way: previous attempts involved
1954 // making all the threads with capabilities give up their
1955 // capabilities and sleep except for the *last* one, which
1956 // actually did the GC. But it's quite hard to arrange for all
1957 // the other tasks to sleep and stay asleep.
1958 //
1959
1960 was_waiting = cas(&waiting_for_gc, 0, 1);
1961 if (was_waiting) {
1962 do {
1963 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1964 if (cap) yieldCapability(&cap,task);
1965 } while (waiting_for_gc);
1966 return cap; // NOTE: task->cap might have changed here
1967 }
1968
1969 for (i=0; i < n_capabilities; i++) {
1970 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1971 if (cap != &capabilities[i]) {
1972 Capability *pcap = &capabilities[i];
1973 // we better hope this task doesn't get migrated to
1974 // another Capability while we're waiting for this one.
1975 // It won't, because load balancing happens while we have
1976 // all the Capabilities, but even so it's a slightly
1977 // unsavoury invariant.
1978 task->cap = pcap;
1979 context_switch = 1;
1980 waitForReturnCapability(&pcap, task);
1981 if (pcap != &capabilities[i]) {
1982 barf("scheduleDoGC: got the wrong capability");
1983 }
1984 }
1985 }
1986
1987 waiting_for_gc = rtsFalse;
1988 #endif
1989
1990 /* Kick any transactions which are invalid back to their
1991 * atomically frames. When next scheduled they will try to
1992 * commit, this commit will fail and they will retry.
1993 */
1994 {
1995 StgTSO *next;
1996
1997 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1998 if (t->what_next == ThreadRelocated) {
1999 next = t->link;
2000 } else {
2001 next = t->global_link;
2002
2003 // This is a good place to check for blocked
2004 // exceptions. It might be the case that a thread is
2005 // blocked on delivering an exception to a thread that
2006 // is also blocked - we try to ensure that this
2007 // doesn't happen in throwTo(), but it's too hard (or
2008 // impossible) to close all the race holes, so we
2009 // accept that some might get through and deal with
2010 // them here. A GC will always happen at some point,
2011 // even if the system is otherwise deadlocked.
2012 maybePerformBlockedException (&capabilities[0], t);
2013
2014 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2015 if (!stmValidateNestOfTransactions (t -> trec)) {
2016 debugTrace(DEBUG_sched | DEBUG_stm,
2017 "trec %p found wasting its time", t);
2018
2019 // strip the stack back to the
2020 // ATOMICALLY_FRAME, aborting the (nested)
2021 // transaction, and saving the stack of any
2022 // partially-evaluated thunks on the heap.
2023 throwToSingleThreaded_(&capabilities[0], t,
2024 NULL, rtsTrue, NULL);
2025
2026 #ifdef REG_R1
2027 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2028 #endif
2029 }
2030 }
2031 }
2032 }
2033 }
2034
2035 // so this happens periodically:
2036 if (cap) scheduleCheckBlackHoles(cap);
2037
2038 IF_DEBUG(scheduler, printAllThreads());
2039
2040 /*
2041 * We now have all the capabilities; if we're in an interrupting
2042 * state, then we should take the opportunity to delete all the
2043 * threads in the system.
2044 */
2045 if (sched_state >= SCHED_INTERRUPTING) {
2046 deleteAllThreads(&capabilities[0]);
2047 sched_state = SCHED_SHUTTING_DOWN;
2048 }
2049
2050 heap_census = scheduleNeedHeapProfile(rtsTrue);
2051
2052 /* everybody back, start the GC.
2053 * Could do it in this thread, or signal a condition var
2054 * to do it in another thread. Either way, we need to
2055 * broadcast on gc_pending_cond afterward.
2056 */
2057 #if defined(THREADED_RTS)
2058 debugTrace(DEBUG_sched, "doing GC");
2059 #endif
2060 GarbageCollect(force_major || heap_census);
2061
2062 if (heap_census) {
2063 debugTrace(DEBUG_sched, "performing heap census");
2064 heapCensus();
2065 performHeapProfile = rtsFalse;
2066 }
2067
2068 #if defined(THREADED_RTS)
2069 // release our stash of capabilities.
2070 for (i = 0; i < n_capabilities; i++) {
2071 if (cap != &capabilities[i]) {
2072 task->cap = &capabilities[i];
2073 releaseCapability(&capabilities[i]);
2074 }
2075 }
2076 if (cap) {
2077 task->cap = cap;
2078 } else {
2079 task->cap = NULL;
2080 }
2081 #endif
2082
2083 #if defined(GRAN)
2084 /* add a ContinueThread event to continue execution of current thread */
2085 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086 ContinueThread,
2087 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088 IF_GRAN_DEBUG(bq,
2089 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2090 G_EVENTQ(0);
2091 G_CURR_THREADQ(0));
2092 #endif /* GRAN */
2093
2094 return cap;
2095 }
2096
2097 /* ---------------------------------------------------------------------------
2098 * Singleton fork(). Do not copy any running threads.
2099 * ------------------------------------------------------------------------- */
2100
2101 pid_t
2102 forkProcess(HsStablePtr *entry
2103 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2104 STG_UNUSED
2105 #endif
2106 )
2107 {
2108 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2109 Task *task;
2110 pid_t pid;
2111 StgTSO* t,*next;
2112 Capability *cap;
2113
2114 #if defined(THREADED_RTS)
2115 if (RtsFlags.ParFlags.nNodes > 1) {
2116 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2117 stg_exit(EXIT_FAILURE);
2118 }
2119 #endif
2120
2121 debugTrace(DEBUG_sched, "forking!");
2122
2123 // ToDo: for SMP, we should probably acquire *all* the capabilities
2124 cap = rts_lock();
2125
2126 pid = fork();
2127
2128 if (pid) { // parent
2129
2130 // just return the pid
2131 rts_unlock(cap);
2132 return pid;
2133
2134 } else { // child
2135
2136 // Now, all OS threads except the thread that forked are
2137 // stopped. We need to stop all Haskell threads, including
2138 // those involved in foreign calls. Also we need to delete
2139 // all Tasks, because they correspond to OS threads that are
2140 // now gone.
2141
2142 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2143 if (t->what_next == ThreadRelocated) {
2144 next = t->link;
2145 } else {
2146 next = t->global_link;
2147 // don't allow threads to catch the ThreadKilled
2148 // exception, but we do want to raiseAsync() because these
2149 // threads may be evaluating thunks that we need later.
2150 deleteThread_(cap,t);
2151 }
2152 }
2153
2154 // Empty the run queue. It seems tempting to let all the
2155 // killed threads stay on the run queue as zombies to be
2156 // cleaned up later, but some of them correspond to bound
2157 // threads for which the corresponding Task does not exist.
2158 cap->run_queue_hd = END_TSO_QUEUE;
2159 cap->run_queue_tl = END_TSO_QUEUE;
2160
2161 // Any suspended C-calling Tasks are no more, their OS threads
2162 // don't exist now:
2163 cap->suspended_ccalling_tasks = NULL;
2164
2165 // Empty the all_threads list. Otherwise, the garbage
2166 // collector may attempt to resurrect some of these threads.
2167 all_threads = END_TSO_QUEUE;
2168
2169 // Wipe the task list, except the current Task.
2170 ACQUIRE_LOCK(&sched_mutex);
2171 for (task = all_tasks; task != NULL; task=task->all_link) {
2172 if (task != cap->running_task) {
2173 discardTask(task);
2174 }
2175 }
2176 RELEASE_LOCK(&sched_mutex);
2177
2178 #if defined(THREADED_RTS)
2179 // Wipe our spare workers list, they no longer exist. New
2180 // workers will be created if necessary.
2181 cap->spare_workers = NULL;
2182 cap->returning_tasks_hd = NULL;
2183 cap->returning_tasks_tl = NULL;
2184 #endif
2185
2186 // On Unix, all timers are reset in the child, so we need to start
2187 // the timer again.
2188 startTimer();
2189
2190 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2191 rts_checkSchedStatus("forkProcess",cap);
2192
2193 rts_unlock(cap);
2194 hs_exit(); // clean up and exit
2195 stg_exit(EXIT_SUCCESS);
2196 }
2197 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2198 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2199 return -1;
2200 #endif
2201 }
2202
2203 /* ---------------------------------------------------------------------------
2204 * Delete all the threads in the system
2205 * ------------------------------------------------------------------------- */
2206
2207 static void
2208 deleteAllThreads ( Capability *cap )
2209 {
2210 // NOTE: only safe to call if we own all capabilities.
2211
2212 StgTSO* t, *next;
2213 debugTrace(DEBUG_sched,"deleting all threads");
2214 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2215 if (t->what_next == ThreadRelocated) {
2216 next = t->link;
2217 } else {
2218 next = t->global_link;
2219 deleteThread(cap,t);
2220 }
2221 }
2222
2223 // The run queue now contains a bunch of ThreadKilled threads. We
2224 // must not throw these away: the main thread(s) will be in there
2225 // somewhere, and the main scheduler loop has to deal with it.
2226 // Also, the run queue is the only thing keeping these threads from
2227 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2228
2229 #if !defined(THREADED_RTS)
2230 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2231 ASSERT(sleeping_queue == END_TSO_QUEUE);
2232 #endif
2233 }
2234
2235 /* -----------------------------------------------------------------------------
2236 Managing the suspended_ccalling_tasks list.
2237 Locks required: sched_mutex
2238 -------------------------------------------------------------------------- */
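/* A hedged sketch (not part of the upstream file): how this list is
 * typically walked. GetRoots() below does essentially the same thing when
 * it treats each suspended TSO as a GC root; the loop here is illustrative
 * only.
 */
#if 0
{
    Task *task;
    for (task = cap->suspended_ccalling_tasks; task != NULL; task = task->next) {
        /* task->suspended_tso is the Haskell thread parked in a safe ccall */
        evac((StgClosure **)(void *)&task->suspended_tso);
    }
}
#endif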
2239
2240 STATIC_INLINE void
2241 suspendTask (Capability *cap, Task *task)
2242 {
2243 ASSERT(task->next == NULL && task->prev == NULL);
2244 task->next = cap->suspended_ccalling_tasks;
2245 task->prev = NULL;
2246 if (cap->suspended_ccalling_tasks) {
2247 cap->suspended_ccalling_tasks->prev = task;
2248 }
2249 cap->suspended_ccalling_tasks = task;
2250 }
2251
2252 STATIC_INLINE void
2253 recoverSuspendedTask (Capability *cap, Task *task)
2254 {
2255 if (task->prev) {
2256 task->prev->next = task->next;
2257 } else {
2258 ASSERT(cap->suspended_ccalling_tasks == task);
2259 cap->suspended_ccalling_tasks = task->next;
2260 }
2261 if (task->next) {
2262 task->next->prev = task->prev;
2263 }
2264 task->next = task->prev = NULL;
2265 }
2266
2267 /* ---------------------------------------------------------------------------
2268 * Suspending & resuming Haskell threads.
2269 *
2270 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2271 * its capability before calling the C function. This allows another
2272 * task to pick up the capability and carry on running Haskell
2273 * threads. It also means that if the C call blocks, it won't lock
2274 * the whole system.
2275 *
2276 * The Haskell thread making the C call is put to sleep for the
2277 * duration of the call, on the suspended_ccalling_tasks queue. We
2278 * give out a token to the task, which it can use to resume the thread
2279 * on return from the C function.
2280 * ------------------------------------------------------------------------- */
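/* A minimal, hedged sketch of how a caller brackets a safe foreign call
 * with suspendThread()/resumeThread(). The wrapper name and
 * some_blocking_c_function() are illustrative assumptions, not the actual
 * generated code; the real wrappers are emitted by the compiler.
 */
#if 0
static int safe_ccall_sketch (StgRegTable *reg, int arg)
{
    void *tok = suspendThread(reg);            /* give up the Capability     */
    int res   = some_blocking_c_function(arg); /* may block; RTS keeps going */
    reg       = resumeThread(tok);             /* re-enter on a Capability   */
    return res;
}
#endif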
2281
2282 void *
2283 suspendThread (StgRegTable *reg)
2284 {
2285 Capability *cap;
2286 int saved_errno;
2287 StgTSO *tso;
2288 Task *task;
2289 #if mingw32_HOST_OS
2290 StgWord32 saved_winerror;
2291 #endif
2292
2293 saved_errno = errno;
2294 #if mingw32_HOST_OS
2295 saved_winerror = GetLastError();
2296 #endif
2297
2298 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2299 */
2300 cap = regTableToCapability(reg);
2301
2302 task = cap->running_task;
2303 tso = cap->r.rCurrentTSO;
2304
2305 debugTrace(DEBUG_sched,
2306 "thread %lu did a safe foreign call",
2307 (unsigned long)cap->r.rCurrentTSO->id);
2308
2309 // XXX this might not be necessary --SDM
2310 tso->what_next = ThreadRunGHC;
2311
2312 threadPaused(cap,tso);
2313
2314 if ((tso->flags & TSO_BLOCKEX) == 0) {
2315 tso->why_blocked = BlockedOnCCall;
2316 tso->flags |= TSO_BLOCKEX;
2317 tso->flags &= ~TSO_INTERRUPTIBLE;
2318 } else {
2319 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2320 }
2321
2322 // Hand back capability
2323 task->suspended_tso = tso;
2324
2325 ACQUIRE_LOCK(&cap->lock);
2326
2327 suspendTask(cap,task);
2328 cap->in_haskell = rtsFalse;
2329 releaseCapability_(cap);
2330
2331 RELEASE_LOCK(&cap->lock);
2332
2333 #if defined(THREADED_RTS)
2334 /* Preparing to leave the RTS, so ensure there's a native thread/task
2335 waiting to take over.
2336 */
2337 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2338 #endif
2339
2340 errno = saved_errno;
2341 #if mingw32_HOST_OS
2342 SetLastError(saved_winerror);
2343 #endif
2344 return task;
2345 }
2346
2347 StgRegTable *
2348 resumeThread (void *task_)
2349 {
2350 StgTSO *tso;
2351 Capability *cap;
2352 Task *task = task_;
2353 int saved_errno;
2354 #if mingw32_HOST_OS
2355 StgWord32 saved_winerror;
2356 #endif
2357
2358 saved_errno = errno;
2359 #if mingw32_HOST_OS
2360 saved_winerror = GetLastError();
2361 #endif
2362
2363 cap = task->cap;
2364 // Wait for permission to re-enter the RTS with the result.
2365 waitForReturnCapability(&cap,task);
2366 // we might be on a different capability now... but if so, our
2367 // entry on the suspended_ccalling_tasks list will also have been
2368 // migrated.
2369
2370 // Remove the thread from the suspended list
2371 recoverSuspendedTask(cap,task);
2372
2373 tso = task->suspended_tso;
2374 task->suspended_tso = NULL;
2375 tso->link = END_TSO_QUEUE;
2376 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2377
2378 if (tso->why_blocked == BlockedOnCCall) {
2379 awakenBlockedExceptionQueue(cap,tso);
2380 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2381 }
2382
2383 /* Reset blocking status */
2384 tso->why_blocked = NotBlocked;
2385
2386 cap->r.rCurrentTSO = tso;
2387 cap->in_haskell = rtsTrue;
2388 errno = saved_errno;
2389 #if mingw32_HOST_OS
2390 SetLastError(saved_winerror);
2391 #endif
2392
2393 /* We might have GC'd, mark the TSO dirty again */
2394 dirtyTSO(tso);
2395
2396 IF_DEBUG(sanity, checkTSO(tso));
2397
2398 return &cap->r;
2399 }
2400
2401 /* ---------------------------------------------------------------------------
2402 * scheduleThread()
2403 *
2404 * scheduleThread puts a thread on the end of the runnable queue.
2405 * This will usually be done immediately after a thread is created.
2406 * The caller of scheduleThread must create the thread using e.g.
2407 * createThread and push an appropriate closure
2408 * on this thread's stack before the scheduler is invoked.
2409 * ------------------------------------------------------------------------ */
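/* A hedged usage sketch (illustrative only): roughly what the rts_eval*
 * entry points do -- build a TSO with the closure already pushed on its
 * stack, then hand it to the scheduler. 'closure' and the stack size are
 * assumptions for the example.
 */
#if 0
{
    StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
    scheduleThread(cap, tso);
}
#endif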
2410
2411 void
2412 scheduleThread(Capability *cap, StgTSO *tso)
2413 {
2414 // The thread goes at the *end* of the run-queue, to avoid possible
2415 // starvation of any threads already on the queue.
2416 appendToRunQueue(cap,tso);
2417 }
2418
2419 void
2420 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2421 {
2422 #if defined(THREADED_RTS)
2423 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2424 // move this thread from now on.
2425 cpu %= RtsFlags.ParFlags.nNodes;
2426 if (cpu == cap->no) {
2427 appendToRunQueue(cap,tso);
2428 } else {
2429 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2430 }
2431 #else
2432 appendToRunQueue(cap,tso);
2433 #endif
2434 }
2435
2436 Capability *
2437 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2438 {
2439 Task *task;
2440
2441 // We already created/initialised the Task
2442 task = cap->running_task;
2443
2444 // This TSO is now a bound thread; make the Task and TSO
2445 // point to each other.
2446 tso->bound = task;
2447 tso->cap = cap;
2448
2449 task->tso = tso;
2450 task->ret = ret;
2451 task->stat = NoStatus;
2452
2453 appendToRunQueue(cap,tso);
2454
2455 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2456
2457 #if defined(GRAN)
2458 /* GranSim specific init */
2459 CurrentTSO = tso; // the TSO to run
2460 procStatus[MainProc] = Busy; // status of main PE
2461 CurrentProc = MainProc; // PE to run it on
2462 #endif
2463
2464 cap = schedule(cap,task);
2465
2466 ASSERT(task->stat != NoStatus);
2467 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2468
2469 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2470 return cap;
2471 }
2472
2473 /* ----------------------------------------------------------------------------
2474 * Starting Tasks
2475 * ------------------------------------------------------------------------- */
2476
2477 #if defined(THREADED_RTS)
2478 void
2479 workerStart(Task *task)
2480 {
2481 Capability *cap;
2482
2483 // See startWorkerTask().
2484 ACQUIRE_LOCK(&task->lock);
2485 cap = task->cap;
2486 RELEASE_LOCK(&task->lock);
2487
2488 // set the thread-local pointer to the Task:
2489 taskEnter(task);
2490
2491 // schedule() runs without a lock.
2492 cap = schedule(cap,task);
2493
2494 // On exit from schedule(), we have a Capability.
2495 releaseCapability(cap);
2496 workerTaskStop(task);
2497 }
2498 #endif
2499
2500 /* ---------------------------------------------------------------------------
2501 * initScheduler()
2502 *
2503 * Initialise the scheduler. This resets all the queues - if the
2504 * queues contained any threads, they'll be garbage collected at the
2505 * next pass.
2506 *
2507 * ------------------------------------------------------------------------ */
2508
2509 void
2510 initScheduler(void)
2511 {
2512 #if defined(GRAN)
2513 nat i;
2514 for (i=0; i<=MAX_PROC; i++) {
2515 run_queue_hds[i] = END_TSO_QUEUE;
2516 run_queue_tls[i] = END_TSO_QUEUE;
2517 blocked_queue_hds[i] = END_TSO_QUEUE;
2518 blocked_queue_tls[i] = END_TSO_QUEUE;
2519 ccalling_threadss[i] = END_TSO_QUEUE;
2520 blackhole_queue[i] = END_TSO_QUEUE;
2521 sleeping_queue = END_TSO_QUEUE;
2522 }
2523 #elif !defined(THREADED_RTS)
2524 blocked_queue_hd = END_TSO_QUEUE;
2525 blocked_queue_tl = END_TSO_QUEUE;
2526 sleeping_queue = END_TSO_QUEUE;
2527 #endif
2528
2529 blackhole_queue = END_TSO_QUEUE;
2530 all_threads = END_TSO_QUEUE;
2531
2532 context_switch = 0;
2533 sched_state = SCHED_RUNNING;
2534
2535 #if defined(THREADED_RTS)
2536 /* Initialise the mutex and condition variables used by
2537 * the scheduler. */
2538 initMutex(&sched_mutex);
2539 #endif
2540
2541 ACQUIRE_LOCK(&sched_mutex);
2542
2543 /* A capability holds the state a native thread needs in
2544 * order to execute STG code. At least one capability is
2545 * floating around (only THREADED_RTS builds have more than one).
2546 */
2547 initCapabilities();
2548
2549 initTaskManager();
2550
2551 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2552 initSparkPools();
2553 #endif
2554
2555 #if defined(THREADED_RTS)
2556 /*
2557 * Eagerly start one worker to run each Capability, except for
2558 * Capability 0. The idea is that we're probably going to start a
2559 * bound thread on Capability 0 pretty soon, so we don't want a
2560 * worker task hogging it.
2561 */
2562 {
2563 nat i;
2564 Capability *cap;
2565 for (i = 1; i < n_capabilities; i++) {
2566 cap = &capabilities[i];
2567 ACQUIRE_LOCK(&cap->lock);
2568 startWorkerTask(cap, workerStart);
2569 RELEASE_LOCK(&cap->lock);
2570 }
2571 }
2572 #endif
2573
2574 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2575
2576 RELEASE_LOCK(&sched_mutex);
2577 }
2578
2579 void
2580 exitScheduler(
2581 rtsBool wait_foreign
2582 #if !defined(THREADED_RTS)
2583 __attribute__((unused))
2584 #endif
2585 )
2586 /* see Capability.c, shutdownCapability() */
2587 {
2588 Task *task = NULL;
2589
2590 #if defined(THREADED_RTS)
2591 ACQUIRE_LOCK(&sched_mutex);
2592 task = newBoundTask();
2593 RELEASE_LOCK(&sched_mutex);
2594 #endif
2595
2596 // If we haven't killed all the threads yet, do it now.
2597 if (sched_state < SCHED_SHUTTING_DOWN) {
2598 sched_state = SCHED_INTERRUPTING;
2599 scheduleDoGC(NULL,task,rtsFalse);
2600 }
2601 sched_state = SCHED_SHUTTING_DOWN;
2602
2603 #if defined(THREADED_RTS)
2604 {
2605 nat i;
2606
2607 for (i = 0; i < n_capabilities; i++) {
2608 shutdownCapability(&capabilities[i], task, wait_foreign);
2609 }
2610 boundTaskExiting(task);
2611 stopTaskManager();
2612 }
2613 #else
2614 freeCapability(&MainCapability);
2615 #endif
2616 }
2617
2618 void
2619 freeScheduler( void )
2620 {
2621 freeTaskManager();
2622 if (n_capabilities != 1) {
2623 stgFree(capabilities);
2624 }
2625 #if defined(THREADED_RTS)
2626 closeMutex(&sched_mutex);
2627 #endif
2628 }
2629
2630 /* ---------------------------------------------------------------------------
2631 Where are the roots that we know about?
2632
2633 - all the threads on the runnable queue
2634 - all the threads on the blocked queue
2635 - all the threads on the sleeping queue
2636 - all the threads currently executing a _ccall_GC
2637 - all the "main threads"
2638
2639 ------------------------------------------------------------------------ */
2640
2641 /* This has to be protected either by the scheduler monitor, or by the
2642 garbage collection monitor (probably the latter).
2643 KH @ 25/10/99
2644 */
2645
2646 void
2647 GetRoots( evac_fn evac )
2648 {
2649 nat i;
2650 Capability *cap;
2651 Task *task;
2652
2653 #if defined(GRAN)
2654 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2655 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2656 evac((StgClosure **)&run_queue_hds[i]);
2657 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2658 evac((StgClosure **)&run_queue_tls[i]);
2659
2660 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2661 evac((StgClosure **)&blocked_queue_hds[i]);
2662 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2663 evac((StgClosure **)&blocked_queue_tls[i]);
2664 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2665 evac((StgClosure **)&ccalling_threadss[i]);
2666 }
2667
2668 markEventQueue();
2669
2670 #else /* !GRAN */
2671
2672 for (i = 0; i < n_capabilities; i++) {
2673 cap = &capabilities[i];
2674 evac((StgClosure **)(void *)&cap->run_queue_hd);
2675 evac((StgClosure **)(void *)&cap->run_queue_tl);
2676 #if defined(THREADED_RTS)
2677 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2678 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2679 #endif
2680 for (task = cap->suspended_ccalling_tasks; task != NULL;
2681 task=task->next) {
2682 debugTrace(DEBUG_sched,
2683 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2684 evac((StgClosure **)(void *)&task->suspended_tso);
2685 }
2686
2687 }
2688
2689
2690 #if !defined(THREADED_RTS)
2691 evac((StgClosure **)(void *)&blocked_queue_hd);
2692 evac((StgClosure **)(void *)&blocked_queue_tl);
2693 evac((StgClosure **)(void *)&sleeping_queue);
2694 #endif
2695 #endif
2696
2697 // evac((StgClosure **)&blackhole_queue);
2698
2699 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2700 markSparkQueue(evac);
2701 #endif
2702
2703 #if defined(RTS_USER_SIGNALS)
2704 // mark the signal handlers (signals should be already blocked)
2705 if (RtsFlags.MiscFlags.install_signal_handlers) {
2706 markSignalHandlers(evac);
2707 }
2708 #endif
2709 }
2710
2711 /* -----------------------------------------------------------------------------
2712 performGC
2713
2714 This is the interface to the garbage collector from Haskell land.
2715 We provide this so that external C code can allocate and garbage
2716 collect when called from Haskell via _ccall_GC.
2717 -------------------------------------------------------------------------- */
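/* A hedged sketch of the intended use (my_foreign_helper is an
 * illustrative name): C code entered from Haskell via a safe foreign call
 * can force a collection before doing something allocation-sensitive.
 */
#if 0
static void my_foreign_helper (void)
{
    performGC();        /* or performMajorGC() for a full collection */
    /* ... carry on with whatever needed the heap tidied first ... */
}
#endif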
2718
2719 static void
2720 performGC_(rtsBool force_major)
2721 {
2722 Task *task;
2723 // We must grab a new Task here, because the existing Task may be
2724 // associated with a particular Capability, and chained onto the
2725 // suspended_ccalling_tasks queue.
2726 ACQUIRE_LOCK(&sched_mutex);
2727 task = newBoundTask();
2728 RELEASE_LOCK(&sched_mutex);
2729 scheduleDoGC(NULL,task,force_major);
2730 boundTaskExiting(task);
2731 }
2732
2733 void
2734 performGC(void)
2735 {
2736 performGC_(rtsFalse);
2737 }
2738
2739 void
2740 performMajorGC(void)
2741 {
2742 performGC_(rtsTrue);
2743 }
2744
2745 /* -----------------------------------------------------------------------------
2746 Stack overflow
2747
2748 If the thread has reached its maximum stack size, then raise the
2749 StackOverflow exception in the offending thread. Otherwise
2750 relocate the TSO into a larger chunk of memory and adjust its stack
2751 size appropriately.
2752 -------------------------------------------------------------------------- */
2753
2754 static StgTSO *
2755 threadStackOverflow(Capability *cap, StgTSO *tso)
2756 {
2757 nat new_stack_size, stack_words;
2758 lnat new_tso_size;
2759 StgPtr new_sp;
2760 StgTSO *dest;
2761
2762 IF_DEBUG(sanity,checkTSO(tso));
2763
2764 // don't allow throwTo() to modify the blocked_exceptions queue
2765 // while we are moving the TSO:
2766 lockClosure((StgClosure *)tso);
2767
2768 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2769 // NB. never raise a StackOverflow exception if the thread is
2770 // inside Control.Exception.block. It is impractical to protect
2771 // against stack overflow exceptions, since virtually anything
2772 // can raise one (even 'catch'), so this is the only sensible
2773 // thing to do here. See bug #767.
2774
2775 debugTrace(DEBUG_gc,
2776 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2777 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2778 IF_DEBUG(gc,
2779 /* If we're debugging, just print out the top of the stack */
2780 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2781 tso->sp+64)));
2782
2783 // Send this thread the StackOverflow exception
2784 unlockTSO(tso);
2785 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2786 return tso;
2787 }
2788
2789 /* Try to double the current stack size. If that takes us over the
2790 * maximum stack size for this thread, then use the maximum instead.
2791 * Finally round up so the TSO ends up as a whole number of blocks.
2792 */
2793 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2794 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2795 TSO_STRUCT_SIZE)/sizeof(W_);
2796 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2797 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2798
2799 debugTrace(DEBUG_sched,
2800 "increasing stack size from %ld words to %d.",
2801 (long)tso->stack_size, new_stack_size);
2802
2803 dest = (StgTSO *)allocate(new_tso_size);
2804 TICK_ALLOC_TSO(new_stack_size,0);
2805
2806 /* copy the TSO block and the old stack into the new area */
2807 memcpy(dest,tso,TSO_STRUCT_SIZE);
2808 stack_words = tso->stack + tso->stack_size - tso->sp;
2809 new_sp = (P_)dest + new_tso_size - stack_words;
2810 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2811
2812 /* relocate the stack pointers... */
2813 dest->sp = new_sp;
2814 dest->stack_size = new_stack_size;
2815
2816 /* Mark the old TSO as relocated. We have to check for relocated
2817 * TSOs in the garbage collector and any primops that deal with TSOs.
2818 *
2819 * It's important to set the sp value to just beyond the end
2820 * of the stack, so we don't attempt to scavenge any part of the
2821 * dead TSO's stack.
2822 */
2823 tso->what_next = ThreadRelocated;
2824 tso->link = dest;
2825 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2826 tso->why_blocked = NotBlocked;
2827
2828 IF_PAR_DEBUG(verbose,
2829 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2830 tso->id, tso, tso->stack_size);
2831 /* If we're debugging, just print out the top of the stack */
2832 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2833 tso->sp+64)));
2834
2835 unlockTSO(dest);
2836 unlockTSO(tso);
2837
2838 IF_DEBUG(sanity,checkTSO(dest));
2839 #if 0
2840 IF_DEBUG(scheduler,printTSO(dest));
2841 #endif
2842
2843 return dest;
2844 }
2845
2846 /* ---------------------------------------------------------------------------
2847 Interrupt execution
2848 - usually called inside a signal handler so it mustn't do anything fancy.
2849 ------------------------------------------------------------------------ */
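/* A hedged sketch (illustrative; cf. the ^C handling in RtsSignals): the
 * sort of minimal, async-signal-safe handler that ends up calling this.
 */
#if 0
static void ctrlc_handler_sketch (int sig STG_UNUSED)
{
    interruptStgRts();   /* just sets flags and pokes the scheduler */
}
#endif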
2850
2851 void
2852 interruptStgRts(void)
2853 {
2854 sched_state = SCHED_INTERRUPTING;
2855 context_switch = 1;
2856 wakeUpRts();
2857 }
2858
2859 /* -----------------------------------------------------------------------------
2860 Wake up the RTS
2861
2862 This function causes at least one OS thread to wake up and run the
2863 scheduler loop. It is invoked when the RTS might be deadlocked, or
2864 an external event has arrived that may need servicing (eg. a
2865 keyboard interrupt).
2866
2867 In the single-threaded RTS we don't do anything here; we only have
2868 one thread anyway, and the event that caused us to want to wake up
2869 will have interrupted any blocking system call in progress anyway.
2870 -------------------------------------------------------------------------- */
2871
2872 void
2873 wakeUpRts(void)
2874 {
2875 #if defined(THREADED_RTS)
2876 // This forces the IO Manager thread to wakeup, which will
2877 // in turn ensure that some OS thread wakes up and runs the
2878 // scheduler loop, which will cause a GC and deadlock check.
2879 ioManagerWakeup();
2880 #endif
2881 }
2882
2883 /* -----------------------------------------------------------------------------
2884 * checkBlackHoles()
2885 *
2886 * Check the blackhole_queue for threads that can be woken up. We do
2887 * this periodically: before every GC, and whenever the run queue is
2888 * empty.
2889 *
2890 * An elegant solution might be to just wake up all the blocked
2891 * threads with awakenBlockedQueue occasionally: they'll go back to
2892 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2893 * doesn't give us a way to tell whether we've actually managed to
2894 * wake up any threads, so we would be busy-waiting.
2895 *
2896 * -------------------------------------------------------------------------- */
2897
2898 static rtsBool
2899 checkBlackHoles (Capability *cap)
2900 {
2901 StgTSO **prev, *t;
2902 rtsBool any_woke_up = rtsFalse;
2903 StgHalfWord type;
2904
2905 // blackhole_queue is global:
2906 ASSERT_LOCK_HELD(&sched_mutex);
2907
2908 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2909
2910 // ASSUMES: sched_mutex
2911 prev = &blackhole_queue;
2912 t = blackhole_queue;
2913 while (t != END_TSO_QUEUE) {
2914 ASSERT(t->why_blocked == BlockedOnBlackHole);
2915 type = get_itbl(t->block_info.closure)->type;
2916 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2917 IF_DEBUG(sanity,checkTSO(t));
2918 t = unblockOne(cap, t);
2919 // urk, the threads migrate to the current capability
2920 // here, but we'd like to keep them on the original one.
2921 *prev = t;
2922 any_woke_up = rtsTrue;
2923 } else {
2924 prev = &t->link;
2925 t = t->link;
2926 }
2927 }
2928
2929 return any_woke_up;
2930 }
2931
2932 /* -----------------------------------------------------------------------------
2933 Deleting threads
2934
2935 This is used for interruption (^C) and forking, and corresponds to
2936 raising an exception but without letting the thread catch the
2937 exception.
2938 -------------------------------------------------------------------------- */
2939
2940 static void
2941 deleteThread (Capability *cap, StgTSO *tso)
2942 {
2943 // NOTE: must only be called on a TSO that we have exclusive
2944 // access to, because we will call throwToSingleThreaded() below.
2945 // The TSO must be on the run queue of the Capability we own, or
2946 // we must own all Capabilities.
2947
2948 if (tso->why_blocked != BlockedOnCCall &&
2949 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2950 throwToSingleThreaded(cap,tso,NULL);
2951 }
2952 }
2953
2954 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2955 static void
2956 deleteThread_(Capability *cap, StgTSO *tso)
2957 { // for forkProcess only:
2958 // like deleteThread(), but we delete threads in foreign calls, too.
2959
2960 if (tso->why_blocked == BlockedOnCCall ||
2961 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2962 unblockOne(cap,tso);
2963 tso->what_next = ThreadKilled;
2964 } else {
2965 deleteThread(cap,tso);
2966 }
2967 }
2968 #endif
2969
2970 /* -----------------------------------------------------------------------------
2971 raiseExceptionHelper
2972
2973 This function is called by the raise# primitive, just so that we can
2974 move some of the tricky bits of raising an exception from C-- into
2975 C. Who knows, it might be a useful re-usable thing here too.
2976 -------------------------------------------------------------------------- */
2977
2978 StgWord
2979 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2980 {
2981 Capability *cap = regTableToCapability(reg);
2982 StgThunk *raise_closure = NULL;
2983 StgPtr p, next;
2984 StgRetInfoTable *info;
2985 //
2986 // This closure represents the expression 'raise# E' where E
2987 // is the exception raised. It is used to overwrite all the
2988 // thunks which are currently under evaluation.
2989 //
2990
2991 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2992 // LDV profiling: stg_raise_info has THUNK as its closure
2993 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2994 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2995 // 1 does not cause any problem unless profiling is performed.
2996 // However, when LDV profiling goes on, we need to linearly scan
2997 // small object pool, where raise_closure is stored, so we should
2998 // use MIN_UPD_SIZE.
2999 //
3000 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3001 // sizeofW(StgClosure)+1);
3002 //
3003
3004 //
3005 // Walk up the stack, looking for the catch frame. On the way,
3006 // we update any closures pointed to from update frames with the
3007 // raise closure that we just built.
3008 //
3009 p = tso->sp;
3010 while(1) {
3011 info = get_ret_itbl((StgClosure *)p);
3012 next = p + stack_frame_sizeW((StgClosure *)p);
3013 switch (info->i.type) {
3014
3015 case UPDATE_FRAME:
3016 // Only create raise_closure if we need to.
3017 if (raise_closure == NULL) {
3018 raise_closure =
3019 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3020 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3021 raise_closure->payload[0] = exception;
3022 }
3023 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3024 p = next;
3025 continue;
3026
3027 case ATOMICALLY_FRAME:
3028 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3029 tso->sp = p;
3030 return ATOMICALLY_FRAME;
3031
3032 case CATCH_FRAME:
3033 tso->sp = p;
3034 return CATCH_FRAME;
3035
3036 case CATCH_STM_FRAME:
3037 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3038 tso->sp = p;
3039 return CATCH_STM_FRAME;
3040
3041 case STOP_FRAME:
3042 tso->sp = p;
3043 return STOP_FRAME;
3044
3045 case CATCH_RETRY_FRAME:
3046 default:
3047 p = next;
3048 continue;
3049 }
3050 }
3051 }
3052
3053
3054 /* -----------------------------------------------------------------------------
3055 findRetryFrameHelper
3056
3057 This function is called by the retry# primitive. It traverses the stack
3058 leaving tso->sp referring to the frame which should handle the retry.
3059
3060 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3061 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3062
3063 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3064 create) because retries are not considered to be exceptions, despite the
3065 similar implementation.
3066
3067 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3068 not be created within memory transactions.
3069 -------------------------------------------------------------------------- */
3070
3071 StgWord
3072 findRetryFrameHelper (StgTSO *tso)
3073 {
3074 StgPtr p, next;
3075 StgRetInfoTable *info;
3076
3077 p = tso -> sp;
3078 while (1) {
3079 info = get_ret_itbl((StgClosure *)p);
3080 next = p + stack_frame_sizeW((StgClosure *)p);
3081 switch (info->i.type) {
3082
3083 case ATOMICALLY_FRAME:
3084 debugTrace(DEBUG_stm,
3085 "found ATOMICALLY_FRAME at %p during retry", p);
3086 tso->sp = p;
3087 return ATOMICALLY_FRAME;
3088
3089 case CATCH_RETRY_FRAME:
3090 debugTrace(DEBUG_stm,
3091 "found CATCH_RETRY_FRAME at %p during retry", p);
3092 tso->sp = p;
3093 return CATCH_RETRY_FRAME;
3094
3095 case CATCH_STM_FRAME: {
3096 StgTRecHeader *trec = tso -> trec;
3097 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3098 debugTrace(DEBUG_stm,
3099 "found CATCH_STM_FRAME at %p during retry", p);
3100 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3101 stmAbortTransaction(tso -> cap, trec);
3102 stmFreeAbortedTRec(tso -> cap, trec);
3103 tso -> trec = outer;
3104 p = next;
3105 continue;
3106 }
3107
3108
3109 default:
3110 ASSERT(info->i.type != CATCH_FRAME);
3111 ASSERT(info->i.type != STOP_FRAME);
3112 p = next;
3113 continue;
3114 }
3115 }
3116 }
3117
3118 /* -----------------------------------------------------------------------------
3119 resurrectThreads is called after garbage collection on the list of
3120 threads found to be garbage. Each of these threads will be woken
3121 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3122 on an MVar, NonTermination if it was blocked on a Black Hole, or
3123 BlockedIndefinitely if it was blocked on an STM transaction.
3124
3125 Locks: assumes we hold *all* the capabilities.
3126 -------------------------------------------------------------------------- */
3127
3128 void
3129 resurrectThreads (StgTSO *threads)
3130 {
3131 StgTSO *tso, *next;
3132 Capability *cap;
3133
3134 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3135 next = tso->global_link;
3136 tso->global_link = all_threads;
3137 all_threads = tso;
3138 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3139
3140 // Wake up the thread on the Capability it was last on
3141 cap = tso->cap;
3142
3143 switch (tso->why_blocked) {
3144 case BlockedOnMVar:
3145 case BlockedOnException:
3146 /* Called by GC - sched_mutex lock is currently held. */
3147 throwToSingleThreaded(cap, tso,
3148 (StgClosure *)BlockedOnDeadMVar_closure);
3149 break;
3150 case BlockedOnBlackHole:
3151 throwToSingleThreaded(cap, tso,
3152 (StgClosure *)NonTermination_closure);
3153 break;
3154 case BlockedOnSTM:
3155 throwToSingleThreaded(cap, tso,
3156 (StgClosure *)BlockedIndefinitely_closure);
3157 break;
3158 case NotBlocked:
3159 /* This might happen if the thread was blocked on a black hole
3160 * belonging to a thread that we've just woken up (raiseAsync
3161 * can wake up threads, remember...).
3162 */
3163 continue;
3164 default:
3165 barf("resurrectThreads: thread blocked in a strange way");
3166 }
3167 }
3168 }