Change tryStealSpark so it does not consume fizzled sparks
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "Sparks.h"
31 #include "Capability.h"
32 #include "Task.h"
33 #include "AwaitEvent.h"
34 #if defined(mingw32_HOST_OS)
35 #include "win32/IOManager.h"
36 #endif
37 #include "Trace.h"
38 #include "RaiseAsync.h"
39 #include "Threads.h"
40 #include "Timer.h"
41 #include "ThreadPaused.h"
42 #include "Messages.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 #ifdef TRACING
60 #include "eventlog/EventLog.h"
61 #endif
62 /* -----------------------------------------------------------------------------
63 * Global variables
64 * -------------------------------------------------------------------------- */
65
66 #if !defined(THREADED_RTS)
67 // Blocked/sleeping threads
68 StgTSO *blocked_queue_hd = NULL;
69 StgTSO *blocked_queue_tl = NULL;
70 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
71 #endif
72
73 /* Set to true when the latest garbage collection failed to reclaim
74 * enough space, and the runtime should proceed to shut itself down in
75 * an orderly fashion (emitting profiling info etc.)
76 */
77 rtsBool heap_overflow = rtsFalse;
78
79 /* flag that tracks whether we have done any execution in this time slice.
80 * LOCK: currently none, perhaps we should lock (but needs to be
81 * updated in the fast path of the scheduler).
82 *
83 * NB. must be StgWord, we do xchg() on it.
84 */
85 volatile StgWord recent_activity = ACTIVITY_YES;
86
87 /* if this flag is set as well, give up execution
88 * LOCK: none (changes monotonically)
89 */
90 volatile StgWord sched_state = SCHED_RUNNING;
91
92 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
93 * exists - earlier gccs apparently didn't.
94 * -= chak
95 */
96 StgTSO dummy_tso;
97
98 /*
99 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
100 * in an MT setting, needed to signal that a worker thread shouldn't hang around
101 * in the scheduler when it is out of work.
102 */
103 rtsBool shutting_down_scheduler = rtsFalse;
104
105 /*
106 * This mutex protects most of the global scheduler data in
107 * the THREADED_RTS runtime.
108 */
109 #if defined(THREADED_RTS)
110 Mutex sched_mutex;
111 #endif
112
113 #if !defined(mingw32_HOST_OS)
114 #define FORKPROCESS_PRIMOP_SUPPORTED
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability *cap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 static void scheduleStartSignalHandlers (Capability *cap);
134 static void scheduleCheckBlockedThreads (Capability *cap);
135 static void scheduleProcessInbox(Capability *cap);
136 static void scheduleDetectDeadlock (Capability *cap, Task *task);
137 static void schedulePushWork(Capability *cap, Task *task);
138 #if defined(THREADED_RTS)
139 static void scheduleActivateSpark(Capability *cap);
140 #endif
141 static void schedulePostRunThread(Capability *cap, StgTSO *t);
142 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
143 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
144 nat prev_what_next );
145 static void scheduleHandleThreadBlocked( StgTSO *t );
146 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
147 StgTSO *t );
148 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
149 static Capability *scheduleDoGC(Capability *cap, Task *task,
150 rtsBool force_major);
151
152 static void deleteThread (Capability *cap, StgTSO *tso);
153 static void deleteAllThreads (Capability *cap);
154
155 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
156 static void deleteThread_(Capability *cap, StgTSO *tso);
157 #endif
158
159 /* ---------------------------------------------------------------------------
160 Main scheduling loop.
161
162 We use round-robin scheduling, each thread returning to the
163 scheduler loop when one of these conditions is detected:
164
165 * out of heap space
166 * timer expires (thread yields)
167 * thread blocks
168 * thread ends
169 * stack overflow
170
171 GRAN version:
172 In a GranSim setup this loop iterates over the global event queue.
173 This revolves around the global event queue, which determines what
174 to do next. Therefore, it's more complicated than either the
175 concurrent or the parallel (GUM) setup.
176 This version has been entirely removed (JB 2008/08).
177
178 GUM version:
179 GUM iterates over incoming messages.
180 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
181 and sends out a fish whenever it has nothing to do; in-between
182 doing the actual reductions (shared code below) it processes the
183 incoming messages and deals with delayed operations
184 (see PendingFetches).
185 This is not the ugliest code you could imagine, but it's bloody close.
186
187 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
188 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
189 as well as future GUM versions. This file has been refurbished to
190 only contain valid code, which is however incomplete, refers to
191 invalid includes etc.
192
193 ------------------------------------------------------------------------ */
194
195 static Capability *
196 schedule (Capability *initialCapability, Task *task)
197 {
198 StgTSO *t;
199 Capability *cap;
200 StgThreadReturnCode ret;
201 nat prev_what_next;
202 rtsBool ready_to_gc;
203 #if defined(THREADED_RTS)
204 rtsBool first = rtsTrue;
205 #endif
206
207 cap = initialCapability;
208
209 // Pre-condition: this task owns initialCapability.
210 // The sched_mutex is *NOT* held
211 // NB. on return, we still hold a capability.
212
213 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
214
215 schedulePreLoop();
216
217 // -----------------------------------------------------------
218 // Scheduler loop starts here:
219
220 while (1) {
221
222 // Check whether we have re-entered the RTS from Haskell without
223 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
224 // call).
225 if (cap->in_haskell) {
226 errorBelch("schedule: re-entered unsafely.\n"
227 " Perhaps a 'foreign import unsafe' should be 'safe'?");
228 stg_exit(EXIT_FAILURE);
229 }
230
231 // The interruption / shutdown sequence.
232 //
233 // In order to cleanly shut down the runtime, we want to:
234 // * make sure that all main threads return to their callers
235 // with the state 'Interrupted'.
236 // * clean up all OS threads associated with the runtime
237 // * free all memory etc.
238 //
239 // So the sequence for ^C goes like this:
240 //
241 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
242 // arranges for some Capability to wake up
243 //
244 // * all threads in the system are halted, and the zombies are
245 // placed on the run queue for cleaning up. We acquire all
246 // the capabilities in order to delete the threads, this is
247 // done by scheduleDoGC() for convenience (because GC already
248 // needs to acquire all the capabilities). We can't kill
249 // threads involved in foreign calls.
250 //
251 // * somebody calls shutdownHaskell(), which calls exitScheduler()
252 //
253 // * sched_state := SCHED_SHUTTING_DOWN
254 //
255 // * all workers exit when the run queue on their capability
256 // drains. All main threads will also exit when their TSO
257 // reaches the head of the run queue and they can return.
258 //
259 // * eventually all Capabilities will shut down, and the RTS can
260 // exit.
261 //
262 // * We might be left with threads blocked in foreign calls,
263 // we should really attempt to kill these somehow (TODO);
264
265 switch (sched_state) {
266 case SCHED_RUNNING:
267 break;
268 case SCHED_INTERRUPTING:
269 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
270 #if defined(THREADED_RTS)
271 discardSparksCap(cap);
272 #endif
273 /* scheduleDoGC() deletes all the threads */
274 cap = scheduleDoGC(cap,task,rtsFalse);
275
276 // after scheduleDoGC(), we must be shutting down. Either some
277 // other Capability did the final GC, or we did it above,
278 // either way we can fall through to the SCHED_SHUTTING_DOWN
279 // case now.
280 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
281 // fall through
282
283 case SCHED_SHUTTING_DOWN:
284 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
285 // If we are a worker, just exit. If we're a bound thread
286 // then we will exit below when we've removed our TSO from
287 // the run queue.
288 if (!isBoundTask(task) && emptyRunQueue(cap)) {
289 return cap;
290 }
291 break;
292 default:
293 barf("sched_state: %d", sched_state);
294 }
295
296 scheduleFindWork(cap);
297
298 /* work pushing, currently relevant only for THREADED_RTS:
299 (pushes threads, wakes up idle capabilities for stealing) */
300 schedulePushWork(cap,task);
301
302 scheduleDetectDeadlock(cap,task);
303
304 #if defined(THREADED_RTS)
305 cap = task->cap; // reload cap, it might have changed
306 #endif
307
308 // Normally, the only way we can get here with no threads to
309 // run is if a keyboard interrupt was received during
310 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
311 // Additionally, it is not fatal for the
312 // threaded RTS to reach here with no threads to run.
313 //
314 // win32: might be here due to awaitEvent() being abandoned
315 // as a result of a console event having been delivered.
316
317 #if defined(THREADED_RTS)
318 if (first)
319 {
320 // XXX: ToDo
321 // // don't yield the first time, we want a chance to run this
322 // // thread for a bit, even if there are others banging at the
323 // // door.
324 // first = rtsFalse;
325 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
326 }
327
328 scheduleYield(&cap,task);
329
330 if (emptyRunQueue(cap)) continue; // look for work again
331 #endif
332
333 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
334 if ( emptyRunQueue(cap) ) {
335 ASSERT(sched_state >= SCHED_INTERRUPTING);
336 }
337 #endif
338
339 //
340 // Get a thread to run
341 //
342 t = popRunQueue(cap);
343
344 // Sanity check the thread we're about to run. This can be
345 // expensive if there is lots of thread switching going on...
346 IF_DEBUG(sanity,checkTSO(t));
347
348 #if defined(THREADED_RTS)
349 // Check whether we can run this thread in the current task.
350 // If not, we have to pass our capability to the right task.
351 {
352 InCall *bound = t->bound;
353
354 if (bound) {
355 if (bound->task == task) {
356 // yes, the Haskell thread is bound to the current native thread
357 } else {
358 debugTrace(DEBUG_sched,
359 "thread %lu bound to another OS thread",
360 (unsigned long)t->id);
361 // no, bound to a different Haskell thread: pass to that thread
362 pushOnRunQueue(cap,t);
363 continue;
364 }
365 } else {
366 // The thread we want to run is unbound.
367 if (task->incall->tso) {
368 debugTrace(DEBUG_sched,
369 "this OS thread cannot run thread %lu",
370 (unsigned long)t->id);
371 // no, the current native thread is bound to a different
372 // Haskell thread, so pass it to any worker thread
373 pushOnRunQueue(cap,t);
374 continue;
375 }
376 }
377 }
378 #endif
379
380 // If we're shutting down, and this thread has not yet been
381 // killed, kill it now. This sometimes happens when a finalizer
382 // thread is created by the final GC, or a thread previously
383 // in a foreign call returns.
384 if (sched_state >= SCHED_INTERRUPTING &&
385 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
386 deleteThread(cap,t);
387 }
388
389 /* context switches are initiated by the timer signal, unless
390 * the user specified "context switch as often as possible", with
391 * +RTS -C0
392 */
393 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
394 && !emptyThreadQueues(cap)) {
395 cap->context_switch = 1;
396 }
397
398 run_thread:
399
400 // CurrentTSO is the thread to run. t might be different if we
401 // loop back to run_thread, so make sure to set CurrentTSO after
402 // that.
403 cap->r.rCurrentTSO = t;
404
405 startHeapProfTimer();
406
407 // ----------------------------------------------------------------------
408 // Run the current thread
409
410 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
411 ASSERT(t->cap == cap);
412 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
413
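// Remember which evaluator (compiled vs. interpreted) this thread is
// about to run under; scheduleHandleYield() compares against this to
// detect a thread that stopped only in order to switch evaluators.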
414 prev_what_next = t->what_next;
415
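// Install the errno (and, on Windows, the last-error code) that was
// saved when this thread last stopped running; the matching save is
// done below, after the thread returns to the scheduler.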
416 errno = t->saved_errno;
417 #if mingw32_HOST_OS
418 SetLastError(t->saved_winerror);
419 #endif
420
421 cap->in_haskell = rtsTrue;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 #if defined(THREADED_RTS)
427 if (recent_activity == ACTIVITY_DONE_GC) {
428 // ACTIVITY_DONE_GC means we turned off the timer signal to
429 // conserve power (see #1623). Re-enable it here.
430 nat prev;
431 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
432 if (prev == ACTIVITY_DONE_GC) {
433 startTimer();
434 }
435 } else if (recent_activity != ACTIVITY_INACTIVE) {
436 // If we reached ACTIVITY_INACTIVE, then don't reset it until
437 // we've done the GC. The thread running here might just be
438 // the IO manager thread that handle_tick() woke up via
439 // wakeUpRts().
440 recent_activity = ACTIVITY_YES;
441 }
442 #endif
443
444 traceEventRunThread(cap, t);
445
446 switch (prev_what_next) {
447
448 case ThreadKilled:
449 case ThreadComplete:
450 /* Thread already finished, return to scheduler. */
451 ret = ThreadFinished;
452 break;
453
454 case ThreadRunGHC:
455 {
456 StgRegTable *r;
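// Enter the STG machine. StgRun returns when the thread stops
// (heap/stack overflow, yield, block or finish); the register table it
// hands back tells us which Capability we now own and why we stopped.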
457 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
458 cap = regTableToCapability(r);
459 ret = r->rRet;
460 break;
461 }
462
463 case ThreadInterpret:
464 cap = interpretBCO(cap);
465 ret = cap->r.rRet;
466 break;
467
468 default:
469 barf("schedule: invalid what_next field");
470 }
471
472 cap->in_haskell = rtsFalse;
473
474 // The TSO might have moved, eg. if it re-entered the RTS and a GC
475 // happened. So find the new location:
476 t = cap->r.rCurrentTSO;
477
478 // And save the current errno in this thread.
479 // XXX: possibly bogus for SMP because this thread might already
480 // be running again, see code below.
481 t->saved_errno = errno;
482 #if mingw32_HOST_OS
483 // Similarly for Windows error code
484 t->saved_winerror = GetLastError();
485 #endif
486
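// NB. the '+ 6' below maps a BlockedOn* code onto the eventlog's
// thread-stop status encoding, whose first six values are taken by the
// StgThreadReturnCode values plus 'suspended by foreign call' (see
// EventLogFormat.h).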
487 if (ret == ThreadBlocked) {
488 if (t->why_blocked == BlockedOnBlackHole) {
489 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
490 traceEventStopThread(cap, t, t->why_blocked + 6,
491 owner != NULL ? owner->id : 0);
492 } else {
493 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
494 }
495 } else {
496 traceEventStopThread(cap, t, ret, 0);
497 }
498
499 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
500 ASSERT(t->cap == cap);
501
502 // ----------------------------------------------------------------------
503
504 // Costs for the scheduler are assigned to CCS_SYSTEM
505 stopHeapProfTimer();
506 #if defined(PROFILING)
507 CCCS = CCS_SYSTEM;
508 #endif
509
510 schedulePostRunThread(cap,t);
511
512 ready_to_gc = rtsFalse;
513
514 switch (ret) {
515 case HeapOverflow:
516 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
517 break;
518
519 case StackOverflow:
520 // just adjust the stack for this thread, then pop it back
521 // on the run queue.
522 threadStackOverflow(cap, t);
523 pushOnRunQueue(cap,t);
524 break;
525
526 case ThreadYielding:
527 if (scheduleHandleYield(cap, t, prev_what_next)) {
528 // shortcut for switching between compiler/interpreter:
529 goto run_thread;
530 }
531 break;
532
533 case ThreadBlocked:
534 scheduleHandleThreadBlocked(t);
535 break;
536
537 case ThreadFinished:
538 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
539 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
540 break;
541
542 default:
543 barf("schedule: invalid thread return code %d", (int)ret);
544 }
545
546 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
547 cap = scheduleDoGC(cap,task,rtsFalse);
548 }
549 } /* end of while() */
550 }
551
552 /* -----------------------------------------------------------------------------
553 * Run queue operations
554 * -------------------------------------------------------------------------- */
555
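// Remove a TSO from anywhere in the run queue. The run queue is a
// doubly-linked list threaded through tso->_link (next) and
// tso->block_info.prev (previous), so removal is O(1).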
556 void
557 removeFromRunQueue (Capability *cap, StgTSO *tso)
558 {
559 if (tso->block_info.prev == END_TSO_QUEUE) {
560 ASSERT(cap->run_queue_hd == tso);
561 cap->run_queue_hd = tso->_link;
562 } else {
563 setTSOLink(cap, tso->block_info.prev, tso->_link);
564 }
565 if (tso->_link == END_TSO_QUEUE) {
566 ASSERT(cap->run_queue_tl == tso);
567 cap->run_queue_tl = tso->block_info.prev;
568 } else {
569 setTSOPrev(cap, tso->_link, tso->block_info.prev);
570 }
571 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
572
573 IF_DEBUG(sanity, checkRunQueue(cap));
574 }
575
576 /* ----------------------------------------------------------------------------
577 * Setting up the scheduler loop
578 * ------------------------------------------------------------------------- */
579
580 static void
581 schedulePreLoop(void)
582 {
583 // initialisation for scheduler - what cannot go into initScheduler()
584
585 #if defined(mingw32_HOST_OS)
586 win32AllocStack();
587 #endif
588 }
589
590 /* -----------------------------------------------------------------------------
591 * scheduleFindWork()
592 *
593 * Search for work to do, and handle messages from elsewhere.
594 * -------------------------------------------------------------------------- */
595
596 static void
597 scheduleFindWork (Capability *cap)
598 {
599 scheduleStartSignalHandlers(cap);
600
601 scheduleProcessInbox(cap);
602
603 scheduleCheckBlockedThreads(cap);
604
605 #if defined(THREADED_RTS)
606 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
607 #endif
608 }
609
610 #if defined(THREADED_RTS)
611 STATIC_INLINE rtsBool
612 shouldYieldCapability (Capability *cap, Task *task)
613 {
614 // we need to yield this capability to someone else if..
615 // - another thread is initiating a GC
616 // - another Task is returning from a foreign call
617 // - the thread at the head of the run queue cannot be run
618 // by this Task (it is bound to another Task, or it is unbound
619 // and this task is bound).
620 return (waiting_for_gc ||
621 cap->returning_tasks_hd != NULL ||
622 (!emptyRunQueue(cap) && (task->incall->tso == NULL
623 ? cap->run_queue_hd->bound != NULL
624 : cap->run_queue_hd->bound != task->incall)));
625 }
626
627 // This is the single place where a Task goes to sleep. There are
628 // two reasons it might need to sleep:
629 // - there are no threads to run
630 // - we need to yield this Capability to someone else
631 // (see shouldYieldCapability())
632 //
633 // Careful: the scheduler loop is quite delicate. Make sure you run
634 // the tests in testsuite/concurrent (all ways) after modifying this,
635 // and also check the benchmarks in nofib/parallel for regressions.
636
637 static void
638 scheduleYield (Capability **pcap, Task *task)
639 {
640 Capability *cap = *pcap;
641
642 // if we have work, and we don't need to give up the Capability, continue.
643 //
644 if (!shouldYieldCapability(cap,task) &&
645 (!emptyRunQueue(cap) ||
646 !emptyInbox(cap) ||
647 sched_state >= SCHED_INTERRUPTING))
648 return;
649
650 // otherwise yield (sleep), and keep yielding if necessary.
651 do {
652 yieldCapability(&cap,task);
653 }
654 while (shouldYieldCapability(cap,task));
655
656 // note there may still be no threads on the run queue at this
657 // point, the caller has to check.
658
659 *pcap = cap;
660 return;
661 }
662 #endif
663
664 /* -----------------------------------------------------------------------------
665 * schedulePushWork()
666 *
667 * Push work to other Capabilities if we have some.
668 * -------------------------------------------------------------------------- */
669
670 static void
671 schedulePushWork(Capability *cap USED_IF_THREADS,
672 Task *task USED_IF_THREADS)
673 {
674 /* following code not for PARALLEL_HASKELL. I kept the call general,
675 future GUM versions might use pushing in a distributed setup */
676 #if defined(THREADED_RTS)
677
678 Capability *free_caps[n_capabilities], *cap0;
679 nat i, n_free_caps;
680
681 // migration can be turned off with +RTS -qm
682 if (!RtsFlags.ParFlags.migrate) return;
683
684 // Check whether we have more threads on our run queue, or sparks
685 // in our pool, that we could hand to another Capability.
686 if (cap->run_queue_hd == END_TSO_QUEUE) {
687 if (sparkPoolSizeCap(cap) < 2) return;
688 } else {
689 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
690 sparkPoolSizeCap(cap) < 1) return;
691 }
692
693 // First grab as many free Capabilities as we can.
694 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
695 cap0 = &capabilities[i];
696 if (cap != cap0 && tryGrabCapability(cap0,task)) {
697 if (!emptyRunQueue(cap0)
698 || cap0->returning_tasks_hd != NULL
699 || cap0->inbox != (Message*)END_TSO_QUEUE) {
700 // it already has some work, we just grabbed it at
701 // the wrong moment. Or maybe it's deadlocked!
702 releaseCapability(cap0);
703 } else {
704 free_caps[n_free_caps++] = cap0;
705 }
706 }
707 }
708
709 // we now have n_free_caps free capabilities stashed in
710 // free_caps[]. Share our run queue equally with them. This is
711 // probably the simplest thing we could do; improvements we might
712 // want to do include:
713 //
714 // - giving high priority to moving relatively new threads, on
715 // the grounds that they haven't had time to build up a
716 // working set in the cache on this CPU/Capability.
717 //
718 // - giving low priority to moving long-lived threads
719
720 if (n_free_caps > 0) {
721 StgTSO *prev, *t, *next;
722 #ifdef SPARK_PUSHING
723 rtsBool pushed_to_all;
724 #endif
725
726 debugTrace(DEBUG_sched,
727 "cap %d: %s and %d free capabilities, sharing...",
728 cap->no,
729 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
730 "excess threads on run queue":"sparks to share (>=2)",
731 n_free_caps);
732
733 i = 0;
734 #ifdef SPARK_PUSHING
735 pushed_to_all = rtsFalse;
736 #endif
737
738 if (cap->run_queue_hd != END_TSO_QUEUE) {
739 prev = cap->run_queue_hd;
740 t = prev->_link;
741 prev->_link = END_TSO_QUEUE;
742 for (; t != END_TSO_QUEUE; t = next) {
743 next = t->_link;
744 t->_link = END_TSO_QUEUE;
745 if (t->bound == task->incall // don't move my bound thread
746 || tsoLocked(t)) { // don't move a locked thread
747 setTSOLink(cap, prev, t);
748 setTSOPrev(cap, t, prev);
749 prev = t;
750 } else if (i == n_free_caps) {
751 #ifdef SPARK_PUSHING
752 pushed_to_all = rtsTrue;
753 #endif
754 i = 0;
755 // keep one for us
756 setTSOLink(cap, prev, t);
757 setTSOPrev(cap, t, prev);
758 prev = t;
759 } else {
760 appendToRunQueue(free_caps[i],t);
761
762 traceEventMigrateThread (cap, t, free_caps[i]->no);
763
764 if (t->bound) { t->bound->task->cap = free_caps[i]; }
765 t->cap = free_caps[i];
766 i++;
767 }
768 }
769 cap->run_queue_tl = prev;
770
771 IF_DEBUG(sanity, checkRunQueue(cap));
772 }
773
774 #ifdef SPARK_PUSHING
775 /* JB I left this code in place, it would work but is not necessary */
776
777 // If there are some free capabilities that we didn't push any
778 // threads to, then try to push a spark to each one.
779 if (!pushed_to_all) {
780 StgClosure *spark;
781 // i is the next free capability to push to
782 for (; i < n_free_caps; i++) {
783 if (emptySparkPoolCap(free_caps[i])) {
784 spark = tryStealSpark(cap->sparks);
785 if (spark != NULL) {
786 /* TODO: if anyone wants to re-enable this code then
787 * they must consider the fizzledSpark(spark) case
788 * and update the per-cap spark statistics.
789 */
790 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
791
792 traceEventStealSpark(free_caps[i], t, cap->no);
793
794 newSpark(&(free_caps[i]->r), spark);
795 }
796 }
797 }
798 }
799 #endif /* SPARK_PUSHING */
800
801 // release the capabilities
802 for (i = 0; i < n_free_caps; i++) {
803 task->cap = free_caps[i];
804 releaseAndWakeupCapability(free_caps[i]);
805 }
806 }
807 task->cap = cap; // reset to point to our Capability.
808
809 #endif /* THREADED_RTS */
810
811 }
812
813 /* ----------------------------------------------------------------------------
814 * Start any pending signal handlers
815 * ------------------------------------------------------------------------- */
816
817 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
818 static void
819 scheduleStartSignalHandlers(Capability *cap)
820 {
821 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
822 // safe outside the lock
823 startSignalHandlers(cap);
824 }
825 }
826 #else
827 static void
828 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
829 {
830 }
831 #endif
832
833 /* ----------------------------------------------------------------------------
834 * Check for blocked threads that can be woken up.
835 * ------------------------------------------------------------------------- */
836
837 static void
838 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
839 {
840 #if !defined(THREADED_RTS)
841 //
842 // Check whether any waiting threads need to be woken up. If the
843 // run queue is empty, and there are no other tasks running, we
844 // can wait indefinitely for something to happen.
845 //
846 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
847 {
848 awaitEvent (emptyRunQueue(cap));
849 }
850 #endif
851 }
852
853 /* ----------------------------------------------------------------------------
854 * Detect deadlock conditions and attempt to resolve them.
855 * ------------------------------------------------------------------------- */
856
857 static void
858 scheduleDetectDeadlock (Capability *cap, Task *task)
859 {
860 /*
861 * Detect deadlock: when we have no threads to run, there are no
862 * threads blocked, waiting for I/O, or sleeping, and all the
863 * other tasks are waiting for work, we must have a deadlock of
864 * some description.
865 */
866 if ( emptyThreadQueues(cap) )
867 {
868 #if defined(THREADED_RTS)
869 /*
870 * In the threaded RTS, we only check for deadlock if there
871 * has been no activity in a complete timeslice. This means
872 * we won't eagerly start a full GC just because we don't have
873 * any threads to run currently.
874 */
875 if (recent_activity != ACTIVITY_INACTIVE) return;
876 #endif
877
878 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
879
880 // Garbage collection can release some new threads due to
881 // either (a) finalizers or (b) threads resurrected because
882 // they are unreachable and will therefore be sent an
883 // exception. Any threads thus released will be immediately
884 // runnable.
885 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
886 // When force_major == rtsTrue, scheduleDoGC sets
887 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
888 // signal.
889
890 if ( !emptyRunQueue(cap) ) return;
891
892 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
893 /* If we have user-installed signal handlers, then wait
894 * for signals to arrive rather than bombing out with a
895 * deadlock.
896 */
897 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
898 debugTrace(DEBUG_sched,
899 "still deadlocked, waiting for signals...");
900
901 awaitUserSignals();
902
903 if (signals_pending()) {
904 startSignalHandlers(cap);
905 }
906
907 // either we have threads to run, or we were interrupted:
908 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
909
910 return;
911 }
912 #endif
913
914 #if !defined(THREADED_RTS)
915 /* Probably a real deadlock. Send the current main thread the
916 * Deadlock exception.
917 */
918 if (task->incall->tso) {
919 switch (task->incall->tso->why_blocked) {
920 case BlockedOnSTM:
921 case BlockedOnBlackHole:
922 case BlockedOnMsgThrowTo:
923 case BlockedOnMVar:
924 throwToSingleThreaded(cap, task->incall->tso,
925 (StgClosure *)nonTermination_closure);
926 return;
927 default:
928 barf("deadlock: main thread blocked in a strange way");
929 }
930 }
931 return;
932 #endif
933 }
934 }
935
936
937 /* ----------------------------------------------------------------------------
938 * Send pending messages (PARALLEL_HASKELL only)
939 * ------------------------------------------------------------------------- */
940
941 #if defined(PARALLEL_HASKELL)
942 static void
943 scheduleSendPendingMessages(void)
944 {
945
946 # if defined(PAR) // global Mem.Mgmt., omit for now
947 if (PendingFetches != END_BF_QUEUE) {
948 processFetches();
949 }
950 # endif
951
952 if (RtsFlags.ParFlags.BufferTime) {
953 // if we use message buffering, we must send away all message
954 // packets which have become too old...
955 sendOldBuffers();
956 }
957 }
958 #endif
959
960 /* ----------------------------------------------------------------------------
961 * Process messages in the current Capability's inbox
962 * ------------------------------------------------------------------------- */
963
964 static void
965 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
966 {
967 #if defined(THREADED_RTS)
968 Message *m, *next;
969 int r;
970
971 while (!emptyInbox(cap)) {
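// Executing messages may allocate, so do a GC first if the nursery
// is nearly exhausted or large-object allocation has hit its limit.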
972 if (cap->r.rCurrentNursery->link == NULL ||
973 g0->n_new_large_words >= large_alloc_lim) {
974 scheduleDoGC(cap, cap->running_task, rtsFalse);
975 }
976
977 // don't use a blocking acquire; if the lock is held by
978 // another thread then just carry on. This seems to avoid
979 // getting stuck in a message ping-pong situation with other
980 // processors. We'll check the inbox again later anyway.
981 //
982 // We should really use a more efficient queue data structure
983 // here. The trickiness is that we must ensure a Capability
984 // never goes idle if the inbox is non-empty, which is why we
985 // use cap->lock (cap->lock is released as the last thing
986 // before going idle; see Capability.c:releaseCapability()).
987 r = TRY_ACQUIRE_LOCK(&cap->lock);
988 if (r != 0) return;
989
990 m = cap->inbox;
991 cap->inbox = (Message*)END_TSO_QUEUE;
992
993 RELEASE_LOCK(&cap->lock);
994
995 while (m != (Message*)END_TSO_QUEUE) {
996 next = m->link;
997 executeMessage(cap, m);
998 m = next;
999 }
1000 }
1001 #endif
1002 }
1003
1004 /* ----------------------------------------------------------------------------
1005 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1006 * ------------------------------------------------------------------------- */
1007
1008 #if defined(THREADED_RTS)
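// If there are sparks in any pool, create a spark-evaluating thread
// ('spark thread') on this capability; it repeatedly steals and
// evaluates sparks, and exits when none are left.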
1009 static void
1010 scheduleActivateSpark(Capability *cap)
1011 {
1012 if (anySparks())
1013 {
1014 createSparkThread(cap);
1015 debugTrace(DEBUG_sched, "creating a spark thread");
1016 }
1017 }
1018 #endif // THREADED_RTS
1019
1020 /* ----------------------------------------------------------------------------
1021 * After running a thread...
1022 * ------------------------------------------------------------------------- */
1023
1024 static void
1025 schedulePostRunThread (Capability *cap, StgTSO *t)
1026 {
1027 // We have to be able to catch transactions that are in an
1028 // infinite loop as a result of seeing an inconsistent view of
1029 // memory, e.g.
1030 //
1031 // atomically $ do
1032 // [a,b] <- mapM readTVar [ta,tb]
1033 // when (a == b) loop
1034 //
1035 // and a is never equal to b given a consistent view of memory.
1036 //
1037 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1038 if (!stmValidateNestOfTransactions (t -> trec)) {
1039 debugTrace(DEBUG_sched | DEBUG_stm,
1040 "trec %p found wasting its time", t);
1041
1042 // strip the stack back to the
1043 // ATOMICALLY_FRAME, aborting the (nested)
1044 // transaction, and saving the stack of any
1045 // partially-evaluated thunks on the heap.
1046 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1047
1048 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1049 }
1050 }
1051
1052 /* some statistics gathering in the parallel case */
1053 }
1054
1055 /* -----------------------------------------------------------------------------
1056 * Handle a thread that returned to the scheduler with HeapOverflow
1057 * -------------------------------------------------------------------------- */
1058
1059 static rtsBool
1060 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1061 {
1062 // did the task ask for a large block?
1063 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1064 // if so, get one and push it on the front of the nursery.
1065 bdescr *bd;
1066 lnat blocks;
1067
1068 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1069
1070 if (blocks > BLOCKS_PER_MBLOCK) {
1071 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1072 }
1073
1074 debugTrace(DEBUG_sched,
1075 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1076 (long)t->id, what_next_strs[t->what_next], blocks);
1077
1078 // don't do this if the nursery is (nearly) full, we'll GC first.
1079 if (cap->r.rCurrentNursery->link != NULL ||
1080 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1081 // if the nursery has only one block.
1082
1083 bd = allocGroup_lock(blocks);
1084 cap->r.rNursery->n_blocks += blocks;
1085
1086 // link the new group into the list
1087 bd->link = cap->r.rCurrentNursery;
1088 bd->u.back = cap->r.rCurrentNursery->u.back;
1089 if (cap->r.rCurrentNursery->u.back != NULL) {
1090 cap->r.rCurrentNursery->u.back->link = bd;
1091 } else {
1092 cap->r.rNursery->blocks = bd;
1093 }
1094 cap->r.rCurrentNursery->u.back = bd;
1095
1096 // initialise it as a nursery block. We initialise the
1097 // step, gen_no, and flags field of *every* sub-block in
1098 // this large block, because this is easier than making
1099 // sure that we always find the block head of a large
1100 // block whenever we call Bdescr() (eg. evacuate() and
1101 // isAlive() in the GC would both have to do this, at
1102 // least).
1103 {
1104 bdescr *x;
1105 for (x = bd; x < bd + blocks; x++) {
1106 initBdescr(x,g0,g0);
1107 x->free = x->start;
1108 x->flags = 0;
1109 }
1110 }
1111
1112 // This assert can be a killer if the app is doing lots
1113 // of large block allocations.
1114 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1115
1116 // now update the nursery to point to the new block
1117 cap->r.rCurrentNursery = bd;
1118
1119 // we might be unlucky and have another thread get on the
1120 // run queue before us and steal the large block, but in that
1121 // case the thread will just end up requesting another large
1122 // block.
1123 pushOnRunQueue(cap,t);
1124 return rtsFalse; /* not actually GC'ing */
1125 }
1126 }
1127
1128 if (cap->r.rHpLim == NULL || cap->context_switch) {
1129 // Sometimes we miss a context switch, e.g. when calling
1130 // primitives in a tight loop, MAYBE_GC() doesn't check the
1131 // context switch flag, and we end up waiting for a GC.
1132 // See #1984, and concurrent/should_run/1984
1133 cap->context_switch = 0;
1134 appendToRunQueue(cap,t);
1135 } else {
1136 pushOnRunQueue(cap,t);
1137 }
1138 return rtsTrue;
1139 /* actual GC is done at the end of the while loop in schedule() */
1140 }
1141
1142 /* -----------------------------------------------------------------------------
1143 * Handle a thread that returned to the scheduler with ThreadYielding
1144 * -------------------------------------------------------------------------- */
1145
1146 static rtsBool
1147 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1148 {
1149 /* put the thread back on the run queue. Then, if we're ready to
1150 * GC, check whether this is the last task to stop. If so, wake
1151 * up the GC thread. getThread will block during a GC until the
1152 * GC is finished.
1153 */
1154
1155 ASSERT(t->_link == END_TSO_QUEUE);
1156
1157 // Shortcut if we're just switching evaluators: don't bother
1158 // doing stack squeezing (which can be expensive), just run the
1159 // thread.
1160 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1161 debugTrace(DEBUG_sched,
1162 "--<< thread %ld (%s) stopped to switch evaluators",
1163 (long)t->id, what_next_strs[t->what_next]);
1164 return rtsTrue;
1165 }
1166
1167 // Reset the context switch flag. We don't do this just before
1168 // running the thread, because that would mean we would lose ticks
1169 // during GC, which can lead to unfair scheduling (a thread hogs
1170 // the CPU because the tick always arrives during GC). This way
1171 // penalises threads that do a lot of allocation, but that seems
1172 // better than the alternative.
1173 cap->context_switch = 0;
1174
1175 IF_DEBUG(sanity,
1176 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1177 checkTSO(t));
1178
1179 appendToRunQueue(cap,t);
1180
1181 return rtsFalse;
1182 }
1183
1184 /* -----------------------------------------------------------------------------
1185 * Handle a thread that returned to the scheduler with ThreadBlocked
1186 * -------------------------------------------------------------------------- */
1187
1188 static void
1189 scheduleHandleThreadBlocked( StgTSO *t
1190 #if !defined(DEBUG)
1191 STG_UNUSED
1192 #endif
1193 )
1194 {
1195
1196 // We don't need to do anything. The thread is blocked, and it
1197 // has tidied up its stack and placed itself on whatever queue
1198 // it needs to be on.
1199
1200 // ASSERT(t->why_blocked != NotBlocked);
1201 // Not true: for example,
1202 // - the thread may have woken itself up already, because
1203 // threadPaused() might have raised a blocked throwTo
1204 // exception, see maybePerformBlockedException().
1205
1206 #ifdef DEBUG
1207 traceThreadStatus(DEBUG_sched, t);
1208 #endif
1209 }
1210
1211 /* -----------------------------------------------------------------------------
1212 * Handle a thread that returned to the scheduler with ThreadFinished
1213 * -------------------------------------------------------------------------- */
1214
1215 static rtsBool
1216 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1217 {
1218 /* Need to check whether this was a main thread, and if so,
1219 * return with the return value.
1220 *
1221 * We also end up here if the thread kills itself with an
1222 * uncaught exception, see Exception.cmm.
1223 */
1224
1225 // blocked exceptions can now complete, even if the thread was in
1226 // blocked mode (see #2910).
1227 awakenBlockedExceptionQueue (cap, t);
1228
1229 //
1230 // Check whether the thread that just completed was a bound
1231 // thread, and if so return with the result.
1232 //
1233 // There is an assumption here that all thread completion goes
1234 // through this point; we need to make sure that if a thread
1235 // ends up in the ThreadKilled state, that it stays on the run
1236 // queue so it can be dealt with here.
1237 //
1238
1239 if (t->bound) {
1240
1241 if (t->bound != task->incall) {
1242 #if !defined(THREADED_RTS)
1243 // Must be a bound thread that is not the topmost one. Leave
1244 // it on the run queue until the stack has unwound to the
1245 // point where we can deal with this. Leaving it on the run
1246 // queue also ensures that the garbage collector knows about
1247 // this thread and its return value (it gets dropped from the
1248 // step->threads list so there's no other way to find it).
1249 appendToRunQueue(cap,t);
1250 return rtsFalse;
1251 #else
1252 // this cannot happen in the threaded RTS, because a
1253 // bound thread can only be run by the appropriate Task.
1254 barf("finished bound thread that isn't mine");
1255 #endif
1256 }
1257
1258 ASSERT(task->incall->tso == t);
1259
1260 if (t->what_next == ThreadComplete) {
1261 if (task->incall->ret) {
1262 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1263 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1264 }
1265 task->incall->stat = Success;
1266 } else {
1267 if (task->incall->ret) {
1268 *(task->incall->ret) = NULL;
1269 }
1270 if (sched_state >= SCHED_INTERRUPTING) {
1271 if (heap_overflow) {
1272 task->incall->stat = HeapExhausted;
1273 } else {
1274 task->incall->stat = Interrupted;
1275 }
1276 } else {
1277 task->incall->stat = Killed;
1278 }
1279 }
1280 #ifdef DEBUG
1281 removeThreadLabel((StgWord)task->incall->tso->id);
1282 #endif
1283
1284 // We no longer consider this thread and task to be bound to
1285 // each other. The TSO lives on until it is GC'd, but the
1286 // task is about to be released by the caller, and we don't
1287 // want anyone following the pointer from the TSO to the
1288 // defunct task (which might have already been
1289 // re-used). This was a real bug: the GC updated
1290 // tso->bound->tso which led to a deadlock.
1291 t->bound = NULL;
1292 task->incall->tso = NULL;
1293
1294 return rtsTrue; // tells schedule() to return
1295 }
1296
1297 return rtsFalse;
1298 }
1299
1300 /* -----------------------------------------------------------------------------
1301 * Perform a heap census
1302 * -------------------------------------------------------------------------- */
1303
1304 static rtsBool
1305 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1306 {
1307 // When we have +RTS -i0 and we're heap profiling, do a census at
1308 // every GC. This lets us get repeatable runs for debugging.
1309 if (performHeapProfile ||
1310 (RtsFlags.ProfFlags.profileInterval==0 &&
1311 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1312 return rtsTrue;
1313 } else {
1314 return rtsFalse;
1315 }
1316 }
1317
1318 /* -----------------------------------------------------------------------------
1319 * Perform a garbage collection if necessary
1320 * -------------------------------------------------------------------------- */
1321
1322 static Capability *
1323 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1324 {
1325 rtsBool heap_census;
1326 #ifdef THREADED_RTS
1327 /* extern static volatile StgWord waiting_for_gc;
1328 lives inside capability.c */
1329 rtsBool gc_type, prev_pending_gc;
1330 nat i;
1331 #endif
1332
1333 if (sched_state == SCHED_SHUTTING_DOWN) {
1334 // The final GC has already been done, and the system is
1335 // shutting down. We'll probably deadlock if we try to GC
1336 // now.
1337 return cap;
1338 }
1339
1340 #ifdef THREADED_RTS
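// Decide whether this GC can be done in parallel: only if we are not
// interrupting/shutting down, parallel GC is enabled (+RTS -qg), the
// generation being collected (N) is old enough, and the oldest
// generation is not using a marking collector (mark/compact or
// mark/sweep).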
1341 if (sched_state < SCHED_INTERRUPTING
1342 && RtsFlags.ParFlags.parGcEnabled
1343 && N >= RtsFlags.ParFlags.parGcGen
1344 && ! oldest_gen->mark)
1345 {
1346 gc_type = PENDING_GC_PAR;
1347 } else {
1348 gc_type = PENDING_GC_SEQ;
1349 }
1350
1351 // In order to GC, there must be no threads running Haskell code.
1352 // Therefore, the GC thread needs to hold *all* the capabilities,
1353 // and release them after the GC has completed.
1354 //
1355 // This seems to be the simplest way: previous attempts involved
1356 // making all the threads with capabilities give up their
1357 // capabilities and sleep except for the *last* one, which
1358 // actually did the GC. But it's quite hard to arrange for all
1359 // the other tasks to sleep and stay asleep.
1360 //
1361
1362 /* Other capabilities are prevented from running yet more Haskell
1363 threads if waiting_for_gc is set. Tested inside
1364 yieldCapability() and releaseCapability() in Capability.c */
1365
1366 prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1367 if (prev_pending_gc) {
1368 do {
1369 debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
1370 prev_pending_gc);
1371 ASSERT(cap);
1372 yieldCapability(&cap,task);
1373 } while (waiting_for_gc);
1374 return cap; // NOTE: task->cap might have changed here
1375 }
1376
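// Signal a context switch on every capability, so that any Haskell
// threads currently running return to the scheduler promptly and
// their capabilities can be taken for the GC.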
1377 setContextSwitches();
1378
1379 // The final shutdown GC is always single-threaded, because it's
1380 // possible that some of the Capabilities have no worker threads.
1381
1382 if (gc_type == PENDING_GC_SEQ)
1383 {
1384 traceEventRequestSeqGc(cap);
1385 }
1386 else
1387 {
1388 traceEventRequestParGc(cap);
1389 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1390 }
1391
1392 if (gc_type == PENDING_GC_SEQ)
1393 {
1394 // single-threaded GC: grab all the capabilities
1395 for (i=0; i < n_capabilities; i++) {
1396 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1397 if (cap != &capabilities[i]) {
1398 Capability *pcap = &capabilities[i];
1399 // we better hope this task doesn't get migrated to
1400 // another Capability while we're waiting for this one.
1401 // It won't, because load balancing happens while we have
1402 // all the Capabilities, but even so it's a slightly
1403 // unsavoury invariant.
1404 task->cap = pcap;
1405 waitForReturnCapability(&pcap, task);
1406 if (pcap != &capabilities[i]) {
1407 barf("scheduleDoGC: got the wrong capability");
1408 }
1409 }
1410 }
1411 }
1412 else
1413 {
1414 // multi-threaded GC: make sure all the Capabilities donate one
1415 // GC thread each.
1416 waitForGcThreads(cap);
1417 }
1418
1419 #endif
1420
1421 IF_DEBUG(scheduler, printAllThreads());
1422
1423 delete_threads_and_gc:
1424 /*
1425 * We now have all the capabilities; if we're in an interrupting
1426 * state, then we should take the opportunity to delete all the
1427 * threads in the system.
1428 */
1429 if (sched_state == SCHED_INTERRUPTING) {
1430 deleteAllThreads(cap);
1431 sched_state = SCHED_SHUTTING_DOWN;
1432 }
1433
1434 heap_census = scheduleNeedHeapProfile(rtsTrue);
1435
1436 traceEventGcStart(cap);
1437 #if defined(THREADED_RTS)
1438 // reset waiting_for_gc *before* GC, so that when the GC threads
1439 // emerge they don't immediately re-enter the GC.
1440 waiting_for_gc = 0;
1441 GarbageCollect(force_major || heap_census, gc_type, cap);
1442 #else
1443 GarbageCollect(force_major || heap_census, 0, cap);
1444 #endif
1445 traceEventGcEnd(cap);
1446
1447 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1448 {
1449 // We are doing a GC because the system has been idle for a
1450 // timeslice and we need to check for deadlock. Record the
1451 // fact that we've done a GC and turn off the timer signal;
1452 // it will get re-enabled if we run any threads after the GC.
1453 recent_activity = ACTIVITY_DONE_GC;
1454 stopTimer();
1455 }
1456 else
1457 {
1458 // the GC might have taken long enough for the timer to set
1459 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1460 // necessarily deadlocked:
1461 recent_activity = ACTIVITY_YES;
1462 }
1463
1464 if (heap_census) {
1465 debugTrace(DEBUG_sched, "performing heap census");
1466 heapCensus();
1467 performHeapProfile = rtsFalse;
1468 }
1469
1470 #if defined(THREADED_RTS)
1471 if (gc_type == PENDING_GC_PAR)
1472 {
1473 releaseGCThreads(cap);
1474 }
1475 #endif
1476
1477 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1478 // GC set the heap_overflow flag, so we should proceed with
1479 // an orderly shutdown now. Ultimately we want the main
1480 // thread to return to its caller with HeapExhausted, at which
1481 // point the caller should call hs_exit(). The first step is
1482 // to delete all the threads.
1483 //
1484 // Another way to do this would be to raise an exception in
1485 // the main thread, which we really should do because it gives
1486 // the program a chance to clean up. But how do we find the
1487 // main thread? It should presumably be the same one that
1488 // gets ^C exceptions, but that's all done on the Haskell side
1489 // (GHC.TopHandler).
1490 sched_state = SCHED_INTERRUPTING;
1491 goto delete_threads_and_gc;
1492 }
1493
1494 #ifdef SPARKBALANCE
1495 /* JB
1496 Once we are all together... this would be the place to balance all
1497 spark pools. No concurrent stealing or adding of new sparks can
1498 occur. Should be defined in Sparks.c. */
1499 balanceSparkPoolsCaps(n_capabilities, capabilities);
1500 #endif
1501
1502 #if defined(THREADED_RTS)
1503 if (gc_type == PENDING_GC_SEQ) {
1504 // release our stash of capabilities.
1505 for (i = 0; i < n_capabilities; i++) {
1506 if (cap != &capabilities[i]) {
1507 task->cap = &capabilities[i];
1508 releaseCapability(&capabilities[i]);
1509 }
1510 }
1511 }
1512 if (cap) {
1513 task->cap = cap;
1514 } else {
1515 task->cap = NULL;
1516 }
1517 #endif
1518
1519 return cap;
1520 }
1521
1522 /* ---------------------------------------------------------------------------
1523 * Singleton fork(). Do not copy any running threads.
1524 * ------------------------------------------------------------------------- */
1525
1526 pid_t
1527 forkProcess(HsStablePtr *entry
1528 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1529 STG_UNUSED
1530 #endif
1531 )
1532 {
1533 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1534 pid_t pid;
1535 StgTSO* t,*next;
1536 Capability *cap;
1537 nat g;
1538
1539 #if defined(THREADED_RTS)
1540 if (RtsFlags.ParFlags.nNodes > 1) {
1541 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1542 stg_exit(EXIT_FAILURE);
1543 }
1544 #endif
1545
1546 debugTrace(DEBUG_sched, "forking!");
1547
1548 // ToDo: for SMP, we should probably acquire *all* the capabilities
1549 cap = rts_lock();
1550
1551 // no funny business: hold locks while we fork, otherwise if some
1552 // other thread is holding a lock when the fork happens, the data
1553 // structure protected by the lock will forever be in an
1554 // inconsistent state in the child. See also #1391.
1555 ACQUIRE_LOCK(&sched_mutex);
1556 ACQUIRE_LOCK(&cap->lock);
1557 ACQUIRE_LOCK(&cap->running_task->lock);
1558
1559 stopTimer(); // See #4074
1560
1561 #if defined(TRACING)
1562 flushEventLog(); // so that child won't inherit dirty file buffers
1563 #endif
1564
1565 pid = fork();
1566
1567 if (pid) { // parent
1568
1569 startTimer(); // #4074
1570
1571 RELEASE_LOCK(&sched_mutex);
1572 RELEASE_LOCK(&cap->lock);
1573 RELEASE_LOCK(&cap->running_task->lock);
1574
1575 // just return the pid
1576 rts_unlock(cap);
1577 return pid;
1578
1579 } else { // child
1580
1581 #if defined(THREADED_RTS)
1582 initMutex(&sched_mutex);
1583 initMutex(&cap->lock);
1584 initMutex(&cap->running_task->lock);
1585 #endif
1586
1587 #ifdef TRACING
1588 resetTracing();
1589 #endif
1590
1591 // Now, all OS threads except the thread that forked are
1592 // stopped. We need to stop all Haskell threads, including
1593 // those involved in foreign calls. Also we need to delete
1594 // all Tasks, because they correspond to OS threads that are
1595 // now gone.
1596
1597 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1598 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1599 next = t->global_link;
1600 // don't allow threads to catch the ThreadKilled
1601 // exception, but we do want to raiseAsync() because these
1602 // threads may be evaluating thunks that we need later.
1603 deleteThread_(cap,t);
1604
1605 // stop the GC from updating the InCall to point to
1606 // the TSO. This is only necessary because the
1607 // OSThread bound to the TSO has been killed, and
1608 // won't get a chance to exit in the usual way (see
1609 // also scheduleHandleThreadFinished).
1610 t->bound = NULL;
1611 }
1612 }
1613
1614 // Empty the run queue. It seems tempting to let all the
1615 // killed threads stay on the run queue as zombies to be
1616 // cleaned up later, but some of them correspond to bound
1617 // threads for which the corresponding Task does not exist.
1618 cap->run_queue_hd = END_TSO_QUEUE;
1619 cap->run_queue_tl = END_TSO_QUEUE;
1620
1621 // Any suspended C-calling Tasks are no more, their OS threads
1622 // don't exist now:
1623 cap->suspended_ccalls = NULL;
1624
1625 // Empty the threads lists. Otherwise, the garbage
1626 // collector may attempt to resurrect some of these threads.
1627 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1628 generations[g].threads = END_TSO_QUEUE;
1629 }
1630
1631 discardTasksExcept(cap->running_task);
1632
1633 #if defined(THREADED_RTS)
1634 // Wipe our spare workers list, they no longer exist. New
1635 // workers will be created if necessary.
1636 cap->spare_workers = NULL;
1637 cap->n_spare_workers = 0;
1638 cap->returning_tasks_hd = NULL;
1639 cap->returning_tasks_tl = NULL;
1640 #endif
1641
1642 // On Unix, all timers are reset in the child, so we need to start
1643 // the timer again.
1644 initTimer();
1645 startTimer();
1646
1647 #if defined(THREADED_RTS)
1648 cap = ioManagerStartCap(cap);
1649 #endif
1650
1651 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1652 rts_checkSchedStatus("forkProcess",cap);
1653
1654 rts_unlock(cap);
1655 hs_exit(); // clean up and exit
1656 stg_exit(EXIT_SUCCESS);
1657 }
1658 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1659 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1660 #endif
1661 }
1662
1663 /* ---------------------------------------------------------------------------
1664 * Delete all the threads in the system
1665 * ------------------------------------------------------------------------- */
1666
1667 static void
1668 deleteAllThreads ( Capability *cap )
1669 {
1670 // NOTE: only safe to call if we own all capabilities.
1671
1672 StgTSO* t, *next;
1673 nat g;
1674
1675 debugTrace(DEBUG_sched,"deleting all threads");
1676 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1677 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1678 next = t->global_link;
1679 deleteThread(cap,t);
1680 }
1681 }
1682
1683 // The run queue now contains a bunch of ThreadKilled threads. We
1684 // must not throw these away: the main thread(s) will be in there
1685 // somewhere, and the main scheduler loop has to deal with it.
1686 // Also, the run queue is the only thing keeping these threads from
1687 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1688
1689 #if !defined(THREADED_RTS)
1690 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1691 ASSERT(sleeping_queue == END_TSO_QUEUE);
1692 #endif
1693 }
1694
1695 /* -----------------------------------------------------------------------------
1696 Managing the suspended_ccalls list.
1697 Locks required: sched_mutex
1698 -------------------------------------------------------------------------- */
1699
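// Link this task's InCall onto the front of cap->suspended_ccalls,
// keeping the suspended TSO reachable (e.g. by the GC) for the
// duration of the C call.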
1700 STATIC_INLINE void
1701 suspendTask (Capability *cap, Task *task)
1702 {
1703 InCall *incall;
1704
1705 incall = task->incall;
1706 ASSERT(incall->next == NULL && incall->prev == NULL);
1707 incall->next = cap->suspended_ccalls;
1708 incall->prev = NULL;
1709 if (cap->suspended_ccalls) {
1710 cap->suspended_ccalls->prev = incall;
1711 }
1712 cap->suspended_ccalls = incall;
1713 }
1714
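// Unlink this task's InCall from cap->suspended_ccalls when the
// C call returns.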
1715 STATIC_INLINE void
1716 recoverSuspendedTask (Capability *cap, Task *task)
1717 {
1718 InCall *incall;
1719
1720 incall = task->incall;
1721 if (incall->prev) {
1722 incall->prev->next = incall->next;
1723 } else {
1724 ASSERT(cap->suspended_ccalls == incall);
1725 cap->suspended_ccalls = incall->next;
1726 }
1727 if (incall->next) {
1728 incall->next->prev = incall->prev;
1729 }
1730 incall->next = incall->prev = NULL;
1731 }
1732
1733 /* ---------------------------------------------------------------------------
1734 * Suspending & resuming Haskell threads.
1735 *
1736 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1737 * its capability before calling the C function. This allows another
1738 * task to pick up the capability and carry on running Haskell
1739 * threads. It also means that if the C call blocks, it won't lock
1740 * the whole system.
1741 *
1742 * The Haskell thread making the C call is put to sleep for the
1743 * duration of the call, on the suspended_ccalling_threads queue. We
1744 * give out a token to the task, which it can use to resume the thread
1745 * on return from the C function.
1746 *
1747 * If this is an interruptible C call, this means that the FFI call may be
1748 * unceremoniously terminated and should be scheduled on an
1749 * unbound worker thread.
1750 * ------------------------------------------------------------------------- */
1751
1752 void *
1753 suspendThread (StgRegTable *reg, rtsBool interruptible)
1754 {
1755 Capability *cap;
1756 int saved_errno;
1757 StgTSO *tso;
1758 Task *task;
1759 #if mingw32_HOST_OS
1760 StgWord32 saved_winerror;
1761 #endif
1762
1763 saved_errno = errno;
1764 #if mingw32_HOST_OS
1765 saved_winerror = GetLastError();
1766 #endif
1767
1768 /* assume that reg points to the StgRegTable part of a Capability.
1769 */
1770 cap = regTableToCapability(reg);
1771
1772 task = cap->running_task;
1773 tso = cap->r.rCurrentTSO;
1774
1775 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
1776
1777 // XXX this might not be necessary --SDM
1778 tso->what_next = ThreadRunGHC;
1779
1780 threadPaused(cap,tso);
1781
1782 if (interruptible) {
1783 tso->why_blocked = BlockedOnCCall_Interruptible;
1784 } else {
1785 tso->why_blocked = BlockedOnCCall;
1786 }
1787
1788 // Hand back capability
1789 task->incall->suspended_tso = tso;
1790 task->incall->suspended_cap = cap;
1791
1792 ACQUIRE_LOCK(&cap->lock);
1793
1794 suspendTask(cap,task);
1795 cap->in_haskell = rtsFalse;
1796 releaseCapability_(cap,rtsFalse);
1797
1798 RELEASE_LOCK(&cap->lock);
1799
1800 errno = saved_errno;
1801 #if mingw32_HOST_OS
1802 SetLastError(saved_winerror);
1803 #endif
1804 return task;
1805 }
1806
1807 StgRegTable *
1808 resumeThread (void *task_)
1809 {
1810 StgTSO *tso;
1811 InCall *incall;
1812 Capability *cap;
1813 Task *task = task_;
1814 int saved_errno;
1815 #if mingw32_HOST_OS
1816 StgWord32 saved_winerror;
1817 #endif
1818
1819 saved_errno = errno;
1820 #if mingw32_HOST_OS
1821 saved_winerror = GetLastError();
1822 #endif
1823
1824 incall = task->incall;
1825 cap = incall->suspended_cap;
1826 task->cap = cap;
1827
1828 // Wait for permission to re-enter the RTS with the result.
1829 waitForReturnCapability(&cap,task);
1830 // we might be on a different capability now... but if so, our
1831 // entry on the suspended_ccalls list will also have been
1832 // migrated.
1833
1834 // Remove the thread from the suspended list
1835 recoverSuspendedTask(cap,task);
1836
1837 tso = incall->suspended_tso;
1838 incall->suspended_tso = NULL;
1839 incall->suspended_cap = NULL;
1840 tso->_link = END_TSO_QUEUE; // no write barrier reqd
1841
1842 traceEventRunThread(cap, tso);
1843
1844 /* Reset blocking status */
1845 tso->why_blocked = NotBlocked;
1846
1847 if ((tso->flags & TSO_BLOCKEX) == 0) {
1848 // avoid locking the TSO if we don't have to
1849 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
1850 maybePerformBlockedException(cap,tso);
1851 }
1852 }
1853
1854 cap->r.rCurrentTSO = tso;
1855 cap->in_haskell = rtsTrue;
1856 errno = saved_errno;
1857 #if mingw32_HOST_OS
1858 SetLastError(saved_winerror);
1859 #endif
1860
1861 /* We might have GC'd, mark the TSO dirty again */
1862 dirty_TSO(cap,tso);
1863 dirty_STACK(cap,tso->stackobj);
1864
1865 IF_DEBUG(sanity, checkTSO(tso));
1866
1867 return &cap->r;
1868 }
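
/* A rough sketch of what compiled code does around a "safe" foreign call,
 * using the pair of functions above.  The foreign function c_side_work() and
 * the wrapper name are illustrative only; the real call sequence is emitted
 * by the code generator.
 */
#if 0   /* illustration only */
static StgRegTable *
exampleSafeCall (StgRegTable *reg, int arg)
{
    void *tok;
    int r;

    tok = suspendThread(reg, rtsFalse);  // hand back the Capability
    r   = c_side_work(arg);              // stand-in foreign call; may block while
                                         // other Haskell threads keep running
    reg = resumeThread(tok);             // reacquire a Capability and our suspended TSO
    (void)r;                             // the result would be returned via the STG registers
    return reg;
}
#endif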
1869
1870 /* ---------------------------------------------------------------------------
1871 * scheduleThread()
1872 *
1873 * scheduleThread puts a thread on the end of the runnable queue.
1874 * This will usually be done immediately after a thread is created.
1875 * The caller of scheduleThread must create the thread using e.g.
1876 * createThread and push an appropriate closure
1877 * on this thread's stack before the scheduler is invoked.
1878 * ------------------------------------------------------------------------ */
1879
1880 void
1881 scheduleThread(Capability *cap, StgTSO *tso)
1882 {
1883 // The thread goes at the *end* of the run-queue, to avoid possible
1884 // starvation of any threads already on the queue.
1885 appendToRunQueue(cap,tso);
1886 }
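
/* A minimal sketch of the contract described above, along the lines of
 * rts_evalLazyIO in RtsAPI.c: build a TSO that will run an IO action, then
 * hand it to the scheduler.  'action' stands for some StgClosure of type
 * IO (); the wrapper name is illustrative.
 */
#if 0   /* illustration only */
static void
exampleScheduleAction (Capability *cap, StgClosure *action)
{
    StgTSO *tso;

    // createIOThread allocates the TSO and pushes the frames that apply
    // 'action' to the RealWorld token, so the thread is ready to run.
    tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, action);
    scheduleThread(cap, tso);   // append to the end of cap's run queue
}
#endif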
1887
1888 void
1889 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1890 {
1891 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1892 // move this thread from now on.
1893 #if defined(THREADED_RTS)
1894 cpu %= RtsFlags.ParFlags.nNodes;
1895 if (cpu == cap->no) {
1896 appendToRunQueue(cap,tso);
1897 } else {
1898 migrateThread(cap, tso, &capabilities[cpu]);
1899 }
1900 #else
1901 appendToRunQueue(cap,tso);
1902 #endif
1903 }
1904
1905 Capability *
1906 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1907 {
1908 Task *task;
1909 DEBUG_ONLY( StgThreadID id );
1910
1911 // We already created/initialised the Task
1912 task = cap->running_task;
1913
1914 // This TSO is now a bound thread; make the Task and TSO
1915 // point to each other.
1916 tso->bound = task->incall;
1917 tso->cap = cap;
1918
1919 task->incall->tso = tso;
1920 task->incall->ret = ret;
1921 task->incall->stat = NoStatus;
1922
1923 appendToRunQueue(cap,tso);
1924
1925 DEBUG_ONLY( id = tso->id );
1926 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
1927
1928 cap = schedule(cap,task);
1929
1930 ASSERT(task->incall->stat != NoStatus);
1931 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1932
1933 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
1934 return cap;
1935 }
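
/* scheduleWaitThread is how the rts_eval* entry points in RtsAPI.c run a
 * Haskell computation to completion on behalf of a foreign caller: the bound
 * Task that owns 'cap' blocks in the scheduler until the new thread finishes.
 * Roughly (the wrapper name is illustrative):
 */
#if 0   /* illustration only */
static Capability *
exampleEvalIO (Capability *cap, StgClosure *action, /*[out]*/ HaskellObj *ret)
{
    StgTSO *tso;

    tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, action);
    // Binds 'tso' to the current Task, runs the scheduler, and returns only
    // once task->incall->stat is no longer NoStatus.
    return scheduleWaitThread(tso, ret, cap);
}
#endif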
1936
1937 /* ----------------------------------------------------------------------------
1938 * Starting Tasks
1939 * ------------------------------------------------------------------------- */
1940
1941 #if defined(THREADED_RTS)
1942 void scheduleWorker (Capability *cap, Task *task)
1943 {
1944 // schedule() runs without a lock.
1945 cap = schedule(cap,task);
1946
1947 // On exit from schedule(), we have a Capability, but possibly not
1948 // the same one we started with.
1949
1950 // During shutdown, the requirement is that after all the
1951 // Capabilities are shut down, all workers that are shutting down
1952 // have finished workerTaskStop(). This is why we hold on to
1953 // cap->lock until we've finished workerTaskStop() below.
1954 //
1955 // There may be workers still involved in foreign calls; those
1956 // will just block in waitForReturnCapability() because the
1957 // Capability has been shut down.
1958 //
1959 ACQUIRE_LOCK(&cap->lock);
1960 releaseCapability_(cap,rtsFalse);
1961 workerTaskStop(task);
1962 RELEASE_LOCK(&cap->lock);
1963 }
1964 #endif
1965
1966 /* ---------------------------------------------------------------------------
1967 * initScheduler()
1968 *
1969 * Initialise the scheduler. This resets all the queues - if the
1970 * queues contained any threads, they'll be garbage collected at the
1971 * next pass.
1972 *
1973 * ------------------------------------------------------------------------ */
1974
1975 void
1976 initScheduler(void)
1977 {
1978 #if !defined(THREADED_RTS)
1979 blocked_queue_hd = END_TSO_QUEUE;
1980 blocked_queue_tl = END_TSO_QUEUE;
1981 sleeping_queue = END_TSO_QUEUE;
1982 #endif
1983
1984 sched_state = SCHED_RUNNING;
1985 recent_activity = ACTIVITY_YES;
1986
1987 #if defined(THREADED_RTS)
1988 /* Initialise the mutex and condition variables used by
1989 * the scheduler. */
1990 initMutex(&sched_mutex);
1991 #endif
1992
1993 ACQUIRE_LOCK(&sched_mutex);
1994
1995 /* A capability holds the state a native thread needs in
1996 * order to execute STG code. At least one capability is
1997 * floating around (only THREADED_RTS builds have more than one).
1998 */
1999 initCapabilities();
2000
2001 initTaskManager();
2002
2003 #if defined(THREADED_RTS)
2004 initSparkPools();
2005 #endif
2006
2007 RELEASE_LOCK(&sched_mutex);
2008
2009 #if defined(THREADED_RTS)
2010 /*
2011 * Eagerly start one worker to run each Capability, except for
2012 * Capability 0. The idea is that we're probably going to start a
2013 * bound thread on Capability 0 pretty soon, so we don't want a
2014 * worker task hogging it.
2015 */
2016 {
2017 nat i;
2018 Capability *cap;
2019 for (i = 1; i < n_capabilities; i++) {
2020 cap = &capabilities[i];
2021 ACQUIRE_LOCK(&cap->lock);
2022 startWorkerTask(cap);
2023 RELEASE_LOCK(&cap->lock);
2024 }
2025 }
2026 #endif
2027 }
2028
2029 void
2030 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2031 /* see Capability.c, shutdownCapability() */
2032 {
2033 Task *task = NULL;
2034
2035 task = newBoundTask();
2036
2037 // If we haven't killed all the threads yet, do it now.
2038 if (sched_state < SCHED_SHUTTING_DOWN) {
2039 sched_state = SCHED_INTERRUPTING;
2040 waitForReturnCapability(&task->cap,task);
2041 scheduleDoGC(task->cap,task,rtsFalse);
2042 ASSERT(task->incall->tso == NULL);
2043 releaseCapability(task->cap);
2044 }
2045 sched_state = SCHED_SHUTTING_DOWN;
2046
2047 shutdownCapabilities(task, wait_foreign);
2048
2049 boundTaskExiting(task);
2050 }
2051
2052 void
2053 freeScheduler( void )
2054 {
2055 nat still_running;
2056
2057 ACQUIRE_LOCK(&sched_mutex);
2058 still_running = freeTaskManager();
2059 // We can only free the Capabilities if there are no Tasks still
2060 // running. We might have a Task about to return from a foreign
2061 // call into waitForReturnCapability(), for example (actually,
2062 // this should be the *only* thing that a still-running Task can
2063 // do at this point, and it will block waiting for the
2064 // Capability).
2065 if (still_running == 0) {
2066 freeCapabilities();
2067 if (n_capabilities != 1) {
2068 stgFree(capabilities);
2069 }
2070 }
2071 RELEASE_LOCK(&sched_mutex);
2072 #if defined(THREADED_RTS)
2073 closeMutex(&sched_mutex);
2074 #endif
2075 }
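
/* initScheduler(), exitScheduler() and freeScheduler() are driven by RTS
 * startup and shutdown (see RtsStartup.c).  Schematically, with all of the
 * other startup/shutdown work elided:
 */
#if 0   /* illustration only */
static void
exampleRtsLifecycle (void)
{
    initScheduler();          // during hs_init(): queues, Capabilities, Tasks, spark pools

    /* ... the program runs: rts_lock() / rts_evalIO() / rts_unlock() ... */

    exitScheduler(rtsTrue);   // during hs_exit(): interrupt, final GC, shut down Capabilities
    freeScheduler();          // last of all, once no Tasks are still running
}
#endif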
2076
2077 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2078 void *user USED_IF_NOT_THREADS)
2079 {
2080 #if !defined(THREADED_RTS)
2081 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2082 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2083 evac(user, (StgClosure **)(void *)&sleeping_queue);
2084 #endif
2085 }
2086
2087 /* -----------------------------------------------------------------------------
2088 performGC
2089
2090 This is the interface to the garbage collector from Haskell land.
2091 We provide this so that external C code can allocate and garbage
2092 collect when called from Haskell via _ccall_GC.
2093 -------------------------------------------------------------------------- */
2094
2095 static void
2096 performGC_(rtsBool force_major)
2097 {
2098 Task *task;
2099
2100 // We must grab a new Task here, because the existing Task may be
2101 // associated with a particular Capability, and chained onto the
2102 // suspended_ccalls queue.
2103 task = newBoundTask();
2104
2105 waitForReturnCapability(&task->cap,task);
2106 scheduleDoGC(task->cap,task,force_major);
2107 releaseCapability(task->cap);
2108 boundTaskExiting(task);
2109 }
2110
2111 void
2112 performGC(void)
2113 {
2114 performGC_(rtsFalse);
2115 }
2116
2117 void
2118 performMajorGC(void)
2119 {
2120 performGC_(rtsTrue);
2121 }
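
/* The intended use, as the comment above says: C code that has been entered
 * from Haskell through a safe foreign call can force a collection before
 * doing something that cares about heap residency.  The function name below
 * is illustrative; only performMajorGC()/performGC() are real.
 */
#if 0   /* illustration only */
/* foreign import ccall safe "exampleCompactBeforeSnapshot"
       exampleCompactBeforeSnapshot :: IO ()                               */
void
exampleCompactBeforeSnapshot (void)
{
    performMajorGC();   // acquires its own bound Task and Capability internally
    /* ... now walk C-side caches knowing dead Haskell objects are gone ... */
}
#endif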
2122
2123 /* ---------------------------------------------------------------------------
2124 Interrupt execution
2125 - usually called inside a signal handler so it mustn't do anything fancy.
2126 ------------------------------------------------------------------------ */
2127
2128 void
2129 interruptStgRts(void)
2130 {
2131 sched_state = SCHED_INTERRUPTING;
2132 setContextSwitches();
2133 #if defined(THREADED_RTS)
2134 wakeUpRts();
2135 #endif
2136 }
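
/* A typical caller is an async signal handler, e.g. the RTS's own SIGINT
 * handler (shutdown_handler in posix/Signals.c).  A minimal sketch, assuming
 * the handler below has been installed with sigaction():
 */
#if 0   /* illustration only */
static void
exampleCtrlCHandler (int sig STG_UNUSED)
{
    // Safe from a signal handler: only sets flags and pokes the IO manager.
    interruptStgRts();
}
#endif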
2137
2138 /* -----------------------------------------------------------------------------
2139 Wake up the RTS
2140
2141 This function causes at least one OS thread to wake up and run the
2142 scheduler loop. It is invoked when the RTS might be deadlocked, or
2143 an external event has arrived that may need servicing (eg. a
2144 keyboard interrupt).
2145
2146 In the single-threaded RTS we don't do anything here; we only have
2147 one thread anyway, and the event that made us want to wake up will
2148 already have interrupted any blocking system call in progress.
2149 -------------------------------------------------------------------------- */
2150
2151 #if defined(THREADED_RTS)
2152 void wakeUpRts(void)
2153 {
2154 // This forces the IO Manager thread to wakeup, which will
2155 // in turn ensure that some OS thread wakes up and runs the
2156 // scheduler loop, which will cause a GC and deadlock check.
2157 ioManagerWakeup();
2158 }
2159 #endif
2160
2161 /* -----------------------------------------------------------------------------
2162 Deleting threads
2163
2164 This is used for interruption (^C) and forking, and corresponds to
2165 raising an exception but without letting the thread catch the
2166 exception.
2167 -------------------------------------------------------------------------- */
2168
2169 static void
2170 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2171 {
2172 // NOTE: must only be called on a TSO that we have exclusive
2173 // access to, because we will call throwToSingleThreaded() below.
2174 // The TSO must be on the run queue of the Capability we own, or
2175 // we must own all Capabilities.
2176
2177 if (tso->why_blocked != BlockedOnCCall &&
2178 tso->why_blocked != BlockedOnCCall_Interruptible) {
2179 throwToSingleThreaded(tso->cap,tso,NULL);
2180 }
2181 }
2182
2183 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2184 static void
2185 deleteThread_(Capability *cap, StgTSO *tso)
2186 { // for forkProcess only:
2187 // like deleteThread(), but we delete threads in foreign calls, too.
2188
2189 if (tso->why_blocked == BlockedOnCCall ||
2190 tso->why_blocked == BlockedOnCCall_Interruptible) {
2191 tso->what_next = ThreadKilled;
2192 appendToRunQueue(tso->cap, tso);
2193 } else {
2194 deleteThread(cap,tso);
2195 }
2196 }
2197 #endif
2198
2199 /* -----------------------------------------------------------------------------
2200 raiseExceptionHelper
2201
2202     This function is called by the raise# primitive, just so that we can
2203     move some of the tricky bits of raising an exception from C-- into
2204     C. Who knows, it might be a useful re-usable thing here too.
2205 -------------------------------------------------------------------------- */
2206
2207 StgWord
2208 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2209 {
2210 Capability *cap = regTableToCapability(reg);
2211 StgThunk *raise_closure = NULL;
2212 StgPtr p, next;
2213 StgRetInfoTable *info;
2214 //
2215     // This closure represents the expression 'raise# E' where E
2216     // is the exception being raised. It is used to overwrite all the
2217     // thunks which are currently under evaluation.
2218 //
2219
2220 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2221 // LDV profiling: stg_raise_info has THUNK as its closure
2222 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2223     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2224 // 1 does not cause any problem unless profiling is performed.
2225 // However, when LDV profiling goes on, we need to linearly scan
2226 // small object pool, where raise_closure is stored, so we should
2227 // use MIN_UPD_SIZE.
2228 //
2229 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2230 // sizeofW(StgClosure)+1);
2231 //
2232
2233 //
2234 // Walk up the stack, looking for the catch frame. On the way,
2235 // we update any closures pointed to from update frames with the
2236 // raise closure that we just built.
2237 //
2238 p = tso->stackobj->sp;
2239 while(1) {
2240 info = get_ret_itbl((StgClosure *)p);
2241 next = p + stack_frame_sizeW((StgClosure *)p);
2242 switch (info->i.type) {
2243
2244 case UPDATE_FRAME:
2245 // Only create raise_closure if we need to.
2246 if (raise_closure == NULL) {
2247 raise_closure =
2248 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2249 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2250 raise_closure->payload[0] = exception;
2251 }
2252 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2253 (StgClosure *)raise_closure);
2254 p = next;
2255 continue;
2256
2257 case ATOMICALLY_FRAME:
2258 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2259 tso->stackobj->sp = p;
2260 return ATOMICALLY_FRAME;
2261
2262 case CATCH_FRAME:
2263 tso->stackobj->sp = p;
2264 return CATCH_FRAME;
2265
2266 case CATCH_STM_FRAME:
2267 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2268 tso->stackobj->sp = p;
2269 return CATCH_STM_FRAME;
2270
2271 case UNDERFLOW_FRAME:
2272 tso->stackobj->sp = p;
2273 threadStackUnderflow(cap,tso);
2274 p = tso->stackobj->sp;
2275 continue;
2276
2277 case STOP_FRAME:
2278 tso->stackobj->sp = p;
2279 return STOP_FRAME;
2280
2281 case CATCH_RETRY_FRAME:
2282 default:
2283 p = next;
2284 continue;
2285 }
2286 }
2287 }
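
/* The caller, stg_raisezh in Exception.cmm, dispatches on the frame type
 * returned here.  Very roughly, rendered in C rather than C--:
 */
#if 0   /* illustration only */
static void
exampleRaiseDispatch (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    switch (raiseExceptionHelper(reg, tso, exception)) {
    case CATCH_FRAME:
        // sp now points at the CATCH_FRAME: pop it and apply its handler
        // to the exception
        break;
    case CATCH_STM_FRAME:
        // exception inside catchSTM#: abort the nested transaction and
        // run the STM handler
        break;
    case ATOMICALLY_FRAME:
        // the exception escaped an atomically#: abort the transaction
        // (or re-run it if it turned out to be invalid)
        break;
    case STOP_FRAME:
        // no handler anywhere: the thread dies with an uncaught exception
        break;
    }
}
#endif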
2288
2289
2290 /* -----------------------------------------------------------------------------
2291 findRetryFrameHelper
2292
2293    This function is called by the retry# primitive. It traverses the stack
2294    leaving tso->stackobj->sp referring to the frame which should handle the retry.
2295
2296 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2297    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2298
2299 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2300 create) because retries are not considered to be exceptions, despite the
2301 similar implementation.
2302
2303 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2304 not be created within memory transactions.
2305 -------------------------------------------------------------------------- */
2306
2307 StgWord
2308 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2309 {
2310 StgPtr p, next;
2311 StgRetInfoTable *info;
2312
2313 p = tso->stackobj->sp;
2314 while (1) {
2315 info = get_ret_itbl((StgClosure *)p);
2316 next = p + stack_frame_sizeW((StgClosure *)p);
2317 switch (info->i.type) {
2318
2319 case ATOMICALLY_FRAME:
2320 debugTrace(DEBUG_stm,
2321 "found ATOMICALLY_FRAME at %p during retry", p);
2322 tso->stackobj->sp = p;
2323 return ATOMICALLY_FRAME;
2324
2325 case CATCH_RETRY_FRAME:
2326 debugTrace(DEBUG_stm,
2327 "found CATCH_RETRY_FRAME at %p during retrry", p);
2328 tso->stackobj->sp = p;
2329 return CATCH_RETRY_FRAME;
2330
2331 case CATCH_STM_FRAME: {
2332 StgTRecHeader *trec = tso -> trec;
2333 StgTRecHeader *outer = trec -> enclosing_trec;
2334 debugTrace(DEBUG_stm,
2335 "found CATCH_STM_FRAME at %p during retry", p);
2336 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2337 stmAbortTransaction(cap, trec);
2338 stmFreeAbortedTRec(cap, trec);
2339 tso -> trec = outer;
2340 p = next;
2341 continue;
2342 }
2343
2344 case UNDERFLOW_FRAME:
2345 threadStackUnderflow(cap,tso);
2346 p = tso->stackobj->sp;
2347 continue;
2348
2349 default:
2350 ASSERT(info->i.type != CATCH_FRAME);
2351 ASSERT(info->i.type != STOP_FRAME);
2352 p = next;
2353 continue;
2354 }
2355 }
2356 }
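
/* The caller, stg_retryzh in PrimOps.cmm, acts on the frame type returned
 * here.  Very roughly, rendered in C rather than C--:
 */
#if 0   /* illustration only */
static void
exampleRetryDispatch (Capability *cap, StgTSO *tso)
{
    switch (findRetryFrameHelper(cap, tso)) {
    case CATCH_RETRY_FRAME:
        // retry# inside an orElse#: abort the current branch's nested
        // transaction and run the other branch, or propagate the retry
        // outwards if both branches have retried
        break;
    case ATOMICALLY_FRAME:
        // retry# reached the enclosing atomically#: block the thread on
        // the TVars it has read, so it is woken when one of them changes
        break;
    }
}
#endif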
2357
2358 /* -----------------------------------------------------------------------------
2359 resurrectThreads is called after garbage collection on the list of
2360 threads found to be garbage. Each of these threads will be woken
2361    up and sent an exception: BlockedIndefinitelyOnMVar if it was blocked
2362    on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2363    transaction, or NonTermination if it was blocked on a black hole.
2364
2365 Locks: assumes we hold *all* the capabilities.
2366 -------------------------------------------------------------------------- */
2367
2368 void
2369 resurrectThreads (StgTSO *threads)
2370 {
2371 StgTSO *tso, *next;
2372 Capability *cap;
2373 generation *gen;
2374
2375 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2376 next = tso->global_link;
2377
2378 gen = Bdescr((P_)tso)->gen;
2379 tso->global_link = gen->threads;
2380 gen->threads = tso;
2381
2382 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2383
2384 // Wake up the thread on the Capability it was last on
2385 cap = tso->cap;
2386
2387 switch (tso->why_blocked) {
2388 case BlockedOnMVar:
2389 /* Called by GC - sched_mutex lock is currently held. */
2390 throwToSingleThreaded(cap, tso,
2391 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2392 break;
2393 case BlockedOnBlackHole:
2394 throwToSingleThreaded(cap, tso,
2395 (StgClosure *)nonTermination_closure);
2396 break;
2397 case BlockedOnSTM:
2398 throwToSingleThreaded(cap, tso,
2399 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2400 break;
2401 case NotBlocked:
2402 /* This might happen if the thread was blocked on a black hole
2403 * belonging to a thread that we've just woken up (raiseAsync
2404 * can wake up threads, remember...).
2405 */
2406 continue;
2407 case BlockedOnMsgThrowTo:
2408 // This can happen if the target is masking, blocks on a
2409 // black hole, and then is found to be unreachable. In
2410 // this case, we want to let the target wake up and carry
2411 // on, and do nothing to this thread.
2412 continue;
2413 default:
2414 barf("resurrectThreads: thread blocked in a strange way: %d",
2415 tso->why_blocked);
2416 }
2417 }
2418 }
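
/* The only caller is the garbage collector: at the end of a collection, while
 * it still holds every Capability, GarbageCollect (sm/GC.c) hands over the
 * threads it found to be unreachable, roughly:
 */
#if 0   /* illustration only */
    resurrectThreads(resurrected_threads);  // each gets one of the exceptions above
#endif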