1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "Sparks.h"
31 #include "Capability.h"
32 #include "Task.h"
33 #include "AwaitEvent.h"
34 #if defined(mingw32_HOST_OS)
35 #include "win32/IOManager.h"
36 #endif
37 #include "Trace.h"
38 #include "RaiseAsync.h"
39 #include "Threads.h"
40 #include "Timer.h"
41 #include "ThreadPaused.h"
42 #include "Messages.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 #ifdef TRACING
60 #include "eventlog/EventLog.h"
61 #endif
62 /* -----------------------------------------------------------------------------
63 * Global variables
64 * -------------------------------------------------------------------------- */
65
66 #if !defined(THREADED_RTS)
67 // Blocked/sleeping threads
68 StgTSO *blocked_queue_hd = NULL;
69 StgTSO *blocked_queue_tl = NULL;
70 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
71 #endif
72
73 /* Set to true when the latest garbage collection failed to reclaim
74 * enough space, and the runtime should proceed to shut itself down in
75 * an orderly fashion (emitting profiling info etc.)
76 */
77 rtsBool heap_overflow = rtsFalse;
78
79 /* flag that tracks whether we have done any execution in this time slice.
80 * LOCK: currently none, perhaps we should lock (but needs to be
81 * updated in the fast path of the scheduler).
82 *
83 * NB. must be StgWord, we do xchg() on it.
84 */
85 volatile StgWord recent_activity = ACTIVITY_YES;
86
87 /* if this flag is set as well, give up execution
88 * LOCK: none (changes monotonically)
89 */
90 volatile StgWord sched_state = SCHED_RUNNING;
91
92 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
93 * exists - earlier gccs apparently didn't.
94 * -= chak
95 */
96 StgTSO dummy_tso;
97
98 /*
99 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
100 * in an MT setting, needed to signal that a worker thread shouldn't hang around
101 * in the scheduler when it is out of work.
102 */
103 rtsBool shutting_down_scheduler = rtsFalse;
104
105 /*
106 * This mutex protects most of the global scheduler data in
107 * the THREADED_RTS runtime.
108 */
109 #if defined(THREADED_RTS)
110 Mutex sched_mutex;
111 #endif
112
113 #if !defined(mingw32_HOST_OS)
114 #define FORKPROCESS_PRIMOP_SUPPORTED
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability *cap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 static void scheduleStartSignalHandlers (Capability *cap);
134 static void scheduleCheckBlockedThreads (Capability *cap);
135 static void scheduleProcessInbox(Capability *cap);
136 static void scheduleDetectDeadlock (Capability *cap, Task *task);
137 static void schedulePushWork(Capability *cap, Task *task);
138 #if defined(THREADED_RTS)
139 static void scheduleActivateSpark(Capability *cap);
140 #endif
141 static void schedulePostRunThread(Capability *cap, StgTSO *t);
142 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
143 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
144 nat prev_what_next );
145 static void scheduleHandleThreadBlocked( StgTSO *t );
146 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
147 StgTSO *t );
148 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
149 static Capability *scheduleDoGC(Capability *cap, Task *task,
150 rtsBool force_major);
151
152 static void deleteThread (Capability *cap, StgTSO *tso);
153 static void deleteAllThreads (Capability *cap);
154
155 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
156 static void deleteThread_(Capability *cap, StgTSO *tso);
157 #endif
158
159 /* ---------------------------------------------------------------------------
160 Main scheduling loop.
161
162 We use round-robin scheduling, each thread returning to the
163 scheduler loop when one of these conditions is detected:
164
165 * out of heap space
166 * timer expires (thread yields)
167 * thread blocks
168 * thread ends
169 * stack overflow
170
171 GRAN version:
172 In a GranSim setup this loop iterates over the global event queue.
173 This revolves around the global event queue, which determines what
174 to do next. Therefore, it's more complicated than either the
175 concurrent or the parallel (GUM) setup.
176 This version has been entirely removed (JB 2008/08).
177
178 GUM version:
179 GUM iterates over incoming messages.
180 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
181 and sends out a fish whenever it has nothing to do; in-between
182 doing the actual reductions (shared code below) it processes the
183 incoming messages and deals with delayed operations
184 (see PendingFetches).
185 This is not the ugliest code you could imagine, but it's bloody close.
186
187 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
188 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
189 as well as future GUM versions. This file has been refurbished to
190 only contain valid code, which is, however, incomplete and refers to
191 invalid includes, etc.
192
193 ------------------------------------------------------------------------ */
194
195 static Capability *
196 schedule (Capability *initialCapability, Task *task)
197 {
198 StgTSO *t;
199 Capability *cap;
200 StgThreadReturnCode ret;
201 nat prev_what_next;
202 rtsBool ready_to_gc;
203 #if defined(THREADED_RTS)
204 rtsBool first = rtsTrue;
205 #endif
206
207 cap = initialCapability;
208
209 // Pre-condition: this task owns initialCapability.
210 // The sched_mutex is *NOT* held
211 // NB. on return, we still hold a capability.
212
213 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
214
215 schedulePreLoop();
216
217 // -----------------------------------------------------------
218 // Scheduler loop starts here:
219
220 while (1) {
221
222 // Check whether we have re-entered the RTS from Haskell without
223 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
224 // call).
225 if (cap->in_haskell) {
226 errorBelch("schedule: re-entered unsafely.\n"
227 " Perhaps a 'foreign import unsafe' should be 'safe'?");
228 stg_exit(EXIT_FAILURE);
229 }
230
231 // The interruption / shutdown sequence.
232 //
233 // In order to cleanly shut down the runtime, we want to:
234 // * make sure that all main threads return to their callers
235 // with the state 'Interrupted'.
236 //   * clean up all OS threads associated with the runtime
237 // * free all memory etc.
238 //
239 // So the sequence for ^C goes like this:
240 //
241 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
242 // arranges for some Capability to wake up
243 //
244 // * all threads in the system are halted, and the zombies are
245 // placed on the run queue for cleaning up. We acquire all
246 // the capabilities in order to delete the threads, this is
247 // done by scheduleDoGC() for convenience (because GC already
248 // needs to acquire all the capabilities). We can't kill
249 // threads involved in foreign calls.
250 //
251 // * somebody calls shutdownHaskell(), which calls exitScheduler()
252 //
253 // * sched_state := SCHED_SHUTTING_DOWN
254 //
255 // * all workers exit when the run queue on their capability
256 // drains. All main threads will also exit when their TSO
257 // reaches the head of the run queue and they can return.
258 //
259 // * eventually all Capabilities will shut down, and the RTS can
260 // exit.
261 //
262 //   * We might be left with threads blocked in foreign calls;
263 //     we should really attempt to kill these somehow (TODO).
264
265 switch (sched_state) {
266 case SCHED_RUNNING:
267 break;
268 case SCHED_INTERRUPTING:
269 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
270 #if defined(THREADED_RTS)
271 discardSparksCap(cap);
272 #endif
273 /* scheduleDoGC() deletes all the threads */
274 cap = scheduleDoGC(cap,task,rtsFalse);
275
276 // after scheduleDoGC(), we must be shutting down. Either some
277 // other Capability did the final GC, or we did it above,
278 // either way we can fall through to the SCHED_SHUTTING_DOWN
279 // case now.
280 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
281 // fall through
282
283 case SCHED_SHUTTING_DOWN:
284 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
285 // If we are a worker, just exit. If we're a bound thread
286 // then we will exit below when we've removed our TSO from
287 // the run queue.
288 if (!isBoundTask(task) && emptyRunQueue(cap)) {
289 return cap;
290 }
291 break;
292 default:
293 barf("sched_state: %d", sched_state);
294 }
295
296 scheduleFindWork(cap);
297
298 /* work pushing, currently relevant only for THREADED_RTS:
299 (pushes threads, wakes up idle capabilities for stealing) */
300 schedulePushWork(cap,task);
301
302 scheduleDetectDeadlock(cap,task);
303
304 #if defined(THREADED_RTS)
305 cap = task->cap; // reload cap, it might have changed
306 #endif
307
308 // Normally, the only way we can get here with no threads to
309 // run is if a keyboard interrupt was received during
310 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
311 // Additionally, it is not fatal for the
312 // threaded RTS to reach here with no threads to run.
313 //
314 // win32: might be here due to awaitEvent() being abandoned
315 // as a result of a console event having been delivered.
316
317 #if defined(THREADED_RTS)
318 if (first)
319 {
320 // XXX: ToDo
321 // // don't yield the first time, we want a chance to run this
322 // // thread for a bit, even if there are others banging at the
323 // // door.
324 // first = rtsFalse;
325 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
326 }
327
328 scheduleYield(&cap,task);
329
330 if (emptyRunQueue(cap)) continue; // look for work again
331 #endif
332
333 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
334 if ( emptyRunQueue(cap) ) {
335 ASSERT(sched_state >= SCHED_INTERRUPTING);
336 }
337 #endif
338
339 //
340 // Get a thread to run
341 //
342 t = popRunQueue(cap);
343
344 // Sanity check the thread we're about to run. This can be
345 // expensive if there is lots of thread switching going on...
346 IF_DEBUG(sanity,checkTSO(t));
347
348 #if defined(THREADED_RTS)
349 // Check whether we can run this thread in the current task.
350 // If not, we have to pass our capability to the right task.
351 {
352 InCall *bound = t->bound;
353
354 if (bound) {
355 if (bound->task == task) {
356 // yes, the Haskell thread is bound to the current native thread
357 } else {
358 debugTrace(DEBUG_sched,
359 "thread %lu bound to another OS thread",
360 (unsigned long)t->id);
361 // no, bound to a different Haskell thread: pass to that thread
362 pushOnRunQueue(cap,t);
363 continue;
364 }
365 } else {
366 // The thread we want to run is unbound.
367 if (task->incall->tso) {
368 debugTrace(DEBUG_sched,
369 "this OS thread cannot run thread %lu",
370 (unsigned long)t->id);
371 // no, the current native thread is bound to a different
372 // Haskell thread, so pass it to any worker thread
373 pushOnRunQueue(cap,t);
374 continue;
375 }
376 }
377 }
378 #endif
379
380 // If we're shutting down, and this thread has not yet been
381 // killed, kill it now. This sometimes happens when a finalizer
382 // thread is created by the final GC, or a thread previously
383 // in a foreign call returns.
384 if (sched_state >= SCHED_INTERRUPTING &&
385 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
386 deleteThread(cap,t);
387 }
388
389 /* context switches are initiated by the timer signal, unless
390 * the user specified "context switch as often as possible", with
391 * +RTS -C0
392 */
393 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
394 && !emptyThreadQueues(cap)) {
395 cap->context_switch = 1;
396 }
397
398 run_thread:
399
400 // CurrentTSO is the thread to run. t might be different if we
401 // loop back to run_thread, so make sure to set CurrentTSO after
402 // that.
403 cap->r.rCurrentTSO = t;
404
405 startHeapProfTimer();
406
407 // ----------------------------------------------------------------------
408 // Run the current thread
409
410 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
411 ASSERT(t->cap == cap);
412 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
413
414 prev_what_next = t->what_next;
415
416 errno = t->saved_errno;
417 #if mingw32_HOST_OS
418 SetLastError(t->saved_winerror);
419 #endif
420
421 cap->in_haskell = rtsTrue;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 #if defined(THREADED_RTS)
427 if (recent_activity == ACTIVITY_DONE_GC) {
428 // ACTIVITY_DONE_GC means we turned off the timer signal to
429 // conserve power (see #1623). Re-enable it here.
430 nat prev;
431 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
432 if (prev == ACTIVITY_DONE_GC) {
433 startTimer();
434 }
435 } else if (recent_activity != ACTIVITY_INACTIVE) {
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 recent_activity = ACTIVITY_YES;
441 }
442 #endif
443
444 traceEventRunThread(cap, t);
445
446 switch (prev_what_next) {
447
448 case ThreadKilled:
449 case ThreadComplete:
450 /* Thread already finished, return to scheduler. */
451 ret = ThreadFinished;
452 break;
453
454 case ThreadRunGHC:
455 {
456 StgRegTable *r;
457 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
458 cap = regTableToCapability(r);
459 ret = r->rRet;
460 break;
461 }
462
463 case ThreadInterpret:
464 cap = interpretBCO(cap);
465 ret = cap->r.rRet;
466 break;
467
468 default:
469 barf("schedule: invalid what_next field");
470 }
471
472 cap->in_haskell = rtsFalse;
473
474 // The TSO might have moved, eg. if it re-entered the RTS and a GC
475 // happened. So find the new location:
476 t = cap->r.rCurrentTSO;
477
478 // And save the current errno in this thread.
479 // XXX: possibly bogus for SMP because this thread might already
480 // be running again, see code below.
481 t->saved_errno = errno;
482 #if mingw32_HOST_OS
483 // Similarly for Windows error code
484 t->saved_winerror = GetLastError();
485 #endif
486
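// Report the stop-thread event.  For a blocked thread the status recorded
// is why_blocked + 6 (the eventlog encoding of "blocked on <reason>"); for
// BlockedOnBlackHole we also record the id of the thread that owns the
// black hole, if known.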
487 if (ret == ThreadBlocked) {
488 if (t->why_blocked == BlockedOnBlackHole) {
489 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
490 traceEventStopThread(cap, t, t->why_blocked + 6,
491 owner != NULL ? owner->id : 0);
492 } else {
493 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
494 }
495 } else {
496 traceEventStopThread(cap, t, ret, 0);
497 }
498
499 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
500 ASSERT(t->cap == cap);
501
502 // ----------------------------------------------------------------------
503
504 // Costs for the scheduler are assigned to CCS_SYSTEM
505 stopHeapProfTimer();
506 #if defined(PROFILING)
507 CCCS = CCS_SYSTEM;
508 #endif
509
510 schedulePostRunThread(cap,t);
511
512 ready_to_gc = rtsFalse;
513
514 switch (ret) {
515 case HeapOverflow:
516 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
517 break;
518
519 case StackOverflow:
520 // just adjust the stack for this thread, then pop it back
521 // on the run queue.
522 threadStackOverflow(cap, t);
523 pushOnRunQueue(cap,t);
524 break;
525
526 case ThreadYielding:
527 if (scheduleHandleYield(cap, t, prev_what_next)) {
528 // shortcut for switching between compiler/interpreter:
529 goto run_thread;
530 }
531 break;
532
533 case ThreadBlocked:
534 scheduleHandleThreadBlocked(t);
535 break;
536
537 case ThreadFinished:
538 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
539 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
540 break;
541
542 default:
543 barf("schedule: invalid thread return code %d", (int)ret);
544 }
545
546 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
547 cap = scheduleDoGC(cap,task,rtsFalse);
548 }
549 } /* end of while() */
550 }
551
552 /* -----------------------------------------------------------------------------
553 * Run queue operations
554 * -------------------------------------------------------------------------- */
555
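// The run queue is a doubly-linked list: tso->_link is the forward link and
// tso->block_info.prev the backward link, so a thread can be unlinked from
// the middle of the queue in constant time.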
556 void
557 removeFromRunQueue (Capability *cap, StgTSO *tso)
558 {
559 if (tso->block_info.prev == END_TSO_QUEUE) {
560 ASSERT(cap->run_queue_hd == tso);
561 cap->run_queue_hd = tso->_link;
562 } else {
563 setTSOLink(cap, tso->block_info.prev, tso->_link);
564 }
565 if (tso->_link == END_TSO_QUEUE) {
566 ASSERT(cap->run_queue_tl == tso);
567 cap->run_queue_tl = tso->block_info.prev;
568 } else {
569 setTSOPrev(cap, tso->_link, tso->block_info.prev);
570 }
571 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
572
573 IF_DEBUG(sanity, checkRunQueue(cap));
574 }
575
576 /* ----------------------------------------------------------------------------
577 * Setting up the scheduler loop
578 * ------------------------------------------------------------------------- */
579
580 static void
581 schedulePreLoop(void)
582 {
583 // initialisation for scheduler - what cannot go into initScheduler()
584 }
585
586 /* -----------------------------------------------------------------------------
587 * scheduleFindWork()
588 *
589 * Search for work to do, and handle messages from elsewhere.
590 * -------------------------------------------------------------------------- */
591
592 static void
593 scheduleFindWork (Capability *cap)
594 {
595 scheduleStartSignalHandlers(cap);
596
597 scheduleProcessInbox(cap);
598
599 scheduleCheckBlockedThreads(cap);
600
601 #if defined(THREADED_RTS)
602 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
603 #endif
604 }
605
606 #if defined(THREADED_RTS)
607 STATIC_INLINE rtsBool
608 shouldYieldCapability (Capability *cap, Task *task)
609 {
610 // we need to yield this capability to someone else if..
611 // - another thread is initiating a GC
612 // - another Task is returning from a foreign call
613 // - the thread at the head of the run queue cannot be run
614 // by this Task (it is bound to another Task, or it is unbound
615 //       and this task is bound).
616 return (waiting_for_gc ||
617 cap->returning_tasks_hd != NULL ||
618 (!emptyRunQueue(cap) && (task->incall->tso == NULL
619 ? cap->run_queue_hd->bound != NULL
620 : cap->run_queue_hd->bound != task->incall)));
621 }
622
623 // This is the single place where a Task goes to sleep. There are
624 // two reasons it might need to sleep:
625 // - there are no threads to run
626 // - we need to yield this Capability to someone else
627 // (see shouldYieldCapability())
628 //
629 // Careful: the scheduler loop is quite delicate. Make sure you run
630 // the tests in testsuite/concurrent (all ways) after modifying this,
631 // and also check the benchmarks in nofib/parallel for regressions.
632
633 static void
634 scheduleYield (Capability **pcap, Task *task)
635 {
636 Capability *cap = *pcap;
637
638 // if we have work, and we don't need to give up the Capability, continue.
639 //
640 if (!shouldYieldCapability(cap,task) &&
641 (!emptyRunQueue(cap) ||
642 !emptyInbox(cap) ||
643 sched_state >= SCHED_INTERRUPTING))
644 return;
645
646 // otherwise yield (sleep), and keep yielding if necessary.
647 do {
648 yieldCapability(&cap,task);
649 }
650 while (shouldYieldCapability(cap,task));
651
652 // note there may still be no threads on the run queue at this
653 // point, the caller has to check.
654
655 *pcap = cap;
656 return;
657 }
658 #endif
659
660 /* -----------------------------------------------------------------------------
661 * schedulePushWork()
662 *
663 * Push work to other Capabilities if we have some.
664 * -------------------------------------------------------------------------- */
665
666 static void
667 schedulePushWork(Capability *cap USED_IF_THREADS,
668 Task *task USED_IF_THREADS)
669 {
670 /* following code not for PARALLEL_HASKELL. I kept the call general,
671 future GUM versions might use pushing in a distributed setup */
672 #if defined(THREADED_RTS)
673
674 Capability *free_caps[n_capabilities], *cap0;
675 nat i, n_free_caps;
676
677 // migration can be turned off with +RTS -qm
678 if (!RtsFlags.ParFlags.migrate) return;
679
680 // Check whether we have more threads on our run queue, or sparks
681 // in our pool, that we could hand to another Capability.
682 if (cap->run_queue_hd == END_TSO_QUEUE) {
683 if (sparkPoolSizeCap(cap) < 2) return;
684 } else {
685 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
686 sparkPoolSizeCap(cap) < 1) return;
687 }
688
689 // First grab as many free Capabilities as we can.
690 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
691 cap0 = &capabilities[i];
692 if (cap != cap0 && tryGrabCapability(cap0,task)) {
693 if (!emptyRunQueue(cap0)
694 || cap->returning_tasks_hd != NULL
695 || cap->inbox != (Message*)END_TSO_QUEUE) {
696 // it already has some work, we just grabbed it at
697 // the wrong moment. Or maybe it's deadlocked!
698 releaseCapability(cap0);
699 } else {
700 free_caps[n_free_caps++] = cap0;
701 }
702 }
703 }
704
705 // we now have n_free_caps free capabilities stashed in
706 // free_caps[]. Share our run queue equally with them. This is
707 // probably the simplest thing we could do; improvements we might
708 // want to do include:
709 //
710 // - giving high priority to moving relatively new threads, on
711 //     the grounds that they haven't had time to build up a
712 // working set in the cache on this CPU/Capability.
713 //
714 // - giving low priority to moving long-lived threads
715
716 if (n_free_caps > 0) {
717 StgTSO *prev, *t, *next;
718 #ifdef SPARK_PUSHING
719 rtsBool pushed_to_all;
720 #endif
721
722 debugTrace(DEBUG_sched,
723 "cap %d: %s and %d free capabilities, sharing...",
724 cap->no,
725 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
726 "excess threads on run queue":"sparks to share (>=2)",
727 n_free_caps);
728
729 i = 0;
730 #ifdef SPARK_PUSHING
731 pushed_to_all = rtsFalse;
732 #endif
733
734 if (cap->run_queue_hd != END_TSO_QUEUE) {
735 prev = cap->run_queue_hd;
736 t = prev->_link;
737 prev->_link = END_TSO_QUEUE;
738 for (; t != END_TSO_QUEUE; t = next) {
739 next = t->_link;
740 t->_link = END_TSO_QUEUE;
741 if (t->bound == task->incall // don't move my bound thread
742 || tsoLocked(t)) { // don't move a locked thread
743 setTSOLink(cap, prev, t);
744 setTSOPrev(cap, t, prev);
745 prev = t;
746 } else if (i == n_free_caps) {
747 #ifdef SPARK_PUSHING
748 pushed_to_all = rtsTrue;
749 #endif
750 i = 0;
751 // keep one for us
752 setTSOLink(cap, prev, t);
753 setTSOPrev(cap, t, prev);
754 prev = t;
755 } else {
756 appendToRunQueue(free_caps[i],t);
757
758 traceEventMigrateThread (cap, t, free_caps[i]->no);
759
760 if (t->bound) { t->bound->task->cap = free_caps[i]; }
761 t->cap = free_caps[i];
762 i++;
763 }
764 }
765 cap->run_queue_tl = prev;
766
767 IF_DEBUG(sanity, checkRunQueue(cap));
768 }
769
770 #ifdef SPARK_PUSHING
771 /* JB I left this code in place, it would work but is not necessary */
772
773 // If there are some free capabilities that we didn't push any
774 // threads to, then try to push a spark to each one.
775 if (!pushed_to_all) {
776 StgClosure *spark;
777 // i is the next free capability to push to
778 for (; i < n_free_caps; i++) {
779 if (emptySparkPoolCap(free_caps[i])) {
780 spark = tryStealSpark(cap->sparks);
781 if (spark != NULL) {
782 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
783
784 traceEventStealSpark(free_caps[i], t, cap->no);
785
786 newSpark(&(free_caps[i]->r), spark);
787 }
788 }
789 }
790 }
791 #endif /* SPARK_PUSHING */
792
793 // release the capabilities
794 for (i = 0; i < n_free_caps; i++) {
795 task->cap = free_caps[i];
796 releaseAndWakeupCapability(free_caps[i]);
797 }
798 }
799 task->cap = cap; // reset to point to our Capability.
800
801 #endif /* THREADED_RTS */
802
803 }
804
805 /* ----------------------------------------------------------------------------
806 * Start any pending signal handlers
807 * ------------------------------------------------------------------------- */
808
809 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
810 static void
811 scheduleStartSignalHandlers(Capability *cap)
812 {
813 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
814 // safe outside the lock
815 startSignalHandlers(cap);
816 }
817 }
818 #else
819 static void
820 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
821 {
822 }
823 #endif
824
825 /* ----------------------------------------------------------------------------
826 * Check for blocked threads that can be woken up.
827 * ------------------------------------------------------------------------- */
828
829 static void
830 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
831 {
832 #if !defined(THREADED_RTS)
833 //
834 // Check whether any waiting threads need to be woken up. If the
835 // run queue is empty, and there are no other tasks running, we
836 // can wait indefinitely for something to happen.
837 //
838 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
839 {
840 awaitEvent (emptyRunQueue(cap));
841 }
842 #endif
843 }
844
845 /* ----------------------------------------------------------------------------
846 * Detect deadlock conditions and attempt to resolve them.
847 * ------------------------------------------------------------------------- */
848
849 static void
850 scheduleDetectDeadlock (Capability *cap, Task *task)
851 {
852 /*
853 * Detect deadlock: when we have no threads to run, there are no
854 * threads blocked, waiting for I/O, or sleeping, and all the
855 * other tasks are waiting for work, we must have a deadlock of
856 * some description.
857 */
858 if ( emptyThreadQueues(cap) )
859 {
860 #if defined(THREADED_RTS)
861 /*
862 * In the threaded RTS, we only check for deadlock if there
863 * has been no activity in a complete timeslice. This means
864 * we won't eagerly start a full GC just because we don't have
865 * any threads to run currently.
866 */
867 if (recent_activity != ACTIVITY_INACTIVE) return;
868 #endif
869
870 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
871
872 // Garbage collection can release some new threads due to
873 // either (a) finalizers or (b) threads resurrected because
874 // they are unreachable and will therefore be sent an
875 // exception. Any threads thus released will be immediately
876 // runnable.
877 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
878 // When force_major == rtsTrue, scheduleDoGC sets
879 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
880 // signal.
881
882 if ( !emptyRunQueue(cap) ) return;
883
884 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
885 /* If we have user-installed signal handlers, then wait
886 * for signals to arrive rather than bombing out with a
887 * deadlock.
888 */
889 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
890 debugTrace(DEBUG_sched,
891 "still deadlocked, waiting for signals...");
892
893 awaitUserSignals();
894
895 if (signals_pending()) {
896 startSignalHandlers(cap);
897 }
898
899 // either we have threads to run, or we were interrupted:
900 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
901
902 return;
903 }
904 #endif
905
906 #if !defined(THREADED_RTS)
907 /* Probably a real deadlock. Send the current main thread the
908 * Deadlock exception.
909 */
910 if (task->incall->tso) {
911 switch (task->incall->tso->why_blocked) {
912 case BlockedOnSTM:
913 case BlockedOnBlackHole:
914 case BlockedOnMsgThrowTo:
915 case BlockedOnMVar:
916 throwToSingleThreaded(cap, task->incall->tso,
917 (StgClosure *)nonTermination_closure);
918 return;
919 default:
920 barf("deadlock: main thread blocked in a strange way");
921 }
922 }
923 return;
924 #endif
925 }
926 }
927
928
929 /* ----------------------------------------------------------------------------
930 * Send pending messages (PARALLEL_HASKELL only)
931 * ------------------------------------------------------------------------- */
932
933 #if defined(PARALLEL_HASKELL)
934 static void
935 scheduleSendPendingMessages(void)
936 {
937
938 # if defined(PAR) // global Mem.Mgmt., omit for now
939 if (PendingFetches != END_BF_QUEUE) {
940 processFetches();
941 }
942 # endif
943
944 if (RtsFlags.ParFlags.BufferTime) {
945 // if we use message buffering, we must send away all message
946 // packets which have become too old...
947 sendOldBuffers();
948 }
949 }
950 #endif
951
952 /* ----------------------------------------------------------------------------
953 * Process messages in the current Capability's inbox
954 * ------------------------------------------------------------------------- */
955
956 static void
957 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
958 {
959 #if defined(THREADED_RTS)
960 Message *m, *next;
961 int r;
962
963 while (!emptyInbox(cap)) {
964 if (cap->r.rCurrentNursery->link == NULL ||
965 g0->n_new_large_words >= large_alloc_lim) {
966 scheduleDoGC(cap, cap->running_task, rtsFalse);
967 }
968
969 // don't use a blocking acquire; if the lock is held by
970 // another thread then just carry on. This seems to avoid
971 // getting stuck in a message ping-pong situation with other
972 // processors. We'll check the inbox again later anyway.
973 //
974 // We should really use a more efficient queue data structure
975 // here. The trickiness is that we must ensure a Capability
976 // never goes idle if the inbox is non-empty, which is why we
977 // use cap->lock (cap->lock is released as the last thing
978 // before going idle; see Capability.c:releaseCapability()).
979 r = TRY_ACQUIRE_LOCK(&cap->lock);
980 if (r != 0) return;
981
982 m = cap->inbox;
983 cap->inbox = (Message*)END_TSO_QUEUE;
984
985 RELEASE_LOCK(&cap->lock);
986
987 while (m != (Message*)END_TSO_QUEUE) {
988 next = m->link;
989 executeMessage(cap, m);
990 m = next;
991 }
992 }
993 #endif
994 }
995
996 /* ----------------------------------------------------------------------------
997 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
998 * ------------------------------------------------------------------------- */
999
1000 #if defined(THREADED_RTS)
1001 static void
1002 scheduleActivateSpark(Capability *cap)
1003 {
1004 if (anySparks())
1005 {
1006 createSparkThread(cap);
1007 debugTrace(DEBUG_sched, "creating a spark thread");
1008 }
1009 }
1010 #endif // THREADED_RTS
1011
1012 /* ----------------------------------------------------------------------------
1013 * After running a thread...
1014 * ------------------------------------------------------------------------- */
1015
1016 static void
1017 schedulePostRunThread (Capability *cap, StgTSO *t)
1018 {
1019 // We have to be able to catch transactions that are in an
1020 // infinite loop as a result of seeing an inconsistent view of
1021 // memory, e.g.
1022 //
1023 // atomically $ do
1024 // [a,b] <- mapM readTVar [ta,tb]
1025 // when (a == b) loop
1026 //
1027 // and a is never equal to b given a consistent view of memory.
1028 //
1029 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1030 if (!stmValidateNestOfTransactions (t -> trec)) {
1031 debugTrace(DEBUG_sched | DEBUG_stm,
1032 "trec %p found wasting its time", t);
1033
1034 // strip the stack back to the
1035 // ATOMICALLY_FRAME, aborting the (nested)
1036 // transaction, and saving the stack of any
1037 // partially-evaluated thunks on the heap.
1038 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1039
1040 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1041 }
1042 }
1043
1044 /* some statistics gathering in the parallel case */
1045 }
1046
1047 /* -----------------------------------------------------------------------------
1048 * Handle a thread that returned to the scheduler with HeapOverflow
1049 * -------------------------------------------------------------------------- */
1050
1051 static rtsBool
1052 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1053 {
1054 // did the task ask for a large block?
1055 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1056 // if so, get one and push it on the front of the nursery.
1057 bdescr *bd;
1058 lnat blocks;
1059
1060 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1061
1062 if (blocks > BLOCKS_PER_MBLOCK) {
1063 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1064 }
1065
1066 debugTrace(DEBUG_sched,
1067 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1068 (long)t->id, what_next_strs[t->what_next], blocks);
1069
1070 // don't do this if the nursery is (nearly) full, we'll GC first.
1071 if (cap->r.rCurrentNursery->link != NULL ||
1072 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1073 // if the nursery has only one block.
1074
1075 bd = allocGroup_lock(blocks);
1076 cap->r.rNursery->n_blocks += blocks;
1077
1078 // link the new group into the list
1079 bd->link = cap->r.rCurrentNursery;
1080 bd->u.back = cap->r.rCurrentNursery->u.back;
1081 if (cap->r.rCurrentNursery->u.back != NULL) {
1082 cap->r.rCurrentNursery->u.back->link = bd;
1083 } else {
1084 cap->r.rNursery->blocks = bd;
1085 }
1086 cap->r.rCurrentNursery->u.back = bd;
1087
1088 // initialise it as a nursery block. We initialise the
1089 // step, gen_no, and flags field of *every* sub-block in
1090 // this large block, because this is easier than making
1091 // sure that we always find the block head of a large
1092 // block whenever we call Bdescr() (eg. evacuate() and
1093 // isAlive() in the GC would both have to do this, at
1094 // least).
1095 {
1096 bdescr *x;
1097 for (x = bd; x < bd + blocks; x++) {
1098 initBdescr(x,g0,g0);
1099 x->free = x->start;
1100 x->flags = 0;
1101 }
1102 }
1103
1104 // This assert can be a killer if the app is doing lots
1105 // of large block allocations.
1106 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1107
1108 // now update the nursery to point to the new block
1109 cap->r.rCurrentNursery = bd;
1110
1111 // we might be unlucky and have another thread get on the
1112 // run queue before us and steal the large block, but in that
1113 // case the thread will just end up requesting another large
1114 // block.
1115 pushOnRunQueue(cap,t);
1116 return rtsFalse; /* not actually GC'ing */
1117 }
1118 }
1119
1120 if (cap->r.rHpLim == NULL || cap->context_switch) {
1121 // Sometimes we miss a context switch, e.g. when calling
1122 // primitives in a tight loop, MAYBE_GC() doesn't check the
1123 // context switch flag, and we end up waiting for a GC.
1124 // See #1984, and concurrent/should_run/1984
1125 cap->context_switch = 0;
1126 appendToRunQueue(cap,t);
1127 } else {
1128 pushOnRunQueue(cap,t);
1129 }
1130 return rtsTrue;
1131 /* actual GC is done at the end of the while loop in schedule() */
1132 }
1133
1134 /* -----------------------------------------------------------------------------
1135 * Handle a thread that returned to the scheduler with ThreadYielding
1136 * -------------------------------------------------------------------------- */
1137
1138 static rtsBool
1139 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1140 {
1141 /* put the thread back on the run queue. Then, if we're ready to
1142 * GC, check whether this is the last task to stop. If so, wake
1143 * up the GC thread. getThread will block during a GC until the
1144 * GC is finished.
1145 */
1146
1147 ASSERT(t->_link == END_TSO_QUEUE);
1148
1149 // Shortcut if we're just switching evaluators: don't bother
1150 // doing stack squeezing (which can be expensive), just run the
1151 // thread.
1152 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1153 debugTrace(DEBUG_sched,
1154 "--<< thread %ld (%s) stopped to switch evaluators",
1155 (long)t->id, what_next_strs[t->what_next]);
1156 return rtsTrue;
1157 }
1158
1159 // Reset the context switch flag. We don't do this just before
1160 // running the thread, because that would mean we would lose ticks
1161 // during GC, which can lead to unfair scheduling (a thread hogs
1162 // the CPU because the tick always arrives during GC). This way
1163 // penalises threads that do a lot of allocation, but that seems
1164 // better than the alternative.
1165 cap->context_switch = 0;
1166
1167 IF_DEBUG(sanity,
1168 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1169 checkTSO(t));
1170
1171 appendToRunQueue(cap,t);
1172
1173 return rtsFalse;
1174 }
1175
1176 /* -----------------------------------------------------------------------------
1177 * Handle a thread that returned to the scheduler with ThreadBlocked
1178 * -------------------------------------------------------------------------- */
1179
1180 static void
1181 scheduleHandleThreadBlocked( StgTSO *t
1182 #if !defined(DEBUG)
1183 STG_UNUSED
1184 #endif
1185 )
1186 {
1187
1188 // We don't need to do anything. The thread is blocked, and it
1189 // has tidied up its stack and placed itself on whatever queue
1190 // it needs to be on.
1191
1192 // ASSERT(t->why_blocked != NotBlocked);
1193 // Not true: for example,
1194 // - the thread may have woken itself up already, because
1195 // threadPaused() might have raised a blocked throwTo
1196 // exception, see maybePerformBlockedException().
1197
1198 #ifdef DEBUG
1199 traceThreadStatus(DEBUG_sched, t);
1200 #endif
1201 }
1202
1203 /* -----------------------------------------------------------------------------
1204 * Handle a thread that returned to the scheduler with ThreadFinished
1205 * -------------------------------------------------------------------------- */
1206
1207 static rtsBool
1208 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1209 {
1210 /* Need to check whether this was a main thread, and if so,
1211 * return with the return value.
1212 *
1213 * We also end up here if the thread kills itself with an
1214 * uncaught exception, see Exception.cmm.
1215 */
1216
1217 // blocked exceptions can now complete, even if the thread was in
1218 // blocked mode (see #2910).
1219 awakenBlockedExceptionQueue (cap, t);
1220
1221 //
1222 // Check whether the thread that just completed was a bound
1223 // thread, and if so return with the result.
1224 //
1225 // There is an assumption here that all thread completion goes
1226 // through this point; we need to make sure that if a thread
1227 // ends up in the ThreadKilled state, that it stays on the run
1228 // queue so it can be dealt with here.
1229 //
1230
1231 if (t->bound) {
1232
1233 if (t->bound != task->incall) {
1234 #if !defined(THREADED_RTS)
1235 // Must be a bound thread that is not the topmost one. Leave
1236 // it on the run queue until the stack has unwound to the
1237 // point where we can deal with this. Leaving it on the run
1238 // queue also ensures that the garbage collector knows about
1239 // this thread and its return value (it gets dropped from the
1240 // step->threads list so there's no other way to find it).
1241 appendToRunQueue(cap,t);
1242 return rtsFalse;
1243 #else
1244 // this cannot happen in the threaded RTS, because a
1245 // bound thread can only be run by the appropriate Task.
1246 barf("finished bound thread that isn't mine");
1247 #endif
1248 }
1249
1250 ASSERT(task->incall->tso == t);
1251
1252 if (t->what_next == ThreadComplete) {
1253 if (task->incall->ret) {
1254 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1255 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1256 }
1257 task->incall->stat = Success;
1258 } else {
1259 if (task->incall->ret) {
1260 *(task->incall->ret) = NULL;
1261 }
1262 if (sched_state >= SCHED_INTERRUPTING) {
1263 if (heap_overflow) {
1264 task->incall->stat = HeapExhausted;
1265 } else {
1266 task->incall->stat = Interrupted;
1267 }
1268 } else {
1269 task->incall->stat = Killed;
1270 }
1271 }
1272 #ifdef DEBUG
1273 removeThreadLabel((StgWord)task->incall->tso->id);
1274 #endif
1275
1276 // We no longer consider this thread and task to be bound to
1277 // each other. The TSO lives on until it is GC'd, but the
1278 // task is about to be released by the caller, and we don't
1279 // want anyone following the pointer from the TSO to the
1280 // defunct task (which might have already been
1281 // re-used). This was a real bug: the GC updated
1282 // tso->bound->tso, which led to a deadlock.
1283 t->bound = NULL;
1284 task->incall->tso = NULL;
1285
1286 return rtsTrue; // tells schedule() to return
1287 }
1288
1289 return rtsFalse;
1290 }
1291
1292 /* -----------------------------------------------------------------------------
1293 * Perform a heap census
1294 * -------------------------------------------------------------------------- */
1295
1296 static rtsBool
1297 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1298 {
1299 // When we have +RTS -i0 and we're heap profiling, do a census at
1300 // every GC. This lets us get repeatable runs for debugging.
1301 if (performHeapProfile ||
1302 (RtsFlags.ProfFlags.profileInterval==0 &&
1303 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1304 return rtsTrue;
1305 } else {
1306 return rtsFalse;
1307 }
1308 }
1309
1310 /* -----------------------------------------------------------------------------
1311 * Perform a garbage collection if necessary
1312 * -------------------------------------------------------------------------- */
1313
1314 static Capability *
1315 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1316 {
1317 rtsBool heap_census;
1318 #ifdef THREADED_RTS
1319 /* extern static volatile StgWord waiting_for_gc;
1320 lives inside capability.c */
1321 rtsBool gc_type, prev_pending_gc;
1322 nat i;
1323 #endif
1324
1325 if (sched_state == SCHED_SHUTTING_DOWN) {
1326 // The final GC has already been done, and the system is
1327 // shutting down. We'll probably deadlock if we try to GC
1328 // now.
1329 return cap;
1330 }
1331
1332 #ifdef THREADED_RTS
1333 if (sched_state < SCHED_INTERRUPTING
1334 && RtsFlags.ParFlags.parGcEnabled
1335 && N >= RtsFlags.ParFlags.parGcGen
1336 && ! oldest_gen->mark)
1337 {
1338 gc_type = PENDING_GC_PAR;
1339 } else {
1340 gc_type = PENDING_GC_SEQ;
1341 }
1342
1343 // In order to GC, there must be no threads running Haskell code.
1344 // Therefore, the GC thread needs to hold *all* the capabilities,
1345 // and release them after the GC has completed.
1346 //
1347 // This seems to be the simplest way: previous attempts involved
1348 // making all the threads with capabilities give up their
1349 // capabilities and sleep except for the *last* one, which
1350 // actually did the GC. But it's quite hard to arrange for all
1351 // the other tasks to sleep and stay asleep.
1352 //
1353
1354 /* Other capabilities are prevented from running yet more Haskell
1355 threads if waiting_for_gc is set. Tested inside
1356 yieldCapability() and releaseCapability() in Capability.c */
1357
1358 prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1359 if (prev_pending_gc) {
1360 do {
1361 debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
1362 prev_pending_gc);
1363 ASSERT(cap);
1364 yieldCapability(&cap,task);
1365 } while (waiting_for_gc);
1366 return cap; // NOTE: task->cap might have changed here
1367 }
1368
1369 setContextSwitches();
1370
1371 // The final shutdown GC is always single-threaded, because it's
1372 // possible that some of the Capabilities have no worker threads.
1373
1374 if (gc_type == PENDING_GC_SEQ)
1375 {
1376 traceEventRequestSeqGc(cap);
1377 }
1378 else
1379 {
1380 traceEventRequestParGc(cap);
1381 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1382 }
1383
1384 if (gc_type == PENDING_GC_SEQ)
1385 {
1386 // single-threaded GC: grab all the capabilities
1387 for (i=0; i < n_capabilities; i++) {
1388 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1389 if (cap != &capabilities[i]) {
1390 Capability *pcap = &capabilities[i];
1391 // we better hope this task doesn't get migrated to
1392 // another Capability while we're waiting for this one.
1393 // It won't, because load balancing happens while we have
1394 // all the Capabilities, but even so it's a slightly
1395 // unsavoury invariant.
1396 task->cap = pcap;
1397 waitForReturnCapability(&pcap, task);
1398 if (pcap != &capabilities[i]) {
1399 barf("scheduleDoGC: got the wrong capability");
1400 }
1401 }
1402 }
1403 }
1404 else
1405 {
1406 // multi-threaded GC: make sure all the Capabilities donate one
1407 // GC thread each.
1408 waitForGcThreads(cap);
1409 }
1410
1411 #endif
1412
1413 IF_DEBUG(scheduler, printAllThreads());
1414
1415 delete_threads_and_gc:
1416 /*
1417 * We now have all the capabilities; if we're in an interrupting
1418 * state, then we should take the opportunity to delete all the
1419 * threads in the system.
1420 */
1421 if (sched_state == SCHED_INTERRUPTING) {
1422 deleteAllThreads(cap);
1423 sched_state = SCHED_SHUTTING_DOWN;
1424 }
1425
1426 heap_census = scheduleNeedHeapProfile(rtsTrue);
1427
1428 traceEventGcStart(cap);
1429 #if defined(THREADED_RTS)
1430 // reset waiting_for_gc *before* GC, so that when the GC threads
1431 // emerge they don't immediately re-enter the GC.
1432 waiting_for_gc = 0;
1433 GarbageCollect(force_major || heap_census, gc_type, cap);
1434 #else
1435 GarbageCollect(force_major || heap_census, 0, cap);
1436 #endif
1437 traceEventGcEnd(cap);
1438
1439 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1440 {
1441 // We are doing a GC because the system has been idle for a
1442 // timeslice and we need to check for deadlock. Record the
1443 // fact that we've done a GC and turn off the timer signal;
1444 // it will get re-enabled if we run any threads after the GC.
1445 recent_activity = ACTIVITY_DONE_GC;
1446 stopTimer();
1447 }
1448 else
1449 {
1450 // the GC might have taken long enough for the timer to set
1451 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1452 // necessarily deadlocked:
1453 recent_activity = ACTIVITY_YES;
1454 }
1455
1456 if (heap_census) {
1457 debugTrace(DEBUG_sched, "performing heap census");
1458 heapCensus();
1459 performHeapProfile = rtsFalse;
1460 }
1461
1462 #if defined(THREADED_RTS)
1463 if (gc_type == PENDING_GC_PAR)
1464 {
1465 releaseGCThreads(cap);
1466 }
1467 #endif
1468
1469 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1470 // GC set the heap_overflow flag, so we should proceed with
1471 // an orderly shutdown now. Ultimately we want the main
1472 // thread to return to its caller with HeapExhausted, at which
1473 // point the caller should call hs_exit(). The first step is
1474 // to delete all the threads.
1475 //
1476 // Another way to do this would be to raise an exception in
1477 // the main thread, which we really should do because it gives
1478 // the program a chance to clean up. But how do we find the
1479 // main thread? It should presumably be the same one that
1480 // gets ^C exceptions, but that's all done on the Haskell side
1481 // (GHC.TopHandler).
1482 sched_state = SCHED_INTERRUPTING;
1483 goto delete_threads_and_gc;
1484 }
1485
1486 #ifdef SPARKBALANCE
1487 /* JB
1488 Once we are all together... this would be the place to balance all
1489 spark pools. No concurrent stealing or adding of new sparks can
1490 occur. Should be defined in Sparks.c. */
1491 balanceSparkPoolsCaps(n_capabilities, capabilities);
1492 #endif
1493
1494 #if defined(THREADED_RTS)
1495 if (gc_type == PENDING_GC_SEQ) {
1496 // release our stash of capabilities.
1497 for (i = 0; i < n_capabilities; i++) {
1498 if (cap != &capabilities[i]) {
1499 task->cap = &capabilities[i];
1500 releaseCapability(&capabilities[i]);
1501 }
1502 }
1503 }
1504 if (cap) {
1505 task->cap = cap;
1506 } else {
1507 task->cap = NULL;
1508 }
1509 #endif
1510
1511 return cap;
1512 }
1513
1514 /* ---------------------------------------------------------------------------
1515 * Singleton fork(). Do not copy any running threads.
1516 * ------------------------------------------------------------------------- */
1517
1518 pid_t
1519 forkProcess(HsStablePtr *entry
1520 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1521 STG_UNUSED
1522 #endif
1523 )
1524 {
1525 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1526 pid_t pid;
1527 StgTSO* t,*next;
1528 Capability *cap;
1529 nat g;
1530
1531 #if defined(THREADED_RTS)
1532 if (RtsFlags.ParFlags.nNodes > 1) {
1533 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1534 stg_exit(EXIT_FAILURE);
1535 }
1536 #endif
1537
1538 debugTrace(DEBUG_sched, "forking!");
1539
1540 // ToDo: for SMP, we should probably acquire *all* the capabilities
1541 cap = rts_lock();
1542
1543 // no funny business: hold locks while we fork, otherwise if some
1544 // other thread is holding a lock when the fork happens, the data
1545 // structure protected by the lock will forever be in an
1546 // inconsistent state in the child. See also #1391.
1547 ACQUIRE_LOCK(&sched_mutex);
1548 ACQUIRE_LOCK(&cap->lock);
1549 ACQUIRE_LOCK(&cap->running_task->lock);
1550
1551 stopTimer(); // See #4074
1552
1553 #if defined(TRACING)
1554 flushEventLog(); // so that child won't inherit dirty file buffers
1555 #endif
1556
1557 pid = fork();
1558
1559 if (pid) { // parent
1560
1561 startTimer(); // #4074
1562
1563 RELEASE_LOCK(&sched_mutex);
1564 RELEASE_LOCK(&cap->lock);
1565 RELEASE_LOCK(&cap->running_task->lock);
1566
1567 // just return the pid
1568 rts_unlock(cap);
1569 return pid;
1570
1571 } else { // child
1572
1573 #if defined(THREADED_RTS)
1574 initMutex(&sched_mutex);
1575 initMutex(&cap->lock);
1576 initMutex(&cap->running_task->lock);
1577 #endif
1578
1579 #ifdef TRACING
1580 resetTracing();
1581 #endif
1582
1583 // Now, all OS threads except the thread that forked are
1584 // stopped. We need to stop all Haskell threads, including
1585 // those involved in foreign calls. Also we need to delete
1586 // all Tasks, because they correspond to OS threads that are
1587 // now gone.
1588
1589 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1590 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1591 next = t->global_link;
1592 // don't allow threads to catch the ThreadKilled
1593 // exception, but we do want to raiseAsync() because these
1594 // threads may be evaluating thunks that we need later.
1595 deleteThread_(cap,t);
1596
1597 // stop the GC from updating the InCall to point to
1598 // the TSO. This is only necessary because the
1599 // OSThread bound to the TSO has been killed, and
1600 // won't get a chance to exit in the usual way (see
1601 // also scheduleHandleThreadFinished).
1602 t->bound = NULL;
1603 }
1604 }
1605
1606 // Empty the run queue. It seems tempting to let all the
1607 // killed threads stay on the run queue as zombies to be
1608 // cleaned up later, but some of them correspond to bound
1609 // threads for which the corresponding Task does not exist.
1610 cap->run_queue_hd = END_TSO_QUEUE;
1611 cap->run_queue_tl = END_TSO_QUEUE;
1612
1613 // Any suspended C-calling Tasks are no more, their OS threads
1614 // don't exist now:
1615 cap->suspended_ccalls = NULL;
1616
1617 // Empty the threads lists. Otherwise, the garbage
1618 // collector may attempt to resurrect some of these threads.
1619 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1620 generations[g].threads = END_TSO_QUEUE;
1621 }
1622
1623 discardTasksExcept(cap->running_task);
1624
1625 #if defined(THREADED_RTS)
1626 // Wipe our spare workers list, they no longer exist. New
1627 // workers will be created if necessary.
1628 cap->spare_workers = NULL;
1629 cap->n_spare_workers = 0;
1630 cap->returning_tasks_hd = NULL;
1631 cap->returning_tasks_tl = NULL;
1632 #endif
1633
1634 // On Unix, all timers are reset in the child, so we need to start
1635 // the timer again.
1636 initTimer();
1637 startTimer();
1638
1639 #if defined(THREADED_RTS)
1640 cap = ioManagerStartCap(cap);
1641 #endif
1642
1643 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1644 rts_checkSchedStatus("forkProcess",cap);
1645
1646 rts_unlock(cap);
1647 hs_exit(); // clean up and exit
1648 stg_exit(EXIT_SUCCESS);
1649 }
1650 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1651 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1652 #endif
1653 }
1654
1655 /* ---------------------------------------------------------------------------
1656 * Delete all the threads in the system
1657 * ------------------------------------------------------------------------- */
1658
1659 static void
1660 deleteAllThreads ( Capability *cap )
1661 {
1662 // NOTE: only safe to call if we own all capabilities.
1663
1664 StgTSO* t, *next;
1665 nat g;
1666
1667 debugTrace(DEBUG_sched,"deleting all threads");
1668 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1669 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1670 next = t->global_link;
1671 deleteThread(cap,t);
1672 }
1673 }
1674
1675 // The run queue now contains a bunch of ThreadKilled threads. We
1676 // must not throw these away: the main thread(s) will be in there
1677 // somewhere, and the main scheduler loop has to deal with it.
1678 // Also, the run queue is the only thing keeping these threads from
1679 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1680
1681 #if !defined(THREADED_RTS)
1682 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1683 ASSERT(sleeping_queue == END_TSO_QUEUE);
1684 #endif
1685 }
1686
1687 /* -----------------------------------------------------------------------------
1688 Managing the suspended_ccalls list.
1689 Locks required: sched_mutex
1690 -------------------------------------------------------------------------- */
1691
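// suspendTask links the current Task's InCall onto the front of
// cap->suspended_ccalls; recoverSuspendedTask unlinks it again.  The list
// is doubly linked (incall->next / incall->prev) so removal is O(1).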
1692 STATIC_INLINE void
1693 suspendTask (Capability *cap, Task *task)
1694 {
1695 InCall *incall;
1696
1697 incall = task->incall;
1698 ASSERT(incall->next == NULL && incall->prev == NULL);
1699 incall->next = cap->suspended_ccalls;
1700 incall->prev = NULL;
1701 if (cap->suspended_ccalls) {
1702 cap->suspended_ccalls->prev = incall;
1703 }
1704 cap->suspended_ccalls = incall;
1705 }
1706
1707 STATIC_INLINE void
1708 recoverSuspendedTask (Capability *cap, Task *task)
1709 {
1710 InCall *incall;
1711
1712 incall = task->incall;
1713 if (incall->prev) {
1714 incall->prev->next = incall->next;
1715 } else {
1716 ASSERT(cap->suspended_ccalls == incall);
1717 cap->suspended_ccalls = incall->next;
1718 }
1719 if (incall->next) {
1720 incall->next->prev = incall->prev;
1721 }
1722 incall->next = incall->prev = NULL;
1723 }
1724
1725 /* ---------------------------------------------------------------------------
1726 * Suspending & resuming Haskell threads.
1727 *
1728 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1729 * its capability before calling the C function. This allows another
1730 * task to pick up the capability and carry on running Haskell
1731 * threads. It also means that if the C call blocks, it won't lock
1732 * the whole system.
1733 *
1734 * The Haskell thread making the C call is put to sleep for the
1735 * duration of the call, on the cap->suspended_ccalls list. We
1736 * give out a token to the task, which it can use to resume the thread
1737 * on return from the C function.
1738 *
1739 * If this is an interruptible C call, this means that the FFI call may be
1740 * unceremoniously terminated and should be scheduled on an
1741 * unbound worker thread.
1742 * ------------------------------------------------------------------------- */
1743
1744 void *
1745 suspendThread (StgRegTable *reg, rtsBool interruptible)
1746 {
1747 Capability *cap;
1748 int saved_errno;
1749 StgTSO *tso;
1750 Task *task;
1751 #if defined(mingw32_HOST_OS)
1752 StgWord32 saved_winerror;
1753 #endif
1754
1755 saved_errno = errno;
1756 #if defined(mingw32_HOST_OS)
1757 saved_winerror = GetLastError();
1758 #endif
1759
1760 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1761 */
1762 cap = regTableToCapability(reg);
1763
1764 task = cap->running_task;
1765 tso = cap->r.rCurrentTSO;
1766
1767 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
1768
1769 // XXX this might not be necessary --SDM
1770 tso->what_next = ThreadRunGHC;
1771
1772 threadPaused(cap,tso);
1773
1774 if (interruptible) {
1775 tso->why_blocked = BlockedOnCCall_Interruptible;
1776 } else {
1777 tso->why_blocked = BlockedOnCCall;
1778 }
1779
1780 // Hand back capability
1781 task->incall->suspended_tso = tso;
1782 task->incall->suspended_cap = cap;
1783
1784 ACQUIRE_LOCK(&cap->lock);
1785
1786 suspendTask(cap,task);
1787 cap->in_haskell = rtsFalse;
1788 releaseCapability_(cap,rtsFalse);
1789
1790 RELEASE_LOCK(&cap->lock);
1791
1792 errno = saved_errno;
1793 #if defined(mingw32_HOST_OS)
1794 SetLastError(saved_winerror);
1795 #endif
1796 return task;
1797 }
1798
1799 StgRegTable *
1800 resumeThread (void *task_)
1801 {
1802 StgTSO *tso;
1803 InCall *incall;
1804 Capability *cap;
1805 Task *task = task_;
1806 int saved_errno;
1807 #if defined(mingw32_HOST_OS)
1808 StgWord32 saved_winerror;
1809 #endif
1810
1811 saved_errno = errno;
1812 #if defined(mingw32_HOST_OS)
1813 saved_winerror = GetLastError();
1814 #endif
1815
1816 incall = task->incall;
1817 cap = incall->suspended_cap;
1818 task->cap = cap;
1819
1820 // Wait for permission to re-enter the RTS with the result.
1821 waitForReturnCapability(&cap,task);
1822 // we might be on a different capability now... but if so, our
1823 // entry on the suspended_ccalls list will also have been
1824 // migrated.
1825
1826 // Remove the thread from the suspended list
1827 recoverSuspendedTask(cap,task);
1828
1829 tso = incall->suspended_tso;
1830 incall->suspended_tso = NULL;
1831 incall->suspended_cap = NULL;
1832 tso->_link = END_TSO_QUEUE; // no write barrier reqd
1833
1834 traceEventRunThread(cap, tso);
1835
1836 /* Reset blocking status */
1837 tso->why_blocked = NotBlocked;
1838
1839 if ((tso->flags & TSO_BLOCKEX) == 0) {
1840 // avoid locking the TSO if we don't have to
1841 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
1842 maybePerformBlockedException(cap,tso);
1843 }
1844 }
1845
1846 cap->r.rCurrentTSO = tso;
1847 cap->in_haskell = rtsTrue;
1848 errno = saved_errno;
1849 #if defined(mingw32_HOST_OS)
1850 SetLastError(saved_winerror);
1851 #endif
1852
1853 /* We might have GC'd, mark the TSO dirty again */
1854 dirty_TSO(cap,tso);
1855 dirty_STACK(cap,tso->stackobj);
1856
1857 IF_DEBUG(sanity, checkTSO(tso));
1858
1859 return &cap->r;
1860 }
1861
1862 /* ---------------------------------------------------------------------------
1863 * scheduleThread()
1864 *
1865 * scheduleThread puts a thread on the end of the runnable queue.
1866 * This will usually be done immediately after a thread is created.
1867 * The caller of scheduleThread must create the thread using e.g.
1868 * createThread and push an appropriate closure
1869 * on this thread's stack before the scheduler is invoked.
1870 * ------------------------------------------------------------------------ */
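/* A minimal sketch (not part of the RTS) of the usual calling sequence;
 * `closure' is a hypothetical IO action, and helpers such as
 * createIOThread() live in RtsAPI.c:
 *
 *     StgTSO *tso;
 *     tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);   // append to cap's run queue
 *
 * scheduleWaitThread() below does the analogous job for *bound* threads,
 * and then enters the scheduler to wait for the thread to finish.
 */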
1871
1872 void
1873 scheduleThread(Capability *cap, StgTSO *tso)
1874 {
1875 // The thread goes at the *end* of the run-queue, to avoid possible
1876 // starvation of any threads already on the queue.
1877 appendToRunQueue(cap,tso);
1878 }
1879
1880 void
1881 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1882 {
1883 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1884 // move this thread from now on.
1885 #if defined(THREADED_RTS)
1886 cpu %= RtsFlags.ParFlags.nNodes;
1887 if (cpu == cap->no) {
1888 appendToRunQueue(cap,tso);
1889 } else {
1890 migrateThread(cap, tso, &capabilities[cpu]);
1891 }
1892 #else
1893 appendToRunQueue(cap,tso);
1894 #endif
1895 }
1896
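/* scheduleWaitThread() schedules a *bound* thread: it ties the TSO to the
 * current Task's InCall, puts the TSO on the run queue, and then runs the
 * scheduler until the thread has finished, returning the (possibly
 * different) Capability we end up holding.  It is used, for example, by
 * the rts_eval* entry points in RtsAPI.c.
 */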
1897 Capability *
1898 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1899 {
1900 Task *task;
1901 DEBUG_ONLY( StgThreadID id );
1902
1903 // We already created/initialised the Task
1904 task = cap->running_task;
1905
1906 // This TSO is now a bound thread; make the Task and TSO
1907 // point to each other.
1908 tso->bound = task->incall;
1909 tso->cap = cap;
1910
1911 task->incall->tso = tso;
1912 task->incall->ret = ret;
1913 task->incall->stat = NoStatus;
1914
1915 appendToRunQueue(cap,tso);
1916
1917 DEBUG_ONLY( id = tso->id );
1918 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
1919
1920 cap = schedule(cap,task);
1921
1922 ASSERT(task->incall->stat != NoStatus);
1923 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1924
1925 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
1926 return cap;
1927 }
1928
1929 /* ----------------------------------------------------------------------------
1930 * Starting Tasks
1931 * ------------------------------------------------------------------------- */
1932
1933 #if defined(THREADED_RTS)
1934 void scheduleWorker (Capability *cap, Task *task)
1935 {
1936 // schedule() runs without a lock.
1937 cap = schedule(cap,task);
1938
1939 // On exit from schedule(), we have a Capability, but possibly not
1940 // the same one we started with.
1941
1942 // During shutdown, the requirement is that after all the
1943 // Capabilities are shut down, all workers that are shutting down
1944 // have finished workerTaskStop(). This is why we hold on to
1945 // cap->lock until we've finished workerTaskStop() below.
1946 //
1947 // There may be workers still involved in foreign calls; those
1948 // will just block in waitForReturnCapability() because the
1949 // Capability has been shut down.
1950 //
1951 ACQUIRE_LOCK(&cap->lock);
1952 releaseCapability_(cap,rtsFalse);
1953 workerTaskStop(task);
1954 RELEASE_LOCK(&cap->lock);
1955 }
1956 #endif
1957
1958 /* ---------------------------------------------------------------------------
1959 * initScheduler()
1960 *
1961 * Initialise the scheduler. This resets all the queues - if the
1962 * queues contained any threads, they'll be garbage collected at the
1963 * next pass.
1964 *
1965 * ------------------------------------------------------------------------ */
1966
1967 void
1968 initScheduler(void)
1969 {
1970 #if !defined(THREADED_RTS)
1971 blocked_queue_hd = END_TSO_QUEUE;
1972 blocked_queue_tl = END_TSO_QUEUE;
1973 sleeping_queue = END_TSO_QUEUE;
1974 #endif
1975
1976 sched_state = SCHED_RUNNING;
1977 recent_activity = ACTIVITY_YES;
1978
1979 #if defined(THREADED_RTS)
1980 /* Initialise the mutex and condition variables used by
1981 * the scheduler. */
1982 initMutex(&sched_mutex);
1983 #endif
1984
1985 ACQUIRE_LOCK(&sched_mutex);
1986
1987 /* A capability holds the state a native thread needs in
1988 * order to execute STG code. At least one capability is
1989 * floating around (only THREADED_RTS builds have more than one).
1990 */
1991 initCapabilities();
1992
1993 initTaskManager();
1994
1995 #if defined(THREADED_RTS)
1996 initSparkPools();
1997 #endif
1998
1999 RELEASE_LOCK(&sched_mutex);
2000
2001 #if defined(THREADED_RTS)
2002 /*
2003 * Eagerly start one worker to run each Capability, except for
2004 * Capability 0. The idea is that we're probably going to start a
2005 * bound thread on Capability 0 pretty soon, so we don't want a
2006 * worker task hogging it.
2007 */
2008 {
2009 nat i;
2010 Capability *cap;
2011 for (i = 1; i < n_capabilities; i++) {
2012 cap = &capabilities[i];
2013 ACQUIRE_LOCK(&cap->lock);
2014 startWorkerTask(cap);
2015 RELEASE_LOCK(&cap->lock);
2016 }
2017 }
2018 #endif
2019 }
2020
2021 void
2022 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2023 /* see Capability.c, shutdownCapability() */
2024 {
2025 Task *task = NULL;
2026
2027 task = newBoundTask();
2028
2029 // If we haven't killed all the threads yet, do it now.
2030 if (sched_state < SCHED_SHUTTING_DOWN) {
2031 sched_state = SCHED_INTERRUPTING;
2032 waitForReturnCapability(&task->cap,task);
2033 scheduleDoGC(task->cap,task,rtsFalse);
2034 ASSERT(task->incall->tso == NULL);
2035 releaseCapability(task->cap);
2036 }
2037 sched_state = SCHED_SHUTTING_DOWN;
2038
2039 shutdownCapabilities(task, wait_foreign);
2040
2041 boundTaskExiting(task);
2042 }
2043
2044 void
2045 freeScheduler( void )
2046 {
2047 nat still_running;
2048
2049 ACQUIRE_LOCK(&sched_mutex);
2050 still_running = freeTaskManager();
2051 // We can only free the Capabilities if there are no Tasks still
2052 // running. We might have a Task about to return from a foreign
2053 // call into waitForReturnCapability(), for example (actually,
2054 // this should be the *only* thing that a still-running Task can
2055 // do at this point, and it will block waiting for the
2056 // Capability).
2057 if (still_running == 0) {
2058 freeCapabilities();
2059 if (n_capabilities != 1) {
2060 stgFree(capabilities);
2061 }
2062 }
2063 RELEASE_LOCK(&sched_mutex);
2064 #if defined(THREADED_RTS)
2065 closeMutex(&sched_mutex);
2066 #endif
2067 }
2068
2069 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2070 void *user USED_IF_NOT_THREADS)
2071 {
2072 #if !defined(THREADED_RTS)
2073 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2074 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2075 evac(user, (StgClosure **)(void *)&sleeping_queue);
2076 #endif
2077 }
2078
2079 /* -----------------------------------------------------------------------------
2080 performGC
2081
2082 This is the interface to the garbage collector from Haskell land.
2083 We provide this so that external C code can allocate and garbage
2084 collect when called from Haskell via _ccall_GC.
2085 -------------------------------------------------------------------------- */
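/* For illustration only (not part of the RTS): C code entered via a safe
 * foreign call can request a collection like this (my_c_fn is a
 * hypothetical foreign function):
 *
 *     void my_c_fn(void)
 *     {
 *         performGC();        // request a collection
 *         performMajorGC();   // or insist on a major collection
 *     }
 */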
2086
2087 static void
2088 performGC_(rtsBool force_major)
2089 {
2090 Task *task;
2091
2092 // We must grab a new Task here, because the existing Task may be
2093 // associated with a particular Capability, and chained onto the
2094 // suspended_ccalls queue.
2095 task = newBoundTask();
2096
2097 waitForReturnCapability(&task->cap,task);
2098 scheduleDoGC(task->cap,task,force_major);
2099 releaseCapability(task->cap);
2100 boundTaskExiting(task);
2101 }
2102
2103 void
2104 performGC(void)
2105 {
2106 performGC_(rtsFalse);
2107 }
2108
2109 void
2110 performMajorGC(void)
2111 {
2112 performGC_(rtsTrue);
2113 }
2114
2115 /* ---------------------------------------------------------------------------
2116 Interrupt execution
2117 - usually called inside a signal handler so it mustn't do anything fancy.
2118 ------------------------------------------------------------------------ */
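/* For illustration only (not part of the RTS): a typical caller is a
 * console-interrupt (^C) handler, roughly along these lines (the handler
 * name is hypothetical; the real ones live in e.g. posix/Signals.c):
 *
 *     static void my_sigint_handler(int sig STG_UNUSED)
 *     {
 *         interruptStgRts();
 *     }
 */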
2119
2120 void
2121 interruptStgRts(void)
2122 {
2123 sched_state = SCHED_INTERRUPTING;
2124 setContextSwitches();
2125 #if defined(THREADED_RTS)
2126 wakeUpRts();
2127 #endif
2128 }
2129
2130 /* -----------------------------------------------------------------------------
2131 Wake up the RTS
2132
2133 This function causes at least one OS thread to wake up and run the
2134 scheduler loop. It is invoked when the RTS might be deadlocked, or
2135    an external event has arrived that may need servicing (e.g. a
2136 keyboard interrupt).
2137
2138 In the single-threaded RTS we don't do anything here; we only have
2139 one thread anyway, and the event that caused us to want to wake up
2140 will have interrupted any blocking system call in progress anyway.
2141 -------------------------------------------------------------------------- */
2142
2143 #if defined(THREADED_RTS)
2144 void wakeUpRts(void)
2145 {
2146     // This forces the IO Manager thread to wake up, which will
2147 // in turn ensure that some OS thread wakes up and runs the
2148 // scheduler loop, which will cause a GC and deadlock check.
2149 ioManagerWakeup();
2150 }
2151 #endif
2152
2153 /* -----------------------------------------------------------------------------
2154 Deleting threads
2155
2156 This is used for interruption (^C) and forking, and corresponds to
2157 raising an exception but without letting the thread catch the
2158 exception.
2159 -------------------------------------------------------------------------- */
2160
2161 static void
2162 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2163 {
2164 // NOTE: must only be called on a TSO that we have exclusive
2165 // access to, because we will call throwToSingleThreaded() below.
2166 // The TSO must be on the run queue of the Capability we own, or
2167 // we must own all Capabilities.
2168
2169 if (tso->why_blocked != BlockedOnCCall &&
2170 tso->why_blocked != BlockedOnCCall_Interruptible) {
2171 throwToSingleThreaded(tso->cap,tso,NULL);
2172 }
2173 }
2174
2175 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2176 static void
2177 deleteThread_(Capability *cap, StgTSO *tso)
2178 { // for forkProcess only:
2179 // like deleteThread(), but we delete threads in foreign calls, too.
2180
2181 if (tso->why_blocked == BlockedOnCCall ||
2182 tso->why_blocked == BlockedOnCCall_Interruptible) {
2183 tso->what_next = ThreadKilled;
2184 appendToRunQueue(tso->cap, tso);
2185 } else {
2186 deleteThread(cap,tso);
2187 }
2188 }
2189 #endif
2190
2191 /* -----------------------------------------------------------------------------
2192 raiseExceptionHelper
2193
2194     This function is called by the raise# primitive, just so that we can
2195     move some of the tricky bits of raising an exception from C-- into
2196     C. Who knows, it might be a useful, reusable thing here too.
2197 -------------------------------------------------------------------------- */
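/* For illustration (C-like pseudocode, not part of the RTS; the real
 * caller is the raise# implementation, stg_raisezh in Exception.cmm): on
 * return, tso->stackobj->sp points at the frame that was found, and the
 * caller dispatches on the returned frame type:
 *
 *     switch (raiseExceptionHelper(reg, tso, exception)) {
 *     case CATCH_FRAME:       // a handler was found on the stack
 *     case CATCH_STM_FRAME:   // a handler inside an STM transaction
 *     case ATOMICALLY_FRAME:  // the raise escaped to an atomically# frame
 *     case STOP_FRAME:        // no handler: the exception is uncaught
 *         ...;
 *     }
 */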
2198
2199 StgWord
2200 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2201 {
2202 Capability *cap = regTableToCapability(reg);
2203 StgThunk *raise_closure = NULL;
2204 StgPtr p, next;
2205 StgRetInfoTable *info;
2206 //
2207 // This closure represents the expression 'raise# E' where E
2208     // is the exception being raised. It is used to overwrite all the
2209     // thunks which are currently under evaluation.
2210 //
2211
2212 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2213 // LDV profiling: stg_raise_info has THUNK as its closure
2214 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2215     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2216 // 1 does not cause any problem unless profiling is performed.
2217 // However, when LDV profiling goes on, we need to linearly scan
2218 // small object pool, where raise_closure is stored, so we should
2219 // use MIN_UPD_SIZE.
2220 //
2221 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2222 // sizeofW(StgClosure)+1);
2223 //
2224
2225 //
2226 // Walk up the stack, looking for the catch frame. On the way,
2227 // we update any closures pointed to from update frames with the
2228 // raise closure that we just built.
2229 //
2230 p = tso->stackobj->sp;
2231 while(1) {
2232 info = get_ret_itbl((StgClosure *)p);
2233 next = p + stack_frame_sizeW((StgClosure *)p);
2234 switch (info->i.type) {
2235
2236 case UPDATE_FRAME:
2237 // Only create raise_closure if we need to.
2238 if (raise_closure == NULL) {
2239 raise_closure =
2240 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2241 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2242 raise_closure->payload[0] = exception;
2243 }
2244 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2245 (StgClosure *)raise_closure);
2246 p = next;
2247 continue;
2248
2249 case ATOMICALLY_FRAME:
2250 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2251 tso->stackobj->sp = p;
2252 return ATOMICALLY_FRAME;
2253
2254 case CATCH_FRAME:
2255 tso->stackobj->sp = p;
2256 return CATCH_FRAME;
2257
2258 case CATCH_STM_FRAME:
2259 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2260 tso->stackobj->sp = p;
2261 return CATCH_STM_FRAME;
2262
2263 case UNDERFLOW_FRAME:
2264 tso->stackobj->sp = p;
2265 threadStackUnderflow(cap,tso);
2266 p = tso->stackobj->sp;
2267 continue;
2268
2269 case STOP_FRAME:
2270 tso->stackobj->sp = p;
2271 return STOP_FRAME;
2272
2273 case CATCH_RETRY_FRAME:
2274 default:
2275 p = next;
2276 continue;
2277 }
2278 }
2279 }
2280
2281
2282 /* -----------------------------------------------------------------------------
2283 findRetryFrameHelper
2284
2285 This function is called by the retry# primitive. It traverses the stack
2286    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2287
2288 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2289    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
2290
2291 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2292 create) because retries are not considered to be exceptions, despite the
2293 similar implementation.
2294
2295 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2296 not be created within memory transactions.
2297 -------------------------------------------------------------------------- */
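/* For example (illustrative): in atomically (m `orElse` n), a retry
 * raised inside m should stop at the CATCH_RETRY_FRAME pushed by
 * orElse#, so that n can be run as the alternative; a retry raised
 * inside n (i.e. once the alternative has also retried) ultimately
 * propagates to the ATOMICALLY_FRAME, which blocks the thread until one
 * of the TVars it has read is written by another transaction.
 */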
2298
2299 StgWord
2300 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2301 {
2302 StgPtr p, next;
2303 StgRetInfoTable *info;
2304
2305 p = tso->stackobj->sp;
2306 while (1) {
2307 info = get_ret_itbl((StgClosure *)p);
2308 next = p + stack_frame_sizeW((StgClosure *)p);
2309 switch (info->i.type) {
2310
2311 case ATOMICALLY_FRAME:
2312 debugTrace(DEBUG_stm,
2313 "found ATOMICALLY_FRAME at %p during retry", p);
2314 tso->stackobj->sp = p;
2315 return ATOMICALLY_FRAME;
2316
2317 case CATCH_RETRY_FRAME:
2318 debugTrace(DEBUG_stm,
2319                      "found CATCH_RETRY_FRAME at %p during retry", p);
2320 tso->stackobj->sp = p;
2321 return CATCH_RETRY_FRAME;
2322
2323 case CATCH_STM_FRAME: {
2324 StgTRecHeader *trec = tso -> trec;
2325 StgTRecHeader *outer = trec -> enclosing_trec;
2326 debugTrace(DEBUG_stm,
2327 "found CATCH_STM_FRAME at %p during retry", p);
2328 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2329 stmAbortTransaction(cap, trec);
2330 stmFreeAbortedTRec(cap, trec);
2331 tso -> trec = outer;
2332 p = next;
2333 continue;
2334 }
2335
2336 case UNDERFLOW_FRAME:
2337 threadStackUnderflow(cap,tso);
2338 p = tso->stackobj->sp;
2339 continue;
2340
2341 default:
2342 ASSERT(info->i.type != CATCH_FRAME);
2343 ASSERT(info->i.type != STOP_FRAME);
2344 p = next;
2345 continue;
2346 }
2347 }
2348 }
2349
2350 /* -----------------------------------------------------------------------------
2351 resurrectThreads is called after garbage collection on the list of
2352 threads found to be garbage. Each of these threads will be woken
2353    up and sent an exception: BlockedIndefinitelyOnMVar if it was blocked
2354    on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2355    transaction, or NonTermination if it was blocked on a Black Hole.
2356
2357 Locks: assumes we hold *all* the capabilities.
2358 -------------------------------------------------------------------------- */
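/* For example (illustrative): a thread stuck in takeMVar on an MVar that
 * is itself unreachable can never be woken normally, so after GC it is
 * resurrected here and sent BlockedIndefinitelyOnMVar, which normally
 * surfaces as the familiar "blocked indefinitely" error from the default
 * exception handler.
 */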
2359
2360 void
2361 resurrectThreads (StgTSO *threads)
2362 {
2363 StgTSO *tso, *next;
2364 Capability *cap;
2365 generation *gen;
2366
2367 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2368 next = tso->global_link;
2369
2370 gen = Bdescr((P_)tso)->gen;
2371 tso->global_link = gen->threads;
2372 gen->threads = tso;
2373
2374 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2375
2376 // Wake up the thread on the Capability it was last on
2377 cap = tso->cap;
2378
2379 switch (tso->why_blocked) {
2380 case BlockedOnMVar:
2381 /* Called by GC - sched_mutex lock is currently held. */
2382 throwToSingleThreaded(cap, tso,
2383 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2384 break;
2385 case BlockedOnBlackHole:
2386 throwToSingleThreaded(cap, tso,
2387 (StgClosure *)nonTermination_closure);
2388 break;
2389 case BlockedOnSTM:
2390 throwToSingleThreaded(cap, tso,
2391 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2392 break;
2393 case NotBlocked:
2394 /* This might happen if the thread was blocked on a black hole
2395 * belonging to a thread that we've just woken up (raiseAsync
2396 * can wake up threads, remember...).
2397 */
2398 continue;
2399 case BlockedOnMsgThrowTo:
2400 // This can happen if the target is masking, blocks on a
2401 // black hole, and then is found to be unreachable. In
2402 // this case, we want to let the target wake up and carry
2403 // on, and do nothing to this thread.
2404 continue;
2405 default:
2406 barf("resurrectThreads: thread blocked in a strange way: %d",
2407 tso->why_blocked);
2408 }
2409 }
2410 }