/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE

#include "sm/Storage.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "ThreadLabels.h"
#include "Proftimer.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThreadPaused.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

#include "eventlog/EventLog.h"
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;
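
/* sched_state only ever moves forwards, roughly
 *   SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
 * (see the shutdown sequence described in schedule() below), which is
 * why it can be read without taking a lock.
 */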
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static nat  requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(Capability *cap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next. Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.
   This version has been entirely removed (JB 2008/08).

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
   as well as future GUM versions. This file has been refurbished to
   only contain valid code, which is however incomplete, refers to
   invalid includes etc.

   ------------------------------------------------------------------------ */
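
/* Roughly, one iteration of the loop below does the following
 * (simplified; the THREADED_RTS-only steps and error paths are
 * omitted):
 *
 *     scheduleFindWork() / schedulePushWork() / scheduleDetectDeadlock();
 *     t = popRunQueue(cap);
 *     ... run t via StgRun() or interpretBCO() ...
 *     switch on the thread's return code (heap/stack overflow, yield,
 *     blocked, finished), then GC if necessary via scheduleDoGC().
 */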
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
        errorBelch("schedule: re-entered unsafely.\n"
                   "   Perhaps a 'foreign import unsafe' should be 'safe'?");
        stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO);

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }
    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    // // don't yield the first time, we want a chance to run this
    // // thread for a bit, even if there are others banging at the
    // // door.
    // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif
    // Get a thread to run
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif
    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();
    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
    SetLastError(t->saved_winerror);

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = rtsTrue;

    dirty_STACK(cap,t->stackobj);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else if (recent_activity != ACTIVITY_INACTIVE) {
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        recent_activity = ACTIVITY_YES;
    }
#endif
    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }
    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;

    switch (ret) {

    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
        barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
        scheduleDoGC(&cap,task,rtsFalse);
    }
  } /* end of while() */
}
/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */
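
/* The run queue is kept as a doubly-linked list of TSOs: tso->_link
 * points to the next thread and tso->block_info.prev to the previous
 * one, with cap->run_queue_hd and cap->run_queue_tl marking the two
 * ends (both directions are terminated by END_TSO_QUEUE).
 * removeFromRunQueue() below maintains exactly this invariant.
 */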
void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
#if defined(mingw32_HOST_OS) && !defined(GhcUnregisterised)
    win32AllocStack();
#endif
}
/* -----------------------------------------------------------------------------
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.
static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    int didGcLast = rtsFalse;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,rtsFalse) &&
        (!emptyRunQueue(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        didGcLast = yieldCapability(&cap,task, !didGcLast);
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->returning_tasks_hd != NULL
                || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //  - giving high priority to moving relatively new threads, on
    //    the grounds that they haven't had time to build up a
    //    working set in the cache on this CPU/Capability.
    //
    //  - giving low priority to moving long-lived threads
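
    // The loop below deals movable threads out to the free
    // capabilities round-robin, keeping every (n_free_caps+1)-th
    // thread (and every bound or locked thread) for ourselves.  So,
    // roughly, with 2 free capabilities and 6 movable threads, each
    // free capability receives 2 threads and 2 stay here.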
    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched,
                   "cap %d: %s and %d free capabilities, sharing...",
                   cap->no,
                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->_link;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {       // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else {
                    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
        }

#ifdef SPARK_PUSHING
        /* JB I left this code in place, it would work but is not necessary */

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        /* TODO: if anyone wants to re-enable this code then
                         * they must consider the fizzledSpark(spark) case
                         * and update the per-cap spark statistics.
                         */
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

                        traceEventStealSpark(free_caps[i], t, cap->no);

                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }
#endif /* SPARK_PUSHING */

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseAndWakeupCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */
}
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
        cap = *pcap;
        // when force_major == rtsTrue. scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    int r;
    Capability *cap = *pcap;

    while (!emptyInbox(cap)) {
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
            scheduleDoGC(pcap, cap->running_task, rtsFalse);
            cap = *pcap;
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
    if (anySparks() && !cap->disabled)
    {
        createSparkThread(cap);
        debugTrace(DEBUG_sched, "creating a spark thread");
    }
}
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread (Capability *cap, StgTSO *t)
{
    // We have to be able to catch transactions that are in an
    // infinite loop as a result of seeing an inconsistent view of
    // memory, e.g.
    //
    //   atomically $ do
    //       [a,b] <- mapM readTVar [ta,tb]
    //       when (a == b) loop
    //
    // and a is never equal to b given a consistent view of memory.
    //
    if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
        if (!stmValidateNestOfTransactions (t -> trec)) {
            debugTrace(DEBUG_sched | DEBUG_stm,
                       "trec %p found wasting its time", t);

            // strip the stack back to the
            // ATOMICALLY_FRAME, aborting the (nested)
            // transaction, and saving the stack of any
            // partially-evaluated thunks on the heap.
            throwToSingleThreaded_(cap, t, NULL, rtsTrue);

            // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
        }
    }

    /* some statistics gathering in the parallel case */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        if (blocks > BLOCKS_PER_MBLOCK) {
            barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
        }

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, what_next_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup_lock(blocks);
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    initBdescr(x,g0,g0);
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    if (cap->r.rHpLim == NULL || cap->context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        cap->context_switch = 0;
        appendToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */

    ASSERT(t->_link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (cap->context_switch == 0 && t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, what_next_strs[t->what_next]);
        return rtsTrue;
    }

    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    if (cap->context_switch != 0) {
        cap->context_switch = 0;
        appendToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t )
{
    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

    traceThreadStatus(DEBUG_sched, t);
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */

    // blocked exceptions can now complete, even if the thread was in
    // blocked mode (see #2910).
    awakenBlockedExceptionQueue (cap, t);

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // step->threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->incall->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->incall->ret) {
                // NOTE: return val is stack->sp[1] (see StgStartup.hc)
                *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
            }
            task->incall->stat = Success;
        } else {
            if (task->incall->ret) {
                *(task->incall->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                if (heap_overflow) {
                    task->incall->stat = HeapExhausted;
                } else {
                    task->incall->stat = Interrupted;
                }
            } else {
                task->incall->stat = Killed;
            }
        }

        removeThreadLabel((StgWord)task->incall->tso->id);

        // We no longer consider this thread and task to be bound to
        // each other.  The TSO lives on until it is GC'd, but the
        // task is about to be released by the caller, and we don't
        // want anyone following the pointer from the TSO to the
        // defunct task (which might have already been
        // re-used). This was a real bug: the GC updated
        // tso->bound->tso which led to a deadlock.
        t->bound = NULL;
        task->incall->tso = NULL;

        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.heapProfileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        return rtsTrue;
    } else {
        return rtsFalse;
    }
}
/* -----------------------------------------------------------------------------
 * Start a synchronisation of all capabilities
 * -------------------------------------------------------------------------- */

// Returns:
//    0      if we successfully got a sync
//    non-0  if there was another sync request in progress,
//           and we yielded to it.  The value returned is the
//           type of the other sync request.
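//
// For example, scheduleDoGC() below treats a non-zero result of
// SYNC_GC_SEQ or SYNC_GC_PAR as "somebody else just did the GC for
// us", and skips its own collection in that case.
//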
#if defined(THREADED_RTS)
static nat
requestSync (Capability **pcap, Task *task, nat sync_type)
{
    nat prev_pending_sync;

    prev_pending_sync = cas(&pending_sync, 0, sync_type);

    if (prev_pending_sync)
    {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
                       prev_pending_sync);
            yieldCapability(pcap,task,rtsTrue);
        } while (pending_sync);
        return prev_pending_sync; // NOTE: task->cap might have changed now
    }
    else
    {
        return 0;
    }
}
// Grab all the capabilities except the one we already hold.  Used
// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
// before a fork (SYNC_OTHER).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.
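//
// The usual bracket looks approximately like this (cf. forkProcess()
// and setNumCapabilities() below):
//
//     sync = requestSync(&cap, task, SYNC_OTHER);
//     acquireAllCapabilities(cap, task);
//     ... operate on the fully stopped system ...
//     releaseAllCapabilities(cap, task);
//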
static void acquireAllCapabilities(Capability *cap, Task *task)
{
    Capability *tmpcap;
    nat i;

    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
        tmpcap = &capabilities[i];
        if (tmpcap != cap) {
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = tmpcap;
            waitForReturnCapability(&tmpcap, task);
            if (tmpcap->no != i) {
                barf("acquireAllCapabilities: got the wrong capability");
            }
        }
    }
    task->cap = cap;
}

static void releaseAllCapabilities(Capability *cap, Task *task)
{
    nat i;

    for (i = 0; i < n_capabilities; i++) {
        if (cap->no != i) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    task->cap = cap;
}
#endif
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */
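
//
// In outline, scheduleDoGC() below: requests a sync (SYNC_GC_SEQ or
// SYNC_GC_PAR), stops all mutation either by acquiring every
// Capability or by waiting for the GC threads to stand by, deletes
// all threads first if we are interrupting, calls GarbageCollect(),
// and finally releases the Capabilities / GC threads it grabbed.
//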
static void
scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
              rtsBool force_major)
{
    Capability *cap = *pcap;
    rtsBool heap_census;
#if defined(THREADED_RTS)
    rtsBool idle_cap[n_capabilities];
    StgTSO *tso;
    nat i, sync;
    nat gc_type;
#endif

    if (sched_state == SCHED_SHUTTING_DOWN) {
        // The final GC has already been done, and the system is
        // shutting down.  We'll probably deadlock if we try to GC
        // now.
        return;
    }

#if defined(THREADED_RTS)
    if (sched_state < SCHED_INTERRUPTING
        && RtsFlags.ParFlags.parGcEnabled
        && N >= RtsFlags.ParFlags.parGcGen
        && ! oldest_gen->mark)
    {
        gc_type = SYNC_GC_PAR;
    } else {
        gc_type = SYNC_GC_SEQ;
    }

    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.

    /* Other capabilities are prevented from running yet more Haskell
       threads if pending_sync is set. Tested inside
       yieldCapability() and releaseCapability() in Capability.c */

    sync = requestSync(pcap, task, gc_type);
    cap = *pcap;
    if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
        // someone else had a pending sync request for a GC, so
        // let's assume GC has been done and we don't need to GC
        // again.
        return;
    }
    if (sched_state == SCHED_SHUTTING_DOWN) {
        // The scheduler might now be shutting down.  We tested
        // this above, but it might have become true since then as
        // we yielded the capability in requestSync().
        return;
    }

    interruptAllCapabilities();

    // The final shutdown GC is always single-threaded, because it's
    // possible that some of the Capabilities have no worker threads.

    if (gc_type == SYNC_GC_SEQ)
    {
        traceEventRequestSeqGc(cap);
    }
    else
    {
        traceEventRequestParGc(cap);
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
    }

    if (gc_type == SYNC_GC_SEQ)
    {
        // single-threaded GC: grab all the capabilities
        acquireAllCapabilities(cap,task);
    }
    else
    {
        // If we are load-balancing collections in this
        // generation, then we require all GC threads to participate
        // in the collection.  Otherwise, we only require active
        // threads to participate, and we set gc_threads[i]->idle for
        // any idle capabilities.  The rationale here is that waking
        // up an idle Capability takes much longer than just doing any
        // GC work on its behalf.

        if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
            || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
                N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
            for (i=0; i < n_capabilities; i++) {
                if (capabilities[i].disabled) {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                } else {
                    idle_cap[i] = rtsFalse;
                }
            }
        } else {
            for (i=0; i < n_capabilities; i++) {
                if (capabilities[i].disabled) {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                } else if (i == cap->no ||
                           capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
                    idle_cap[i] = rtsFalse;
                } else {
                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
                    if (!idle_cap[i]) {
                        n_failed_trygrab_idles++;
                    } else {
                        n_idle_caps++;
                    }
                }
            }
        }

        // We set the gc_thread[i]->idle flag if that
        // capability/thread is not participating in this collection.
        // We also keep a local record of which capabilities are idle
        // in idle_cap[], because scheduleDoGC() is re-entrant:
        // another thread might start a GC as soon as we've finished
        // this one, and thus the gc_thread[]->idle flags are invalid
        // as soon as we release any threads after GC.  Getting this
        // wrong leads to a rare and hard to debug deadlock!

        for (i=0; i < n_capabilities; i++) {
            gc_threads[i]->idle = idle_cap[i];
            capabilities[i].idle++;
        }

        // For all capabilities participating in this GC, wait until
        // they have stopped mutating and are standing by for GC.
        waitForGcThreads(cap);

#if defined(THREADED_RTS)
        // Stable point where we can do a global check on our spark counters
        ASSERT(checkSparkCountInvariant());
#endif
    }
#endif

    IF_DEBUG(scheduler, printAllThreads());
delete_threads_and_gc:
    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state == SCHED_INTERRUPTING) {
        deleteAllThreads(cap);
#if defined(THREADED_RTS)
        // Discard all the sparks from every Capability.  Why?
        // They'll probably be GC'd anyway since we've killed all the
        // threads.  It just avoids the GC having to do any work to
        // figure out that any remaining sparks are garbage.
        for (i = 0; i < n_capabilities; i++) {
            capabilities[i].spark_stats.gcd +=
                sparkPoolSize(capabilities[i].sparks);
            // No race here since all Caps are stopped.
            discardSparksCap(&capabilities[i]);
        }
#endif
        sched_state = SCHED_SHUTTING_DOWN;
    }

    /*
     * When there are disabled capabilities, we want to migrate any
     * threads away from them.  Normally this happens in the
     * scheduler's loop, but only for unbound threads - it's really
     * hard for a bound thread to migrate itself.  So we have another
     * go here.
     */
#if defined(THREADED_RTS)
    for (i = enabled_capabilities; i < n_capabilities; i++) {
        Capability *tmp_cap, *dest_cap;
        tmp_cap = &capabilities[i];
        ASSERT(tmp_cap->disabled);
        dest_cap = &capabilities[i % enabled_capabilities];
        while (!emptyRunQueue(tmp_cap)) {
            tso = popRunQueue(tmp_cap);
            migrateThread(tmp_cap, tso, dest_cap);
            if (tso->bound) { tso->bound->task->cap = dest_cap; }
        }
    }
#endif

    heap_census = scheduleNeedHeapProfile(rtsTrue);

#if defined(THREADED_RTS)
    // reset pending_sync *before* GC, so that when the GC threads
    // emerge they don't immediately re-enter the GC.
    pending_sync = 0;
    GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
#else
    GarbageCollect(force_major || heap_census, heap_census, 0, cap);
#endif

    traceSparkCounters(cap);

    if (recent_activity == ACTIVITY_INACTIVE && force_major)
    {
        // We are doing a GC because the system has been idle for a
        // timeslice and we need to check for deadlock.  Record the
        // fact that we've done a GC and turn off the timer signal;
        // it will get re-enabled if we run any threads after the GC.
        recent_activity = ACTIVITY_DONE_GC;
    }
    else
    {
        // the GC might have taken long enough for the timer to set
        // recent_activity = ACTIVITY_INACTIVE, but we aren't
        // necessarily deadlocked:
        recent_activity = ACTIVITY_YES;
    }

#if defined(THREADED_RTS)
    // Stable point where we can do a global check on our spark counters
    ASSERT(checkSparkCountInvariant());
#endif

    // The heap census itself is done during GarbageCollect().
    if (heap_census) {
        performHeapProfile = rtsFalse;
    }

#if defined(THREADED_RTS)
    if (gc_type == SYNC_GC_PAR)
    {
        releaseGCThreads(cap);
        for (i = 0; i < n_capabilities; i++) {
            if (i != cap->no) {
                if (idle_cap[i]) {
                    ASSERT(capabilities[i].running_task == task);
                    task->cap = &capabilities[i];
                    releaseCapability(&capabilities[i]);
                } else {
                    ASSERT(capabilities[i].running_task != task);
                }
            }
        }
        task->cap = cap;
    }
#endif

    if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
        // GC set the heap_overflow flag, so we should proceed with
        // an orderly shutdown now.  Ultimately we want the main
        // thread to return to its caller with HeapExhausted, at which
        // point the caller should call hs_exit().  The first step is
        // to delete all the threads.
        //
        // Another way to do this would be to raise an exception in
        // the main thread, which we really should do because it gives
        // the program a chance to clean up.  But how do we find the
        // main thread?  It should presumably be the same one that
        // gets ^C exceptions, but that's all done on the Haskell side
        // (GHC.TopHandler).
        sched_state = SCHED_INTERRUPTING;
        goto delete_threads_and_gc;
    }

#ifdef SPARKBALANCE
    /* JB
       Once we are all together... this would be the place to balance all
       spark pools. No concurrent stealing or adding of new sparks can
       occur. Should be defined in Sparks.c. */
    balanceSparkPoolsCaps(n_capabilities, capabilities);
#endif

#if defined(THREADED_RTS)
    if (gc_type == SYNC_GC_SEQ) {
        // release our stash of capabilities.
        releaseAllCapabilities(cap, task);
    }
#endif

    *pcap = cap;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;
    nat g;
    Task *task = NULL;
    nat i;
#if defined(THREADED_RTS)
    nat sync;
#endif

    debugTrace(DEBUG_sched, "forking!");

    task = newBoundTask();

    cap = NULL;
    waitForReturnCapability(&cap, task);

#if defined(THREADED_RTS)
    sync = requestSync(&cap, task, SYNC_OTHER);

    acquireAllCapabilities(cap,task);
#endif

    // no funny business: hold locks while we fork, otherwise if some
    // other thread is holding a lock when the fork happens, the data
    // structure protected by the lock will forever be in an
    // inconsistent state in the child.  See also #1391.
    ACQUIRE_LOCK(&sched_mutex);
    ACQUIRE_LOCK(&sm_mutex);
    ACQUIRE_LOCK(&stable_mutex);
    ACQUIRE_LOCK(&task->lock);

    for (i=0; i < n_capabilities; i++) {
        ACQUIRE_LOCK(&capabilities[i].lock);
    }

    stopTimer(); // See #4074

#if defined(TRACING)
    flushEventLog(); // so that child won't inherit dirty file buffers
#endif

    pid = fork();

    if (pid) { // parent

        startTimer(); // #4074

        RELEASE_LOCK(&sched_mutex);
        RELEASE_LOCK(&sm_mutex);
        RELEASE_LOCK(&stable_mutex);
        RELEASE_LOCK(&task->lock);

        for (i=0; i < n_capabilities; i++) {
            releaseCapability_(&capabilities[i],rtsFalse);
            RELEASE_LOCK(&capabilities[i].lock);
        }
        boundTaskExiting(task);

        // just return the pid
        return pid;

    } else { // child

#if defined(THREADED_RTS)
        initMutex(&sched_mutex);
        initMutex(&sm_mutex);
        initMutex(&stable_mutex);
        initMutex(&task->lock);

        for (i=0; i < n_capabilities; i++) {
            initMutex(&capabilities[i].lock);
        }
#endif

        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        // now gone.

        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
            for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(t->cap,t);

                // stop the GC from updating the InCall to point to
                // the TSO.  This is only necessary because the
                // OSThread bound to the TSO has been killed, and
                // won't get a chance to exit in the usual way (see
                // also scheduleHandleThreadFinished).
                t->bound = NULL;
            }
        }

        discardTasksExcept(task);

        for (i=0; i < n_capabilities; i++) {
            cap = &capabilities[i];

            // Empty the run queue.  It seems tempting to let all the
            // killed threads stay on the run queue as zombies to be
            // cleaned up later, but some of them may correspond to
            // bound threads for which the corresponding Task does not
            // exist.
            cap->run_queue_hd = END_TSO_QUEUE;
            cap->run_queue_tl = END_TSO_QUEUE;

            // Any suspended C-calling Tasks are no more, their OS threads
            // don't exist now:
            cap->suspended_ccalls = NULL;

#if defined(THREADED_RTS)
            // Wipe our spare workers list, they no longer exist.  New
            // workers will be created if necessary.
            cap->spare_workers = NULL;
            cap->n_spare_workers = 0;
            cap->returning_tasks_hd = NULL;
            cap->returning_tasks_tl = NULL;
#endif

            // Release all caps except 0, we'll use that for starting
            // the IO manager and running the client action below.
            if (cap->no != 0) {
                task->cap = cap;
                releaseCapability(cap);
            }
        }
        cap = &capabilities[0];
        task->cap = cap;

        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
            generations[g].threads = END_TSO_QUEUE;
        }

        // On Unix, all timers are reset in the child, so we need to start
        // the timer again.
        initTimer();
        startTimer();

#if defined(THREADED_RTS)
        ioManagerStartCap(&cap);
#endif

        rts_evalStableIO(&cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
#endif
}
/* ---------------------------------------------------------------------------
 * Changing the number of Capabilities
 *
 * Changing the number of Capabilities is very tricky!  We can only do
 * it with the system fully stopped, so we do a full sync with
 * requestSync(SYNC_OTHER) and grab all the capabilities.
 *
 * Then we resize the appropriate data structures, and update all
 * references to the old data structures which have now moved.
 * Finally we release the Capabilities we are holding, and start
 * worker Tasks on the new Capabilities we created.
 *
 * ------------------------------------------------------------------------- */
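
//
// Note: this is the entry point used by GHC.Conc.setNumCapabilities
// in the base library (via a foreign call); it may be invoked while
// Haskell threads are running on other Capabilities.
//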
void
setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
{
#if !defined(THREADED_RTS)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
    }
#elif defined(NOSMP)
    if (new_n_capabilities != 1) {
        errorBelch("setNumCapabilities: not supported on this platform");
    }
#else
    Task *task;
    Capability *cap;
    nat sync;
    StgTSO *t;
    nat g, n;
    Capability *old_capabilities = NULL;

    if (new_n_capabilities == enabled_capabilities) return;

    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
               enabled_capabilities, new_n_capabilities);

    cap = rts_lock();
    task = cap->running_task;

    sync = requestSync(&cap, task, SYNC_OTHER);

    acquireAllCapabilities(cap,task);

    if (new_n_capabilities < enabled_capabilities)
    {
        // Reducing the number of capabilities: we do not actually
        // remove the extra capabilities, we just mark them as
        // "disabled". This has the following effects:
        //
        //   - threads on a disabled capability are migrated away by the
        //     scheduler loop
        //
        //   - disabled capabilities do not participate in GC
        //     (see scheduleDoGC())
        //
        //   - No spark threads are created on this capability
        //     (see scheduleActivateSpark())
        //
        //   - We do not attempt to migrate threads *to* a disabled
        //     capability (see schedulePushWork()).
        //
        // but in other respects, a disabled capability remains
        // alive.  Threads may be woken up on a disabled capability,
        // but they will be immediately migrated away.
        //
        // This approach is much easier than trying to actually remove
        // the capability; we don't have to worry about GC data
        // structures, the nursery, etc.
        //
        for (n = new_n_capabilities; n < enabled_capabilities; n++) {
            capabilities[n].disabled = rtsTrue;
            traceCapDisable(&capabilities[n]);
        }
        enabled_capabilities = new_n_capabilities;
    }
    else
    {
        // Increasing the number of enabled capabilities.
        //
        // enable any disabled capabilities, up to the required number
        for (n = enabled_capabilities;
             n < new_n_capabilities && n < n_capabilities; n++) {
            capabilities[n].disabled = rtsFalse;
            traceCapEnable(&capabilities[n]);
        }
        enabled_capabilities = n;

        if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
            // Allocate eventlog buffers for the new capabilities.  Note this
            // must be done before calling moreCapabilities(), because that
            // will emit events about creating the new capabilities and adding
            // them to existing capsets.
            tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif

            // Resize the capabilities array
            // NB. after this, capabilities points somewhere new.  Any pointers
            // of type (Capability *) are now invalid.
            old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);

            // update our own cap pointer
            cap = &capabilities[cap->no];

            // Resize and update storage manager data structures
            storageAddCapabilities(n_capabilities, new_n_capabilities);

            // Update (Capability *) refs in the Task manager.
            updateCapabilityRefs();

            // Update (Capability *) refs from TSOs
            for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
                for (t = generations[g].threads; t != END_TSO_QUEUE;
                     t = t->global_link) {
                    t->cap = &capabilities[t->cap->no];
                }
            }
        }
    }

    // We're done: release the original Capabilities
    releaseAllCapabilities(cap,task);

    // Start worker tasks on the new Capabilities
    startWorkerTasks(n_capabilities, new_n_capabilities);

    // finally, update n_capabilities
    if (new_n_capabilities > n_capabilities) {
        n_capabilities = enabled_capabilities = new_n_capabilities;
    }

    // We can't free the old array until now, because we access it
    // while updating pointers in updateCapabilityRefs().
    if (old_capabilities) {
        stgFree(old_capabilities);
    }

    rts_unlock(cap);

#endif // THREADED_RTS
}
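
/* The "disabled" flag set above is consulted with a plain field test
 * elsewhere in the scheduler.  A minimal sketch of the idiom (illustrative
 * only; the real checks live in places like schedulePushWork() and
 * scheduleDoGC()):
 *
 *     nat i;
 *     for (i = 0; i < n_capabilities; i++) {
 *         if (capabilities[i].disabled) continue;  // not a valid target
 *         // ... push work to / include capabilities[i] in the sync ...
 *     }
 */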
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO *t, *next;
    nat g;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            next = t->global_link;
            deleteThread(cap,t);
        }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalls list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */
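
/* Each suspended in-call sits on its Capability's suspended_ccalls list,
 * doubly linked through incall->prev/next.  A walk over the list therefore
 * looks like the following sketch (illustrative only; the real traversals
 * live in Capability.c and the GC):
 *
 *     InCall *incall;
 *     for (incall = cap->suspended_ccalls; incall != NULL;
 *          incall = incall->next) {
 *         // e.g. treat incall->suspended_tso as a GC root
 *     }
 */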
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    ASSERT(incall->next == NULL && incall->prev == NULL);
    incall->next = cap->suspended_ccalls;
    incall->prev = NULL;
    if (cap->suspended_ccalls) {
        cap->suspended_ccalls->prev = incall;
    }
    cap->suspended_ccalls = incall;
}

STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
    InCall *incall;

    incall = task->incall;
    if (incall->prev) {
        incall->prev->next = incall->next;
    } else {
        ASSERT(cap->suspended_ccalls == incall);
        cap->suspended_ccalls = incall->next;
    }
    if (incall->next) {
        incall->next->prev = incall->prev;
    }
    incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 *
 * If this is an interruptible C call, this means that the FFI call may be
 * unceremoniously terminated and should be scheduled on an
 * unbound worker thread.
 * ------------------------------------------------------------------------- */
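
/* A minimal sketch of how these entry points are used around a "safe"
 * foreign call (illustrative only; the real sequence is emitted by the
 * code generator, and blockingCCall/arg/result are placeholders):
 *
 *     void *token;
 *     token   = suspendThread(BaseReg, rtsFalse); // give up the Capability
 *     result  = blockingCCall(arg);               // other Haskell threads run
 *     BaseReg = resumeThread(token);              // re-acquire a Capability
 */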
void *
suspendThread (StgRegTable *reg, rtsBool interruptible)
{
    Capability *cap;
    int saved_errno;
    StgTSO *tso;
    Task *task;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if (interruptible) {
        tso->why_blocked = BlockedOnCCall_Interruptible;
    } else {
        tso->why_blocked = BlockedOnCCall;
    }

    // Hand back capability
    task->incall->suspended_tso = tso;
    task->incall->suspended_cap = cap;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);

    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif
    return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    InCall *incall;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if defined(mingw32_HOST_OS)
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if defined(mingw32_HOST_OS)
    saved_winerror = GetLastError();
#endif

    incall = task->incall;
    cap = incall->suspended_cap;

    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalls list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = incall->suspended_tso;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    tso->_link = END_TSO_QUEUE; // no write barrier reqd

    traceEventRunThread(cap, tso);

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    if ((tso->flags & TSO_BLOCKEX) == 0) {
        // avoid locking the TSO if we don't have to
        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
            maybePerformBlockedException(cap,tso);
        }
    }

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    dirty_TSO(cap,tso);
    dirty_STACK(cap,tso->stackobj);

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
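
/* A minimal sketch of the create-then-schedule pattern the comment above
 * describes (illustrative only; real callers such as rts_evalIO() go through
 * helpers like createIOThread(), which build the initial stack for us, and
 * 'closure' here stands for whatever the caller wants to run):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(cap, tso);
 */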
void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
#if defined(THREADED_RTS)
    cpu %= enabled_capabilities;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        migrateThread(cap, tso, &capabilities[cpu]);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
void
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
    Task *task;
    DEBUG_ONLY( StgThreadID id );
    Capability *cap;

    cap = *pcap;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task->incall;
    tso->cap = cap;

    task->incall->tso = tso;
    task->incall->ret = ret;
    task->incall->stat = NoStatus;

    appendToRunQueue(cap,tso);

    DEBUG_ONLY( id = tso->id );
    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);

    cap = schedule(cap,task);

    ASSERT(task->incall->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
    *pcap = cap;
}
/* ----------------------------------------------------------------------------
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void scheduleWorker (Capability *cap, Task *task)
{
    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability, but possibly not
    // the same one we started with.
    //
    // During shutdown, the requirement is that after all the
    // Capabilities are shut down, all workers that are shutting down
    // have finished workerTaskStop().  This is why we hold on to
    // cap->lock until we've finished workerTaskStop() below.
    //
    // There may be workers still involved in foreign calls; those
    // will just block in waitForReturnCapability() because the
    // Capability has been shut down.
    //
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap,rtsFalse);
    workerTaskStop(task);
    RELEASE_LOCK(&cap->lock);
}
#endif
/* ---------------------------------------------------------------------------
 * Start new worker tasks on Capabilities from--to
 * -------------------------------------------------------------------------- */

static void
startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    Capability *cap;

    for (i = from; i < to; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        startWorkerTask(cap);
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next GC.
 * ------------------------------------------------------------------------ */

void
initScheduler(void)
{
#if !defined(THREADED_RTS)
    blocked_queue_hd = END_TSO_QUEUE;
    blocked_queue_tl = END_TSO_QUEUE;
    sleeping_queue   = END_TSO_QUEUE;
#endif

    sched_state     = SCHED_RUNNING;
    recent_activity = ACTIVITY_YES;

#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);
#endif

    ACQUIRE_LOCK(&sched_mutex);

    /* A capability holds the state a native thread needs in
     * order to execute STG code.  At least one capability is
     * floating around (only THREADED_RTS builds have more than one).
     */
    initCapabilities();

    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    startWorkerTasks(1, n_capabilities);

    RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
               /* see Capability.c, shutdownCapability() */
{
    Task *task;

    task = newBoundTask();

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        Capability *cap = task->cap;
        waitForReturnCapability(&cap,task);
        scheduleDoGC(&cap,task,rtsFalse);
        ASSERT(task->incall->tso == NULL);
        releaseCapability(cap);
    }
    sched_state = SCHED_SHUTTING_DOWN;

    shutdownCapabilities(task, wait_foreign);

    // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
    //            n_failed_trygrab_idles, n_idle_caps);

    boundTaskExiting(task);
}
void
freeScheduler( void )
{
    nat still_running;

    ACQUIRE_LOCK(&sched_mutex);
    still_running = freeTaskManager();
    // We can only free the Capabilities if there are no Tasks still
    // running.  We might have a Task about to return from a foreign
    // call into waitForReturnCapability(), for example (actually,
    // this should be the *only* thing that a still-running Task can
    // do at this point, and it will block waiting for the
    // Capability).
    if (still_running == 0) {
        if (n_capabilities != 1) {
            stgFree(capabilities);
        }
    }
    RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
                    void *user USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif
}
/* -----------------------------------------------------------------------------
   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */
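
/* For example, foreign C code that has been called from Haskell (via a safe
 * ccall) can force a collection directly -- an illustrative sketch, with
 * fillCacheFromHaskell standing in for whatever the C code actually does:
 *
 *     void fillCacheFromHaskell (void)
 *     {
 *         // ... build some structure reachable from Haskell ...
 *         performMajorGC();   // request an immediate major collection
 *     }
 */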
static void
performGC_(rtsBool force_major)
{
    Task *task;
    Capability *cap = NULL;

    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalls queue.
    task = newBoundTask();

    waitForReturnCapability(&cap,task);
    scheduleDoGC(&cap,task,force_major);
    releaseCapability(cap);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
/* ---------------------------------------------------------------------------
   interruptStgRts()

   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    interruptAllCapabilities();
#if defined(THREADED_RTS)
    wakeUpRts();
#endif
}
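
/* A sketch of the typical caller: the RTS's own signal/console handler
 * invokes interruptStgRts() when the user hits ^C (illustrative only; the
 * real handlers live in posix/Signals.c and win32/ConsoleHandler.c, and
 * intHandler below is just a placeholder name):
 *
 *     static void intHandler (int sig STG_UNUSED)
 *     {
 *         interruptStgRts();   // nothing fancy: set flags and wake things up
 *     }
 */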
/* -----------------------------------------------------------------------------
   wakeUpRts()

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void wakeUpRts(void)
{
    // This forces the IO Manager thread to wakeup, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
}
#endif
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(tso->cap,tso,NULL);
    }
}

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_Interruptible) {
        tso->what_next = ThreadKilled;
        appendToRunQueue(tso->cap, tso);
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */
StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
                raise_closure->payload[0] = exception;
            }
            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
                        (StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->stackobj->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->stackobj->sp = p;
            return CATCH_STM_FRAME;

        case UNDERFLOW_FRAME:
            tso->stackobj->sp = p;
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        case STOP_FRAME:
            tso->stackobj->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
   create) because retries are not considered to be exceptions, despite the
   similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
StgWord
findRetryFrameHelper (Capability *cap, StgTSO *tso)
{
    StgPtr           p, next;
    StgRetInfoTable *info;

    p = tso->stackobj->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm,
                       "found ATOMICALLY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during retry", p);
            tso->stackobj->sp = p;
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME: {
            StgTRecHeader *trec = tso->trec;
            StgTRecHeader *outer = trec->enclosing_trec;
            debugTrace(DEBUG_stm,
                       "found CATCH_STM_FRAME at %p during retry", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso->trec = outer;
            p = next;
            continue;
        }

        case UNDERFLOW_FRAME:
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        default:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */
void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    generation *gen;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

        gen = Bdescr((P_)tso)->gen;
        tso->global_link = gen->threads;
        gen->threads = tso;

        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)nonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnSTM_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        case BlockedOnMsgThrowTo:
            // This can happen if the target is masking, blocks on a
            // black hole, and then is found to be unreachable.  In
            // this case, we want to let the target wake up and carry
            // on, and do nothing to this thread.
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",