1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #ifdef PROFILING
33 #include "Proftimer.h"
34 #include "ProfHeap.h"
35 #endif
36 #if defined(GRAN) || defined(PARALLEL_HASKELL)
37 # include "GranSimRts.h"
38 # include "GranSim.h"
39 # include "ParallelRts.h"
40 # include "Parallel.h"
41 # include "ParallelDebug.h"
42 # include "FetchMe.h"
43 # include "HLC.h"
44 #endif
45 #include "Sparks.h"
46 #include "Capability.h"
47 #include "Task.h"
48 #include "AwaitEvent.h"
49 #if defined(mingw32_HOST_OS)
50 #include "win32/IOManager.h"
51 #endif
52 #include "Trace.h"
53 #include "RaiseAsync.h"
54 #include "Threads.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78 * Global variables
79 * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
85
86 /*
87 In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes, new arrays run_queue_hds/tls
89 are created. run_queue_hd is then a short cut (macro) for
90 run_queue_hds[CurrentProc] (see GranSim.h).
91 -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97 the std RTS (i.e. we are cheating). However, we don't use this list in
98 the GranSim specific code at the moment (so we are only potentially
99 cheating). */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping threads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111 * LOCK: sched_mutex+capability, or all capabilities
112 */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up. See
117 * Schedule.h for more thorough comment.
118 * LOCK: none (doesn't matter if we miss an update)
119 */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123 * Used for detecting garbage collected threads.
124 * LOCK: sched_mutex+capability, or all capabilities
125 */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129 * LOCK: none (just an advisory flag)
130 */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134 * LOCK: currently none, perhaps we should lock (but needs to be
135 * updated in the fast path of the scheduler).
136 */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140 * LOCK: none (changes once, from false->true)
141 */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
149 * exists - earlier gccs apparently didn't.
150 * -= chak
151 */
152 StgTSO dummy_tso;
153
154 /*
155 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156 * in an MT setting, needed to signal that a worker thread shouldn't hang around
157 * in the scheduler when it is out of work.
158 */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162 * This mutex protects most of the global scheduler data in
163 * the THREADED_RTS runtime.
164 */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180 * static function prototypes
181 * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These functions all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
213 StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
215 nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218 StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221 rtsBool force_major,
222 void (*get_roots)(evac_fn));
223
224 static rtsBool checkBlackHoles(Capability *cap);
225 static void AllRoots(evac_fn evac);
226
227 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
228
229 static void deleteThread (Capability *cap, StgTSO *tso);
230 static void deleteAllThreads (Capability *cap);
231
232 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
233 static void deleteThread_(Capability *cap, StgTSO *tso);
234 #endif
235
236 #if defined(PARALLEL_HASKELL)
237 StgTSO * createSparkThread(rtsSpark spark);
238 StgTSO * activateSpark (rtsSpark spark);
239 #endif
240
241 #ifdef DEBUG
242 static char *whatNext_strs[] = {
243 "(unknown)",
244 "ThreadRunGHC",
245 "ThreadInterpret",
246 "ThreadKilled",
247 "ThreadRelocated",
248 "ThreadComplete"
249 };
250 #endif
251
252 /* -----------------------------------------------------------------------------
253 * Putting a thread on the run queue: different scheduling policies
254 * -------------------------------------------------------------------------- */
255
256 STATIC_INLINE void
257 addToRunQueue( Capability *cap, StgTSO *t )
258 {
259 #if defined(PARALLEL_HASKELL)
260 if (RtsFlags.ParFlags.doFairScheduling) {
261 // this does round-robin scheduling; good for concurrency
262 appendToRunQueue(cap,t);
263 } else {
264 // this does unfair scheduling; good for parallelism
265 pushOnRunQueue(cap,t);
266 }
267 #else
268 // this does round-robin scheduling; good for concurrency
269 appendToRunQueue(cap,t);
270 #endif
271 }
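/* NB: the practical difference between the two policies is simply where the
 * thread lands in the run queue: appendToRunQueue is expected to add it at
 * the tail (FIFO, hence round-robin and fair), whereas pushOnRunQueue puts
 * it at the head so it is the next thread to run (hence unfair), matching
 * the fair/unfair labels in the comments above.
 */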
272
273 /* ---------------------------------------------------------------------------
274 Main scheduling loop.
275
276 We use round-robin scheduling, each thread returning to the
277 scheduler loop when one of these conditions is detected:
278
279 * out of heap space
280 * timer expires (thread yields)
281 * thread blocks
282 * thread ends
283 * stack overflow
284
285 GRAN version:
286 In a GranSim setup this loop iterates over the global event queue.
287 This revolves around the global event queue, which determines what
288 to do next. Therefore, it's more complicated than either the
289 concurrent or the parallel (GUM) setup.
290
291 GUM version:
292 GUM iterates over incoming messages.
293 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
294 and sends out a fish whenever it has nothing to do; in-between
295 doing the actual reductions (shared code below) it processes the
296 incoming messages and deals with delayed operations
297 (see PendingFetches).
298 This is not the ugliest code you could imagine, but it's bloody close.
299
300 ------------------------------------------------------------------------ */
301
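/* In outline, one iteration of the loop below (ignoring the GRAN and
 * PARALLEL_HASKELL variants) does roughly the following:
 *
 *   - possibly yield the Capability and push spare work to idle
 *     Capabilities (THREADED_RTS only)
 *   - act on sched_state (interruption / shutdown)
 *   - run the housekeeping checks: signal handlers, black holes,
 *     wakeup queue, blocked threads, deadlock detection
 *   - pop a thread from the run queue and run it via StgRun or the
 *     bytecode interpreter
 *   - dispatch on the thread's return code (heap/stack overflow, yield,
 *     block, finish) and GC if required
 */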
302 static Capability *
303 schedule (Capability *initialCapability, Task *task)
304 {
305 StgTSO *t;
306 Capability *cap;
307 StgThreadReturnCode ret;
308 #if defined(GRAN)
309 rtsEvent *event;
310 #elif defined(PARALLEL_HASKELL)
311 StgTSO *tso;
312 GlobalTaskId pe;
313 rtsBool receivedFinish = rtsFalse;
314 # if defined(DEBUG)
315 nat tp_size, sp_size; // stats only
316 # endif
317 #endif
318 nat prev_what_next;
319 rtsBool ready_to_gc;
320 #if defined(THREADED_RTS)
321 rtsBool first = rtsTrue;
322 #endif
323
324 cap = initialCapability;
325
326 // Pre-condition: this task owns initialCapability.
327 // The sched_mutex is *NOT* held
328 // NB. on return, we still hold a capability.
329
330 debugTrace (DEBUG_sched,
331 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
332 task, initialCapability);
333
334 schedulePreLoop();
335
336 // -----------------------------------------------------------
337 // Scheduler loop starts here:
338
339 #if defined(PARALLEL_HASKELL)
340 #define TERMINATION_CONDITION (!receivedFinish)
341 #elif defined(GRAN)
342 #define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
343 #else
344 #define TERMINATION_CONDITION rtsTrue
345 #endif
346
347 while (TERMINATION_CONDITION) {
348
349 #if defined(GRAN)
350 /* Choose the processor with the next event */
351 CurrentProc = event->proc;
352 CurrentTSO = event->tso;
353 #endif
354
355 #if defined(THREADED_RTS)
356 if (first) {
357 // don't yield the first time, we want a chance to run this
358 // thread for a bit, even if there are others banging at the
359 // door.
360 first = rtsFalse;
361 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
362 } else {
363 // Yield the capability to higher-priority tasks if necessary.
364 yieldCapability(&cap, task);
365 }
366 #endif
367
368 #if defined(THREADED_RTS)
369 schedulePushWork(cap,task);
370 #endif
371
372 // Check whether we have re-entered the RTS from Haskell without
373 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
374 // call).
375 if (cap->in_haskell) {
376 errorBelch("schedule: re-entered unsafely.\n"
377 " Perhaps a 'foreign import unsafe' should be 'safe'?");
378 stg_exit(EXIT_FAILURE);
379 }
380
381 // The interruption / shutdown sequence.
382 //
383 // In order to cleanly shut down the runtime, we want to:
384 // * make sure that all main threads return to their callers
385 // with the state 'Interrupted'.
386 //   * clean up all OS threads associated with the runtime
387 // * free all memory etc.
388 //
389 // So the sequence for ^C goes like this:
390 //
391 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
392 // arranges for some Capability to wake up
393 //
394 // * all threads in the system are halted, and the zombies are
395 // placed on the run queue for cleaning up. We acquire all
396 //     the capabilities in order to delete the threads; this is
397 // done by scheduleDoGC() for convenience (because GC already
398 // needs to acquire all the capabilities). We can't kill
399 // threads involved in foreign calls.
400 //
401 // * somebody calls shutdownHaskell(), which calls exitScheduler()
402 //
403 // * sched_state := SCHED_SHUTTING_DOWN
404 //
405 // * all workers exit when the run queue on their capability
406 // drains. All main threads will also exit when their TSO
407 // reaches the head of the run queue and they can return.
408 //
409 // * eventually all Capabilities will shut down, and the RTS can
410 // exit.
411 //
412 //   * We might be left with threads blocked in foreign calls;
413 //     we should really attempt to kill these somehow (TODO).
414
415 switch (sched_state) {
416 case SCHED_RUNNING:
417 break;
418 case SCHED_INTERRUPTING:
419 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
420 #if defined(THREADED_RTS)
421 discardSparksCap(cap);
422 #endif
423 /* scheduleDoGC() deletes all the threads */
424 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
425 break;
426 case SCHED_SHUTTING_DOWN:
427 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
428 // If we are a worker, just exit. If we're a bound thread
429 // then we will exit below when we've removed our TSO from
430 // the run queue.
431 if (task->tso == NULL && emptyRunQueue(cap)) {
432 return cap;
433 }
434 break;
435 default:
436 barf("sched_state: %d", sched_state);
437 }
438
439 #if defined(THREADED_RTS)
440 // If the run queue is empty, take a spark and turn it into a thread.
441 {
442 if (emptyRunQueue(cap)) {
443 StgClosure *spark;
444 spark = findSpark(cap);
445 if (spark != NULL) {
446 debugTrace(DEBUG_sched,
447 "turning spark of closure %p into a thread",
448 (StgClosure *)spark);
449 createSparkThread(cap,spark);
450 }
451 }
452 }
453 #endif // THREADED_RTS
454
455 scheduleStartSignalHandlers(cap);
456
457 // Only check the black holes here if we've nothing else to do.
458 // During normal execution, the black hole list only gets checked
459 // at GC time, to avoid repeatedly traversing this possibly long
460 // list each time around the scheduler.
461 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
462
463 scheduleCheckWakeupThreads(cap);
464
465 scheduleCheckBlockedThreads(cap);
466
467 scheduleDetectDeadlock(cap,task);
468 #if defined(THREADED_RTS)
469 cap = task->cap; // reload cap, it might have changed
470 #endif
471
472 // Normally, the only way we can get here with no threads to
473 // run is if a keyboard interrupt was received during
474 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
475 // Additionally, it is not fatal for the
476 // threaded RTS to reach here with no threads to run.
477 //
478 // win32: might be here due to awaitEvent() being abandoned
479 // as a result of a console event having been delivered.
480 if ( emptyRunQueue(cap) ) {
481 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
482 ASSERT(sched_state >= SCHED_INTERRUPTING);
483 #endif
484 continue; // nothing to do
485 }
486
487 #if defined(PARALLEL_HASKELL)
488 scheduleSendPendingMessages();
489 if (emptyRunQueue(cap) && scheduleActivateSpark())
490 continue;
491
492 #if defined(SPARKS)
493 ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!
494 #endif
495
496 /* If we still have no work we need to send a FISH to get a spark
497 from another PE */
498 if (emptyRunQueue(cap)) {
499 if (!scheduleGetRemoteWork(&receivedFinish)) continue;
500 ASSERT(rtsFalse); // should not happen at the moment
501 }
502 // from here: non-empty run queue.
503 // TODO: merge above case with this, only one call processMessages() !
504 if (PacketsWaiting()) { /* process incoming messages, if
505 any pending... only in else
506 because getRemoteWork waits for
507 messages as well */
508 receivedFinish = processMessages();
509 }
510 #endif
511
512 #if defined(GRAN)
513 scheduleProcessEvent(event);
514 #endif
515
516 //
517 // Get a thread to run
518 //
519 t = popRunQueue(cap);
520
521 #if defined(GRAN) || defined(PAR)
522     scheduleGranParReport(); // some kind of debugging output
523 #else
524 // Sanity check the thread we're about to run. This can be
525 // expensive if there is lots of thread switching going on...
526 IF_DEBUG(sanity,checkTSO(t));
527 #endif
528
529 #if defined(THREADED_RTS)
530 // Check whether we can run this thread in the current task.
531 // If not, we have to pass our capability to the right task.
532 {
533 Task *bound = t->bound;
534
535 if (bound) {
536 if (bound == task) {
537 debugTrace(DEBUG_sched,
538 "### Running thread %lu in bound thread", (unsigned long)t->id);
539 // yes, the Haskell thread is bound to the current native thread
540 } else {
541 debugTrace(DEBUG_sched,
542 "### thread %lu bound to another OS thread", (unsigned long)t->id);
543 // no, bound to a different Haskell thread: pass to that thread
544 pushOnRunQueue(cap,t);
545 continue;
546 }
547 } else {
548 // The thread we want to run is unbound.
549 if (task->tso) {
550 debugTrace(DEBUG_sched,
551 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
552 // no, the current native thread is bound to a different
553 // Haskell thread, so pass it to any worker thread
554 pushOnRunQueue(cap,t);
555 continue;
556 }
557 }
558 }
559 #endif
560
561 cap->r.rCurrentTSO = t;
562
563 /* context switches are initiated by the timer signal, unless
564 * the user specified "context switch as often as possible", with
565 * +RTS -C0
566 */
567 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
568 && !emptyThreadQueues(cap)) {
569 context_switch = 1;
570 }
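    /* Note that even with -C0 the flag is only raised when the thread
     * queues are non-empty: a single running thread with nothing else
     * waiting is not asked to yield pointlessly.
     */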
571
572 run_thread:
573
574 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
575 (long)t->id, whatNext_strs[t->what_next]);
576
577 #if defined(PROFILING)
578 startHeapProfTimer();
579 #endif
580
581 // Check for exceptions blocked on this thread
582 maybePerformBlockedException (cap, t);
583
584 // ----------------------------------------------------------------------
585 // Run the current thread
586
587 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
588 ASSERT(t->cap == cap);
589
590 prev_what_next = t->what_next;
591
592 errno = t->saved_errno;
593 cap->in_haskell = rtsTrue;
594
595 dirtyTSO(t);
596
597 recent_activity = ACTIVITY_YES;
598
599 switch (prev_what_next) {
600
601 case ThreadKilled:
602 case ThreadComplete:
603 /* Thread already finished, return to scheduler. */
604 ret = ThreadFinished;
605 break;
606
607 case ThreadRunGHC:
608 {
609 StgRegTable *r;
610 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
611 cap = regTableToCapability(r);
612 ret = r->rRet;
613 break;
614 }
615
616 case ThreadInterpret:
617 cap = interpretBCO(cap);
618 ret = cap->r.rRet;
619 break;
620
621 default:
622 barf("schedule: invalid what_next field");
623 }
624
625 cap->in_haskell = rtsFalse;
626
627 // The TSO might have moved, eg. if it re-entered the RTS and a GC
628 // happened. So find the new location:
629 t = cap->r.rCurrentTSO;
630
631 // We have run some Haskell code: there might be blackhole-blocked
632 // threads to wake up now.
633 // Lock-free test here should be ok, we're just setting a flag.
634 if ( blackhole_queue != END_TSO_QUEUE ) {
635 blackholes_need_checking = rtsTrue;
636 }
637
638 // And save the current errno in this thread.
639 // XXX: possibly bogus for SMP because this thread might already
640 // be running again, see code below.
641 t->saved_errno = errno;
642
643 #if defined(THREADED_RTS)
644 // If ret is ThreadBlocked, and this Task is bound to the TSO that
645 // blocked, we are in limbo - the TSO is now owned by whatever it
646 // is blocked on, and may in fact already have been woken up,
647 // perhaps even on a different Capability. It may be the case
648 // that task->cap != cap. We better yield this Capability
649 // immediately and return to normality.
650 if (ret == ThreadBlocked) {
651 debugTrace(DEBUG_sched,
652 "--<< thread %lu (%s) stopped: blocked",
653 (unsigned long)t->id, whatNext_strs[t->what_next]);
654 continue;
655 }
656 #endif
657
658 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
659 ASSERT(t->cap == cap);
660
661 // ----------------------------------------------------------------------
662
663 // Costs for the scheduler are assigned to CCS_SYSTEM
664 #if defined(PROFILING)
665 stopHeapProfTimer();
666 CCCS = CCS_SYSTEM;
667 #endif
668
669 schedulePostRunThread();
670
671 ready_to_gc = rtsFalse;
672
673 switch (ret) {
674 case HeapOverflow:
675 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
676 break;
677
678 case StackOverflow:
679 scheduleHandleStackOverflow(cap,task,t);
680 break;
681
682 case ThreadYielding:
683 if (scheduleHandleYield(cap, t, prev_what_next)) {
684 // shortcut for switching between compiler/interpreter:
685 goto run_thread;
686 }
687 break;
688
689 case ThreadBlocked:
690 scheduleHandleThreadBlocked(t);
691 break;
692
693 case ThreadFinished:
694 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
695 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
696 break;
697
698 default:
699 barf("schedule: invalid thread return code %d", (int)ret);
700 }
701
702 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
703 if (ready_to_gc) {
704 cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
705 }
706 } /* end of while() */
707
708 debugTrace(PAR_DEBUG_verbose,
709 "== Leaving schedule() after having received Finish");
710 }
711
712 /* ----------------------------------------------------------------------------
713 * Setting up the scheduler loop
714 * ------------------------------------------------------------------------- */
715
716 static void
717 schedulePreLoop(void)
718 {
719 #if defined(GRAN)
720 /* set up first event to get things going */
721 /* ToDo: assign costs for system setup and init MainTSO ! */
722 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
723 ContinueThread,
724 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
725
726 debugTrace (DEBUG_gran,
727 "GRAN: Init CurrentTSO (in schedule) = %p",
728 CurrentTSO);
729 IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
730
731 if (RtsFlags.GranFlags.Light) {
732 /* Save current time; GranSim Light only */
733 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
734 }
735 #endif
736 }
737
738 /* -----------------------------------------------------------------------------
739 * schedulePushWork()
740 *
741 * Push work to other Capabilities if we have some.
742 * -------------------------------------------------------------------------- */
743
744 #if defined(THREADED_RTS)
745 static void
746 schedulePushWork(Capability *cap USED_IF_THREADS,
747 Task *task USED_IF_THREADS)
748 {
749 Capability *free_caps[n_capabilities], *cap0;
750 nat i, n_free_caps;
751
752 // migration can be turned off with +RTS -qg
753 if (!RtsFlags.ParFlags.migrate) return;
754
755 // Check whether we have more threads on our run queue, or sparks
756 // in our pool, that we could hand to another Capability.
757 if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
758 && sparkPoolSizeCap(cap) < 2) {
759 return;
760 }
761
762 // First grab as many free Capabilities as we can.
763 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
764 cap0 = &capabilities[i];
765 if (cap != cap0 && tryGrabCapability(cap0,task)) {
766 	    if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
767 // it already has some work, we just grabbed it at
768 // the wrong moment. Or maybe it's deadlocked!
769 releaseCapability(cap0);
770 } else {
771 free_caps[n_free_caps++] = cap0;
772 }
773 }
774 }
775
776 // we now have n_free_caps free capabilities stashed in
777 // free_caps[]. Share our run queue equally with them. This is
778 // probably the simplest thing we could do; improvements we might
779 // want to do include:
780 //
781 // - giving high priority to moving relatively new threads, on
782     //     the grounds that they haven't had time to build up a
783 // working set in the cache on this CPU/Capability.
784 //
785 // - giving low priority to moving long-lived threads
786
787 if (n_free_caps > 0) {
788 StgTSO *prev, *t, *next;
789 rtsBool pushed_to_all;
790
791 debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
792
793 i = 0;
794 pushed_to_all = rtsFalse;
795
796 if (cap->run_queue_hd != END_TSO_QUEUE) {
797 prev = cap->run_queue_hd;
798 t = prev->link;
799 prev->link = END_TSO_QUEUE;
800 for (; t != END_TSO_QUEUE; t = next) {
801 next = t->link;
802 t->link = END_TSO_QUEUE;
803 if (t->what_next == ThreadRelocated
804 || t->bound == task // don't move my bound thread
805 || tsoLocked(t)) { // don't move a locked thread
806 prev->link = t;
807 prev = t;
808 } else if (i == n_free_caps) {
809 pushed_to_all = rtsTrue;
810 i = 0;
811 // keep one for us
812 prev->link = t;
813 prev = t;
814 } else {
815 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
816 appendToRunQueue(free_caps[i],t);
817 if (t->bound) { t->bound->cap = free_caps[i]; }
818 t->cap = free_caps[i];
819 i++;
820 }
821 }
822 cap->run_queue_tl = prev;
823 }
824
825 // If there are some free capabilities that we didn't push any
826 // threads to, then try to push a spark to each one.
827 if (!pushed_to_all) {
828 StgClosure *spark;
829 // i is the next free capability to push to
830 for (; i < n_free_caps; i++) {
831 if (emptySparkPoolCap(free_caps[i])) {
832 spark = findSpark(cap);
833 if (spark != NULL) {
834 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
835 newSpark(&(free_caps[i]->r), spark);
836 }
837 }
838 }
839 }
840
841 // release the capabilities
842 for (i = 0; i < n_free_caps; i++) {
843 task->cap = free_caps[i];
844 releaseCapability(free_caps[i]);
845 }
846 }
847 task->cap = cap; // reset to point to our Capability.
848 }
849 #endif
850
851 /* ----------------------------------------------------------------------------
852 * Start any pending signal handlers
853 * ------------------------------------------------------------------------- */
854
855 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
856 static void
857 scheduleStartSignalHandlers(Capability *cap)
858 {
859 if (signals_pending()) { // safe outside the lock
860 startSignalHandlers(cap);
861 }
862 }
863 #else
864 static void
865 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
866 {
867 }
868 #endif
869
870 /* ----------------------------------------------------------------------------
871 * Check for blocked threads that can be woken up.
872 * ------------------------------------------------------------------------- */
873
874 static void
875 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
876 {
877 #if !defined(THREADED_RTS)
878 //
879 // Check whether any waiting threads need to be woken up. If the
880 // run queue is empty, and there are no other tasks running, we
881 // can wait indefinitely for something to happen.
882 //
883 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
884 {
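	/* The argument tells awaitEvent whether it is allowed to block
	 * indefinitely: we only allow that when our run queue is empty and
	 * there are no black-hole-blocked threads that might need waking.
	 */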
885 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
886 }
887 #endif
888 }
889
890
891 /* ----------------------------------------------------------------------------
892 * Check for threads woken up by other Capabilities
893 * ------------------------------------------------------------------------- */
894
895 static void
896 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
897 {
898 #if defined(THREADED_RTS)
899 // Any threads that were woken up by other Capabilities get
900 // appended to our run queue.
901 if (!emptyWakeupQueue(cap)) {
902 ACQUIRE_LOCK(&cap->lock);
903 if (emptyRunQueue(cap)) {
904 cap->run_queue_hd = cap->wakeup_queue_hd;
905 cap->run_queue_tl = cap->wakeup_queue_tl;
906 } else {
907 cap->run_queue_tl->link = cap->wakeup_queue_hd;
908 cap->run_queue_tl = cap->wakeup_queue_tl;
909 }
910 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
911 RELEASE_LOCK(&cap->lock);
912 }
913 #endif
914 }
915
916 /* ----------------------------------------------------------------------------
917 * Check for threads blocked on BLACKHOLEs that can be woken up
918 * ------------------------------------------------------------------------- */
919 static void
920 scheduleCheckBlackHoles (Capability *cap)
921 {
922 if ( blackholes_need_checking ) // check without the lock first
923 {
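	/* Classic double-checked pattern: the unlocked test above may race
	 * with other tasks, so re-test under sched_mutex before doing the
	 * (possibly long) traversal.  Missing a concurrent set of the flag
	 * is harmless -- it is picked up on a later pass (see the comment
	 * on blackholes_need_checking above).
	 */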
924 ACQUIRE_LOCK(&sched_mutex);
925 if ( blackholes_need_checking ) {
926 checkBlackHoles(cap);
927 blackholes_need_checking = rtsFalse;
928 }
929 RELEASE_LOCK(&sched_mutex);
930 }
931 }
932
933 /* ----------------------------------------------------------------------------
934 * Detect deadlock conditions and attempt to resolve them.
935 * ------------------------------------------------------------------------- */
936
937 static void
938 scheduleDetectDeadlock (Capability *cap, Task *task)
939 {
940
941 #if defined(PARALLEL_HASKELL)
942 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
943 return;
944 #endif
945
946 /*
947 * Detect deadlock: when we have no threads to run, there are no
948 * threads blocked, waiting for I/O, or sleeping, and all the
949 * other tasks are waiting for work, we must have a deadlock of
950 * some description.
951 */
952 if ( emptyThreadQueues(cap) )
953 {
954 #if defined(THREADED_RTS)
955 /*
956 * In the threaded RTS, we only check for deadlock if there
957 * has been no activity in a complete timeslice. This means
958 * we won't eagerly start a full GC just because we don't have
959 * any threads to run currently.
960 */
961 if (recent_activity != ACTIVITY_INACTIVE) return;
962 #endif
963
964 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
965
966 // Garbage collection can release some new threads due to
967 // either (a) finalizers or (b) threads resurrected because
968 // they are unreachable and will therefore be sent an
969 // exception. Any threads thus released will be immediately
970 // runnable.
971 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
972
973 recent_activity = ACTIVITY_DONE_GC;
974
975 if ( !emptyRunQueue(cap) ) return;
976
977 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
978 /* If we have user-installed signal handlers, then wait
979 	 * for signals to arrive rather than bombing out with a
980 * deadlock.
981 */
982 if ( anyUserHandlers() ) {
983 debugTrace(DEBUG_sched,
984 "still deadlocked, waiting for signals...");
985
986 awaitUserSignals();
987
988 if (signals_pending()) {
989 startSignalHandlers(cap);
990 }
991
992 // either we have threads to run, or we were interrupted:
993 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
994 }
995 #endif
996
997 #if !defined(THREADED_RTS)
998 /* Probably a real deadlock. Send the current main thread the
999 	 * NonTermination exception.
1000 */
1001 if (task->tso) {
1002 switch (task->tso->why_blocked) {
1003 case BlockedOnSTM:
1004 case BlockedOnBlackHole:
1005 case BlockedOnException:
1006 case BlockedOnMVar:
1007 throwToSingleThreaded(cap, task->tso,
1008 (StgClosure *)NonTermination_closure);
1009 return;
1010 default:
1011 barf("deadlock: main thread blocked in a strange way");
1012 }
1013 }
1014 return;
1015 #endif
1016 }
1017 }
1018
1019 /* ----------------------------------------------------------------------------
1020 * Process an event (GRAN only)
1021 * ------------------------------------------------------------------------- */
1022
1023 #if defined(GRAN)
1024 static StgTSO *
1025 scheduleProcessEvent(rtsEvent *event)
1026 {
1027 StgTSO *t;
1028
1029 if (RtsFlags.GranFlags.Light)
1030 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1031
1032 /* adjust time based on time-stamp */
1033 if (event->time > CurrentTime[CurrentProc] &&
1034 event->evttype != ContinueThread)
1035 CurrentTime[CurrentProc] = event->time;
1036
1037 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1038 if (!RtsFlags.GranFlags.Light)
1039 handleIdlePEs();
1040
1041 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1042
1043 /* main event dispatcher in GranSim */
1044 switch (event->evttype) {
1045 /* Should just be continuing execution */
1046 case ContinueThread:
1047 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1048 /* ToDo: check assertion
1049 ASSERT(run_queue_hd != (StgTSO*)NULL &&
1050 run_queue_hd != END_TSO_QUEUE);
1051 */
1052 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1053 if (!RtsFlags.GranFlags.DoAsyncFetch &&
1054 procStatus[CurrentProc]==Fetching) {
1055 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1056 CurrentTSO->id, CurrentTSO, CurrentProc);
1057 goto next_thread;
1058 }
1059 /* Ignore ContinueThreads for completed threads */
1060 if (CurrentTSO->what_next == ThreadComplete) {
1061 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
1062 CurrentTSO->id, CurrentTSO, CurrentProc);
1063 goto next_thread;
1064 }
1065 /* Ignore ContinueThreads for threads that are being migrated */
1066 if (PROCS(CurrentTSO)==Nowhere) {
1067 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1068 CurrentTSO->id, CurrentTSO, CurrentProc);
1069 goto next_thread;
1070 }
1071 /* The thread should be at the beginning of the run queue */
1072 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
1073 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1074 CurrentTSO->id, CurrentTSO, CurrentProc);
1075 break; // run the thread anyway
1076 }
1077 /*
1078 new_event(proc, proc, CurrentTime[proc],
1079 FindWork,
1080 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1081 goto next_thread;
1082 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1083 break; // now actually run the thread; DaH Qu'vam yImuHbej
1084
1085 case FetchNode:
1086 do_the_fetchnode(event);
1087 goto next_thread; /* handle next event in event queue */
1088
1089 case GlobalBlock:
1090 do_the_globalblock(event);
1091 goto next_thread; /* handle next event in event queue */
1092
1093 case FetchReply:
1094 do_the_fetchreply(event);
1095 goto next_thread; /* handle next event in event queue */
1096
1097 case UnblockThread: /* Move from the blocked queue to the tail of */
1098 do_the_unblock(event);
1099 goto next_thread; /* handle next event in event queue */
1100
1101 case ResumeThread: /* Move from the blocked queue to the tail of */
1102 /* the runnable queue ( i.e. Qu' SImqa'lu') */
1103 event->tso->gran.blocktime +=
1104 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1105 do_the_startthread(event);
1106 goto next_thread; /* handle next event in event queue */
1107
1108 case StartThread:
1109 do_the_startthread(event);
1110 goto next_thread; /* handle next event in event queue */
1111
1112 case MoveThread:
1113 do_the_movethread(event);
1114 goto next_thread; /* handle next event in event queue */
1115
1116 case MoveSpark:
1117 do_the_movespark(event);
1118 goto next_thread; /* handle next event in event queue */
1119
1120 case FindWork:
1121 do_the_findwork(event);
1122 goto next_thread; /* handle next event in event queue */
1123
1124 default:
1125 barf("Illegal event type %u\n", event->evttype);
1126 } /* switch */
1127
1128 /* This point was scheduler_loop in the old RTS */
1129
1130 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1131
1132 TimeOfLastEvent = CurrentTime[CurrentProc];
1133 TimeOfNextEvent = get_time_of_next_event();
1134 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1135 // CurrentTSO = ThreadQueueHd;
1136
1137 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
1138 TimeOfNextEvent));
1139
1140 if (RtsFlags.GranFlags.Light)
1141 GranSimLight_leave_system(event, &ActiveTSO);
1142
1143 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1144
1145 IF_DEBUG(gran,
1146 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1147
1148 /* in a GranSim setup the TSO stays on the run queue */
1149 t = CurrentTSO;
1150 /* Take a thread from the run queue. */
1151 POP_RUN_QUEUE(t); // take_off_run_queue(t);
1152
1153 IF_DEBUG(gran,
1154 debugBelch("GRAN: About to run current thread, which is\n");
1155 G_TSO(t,5));
1156
1157 context_switch = 0; // turned on via GranYield, checking events and time slice
1158
1159 IF_DEBUG(gran,
1160 DumpGranEvent(GR_SCHEDULE, t));
1161
1162 procStatus[CurrentProc] = Busy;
1163 }
1164 #endif // GRAN
1165
1166 /* ----------------------------------------------------------------------------
1167 * Send pending messages (PARALLEL_HASKELL only)
1168 * ------------------------------------------------------------------------- */
1169
1170 #if defined(PARALLEL_HASKELL)
1171 static StgTSO *
1172 scheduleSendPendingMessages(void)
1173 {
1174 StgSparkPool *pool;
1175 rtsSpark spark;
1176 StgTSO *t;
1177
1178 # if defined(PAR) // global Mem.Mgmt., omit for now
1179 if (PendingFetches != END_BF_QUEUE) {
1180 processFetches();
1181 }
1182 # endif
1183
1184 if (RtsFlags.ParFlags.BufferTime) {
1185 // if we use message buffering, we must send away all message
1186 // packets which have become too old...
1187 sendOldBuffers();
1188 }
1189 }
1190 #endif
1191
1192 /* ----------------------------------------------------------------------------
1193 * Activate spark threads (PARALLEL_HASKELL only)
1194 * ------------------------------------------------------------------------- */
1195
1196 #if defined(PARALLEL_HASKELL)
1197 static void
1198 scheduleActivateSpark(void)
1199 {
1200 #if defined(SPARKS)
1201 ASSERT(emptyRunQueue());
1202 /* We get here if the run queue is empty and want some work.
1203 We try to turn a spark into a thread, and add it to the run queue,
1204 from where it will be picked up in the next iteration of the scheduler
1205 loop.
1206 */
1207
1208 /* :-[ no local threads => look out for local sparks */
1209 /* the spark pool for the current PE */
1210 pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1211 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1212 pool->hd < pool->tl) {
1213 /*
1214 * ToDo: add GC code check that we really have enough heap afterwards!!
1215 * Old comment:
1216 * If we're here (no runnable threads) and we have pending
1217 * sparks, we must have a space problem. Get enough space
1218 * to turn one of those pending sparks into a
1219 * thread...
1220 */
1221
1222 spark = findSpark(rtsFalse); /* get a spark */
1223 if (spark != (rtsSpark) NULL) {
1224 tso = createThreadFromSpark(spark); /* turn the spark into a thread */
1225 IF_PAR_DEBUG(fish, // schedule,
1226 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1227 tso->id, tso, advisory_thread_count));
1228
1229 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1230 IF_PAR_DEBUG(fish, // schedule,
1231 debugBelch("==^^ failed to create thread from spark @ %lx\n",
1232 spark));
1233 return rtsFalse; /* failed to generate a thread */
1234 } /* otherwise fall through & pick-up new tso */
1235 } else {
1236 IF_PAR_DEBUG(fish, // schedule,
1237 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
1238 spark_queue_len(pool)));
1239 return rtsFalse; /* failed to generate a thread */
1240 }
1241 return rtsTrue; /* success in generating a thread */
1242 } else { /* no more threads permitted or pool empty */
1243 return rtsFalse; /* failed to generateThread */
1244 }
1245 #else
1246 tso = NULL; // avoid compiler warning only
1247 return rtsFalse; /* dummy in non-PAR setup */
1248 #endif // SPARKS
1249 }
1250 #endif // PARALLEL_HASKELL
1251
1252 /* ----------------------------------------------------------------------------
1253 * Get work from a remote node (PARALLEL_HASKELL only)
1254 * ------------------------------------------------------------------------- */
1255
1256 #if defined(PARALLEL_HASKELL)
1257 static rtsBool
1258 scheduleGetRemoteWork(rtsBool *receivedFinish)
1259 {
1260 ASSERT(emptyRunQueue());
1261
1262 if (RtsFlags.ParFlags.BufferTime) {
1263 IF_PAR_DEBUG(verbose,
1264 debugBelch("...send all pending data,"));
1265 {
1266 nat i;
1267 for (i=1; i<=nPEs; i++)
1268 sendImmediately(i); // send all messages away immediately
1269 }
1270 }
1271 # ifndef SPARKS
1272 //++EDEN++ idle() , i.e. send all buffers, wait for work
1273 // suppress fishing in EDEN... just look for incoming messages
1274 // (blocking receive)
1275 IF_PAR_DEBUG(verbose,
1276 debugBelch("...wait for incoming messages...\n"));
1277 *receivedFinish = processMessages(); // blocking receive...
1278
1279 // and reenter scheduling loop after having received something
1280 // (return rtsFalse below)
1281
1282 # else /* activate SPARKS machinery */
1283 /* We get here, if we have no work, tried to activate a local spark, but still
1284 have no work. We try to get a remote spark, by sending a FISH message.
1285 Thread migration should be added here, and triggered when a sequence of
1286 fishes returns without work. */
1287 delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1288
1289 /* =8-[ no local sparks => look for work on other PEs */
1290 /*
1291 * We really have absolutely no work. Send out a fish
1292 * (there may be some out there already), and wait for
1293 * something to arrive. We clearly can't run any threads
1294 * until a SCHEDULE or RESUME arrives, and so that's what
1295 * we're hoping to see. (Of course, we still have to
1296 * respond to other types of messages.)
1297 */
1298 rtsTime now = msTime() /*CURRENT_TIME*/;
1299 IF_PAR_DEBUG(verbose,
1300 debugBelch("-- now=%ld\n", now));
1301 IF_PAR_DEBUG(fish, // verbose,
1302 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1303 (last_fish_arrived_at!=0 &&
1304 last_fish_arrived_at+delay > now)) {
1305 debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1306 now, last_fish_arrived_at+delay,
1307 last_fish_arrived_at,
1308 delay);
1309 });
1310
1311 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1312 advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1313 if (last_fish_arrived_at==0 ||
1314 (last_fish_arrived_at+delay <= now)) { // send FISH now!
1315 /* outstandingFishes is set in sendFish, processFish;
1316 avoid flooding system with fishes via delay */
1317 next_fish_to_send_at = 0;
1318 } else {
1319 /* ToDo: this should be done in the main scheduling loop to avoid the
1320 busy wait here; not so bad if fish delay is very small */
1321 int iq = 0; // DEBUGGING -- HWL
1322 next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
1323 /* send a fish when ready, but process messages that arrive in the meantime */
1324 do {
1325 if (PacketsWaiting()) {
1326 iq++; // DEBUGGING
1327 *receivedFinish = processMessages();
1328 }
1329 now = msTime();
1330 } while (!*receivedFinish || now<next_fish_to_send_at);
1331 // JB: This means the fish could become obsolete, if we receive
1332 // work. Better check for work again?
1333 // last line: while (!receivedFinish || !haveWork || now<...)
1334 // next line: if (receivedFinish || haveWork )
1335
1336 if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1337 return rtsFalse; // NB: this will leave scheduler loop
1338 // immediately after return!
1339
1340 IF_PAR_DEBUG(fish, // verbose,
1341 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1342
1343 }
1344
1345 // JB: IMHO, this should all be hidden inside sendFish(...)
1346 /* pe = choosePE();
1347 sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
1348 NEW_FISH_HUNGER);
1349
1350 // Global statistics: count no. of fishes
1351 if (RtsFlags.ParFlags.ParStats.Global &&
1352 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1353 globalParStats.tot_fish_mess++;
1354 }
1355 */
1356
1357 /* delayed fishes must have been sent by now! */
1358 next_fish_to_send_at = 0;
1359 }
1360
1361 *receivedFinish = processMessages();
1362 # endif /* SPARKS */
1363
1364 return rtsFalse;
1365 /* NB: this function always returns rtsFalse, meaning the scheduler
1366 loop continues with the next iteration;
1367 rationale:
1368 return code means success in finding work; we enter this function
1369 if there is no local work, thus have to send a fish which takes
1370 time until it arrives with work; in the meantime we should process
1371 messages in the main loop;
1372 */
1373 }
1374 #endif // PARALLEL_HASKELL
1375
1376 /* ----------------------------------------------------------------------------
1377 * PAR/GRAN: Report stats & debugging info(?)
1378 * ------------------------------------------------------------------------- */
1379
1380 #if defined(PAR) || defined(GRAN)
1381 static void
1382 scheduleGranParReport(void)
1383 {
1384 ASSERT(run_queue_hd != END_TSO_QUEUE);
1385
1386 /* Take a thread from the run queue, if we have work */
1387 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
1388
1389 /* If this TSO has got its outport closed in the meantime,
1390 * it mustn't be run. Instead, we have to clean it up as if it was finished.
1391 * It has to be marked as TH_DEAD for this purpose.
1392 * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1393
1394     JB: TODO: investigate whether state change field could be nuked
1395 entirely and replaced by the normal tso state (whatnext
1396 field). All we want to do is to kill tsos from outside.
1397 */
1398
1399 /* ToDo: write something to the log-file
1400 if (RTSflags.ParFlags.granSimStats && !sameThread)
1401 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1402
1403 CurrentTSO = t;
1404 */
1405 /* the spark pool for the current PE */
1406 pool = &(cap.r.rSparks); // cap = (old) MainCap
1407
1408 IF_DEBUG(scheduler,
1409 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1410 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1411
1412 IF_PAR_DEBUG(fish,
1413 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
1414 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1415
1416 if (RtsFlags.ParFlags.ParStats.Full &&
1417 (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1418 (emitSchedule || // forced emit
1419 (t && LastTSO && t->id != LastTSO->id))) {
1420 /*
1421 we are running a different TSO, so write a schedule event to log file
1422 NB: If we use fair scheduling we also have to write a deschedule
1423 event for LastTSO; with unfair scheduling we know that the
1424 previous tso has blocked whenever we switch to another tso, so
1425 we don't need it in GUM for now
1426 */
1427 IF_PAR_DEBUG(fish, // schedule,
1428 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1429
1430 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1431 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1432 emitSchedule = rtsFalse;
1433 }
1434 }
1435 #endif
1436
1437 /* ----------------------------------------------------------------------------
1438 * After running a thread...
1439 * ------------------------------------------------------------------------- */
1440
1441 static void
1442 schedulePostRunThread(void)
1443 {
1444 #if defined(PAR)
1445 /* HACK 675: if the last thread didn't yield, make sure to print a
1446 SCHEDULE event to the log file when StgRunning the next thread, even
1447 if it is the same one as before */
1448 LastTSO = t;
1449 TimeOfLastYield = CURRENT_TIME;
1450 #endif
1451
1452 /* some statistics gathering in the parallel case */
1453
1454 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1455 switch (ret) {
1456 case HeapOverflow:
1457 # if defined(GRAN)
1458 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1459 globalGranStats.tot_heapover++;
1460 # elif defined(PAR)
1461 globalParStats.tot_heapover++;
1462 # endif
1463 break;
1464
1465 case StackOverflow:
1466 # if defined(GRAN)
1467 IF_DEBUG(gran,
1468 DumpGranEvent(GR_DESCHEDULE, t));
1469 globalGranStats.tot_stackover++;
1470 # elif defined(PAR)
1471 // IF_DEBUG(par,
1472 // DumpGranEvent(GR_DESCHEDULE, t);
1473 globalParStats.tot_stackover++;
1474 # endif
1475 break;
1476
1477 case ThreadYielding:
1478 # if defined(GRAN)
1479 IF_DEBUG(gran,
1480 DumpGranEvent(GR_DESCHEDULE, t));
1481 globalGranStats.tot_yields++;
1482 # elif defined(PAR)
1483 // IF_DEBUG(par,
1484 // DumpGranEvent(GR_DESCHEDULE, t);
1485 globalParStats.tot_yields++;
1486 # endif
1487 break;
1488
1489 case ThreadBlocked:
1490 # if defined(GRAN)
1491 debugTrace(DEBUG_sched,
1492 "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1493 t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
1494 (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1495 if (t->block_info.closure!=(StgClosure*)NULL)
1496 print_bq(t->block_info.closure);
1497 debugBelch("\n"));
1498
1499 // ??? needed; should emit block before
1500 IF_DEBUG(gran,
1501 DumpGranEvent(GR_DESCHEDULE, t));
1502 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1503 /*
1504 ngoq Dogh!
1505 ASSERT(procStatus[CurrentProc]==Busy ||
1506 ((procStatus[CurrentProc]==Fetching) &&
1507 (t->block_info.closure!=(StgClosure*)NULL)));
1508 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1509 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1510 procStatus[CurrentProc]==Fetching))
1511 procStatus[CurrentProc] = Idle;
1512 */
1513 # elif defined(PAR)
1514 //++PAR++ blockThread() writes the event (change?)
1515 # endif
1516 break;
1517
1518 case ThreadFinished:
1519 break;
1520
1521 default:
1522 barf("parGlobalStats: unknown return code");
1523 break;
1524 }
1525 #endif
1526 }
1527
1528 /* -----------------------------------------------------------------------------
1529  * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1530 * -------------------------------------------------------------------------- */
1531
1532 static rtsBool
1533 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1534 {
1535 // did the task ask for a large block?
1536 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1537 // if so, get one and push it on the front of the nursery.
1538 bdescr *bd;
1539 lnat blocks;
1540
1541 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1542
1543 debugTrace(DEBUG_sched,
1544 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1545 (long)t->id, whatNext_strs[t->what_next], blocks);
1546
1547 // don't do this if the nursery is (nearly) full, we'll GC first.
1548 if (cap->r.rCurrentNursery->link != NULL ||
1549 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1550 // if the nursery has only one block.
1551
1552 ACQUIRE_SM_LOCK
1553 bd = allocGroup( blocks );
1554 RELEASE_SM_LOCK
1555 cap->r.rNursery->n_blocks += blocks;
1556
1557 // link the new group into the list
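	    // (splice bd in immediately before rCurrentNursery: set bd's
	    // forward and back pointers first, then fix up the neighbouring
	    // blocks' pointers below)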
1558 bd->link = cap->r.rCurrentNursery;
1559 bd->u.back = cap->r.rCurrentNursery->u.back;
1560 if (cap->r.rCurrentNursery->u.back != NULL) {
1561 cap->r.rCurrentNursery->u.back->link = bd;
1562 } else {
1563 #if !defined(THREADED_RTS)
1564 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1565 g0s0 == cap->r.rNursery);
1566 #endif
1567 cap->r.rNursery->blocks = bd;
1568 }
1569 cap->r.rCurrentNursery->u.back = bd;
1570
1571 // initialise it as a nursery block. We initialise the
1572 // step, gen_no, and flags field of *every* sub-block in
1573 // this large block, because this is easier than making
1574 // sure that we always find the block head of a large
1575 // block whenever we call Bdescr() (eg. evacuate() and
1576 // isAlive() in the GC would both have to do this, at
1577 // least).
1578 {
1579 bdescr *x;
1580 for (x = bd; x < bd + blocks; x++) {
1581 x->step = cap->r.rNursery;
1582 x->gen_no = 0;
1583 x->flags = 0;
1584 }
1585 }
1586
1587 // This assert can be a killer if the app is doing lots
1588 // of large block allocations.
1589 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1590
1591 // now update the nursery to point to the new block
1592 cap->r.rCurrentNursery = bd;
1593
1594 // we might be unlucky and have another thread get on the
1595 // run queue before us and steal the large block, but in that
1596 // case the thread will just end up requesting another large
1597 // block.
1598 pushOnRunQueue(cap,t);
1599 return rtsFalse; /* not actually GC'ing */
1600 }
1601 }
1602
1603 debugTrace(DEBUG_sched,
1604 "--<< thread %ld (%s) stopped: HeapOverflow\n",
1605 (long)t->id, whatNext_strs[t->what_next]);
1606
1607 #if defined(GRAN)
1608 ASSERT(!is_on_queue(t,CurrentProc));
1609 #elif defined(PARALLEL_HASKELL)
1610 /* Currently we emit a DESCHEDULE event before GC in GUM.
1611 ToDo: either add separate event to distinguish SYSTEM time from rest
1612 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1613 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1614 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1615 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1616 emitSchedule = rtsTrue;
1617 }
1618 #endif
1619
1620 pushOnRunQueue(cap,t);
1621 return rtsTrue;
1622 /* actual GC is done at the end of the while loop in schedule() */
1623 }
1624
1625 /* -----------------------------------------------------------------------------
1626 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1627 * -------------------------------------------------------------------------- */
1628
1629 static void
1630 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1631 {
1632 debugTrace (DEBUG_sched,
1633 "--<< thread %ld (%s) stopped, StackOverflow",
1634 (long)t->id, whatNext_strs[t->what_next]);
1635
1636 /* just adjust the stack for this thread, then pop it back
1637 * on the run queue.
1638 */
1639 {
1640 /* enlarge the stack */
1641 StgTSO *new_t = threadStackOverflow(cap, t);
1642
1643 /* The TSO attached to this Task may have moved, so update the
1644 * pointer to it.
1645 */
1646 if (task->tso == t) {
1647 task->tso = new_t;
1648 }
1649 pushOnRunQueue(cap,new_t);
1650 }
1651 }
1652
1653 /* -----------------------------------------------------------------------------
1654 * Handle a thread that returned to the scheduler with ThreadYielding
1655 * -------------------------------------------------------------------------- */
1656
1657 static rtsBool
1658 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1659 {
1660 // Reset the context switch flag. We don't do this just before
1661 // running the thread, because that would mean we would lose ticks
1662 // during GC, which can lead to unfair scheduling (a thread hogs
1663 // the CPU because the tick always arrives during GC). This way
1664 // penalises threads that do a lot of allocation, but that seems
1665 // better than the alternative.
1666 context_switch = 0;
1667
1668 /* put the thread back on the run queue. Then, if we're ready to
1669 * GC, check whether this is the last task to stop. If so, wake
1670 * up the GC thread. getThread will block during a GC until the
1671 * GC is finished.
1672 */
1673 #ifdef DEBUG
1674 if (t->what_next != prev_what_next) {
1675 debugTrace(DEBUG_sched,
1676 "--<< thread %ld (%s) stopped to switch evaluators",
1677 (long)t->id, whatNext_strs[t->what_next]);
1678 } else {
1679 debugTrace(DEBUG_sched,
1680 "--<< thread %ld (%s) stopped, yielding",
1681 (long)t->id, whatNext_strs[t->what_next]);
1682 }
1683 #endif
1684
1685 IF_DEBUG(sanity,
1686 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1687 checkTSO(t));
1688 ASSERT(t->link == END_TSO_QUEUE);
1689
1690 // Shortcut if we're just switching evaluators: don't bother
1691 // doing stack squeezing (which can be expensive), just run the
1692 // thread.
1693 if (t->what_next != prev_what_next) {
1694 return rtsTrue;
1695 }
1696
1697 #if defined(GRAN)
1698 ASSERT(!is_on_queue(t,CurrentProc));
1699
1700 IF_DEBUG(sanity,
1701 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1702 checkThreadQsSanity(rtsTrue));
1703
1704 #endif
1705
1706 addToRunQueue(cap,t);
1707
1708 #if defined(GRAN)
1709 /* add a ContinueThread event to actually process the thread */
1710 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1711 ContinueThread,
1712 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1713 IF_GRAN_DEBUG(bq,
1714 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1715 G_EVENTQ(0);
1716 G_CURR_THREADQ(0));
1717 #endif
1718 return rtsFalse;
1719 }
1720
1721 /* -----------------------------------------------------------------------------
1722 * Handle a thread that returned to the scheduler with ThreadBlocked
1723 * -------------------------------------------------------------------------- */
1724
1725 static void
1726 scheduleHandleThreadBlocked( StgTSO *t
1727 #if !defined(GRAN) && !defined(DEBUG)
1728 STG_UNUSED
1729 #endif
1730 )
1731 {
1732 #if defined(GRAN)
1733 IF_DEBUG(scheduler,
1734 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1735 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1736 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1737
1738 // ??? needed; should emit block before
1739 IF_DEBUG(gran,
1740 DumpGranEvent(GR_DESCHEDULE, t));
1741 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1742 /*
1743 ngoq Dogh!
1744 ASSERT(procStatus[CurrentProc]==Busy ||
1745 ((procStatus[CurrentProc]==Fetching) &&
1746 (t->block_info.closure!=(StgClosure*)NULL)));
1747 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1748 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1749 procStatus[CurrentProc]==Fetching))
1750 procStatus[CurrentProc] = Idle;
1751 */
1752 #elif defined(PAR)
1753 IF_DEBUG(scheduler,
1754 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1755 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1756 IF_PAR_DEBUG(bq,
1757
1758 if (t->block_info.closure!=(StgClosure*)NULL)
1759 print_bq(t->block_info.closure));
1760
1761 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1762 blockThread(t);
1763
1764 /* whatever we schedule next, we must log that schedule */
1765 emitSchedule = rtsTrue;
1766
1767 #else /* !GRAN */
1768
1769 // We don't need to do anything. The thread is blocked, and it
1770 // has tidied up its stack and placed itself on whatever queue
1771 // it needs to be on.
1772
1773 #if !defined(THREADED_RTS)
1774 ASSERT(t->why_blocked != NotBlocked);
1775 // This might not be true under THREADED_RTS: we don't have
1776 // exclusive access to this TSO, so someone might have
1777 // woken it up by now. This actually happens: try
1778 // conc023 +RTS -N2.
1779 #endif
1780
1781 #ifdef DEBUG
1782 if (traceClass(DEBUG_sched)) {
1783 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1784 (unsigned long)t->id, whatNext_strs[t->what_next]);
1785 printThreadBlockage(t);
1786 debugTraceEnd();
1787 }
1788 #endif
1789
1790 /* Only for dumping event to log file
1791 ToDo: do I need this in GranSim, too?
1792 blockThread(t);
1793 */
1794 #endif
1795 }
1796
1797 /* -----------------------------------------------------------------------------
1798 * Handle a thread that returned to the scheduler with ThreadFinished
1799 * -------------------------------------------------------------------------- */
1800
1801 static rtsBool
1802 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1803 {
1804 /* Need to check whether this was a main thread, and if so,
1805 * return with the return value.
1806 *
1807 * We also end up here if the thread kills itself with an
1808 * uncaught exception, see Exception.cmm.
1809 */
1810 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1811 (unsigned long)t->id, whatNext_strs[t->what_next]);
1812
1813 #if defined(GRAN)
1814 endThread(t, CurrentProc); // clean-up the thread
1815 #elif defined(PARALLEL_HASKELL)
1816 /* For now all are advisory -- HWL */
1817 //if(t->priority==AdvisoryPriority) ??
1818 advisory_thread_count--; // JB: Caution with this counter, buggy!
1819
1820 # if defined(DIST)
1821 if(t->dist.priority==RevalPriority)
1822 FinishReval(t);
1823 # endif
1824
1825 # if defined(EDENOLD)
1826 // the thread could still have an outport... (BUG)
1827 if (t->eden.outport != -1) {
1828 // delete the outport for the tso which has finished...
1829 IF_PAR_DEBUG(eden_ports,
1830 debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1831 t->eden.outport, t->id));
1832 deleteOPT(t);
1833 }
1834 // thread still in the process (HEAVY BUG! since outport has just been closed...)
1835 if (t->eden.epid != -1) {
1836 IF_PAR_DEBUG(eden_ports,
1837 debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1838 t->id, t->eden.epid));
1839 removeTSOfromProcess(t);
1840 }
1841 # endif
1842
1843 # if defined(PAR)
1844 if (RtsFlags.ParFlags.ParStats.Full &&
1845 !RtsFlags.ParFlags.ParStats.Suppressed)
1846 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1847
1848 // t->par only contains statistics: left out for now...
1849 IF_PAR_DEBUG(fish,
1850 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1851 t->id,t,t->par.sparkname));
1852 # endif
1853 #endif // PARALLEL_HASKELL
1854
1855 //
1856 // Check whether the thread that just completed was a bound
1857 // thread, and if so return with the result.
1858 //
1859 // There is an assumption here that all thread completion goes
1860 // through this point; we need to make sure that if a thread
1861 // ends up in the ThreadKilled state, that it stays on the run
1862 // queue so it can be dealt with here.
1863 //
1864
1865 if (t->bound) {
1866
1867 if (t->bound != task) {
1868 #if !defined(THREADED_RTS)
1869 // Must be a bound thread that is not the topmost one. Leave
1870 // it on the run queue until the stack has unwound to the
1871 // point where we can deal with this. Leaving it on the run
1872 // queue also ensures that the garbage collector knows about
1873 // this thread and its return value (it gets dropped from the
1874 // all_threads list so there's no other way to find it).
1875 appendToRunQueue(cap,t);
1876 return rtsFalse;
1877 #else
1878 // this cannot happen in the threaded RTS, because a
1879 // bound thread can only be run by the appropriate Task.
1880 barf("finished bound thread that isn't mine");
1881 #endif
1882 }
1883
1884 ASSERT(task->tso == t);
1885
1886 if (t->what_next == ThreadComplete) {
1887 if (task->ret) {
1888 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1889 *(task->ret) = (StgClosure *)task->tso->sp[1];
1890 }
1891 task->stat = Success;
1892 } else {
1893 if (task->ret) {
1894 *(task->ret) = NULL;
1895 }
1896 if (sched_state >= SCHED_INTERRUPTING) {
1897 task->stat = Interrupted;
1898 } else {
1899 task->stat = Killed;
1900 }
1901 }
1902 #ifdef DEBUG
1903 removeThreadLabel((StgWord)task->tso->id);
1904 #endif
1905 return rtsTrue; // tells schedule() to return
1906 }
1907
1908 return rtsFalse;
1909 }
1910
1911 /* -----------------------------------------------------------------------------
1912 * Perform a heap census, if PROFILING
1913 * -------------------------------------------------------------------------- */
1914
1915 static rtsBool
1916 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1917 {
1918 #if defined(PROFILING)
1919 // When we have +RTS -i0 and we're heap profiling, do a census at
1920 // every GC. This lets us get repeatable runs for debugging.
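    // (For example, a profiling build run with "+RTS -hc -i0" takes a
    // census at every collection.)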
1921 if (performHeapProfile ||
1922 (RtsFlags.ProfFlags.profileInterval==0 &&
1923 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1924
1925 // checking black holes is necessary before GC, otherwise
1926 // there may be threads that are unreachable except by the
1927 // blackhole queue, which the GC will consider to be
1928 // deadlocked.
1929 scheduleCheckBlackHoles(&MainCapability);
1930
1931 debugTrace(DEBUG_sched, "garbage collecting before heap census");
1932 GarbageCollect(GetRoots, rtsTrue);
1933
1934 debugTrace(DEBUG_sched, "performing heap census");
1935 heapCensus();
1936
1937 performHeapProfile = rtsFalse;
1938 return rtsTrue; // true <=> we already GC'd
1939 }
1940 #endif
1941 return rtsFalse;
1942 }
1943
1944 /* -----------------------------------------------------------------------------
1945 * Perform a garbage collection if necessary
1946 * -------------------------------------------------------------------------- */
1947
1948 static Capability *
1949 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1950 rtsBool force_major, void (*get_roots)(evac_fn))
1951 {
1952 StgTSO *t;
1953 #ifdef THREADED_RTS
1954 static volatile StgWord waiting_for_gc;
1955 rtsBool was_waiting;
1956 nat i;
1957 #endif
1958
1959 #ifdef THREADED_RTS
1960 // In order to GC, there must be no threads running Haskell code.
1961 // Therefore, the GC thread needs to hold *all* the capabilities,
1962 // and release them after the GC has completed.
1963 //
1964 // This seems to be the simplest way: previous attempts involved
1965 // making all the threads with capabilities give up their
1966 // capabilities and sleep except for the *last* one, which
1967 // actually did the GC. But it's quite hard to arrange for all
1968 // the other tasks to sleep and stay asleep.
1969 //
1970
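    // The first task to swing waiting_for_gc from 0 to 1 with the
    // cas() below becomes the GC leader and goes on to collect all the
    // Capabilities; any other task arriving here in the meantime sees
    // a non-zero result from cas(), gives up its Capability and waits
    // until waiting_for_gc drops back to zero.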
1971 was_waiting = cas(&waiting_for_gc, 0, 1);
1972 if (was_waiting) {
1973 do {
1974 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1975 if (cap) yieldCapability(&cap,task);
1976 } while (waiting_for_gc);
1977 return cap; // NOTE: task->cap might have changed here
1978 }
1979
1980 for (i=0; i < n_capabilities; i++) {
1981         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1982 if (cap != &capabilities[i]) {
1983 Capability *pcap = &capabilities[i];
1984 // we better hope this task doesn't get migrated to
1985 // another Capability while we're waiting for this one.
1986 // It won't, because load balancing happens while we have
1987 // all the Capabilities, but even so it's a slightly
1988 // unsavoury invariant.
1989 task->cap = pcap;
1990 context_switch = 1;
1991 waitForReturnCapability(&pcap, task);
1992 if (pcap != &capabilities[i]) {
1993 barf("scheduleDoGC: got the wrong capability");
1994 }
1995 }
1996 }
1997
1998 waiting_for_gc = rtsFalse;
1999 #endif
2000
2001 /* Kick any transactions which are invalid back to their
2002 * atomically frames. When next scheduled they will try to
2003 * commit, this commit will fail and they will retry.
2004 */
2005 {
2006 StgTSO *next;
2007
2008 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2009 if (t->what_next == ThreadRelocated) {
2010 next = t->link;
2011 } else {
2012 next = t->global_link;
2013
2014 // This is a good place to check for blocked
2015 // exceptions. It might be the case that a thread is
2016 // blocked on delivering an exception to a thread that
2017 // is also blocked - we try to ensure that this
2018 // doesn't happen in throwTo(), but it's too hard (or
2019 // impossible) to close all the race holes, so we
2020 // accept that some might get through and deal with
2021 // them here. A GC will always happen at some point,
2022 // even if the system is otherwise deadlocked.
2023 maybePerformBlockedException (&capabilities[0], t);
2024
2025 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2026 if (!stmValidateNestOfTransactions (t -> trec)) {
2027 debugTrace(DEBUG_sched | DEBUG_stm,
2028 "trec %p found wasting its time", t);
2029
2030 // strip the stack back to the
2031 // ATOMICALLY_FRAME, aborting the (nested)
2032 // transaction, and saving the stack of any
2033 // partially-evaluated thunks on the heap.
2034 throwToSingleThreaded_(&capabilities[0], t,
2035 NULL, rtsTrue, NULL);
2036
2037 #ifdef REG_R1
2038 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2039 #endif
2040 }
2041 }
2042 }
2043 }
2044 }
2045
2046 // so this happens periodically:
2047 if (cap) scheduleCheckBlackHoles(cap);
2048
2049 IF_DEBUG(scheduler, printAllThreads());
2050
2051 /*
2052 * We now have all the capabilities; if we're in an interrupting
2053 * state, then we should take the opportunity to delete all the
2054 * threads in the system.
2055 */
2056 if (sched_state >= SCHED_INTERRUPTING) {
2057 deleteAllThreads(&capabilities[0]);
2058 sched_state = SCHED_SHUTTING_DOWN;
2059 }
2060
2061 /* everybody back, start the GC.
2062 * Could do it in this thread, or signal a condition var
2063 * to do it in another thread. Either way, we need to
2064 * broadcast on gc_pending_cond afterward.
2065 */
2066 #if defined(THREADED_RTS)
2067 debugTrace(DEBUG_sched, "doing GC");
2068 #endif
2069 GarbageCollect(get_roots, force_major);
2070
2071 #if defined(THREADED_RTS)
2072 // release our stash of capabilities.
2073 for (i = 0; i < n_capabilities; i++) {
2074 if (cap != &capabilities[i]) {
2075 task->cap = &capabilities[i];
2076 releaseCapability(&capabilities[i]);
2077 }
2078 }
2079 if (cap) {
2080 task->cap = cap;
2081 } else {
2082 task->cap = NULL;
2083 }
2084 #endif
2085
2086 #if defined(GRAN)
2087 /* add a ContinueThread event to continue execution of current thread */
2088 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089 ContinueThread,
2090 t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091 IF_GRAN_DEBUG(bq,
2092 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2093 G_EVENTQ(0);
2094 G_CURR_THREADQ(0));
2095 #endif /* GRAN */
2096
2097 return cap;
2098 }
2099
2100 /* ---------------------------------------------------------------------------
2101 * Singleton fork(). Do not copy any running threads.
2102 * ------------------------------------------------------------------------- */
2103
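/* forkProcess() backs the forkProcess# primop (reached, as far as we
 * know, via System.Posix.Process.forkProcess in the unix package).  In
 * the child, every existing Haskell thread is deleted and every Task
 * except the current one is discarded, because their OS threads no
 * longer exist after fork(); the child then simply runs the supplied
 * entry action and exits.
 */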
2104 StgInt
2105 forkProcess(HsStablePtr *entry
2106 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2107 STG_UNUSED
2108 #endif
2109 )
2110 {
2111 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2112 Task *task;
2113 pid_t pid;
2114     StgTSO *t, *next;
2115 Capability *cap;
2116
2117 #if defined(THREADED_RTS)
2118 if (RtsFlags.ParFlags.nNodes > 1) {
2119 errorBelch("forking not supported with +RTS -N<n> greater than 1");
2120 stg_exit(EXIT_FAILURE);
2121 }
2122 #endif
2123
2124 debugTrace(DEBUG_sched, "forking!");
2125
2126 // ToDo: for SMP, we should probably acquire *all* the capabilities
2127 cap = rts_lock();
2128
2129 pid = fork();
2130
2131 if (pid) { // parent
2132
2133 // just return the pid
2134 rts_unlock(cap);
2135 return pid;
2136
2137 } else { // child
2138
2139 // Now, all OS threads except the thread that forked are
2140 // stopped. We need to stop all Haskell threads, including
2141 // those involved in foreign calls. Also we need to delete
2142 // all Tasks, because they correspond to OS threads that are
2143 // now gone.
2144
2145 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2146 if (t->what_next == ThreadRelocated) {
2147 next = t->link;
2148 } else {
2149 next = t->global_link;
2150 // don't allow threads to catch the ThreadKilled
2151 // exception, but we do want to raiseAsync() because these
2152 // threads may be evaluating thunks that we need later.
2153 deleteThread_(cap,t);
2154 }
2155 }
2156
2157 // Empty the run queue. It seems tempting to let all the
2158 // killed threads stay on the run queue as zombies to be
2159 // cleaned up later, but some of them correspond to bound
2160 // threads for which the corresponding Task does not exist.
2161 cap->run_queue_hd = END_TSO_QUEUE;
2162 cap->run_queue_tl = END_TSO_QUEUE;
2163
2164 // Any suspended C-calling Tasks are no more, their OS threads
2165 // don't exist now:
2166 cap->suspended_ccalling_tasks = NULL;
2167
2168 // Empty the all_threads list. Otherwise, the garbage
2169 // collector may attempt to resurrect some of these threads.
2170 all_threads = END_TSO_QUEUE;
2171
2172 // Wipe the task list, except the current Task.
2173 ACQUIRE_LOCK(&sched_mutex);
2174 for (task = all_tasks; task != NULL; task=task->all_link) {
2175 if (task != cap->running_task) {
2176 discardTask(task);
2177 }
2178 }
2179 RELEASE_LOCK(&sched_mutex);
2180
2181 #if defined(THREADED_RTS)
2182 // Wipe our spare workers list, they no longer exist. New
2183 // workers will be created if necessary.
2184 cap->spare_workers = NULL;
2185 cap->returning_tasks_hd = NULL;
2186 cap->returning_tasks_tl = NULL;
2187 #endif
2188
2189 cap = rts_evalStableIO(cap, entry, NULL); // run the action
2190 rts_checkSchedStatus("forkProcess",cap);
2191
2192 rts_unlock(cap);
2193 hs_exit(); // clean up and exit
2194 stg_exit(EXIT_SUCCESS);
2195 }
2196 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2197 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2198 return -1;
2199 #endif
2200 }
2201
2202 /* ---------------------------------------------------------------------------
2203 * Delete all the threads in the system
2204 * ------------------------------------------------------------------------- */
2205
2206 static void
2207 deleteAllThreads ( Capability *cap )
2208 {
2209 // NOTE: only safe to call if we own all capabilities.
2210
2211 StgTSO* t, *next;
2212 debugTrace(DEBUG_sched,"deleting all threads");
2213 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2214 if (t->what_next == ThreadRelocated) {
2215 next = t->link;
2216 } else {
2217 next = t->global_link;
2218 deleteThread(cap,t);
2219 }
2220 }
2221
2222 // The run queue now contains a bunch of ThreadKilled threads. We
2223 // must not throw these away: the main thread(s) will be in there
2224 // somewhere, and the main scheduler loop has to deal with it.
2225 // Also, the run queue is the only thing keeping these threads from
2226 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2227
2228 #if !defined(THREADED_RTS)
2229 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2230 ASSERT(sleeping_queue == END_TSO_QUEUE);
2231 #endif
2232 }
2233
2234 /* -----------------------------------------------------------------------------
2235 Managing the suspended_ccalling_tasks list.
2236 Locks required: sched_mutex
2237 -------------------------------------------------------------------------- */
2238
2239 STATIC_INLINE void
2240 suspendTask (Capability *cap, Task *task)
2241 {
2242 ASSERT(task->next == NULL && task->prev == NULL);
2243 task->next = cap->suspended_ccalling_tasks;
2244 task->prev = NULL;
2245 if (cap->suspended_ccalling_tasks) {
2246 cap->suspended_ccalling_tasks->prev = task;
2247 }
2248 cap->suspended_ccalling_tasks = task;
2249 }
2250
2251 STATIC_INLINE void
2252 recoverSuspendedTask (Capability *cap, Task *task)
2253 {
2254 if (task->prev) {
2255 task->prev->next = task->next;
2256 } else {
2257 ASSERT(cap->suspended_ccalling_tasks == task);
2258 cap->suspended_ccalling_tasks = task->next;
2259 }
2260 if (task->next) {
2261 task->next->prev = task->prev;
2262 }
2263 task->next = task->prev = NULL;
2264 }
2265
2266 /* ---------------------------------------------------------------------------
2267 * Suspending & resuming Haskell threads.
2268 *
2269 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2270 * its capability before calling the C function. This allows another
2271 * task to pick up the capability and carry on running Haskell
2272 * threads. It also means that if the C call blocks, it won't lock
2273 * the whole system.
2274 *
2275 * The Haskell thread making the C call is put to sleep for the
2276  * duration of the call, on the suspended_ccalling_tasks queue.  We
2277 * give out a token to the task, which it can use to resume the thread
2278 * on return from the C function.
2279 * ------------------------------------------------------------------------- */
2280
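/* A rough sketch of how these two functions are used.  The real calls
 * are emitted by the code generator around every "safe" foreign call;
 * this is only an illustration and the variable names are made up:
 *
 *     void *token;
 *     token   = suspendThread(BaseReg);    // give the Capability away
 *     result  = some_blocking_c_function(args);
 *     BaseReg = resumeThread(token);       // get a Capability back
 *
 * The token handed out is really the Task, as suspendThread() below
 * makes clear.
 */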
2281 void *
2282 suspendThread (StgRegTable *reg)
2283 {
2284 Capability *cap;
2285 int saved_errno = errno;
2286 StgTSO *tso;
2287 Task *task;
2288
2289 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2290 */
2291 cap = regTableToCapability(reg);
2292
2293 task = cap->running_task;
2294 tso = cap->r.rCurrentTSO;
2295
2296 debugTrace(DEBUG_sched,
2297 "thread %lu did a safe foreign call",
2298 (unsigned long)cap->r.rCurrentTSO->id);
2299
2300 // XXX this might not be necessary --SDM
2301 tso->what_next = ThreadRunGHC;
2302
2303 threadPaused(cap,tso);
2304
2305 if ((tso->flags & TSO_BLOCKEX) == 0) {
2306 tso->why_blocked = BlockedOnCCall;
2307 tso->flags |= TSO_BLOCKEX;
2308 tso->flags &= ~TSO_INTERRUPTIBLE;
2309 } else {
2310 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2311 }
2312
2313 // Hand back capability
2314 task->suspended_tso = tso;
2315
2316 ACQUIRE_LOCK(&cap->lock);
2317
2318 suspendTask(cap,task);
2319 cap->in_haskell = rtsFalse;
2320 releaseCapability_(cap);
2321
2322 RELEASE_LOCK(&cap->lock);
2323
2324 #if defined(THREADED_RTS)
2325 /* Preparing to leave the RTS, so ensure there's a native thread/task
2326 waiting to take over.
2327 */
2328 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2329 #endif
2330
2331 errno = saved_errno;
2332 return task;
2333 }
2334
2335 StgRegTable *
2336 resumeThread (void *task_)
2337 {
2338 StgTSO *tso;
2339 Capability *cap;
2340 int saved_errno = errno;
2341 Task *task = task_;
2342
2343 cap = task->cap;
2344 // Wait for permission to re-enter the RTS with the result.
2345 waitForReturnCapability(&cap,task);
2346 // we might be on a different capability now... but if so, our
2347 // entry on the suspended_ccalling_tasks list will also have been
2348 // migrated.
2349
2350     // Remove the Task from the suspended_ccalling_tasks list
2351 recoverSuspendedTask(cap,task);
2352
2353 tso = task->suspended_tso;
2354 task->suspended_tso = NULL;
2355 tso->link = END_TSO_QUEUE;
2356 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2357
2358 if (tso->why_blocked == BlockedOnCCall) {
2359 awakenBlockedExceptionQueue(cap,tso);
2360 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2361 }
2362
2363 /* Reset blocking status */
2364 tso->why_blocked = NotBlocked;
2365
2366 cap->r.rCurrentTSO = tso;
2367 cap->in_haskell = rtsTrue;
2368 errno = saved_errno;
2369
2370 /* We might have GC'd, mark the TSO dirty again */
2371 dirtyTSO(tso);
2372
2373 IF_DEBUG(sanity, checkTSO(tso));
2374
2375 return &cap->r;
2376 }
2377
2378 /* ---------------------------------------------------------------------------
2379 * scheduleThread()
2380 *
2381 * scheduleThread puts a thread on the end of the runnable queue.
2382 * This will usually be done immediately after a thread is created.
2383 * The caller of scheduleThread must create the thread using e.g.
2384 * createThread and push an appropriate closure
2385 * on this thread's stack before the scheduler is invoked.
2386 * ------------------------------------------------------------------------ */
2387
2388 void
2389 scheduleThread(Capability *cap, StgTSO *tso)
2390 {
2391 // The thread goes at the *end* of the run-queue, to avoid possible
2392 // starvation of any threads already on the queue.
2393 appendToRunQueue(cap,tso);
2394 }
2395
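/* scheduleThreadOn() is like scheduleThread(), but pins the thread to a
 * particular Capability (as far as we know this is what GHC.Conc's
 * forkOnIO ends up calling).  The cpu argument is taken modulo the
 * number of Capabilities, so any non-negative value is acceptable.
 */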
2396 void
2397 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2398 {
2399 #if defined(THREADED_RTS)
2400 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2401 // move this thread from now on.
2402 cpu %= RtsFlags.ParFlags.nNodes;
2403 if (cpu == cap->no) {
2404 appendToRunQueue(cap,tso);
2405 } else {
2406 migrateThreadToCapability_lock(&capabilities[cpu],tso);
2407 }
2408 #else
2409 appendToRunQueue(cap,tso);
2410 #endif
2411 }
2412
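/* scheduleWaitThread() runs a bound thread to completion: it ties the
 * TSO to the current Task, puts it on the run queue and enters
 * schedule(), returning only when the bound thread has completed
 * (successfully or otherwise).  A typical caller is the rts_eval*()
 * family in RtsAPI.c; sketched from memory rather than quoted, the
 * pattern is roughly:
 *
 *     // hypothetical sketch, not the actual RtsAPI.c code:
 *     tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     cap = scheduleWaitThread(tso, &ret, cap);
 *
 * On return, task->stat and *ret (if ret is non-NULL) describe the
 * outcome.
 */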
2413 Capability *
2414 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2415 {
2416 Task *task;
2417
2418 // We already created/initialised the Task
2419 task = cap->running_task;
2420
2421 // This TSO is now a bound thread; make the Task and TSO
2422 // point to each other.
2423 tso->bound = task;
2424 tso->cap = cap;
2425
2426 task->tso = tso;
2427 task->ret = ret;
2428 task->stat = NoStatus;
2429
2430 appendToRunQueue(cap,tso);
2431
2432 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2433
2434 #if defined(GRAN)
2435 /* GranSim specific init */
2436     CurrentTSO = tso; // the TSO to run
2437 procStatus[MainProc] = Busy; // status of main PE
2438 CurrentProc = MainProc; // PE to run it on
2439 #endif
2440
2441 cap = schedule(cap,task);
2442
2443 ASSERT(task->stat != NoStatus);
2444 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2445
2446 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2447 return cap;
2448 }
2449
2450 /* ----------------------------------------------------------------------------
2451 * Starting Tasks
2452 * ------------------------------------------------------------------------- */
2453
2454 #if defined(THREADED_RTS)
2455 void
2456 workerStart(Task *task)
2457 {
2458 Capability *cap;
2459
2460 // See startWorkerTask().
2461 ACQUIRE_LOCK(&task->lock);
2462 cap = task->cap;
2463 RELEASE_LOCK(&task->lock);
2464
2465 // set the thread-local pointer to the Task:
2466 taskEnter(task);
2467
2468 // schedule() runs without a lock.
2469 cap = schedule(cap,task);
2470
2471 // On exit from schedule(), we have a Capability.
2472 releaseCapability(cap);
2473 workerTaskStop(task);
2474 }
2475 #endif
2476
2477 /* ---------------------------------------------------------------------------
2478 * initScheduler()
2479 *
2480 * Initialise the scheduler. This resets all the queues - if the
2481 * queues contained any threads, they'll be garbage collected at the
2482 * next pass.
2483 *
2484 * ------------------------------------------------------------------------ */
2485
2486 void
2487 initScheduler(void)
2488 {
2489 #if defined(GRAN)
2490 nat i;
2491 for (i=0; i<=MAX_PROC; i++) {
2492 run_queue_hds[i] = END_TSO_QUEUE;
2493 run_queue_tls[i] = END_TSO_QUEUE;
2494 blocked_queue_hds[i] = END_TSO_QUEUE;
2495 blocked_queue_tls[i] = END_TSO_QUEUE;
2496 ccalling_threadss[i] = END_TSO_QUEUE;
2497 blackhole_queue[i] = END_TSO_QUEUE;
2498 sleeping_queue = END_TSO_QUEUE;
2499 }
2500 #elif !defined(THREADED_RTS)
2501 blocked_queue_hd = END_TSO_QUEUE;
2502 blocked_queue_tl = END_TSO_QUEUE;
2503 sleeping_queue = END_TSO_QUEUE;
2504 #endif
2505
2506 blackhole_queue = END_TSO_QUEUE;
2507 all_threads = END_TSO_QUEUE;
2508
2509 context_switch = 0;
2510 sched_state = SCHED_RUNNING;
2511
2512 RtsFlags.ConcFlags.ctxtSwitchTicks =
2513 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2514
2515 #if defined(THREADED_RTS)
2516 /* Initialise the mutex and condition variables used by
2517 * the scheduler. */
2518 initMutex(&sched_mutex);
2519 #endif
2520
2521 ACQUIRE_LOCK(&sched_mutex);
2522
2523 /* A capability holds the state a native thread needs in
2524 * order to execute STG code. At least one capability is
2525 * floating around (only THREADED_RTS builds have more than one).
2526 */
2527 initCapabilities();
2528
2529 initTaskManager();
2530
2531 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2532 initSparkPools();
2533 #endif
2534
2535 #if defined(THREADED_RTS)
2536 /*
2537 * Eagerly start one worker to run each Capability, except for
2538 * Capability 0. The idea is that we're probably going to start a
2539 * bound thread on Capability 0 pretty soon, so we don't want a
2540 * worker task hogging it.
2541 */
2542 {
2543 nat i;
2544 Capability *cap;
2545 for (i = 1; i < n_capabilities; i++) {
2546 cap = &capabilities[i];
2547 ACQUIRE_LOCK(&cap->lock);
2548 startWorkerTask(cap, workerStart);
2549 RELEASE_LOCK(&cap->lock);
2550 }
2551 }
2552 #endif
2553
2554 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2555
2556 RELEASE_LOCK(&sched_mutex);
2557 }
2558
2559 void
2560 exitScheduler( void )
2561 {
2562 Task *task = NULL;
2563
2564 #if defined(THREADED_RTS)
2565 ACQUIRE_LOCK(&sched_mutex);
2566 task = newBoundTask();
2567 RELEASE_LOCK(&sched_mutex);
2568 #endif
2569
2570 // If we haven't killed all the threads yet, do it now.
2571 if (sched_state < SCHED_SHUTTING_DOWN) {
2572 sched_state = SCHED_INTERRUPTING;
2573 scheduleDoGC(NULL,task,rtsFalse,GetRoots);
2574 }
2575 sched_state = SCHED_SHUTTING_DOWN;
2576
2577 #if defined(THREADED_RTS)
2578 {
2579 nat i;
2580
2581 for (i = 0; i < n_capabilities; i++) {
2582 shutdownCapability(&capabilities[i], task);
2583 }
2584 boundTaskExiting(task);
2585 stopTaskManager();
2586 }
2587 #endif
2588 }
2589
2590 /* ---------------------------------------------------------------------------
2591 Where are the roots that we know about?
2592
2593 - all the threads on the runnable queue
2594 - all the threads on the blocked queue
2595 - all the threads on the sleeping queue
2596       - all the threads currently executing a _ccall_GC
2597 - all the "main threads"
2598
2599 ------------------------------------------------------------------------ */
2600
2601 /* This has to be protected either by the scheduler monitor, or by the
2602 garbage collection monitor (probably the latter).
2603 KH @ 25/10/99
2604 */
2605
2606 void
2607 GetRoots( evac_fn evac )
2608 {
2609 nat i;
2610 Capability *cap;
2611 Task *task;
2612
2613 #if defined(GRAN)
2614 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2615 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2616 evac((StgClosure **)&run_queue_hds[i]);
2617 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2618 evac((StgClosure **)&run_queue_tls[i]);
2619
2620 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2621 evac((StgClosure **)&blocked_queue_hds[i]);
2622 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2623 evac((StgClosure **)&blocked_queue_tls[i]);
2624 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2625           evac((StgClosure **)&ccalling_threadss[i]);
2626 }
2627
2628 markEventQueue();
2629
2630 #else /* !GRAN */
2631
2632 for (i = 0; i < n_capabilities; i++) {
2633 cap = &capabilities[i];
2634 evac((StgClosure **)(void *)&cap->run_queue_hd);
2635 evac((StgClosure **)(void *)&cap->run_queue_tl);
2636 #if defined(THREADED_RTS)
2637 evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2638 evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2639 #endif
2640 for (task = cap->suspended_ccalling_tasks; task != NULL;
2641 task=task->next) {
2642 debugTrace(DEBUG_sched,
2643 "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2644 evac((StgClosure **)(void *)&task->suspended_tso);
2645 }
2646
2647 }
2648
2649
2650 #if !defined(THREADED_RTS)
2651 evac((StgClosure **)(void *)&blocked_queue_hd);
2652 evac((StgClosure **)(void *)&blocked_queue_tl);
2653 evac((StgClosure **)(void *)&sleeping_queue);
2654 #endif
2655 #endif
2656
2657 // evac((StgClosure **)&blackhole_queue);
2658
2659 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2660 markSparkQueue(evac);
2661 #endif
2662
2663 #if defined(RTS_USER_SIGNALS)
2664 // mark the signal handlers (signals should be already blocked)
2665 markSignalHandlers(evac);
2666 #endif
2667 }
2668
2669 /* -----------------------------------------------------------------------------
2670 performGC
2671
2672 This is the interface to the garbage collector from Haskell land.
2673 We provide this so that external C code can allocate and garbage
2674 collect when called from Haskell via _ccall_GC.
2675
2676 It might be useful to provide an interface whereby the programmer
2677 can specify more roots (ToDo).
2678
2679 This needs to be protected by the GC condition variable above. KH.
2680 -------------------------------------------------------------------------- */
2681
2682 static void (*extra_roots)(evac_fn);
2683
2684 static void
2685 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2686 {
2687 Task *task;
2688 // We must grab a new Task here, because the existing Task may be
2689 // associated with a particular Capability, and chained onto the
2690 // suspended_ccalling_tasks queue.
2691 ACQUIRE_LOCK(&sched_mutex);
2692 task = newBoundTask();
2693 RELEASE_LOCK(&sched_mutex);
2694 scheduleDoGC(NULL,task,force_major, get_roots);
2695 boundTaskExiting(task);
2696 }
2697
2698 void
2699 performGC(void)
2700 {
2701 performGC_(rtsFalse, GetRoots);
2702 }
2703
2704 void
2705 performMajorGC(void)
2706 {
2707 performGC_(rtsTrue, GetRoots);
2708 }
2709
2710 static void
2711 AllRoots(evac_fn evac)
2712 {
2713 GetRoots(evac); // the scheduler's roots
2714 extra_roots(evac); // the user's roots
2715 }
2716
2717 void
2718 performGCWithRoots(void (*get_roots)(evac_fn))
2719 {
2720 extra_roots = get_roots;
2721 performGC_(rtsFalse, AllRoots);
2722 }
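
/* Illustrative (hypothetical) client code: external C code that keeps
 * its own reference to a Haskell closure can make the GC aware of it
 * by supplying an extra root-marking function here.  my_root and
 * myExtraRoots are made-up names, not part of the RTS:
 *
 *     static StgClosure *my_root = NULL;
 *
 *     static void myExtraRoots (evac_fn evac)
 *     {
 *         if (my_root != NULL) {
 *             evac(&my_root);
 *         }
 *     }
 *
 *     // ... later, from a context where GC is safe:
 *     performGCWithRoots(myExtraRoots);
 */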
2723
2724 /* -----------------------------------------------------------------------------
2725 Stack overflow
2726
2727 If the thread has reached its maximum stack size, then raise the
2728 StackOverflow exception in the offending thread. Otherwise
2729 relocate the TSO into a larger chunk of memory and adjust its stack
2730 size appropriately.
2731 -------------------------------------------------------------------------- */
2732
2733 static StgTSO *
2734 threadStackOverflow(Capability *cap, StgTSO *tso)
2735 {
2736 nat new_stack_size, stack_words;
2737 lnat new_tso_size;
2738 StgPtr new_sp;
2739 StgTSO *dest;
2740
2741 IF_DEBUG(sanity,checkTSO(tso));
2742
2743 // don't allow throwTo() to modify the blocked_exceptions queue
2744 // while we are moving the TSO:
2745 lockClosure((StgClosure *)tso);
2746
2747 if (tso->stack_size >= tso->max_stack_size) {
2748
2749 debugTrace(DEBUG_gc,
2750 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2751 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2752 IF_DEBUG(gc,
2753 /* If we're debugging, just print out the top of the stack */
2754 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2755 tso->sp+64)));
2756
2757 // Send this thread the StackOverflow exception
2758 unlockTSO(tso);
2759 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2760 return tso;
2761 }
2762
2763 /* Try to double the current stack size. If that takes us over the
2764 * maximum stack size for this thread, then use the maximum instead.
2765 * Finally round up so the TSO ends up as a whole number of blocks.
2766 */
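    /* Worked example (illustrative only, assuming the usual 4k-byte
     * blocks, i.e. 512 words on a 64-bit machine): a 1000-word stack
     * that overflows asks for new_stack_size = 2000 words; together
     * with TSO_STRUCT_SIZE this is rounded up to a whole number of
     * blocks by BLOCK_ROUND_UP (and kept MBLOCK-friendly), and
     * new_stack_size is then recomputed as whatever fits in that space
     * after the TSO header.
     */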
2767 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2768 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2769 TSO_STRUCT_SIZE)/sizeof(W_);
2770 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2771 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2772
2773 debugTrace(DEBUG_sched,
2774 "increasing stack size from %ld words to %d.",
2775 (long)tso->stack_size, new_stack_size);
2776
2777 dest = (StgTSO *)allocate(new_tso_size);
2778 TICK_ALLOC_TSO(new_stack_size,0);
2779
2780 /* copy the TSO block and the old stack into the new area */
2781 memcpy(dest,tso,TSO_STRUCT_SIZE);
2782 stack_words = tso->stack + tso->stack_size - tso->sp;
2783 new_sp = (P_)dest + new_tso_size - stack_words;
2784 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2785
2786 /* relocate the stack pointers... */
2787 dest->sp = new_sp;
2788 dest->stack_size = new_stack_size;
2789
2790 /* Mark the old TSO as relocated. We have to check for relocated
2791 * TSOs in the garbage collector and any primops that deal with TSOs.
2792 *
2793 * It's important to set the sp value to just beyond the end
2794 * of the stack, so we don't attempt to scavenge any part of the
2795 * dead TSO's stack.
2796 */
2797 tso->what_next = ThreadRelocated;
2798 tso->link = dest;
2799 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2800 tso->why_blocked = NotBlocked;
2801
2802 IF_PAR_DEBUG(verbose,
2803 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2804 tso->id, tso, tso->stack_size);
2805 /* If we're debugging, just print out the top of the stack */
2806 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2807 tso->sp+64)));
2808
2809 unlockTSO(dest);
2810 unlockTSO(tso);
2811
2812 IF_DEBUG(sanity,checkTSO(dest));
2813 #if 0
2814 IF_DEBUG(scheduler,printTSO(dest));
2815 #endif
2816
2817 return dest;
2818 }
2819
2820 /* ---------------------------------------------------------------------------
2821 Interrupt execution
2822 - usually called inside a signal handler so it mustn't do anything fancy.
2823 ------------------------------------------------------------------------ */
2824
2825 void
2826 interruptStgRts(void)
2827 {
2828 sched_state = SCHED_INTERRUPTING;
2829 context_switch = 1;
2830 wakeUpRts();
2831 }
2832
2833 /* -----------------------------------------------------------------------------
2834 Wake up the RTS
2835
2836 This function causes at least one OS thread to wake up and run the
2837 scheduler loop. It is invoked when the RTS might be deadlocked, or
2838 an external event has arrived that may need servicing (eg. a
2839    an external event has arrived that may need servicing (e.g. a
2840
2841 In the single-threaded RTS we don't do anything here; we only have
2842 one thread anyway, and the event that caused us to want to wake up
2843 will have interrupted any blocking system call in progress anyway.
2844 -------------------------------------------------------------------------- */
2845
2846 void
2847 wakeUpRts(void)
2848 {
2849 #if defined(THREADED_RTS)
2850 #if !defined(mingw32_HOST_OS)
2851 // This forces the IO Manager thread to wakeup, which will
2852 // in turn ensure that some OS thread wakes up and runs the
2853 // scheduler loop, which will cause a GC and deadlock check.
2854 ioManagerWakeup();
2855 #else
2856 // On Windows this might be safe enough, because we aren't
2857 // in a signal handler. Later we should use the IO Manager,
2858 // though.
2859 prodOneCapability();
2860 #endif
2861 #endif
2862 }
2863
2864 /* -----------------------------------------------------------------------------
2865 * checkBlackHoles()
2866 *
2867 * Check the blackhole_queue for threads that can be woken up. We do
2868 * this periodically: before every GC, and whenever the run queue is
2869 * empty.
2870 *
2871 * An elegant solution might be to just wake up all the blocked
2872 * threads with awakenBlockedQueue occasionally: they'll go back to
2873 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2874 * doesn't give us a way to tell whether we've actually managed to
2875 * wake up any threads, so we would be busy-waiting.
2876 *
2877 * -------------------------------------------------------------------------- */
2878
2879 static rtsBool
2880 checkBlackHoles (Capability *cap)
2881 {
2882 StgTSO **prev, *t;
2883 rtsBool any_woke_up = rtsFalse;
2884 StgHalfWord type;
2885
2886 // blackhole_queue is global:
2887 ASSERT_LOCK_HELD(&sched_mutex);
2888
2889 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2890
2891 // ASSUMES: sched_mutex
2892 prev = &blackhole_queue;
2893 t = blackhole_queue;
2894 while (t != END_TSO_QUEUE) {
2895 ASSERT(t->why_blocked == BlockedOnBlackHole);
2896 type = get_itbl(t->block_info.closure)->type;
2897 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2898 IF_DEBUG(sanity,checkTSO(t));
2899 t = unblockOne(cap, t);
2900 // urk, the threads migrate to the current capability
2901 // here, but we'd like to keep them on the original one.
2902 *prev = t;
2903 any_woke_up = rtsTrue;
2904 } else {
2905 prev = &t->link;
2906 t = t->link;
2907 }
2908 }
2909
2910 return any_woke_up;
2911 }
2912
2913 /* -----------------------------------------------------------------------------
2914 Deleting threads
2915
2916 This is used for interruption (^C) and forking, and corresponds to
2917 raising an exception but without letting the thread catch the
2918 exception.
2919 -------------------------------------------------------------------------- */
2920
2921 static void
2922 deleteThread (Capability *cap, StgTSO *tso)
2923 {
2924 // NOTE: must only be called on a TSO that we have exclusive
2925 // access to, because we will call throwToSingleThreaded() below.
2926 // The TSO must be on the run queue of the Capability we own, or
2927 // we must own all Capabilities.
2928
2929 if (tso->why_blocked != BlockedOnCCall &&
2930 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2931 throwToSingleThreaded(cap,tso,NULL);
2932 }
2933 }
2934
2935 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2936 static void
2937 deleteThread_(Capability *cap, StgTSO *tso)
2938 { // for forkProcess only:
2939 // like deleteThread(), but we delete threads in foreign calls, too.
2940
2941 if (tso->why_blocked == BlockedOnCCall ||
2942 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2943 unblockOne(cap,tso);
2944 tso->what_next = ThreadKilled;
2945 } else {
2946 deleteThread(cap,tso);
2947 }
2948 }
2949 #endif
2950
2951 /* -----------------------------------------------------------------------------
2952 raiseExceptionHelper
2953
2954    This function is called by the raise# primitive, just so that we can
2955 move some of the tricky bits of raising an exception from C-- into
2956    C.  Who knows, it might be a useful re-usable thing here too.
2957 -------------------------------------------------------------------------- */
2958
2959 StgWord
2960 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2961 {
2962 Capability *cap = regTableToCapability(reg);
2963 StgThunk *raise_closure = NULL;
2964 StgPtr p, next;
2965 StgRetInfoTable *info;
2966 //
2967 // This closure represents the expression 'raise# E' where E
2968     // is the exception raised.  It is used to overwrite all the
2969     // thunks which are currently under evaluation.
2970 //
2971
2972 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2973 // LDV profiling: stg_raise_info has THUNK as its closure
2974 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2975     // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
2976 // 1 does not cause any problem unless profiling is performed.
2977 // However, when LDV profiling goes on, we need to linearly scan
2978 // small object pool, where raise_closure is stored, so we should
2979 // use MIN_UPD_SIZE.
2980 //
2981 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2982 // sizeofW(StgClosure)+1);
2983 //
2984
2985 //
2986 // Walk up the stack, looking for the catch frame. On the way,
2987 // we update any closures pointed to from update frames with the
2988 // raise closure that we just built.
2989 //
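    // Whatever frame we stop at, we leave tso->sp pointing at it and
    // return its frame type to the caller (the raise# code in
    // Exception.cmm), which decides how to continue: typically by
    // running the handler of a CATCH_FRAME or CATCH_STM_FRAME, or by
    // treating the exception as uncaught if we get all the way to the
    // STOP_FRAME.
    //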
2990 p = tso->sp;
2991 while(1) {
2992 info = get_ret_itbl((StgClosure *)p);
2993 next = p + stack_frame_sizeW((StgClosure *)p);
2994 switch (info->i.type) {
2995
2996 case UPDATE_FRAME:
2997 // Only create raise_closure if we need to.
2998 if (raise_closure == NULL) {
2999 raise_closure =
3000 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3001 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3002 raise_closure->payload[0] = exception;
3003 }
3004 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3005 p = next;
3006 continue;
3007
3008 case ATOMICALLY_FRAME:
3009 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3010 tso->sp = p;
3011 return ATOMICALLY_FRAME;
3012
3013 case CATCH_FRAME:
3014 tso->sp = p;
3015 return CATCH_FRAME;
3016
3017 case CATCH_STM_FRAME:
3018 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3019 tso->sp = p;
3020 return CATCH_STM_FRAME;
3021
3022 case STOP_FRAME:
3023 tso->sp = p;
3024 return STOP_FRAME;
3025
3026 case CATCH_RETRY_FRAME:
3027 default:
3028 p = next;
3029 continue;
3030 }
3031 }
3032 }
3033
3034
3035 /* -----------------------------------------------------------------------------
3036 findRetryFrameHelper
3037
3038 This function is called by the retry# primitive. It traverses the stack
3039 leaving tso->sp referring to the frame which should handle the retry.
3040
3041 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3042    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3043
3044 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3045 despite the similar implementation.
3046
3047 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3048 not be created within memory transactions.
3049 -------------------------------------------------------------------------- */
3050
3051 StgWord
3052 findRetryFrameHelper (StgTSO *tso)
3053 {
3054 StgPtr p, next;
3055 StgRetInfoTable *info;
3056
3057 p = tso -> sp;
3058 while (1) {
3059 info = get_ret_itbl((StgClosure *)p);
3060 next = p + stack_frame_sizeW((StgClosure *)p);
3061 switch (info->i.type) {
3062
3063 case ATOMICALLY_FRAME:
3064 debugTrace(DEBUG_stm,
3065                    "found ATOMICALLY_FRAME at %p during retry", p);
3066 tso->sp = p;
3067 return ATOMICALLY_FRAME;
3068
3069 case CATCH_RETRY_FRAME:
3070 debugTrace(DEBUG_stm,
3071                    "found CATCH_RETRY_FRAME at %p during retry", p);
3072 tso->sp = p;
3073 return CATCH_RETRY_FRAME;
3074
3075 case CATCH_STM_FRAME:
3076 default:
3077 ASSERT(info->i.type != CATCH_FRAME);
3078 ASSERT(info->i.type != STOP_FRAME);
3079 p = next;
3080 continue;
3081 }
3082 }
3083 }
3084
3085 /* -----------------------------------------------------------------------------
3086 resurrectThreads is called after garbage collection on the list of
3087 threads found to be garbage. Each of these threads will be woken
3088    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3089    on an MVar, NonTermination if the thread was blocked on a Black
3090    Hole, or BlockedIndefinitely if it was blocked on an STM transaction.
3091
3092 Locks: assumes we hold *all* the capabilities.
3093 -------------------------------------------------------------------------- */
3094
3095 void
3096 resurrectThreads (StgTSO *threads)
3097 {
3098 StgTSO *tso, *next;
3099 Capability *cap;
3100
3101 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3102 next = tso->global_link;
3103 tso->global_link = all_threads;
3104 all_threads = tso;
3105 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3106
3107 // Wake up the thread on the Capability it was last on
3108 cap = tso->cap;
3109
3110 switch (tso->why_blocked) {
3111 case BlockedOnMVar:
3112 case BlockedOnException:
3113 /* Called by GC - sched_mutex lock is currently held. */
3114 throwToSingleThreaded(cap, tso,
3115 (StgClosure *)BlockedOnDeadMVar_closure);
3116 break;
3117 case BlockedOnBlackHole:
3118 throwToSingleThreaded(cap, tso,
3119 (StgClosure *)NonTermination_closure);
3120 break;
3121 case BlockedOnSTM:
3122 throwToSingleThreaded(cap, tso,
3123 (StgClosure *)BlockedIndefinitely_closure);
3124 break;
3125 case NotBlocked:
3126 /* This might happen if the thread was blocked on a black hole
3127 * belonging to a thread that we've just woken up (raiseAsync
3128 * can wake up threads, remember...).
3129 */
3130 continue;
3131 default:
3132 barf("resurrectThreads: thread blocked in a strange way");
3133 }
3134 }
3135 }