1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
102 * in an MT setting, needed to signal that a worker thread shouldn't hang around
103 * in the scheduler when it is out of work.
104 */
105 rtsBool shutting_down_scheduler = rtsFalse;
106
107 /*
108 * This mutex protects most of the global scheduler data in
109 * the THREADED_RTS runtime.
110 */
111 #if defined(THREADED_RTS)
112 Mutex sched_mutex;
113 #endif
114
115 #if !defined(mingw32_HOST_OS)
116 #define FORKPROCESS_PRIMOP_SUPPORTED
117 #endif
118
119 // Local stats
120 #ifdef THREADED_RTS
121 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
122 #endif
123
124 /* -----------------------------------------------------------------------------
125 * static function prototypes
126 * -------------------------------------------------------------------------- */
127
128 static Capability *schedule (Capability *initialCapability, Task *task);
129
130 //
131 // These functions all encapsulate parts of the scheduler loop, and are
132 // abstracted only to make the structure and control flow of the
133 // scheduler clearer.
134 //
135 static void schedulePreLoop (void);
136 static void scheduleFindWork (Capability *cap);
137 #if defined(THREADED_RTS)
138 static void scheduleYield (Capability **pcap, Task *task);
139 #endif
140 #if defined(THREADED_RTS)
141 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
142 static void acquireAllCapabilities(Capability *cap, Task *task);
143 static void releaseAllCapabilities(Capability *cap, Task *task);
144 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleProcessInbox(Capability *cap);
149 static void scheduleDetectDeadlock (Capability *cap, Task *task);
150 static void schedulePushWork(Capability *cap, Task *task);
151 #if defined(THREADED_RTS)
152 static void scheduleActivateSpark(Capability *cap);
153 #endif
154 static void schedulePostRunThread(Capability *cap, StgTSO *t);
155 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
156 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
157 nat prev_what_next );
158 static void scheduleHandleThreadBlocked( StgTSO *t );
159 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
160 StgTSO *t );
161 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
162 static Capability *scheduleDoGC(Capability *cap, Task *task,
163 rtsBool force_major);
164
165 static void deleteThread (Capability *cap, StgTSO *tso);
166 static void deleteAllThreads (Capability *cap);
167
168 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
169 static void deleteThread_(Capability *cap, StgTSO *tso);
170 #endif
171
172 /* ---------------------------------------------------------------------------
173 Main scheduling loop.
174
175 We use round-robin scheduling, each thread returning to the
176 scheduler loop when one of these conditions is detected:
177
178 * out of heap space
179 * timer expires (thread yields)
180 * thread blocks
181 * thread ends
182 * stack overflow
183
184 GRAN version:
185 In a GranSim setup this loop iterates over the global event
186 queue, which determines what to do next. Therefore, it's more
187 complicated than either the concurrent or the parallel (GUM)
188 setup.
189 This version has been entirely removed (JB 2008/08).
190
191 GUM version:
192 GUM iterates over incoming messages.
193 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
194 and sends out a fish whenever it has nothing to do; in-between
195 doing the actual reductions (shared code below) it processes the
196 incoming messages and deals with delayed operations
197 (see PendingFetches).
198 This is not the ugliest code you could imagine, but it's bloody close.
199
200 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
201 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
202 as well as future GUM versions. This file has been refurbished to
203 only contain valid code, which is, however, incomplete and refers
204 to invalid includes, etc.
205
206 ------------------------------------------------------------------------ */
207
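/* The five conditions above map onto the StgThreadReturnCode values that
 * the switch at the bottom of the loop body below handles:
 *
 *    out of heap space  ->  HeapOverflow    (scheduleHandleHeapOverflow)
 *    stack overflow     ->  StackOverflow   (threadStackOverflow)
 *    timer expires      ->  ThreadYielding  (scheduleHandleYield)
 *    thread blocks      ->  ThreadBlocked   (scheduleHandleThreadBlocked)
 *    thread ends        ->  ThreadFinished  (scheduleHandleThreadFinished)
 */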
208 static Capability *
209 schedule (Capability *initialCapability, Task *task)
210 {
211 StgTSO *t;
212 Capability *cap;
213 StgThreadReturnCode ret;
214 nat prev_what_next;
215 rtsBool ready_to_gc;
216 #if defined(THREADED_RTS)
217 rtsBool first = rtsTrue;
218 #endif
219
220 cap = initialCapability;
221
222 // Pre-condition: this task owns initialCapability.
223 // The sched_mutex is *NOT* held
224 // NB. on return, we still hold a capability.
225
226 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
227
228 schedulePreLoop();
229
230 // -----------------------------------------------------------
231 // Scheduler loop starts here:
232
233 while (1) {
234
235 // Check whether we have re-entered the RTS from Haskell without
236 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
237 // call).
238 if (cap->in_haskell) {
239 errorBelch("schedule: re-entered unsafely.\n"
240 " Perhaps a 'foreign import unsafe' should be 'safe'?");
241 stg_exit(EXIT_FAILURE);
242 }
243
244 // The interruption / shutdown sequence.
245 //
246 // In order to cleanly shut down the runtime, we want to:
247 // * make sure that all main threads return to their callers
248 // with the state 'Interrupted'.
249 // * clean up all OS threads associated with the runtime
250 // * free all memory etc.
251 //
252 // So the sequence for ^C goes like this:
253 //
254 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
255 // arranges for some Capability to wake up
256 //
257 // * all threads in the system are halted, and the zombies are
258 // placed on the run queue for cleaning up. We acquire all
259 // the capabilities in order to delete the threads; this is
260 // done by scheduleDoGC() for convenience (because GC already
261 // needs to acquire all the capabilities). We can't kill
262 // threads involved in foreign calls.
263 //
264 // * somebody calls shutdownHaskell(), which calls exitScheduler()
265 //
266 // * sched_state := SCHED_SHUTTING_DOWN
267 //
268 // * all workers exit when the run queue on their capability
269 // drains. All main threads will also exit when their TSO
270 // reaches the head of the run queue and they can return.
271 //
272 // * eventually all Capabilities will shut down, and the RTS can
273 // exit.
274 //
275 // * We might be left with threads blocked in foreign calls,
276 // we should really attempt to kill these somehow (TODO);
277
278 switch (sched_state) {
279 case SCHED_RUNNING:
280 break;
281 case SCHED_INTERRUPTING:
282 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
283 /* scheduleDoGC() deletes all the threads */
284 cap = scheduleDoGC(cap,task,rtsFalse);
285
286 // after scheduleDoGC(), we must be shutting down. Either some
287 // other Capability did the final GC, or we did it above,
288 // either way we can fall through to the SCHED_SHUTTING_DOWN
289 // case now.
290 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
291 // fall through
292
293 case SCHED_SHUTTING_DOWN:
294 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
295 // If we are a worker, just exit. If we're a bound thread
296 // then we will exit below when we've removed our TSO from
297 // the run queue.
298 if (!isBoundTask(task) && emptyRunQueue(cap)) {
299 return cap;
300 }
301 break;
302 default:
303 barf("sched_state: %d", sched_state);
304 }
305
306 scheduleFindWork(cap);
307
308 /* work pushing, currently relevant only for THREADED_RTS:
309 (pushes threads, wakes up idle capabilities for stealing) */
310 schedulePushWork(cap,task);
311
312 scheduleDetectDeadlock(cap,task);
313
314 #if defined(THREADED_RTS)
315 cap = task->cap; // reload cap, it might have changed
316 #endif
317
318 // Normally, the only way we can get here with no threads to
319 // run is if a keyboard interrupt was received during
320 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
321 // Additionally, it is not fatal for the
322 // threaded RTS to reach here with no threads to run.
323 //
324 // win32: might be here due to awaitEvent() being abandoned
325 // as a result of a console event having been delivered.
326
327 #if defined(THREADED_RTS)
328 if (first)
329 {
330 // XXX: ToDo
331 // // don't yield the first time, we want a chance to run this
332 // // thread for a bit, even if there are others banging at the
333 // // door.
334 // first = rtsFalse;
335 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
336 }
337
338 scheduleYield(&cap,task);
339
340 if (emptyRunQueue(cap)) continue; // look for work again
341 #endif
342
343 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
344 if ( emptyRunQueue(cap) ) {
345 ASSERT(sched_state >= SCHED_INTERRUPTING);
346 }
347 #endif
348
349 //
350 // Get a thread to run
351 //
352 t = popRunQueue(cap);
353
354 // Sanity check the thread we're about to run. This can be
355 // expensive if there is lots of thread switching going on...
356 IF_DEBUG(sanity,checkTSO(t));
357
358 #if defined(THREADED_RTS)
359 // Check whether we can run this thread in the current task.
360 // If not, we have to pass our capability to the right task.
361 {
362 InCall *bound = t->bound;
363
364 if (bound) {
365 if (bound->task == task) {
366 // yes, the Haskell thread is bound to the current native thread
367 } else {
368 debugTrace(DEBUG_sched,
369 "thread %lu bound to another OS thread",
370 (unsigned long)t->id);
371 // no, bound to a different Haskell thread: pass to that thread
372 pushOnRunQueue(cap,t);
373 continue;
374 }
375 } else {
376 // The thread we want to run is unbound.
377 if (task->incall->tso) {
378 debugTrace(DEBUG_sched,
379 "this OS thread cannot run thread %lu",
380 (unsigned long)t->id);
381 // no, the current native thread is bound to a different
382 // Haskell thread, so pass it to any worker thread
383 pushOnRunQueue(cap,t);
384 continue;
385 }
386 }
387 }
388 #endif
389
390 // If we're shutting down, and this thread has not yet been
391 // killed, kill it now. This sometimes happens when a finalizer
392 // thread is created by the final GC, or a thread previously
393 // in a foreign call returns.
394 if (sched_state >= SCHED_INTERRUPTING &&
395 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
396 deleteThread(cap,t);
397 }
398
399 /* context switches are initiated by the timer signal, unless
400 * the user specified "context switch as often as possible", with
401 * +RTS -C0
402 */
403 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
404 && !emptyThreadQueues(cap)) {
405 cap->context_switch = 1;
406 }
407
408 run_thread:
409
410 // CurrentTSO is the thread to run. t might be different if we
411 // loop back to run_thread, so make sure to set CurrentTSO after
412 // that.
413 cap->r.rCurrentTSO = t;
414
415 startHeapProfTimer();
416
417 // ----------------------------------------------------------------------
418 // Run the current thread
419
420 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
421 ASSERT(t->cap == cap);
422 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
423
424 prev_what_next = t->what_next;
425
426 errno = t->saved_errno;
427 #if mingw32_HOST_OS
428 SetLastError(t->saved_winerror);
429 #endif
430
431 // reset the interrupt flag before running Haskell code
432 cap->interrupt = 0;
433
434 cap->in_haskell = rtsTrue;
435 cap->idle = 0;
436
437 dirty_TSO(cap,t);
438 dirty_STACK(cap,t->stackobj);
439
440 #if defined(THREADED_RTS)
441 if (recent_activity == ACTIVITY_DONE_GC) {
442 // ACTIVITY_DONE_GC means we turned off the timer signal to
443 // conserve power (see #1623). Re-enable it here.
444 nat prev;
445 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
446 if (prev == ACTIVITY_DONE_GC) {
447 startTimer();
448 }
449 } else if (recent_activity != ACTIVITY_INACTIVE) {
450 // If we reached ACTIVITY_INACTIVE, then don't reset it until
451 // we've done the GC. The thread running here might just be
452 // the IO manager thread that handle_tick() woke up via
453 // wakeUpRts().
454 recent_activity = ACTIVITY_YES;
455 }
456 #endif
457
458 traceEventRunThread(cap, t);
459
460 switch (prev_what_next) {
461
462 case ThreadKilled:
463 case ThreadComplete:
464 /* Thread already finished, return to scheduler. */
465 ret = ThreadFinished;
466 break;
467
468 case ThreadRunGHC:
469 {
470 StgRegTable *r;
471 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
472 cap = regTableToCapability(r);
473 ret = r->rRet;
474 break;
475 }
476
477 case ThreadInterpret:
478 cap = interpretBCO(cap);
479 ret = cap->r.rRet;
480 break;
481
482 default:
483 barf("schedule: invalid what_next field");
484 }
485
486 cap->in_haskell = rtsFalse;
487
488 // The TSO might have moved, eg. if it re-entered the RTS and a GC
489 // happened. So find the new location:
490 t = cap->r.rCurrentTSO;
491
492 // And save the current errno in this thread.
493 // XXX: possibly bogus for SMP because this thread might already
494 // be running again, see code below.
495 t->saved_errno = errno;
496 #if mingw32_HOST_OS
497 // Similarly for Windows error code
498 t->saved_winerror = GetLastError();
499 #endif
500
501 if (ret == ThreadBlocked) {
502 if (t->why_blocked == BlockedOnBlackHole) {
503 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
504 traceEventStopThread(cap, t, t->why_blocked + 6,
505 owner != NULL ? owner->id : 0);
506 } else {
507 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
508 }
509 } else {
510 traceEventStopThread(cap, t, ret, 0);
511 }
512
513 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514 ASSERT(t->cap == cap);
515
516 // ----------------------------------------------------------------------
517
518 // Costs for the scheduler are assigned to CCS_SYSTEM
519 stopHeapProfTimer();
520 #if defined(PROFILING)
521 cap->r.rCCCS = CCS_SYSTEM;
522 #endif
523
524 schedulePostRunThread(cap,t);
525
526 ready_to_gc = rtsFalse;
527
528 switch (ret) {
529 case HeapOverflow:
530 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
531 break;
532
533 case StackOverflow:
534 // just adjust the stack for this thread, then pop it back
535 // on the run queue.
536 threadStackOverflow(cap, t);
537 pushOnRunQueue(cap,t);
538 break;
539
540 case ThreadYielding:
541 if (scheduleHandleYield(cap, t, prev_what_next)) {
542 // shortcut for switching between compiler/interpreter:
543 goto run_thread;
544 }
545 break;
546
547 case ThreadBlocked:
548 scheduleHandleThreadBlocked(t);
549 break;
550
551 case ThreadFinished:
552 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
553 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
554 break;
555
556 default:
557 barf("schedule: invalid thread return code %d", (int)ret);
558 }
559
560 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
561 cap = scheduleDoGC(cap,task,rtsFalse);
562 }
563 } /* end of while() */
564 }
565
566 /* -----------------------------------------------------------------------------
567 * Run queue operations
568 * -------------------------------------------------------------------------- */
569
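// Note: the run queue is doubly linked.  _link is the forward pointer and
// block_info.prev serves as the back pointer while a TSO sits on the queue;
// END_TSO_QUEUE terminates both directions, and run_queue_hd/tl point at the
// two ends.  checkRunQueue() (run under IF_DEBUG(sanity)) checks exactly
// this structure.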
570 void
571 removeFromRunQueue (Capability *cap, StgTSO *tso)
572 {
573 if (tso->block_info.prev == END_TSO_QUEUE) {
574 ASSERT(cap->run_queue_hd == tso);
575 cap->run_queue_hd = tso->_link;
576 } else {
577 setTSOLink(cap, tso->block_info.prev, tso->_link);
578 }
579 if (tso->_link == END_TSO_QUEUE) {
580 ASSERT(cap->run_queue_tl == tso);
581 cap->run_queue_tl = tso->block_info.prev;
582 } else {
583 setTSOPrev(cap, tso->_link, tso->block_info.prev);
584 }
585 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
586
587 IF_DEBUG(sanity, checkRunQueue(cap));
588 }
589
590 /* ----------------------------------------------------------------------------
591 * Setting up the scheduler loop
592 * ------------------------------------------------------------------------- */
593
594 static void
595 schedulePreLoop(void)
596 {
597 // initialisation for scheduler - what cannot go into initScheduler()
598
599 #if defined(mingw32_HOST_OS)
600 win32AllocStack();
601 #endif
602 }
603
604 /* -----------------------------------------------------------------------------
605 * scheduleFindWork()
606 *
607 * Search for work to do, and handle messages from elsewhere.
608 * -------------------------------------------------------------------------- */
609
610 static void
611 scheduleFindWork (Capability *cap)
612 {
613 scheduleStartSignalHandlers(cap);
614
615 scheduleProcessInbox(cap);
616
617 scheduleCheckBlockedThreads(cap);
618
619 #if defined(THREADED_RTS)
620 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
621 #endif
622 }
623
624 #if defined(THREADED_RTS)
625 STATIC_INLINE rtsBool
626 shouldYieldCapability (Capability *cap, Task *task)
627 {
628 // we need to yield this capability to someone else if..
629 // - another thread is initiating a GC
630 // - another Task is returning from a foreign call
631 // - the thread at the head of the run queue cannot be run
632 // by this Task (it is bound to another Task, or it is unbound
633 // and this task is bound).
634 return (pending_sync ||
635 cap->returning_tasks_hd != NULL ||
636 (!emptyRunQueue(cap) && (task->incall->tso == NULL
637 ? cap->run_queue_hd->bound != NULL
638 : cap->run_queue_hd->bound != task->incall)));
639 }
640
641 // This is the single place where a Task goes to sleep. There are
642 // two reasons it might need to sleep:
643 // - there are no threads to run
644 // - we need to yield this Capability to someone else
645 // (see shouldYieldCapability())
646 //
647 // Careful: the scheduler loop is quite delicate. Make sure you run
648 // the tests in testsuite/concurrent (all ways) after modifying this,
649 // and also check the benchmarks in nofib/parallel for regressions.
650
651 static void
652 scheduleYield (Capability **pcap, Task *task)
653 {
654 Capability *cap = *pcap;
655
656 // if we have work, and we don't need to give up the Capability, continue.
657 //
658 if (!shouldYieldCapability(cap,task) &&
659 (!emptyRunQueue(cap) ||
660 !emptyInbox(cap) ||
661 sched_state >= SCHED_INTERRUPTING))
662 return;
663
664 // otherwise yield (sleep), and keep yielding if necessary.
665 do {
666 yieldCapability(&cap,task);
667 }
668 while (shouldYieldCapability(cap,task));
669
670 // note there may still be no threads on the run queue at this
671 // point; the caller has to check.
672
673 *pcap = cap;
674 return;
675 }
676 #endif
677
678 /* -----------------------------------------------------------------------------
679 * schedulePushWork()
680 *
681 * Push work to other Capabilities if we have some.
682 * -------------------------------------------------------------------------- */
683
684 static void
685 schedulePushWork(Capability *cap USED_IF_THREADS,
686 Task *task USED_IF_THREADS)
687 {
688 /* following code not for PARALLEL_HASKELL. I kept the call general;
689 future GUM versions might use pushing in a distributed setup */
690 #if defined(THREADED_RTS)
691
692 Capability *free_caps[n_capabilities], *cap0;
693 nat i, n_free_caps;
694
695 // migration can be turned off with +RTS -qm
696 if (!RtsFlags.ParFlags.migrate) return;
697
698 // Check whether we have more threads on our run queue, or sparks
699 // in our pool, that we could hand to another Capability.
700 if (cap->run_queue_hd == END_TSO_QUEUE) {
701 if (sparkPoolSizeCap(cap) < 2) return;
702 } else {
703 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
704 sparkPoolSizeCap(cap) < 1) return;
705 }
706
707 // First grab as many free Capabilities as we can.
708 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
709 cap0 = &capabilities[i];
710 if (cap != cap0 && tryGrabCapability(cap0,task)) {
711 if (!emptyRunQueue(cap0)
712 || cap->returning_tasks_hd != NULL
713 || cap->inbox != (Message*)END_TSO_QUEUE) {
714 // it already has some work, we just grabbed it at
715 // the wrong moment. Or maybe it's deadlocked!
716 releaseCapability(cap0);
717 } else {
718 free_caps[n_free_caps++] = cap0;
719 }
720 }
721 }
722
723 // we now have n_free_caps free capabilities stashed in
724 // free_caps[]. Share our run queue equally with them. This is
725 // probably the simplest thing we could do; improvements we might
726 // want to do include:
727 //
728 // - giving high priority to moving relatively new threads, on
729 // the grounds that they haven't had time to build up a
730 // working set in the cache on this CPU/Capability.
731 //
732 // - giving low priority to moving long-lived threads
733
734 if (n_free_caps > 0) {
735 StgTSO *prev, *t, *next;
736 #ifdef SPARK_PUSHING
737 rtsBool pushed_to_all;
738 #endif
739
740 debugTrace(DEBUG_sched,
741 "cap %d: %s and %d free capabilities, sharing...",
742 cap->no,
743 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
744 "excess threads on run queue":"sparks to share (>=2)",
745 n_free_caps);
746
747 i = 0;
748 #ifdef SPARK_PUSHING
749 pushed_to_all = rtsFalse;
750 #endif
751
752 if (cap->run_queue_hd != END_TSO_QUEUE) {
753 prev = cap->run_queue_hd;
754 t = prev->_link;
755 prev->_link = END_TSO_QUEUE;
756 for (; t != END_TSO_QUEUE; t = next) {
757 next = t->_link;
758 t->_link = END_TSO_QUEUE;
759 if (t->bound == task->incall // don't move my bound thread
760 || tsoLocked(t)) { // don't move a locked thread
761 setTSOLink(cap, prev, t);
762 setTSOPrev(cap, t, prev);
763 prev = t;
764 } else if (i == n_free_caps) {
765 #ifdef SPARK_PUSHING
766 pushed_to_all = rtsTrue;
767 #endif
768 i = 0;
769 // keep one for us
770 setTSOLink(cap, prev, t);
771 setTSOPrev(cap, t, prev);
772 prev = t;
773 } else {
774 appendToRunQueue(free_caps[i],t);
775
776 traceEventMigrateThread (cap, t, free_caps[i]->no);
777
778 if (t->bound) { t->bound->task->cap = free_caps[i]; }
779 t->cap = free_caps[i];
780 i++;
781 }
782 }
783 cap->run_queue_tl = prev;
784
785 IF_DEBUG(sanity, checkRunQueue(cap));
786 }
787
788 #ifdef SPARK_PUSHING
789 /* JB I left this code in place, it would work but is not necessary */
790
791 // If there are some free capabilities that we didn't push any
792 // threads to, then try to push a spark to each one.
793 if (!pushed_to_all) {
794 StgClosure *spark;
795 // i is the next free capability to push to
796 for (; i < n_free_caps; i++) {
797 if (emptySparkPoolCap(free_caps[i])) {
798 spark = tryStealSpark(cap->sparks);
799 if (spark != NULL) {
800 /* TODO: if anyone wants to re-enable this code then
801 * they must consider the fizzledSpark(spark) case
802 * and update the per-cap spark statistics.
803 */
804 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
805
806 traceEventStealSpark(free_caps[i], t, cap->no);
807
808 newSpark(&(free_caps[i]->r), spark);
809 }
810 }
811 }
812 }
813 #endif /* SPARK_PUSHING */
814
815 // release the capabilities
816 for (i = 0; i < n_free_caps; i++) {
817 task->cap = free_caps[i];
818 releaseAndWakeupCapability(free_caps[i]);
819 }
820 }
821 task->cap = cap; // reset to point to our Capability.
822
823 #endif /* THREADED_RTS */
824
825 }
826
827 /* ----------------------------------------------------------------------------
828 * Start any pending signal handlers
829 * ------------------------------------------------------------------------- */
830
831 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
832 static void
833 scheduleStartSignalHandlers(Capability *cap)
834 {
835 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
836 // safe outside the lock
837 startSignalHandlers(cap);
838 }
839 }
840 #else
841 static void
842 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
843 {
844 }
845 #endif
846
847 /* ----------------------------------------------------------------------------
848 * Check for blocked threads that can be woken up.
849 * ------------------------------------------------------------------------- */
850
851 static void
852 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
853 {
854 #if !defined(THREADED_RTS)
855 //
856 // Check whether any waiting threads need to be woken up. If the
857 // run queue is empty, and there are no other tasks running, we
858 // can wait indefinitely for something to happen.
859 //
860 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
861 {
862 awaitEvent (emptyRunQueue(cap));
863 }
864 #endif
865 }
866
867 /* ----------------------------------------------------------------------------
868 * Detect deadlock conditions and attempt to resolve them.
869 * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleDetectDeadlock (Capability *cap, Task *task)
873 {
874 /*
875 * Detect deadlock: when we have no threads to run, there are no
876 * threads blocked, waiting for I/O, or sleeping, and all the
877 * other tasks are waiting for work, we must have a deadlock of
878 * some description.
879 */
880 if ( emptyThreadQueues(cap) )
881 {
882 #if defined(THREADED_RTS)
883 /*
884 * In the threaded RTS, we only check for deadlock if there
885 * has been no activity in a complete timeslice. This means
886 * we won't eagerly start a full GC just because we don't have
887 * any threads to run currently.
888 */
889 if (recent_activity != ACTIVITY_INACTIVE) return;
890 #endif
891
892 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
893
894 // Garbage collection can release some new threads due to
895 // either (a) finalizers or (b) threads resurrected because
896 // they are unreachable and will therefore be sent an
897 // exception. Any threads thus released will be immediately
898 // runnable.
899 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
900 // when force_major == rtsTrue, scheduleDoGC sets
901 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
902 // signal.
903
904 if ( !emptyRunQueue(cap) ) return;
905
906 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
907 /* If we have user-installed signal handlers, then wait
908 * for signals to arrive rather than bombing out with a
909 * deadlock.
910 */
911 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
912 debugTrace(DEBUG_sched,
913 "still deadlocked, waiting for signals...");
914
915 awaitUserSignals();
916
917 if (signals_pending()) {
918 startSignalHandlers(cap);
919 }
920
921 // either we have threads to run, or we were interrupted:
922 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
923
924 return;
925 }
926 #endif
927
928 #if !defined(THREADED_RTS)
929 /* Probably a real deadlock. Send the current main thread the
930 * Deadlock exception.
931 */
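/* (For instance, in the non-threaded RTS a program whose main thread
 * just does newEmptyMVar >>= takeMVar, with nothing else runnable or
 * waiting on I/O, reaches this point blocked with
 * why_blocked == BlockedOnMVar and is woken up again by the
 * throwToSingleThreaded() call below.)
 */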
932 if (task->incall->tso) {
933 switch (task->incall->tso->why_blocked) {
934 case BlockedOnSTM:
935 case BlockedOnBlackHole:
936 case BlockedOnMsgThrowTo:
937 case BlockedOnMVar:
938 throwToSingleThreaded(cap, task->incall->tso,
939 (StgClosure *)nonTermination_closure);
940 return;
941 default:
942 barf("deadlock: main thread blocked in a strange way");
943 }
944 }
945 return;
946 #endif
947 }
948 }
949
950
951 /* ----------------------------------------------------------------------------
952 * Send pending messages (PARALLEL_HASKELL only)
953 * ------------------------------------------------------------------------- */
954
955 #if defined(PARALLEL_HASKELL)
956 static void
957 scheduleSendPendingMessages(void)
958 {
959
960 # if defined(PAR) // global Mem.Mgmt., omit for now
961 if (PendingFetches != END_BF_QUEUE) {
962 processFetches();
963 }
964 # endif
965
966 if (RtsFlags.ParFlags.BufferTime) {
967 // if we use message buffering, we must send away all message
968 // packets which have become too old...
969 sendOldBuffers();
970 }
971 }
972 #endif
973
974 /* ----------------------------------------------------------------------------
975 * Process messages in the current Capability's inbox
976 * ------------------------------------------------------------------------- */
977
978 static void
979 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
980 {
981 #if defined(THREADED_RTS)
982 Message *m, *next;
983 int r;
984
985 while (!emptyInbox(cap)) {
986 if (cap->r.rCurrentNursery->link == NULL ||
987 g0->n_new_large_words >= large_alloc_lim) {
988 scheduleDoGC(cap, cap->running_task, rtsFalse);
989 }
990
991 // don't use a blocking acquire; if the lock is held by
992 // another thread then just carry on. This seems to avoid
993 // getting stuck in a message ping-pong situation with other
994 // processors. We'll check the inbox again later anyway.
995 //
996 // We should really use a more efficient queue data structure
997 // here. The trickiness is that we must ensure a Capability
998 // never goes idle if the inbox is non-empty, which is why we
999 // use cap->lock (cap->lock is released as the last thing
1000 // before going idle; see Capability.c:releaseCapability()).
1001 r = TRY_ACQUIRE_LOCK(&cap->lock);
1002 if (r != 0) return;
1003
1004 m = cap->inbox;
1005 cap->inbox = (Message*)END_TSO_QUEUE;
1006
1007 RELEASE_LOCK(&cap->lock);
1008
1009 while (m != (Message*)END_TSO_QUEUE) {
1010 next = m->link;
1011 executeMessage(cap, m);
1012 m = next;
1013 }
1014 }
1015 #endif
1016 }
1017
1018 /* ----------------------------------------------------------------------------
1019 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1020 * ------------------------------------------------------------------------- */
1021
1022 #if defined(THREADED_RTS)
1023 static void
1024 scheduleActivateSpark(Capability *cap)
1025 {
1026 if (anySparks())
1027 {
1028 createSparkThread(cap);
1029 debugTrace(DEBUG_sched, "creating a spark thread");
1030 }
1031 }
1032 #endif // THREADED_RTS
1033
1034 /* ----------------------------------------------------------------------------
1035 * After running a thread...
1036 * ------------------------------------------------------------------------- */
1037
1038 static void
1039 schedulePostRunThread (Capability *cap, StgTSO *t)
1040 {
1041 // We have to be able to catch transactions that are in an
1042 // infinite loop as a result of seeing an inconsistent view of
1043 // memory, e.g.
1044 //
1045 // atomically $ do
1046 // [a,b] <- mapM readTVar [ta,tb]
1047 // when (a == b) loop
1048 //
1049 // and a is never equal to b given a consistent view of memory.
1050 //
1051 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1052 if (!stmValidateNestOfTransactions (t -> trec)) {
1053 debugTrace(DEBUG_sched | DEBUG_stm,
1054 "trec %p found wasting its time", t);
1055
1056 // strip the stack back to the
1057 // ATOMICALLY_FRAME, aborting the (nested)
1058 // transaction, and saving the stack of any
1059 // partially-evaluated thunks on the heap.
1060 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1061
1062 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1063 }
1064 }
1065
1066 /* some statistics gathering in the parallel case */
1067 }
1068
1069 /* -----------------------------------------------------------------------------
1070 * Handle a thread that returned to the scheduler with HeapOverflow
1071 * -------------------------------------------------------------------------- */
1072
1073 static rtsBool
1074 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1075 {
1076 // did the task ask for a large block?
1077 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1078 // if so, get one and push it on the front of the nursery.
1079 bdescr *bd;
1080 lnat blocks;
1081
1082 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1083
1084 if (blocks > BLOCKS_PER_MBLOCK) {
1085 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1086 }
1087
1088 debugTrace(DEBUG_sched,
1089 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1090 (long)t->id, what_next_strs[t->what_next], blocks);
1091
1092 // don't do this if the nursery is (nearly) full, we'll GC first.
1093 if (cap->r.rCurrentNursery->link != NULL ||
1094 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1095 // if the nursery has only one block.
1096
1097 bd = allocGroup_lock(blocks);
1098 cap->r.rNursery->n_blocks += blocks;
1099
1100 // link the new group into the list
1101 bd->link = cap->r.rCurrentNursery;
1102 bd->u.back = cap->r.rCurrentNursery->u.back;
1103 if (cap->r.rCurrentNursery->u.back != NULL) {
1104 cap->r.rCurrentNursery->u.back->link = bd;
1105 } else {
1106 cap->r.rNursery->blocks = bd;
1107 }
1108 cap->r.rCurrentNursery->u.back = bd;
1109
1110 // initialise it as a nursery block. We initialise the
1111 // step, gen_no, and flags field of *every* sub-block in
1112 // this large block, because this is easier than making
1113 // sure that we always find the block head of a large
1114 // block whenever we call Bdescr() (eg. evacuate() and
1115 // isAlive() in the GC would both have to do this, at
1116 // least).
1117 {
1118 bdescr *x;
1119 for (x = bd; x < bd + blocks; x++) {
1120 initBdescr(x,g0,g0);
1121 x->free = x->start;
1122 x->flags = 0;
1123 }
1124 }
1125
1126 // This assert can be a killer if the app is doing lots
1127 // of large block allocations.
1128 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1129
1130 // now update the nursery to point to the new block
1131 cap->r.rCurrentNursery = bd;
1132
1133 // we might be unlucky and have another thread get on the
1134 // run queue before us and steal the large block, but in that
1135 // case the thread will just end up requesting another large
1136 // block.
1137 pushOnRunQueue(cap,t);
1138 return rtsFalse; /* not actually GC'ing */
1139 }
1140 }
1141
1142 if (cap->r.rHpLim == NULL || cap->context_switch) {
1143 // Sometimes we miss a context switch, e.g. when calling
1144 // primitives in a tight loop, MAYBE_GC() doesn't check the
1145 // context switch flag, and we end up waiting for a GC.
1146 // See #1984, and concurrent/should_run/1984
1147 cap->context_switch = 0;
1148 appendToRunQueue(cap,t);
1149 } else {
1150 pushOnRunQueue(cap,t);
1151 }
1152 return rtsTrue;
1153 /* actual GC is done at the end of the while loop in schedule() */
1154 }
1155
1156 /* -----------------------------------------------------------------------------
1157 * Handle a thread that returned to the scheduler with ThreadYielding
1158 * -------------------------------------------------------------------------- */
1159
1160 static rtsBool
1161 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1162 {
1163 /* put the thread back on the run queue. Then, if we're ready to
1164 * GC, check whether this is the last task to stop. If so, wake
1165 * up the GC thread. getThread will block during a GC until the
1166 * GC is finished.
1167 */
1168
1169 ASSERT(t->_link == END_TSO_QUEUE);
1170
1171 // Shortcut if we're just switching evaluators: don't bother
1172 // doing stack squeezing (which can be expensive), just run the
1173 // thread.
1174 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1175 debugTrace(DEBUG_sched,
1176 "--<< thread %ld (%s) stopped to switch evaluators",
1177 (long)t->id, what_next_strs[t->what_next]);
1178 return rtsTrue;
1179 }
1180
1181 // Reset the context switch flag. We don't do this just before
1182 // running the thread, because that would mean we would lose ticks
1183 // during GC, which can lead to unfair scheduling (a thread hogs
1184 // the CPU because the tick always arrives during GC). This way
1185 // penalises threads that do a lot of allocation, but that seems
1186 // better than the alternative.
1187 if (cap->context_switch != 0) {
1188 cap->context_switch = 0;
1189 appendToRunQueue(cap,t);
1190 } else {
1191 pushOnRunQueue(cap,t);
1192 }
1193
1194 IF_DEBUG(sanity,
1195 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1196 checkTSO(t));
1197
1198 return rtsFalse;
1199 }
1200
1201 /* -----------------------------------------------------------------------------
1202 * Handle a thread that returned to the scheduler with ThreadBlocked
1203 * -------------------------------------------------------------------------- */
1204
1205 static void
1206 scheduleHandleThreadBlocked( StgTSO *t
1207 #if !defined(DEBUG)
1208 STG_UNUSED
1209 #endif
1210 )
1211 {
1212
1213 // We don't need to do anything. The thread is blocked, and it
1214 // has tidied up its stack and placed itself on whatever queue
1215 // it needs to be on.
1216
1217 // ASSERT(t->why_blocked != NotBlocked);
1218 // Not true: for example,
1219 // - the thread may have woken itself up already, because
1220 // threadPaused() might have raised a blocked throwTo
1221 // exception, see maybePerformBlockedException().
1222
1223 #ifdef DEBUG
1224 traceThreadStatus(DEBUG_sched, t);
1225 #endif
1226 }
1227
1228 /* -----------------------------------------------------------------------------
1229 * Handle a thread that returned to the scheduler with ThreadFinished
1230 * -------------------------------------------------------------------------- */
1231
1232 static rtsBool
1233 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1234 {
1235 /* Need to check whether this was a main thread, and if so,
1236 * return with the return value.
1237 *
1238 * We also end up here if the thread kills itself with an
1239 * uncaught exception, see Exception.cmm.
1240 */
1241
1242 // blocked exceptions can now complete, even if the thread was in
1243 // blocked mode (see #2910).
1244 awakenBlockedExceptionQueue (cap, t);
1245
1246 //
1247 // Check whether the thread that just completed was a bound
1248 // thread, and if so return with the result.
1249 //
1250 // There is an assumption here that all thread completion goes
1251 // through this point; we need to make sure that if a thread
1252 // ends up in the ThreadKilled state, that it stays on the run
1253 // queue so it can be dealt with here.
1254 //
1255
1256 if (t->bound) {
1257
1258 if (t->bound != task->incall) {
1259 #if !defined(THREADED_RTS)
1260 // Must be a bound thread that is not the topmost one. Leave
1261 // it on the run queue until the stack has unwound to the
1262 // point where we can deal with this. Leaving it on the run
1263 // queue also ensures that the garbage collector knows about
1264 // this thread and its return value (it gets dropped from the
1265 // step->threads list so there's no other way to find it).
1266 appendToRunQueue(cap,t);
1267 return rtsFalse;
1268 #else
1269 // this cannot happen in the threaded RTS, because a
1270 // bound thread can only be run by the appropriate Task.
1271 barf("finished bound thread that isn't mine");
1272 #endif
1273 }
1274
1275 ASSERT(task->incall->tso == t);
1276
1277 if (t->what_next == ThreadComplete) {
1278 if (task->incall->ret) {
1279 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1280 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1281 }
1282 task->incall->stat = Success;
1283 } else {
1284 if (task->incall->ret) {
1285 *(task->incall->ret) = NULL;
1286 }
1287 if (sched_state >= SCHED_INTERRUPTING) {
1288 if (heap_overflow) {
1289 task->incall->stat = HeapExhausted;
1290 } else {
1291 task->incall->stat = Interrupted;
1292 }
1293 } else {
1294 task->incall->stat = Killed;
1295 }
1296 }
1297 #ifdef DEBUG
1298 removeThreadLabel((StgWord)task->incall->tso->id);
1299 #endif
1300
1301 // We no longer consider this thread and task to be bound to
1302 // each other. The TSO lives on until it is GC'd, but the
1303 // task is about to be released by the caller, and we don't
1304 // want anyone following the pointer from the TSO to the
1305 // defunct task (which might have already been
1306 // re-used). This was a real bug: the GC updated
1307 // tso->bound->tso which led to a deadlock.
1308 t->bound = NULL;
1309 task->incall->tso = NULL;
1310
1311 return rtsTrue; // tells schedule() to return
1312 }
1313
1314 return rtsFalse;
1315 }
1316
1317 /* -----------------------------------------------------------------------------
1318 * Perform a heap census
1319 * -------------------------------------------------------------------------- */
1320
1321 static rtsBool
1322 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1323 {
1324 // When we have +RTS -i0 and we're heap profiling, do a census at
1325 // every GC. This lets us get repeatable runs for debugging.
1326 if (performHeapProfile ||
1327 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1328 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1329 return rtsTrue;
1330 } else {
1331 return rtsFalse;
1332 }
1333 }
1334
1335 /* -----------------------------------------------------------------------------
1336 * Start a synchronisation of all capabilities
1337 * -------------------------------------------------------------------------- */
1338
1339 // Returns:
1340 // 0 if we successfully got a sync
1341 // non-0 if there was another sync request in progress,
1342 // and we yielded to it. The value returned is the
1343 // type of the other sync request.
1344 //
1345 #if defined(THREADED_RTS)
1346 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1347 {
1348 nat prev_pending_sync;
1349
1350 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1351
1352 if (prev_pending_sync)
1353 {
1354 do {
1355 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1356 prev_pending_sync);
1357 ASSERT(*pcap);
1358 yieldCapability(pcap,task);
1359 } while (pending_sync);
1360 return prev_pending_sync; // NOTE: task->cap might have changed now
1361 }
1362 else
1363 {
1364 return 0;
1365 }
1366 }
1367
1368 //
1369 // Grab all the capabilities except the one we already hold. Used
1370 // when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
1371 // before a fork (SYNC_OTHER).
1372 //
1373 // Only call this after requestSync(), otherwise a deadlock might
1374 // ensue if another thread is trying to synchronise.
1375 //
1376 static void acquireAllCapabilities(Capability *cap, Task *task)
1377 {
1378 Capability *tmpcap;
1379 nat i;
1380
1381 for (i=0; i < n_capabilities; i++) {
1382 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1383 tmpcap = &capabilities[i];
1384 if (tmpcap != cap) {
1385 // we better hope this task doesn't get migrated to
1386 // another Capability while we're waiting for this one.
1387 // It won't, because load balancing happens while we have
1388 // all the Capabilities, but even so it's a slightly
1389 // unsavoury invariant.
1390 task->cap = tmpcap;
1391 waitForReturnCapability(&tmpcap, task);
1392 if (tmpcap->no != i) {
1393 barf("acquireAllCapabilities: got the wrong capability");
1394 }
1395 }
1396 }
1397 task->cap = cap;
1398 }
1399
1400 static void releaseAllCapabilities(Capability *cap, Task *task)
1401 {
1402 nat i;
1403
1404 for (i = 0; i < n_capabilities; i++) {
1405 if (cap->no != i) {
1406 task->cap = &capabilities[i];
1407 releaseCapability(&capabilities[i]);
1408 }
1409 }
1410 task->cap = cap;
1411 }
1412 #endif
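/* For illustration only -- a minimal sketch of the calling pattern the three
 * helpers above are designed for, mirroring their use in scheduleDoGC() and
 * forkProcess() below (the real callers do more work between the steps):
 *
 *    nat sync;
 *    do {
 *        // retry until no other sync is in progress; requestSync()
 *        // yields our Capability while we wait
 *        sync = requestSync(&cap, task, SYNC_OTHER);
 *    } while (sync);
 *
 *    acquireAllCapabilities(cap, task);
 *    // ... no Haskell code is running now; do the stop-the-world work ...
 *    pending_sync = 0;                  // allow new sync requests
 *    releaseAllCapabilities(cap, task);
 */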
1413
1414 /* -----------------------------------------------------------------------------
1415 * Perform a garbage collection if necessary
1416 * -------------------------------------------------------------------------- */
1417
1418 static Capability *
1419 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1420 {
1421 rtsBool heap_census;
1422 #ifdef THREADED_RTS
1423 rtsBool idle_cap[n_capabilities];
1424 rtsBool gc_type;
1425 nat i, sync;
1426 #endif
1427
1428 if (sched_state == SCHED_SHUTTING_DOWN) {
1429 // The final GC has already been done, and the system is
1430 // shutting down. We'll probably deadlock if we try to GC
1431 // now.
1432 return cap;
1433 }
1434
1435 #ifdef THREADED_RTS
1436 if (sched_state < SCHED_INTERRUPTING
1437 && RtsFlags.ParFlags.parGcEnabled
1438 && N >= RtsFlags.ParFlags.parGcGen
1439 && ! oldest_gen->mark)
1440 {
1441 gc_type = SYNC_GC_PAR;
1442 } else {
1443 gc_type = SYNC_GC_SEQ;
1444 }
1445
1446 // In order to GC, there must be no threads running Haskell code.
1447 // Therefore, the GC thread needs to hold *all* the capabilities,
1448 // and release them after the GC has completed.
1449 //
1450 // This seems to be the simplest way: previous attempts involved
1451 // making all the threads with capabilities give up their
1452 // capabilities and sleep except for the *last* one, which
1453 // actually did the GC. But it's quite hard to arrange for all
1454 // the other tasks to sleep and stay asleep.
1455 //
1456
1457 /* Other capabilities are prevented from running yet more Haskell
1458 threads if pending_sync is set. Tested inside
1459 yieldCapability() and releaseCapability() in Capability.c */
1460
1461 do {
1462 sync = requestSync(&cap, task, gc_type);
1463 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1464 // someone else had a pending sync request for a GC, so
1465 // let's assume GC has been done and we don't need to GC
1466 // again.
1467 return cap;
1468 }
1469 } while (sync);
1470
1471 interruptAllCapabilities();
1472
1473 // The final shutdown GC is always single-threaded, because it's
1474 // possible that some of the Capabilities have no worker threads.
1475
1476 if (gc_type == SYNC_GC_SEQ)
1477 {
1478 traceEventRequestSeqGc(cap);
1479 }
1480 else
1481 {
1482 traceEventRequestParGc(cap);
1483 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1484 }
1485
1486 if (gc_type == SYNC_GC_SEQ)
1487 {
1488 // single-threaded GC: grab all the capabilities
1489 acquireAllCapabilities(cap,task);
1490 }
1491 else
1492 {
1493 // If we are load-balancing collections in this
1494 // generation, then we require all GC threads to participate
1495 // in the collection. Otherwise, we only require active
1496 // threads to participate, and we set gc_threads[i]->idle for
1497 // any idle capabilities. The rationale here is that waking
1498 // up an idle Capability takes much longer than just doing any
1499 // GC work on its behalf.
1500
1501 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1502 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1503 N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1504 for (i=0; i < n_capabilities; i++) {
1505 idle_cap[i] = rtsFalse;
1506 }
1507 } else {
1508 for (i=0; i < n_capabilities; i++) {
1509 if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1510 idle_cap[i] = rtsFalse;
1511 } else {
1512 idle_cap[i] = tryGrabCapability(&capabilities[i], task);
1513 if (!idle_cap[i]) {
1514 n_failed_trygrab_idles++;
1515 } else {
1516 n_idle_caps++;
1517 }
1518 }
1519 }
1520 }
1521
1522 // We set the gc_thread[i]->idle flag if that
1523 // capability/thread is not participating in this collection.
1524 // We also keep a local record of which capabilities are idle
1525 // in idle_cap[], because scheduleDoGC() is re-entrant:
1526 // another thread might start a GC as soon as we've finished
1527 // this one, and thus the gc_thread[]->idle flags are invalid
1528 // as soon as we release any threads after GC. Getting this
1529 // wrong leads to a rare and hard to debug deadlock!
1530
1531 for (i=0; i < n_capabilities; i++) {
1532 gc_threads[i]->idle = idle_cap[i];
1533 capabilities[i].idle++;
1534 }
1535
1536 // For all capabilities participating in this GC, wait until
1537 // they have stopped mutating and are standing by for GC.
1538 waitForGcThreads(cap);
1539
1540 #if defined(THREADED_RTS)
1541 // Stable point where we can do a global check on our spark counters
1542 ASSERT(checkSparkCountInvariant());
1543 #endif
1544 }
1545
1546 #endif
1547
1548 IF_DEBUG(scheduler, printAllThreads());
1549
1550 delete_threads_and_gc:
1551 /*
1552 * We now have all the capabilities; if we're in an interrupting
1553 * state, then we should take the opportunity to delete all the
1554 * threads in the system.
1555 */
1556 if (sched_state == SCHED_INTERRUPTING) {
1557 deleteAllThreads(cap);
1558 #if defined(THREADED_RTS)
1559 // Discard all the sparks from every Capability. Why?
1560 // They'll probably be GC'd anyway since we've killed all the
1561 // threads. It just avoids the GC having to do any work to
1562 // figure out that any remaining sparks are garbage.
1563 for (i = 0; i < n_capabilities; i++) {
1564 capabilities[i].spark_stats.gcd +=
1565 sparkPoolSize(capabilities[i].sparks);
1566 // No race here since all Caps are stopped.
1567 discardSparksCap(&capabilities[i]);
1568 }
1569 #endif
1570 sched_state = SCHED_SHUTTING_DOWN;
1571 }
1572
1573 heap_census = scheduleNeedHeapProfile(rtsTrue);
1574
1575 traceEventGcStart(cap);
1576 #if defined(THREADED_RTS)
1577 // reset pending_sync *before* GC, so that when the GC threads
1578 // emerge they don't immediately re-enter the GC.
1579 pending_sync = 0;
1580 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1581 #else
1582 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1583 #endif
1584 traceEventGcEnd(cap);
1585
1586 traceSparkCounters(cap);
1587
1588 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1589 {
1590 // We are doing a GC because the system has been idle for a
1591 // timeslice and we need to check for deadlock. Record the
1592 // fact that we've done a GC and turn off the timer signal;
1593 // it will get re-enabled if we run any threads after the GC.
1594 recent_activity = ACTIVITY_DONE_GC;
1595 stopTimer();
1596 }
1597 else
1598 {
1599 // the GC might have taken long enough for the timer to set
1600 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1601 // necessarily deadlocked:
1602 recent_activity = ACTIVITY_YES;
1603 }
1604
1605 #if defined(THREADED_RTS)
1606 // Stable point where we can do a global check on our spark counters
1607 ASSERT(checkSparkCountInvariant());
1608 #endif
1609
1610 // The heap census itself is done during GarbageCollect().
1611 if (heap_census) {
1612 performHeapProfile = rtsFalse;
1613 }
1614
1615 #if defined(THREADED_RTS)
1616 if (gc_type == SYNC_GC_PAR)
1617 {
1618 releaseGCThreads(cap);
1619 for (i = 0; i < n_capabilities; i++) {
1620 if (i != cap->no) {
1621 if (idle_cap[i]) {
1622 ASSERT(capabilities[i].running_task == task);
1623 task->cap = &capabilities[i];
1624 releaseCapability(&capabilities[i]);
1625 } else {
1626 ASSERT(capabilities[i].running_task != task);
1627 }
1628 }
1629 }
1630 task->cap = cap;
1631 }
1632 #endif
1633
1634 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1635 // GC set the heap_overflow flag, so we should proceed with
1636 // an orderly shutdown now. Ultimately we want the main
1637 // thread to return to its caller with HeapExhausted, at which
1638 // point the caller should call hs_exit(). The first step is
1639 // to delete all the threads.
1640 //
1641 // Another way to do this would be to raise an exception in
1642 // the main thread, which we really should do because it gives
1643 // the program a chance to clean up. But how do we find the
1644 // main thread? It should presumably be the same one that
1645 // gets ^C exceptions, but that's all done on the Haskell side
1646 // (GHC.TopHandler).
1647 sched_state = SCHED_INTERRUPTING;
1648 goto delete_threads_and_gc;
1649 }
1650
1651 #ifdef SPARKBALANCE
1652 /* JB
1653 Once we are all together... this would be the place to balance all
1654 spark pools. No concurrent stealing or adding of new sparks can
1655 occur. Should be defined in Sparks.c. */
1656 balanceSparkPoolsCaps(n_capabilities, capabilities);
1657 #endif
1658
1659 #if defined(THREADED_RTS)
1660 if (gc_type == SYNC_GC_SEQ) {
1661 // release our stash of capabilities.
1662 releaseAllCapabilities(cap, task);
1663 }
1664 #endif
1665
1666 return cap;
1667 }
1668
1669 /* ---------------------------------------------------------------------------
1670 * Singleton fork(). Do not copy any running threads.
1671 * ------------------------------------------------------------------------- */
1672
1673 pid_t
1674 forkProcess(HsStablePtr *entry
1675 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1676 STG_UNUSED
1677 #endif
1678 )
1679 {
1680 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1681 pid_t pid;
1682 StgTSO* t,*next;
1683 Capability *cap;
1684 nat g;
1685 Task *task = NULL;
1686 nat i;
1687 #ifdef THREADED_RTS
1688 nat sync;
1689 #endif
1690
1691 debugTrace(DEBUG_sched, "forking!");
1692
1693 task = newBoundTask();
1694
1695 cap = NULL;
1696 waitForReturnCapability(&cap, task);
1697
1698 #ifdef THREADED_RTS
1699 do {
1700 sync = requestSync(&cap, task, SYNC_OTHER);
1701 } while (sync);
1702
1703 acquireAllCapabilities(cap,task);
1704
1705 pending_sync = 0;
1706 #endif
1707
1708 // no funny business: hold locks while we fork, otherwise if some
1709 // other thread is holding a lock when the fork happens, the data
1710 // structure protected by the lock will forever be in an
1711 // inconsistent state in the child. See also #1391.
1712 ACQUIRE_LOCK(&sched_mutex);
1713 ACQUIRE_LOCK(&sm_mutex);
1714 ACQUIRE_LOCK(&stable_mutex);
1715 ACQUIRE_LOCK(&task->lock);
1716
1717 for (i=0; i < n_capabilities; i++) {
1718 ACQUIRE_LOCK(&capabilities[i].lock);
1719 }
1720
1721 stopTimer(); // See #4074
1722
1723 #if defined(TRACING)
1724 flushEventLog(); // so that child won't inherit dirty file buffers
1725 #endif
1726
1727 pid = fork();
1728
1729 if (pid) { // parent
1730
1731 startTimer(); // #4074
1732
1733 RELEASE_LOCK(&sched_mutex);
1734 RELEASE_LOCK(&sm_mutex);
1735 RELEASE_LOCK(&stable_mutex);
1736 RELEASE_LOCK(&task->lock);
1737
1738 for (i=0; i < n_capabilities; i++) {
1739 releaseCapability_(&capabilities[i],rtsFalse);
1740 RELEASE_LOCK(&capabilities[i].lock);
1741 }
1742 boundTaskExiting(task);
1743
1744 // just return the pid
1745 return pid;
1746
1747 } else { // child
1748
1749 #if defined(THREADED_RTS)
1750 initMutex(&sched_mutex);
1751 initMutex(&sm_mutex);
1752 initMutex(&stable_mutex);
1753 initMutex(&task->lock);
1754
1755 for (i=0; i < n_capabilities; i++) {
1756 initMutex(&capabilities[i].lock);
1757 }
1758 #endif
1759
1760 #ifdef TRACING
1761 resetTracing();
1762 #endif
1763
1764 // Now, all OS threads except the thread that forked are
1765 // stopped. We need to stop all Haskell threads, including
1766 // those involved in foreign calls. Also we need to delete
1767 // all Tasks, because they correspond to OS threads that are
1768 // now gone.
1769
1770 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1771 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1772 next = t->global_link;
1773 // don't allow threads to catch the ThreadKilled
1774 // exception, but we do want to raiseAsync() because these
1775 // threads may be evaluating thunks that we need later.
1776 deleteThread_(t->cap,t);
1777
1778 // stop the GC from updating the InCall to point to
1779 // the TSO. This is only necessary because the
1780 // OSThread bound to the TSO has been killed, and
1781 // won't get a chance to exit in the usual way (see
1782 // also scheduleHandleThreadFinished).
1783 t->bound = NULL;
1784 }
1785 }
1786
1787 discardTasksExcept(task);
1788
1789 for (i=0; i < n_capabilities; i++) {
1790 cap = &capabilities[i];
1791
1792 // Empty the run queue. It seems tempting to let all the
1793 // killed threads stay on the run queue as zombies to be
1794 // cleaned up later, but some of them may correspond to
1795 // bound threads for which the corresponding Task does not
1796 // exist.
1797 cap->run_queue_hd = END_TSO_QUEUE;
1798 cap->run_queue_tl = END_TSO_QUEUE;
1799
1800             // Any suspended C-calling Tasks are no more; their OS threads
1801             // no longer exist:
1802 cap->suspended_ccalls = NULL;
1803
1804 #if defined(THREADED_RTS)
1805 // Wipe our spare workers list, they no longer exist. New
1806 // workers will be created if necessary.
1807 cap->spare_workers = NULL;
1808 cap->n_spare_workers = 0;
1809 cap->returning_tasks_hd = NULL;
1810 cap->returning_tasks_tl = NULL;
1811 #endif
1812
1813             // Release all caps except 0; we'll use that one for starting
1814 // the IO manager and running the client action below.
1815 if (cap->no != 0) {
1816 task->cap = cap;
1817 releaseCapability(cap);
1818 }
1819 }
1820 cap = &capabilities[0];
1821 task->cap = cap;
1822
1823 // Empty the threads lists. Otherwise, the garbage
1824 // collector may attempt to resurrect some of these threads.
1825 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1826 generations[g].threads = END_TSO_QUEUE;
1827 }
1828
1829 // On Unix, all timers are reset in the child, so we need to start
1830 // the timer again.
1831 initTimer();
1832 startTimer();
1833
1834 #if defined(THREADED_RTS)
1835 ioManagerStartCap(&cap);
1836 #endif
1837
1838 rts_evalStableIO(&cap, entry, NULL); // run the action
1839 rts_checkSchedStatus("forkProcess",cap);
1840
1841 rts_unlock(cap);
1842 hs_exit(); // clean up and exit
1843 stg_exit(EXIT_SUCCESS);
1844 }
1845 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1846 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1847 #endif
1848 }
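
/* For illustration only (a made-up pthreads example, not RTS code): the
 * "hold the locks across fork(), release in the parent, re-initialise in
 * the child" discipline that forkProcess() applies above to sched_mutex,
 * sm_mutex, stable_mutex, the Task lock and every Capability lock,
 * reduced to a single mutex:
 *
 *     #include <pthread.h>
 *     #include <unistd.h>
 *
 *     static pthread_mutex_t my_lock = PTHREAD_MUTEX_INITIALIZER;
 *
 *     pid_t fork_with_lock_discipline (void)
 *     {
 *         pthread_mutex_lock(&my_lock);   // nobody is mid-update at the fork
 *         pid_t pid = fork();
 *         if (pid != 0) {
 *             pthread_mutex_unlock(&my_lock);     // parent: just release
 *         } else {
 *             pthread_mutex_init(&my_lock, NULL); // child: re-create the
 *         }                                       // mutex, as initMutex() does
 *         return pid;
 *     }
 */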
1849
1850 /* ---------------------------------------------------------------------------
1851 * Increase the number of Capabilities
1852 *
1853 * Changing the number of Capabilities is very tricky! We can only do
1854 * it with the system fully stopped, so we do a full sync with
1855 * requestSync(SYNC_OTHER) and grab all the capabilities.
1856 *
1857 * Then we resize the appropriate data structures, and update all
1858 * references to the old data structures which have now moved.
1859 * Finally we release the Capabilities we are holding, and start
1860 * worker Tasks on the new Capabilities we created.
1861 *
1862 * ------------------------------------------------------------------------- */
1863
1864 void
1865 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1866 {
1867 #if !defined(THREADED_RTS)
1868
1869 barf("setNumCapabilities: not supported in the non-threaded RTS");
1870
1871 #else
1872 Task *task;
1873 Capability *cap;
1874 nat sync;
1875 StgTSO* t;
1876 nat g;
1877 Capability *old_capabilities;
1878
1879 if (new_n_capabilities == n_capabilities) return;
1880
1881 if (new_n_capabilities < n_capabilities) {
1882 barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
1883 }
1884
1885 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1886 n_capabilities, new_n_capabilities);
1887
1888 cap = rts_lock();
1889 task = cap->running_task;
1890
1891 do {
1892 sync = requestSync(&cap, task, SYNC_OTHER);
1893 } while (sync);
1894
1895 acquireAllCapabilities(cap,task);
1896
1897 pending_sync = 0;
1898
1899 #if defined(TRACING)
1900 // Allocate eventlog buffers for the new capabilities. Note this
1901 // must be done before calling moreCapabilities(), because that
1902 // will emit events to add the new capabilities to capsets.
1903 tracingAddCapapilities(n_capabilities, new_n_capabilities);
1904 #endif
1905
1906 // Resize the capabilities array
1907 // NB. after this, capabilities points somewhere new. Any pointers
1908 // of type (Capability *) are now invalid.
1909 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
1910
1911 // update our own cap pointer
1912 cap = &capabilities[cap->no];
1913
1914 // Resize and update storage manager data structures
1915 storageAddCapabilities(n_capabilities, new_n_capabilities);
1916
1917 // Update (Capability *) refs in the Task manager.
1918 updateCapabilityRefs();
1919
1920 // Update (Capability *) refs from TSOs
1921 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1922 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
1923 t->cap = &capabilities[t->cap->no];
1924 }
1925 }
1926
1927 // We're done: release the original Capabilities
1928 releaseAllCapabilities(cap,task);
1929
1930 // Start worker tasks on the new Capabilities
1931 startWorkerTasks(n_capabilities, new_n_capabilities);
1932
1933 // finally, update n_capabilities
1934 n_capabilities = new_n_capabilities;
1935
1936 // We can't free the old array until now, because we access it
1937 // while updating pointers in updateCapabilityRefs().
1938 if (old_capabilities) {
1939 stgFree(old_capabilities);
1940 }
1941
1942 rts_unlock(cap);
1943
1944 #endif // THREADED_RTS
1945 }
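
/* For illustration only (made-up names, not RTS code): the "allocate a
 * new array, patch every pointer via its stable index, only then free
 * the old array" pattern used above.  moreCapabilities() deliberately
 * does not resize in place, so the old capabilities array stays readable
 * while every reference of type (Capability *) is rewritten from cap->no:
 *
 *     Elem *old = table;
 *     Elem *new_table = calloc(new_n, sizeof(Elem));
 *     memcpy(new_table, old, old_n * sizeof(Elem));
 *     // ... initialise new_table[old_n .. new_n-1] ...
 *     ref = &new_table[ref->index];   // 'ref' still points into 'old',
 *                                     // which is still live, so reading
 *                                     // its index is safe
 *     table = new_table;
 *     free(old);                      // only after all refs are patched
 */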
1946
1947
1948
1949 /* ---------------------------------------------------------------------------
1950 * Delete all the threads in the system
1951 * ------------------------------------------------------------------------- */
1952
1953 static void
1954 deleteAllThreads ( Capability *cap )
1955 {
1956 // NOTE: only safe to call if we own all capabilities.
1957
1958 StgTSO* t, *next;
1959 nat g;
1960
1961 debugTrace(DEBUG_sched,"deleting all threads");
1962 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1963 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1964 next = t->global_link;
1965 deleteThread(cap,t);
1966 }
1967 }
1968
1969 // The run queue now contains a bunch of ThreadKilled threads. We
1970 // must not throw these away: the main thread(s) will be in there
1971 // somewhere, and the main scheduler loop has to deal with it.
1972 // Also, the run queue is the only thing keeping these threads from
1973 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1974
1975 #if !defined(THREADED_RTS)
1976 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1977 ASSERT(sleeping_queue == END_TSO_QUEUE);
1978 #endif
1979 }
1980
1981 /* -----------------------------------------------------------------------------
1982 Managing the suspended_ccalls list.
1983 Locks required: sched_mutex
1984 -------------------------------------------------------------------------- */
1985
1986 STATIC_INLINE void
1987 suspendTask (Capability *cap, Task *task)
1988 {
1989 InCall *incall;
1990
1991 incall = task->incall;
1992 ASSERT(incall->next == NULL && incall->prev == NULL);
1993 incall->next = cap->suspended_ccalls;
1994 incall->prev = NULL;
1995 if (cap->suspended_ccalls) {
1996 cap->suspended_ccalls->prev = incall;
1997 }
1998 cap->suspended_ccalls = incall;
1999 }
2000
2001 STATIC_INLINE void
2002 recoverSuspendedTask (Capability *cap, Task *task)
2003 {
2004 InCall *incall;
2005
2006 incall = task->incall;
2007 if (incall->prev) {
2008 incall->prev->next = incall->next;
2009 } else {
2010 ASSERT(cap->suspended_ccalls == incall);
2011 cap->suspended_ccalls = incall->next;
2012 }
2013 if (incall->next) {
2014 incall->next->prev = incall->prev;
2015 }
2016 incall->next = incall->prev = NULL;
2017 }
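
/* The suspended_ccalls field is the head of an intrusive, doubly-linked
 * list of InCalls, threaded through incall->prev and incall->next:
 *
 *     cap->suspended_ccalls -> [InCall] <-> [InCall] <-> [InCall] -> NULL
 *
 * suspendTask() pushes at the head; recoverSuspendedTask() may unlink an
 * entry from anywhere in the list, which is why both links are needed.
 */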
2018
2019 /* ---------------------------------------------------------------------------
2020 * Suspending & resuming Haskell threads.
2021 *
2022 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2023 * its capability before calling the C function. This allows another
2024 * task to pick up the capability and carry on running Haskell
2025 * threads. It also means that if the C call blocks, it won't lock
2026 * the whole system.
2027 *
2028 * The Haskell thread making the C call is put to sleep for the
2029  * duration of the call, on the suspended_ccalls list. We
2030 * give out a token to the task, which it can use to resume the thread
2031 * on return from the C function.
2032 *
2033  * If this is an interruptible C call, the FFI call may be
2034  * unceremoniously terminated, and so should be scheduled on an
2035  * unbound worker thread.
2036 * ------------------------------------------------------------------------- */
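
/* A rough sketch (not the literal code the compiler generates) of how a
 * "safe" foreign call is bracketed by the two functions below; the token
 * returned by suspendThread() is the Task, and it is handed back to
 * resumeThread() when the C function returns:
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     r     = the_c_function(args...);   // may block; other Haskell
 *                                        // threads keep running
 *     BaseReg = resumeThread(token);     // re-acquire a Capability
 */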
2037
2038 void *
2039 suspendThread (StgRegTable *reg, rtsBool interruptible)
2040 {
2041 Capability *cap;
2042 int saved_errno;
2043 StgTSO *tso;
2044 Task *task;
2045 #if mingw32_HOST_OS
2046 StgWord32 saved_winerror;
2047 #endif
2048
2049 saved_errno = errno;
2050 #if mingw32_HOST_OS
2051 saved_winerror = GetLastError();
2052 #endif
2053
2054 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2055 */
2056 cap = regTableToCapability(reg);
2057
2058 task = cap->running_task;
2059 tso = cap->r.rCurrentTSO;
2060
2061 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2062
2063 // XXX this might not be necessary --SDM
2064 tso->what_next = ThreadRunGHC;
2065
2066 threadPaused(cap,tso);
2067
2068 if (interruptible) {
2069 tso->why_blocked = BlockedOnCCall_Interruptible;
2070 } else {
2071 tso->why_blocked = BlockedOnCCall;
2072 }
2073
2074 // Hand back capability
2075 task->incall->suspended_tso = tso;
2076 task->incall->suspended_cap = cap;
2077
2078 ACQUIRE_LOCK(&cap->lock);
2079
2080 suspendTask(cap,task);
2081 cap->in_haskell = rtsFalse;
2082 releaseCapability_(cap,rtsFalse);
2083
2084 RELEASE_LOCK(&cap->lock);
2085
2086 errno = saved_errno;
2087 #if mingw32_HOST_OS
2088 SetLastError(saved_winerror);
2089 #endif
2090 return task;
2091 }
2092
2093 StgRegTable *
2094 resumeThread (void *task_)
2095 {
2096 StgTSO *tso;
2097 InCall *incall;
2098 Capability *cap;
2099 Task *task = task_;
2100 int saved_errno;
2101 #if mingw32_HOST_OS
2102 StgWord32 saved_winerror;
2103 #endif
2104
2105 saved_errno = errno;
2106 #if mingw32_HOST_OS
2107 saved_winerror = GetLastError();
2108 #endif
2109
2110 incall = task->incall;
2111 cap = incall->suspended_cap;
2112 task->cap = cap;
2113
2114 // Wait for permission to re-enter the RTS with the result.
2115 waitForReturnCapability(&cap,task);
2116 // we might be on a different capability now... but if so, our
2117 // entry on the suspended_ccalls list will also have been
2118 // migrated.
2119
2120 // Remove the thread from the suspended list
2121 recoverSuspendedTask(cap,task);
2122
2123 tso = incall->suspended_tso;
2124 incall->suspended_tso = NULL;
2125 incall->suspended_cap = NULL;
2126 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2127
2128 traceEventRunThread(cap, tso);
2129
2130 /* Reset blocking status */
2131 tso->why_blocked = NotBlocked;
2132
2133 if ((tso->flags & TSO_BLOCKEX) == 0) {
2134 // avoid locking the TSO if we don't have to
2135 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2136 maybePerformBlockedException(cap,tso);
2137 }
2138 }
2139
2140 cap->r.rCurrentTSO = tso;
2141 cap->in_haskell = rtsTrue;
2142 errno = saved_errno;
2143 #if mingw32_HOST_OS
2144 SetLastError(saved_winerror);
2145 #endif
2146
2147 /* We might have GC'd, mark the TSO dirty again */
2148 dirty_TSO(cap,tso);
2149 dirty_STACK(cap,tso->stackobj);
2150
2151 IF_DEBUG(sanity, checkTSO(tso));
2152
2153 return &cap->r;
2154 }
2155
2156 /* ---------------------------------------------------------------------------
2157 * scheduleThread()
2158 *
2159 * scheduleThread puts a thread on the end of the runnable queue.
2160 * This will usually be done immediately after a thread is created.
2161 * The caller of scheduleThread must create the thread using e.g.
2162 * createThread and push an appropriate closure
2163 * on this thread's stack before the scheduler is invoked.
2164 * ------------------------------------------------------------------------ */
2165
2166 void
2167 scheduleThread(Capability *cap, StgTSO *tso)
2168 {
2169 // The thread goes at the *end* of the run-queue, to avoid possible
2170 // starvation of any threads already on the queue.
2171 appendToRunQueue(cap,tso);
2172 }
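
/* A sketch of the pattern described above, roughly what rts_evalLazyIO()
 * in RtsAPI.c does (details and error handling omitted; p is the IO
 * action closure, ret an out-parameter for the result): create a TSO
 * with the closure already pushed on its stack, then either schedule it
 * and wait for the result, or just append it to the run queue:
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleWaitThread(tso, ret, &cap);   // bound: run and wait
 *     // ... or, for a fire-and-forget thread:
 *     // scheduleThread(cap, tso);
 */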
2173
2174 void
2175 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2176 {
2177 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2178 // move this thread from now on.
2179 #if defined(THREADED_RTS)
2180 cpu %= n_capabilities;
2181 if (cpu == cap->no) {
2182 appendToRunQueue(cap,tso);
2183 } else {
2184 migrateThread(cap, tso, &capabilities[cpu]);
2185 }
2186 #else
2187 appendToRunQueue(cap,tso);
2188 #endif
2189 }
2190
2191 void
2192 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2193 {
2194 Task *task;
2195 DEBUG_ONLY( StgThreadID id );
2196 Capability *cap;
2197
2198 cap = *pcap;
2199
2200 // We already created/initialised the Task
2201 task = cap->running_task;
2202
2203 // This TSO is now a bound thread; make the Task and TSO
2204 // point to each other.
2205 tso->bound = task->incall;
2206 tso->cap = cap;
2207
2208 task->incall->tso = tso;
2209 task->incall->ret = ret;
2210 task->incall->stat = NoStatus;
2211
2212 appendToRunQueue(cap,tso);
2213
2214 DEBUG_ONLY( id = tso->id );
2215 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2216
2217 cap = schedule(cap,task);
2218
2219 ASSERT(task->incall->stat != NoStatus);
2220 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2221
2222 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2223 *pcap = cap;
2224 }
2225
2226 /* ----------------------------------------------------------------------------
2227 * Starting Tasks
2228 * ------------------------------------------------------------------------- */
2229
2230 #if defined(THREADED_RTS)
2231 void scheduleWorker (Capability *cap, Task *task)
2232 {
2233 // schedule() runs without a lock.
2234 cap = schedule(cap,task);
2235
2236 // On exit from schedule(), we have a Capability, but possibly not
2237 // the same one we started with.
2238
2239 // During shutdown, the requirement is that after all the
2240 // Capabilities are shut down, all workers that are shutting down
2241 // have finished workerTaskStop(). This is why we hold on to
2242 // cap->lock until we've finished workerTaskStop() below.
2243 //
2244 // There may be workers still involved in foreign calls; those
2245 // will just block in waitForReturnCapability() because the
2246 // Capability has been shut down.
2247 //
2248 ACQUIRE_LOCK(&cap->lock);
2249 releaseCapability_(cap,rtsFalse);
2250 workerTaskStop(task);
2251 RELEASE_LOCK(&cap->lock);
2252 }
2253 #endif
2254
2255 /* ---------------------------------------------------------------------------
2256 * Start new worker tasks on Capabilities from--to
2257 * -------------------------------------------------------------------------- */
2258
2259 static void
2260 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2261 {
2262 #if defined(THREADED_RTS)
2263 nat i;
2264 Capability *cap;
2265
2266 for (i = from; i < to; i++) {
2267 cap = &capabilities[i];
2268 ACQUIRE_LOCK(&cap->lock);
2269 startWorkerTask(cap);
2270 RELEASE_LOCK(&cap->lock);
2271 }
2272 #endif
2273 }
2274
2275 /* ---------------------------------------------------------------------------
2276 * initScheduler()
2277 *
2278 * Initialise the scheduler. This resets all the queues - if the
2279 * queues contained any threads, they'll be garbage collected at the
2280 * next pass.
2281 *
2282 * ------------------------------------------------------------------------ */
2283
2284 void
2285 initScheduler(void)
2286 {
2287 #if !defined(THREADED_RTS)
2288 blocked_queue_hd = END_TSO_QUEUE;
2289 blocked_queue_tl = END_TSO_QUEUE;
2290 sleeping_queue = END_TSO_QUEUE;
2291 #endif
2292
2293 sched_state = SCHED_RUNNING;
2294 recent_activity = ACTIVITY_YES;
2295
2296 #if defined(THREADED_RTS)
2297 /* Initialise the mutex and condition variables used by
2298 * the scheduler. */
2299 initMutex(&sched_mutex);
2300 #endif
2301
2302 ACQUIRE_LOCK(&sched_mutex);
2303
2304 /* A capability holds the state a native thread needs in
2305 * order to execute STG code. At least one capability is
2306 * floating around (only THREADED_RTS builds have more than one).
2307 */
2308 initCapabilities();
2309
2310 initTaskManager();
2311
2312 /*
2313 * Eagerly start one worker to run each Capability, except for
2314 * Capability 0. The idea is that we're probably going to start a
2315 * bound thread on Capability 0 pretty soon, so we don't want a
2316 * worker task hogging it.
2317 */
2318 startWorkerTasks(1, n_capabilities);
2319
2320 RELEASE_LOCK(&sched_mutex);
2321
2322 }
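
/* For context (sketch only; see the User's Guide for the full recipe):
 * initScheduler()/exitScheduler()/freeScheduler() are not called
 * directly by client code -- they are driven by hs_init()/hs_exit() in
 * RtsStartup.c, so a C program embedding the RTS looks roughly like:
 *
 *     #include "HsFFI.h"
 *
 *     int main (int argc, char *argv[])
 *     {
 *         hs_init(&argc, &argv);   // starts capabilities, scheduler, timer
 *         // ... call foreign-exported Haskell functions here ...
 *         hs_exit();               // orderly shutdown, incl. exitScheduler()
 *         return 0;
 *     }
 */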
2323
2324 void
2325 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2326 /* see Capability.c, shutdownCapability() */
2327 {
2328 Task *task = NULL;
2329
2330 task = newBoundTask();
2331
2332 // If we haven't killed all the threads yet, do it now.
2333 if (sched_state < SCHED_SHUTTING_DOWN) {
2334 sched_state = SCHED_INTERRUPTING;
2335 waitForReturnCapability(&task->cap,task);
2336 scheduleDoGC(task->cap,task,rtsFalse);
2337 ASSERT(task->incall->tso == NULL);
2338 releaseCapability(task->cap);
2339 }
2340 sched_state = SCHED_SHUTTING_DOWN;
2341
2342 shutdownCapabilities(task, wait_foreign);
2343
2344 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2345 // n_failed_trygrab_idles, n_idle_caps);
2346
2347 boundTaskExiting(task);
2348 }
2349
2350 void
2351 freeScheduler( void )
2352 {
2353 nat still_running;
2354
2355 ACQUIRE_LOCK(&sched_mutex);
2356 still_running = freeTaskManager();
2357 // We can only free the Capabilities if there are no Tasks still
2358 // running. We might have a Task about to return from a foreign
2359 // call into waitForReturnCapability(), for example (actually,
2360 // this should be the *only* thing that a still-running Task can
2361 // do at this point, and it will block waiting for the
2362 // Capability).
2363 if (still_running == 0) {
2364 freeCapabilities();
2365 if (n_capabilities != 1) {
2366 stgFree(capabilities);
2367 }
2368 }
2369 RELEASE_LOCK(&sched_mutex);
2370 #if defined(THREADED_RTS)
2371 closeMutex(&sched_mutex);
2372 #endif
2373 }
2374
2375 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2376 void *user USED_IF_NOT_THREADS)
2377 {
2378 #if !defined(THREADED_RTS)
2379 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2380 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2381 evac(user, (StgClosure **)(void *)&sleeping_queue);
2382 #endif
2383 }
2384
2385 /* -----------------------------------------------------------------------------
2386 performGC
2387
2388 This is the interface to the garbage collector from Haskell land.
2389 We provide this so that external C code can allocate and garbage
2390 collect when called from Haskell via _ccall_GC.
2391 -------------------------------------------------------------------------- */
2392
2393 static void
2394 performGC_(rtsBool force_major)
2395 {
2396 Task *task;
2397
2398 // We must grab a new Task here, because the existing Task may be
2399 // associated with a particular Capability, and chained onto the
2400 // suspended_ccalls queue.
2401 task = newBoundTask();
2402
2403 waitForReturnCapability(&task->cap,task);
2404 scheduleDoGC(task->cap,task,force_major);
2405 releaseCapability(task->cap);
2406 boundTaskExiting(task);
2407 }
2408
2409 void
2410 performGC(void)
2411 {
2412 performGC_(rtsFalse);
2413 }
2414
2415 void
2416 performMajorGC(void)
2417 {
2418 performGC_(rtsTrue);
2419 }
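
/* Sketch of the intended use (hypothetical helper name): C code entered
 * via a "safe" foreign call can force a collection directly, e.g.
 *
 *     void my_c_helper (void)
 *     {
 *         // ... allocate or release large amounts of memory ...
 *         performMajorGC();   // grabs a Capability, GCs, releases it
 *     }
 *
 * The Haskell-level System.Mem.performGC bottoms out in one of these
 * entry points.
 */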
2420
2421 /* ---------------------------------------------------------------------------
2422 Interrupt execution
2423 - usually called inside a signal handler so it mustn't do anything fancy.
2424 ------------------------------------------------------------------------ */
2425
2426 void
2427 interruptStgRts(void)
2428 {
2429 sched_state = SCHED_INTERRUPTING;
2430 interruptAllCapabilities();
2431 #if defined(THREADED_RTS)
2432 wakeUpRts();
2433 #endif
2434 }
2435
2436 /* -----------------------------------------------------------------------------
2437 Wake up the RTS
2438
2439 This function causes at least one OS thread to wake up and run the
2440 scheduler loop. It is invoked when the RTS might be deadlocked, or
2441    an external event has arrived that may need servicing (e.g. a
2442 keyboard interrupt).
2443
2444 In the single-threaded RTS we don't do anything here; we only have
2445 one thread anyway, and the event that caused us to want to wake up
2446 will have interrupted any blocking system call in progress anyway.
2447 -------------------------------------------------------------------------- */
2448
2449 #if defined(THREADED_RTS)
2450 void wakeUpRts(void)
2451 {
2452 // This forces the IO Manager thread to wakeup, which will
2453 // in turn ensure that some OS thread wakes up and runs the
2454 // scheduler loop, which will cause a GC and deadlock check.
2455 ioManagerWakeup();
2456 }
2457 #endif
2458
2459 /* -----------------------------------------------------------------------------
2460 Deleting threads
2461
2462 This is used for interruption (^C) and forking, and corresponds to
2463 raising an exception but without letting the thread catch the
2464 exception.
2465 -------------------------------------------------------------------------- */
2466
2467 static void
2468 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2469 {
2470 // NOTE: must only be called on a TSO that we have exclusive
2471 // access to, because we will call throwToSingleThreaded() below.
2472 // The TSO must be on the run queue of the Capability we own, or
2473 // we must own all Capabilities.
2474
2475 if (tso->why_blocked != BlockedOnCCall &&
2476 tso->why_blocked != BlockedOnCCall_Interruptible) {
2477 throwToSingleThreaded(tso->cap,tso,NULL);
2478 }
2479 }
2480
2481 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2482 static void
2483 deleteThread_(Capability *cap, StgTSO *tso)
2484 { // for forkProcess only:
2485 // like deleteThread(), but we delete threads in foreign calls, too.
2486
2487 if (tso->why_blocked == BlockedOnCCall ||
2488 tso->why_blocked == BlockedOnCCall_Interruptible) {
2489 tso->what_next = ThreadKilled;
2490 appendToRunQueue(tso->cap, tso);
2491 } else {
2492 deleteThread(cap,tso);
2493 }
2494 }
2495 #endif
2496
2497 /* -----------------------------------------------------------------------------
2498 raiseExceptionHelper
2499
2500    This function is called by the raise# primitive, just so that we can
2501 move some of the tricky bits of raising an exception from C-- into
2502    C. Who knows, it might be a useful reusable thing here too.
2503 -------------------------------------------------------------------------- */
2504
2505 StgWord
2506 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2507 {
2508 Capability *cap = regTableToCapability(reg);
2509 StgThunk *raise_closure = NULL;
2510 StgPtr p, next;
2511 StgRetInfoTable *info;
2512 //
2513 // This closure represents the expression 'raise# E' where E
2514     // is the exception raised. It is used to overwrite all the
2515     // thunks which are currently under evaluation.
2516 //
2517
2518 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2519 // LDV profiling: stg_raise_info has THUNK as its closure
2520 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2521     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2522 // 1 does not cause any problem unless profiling is performed.
2523 // However, when LDV profiling goes on, we need to linearly scan
2524 // small object pool, where raise_closure is stored, so we should
2525 // use MIN_UPD_SIZE.
2526 //
2527 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2528 // sizeofW(StgClosure)+1);
2529 //
2530
2531 //
2532 // Walk up the stack, looking for the catch frame. On the way,
2533 // we update any closures pointed to from update frames with the
2534 // raise closure that we just built.
2535 //
2536 p = tso->stackobj->sp;
2537 while(1) {
2538 info = get_ret_itbl((StgClosure *)p);
2539 next = p + stack_frame_sizeW((StgClosure *)p);
2540 switch (info->i.type) {
2541
2542 case UPDATE_FRAME:
2543 // Only create raise_closure if we need to.
2544 if (raise_closure == NULL) {
2545 raise_closure =
2546 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2547 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2548 raise_closure->payload[0] = exception;
2549 }
2550 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2551 (StgClosure *)raise_closure);
2552 p = next;
2553 continue;
2554
2555 case ATOMICALLY_FRAME:
2556 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2557 tso->stackobj->sp = p;
2558 return ATOMICALLY_FRAME;
2559
2560 case CATCH_FRAME:
2561 tso->stackobj->sp = p;
2562 return CATCH_FRAME;
2563
2564 case CATCH_STM_FRAME:
2565 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2566 tso->stackobj->sp = p;
2567 return CATCH_STM_FRAME;
2568
2569 case UNDERFLOW_FRAME:
2570 tso->stackobj->sp = p;
2571 threadStackUnderflow(cap,tso);
2572 p = tso->stackobj->sp;
2573 continue;
2574
2575 case STOP_FRAME:
2576 tso->stackobj->sp = p;
2577 return STOP_FRAME;
2578
2579 case CATCH_RETRY_FRAME:
2580 default:
2581 p = next;
2582 continue;
2583 }
2584 }
2585 }
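
/* The frame type returned above tells the Cmm implementation of raise#
 * (stg_raisezh in Exception.cmm) what the stack walk stopped at: a
 * handler to run (CATCH_FRAME / CATCH_STM_FRAME), an enclosing
 * transaction to deal with first (ATOMICALLY_FRAME), or the top of the
 * stack, i.e. an uncaught exception (STOP_FRAME).
 */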
2586
2587
2588 /* -----------------------------------------------------------------------------
2589 findRetryFrameHelper
2590
2591 This function is called by the retry# primitive. It traverses the stack
2592    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2593
2594 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2595    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2596
2597 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2598 create) because retries are not considered to be exceptions, despite the
2599 similar implementation.
2600
2601 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2602 not be created within memory transactions.
2603 -------------------------------------------------------------------------- */
2604
2605 StgWord
2606 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2607 {
2608 StgPtr p, next;
2609 StgRetInfoTable *info;
2610
2611 p = tso->stackobj->sp;
2612 while (1) {
2613 info = get_ret_itbl((StgClosure *)p);
2614 next = p + stack_frame_sizeW((StgClosure *)p);
2615 switch (info->i.type) {
2616
2617 case ATOMICALLY_FRAME:
2618 debugTrace(DEBUG_stm,
2619 "found ATOMICALLY_FRAME at %p during retry", p);
2620 tso->stackobj->sp = p;
2621 return ATOMICALLY_FRAME;
2622
2623 case CATCH_RETRY_FRAME:
2624 debugTrace(DEBUG_stm,
2625 "found CATCH_RETRY_FRAME at %p during retrry", p);
2626 tso->stackobj->sp = p;
2627 return CATCH_RETRY_FRAME;
2628
2629 case CATCH_STM_FRAME: {
2630 StgTRecHeader *trec = tso -> trec;
2631 StgTRecHeader *outer = trec -> enclosing_trec;
2632 debugTrace(DEBUG_stm,
2633 "found CATCH_STM_FRAME at %p during retry", p);
2634 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2635 stmAbortTransaction(cap, trec);
2636 stmFreeAbortedTRec(cap, trec);
2637 tso -> trec = outer;
2638 p = next;
2639 continue;
2640 }
2641
2642 case UNDERFLOW_FRAME:
2643 threadStackUnderflow(cap,tso);
2644 p = tso->stackobj->sp;
2645 continue;
2646
2647 default:
2648 ASSERT(info->i.type != CATCH_FRAME);
2649 ASSERT(info->i.type != STOP_FRAME);
2650 p = next;
2651 continue;
2652 }
2653 }
2654 }
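
/* The frame type returned above tells the Cmm implementation of retry#
 * what to do next: a CATCH_RETRY_FRAME means we are inside an orElse#
 * and can switch to (or propagate out of) the other alternative, while
 * an ATOMICALLY_FRAME means the retry reached the top level and the
 * transaction should block until one of the TVars it read is updated.
 */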
2655
2656 /* -----------------------------------------------------------------------------
2657 resurrectThreads is called after garbage collection on the list of
2658 threads found to be garbage. Each of these threads will be woken
2659 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2660 on an MVar, or NonTermination if the thread was blocked on a Black
2661 Hole.
2662
2663 Locks: assumes we hold *all* the capabilities.
2664 -------------------------------------------------------------------------- */
2665
2666 void
2667 resurrectThreads (StgTSO *threads)
2668 {
2669 StgTSO *tso, *next;
2670 Capability *cap;
2671 generation *gen;
2672
2673 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2674 next = tso->global_link;
2675
2676 gen = Bdescr((P_)tso)->gen;
2677 tso->global_link = gen->threads;
2678 gen->threads = tso;
2679
2680 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2681
2682 // Wake up the thread on the Capability it was last on
2683 cap = tso->cap;
2684
2685 switch (tso->why_blocked) {
2686 case BlockedOnMVar:
2687 /* Called by GC - sched_mutex lock is currently held. */
2688 throwToSingleThreaded(cap, tso,
2689 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2690 break;
2691 case BlockedOnBlackHole:
2692 throwToSingleThreaded(cap, tso,
2693 (StgClosure *)nonTermination_closure);
2694 break;
2695 case BlockedOnSTM:
2696 throwToSingleThreaded(cap, tso,
2697 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2698 break;
2699 case NotBlocked:
2700 /* This might happen if the thread was blocked on a black hole
2701 * belonging to a thread that we've just woken up (raiseAsync
2702 * can wake up threads, remember...).
2703 */
2704 continue;
2705 case BlockedOnMsgThrowTo:
2706 // This can happen if the target is masking, blocks on a
2707 // black hole, and then is found to be unreachable. In
2708 // this case, we want to let the target wake up and carry
2709 // on, and do nothing to this thread.
2710 continue;
2711 default:
2712 barf("resurrectThreads: thread blocked in a strange way: %d",
2713 tso->why_blocked);
2714 }
2715 }
2716 }