Allow the number of capabilities to be increased at runtime (#3729)
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "Sparks.h"
31 #include "Capability.h"
32 #include "Task.h"
33 #include "AwaitEvent.h"
34 #if defined(mingw32_HOST_OS)
35 #include "win32/IOManager.h"
36 #endif
37 #include "Trace.h"
38 #include "RaiseAsync.h"
39 #include "Threads.h"
40 #include "Timer.h"
41 #include "ThreadPaused.h"
42 #include "Messages.h"
43 #include "Stable.h"
44
45 #ifdef HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48 #ifdef HAVE_UNISTD_H
49 #include <unistd.h>
50 #endif
51
52 #include <string.h>
53 #include <stdlib.h>
54 #include <stdarg.h>
55
56 #ifdef HAVE_ERRNO_H
57 #include <errno.h>
58 #endif
59
60 #ifdef TRACING
61 #include "eventlog/EventLog.h"
62 #endif
63 /* -----------------------------------------------------------------------------
64 * Global variables
65 * -------------------------------------------------------------------------- */
66
67 #if !defined(THREADED_RTS)
68 // Blocked/sleeping threads
69 StgTSO *blocked_queue_hd = NULL;
70 StgTSO *blocked_queue_tl = NULL;
71 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
72 #endif
73
74 /* Set to true when the latest garbage collection failed to reclaim
75 * enough space, and the runtime should proceed to shut itself down in
76 * an orderly fashion (emitting profiling info etc.)
77 */
78 rtsBool heap_overflow = rtsFalse;
79
80 /* flag that tracks whether we have done any execution in this time slice.
81 * LOCK: currently none, perhaps we should lock (but needs to be
82 * updated in the fast path of the scheduler).
83 *
84 * NB. must be StgWord, we do xchg() on it.
85 */
86 volatile StgWord recent_activity = ACTIVITY_YES;
87
88 /* if this flag is set as well, give up execution
89 * LOCK: none (changes monotonically)
90 */
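// The state only ever advances: SCHED_RUNNING -> SCHED_INTERRUPTING ->
// SCHED_SHUTTING_DOWN.  The code below relies on this ordering when it
// compares sched_state using >= and <.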
91 volatile StgWord sched_state = SCHED_RUNNING;
92
93 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
94 * exists - earlier gccs apparently didn't.
95 * -= chak
96 */
97 StgTSO dummy_tso;
98
99 /*
100 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
101 * in an MT setting, needed to signal that a worker thread shouldn't hang around
102 * in the scheduler when it is out of work.
103 */
104 rtsBool shutting_down_scheduler = rtsFalse;
105
106 /*
107 * This mutex protects most of the global scheduler data in
108 * the THREADED_RTS runtime.
109 */
110 #if defined(THREADED_RTS)
111 Mutex sched_mutex;
112 #endif
113
114 #if !defined(mingw32_HOST_OS)
115 #define FORKPROCESS_PRIMOP_SUPPORTED
116 #endif
117
118 /* -----------------------------------------------------------------------------
119 * static function prototypes
120 * -------------------------------------------------------------------------- */
121
122 static Capability *schedule (Capability *initialCapability, Task *task);
123
124 //
125 // These functions all encapsulate parts of the scheduler loop, and are
126 // abstracted only to make the structure and control flow of the
127 // scheduler clearer.
128 //
129 static void schedulePreLoop (void);
130 static void scheduleFindWork (Capability *cap);
131 #if defined(THREADED_RTS)
132 static void scheduleYield (Capability **pcap, Task *task);
133 #endif
134 #if defined(THREADED_RTS)
135 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
136 static void acquireAllCapabilities(Capability *cap, Task *task);
137 static void releaseAllCapabilities(Capability *cap, Task *task);
138 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
139 #endif
140 static void scheduleStartSignalHandlers (Capability *cap);
141 static void scheduleCheckBlockedThreads (Capability *cap);
142 static void scheduleProcessInbox(Capability *cap);
143 static void scheduleDetectDeadlock (Capability *cap, Task *task);
144 static void schedulePushWork(Capability *cap, Task *task);
145 #if defined(THREADED_RTS)
146 static void scheduleActivateSpark(Capability *cap);
147 #endif
148 static void schedulePostRunThread(Capability *cap, StgTSO *t);
149 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
150 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
151 nat prev_what_next );
152 static void scheduleHandleThreadBlocked( StgTSO *t );
153 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
154 StgTSO *t );
155 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
156 static Capability *scheduleDoGC(Capability *cap, Task *task,
157 rtsBool force_major);
158
159 static void deleteThread (Capability *cap, StgTSO *tso);
160 static void deleteAllThreads (Capability *cap);
161
162 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
163 static void deleteThread_(Capability *cap, StgTSO *tso);
164 #endif
165
166 /* ---------------------------------------------------------------------------
167 Main scheduling loop.
168
169 We use round-robin scheduling, each thread returning to the
170 scheduler loop when one of these conditions is detected:
171
172 * out of heap space
173 * timer expires (thread yields)
174 * thread blocks
175 * thread ends
176 * stack overflow
177
178 GRAN version:
179 In a GranSim setup this loop iterates over the global event queue.
180 This revolves around the global event queue, which determines what
181 to do next. Therefore, it's more complicated than either the
182 concurrent or the parallel (GUM) setup.
183 This version has been entirely removed (JB 2008/08).
184
185 GUM version:
186 GUM iterates over incoming messages.
187 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
188 and sends out a fish whenever it has nothing to do; in-between
189 doing the actual reductions (shared code below) it processes the
190 incoming messages and deals with delayed operations
191 (see PendingFetches).
192 This is not the ugliest code you could imagine, but it's bloody close.
193
194 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
195 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
196 as well as future GUM versions. This file has been refurbished to
197 contain only valid code, which is however incomplete and refers to
198 invalid includes, etc.
199
200 ------------------------------------------------------------------------ */
201
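// A rough sketch of one iteration of the loop below (the details live in
// the helper functions named schedule*()):
//
//    check for unsafe re-entry, then dispatch on sched_state
//    scheduleFindWork()       -- signals, inbox, blocked threads, sparks
//    schedulePushWork()       -- hand surplus work to idle Capabilities
//    scheduleDetectDeadlock()
//    scheduleYield()          -- (THREADED_RTS) sleep if there is nothing to do
//    t = popRunQueue(cap); run it via StgRun() or interpretBCO()
//    dispatch on the return code (heap/stack overflow, yield, block, finish)
//    scheduleDoGC() if a GC was requested
//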
202 static Capability *
203 schedule (Capability *initialCapability, Task *task)
204 {
205 StgTSO *t;
206 Capability *cap;
207 StgThreadReturnCode ret;
208 nat prev_what_next;
209 rtsBool ready_to_gc;
210 #if defined(THREADED_RTS)
211 rtsBool first = rtsTrue;
212 #endif
213
214 cap = initialCapability;
215
216 // Pre-condition: this task owns initialCapability.
217 // The sched_mutex is *NOT* held
218 // NB. on return, we still hold a capability.
219
220 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
221
222 schedulePreLoop();
223
224 // -----------------------------------------------------------
225 // Scheduler loop starts here:
226
227 while (1) {
228
229 // Check whether we have re-entered the RTS from Haskell without
230 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
231 // call).
232 if (cap->in_haskell) {
233 errorBelch("schedule: re-entered unsafely.\n"
234 " Perhaps a 'foreign import unsafe' should be 'safe'?");
235 stg_exit(EXIT_FAILURE);
236 }
237
238 // The interruption / shutdown sequence.
239 //
240 // In order to cleanly shut down the runtime, we want to:
241 // * make sure that all main threads return to their callers
242 // with the state 'Interrupted'.
243 // * clean up all OS threads associated with the runtime
244 // * free all memory etc.
245 //
246 // So the sequence for ^C goes like this:
247 //
248 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
249 // arranges for some Capability to wake up
250 //
251 // * all threads in the system are halted, and the zombies are
252 // placed on the run queue for cleaning up. We acquire all
253 // the capabilities in order to delete the threads, this is
254 // done by scheduleDoGC() for convenience (because GC already
255 // needs to acquire all the capabilities). We can't kill
256 // threads involved in foreign calls.
257 //
258 // * somebody calls shutdownHaskell(), which calls exitScheduler()
259 //
260 // * sched_state := SCHED_SHUTTING_DOWN
261 //
262 // * all workers exit when the run queue on their capability
263 // drains. All main threads will also exit when their TSO
264 // reaches the head of the run queue and they can return.
265 //
266 // * eventually all Capabilities will shut down, and the RTS can
267 // exit.
268 //
269 // * We might be left with threads blocked in foreign calls;
270 // we should really attempt to kill these somehow (TODO).
271
272 switch (sched_state) {
273 case SCHED_RUNNING:
274 break;
275 case SCHED_INTERRUPTING:
276 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
277 /* scheduleDoGC() deletes all the threads */
278 cap = scheduleDoGC(cap,task,rtsFalse);
279
280 // after scheduleDoGC(), we must be shutting down. Either some
281 // other Capability did the final GC, or we did it above,
282 // either way we can fall through to the SCHED_SHUTTING_DOWN
283 // case now.
284 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
285 // fall through
286
287 case SCHED_SHUTTING_DOWN:
288 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
289 // If we are a worker, just exit. If we're a bound thread
290 // then we will exit below when we've removed our TSO from
291 // the run queue.
292 if (!isBoundTask(task) && emptyRunQueue(cap)) {
293 return cap;
294 }
295 break;
296 default:
297 barf("sched_state: %d", sched_state);
298 }
299
300 scheduleFindWork(cap);
301
302 /* work pushing, currently relevant only for THREADED_RTS:
303 (pushes threads, wakes up idle capabilities for stealing) */
304 schedulePushWork(cap,task);
305
306 scheduleDetectDeadlock(cap,task);
307
308 #if defined(THREADED_RTS)
309 cap = task->cap; // reload cap, it might have changed
310 #endif
311
312 // Normally, the only way we can get here with no threads to
313 // run is if a keyboard interrupt was received during
314 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
315 // Additionally, it is not fatal for the
316 // threaded RTS to reach here with no threads to run.
317 //
318 // win32: might be here due to awaitEvent() being abandoned
319 // as a result of a console event having been delivered.
320
321 #if defined(THREADED_RTS)
322 if (first)
323 {
324 // XXX: ToDo
325 // // don't yield the first time, we want a chance to run this
326 // // thread for a bit, even if there are others banging at the
327 // // door.
328 // first = rtsFalse;
329 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
330 }
331
332 scheduleYield(&cap,task);
333
334 if (emptyRunQueue(cap)) continue; // look for work again
335 #endif
336
337 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
338 if ( emptyRunQueue(cap) ) {
339 ASSERT(sched_state >= SCHED_INTERRUPTING);
340 }
341 #endif
342
343 //
344 // Get a thread to run
345 //
346 t = popRunQueue(cap);
347
348 // Sanity check the thread we're about to run. This can be
349 // expensive if there is lots of thread switching going on...
350 IF_DEBUG(sanity,checkTSO(t));
351
352 #if defined(THREADED_RTS)
353 // Check whether we can run this thread in the current task.
354 // If not, we have to pass our capability to the right task.
355 {
356 InCall *bound = t->bound;
357
358 if (bound) {
359 if (bound->task == task) {
360 // yes, the Haskell thread is bound to the current native thread
361 } else {
362 debugTrace(DEBUG_sched,
363 "thread %lu bound to another OS thread",
364 (unsigned long)t->id);
365 // no, bound to a different Haskell thread: pass to that thread
366 pushOnRunQueue(cap,t);
367 continue;
368 }
369 } else {
370 // The thread we want to run is unbound.
371 if (task->incall->tso) {
372 debugTrace(DEBUG_sched,
373 "this OS thread cannot run thread %lu",
374 (unsigned long)t->id);
375 // no, the current native thread is bound to a different
376 // Haskell thread, so pass it to any worker thread
377 pushOnRunQueue(cap,t);
378 continue;
379 }
380 }
381 }
382 #endif
383
384 // If we're shutting down, and this thread has not yet been
385 // killed, kill it now. This sometimes happens when a finalizer
386 // thread is created by the final GC, or a thread previously
387 // in a foreign call returns.
388 if (sched_state >= SCHED_INTERRUPTING &&
389 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
390 deleteThread(cap,t);
391 }
392
393 /* context switches are initiated by the timer signal, unless
394 * the user specified "context switch as often as possible", with
395 * +RTS -C0
396 */
397 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
398 && !emptyThreadQueues(cap)) {
399 cap->context_switch = 1;
400 }
401
402 run_thread:
403
404 // CurrentTSO is the thread to run. t might be different if we
405 // loop back to run_thread, so make sure to set CurrentTSO after
406 // that.
407 cap->r.rCurrentTSO = t;
408
409 startHeapProfTimer();
410
411 // ----------------------------------------------------------------------
412 // Run the current thread
413
414 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
415 ASSERT(t->cap == cap);
416 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
417
418 prev_what_next = t->what_next;
419
420 errno = t->saved_errno;
421 #if mingw32_HOST_OS
422 SetLastError(t->saved_winerror);
423 #endif
424
425 // reset the interrupt flag before running Haskell code
426 cap->interrupt = 0;
427
428 cap->in_haskell = rtsTrue;
429
430 dirty_TSO(cap,t);
431 dirty_STACK(cap,t->stackobj);
432
433 #if defined(THREADED_RTS)
434 if (recent_activity == ACTIVITY_DONE_GC) {
435 // ACTIVITY_DONE_GC means we turned off the timer signal to
436 // conserve power (see #1623). Re-enable it here.
437 nat prev;
438 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
439 if (prev == ACTIVITY_DONE_GC) {
440 startTimer();
441 }
442 } else if (recent_activity != ACTIVITY_INACTIVE) {
443 // If we reached ACTIVITY_INACTIVE, then don't reset it until
444 // we've done the GC. The thread running here might just be
445 // the IO manager thread that handle_tick() woke up via
446 // wakeUpRts().
447 recent_activity = ACTIVITY_YES;
448 }
449 #endif
450
451 traceEventRunThread(cap, t);
452
453 switch (prev_what_next) {
454
455 case ThreadKilled:
456 case ThreadComplete:
457 /* Thread already finished, return to scheduler. */
458 ret = ThreadFinished;
459 break;
460
461 case ThreadRunGHC:
462 {
463 StgRegTable *r;
464 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
465 cap = regTableToCapability(r);
466 ret = r->rRet;
467 break;
468 }
469
470 case ThreadInterpret:
471 cap = interpretBCO(cap);
472 ret = cap->r.rRet;
473 break;
474
475 default:
476 barf("schedule: invalid what_next field");
477 }
478
479 cap->in_haskell = rtsFalse;
480
481 // The TSO might have moved, eg. if it re-entered the RTS and a GC
482 // happened. So find the new location:
483 t = cap->r.rCurrentTSO;
484
485 // And save the current errno in this thread.
486 // XXX: possibly bogus for SMP because this thread might already
487 // be running again, see code below.
488 t->saved_errno = errno;
489 #if mingw32_HOST_OS
490 // Similarly for Windows error code
491 t->saved_winerror = GetLastError();
492 #endif
493
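// NB. for a blocked thread, the '+6' below converts the why_blocked value
// into the corresponding thread-stop status code used by the eventlog,
// where the blocked reasons are offset by 6 (see the eventlog headers for
// the exact encoding).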
494 if (ret == ThreadBlocked) {
495 if (t->why_blocked == BlockedOnBlackHole) {
496 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
497 traceEventStopThread(cap, t, t->why_blocked + 6,
498 owner != NULL ? owner->id : 0);
499 } else {
500 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
501 }
502 } else {
503 traceEventStopThread(cap, t, ret, 0);
504 }
505
506 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
507 ASSERT(t->cap == cap);
508
509 // ----------------------------------------------------------------------
510
511 // Costs for the scheduler are assigned to CCS_SYSTEM
512 stopHeapProfTimer();
513 #if defined(PROFILING)
514 cap->r.rCCCS = CCS_SYSTEM;
515 #endif
516
517 schedulePostRunThread(cap,t);
518
519 ready_to_gc = rtsFalse;
520
521 switch (ret) {
522 case HeapOverflow:
523 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
524 break;
525
526 case StackOverflow:
527 // just adjust the stack for this thread, then pop it back
528 // on the run queue.
529 threadStackOverflow(cap, t);
530 pushOnRunQueue(cap,t);
531 break;
532
533 case ThreadYielding:
534 if (scheduleHandleYield(cap, t, prev_what_next)) {
535 // shortcut for switching between compiler/interpreter:
536 goto run_thread;
537 }
538 break;
539
540 case ThreadBlocked:
541 scheduleHandleThreadBlocked(t);
542 break;
543
544 case ThreadFinished:
545 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
546 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
547 break;
548
549 default:
550 barf("schedule: invalid thread return code %d", (int)ret);
551 }
552
553 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
554 cap = scheduleDoGC(cap,task,rtsFalse);
555 }
556 } /* end of while() */
557 }
558
559 /* -----------------------------------------------------------------------------
560 * Run queue operations
561 * -------------------------------------------------------------------------- */
562
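// Remove a thread from anywhere in the run queue.  The run queue is a
// doubly-linked list: forward links are tso->_link and backward links are
// stored in tso->block_info.prev, so we patch both directions and fix up
// run_queue_hd/run_queue_tl when the TSO is at either end.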
563 void
564 removeFromRunQueue (Capability *cap, StgTSO *tso)
565 {
566 if (tso->block_info.prev == END_TSO_QUEUE) {
567 ASSERT(cap->run_queue_hd == tso);
568 cap->run_queue_hd = tso->_link;
569 } else {
570 setTSOLink(cap, tso->block_info.prev, tso->_link);
571 }
572 if (tso->_link == END_TSO_QUEUE) {
573 ASSERT(cap->run_queue_tl == tso);
574 cap->run_queue_tl = tso->block_info.prev;
575 } else {
576 setTSOPrev(cap, tso->_link, tso->block_info.prev);
577 }
578 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
579
580 IF_DEBUG(sanity, checkRunQueue(cap));
581 }
582
583 /* ----------------------------------------------------------------------------
584 * Setting up the scheduler loop
585 * ------------------------------------------------------------------------- */
586
587 static void
588 schedulePreLoop(void)
589 {
590 // initialisation for scheduler - what cannot go into initScheduler()
591
592 #if defined(mingw32_HOST_OS)
593 win32AllocStack();
594 #endif
595 }
596
597 /* -----------------------------------------------------------------------------
598 * scheduleFindWork()
599 *
600 * Search for work to do, and handle messages from elsewhere.
601 * -------------------------------------------------------------------------- */
602
603 static void
604 scheduleFindWork (Capability *cap)
605 {
606 scheduleStartSignalHandlers(cap);
607
608 scheduleProcessInbox(cap);
609
610 scheduleCheckBlockedThreads(cap);
611
612 #if defined(THREADED_RTS)
613 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
614 #endif
615 }
616
617 #if defined(THREADED_RTS)
618 STATIC_INLINE rtsBool
619 shouldYieldCapability (Capability *cap, Task *task)
620 {
621 // we need to yield this capability to someone else if..
622 // - another thread is initiating a GC
623 // - another Task is returning from a foreign call
624 // - the thread at the head of the run queue cannot be run
625 // by this Task (it is bound to another Task, or it is unbound
626 // and this task is bound).
627 return (pending_sync ||
628 cap->returning_tasks_hd != NULL ||
629 (!emptyRunQueue(cap) && (task->incall->tso == NULL
630 ? cap->run_queue_hd->bound != NULL
631 : cap->run_queue_hd->bound != task->incall)));
632 }
633
634 // This is the single place where a Task goes to sleep. There are
635 // two reasons it might need to sleep:
636 // - there are no threads to run
637 // - we need to yield this Capability to someone else
638 // (see shouldYieldCapability())
639 //
640 // Careful: the scheduler loop is quite delicate. Make sure you run
641 // the tests in testsuite/concurrent (all ways) after modifying this,
642 // and also check the benchmarks in nofib/parallel for regressions.
643
644 static void
645 scheduleYield (Capability **pcap, Task *task)
646 {
647 Capability *cap = *pcap;
648
649 // if we have work, and we don't need to give up the Capability, continue.
650 //
651 if (!shouldYieldCapability(cap,task) &&
652 (!emptyRunQueue(cap) ||
653 !emptyInbox(cap) ||
654 sched_state >= SCHED_INTERRUPTING))
655 return;
656
657 // otherwise yield (sleep), and keep yielding if necessary.
658 do {
659 yieldCapability(&cap,task);
660 }
661 while (shouldYieldCapability(cap,task));
662
663 // note there may still be no threads on the run queue at this
664 // point, the caller has to check.
665
666 *pcap = cap;
667 return;
668 }
669 #endif
670
671 /* -----------------------------------------------------------------------------
672 * schedulePushWork()
673 *
674 * Push work to other Capabilities if we have some.
675 * -------------------------------------------------------------------------- */
676
677 static void
678 schedulePushWork(Capability *cap USED_IF_THREADS,
679 Task *task USED_IF_THREADS)
680 {
681 /* following code not for PARALLEL_HASKELL. I kept the call general,
682 future GUM versions might use pushing in a distributed setup */
683 #if defined(THREADED_RTS)
684
685 Capability *free_caps[n_capabilities], *cap0;
686 nat i, n_free_caps;
687
688 // migration can be turned off with +RTS -qm
689 if (!RtsFlags.ParFlags.migrate) return;
690
691 // Check whether we have more threads on our run queue, or sparks
692 // in our pool, that we could hand to another Capability.
693 if (cap->run_queue_hd == END_TSO_QUEUE) {
694 if (sparkPoolSizeCap(cap) < 2) return;
695 } else {
696 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
697 sparkPoolSizeCap(cap) < 1) return;
698 }
699
700 // First grab as many free Capabilities as we can.
701 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
702 cap0 = &capabilities[i];
703 if (cap != cap0 && tryGrabCapability(cap0,task)) {
704 if (!emptyRunQueue(cap0)
705 || cap->returning_tasks_hd != NULL
706 || cap->inbox != (Message*)END_TSO_QUEUE) {
707 // it already has some work, we just grabbed it at
708 // the wrong moment. Or maybe it's deadlocked!
709 releaseCapability(cap0);
710 } else {
711 free_caps[n_free_caps++] = cap0;
712 }
713 }
714 }
715
716 // we now have n_free_caps free capabilities stashed in
717 // free_caps[]. Share our run queue equally with them. This is
718 // probably the simplest thing we could do; improvements we might
719 // want to do include:
720 //
721 // - giving high priority to moving relatively new threads, on
722 // grounds that they haven't had time to build up a
723 // working set in the cache on this CPU/Capability.
724 //
725 // - giving low priority to moving long-lived threads
726
727 if (n_free_caps > 0) {
728 StgTSO *prev, *t, *next;
729 #ifdef SPARK_PUSHING
730 rtsBool pushed_to_all;
731 #endif
732
733 debugTrace(DEBUG_sched,
734 "cap %d: %s and %d free capabilities, sharing...",
735 cap->no,
736 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
737 "excess threads on run queue":"sparks to share (>=2)",
738 n_free_caps);
739
740 i = 0;
741 #ifdef SPARK_PUSHING
742 pushed_to_all = rtsFalse;
743 #endif
744
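// Walk the run queue, always keeping the thread at the head plus any
// bound or locked threads for ourselves; the rest are dealt out to
// free_caps[0..n_free_caps-1] in round-robin fashion, keeping every
// (n_free_caps+1)th thread here so that we retain a share of the work.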
745 if (cap->run_queue_hd != END_TSO_QUEUE) {
746 prev = cap->run_queue_hd;
747 t = prev->_link;
748 prev->_link = END_TSO_QUEUE;
749 for (; t != END_TSO_QUEUE; t = next) {
750 next = t->_link;
751 t->_link = END_TSO_QUEUE;
752 if (t->bound == task->incall // don't move my bound thread
753 || tsoLocked(t)) { // don't move a locked thread
754 setTSOLink(cap, prev, t);
755 setTSOPrev(cap, t, prev);
756 prev = t;
757 } else if (i == n_free_caps) {
758 #ifdef SPARK_PUSHING
759 pushed_to_all = rtsTrue;
760 #endif
761 i = 0;
762 // keep one for us
763 setTSOLink(cap, prev, t);
764 setTSOPrev(cap, t, prev);
765 prev = t;
766 } else {
767 appendToRunQueue(free_caps[i],t);
768
769 traceEventMigrateThread (cap, t, free_caps[i]->no);
770
771 if (t->bound) { t->bound->task->cap = free_caps[i]; }
772 t->cap = free_caps[i];
773 i++;
774 }
775 }
776 cap->run_queue_tl = prev;
777
778 IF_DEBUG(sanity, checkRunQueue(cap));
779 }
780
781 #ifdef SPARK_PUSHING
782 /* JB I left this code in place, it would work but is not necessary */
783
784 // If there are some free capabilities that we didn't push any
785 // threads to, then try to push a spark to each one.
786 if (!pushed_to_all) {
787 StgClosure *spark;
788 // i is the next free capability to push to
789 for (; i < n_free_caps; i++) {
790 if (emptySparkPoolCap(free_caps[i])) {
791 spark = tryStealSpark(cap->sparks);
792 if (spark != NULL) {
793 /* TODO: if anyone wants to re-enable this code then
794 * they must consider the fizzledSpark(spark) case
795 * and update the per-cap spark statistics.
796 */
797 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
798
799 traceEventStealSpark(free_caps[i], t, cap->no);
800
801 newSpark(&(free_caps[i]->r), spark);
802 }
803 }
804 }
805 }
806 #endif /* SPARK_PUSHING */
807
808 // release the capabilities
809 for (i = 0; i < n_free_caps; i++) {
810 task->cap = free_caps[i];
811 releaseAndWakeupCapability(free_caps[i]);
812 }
813 }
814 task->cap = cap; // reset to point to our Capability.
815
816 #endif /* THREADED_RTS */
817
818 }
819
820 /* ----------------------------------------------------------------------------
821 * Start any pending signal handlers
822 * ------------------------------------------------------------------------- */
823
824 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
825 static void
826 scheduleStartSignalHandlers(Capability *cap)
827 {
828 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
829 // safe outside the lock
830 startSignalHandlers(cap);
831 }
832 }
833 #else
834 static void
835 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
836 {
837 }
838 #endif
839
840 /* ----------------------------------------------------------------------------
841 * Check for blocked threads that can be woken up.
842 * ------------------------------------------------------------------------- */
843
844 static void
845 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
846 {
847 #if !defined(THREADED_RTS)
848 //
849 // Check whether any waiting threads need to be woken up. If the
850 // run queue is empty, and there are no other tasks running, we
851 // can wait indefinitely for something to happen.
852 //
853 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
854 {
855 awaitEvent (emptyRunQueue(cap));
856 }
857 #endif
858 }
859
860 /* ----------------------------------------------------------------------------
861 * Detect deadlock conditions and attempt to resolve them.
862 * ------------------------------------------------------------------------- */
863
864 static void
865 scheduleDetectDeadlock (Capability *cap, Task *task)
866 {
867 /*
868 * Detect deadlock: when we have no threads to run, there are no
869 * threads blocked, waiting for I/O, or sleeping, and all the
870 * other tasks are waiting for work, we must have a deadlock of
871 * some description.
872 */
873 if ( emptyThreadQueues(cap) )
874 {
875 #if defined(THREADED_RTS)
876 /*
877 * In the threaded RTS, we only check for deadlock if there
878 * has been no activity in a complete timeslice. This means
879 * we won't eagerly start a full GC just because we don't have
880 * any threads to run currently.
881 */
882 if (recent_activity != ACTIVITY_INACTIVE) return;
883 #endif
884
885 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
886
887 // Garbage collection can release some new threads due to
888 // either (a) finalizers or (b) threads resurrected because
889 // they are unreachable and will therefore be sent an
890 // exception. Any threads thus released will be immediately
891 // runnable.
892 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
893 // When force_major == rtsTrue, scheduleDoGC sets
894 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
895 // signal.
896
897 if ( !emptyRunQueue(cap) ) return;
898
899 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
900 /* If we have user-installed signal handlers, then wait
901 * for signals to arrive rather than bombing out with a
902 * deadlock.
903 */
904 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
905 debugTrace(DEBUG_sched,
906 "still deadlocked, waiting for signals...");
907
908 awaitUserSignals();
909
910 if (signals_pending()) {
911 startSignalHandlers(cap);
912 }
913
914 // either we have threads to run, or we were interrupted:
915 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
916
917 return;
918 }
919 #endif
920
921 #if !defined(THREADED_RTS)
922 /* Probably a real deadlock. Send the current main thread the
923 * Deadlock exception.
924 */
925 if (task->incall->tso) {
926 switch (task->incall->tso->why_blocked) {
927 case BlockedOnSTM:
928 case BlockedOnBlackHole:
929 case BlockedOnMsgThrowTo:
930 case BlockedOnMVar:
931 throwToSingleThreaded(cap, task->incall->tso,
932 (StgClosure *)nonTermination_closure);
933 return;
934 default:
935 barf("deadlock: main thread blocked in a strange way");
936 }
937 }
938 return;
939 #endif
940 }
941 }
942
943
944 /* ----------------------------------------------------------------------------
945 * Send pending messages (PARALLEL_HASKELL only)
946 * ------------------------------------------------------------------------- */
947
948 #if defined(PARALLEL_HASKELL)
949 static void
950 scheduleSendPendingMessages(void)
951 {
952
953 # if defined(PAR) // global Mem.Mgmt., omit for now
954 if (PendingFetches != END_BF_QUEUE) {
955 processFetches();
956 }
957 # endif
958
959 if (RtsFlags.ParFlags.BufferTime) {
960 // if we use message buffering, we must send away all message
961 // packets which have become too old...
962 sendOldBuffers();
963 }
964 }
965 #endif
966
967 /* ----------------------------------------------------------------------------
968 * Process message in the current Capability's inbox
969 * ------------------------------------------------------------------------- */
970
971 static void
972 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
973 {
974 #if defined(THREADED_RTS)
975 Message *m, *next;
976 int r;
977
978 while (!emptyInbox(cap)) {
979 if (cap->r.rCurrentNursery->link == NULL ||
980 g0->n_new_large_words >= large_alloc_lim) {
981 scheduleDoGC(cap, cap->running_task, rtsFalse);
982 }
983
984 // don't use a blocking acquire; if the lock is held by
985 // another thread then just carry on. This seems to avoid
986 // getting stuck in a message ping-pong situation with other
987 // processors. We'll check the inbox again later anyway.
988 //
989 // We should really use a more efficient queue data structure
990 // here. The trickiness is that we must ensure a Capability
991 // never goes idle if the inbox is non-empty, which is why we
992 // use cap->lock (cap->lock is released as the last thing
993 // before going idle; see Capability.c:releaseCapability()).
994 r = TRY_ACQUIRE_LOCK(&cap->lock);
995 if (r != 0) return;
996
997 m = cap->inbox;
998 cap->inbox = (Message*)END_TSO_QUEUE;
999
1000 RELEASE_LOCK(&cap->lock);
1001
1002 while (m != (Message*)END_TSO_QUEUE) {
1003 next = m->link;
1004 executeMessage(cap, m);
1005 m = next;
1006 }
1007 }
1008 #endif
1009 }
1010
1011 /* ----------------------------------------------------------------------------
1012 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1013 * ------------------------------------------------------------------------- */
1014
1015 #if defined(THREADED_RTS)
1016 static void
1017 scheduleActivateSpark(Capability *cap)
1018 {
1019 if (anySparks())
1020 {
1021 createSparkThread(cap);
1022 debugTrace(DEBUG_sched, "creating a spark thread");
1023 }
1024 }
1025 #endif // THREADED_RTS
1026
1027 /* ----------------------------------------------------------------------------
1028 * After running a thread...
1029 * ------------------------------------------------------------------------- */
1030
1031 static void
1032 schedulePostRunThread (Capability *cap, StgTSO *t)
1033 {
1034 // We have to be able to catch transactions that are in an
1035 // infinite loop as a result of seeing an inconsistent view of
1036 // memory, e.g.
1037 //
1038 // atomically $ do
1039 // [a,b] <- mapM readTVar [ta,tb]
1040 // when (a == b) loop
1041 //
1042 // and a is never equal to b given a consistent view of memory.
1043 //
1044 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1045 if (!stmValidateNestOfTransactions (t -> trec)) {
1046 debugTrace(DEBUG_sched | DEBUG_stm,
1047 "trec %p found wasting its time", t);
1048
1049 // strip the stack back to the
1050 // ATOMICALLY_FRAME, aborting the (nested)
1051 // transaction, and saving the stack of any
1052 // partially-evaluated thunks on the heap.
1053 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1054
1055 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1056 }
1057 }
1058
1059 /* some statistics gathering in the parallel case */
1060 }
1061
1062 /* -----------------------------------------------------------------------------
1063 * Handle a thread that returned to the scheduler with HeapOverflow
1064 * -------------------------------------------------------------------------- */
1065
1066 static rtsBool
1067 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1068 {
1069 // did the task ask for a large block?
1070 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1071 // if so, get one and push it on the front of the nursery.
1072 bdescr *bd;
1073 lnat blocks;
1074
1075 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1076
1077 if (blocks > BLOCKS_PER_MBLOCK) {
1078 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1079 }
1080
1081 debugTrace(DEBUG_sched,
1082 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1083 (long)t->id, what_next_strs[t->what_next], blocks);
1084
1085 // don't do this if the nursery is (nearly) full, we'll GC first.
1086 if (cap->r.rCurrentNursery->link != NULL ||
1087 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1088 // if the nursery has only one block.
1089
1090 bd = allocGroup_lock(blocks);
1091 cap->r.rNursery->n_blocks += blocks;
1092
1093 // link the new group into the list
1094 bd->link = cap->r.rCurrentNursery;
1095 bd->u.back = cap->r.rCurrentNursery->u.back;
1096 if (cap->r.rCurrentNursery->u.back != NULL) {
1097 cap->r.rCurrentNursery->u.back->link = bd;
1098 } else {
1099 cap->r.rNursery->blocks = bd;
1100 }
1101 cap->r.rCurrentNursery->u.back = bd;
1102
1103 // initialise it as a nursery block. We initialise the
1104 // step, gen_no, and flags field of *every* sub-block in
1105 // this large block, because this is easier than making
1106 // sure that we always find the block head of a large
1107 // block whenever we call Bdescr() (eg. evacuate() and
1108 // isAlive() in the GC would both have to do this, at
1109 // least).
1110 {
1111 bdescr *x;
1112 for (x = bd; x < bd + blocks; x++) {
1113 initBdescr(x,g0,g0);
1114 x->free = x->start;
1115 x->flags = 0;
1116 }
1117 }
1118
1119 // This assert can be a killer if the app is doing lots
1120 // of large block allocations.
1121 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1122
1123 // now update the nursery to point to the new block
1124 cap->r.rCurrentNursery = bd;
1125
1126 // we might be unlucky and have another thread get on the
1127 // run queue before us and steal the large block, but in that
1128 // case the thread will just end up requesting another large
1129 // block.
1130 pushOnRunQueue(cap,t);
1131 return rtsFalse; /* not actually GC'ing */
1132 }
1133 }
1134
1135 if (cap->r.rHpLim == NULL || cap->context_switch) {
1136 // Sometimes we miss a context switch, e.g. when calling
1137 // primitives in a tight loop, MAYBE_GC() doesn't check the
1138 // context switch flag, and we end up waiting for a GC.
1139 // See #1984, and concurrent/should_run/1984
1140 cap->context_switch = 0;
1141 appendToRunQueue(cap,t);
1142 } else {
1143 pushOnRunQueue(cap,t);
1144 }
1145 return rtsTrue;
1146 /* actual GC is done at the end of the while loop in schedule() */
1147 }
1148
1149 /* -----------------------------------------------------------------------------
1150 * Handle a thread that returned to the scheduler with ThreadYielding
1151 * -------------------------------------------------------------------------- */
1152
1153 static rtsBool
1154 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1155 {
1156 /* put the thread back on the run queue. Then, if we're ready to
1157 * GC, check whether this is the last task to stop. If so, wake
1158 * up the GC thread. getThread will block during a GC until the
1159 * GC is finished.
1160 */
1161
1162 ASSERT(t->_link == END_TSO_QUEUE);
1163
1164 // Shortcut if we're just switching evaluators: don't bother
1165 // doing stack squeezing (which can be expensive), just run the
1166 // thread.
1167 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1168 debugTrace(DEBUG_sched,
1169 "--<< thread %ld (%s) stopped to switch evaluators",
1170 (long)t->id, what_next_strs[t->what_next]);
1171 return rtsTrue;
1172 }
1173
1174 // Reset the context switch flag. We don't do this just before
1175 // running the thread, because that would mean we would lose ticks
1176 // during GC, which can lead to unfair scheduling (a thread hogs
1177 // the CPU because the tick always arrives during GC). This way
1178 // penalises threads that do a lot of allocation, but that seems
1179 // better than the alternative.
1180 if (cap->context_switch != 0) {
1181 cap->context_switch = 0;
1182 appendToRunQueue(cap,t);
1183 } else {
1184 pushOnRunQueue(cap,t);
1185 }
1186
1187 IF_DEBUG(sanity,
1188 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1189 checkTSO(t));
1190
1191 return rtsFalse;
1192 }
1193
1194 /* -----------------------------------------------------------------------------
1195 * Handle a thread that returned to the scheduler with ThreadBlocked
1196 * -------------------------------------------------------------------------- */
1197
1198 static void
1199 scheduleHandleThreadBlocked( StgTSO *t
1200 #if !defined(DEBUG)
1201 STG_UNUSED
1202 #endif
1203 )
1204 {
1205
1206 // We don't need to do anything. The thread is blocked, and it
1207 // has tidied up its stack and placed itself on whatever queue
1208 // it needs to be on.
1209
1210 // ASSERT(t->why_blocked != NotBlocked);
1211 // Not true: for example,
1212 // - the thread may have woken itself up already, because
1213 // threadPaused() might have raised a blocked throwTo
1214 // exception, see maybePerformBlockedException().
1215
1216 #ifdef DEBUG
1217 traceThreadStatus(DEBUG_sched, t);
1218 #endif
1219 }
1220
1221 /* -----------------------------------------------------------------------------
1222 * Handle a thread that returned to the scheduler with ThreadFinished
1223 * -------------------------------------------------------------------------- */
1224
1225 static rtsBool
1226 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1227 {
1228 /* Need to check whether this was a main thread, and if so,
1229 * return with the return value.
1230 *
1231 * We also end up here if the thread kills itself with an
1232 * uncaught exception, see Exception.cmm.
1233 */
1234
1235 // blocked exceptions can now complete, even if the thread was in
1236 // blocked mode (see #2910).
1237 awakenBlockedExceptionQueue (cap, t);
1238
1239 //
1240 // Check whether the thread that just completed was a bound
1241 // thread, and if so return with the result.
1242 //
1243 // There is an assumption here that all thread completion goes
1244 // through this point; we need to make sure that if a thread
1245 // ends up in the ThreadKilled state, that it stays on the run
1246 // queue so it can be dealt with here.
1247 //
1248
1249 if (t->bound) {
1250
1251 if (t->bound != task->incall) {
1252 #if !defined(THREADED_RTS)
1253 // Must be a bound thread that is not the topmost one. Leave
1254 // it on the run queue until the stack has unwound to the
1255 // point where we can deal with this. Leaving it on the run
1256 // queue also ensures that the garbage collector knows about
1257 // this thread and its return value (it gets dropped from the
1258 // step->threads list so there's no other way to find it).
1259 appendToRunQueue(cap,t);
1260 return rtsFalse;
1261 #else
1262 // this cannot happen in the threaded RTS, because a
1263 // bound thread can only be run by the appropriate Task.
1264 barf("finished bound thread that isn't mine");
1265 #endif
1266 }
1267
1268 ASSERT(task->incall->tso == t);
1269
1270 if (t->what_next == ThreadComplete) {
1271 if (task->incall->ret) {
1272 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1273 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1274 }
1275 task->incall->stat = Success;
1276 } else {
1277 if (task->incall->ret) {
1278 *(task->incall->ret) = NULL;
1279 }
1280 if (sched_state >= SCHED_INTERRUPTING) {
1281 if (heap_overflow) {
1282 task->incall->stat = HeapExhausted;
1283 } else {
1284 task->incall->stat = Interrupted;
1285 }
1286 } else {
1287 task->incall->stat = Killed;
1288 }
1289 }
1290 #ifdef DEBUG
1291 removeThreadLabel((StgWord)task->incall->tso->id);
1292 #endif
1293
1294 // We no longer consider this thread and task to be bound to
1295 // each other. The TSO lives on until it is GC'd, but the
1296 // task is about to be released by the caller, and we don't
1297 // want anyone following the pointer from the TSO to the
1298 // defunct task (which might have already been
1299 // re-used). This was a real bug: the GC updated
1300 // tso->bound->tso which led to a deadlock.
1301 t->bound = NULL;
1302 task->incall->tso = NULL;
1303
1304 return rtsTrue; // tells schedule() to return
1305 }
1306
1307 return rtsFalse;
1308 }
1309
1310 /* -----------------------------------------------------------------------------
1311 * Perform a heap census
1312 * -------------------------------------------------------------------------- */
1313
1314 static rtsBool
1315 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1316 {
1317 // When we have +RTS -i0 and we're heap profiling, do a census at
1318 // every GC. This lets us get repeatable runs for debugging.
1319 if (performHeapProfile ||
1320 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1321 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1322 return rtsTrue;
1323 } else {
1324 return rtsFalse;
1325 }
1326 }
1327
1328 /* -----------------------------------------------------------------------------
1329 * Start a synchronisation of all capabilities
1330 * -------------------------------------------------------------------------- */
1331
1332 // Returns:
1333 // 0 if we successfully got a sync
1334 // non-0 if there was another sync request in progress,
1335 // and we yielded to it. The value returned is the
1336 // type of the other sync request.
1337 //
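// Typical usage, as in setNumCapabilities() below:
//
//     do {
//         sync = requestSync(&cap, task, SYNC_OTHER);
//     } while (sync);
//     acquireAllCapabilities(cap,task);
//     pending_sync = 0;
//     ... the system is now stopped: do the tricky thing ...
//     releaseAllCapabilities(cap,task);
//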
1338 #if defined(THREADED_RTS)
1339 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1340 {
1341 nat prev_pending_sync;
1342
1343 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1344
1345 if (prev_pending_sync)
1346 {
1347 do {
1348 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1349 prev_pending_sync);
1350 ASSERT(*pcap);
1351 yieldCapability(pcap,task);
1352 } while (pending_sync);
1353 return prev_pending_sync; // NOTE: task->cap might have changed now
1354 }
1355 else
1356 {
1357 return 0;
1358 }
1359 }
1360
1361 //
1362 // Grab all the capabilities except the one we already hold. Used
1363 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1364 // before a fork (SYNC_OTHER).
1365 //
1366 // Only call this after requestSync(), otherwise a deadlock might
1367 // ensue if another thread is trying to synchronise.
1368 //
1369 static void acquireAllCapabilities(Capability *cap, Task *task)
1370 {
1371 Capability *tmpcap;
1372 nat i;
1373
1374 for (i=0; i < n_capabilities; i++) {
1375 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1376 tmpcap = &capabilities[i];
1377 if (tmpcap != cap) {
1378 // we better hope this task doesn't get migrated to
1379 // another Capability while we're waiting for this one.
1380 // It won't, because load balancing happens while we have
1381 // all the Capabilities, but even so it's a slightly
1382 // unsavoury invariant.
1383 task->cap = tmpcap;
1384 waitForReturnCapability(&tmpcap, task);
1385 if (tmpcap->no != i) {
1386 barf("acquireAllCapabilities: got the wrong capability");
1387 }
1388 }
1389 }
1390 task->cap = cap;
1391 }
1392
1393 static void releaseAllCapabilities(Capability *cap, Task *task)
1394 {
1395 nat i;
1396
1397 for (i = 0; i < n_capabilities; i++) {
1398 if (cap->no != i) {
1399 task->cap = &capabilities[i];
1400 releaseCapability(&capabilities[i]);
1401 }
1402 }
1403 task->cap = cap;
1404 }
1405 #endif
1406
1407 /* -----------------------------------------------------------------------------
1408 * Perform a garbage collection if necessary
1409 * -------------------------------------------------------------------------- */
1410
1411 static Capability *
1412 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1413 {
1414 rtsBool heap_census;
1415 #ifdef THREADED_RTS
1416 rtsBool gc_type;
1417 nat i, sync;
1418 #endif
1419
1420 if (sched_state == SCHED_SHUTTING_DOWN) {
1421 // The final GC has already been done, and the system is
1422 // shutting down. We'll probably deadlock if we try to GC
1423 // now.
1424 return cap;
1425 }
1426
1427 #ifdef THREADED_RTS
1428 if (sched_state < SCHED_INTERRUPTING
1429 && RtsFlags.ParFlags.parGcEnabled
1430 && N >= RtsFlags.ParFlags.parGcGen
1431 && ! oldest_gen->mark)
1432 {
1433 gc_type = SYNC_GC_PAR;
1434 } else {
1435 gc_type = SYNC_GC_SEQ;
1436 }
1437
1438 // In order to GC, there must be no threads running Haskell code.
1439 // Therefore, the GC thread needs to hold *all* the capabilities,
1440 // and release them after the GC has completed.
1441 //
1442 // This seems to be the simplest way: previous attempts involved
1443 // making all the threads with capabilities give up their
1444 // capabilities and sleep except for the *last* one, which
1445 // actually did the GC. But it's quite hard to arrange for all
1446 // the other tasks to sleep and stay asleep.
1447 //
1448
1449 /* Other capabilities are prevented from running yet more Haskell
1450 threads if pending_sync is set. Tested inside
1451 yieldCapability() and releaseCapability() in Capability.c */
1452
1453 do {
1454 sync = requestSync(&cap, task, gc_type);
1455 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1456 // someone else had a pending sync request for a GC, so
1457 // let's assume GC has been done and we don't need to GC
1458 // again.
1459 return cap;
1460 }
1461 } while (sync);
1462
1463 interruptAllCapabilities();
1464
1465 // The final shutdown GC is always single-threaded, because it's
1466 // possible that some of the Capabilities have no worker threads.
1467
1468 if (gc_type == SYNC_GC_SEQ)
1469 {
1470 traceEventRequestSeqGc(cap);
1471 }
1472 else
1473 {
1474 traceEventRequestParGc(cap);
1475 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1476 }
1477
1478 if (gc_type == SYNC_GC_SEQ)
1479 {
1480 // single-threaded GC: grab all the capabilities
1481 acquireAllCapabilities(cap,task);
1482 }
1483 else
1484 {
1485 // multi-threaded GC: make sure all the Capabilities donate one
1486 // GC thread each.
1487 waitForGcThreads(cap);
1488
1489 #if defined(THREADED_RTS)
1490 // Stable point where we can do a global check on our spark counters
1491 ASSERT(checkSparkCountInvariant());
1492 #endif
1493 }
1494
1495 #endif
1496
1497 IF_DEBUG(scheduler, printAllThreads());
1498
1499 delete_threads_and_gc:
1500 /*
1501 * We now have all the capabilities; if we're in an interrupting
1502 * state, then we should take the opportunity to delete all the
1503 * threads in the system.
1504 */
1505 if (sched_state == SCHED_INTERRUPTING) {
1506 deleteAllThreads(cap);
1507 #if defined(THREADED_RTS)
1508 // Discard all the sparks from every Capability. Why?
1509 // They'll probably be GC'd anyway since we've killed all the
1510 // threads. It just avoids the GC having to do any work to
1511 // figure out that any remaining sparks are garbage.
1512 for (i = 0; i < n_capabilities; i++) {
1513 capabilities[i].spark_stats.gcd +=
1514 sparkPoolSize(capabilities[i].sparks);
1515 // No race here since all Caps are stopped.
1516 discardSparksCap(&capabilities[i]);
1517 }
1518 #endif
1519 sched_state = SCHED_SHUTTING_DOWN;
1520 }
1521
1522 heap_census = scheduleNeedHeapProfile(rtsTrue);
1523
1524 traceEventGcStart(cap);
1525 #if defined(THREADED_RTS)
1526 // reset pending_sync *before* GC, so that when the GC threads
1527 // emerge they don't immediately re-enter the GC.
1528 pending_sync = 0;
1529 GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
1530 #else
1531 GarbageCollect(force_major || heap_census, heap_census, 0, cap);
1532 #endif
1533 traceEventGcEnd(cap);
1534
1535 traceSparkCounters(cap);
1536
1537 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1538 {
1539 // We are doing a GC because the system has been idle for a
1540 // timeslice and we need to check for deadlock. Record the
1541 // fact that we've done a GC and turn off the timer signal;
1542 // it will get re-enabled if we run any threads after the GC.
1543 recent_activity = ACTIVITY_DONE_GC;
1544 stopTimer();
1545 }
1546 else
1547 {
1548 // the GC might have taken long enough for the timer to set
1549 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1550 // necessarily deadlocked:
1551 recent_activity = ACTIVITY_YES;
1552 }
1553
1554 #if defined(THREADED_RTS)
1555 // Stable point where we can do a global check on our spark counters
1556 ASSERT(checkSparkCountInvariant());
1557 #endif
1558
1559 // The heap census itself is done during GarbageCollect().
1560 if (heap_census) {
1561 performHeapProfile = rtsFalse;
1562 }
1563
1564 #if defined(THREADED_RTS)
1565 if (gc_type == SYNC_GC_PAR)
1566 {
1567 releaseGCThreads(cap);
1568 }
1569 #endif
1570
1571 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1572 // GC set the heap_overflow flag, so we should proceed with
1573 // an orderly shutdown now. Ultimately we want the main
1574 // thread to return to its caller with HeapExhausted, at which
1575 // point the caller should call hs_exit(). The first step is
1576 // to delete all the threads.
1577 //
1578 // Another way to do this would be to raise an exception in
1579 // the main thread, which we really should do because it gives
1580 // the program a chance to clean up. But how do we find the
1581 // main thread? It should presumably be the same one that
1582 // gets ^C exceptions, but that's all done on the Haskell side
1583 // (GHC.TopHandler).
1584 sched_state = SCHED_INTERRUPTING;
1585 goto delete_threads_and_gc;
1586 }
1587
1588 #ifdef SPARKBALANCE
1589 /* JB
1590 Once we are all together... this would be the place to balance all
1591 spark pools. No concurrent stealing or adding of new sparks can
1592 occur. Should be defined in Sparks.c. */
1593 balanceSparkPoolsCaps(n_capabilities, capabilities);
1594 #endif
1595
1596 #if defined(THREADED_RTS)
1597 if (gc_type == SYNC_GC_SEQ) {
1598 // release our stash of capabilities.
1599 releaseAllCapabilities(cap, task);
1600 }
1601 #endif
1602
1603 return cap;
1604 }
1605
1606 /* ---------------------------------------------------------------------------
1607 * Singleton fork(). Do not copy any running threads.
1608 * ------------------------------------------------------------------------- */
1609
1610 pid_t
1611 forkProcess(HsStablePtr *entry
1612 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1613 STG_UNUSED
1614 #endif
1615 )
1616 {
1617 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1618 pid_t pid;
1619 StgTSO* t,*next;
1620 Capability *cap;
1621 nat g;
1622 Task *task = NULL;
1623 nat i;
1624 #ifdef THREADED_RTS
1625 nat sync;
1626 #endif
1627
1628 debugTrace(DEBUG_sched, "forking!");
1629
1630 task = newBoundTask();
1631
1632 cap = NULL;
1633 waitForReturnCapability(&cap, task);
1634
1635 #ifdef THREADED_RTS
1636 do {
1637 sync = requestSync(&cap, task, SYNC_OTHER);
1638 } while (sync);
1639
1640 acquireAllCapabilities(cap,task);
1641
1642 pending_sync = 0;
1643 #endif
1644
1645 // no funny business: hold locks while we fork, otherwise if some
1646 // other thread is holding a lock when the fork happens, the data
1647 // structure protected by the lock will forever be in an
1648 // inconsistent state in the child. See also #1391.
1649 ACQUIRE_LOCK(&sched_mutex);
1650 ACQUIRE_LOCK(&sm_mutex);
1651 ACQUIRE_LOCK(&stable_mutex);
1652 ACQUIRE_LOCK(&task->lock);
1653
1654 for (i=0; i < n_capabilities; i++) {
1655 ACQUIRE_LOCK(&capabilities[i].lock);
1656 }
1657
1658 stopTimer(); // See #4074
1659
1660 #if defined(TRACING)
1661 flushEventLog(); // so that child won't inherit dirty file buffers
1662 #endif
1663
1664 pid = fork();
1665
1666 if (pid) { // parent
1667
1668 startTimer(); // #4074
1669
1670 RELEASE_LOCK(&sched_mutex);
1671 RELEASE_LOCK(&sm_mutex);
1672 RELEASE_LOCK(&stable_mutex);
1673 RELEASE_LOCK(&task->lock);
1674
1675 for (i=0; i < n_capabilities; i++) {
1676 releaseCapability_(&capabilities[i],rtsFalse);
1677 RELEASE_LOCK(&capabilities[i].lock);
1678 }
1679 boundTaskExiting(task);
1680
1681 // just return the pid
1682 return pid;
1683
1684 } else { // child
1685
1686 #if defined(THREADED_RTS)
1687 initMutex(&sched_mutex);
1688 initMutex(&sm_mutex);
1689 initMutex(&stable_mutex);
1690 initMutex(&task->lock);
1691
1692 for (i=0; i < n_capabilities; i++) {
1693 initMutex(&capabilities[i].lock);
1694 }
1695 #endif
1696
1697 #ifdef TRACING
1698 resetTracing();
1699 #endif
1700
1701 // Now, all OS threads except the thread that forked are
1702 // stopped. We need to stop all Haskell threads, including
1703 // those involved in foreign calls. Also we need to delete
1704 // all Tasks, because they correspond to OS threads that are
1705 // now gone.
1706
1707 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1708 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1709 next = t->global_link;
1710 // don't allow threads to catch the ThreadKilled
1711 // exception, but we do want to raiseAsync() because these
1712 // threads may be evaluating thunks that we need later.
1713 deleteThread_(t->cap,t);
1714
1715 // stop the GC from updating the InCall to point to
1716 // the TSO. This is only necessary because the
1717 // OSThread bound to the TSO has been killed, and
1718 // won't get a chance to exit in the usual way (see
1719 // also scheduleHandleThreadFinished).
1720 t->bound = NULL;
1721 }
1722 }
1723
1724 discardTasksExcept(task);
1725
1726 for (i=0; i < n_capabilities; i++) {
1727 cap = &capabilities[i];
1728
1729 // Empty the run queue. It seems tempting to let all the
1730 // killed threads stay on the run queue as zombies to be
1731 // cleaned up later, but some of them may correspond to
1732 // bound threads for which the corresponding Task does not
1733 // exist.
1734 cap->run_queue_hd = END_TSO_QUEUE;
1735 cap->run_queue_tl = END_TSO_QUEUE;
1736
1737 // Any suspended C-calling Tasks are no more, their OS threads
1738 // don't exist now:
1739 cap->suspended_ccalls = NULL;
1740
1741 #if defined(THREADED_RTS)
1742 // Wipe our spare workers list, they no longer exist. New
1743 // workers will be created if necessary.
1744 cap->spare_workers = NULL;
1745 cap->n_spare_workers = 0;
1746 cap->returning_tasks_hd = NULL;
1747 cap->returning_tasks_tl = NULL;
1748 #endif
1749
1750 // Release all caps except 0, we'll use that for starting
1751 // the IO manager and running the client action below.
1752 if (cap->no != 0) {
1753 task->cap = cap;
1754 releaseCapability(cap);
1755 }
1756 }
1757 cap = &capabilities[0];
1758 task->cap = cap;
1759
1760 // Empty the threads lists. Otherwise, the garbage
1761 // collector may attempt to resurrect some of these threads.
1762 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1763 generations[g].threads = END_TSO_QUEUE;
1764 }
1765
1766 // On Unix, all timers are reset in the child, so we need to start
1767 // the timer again.
1768 initTimer();
1769 startTimer();
1770
1771 #if defined(THREADED_RTS)
1772 ioManagerStartCap(&cap);
1773 #endif
1774
1775 rts_evalStableIO(&cap, entry, NULL); // run the action
1776 rts_checkSchedStatus("forkProcess",cap);
1777
1778 rts_unlock(cap);
1779 hs_exit(); // clean up and exit
1780 stg_exit(EXIT_SUCCESS);
1781 }
1782 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1783 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1784 #endif
1785 }
1786
1787 /* ---------------------------------------------------------------------------
1788 * Increase the number of Capabilities
1789 *
1790 * Changing the number of Capabilities is very tricky! We can only do
1791 * it with the system fully stopped, so we do a full sync with
1792 * requestSync(SYNC_OTHER) and grab all the capabilities.
1793 *
1794 * Then we resize the appropriate data structures, and update all
1795 * references to the old data structures which have now moved.
1796 * Finally we release the Capabilities we are holding, and start
1797 * worker Tasks on the new Capabilities we created.
1798 *
1799 * ------------------------------------------------------------------------- */
1800
1801 void
1802 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1803 {
1804 #if !defined(THREADED_RTS)
1805
1806 barf("setNumCapabilities: not supported in the non-threaded RTS");
1807
1808 #else
1809 Task *task;
1810 Capability *cap;
1811 nat sync;
1812 StgTSO* t;
1813 nat g;
1814 Capability *old_capabilities;
1815
1816 if (new_n_capabilities == n_capabilities) return;
1817
1818 if (new_n_capabilities < n_capabilities) {
1819 barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
1820 }
1821
1822 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1823 n_capabilities, new_n_capabilities);
1824
1825 cap = rts_lock();
1826 task = cap->running_task;
1827
1828 do {
1829 sync = requestSync(&cap, task, SYNC_OTHER);
1830 } while (sync);
1831
1832 acquireAllCapabilities(cap,task);
1833
1834 pending_sync = 0;
1835
1836 #if defined(TRACING)
1837 // Allocate eventlog buffers for the new capabilities. Note this
1838 // must be done before calling moreCapabilities(), because that
1839 // will emit events to add the new capabilities to capsets.
1840 tracingAddCapapilities(n_capabilities, new_n_capabilities);
1841 #endif
1842
1843 // Resize the capabilities array
1844 // NB. after this, capabilities points somewhere new. Any pointers
1845 // of type (Capability *) are now invalid.
1846 old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
1847
1848 // update our own cap pointer
1849 cap = &capabilities[cap->no];
1850
1851 // Resize and update storage manager data structures
1852 storageAddCapabilities(n_capabilities, new_n_capabilities);
1853
1854 // Update (Capability *) refs in the Task manager.
1855 updateCapabilityRefs();
1856
1857 // Update (Capability *) refs from TSOs
1858 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1859 for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
1860 t->cap = &capabilities[t->cap->no];
1861 }
1862 }
1863
1864 // We're done: release the original Capabilities
1865 releaseAllCapabilities(cap,task);
1866
1867 // Start worker tasks on the new Capabilities
1868 startWorkerTasks(n_capabilities, new_n_capabilities);
1869
1870 // finally, update n_capabilities
1871 n_capabilities = new_n_capabilities;
1872
1873 // We can't free the old array until now, because we access it
1874 // while updating pointers in updateCapabilityRefs().
1875 if (old_capabilities) {
1876 stgFree(old_capabilities);
1877 }
1878
1879 rts_unlock(cap);
1880
1881 #endif // THREADED_RTS
1882 }
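/* A minimal usage sketch (illustrative only, not part of this file): a
 * program linked with the threaded RTS, with hs_init() already done, can
 * grow the capability count at runtime by calling the entry point above,
 * e.g. from C code embedding the RTS (Haskell code is expected to reach it
 * through a safe foreign import of "setNumCapabilities"):
 *
 *     setNumCapabilities(8);   // e.g. grow from -N2 to 8 capabilities
 *
 * Shrinking is rejected with barf() above, so callers should only pass a
 * value >= the current n_capabilities.
 */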
1883
1884
1885
1886 /* ---------------------------------------------------------------------------
1887 * Delete all the threads in the system
1888 * ------------------------------------------------------------------------- */
1889
1890 static void
1891 deleteAllThreads ( Capability *cap )
1892 {
1893 // NOTE: only safe to call if we own all capabilities.
1894
1895 StgTSO* t, *next;
1896 nat g;
1897
1898 debugTrace(DEBUG_sched,"deleting all threads");
1899 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1900 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1901 next = t->global_link;
1902 deleteThread(cap,t);
1903 }
1904 }
1905
1906 // The run queue now contains a bunch of ThreadKilled threads. We
1907 // must not throw these away: the main thread(s) will be in there
1908 // somewhere, and the main scheduler loop has to deal with it.
1909 // Also, the run queue is the only thing keeping these threads from
1910 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1911
1912 #if !defined(THREADED_RTS)
1913 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1914 ASSERT(sleeping_queue == END_TSO_QUEUE);
1915 #endif
1916 }
1917
1918 /* -----------------------------------------------------------------------------
1919 Managing the suspended_ccalls list.
1920 Locks required: sched_mutex
1921 -------------------------------------------------------------------------- */
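// Each Capability keeps a doubly-linked list of the InCalls whose Tasks
// are currently out in foreign calls: suspendTask() pushes the Task's
// incall onto the front of cap->suspended_ccalls, and
// recoverSuspendedTask() unlinks it again when the call returns.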
1922
1923 STATIC_INLINE void
1924 suspendTask (Capability *cap, Task *task)
1925 {
1926 InCall *incall;
1927
1928 incall = task->incall;
1929 ASSERT(incall->next == NULL && incall->prev == NULL);
1930 incall->next = cap->suspended_ccalls;
1931 incall->prev = NULL;
1932 if (cap->suspended_ccalls) {
1933 cap->suspended_ccalls->prev = incall;
1934 }
1935 cap->suspended_ccalls = incall;
1936 }
1937
1938 STATIC_INLINE void
1939 recoverSuspendedTask (Capability *cap, Task *task)
1940 {
1941 InCall *incall;
1942
1943 incall = task->incall;
1944 if (incall->prev) {
1945 incall->prev->next = incall->next;
1946 } else {
1947 ASSERT(cap->suspended_ccalls == incall);
1948 cap->suspended_ccalls = incall->next;
1949 }
1950 if (incall->next) {
1951 incall->next->prev = incall->prev;
1952 }
1953 incall->next = incall->prev = NULL;
1954 }
1955
1956 /* ---------------------------------------------------------------------------
1957 * Suspending & resuming Haskell threads.
1958 *
1959 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1960 * its capability before calling the C function. This allows another
1961 * task to pick up the capability and carry on running Haskell
1962 * threads. It also means that if the C call blocks, it won't lock
1963 * the whole system.
1964 *
1965 * The Haskell thread making the C call is put to sleep for the
1966  * duration of the call, on the capability's suspended_ccalls list. We
1967 * give out a token to the task, which it can use to resume the thread
1968 * on return from the C function.
1969 *
1970 * If this is an interruptible C call, this means that the FFI call may be
1971 * unceremoniously terminated and should be scheduled on an
1972 * unbound worker thread.
1973 * ------------------------------------------------------------------------- */
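/* A sketch of how these two entry points are used (illustrative; the real
 * call sites live in compiler-generated code for "safe" foreign calls, not
 * in this file).  some_blocking_c_function is a hypothetical example:
 *
 *     StgRegTable *reg = ...;                          // current reg table
 *     void *tok = suspendThread(reg, interruptible);   // give up the Capability
 *     r = some_blocking_c_function(args);              // the foreign call proper
 *     reg = resumeThread(tok);                         // reacquire a Capability
 *
 * The token handed back by suspendThread() is really the Task, which
 * resumeThread() uses to find the suspended TSO and Capability again.
 */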
1974
1975 void *
1976 suspendThread (StgRegTable *reg, rtsBool interruptible)
1977 {
1978 Capability *cap;
1979 int saved_errno;
1980 StgTSO *tso;
1981 Task *task;
1982 #if mingw32_HOST_OS
1983 StgWord32 saved_winerror;
1984 #endif
1985
1986 saved_errno = errno;
1987 #if mingw32_HOST_OS
1988 saved_winerror = GetLastError();
1989 #endif
1990
1991 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1992 */
1993 cap = regTableToCapability(reg);
1994
1995 task = cap->running_task;
1996 tso = cap->r.rCurrentTSO;
1997
1998 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
1999
2000 // XXX this might not be necessary --SDM
2001 tso->what_next = ThreadRunGHC;
2002
2003 threadPaused(cap,tso);
2004
2005 if (interruptible) {
2006 tso->why_blocked = BlockedOnCCall_Interruptible;
2007 } else {
2008 tso->why_blocked = BlockedOnCCall;
2009 }
2010
2011 // Hand back capability
2012 task->incall->suspended_tso = tso;
2013 task->incall->suspended_cap = cap;
2014
2015 ACQUIRE_LOCK(&cap->lock);
2016
2017 suspendTask(cap,task);
2018 cap->in_haskell = rtsFalse;
2019 releaseCapability_(cap,rtsFalse);
2020
2021 RELEASE_LOCK(&cap->lock);
2022
2023 errno = saved_errno;
2024 #if mingw32_HOST_OS
2025 SetLastError(saved_winerror);
2026 #endif
2027 return task;
2028 }
2029
2030 StgRegTable *
2031 resumeThread (void *task_)
2032 {
2033 StgTSO *tso;
2034 InCall *incall;
2035 Capability *cap;
2036 Task *task = task_;
2037 int saved_errno;
2038 #if mingw32_HOST_OS
2039 StgWord32 saved_winerror;
2040 #endif
2041
2042 saved_errno = errno;
2043 #if mingw32_HOST_OS
2044 saved_winerror = GetLastError();
2045 #endif
2046
2047 incall = task->incall;
2048 cap = incall->suspended_cap;
2049 task->cap = cap;
2050
2051 // Wait for permission to re-enter the RTS with the result.
2052 waitForReturnCapability(&cap,task);
2053 // we might be on a different capability now... but if so, our
2054 // entry on the suspended_ccalls list will also have been
2055 // migrated.
2056
2057 // Remove the thread from the suspended list
2058 recoverSuspendedTask(cap,task);
2059
2060 tso = incall->suspended_tso;
2061 incall->suspended_tso = NULL;
2062 incall->suspended_cap = NULL;
2063 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2064
2065 traceEventRunThread(cap, tso);
2066
2067 /* Reset blocking status */
2068 tso->why_blocked = NotBlocked;
2069
2070 if ((tso->flags & TSO_BLOCKEX) == 0) {
2071 // avoid locking the TSO if we don't have to
2072 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2073 maybePerformBlockedException(cap,tso);
2074 }
2075 }
2076
2077 cap->r.rCurrentTSO = tso;
2078 cap->in_haskell = rtsTrue;
2079 errno = saved_errno;
2080 #if mingw32_HOST_OS
2081 SetLastError(saved_winerror);
2082 #endif
2083
2084 /* We might have GC'd, mark the TSO dirty again */
2085 dirty_TSO(cap,tso);
2086 dirty_STACK(cap,tso->stackobj);
2087
2088 IF_DEBUG(sanity, checkTSO(tso));
2089
2090 return &cap->r;
2091 }
2092
2093 /* ---------------------------------------------------------------------------
2094 * scheduleThread()
2095 *
2096 * scheduleThread puts a thread on the end of the runnable queue.
2097 * This will usually be done immediately after a thread is created.
2098 * The caller of scheduleThread must create the thread using e.g.
2099 * createThread and push an appropriate closure
2100 * on this thread's stack before the scheduler is invoked.
2101 * ------------------------------------------------------------------------ */
2102
2103 void
2104 scheduleThread(Capability *cap, StgTSO *tso)
2105 {
2106 // The thread goes at the *end* of the run-queue, to avoid possible
2107 // starvation of any threads already on the queue.
2108 appendToRunQueue(cap,tso);
2109 }
2110
2111 void
2112 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2113 {
2114 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2115 // move this thread from now on.
2116 #if defined(THREADED_RTS)
2117 cpu %= n_capabilities;
2118 if (cpu == cap->no) {
2119 appendToRunQueue(cap,tso);
2120 } else {
2121 migrateThread(cap, tso, &capabilities[cpu]);
2122 }
2123 #else
2124 appendToRunQueue(cap,tso);
2125 #endif
2126 }
2127
2128 void
2129 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2130 {
2131 Task *task;
2132 DEBUG_ONLY( StgThreadID id );
2133 Capability *cap;
2134
2135 cap = *pcap;
2136
2137 // We already created/initialised the Task
2138 task = cap->running_task;
2139
2140 // This TSO is now a bound thread; make the Task and TSO
2141 // point to each other.
2142 tso->bound = task->incall;
2143 tso->cap = cap;
2144
2145 task->incall->tso = tso;
2146 task->incall->ret = ret;
2147 task->incall->stat = NoStatus;
2148
2149 appendToRunQueue(cap,tso);
2150
2151 DEBUG_ONLY( id = tso->id );
2152 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2153
2154 cap = schedule(cap,task);
2155
2156 ASSERT(task->incall->stat != NoStatus);
2157 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2158
2159 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2160 *pcap = cap;
2161 }
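/* A sketch of the typical caller-side recipe (illustrative; this is roughly
 * what rts_evalLazyIO() in RtsAPI.c does): build a thread with the closure
 * to run already pushed on its stack, then hand it to the scheduler and
 * wait for it to finish:
 *
 *     StgTSO *tso = createIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleWaitThread(tso, ret, cap);
 *
 * A fire-and-forget thread would use scheduleThread() or scheduleThreadOn()
 * instead, without waiting for a result.
 */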
2162
2163 /* ----------------------------------------------------------------------------
2164 * Starting Tasks
2165 * ------------------------------------------------------------------------- */
2166
2167 #if defined(THREADED_RTS)
2168 void scheduleWorker (Capability *cap, Task *task)
2169 {
2170 // schedule() runs without a lock.
2171 cap = schedule(cap,task);
2172
2173 // On exit from schedule(), we have a Capability, but possibly not
2174 // the same one we started with.
2175
2176 // During shutdown, the requirement is that after all the
2177 // Capabilities are shut down, all workers that are shutting down
2178 // have finished workerTaskStop(). This is why we hold on to
2179 // cap->lock until we've finished workerTaskStop() below.
2180 //
2181 // There may be workers still involved in foreign calls; those
2182 // will just block in waitForReturnCapability() because the
2183 // Capability has been shut down.
2184 //
2185 ACQUIRE_LOCK(&cap->lock);
2186 releaseCapability_(cap,rtsFalse);
2187 workerTaskStop(task);
2188 RELEASE_LOCK(&cap->lock);
2189 }
2190 #endif
2191
2192 /* ---------------------------------------------------------------------------
2193 * Start new worker tasks on Capabilities from--to
2194 * -------------------------------------------------------------------------- */
2195
2196 static void
2197 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2198 {
2199 #if defined(THREADED_RTS)
2200 nat i;
2201 Capability *cap;
2202
2203 for (i = from; i < to; i++) {
2204 cap = &capabilities[i];
2205 ACQUIRE_LOCK(&cap->lock);
2206 startWorkerTask(cap);
2207 RELEASE_LOCK(&cap->lock);
2208 }
2209 #endif
2210 }
2211
2212 /* ---------------------------------------------------------------------------
2213 * initScheduler()
2214 *
2215 * Initialise the scheduler. This resets all the queues - if the
2216 * queues contained any threads, they'll be garbage collected at the
2217 * next pass.
2218 *
2219 * ------------------------------------------------------------------------ */
2220
2221 void
2222 initScheduler(void)
2223 {
2224 #if !defined(THREADED_RTS)
2225 blocked_queue_hd = END_TSO_QUEUE;
2226 blocked_queue_tl = END_TSO_QUEUE;
2227 sleeping_queue = END_TSO_QUEUE;
2228 #endif
2229
2230 sched_state = SCHED_RUNNING;
2231 recent_activity = ACTIVITY_YES;
2232
2233 #if defined(THREADED_RTS)
2234 /* Initialise the mutex and condition variables used by
2235 * the scheduler. */
2236 initMutex(&sched_mutex);
2237 #endif
2238
2239 ACQUIRE_LOCK(&sched_mutex);
2240
2241 /* A capability holds the state a native thread needs in
2242 * order to execute STG code. At least one capability is
2243 * floating around (only THREADED_RTS builds have more than one).
2244 */
2245 initCapabilities();
2246
2247 initTaskManager();
2248
2249 /*
2250 * Eagerly start one worker to run each Capability, except for
2251 * Capability 0. The idea is that we're probably going to start a
2252 * bound thread on Capability 0 pretty soon, so we don't want a
2253 * worker task hogging it.
2254 */
2255 startWorkerTasks(1, n_capabilities);
2256
2257 RELEASE_LOCK(&sched_mutex);
2258
2259 }
2260
2261 void
2262 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2263 /* see Capability.c, shutdownCapability() */
2264 {
2265 Task *task = NULL;
2266
2267 task = newBoundTask();
2268
2269 // If we haven't killed all the threads yet, do it now.
2270 if (sched_state < SCHED_SHUTTING_DOWN) {
2271 sched_state = SCHED_INTERRUPTING;
2272 waitForReturnCapability(&task->cap,task);
2273 scheduleDoGC(task->cap,task,rtsFalse);
2274 ASSERT(task->incall->tso == NULL);
2275 releaseCapability(task->cap);
2276 }
2277 sched_state = SCHED_SHUTTING_DOWN;
2278
2279 shutdownCapabilities(task, wait_foreign);
2280
2281 boundTaskExiting(task);
2282 }
2283
2284 void
2285 freeScheduler( void )
2286 {
2287 nat still_running;
2288
2289 ACQUIRE_LOCK(&sched_mutex);
2290 still_running = freeTaskManager();
2291 // We can only free the Capabilities if there are no Tasks still
2292 // running. We might have a Task about to return from a foreign
2293 // call into waitForReturnCapability(), for example (actually,
2294 // this should be the *only* thing that a still-running Task can
2295 // do at this point, and it will block waiting for the
2296 // Capability).
2297 if (still_running == 0) {
2298 freeCapabilities();
2299 if (n_capabilities != 1) {
2300 stgFree(capabilities);
2301 }
2302 }
2303 RELEASE_LOCK(&sched_mutex);
2304 #if defined(THREADED_RTS)
2305 closeMutex(&sched_mutex);
2306 #endif
2307 }
2308
2309 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2310 void *user USED_IF_NOT_THREADS)
2311 {
2312 #if !defined(THREADED_RTS)
2313 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2314 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2315 evac(user, (StgClosure **)(void *)&sleeping_queue);
2316 #endif
2317 }
2318
2319 /* -----------------------------------------------------------------------------
2320 performGC
2321
2322 This is the interface to the garbage collector from Haskell land.
2323 We provide this so that external C code can allocate and garbage
2324 collect when called from Haskell via _ccall_GC.
2325 -------------------------------------------------------------------------- */
2326
2327 static void
2328 performGC_(rtsBool force_major)
2329 {
2330 Task *task;
2331
2332 // We must grab a new Task here, because the existing Task may be
2333 // associated with a particular Capability, and chained onto the
2334 // suspended_ccalls queue.
2335 task = newBoundTask();
2336
2337 waitForReturnCapability(&task->cap,task);
2338 scheduleDoGC(task->cap,task,force_major);
2339 releaseCapability(task->cap);
2340 boundTaskExiting(task);
2341 }
2342
2343 void
2344 performGC(void)
2345 {
2346 performGC_(rtsFalse);
2347 }
2348
2349 void
2350 performMajorGC(void)
2351 {
2352 performGC_(rtsTrue);
2353 }
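/* A minimal sketch (illustrative, with a hypothetical helper name): C code
 * entered from Haskell via a "safe" foreign call has already given up its
 * Capability, so it can simply request a collection:
 *
 *     void my_c_hook (void)          // hypothetical; reached via a safe FFI call
 *     {
 *         performMajorGC();          // force a major collection
 *     }
 *
 * performGC() requests a collection without forcing it to be major.
 */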
2354
2355 /* ---------------------------------------------------------------------------
2356 Interrupt execution
2357 - usually called inside a signal handler so it mustn't do anything fancy.
2358 ------------------------------------------------------------------------ */
2359
2360 void
2361 interruptStgRts(void)
2362 {
2363 sched_state = SCHED_INTERRUPTING;
2364 interruptAllCapabilities();
2365 #if defined(THREADED_RTS)
2366 wakeUpRts();
2367 #endif
2368 }
2369
2370 /* -----------------------------------------------------------------------------
2371 Wake up the RTS
2372
2373 This function causes at least one OS thread to wake up and run the
2374 scheduler loop. It is invoked when the RTS might be deadlocked, or
2375 an external event has arrived that may need servicing (eg. a
2376 keyboard interrupt).
2377
2378 In the single-threaded RTS we don't do anything here; we only have
2379 one thread anyway, and the event that caused us to want to wake up
2380 will have interrupted any blocking system call in progress anyway.
2381 -------------------------------------------------------------------------- */
2382
2383 #if defined(THREADED_RTS)
2384 void wakeUpRts(void)
2385 {
2386 // This forces the IO Manager thread to wakeup, which will
2387 // in turn ensure that some OS thread wakes up and runs the
2388 // scheduler loop, which will cause a GC and deadlock check.
2389 ioManagerWakeup();
2390 }
2391 #endif
2392
2393 /* -----------------------------------------------------------------------------
2394 Deleting threads
2395
2396 This is used for interruption (^C) and forking, and corresponds to
2397 raising an exception but without letting the thread catch the
2398 exception.
2399 -------------------------------------------------------------------------- */
2400
2401 static void
2402 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2403 {
2404 // NOTE: must only be called on a TSO that we have exclusive
2405 // access to, because we will call throwToSingleThreaded() below.
2406 // The TSO must be on the run queue of the Capability we own, or
2407 // we must own all Capabilities.
2408
2409 if (tso->why_blocked != BlockedOnCCall &&
2410 tso->why_blocked != BlockedOnCCall_Interruptible) {
2411 throwToSingleThreaded(tso->cap,tso,NULL);
2412 }
2413 }
2414
2415 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2416 static void
2417 deleteThread_(Capability *cap, StgTSO *tso)
2418 { // for forkProcess only:
2419 // like deleteThread(), but we delete threads in foreign calls, too.
2420
2421 if (tso->why_blocked == BlockedOnCCall ||
2422 tso->why_blocked == BlockedOnCCall_Interruptible) {
2423 tso->what_next = ThreadKilled;
2424 appendToRunQueue(tso->cap, tso);
2425 } else {
2426 deleteThread(cap,tso);
2427 }
2428 }
2429 #endif
2430
2431 /* -----------------------------------------------------------------------------
2432 raiseExceptionHelper
2433
2434    This function is called by the raise# primitive, just so that we can
2435    move some of the tricky bits of raising an exception from C-- into
2436    C. Who knows, it might be a useful reusable thing here too.
2437 -------------------------------------------------------------------------- */
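// Returns the type of the frame at which the unwind stopped (CATCH_FRAME,
// ATOMICALLY_FRAME, CATCH_STM_FRAME or STOP_FRAME), leaving
// tso->stackobj->sp pointing at that frame so the Cmm caller can carry on
// from there.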
2438
2439 StgWord
2440 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2441 {
2442 Capability *cap = regTableToCapability(reg);
2443 StgThunk *raise_closure = NULL;
2444 StgPtr p, next;
2445 StgRetInfoTable *info;
2446 //
2447 // This closure represents the expression 'raise# E' where E
2448     // is the exception to raise. It is used to overwrite all the
2449     // thunks which are currently under evaluation.
2450 //
2451
2452 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2453 // LDV profiling: stg_raise_info has THUNK as its closure
2454 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2455     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2456 // 1 does not cause any problem unless profiling is performed.
2457 // However, when LDV profiling goes on, we need to linearly scan
2458 // small object pool, where raise_closure is stored, so we should
2459 // use MIN_UPD_SIZE.
2460 //
2461 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2462 // sizeofW(StgClosure)+1);
2463 //
2464
2465 //
2466 // Walk up the stack, looking for the catch frame. On the way,
2467 // we update any closures pointed to from update frames with the
2468 // raise closure that we just built.
2469 //
2470 p = tso->stackobj->sp;
2471 while(1) {
2472 info = get_ret_itbl((StgClosure *)p);
2473 next = p + stack_frame_sizeW((StgClosure *)p);
2474 switch (info->i.type) {
2475
2476 case UPDATE_FRAME:
2477 // Only create raise_closure if we need to.
2478 if (raise_closure == NULL) {
2479 raise_closure =
2480 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2481 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2482 raise_closure->payload[0] = exception;
2483 }
2484 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2485 (StgClosure *)raise_closure);
2486 p = next;
2487 continue;
2488
2489 case ATOMICALLY_FRAME:
2490 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2491 tso->stackobj->sp = p;
2492 return ATOMICALLY_FRAME;
2493
2494 case CATCH_FRAME:
2495 tso->stackobj->sp = p;
2496 return CATCH_FRAME;
2497
2498 case CATCH_STM_FRAME:
2499 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2500 tso->stackobj->sp = p;
2501 return CATCH_STM_FRAME;
2502
2503 case UNDERFLOW_FRAME:
2504 tso->stackobj->sp = p;
2505 threadStackUnderflow(cap,tso);
2506 p = tso->stackobj->sp;
2507 continue;
2508
2509 case STOP_FRAME:
2510 tso->stackobj->sp = p;
2511 return STOP_FRAME;
2512
2513 case CATCH_RETRY_FRAME:
2514 default:
2515 p = next;
2516 continue;
2517 }
2518 }
2519 }
2520
2521
2522 /* -----------------------------------------------------------------------------
2523 findRetryFrameHelper
2524
2525 This function is called by the retry# primitive. It traverses the stack
2526 leaving tso->sp referring to the frame which should handle the retry.
2527
2528 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2529    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2530
2531    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transactions that they
2532 create) because retries are not considered to be exceptions, despite the
2533 similar implementation.
2534
2535 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2536 not be created within memory transactions.
2537 -------------------------------------------------------------------------- */
2538
2539 StgWord
2540 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2541 {
2542 StgPtr p, next;
2543 StgRetInfoTable *info;
2544
2545 p = tso->stackobj->sp;
2546 while (1) {
2547 info = get_ret_itbl((StgClosure *)p);
2548 next = p + stack_frame_sizeW((StgClosure *)p);
2549 switch (info->i.type) {
2550
2551 case ATOMICALLY_FRAME:
2552 debugTrace(DEBUG_stm,
2553 "found ATOMICALLY_FRAME at %p during retry", p);
2554 tso->stackobj->sp = p;
2555 return ATOMICALLY_FRAME;
2556
2557 case CATCH_RETRY_FRAME:
2558 debugTrace(DEBUG_stm,
2559                    "found CATCH_RETRY_FRAME at %p during retry", p);
2560 tso->stackobj->sp = p;
2561 return CATCH_RETRY_FRAME;
2562
2563 case CATCH_STM_FRAME: {
2564 StgTRecHeader *trec = tso -> trec;
2565 StgTRecHeader *outer = trec -> enclosing_trec;
2566 debugTrace(DEBUG_stm,
2567 "found CATCH_STM_FRAME at %p during retry", p);
2568 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2569 stmAbortTransaction(cap, trec);
2570 stmFreeAbortedTRec(cap, trec);
2571 tso -> trec = outer;
2572 p = next;
2573 continue;
2574 }
2575
2576 case UNDERFLOW_FRAME:
2577 threadStackUnderflow(cap,tso);
2578 p = tso->stackobj->sp;
2579 continue;
2580
2581 default:
2582 ASSERT(info->i.type != CATCH_FRAME);
2583 ASSERT(info->i.type != STOP_FRAME);
2584 p = next;
2585 continue;
2586 }
2587 }
2588 }
2589
2590 /* -----------------------------------------------------------------------------
2591 resurrectThreads is called after garbage collection on the list of
2592    threads found to be garbage. Each of these threads will be woken
2593    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2594    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked in an
2595    STM transaction, or NonTermination if the thread was blocked on a Black Hole.
2596
2597 Locks: assumes we hold *all* the capabilities.
2598 -------------------------------------------------------------------------- */
2599
2600 void
2601 resurrectThreads (StgTSO *threads)
2602 {
2603 StgTSO *tso, *next;
2604 Capability *cap;
2605 generation *gen;
2606
2607 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2608 next = tso->global_link;
2609
2610 gen = Bdescr((P_)tso)->gen;
2611 tso->global_link = gen->threads;
2612 gen->threads = tso;
2613
2614 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2615
2616 // Wake up the thread on the Capability it was last on
2617 cap = tso->cap;
2618
2619 switch (tso->why_blocked) {
2620 case BlockedOnMVar:
2621 /* Called by GC - sched_mutex lock is currently held. */
2622 throwToSingleThreaded(cap, tso,
2623 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2624 break;
2625 case BlockedOnBlackHole:
2626 throwToSingleThreaded(cap, tso,
2627 (StgClosure *)nonTermination_closure);
2628 break;
2629 case BlockedOnSTM:
2630 throwToSingleThreaded(cap, tso,
2631 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2632 break;
2633 case NotBlocked:
2634 /* This might happen if the thread was blocked on a black hole
2635 * belonging to a thread that we've just woken up (raiseAsync
2636 * can wake up threads, remember...).
2637 */
2638 continue;
2639 case BlockedOnMsgThrowTo:
2640 // This can happen if the target is masking, blocks on a
2641 // black hole, and then is found to be unreachable. In
2642 // this case, we want to let the target wake up and carry
2643 // on, and do nothing to this thread.
2644 continue;
2645 default:
2646 barf("resurrectThreads: thread blocked in a strange way: %d",
2647 tso->why_blocked);
2648 }
2649 }
2650 }