Make forkProcess work with +RTS -N
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "Sparks.h"
31 #include "Capability.h"
32 #include "Task.h"
33 #include "AwaitEvent.h"
34 #if defined(mingw32_HOST_OS)
35 #include "win32/IOManager.h"
36 #endif
37 #include "Trace.h"
38 #include "RaiseAsync.h"
39 #include "Threads.h"
40 #include "Timer.h"
41 #include "ThreadPaused.h"
42 #include "Messages.h"
43 #include "Stable.h"
44
45 #ifdef HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48 #ifdef HAVE_UNISTD_H
49 #include <unistd.h>
50 #endif
51
52 #include <string.h>
53 #include <stdlib.h>
54 #include <stdarg.h>
55
56 #ifdef HAVE_ERRNO_H
57 #include <errno.h>
58 #endif
59
60 #ifdef TRACING
61 #include "eventlog/EventLog.h"
62 #endif
63 /* -----------------------------------------------------------------------------
64 * Global variables
65 * -------------------------------------------------------------------------- */
66
67 #if !defined(THREADED_RTS)
68 // Blocked/sleeping threads
69 StgTSO *blocked_queue_hd = NULL;
70 StgTSO *blocked_queue_tl = NULL;
71 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
72 #endif
73
74 /* Set to true when the latest garbage collection failed to reclaim
75 * enough space, and the runtime should proceed to shut itself down in
76 * an orderly fashion (emitting profiling info etc.)
77 */
78 rtsBool heap_overflow = rtsFalse;
79
80 /* flag that tracks whether we have done any execution in this time slice.
81 * LOCK: currently none, perhaps we should lock (but needs to be
82 * updated in the fast path of the scheduler).
83 *
84 * NB. must be StgWord, we do xchg() on it.
85 */
86 volatile StgWord recent_activity = ACTIVITY_YES;
87
88 /* if this flag is set as well, give up execution
89 * LOCK: none (changes monotonically)
90 */
91 volatile StgWord sched_state = SCHED_RUNNING;
92
93 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
94 * exists - earlier gccs apparently didn't.
95 * -= chak
96 */
97 StgTSO dummy_tso;
98
99 /*
100 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
101 * in an MT setting, needed to signal that a worker thread shouldn't hang around
102 * in the scheduler when it is out of work.
103 */
104 rtsBool shutting_down_scheduler = rtsFalse;
105
106 /*
107 * This mutex protects most of the global scheduler data in
108 * the THREADED_RTS runtime.
109 */
110 #if defined(THREADED_RTS)
111 Mutex sched_mutex;
112 #endif
113
114 #if !defined(mingw32_HOST_OS)
115 #define FORKPROCESS_PRIMOP_SUPPORTED
116 #endif
117
118 /* -----------------------------------------------------------------------------
119 * static function prototypes
120 * -------------------------------------------------------------------------- */
121
122 static Capability *schedule (Capability *initialCapability, Task *task);
123
124 //
125 // These functions all encapsulate parts of the scheduler loop, and are
126 // abstracted only to make the structure and control flow of the
127 // scheduler clearer.
128 //
129 static void schedulePreLoop (void);
130 static void scheduleFindWork (Capability *cap);
131 #if defined(THREADED_RTS)
132 static void scheduleYield (Capability **pcap, Task *task);
133 #endif
134 #if defined(THREADED_RTS)
135 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
136 static void acquireAllCapabilities(Capability *cap, Task *task);
137 #endif
138 static void scheduleStartSignalHandlers (Capability *cap);
139 static void scheduleCheckBlockedThreads (Capability *cap);
140 static void scheduleProcessInbox(Capability *cap);
141 static void scheduleDetectDeadlock (Capability *cap, Task *task);
142 static void schedulePushWork(Capability *cap, Task *task);
143 #if defined(THREADED_RTS)
144 static void scheduleActivateSpark(Capability *cap);
145 #endif
146 static void schedulePostRunThread(Capability *cap, StgTSO *t);
147 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
148 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
149 nat prev_what_next );
150 static void scheduleHandleThreadBlocked( StgTSO *t );
151 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
152 StgTSO *t );
153 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
154 static Capability *scheduleDoGC(Capability *cap, Task *task,
155 rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 GRAN version:
177 In a GranSim setup this loop iterates over the global event queue.
178 This revolves around the global event queue, which determines what
179 to do next. Therefore, it's more complicated than either the
180 concurrent or the parallel (GUM) setup.
181 This version has been entirely removed (JB 2008/08).
182
183 GUM version:
184 GUM iterates over incoming messages.
185 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
186 and sends out a fish whenever it has nothing to do; in-between
187 doing the actual reductions (shared code below) it processes the
188 incoming messages and deals with delayed operations
189 (see PendingFetches).
190 This is not the ugliest code you could imagine, but it's bloody close.
191
192 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
193 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
194 as well as future GUM versions. This file has been refurbished to
195 only contain valid code, which is however incomplete, refers to
196 invalid includes etc.
197
198 ------------------------------------------------------------------------ */
199
200 static Capability *
201 schedule (Capability *initialCapability, Task *task)
202 {
203 StgTSO *t;
204 Capability *cap;
205 StgThreadReturnCode ret;
206 nat prev_what_next;
207 rtsBool ready_to_gc;
208 #if defined(THREADED_RTS)
209 rtsBool first = rtsTrue;
210 #endif
211
212 cap = initialCapability;
213
214 // Pre-condition: this task owns initialCapability.
215 // The sched_mutex is *NOT* held
216 // NB. on return, we still hold a capability.
217
218 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
219
220 schedulePreLoop();
221
222 // -----------------------------------------------------------
223 // Scheduler loop starts here:
224
225 while (1) {
226
227 // Check whether we have re-entered the RTS from Haskell without
228 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
229 // call).
230 if (cap->in_haskell) {
231 errorBelch("schedule: re-entered unsafely.\n"
232 " Perhaps a 'foreign import unsafe' should be 'safe'?");
233 stg_exit(EXIT_FAILURE);
234 }
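
// For example (hypothetical names): this check typically fires when an
// 'unsafe' foreign import calls back into Haskell.  A minimal sketch of
// the offending pattern:
//
//   foreign import ccall unsafe "poke_c"      -- should be "safe"
//       pokeC :: IO ()
//   foreign export ccall haskellCallback :: IO ()
//   haskellCallback :: IO ()
//   haskellCallback = putStrLn "called back"
//
// If poke_c() invokes haskellCallback(), the RTS is re-entered without
// going through suspendThread()/resumeThread() and we end up in the
// errorBelch() above.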
235
236 // The interruption / shutdown sequence.
237 //
238 // In order to cleanly shut down the runtime, we want to:
239 // * make sure that all main threads return to their callers
240 // with the state 'Interrupted'.
241 // * clean up all OS threads associated with the runtime
242 // * free all memory etc.
243 //
244 // So the sequence for ^C goes like this:
245 //
246 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
247 // arranges for some Capability to wake up
248 //
249 // * all threads in the system are halted, and the zombies are
250 // placed on the run queue for cleaning up. We acquire all
251 // the capabilities in order to delete the threads, this is
252 // done by scheduleDoGC() for convenience (because GC already
253 // needs to acquire all the capabilities). We can't kill
254 // threads involved in foreign calls.
255 //
256 // * somebody calls shutdownHaskell(), which calls exitScheduler()
257 //
258 // * sched_state := SCHED_SHUTTING_DOWN
259 //
260 // * all workers exit when the run queue on their capability
261 // drains. All main threads will also exit when their TSO
262 // reaches the head of the run queue and they can return.
263 //
264 // * eventually all Capabilities will shut down, and the RTS can
265 // exit.
266 //
267 // * We might be left with threads blocked in foreign calls,
268 // we should really attempt to kill these somehow (TODO);
269
270 switch (sched_state) {
271 case SCHED_RUNNING:
272 break;
273 case SCHED_INTERRUPTING:
274 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
275 /* scheduleDoGC() deletes all the threads */
276 cap = scheduleDoGC(cap,task,rtsFalse);
277
278 // after scheduleDoGC(), we must be shutting down. Either some
279 // other Capability did the final GC, or we did it above,
280 // either way we can fall through to the SCHED_SHUTTING_DOWN
281 // case now.
282 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
283 // fall through
284
285 case SCHED_SHUTTING_DOWN:
286 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
287 // If we are a worker, just exit. If we're a bound thread
288 // then we will exit below when we've removed our TSO from
289 // the run queue.
290 if (!isBoundTask(task) && emptyRunQueue(cap)) {
291 return cap;
292 }
293 break;
294 default:
295 barf("sched_state: %d", sched_state);
296 }
297
298 scheduleFindWork(cap);
299
300 /* work pushing, currently relevant only for THREADED_RTS:
301 (pushes threads, wakes up idle capabilities for stealing) */
302 schedulePushWork(cap,task);
303
304 scheduleDetectDeadlock(cap,task);
305
306 #if defined(THREADED_RTS)
307 cap = task->cap; // reload cap, it might have changed
308 #endif
309
310 // Normally, the only way we can get here with no threads to
311 // run is if a keyboard interrupt was received during
312 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
313 // Additionally, it is not fatal for the
314 // threaded RTS to reach here with no threads to run.
315 //
316 // win32: might be here due to awaitEvent() being abandoned
317 // as a result of a console event having been delivered.
318
319 #if defined(THREADED_RTS)
320 if (first)
321 {
322 // XXX: ToDo
323 // // don't yield the first time, we want a chance to run this
324 // // thread for a bit, even if there are others banging at the
325 // // door.
326 // first = rtsFalse;
327 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
328 }
329
330 scheduleYield(&cap,task);
331
332 if (emptyRunQueue(cap)) continue; // look for work again
333 #endif
334
335 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
336 if ( emptyRunQueue(cap) ) {
337 ASSERT(sched_state >= SCHED_INTERRUPTING);
338 }
339 #endif
340
341 //
342 // Get a thread to run
343 //
344 t = popRunQueue(cap);
345
346 // Sanity check the thread we're about to run. This can be
347 // expensive if there is lots of thread switching going on...
348 IF_DEBUG(sanity,checkTSO(t));
349
350 #if defined(THREADED_RTS)
351 // Check whether we can run this thread in the current task.
352 // If not, we have to pass our capability to the right task.
353 {
354 InCall *bound = t->bound;
355
356 if (bound) {
357 if (bound->task == task) {
358 // yes, the Haskell thread is bound to the current native thread
359 } else {
360 debugTrace(DEBUG_sched,
361 "thread %lu bound to another OS thread",
362 (unsigned long)t->id);
363 // no, bound to a different Haskell thread: pass to that thread
364 pushOnRunQueue(cap,t);
365 continue;
366 }
367 } else {
368 // The thread we want to run is unbound.
369 if (task->incall->tso) {
370 debugTrace(DEBUG_sched,
371 "this OS thread cannot run thread %lu",
372 (unsigned long)t->id);
373 // no, the current native thread is bound to a different
374 // Haskell thread, so pass it to any worker thread
375 pushOnRunQueue(cap,t);
376 continue;
377 }
378 }
379 }
380 #endif
381
382 // If we're shutting down, and this thread has not yet been
383 // killed, kill it now. This sometimes happens when a finalizer
384 // thread is created by the final GC, or a thread previously
385 // in a foreign call returns.
386 if (sched_state >= SCHED_INTERRUPTING &&
387 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
388 deleteThread(cap,t);
389 }
390
391 /* context switches are initiated by the timer signal, unless
392 * the user specified "context switch as often as possible", with
393 * +RTS -C0
394 */
395 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
396 && !emptyThreadQueues(cap)) {
397 cap->context_switch = 1;
398 }
399
400 run_thread:
401
402 // CurrentTSO is the thread to run. t might be different if we
403 // loop back to run_thread, so make sure to set CurrentTSO after
404 // that.
405 cap->r.rCurrentTSO = t;
406
407 startHeapProfTimer();
408
409 // ----------------------------------------------------------------------
410 // Run the current thread
411
412 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
413 ASSERT(t->cap == cap);
414 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
415
416 prev_what_next = t->what_next;
417
418 errno = t->saved_errno;
419 #if mingw32_HOST_OS
420 SetLastError(t->saved_winerror);
421 #endif
422
423 // reset the interrupt flag before running Haskell code
424 cap->interrupt = 0;
425
426 cap->in_haskell = rtsTrue;
427
428 dirty_TSO(cap,t);
429 dirty_STACK(cap,t->stackobj);
430
431 #if defined(THREADED_RTS)
432 if (recent_activity == ACTIVITY_DONE_GC) {
433 // ACTIVITY_DONE_GC means we turned off the timer signal to
434 // conserve power (see #1623). Re-enable it here.
435 nat prev;
436 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
437 if (prev == ACTIVITY_DONE_GC) {
438 startTimer();
439 }
440 } else if (recent_activity != ACTIVITY_INACTIVE) {
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 recent_activity = ACTIVITY_YES;
446 }
447 #endif
448
449 traceEventRunThread(cap, t);
450
451 switch (prev_what_next) {
452
453 case ThreadKilled:
454 case ThreadComplete:
455 /* Thread already finished, return to scheduler. */
456 ret = ThreadFinished;
457 break;
458
459 case ThreadRunGHC:
460 {
461 StgRegTable *r;
462 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
463 cap = regTableToCapability(r);
464 ret = r->rRet;
465 break;
466 }
467
468 case ThreadInterpret:
469 cap = interpretBCO(cap);
470 ret = cap->r.rRet;
471 break;
472
473 default:
474 barf("schedule: invalid what_next field");
475 }
476
477 cap->in_haskell = rtsFalse;
478
479 // The TSO might have moved, eg. if it re-entered the RTS and a GC
480 // happened. So find the new location:
481 t = cap->r.rCurrentTSO;
482
483 // And save the current errno in this thread.
484 // XXX: possibly bogus for SMP because this thread might already
485 // be running again, see code below.
486 t->saved_errno = errno;
487 #if mingw32_HOST_OS
488 // Similarly for Windows error code
489 t->saved_winerror = GetLastError();
490 #endif
491
492 if (ret == ThreadBlocked) {
493 if (t->why_blocked == BlockedOnBlackHole) {
494 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
495 traceEventStopThread(cap, t, t->why_blocked + 6,
496 owner != NULL ? owner->id : 0);
497 } else {
498 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
499 }
500 } else {
501 traceEventStopThread(cap, t, ret, 0);
502 }
503
504 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
505 ASSERT(t->cap == cap);
506
507 // ----------------------------------------------------------------------
508
509 // Costs for the scheduler are assigned to CCS_SYSTEM
510 stopHeapProfTimer();
511 #if defined(PROFILING)
512 cap->r.rCCCS = CCS_SYSTEM;
513 #endif
514
515 schedulePostRunThread(cap,t);
516
517 ready_to_gc = rtsFalse;
518
519 switch (ret) {
520 case HeapOverflow:
521 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
522 break;
523
524 case StackOverflow:
525 // just adjust the stack for this thread, then pop it back
526 // on the run queue.
527 threadStackOverflow(cap, t);
528 pushOnRunQueue(cap,t);
529 break;
530
531 case ThreadYielding:
532 if (scheduleHandleYield(cap, t, prev_what_next)) {
533 // shortcut for switching between compiler/interpreter:
534 goto run_thread;
535 }
536 break;
537
538 case ThreadBlocked:
539 scheduleHandleThreadBlocked(t);
540 break;
541
542 case ThreadFinished:
543 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
544 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
545 break;
546
547 default:
548 barf("schedule: invalid thread return code %d", (int)ret);
549 }
550
551 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
552 cap = scheduleDoGC(cap,task,rtsFalse);
553 }
554 } /* end of while() */
555 }
556
557 /* -----------------------------------------------------------------------------
558 * Run queue operations
559 * -------------------------------------------------------------------------- */
560
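// While a TSO sits on a Capability's run queue, the queue is doubly
// linked: tso->_link points at the next thread and tso->block_info.prev
// (reused while the thread is on the run queue) points at the previous
// one.  removeFromRunQueue() below unlinks a TSO in O(1) and resets both
// fields to END_TSO_QUEUE.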
561 void
562 removeFromRunQueue (Capability *cap, StgTSO *tso)
563 {
564 if (tso->block_info.prev == END_TSO_QUEUE) {
565 ASSERT(cap->run_queue_hd == tso);
566 cap->run_queue_hd = tso->_link;
567 } else {
568 setTSOLink(cap, tso->block_info.prev, tso->_link);
569 }
570 if (tso->_link == END_TSO_QUEUE) {
571 ASSERT(cap->run_queue_tl == tso);
572 cap->run_queue_tl = tso->block_info.prev;
573 } else {
574 setTSOPrev(cap, tso->_link, tso->block_info.prev);
575 }
576 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
577
578 IF_DEBUG(sanity, checkRunQueue(cap));
579 }
580
581 /* ----------------------------------------------------------------------------
582 * Setting up the scheduler loop
583 * ------------------------------------------------------------------------- */
584
585 static void
586 schedulePreLoop(void)
587 {
588 // initialisation for scheduler - what cannot go into initScheduler()
589
590 #if defined(mingw32_HOST_OS)
591 win32AllocStack();
592 #endif
593 }
594
595 /* -----------------------------------------------------------------------------
596 * scheduleFindWork()
597 *
598 * Search for work to do, and handle messages from elsewhere.
599 * -------------------------------------------------------------------------- */
600
601 static void
602 scheduleFindWork (Capability *cap)
603 {
604 scheduleStartSignalHandlers(cap);
605
606 scheduleProcessInbox(cap);
607
608 scheduleCheckBlockedThreads(cap);
609
610 #if defined(THREADED_RTS)
611 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
612 #endif
613 }
614
615 #if defined(THREADED_RTS)
616 STATIC_INLINE rtsBool
617 shouldYieldCapability (Capability *cap, Task *task)
618 {
619 // we need to yield this capability to someone else if..
620 // - another thread is initiating a GC
621 // - another Task is returning from a foreign call
622 // - the thread at the head of the run queue cannot be run
623 // by this Task (it is bound to another Task, or it is unbound
624 // and this task is bound).
625 return (pending_sync ||
626 cap->returning_tasks_hd != NULL ||
627 (!emptyRunQueue(cap) && (task->incall->tso == NULL
628 ? cap->run_queue_hd->bound != NULL
629 : cap->run_queue_hd->bound != task->incall)));
630 }
631
632 // This is the single place where a Task goes to sleep. There are
633 // two reasons it might need to sleep:
634 // - there are no threads to run
635 // - we need to yield this Capability to someone else
636 // (see shouldYieldCapability())
637 //
638 // Careful: the scheduler loop is quite delicate. Make sure you run
639 // the tests in testsuite/concurrent (all ways) after modifying this,
640 // and also check the benchmarks in nofib/parallel for regressions.
641
642 static void
643 scheduleYield (Capability **pcap, Task *task)
644 {
645 Capability *cap = *pcap;
646
647 // if we have work, and we don't need to give up the Capability, continue.
648 //
649 if (!shouldYieldCapability(cap,task) &&
650 (!emptyRunQueue(cap) ||
651 !emptyInbox(cap) ||
652 sched_state >= SCHED_INTERRUPTING))
653 return;
654
655 // otherwise yield (sleep), and keep yielding if necessary.
656 do {
657 yieldCapability(&cap,task);
658 }
659 while (shouldYieldCapability(cap,task));
660
661 // note there may still be no threads on the run queue at this
662 // point, the caller has to check.
663
664 *pcap = cap;
665 return;
666 }
667 #endif
668
669 /* -----------------------------------------------------------------------------
670 * schedulePushWork()
671 *
672 * Push work to other Capabilities if we have some.
673 * -------------------------------------------------------------------------- */
674
675 static void
676 schedulePushWork(Capability *cap USED_IF_THREADS,
677 Task *task USED_IF_THREADS)
678 {
679 /* The following code is not for PARALLEL_HASKELL. I kept the call general;
680 future GUM versions might use pushing in a distributed setup. */
681 #if defined(THREADED_RTS)
682
683 Capability *free_caps[n_capabilities], *cap0;
684 nat i, n_free_caps;
685
686 // migration can be turned off with +RTS -qm
687 if (!RtsFlags.ParFlags.migrate) return;
688
689 // Check whether we have more threads on our run queue, or sparks
690 // in our pool, that we could hand to another Capability.
691 if (cap->run_queue_hd == END_TSO_QUEUE) {
692 if (sparkPoolSizeCap(cap) < 2) return;
693 } else {
694 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
695 sparkPoolSizeCap(cap) < 1) return;
696 }
697
698 // First grab as many free Capabilities as we can.
699 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
700 cap0 = &capabilities[i];
701 if (cap != cap0 && tryGrabCapability(cap0,task)) {
702 if (!emptyRunQueue(cap0)
703 || cap->returning_tasks_hd != NULL
704 || cap->inbox != (Message*)END_TSO_QUEUE) {
705 // it already has some work, we just grabbed it at
706 // the wrong moment. Or maybe it's deadlocked!
707 releaseCapability(cap0);
708 } else {
709 free_caps[n_free_caps++] = cap0;
710 }
711 }
712 }
713
714 // we now have n_free_caps free capabilities stashed in
715 // free_caps[]. Share our run queue equally with them. This is
716 // probably the simplest thing we could do; improvements we might
717 // want to do include:
718 //
719 // - giving high priority to moving relatively new threads, on
720 // the grounds that they haven't had time to build up a
721 // working set in the cache on this CPU/Capability.
722 //
723 // - giving low priority to moving long-lived threads
724
725 if (n_free_caps > 0) {
726 StgTSO *prev, *t, *next;
727 #ifdef SPARK_PUSHING
728 rtsBool pushed_to_all;
729 #endif
730
731 debugTrace(DEBUG_sched,
732 "cap %d: %s and %d free capabilities, sharing...",
733 cap->no,
734 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
735 "excess threads on run queue":"sparks to share (>=2)",
736 n_free_caps);
737
738 i = 0;
739 #ifdef SPARK_PUSHING
740 pushed_to_all = rtsFalse;
741 #endif
742
743 if (cap->run_queue_hd != END_TSO_QUEUE) {
744 prev = cap->run_queue_hd;
745 t = prev->_link;
746 prev->_link = END_TSO_QUEUE;
747 for (; t != END_TSO_QUEUE; t = next) {
748 next = t->_link;
749 t->_link = END_TSO_QUEUE;
750 if (t->bound == task->incall // don't move my bound thread
751 || tsoLocked(t)) { // don't move a locked thread
752 setTSOLink(cap, prev, t);
753 setTSOPrev(cap, t, prev);
754 prev = t;
755 } else if (i == n_free_caps) {
756 #ifdef SPARK_PUSHING
757 pushed_to_all = rtsTrue;
758 #endif
759 i = 0;
760 // keep one for us
761 setTSOLink(cap, prev, t);
762 setTSOPrev(cap, t, prev);
763 prev = t;
764 } else {
765 appendToRunQueue(free_caps[i],t);
766
767 traceEventMigrateThread (cap, t, free_caps[i]->no);
768
769 if (t->bound) { t->bound->task->cap = free_caps[i]; }
770 t->cap = free_caps[i];
771 i++;
772 }
773 }
774 cap->run_queue_tl = prev;
775
776 IF_DEBUG(sanity, checkRunQueue(cap));
777 }
778
779 #ifdef SPARK_PUSHING
780 /* JB I left this code in place, it would work but is not necessary */
781
782 // If there are some free capabilities that we didn't push any
783 // threads to, then try to push a spark to each one.
784 if (!pushed_to_all) {
785 StgClosure *spark;
786 // i is the next free capability to push to
787 for (; i < n_free_caps; i++) {
788 if (emptySparkPoolCap(free_caps[i])) {
789 spark = tryStealSpark(cap->sparks);
790 if (spark != NULL) {
791 /* TODO: if anyone wants to re-enable this code then
792 * they must consider the fizzledSpark(spark) case
793 * and update the per-cap spark statistics.
794 */
795 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
796
797 traceEventStealSpark(free_caps[i], t, cap->no);
798
799 newSpark(&(free_caps[i]->r), spark);
800 }
801 }
802 }
803 }
804 #endif /* SPARK_PUSHING */
805
806 // release the capabilities
807 for (i = 0; i < n_free_caps; i++) {
808 task->cap = free_caps[i];
809 releaseAndWakeupCapability(free_caps[i]);
810 }
811 }
812 task->cap = cap; // reset to point to our Capability.
813
814 #endif /* THREADED_RTS */
815
816 }
817
818 /* ----------------------------------------------------------------------------
819 * Start any pending signal handlers
820 * ------------------------------------------------------------------------- */
821
822 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
823 static void
824 scheduleStartSignalHandlers(Capability *cap)
825 {
826 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
827 // safe outside the lock
828 startSignalHandlers(cap);
829 }
830 }
831 #else
832 static void
833 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
834 {
835 }
836 #endif
837
838 /* ----------------------------------------------------------------------------
839 * Check for blocked threads that can be woken up.
840 * ------------------------------------------------------------------------- */
841
842 static void
843 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
844 {
845 #if !defined(THREADED_RTS)
846 //
847 // Check whether any waiting threads need to be woken up. If the
848 // run queue is empty, and there are no other tasks running, we
849 // can wait indefinitely for something to happen.
850 //
851 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
852 {
853 awaitEvent (emptyRunQueue(cap));
854 }
855 #endif
856 }
857
858 /* ----------------------------------------------------------------------------
859 * Detect deadlock conditions and attempt to resolve them.
860 * ------------------------------------------------------------------------- */
861
862 static void
863 scheduleDetectDeadlock (Capability *cap, Task *task)
864 {
865 /*
866 * Detect deadlock: when we have no threads to run, there are no
867 * threads blocked, waiting for I/O, or sleeping, and all the
868 * other tasks are waiting for work, we must have a deadlock of
869 * some description.
870 */
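/* A minimal sketch of a program that can arrive here (assuming the
 * single-threaded RTS and no other runnable threads): the main thread
 * blocks on an MVar that nothing will ever fill, so once the run queue
 * drains the forced GC below either resurrects the thread with an
 * exception or the fallback throwToSingleThreaded() fires.
 *
 *   import Control.Concurrent.MVar
 *
 *   main :: IO ()
 *   main = newEmptyMVar >>= takeMVar
 */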
871 if ( emptyThreadQueues(cap) )
872 {
873 #if defined(THREADED_RTS)
874 /*
875 * In the threaded RTS, we only check for deadlock if there
876 * has been no activity in a complete timeslice. This means
877 * we won't eagerly start a full GC just because we don't have
878 * any threads to run currently.
879 */
880 if (recent_activity != ACTIVITY_INACTIVE) return;
881 #endif
882
883 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
884
885 // Garbage collection can release some new threads due to
886 // either (a) finalizers or (b) threads resurrected because
887 // they are unreachable and will therefore be sent an
888 // exception. Any threads thus released will be immediately
889 // runnable.
890 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
891 // When force_major == rtsTrue, scheduleDoGC() sets
892 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
893 // signal.
894
895 if ( !emptyRunQueue(cap) ) return;
896
897 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
898 /* If we have user-installed signal handlers, then wait
899 * for signals to arrive rather than bombing out with a
900 * deadlock.
901 */
902 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
903 debugTrace(DEBUG_sched,
904 "still deadlocked, waiting for signals...");
905
906 awaitUserSignals();
907
908 if (signals_pending()) {
909 startSignalHandlers(cap);
910 }
911
912 // either we have threads to run, or we were interrupted:
913 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
914
915 return;
916 }
917 #endif
918
919 #if !defined(THREADED_RTS)
920 /* Probably a real deadlock. Send the current main thread the
921 * Deadlock exception.
922 */
923 if (task->incall->tso) {
924 switch (task->incall->tso->why_blocked) {
925 case BlockedOnSTM:
926 case BlockedOnBlackHole:
927 case BlockedOnMsgThrowTo:
928 case BlockedOnMVar:
929 throwToSingleThreaded(cap, task->incall->tso,
930 (StgClosure *)nonTermination_closure);
931 return;
932 default:
933 barf("deadlock: main thread blocked in a strange way");
934 }
935 }
936 return;
937 #endif
938 }
939 }
940
941
942 /* ----------------------------------------------------------------------------
943 * Send pending messages (PARALLEL_HASKELL only)
944 * ------------------------------------------------------------------------- */
945
946 #if defined(PARALLEL_HASKELL)
947 static void
948 scheduleSendPendingMessages(void)
949 {
950
951 # if defined(PAR) // global Mem.Mgmt., omit for now
952 if (PendingFetches != END_BF_QUEUE) {
953 processFetches();
954 }
955 # endif
956
957 if (RtsFlags.ParFlags.BufferTime) {
958 // if we use message buffering, we must send away all message
959 // packets which have become too old...
960 sendOldBuffers();
961 }
962 }
963 #endif
964
965 /* ----------------------------------------------------------------------------
966 * Process messages in the current Capability's inbox
967 * ------------------------------------------------------------------------- */
968
969 static void
970 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
971 {
972 #if defined(THREADED_RTS)
973 Message *m, *next;
974 int r;
975
976 while (!emptyInbox(cap)) {
977 if (cap->r.rCurrentNursery->link == NULL ||
978 g0->n_new_large_words >= large_alloc_lim) {
979 scheduleDoGC(cap, cap->running_task, rtsFalse);
980 }
981
982 // don't use a blocking acquire; if the lock is held by
983 // another thread then just carry on. This seems to avoid
984 // getting stuck in a message ping-pong situation with other
985 // processors. We'll check the inbox again later anyway.
986 //
987 // We should really use a more efficient queue data structure
988 // here. The trickiness is that we must ensure a Capability
989 // never goes idle if the inbox is non-empty, which is why we
990 // use cap->lock (cap->lock is released as the last thing
991 // before going idle; see Capability.c:releaseCapability()).
992 r = TRY_ACQUIRE_LOCK(&cap->lock);
993 if (r != 0) return;
994
995 m = cap->inbox;
996 cap->inbox = (Message*)END_TSO_QUEUE;
997
998 RELEASE_LOCK(&cap->lock);
999
1000 while (m != (Message*)END_TSO_QUEUE) {
1001 next = m->link;
1002 executeMessage(cap, m);
1003 m = next;
1004 }
1005 }
1006 #endif
1007 }
1008
1009 /* ----------------------------------------------------------------------------
1010 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1011 * ------------------------------------------------------------------------- */
1012
1013 #if defined(THREADED_RTS)
1014 static void
1015 scheduleActivateSpark(Capability *cap)
1016 {
1017 if (anySparks())
1018 {
1019 createSparkThread(cap);
1020 debugTrace(DEBUG_sched, "creating a spark thread");
1021 }
1022 }
1023 #endif // THREADED_RTS
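
/* Sparks typically originate from par on the Haskell side.  A minimal
 * sketch (built with -threaded -rtsopts, run with +RTS -N2 or more): the
 * spark created for x can be picked up by an otherwise idle Capability
 * here and turned into a spark thread.
 *
 *   import GHC.Conc (par, pseq)
 *
 *   main :: IO ()
 *   main = let x = sum [1 .. 10000000 :: Integer]
 *          in x `par` (x `pseq` print x)
 */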
1024
1025 /* ----------------------------------------------------------------------------
1026 * After running a thread...
1027 * ------------------------------------------------------------------------- */
1028
1029 static void
1030 schedulePostRunThread (Capability *cap, StgTSO *t)
1031 {
1032 // We have to be able to catch transactions that are in an
1033 // infinite loop as a result of seeing an inconsistent view of
1034 // memory, e.g.
1035 //
1036 // atomically $ do
1037 // [a,b] <- mapM readTVar [ta,tb]
1038 // when (a == b) loop
1039 //
1040 // and a is never equal to b given a consistent view of memory.
1041 //
1042 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1043 if (!stmValidateNestOfTransactions (t -> trec)) {
1044 debugTrace(DEBUG_sched | DEBUG_stm,
1045 "trec %p found wasting its time", t);
1046
1047 // strip the stack back to the
1048 // ATOMICALLY_FRAME, aborting the (nested)
1049 // transaction, and saving the stack of any
1050 // partially-evaluated thunks on the heap.
1051 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1052
1053 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1054 }
1055 }
1056
1057 /* some statistics gathering in the parallel case */
1058 }
1059
1060 /* -----------------------------------------------------------------------------
1061 * Handle a thread that returned to the scheduler with HeapOverflow
1062 * -------------------------------------------------------------------------- */
1063
1064 static rtsBool
1065 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1066 {
1067 // did the task ask for a large block?
1068 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1069 // if so, get one and push it on the front of the nursery.
1070 bdescr *bd;
1071 lnat blocks;
1072
1073 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1074
1075 if (blocks > BLOCKS_PER_MBLOCK) {
1076 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1077 }
1078
1079 debugTrace(DEBUG_sched,
1080 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1081 (long)t->id, what_next_strs[t->what_next], blocks);
1082
1083 // don't do this if the nursery is (nearly) full, we'll GC first.
1084 if (cap->r.rCurrentNursery->link != NULL ||
1085 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1086 // if the nursery has only one block.
1087
1088 bd = allocGroup_lock(blocks);
1089 cap->r.rNursery->n_blocks += blocks;
1090
1091 // link the new group into the list
1092 bd->link = cap->r.rCurrentNursery;
1093 bd->u.back = cap->r.rCurrentNursery->u.back;
1094 if (cap->r.rCurrentNursery->u.back != NULL) {
1095 cap->r.rCurrentNursery->u.back->link = bd;
1096 } else {
1097 cap->r.rNursery->blocks = bd;
1098 }
1099 cap->r.rCurrentNursery->u.back = bd;
1100
1101 // initialise it as a nursery block. We initialise the
1102 // step, gen_no, and flags field of *every* sub-block in
1103 // this large block, because this is easier than making
1104 // sure that we always find the block head of a large
1105 // block whenever we call Bdescr() (eg. evacuate() and
1106 // isAlive() in the GC would both have to do this, at
1107 // least).
1108 {
1109 bdescr *x;
1110 for (x = bd; x < bd + blocks; x++) {
1111 initBdescr(x,g0,g0);
1112 x->free = x->start;
1113 x->flags = 0;
1114 }
1115 }
1116
1117 // This assert can be a killer if the app is doing lots
1118 // of large block allocations.
1119 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1120
1121 // now update the nursery to point to the new block
1122 cap->r.rCurrentNursery = bd;
1123
1124 // we might be unlucky and have another thread get on the
1125 // run queue before us and steal the large block, but in that
1126 // case the thread will just end up requesting another large
1127 // block.
1128 pushOnRunQueue(cap,t);
1129 return rtsFalse; /* not actually GC'ing */
1130 }
1131 }
1132
1133 if (cap->r.rHpLim == NULL || cap->context_switch) {
1134 // Sometimes we miss a context switch, e.g. when calling
1135 // primitives in a tight loop, MAYBE_GC() doesn't check the
1136 // context switch flag, and we end up waiting for a GC.
1137 // See #1984, and concurrent/should_run/1984
1138 cap->context_switch = 0;
1139 appendToRunQueue(cap,t);
1140 } else {
1141 pushOnRunQueue(cap,t);
1142 }
1143 return rtsTrue;
1144 /* actual GC is done at the end of the while loop in schedule() */
1145 }
1146
1147 /* -----------------------------------------------------------------------------
1148 * Handle a thread that returned to the scheduler with ThreadYielding
1149 * -------------------------------------------------------------------------- */
1150
1151 static rtsBool
1152 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1153 {
1154 /* put the thread back on the run queue. Then, if we're ready to
1155 * GC, check whether this is the last task to stop. If so, wake
1156 * up the GC thread. getThread will block during a GC until the
1157 * GC is finished.
1158 */
1159
1160 ASSERT(t->_link == END_TSO_QUEUE);
1161
1162 // Shortcut if we're just switching evaluators: don't bother
1163 // doing stack squeezing (which can be expensive), just run the
1164 // thread.
1165 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1166 debugTrace(DEBUG_sched,
1167 "--<< thread %ld (%s) stopped to switch evaluators",
1168 (long)t->id, what_next_strs[t->what_next]);
1169 return rtsTrue;
1170 }
1171
1172 // Reset the context switch flag. We don't do this just before
1173 // running the thread, because that would mean we would lose ticks
1174 // during GC, which can lead to unfair scheduling (a thread hogs
1175 // the CPU because the tick always arrives during GC). This way
1176 // penalises threads that do a lot of allocation, but that seems
1177 // better than the alternative.
1178 if (cap->context_switch != 0) {
1179 cap->context_switch = 0;
1180 appendToRunQueue(cap,t);
1181 } else {
1182 pushOnRunQueue(cap,t);
1183 }
1184
1185 IF_DEBUG(sanity,
1186 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1187 checkTSO(t));
1188
1189 return rtsFalse;
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadBlocked
1194 * -------------------------------------------------------------------------- */
1195
1196 static void
1197 scheduleHandleThreadBlocked( StgTSO *t
1198 #if !defined(DEBUG)
1199 STG_UNUSED
1200 #endif
1201 )
1202 {
1203
1204 // We don't need to do anything. The thread is blocked, and it
1205 // has tidied up its stack and placed itself on whatever queue
1206 // it needs to be on.
1207
1208 // ASSERT(t->why_blocked != NotBlocked);
1209 // Not true: for example,
1210 // - the thread may have woken itself up already, because
1211 // threadPaused() might have raised a blocked throwTo
1212 // exception, see maybePerformBlockedException().
1213
1214 #ifdef DEBUG
1215 traceThreadStatus(DEBUG_sched, t);
1216 #endif
1217 }
1218
1219 /* -----------------------------------------------------------------------------
1220 * Handle a thread that returned to the scheduler with ThreadFinished
1221 * -------------------------------------------------------------------------- */
1222
1223 static rtsBool
1224 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1225 {
1226 /* Need to check whether this was a main thread, and if so,
1227 * return with the return value.
1228 *
1229 * We also end up here if the thread kills itself with an
1230 * uncaught exception, see Exception.cmm.
1231 */
1232
1233 // blocked exceptions can now complete, even if the thread was in
1234 // blocked mode (see #2910).
1235 awakenBlockedExceptionQueue (cap, t);
1236
1237 //
1238 // Check whether the thread that just completed was a bound
1239 // thread, and if so return with the result.
1240 //
1241 // There is an assumption here that all thread completion goes
1242 // through this point; we need to make sure that if a thread
1243 // ends up in the ThreadKilled state, that it stays on the run
1244 // queue so it can be dealt with here.
1245 //
1246
1247 if (t->bound) {
1248
1249 if (t->bound != task->incall) {
1250 #if !defined(THREADED_RTS)
1251 // Must be a bound thread that is not the topmost one. Leave
1252 // it on the run queue until the stack has unwound to the
1253 // point where we can deal with this. Leaving it on the run
1254 // queue also ensures that the garbage collector knows about
1255 // this thread and its return value (it gets dropped from the
1256 // step->threads list so there's no other way to find it).
1257 appendToRunQueue(cap,t);
1258 return rtsFalse;
1259 #else
1260 // this cannot happen in the threaded RTS, because a
1261 // bound thread can only be run by the appropriate Task.
1262 barf("finished bound thread that isn't mine");
1263 #endif
1264 }
1265
1266 ASSERT(task->incall->tso == t);
1267
1268 if (t->what_next == ThreadComplete) {
1269 if (task->incall->ret) {
1270 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1271 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1272 }
1273 task->incall->stat = Success;
1274 } else {
1275 if (task->incall->ret) {
1276 *(task->incall->ret) = NULL;
1277 }
1278 if (sched_state >= SCHED_INTERRUPTING) {
1279 if (heap_overflow) {
1280 task->incall->stat = HeapExhausted;
1281 } else {
1282 task->incall->stat = Interrupted;
1283 }
1284 } else {
1285 task->incall->stat = Killed;
1286 }
1287 }
1288 #ifdef DEBUG
1289 removeThreadLabel((StgWord)task->incall->tso->id);
1290 #endif
1291
1292 // We no longer consider this thread and task to be bound to
1293 // each other. The TSO lives on until it is GC'd, but the
1294 // task is about to be released by the caller, and we don't
1295 // want anyone following the pointer from the TSO to the
1296 // defunct task (which might have already been
1297 // re-used). This was a real bug: the GC updated
1298 // tso->bound->tso which led to a deadlock.
1299 t->bound = NULL;
1300 task->incall->tso = NULL;
1301
1302 return rtsTrue; // tells schedule() to return
1303 }
1304
1305 return rtsFalse;
1306 }
1307
1308 /* -----------------------------------------------------------------------------
1309 * Perform a heap census
1310 * -------------------------------------------------------------------------- */
1311
1312 static rtsBool
1313 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1314 {
1315 // When we have +RTS -i0 and we're heap profiling, do a census at
1316 // every GC. This lets us get repeatable runs for debugging.
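// For instance, a binary built with -prof and run as
//   ./Main +RTS -hc -i0
// should take a census at every collection.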
1317 if (performHeapProfile ||
1318 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1319 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1320 return rtsTrue;
1321 } else {
1322 return rtsFalse;
1323 }
1324 }
1325
1326 /* -----------------------------------------------------------------------------
1327 * Start a synchronisation of all capabilities
1328 * -------------------------------------------------------------------------- */
1329
1330 // Returns:
1331 // 0 if we successfully got a sync
1332 // non-0 if there was another sync request in progress,
1333 // and we yielded to it. The value returned is the
1334 // type of the other sync request.
1335 //
1336 #if defined(THREADED_RTS)
1337 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1338 {
1339 nat prev_pending_sync;
1340
1341 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1342
1343 if (prev_pending_sync)
1344 {
1345 do {
1346 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1347 prev_pending_sync);
1348 ASSERT(*pcap);
1349 yieldCapability(pcap,task);
1350 } while (pending_sync);
1351 return prev_pending_sync; // NOTE: task->cap might have changed now
1352 }
1353 else
1354 {
1355 return 0;
1356 }
1357 }
1358
1359 //
1360 // Grab all the capabilities except the one we already hold. Used
1361 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1362 // before a fork (SYNC_FORK).
1363 //
1364 // Only call this after requestSync(), otherwise a deadlock might
1365 // ensue if another thread is trying to synchronise.
1366 //
1367 static void acquireAllCapabilities(Capability *cap, Task *task)
1368 {
1369 Capability *tmpcap;
1370 nat i;
1371
1372 for (i=0; i < n_capabilities; i++) {
1373 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1374 tmpcap = &capabilities[i];
1375 if (tmpcap != cap) {
1376 // we better hope this task doesn't get migrated to
1377 // another Capability while we're waiting for this one.
1378 // It won't, because load balancing happens while we have
1379 // all the Capabilities, but even so it's a slightly
1380 // unsavoury invariant.
1381 task->cap = tmpcap;
1382 waitForReturnCapability(&tmpcap, task);
1383 if (tmpcap != &capabilities[i]) {
1384 barf("acquireAllCapabilities: got the wrong capability");
1385 }
1386 }
1387 }
1388 }
1389
1390 #endif
1391
1392 /* -----------------------------------------------------------------------------
1393 * Perform a garbage collection if necessary
1394 * -------------------------------------------------------------------------- */
1395
1396 static Capability *
1397 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1398 {
1399 rtsBool heap_census;
1400 #ifdef THREADED_RTS
1401 rtsBool gc_type;
1402 nat i, sync;
1403 #endif
1404
1405 if (sched_state == SCHED_SHUTTING_DOWN) {
1406 // The final GC has already been done, and the system is
1407 // shutting down. We'll probably deadlock if we try to GC
1408 // now.
1409 return cap;
1410 }
1411
1412 #ifdef THREADED_RTS
1413 if (sched_state < SCHED_INTERRUPTING
1414 && RtsFlags.ParFlags.parGcEnabled
1415 && N >= RtsFlags.ParFlags.parGcGen
1416 && ! oldest_gen->mark)
1417 {
1418 gc_type = SYNC_GC_PAR;
1419 } else {
1420 gc_type = SYNC_GC_SEQ;
1421 }
1422
1423 // In order to GC, there must be no threads running Haskell code.
1424 // Therefore, the GC thread needs to hold *all* the capabilities,
1425 // and release them after the GC has completed.
1426 //
1427 // This seems to be the simplest way: previous attempts involved
1428 // making all the threads with capabilities give up their
1429 // capabilities and sleep except for the *last* one, which
1430 // actually did the GC. But it's quite hard to arrange for all
1431 // the other tasks to sleep and stay asleep.
1432 //
1433
1434 /* Other capabilities are prevented from running yet more Haskell
1435 threads if pending_sync is set. Tested inside
1436 yieldCapability() and releaseCapability() in Capability.c */
1437
1438 do {
1439 sync = requestSync(&cap, task, gc_type);
1440 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1441 // someone else had a pending sync request for a GC, so
1442 // let's assume GC has been done and we don't need to GC
1443 // again.
1444 return cap;
1445 }
1446 } while (sync);
1447
1448 interruptAllCapabilities();
1449
1450 // The final shutdown GC is always single-threaded, because it's
1451 // possible that some of the Capabilities have no worker threads.
1452
1453 if (gc_type == SYNC_GC_SEQ)
1454 {
1455 traceEventRequestSeqGc(cap);
1456 }
1457 else
1458 {
1459 traceEventRequestParGc(cap);
1460 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1461 }
1462
1463 if (gc_type == SYNC_GC_SEQ)
1464 {
1465 // single-threaded GC: grab all the capabilities
1466 acquireAllCapabilities(cap,task);
1467 }
1468 else
1469 {
1470 // multi-threaded GC: make sure all the Capabilities donate one
1471 // GC thread each.
1472 waitForGcThreads(cap);
1473
1474 #if defined(THREADED_RTS)
1475 // Stable point where we can do a global check on our spark counters
1476 ASSERT(checkSparkCountInvariant());
1477 #endif
1478 }
1479
1480 #endif
1481
1482 IF_DEBUG(scheduler, printAllThreads());
1483
1484 delete_threads_and_gc:
1485 /*
1486 * We now have all the capabilities; if we're in an interrupting
1487 * state, then we should take the opportunity to delete all the
1488 * threads in the system.
1489 */
1490 if (sched_state == SCHED_INTERRUPTING) {
1491 deleteAllThreads(cap);
1492 #if defined(THREADED_RTS)
1493 // Discard all the sparks from every Capability. Why?
1494 // They'll probably be GC'd anyway since we've killed all the
1495 // threads. It just avoids the GC having to do any work to
1496 // figure out that any remaining sparks are garbage.
1497 for (i = 0; i < n_capabilities; i++) {
1498 capabilities[i].spark_stats.gcd +=
1499 sparkPoolSize(capabilities[i].sparks);
1500 // No race here since all Caps are stopped.
1501 discardSparksCap(&capabilities[i]);
1502 }
1503 #endif
1504 sched_state = SCHED_SHUTTING_DOWN;
1505 }
1506
1507 heap_census = scheduleNeedHeapProfile(rtsTrue);
1508
1509 traceEventGcStart(cap);
1510 #if defined(THREADED_RTS)
1511 // reset pending_sync *before* GC, so that when the GC threads
1512 // emerge they don't immediately re-enter the GC.
1513 pending_sync = 0;
1514 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1515 #else
1516 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1517 #endif
1518 traceEventGcEnd(cap);
1519
1520 traceSparkCounters(cap);
1521
1522 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1523 {
1524 // We are doing a GC because the system has been idle for a
1525 // timeslice and we need to check for deadlock. Record the
1526 // fact that we've done a GC and turn off the timer signal;
1527 // it will get re-enabled if we run any threads after the GC.
1528 recent_activity = ACTIVITY_DONE_GC;
1529 stopTimer();
1530 }
1531 else
1532 {
1533 // the GC might have taken long enough for the timer to set
1534 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1535 // necessarily deadlocked:
1536 recent_activity = ACTIVITY_YES;
1537 }
1538
1539 #if defined(THREADED_RTS)
1540 // Stable point where we can do a global check on our spark counters
1541 ASSERT(checkSparkCountInvariant());
1542 #endif
1543
1544 // The heap census itself is done during GarbageCollect().
1545 if (heap_census) {
1546 performHeapProfile = rtsFalse;
1547 }
1548
1549 #if defined(THREADED_RTS)
1550 if (gc_type == SYNC_GC_PAR)
1551 {
1552 releaseGCThreads(cap);
1553 }
1554 #endif
1555
1556 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1557 // GC set the heap_overflow flag, so we should proceed with
1558 // an orderly shutdown now. Ultimately we want the main
1559 // thread to return to its caller with HeapExhausted, at which
1560 // point the caller should call hs_exit(). The first step is
1561 // to delete all the threads.
1562 //
1563 // Another way to do this would be to raise an exception in
1564 // the main thread, which we really should do because it gives
1565 // the program a chance to clean up. But how do we find the
1566 // main thread? It should presumably be the same one that
1567 // gets ^C exceptions, but that's all done on the Haskell side
1568 // (GHC.TopHandler).
1569 sched_state = SCHED_INTERRUPTING;
1570 goto delete_threads_and_gc;
1571 }
1572
1573 #ifdef SPARKBALANCE
1574 /* JB
1575 Once we are all together... this would be the place to balance all
1576 spark pools. No concurrent stealing or adding of new sparks can
1577 occur. Should be defined in Sparks.c. */
1578 balanceSparkPoolsCaps(n_capabilities, capabilities);
1579 #endif
1580
1581 #if defined(THREADED_RTS)
1582 if (gc_type == SYNC_GC_SEQ) {
1583 // release our stash of capabilities.
1584 for (i = 0; i < n_capabilities; i++) {
1585 if (cap != &capabilities[i]) {
1586 task->cap = &capabilities[i];
1587 releaseCapability(&capabilities[i]);
1588 }
1589 }
1590 }
1591 if (cap) {
1592 task->cap = cap;
1593 } else {
1594 task->cap = NULL;
1595 }
1596 #endif
1597
1598 return cap;
1599 }
1600
1601 /* ---------------------------------------------------------------------------
1602 * Singleton fork(). Do not copy any running threads.
1603 * ------------------------------------------------------------------------- */
1604
1605 pid_t
1606 forkProcess(HsStablePtr *entry
1607 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1608 STG_UNUSED
1609 #endif
1610 )
1611 {
1612 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1613 pid_t pid;
1614 StgTSO* t,*next;
1615 Capability *cap;
1616 nat g;
1617 Task *task = NULL;
1618 nat i;
1619 #ifdef THREADED_RTS
1620 nat sync;
1621 #endif
1622
1623 debugTrace(DEBUG_sched, "forking!");
1624
1625 task = newBoundTask();
1626
1627 cap = NULL;
1628 waitForReturnCapability(&cap, task);
1629
1630 #ifdef THREADED_RTS
1631 do {
1632 sync = requestSync(&cap, task, SYNC_FORK);
1633 } while (sync);
1634
1635 acquireAllCapabilities(cap,task);
1636
1637 pending_sync = 0;
1638 #endif
1639
1640 // no funny business: hold locks while we fork, otherwise if some
1641 // other thread is holding a lock when the fork happens, the data
1642 // structure protected by the lock will forever be in an
1643 // inconsistent state in the child. See also #1391.
1644 ACQUIRE_LOCK(&sched_mutex);
1645 ACQUIRE_LOCK(&sm_mutex);
1646 ACQUIRE_LOCK(&stable_mutex);
1647 ACQUIRE_LOCK(&task->lock);
1648
1649 for (i=0; i < n_capabilities; i++) {
1650 ACQUIRE_LOCK(&capabilities[i].lock);
1651 }
1652
1653 stopTimer(); // See #4074
1654
1655 #if defined(TRACING)
1656 flushEventLog(); // so that child won't inherit dirty file buffers
1657 #endif
1658
1659 pid = fork();
1660
1661 if (pid) { // parent
1662
1663 startTimer(); // #4074
1664
1665 RELEASE_LOCK(&sched_mutex);
1666 RELEASE_LOCK(&sm_mutex);
1667 RELEASE_LOCK(&stable_mutex);
1668 RELEASE_LOCK(&task->lock);
1669
1670 for (i=0; i < n_capabilities; i++) {
1671 releaseCapability_(&capabilities[i],rtsFalse);
1672 RELEASE_LOCK(&capabilities[i].lock);
1673 }
1674 boundTaskExiting(task);
1675
1676 // just return the pid
1677 return pid;
1678
1679 } else { // child
1680
1681 #if defined(THREADED_RTS)
1682 initMutex(&sched_mutex);
1683 initMutex(&sm_mutex);
1684 initMutex(&stable_mutex);
1685 initMutex(&task->lock);
1686
1687 for (i=0; i < n_capabilities; i++) {
1688 initMutex(&capabilities[i].lock);
1689 }
1690 #endif
1691
1692 #ifdef TRACING
1693 resetTracing();
1694 #endif
1695
1696 // Now, all OS threads except the thread that forked are
1697 // stopped. We need to stop all Haskell threads, including
1698 // those involved in foreign calls. Also we need to delete
1699 // all Tasks, because they correspond to OS threads that are
1700 // now gone.
1701
1702 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1703 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1704 next = t->global_link;
1705 // don't allow threads to catch the ThreadKilled
1706 // exception, but we do want to raiseAsync() because these
1707 // threads may be evaluating thunks that we need later.
1708 deleteThread_(t->cap,t);
1709
1710 // stop the GC from updating the InCall to point to
1711 // the TSO. This is only necessary because the
1712 // OSThread bound to the TSO has been killed, and
1713 // won't get a chance to exit in the usual way (see
1714 // also scheduleHandleThreadFinished).
1715 t->bound = NULL;
1716 }
1717 }
1718
1719 discardTasksExcept(task);
1720
1721 for (i=0; i < n_capabilities; i++) {
1722 cap = &capabilities[i];
1723
1724 // Empty the run queue. It seems tempting to let all the
1725 // killed threads stay on the run queue as zombies to be
1726 // cleaned up later, but some of them may correspond to
1727 // bound threads for which the corresponding Task does not
1728 // exist.
1729 cap->run_queue_hd = END_TSO_QUEUE;
1730 cap->run_queue_tl = END_TSO_QUEUE;
1731
1732 // Any suspended C-calling Tasks are no more; their OS threads
1733 // don't exist now:
1734 cap->suspended_ccalls = NULL;
1735
1736 #if defined(THREADED_RTS)
1737 // Wipe our spare workers list, they no longer exist. New
1738 // workers will be created if necessary.
1739 cap->spare_workers = NULL;
1740 cap->n_spare_workers = 0;
1741 cap->returning_tasks_hd = NULL;
1742 cap->returning_tasks_tl = NULL;
1743 #endif
1744
1745 // Release all caps except 0, we'll use that for starting
1746 // the IO manager and running the client action below.
1747 if (cap->no != 0) {
1748 task->cap = cap;
1749 releaseCapability(cap);
1750 }
1751 }
1752 cap = &capabilities[0];
1753 task->cap = cap;
1754
1755 // Empty the threads lists. Otherwise, the garbage
1756 // collector may attempt to resurrect some of these threads.
1757 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1758 generations[g].threads = END_TSO_QUEUE;
1759 }
1760
1761 // On Unix, all timers are reset in the child, so we need to start
1762 // the timer again.
1763 initTimer();
1764 startTimer();
1765
1766 #if defined(THREADED_RTS)
1767 ioManagerStartCap(&cap);
1768 #endif
1769
1770 rts_evalStableIO(&cap, entry, NULL); // run the action
1771 rts_checkSchedStatus("forkProcess",cap);
1772
1773 rts_unlock(cap);
1774 hs_exit(); // clean up and exit
1775 stg_exit(EXIT_SUCCESS);
1776 }
1777 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1778 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1779 #endif
1780 }
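/* Illustrative usage sketch only (not part of the RTS).  forkProcess is the
 * C entry point behind the forkProcess# primop; assuming its declared
 * signature is pid_t forkProcess(HsStablePtr *entry), a caller sees:
 *
 *     pid_t pid = forkProcess(&entry);   // entry: StablePtr to the IO action
 *     // parent: pid is the child's process id, execution continues here
 *     // child:  never returns -- it runs the action on capability 0 and
 *     //         then terminates via hs_exit()/stg_exit() above
 */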
1781
1782 /* ---------------------------------------------------------------------------
1783 * Delete all the threads in the system
1784 * ------------------------------------------------------------------------- */
1785
1786 static void
1787 deleteAllThreads ( Capability *cap )
1788 {
1789 // NOTE: only safe to call if we own all capabilities.
1790
1791 StgTSO* t, *next;
1792 nat g;
1793
1794 debugTrace(DEBUG_sched,"deleting all threads");
1795 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1796 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1797 next = t->global_link;
1798 deleteThread(cap,t);
1799 }
1800 }
1801
1802 // The run queue now contains a bunch of ThreadKilled threads. We
1803 // must not throw these away: the main thread(s) will be in there
1804 // somewhere, and the main scheduler loop has to deal with it.
1805 // Also, the run queue is the only thing keeping these threads from
1806 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1807
1808 #if !defined(THREADED_RTS)
1809 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1810 ASSERT(sleeping_queue == END_TSO_QUEUE);
1811 #endif
1812 }
1813
1814 /* -----------------------------------------------------------------------------
1815 Managing the suspended_ccalls list.
1816 Locks required: sched_mutex
1817 -------------------------------------------------------------------------- */
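/* Illustrative picture only: suspended_ccalls is a doubly-linked list of
 * InCall records threaded through their prev/next fields, e.g.
 *
 *     cap->suspended_ccalls -> [InCall A] <-> [InCall B] <-> [InCall C] -> NULL
 *
 * suspendTask pushes the calling Task's current InCall on the front;
 * recoverSuspendedTask unlinks it from wherever it currently sits.
 */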
1818
1819 STATIC_INLINE void
1820 suspendTask (Capability *cap, Task *task)
1821 {
1822 InCall *incall;
1823
1824 incall = task->incall;
1825 ASSERT(incall->next == NULL && incall->prev == NULL);
1826 incall->next = cap->suspended_ccalls;
1827 incall->prev = NULL;
1828 if (cap->suspended_ccalls) {
1829 cap->suspended_ccalls->prev = incall;
1830 }
1831 cap->suspended_ccalls = incall;
1832 }
1833
1834 STATIC_INLINE void
1835 recoverSuspendedTask (Capability *cap, Task *task)
1836 {
1837 InCall *incall;
1838
1839 incall = task->incall;
1840 if (incall->prev) {
1841 incall->prev->next = incall->next;
1842 } else {
1843 ASSERT(cap->suspended_ccalls == incall);
1844 cap->suspended_ccalls = incall->next;
1845 }
1846 if (incall->next) {
1847 incall->next->prev = incall->prev;
1848 }
1849 incall->next = incall->prev = NULL;
1850 }
1851
1852 /* ---------------------------------------------------------------------------
1853 * Suspending & resuming Haskell threads.
1854 *
1855 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1856 * its capability before calling the C function. This allows another
1857 * task to pick up the capability and carry on running Haskell
1858 * threads. It also means that if the C call blocks, it won't lock
1859 * the whole system.
1860 *
1861 * The Haskell thread making the C call is put to sleep for the
1862  * duration of the call, on the capability's suspended_ccalls list. We
1863 * give out a token to the task, which it can use to resume the thread
1864 * on return from the C function.
1865 *
1866 * If this is an interruptible C call, this means that the FFI call may be
1867 * unceremoniously terminated and should be scheduled on an
1868 * unbound worker thread.
1869 * ------------------------------------------------------------------------- */
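/* A minimal sketch (illustrative, not the code the compiler actually emits)
 * of how this pair brackets a "safe" foreign call:
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     result = foreign_fn(args);        // may block; other Tasks run Haskell
 *     BaseReg = resumeThread(token);    // may return on a different Capability
 */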
1870
1871 void *
1872 suspendThread (StgRegTable *reg, rtsBool interruptible)
1873 {
1874 Capability *cap;
1875 int saved_errno;
1876 StgTSO *tso;
1877 Task *task;
1878 #if mingw32_HOST_OS
1879 StgWord32 saved_winerror;
1880 #endif
1881
1882 saved_errno = errno;
1883 #if mingw32_HOST_OS
1884 saved_winerror = GetLastError();
1885 #endif
1886
1887 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1888 */
1889 cap = regTableToCapability(reg);
1890
1891 task = cap->running_task;
1892 tso = cap->r.rCurrentTSO;
1893
1894 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
1895
1896 // XXX this might not be necessary --SDM
1897 tso->what_next = ThreadRunGHC;
1898
1899 threadPaused(cap,tso);
1900
1901 if (interruptible) {
1902 tso->why_blocked = BlockedOnCCall_Interruptible;
1903 } else {
1904 tso->why_blocked = BlockedOnCCall;
1905 }
1906
1907 // Hand back capability
1908 task->incall->suspended_tso = tso;
1909 task->incall->suspended_cap = cap;
1910
1911 ACQUIRE_LOCK(&cap->lock);
1912
1913 suspendTask(cap,task);
1914 cap->in_haskell = rtsFalse;
1915 releaseCapability_(cap,rtsFalse);
1916
1917 RELEASE_LOCK(&cap->lock);
1918
1919 errno = saved_errno;
1920 #if mingw32_HOST_OS
1921 SetLastError(saved_winerror);
1922 #endif
1923 return task;
1924 }
1925
1926 StgRegTable *
1927 resumeThread (void *task_)
1928 {
1929 StgTSO *tso;
1930 InCall *incall;
1931 Capability *cap;
1932 Task *task = task_;
1933 int saved_errno;
1934 #if mingw32_HOST_OS
1935 StgWord32 saved_winerror;
1936 #endif
1937
1938 saved_errno = errno;
1939 #if mingw32_HOST_OS
1940 saved_winerror = GetLastError();
1941 #endif
1942
1943 incall = task->incall;
1944 cap = incall->suspended_cap;
1945 task->cap = cap;
1946
1947 // Wait for permission to re-enter the RTS with the result.
1948 waitForReturnCapability(&cap,task);
1949 // we might be on a different capability now... but if so, our
1950 // entry on the suspended_ccalls list will also have been
1951 // migrated.
1952
1953 // Remove the thread from the suspended list
1954 recoverSuspendedTask(cap,task);
1955
1956 tso = incall->suspended_tso;
1957 incall->suspended_tso = NULL;
1958 incall->suspended_cap = NULL;
1959 tso->_link = END_TSO_QUEUE; // no write barrier reqd
1960
1961 traceEventRunThread(cap, tso);
1962
1963 /* Reset blocking status */
1964 tso->why_blocked = NotBlocked;
1965
1966 if ((tso->flags & TSO_BLOCKEX) == 0) {
1967 // avoid locking the TSO if we don't have to
1968 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
1969 maybePerformBlockedException(cap,tso);
1970 }
1971 }
1972
1973 cap->r.rCurrentTSO = tso;
1974 cap->in_haskell = rtsTrue;
1975 errno = saved_errno;
1976 #if mingw32_HOST_OS
1977 SetLastError(saved_winerror);
1978 #endif
1979
1980 /* We might have GC'd, mark the TSO dirty again */
1981 dirty_TSO(cap,tso);
1982 dirty_STACK(cap,tso->stackobj);
1983
1984 IF_DEBUG(sanity, checkTSO(tso));
1985
1986 return &cap->r;
1987 }
1988
1989 /* ---------------------------------------------------------------------------
1990 * scheduleThread()
1991 *
1992 * scheduleThread puts a thread on the end of the runnable queue.
1993 * This will usually be done immediately after a thread is created.
1994 * The caller of scheduleThread must create the thread using e.g.
1995 * createThread and push an appropriate closure
1996 * on this thread's stack before the scheduler is invoked.
1997 * ------------------------------------------------------------------------ */
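/* Illustrative sketch only (createIOThread lives in RtsAPI.c and performs the
 * createThread-plus-push-closure step; treat the exact arguments as
 * assumptions):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);   // tso now sits at the back of cap's run queue
 */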
1998
1999 void
2000 scheduleThread(Capability *cap, StgTSO *tso)
2001 {
2002 // The thread goes at the *end* of the run-queue, to avoid possible
2003 // starvation of any threads already on the queue.
2004 appendToRunQueue(cap,tso);
2005 }
2006
2007 void
2008 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2009 {
2010 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2011 // move this thread from now on.
2012 #if defined(THREADED_RTS)
2013 cpu %= RtsFlags.ParFlags.nNodes;
2014 if (cpu == cap->no) {
2015 appendToRunQueue(cap,tso);
2016 } else {
2017 migrateThread(cap, tso, &capabilities[cpu]);
2018 }
2019 #else
2020 appendToRunQueue(cap,tso);
2021 #endif
2022 }
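/* Worked example for scheduleThreadOn above: with +RTS -N2
 * (RtsFlags.ParFlags.nNodes == 2), a request for cpu 5 is reduced to
 * 5 % 2 == 1, so the thread is pinned to capability 1; if that is not the
 * current capability, it is handed over via migrateThread.
 */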
2023
2024 void
2025 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2026 {
2027 Task *task;
2028 DEBUG_ONLY( StgThreadID id );
2029 Capability *cap;
2030
2031 cap = *pcap;
2032
2033 // We already created/initialised the Task
2034 task = cap->running_task;
2035
2036 // This TSO is now a bound thread; make the Task and TSO
2037 // point to each other.
2038 tso->bound = task->incall;
2039 tso->cap = cap;
2040
2041 task->incall->tso = tso;
2042 task->incall->ret = ret;
2043 task->incall->stat = NoStatus;
2044
2045 appendToRunQueue(cap,tso);
2046
2047 DEBUG_ONLY( id = tso->id );
2048 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2049
2050 cap = schedule(cap,task);
2051
2052 ASSERT(task->incall->stat != NoStatus);
2053 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2054
2055 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2056 *pcap = cap;
2057 }
2058
2059 /* ----------------------------------------------------------------------------
2060 * Starting Tasks
2061 * ------------------------------------------------------------------------- */
2062
2063 #if defined(THREADED_RTS)
2064 void scheduleWorker (Capability *cap, Task *task)
2065 {
2066 // schedule() runs without a lock.
2067 cap = schedule(cap,task);
2068
2069 // On exit from schedule(), we have a Capability, but possibly not
2070 // the same one we started with.
2071
2072 // During shutdown, the requirement is that after all the
2073 // Capabilities are shut down, all workers that are shutting down
2074 // have finished workerTaskStop(). This is why we hold on to
2075 // cap->lock until we've finished workerTaskStop() below.
2076 //
2077 // There may be workers still involved in foreign calls; those
2078 // will just block in waitForReturnCapability() because the
2079 // Capability has been shut down.
2080 //
2081 ACQUIRE_LOCK(&cap->lock);
2082 releaseCapability_(cap,rtsFalse);
2083 workerTaskStop(task);
2084 RELEASE_LOCK(&cap->lock);
2085 }
2086 #endif
2087
2088 /* ---------------------------------------------------------------------------
2089 * initScheduler()
2090 *
2091 * Initialise the scheduler. This resets all the queues - if the
2092 * queues contained any threads, they'll be garbage collected at the
2093 * next pass.
2094 *
2095 * ------------------------------------------------------------------------ */
2096
2097 void
2098 initScheduler(void)
2099 {
2100 #if !defined(THREADED_RTS)
2101 blocked_queue_hd = END_TSO_QUEUE;
2102 blocked_queue_tl = END_TSO_QUEUE;
2103 sleeping_queue = END_TSO_QUEUE;
2104 #endif
2105
2106 sched_state = SCHED_RUNNING;
2107 recent_activity = ACTIVITY_YES;
2108
2109 #if defined(THREADED_RTS)
2110 /* Initialise the mutex and condition variables used by
2111 * the scheduler. */
2112 initMutex(&sched_mutex);
2113 #endif
2114
2115 ACQUIRE_LOCK(&sched_mutex);
2116
2117 /* A capability holds the state a native thread needs in
2118 * order to execute STG code. At least one capability is
2119 * floating around (only THREADED_RTS builds have more than one).
2120 */
2121 initCapabilities();
2122
2123 initTaskManager();
2124
2125 RELEASE_LOCK(&sched_mutex);
2126
2127 #if defined(THREADED_RTS)
2128 /*
2129 * Eagerly start one worker to run each Capability, except for
2130 * Capability 0. The idea is that we're probably going to start a
2131 * bound thread on Capability 0 pretty soon, so we don't want a
2132 * worker task hogging it.
2133 */
2134 {
2135 nat i;
2136 Capability *cap;
2137 for (i = 1; i < n_capabilities; i++) {
2138 cap = &capabilities[i];
2139 ACQUIRE_LOCK(&cap->lock);
2140 startWorkerTask(cap);
2141 RELEASE_LOCK(&cap->lock);
2142 }
2143 }
2144 #endif
2145 }
2146
2147 void
2148 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2149 /* see Capability.c, shutdownCapability() */
2150 {
2151 Task *task = NULL;
2152
2153 task = newBoundTask();
2154
2155 // If we haven't killed all the threads yet, do it now.
2156 if (sched_state < SCHED_SHUTTING_DOWN) {
2157 sched_state = SCHED_INTERRUPTING;
2158 waitForReturnCapability(&task->cap,task);
2159 scheduleDoGC(task->cap,task,rtsFalse);
2160 ASSERT(task->incall->tso == NULL);
2161 releaseCapability(task->cap);
2162 }
2163 sched_state = SCHED_SHUTTING_DOWN;
2164
2165 shutdownCapabilities(task, wait_foreign);
2166
2167 boundTaskExiting(task);
2168 }
2169
2170 void
2171 freeScheduler( void )
2172 {
2173 nat still_running;
2174
2175 ACQUIRE_LOCK(&sched_mutex);
2176 still_running = freeTaskManager();
2177 // We can only free the Capabilities if there are no Tasks still
2178 // running. We might have a Task about to return from a foreign
2179 // call into waitForReturnCapability(), for example (actually,
2180 // this should be the *only* thing that a still-running Task can
2181 // do at this point, and it will block waiting for the
2182 // Capability).
2183 if (still_running == 0) {
2184 freeCapabilities();
2185 if (n_capabilities != 1) {
2186 stgFree(capabilities);
2187 }
2188 }
2189 RELEASE_LOCK(&sched_mutex);
2190 #if defined(THREADED_RTS)
2191 closeMutex(&sched_mutex);
2192 #endif
2193 }
2194
2195 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2196 void *user USED_IF_NOT_THREADS)
2197 {
2198 #if !defined(THREADED_RTS)
2199 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2200 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2201 evac(user, (StgClosure **)(void *)&sleeping_queue);
2202 #endif
2203 }
2204
2205 /* -----------------------------------------------------------------------------
2206 performGC
2207
2208 This is the interface to the garbage collector from Haskell land.
2209 We provide this so that external C code can allocate and garbage
2210 collect when called from Haskell via _ccall_GC.
2211 -------------------------------------------------------------------------- */
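/* Illustrative usage only, for C code embedding the RTS (assumes hs_init()
 * has already been called and the caller holds no Capability):
 *
 *     performGC();        // request a normal collection
 *     performMajorGC();   // force a major collection
 */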
2212
2213 static void
2214 performGC_(rtsBool force_major)
2215 {
2216 Task *task;
2217
2218 // We must grab a new Task here, because the existing Task may be
2219 // associated with a particular Capability, and chained onto the
2220 // suspended_ccalls queue.
2221 task = newBoundTask();
2222
2223 waitForReturnCapability(&task->cap,task);
2224 scheduleDoGC(task->cap,task,force_major);
2225 releaseCapability(task->cap);
2226 boundTaskExiting(task);
2227 }
2228
2229 void
2230 performGC(void)
2231 {
2232 performGC_(rtsFalse);
2233 }
2234
2235 void
2236 performMajorGC(void)
2237 {
2238 performGC_(rtsTrue);
2239 }
2240
2241 /* ---------------------------------------------------------------------------
2242 Interrupt execution
2243 - usually called inside a signal handler so it mustn't do anything fancy.
2244 ------------------------------------------------------------------------ */
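/* Sketch of a typical call site (the real handler lives in posix/Signals.c;
 * the details here are illustrative):
 *
 *     static void shutdown_handler (int sig STG_UNUSED)
 *     {
 *         interruptStgRts();  // sets sched_state and pokes the scheduler
 *     }
 */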
2245
2246 void
2247 interruptStgRts(void)
2248 {
2249 sched_state = SCHED_INTERRUPTING;
2250 interruptAllCapabilities();
2251 #if defined(THREADED_RTS)
2252 wakeUpRts();
2253 #endif
2254 }
2255
2256 /* -----------------------------------------------------------------------------
2257 Wake up the RTS
2258
2259 This function causes at least one OS thread to wake up and run the
2260 scheduler loop. It is invoked when the RTS might be deadlocked, or
2261 an external event has arrived that may need servicing (eg. a
2262 keyboard interrupt).
2263
2264 In the single-threaded RTS we don't do anything here; we only have
2265 one thread anyway, and the event that caused us to want to wake up
2266 will have interrupted any blocking system call in progress anyway.
2267 -------------------------------------------------------------------------- */
2268
2269 #if defined(THREADED_RTS)
2270 void wakeUpRts(void)
2271 {
2272 // This forces the IO Manager thread to wakeup, which will
2273 // in turn ensure that some OS thread wakes up and runs the
2274 // scheduler loop, which will cause a GC and deadlock check.
2275 ioManagerWakeup();
2276 }
2277 #endif
2278
2279 /* -----------------------------------------------------------------------------
2280 Deleting threads
2281
2282 This is used for interruption (^C) and forking, and corresponds to
2283 raising an exception but without letting the thread catch the
2284 exception.
2285 -------------------------------------------------------------------------- */
2286
2287 static void
2288 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2289 {
2290 // NOTE: must only be called on a TSO that we have exclusive
2291 // access to, because we will call throwToSingleThreaded() below.
2292 // The TSO must be on the run queue of the Capability we own, or
2293 // we must own all Capabilities.
2294
2295 if (tso->why_blocked != BlockedOnCCall &&
2296 tso->why_blocked != BlockedOnCCall_Interruptible) {
2297 throwToSingleThreaded(tso->cap,tso,NULL);
2298 }
2299 }
2300
2301 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2302 static void
2303 deleteThread_(Capability *cap, StgTSO *tso)
2304 { // for forkProcess only:
2305 // like deleteThread(), but we delete threads in foreign calls, too.
2306
2307 if (tso->why_blocked == BlockedOnCCall ||
2308 tso->why_blocked == BlockedOnCCall_Interruptible) {
2309 tso->what_next = ThreadKilled;
2310 appendToRunQueue(tso->cap, tso);
2311 } else {
2312 deleteThread(cap,tso);
2313 }
2314 }
2315 #endif
2316
2317 /* -----------------------------------------------------------------------------
2318 raiseExceptionHelper
2319
2320    This function is called by the raise# primitive, just so that we can
2321    move some of the tricky bits of raising an exception from C-- into
2322    C. It might also turn out to be a re-usable helper in its own right.
2323 -------------------------------------------------------------------------- */
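/* Worked example (illustrative): given a stack of the shape
 *
 *     sp -> UPDATE_FRAME (updatee = t1)
 *           UPDATE_FRAME (updatee = t2)
 *           CATCH_FRAME  (handler  = h)
 *           ...
 *
 * the loop below overwrites t1 and t2 with a single 'raise# E' thunk and
 * returns CATCH_FRAME, leaving tso->stackobj->sp pointing at that frame so
 * the C-- caller can enter the handler h with the exception E.
 */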
2324
2325 StgWord
2326 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2327 {
2328 Capability *cap = regTableToCapability(reg);
2329 StgThunk *raise_closure = NULL;
2330 StgPtr p, next;
2331 StgRetInfoTable *info;
2332 //
2333     // This closure represents the expression 'raise# E' where E
2334     // is the exception to raise. It is used to overwrite all the
2335     // thunks which are currently under evaluation.
2336 //
2337
2338 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2339 // LDV profiling: stg_raise_info has THUNK as its closure
2340 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2341     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2342 // 1 does not cause any problem unless profiling is performed.
2343 // However, when LDV profiling goes on, we need to linearly scan
2344 // small object pool, where raise_closure is stored, so we should
2345 // use MIN_UPD_SIZE.
2346 //
2347 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2348 // sizeofW(StgClosure)+1);
2349 //
2350
2351 //
2352 // Walk up the stack, looking for the catch frame. On the way,
2353 // we update any closures pointed to from update frames with the
2354 // raise closure that we just built.
2355 //
2356 p = tso->stackobj->sp;
2357 while(1) {
2358 info = get_ret_itbl((StgClosure *)p);
2359 next = p + stack_frame_sizeW((StgClosure *)p);
2360 switch (info->i.type) {
2361
2362 case UPDATE_FRAME:
2363 // Only create raise_closure if we need to.
2364 if (raise_closure == NULL) {
2365 raise_closure =
2366 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2367 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2368 raise_closure->payload[0] = exception;
2369 }
2370 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2371 (StgClosure *)raise_closure);
2372 p = next;
2373 continue;
2374
2375 case ATOMICALLY_FRAME:
2376 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2377 tso->stackobj->sp = p;
2378 return ATOMICALLY_FRAME;
2379
2380 case CATCH_FRAME:
2381 tso->stackobj->sp = p;
2382 return CATCH_FRAME;
2383
2384 case CATCH_STM_FRAME:
2385 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2386 tso->stackobj->sp = p;
2387 return CATCH_STM_FRAME;
2388
2389 case UNDERFLOW_FRAME:
2390 tso->stackobj->sp = p;
2391 threadStackUnderflow(cap,tso);
2392 p = tso->stackobj->sp;
2393 continue;
2394
2395 case STOP_FRAME:
2396 tso->stackobj->sp = p;
2397 return STOP_FRAME;
2398
2399 case CATCH_RETRY_FRAME:
2400 default:
2401 p = next;
2402 continue;
2403 }
2404 }
2405 }
2406
2407
2408 /* -----------------------------------------------------------------------------
2409 findRetryFrameHelper
2410
2411 This function is called by the retry# primitive. It traverses the stack
2412 leaving tso->sp referring to the frame which should handle the retry.
2413    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2414 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2415 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
2416    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2417 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2418 create) because retries are not considered to be exceptions, despite the
2419 similar implementation.
2420
2421 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2422 not be created within memory transactions.
2423 -------------------------------------------------------------------------- */
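/* Worked example (illustrative): in
 * 'atomically (catchSTM (a `orElse` b) h)', a retry# raised inside 'a' walks
 * past the frames pushed by 'a' and stops at the CATCH_RETRY_FRAME pushed by
 * orElse#, so the alternative 'b' can be run.  If 'b' retries as well, the
 * next walk aborts the CATCH_STM_FRAME's nested transaction and stops at the
 * ATOMICALLY_FRAME, where the thread blocks until one of the TVars it read
 * has been written.
 */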
2424
2425 StgWord
2426 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2427 {
2428 StgPtr p, next;
2429 StgRetInfoTable *info;
2430
2431 p = tso->stackobj->sp;
2432 while (1) {
2433 info = get_ret_itbl((StgClosure *)p);
2434 next = p + stack_frame_sizeW((StgClosure *)p);
2435 switch (info->i.type) {
2436
2437 case ATOMICALLY_FRAME:
2438 debugTrace(DEBUG_stm,
2439 "found ATOMICALLY_FRAME at %p during retry", p);
2440 tso->stackobj->sp = p;
2441 return ATOMICALLY_FRAME;
2442
2443 case CATCH_RETRY_FRAME:
2444 debugTrace(DEBUG_stm,
2445                  "found CATCH_RETRY_FRAME at %p during retry", p);
2446 tso->stackobj->sp = p;
2447 return CATCH_RETRY_FRAME;
2448
2449 case CATCH_STM_FRAME: {
2450 StgTRecHeader *trec = tso -> trec;
2451 StgTRecHeader *outer = trec -> enclosing_trec;
2452 debugTrace(DEBUG_stm,
2453 "found CATCH_STM_FRAME at %p during retry", p);
2454 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2455 stmAbortTransaction(cap, trec);
2456 stmFreeAbortedTRec(cap, trec);
2457 tso -> trec = outer;
2458 p = next;
2459 continue;
2460 }
2461
2462 case UNDERFLOW_FRAME:
2463 threadStackUnderflow(cap,tso);
2464 p = tso->stackobj->sp;
2465 continue;
2466
2467 default:
2468 ASSERT(info->i.type != CATCH_FRAME);
2469 ASSERT(info->i.type != STOP_FRAME);
2470 p = next;
2471 continue;
2472 }
2473 }
2474 }
2475
2476 /* -----------------------------------------------------------------------------
2477 resurrectThreads is called after garbage collection on the list of
2478 threads found to be garbage. Each of these threads will be woken
2479 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2480    up and sent an asynchronous exception: BlockedIndefinitelyOnMVar if the
2481    thread was blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked
2482    in STM, or NonTermination if it was blocked on a Black Hole.
2483 Locks: assumes we hold *all* the capabilities.
2484 -------------------------------------------------------------------------- */
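/* One concrete case (illustrative): a thread stuck in 'takeMVar m' whose MVar
 * has become unreachable ends up on this list and is resurrected with
 * blockedIndefinitelyOnMVar_closure, which the program observes as the
 * BlockedIndefinitelyOnMVar exception.
 */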
2485
2486 void
2487 resurrectThreads (StgTSO *threads)
2488 {
2489 StgTSO *tso, *next;
2490 Capability *cap;
2491 generation *gen;
2492
2493 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2494 next = tso->global_link;
2495
2496 gen = Bdescr((P_)tso)->gen;
2497 tso->global_link = gen->threads;
2498 gen->threads = tso;
2499
2500 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2501
2502 // Wake up the thread on the Capability it was last on
2503 cap = tso->cap;
2504
2505 switch (tso->why_blocked) {
2506 case BlockedOnMVar:
2507 /* Called by GC - sched_mutex lock is currently held. */
2508 throwToSingleThreaded(cap, tso,
2509 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2510 break;
2511 case BlockedOnBlackHole:
2512 throwToSingleThreaded(cap, tso,
2513 (StgClosure *)nonTermination_closure);
2514 break;
2515 case BlockedOnSTM:
2516 throwToSingleThreaded(cap, tso,
2517 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2518 break;
2519 case NotBlocked:
2520 /* This might happen if the thread was blocked on a black hole
2521 * belonging to a thread that we've just woken up (raiseAsync
2522 * can wake up threads, remember...).
2523 */
2524 continue;
2525 case BlockedOnMsgThrowTo:
2526 // This can happen if the target is masking, blocks on a
2527 // black hole, and then is found to be unreachable. In
2528 // this case, we want to let the target wake up and carry
2529 // on, and do nothing to this thread.
2530 continue;
2531 default:
2532 barf("resurrectThreads: thread blocked in a strange way: %d",
2533 tso->why_blocked);
2534 }
2535 }
2536 }