/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/
#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "sm/Storage.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "ThreadLabels.h"
#include "Proftimer.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThreadPaused.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

#include "eventlog/EventLog.h"
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif
/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;
/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;
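
/* An informal sketch (added commentary, not original code) of how
 * recent_activity moves between its states, pieced together from the code
 * below; the exact transition points are assumptions based on the comments
 * in this file:
 *
 *     ACTIVITY_YES       -- set whenever we are about to run a thread
 *        |   (the timer sees no activity for a whole timeslice)
 *        v
 *     ACTIVITY_INACTIVE  -- scheduleDetectDeadlock() then forces a major GC
 *        |   (the idle GC runs and the timer signal is turned off)
 *        v
 *     ACTIVITY_DONE_GC   -- xchg()'d back to ACTIVITY_YES, and the timer
 *                           re-enabled, the next time a thread is run
 */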
/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;
/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */
static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(Capability *cap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next. Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.
   This version has been entirely removed (JB 2008/08).

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
   as well as future GUM versions. This file has been refurbished to
   only contain valid code, which is however incomplete, refers to
   invalid includes etc.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {
    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
        errorBelch("schedule: re-entered unsafely.\n"
                   "   Perhaps a 'foreign import unsafe' should be 'safe'?");
        stg_exit(EXIT_FAILURE);
    }
    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO).
    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }
    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt is received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
#if defined(THREADED_RTS)
    //    // don't yield the first time, we want a chance to run this
    //    // thread for a bit, even if there are others banging at the
    //    // door.
    //    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif
    // Get a thread to run
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif
    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }
    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();
    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = rtsTrue;

    dirty_STACK(cap,t->stackobj);
#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else if (recent_activity != ACTIVITY_INACTIVE) {
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        recent_activity = ACTIVITY_YES;
    }
#endif

    traceEventRunThread(cap, t);
    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }
    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened. So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif
    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;
    switch (ret) {

    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
        barf("schedule: invalid thread return code %d", (int)ret);
    }
    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
        scheduleDoGC(&cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */
void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}
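
/* Illustrative sketch (added commentary, not original code): the run queue
 * that removeFromRunQueue() edits is a doubly-linked list, using tso->_link
 * as the forward pointer and tso->block_info.prev as the backward pointer,
 * e.g. for a three-element queue:
 *
 *     cap->run_queue_hd -> A <-> B <-> C <- cap->run_queue_tl
 *
 * where A->block_info.prev == END_TSO_QUEUE and C->_link == END_TSO_QUEUE.
 * Unlinking B therefore patches A->_link and C->block_info.prev (via
 * setTSOLink/setTSOPrev) and clears both of B's pointers, as above.
 */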
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
#if defined(mingw32_HOST_OS) && !defined(GhcUnregisterised)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    return (pending_sync ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.
static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task) &&
        (!emptyRunQueue(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    }
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
}
#endif /* THREADED_RTS */
/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
    /* following code not for PARALLEL_HASKELL. I kept the call general,
       future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }
    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->returning_tasks_hd != NULL
                || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads
    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
#ifdef SPARK_PUSHING
        rtsBool pushed_to_all;
#endif

        debugTrace(DEBUG_sched,
                   "cap %d: %s and %d free capabilities, sharing...",
                   cap->no,
                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);

        i = 0;
#ifdef SPARK_PUSHING
        pushed_to_all = rtsFalse;
#endif

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->_link;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else if (i == n_free_caps) {
#ifdef SPARK_PUSHING
                    pushed_to_all = rtsTrue;
#endif
                    i = 0;
                    // keep one for us
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else {
                    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
        }
        /* JB I left this code in place, it would work but is not necessary */
#ifdef SPARK_PUSHING
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        /* TODO: if anyone wants to re-enable this code then
                         * they must consider the fizzledSpark(spark) case
                         * and update the per-cap spark statistics.
                         */
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

                        traceEventStealSpark(free_caps[i], t, cap->no);

                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }
#endif /* SPARK_PUSHING */
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseAndWakeupCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */
}
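
/* A worked example of the push trigger above (added commentary, not original
 * code), based on the checks in schedulePushWork(): with an empty run queue
 * we only push if we hold at least two sparks (so one spark stays with us);
 * with a non-empty run queue we push as soon as there is a second runnable
 * thread (run_queue_hd->_link != END_TSO_QUEUE) or at least one spark to give
 * away.  Threads are then dealt out round-robin over
 * free_caps[0..n_free_caps-1], skipping bound and locked threads, which
 * always stay on this Capability.
 */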
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
        cap = *pcap;
        // when force_major == rtsTrue. scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    int r;
    Capability *cap = *pcap;

    while (!emptyInbox(cap)) {
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
            scheduleDoGC(pcap, cap->running_task, rtsFalse);
            cap = *pcap;
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
    if (anySparks() && !cap->disabled)
    {
        createSparkThread(cap);
        debugTrace(DEBUG_sched, "creating a spark thread");
    }
}
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread (Capability *cap, StgTSO *t)
{
    // We have to be able to catch transactions that are in an
    // infinite loop as a result of seeing an inconsistent view of
    // memory, e.g.
    //
    //   atomically $ do
    //       [a,b] <- mapM readTVar [ta,tb]
    //       when (a == b) loop
    //
    // and a is never equal to b given a consistent view of memory.
    //
    if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
        if (!stmValidateNestOfTransactions (t -> trec)) {
            debugTrace(DEBUG_sched | DEBUG_stm,
                       "trec %p found wasting its time", t);

            // strip the stack back to the
            // ATOMICALLY_FRAME, aborting the (nested)
            // transaction, and saving the stack of any
            // partially-evaluated thunks on the heap.
            throwToSingleThreaded_(cap, t, NULL, rtsTrue);

            // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
        }
    }

    /* some statistics gathering in the parallel case */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        if (blocks > BLOCKS_PER_MBLOCK) {
            barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
        }

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, what_next_strs[t->what_next], blocks);
        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup_lock(blocks);
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    initBdescr(x,g0,g0);
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }
    if (cap->r.rHpLim == NULL || cap->context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        cap->context_switch = 0;
        appendToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }

    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */

    ASSERT(t->_link == END_TSO_QUEUE);
    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (cap->context_switch == 0 && t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, what_next_strs[t->what_next]);
        return rtsTrue;
    }

    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    if (cap->context_switch != 0) {
        cap->context_switch = 0;
        appendToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t )
{
    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

    traceThreadStatus(DEBUG_sched, t);
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */

    // blocked exceptions can now complete, even if the thread was in
    // blocked mode (see #2910).
    awakenBlockedExceptionQueue (cap, t);

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // step->threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }
        ASSERT(task->incall->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->incall->ret) {
                // NOTE: return val is stack->sp[1] (see StgStartup.hc)
                *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
            }
            task->incall->stat = Success;
        } else {
            if (task->incall->ret) {
                *(task->incall->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                if (heap_overflow) {
                    task->incall->stat = HeapExhausted;
                } else {
                    task->incall->stat = Interrupted;
                }
            } else {
                task->incall->stat = Killed;
            }
        }

        removeThreadLabel((StgWord)task->incall->tso->id);

        // We no longer consider this thread and task to be bound to
        // each other.  The TSO lives on until it is GC'd, but the
        // task is about to be released by the caller, and we don't
        // want anyone following the pointer from the TSO to the
        // defunct task (which might have already been
        // re-used). This was a real bug: the GC updated
        // tso->bound->tso which led to a deadlock.
        //
        t->bound = NULL;
        task->incall->tso = NULL;

        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
1337 * Perform a heap census
1338 * -------------------------------------------------------------------------- */
1341 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED
)
1343 // When we have +RTS -i0 and we're heap profiling, do a census at
1344 // every GC. This lets us get repeatable runs for debugging.
1345 if (performHeapProfile
||
1346 (RtsFlags
.ProfFlags
.heapProfileInterval
==0 &&
1347 RtsFlags
.ProfFlags
.doHeapProfile
&& ready_to_gc
)) {
/* -----------------------------------------------------------------------------
 * Start a synchronisation of all capabilities
 * -------------------------------------------------------------------------- */

// Returns:
//    0      if we successfully got a sync
//    non-0  if there was another sync request in progress,
//           and we yielded to it.  The value returned is the
//           type of the other sync request.
//
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type)
{
    nat prev_pending_sync;

    prev_pending_sync = cas(&pending_sync, 0, sync_type);

    if (prev_pending_sync)
    {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
                       prev_pending_sync);
            yieldCapability(pcap,task);
        } while (pending_sync);
        return prev_pending_sync; // NOTE: task->cap might have changed now
    }
    else
    {
        return 0;
    }
}
#endif
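
/* A minimal usage sketch of the synchronisation protocol (added commentary,
 * not original code), assuming the pattern used by scheduleDoGC() and
 * forkProcess() below:
 *
 *     nat sync = requestSync(&cap, task, SYNC_OTHER);
 *     if (sync != 0) {
 *         // someone else's sync won; we already yielded to it inside
 *         // requestSync(), so re-check the world before retrying.
 *     } else {
 *         acquireAllCapabilities(cap, task);
 *         // ... single-threaded work while every Capability is held ...
 *         pending_sync = 0;                // let other Tasks run again
 *         releaseAllCapabilities(cap, task);
 *     }
 */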
// Grab all the capabilities except the one we already hold.  Used
// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
// before a fork (SYNC_OTHER).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.

static void acquireAllCapabilities(Capability *cap, Task *task)
{
    Capability *tmpcap;
    nat i;

    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
        tmpcap = &capabilities[i];
        if (tmpcap != cap) {
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = tmpcap;
            waitForReturnCapability(&tmpcap, task);
            if (tmpcap->no != i) {
                barf("acquireAllCapabilities: got the wrong capability");
            }
        }
    }
    task->cap = cap;
}
static void releaseAllCapabilities(Capability *cap, Task *task)
{
    nat i;

    for (i = 0; i < n_capabilities; i++) {
        if (cap->no != i) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    task->cap = cap;
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
              rtsBool force_major)
{
    Capability *cap = *pcap;
    rtsBool heap_census;
#ifdef THREADED_RTS
    rtsBool idle_cap[n_capabilities];
    nat gc_type;
    nat i, sync;
    StgTSO *tso;
#endif

    if (sched_state == SCHED_SHUTTING_DOWN) {
        // The final GC has already been done, and the system is
        // shutting down.  We'll probably deadlock if we try to GC
        // now.
        return;
    }

#ifdef THREADED_RTS
    if (sched_state < SCHED_INTERRUPTING
        && RtsFlags.ParFlags.parGcEnabled
        && N >= RtsFlags.ParFlags.parGcGen
        && ! oldest_gen->mark)
    {
        gc_type = SYNC_GC_PAR;
    } else {
        gc_type = SYNC_GC_SEQ;
    }
#endif
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    /*  Other capabilities are prevented from running yet more Haskell
        threads if pending_sync is set. Tested inside
        yieldCapability() and releaseCapability() in Capability.c */

#ifdef THREADED_RTS
    sync = requestSync(pcap, task, gc_type);
    cap = *pcap;

    if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
        // someone else had a pending sync request for a GC, so
        // let's assume GC has been done and we don't need to GC
        // again.
        return;
    }

    if (sched_state == SCHED_SHUTTING_DOWN) {
        // The scheduler might now be shutting down.  We tested
        // this above, but it might have become true since then as
        // we yielded the capability in requestSync().
        return;
    }
    interruptAllCapabilities();

    // The final shutdown GC is always single-threaded, because it's
    // possible that some of the Capabilities have no worker threads.

    if (gc_type == SYNC_GC_SEQ)
    {
        traceEventRequestSeqGc(cap);
    }
    else
    {
        traceEventRequestParGc(cap);
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
    }

    if (gc_type == SYNC_GC_SEQ)
    {
        // single-threaded GC: grab all the capabilities
        acquireAllCapabilities(cap,task);
    }
    else
    {
        // If we are load-balancing collections in this
        // generation, then we require all GC threads to participate
        // in the collection.  Otherwise, we only require active
        // threads to participate, and we set gc_threads[i]->idle for
        // any idle capabilities.  The rationale here is that waking
        // up an idle Capability takes much longer than just doing any
        // GC work on its behalf.
        if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
            || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
                N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
            for (i=0; i < n_capabilities; i++) {
                if (capabilities[i].disabled) {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                } else {
                    idle_cap[i] = rtsFalse;
                }
            }
        } else {
            for (i=0; i < n_capabilities; i++) {
                if (capabilities[i].disabled) {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                } else if (i == cap->no ||
                           capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
                    idle_cap[i] = rtsFalse;
                } else {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                    if (!idle_cap[i]) {
                        n_failed_trygrab_idles++;
                    } else {
                        n_idle_caps++;
                    }
                }
            }
        }

        // We set the gc_thread[i]->idle flag if that
        // capability/thread is not participating in this collection.
        // We also keep a local record of which capabilities are idle
        // in idle_cap[], because scheduleDoGC() is re-entrant:
        // another thread might start a GC as soon as we've finished
        // this one, and thus the gc_thread[]->idle flags are invalid
        // as soon as we release any threads after GC.  Getting this
        // wrong leads to a rare and hard to debug deadlock!

        for (i=0; i < n_capabilities; i++) {
            gc_threads[i]->idle = idle_cap[i];
            capabilities[i].idle++;
        }

        // For all capabilities participating in this GC, wait until
        // they have stopped mutating and are standing by for GC.
        waitForGcThreads(cap);

#if defined(THREADED_RTS)
        // Stable point where we can do a global check on our spark counters
        ASSERT(checkSparkCountInvariant());
#endif
    }
#endif
    IF_DEBUG(scheduler, printAllThreads());

delete_threads_and_gc:
    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state == SCHED_INTERRUPTING) {
        deleteAllThreads(cap);
#if defined(THREADED_RTS)
        // Discard all the sparks from every Capability.  Why?
        // They'll probably be GC'd anyway since we've killed all the
        // threads.  It just avoids the GC having to do any work to
        // figure out that any remaining sparks are garbage.
        for (i = 0; i < n_capabilities; i++) {
            capabilities[i].spark_stats.gcd +=
                sparkPoolSize(capabilities[i].sparks);
            // No race here since all Caps are stopped.
            discardSparksCap(&capabilities[i]);
        }
#endif
        sched_state = SCHED_SHUTTING_DOWN;
    }

    /*
     * When there are disabled capabilities, we want to migrate any
     * threads away from them.  Normally this happens in the
     * scheduler's loop, but only for unbound threads - it's really
     * hard for a bound thread to migrate itself.  So we have another
     * go here.
     */
1617 for (i
= enabled_capabilities
; i
< n_capabilities
; i
++) {
1618 Capability
*tmp_cap
, *dest_cap
;
1619 tmp_cap
= &capabilities
[i
];
1620 ASSERT(tmp_cap
->disabled
);
1622 dest_cap
= &capabilities
[i
% enabled_capabilities
];
1623 while (!emptyRunQueue(tmp_cap
)) {
1624 tso
= popRunQueue(tmp_cap
);
1625 migrateThread(tmp_cap
, tso
, dest_cap
);
1626 if (tso
->bound
) { tso
->bound
->task
->cap
= dest_cap
; }
1632 heap_census
= scheduleNeedHeapProfile(rtsTrue
);
#if defined(THREADED_RTS)
    // reset pending_sync *before* GC, so that when the GC threads
    // emerge they don't immediately re-enter the GC.
    pending_sync = 0;
    GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
#else
    GarbageCollect(force_major || heap_census, heap_census, 0, cap);
#endif

    traceSparkCounters(cap);
    if (recent_activity == ACTIVITY_INACTIVE && force_major)
    {
        // We are doing a GC because the system has been idle for a
        // timeslice and we need to check for deadlock.  Record the
        // fact that we've done a GC and turn off the timer signal;
        // it will get re-enabled if we run any threads after the GC.
        recent_activity = ACTIVITY_DONE_GC;
    }
    else
    {
        // the GC might have taken long enough for the timer to set
        // recent_activity = ACTIVITY_INACTIVE, but we aren't
        // necessarily deadlocked:
        recent_activity = ACTIVITY_YES;
    }

#if defined(THREADED_RTS)
    // Stable point where we can do a global check on our spark counters
    ASSERT(checkSparkCountInvariant());
#endif

    // The heap census itself is done during GarbageCollect().
    if (heap_census) {
        performHeapProfile = rtsFalse;
    }
#if defined(THREADED_RTS)
    if (gc_type == SYNC_GC_PAR)
    {
        releaseGCThreads(cap);
        for (i = 0; i < n_capabilities; i++) {
            if (i != cap->no) {
                if (idle_cap[i]) {
                    ASSERT(capabilities[i].running_task == task);
                    task->cap = &capabilities[i];
                    releaseCapability(&capabilities[i]);
                } else {
                    ASSERT(capabilities[i].running_task != task);
                }
            }
        }
        task->cap = cap;
    }
#endif
    if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
        // GC set the heap_overflow flag, so we should proceed with
        // an orderly shutdown now.  Ultimately we want the main
        // thread to return to its caller with HeapExhausted, at which
        // point the caller should call hs_exit().  The first step is
        // to delete all the threads.
        //
        // Another way to do this would be to raise an exception in
        // the main thread, which we really should do because it gives
        // the program a chance to clean up.  But how do we find the
        // main thread?  It should presumably be the same one that
        // gets ^C exceptions, but that's all done on the Haskell side
        // (GHC.TopHandler).
        sched_state = SCHED_INTERRUPTING;
        goto delete_threads_and_gc;
    }

#ifdef SPARKBALANCE
    /* JB
       Once we are all together... this would be the place to balance all
       spark pools. No concurrent stealing or adding of new sparks can
       occur. Should be defined in Sparks.c. */
    balanceSparkPoolsCaps(n_capabilities, capabilities);
#endif

#if defined(THREADED_RTS)
    if (gc_type == SYNC_GC_SEQ) {
        // release our stash of capabilities.
        releaseAllCapabilities(cap, task);
    }
#endif

    *pcap = cap;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    pid_t pid;
    StgTSO *t, *next;
    Capability *cap;
    Task *task = NULL;
    nat g, i;
#ifdef THREADED_RTS
    nat sync;
#endif

    debugTrace(DEBUG_sched, "forking!");

    task = newBoundTask();

    cap = NULL;
    waitForReturnCapability(&cap, task);

#ifdef THREADED_RTS
    sync = requestSync(&cap, task, SYNC_OTHER);

    acquireAllCapabilities(cap,task);

    pending_sync = 0;
#endif
    // no funny business: hold locks while we fork, otherwise if some
    // other thread is holding a lock when the fork happens, the data
    // structure protected by the lock will forever be in an
    // inconsistent state in the child.  See also #1391.
    ACQUIRE_LOCK(&sched_mutex);
    ACQUIRE_LOCK(&sm_mutex);
    ACQUIRE_LOCK(&stable_mutex);
    ACQUIRE_LOCK(&task->lock);

    for (i=0; i < n_capabilities; i++) {
        ACQUIRE_LOCK(&capabilities[i].lock);
    }

    stopTimer(); // See #4074

#if defined(TRACING)
    flushEventLog(); // so that child won't inherit dirty file buffers
#endif

    pid = fork();
    if (pid) { // parent

        startTimer(); // #4074

        RELEASE_LOCK(&sched_mutex);
        RELEASE_LOCK(&sm_mutex);
        RELEASE_LOCK(&stable_mutex);
        RELEASE_LOCK(&task->lock);

        for (i=0; i < n_capabilities; i++) {
            releaseCapability_(&capabilities[i],rtsFalse);
            RELEASE_LOCK(&capabilities[i].lock);
        }
        boundTaskExiting(task);

        // just return the pid
        return pid;

    } else { // child
#if defined(THREADED_RTS)
        initMutex(&sched_mutex);
        initMutex(&sm_mutex);
        initMutex(&stable_mutex);
        initMutex(&task->lock);

        for (i=0; i < n_capabilities; i++) {
            initMutex(&capabilities[i].lock);
        }
#endif

        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        // now gone.
        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
            for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(t->cap,t);

                // stop the GC from updating the InCall to point to
                // the TSO.  This is only necessary because the
                // OSThread bound to the TSO has been killed, and
                // won't get a chance to exit in the usual way (see
                // also scheduleHandleThreadFinished).
                t->bound = NULL;
            }
        }

        discardTasksExcept(task);
        for (i=0; i < n_capabilities; i++) {
            cap = &capabilities[i];

            // Empty the run queue.  It seems tempting to let all the
            // killed threads stay on the run queue as zombies to be
            // cleaned up later, but some of them may correspond to
            // bound threads for which the corresponding Task does not
            // exist.
            cap->run_queue_hd = END_TSO_QUEUE;
            cap->run_queue_tl = END_TSO_QUEUE;

            // Any suspended C-calling Tasks are no more, their OS threads
            // don't exist now:
            cap->suspended_ccalls = NULL;

#if defined(THREADED_RTS)
            // Wipe our spare workers list, they no longer exist.  New
            // workers will be created if necessary.
            cap->spare_workers = NULL;
            cap->n_spare_workers = 0;
            cap->returning_tasks_hd = NULL;
            cap->returning_tasks_tl = NULL;
#endif

            // Release all caps except 0, we'll use that for starting
            // the IO manager and running the client action below.
            if (i != 0) {
                task->cap = cap;
                releaseCapability(cap);
            }
        }
        cap = &capabilities[0];
        task->cap = cap;
        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
            generations[g].threads = END_TSO_QUEUE;
        }

        // On Unix, all timers are reset in the child, so we need to start
        // the timer again.
        initTimer();
        startTimer();

#if defined(THREADED_RTS)
        ioManagerStartCap(&cap);
#endif
        rts_evalStableIO(&cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
/* ---------------------------------------------------------------------------
 * Changing the number of Capabilities
 *
 * Changing the number of Capabilities is very tricky!  We can only do
 * it with the system fully stopped, so we do a full sync with
 * requestSync(SYNC_OTHER) and grab all the capabilities.
 *
 * Then we resize the appropriate data structures, and update all
 * references to the old data structures which have now moved.
 * Finally we release the Capabilities we are holding, and start
 * worker Tasks on the new Capabilities we created.
 *
 * ------------------------------------------------------------------------- */

void
setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
{
#if !defined(THREADED_RTS)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
    }
#elif defined(NOSMP)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported on this platform");
    }
#else
    Task *task;
    Capability *cap;
    nat sync;
    StgTSO *t;
    nat g, n;
    Capability *old_capabilities = NULL;
    if (new_n_capabilities == enabled_capabilities) return;

    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
               enabled_capabilities, new_n_capabilities);

    cap = rts_lock();
    task = cap->running_task;

    sync = requestSync(&cap, task, SYNC_OTHER);

    acquireAllCapabilities(cap,task);

    pending_sync = 0;
    if (new_n_capabilities < enabled_capabilities)
    {
        // Reducing the number of capabilities: we do not actually
        // remove the extra capabilities, we just mark them as
        // "disabled". This has the following effects:
        //
        //   - threads on a disabled capability are migrated away by the
        //     scheduler loop
        //
        //   - disabled capabilities do not participate in GC
        //     (see scheduleDoGC())
        //
        //   - No spark threads are created on this capability
        //     (see scheduleActivateSpark())
        //
        //   - We do not attempt to migrate threads *to* a disabled
        //     capability (see schedulePushWork()).
        //
        // but in other respects, a disabled capability remains
        // alive.  Threads may be woken up on a disabled capability,
        // but they will be immediately migrated away.
        //
        // This approach is much easier than trying to actually remove
        // the capability; we don't have to worry about GC data
        // structures, the nursery, etc.
        //
        for (n = new_n_capabilities; n < enabled_capabilities; n++) {
            capabilities[n].disabled = rtsTrue;
            traceCapDisable(&capabilities[n]);
        }
        enabled_capabilities = new_n_capabilities;
    }
    else
    {
        // Increasing the number of enabled capabilities.
        //
        // enable any disabled capabilities, up to the required number
        for (n = enabled_capabilities;
             n < new_n_capabilities && n < n_capabilities; n++) {
            capabilities[n].disabled = rtsFalse;
            traceCapEnable(&capabilities[n]);
        }
        enabled_capabilities = n;

        if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
            // Allocate eventlog buffers for the new capabilities.  Note this
            // must be done before calling moreCapabilities(), because that
            // will emit events about creating the new capabilities and adding
            // them to existing capsets.
            tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif

            // Resize the capabilities array
            // NB. after this, capabilities points somewhere new.  Any pointers
            // of type (Capability *) are now invalid.
            old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);

            // update our own cap pointer
            cap = &capabilities[cap->no];

            // Resize and update storage manager data structures
            storageAddCapabilities(n_capabilities, new_n_capabilities);

            // Update (Capability *) refs in the Task manager.
            updateCapabilityRefs();

            // Update (Capability *) refs from TSOs
            for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
                for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
                    t->cap = &capabilities[t->cap->no];
                }
            }
        }
    }
2035 releaseAllCapabilities(cap
,task
);
2037 // Start worker tasks on the new Capabilities
2038 startWorkerTasks(n_capabilities
, new_n_capabilities
);
2040 // finally, update n_capabilities
2041 if (new_n_capabilities
> n_capabilities
) {
2042 n_capabilities
= enabled_capabilities
= new_n_capabilities
;
2045 // We can't free the old array until now, because we access it
2046 // while updating pointers in updateCapabilityRefs().
2047 if (old_capabilities
) {
2048 stgFree(old_capabilities
);
2053 #endif // THREADED_RTS
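
/* Illustrative sketch (guarded out; not RTS code): the pointer-fixup pattern
 * used above.  When an array of records is reallocated, every raw pointer
 * into the old array becomes invalid; provided each record remembers its own
 * index (like Capability.no), the pointers can be re-derived from the new
 * base while the old array is still kept alive.  All names here are
 * hypothetical.
 */
#if 0
#include <stdlib.h>

typedef struct { unsigned int no; /* ... other fields ... */ } Slot;

static Slot *slots = NULL;
static unsigned int n_slots = 0;

/* Grow the array; returns the old array so callers can keep reading it
 * until all outstanding (Slot *) references have been re-derived. */
static Slot *growSlots (unsigned int new_n)
{
    Slot *old = slots;
    slots = malloc(new_n * sizeof(Slot));
    for (unsigned int i = 0; i < new_n; i++) {
        if (i < n_slots) slots[i] = old[i];   // copy existing records
        slots[i].no = i;                      // each record knows its index
    }
    n_slots = new_n;
    return old;                               // caller frees when done
}

/* Re-derive a stale pointer via its index, as in cap = &capabilities[cap->no]. */
static Slot *refresh (Slot *stale) { return &slots[stale->no]; }
#endif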
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO *t, *next;
    nat g;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            next = t->global_link;
            deleteThread(cap,t);
        }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalls list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

static void
suspendTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    ASSERT(incall->next == NULL && incall->prev == NULL);
    incall->next = cap->suspended_ccalls;
    incall->prev = NULL;
    if (cap->suspended_ccalls) {
        cap->suspended_ccalls->prev = incall;
    }
    cap->suspended_ccalls = incall;
}

static void
recoverSuspendedTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    if (incall->prev) {
        incall->prev->next = incall->next;
    } else {
        ASSERT(cap->suspended_ccalls == incall);
        cap->suspended_ccalls = incall->next;
    }
    if (incall->next) {
        incall->next->prev = incall->prev;
    }
    incall->next = incall->prev = NULL;
}
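
/* Illustrative sketch (guarded out; not RTS code): the intrusive
 * doubly-linked-list discipline used by suspendTask()/recoverSuspendedTask()
 * above, with hypothetical Node/List types.  Insertion is always at the head;
 * removal works from anywhere in the list in O(1).
 */
#if 0
#include <assert.h>
#include <stddef.h>

typedef struct Node_ { struct Node_ *prev, *next; } Node;
typedef struct { Node *head; } List;

static void pushHead (List *l, Node *n)
{
    assert(n->prev == NULL && n->next == NULL);
    n->next = l->head;
    n->prev = NULL;
    if (l->head) l->head->prev = n;
    l->head = n;
}

static void unlinkNode (List *l, Node *n)
{
    if (n->prev) {
        n->prev->next = n->next;
    } else {
        assert(l->head == n);      // n was the head
        l->head = n->next;
    }
    if (n->next) n->next->prev = n->prev;
    n->next = n->prev = NULL;
}
#endif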
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 *
 * If this is an interruptible C call, this means that the FFI call may be
 * unceremoniously terminated and should be scheduled on an
 * unbound worker thread.
 * ------------------------------------------------------------------------- */
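
/* Illustrative sketch (guarded out): the shape of the code generated around
 * a "safe" foreign call, using the two entry points defined below.  The
 * names example_safe_call and blocking_c_function are hypothetical;
 * suspendThread()/resumeThread() are the real entry points.
 */
#if 0
extern void blocking_c_function (void);

static void example_safe_call (StgRegTable *baseReg)
{
    void *token;
    token   = suspendThread(baseReg, rtsFalse); // give up the Capability first
    blocking_c_function();                      // other Tasks can run Haskell meanwhile
    baseReg = resumeThread(token);              // wait to re-acquire a Capability
}
#endif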
void *
suspendThread (StgRegTable *reg, rtsBool interruptible)
{
    Capability *cap;
    StgTSO *tso;
    Task *task;
    int saved_errno;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if (interruptible) {
        tso->why_blocked = BlockedOnCCall_Interruptible;
    } else {
        tso->why_blocked = BlockedOnCCall;
    }

    // Hand back capability
    task->incall->suspended_tso = tso;
    task->incall->suspended_cap = cap;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);

    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif
    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    InCall *incall;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    incall = task->incall;
    cap = incall->suspended_cap;

    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalls list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = incall->suspended_tso;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    tso->_link = END_TSO_QUEUE; // no write barrier reqd

    traceEventRunThread(cap, tso);

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    if ((tso->flags & TSO_BLOCKEX) == 0) {
        // avoid locking the TSO if we don't have to
        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
            maybePerformBlockedException(cap,tso);
        }
    }

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    dirty_STACK(cap,tso->stackobj);

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */

void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}

void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
#if defined(THREADED_RTS)
    cpu %= enabled_capabilities;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        migrateThread(cap, tso, &capabilities[cpu]);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
void
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
    Task *task;
    DEBUG_ONLY( StgThreadID id );
    Capability *cap;

    cap = *pcap;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task->incall;

    task->incall->tso = tso;
    task->incall->ret = ret;
    task->incall->stat = NoStatus;

    appendToRunQueue(cap,tso);

    DEBUG_ONLY( id = tso->id );
    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);

    cap = schedule(cap,task);

    ASSERT(task->incall->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);

    *pcap = cap;
}
/* ----------------------------------------------------------------------------
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void scheduleWorker (Capability *cap, Task *task)
{
    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability, but possibly not
    // the same one we started with.
    //
    // During shutdown, the requirement is that after all the
    // Capabilities are shut down, all workers that are shutting down
    // have finished workerTaskStop().  This is why we hold on to
    // cap->lock until we've finished workerTaskStop() below.
    //
    // There may be workers still involved in foreign calls; those
    // will just block in waitForReturnCapability() because the
    // Capability has been shut down.
    //
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap,rtsFalse);
    workerTaskStop(task);
    RELEASE_LOCK(&cap->lock);
}
#endif
/* ---------------------------------------------------------------------------
 * Start new worker tasks on Capabilities from--to
 * -------------------------------------------------------------------------- */

static void
startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    Capability *cap;

    for (i = from; i < to; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        startWorkerTask(cap);
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if !defined(THREADED_RTS)
    blocked_queue_hd  = END_TSO_QUEUE;
    blocked_queue_tl  = END_TSO_QUEUE;
    sleeping_queue    = END_TSO_QUEUE;
#endif

    sched_state     = SCHED_RUNNING;
    recent_activity = ACTIVITY_YES;

#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);
#endif

    ACQUIRE_LOCK(&sched_mutex);

    /* A capability holds the state a native thread needs in
     * order to execute STG code. At least one capability is
     * floating around (only THREADED_RTS builds have more than one).
     */

    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    startWorkerTasks(1, n_capabilities);

    RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
               /* see Capability.c, shutdownCapability() */
{
    Task *task;

    task = newBoundTask();

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        Capability *cap = task->cap;
        waitForReturnCapability(&cap,task);
        scheduleDoGC(&cap,task,rtsFalse);
        ASSERT(task->incall->tso == NULL);
        releaseCapability(cap);
    }
    sched_state = SCHED_SHUTTING_DOWN;

    shutdownCapabilities(task, wait_foreign);

    // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
    //            n_failed_trygrab_idles, n_idle_caps);

    boundTaskExiting(task);
}
void
freeScheduler( void )
{
    nat still_running;

    ACQUIRE_LOCK(&sched_mutex);
    still_running = freeTaskManager();
    // We can only free the Capabilities if there are no Tasks still
    // running.  We might have a Task about to return from a foreign
    // call into waitForReturnCapability(), for example (actually,
    // this should be the *only* thing that a still-running Task can
    // do at this point, and it will block waiting for the
    // Capability).
    if (still_running == 0) {
        if (n_capabilities != 1) {
            stgFree(capabilities);
        }
    }
    RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
                    void *user USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif
}
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */

static void
performGC_(rtsBool force_major)
{
    Task *task;
    Capability *cap = NULL;

    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalls queue.
    task = newBoundTask();

    waitForReturnCapability(&cap,task);
    scheduleDoGC(&cap,task,force_major);
    releaseCapability(cap);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
/* ---------------------------------------------------------------------------
   Interrupt execution.
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    interruptAllCapabilities();
#if defined(THREADED_RTS)
    wakeUpRts();
#endif
}
/* -----------------------------------------------------------------------------
   Wake up the RTS

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void wakeUpRts(void)
{
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
}
#endif
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(tso->cap,tso,NULL);
    }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_Interruptible) {
        tso->what_next = ThreadKilled;
        appendToRunQueue(tso->cap, tso);
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */

StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
                raise_closure->payload[0] = exception;
            }
            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
                        (StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->stackobj->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->stackobj->sp = p;
            return CATCH_STM_FRAME;

        case UNDERFLOW_FRAME:
            tso->stackobj->sp = p;
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        case STOP_FRAME:
            tso->stackobj->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
   create) because retries are not considered to be exceptions, despite the
   similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */

StgWord
findRetryFrameHelper (Capability *cap, StgTSO *tso)
{
    StgPtr p, next;
    StgRetInfoTable *info;

    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME: {
            StgTRecHeader *trec = tso -> trec;
            StgTRecHeader *outer = trec -> enclosing_trec;
            debugTrace(DEBUG_stm,
                       "found CATCH_STM_FRAME at %p during retry", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso -> trec = outer;
            p = next;
            continue;
        }

        case UNDERFLOW_FRAME:
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    generation *gen;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

        gen = Bdescr((P_)tso)->gen;
        tso->global_link = gen->threads;
        gen->threads = tso;

        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)nonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnSTM_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        case BlockedOnMsgThrowTo:
            // This can happen if the target is masking, blocks on a
            // black hole, and then is found to be unreachable.  In
            // this case, we want to let the target wake up and carry
            // on, and do nothing to this thread.
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",