1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
13 #include "sm/Storage.h"
17 #include "Interpreter.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
24 #include "ThreadLabels.h"
26 #include "Proftimer.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
32 #include "Capability.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
39 #include "RaiseAsync.h"
42 #include "ThreadPaused.h"
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
62 #include "eventlog/EventLog.h"
64 /* -----------------------------------------------------------------------------
66 * -------------------------------------------------------------------------- */
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping thrads
70 StgTSO
*blocked_queue_hd
= NULL
;
71 StgTSO
*blocked_queue_tl
= NULL
;
72 StgTSO
*sleeping_queue
= NULL
; // perhaps replace with a hash table?
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
79 rtsBool heap_overflow
= rtsFalse
;
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
85 * NB. must be StgWord, we do xchg() on it.
87 volatile StgWord recent_activity
= ACTIVITY_YES
;
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
92 volatile StgWord sched_state
= SCHED_RUNNING
;
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
104 #if defined(THREADED_RTS)
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
114 static nat n_failed_trygrab_idles
= 0, n_idle_caps
= 0;
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
121 static Capability
*schedule (Capability
*initialCapability
, Task
*task
);
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability
**pcap
);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability
**pcap
, Task
*task
);
133 #if defined(THREADED_RTS)
134 static nat
requestSync (Capability
**pcap
, Task
*task
, nat sync_type
);
135 static void acquireAllCapabilities(Capability
*cap
, Task
*task
);
136 static void releaseAllCapabilities(nat n
, Capability
*cap
, Task
*task
);
137 static void startWorkerTasks (nat from USED_IF_THREADS
, nat to USED_IF_THREADS
);
139 static void scheduleStartSignalHandlers (Capability
*cap
);
140 static void scheduleCheckBlockedThreads (Capability
*cap
);
141 static void scheduleProcessInbox(Capability
**cap
);
142 static void scheduleDetectDeadlock (Capability
**pcap
, Task
*task
);
143 static void schedulePushWork(Capability
*cap
, Task
*task
);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability
*cap
);
147 static void schedulePostRunThread(Capability
*cap
, StgTSO
*t
);
148 static rtsBool
scheduleHandleHeapOverflow( Capability
*cap
, StgTSO
*t
);
149 static rtsBool
scheduleHandleYield( Capability
*cap
, StgTSO
*t
,
150 nat prev_what_next
);
151 static void scheduleHandleThreadBlocked( StgTSO
*t
);
152 static rtsBool
scheduleHandleThreadFinished( Capability
*cap
, Task
*task
,
154 static rtsBool
scheduleNeedHeapProfile(rtsBool ready_to_gc
);
155 static void scheduleDoGC(Capability
**pcap
, Task
*task
, rtsBool force_major
);
157 static void deleteThread (Capability
*cap
, StgTSO
*tso
);
158 static void deleteAllThreads (Capability
*cap
);
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability
*cap
, StgTSO
*tso
);
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
171 * timer expires (thread yields)
176 ------------------------------------------------------------------------ */
179 schedule (Capability
*initialCapability
, Task
*task
)
183 StgThreadReturnCode ret
;
186 #if defined(THREADED_RTS)
187 rtsBool first
= rtsTrue
;
190 cap
= initialCapability
;
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
196 debugTrace (DEBUG_sched
, "cap %d: schedule()", initialCapability
->no
);
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
208 if (cap
->in_haskell
) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE
);
214 // The interruption / shutdown sequence.
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads assocated with the runtime
220 // * free all memory etc.
222 // So the sequence for ^C goes like this:
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads, this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
236 // * sched_state := SCHED_SHUTTING_DOWN
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
242 // * eventually all Capabilities will shut down, and the RTS can
245 // * We might be left with threads blocked in foreign calls,
246 // we should really attempt to kill these somehow (TODO);
248 switch (sched_state
) {
251 case SCHED_INTERRUPTING
:
252 debugTrace(DEBUG_sched
, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap
,task
,rtsFalse
);
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
260 ASSERT(sched_state
== SCHED_SHUTTING_DOWN
);
263 case SCHED_SHUTTING_DOWN
:
264 debugTrace(DEBUG_sched
, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
268 if (!isBoundTask(task
) && emptyRunQueue(cap
)) {
273 barf("sched_state: %d", sched_state
);
276 scheduleFindWork(&cap
);
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap
,task
);
282 scheduleDetectDeadlock(&cap
,task
);
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
293 #if defined(THREADED_RTS)
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
304 scheduleYield(&cap
,task
);
306 if (emptyRunQueue(cap
)) continue; // look for work again
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap
) ) {
311 ASSERT(sched_state
>= SCHED_INTERRUPTING
);
316 // Get a thread to run
318 t
= popRunQueue(cap
);
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity
,checkTSO(t
));
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
328 InCall
*bound
= t
->bound
;
331 if (bound
->task
== task
) {
332 // yes, the Haskell thread is bound to the current native thread
334 debugTrace(DEBUG_sched
,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t
->id
);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap
,t
);
342 // The thread we want to run is unbound.
343 if (task
->incall
->tso
) {
344 debugTrace(DEBUG_sched
,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t
->id
);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap
,t
);
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state
>= SCHED_INTERRUPTING
&&
361 !(t
->what_next
== ThreadComplete
|| t
->what_next
== ThreadKilled
)) {
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
378 if (cap
->disabled
&& !t
->bound
) {
379 Capability
*dest_cap
= capabilities
[cap
->no
% enabled_capabilities
];
380 migrateThread(cap
, t
, dest_cap
);
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
389 if (RtsFlags
.ConcFlags
.ctxtSwitchTicks
== 0
390 && !emptyThreadQueues(cap
)) {
391 cap
->context_switch
= 1;
396 // CurrentTSO is the thread to run. It might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
399 cap
->r
.rCurrentTSO
= t
;
401 startHeapProfTimer();
403 // ----------------------------------------------------------------------
404 // Run the current thread
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap
,task
);
407 ASSERT(t
->cap
== cap
);
408 ASSERT(t
->bound ? t
->bound
->task
->cap
== cap
: 1);
410 prev_what_next
= t
->what_next
;
412 errno
= t
->saved_errno
;
414 SetLastError(t
->saved_winerror
);
417 // reset the interrupt flag before running Haskell code
420 cap
->in_haskell
= rtsTrue
;
424 dirty_STACK(cap
,t
->stackobj
);
426 switch (recent_activity
)
428 case ACTIVITY_DONE_GC
: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
432 prev
= xchg((P_
)&recent_activity
, ACTIVITY_YES
);
433 if (prev
== ACTIVITY_DONE_GC
) {
440 case ACTIVITY_INACTIVE
:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
447 recent_activity
= ACTIVITY_YES
;
450 traceEventRunThread(cap
, t
);
452 switch (prev_what_next
) {
456 /* Thread already finished, return to scheduler. */
457 ret
= ThreadFinished
;
463 r
= StgRun((StgFunPtr
) stg_returnToStackTop
, &cap
->r
);
464 cap
= regTableToCapability(r
);
469 case ThreadInterpret
:
470 cap
= interpretBCO(cap
);
475 barf("schedule: invalid what_next field");
478 cap
->in_haskell
= rtsFalse
;
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t
= cap
->r
.rCurrentTSO
;
484 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
485 // don't want it set when not running a Haskell thread.
486 cap
->r
.rCurrentTSO
= NULL
;
488 // And save the current errno in this thread.
489 // XXX: possibly bogus for SMP because this thread might already
490 // be running again, see code below.
491 t
->saved_errno
= errno
;
493 // Similarly for Windows error code
494 t
->saved_winerror
= GetLastError();
497 if (ret
== ThreadBlocked
) {
498 if (t
->why_blocked
== BlockedOnBlackHole
) {
499 StgTSO
*owner
= blackHoleOwner(t
->block_info
.bh
->bh
);
500 traceEventStopThread(cap
, t
, t
->why_blocked
+ 6,
501 owner
!= NULL ? owner
->id
: 0);
503 traceEventStopThread(cap
, t
, t
->why_blocked
+ 6, 0);
506 traceEventStopThread(cap
, t
, ret
, 0);
509 ASSERT_FULL_CAPABILITY_INVARIANTS(cap
,task
);
510 ASSERT(t
->cap
== cap
);
512 // ----------------------------------------------------------------------
514 // Costs for the scheduler are assigned to CCS_SYSTEM
516 #if defined(PROFILING)
517 cap
->r
.rCCCS
= CCS_SYSTEM
;
520 schedulePostRunThread(cap
,t
);
522 ready_to_gc
= rtsFalse
;
526 ready_to_gc
= scheduleHandleHeapOverflow(cap
,t
);
530 // just adjust the stack for this thread, then pop it back
532 threadStackOverflow(cap
, t
);
533 pushOnRunQueue(cap
,t
);
537 if (scheduleHandleYield(cap
, t
, prev_what_next
)) {
538 // shortcut for switching between compiler/interpreter:
544 scheduleHandleThreadBlocked(t
);
548 if (scheduleHandleThreadFinished(cap
, task
, t
)) return cap
;
549 ASSERT_FULL_CAPABILITY_INVARIANTS(cap
,task
);
553 barf("schedule: invalid thread return code %d", (int)ret
);
556 if (ready_to_gc
|| scheduleNeedHeapProfile(ready_to_gc
)) {
557 scheduleDoGC(&cap
,task
,rtsFalse
);
559 } /* end of while() */
562 /* -----------------------------------------------------------------------------
563 * Run queue operations
564 * -------------------------------------------------------------------------- */
567 removeFromRunQueue (Capability
*cap
, StgTSO
*tso
)
569 if (tso
->block_info
.prev
== END_TSO_QUEUE
) {
570 ASSERT(cap
->run_queue_hd
== tso
);
571 cap
->run_queue_hd
= tso
->_link
;
573 setTSOLink(cap
, tso
->block_info
.prev
, tso
->_link
);
575 if (tso
->_link
== END_TSO_QUEUE
) {
576 ASSERT(cap
->run_queue_tl
== tso
);
577 cap
->run_queue_tl
= tso
->block_info
.prev
;
579 setTSOPrev(cap
, tso
->_link
, tso
->block_info
.prev
);
581 tso
->_link
= tso
->block_info
.prev
= END_TSO_QUEUE
;
583 IF_DEBUG(sanity
, checkRunQueue(cap
));
587 promoteInRunQueue (Capability
*cap
, StgTSO
*tso
)
589 removeFromRunQueue(cap
, tso
);
590 pushOnRunQueue(cap
, tso
);
593 /* ----------------------------------------------------------------------------
594 * Setting up the scheduler loop
595 * ------------------------------------------------------------------------- */
598 schedulePreLoop(void)
600 // initialisation for scheduler - what cannot go into initScheduler()
602 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
607 /* -----------------------------------------------------------------------------
610 * Search for work to do, and handle messages from elsewhere.
611 * -------------------------------------------------------------------------- */
614 scheduleFindWork (Capability
**pcap
)
616 scheduleStartSignalHandlers(*pcap
);
618 scheduleProcessInbox(pcap
);
620 scheduleCheckBlockedThreads(*pcap
);
622 #if defined(THREADED_RTS)
623 if (emptyRunQueue(*pcap
)) { scheduleActivateSpark(*pcap
); }
627 #if defined(THREADED_RTS)
628 STATIC_INLINE rtsBool
629 shouldYieldCapability (Capability
*cap
, Task
*task
, rtsBool didGcLast
)
631 // we need to yield this capability to someone else if..
632 // - another thread is initiating a GC, and we didn't just do a GC
633 // (see Note [GC livelock])
634 // - another Task is returning from a foreign call
635 // - the thread at the head of the run queue cannot be run
636 // by this Task (it is bound to another Task, or it is unbound
637 // and this task it bound).
639 // Note [GC livelock]
641 // If we are interrupted to do a GC, then we do not immediately do
642 // another one. This avoids a starvation situation where one
643 // Capability keeps forcing a GC and the other Capabilities make no
646 return ((pending_sync
&& !didGcLast
) ||
647 cap
->returning_tasks_hd
!= NULL
||
648 (!emptyRunQueue(cap
) && (task
->incall
->tso
== NULL
649 ?
peekRunQueue(cap
)->bound
!= NULL
650 : peekRunQueue(cap
)->bound
!= task
->incall
)));
653 // This is the single place where a Task goes to sleep. There are
654 // two reasons it might need to sleep:
655 // - there are no threads to run
656 // - we need to yield this Capability to someone else
657 // (see shouldYieldCapability())
659 // Careful: the scheduler loop is quite delicate. Make sure you run
660 // the tests in testsuite/concurrent (all ways) after modifying this,
661 // and also check the benchmarks in nofib/parallel for regressions.
664 scheduleYield (Capability
**pcap
, Task
*task
)
666 Capability
*cap
= *pcap
;
667 int didGcLast
= rtsFalse
;
669 // if we have work, and we don't need to give up the Capability, continue.
671 if (!shouldYieldCapability(cap
,task
,rtsFalse
) &&
672 (!emptyRunQueue(cap
) ||
674 sched_state
>= SCHED_INTERRUPTING
)) {
678 // otherwise yield (sleep), and keep yielding if necessary.
680 didGcLast
= yieldCapability(&cap
,task
, !didGcLast
);
682 while (shouldYieldCapability(cap
,task
,didGcLast
));
684 // note there may still be no threads on the run queue at this
685 // point, the caller has to check.
692 /* -----------------------------------------------------------------------------
695 * Push work to other Capabilities if we have some.
696 * -------------------------------------------------------------------------- */
699 schedulePushWork(Capability
*cap USED_IF_THREADS
,
700 Task
*task USED_IF_THREADS
)
702 /* following code not for PARALLEL_HASKELL. I kept the call general,
703 future GUM versions might use pushing in a distributed setup */
704 #if defined(THREADED_RTS)
706 Capability
*free_caps
[n_capabilities
], *cap0
;
709 // migration can be turned off with +RTS -qm
710 if (!RtsFlags
.ParFlags
.migrate
) return;
712 // Check whether we have more threads on our run queue, or sparks
713 // in our pool, that we could hand to another Capability.
714 if (emptyRunQueue(cap
)) {
715 if (sparkPoolSizeCap(cap
) < 2) return;
717 if (singletonRunQueue(cap
) &&
718 sparkPoolSizeCap(cap
) < 1) return;
721 // First grab as many free Capabilities as we can.
722 for (i
=0, n_free_caps
=0; i
< n_capabilities
; i
++) {
723 cap0
= capabilities
[i
];
724 if (cap
!= cap0
&& !cap0
->disabled
&& tryGrabCapability(cap0
,task
)) {
725 if (!emptyRunQueue(cap0
)
726 || cap0
->returning_tasks_hd
!= NULL
727 || cap0
->inbox
!= (Message
*)END_TSO_QUEUE
) {
728 // it already has some work, we just grabbed it at
729 // the wrong moment. Or maybe it's deadlocked!
730 releaseCapability(cap0
);
732 free_caps
[n_free_caps
++] = cap0
;
737 // we now have n_free_caps free capabilities stashed in
738 // free_caps[]. Share our run queue equally with them. This is
739 // probably the simplest thing we could do; improvements we might
740 // want to do include:
742 // - giving high priority to moving relatively new threads, on
743 // the gournds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
746 // - giving low priority to moving long-lived threads
748 if (n_free_caps
> 0) {
749 StgTSO
*prev
, *t
, *next
;
751 rtsBool pushed_to_all
;
754 debugTrace(DEBUG_sched
,
755 "cap %d: %s and %d free capabilities, sharing...",
757 (!emptyRunQueue(cap
) && !singletonRunQueue(cap
))?
758 "excess threads on run queue":"sparks to share (>=2)",
763 pushed_to_all
= rtsFalse
;
766 if (cap
->run_queue_hd
!= END_TSO_QUEUE
) {
767 prev
= cap
->run_queue_hd
;
769 prev
->_link
= END_TSO_QUEUE
;
770 for (; t
!= END_TSO_QUEUE
; t
= next
) {
772 t
->_link
= END_TSO_QUEUE
;
773 if (t
->bound
== task
->incall
// don't move my bound thread
774 || tsoLocked(t
)) { // don't move a locked thread
775 setTSOLink(cap
, prev
, t
);
776 setTSOPrev(cap
, t
, prev
);
778 } else if (i
== n_free_caps
) {
780 pushed_to_all
= rtsTrue
;
784 setTSOLink(cap
, prev
, t
);
785 setTSOPrev(cap
, t
, prev
);
788 appendToRunQueue(free_caps
[i
],t
);
790 traceEventMigrateThread (cap
, t
, free_caps
[i
]->no
);
792 if (t
->bound
) { t
->bound
->task
->cap
= free_caps
[i
]; }
793 t
->cap
= free_caps
[i
];
797 cap
->run_queue_tl
= prev
;
799 IF_DEBUG(sanity
, checkRunQueue(cap
));
803 /* JB I left this code in place, it would work but is not necessary */
805 // If there are some free capabilities that we didn't push any
806 // threads to, then try to push a spark to each one.
807 if (!pushed_to_all
) {
809 // i is the next free capability to push to
810 for (; i
< n_free_caps
; i
++) {
811 if (emptySparkPoolCap(free_caps
[i
])) {
812 spark
= tryStealSpark(cap
->sparks
);
814 /* TODO: if anyone wants to re-enable this code then
815 * they must consider the fizzledSpark(spark) case
816 * and update the per-cap spark statistics.
818 debugTrace(DEBUG_sched
, "pushing spark %p to capability %d", spark
, free_caps
[i
]->no
);
820 traceEventStealSpark(free_caps
[i
], t
, cap
->no
);
822 newSpark(&(free_caps
[i
]->r
), spark
);
827 #endif /* SPARK_PUSHING */
829 // release the capabilities
830 for (i
= 0; i
< n_free_caps
; i
++) {
831 task
->cap
= free_caps
[i
];
832 releaseAndWakeupCapability(free_caps
[i
]);
835 task
->cap
= cap
; // reset to point to our Capability.
837 #endif /* THREADED_RTS */
841 /* ----------------------------------------------------------------------------
842 * Start any pending signal handlers
843 * ------------------------------------------------------------------------- */
845 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
847 scheduleStartSignalHandlers(Capability
*cap
)
849 if (RtsFlags
.MiscFlags
.install_signal_handlers
&& signals_pending()) {
850 // safe outside the lock
851 startSignalHandlers(cap
);
856 scheduleStartSignalHandlers(Capability
*cap STG_UNUSED
)
861 /* ----------------------------------------------------------------------------
862 * Check for blocked threads that can be woken up.
863 * ------------------------------------------------------------------------- */
866 scheduleCheckBlockedThreads(Capability
*cap USED_IF_NOT_THREADS
)
868 #if !defined(THREADED_RTS)
870 // Check whether any waiting threads need to be woken up. If the
871 // run queue is empty, and there are no other tasks running, we
872 // can wait indefinitely for something to happen.
874 if ( !emptyQueue(blocked_queue_hd
) || !emptyQueue(sleeping_queue
) )
876 awaitEvent (emptyRunQueue(cap
));
881 /* ----------------------------------------------------------------------------
882 * Detect deadlock conditions and attempt to resolve them.
883 * ------------------------------------------------------------------------- */
886 scheduleDetectDeadlock (Capability
**pcap
, Task
*task
)
888 Capability
*cap
= *pcap
;
890 * Detect deadlock: when we have no threads to run, there are no
891 * threads blocked, waiting for I/O, or sleeping, and all the
892 * other tasks are waiting for work, we must have a deadlock of
895 if ( emptyThreadQueues(cap
) )
897 #if defined(THREADED_RTS)
899 * In the threaded RTS, we only check for deadlock if there
900 * has been no activity in a complete timeslice. This means
901 * we won't eagerly start a full GC just because we don't have
902 * any threads to run currently.
904 if (recent_activity
!= ACTIVITY_INACTIVE
) return;
907 debugTrace(DEBUG_sched
, "deadlocked, forcing major GC...");
909 // Garbage collection can release some new threads due to
910 // either (a) finalizers or (b) threads resurrected because
911 // they are unreachable and will therefore be sent an
912 // exception. Any threads thus released will be immediately
914 scheduleDoGC (pcap
, task
, rtsTrue
/*force major GC*/);
916 // when force_major == rtsTrue. scheduleDoGC sets
917 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
920 if ( !emptyRunQueue(cap
) ) return;
922 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
923 /* If we have user-installed signal handlers, then wait
924 * for signals to arrive rather then bombing out with a
927 if ( RtsFlags
.MiscFlags
.install_signal_handlers
&& anyUserHandlers() ) {
928 debugTrace(DEBUG_sched
,
929 "still deadlocked, waiting for signals...");
933 if (signals_pending()) {
934 startSignalHandlers(cap
);
937 // either we have threads to run, or we were interrupted:
938 ASSERT(!emptyRunQueue(cap
) || sched_state
>= SCHED_INTERRUPTING
);
944 #if !defined(THREADED_RTS)
945 /* Probably a real deadlock. Send the current main thread the
946 * Deadlock exception.
948 if (task
->incall
->tso
) {
949 switch (task
->incall
->tso
->why_blocked
) {
951 case BlockedOnBlackHole
:
952 case BlockedOnMsgThrowTo
:
954 case BlockedOnMVarRead
:
955 throwToSingleThreaded(cap
, task
->incall
->tso
,
956 (StgClosure
*)nonTermination_closure
);
959 barf("deadlock: main thread blocked in a strange way");
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
972 #if defined(PARALLEL_HASKELL)
974 scheduleSendPendingMessages(void)
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches
!= END_BF_QUEUE
) {
983 if (RtsFlags
.ParFlags
.BufferTime
) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
996 scheduleProcessInbox (Capability
**pcap USED_IF_THREADS
)
998 #if defined(THREADED_RTS)
1001 Capability
*cap
= *pcap
;
1003 while (!emptyInbox(cap
)) {
1004 if (cap
->r
.rCurrentNursery
->link
== NULL
||
1005 g0
->n_new_large_words
>= large_alloc_lim
) {
1006 scheduleDoGC(pcap
, cap
->running_task
, rtsFalse
);
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r
= TRY_ACQUIRE_LOCK(&cap
->lock
);
1024 cap
->inbox
= (Message
*)END_TSO_QUEUE
;
1026 RELEASE_LOCK(&cap
->lock
);
1028 while (m
!= (Message
*)END_TSO_QUEUE
) {
1030 executeMessage(cap
, m
);
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1041 #if defined(THREADED_RTS)
1043 scheduleActivateSpark(Capability
*cap
)
1045 if (anySparks() && !cap
->disabled
)
1047 createSparkThread(cap
);
1048 debugTrace(DEBUG_sched
, "creating a spark thread");
1051 #endif // PARALLEL_HASKELL || THREADED_RTS
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1058 schedulePostRunThread (Capability
*cap
, StgTSO
*t
)
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1068 // and a is never equal to b given a consistent view of memory.
1070 if (t
-> trec
!= NO_TREC
&& t
-> why_blocked
== NotBlocked
) {
1071 if (!stmValidateNestOfTransactions(cap
, t
-> trec
)) {
1072 debugTrace(DEBUG_sched
| DEBUG_stm
,
1073 "trec %p found wasting its time", t
);
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap
, t
, NULL
, rtsTrue
);
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1086 // If the current thread's allocation limit has run out, send it
1087 // the AllocationLimitExceeded exception.
1089 if (PK_Int64((W_
*)&(t
->alloc_limit
)) < 0 && (t
->flags
& TSO_ALLOC_LIMIT
)) {
1090 // Use a throwToSelf rather than a throwToSingleThreaded, because
1091 // it correctly handles the case where the thread is currently
1092 // inside mask. Also the thread might be blocked (e.g. on an
1093 // MVar), and throwToSingleThreaded doesn't unblock it
1094 // correctly in that case.
1095 throwToSelf(cap
, t
, allocationLimitExceeded_closure
);
1096 ASSIGN_Int64((W_
*)&(t
->alloc_limit
),
1097 (StgInt64
)RtsFlags
.GcFlags
.allocLimitGrace
* BLOCK_SIZE
);
1100 /* some statistics gathering in the parallel case */
1103 /* -----------------------------------------------------------------------------
1104 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1105 * -------------------------------------------------------------------------- */
1108 scheduleHandleHeapOverflow( Capability
*cap
, StgTSO
*t
)
1110 // did the task ask for a large block?
1111 if (cap
->r
.rHpAlloc
> BLOCK_SIZE
) {
1112 // if so, get one and push it on the front of the nursery.
1116 blocks
= (W_
)BLOCK_ROUND_UP(cap
->r
.rHpAlloc
) / BLOCK_SIZE
;
1118 if (blocks
> BLOCKS_PER_MBLOCK
) {
1119 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap
->r
.rHpAlloc
);
1122 debugTrace(DEBUG_sched
,
1123 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1124 (long)t
->id
, what_next_strs
[t
->what_next
], blocks
);
1126 // don't do this if the nursery is (nearly) full, we'll GC first.
1127 if (cap
->r
.rCurrentNursery
->link
!= NULL
||
1128 cap
->r
.rNursery
->n_blocks
== 1) { // paranoia to prevent
1129 // infinite loop if the
1130 // nursery has only one
1133 bd
= allocGroup_lock(blocks
);
1134 cap
->r
.rNursery
->n_blocks
+= blocks
;
1136 // link the new group after CurrentNursery
1137 dbl_link_insert_after(bd
, cap
->r
.rCurrentNursery
);
1139 // initialise it as a nursery block. We initialise the
1140 // step, gen_no, and flags field of *every* sub-block in
1141 // this large block, because this is easier than making
1142 // sure that we always find the block head of a large
1143 // block whenever we call Bdescr() (eg. evacuate() and
1144 // isAlive() in the GC would both have to do this, at
1148 for (x
= bd
; x
< bd
+ blocks
; x
++) {
1149 initBdescr(x
,g0
,g0
);
1155 // This assert can be a killer if the app is doing lots
1156 // of large block allocations.
1157 IF_DEBUG(sanity
, checkNurserySanity(cap
->r
.rNursery
));
1159 // now update the nursery to point to the new block
1160 finishedNurseryBlock(cap
, cap
->r
.rCurrentNursery
);
1161 cap
->r
.rCurrentNursery
= bd
;
1163 // we might be unlucky and have another thread get on the
1164 // run queue before us and steal the large block, but in that
1165 // case the thread will just end up requesting another large
1167 pushOnRunQueue(cap
,t
);
1168 return rtsFalse
; /* not actually GC'ing */
1172 if (getNewNursery(cap
)) {
1173 debugTrace(DEBUG_sched
, "thread %ld got a new nursery", t
->id
);
1174 pushOnRunQueue(cap
,t
);
1178 if (cap
->r
.rHpLim
== NULL
|| cap
->context_switch
) {
1179 // Sometimes we miss a context switch, e.g. when calling
1180 // primitives in a tight loop, MAYBE_GC() doesn't check the
1181 // context switch flag, and we end up waiting for a GC.
1182 // See #1984, and concurrent/should_run/1984
1183 cap
->context_switch
= 0;
1184 appendToRunQueue(cap
,t
);
1186 pushOnRunQueue(cap
,t
);
1189 /* actual GC is done at the end of the while loop in schedule() */
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1197 scheduleHandleYield( Capability
*cap
, StgTSO
*t
, nat prev_what_next
)
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1205 ASSERT(t
->_link
== END_TSO_QUEUE
);
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1210 if (cap
->context_switch
== 0 && t
->what_next
!= prev_what_next
) {
1211 debugTrace(DEBUG_sched
,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t
->id
, what_next_strs
[t
->what_next
]);
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
1223 if (cap
->context_switch
!= 0) {
1224 cap
->context_switch
= 0;
1225 appendToRunQueue(cap
,t
);
1227 pushOnRunQueue(cap
,t
);
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1242 scheduleHandleThreadBlocked( StgTSO
*t
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1260 traceThreadStatus(DEBUG_sched
, t
);
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1269 scheduleHandleThreadFinished (Capability
*cap STG_UNUSED
, Task
*task
, StgTSO
*t
)
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap
, t
);
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1294 if (t
->bound
!= task
->incall
) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap
,t
);
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1311 ASSERT(task
->incall
->tso
== t
);
1313 if (t
->what_next
== ThreadComplete
) {
1314 if (task
->incall
->ret
) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task
->incall
->ret
) = (StgClosure
*)task
->incall
->tso
->stackobj
->sp
[1];
1318 task
->incall
->stat
= Success
;
1320 if (task
->incall
->ret
) {
1321 *(task
->incall
->ret
) = NULL
;
1323 if (sched_state
>= SCHED_INTERRUPTING
) {
1324 if (heap_overflow
) {
1325 task
->incall
->stat
= HeapExhausted
;
1327 task
->incall
->stat
= Interrupted
;
1330 task
->incall
->stat
= Killed
;
1334 removeThreadLabel((StgWord
)task
->incall
->tso
->id
);
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343 // tso->bound->tso which lead to a deadlock.
1345 task
->incall
->tso
= NULL
;
1347 return rtsTrue
; // tells schedule() to return
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED
)
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile
||
1363 (RtsFlags
.ProfFlags
.heapProfileInterval
==0 &&
1364 RtsFlags
.ProfFlags
.doHeapProfile
&& ready_to_gc
)) {
1371 /* -----------------------------------------------------------------------------
1372 * Start a synchronisation of all capabilities
1373 * -------------------------------------------------------------------------- */
1376 // 0 if we successfully got a sync
1377 // non-0 if there was another sync request in progress,
1378 // and we yielded to it. The value returned is the
1379 // type of the other sync request.
1381 #if defined(THREADED_RTS)
1382 static nat
requestSync (Capability
**pcap
, Task
*task
, nat sync_type
)
1384 nat prev_pending_sync
;
1386 prev_pending_sync
= cas(&pending_sync
, 0, sync_type
);
1388 if (prev_pending_sync
)
1391 debugTrace(DEBUG_sched
, "someone else is trying to sync (%d)...",
1394 yieldCapability(pcap
,task
,rtsTrue
);
1395 } while (pending_sync
);
1396 return prev_pending_sync
; // NOTE: task->cap might have changed now
1405 // Grab all the capabilities except the one we already hold. Used
1406 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1407 // before a fork (SYNC_OTHER).
1409 // Only call this after requestSync(), otherwise a deadlock might
1410 // ensue if another thread is trying to synchronise.
1412 static void acquireAllCapabilities(Capability
*cap
, Task
*task
)
1417 for (i
=0; i
< n_capabilities
; i
++) {
1418 debugTrace(DEBUG_sched
, "grabbing all the capabilies (%d/%d)", i
, n_capabilities
);
1419 tmpcap
= capabilities
[i
];
1420 if (tmpcap
!= cap
) {
1421 // we better hope this task doesn't get migrated to
1422 // another Capability while we're waiting for this one.
1423 // It won't, because load balancing happens while we have
1424 // all the Capabilities, but even so it's a slightly
1425 // unsavoury invariant.
1427 waitForReturnCapability(&tmpcap
, task
);
1428 if (tmpcap
->no
!= i
) {
1429 barf("acquireAllCapabilities: got the wrong capability");
1436 static void releaseAllCapabilities(nat n
, Capability
*cap
, Task
*task
)
1440 for (i
= 0; i
< n
; i
++) {
1442 task
->cap
= capabilities
[i
];
1443 releaseCapability(capabilities
[i
]);
1450 /* -----------------------------------------------------------------------------
1451 * Perform a garbage collection if necessary
1452 * -------------------------------------------------------------------------- */
1455 scheduleDoGC (Capability
**pcap
, Task
*task USED_IF_THREADS
,
1456 rtsBool force_major
)
1458 Capability
*cap
= *pcap
;
1459 rtsBool heap_census
;
1467 if (sched_state
== SCHED_SHUTTING_DOWN
) {
1468 // The final GC has already been done, and the system is
1469 // shutting down. We'll probably deadlock if we try to GC
1474 heap_census
= scheduleNeedHeapProfile(rtsTrue
);
1476 // Figure out which generation we are collecting, so that we can
1477 // decide whether this is a parallel GC or not.
1478 collect_gen
= calcNeeded(force_major
|| heap_census
, NULL
);
1481 if (sched_state
< SCHED_INTERRUPTING
1482 && RtsFlags
.ParFlags
.parGcEnabled
1483 && collect_gen
>= RtsFlags
.ParFlags
.parGcGen
1484 && ! oldest_gen
->mark
)
1486 gc_type
= SYNC_GC_PAR
;
1488 gc_type
= SYNC_GC_SEQ
;
1491 // In order to GC, there must be no threads running Haskell code.
1492 // Therefore, the GC thread needs to hold *all* the capabilities,
1493 // and release them after the GC has completed.
1495 // This seems to be the simplest way: previous attempts involved
1496 // making all the threads with capabilities give up their
1497 // capabilities and sleep except for the *last* one, which
1498 // actually did the GC. But it's quite hard to arrange for all
1499 // the other tasks to sleep and stay asleep.
1502 /* Other capabilities are prevented from running yet more Haskell
1503 threads if pending_sync is set. Tested inside
1504 yieldCapability() and releaseCapability() in Capability.c */
1507 sync
= requestSync(pcap
, task
, gc_type
);
1509 if (sync
== SYNC_GC_SEQ
|| sync
== SYNC_GC_PAR
) {
1510 // someone else had a pending sync request for a GC, so
1511 // let's assume GC has been done and we don't need to GC
1515 if (sched_state
== SCHED_SHUTTING_DOWN
) {
1516 // The scheduler might now be shutting down. We tested
1517 // this above, but it might have become true since then as
1518 // we yielded the capability in requestSync().
1523 // don't declare this until after we have sync'd, because
1524 // n_capabilities may change.
1525 rtsBool idle_cap
[n_capabilities
];
1527 unsigned int old_n_capabilities
= n_capabilities
;
1530 interruptAllCapabilities();
1532 // The final shutdown GC is always single-threaded, because it's
1533 // possible that some of the Capabilities have no worker threads.
1535 if (gc_type
== SYNC_GC_SEQ
)
1537 traceEventRequestSeqGc(cap
);
1541 traceEventRequestParGc(cap
);
1542 debugTrace(DEBUG_sched
, "ready_to_gc, grabbing GC threads");
1545 if (gc_type
== SYNC_GC_SEQ
)
1547 // single-threaded GC: grab all the capabilities
1548 acquireAllCapabilities(cap
,task
);
1552 // If we are load-balancing collections in this
1553 // generation, then we require all GC threads to participate
1554 // in the collection. Otherwise, we only require active
1555 // threads to participate, and we set gc_threads[i]->idle for
1556 // any idle capabilities. The rationale here is that waking
1557 // up an idle Capability takes much longer than just doing any
1558 // GC work on its behalf.
1560 if (RtsFlags
.ParFlags
.parGcNoSyncWithIdle
== 0
1561 || (RtsFlags
.ParFlags
.parGcLoadBalancingEnabled
&&
1562 collect_gen
>= RtsFlags
.ParFlags
.parGcLoadBalancingGen
)) {
1563 for (i
=0; i
< n_capabilities
; i
++) {
1564 if (capabilities
[i
]->disabled
) {
1565 idle_cap
[i
] = tryGrabCapability(capabilities
[i
], task
);
1567 idle_cap
[i
] = rtsFalse
;
1571 for (i
=0; i
< n_capabilities
; i
++) {
1572 if (capabilities
[i
]->disabled
) {
1573 idle_cap
[i
] = tryGrabCapability(capabilities
[i
], task
);
1574 } else if (i
== cap
->no
||
1575 capabilities
[i
]->idle
< RtsFlags
.ParFlags
.parGcNoSyncWithIdle
) {
1576 idle_cap
[i
] = rtsFalse
;
1578 idle_cap
[i
] = tryGrabCapability(capabilities
[i
], task
);
1580 n_failed_trygrab_idles
++;
1588 // We set the gc_thread[i]->idle flag if that
1589 // capability/thread is not participating in this collection.
1590 // We also keep a local record of which capabilities are idle
1591 // in idle_cap[], because scheduleDoGC() is re-entrant:
1592 // another thread might start a GC as soon as we've finished
1593 // this one, and thus the gc_thread[]->idle flags are invalid
1594 // as soon as we release any threads after GC. Getting this
1595 // wrong leads to a rare and hard to debug deadlock!
1597 for (i
=0; i
< n_capabilities
; i
++) {
1598 gc_threads
[i
]->idle
= idle_cap
[i
];
1599 capabilities
[i
]->idle
++;
1602 // For all capabilities participating in this GC, wait until
1603 // they have stopped mutating and are standing by for GC.
1604 waitForGcThreads(cap
);
1606 #if defined(THREADED_RTS)
1607 // Stable point where we can do a global check on our spark counters
1608 ASSERT(checkSparkCountInvariant());
1614 IF_DEBUG(scheduler
, printAllThreads());
1616 delete_threads_and_gc
:
1618 * We now have all the capabilities; if we're in an interrupting
1619 * state, then we should take the opportunity to delete all the
1620 * threads in the system.
1622 if (sched_state
== SCHED_INTERRUPTING
) {
1623 deleteAllThreads(cap
);
1624 #if defined(THREADED_RTS)
1625 // Discard all the sparks from every Capability. Why?
1626 // They'll probably be GC'd anyway since we've killed all the
1627 // threads. It just avoids the GC having to do any work to
1628 // figure out that any remaining sparks are garbage.
1629 for (i
= 0; i
< n_capabilities
; i
++) {
1630 capabilities
[i
]->spark_stats
.gcd
+=
1631 sparkPoolSize(capabilities
[i
]->sparks
);
1632 // No race here since all Caps are stopped.
1633 discardSparksCap(capabilities
[i
]);
1636 sched_state
= SCHED_SHUTTING_DOWN
;
1640 * When there are disabled capabilities, we want to migrate any
1641 * threads away from them. Normally this happens in the
1642 * scheduler's loop, but only for unbound threads - it's really
1643 * hard for a bound thread to migrate itself. So we have another
1646 #if defined(THREADED_RTS)
1647 for (i
= enabled_capabilities
; i
< n_capabilities
; i
++) {
1648 Capability
*tmp_cap
, *dest_cap
;
1649 tmp_cap
= capabilities
[i
];
1650 ASSERT(tmp_cap
->disabled
);
1652 dest_cap
= capabilities
[i
% enabled_capabilities
];
1653 while (!emptyRunQueue(tmp_cap
)) {
1654 tso
= popRunQueue(tmp_cap
);
1655 migrateThread(tmp_cap
, tso
, dest_cap
);
1657 traceTaskMigrate(tso
->bound
->task
,
1658 tso
->bound
->task
->cap
,
1660 tso
->bound
->task
->cap
= dest_cap
;
1667 #if defined(THREADED_RTS)
1668 // reset pending_sync *before* GC, so that when the GC threads
1669 // emerge they don't immediately re-enter the GC.
1671 GarbageCollect(collect_gen
, heap_census
, gc_type
, cap
);
1673 GarbageCollect(collect_gen
, heap_census
, 0, cap
);
1676 traceSparkCounters(cap
);
1678 switch (recent_activity
) {
1679 case ACTIVITY_INACTIVE
:
1681 // We are doing a GC because the system has been idle for a
1682 // timeslice and we need to check for deadlock. Record the
1683 // fact that we've done a GC and turn off the timer signal;
1684 // it will get re-enabled if we run any threads after the GC.
1685 recent_activity
= ACTIVITY_DONE_GC
;
1693 case ACTIVITY_MAYBE_NO
:
1694 // the GC might have taken long enough for the timer to set
1695 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1696 // but we aren't necessarily deadlocked:
1697 recent_activity
= ACTIVITY_YES
;
1700 case ACTIVITY_DONE_GC
:
1701 // If we are actually active, the scheduler will reset the
1702 // recent_activity flag and re-enable the timer.
1706 #if defined(THREADED_RTS)
1707 // Stable point where we can do a global check on our spark counters
1708 ASSERT(checkSparkCountInvariant());
1711 // The heap census itself is done during GarbageCollect().
1713 performHeapProfile
= rtsFalse
;
1716 #if defined(THREADED_RTS)
1718 // If n_capabilities has changed during GC, we're in trouble.
1719 ASSERT(n_capabilities
== old_n_capabilities
);
1721 if (gc_type
== SYNC_GC_PAR
)
1723 releaseGCThreads(cap
);
1724 for (i
= 0; i
< n_capabilities
; i
++) {
1727 ASSERT(capabilities
[i
]->running_task
== task
);
1728 task
->cap
= capabilities
[i
];
1729 releaseCapability(capabilities
[i
]);
1731 ASSERT(capabilities
[i
]->running_task
!= task
);
1739 if (heap_overflow
&& sched_state
< SCHED_INTERRUPTING
) {
1740 // GC set the heap_overflow flag, so we should proceed with
1741 // an orderly shutdown now. Ultimately we want the main
1742 // thread to return to its caller with HeapExhausted, at which
1743 // point the caller should call hs_exit(). The first step is
1744 // to delete all the threads.
1746 // Another way to do this would be to raise an exception in
1747 // the main thread, which we really should do because it gives
1748 // the program a chance to clean up. But how do we find the
1749 // main thread? It should presumably be the same one that
1750 // gets ^C exceptions, but that's all done on the Haskell side
1751 // (GHC.TopHandler).
1752 sched_state
= SCHED_INTERRUPTING
;
1753 goto delete_threads_and_gc
;
1758 Once we are all together... this would be the place to balance all
1759 spark pools. No concurrent stealing or adding of new sparks can
1760 occur. Should be defined in Sparks.c. */
1761 balanceSparkPoolsCaps(n_capabilities
, capabilities
);
1764 #if defined(THREADED_RTS)
1765 if (gc_type
== SYNC_GC_SEQ
) {
1766 // release our stash of capabilities.
1767 releaseAllCapabilities(n_capabilities
, cap
, task
);
1774 /* ---------------------------------------------------------------------------
1775 * Singleton fork(). Do not copy any running threads.
1776 * ------------------------------------------------------------------------- */
1779 forkProcess(HsStablePtr
*entry
1780 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1785 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1796 debugTrace(DEBUG_sched
, "forking!");
1798 task
= newBoundTask();
1801 waitForReturnCapability(&cap
, task
);
1805 sync
= requestSync(&cap
, task
, SYNC_OTHER
);
1808 acquireAllCapabilities(cap
,task
);
1813 // no funny business: hold locks while we fork, otherwise if some
1814 // other thread is holding a lock when the fork happens, the data
1815 // structure protected by the lock will forever be in an
1816 // inconsistent state in the child. See also #1391.
1817 ACQUIRE_LOCK(&sched_mutex
);
1818 ACQUIRE_LOCK(&sm_mutex
);
1819 ACQUIRE_LOCK(&stable_mutex
);
1820 ACQUIRE_LOCK(&task
->lock
);
1822 for (i
=0; i
< n_capabilities
; i
++) {
1823 ACQUIRE_LOCK(&capabilities
[i
]->lock
);
1827 ACQUIRE_LOCK(&all_tasks_mutex
);
1830 stopTimer(); // See #4074
1832 #if defined(TRACING)
1833 flushEventLog(); // so that child won't inherit dirty file buffers
1838 if (pid
) { // parent
1840 startTimer(); // #4074
1842 RELEASE_LOCK(&sched_mutex
);
1843 RELEASE_LOCK(&sm_mutex
);
1844 RELEASE_LOCK(&stable_mutex
);
1845 RELEASE_LOCK(&task
->lock
);
1847 for (i
=0; i
< n_capabilities
; i
++) {
1848 releaseCapability_(capabilities
[i
],rtsFalse
);
1849 RELEASE_LOCK(&capabilities
[i
]->lock
);
1853 RELEASE_LOCK(&all_tasks_mutex
);
1856 boundTaskExiting(task
);
1858 // just return the pid
1863 #if defined(THREADED_RTS)
1864 initMutex(&sched_mutex
);
1865 initMutex(&sm_mutex
);
1866 initMutex(&stable_mutex
);
1867 initMutex(&task
->lock
);
1869 for (i
=0; i
< n_capabilities
; i
++) {
1870 initMutex(&capabilities
[i
]->lock
);
1873 initMutex(&all_tasks_mutex
);
1880 // Now, all OS threads except the thread that forked are
1881 // stopped. We need to stop all Haskell threads, including
1882 // those involved in foreign calls. Also we need to delete
1883 // all Tasks, because they correspond to OS threads that are
1886 for (g
= 0; g
< RtsFlags
.GcFlags
.generations
; g
++) {
1887 for (t
= generations
[g
].threads
; t
!= END_TSO_QUEUE
; t
= next
) {
1888 next
= t
->global_link
;
1889 // don't allow threads to catch the ThreadKilled
1890 // exception, but we do want to raiseAsync() because these
1891 // threads may be evaluating thunks that we need later.
1892 deleteThread_(t
->cap
,t
);
1894 // stop the GC from updating the InCall to point to
1895 // the TSO. This is only necessary because the
1896 // OSThread bound to the TSO has been killed, and
1897 // won't get a chance to exit in the usual way (see
1898 // also scheduleHandleThreadFinished).
1903 discardTasksExcept(task
);
1905 for (i
=0; i
< n_capabilities
; i
++) {
1906 cap
= capabilities
[i
];
1908 // Empty the run queue. It seems tempting to let all the
1909 // killed threads stay on the run queue as zombies to be
1910 // cleaned up later, but some of them may correspond to
1911 // bound threads for which the corresponding Task does not
1913 truncateRunQueue(cap
);
1915 // Any suspended C-calling Tasks are no more, their OS threads
1917 cap
->suspended_ccalls
= NULL
;
1919 #if defined(THREADED_RTS)
1920 // Wipe our spare workers list, they no longer exist. New
1921 // workers will be created if necessary.
1922 cap
->spare_workers
= NULL
;
1923 cap
->n_spare_workers
= 0;
1924 cap
->returning_tasks_hd
= NULL
;
1925 cap
->returning_tasks_tl
= NULL
;
1928 // Release all caps except 0, we'll use that for starting
1929 // the IO manager and running the client action below.
1932 releaseCapability(cap
);
1935 cap
= capabilities
[0];
1938 // Empty the threads lists. Otherwise, the garbage
1939 // collector may attempt to resurrect some of these threads.
1940 for (g
= 0; g
< RtsFlags
.GcFlags
.generations
; g
++) {
1941 generations
[g
].threads
= END_TSO_QUEUE
;
1944 // On Unix, all timers are reset in the child, so we need to start
1949 // TODO: need to trace various other things in the child
1950 // like startup event, capabilities, process info etc
1951 traceTaskCreate(task
, cap
);
1953 #if defined(THREADED_RTS)
1954 ioManagerStartCap(&cap
);
1957 rts_evalStableIO(&cap
, entry
, NULL
); // run the action
1958 rts_checkSchedStatus("forkProcess",cap
);
1961 shutdownHaskellAndExit(EXIT_SUCCESS
, 0 /* !fastExit */);
1963 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1964 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1968 /* ---------------------------------------------------------------------------
1969 * Changing the number of Capabilities
1971 * Changing the number of Capabilities is very tricky! We can only do
1972 * it with the system fully stopped, so we do a full sync with
1973 * requestSync(SYNC_OTHER) and grab all the capabilities.
1975 * Then we resize the appropriate data structures, and update all
1976 * references to the old data structures which have now moved.
1977 * Finally we release the Capabilities we are holding, and start
1978 * worker Tasks on the new Capabilities we created.
1980 * ------------------------------------------------------------------------- */
void
setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
{
#if !defined(THREADED_RTS)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
    }
#elif defined(NOSMP)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported on this platform");
    }
#else
    Task *task;
    Capability *cap;
    nat sync;
    nat n;
    Capability *old_capabilities = NULL;
    nat old_n_capabilities = n_capabilities;

    if (new_n_capabilities == enabled_capabilities) return;

    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
               enabled_capabilities, new_n_capabilities);

    cap = rts_lock();
    task = cap->running_task;

    do {
        sync = requestSync(&cap, task, SYNC_OTHER);
    } while (sync);

    acquireAllCapabilities(cap,task);

    if (new_n_capabilities < enabled_capabilities)
    {
        // Reducing the number of capabilities: we do not actually
        // remove the extra capabilities, we just mark them as
        // "disabled". This has the following effects:
        //
        //   - threads on a disabled capability are migrated away by the
        //     scheduler loop
        //
        //   - disabled capabilities do not participate in GC
        //     (see scheduleDoGC())
        //
        //   - No spark threads are created on this capability
        //     (see scheduleActivateSpark())
        //
        //   - We do not attempt to migrate threads *to* a disabled
        //     capability (see schedulePushWork()).
        //
        // but in other respects, a disabled capability remains
        // alive.  Threads may be woken up on a disabled capability,
        // but they will be immediately migrated away.
        //
        // This approach is much easier than trying to actually remove
        // the capability; we don't have to worry about GC data
        // structures, the nursery, etc.
        //
        for (n = new_n_capabilities; n < enabled_capabilities; n++) {
            capabilities[n]->disabled = rtsTrue;
            traceCapDisable(capabilities[n]);
        }
        enabled_capabilities = new_n_capabilities;
    }
    else
    {
        // Increasing the number of enabled capabilities.
        //
        // enable any disabled capabilities, up to the required number
        for (n = enabled_capabilities;
             n < new_n_capabilities && n < n_capabilities; n++) {
            capabilities[n]->disabled = rtsFalse;
            traceCapEnable(capabilities[n]);
        }
        enabled_capabilities = n;

        if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
            // Allocate eventlog buffers for the new capabilities.  Note this
            // must be done before calling moreCapabilities(), because that
            // will emit events about creating the new capabilities and adding
            // them to existing capsets.
            tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif

            // Resize the capabilities array
            // NB. after this, capabilities points somewhere new. Any pointers
            // of type (Capability *) are now invalid.
            moreCapabilities(n_capabilities, new_n_capabilities);

            // Resize and update storage manager data structures
            storageAddCapabilities(n_capabilities, new_n_capabilities);
        }
    }

    // update n_capabilities before things start running
    if (new_n_capabilities > n_capabilities) {
        n_capabilities = enabled_capabilities = new_n_capabilities;
    }

    // Start worker tasks on the new Capabilities
    startWorkerTasks(old_n_capabilities, new_n_capabilities);

    // We're done: release the original Capabilities
    releaseAllCapabilities(old_n_capabilities, cap, task);

    // We can't free the old array until now, because we access it
    // while updating pointers in updateCapabilityRefs().
    if (old_capabilities) {
        stgFree(old_capabilities);
    }

    // Notify IO manager that the number of capabilities has changed.
    rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);

    rts_unlock(cap);

#endif // THREADED_RTS
}
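/* Illustrative sketch (not part of the RTS itself): setNumCapabilities() is
 * the entry point that GHC.Conc.setNumCapabilities ultimately reaches via a
 * foreign import.  A C program embedding the RTS can also call it directly
 * once the RTS is initialised; the count of 4 below is arbitrary and error
 * handling is omitted:
 *
 *     #include "HsFFI.h"
 *     #include "Rts.h"
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);
 *         setNumCapabilities(4);    // resize the capability pool at runtime
 *         // ... run Haskell code via foreign exports or rts_evalIO() ...
 *         hs_exit();
 *         return 0;
 *     }
 */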
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO *t, *next;
    nat g;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            next = t->global_link;
            deleteThread(cap,t);
        }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalls list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    ASSERT(incall->next == NULL && incall->prev == NULL);
    incall->next = cap->suspended_ccalls;
    incall->prev = NULL;
    if (cap->suspended_ccalls) {
        cap->suspended_ccalls->prev = incall;
    }
    cap->suspended_ccalls = incall;
}

STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    if (incall->prev) {
        incall->prev->next = incall->next;
    } else {
        ASSERT(cap->suspended_ccalls == incall);
        cap->suspended_ccalls = incall->next;
    }
    if (incall->next) {
        incall->next->prev = incall->prev;
    }
    incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 *
 * If this is an interruptible C call, this means that the FFI call may be
 * unceremoniously terminated and should be scheduled on an
 * unbound worker thread.
 * ------------------------------------------------------------------------- */
void *
suspendThread (StgRegTable *reg, rtsBool interruptible)
{
    Capability *cap;
    int saved_errno;
    StgTSO *tso;
    Task *task;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if (interruptible) {
        tso->why_blocked = BlockedOnCCall_Interruptible;
    } else {
        tso->why_blocked = BlockedOnCCall;
    }

    // Hand back capability
    task->incall->suspended_tso = tso;
    task->incall->suspended_cap = cap;

    // Otherwise allocate() will write to invalid memory.
    cap->r.rCurrentTSO = NULL;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);

    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif

    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    InCall *incall;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    incall = task->incall;
    cap = incall->suspended_cap;
    task->cap = cap;

    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalls list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = incall->suspended_tso;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    tso->_link = END_TSO_QUEUE; // no write barrier reqd

    traceEventRunThread(cap, tso);

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    if ((tso->flags & TSO_BLOCKEX) == 0) {
        // avoid locking the TSO if we don't have to
        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
            maybePerformBlockedException(cap,tso);
        }
    }

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    dirty_TSO(cap,tso);
    dirty_STACK(cap,tso->stackobj);

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
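/* Illustrative sketch (not literal RTS output): the code generated around a
 * "safe" foreign call uses the pair above roughly as follows, where
 * blocking_c_call() stands for an arbitrary C function:
 *
 *     StgRegTable *reg;                        // current register table
 *     void *token;
 *
 *     token = suspendThread(reg, rtsFalse);    // give up the Capability
 *     blocking_c_call();                       // other Haskell threads keep running
 *     reg = resumeThread(token);               // get a Capability back, resume the TSO
 */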
/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}

void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
#if defined(THREADED_RTS)
    cpu %= enabled_capabilities;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        migrateThread(cap, tso, capabilities[cpu]);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
void
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
    Task *task;
    DEBUG_ONLY( StgThreadID id );
    Capability *cap;

    cap = *pcap;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task->incall;
    tso->cap = cap;

    task->incall->tso = tso;
    task->incall->ret = ret;
    task->incall->stat = NoStatus;

    appendToRunQueue(cap,tso);

    DEBUG_ONLY( id = tso->id );
    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);

    cap = schedule(cap,task);

    ASSERT(task->incall->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
    *pcap = cap;
}
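/* For context, a condensed sketch (the real code lives in RtsAPI.c, not in
 * this file) of how rts_evalIO() drives a bound thread through
 * scheduleWaitThread():
 *
 *     void rts_evalIO (Capability **cap, HaskellObj p, HaskellObj *ret)
 *     {
 *         StgTSO *tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
 *         scheduleWaitThread(tso, ret, cap);
 *     }
 */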
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void scheduleWorker (Capability *cap, Task *task)
{
    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability, but possibly not
    // the same one we started with.

    // During shutdown, the requirement is that after all the
    // Capabilities are shut down, all workers that are shutting down
    // have finished workerTaskStop().  This is why we hold on to
    // cap->lock until we've finished workerTaskStop() below.
    //
    // There may be workers still involved in foreign calls; those
    // will just block in waitForReturnCapability() because the
    // Capability has been shut down.
    //
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap,rtsFalse);
    workerTaskStop(task);
    RELEASE_LOCK(&cap->lock);
}
#endif
/* ---------------------------------------------------------------------------
 * Start new worker tasks on Capabilities from--to
 * -------------------------------------------------------------------------- */

static void
startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    Capability *cap;

    for (i = from; i < to; i++) {
        cap = capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        startWorkerTask(cap);
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 *
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if !defined(THREADED_RTS)
    blocked_queue_hd = END_TSO_QUEUE;
    blocked_queue_tl = END_TSO_QUEUE;
    sleeping_queue   = END_TSO_QUEUE;
#endif

    sched_state     = SCHED_RUNNING;
    recent_activity = ACTIVITY_YES;

#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);
#endif

    ACQUIRE_LOCK(&sched_mutex);

    /* A capability holds the state a native thread needs in
     * order to execute STG code.  At least one capability is
     * floating around (only THREADED_RTS builds have more than one).
     */
    initCapabilities();

    initTaskManager();

    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    startWorkerTasks(1, n_capabilities);

    RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
               /* see Capability.c, shutdownCapability() */
{
    Task *task;

    task = newBoundTask();

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        Capability *cap = task->cap;
        waitForReturnCapability(&cap,task);
        scheduleDoGC(&cap,task,rtsTrue);
        ASSERT(task->incall->tso == NULL);
        releaseCapability(cap);
    }
    sched_state = SCHED_SHUTTING_DOWN;

    shutdownCapabilities(task, wait_foreign);

    // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
    //            n_failed_trygrab_idles, n_idle_caps);

    boundTaskExiting(task);
}
void
freeScheduler( void )
{
    nat still_running;

    ACQUIRE_LOCK(&sched_mutex);
    still_running = freeTaskManager();
    // We can only free the Capabilities if there are no Tasks still
    // running.  We might have a Task about to return from a foreign
    // call into waitForReturnCapability(), for example (actually,
    // this should be the *only* thing that a still-running Task can
    // do at this point, and it will block waiting for the
    // Capability).
    if (still_running == 0) {
        freeCapabilities();
    }
    RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
                    void *user USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif
}
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */

static void
performGC_(rtsBool force_major)
{
    Task *task;
    Capability *cap = NULL;

    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalls queue.
    task = newBoundTask();

    // TODO: do we need to traceTask*() here?

    waitForReturnCapability(&cap,task);
    scheduleDoGC(&cap,task,force_major);
    releaseCapability(cap);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
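/* Illustrative sketch (not part of the RTS): C code reached from Haskell via a
 * safe foreign call can request a collection explicitly; free_big_buffer() is
 * a hypothetical helper:
 *
 *     #include <stdlib.h>
 *     #include "Rts.h"
 *
 *     void free_big_buffer (void *buf)
 *     {
 *         free(buf);           // release a large external resource
 *         performMajorGC();    // then ask the RTS for a major collection
 *     }
 */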
/* ---------------------------------------------------------------------------
   Interrupt execution.

   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    interruptAllCapabilities();
#if defined(THREADED_RTS)
    wakeUpRts();
#endif
}
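/* Illustrative sketch (the RTS's own handlers live in posix/Signals.c and
 * win32/ConsoleHandler.c): a minimal ^C handler only needs to call
 * interruptStgRts(), which is safe here because it just sets flags and wakes
 * the capabilities:
 *
 *     #include <signal.h>
 *
 *     static void my_sigint_handler (int sig)
 *     {
 *         (void)sig;
 *         interruptStgRts();
 *     }
 */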
/* -----------------------------------------------------------------------------
   Wake up the RTS

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void wakeUpRts(void)
{
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
}
#endif
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(tso->cap,tso,NULL);
    }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_Interruptible) {
        tso->what_next = ThreadKilled;
        appendToRunQueue(tso->cap, tso);
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-useable thing here too.
   -------------------------------------------------------------------------- */
StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
                raise_closure->payload[0] = exception;
            }
            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
                        (StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->stackobj->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->stackobj->sp = p;
            return CATCH_STM_FRAME;

        case UNDERFLOW_FRAME:
            tso->stackobj->sp = p;
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        case STOP_FRAME:
            tso->stackobj->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME: {
            StgTRecHeader *trec = tso->trec;
            StgTRecHeader *outer = trec->enclosing_trec;
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during raise", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso->trec = outer;
            p = next;
            continue;
        }

        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an
   orElse#) or an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction
   that they create) because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
StgWord
findRetryFrameHelper (Capability *cap, StgTSO *tso)
{
    StgPtr p, next;
    StgRetInfoTable *info;

    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME: {
            StgTRecHeader *trec = tso->trec;
            StgTRecHeader *outer = trec->enclosing_trec;
            debugTrace(DEBUG_stm,
                       "found CATCH_STM_FRAME at %p during retry", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso->trec = outer;
            p = next;
            continue;
        }

        case UNDERFLOW_FRAME:
            tso->stackobj->sp = p;
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */
void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    generation *gen;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

        gen = Bdescr((P_)tso)->gen;
        tso->global_link = gen->threads;
        gen->threads = tso;

        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnMVarRead:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)nonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnSTM_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        case BlockedOnMsgThrowTo:
            // This can happen if the target is masking, blocks on a
            // black hole, and then is found to be unreachable.  In
            // this case, we want to let the target wake up and carry
            // on, and do nothing to this thread.
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",