Don't call DEAD_WEAK finalizer again on shutdown (#7170)
[ghc.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
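 * Possible values are ACTIVITY_YES, ACTIVITY_MAYBE_NO, ACTIVITY_INACTIVE
 * and ACTIVITY_DONE_GC; see the switches on recent_activity in schedule()
 * and in scheduleDoGC() below.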
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
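 * Values: SCHED_RUNNING < SCHED_INTERRUPTING < SCHED_SHUTTING_DOWN;
 * the switch at the top of schedule() handles each state.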
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
95 * exists - earlier gccs apparently didn't.
96 * -= chak
97 */
98 StgTSO dummy_tso;
99
100 /*
101 * This mutex protects most of the global scheduler data in
102 * the THREADED_RTS runtime.
103 */
104 #if defined(THREADED_RTS)
105 Mutex sched_mutex;
106 #endif
107
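// forkProcess() (see below) is only available on non-Windows platforms.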
108 #if !defined(mingw32_HOST_OS)
109 #define FORKPROCESS_PRIMOP_SUPPORTED
110 #endif
111
112 // Local stats
113 #ifdef THREADED_RTS
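// How often scheduleDoGC() failed or succeeded in grabbing an idle
// Capability before a parallel GC (see the idle_cap[] handling there).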
114 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
115 #endif
116
117 /* -----------------------------------------------------------------------------
118 * static function prototypes
119 * -------------------------------------------------------------------------- */
120
121 static Capability *schedule (Capability *initialCapability, Task *task);
122
123 //
124 // These functions all encapsulate parts of the scheduler loop, and are
125 // abstracted only to make the structure and control flow of the
126 // scheduler clearer.
127 //
128 static void schedulePreLoop (void);
129 static void scheduleFindWork (Capability **pcap);
130 #if defined(THREADED_RTS)
131 static void scheduleYield (Capability **pcap, Task *task);
132 #endif
133 #if defined(THREADED_RTS)
134 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
135 static void acquireAllCapabilities(Capability *cap, Task *task);
136 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
137 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
138 #endif
139 static void scheduleStartSignalHandlers (Capability *cap);
140 static void scheduleCheckBlockedThreads (Capability *cap);
141 static void scheduleProcessInbox(Capability **cap);
142 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
143 static void schedulePushWork(Capability *cap, Task *task);
144 #if defined(THREADED_RTS)
145 static void scheduleActivateSpark(Capability *cap);
146 #endif
147 static void schedulePostRunThread(Capability *cap, StgTSO *t);
148 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
149 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
150 nat prev_what_next );
151 static void scheduleHandleThreadBlocked( StgTSO *t );
152 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
153 StgTSO *t );
154 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
155 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
156
157 static void deleteThread (Capability *cap, StgTSO *tso);
158 static void deleteAllThreads (Capability *cap);
159
160 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
161 static void deleteThread_(Capability *cap, StgTSO *tso);
162 #endif
163
164 /* ---------------------------------------------------------------------------
165 Main scheduling loop.
166
167 We use round-robin scheduling, each thread returning to the
168 scheduler loop when one of these conditions is detected:
169
170 * out of heap space
171 * timer expires (thread yields)
172 * thread blocks
173 * thread ends
174 * stack overflow
175
176 ------------------------------------------------------------------------ */
177
178 static Capability *
179 schedule (Capability *initialCapability, Task *task)
180 {
181 StgTSO *t;
182 Capability *cap;
183 StgThreadReturnCode ret;
184 nat prev_what_next;
185 rtsBool ready_to_gc;
186 #if defined(THREADED_RTS)
187 rtsBool first = rtsTrue;
188 #endif
189
190 cap = initialCapability;
191
192 // Pre-condition: this task owns initialCapability.
193 // The sched_mutex is *NOT* held
194 // NB. on return, we still hold a capability.
195
196 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
197
198 schedulePreLoop();
199
200 // -----------------------------------------------------------
201 // Scheduler loop starts here:
202
203 while (1) {
204
205 // Check whether we have re-entered the RTS from Haskell without
206 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
207 // call).
208 if (cap->in_haskell) {
209 errorBelch("schedule: re-entered unsafely.\n"
210 " Perhaps a 'foreign import unsafe' should be 'safe'?");
211 stg_exit(EXIT_FAILURE);
212 }
213
214 // The interruption / shutdown sequence.
215 //
216 // In order to cleanly shut down the runtime, we want to:
217 // * make sure that all main threads return to their callers
218 // with the state 'Interrupted'.
219 // * clean up all OS threads associated with the runtime
220 // * free all memory etc.
221 //
222 // So the sequence for ^C goes like this:
223 //
224 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
225 // arranges for some Capability to wake up
226 //
227 // * all threads in the system are halted, and the zombies are
228 // placed on the run queue for cleaning up. We acquire all
229 // the capabilities in order to delete the threads; this is
230 // done by scheduleDoGC() for convenience (because GC already
231 // needs to acquire all the capabilities). We can't kill
232 // threads involved in foreign calls.
233 //
234 // * somebody calls shutdownHaskell(), which calls exitScheduler()
235 //
236 // * sched_state := SCHED_SHUTTING_DOWN
237 //
238 // * all workers exit when the run queue on their capability
239 // drains. All main threads will also exit when their TSO
240 // reaches the head of the run queue and they can return.
241 //
242 // * eventually all Capabilities will shut down, and the RTS can
243 // exit.
244 //
245 // * We might be left with threads blocked in foreign calls,
246 // we should really attempt to kill these somehow (TODO);
247 // we should really attempt to kill these somehow (TODO).
248 switch (sched_state) {
249 case SCHED_RUNNING:
250 break;
251 case SCHED_INTERRUPTING:
252 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
253 /* scheduleDoGC() deletes all the threads */
254 scheduleDoGC(&cap,task,rtsTrue);
255
256 // after scheduleDoGC(), we must be shutting down. Either some
257 // other Capability did the final GC, or we did it above,
258 // either way we can fall through to the SCHED_SHUTTING_DOWN
259 // case now.
260 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
261 // fall through
262
263 case SCHED_SHUTTING_DOWN:
264 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
265 // If we are a worker, just exit. If we're a bound thread
266 // then we will exit below when we've removed our TSO from
267 // the run queue.
268 if (!isBoundTask(task) && emptyRunQueue(cap)) {
269 return cap;
270 }
271 break;
272 default:
273 barf("sched_state: %d", sched_state);
274 }
275
276 scheduleFindWork(&cap);
277
278 /* work pushing, currently relevant only for THREADED_RTS:
279 (pushes threads, wakes up idle capabilities for stealing) */
280 schedulePushWork(cap,task);
281
282 scheduleDetectDeadlock(&cap,task);
283
284 // Normally, the only way we can get here with no threads to
285 // run is if a keyboard interrupt was received during
286 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
287 // Additionally, it is not fatal for the
288 // threaded RTS to reach here with no threads to run.
289 //
290 // win32: might be here due to awaitEvent() being abandoned
291 // as a result of a console event having been delivered.
292
293 #if defined(THREADED_RTS)
294 if (first)
295 {
296 // XXX: ToDo
297 // // don't yield the first time, we want a chance to run this
298 // // thread for a bit, even if there are others banging at the
299 // // door.
300 // first = rtsFalse;
301 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
302 }
303
304 scheduleYield(&cap,task);
305
306 if (emptyRunQueue(cap)) continue; // look for work again
307 #endif
308
309 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
310 if ( emptyRunQueue(cap) ) {
311 ASSERT(sched_state >= SCHED_INTERRUPTING);
312 }
313 #endif
314
315 //
316 // Get a thread to run
317 //
318 t = popRunQueue(cap);
319
320 // Sanity check the thread we're about to run. This can be
321 // expensive if there is lots of thread switching going on...
322 IF_DEBUG(sanity,checkTSO(t));
323
324 #if defined(THREADED_RTS)
325 // Check whether we can run this thread in the current task.
326 // If not, we have to pass our capability to the right task.
327 {
328 InCall *bound = t->bound;
329
330 if (bound) {
331 if (bound->task == task) {
332 // yes, the Haskell thread is bound to the current native thread
333 } else {
334 debugTrace(DEBUG_sched,
335 "thread %lu bound to another OS thread",
336 (unsigned long)t->id);
337 // no, bound to a different Haskell thread: pass to that thread
338 pushOnRunQueue(cap,t);
339 continue;
340 }
341 } else {
342 // The thread we want to run is unbound.
343 if (task->incall->tso) {
344 debugTrace(DEBUG_sched,
345 "this OS thread cannot run thread %lu",
346 (unsigned long)t->id);
347 // no, the current native thread is bound to a different
348 // Haskell thread, so pass it to any worker thread
349 pushOnRunQueue(cap,t);
350 continue;
351 }
352 }
353 }
354 #endif
355
356 // If we're shutting down, and this thread has not yet been
357 // killed, kill it now. This sometimes happens when a finalizer
358 // thread is created by the final GC, or a thread previously
359 // in a foreign call returns.
360 if (sched_state >= SCHED_INTERRUPTING &&
361 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
362 deleteThread(cap,t);
363 }
364
365 // If this capability is disabled, migrate the thread away rather
366 // than running it. NB. but not if the thread is bound: it is
367 // really hard for a bound thread to migrate itself. Believe me,
368 // I tried several ways and couldn't find a way to do it.
369 // Instead, when everything is stopped for GC, we migrate all the
370 // threads on the run queue then (see scheduleDoGC()).
371 //
372 // ToDo: what about TSO_LOCKED? Currently we're migrating those
373 // when the number of capabilities drops, but we never migrate
374 // them back if it rises again. Presumably we should, but after
375 // the thread has been migrated we no longer know what capability
376 // it was originally on.
377 #ifdef THREADED_RTS
378 if (cap->disabled && !t->bound) {
379 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
380 migrateThread(cap, t, dest_cap);
381 continue;
382 }
383 #endif
384
385 /* context switches are initiated by the timer signal, unless
386 * the user specified "context switch as often as possible", with
387 * +RTS -C0
388 */
389 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
390 && !emptyThreadQueues(cap)) {
391 cap->context_switch = 1;
392 }
393
394 run_thread:
395
396 // CurrentTSO is the thread to run. t might be different if we
397 // loop back to run_thread, so make sure to set CurrentTSO after
398 // that.
399 cap->r.rCurrentTSO = t;
400
401 startHeapProfTimer();
402
403 // ----------------------------------------------------------------------
404 // Run the current thread
405
406 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
407 ASSERT(t->cap == cap);
408 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
409
410 prev_what_next = t->what_next;
411
412 errno = t->saved_errno;
413 #if mingw32_HOST_OS
414 SetLastError(t->saved_winerror);
415 #endif
416
417 // reset the interrupt flag before running Haskell code
418 cap->interrupt = 0;
419
420 cap->in_haskell = rtsTrue;
421 cap->idle = 0;
422
423 dirty_TSO(cap,t);
424 dirty_STACK(cap,t->stackobj);
425
426 switch (recent_activity)
427 {
428 case ACTIVITY_DONE_GC: {
429 // ACTIVITY_DONE_GC means we turned off the timer signal to
430 // conserve power (see #1623). Re-enable it here.
431 nat prev;
432 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433 if (prev == ACTIVITY_DONE_GC) {
434 #ifndef PROFILING
435 startTimer();
436 #endif
437 }
438 break;
439 }
440 case ACTIVITY_INACTIVE:
441 // If we reached ACTIVITY_INACTIVE, then don't reset it until
442 // we've done the GC. The thread running here might just be
443 // the IO manager thread that handle_tick() woke up via
444 // wakeUpRts().
445 break;
446 default:
447 recent_activity = ACTIVITY_YES;
448 }
449
450 traceEventRunThread(cap, t);
451
452 switch (prev_what_next) {
453
454 case ThreadKilled:
455 case ThreadComplete:
456 /* Thread already finished, return to scheduler. */
457 ret = ThreadFinished;
458 break;
459
460 case ThreadRunGHC:
461 {
462 StgRegTable *r;
463 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
464 cap = regTableToCapability(r);
465 ret = r->rRet;
466 break;
467 }
468
469 case ThreadInterpret:
470 cap = interpretBCO(cap);
471 ret = cap->r.rRet;
472 break;
473
474 default:
475 barf("schedule: invalid what_next field");
476 }
477
478 cap->in_haskell = rtsFalse;
479
480 // The TSO might have moved, eg. if it re-entered the RTS and a GC
481 // happened. So find the new location:
482 t = cap->r.rCurrentTSO;
483
484 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
485 // don't want it set when not running a Haskell thread.
486 cap->r.rCurrentTSO = NULL;
487
488 // And save the current errno in this thread.
489 // XXX: possibly bogus for SMP because this thread might already
490 // be running again, see code below.
491 t->saved_errno = errno;
492 #if mingw32_HOST_OS
493 // Similarly for Windows error code
494 t->saved_winerror = GetLastError();
495 #endif
496
497 if (ret == ThreadBlocked) {
498 if (t->why_blocked == BlockedOnBlackHole) {
499 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
500 traceEventStopThread(cap, t, t->why_blocked + 6,
501 owner != NULL ? owner->id : 0);
502 } else {
503 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
504 }
505 } else {
506 traceEventStopThread(cap, t, ret, 0);
507 }
508
509 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
510 ASSERT(t->cap == cap);
511
512 // ----------------------------------------------------------------------
513
514 // Costs for the scheduler are assigned to CCS_SYSTEM
515 stopHeapProfTimer();
516 #if defined(PROFILING)
517 cap->r.rCCCS = CCS_SYSTEM;
518 #endif
519
520 schedulePostRunThread(cap,t);
521
522 ready_to_gc = rtsFalse;
523
524 switch (ret) {
525 case HeapOverflow:
526 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
527 break;
528
529 case StackOverflow:
530 // just adjust the stack for this thread, then pop it back
531 // on the run queue.
532 threadStackOverflow(cap, t);
533 pushOnRunQueue(cap,t);
534 break;
535
536 case ThreadYielding:
537 if (scheduleHandleYield(cap, t, prev_what_next)) {
538 // shortcut for switching between compiler/interpreter:
539 goto run_thread;
540 }
541 break;
542
543 case ThreadBlocked:
544 scheduleHandleThreadBlocked(t);
545 break;
546
547 case ThreadFinished:
548 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
549 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
550 break;
551
552 default:
553 barf("schedule: invalid thread return code %d", (int)ret);
554 }
555
556 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
557 scheduleDoGC(&cap,task,rtsFalse);
558 }
559 } /* end of while() */
560 }
561
562 /* -----------------------------------------------------------------------------
563 * Run queue operations
564 * -------------------------------------------------------------------------- */
565
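// Unlink a TSO from the middle of its capability's run queue. The run
// queue is doubly linked: tso->_link is the forward pointer and
// tso->block_info.prev the backward pointer, with END_TSO_QUEUE at both
// ends, so we patch the neighbours (or run_queue_hd/run_queue_tl) around
// the departing thread.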
566 void
567 removeFromRunQueue (Capability *cap, StgTSO *tso)
568 {
569 if (tso->block_info.prev == END_TSO_QUEUE) {
570 ASSERT(cap->run_queue_hd == tso);
571 cap->run_queue_hd = tso->_link;
572 } else {
573 setTSOLink(cap, tso->block_info.prev, tso->_link);
574 }
575 if (tso->_link == END_TSO_QUEUE) {
576 ASSERT(cap->run_queue_tl == tso);
577 cap->run_queue_tl = tso->block_info.prev;
578 } else {
579 setTSOPrev(cap, tso->_link, tso->block_info.prev);
580 }
581 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
582
583 IF_DEBUG(sanity, checkRunQueue(cap));
584 }
585
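// Move a thread to the front of its capability's run queue:
// pushOnRunQueue() places it at the head, so it will be the next thread
// picked up by the scheduler.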
586 void
587 promoteInRunQueue (Capability *cap, StgTSO *tso)
588 {
589 removeFromRunQueue(cap, tso);
590 pushOnRunQueue(cap, tso);
591 }
592
593 /* ----------------------------------------------------------------------------
594 * Setting up the scheduler loop
595 * ------------------------------------------------------------------------- */
596
597 static void
598 schedulePreLoop(void)
599 {
600 // initialisation for scheduler - what cannot go into initScheduler()
601
602 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
603 win32AllocStack();
604 #endif
605 }
606
607 /* -----------------------------------------------------------------------------
608 * scheduleFindWork()
609 *
610 * Search for work to do, and handle messages from elsewhere.
611 * -------------------------------------------------------------------------- */
612
613 static void
614 scheduleFindWork (Capability **pcap)
615 {
616 scheduleStartSignalHandlers(*pcap);
617
618 scheduleProcessInbox(pcap);
619
620 scheduleCheckBlockedThreads(*pcap);
621
622 #if defined(THREADED_RTS)
623 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
624 #endif
625 }
626
627 #if defined(THREADED_RTS)
628 STATIC_INLINE rtsBool
629 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
630 {
631 // we need to yield this capability to someone else if..
632 // - another thread is initiating a GC, and we didn't just do a GC
633 // (see Note [GC livelock])
634 // - another Task is returning from a foreign call
635 // - the thread at the head of the run queue cannot be run
636 // by this Task (it is bound to another Task, or it is unbound
637 // and this task is bound).
638 //
639 // Note [GC livelock]
640 //
641 // If we are interrupted to do a GC, then we do not immediately do
642 // another one. This avoids a starvation situation where one
643 // Capability keeps forcing a GC and the other Capabilities make no
644 // progress at all.
645
646 return ((pending_sync && !didGcLast) ||
647 cap->returning_tasks_hd != NULL ||
648 (!emptyRunQueue(cap) && (task->incall->tso == NULL
649 ? peekRunQueue(cap)->bound != NULL
650 : peekRunQueue(cap)->bound != task->incall)));
651 }
652
653 // This is the single place where a Task goes to sleep. There are
654 // two reasons it might need to sleep:
655 // - there are no threads to run
656 // - we need to yield this Capability to someone else
657 // (see shouldYieldCapability())
658 //
659 // Careful: the scheduler loop is quite delicate. Make sure you run
660 // the tests in testsuite/concurrent (all ways) after modifying this,
661 // and also check the benchmarks in nofib/parallel for regressions.
662
663 static void
664 scheduleYield (Capability **pcap, Task *task)
665 {
666 Capability *cap = *pcap;
667 int didGcLast = rtsFalse;
668
669 // if we have work, and we don't need to give up the Capability, continue.
670 //
671 if (!shouldYieldCapability(cap,task,rtsFalse) &&
672 (!emptyRunQueue(cap) ||
673 !emptyInbox(cap) ||
674 sched_state >= SCHED_INTERRUPTING)) {
675 return;
676 }
677
678 // otherwise yield (sleep), and keep yielding if necessary.
679 do {
680 didGcLast = yieldCapability(&cap,task, !didGcLast);
681 }
682 while (shouldYieldCapability(cap,task,didGcLast));
683
684 // note there may still be no threads on the run queue at this
685 // point, the caller has to check.
686
687 *pcap = cap;
688 return;
689 }
690 #endif
691
692 /* -----------------------------------------------------------------------------
693 * schedulePushWork()
694 *
695 * Push work to other Capabilities if we have some.
696 * -------------------------------------------------------------------------- */
697
698 static void
699 schedulePushWork(Capability *cap USED_IF_THREADS,
700 Task *task USED_IF_THREADS)
701 {
702 /* following code not for PARALLEL_HASKELL. I kept the call general,
703 future GUM versions might use pushing in a distributed setup */
704 #if defined(THREADED_RTS)
705
706 Capability *free_caps[n_capabilities], *cap0;
707 nat i, n_free_caps;
708
709 // migration can be turned off with +RTS -qm
710 if (!RtsFlags.ParFlags.migrate) return;
711
712 // Check whether we have more threads on our run queue, or sparks
713 // in our pool, that we could hand to another Capability.
714 if (emptyRunQueue(cap)) {
715 if (sparkPoolSizeCap(cap) < 2) return;
716 } else {
717 if (singletonRunQueue(cap) &&
718 sparkPoolSizeCap(cap) < 1) return;
719 }
720
721 // First grab as many free Capabilities as we can.
722 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
723 cap0 = capabilities[i];
724 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
725 if (!emptyRunQueue(cap0)
726 || cap0->returning_tasks_hd != NULL
727 || cap0->inbox != (Message*)END_TSO_QUEUE) {
728 // it already has some work, we just grabbed it at
729 // the wrong moment. Or maybe it's deadlocked!
730 releaseCapability(cap0);
731 } else {
732 free_caps[n_free_caps++] = cap0;
733 }
734 }
735 }
736
737 // we now have n_free_caps free capabilities stashed in
738 // free_caps[]. Share our run queue equally with them. This is
739 // probably the simplest thing we could do; improvements we might
740 // want to do include:
741 //
742 // - giving high priority to moving relatively new threads, on
743 // the grounds that they haven't had time to build up a
744 // working set in the cache on this CPU/Capability.
745 //
746 // - giving low priority to moving long-lived threads
747
748 if (n_free_caps > 0) {
749 StgTSO *prev, *t, *next;
750 #ifdef SPARK_PUSHING
751 rtsBool pushed_to_all;
752 #endif
753
754 debugTrace(DEBUG_sched,
755 "cap %d: %s and %d free capabilities, sharing...",
756 cap->no,
757 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
758 "excess threads on run queue":"sparks to share (>=2)",
759 n_free_caps);
760
761 i = 0;
762 #ifdef SPARK_PUSHING
763 pushed_to_all = rtsFalse;
764 #endif
765
766 if (cap->run_queue_hd != END_TSO_QUEUE) {
767 prev = cap->run_queue_hd;
768 t = prev->_link;
769 prev->_link = END_TSO_QUEUE;
770 for (; t != END_TSO_QUEUE; t = next) {
771 next = t->_link;
772 t->_link = END_TSO_QUEUE;
773 if (t->bound == task->incall // don't move my bound thread
774 || tsoLocked(t)) { // don't move a locked thread
775 setTSOLink(cap, prev, t);
776 setTSOPrev(cap, t, prev);
777 prev = t;
778 } else if (i == n_free_caps) {
779 #ifdef SPARK_PUSHING
780 pushed_to_all = rtsTrue;
781 #endif
782 i = 0;
783 // keep one for us
784 setTSOLink(cap, prev, t);
785 setTSOPrev(cap, t, prev);
786 prev = t;
787 } else {
788 appendToRunQueue(free_caps[i],t);
789
790 traceEventMigrateThread (cap, t, free_caps[i]->no);
791
792 if (t->bound) { t->bound->task->cap = free_caps[i]; }
793 t->cap = free_caps[i];
794 i++;
795 }
796 }
797 cap->run_queue_tl = prev;
798
799 IF_DEBUG(sanity, checkRunQueue(cap));
800 }
801
802 #ifdef SPARK_PUSHING
803 /* JB I left this code in place, it would work but is not necessary */
804
805 // If there are some free capabilities that we didn't push any
806 // threads to, then try to push a spark to each one.
807 if (!pushed_to_all) {
808 StgClosure *spark;
809 // i is the next free capability to push to
810 for (; i < n_free_caps; i++) {
811 if (emptySparkPoolCap(free_caps[i])) {
812 spark = tryStealSpark(cap->sparks);
813 if (spark != NULL) {
814 /* TODO: if anyone wants to re-enable this code then
815 * they must consider the fizzledSpark(spark) case
816 * and update the per-cap spark statistics.
817 */
818 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
819
820 traceEventStealSpark(free_caps[i], t, cap->no);
821
822 newSpark(&(free_caps[i]->r), spark);
823 }
824 }
825 }
826 }
827 #endif /* SPARK_PUSHING */
828
829 // release the capabilities
830 for (i = 0; i < n_free_caps; i++) {
831 task->cap = free_caps[i];
832 releaseAndWakeupCapability(free_caps[i]);
833 }
834 }
835 task->cap = cap; // reset to point to our Capability.
836
837 #endif /* THREADED_RTS */
838
839 }
840
841 /* ----------------------------------------------------------------------------
842 * Start any pending signal handlers
843 * ------------------------------------------------------------------------- */
844
845 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
846 static void
847 scheduleStartSignalHandlers(Capability *cap)
848 {
849 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
850 // safe outside the lock
851 startSignalHandlers(cap);
852 }
853 }
854 #else
855 static void
856 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
857 {
858 }
859 #endif
860
861 /* ----------------------------------------------------------------------------
862 * Check for blocked threads that can be woken up.
863 * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
867 {
868 #if !defined(THREADED_RTS)
869 //
870 // Check whether any waiting threads need to be woken up. If the
871 // run queue is empty, and there are no other tasks running, we
872 // can wait indefinitely for something to happen.
873 //
874 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
875 {
876 awaitEvent (emptyRunQueue(cap));
877 }
878 #endif
879 }
880
881 /* ----------------------------------------------------------------------------
882 * Detect deadlock conditions and attempt to resolve them.
883 * ------------------------------------------------------------------------- */
884
885 static void
886 scheduleDetectDeadlock (Capability **pcap, Task *task)
887 {
888 Capability *cap = *pcap;
889 /*
890 * Detect deadlock: when we have no threads to run, there are no
891 * threads blocked, waiting for I/O, or sleeping, and all the
892 * other tasks are waiting for work, we must have a deadlock of
893 * some description.
894 */
895 if ( emptyThreadQueues(cap) )
896 {
897 #if defined(THREADED_RTS)
898 /*
899 * In the threaded RTS, we only check for deadlock if there
900 * has been no activity in a complete timeslice. This means
901 * we won't eagerly start a full GC just because we don't have
902 * any threads to run currently.
903 */
904 if (recent_activity != ACTIVITY_INACTIVE) return;
905 #endif
906
907 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
908
909 // Garbage collection can release some new threads due to
910 // either (a) finalizers or (b) threads resurrected because
911 // they are unreachable and will therefore be sent an
912 // exception. Any threads thus released will be immediately
913 // runnable.
914 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
915 cap = *pcap;
916 // when force_major == rtsTrue, scheduleDoGC sets
917 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
918 // signal.
919
920 if ( !emptyRunQueue(cap) ) return;
921
922 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
923 /* If we have user-installed signal handlers, then wait
924 * for signals to arrive rather than bombing out with a
925 * deadlock.
926 */
927 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
928 debugTrace(DEBUG_sched,
929 "still deadlocked, waiting for signals...");
930
931 awaitUserSignals();
932
933 if (signals_pending()) {
934 startSignalHandlers(cap);
935 }
936
937 // either we have threads to run, or we were interrupted:
938 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
939
940 return;
941 }
942 #endif
943
944 #if !defined(THREADED_RTS)
945 /* Probably a real deadlock. Send the current main thread the
946 * Deadlock exception.
947 */
948 if (task->incall->tso) {
949 switch (task->incall->tso->why_blocked) {
950 case BlockedOnSTM:
951 case BlockedOnBlackHole:
952 case BlockedOnMsgThrowTo:
953 case BlockedOnMVar:
954 case BlockedOnMVarRead:
955 throwToSingleThreaded(cap, task->incall->tso,
956 (StgClosure *)nonTermination_closure);
957 return;
958 default:
959 barf("deadlock: main thread blocked in a strange way");
960 }
961 }
962 return;
963 #endif
964 }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969 * Send pending messages (PARALLEL_HASKELL only)
970 * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static void
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978 if (PendingFetches != END_BF_QUEUE) {
979 processFetches();
980 }
981 # endif
982
983 if (RtsFlags.ParFlags.BufferTime) {
984 // if we use message buffering, we must send away all message
985 // packets which have become too old...
986 sendOldBuffers();
987 }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992 * Process message in the current Capability's inbox
993 * ------------------------------------------------------------------------- */
994
995 static void
996 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
997 {
998 #if defined(THREADED_RTS)
999 Message *m, *next;
1000 int r;
1001 Capability *cap = *pcap;
1002
1003 while (!emptyInbox(cap)) {
1004 if (cap->r.rCurrentNursery->link == NULL ||
1005 g0->n_new_large_words >= large_alloc_lim) {
1006 scheduleDoGC(pcap, cap->running_task, rtsFalse);
1007 cap = *pcap;
1008 }
1009
1010 // don't use a blocking acquire; if the lock is held by
1011 // another thread then just carry on. This seems to avoid
1012 // getting stuck in a message ping-pong situation with other
1013 // processors. We'll check the inbox again later anyway.
1014 //
1015 // We should really use a more efficient queue data structure
1016 // here. The trickiness is that we must ensure a Capability
1017 // never goes idle if the inbox is non-empty, which is why we
1018 // use cap->lock (cap->lock is released as the last thing
1019 // before going idle; see Capability.c:releaseCapability()).
1020 r = TRY_ACQUIRE_LOCK(&cap->lock);
1021 if (r != 0) return;
1022
1023 m = cap->inbox;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025
1026 RELEASE_LOCK(&cap->lock);
1027
1028 while (m != (Message*)END_TSO_QUEUE) {
1029 next = m->link;
1030 executeMessage(cap, m);
1031 m = next;
1032 }
1033 }
1034 #endif
1035 }
1036
1037 /* ----------------------------------------------------------------------------
1038 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1039 * ------------------------------------------------------------------------- */
1040
1041 #if defined(THREADED_RTS)
1042 static void
1043 scheduleActivateSpark(Capability *cap)
1044 {
1045 if (anySparks() && !cap->disabled)
1046 {
1047 createSparkThread(cap);
1048 debugTrace(DEBUG_sched, "creating a spark thread");
1049 }
1050 }
1051 #endif // THREADED_RTS
1052
1053 /* ----------------------------------------------------------------------------
1054 * After running a thread...
1055 * ------------------------------------------------------------------------- */
1056
1057 static void
1058 schedulePostRunThread (Capability *cap, StgTSO *t)
1059 {
1060 // We have to be able to catch transactions that are in an
1061 // infinite loop as a result of seeing an inconsistent view of
1062 // memory, e.g.
1063 //
1064 // atomically $ do
1065 // [a,b] <- mapM readTVar [ta,tb]
1066 // when (a == b) loop
1067 //
1068 // and a is never equal to b given a consistent view of memory.
1069 //
1070 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1071 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1072 debugTrace(DEBUG_sched | DEBUG_stm,
1073 "trec %p found wasting its time", t);
1074
1075 // strip the stack back to the
1076 // ATOMICALLY_FRAME, aborting the (nested)
1077 // transaction, and saving the stack of any
1078 // partially-evaluated thunks on the heap.
1079 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1080
1081 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1082 }
1083 }
1084
1085 //
1086 // If the current thread's allocation limit has run out, send it
1087 // the AllocationLimitExceeded exception.
1088
1089 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1090 // Use a throwToSelf rather than a throwToSingleThreaded, because
1091 // it correctly handles the case where the thread is currently
1092 // inside mask. Also the thread might be blocked (e.g. on an
1093 // MVar), and throwToSingleThreaded doesn't unblock it
1094 // correctly in that case.
1095 throwToSelf(cap, t, allocationLimitExceeded_closure);
1096 ASSIGN_Int64((W_*)&(t->alloc_limit),
1097 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1098 }
1099
1100 /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1105 * -------------------------------------------------------------------------- */
1106
1107 static rtsBool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110 // did the task ask for a large block?
1111 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1112 // if so, get one and push it on the front of the nursery.
1113 bdescr *bd;
1114 W_ blocks;
1115
1116 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1117
1118 if (blocks > BLOCKS_PER_MBLOCK) {
1119 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1120 }
1121
1122 debugTrace(DEBUG_sched,
1123 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1124 (long)t->id, what_next_strs[t->what_next], blocks);
1125
1126 // don't do this if the nursery is (nearly) full, we'll GC first.
1127 if (cap->r.rCurrentNursery->link != NULL ||
1128 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1129 // infinite loop if the
1130 // nursery has only one
1131 // block.
1132
1133 bd = allocGroup_lock(blocks);
1134 cap->r.rNursery->n_blocks += blocks;
1135
1136 // link the new group after CurrentNursery
1137 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1138
1139 // initialise it as a nursery block. We initialise the
1140 // step, gen_no, and flags field of *every* sub-block in
1141 // this large block, because this is easier than making
1142 // sure that we always find the block head of a large
1143 // block whenever we call Bdescr() (eg. evacuate() and
1144 // isAlive() in the GC would both have to do this, at
1145 // least).
1146 {
1147 bdescr *x;
1148 for (x = bd; x < bd + blocks; x++) {
1149 initBdescr(x,g0,g0);
1150 x->free = x->start;
1151 x->flags = 0;
1152 }
1153 }
1154
1155 // This assert can be a killer if the app is doing lots
1156 // of large block allocations.
1157 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1158
1159 // now update the nursery to point to the new block
1160 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1161 cap->r.rCurrentNursery = bd;
1162
1163 // we might be unlucky and have another thread get on the
1164 // run queue before us and steal the large block, but in that
1165 // case the thread will just end up requesting another large
1166 // block.
1167 pushOnRunQueue(cap,t);
1168 return rtsFalse; /* not actually GC'ing */
1169 }
1170 }
1171
1172 if (getNewNursery(cap)) {
1173 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1174 pushOnRunQueue(cap,t);
1175 return rtsFalse;
1176 }
1177
1178 if (cap->r.rHpLim == NULL || cap->context_switch) {
1179 // Sometimes we miss a context switch, e.g. when calling
1180 // primitives in a tight loop, MAYBE_GC() doesn't check the
1181 // context switch flag, and we end up waiting for a GC.
1182 // See #1984, and concurrent/should_run/1984
1183 cap->context_switch = 0;
1184 appendToRunQueue(cap,t);
1185 } else {
1186 pushOnRunQueue(cap,t);
1187 }
1188 return rtsTrue;
1189 /* actual GC is done at the end of the while loop in schedule() */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadYielding
1194 * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1198 {
1199 /* put the thread back on the run queue. Then, if we're ready to
1200 * GC, check whether this is the last task to stop. If so, wake
1201 * up the GC thread. getThread will block during a GC until the
1202 * GC is finished.
1203 */
1204
1205 ASSERT(t->_link == END_TSO_QUEUE);
1206
1207 // Shortcut if we're just switching evaluators: don't bother
1208 // doing stack squeezing (which can be expensive), just run the
1209 // thread.
1210 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1211 debugTrace(DEBUG_sched,
1212 "--<< thread %ld (%s) stopped to switch evaluators",
1213 (long)t->id, what_next_strs[t->what_next]);
1214 return rtsTrue;
1215 }
1216
1217 // Reset the context switch flag. We don't do this just before
1218 // running the thread, because that would mean we would lose ticks
1219 // during GC, which can lead to unfair scheduling (a thread hogs
1220 // the CPU because the tick always arrives during GC). This way
1221 // penalises threads that do a lot of allocation, but that seems
1222 // better than the alternative.
1223 if (cap->context_switch != 0) {
1224 cap->context_switch = 0;
1225 appendToRunQueue(cap,t);
1226 } else {
1227 pushOnRunQueue(cap,t);
1228 }
1229
1230 IF_DEBUG(sanity,
1231 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232 checkTSO(t));
1233
1234 return rtsFalse;
1235 }
1236
1237 /* -----------------------------------------------------------------------------
1238 * Handle a thread that returned to the scheduler with ThreadBlocked
1239 * -------------------------------------------------------------------------- */
1240
1241 static void
1242 scheduleHandleThreadBlocked( StgTSO *t
1243 #if !defined(DEBUG)
1244 STG_UNUSED
1245 #endif
1246 )
1247 {
1248
1249 // We don't need to do anything. The thread is blocked, and it
1250 // has tidied up its stack and placed itself on whatever queue
1251 // it needs to be on.
1252
1253 // ASSERT(t->why_blocked != NotBlocked);
1254 // Not true: for example,
1255 // - the thread may have woken itself up already, because
1256 // threadPaused() might have raised a blocked throwTo
1257 // exception, see maybePerformBlockedException().
1258
1259 #ifdef DEBUG
1260 traceThreadStatus(DEBUG_sched, t);
1261 #endif
1262 }
1263
1264 /* -----------------------------------------------------------------------------
1265 * Handle a thread that returned to the scheduler with ThreadFinished
1266 * -------------------------------------------------------------------------- */
1267
1268 static rtsBool
1269 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1270 {
1271 /* Need to check whether this was a main thread, and if so,
1272 * return with the return value.
1273 *
1274 * We also end up here if the thread kills itself with an
1275 * uncaught exception, see Exception.cmm.
1276 */
1277
1278 // blocked exceptions can now complete, even if the thread was in
1279 // blocked mode (see #2910).
1280 awakenBlockedExceptionQueue (cap, t);
1281
1282 //
1283 // Check whether the thread that just completed was a bound
1284 // thread, and if so return with the result.
1285 //
1286 // There is an assumption here that all thread completion goes
1287 // through this point; we need to make sure that if a thread
1288 // ends up in the ThreadKilled state, that it stays on the run
1289 // queue so it can be dealt with here.
1290 //
1291
1292 if (t->bound) {
1293
1294 if (t->bound != task->incall) {
1295 #if !defined(THREADED_RTS)
1296 // Must be a bound thread that is not the topmost one. Leave
1297 // it on the run queue until the stack has unwound to the
1298 // point where we can deal with this. Leaving it on the run
1299 // queue also ensures that the garbage collector knows about
1300 // this thread and its return value (it gets dropped from the
1301 // step->threads list so there's no other way to find it).
1302 appendToRunQueue(cap,t);
1303 return rtsFalse;
1304 #else
1305 // this cannot happen in the threaded RTS, because a
1306 // bound thread can only be run by the appropriate Task.
1307 barf("finished bound thread that isn't mine");
1308 #endif
1309 }
1310
1311 ASSERT(task->incall->tso == t);
1312
1313 if (t->what_next == ThreadComplete) {
1314 if (task->incall->ret) {
1315 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1316 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1317 }
1318 task->incall->stat = Success;
1319 } else {
1320 if (task->incall->ret) {
1321 *(task->incall->ret) = NULL;
1322 }
1323 if (sched_state >= SCHED_INTERRUPTING) {
1324 if (heap_overflow) {
1325 task->incall->stat = HeapExhausted;
1326 } else {
1327 task->incall->stat = Interrupted;
1328 }
1329 } else {
1330 task->incall->stat = Killed;
1331 }
1332 }
1333 #ifdef DEBUG
1334 removeThreadLabel((StgWord)task->incall->tso->id);
1335 #endif
1336
1337 // We no longer consider this thread and task to be bound to
1338 // each other. The TSO lives on until it is GC'd, but the
1339 // task is about to be released by the caller, and we don't
1340 // want anyone following the pointer from the TSO to the
1341 // defunct task (which might have already been
1342 // re-used). This was a real bug: the GC updated
1343 // tso->bound->tso which led to a deadlock.
1344 t->bound = NULL;
1345 task->incall->tso = NULL;
1346
1347 return rtsTrue; // tells schedule() to return
1348 }
1349
1350 return rtsFalse;
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * Perform a heap census
1355 * -------------------------------------------------------------------------- */
1356
1357 static rtsBool
1358 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1359 {
1360 // When we have +RTS -i0 and we're heap profiling, do a census at
1361 // every GC. This lets us get repeatable runs for debugging.
1362 if (performHeapProfile ||
1363 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1364 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1365 return rtsTrue;
1366 } else {
1367 return rtsFalse;
1368 }
1369 }
1370
1371 /* -----------------------------------------------------------------------------
1372 * Start a synchronisation of all capabilities
1373 * -------------------------------------------------------------------------- */
1374
1375 // Returns:
1376 // 0 if we successfully got a sync
1377 // non-0 if there was another sync request in progress,
1378 // and we yielded to it. The value returned is the
1379 // type of the other sync request.
1380 //
1381 #if defined(THREADED_RTS)
1382 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1383 {
1384 nat prev_pending_sync;
1385
1386 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1387
1388 if (prev_pending_sync)
1389 {
1390 do {
1391 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1392 prev_pending_sync);
1393 ASSERT(*pcap);
1394 yieldCapability(pcap,task,rtsTrue);
1395 } while (pending_sync);
1396 return prev_pending_sync; // NOTE: task->cap might have changed now
1397 }
1398 else
1399 {
1400 return 0;
1401 }
1402 }
1403
1404 //
1405 // Grab all the capabilities except the one we already hold. Used
1406 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1407 // before a fork (SYNC_OTHER).
1408 //
1409 // Only call this after requestSync(), otherwise a deadlock might
1410 // ensue if another thread is trying to synchronise.
1411 //
1412 static void acquireAllCapabilities(Capability *cap, Task *task)
1413 {
1414 Capability *tmpcap;
1415 nat i;
1416
1417 for (i=0; i < n_capabilities; i++) {
1418 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1419 tmpcap = capabilities[i];
1420 if (tmpcap != cap) {
1421 // we better hope this task doesn't get migrated to
1422 // another Capability while we're waiting for this one.
1423 // It won't, because load balancing happens while we have
1424 // all the Capabilities, but even so it's a slightly
1425 // unsavoury invariant.
1426 task->cap = tmpcap;
1427 waitForReturnCapability(&tmpcap, task);
1428 if (tmpcap->no != i) {
1429 barf("acquireAllCapabilities: got the wrong capability");
1430 }
1431 }
1432 }
1433 task->cap = cap;
1434 }
1435
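// The counterpart of acquireAllCapabilities(): release the first n
// Capabilities, except the one (cap) that this Task keeps hold of.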
1436 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1437 {
1438 nat i;
1439
1440 for (i = 0; i < n; i++) {
1441 if (cap->no != i) {
1442 task->cap = capabilities[i];
1443 releaseCapability(capabilities[i]);
1444 }
1445 }
1446 task->cap = cap;
1447 }
1448 #endif
1449
1450 /* -----------------------------------------------------------------------------
1451 * Perform a garbage collection if necessary
1452 * -------------------------------------------------------------------------- */
1453
1454 static void
1455 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1456 rtsBool force_major)
1457 {
1458 Capability *cap = *pcap;
1459 rtsBool heap_census;
1460 nat collect_gen;
1461 rtsBool major_gc;
1462 #ifdef THREADED_RTS
1463 nat gc_type;
1464 nat i, sync;
1465 StgTSO *tso;
1466 #endif
1467
1468 if (sched_state == SCHED_SHUTTING_DOWN) {
1469 // The final GC has already been done, and the system is
1470 // shutting down. We'll probably deadlock if we try to GC
1471 // now.
1472 return;
1473 }
1474
1475 heap_census = scheduleNeedHeapProfile(rtsTrue);
1476
1477 // Figure out which generation we are collecting, so that we can
1478 // decide whether this is a parallel GC or not.
1479 collect_gen = calcNeeded(force_major || heap_census, NULL);
1480 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1481
1482 #ifdef THREADED_RTS
1483 if (sched_state < SCHED_INTERRUPTING
1484 && RtsFlags.ParFlags.parGcEnabled
1485 && collect_gen >= RtsFlags.ParFlags.parGcGen
1486 && ! oldest_gen->mark)
1487 {
1488 gc_type = SYNC_GC_PAR;
1489 } else {
1490 gc_type = SYNC_GC_SEQ;
1491 }
1492
1493 // In order to GC, there must be no threads running Haskell code.
1494 // Therefore, the GC thread needs to hold *all* the capabilities,
1495 // and release them after the GC has completed.
1496 //
1497 // This seems to be the simplest way: previous attempts involved
1498 // making all the threads with capabilities give up their
1499 // capabilities and sleep except for the *last* one, which
1500 // actually did the GC. But it's quite hard to arrange for all
1501 // the other tasks to sleep and stay asleep.
1502 //
1503
1504 /* Other capabilities are prevented from running yet more Haskell
1505 threads if pending_sync is set. Tested inside
1506 yieldCapability() and releaseCapability() in Capability.c */
1507
1508 do {
1509 sync = requestSync(pcap, task, gc_type);
1510 cap = *pcap;
1511 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1512 // someone else had a pending sync request for a GC, so
1513 // let's assume GC has been done and we don't need to GC
1514 // again.
1515 return;
1516 }
1517 if (sched_state == SCHED_SHUTTING_DOWN) {
1518 // The scheduler might now be shutting down. We tested
1519 // this above, but it might have become true since then as
1520 // we yielded the capability in requestSync().
1521 return;
1522 }
1523 } while (sync);
1524
1525 // don't declare this until after we have sync'd, because
1526 // n_capabilities may change.
1527 rtsBool idle_cap[n_capabilities];
1528 #ifdef DEBUG
1529 unsigned int old_n_capabilities = n_capabilities;
1530 #endif
1531
1532 interruptAllCapabilities();
1533
1534 // The final shutdown GC is always single-threaded, because it's
1535 // possible that some of the Capabilities have no worker threads.
1536
1537 if (gc_type == SYNC_GC_SEQ)
1538 {
1539 traceEventRequestSeqGc(cap);
1540 }
1541 else
1542 {
1543 traceEventRequestParGc(cap);
1544 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1545 }
1546
1547 if (gc_type == SYNC_GC_SEQ)
1548 {
1549 // single-threaded GC: grab all the capabilities
1550 acquireAllCapabilities(cap,task);
1551 }
1552 else
1553 {
1554 // If we are load-balancing collections in this
1555 // generation, then we require all GC threads to participate
1556 // in the collection. Otherwise, we only require active
1557 // threads to participate, and we set gc_threads[i]->idle for
1558 // any idle capabilities. The rationale here is that waking
1559 // up an idle Capability takes much longer than just doing any
1560 // GC work on its behalf.
1561
1562 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1563 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1564 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1565 for (i=0; i < n_capabilities; i++) {
1566 if (capabilities[i]->disabled) {
1567 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1568 } else {
1569 idle_cap[i] = rtsFalse;
1570 }
1571 }
1572 } else {
1573 for (i=0; i < n_capabilities; i++) {
1574 if (capabilities[i]->disabled) {
1575 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1576 } else if (i == cap->no ||
1577 capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1578 idle_cap[i] = rtsFalse;
1579 } else {
1580 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1581 if (!idle_cap[i]) {
1582 n_failed_trygrab_idles++;
1583 } else {
1584 n_idle_caps++;
1585 }
1586 }
1587 }
1588 }
1589
1590 // We set the gc_thread[i]->idle flag if that
1591 // capability/thread is not participating in this collection.
1592 // We also keep a local record of which capabilities are idle
1593 // in idle_cap[], because scheduleDoGC() is re-entrant:
1594 // another thread might start a GC as soon as we've finished
1595 // this one, and thus the gc_thread[]->idle flags are invalid
1596 // as soon as we release any threads after GC. Getting this
1597 // wrong leads to a rare and hard to debug deadlock!
1598
1599 for (i=0; i < n_capabilities; i++) {
1600 gc_threads[i]->idle = idle_cap[i];
1601 capabilities[i]->idle++;
1602 }
1603
1604 // For all capabilities participating in this GC, wait until
1605 // they have stopped mutating and are standing by for GC.
1606 waitForGcThreads(cap);
1607
1608 #if defined(THREADED_RTS)
1609 // Stable point where we can do a global check on our spark counters
1610 ASSERT(checkSparkCountInvariant());
1611 #endif
1612 }
1613
1614 #endif
1615
1616 IF_DEBUG(scheduler, printAllThreads());
1617
1618 delete_threads_and_gc:
1619 /*
1620 * We now have all the capabilities; if we're in an interrupting
1621 * state, then we should take the opportunity to delete all the
1622 * threads in the system.
1623 * Checking for major_gc ensures that the last GC is major.
1624 */
1625 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1626 deleteAllThreads(cap);
1627 #if defined(THREADED_RTS)
1628 // Discard all the sparks from every Capability. Why?
1629 // They'll probably be GC'd anyway since we've killed all the
1630 // threads. It just avoids the GC having to do any work to
1631 // figure out that any remaining sparks are garbage.
1632 for (i = 0; i < n_capabilities; i++) {
1633 capabilities[i]->spark_stats.gcd +=
1634 sparkPoolSize(capabilities[i]->sparks);
1635 // No race here since all Caps are stopped.
1636 discardSparksCap(capabilities[i]);
1637 }
1638 #endif
1639 sched_state = SCHED_SHUTTING_DOWN;
1640 }
1641
1642 /*
1643 * When there are disabled capabilities, we want to migrate any
1644 * threads away from them. Normally this happens in the
1645 * scheduler's loop, but only for unbound threads - it's really
1646 * hard for a bound thread to migrate itself. So we have another
1647 * go here.
1648 */
1649 #if defined(THREADED_RTS)
1650 for (i = enabled_capabilities; i < n_capabilities; i++) {
1651 Capability *tmp_cap, *dest_cap;
1652 tmp_cap = capabilities[i];
1653 ASSERT(tmp_cap->disabled);
1654 if (i != cap->no) {
1655 dest_cap = capabilities[i % enabled_capabilities];
1656 while (!emptyRunQueue(tmp_cap)) {
1657 tso = popRunQueue(tmp_cap);
1658 migrateThread(tmp_cap, tso, dest_cap);
1659 if (tso->bound) {
1660 traceTaskMigrate(tso->bound->task,
1661 tso->bound->task->cap,
1662 dest_cap);
1663 tso->bound->task->cap = dest_cap;
1664 }
1665 }
1666 }
1667 }
1668 #endif
1669
1670 #if defined(THREADED_RTS)
1671 // reset pending_sync *before* GC, so that when the GC threads
1672 // emerge they don't immediately re-enter the GC.
1673 pending_sync = 0;
1674 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1675 #else
1676 GarbageCollect(collect_gen, heap_census, 0, cap);
1677 #endif
1678
1679 traceSparkCounters(cap);
1680
1681 switch (recent_activity) {
1682 case ACTIVITY_INACTIVE:
1683 if (force_major) {
1684 // We are doing a GC because the system has been idle for a
1685 // timeslice and we need to check for deadlock. Record the
1686 // fact that we've done a GC and turn off the timer signal;
1687 // it will get re-enabled if we run any threads after the GC.
1688 recent_activity = ACTIVITY_DONE_GC;
1689 #ifndef PROFILING
1690 stopTimer();
1691 #endif
1692 break;
1693 }
1694 // fall through...
1695
1696 case ACTIVITY_MAYBE_NO:
1697 // the GC might have taken long enough for the timer to set
1698 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1699 // but we aren't necessarily deadlocked:
1700 recent_activity = ACTIVITY_YES;
1701 break;
1702
1703 case ACTIVITY_DONE_GC:
1704 // If we are actually active, the scheduler will reset the
1705 // recent_activity flag and re-enable the timer.
1706 break;
1707 }
1708
1709 #if defined(THREADED_RTS)
1710 // Stable point where we can do a global check on our spark counters
1711 ASSERT(checkSparkCountInvariant());
1712 #endif
1713
1714 // The heap census itself is done during GarbageCollect().
1715 if (heap_census) {
1716 performHeapProfile = rtsFalse;
1717 }
1718
1719 #if defined(THREADED_RTS)
1720
1721 // If n_capabilities has changed during GC, we're in trouble.
1722 ASSERT(n_capabilities == old_n_capabilities);
1723
1724 if (gc_type == SYNC_GC_PAR)
1725 {
1726 releaseGCThreads(cap);
1727 for (i = 0; i < n_capabilities; i++) {
1728 if (i != cap->no) {
1729 if (idle_cap[i]) {
1730 ASSERT(capabilities[i]->running_task == task);
1731 task->cap = capabilities[i];
1732 releaseCapability(capabilities[i]);
1733 } else {
1734 ASSERT(capabilities[i]->running_task != task);
1735 }
1736 }
1737 }
1738 task->cap = cap;
1739 }
1740 #endif
1741
1742 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1743 // GC set the heap_overflow flag, so we should proceed with
1744 // an orderly shutdown now. Ultimately we want the main
1745 // thread to return to its caller with HeapExhausted, at which
1746 // point the caller should call hs_exit(). The first step is
1747 // to delete all the threads.
1748 //
1749 // Another way to do this would be to raise an exception in
1750 // the main thread, which we really should do because it gives
1751 // the program a chance to clean up. But how do we find the
1752 // main thread? It should presumably be the same one that
1753 // gets ^C exceptions, but that's all done on the Haskell side
1754 // (GHC.TopHandler).
1755 sched_state = SCHED_INTERRUPTING;
1756 goto delete_threads_and_gc;
1757 }
1758
1759 #ifdef SPARKBALANCE
1760 /* JB
1761 Once we are all together... this would be the place to balance all
1762 spark pools. No concurrent stealing or adding of new sparks can
1763 occur. Should be defined in Sparks.c. */
1764 balanceSparkPoolsCaps(n_capabilities, capabilities);
1765 #endif
1766
1767 #if defined(THREADED_RTS)
1768 if (gc_type == SYNC_GC_SEQ) {
1769 // release our stash of capabilities.
1770 releaseAllCapabilities(n_capabilities, cap, task);
1771 }
1772 #endif
1773
1774 return;
1775 }
1776
1777 /* ---------------------------------------------------------------------------
1778 * Singleton fork(). Do not copy any running threads.
1779 * ------------------------------------------------------------------------- */
1780
1781 pid_t
1782 forkProcess(HsStablePtr *entry
1783 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1784 STG_UNUSED
1785 #endif
1786 )
1787 {
1788 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1789 pid_t pid;
1790 StgTSO* t,*next;
1791 Capability *cap;
1792 nat g;
1793 Task *task = NULL;
1794 nat i;
1795 #ifdef THREADED_RTS
1796 nat sync;
1797 #endif
1798
1799 debugTrace(DEBUG_sched, "forking!");
1800
1801 task = newBoundTask();
1802
1803 cap = NULL;
1804 waitForReturnCapability(&cap, task);
1805
1806 #ifdef THREADED_RTS
1807 do {
1808 sync = requestSync(&cap, task, SYNC_OTHER);
1809 } while (sync);
1810
1811 acquireAllCapabilities(cap,task);
1812
1813 pending_sync = 0;
1814 #endif
1815
1816 // no funny business: hold locks while we fork, otherwise if some
1817 // other thread is holding a lock when the fork happens, the data
1818 // structure protected by the lock will forever be in an
1819 // inconsistent state in the child. See also #1391.
1820 ACQUIRE_LOCK(&sched_mutex);
1821 ACQUIRE_LOCK(&sm_mutex);
1822 ACQUIRE_LOCK(&stable_mutex);
1823 ACQUIRE_LOCK(&task->lock);
1824
1825 for (i=0; i < n_capabilities; i++) {
1826 ACQUIRE_LOCK(&capabilities[i]->lock);
1827 }
1828
1829 #ifdef THREADED_RTS
1830 ACQUIRE_LOCK(&all_tasks_mutex);
1831 #endif
1832
1833 stopTimer(); // See #4074
1834
1835 #if defined(TRACING)
1836 flushEventLog(); // so that child won't inherit dirty file buffers
1837 #endif
1838
1839 pid = fork();
1840
1841 if (pid) { // parent
1842
1843 startTimer(); // #4074
1844
1845 RELEASE_LOCK(&sched_mutex);
1846 RELEASE_LOCK(&sm_mutex);
1847 RELEASE_LOCK(&stable_mutex);
1848 RELEASE_LOCK(&task->lock);
1849
1850 for (i=0; i < n_capabilities; i++) {
1851 releaseCapability_(capabilities[i],rtsFalse);
1852 RELEASE_LOCK(&capabilities[i]->lock);
1853 }
1854
1855 #ifdef THREADED_RTS
1856 RELEASE_LOCK(&all_tasks_mutex);
1857 #endif
1858
1859 boundTaskExiting(task);
1860
1861 // just return the pid
1862 return pid;
1863
1864 } else { // child
1865
1866 #if defined(THREADED_RTS)
1867 initMutex(&sched_mutex);
1868 initMutex(&sm_mutex);
1869 initMutex(&stable_mutex);
1870 initMutex(&task->lock);
1871
1872 for (i=0; i < n_capabilities; i++) {
1873 initMutex(&capabilities[i]->lock);
1874 }
1875
1876 initMutex(&all_tasks_mutex);
1877 #endif
1878
1879 #ifdef TRACING
1880 resetTracing();
1881 #endif
1882
1883 // Now, all OS threads except the thread that forked are
1884 // stopped. We need to stop all Haskell threads, including
1885 // those involved in foreign calls. Also we need to delete
1886 // all Tasks, because they correspond to OS threads that are
1887 // now gone.
1888
1889 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1890 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1891 next = t->global_link;
1892 // don't allow threads to catch the ThreadKilled
1893 // exception, but we do want to raiseAsync() because these
1894 // threads may be evaluating thunks that we need later.
1895 deleteThread_(t->cap,t);
1896
1897 // stop the GC from updating the InCall to point to
1898 // the TSO. This is only necessary because the
1899 // OSThread bound to the TSO has been killed, and
1900 // won't get a chance to exit in the usual way (see
1901 // also scheduleHandleThreadFinished).
1902 t->bound = NULL;
1903 }
1904 }
1905
1906 discardTasksExcept(task);
1907
1908 for (i=0; i < n_capabilities; i++) {
1909 cap = capabilities[i];
1910
1911 // Empty the run queue. It seems tempting to let all the
1912 // killed threads stay on the run queue as zombies to be
1913 // cleaned up later, but some of them may correspond to
1914 // bound threads for which the corresponding Task does not
1915 // exist.
1916 truncateRunQueue(cap);
1917
1918 // Any suspended C-calling Tasks are no more; their OS threads
1919 // don't exist now:
1920 cap->suspended_ccalls = NULL;
1921
1922 #if defined(THREADED_RTS)
1923 // Wipe our spare workers list, they no longer exist. New
1924 // workers will be created if necessary.
1925 cap->spare_workers = NULL;
1926 cap->n_spare_workers = 0;
1927 cap->returning_tasks_hd = NULL;
1928 cap->returning_tasks_tl = NULL;
1929 #endif
1930
1931 // Release all caps except 0; we'll use that for starting
1932 // the IO manager and running the client action below.
1933 if (cap->no != 0) {
1934 task->cap = cap;
1935 releaseCapability(cap);
1936 }
1937 }
1938 cap = capabilities[0];
1939 task->cap = cap;
1940
1941 // Empty the threads lists. Otherwise, the garbage
1942 // collector may attempt to resurrect some of these threads.
1943 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1944 generations[g].threads = END_TSO_QUEUE;
1945 }
1946
1947 // On Unix, all timers are reset in the child, so we need to start
1948 // the timer again.
1949 initTimer();
1950 startTimer();
1951
1952 // TODO: need to trace various other things in the child
1953 // like startup event, capabilities, process info etc
1954 traceTaskCreate(task, cap);
1955
1956 #if defined(THREADED_RTS)
1957 ioManagerStartCap(&cap);
1958 #endif
1959
1960 rts_evalStableIO(&cap, entry, NULL); // run the action
1961 rts_checkSchedStatus("forkProcess",cap);
1962
1963 rts_unlock(cap);
1964 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
1965 }
1966 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1967 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1968 #endif
1969 }
1970
1971 /* ---------------------------------------------------------------------------
1972 * Changing the number of Capabilities
1973 *
1974 * Changing the number of Capabilities is very tricky! We can only do
1975 * it with the system fully stopped, so we do a full sync with
1976 * requestSync(SYNC_OTHER) and grab all the capabilities.
1977 *
1978 * Then we resize the appropriate data structures, and update all
1979 * references to the old data structures which have now moved.
1980 * Finally we release the Capabilities we are holding, and start
1981 * worker Tasks on the new Capabilities we created.
1982 *
1983 * ------------------------------------------------------------------------- */
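/* Illustrative sketch only (not part of the original source): the
 * Haskell-level setNumCapabilities in GHC.Conc reaches this function via a
 * foreign import, and C code using the RTS API could call it directly once
 * the RTS is up, e.g.
 *
 *     setNumCapabilities(4);   // enable/disable Capabilities to reach 4
 */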
1984
1985 void
1986 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1987 {
1988 #if !defined(THREADED_RTS)
1989 if (new_n_capabilities != 1) {
1990 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1991 }
1992 return;
1993 #elif defined(NOSMP)
1994 if (new_n_capabilities != 1) {
1995 errorBelch("setNumCapabilities: not supported on this platform");
1996 }
1997 return;
1998 #else
1999 Task *task;
2000 Capability *cap;
2001 nat sync;
2002 nat n;
2003 Capability *old_capabilities = NULL;
2004 nat old_n_capabilities = n_capabilities;
2005
2006 if (new_n_capabilities == enabled_capabilities) return;
2007
2008 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2009 enabled_capabilities, new_n_capabilities);
2010
2011 cap = rts_lock();
2012 task = cap->running_task;
2013
2014 do {
2015 sync = requestSync(&cap, task, SYNC_OTHER);
2016 } while (sync);
2017
2018 acquireAllCapabilities(cap,task);
2019
2020 pending_sync = 0;
2021
2022 if (new_n_capabilities < enabled_capabilities)
2023 {
2024 // Reducing the number of capabilities: we do not actually
2025 // remove the extra capabilities; we just mark them as
2026 // "disabled". This has the following effects:
2027 //
2028 // - threads on a disabled capability are migrated away by the
2029 // scheduler loop
2030 //
2031 // - disabled capabilities do not participate in GC
2032 // (see scheduleDoGC())
2033 //
2034 // - No spark threads are created on this capability
2035 // (see scheduleActivateSpark())
2036 //
2037 // - We do not attempt to migrate threads *to* a disabled
2038 // capability (see schedulePushWork()).
2039 //
2040 // but in other respects, a disabled capability remains
2041 // alive. Threads may be woken up on a disabled capability,
2042 // but they will be immediately migrated away.
2043 //
2044 // This approach is much easier than trying to actually remove
2045 // the capability; we don't have to worry about GC data
2046 // structures, the nursery, etc.
2047 //
2048 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2049 capabilities[n]->disabled = rtsTrue;
2050 traceCapDisable(capabilities[n]);
2051 }
2052 enabled_capabilities = new_n_capabilities;
2053 }
2054 else
2055 {
2056 // Increasing the number of enabled capabilities.
2057 //
2058 // enable any disabled capabilities, up to the required number
2059 for (n = enabled_capabilities;
2060 n < new_n_capabilities && n < n_capabilities; n++) {
2061 capabilities[n]->disabled = rtsFalse;
2062 traceCapEnable(capabilities[n]);
2063 }
2064 enabled_capabilities = n;
2065
2066 if (new_n_capabilities > n_capabilities) {
2067 #if defined(TRACING)
2068 // Allocate eventlog buffers for the new capabilities. Note this
2069 // must be done before calling moreCapabilities(), because that
2070 // will emit events about creating the new capabilities and adding
2071 // them to existing capsets.
2072 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2073 #endif
2074
2075 // Resize the capabilities array
2076 // NB. after this, capabilities points somewhere new. Any pointers
2077 // of type (Capability *) are now invalid.
2078 moreCapabilities(n_capabilities, new_n_capabilities);
2079
2080 // Resize and update storage manager data structures
2081 storageAddCapabilities(n_capabilities, new_n_capabilities);
2082 }
2083 }
2084
2085 // update n_capabilities before things start running
2086 if (new_n_capabilities > n_capabilities) {
2087 n_capabilities = enabled_capabilities = new_n_capabilities;
2088 }
2089
2090 // Start worker tasks on the new Capabilities
2091 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2092
2093 // We're done: release the original Capabilities
2094 releaseAllCapabilities(old_n_capabilities, cap,task);
2095
2096 // We can't free the old array until now, because we access it
2097 // while updating pointers in updateCapabilityRefs().
2098 if (old_capabilities) {
2099 stgFree(old_capabilities);
2100 }
2101
2102 // Notify IO manager that the number of capabilities has changed.
2103 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2104
2105 rts_unlock(cap);
2106
2107 #endif // THREADED_RTS
2108 }
2109
2110
2111
2112 /* ---------------------------------------------------------------------------
2113 * Delete all the threads in the system
2114 * ------------------------------------------------------------------------- */
2115
2116 static void
2117 deleteAllThreads ( Capability *cap )
2118 {
2119 // NOTE: only safe to call if we own all capabilities.
2120
2121 StgTSO* t, *next;
2122 nat g;
2123
2124 debugTrace(DEBUG_sched,"deleting all threads");
2125 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2126 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2127 next = t->global_link;
2128 deleteThread(cap,t);
2129 }
2130 }
2131
2132 // The run queue now contains a bunch of ThreadKilled threads. We
2133 // must not throw these away: the main thread(s) will be in there
2134 // somewhere, and the main scheduler loop has to deal with it.
2135 // Also, the run queue is the only thing keeping these threads from
2136 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2137
2138 #if !defined(THREADED_RTS)
2139 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2140 ASSERT(sleeping_queue == END_TSO_QUEUE);
2141 #endif
2142 }
2143
2144 /* -----------------------------------------------------------------------------
2145 Managing the suspended_ccalls list.
2146 Locks required: sched_mutex
2147 -------------------------------------------------------------------------- */
2148
2149 STATIC_INLINE void
2150 suspendTask (Capability *cap, Task *task)
2151 {
2152 InCall *incall;
2153
2154 incall = task->incall;
2155 ASSERT(incall->next == NULL && incall->prev == NULL);
2156 incall->next = cap->suspended_ccalls;
2157 incall->prev = NULL;
2158 if (cap->suspended_ccalls) {
2159 cap->suspended_ccalls->prev = incall;
2160 }
2161 cap->suspended_ccalls = incall;
2162 }
2163
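// recoverSuspendedTask: unlink this Task's InCall from the capability's
// suspended_ccalls list (the inverse of suspendTask).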
2164 STATIC_INLINE void
2165 recoverSuspendedTask (Capability *cap, Task *task)
2166 {
2167 InCall *incall;
2168
2169 incall = task->incall;
2170 if (incall->prev) {
2171 incall->prev->next = incall->next;
2172 } else {
2173 ASSERT(cap->suspended_ccalls == incall);
2174 cap->suspended_ccalls = incall->next;
2175 }
2176 if (incall->next) {
2177 incall->next->prev = incall->prev;
2178 }
2179 incall->next = incall->prev = NULL;
2180 }
2181
2182 /* ---------------------------------------------------------------------------
2183 * Suspending & resuming Haskell threads.
2184 *
2185 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2186 * its capability before calling the C function. This allows another
2187 * task to pick up the capability and carry on running Haskell
2188 * threads. It also means that if the C call blocks, it won't lock
2189 * the whole system.
2190 *
2191 * The Haskell thread making the C call is put to sleep for the
2192 * duration of the call, on the capability's suspended_ccalls list. We
2193 * give out a token to the task, which it can use to resume the thread
2194 * on return from the C function.
2195 *
2196 * If this is an interruptible C call, this means that the FFI call may be
2197 * unceremoniously terminated and should be scheduled on an
2198 * unbound worker thread.
2199 * ------------------------------------------------------------------------- */
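/* Illustrative sketch only (simplified; the real call sites are generated
 * by the compiler for "safe" foreign calls): the generated stub brackets
 * the foreign call with this pair, roughly
 *
 *     token = suspendThread(reg, interruptible);  // give up the Capability
 *     result = some_c_function(args);             // hypothetical C function
 *     reg = resumeThread(token);                  // reacquire a Capability
 *
 * so other Haskell threads can run while the C call is in progress.
 */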
2200
2201 void *
2202 suspendThread (StgRegTable *reg, rtsBool interruptible)
2203 {
2204 Capability *cap;
2205 int saved_errno;
2206 StgTSO *tso;
2207 Task *task;
2208 #if mingw32_HOST_OS
2209 StgWord32 saved_winerror;
2210 #endif
2211
2212 saved_errno = errno;
2213 #if mingw32_HOST_OS
2214 saved_winerror = GetLastError();
2215 #endif
2216
2217 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2218 */
2219 cap = regTableToCapability(reg);
2220
2221 task = cap->running_task;
2222 tso = cap->r.rCurrentTSO;
2223
2224 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2225
2226 // XXX this might not be necessary --SDM
2227 tso->what_next = ThreadRunGHC;
2228
2229 threadPaused(cap,tso);
2230
2231 if (interruptible) {
2232 tso->why_blocked = BlockedOnCCall_Interruptible;
2233 } else {
2234 tso->why_blocked = BlockedOnCCall;
2235 }
2236
2237 // Hand back capability
2238 task->incall->suspended_tso = tso;
2239 task->incall->suspended_cap = cap;
2240
2241 // Otherwise allocate() will write to invalid memory.
2242 cap->r.rCurrentTSO = NULL;
2243
2244 ACQUIRE_LOCK(&cap->lock);
2245
2246 suspendTask(cap,task);
2247 cap->in_haskell = rtsFalse;
2248 releaseCapability_(cap,rtsFalse);
2249
2250 RELEASE_LOCK(&cap->lock);
2251
2252 errno = saved_errno;
2253 #if mingw32_HOST_OS
2254 SetLastError(saved_winerror);
2255 #endif
2256 return task;
2257 }
2258
2259 StgRegTable *
2260 resumeThread (void *task_)
2261 {
2262 StgTSO *tso;
2263 InCall *incall;
2264 Capability *cap;
2265 Task *task = task_;
2266 int saved_errno;
2267 #if mingw32_HOST_OS
2268 StgWord32 saved_winerror;
2269 #endif
2270
2271 saved_errno = errno;
2272 #if mingw32_HOST_OS
2273 saved_winerror = GetLastError();
2274 #endif
2275
2276 incall = task->incall;
2277 cap = incall->suspended_cap;
2278 task->cap = cap;
2279
2280 // Wait for permission to re-enter the RTS with the result.
2281 waitForReturnCapability(&cap,task);
2282 // we might be on a different capability now... but if so, our
2283 // entry on the suspended_ccalls list will also have been
2284 // migrated.
2285
2286 // Remove the thread from the suspended list
2287 recoverSuspendedTask(cap,task);
2288
2289 tso = incall->suspended_tso;
2290 incall->suspended_tso = NULL;
2291 incall->suspended_cap = NULL;
2292 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2293
2294 traceEventRunThread(cap, tso);
2295
2296 /* Reset blocking status */
2297 tso->why_blocked = NotBlocked;
2298
2299 if ((tso->flags & TSO_BLOCKEX) == 0) {
2300 // avoid locking the TSO if we don't have to
2301 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2302 maybePerformBlockedException(cap,tso);
2303 }
2304 }
2305
2306 cap->r.rCurrentTSO = tso;
2307 cap->in_haskell = rtsTrue;
2308 errno = saved_errno;
2309 #if mingw32_HOST_OS
2310 SetLastError(saved_winerror);
2311 #endif
2312
2313 /* We might have GC'd, mark the TSO dirty again */
2314 dirty_TSO(cap,tso);
2315 dirty_STACK(cap,tso->stackobj);
2316
2317 IF_DEBUG(sanity, checkTSO(tso));
2318
2319 return &cap->r;
2320 }
2321
2322 /* ---------------------------------------------------------------------------
2323 * scheduleThread()
2324 *
2325 * scheduleThread puts a thread on the end of the runnable queue.
2326 * This will usually be done immediately after a thread is created.
2327 * The caller of scheduleThread must create the thread using e.g.
2328 * createThread and push an appropriate closure
2329 * on this thread's stack before the scheduler is invoked.
2330 * ------------------------------------------------------------------------ */
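/* Illustrative sketch only (cf. rts_evalIO() in RtsAPI.c, which uses
 * scheduleWaitThread for bound threads): an unbound thread could be set up
 * and queued along these lines, assuming a suitable closure in hand:
 *
 *     tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
 *     scheduleThread(cap, tso);
 */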
2331
2332 void
2333 scheduleThread(Capability *cap, StgTSO *tso)
2334 {
2335 // The thread goes at the *end* of the run-queue, to avoid possible
2336 // starvation of any threads already on the queue.
2337 appendToRunQueue(cap,tso);
2338 }
2339
2340 void
2341 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2342 {
2343 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2344 // move this thread from now on.
2345 #if defined(THREADED_RTS)
2346 cpu %= enabled_capabilities;
2347 if (cpu == cap->no) {
2348 appendToRunQueue(cap,tso);
2349 } else {
2350 migrateThread(cap, tso, capabilities[cpu]);
2351 }
2352 #else
2353 appendToRunQueue(cap,tso);
2354 #endif
2355 }
2356
2357 void
2358 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2359 {
2360 Task *task;
2361 DEBUG_ONLY( StgThreadID id );
2362 Capability *cap;
2363
2364 cap = *pcap;
2365
2366 // We already created/initialised the Task
2367 task = cap->running_task;
2368
2369 // This TSO is now a bound thread; make the Task and TSO
2370 // point to each other.
2371 tso->bound = task->incall;
2372 tso->cap = cap;
2373
2374 task->incall->tso = tso;
2375 task->incall->ret = ret;
2376 task->incall->stat = NoStatus;
2377
2378 appendToRunQueue(cap,tso);
2379
2380 DEBUG_ONLY( id = tso->id );
2381 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2382
2383 cap = schedule(cap,task);
2384
2385 ASSERT(task->incall->stat != NoStatus);
2386 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2387
2388 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2389 *pcap = cap;
2390 }
2391
2392 /* ----------------------------------------------------------------------------
2393 * Starting Tasks
2394 * ------------------------------------------------------------------------- */
2395
2396 #if defined(THREADED_RTS)
2397 void scheduleWorker (Capability *cap, Task *task)
2398 {
2399 // schedule() runs without a lock.
2400 cap = schedule(cap,task);
2401
2402 // On exit from schedule(), we have a Capability, but possibly not
2403 // the same one we started with.
2404
2405 // During shutdown, the requirement is that after all the
2406 // Capabilities are shut down, all workers that are shutting down
2407 // have finished workerTaskStop(). This is why we hold on to
2408 // cap->lock until we've finished workerTaskStop() below.
2409 //
2410 // There may be workers still involved in foreign calls; those
2411 // will just block in waitForReturnCapability() because the
2412 // Capability has been shut down.
2413 //
2414 ACQUIRE_LOCK(&cap->lock);
2415 releaseCapability_(cap,rtsFalse);
2416 workerTaskStop(task);
2417 RELEASE_LOCK(&cap->lock);
2418 }
2419 #endif
2420
2421 /* ---------------------------------------------------------------------------
2422 * Start new worker tasks on Capabilities in the half-open range [from, to)
2423 * -------------------------------------------------------------------------- */
2424
2425 static void
2426 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2427 {
2428 #if defined(THREADED_RTS)
2429 nat i;
2430 Capability *cap;
2431
2432 for (i = from; i < to; i++) {
2433 cap = capabilities[i];
2434 ACQUIRE_LOCK(&cap->lock);
2435 startWorkerTask(cap);
2436 RELEASE_LOCK(&cap->lock);
2437 }
2438 #endif
2439 }
2440
2441 /* ---------------------------------------------------------------------------
2442 * initScheduler()
2443 *
2444 * Initialise the scheduler. This resets all the queues - if the
2445 * queues contained any threads, they'll be garbage collected at the
2446 * next pass.
2447 *
2448 * ------------------------------------------------------------------------ */
2449
2450 void
2451 initScheduler(void)
2452 {
2453 #if !defined(THREADED_RTS)
2454 blocked_queue_hd = END_TSO_QUEUE;
2455 blocked_queue_tl = END_TSO_QUEUE;
2456 sleeping_queue = END_TSO_QUEUE;
2457 #endif
2458
2459 sched_state = SCHED_RUNNING;
2460 recent_activity = ACTIVITY_YES;
2461
2462 #if defined(THREADED_RTS)
2463 /* Initialise the mutex that protects the scheduler's
2464 * global data. */
2465 initMutex(&sched_mutex);
2466 #endif
2467
2468 ACQUIRE_LOCK(&sched_mutex);
2469
2470 /* A capability holds the state a native thread needs in
2471 * order to execute STG code. At least one capability is
2472 * floating around (only THREADED_RTS builds have more than one).
2473 */
2474 initCapabilities();
2475
2476 initTaskManager();
2477
2478 /*
2479 * Eagerly start one worker to run each Capability, except for
2480 * Capability 0. The idea is that we're probably going to start a
2481 * bound thread on Capability 0 pretty soon, so we don't want a
2482 * worker task hogging it.
2483 */
2484 startWorkerTasks(1, n_capabilities);
2485
2486 RELEASE_LOCK(&sched_mutex);
2487
2488 }
2489
2490 void
2491 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2492 /* see Capability.c, shutdownCapability() */
2493 {
2494 Task *task = NULL;
2495
2496 task = newBoundTask();
2497
2498 // If we haven't killed all the threads yet, do it now.
2499 if (sched_state < SCHED_SHUTTING_DOWN) {
2500 sched_state = SCHED_INTERRUPTING;
2501 Capability *cap = task->cap;
2502 waitForReturnCapability(&cap,task);
2503 scheduleDoGC(&cap,task,rtsTrue);
2504 ASSERT(task->incall->tso == NULL);
2505 releaseCapability(cap);
2506 }
2507 sched_state = SCHED_SHUTTING_DOWN;
2508
2509 shutdownCapabilities(task, wait_foreign);
2510
2511 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2512 // n_failed_trygrab_idles, n_idle_caps);
2513
2514 boundTaskExiting(task);
2515 }
2516
2517 void
2518 freeScheduler( void )
2519 {
2520 nat still_running;
2521
2522 ACQUIRE_LOCK(&sched_mutex);
2523 still_running = freeTaskManager();
2524 // We can only free the Capabilities if there are no Tasks still
2525 // running. We might have a Task about to return from a foreign
2526 // call into waitForReturnCapability(), for example (actually,
2527 // this should be the *only* thing that a still-running Task can
2528 // do at this point, and it will block waiting for the
2529 // Capability).
2530 if (still_running == 0) {
2531 freeCapabilities();
2532 }
2533 RELEASE_LOCK(&sched_mutex);
2534 #if defined(THREADED_RTS)
2535 closeMutex(&sched_mutex);
2536 #endif
2537 }
2538
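// markScheduler: in the non-threaded RTS, treat the blocked and sleeping
// queues as GC roots so the threads on them are retained.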
2539 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2540 void *user USED_IF_NOT_THREADS)
2541 {
2542 #if !defined(THREADED_RTS)
2543 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2544 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2545 evac(user, (StgClosure **)(void *)&sleeping_queue);
2546 #endif
2547 }
2548
2549 /* -----------------------------------------------------------------------------
2550 performGC
2551
2552 This is the interface to the garbage collector from Haskell land.
2553 We provide this so that external C code can allocate and garbage
2554 collect when called from Haskell via _ccall_GC.
2555 -------------------------------------------------------------------------- */
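/* Illustrative sketch only: external C code running during a "safe" foreign
 * call might request a collection itself, e.g.
 *
 *     void my_c_hook(void)      // hypothetical helper, not part of the RTS
 *     {
 *         performMajorGC();     // force a major collection
 *     }
 */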
2556
2557 static void
2558 performGC_(rtsBool force_major)
2559 {
2560 Task *task;
2561 Capability *cap = NULL;
2562
2563 // We must grab a new Task here, because the existing Task may be
2564 // associated with a particular Capability, and chained onto the
2565 // suspended_ccalls queue.
2566 task = newBoundTask();
2567
2568 // TODO: do we need to traceTask*() here?
2569
2570 waitForReturnCapability(&cap,task);
2571 scheduleDoGC(&cap,task,force_major);
2572 releaseCapability(cap);
2573 boundTaskExiting(task);
2574 }
2575
2576 void
2577 performGC(void)
2578 {
2579 performGC_(rtsFalse);
2580 }
2581
2582 void
2583 performMajorGC(void)
2584 {
2585 performGC_(rtsTrue);
2586 }
2587
2588 /* ---------------------------------------------------------------------------
2589 Interrupt execution
2590 - usually called inside a signal handler so it mustn't do anything fancy.
2591 ------------------------------------------------------------------------ */
2592
2593 void
2594 interruptStgRts(void)
2595 {
2596 sched_state = SCHED_INTERRUPTING;
2597 interruptAllCapabilities();
2598 #if defined(THREADED_RTS)
2599 wakeUpRts();
2600 #endif
2601 }
2602
2603 /* -----------------------------------------------------------------------------
2604 Wake up the RTS
2605
2606 This function causes at least one OS thread to wake up and run the
2607 scheduler loop. It is invoked when the RTS might be deadlocked, or
2608 an external event has arrived that may need servicing (e.g. a
2609 keyboard interrupt).
2610
2611 In the single-threaded RTS we don't do anything here; we only have
2612 one thread anyway, and the event that caused us to want to wake up
2613 will have interrupted any blocking system call in progress.
2614 -------------------------------------------------------------------------- */
2615
2616 #if defined(THREADED_RTS)
2617 void wakeUpRts(void)
2618 {
2619 // This forces the IO Manager thread to wakeup, which will
2620 // in turn ensure that some OS thread wakes up and runs the
2621 // scheduler loop, which will cause a GC and deadlock check.
2622 ioManagerWakeup();
2623 }
2624 #endif
2625
2626 /* -----------------------------------------------------------------------------
2627 Deleting threads
2628
2629 This is used for interruption (^C) and forking, and corresponds to
2630 raising an exception but without letting the thread catch the
2631 exception.
2632 -------------------------------------------------------------------------- */
2633
2634 static void
2635 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2636 {
2637 // NOTE: must only be called on a TSO that we have exclusive
2638 // access to, because we will call throwToSingleThreaded() below.
2639 // The TSO must be on the run queue of the Capability we own, or
2640 // we must own all Capabilities.
2641
2642 if (tso->why_blocked != BlockedOnCCall &&
2643 tso->why_blocked != BlockedOnCCall_Interruptible) {
2644 throwToSingleThreaded(tso->cap,tso,NULL);
2645 }
2646 }
2647
2648 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2649 static void
2650 deleteThread_(Capability *cap, StgTSO *tso)
2651 { // for forkProcess only:
2652 // like deleteThread(), but we delete threads in foreign calls, too.
2653
2654 if (tso->why_blocked == BlockedOnCCall ||
2655 tso->why_blocked == BlockedOnCCall_Interruptible) {
2656 tso->what_next = ThreadKilled;
2657 appendToRunQueue(tso->cap, tso);
2658 } else {
2659 deleteThread(cap,tso);
2660 }
2661 }
2662 #endif
2663
2664 /* -----------------------------------------------------------------------------
2665 raiseExceptionHelper
2666
2667 This function is called by the raise# primitive, just so that we can
2668 move some of the tricky bits of raising an exception from C-- into
2669 C. Who knows, it might be a useful reusable thing here too.
2670 -------------------------------------------------------------------------- */
2671
2672 StgWord
2673 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2674 {
2675 Capability *cap = regTableToCapability(reg);
2676 StgThunk *raise_closure = NULL;
2677 StgPtr p, next;
2678 StgRetInfoTable *info;
2679 //
2680 // This closure represents the expression 'raise# E' where E
2681 // is the exception being raised. It is used to overwrite all the
2682 // thunks which are currently under evaluation.
2683 //
2684
2685 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2686 // LDV profiling: stg_raise_info has THUNK as its closure
2687 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2688 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2689 // 1 does not cause any problem unless profiling is performed.
2690 // However, when LDV profiling goes on, we need to linearly scan
2691 // small object pool, where raise_closure is stored, so we should
2692 // use MIN_UPD_SIZE.
2693 //
2694 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2695 // sizeofW(StgClosure)+1);
2696 //
2697
2698 //
2699 // Walk up the stack, looking for the catch frame. On the way,
2700 // we update any closures pointed to from update frames with the
2701 // raise closure that we just built.
2702 //
2703 p = tso->stackobj->sp;
2704 while(1) {
2705 info = get_ret_itbl((StgClosure *)p);
2706 next = p + stack_frame_sizeW((StgClosure *)p);
2707 switch (info->i.type) {
2708
2709 case UPDATE_FRAME:
2710 // Only create raise_closure if we need to.
2711 if (raise_closure == NULL) {
2712 raise_closure =
2713 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2714 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2715 raise_closure->payload[0] = exception;
2716 }
2717 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2718 (StgClosure *)raise_closure);
2719 p = next;
2720 continue;
2721
2722 case ATOMICALLY_FRAME:
2723 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2724 tso->stackobj->sp = p;
2725 return ATOMICALLY_FRAME;
2726
2727 case CATCH_FRAME:
2728 tso->stackobj->sp = p;
2729 return CATCH_FRAME;
2730
2731 case CATCH_STM_FRAME:
2732 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2733 tso->stackobj->sp = p;
2734 return CATCH_STM_FRAME;
2735
2736 case UNDERFLOW_FRAME:
2737 tso->stackobj->sp = p;
2738 threadStackUnderflow(cap,tso);
2739 p = tso->stackobj->sp;
2740 continue;
2741
2742 case STOP_FRAME:
2743 tso->stackobj->sp = p;
2744 return STOP_FRAME;
2745
2746 case CATCH_RETRY_FRAME: {
2747 StgTRecHeader *trec = tso -> trec;
2748 StgTRecHeader *outer = trec -> enclosing_trec;
2749 debugTrace(DEBUG_stm,
2750 "found CATCH_RETRY_FRAME at %p during raise", p);
2751 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2752 stmAbortTransaction(cap, trec);
2753 stmFreeAbortedTRec(cap, trec);
2754 tso -> trec = outer;
2755 p = next;
2756 continue;
2757 }
2758
2759 default:
2760 p = next;
2761 continue;
2762 }
2763 }
2764 }
2765
2766
2767 /* -----------------------------------------------------------------------------
2768 findRetryFrameHelper
2769
2770 This function is called by the retry# primitive. It traverses the stack
2771 leaving tso->sp referring to the frame which should handle the retry.
2772
2773 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2774 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2775
2776 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2777 create) because retries are not considered to be exceptions, despite the
2778 similar implementation.
2779
2780 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2781 not be created within memory transactions.
2782 -------------------------------------------------------------------------- */
2783
2784 StgWord
2785 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2786 {
2787 StgPtr p, next;
2788 StgRetInfoTable *info;
2789
2790 p = tso->stackobj->sp;
2791 while (1) {
2792 info = get_ret_itbl((StgClosure *)p);
2793 next = p + stack_frame_sizeW((StgClosure *)p);
2794 switch (info->i.type) {
2795
2796 case ATOMICALLY_FRAME:
2797 debugTrace(DEBUG_stm,
2798 "found ATOMICALLY_FRAME at %p during retry", p);
2799 tso->stackobj->sp = p;
2800 return ATOMICALLY_FRAME;
2801
2802 case CATCH_RETRY_FRAME:
2803 debugTrace(DEBUG_stm,
2804 "found CATCH_RETRY_FRAME at %p during retry", p);
2805 tso->stackobj->sp = p;
2806 return CATCH_RETRY_FRAME;
2807
2808 case CATCH_STM_FRAME: {
2809 StgTRecHeader *trec = tso -> trec;
2810 StgTRecHeader *outer = trec -> enclosing_trec;
2811 debugTrace(DEBUG_stm,
2812 "found CATCH_STM_FRAME at %p during retry", p);
2813 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2814 stmAbortTransaction(cap, trec);
2815 stmFreeAbortedTRec(cap, trec);
2816 tso -> trec = outer;
2817 p = next;
2818 continue;
2819 }
2820
2821 case UNDERFLOW_FRAME:
2822 tso->stackobj->sp = p;
2823 threadStackUnderflow(cap,tso);
2824 p = tso->stackobj->sp;
2825 continue;
2826
2827 default:
2828 ASSERT(info->i.type != CATCH_FRAME);
2829 ASSERT(info->i.type != STOP_FRAME);
2830 p = next;
2831 continue;
2832 }
2833 }
2834 }
2835
2836 /* -----------------------------------------------------------------------------
2837 resurrectThreads is called after garbage collection on the list of
2838 threads found to be garbage. Each of these threads will be woken
2839 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2840 on an MVar, or NonTermination if the thread was blocked on a Black
2841 Hole.
2842
2843 Locks: assumes we hold *all* the capabilities.
2844 -------------------------------------------------------------------------- */
2845
2846 void
2847 resurrectThreads (StgTSO *threads)
2848 {
2849 StgTSO *tso, *next;
2850 Capability *cap;
2851 generation *gen;
2852
2853 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2854 next = tso->global_link;
2855
2856 gen = Bdescr((P_)tso)->gen;
2857 tso->global_link = gen->threads;
2858 gen->threads = tso;
2859
2860 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2861
2862 // Wake up the thread on the Capability it was last on
2863 cap = tso->cap;
2864
2865 switch (tso->why_blocked) {
2866 case BlockedOnMVar:
2867 case BlockedOnMVarRead:
2868 /* Called by GC - sched_mutex lock is currently held. */
2869 throwToSingleThreaded(cap, tso,
2870 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2871 break;
2872 case BlockedOnBlackHole:
2873 throwToSingleThreaded(cap, tso,
2874 (StgClosure *)nonTermination_closure);
2875 break;
2876 case BlockedOnSTM:
2877 throwToSingleThreaded(cap, tso,
2878 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2879 break;
2880 case NotBlocked:
2881 /* This might happen if the thread was blocked on a black hole
2882 * belonging to a thread that we've just woken up (raiseAsync
2883 * can wake up threads, remember...).
2884 */
2885 continue;
2886 case BlockedOnMsgThrowTo:
2887 // This can happen if the target is masking, blocks on a
2888 // black hole, and then is found to be unreachable. In
2889 // this case, we want to let the target wake up and carry
2890 // on, and do nothing to this thread.
2891 continue;
2892 default:
2893 barf("resurrectThreads: thread blocked in a strange way: %d",
2894 tso->why_blocked);
2895 }
2896 }
2897 }