1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "Stable.h"
45
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52
53 #include <string.h>
54 #include <stdlib.h>
55 #include <stdarg.h>
56
57 #ifdef HAVE_ERRNO_H
58 #include <errno.h>
59 #endif
60
61 #ifdef TRACING
62 #include "eventlog/EventLog.h"
63 #endif
64 /* -----------------------------------------------------------------------------
65 * Global variables
66 * -------------------------------------------------------------------------- */
67
68 #if !defined(THREADED_RTS)
69 // Blocked/sleeping threads
70 StgTSO *blocked_queue_hd = NULL;
71 StgTSO *blocked_queue_tl = NULL;
72 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
73 #endif
74
75 /* Set to true when the latest garbage collection failed to reclaim
76 * enough space, and the runtime should proceed to shut itself down in
77 * an orderly fashion (emitting profiling info etc.)
78 */
79 rtsBool heap_overflow = rtsFalse;
80
81 /* flag that tracks whether we have done any execution in this time slice.
82 * LOCK: currently none, perhaps we should lock (but needs to be
83 * updated in the fast path of the scheduler).
84 *
85 * NB. must be StgWord, we do xchg() on it.
86 */
87 volatile StgWord recent_activity = ACTIVITY_YES;
88
89 /* if this flag is set as well, give up execution
90 * LOCK: none (changes monotonically)
91 */
92 volatile StgWord sched_state = SCHED_RUNNING;
93
94 /*
95 * This mutex protects most of the global scheduler data in
96 * the THREADED_RTS runtime.
97 */
98 #if defined(THREADED_RTS)
99 Mutex sched_mutex;
100 #endif
101
102 #if !defined(mingw32_HOST_OS)
103 #define FORKPROCESS_PRIMOP_SUPPORTED
104 #endif
105
106 // Local stats
107 #ifdef THREADED_RTS
108 static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
109 #endif
110
111 /* -----------------------------------------------------------------------------
112 * static function prototypes
113 * -------------------------------------------------------------------------- */
114
115 static Capability *schedule (Capability *initialCapability, Task *task);
116
117 //
118 // These functions all encapsulate parts of the scheduler loop, and are
119 // abstracted only to make the structure and control flow of the
120 // scheduler clearer.
121 //
122 static void schedulePreLoop (void);
123 static void scheduleFindWork (Capability **pcap);
124 #if defined(THREADED_RTS)
125 static void scheduleYield (Capability **pcap, Task *task);
126 #endif
127 #if defined(THREADED_RTS)
128 static nat requestSync (Capability **pcap, Task *task, nat sync_type);
129 static void acquireAllCapabilities(Capability *cap, Task *task);
130 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
131 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
132 #endif
133 static void scheduleStartSignalHandlers (Capability *cap);
134 static void scheduleCheckBlockedThreads (Capability *cap);
135 static void scheduleProcessInbox(Capability **cap);
136 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
137 static void schedulePushWork(Capability *cap, Task *task);
138 #if defined(THREADED_RTS)
139 static void scheduleActivateSpark(Capability *cap);
140 #endif
141 static void schedulePostRunThread(Capability *cap, StgTSO *t);
142 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
143 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
144 nat prev_what_next );
145 static void scheduleHandleThreadBlocked( StgTSO *t );
146 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
147 StgTSO *t );
148 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
149 static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
150
151 static void deleteThread (Capability *cap, StgTSO *tso);
152 static void deleteAllThreads (Capability *cap);
153
154 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
155 static void deleteThread_(Capability *cap, StgTSO *tso);
156 #endif
157
158 /* ---------------------------------------------------------------------------
159 Main scheduling loop.
160
161 We use round-robin scheduling, each thread returning to the
162 scheduler loop when one of these conditions is detected:
163
164 * out of heap space
165 * timer expires (thread yields)
166 * thread blocks
167 * thread ends
168 * stack overflow
169
170 ------------------------------------------------------------------------ */
171
172 static Capability *
173 schedule (Capability *initialCapability, Task *task)
174 {
175 StgTSO *t;
176 Capability *cap;
177 StgThreadReturnCode ret;
178 nat prev_what_next;
179 rtsBool ready_to_gc;
180 #if defined(THREADED_RTS)
181 rtsBool first = rtsTrue;
182 #endif
183
184 cap = initialCapability;
185
186 // Pre-condition: this task owns initialCapability.
187 // The sched_mutex is *NOT* held
188 // NB. on return, we still hold a capability.
189
190 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
191
192 schedulePreLoop();
193
194 // -----------------------------------------------------------
195 // Scheduler loop starts here:
196
197 while (1) {
198
199 // Check whether we have re-entered the RTS from Haskell without
200 // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
201 // call).
202 if (cap->in_haskell) {
203 errorBelch("schedule: re-entered unsafely.\n"
204 " Perhaps a 'foreign import unsafe' should be 'safe'?");
205 stg_exit(EXIT_FAILURE);
206 }
207
208 // Note [shutdown]: The interruption / shutdown sequence.
209 //
210 // In order to cleanly shut down the runtime, we want to:
211 // * make sure that all main threads return to their callers
212 // with the state 'Interrupted'.
213 // * clean up all OS threads associated with the runtime
214 // * free all memory etc.
215 //
216 // So the sequence goes like this:
217 //
218 // * The shutdown sequence is initiated by calling hs_exit(),
219 // interruptStgRts(), or running out of memory in the GC.
220 //
221 // * Set sched_state = SCHED_INTERRUPTING
222 //
223 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
224 // scheduleDoGC(), which halts the whole runtime by acquiring all the
225 // capabilities, does a GC and then calls deleteAllThreads() to kill all
226 // the remaining threads. The zombies are left on the run queue for
227 // cleaning up. We can't kill threads involved in foreign calls.
228 //
229 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
230 //
231 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
232 // enforced by the scheduler, which won't run any Haskell code when
233 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
234 // other capabilities by doing the GC earlier.
235 //
236 // * all workers exit when the run queue on their capability
237 // drains. All main threads will also exit when their TSO
238 // reaches the head of the run queue and they can return.
239 //
240 // * eventually all Capabilities will shut down, and the RTS can
241 // exit.
242 //
243 // * We might be left with threads blocked in foreign calls,
244 // we should really attempt to kill these somehow (TODO).
245
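// Illustrative sketch (not part of the scheduler itself): a program that
// embeds the RTS drives this sequence through the public HsFFI.h entry
// points, roughly:
//
//     hs_init(&argc, &argv);   /* bring the RTS up */
//     /* ... call exported Haskell functions ... */
//     hs_exit();               /* initiates the shutdown sequence above */
//
// hs_exit() is one of the initiators listed above, alongside
// interruptStgRts() and running out of memory in the GC.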
246 switch (sched_state) {
247 case SCHED_RUNNING:
248 break;
249 case SCHED_INTERRUPTING:
250 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
251 /* scheduleDoGC() deletes all the threads */
252 scheduleDoGC(&cap,task,rtsTrue);
253
254 // after scheduleDoGC(), we must be shutting down. Either some
255 // other Capability did the final GC, or we did it above,
256 // either way we can fall through to the SCHED_SHUTTING_DOWN
257 // case now.
258 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
259 // fall through
260
261 case SCHED_SHUTTING_DOWN:
262 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
263 // If we are a worker, just exit. If we're a bound thread
264 // then we will exit below when we've removed our TSO from
265 // the run queue.
266 if (!isBoundTask(task) && emptyRunQueue(cap)) {
267 return cap;
268 }
269 break;
270 default:
271 barf("sched_state: %d", sched_state);
272 }
273
274 scheduleFindWork(&cap);
275
276 /* work pushing, currently relevant only for THREADED_RTS:
277 (pushes threads, wakes up idle capabilities for stealing) */
278 schedulePushWork(cap,task);
279
280 scheduleDetectDeadlock(&cap,task);
281
282 // Normally, the only way we can get here with no threads to
283 // run is if a keyboard interrupt was received during
284 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
285 // Additionally, it is not fatal for the
286 // threaded RTS to reach here with no threads to run.
287 //
288 // win32: might be here due to awaitEvent() being abandoned
289 // as a result of a console event having been delivered.
290
291 #if defined(THREADED_RTS)
292 if (first)
293 {
294 // XXX: ToDo
295 // // don't yield the first time, we want a chance to run this
296 // // thread for a bit, even if there are others banging at the
297 // // door.
298 // first = rtsFalse;
299 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
300 }
301
302 scheduleYield(&cap,task);
303
304 if (emptyRunQueue(cap)) continue; // look for work again
305 #endif
306
307 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
308 if ( emptyRunQueue(cap) ) {
309 ASSERT(sched_state >= SCHED_INTERRUPTING);
310 }
311 #endif
312
313 //
314 // Get a thread to run
315 //
316 t = popRunQueue(cap);
317
318 // Sanity check the thread we're about to run. This can be
319 // expensive if there is lots of thread switching going on...
320 IF_DEBUG(sanity,checkTSO(t));
321
322 #if defined(THREADED_RTS)
323 // Check whether we can run this thread in the current task.
324 // If not, we have to pass our capability to the right task.
325 {
326 InCall *bound = t->bound;
327
328 if (bound) {
329 if (bound->task == task) {
330 // yes, the Haskell thread is bound to the current native thread
331 } else {
332 debugTrace(DEBUG_sched,
333 "thread %lu bound to another OS thread",
334 (unsigned long)t->id);
335 // no, bound to a different Haskell thread: pass to that thread
336 pushOnRunQueue(cap,t);
337 continue;
338 }
339 } else {
340 // The thread we want to run is unbound.
341 if (task->incall->tso) {
342 debugTrace(DEBUG_sched,
343 "this OS thread cannot run thread %lu",
344 (unsigned long)t->id);
345 // no, the current native thread is bound to a different
346 // Haskell thread, so pass it to any worker thread
347 pushOnRunQueue(cap,t);
348 continue;
349 }
350 }
351 }
352 #endif
353
354 // If we're shutting down, and this thread has not yet been
355 // killed, kill it now. This sometimes happens when a finalizer
356 // thread is created by the final GC, or a thread previously
357 // in a foreign call returns.
358 if (sched_state >= SCHED_INTERRUPTING &&
359 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
360 deleteThread(cap,t);
361 }
362
363 // If this capability is disabled, migrate the thread away rather
364 // than running it. NB. but not if the thread is bound: it is
365 // really hard for a bound thread to migrate itself. Believe me,
366 // I tried several ways and couldn't find a way to do it.
367 // Instead, when everything is stopped for GC, we migrate all the
368 // threads on the run queue then (see scheduleDoGC()).
369 //
370 // ToDo: what about TSO_LOCKED? Currently we're migrating those
371 // when the number of capabilities drops, but we never migrate
372 // them back if it rises again. Presumably we should, but after
373 // the thread has been migrated we no longer know what capability
374 // it was originally on.
375 #ifdef THREADED_RTS
376 if (cap->disabled && !t->bound) {
377 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
378 migrateThread(cap, t, dest_cap);
379 continue;
380 }
381 #endif
382
383 /* context switches are initiated by the timer signal, unless
384 * the user specified "context switch as often as possible", with
385 * +RTS -C0
386 */
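/* Usage sketch: starting the program with "+RTS -C0 -RTS" sets
 * RtsFlags.ConcFlags.ctxtSwitchTicks to 0, so this branch asks for a
 * context switch on every pass through the loop; the running thread is
 * then descheduled at its next heap check instead of waiting for the
 * timer tick.
 */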
387 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
388 && !emptyThreadQueues(cap)) {
389 cap->context_switch = 1;
390 }
391
392 run_thread:
393
394 // CurrentTSO is the thread to run. It might be different if we
395 // loop back to run_thread, so make sure to set CurrentTSO after
396 // that.
397 cap->r.rCurrentTSO = t;
398
399 startHeapProfTimer();
400
401 // ----------------------------------------------------------------------
402 // Run the current thread
403
404 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
405 ASSERT(t->cap == cap);
406 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
407
408 prev_what_next = t->what_next;
409
410 errno = t->saved_errno;
411 #if mingw32_HOST_OS
412 SetLastError(t->saved_winerror);
413 #endif
414
415 // reset the interrupt flag before running Haskell code
416 cap->interrupt = 0;
417
418 cap->in_haskell = rtsTrue;
419 cap->idle = 0;
420
421 dirty_TSO(cap,t);
422 dirty_STACK(cap,t->stackobj);
423
424 switch (recent_activity)
425 {
426 case ACTIVITY_DONE_GC: {
427 // ACTIVITY_DONE_GC means we turned off the timer signal to
428 // conserve power (see #1623). Re-enable it here.
429 nat prev;
430 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
431 if (prev == ACTIVITY_DONE_GC) {
432 #ifndef PROFILING
433 startTimer();
434 #endif
435 }
436 break;
437 }
438 case ACTIVITY_INACTIVE:
439 // If we reached ACTIVITY_INACTIVE, then don't reset it until
440 // we've done the GC. The thread running here might just be
441 // the IO manager thread that handle_tick() woke up via
442 // wakeUpRts().
443 break;
444 default:
445 recent_activity = ACTIVITY_YES;
446 }
447
448 traceEventRunThread(cap, t);
449
450 switch (prev_what_next) {
451
452 case ThreadKilled:
453 case ThreadComplete:
454 /* Thread already finished, return to scheduler. */
455 ret = ThreadFinished;
456 break;
457
458 case ThreadRunGHC:
459 {
460 StgRegTable *r;
461 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
462 cap = regTableToCapability(r);
463 ret = r->rRet;
464 break;
465 }
466
467 case ThreadInterpret:
468 cap = interpretBCO(cap);
469 ret = cap->r.rRet;
470 break;
471
472 default:
473 barf("schedule: invalid what_next field");
474 }
475
476 cap->in_haskell = rtsFalse;
477
478 // The TSO might have moved, eg. if it re-entered the RTS and a GC
479 // happened. So find the new location:
480 t = cap->r.rCurrentTSO;
481
482 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
483 // don't want it set when not running a Haskell thread.
484 cap->r.rCurrentTSO = NULL;
485
486 // And save the current errno in this thread.
487 // XXX: possibly bogus for SMP because this thread might already
488 // be running again, see code below.
489 t->saved_errno = errno;
490 #if mingw32_HOST_OS
491 // Similarly for Windows error code
492 t->saved_winerror = GetLastError();
493 #endif
494
495 if (ret == ThreadBlocked) {
496 if (t->why_blocked == BlockedOnBlackHole) {
497 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
498 traceEventStopThread(cap, t, t->why_blocked + 6,
499 owner != NULL ? owner->id : 0);
500 } else {
501 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
502 }
503 } else {
504 traceEventStopThread(cap, t, ret, 0);
505 }
506
507 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
508 ASSERT(t->cap == cap);
509
510 // ----------------------------------------------------------------------
511
512 // Costs for the scheduler are assigned to CCS_SYSTEM
513 stopHeapProfTimer();
514 #if defined(PROFILING)
515 cap->r.rCCCS = CCS_SYSTEM;
516 #endif
517
518 schedulePostRunThread(cap,t);
519
520 ready_to_gc = rtsFalse;
521
522 switch (ret) {
523 case HeapOverflow:
524 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
525 break;
526
527 case StackOverflow:
528 // just adjust the stack for this thread, then pop it back
529 // on the run queue.
530 threadStackOverflow(cap, t);
531 pushOnRunQueue(cap,t);
532 break;
533
534 case ThreadYielding:
535 if (scheduleHandleYield(cap, t, prev_what_next)) {
536 // shortcut for switching between compiler/interpreter:
537 goto run_thread;
538 }
539 break;
540
541 case ThreadBlocked:
542 scheduleHandleThreadBlocked(t);
543 break;
544
545 case ThreadFinished:
546 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
547 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
548 break;
549
550 default:
551 barf("schedule: invalid thread return code %d", (int)ret);
552 }
553
554 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
555 scheduleDoGC(&cap,task,rtsFalse);
556 }
557 } /* end of while() */
558 }
559
560 /* -----------------------------------------------------------------------------
561 * Run queue operations
562 * -------------------------------------------------------------------------- */
563
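// removeFromRunQueue: unlink 'tso' from this Capability's doubly-linked
// run queue. While a thread is on the run queue, block_info.prev holds
// the back pointer, so we re-link its neighbours (or run_queue_hd /
// run_queue_tl at either end) and then clear both link fields.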
564 static void
565 removeFromRunQueue (Capability *cap, StgTSO *tso)
566 {
567 if (tso->block_info.prev == END_TSO_QUEUE) {
568 ASSERT(cap->run_queue_hd == tso);
569 cap->run_queue_hd = tso->_link;
570 } else {
571 setTSOLink(cap, tso->block_info.prev, tso->_link);
572 }
573 if (tso->_link == END_TSO_QUEUE) {
574 ASSERT(cap->run_queue_tl == tso);
575 cap->run_queue_tl = tso->block_info.prev;
576 } else {
577 setTSOPrev(cap, tso->_link, tso->block_info.prev);
578 }
579 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
580
581 IF_DEBUG(sanity, checkRunQueue(cap));
582 }
583
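// promoteInRunQueue: move 'tso' to the front of the run queue so that it
// is the next thread this Capability runs, e.g. because other work is
// waiting on it.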
584 void
585 promoteInRunQueue (Capability *cap, StgTSO *tso)
586 {
587 removeFromRunQueue(cap, tso);
588 pushOnRunQueue(cap, tso);
589 }
590
591 /* ----------------------------------------------------------------------------
592 * Setting up the scheduler loop
593 * ------------------------------------------------------------------------- */
594
595 static void
596 schedulePreLoop(void)
597 {
598 // initialisation for scheduler - what cannot go into initScheduler()
599
600 #if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
601 win32AllocStack();
602 #endif
603 }
604
605 /* -----------------------------------------------------------------------------
606 * scheduleFindWork()
607 *
608 * Search for work to do, and handle messages from elsewhere.
609 * -------------------------------------------------------------------------- */
610
611 static void
612 scheduleFindWork (Capability **pcap)
613 {
614 scheduleStartSignalHandlers(*pcap);
615
616 scheduleProcessInbox(pcap);
617
618 scheduleCheckBlockedThreads(*pcap);
619
620 #if defined(THREADED_RTS)
621 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
622 #endif
623 }
624
625 #if defined(THREADED_RTS)
626 STATIC_INLINE rtsBool
627 shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
628 {
629 // we need to yield this capability to someone else if..
630 // - another thread is initiating a GC, and we didn't just do a GC
631 // (see Note [GC livelock])
632 // - another Task is returning from a foreign call
633 // - the thread at the head of the run queue cannot be run
634 // by this Task (it is bound to another Task, or it is unbound
635 // and this task is bound).
636 //
637 // Note [GC livelock]
638 //
639 // If we are interrupted to do a GC, then we do not immediately do
640 // another one. This avoids a starvation situation where one
641 // Capability keeps forcing a GC and the other Capabilities make no
642 // progress at all.
643
644 return ((pending_sync && !didGcLast) ||
645 cap->returning_tasks_hd != NULL ||
646 (!emptyRunQueue(cap) && (task->incall->tso == NULL
647 ? peekRunQueue(cap)->bound != NULL
648 : peekRunQueue(cap)->bound != task->incall)));
649 }
650
651 // This is the single place where a Task goes to sleep. There are
652 // two reasons it might need to sleep:
653 // - there are no threads to run
654 // - we need to yield this Capability to someone else
655 // (see shouldYieldCapability())
656 //
657 // Careful: the scheduler loop is quite delicate. Make sure you run
658 // the tests in testsuite/concurrent (all ways) after modifying this,
659 // and also check the benchmarks in nofib/parallel for regressions.
660
661 static void
662 scheduleYield (Capability **pcap, Task *task)
663 {
664 Capability *cap = *pcap;
665 int didGcLast = rtsFalse;
666
667 // if we have work, and we don't need to give up the Capability, continue.
668 //
669 if (!shouldYieldCapability(cap,task,rtsFalse) &&
670 (!emptyRunQueue(cap) ||
671 !emptyInbox(cap) ||
672 sched_state >= SCHED_INTERRUPTING)) {
673 return;
674 }
675
676 // otherwise yield (sleep), and keep yielding if necessary.
677 do {
678 didGcLast = yieldCapability(&cap,task, !didGcLast);
679 }
680 while (shouldYieldCapability(cap,task,didGcLast));
681
682 // note there may still be no threads on the run queue at this
683 // point, the caller has to check.
684
685 *pcap = cap;
686 return;
687 }
688 #endif
689
690 /* -----------------------------------------------------------------------------
691 * schedulePushWork()
692 *
693 * Push work to other Capabilities if we have some.
694 * -------------------------------------------------------------------------- */
695
696 static void
697 schedulePushWork(Capability *cap USED_IF_THREADS,
698 Task *task USED_IF_THREADS)
699 {
700 /* following code not for PARALLEL_HASKELL. I kept the call general,
701 future GUM versions might use pushing in a distributed setup */
702 #if defined(THREADED_RTS)
703
704 Capability *free_caps[n_capabilities], *cap0;
705 nat i, n_free_caps;
706
707 // migration can be turned off with +RTS -qm
708 if (!RtsFlags.ParFlags.migrate) return;
709
710 // Check whether we have more threads on our run queue, or sparks
711 // in our pool, that we could hand to another Capability.
712 if (emptyRunQueue(cap)) {
713 if (sparkPoolSizeCap(cap) < 2) return;
714 } else {
715 if (singletonRunQueue(cap) &&
716 sparkPoolSizeCap(cap) < 1) return;
717 }
718
719 // First grab as many free Capabilities as we can.
720 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
721 cap0 = capabilities[i];
722 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
723 if (!emptyRunQueue(cap0)
724 || cap0->returning_tasks_hd != NULL
725 || cap0->inbox != (Message*)END_TSO_QUEUE) {
726 // it already has some work, we just grabbed it at
727 // the wrong moment. Or maybe it's deadlocked!
728 releaseCapability(cap0);
729 } else {
730 free_caps[n_free_caps++] = cap0;
731 }
732 }
733 }
734
735 // we now have n_free_caps free capabilities stashed in
736 // free_caps[]. Share our run queue equally with them. This is
737 // probably the simplest thing we could do; improvements we might
738 // want to do include:
739 //
740 // - giving high priority to moving relatively new threads, on
741 // the grounds that they haven't had time to build up a
742 // working set in the cache on this CPU/Capability.
743 //
744 // - giving low priority to moving long-lived threads
745
746 if (n_free_caps > 0) {
747 StgTSO *prev, *t, *next;
748 #ifdef SPARK_PUSHING
749 rtsBool pushed_to_all;
750 #endif
751
752 debugTrace(DEBUG_sched,
753 "cap %d: %s and %d free capabilities, sharing...",
754 cap->no,
755 (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
756 "excess threads on run queue":"sparks to share (>=2)",
757 n_free_caps);
758
759 i = 0;
760 #ifdef SPARK_PUSHING
761 pushed_to_all = rtsFalse;
762 #endif
763
764 if (cap->run_queue_hd != END_TSO_QUEUE) {
765 prev = cap->run_queue_hd;
766 t = prev->_link;
767 prev->_link = END_TSO_QUEUE;
768 for (; t != END_TSO_QUEUE; t = next) {
769 next = t->_link;
770 t->_link = END_TSO_QUEUE;
771 if (t->bound == task->incall // don't move my bound thread
772 || tsoLocked(t)) { // don't move a locked thread
773 setTSOLink(cap, prev, t);
774 setTSOPrev(cap, t, prev);
775 prev = t;
776 } else if (i == n_free_caps) {
777 #ifdef SPARK_PUSHING
778 pushed_to_all = rtsTrue;
779 #endif
780 i = 0;
781 // keep one for us
782 setTSOLink(cap, prev, t);
783 setTSOPrev(cap, t, prev);
784 prev = t;
785 } else {
786 appendToRunQueue(free_caps[i],t);
787
788 traceEventMigrateThread (cap, t, free_caps[i]->no);
789
790 if (t->bound) { t->bound->task->cap = free_caps[i]; }
791 t->cap = free_caps[i];
792 i++;
793 }
794 }
795 cap->run_queue_tl = prev;
796
797 IF_DEBUG(sanity, checkRunQueue(cap));
798 }
799
800 #ifdef SPARK_PUSHING
801 /* JB I left this code in place, it would work but is not necessary */
802
803 // If there are some free capabilities that we didn't push any
804 // threads to, then try to push a spark to each one.
805 if (!pushed_to_all) {
806 StgClosure *spark;
807 // i is the next free capability to push to
808 for (; i < n_free_caps; i++) {
809 if (emptySparkPoolCap(free_caps[i])) {
810 spark = tryStealSpark(cap->sparks);
811 if (spark != NULL) {
812 /* TODO: if anyone wants to re-enable this code then
813 * they must consider the fizzledSpark(spark) case
814 * and update the per-cap spark statistics.
815 */
816 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
817
818 traceEventStealSpark(free_caps[i], t, cap->no);
819
820 newSpark(&(free_caps[i]->r), spark);
821 }
822 }
823 }
824 }
825 #endif /* SPARK_PUSHING */
826
827 // release the capabilities
828 for (i = 0; i < n_free_caps; i++) {
829 task->cap = free_caps[i];
830 releaseAndWakeupCapability(free_caps[i]);
831 }
832 }
833 task->cap = cap; // reset to point to our Capability.
834
835 #endif /* THREADED_RTS */
836
837 }
838
839 /* ----------------------------------------------------------------------------
840 * Start any pending signal handlers
841 * ------------------------------------------------------------------------- */
842
843 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
844 static void
845 scheduleStartSignalHandlers(Capability *cap)
846 {
847 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
848 // safe outside the lock
849 startSignalHandlers(cap);
850 }
851 }
852 #else
853 static void
854 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
855 {
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860 * Check for blocked threads that can be woken up.
861 * ------------------------------------------------------------------------- */
862
863 static void
864 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
865 {
866 #if !defined(THREADED_RTS)
867 //
868 // Check whether any waiting threads need to be woken up. If the
869 // run queue is empty, and there are no other tasks running, we
870 // can wait indefinitely for something to happen.
871 //
872 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
873 {
874 awaitEvent (emptyRunQueue(cap));
875 }
876 #endif
877 }
878
879 /* ----------------------------------------------------------------------------
880 * Detect deadlock conditions and attempt to resolve them.
881 * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleDetectDeadlock (Capability **pcap, Task *task)
885 {
886 Capability *cap = *pcap;
887 /*
888 * Detect deadlock: when we have no threads to run, there are no
889 * threads blocked, waiting for I/O, or sleeping, and all the
890 * other tasks are waiting for work, we must have a deadlock of
891 * some description.
892 */
893 if ( emptyThreadQueues(cap) )
894 {
895 #if defined(THREADED_RTS)
896 /*
897 * In the threaded RTS, we only check for deadlock if there
898 * has been no activity in a complete timeslice. This means
899 * we won't eagerly start a full GC just because we don't have
900 * any threads to run currently.
901 */
902 if (recent_activity != ACTIVITY_INACTIVE) return;
903 #endif
904
905 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
906
907 // Garbage collection can release some new threads due to
908 // either (a) finalizers or (b) threads resurrected because
909 // they are unreachable and will therefore be sent an
910 // exception. Any threads thus released will be immediately
911 // runnable.
912 scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
913 cap = *pcap;
914 // when force_major == rtsTrue, scheduleDoGC sets
915 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
916 // signal.
917
918 if ( !emptyRunQueue(cap) ) return;
919
920 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
921 /* If we have user-installed signal handlers, then wait
922 * for signals to arrive rather than bombing out with a
923 * deadlock.
924 */
925 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
926 debugTrace(DEBUG_sched,
927 "still deadlocked, waiting for signals...");
928
929 awaitUserSignals();
930
931 if (signals_pending()) {
932 startSignalHandlers(cap);
933 }
934
935 // either we have threads to run, or we were interrupted:
936 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
937
938 return;
939 }
940 #endif
941
942 #if !defined(THREADED_RTS)
943 /* Probably a real deadlock. Send the current main thread the
944 * Deadlock exception.
945 */
946 if (task->incall->tso) {
947 switch (task->incall->tso->why_blocked) {
948 case BlockedOnSTM:
949 case BlockedOnBlackHole:
950 case BlockedOnMsgThrowTo:
951 case BlockedOnMVar:
952 case BlockedOnMVarRead:
953 throwToSingleThreaded(cap, task->incall->tso,
954 (StgClosure *)nonTermination_closure);
955 return;
956 default:
957 barf("deadlock: main thread blocked in a strange way");
958 }
959 }
960 return;
961 #endif
962 }
963 }
964
965
966 /* ----------------------------------------------------------------------------
967 * Process message in the current Capability's inbox
968 * ------------------------------------------------------------------------- */
969
970 static void
971 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
972 {
973 #if defined(THREADED_RTS)
974 Message *m, *next;
975 int r;
976 Capability *cap = *pcap;
977
978 while (!emptyInbox(cap)) {
979 if (cap->r.rCurrentNursery->link == NULL ||
980 g0->n_new_large_words >= large_alloc_lim) {
981 scheduleDoGC(pcap, cap->running_task, rtsFalse);
982 cap = *pcap;
983 }
984
985 // don't use a blocking acquire; if the lock is held by
986 // another thread then just carry on. This seems to avoid
987 // getting stuck in a message ping-pong situation with other
988 // processors. We'll check the inbox again later anyway.
989 //
990 // We should really use a more efficient queue data structure
991 // here. The trickiness is that we must ensure a Capability
992 // never goes idle if the inbox is non-empty, which is why we
993 // use cap->lock (cap->lock is released as the last thing
994 // before going idle; see Capability.c:releaseCapability()).
995 r = TRY_ACQUIRE_LOCK(&cap->lock);
996 if (r != 0) return;
997
998 m = cap->inbox;
999 cap->inbox = (Message*)END_TSO_QUEUE;
1000
1001 RELEASE_LOCK(&cap->lock);
1002
1003 while (m != (Message*)END_TSO_QUEUE) {
1004 next = m->link;
1005 executeMessage(cap, m);
1006 m = next;
1007 }
1008 }
1009 #endif
1010 }
1011
1012 /* ----------------------------------------------------------------------------
1013 * Activate spark threads (THREADED_RTS)
1014 * ------------------------------------------------------------------------- */
1015
1016 #if defined(THREADED_RTS)
1017 static void
1018 scheduleActivateSpark(Capability *cap)
1019 {
1020 if (anySparks() && !cap->disabled)
1021 {
1022 createSparkThread(cap);
1023 debugTrace(DEBUG_sched, "creating a spark thread");
1024 }
1025 }
1026 #endif // THREADED_RTS
1027
1028 /* ----------------------------------------------------------------------------
1029 * After running a thread...
1030 * ------------------------------------------------------------------------- */
1031
1032 static void
1033 schedulePostRunThread (Capability *cap, StgTSO *t)
1034 {
1035 // We have to be able to catch transactions that are in an
1036 // infinite loop as a result of seeing an inconsistent view of
1037 // memory, e.g.
1038 //
1039 // atomically $ do
1040 // [a,b] <- mapM readTVar [ta,tb]
1041 // when (a == b) loop
1042 //
1043 // and a is never equal to b given a consistent view of memory.
1044 //
1045 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1046 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1047 debugTrace(DEBUG_sched | DEBUG_stm,
1048 "trec %p found wasting its time", t);
1049
1050 // strip the stack back to the
1051 // ATOMICALLY_FRAME, aborting the (nested)
1052 // transaction, and saving the stack of any
1053 // partially-evaluated thunks on the heap.
1054 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1055
1056 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1057 }
1058 }
1059
1060 //
1061 // If the current thread's allocation limit has run out, send it
1062 // the AllocationLimitExceeded exception.
1063
1064 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1065 // Use a throwToSelf rather than a throwToSingleThreaded, because
1066 // it correctly handles the case where the thread is currently
1067 // inside mask. Also the thread might be blocked (e.g. on an
1068 // MVar), and throwToSingleThreaded doesn't unblock it
1069 // correctly in that case.
1070 throwToSelf(cap, t, allocationLimitExceeded_closure);
1071 ASSIGN_Int64((W_*)&(t->alloc_limit),
1072 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1073 }
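// (Reference note, hedged: this alloc_limit is the counter that Haskell
// code manipulates via GHC.Conc.setAllocationCounter and
// enableAllocationLimit. Re-arming it with allocLimitGrace blocks of
// headroom lets the exception handler itself allocate without
// immediately tripping the limit again.)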
1074
1075 /* some statistics gathering in the parallel case */
1076 }
1077
1078 /* -----------------------------------------------------------------------------
1079 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1080 * -------------------------------------------------------------------------- */
1081
1082 static rtsBool
1083 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1084 {
1085 if (cap->r.rHpLim == NULL || cap->context_switch) {
1086 // Sometimes we miss a context switch, e.g. when calling
1087 // primitives in a tight loop, MAYBE_GC() doesn't check the
1088 // context switch flag, and we end up waiting for a GC.
1089 // See #1984, and concurrent/should_run/1984
1090 cap->context_switch = 0;
1091 appendToRunQueue(cap,t);
1092 } else {
1093 pushOnRunQueue(cap,t);
1094 }
1095
1096 // did the task ask for a large block?
1097 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1098 // if so, get one and push it on the front of the nursery.
1099 bdescr *bd;
1100 W_ blocks;
1101
1102 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
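// Worked example (assuming the usual 4096-byte BLOCK_SIZE): a request of
// rHpAlloc = 10000 bytes rounds up to 12288, so blocks == 3.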
1103
1104 if (blocks > BLOCKS_PER_MBLOCK) {
1105 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1106 }
1107
1108 debugTrace(DEBUG_sched,
1109 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1110 (long)t->id, what_next_strs[t->what_next], blocks);
1111
1112 // don't do this if the nursery is (nearly) full, we'll GC first.
1113 if (cap->r.rCurrentNursery->link != NULL ||
1114 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1115 // infinite loop if the
1116 // nursery has only one
1117 // block.
1118
1119 bd = allocGroup_lock(blocks);
1120 cap->r.rNursery->n_blocks += blocks;
1121
1122 // link the new group after CurrentNursery
1123 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1124
1125 // initialise it as a nursery block. We initialise the
1126 // step, gen_no, and flags field of *every* sub-block in
1127 // this large block, because this is easier than making
1128 // sure that we always find the block head of a large
1129 // block whenever we call Bdescr() (eg. evacuate() and
1130 // isAlive() in the GC would both have to do this, at
1131 // least).
1132 {
1133 bdescr *x;
1134 for (x = bd; x < bd + blocks; x++) {
1135 initBdescr(x,g0,g0);
1136 x->free = x->start;
1137 x->flags = 0;
1138 }
1139 }
1140
1141 // This assert can be a killer if the app is doing lots
1142 // of large block allocations.
1143 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1144
1145 // now update the nursery to point to the new block
1146 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1147 cap->r.rCurrentNursery = bd;
1148
1149 // we might be unlucky and have another thread get on the
1150 // run queue before us and steal the large block, but in that
1151 // case the thread will just end up requesting another large
1152 // block.
1153 return rtsFalse; /* not actually GC'ing */
1154 }
1155 }
1156
1157 // if we got here because we exceeded large_alloc_lim, then
1158 // proceed straight to GC.
1159 if (g0->n_new_large_words >= large_alloc_lim) {
1160 return rtsTrue;
1161 }
1162
1163 // Otherwise, we just ran out of space in the current nursery.
1164 // Grab another nursery if we can.
1165 if (getNewNursery(cap)) {
1166 debugTrace(DEBUG_sched, "thread %ld got a new nursery", t->id);
1167 return rtsFalse;
1168 }
1169
1170 return rtsTrue;
1171 /* actual GC is done at the end of the while loop in schedule() */
1172 }
1173
1174 /* -----------------------------------------------------------------------------
1175 * Handle a thread that returned to the scheduler with ThreadYielding
1176 * -------------------------------------------------------------------------- */
1177
1178 static rtsBool
1179 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1180 {
1181 /* put the thread back on the run queue. Then, if we're ready to
1182 * GC, check whether this is the last task to stop. If so, wake
1183 * up the GC thread. getThread will block during a GC until the
1184 * GC is finished.
1185 */
1186
1187 ASSERT(t->_link == END_TSO_QUEUE);
1188
1189 // Shortcut if we're just switching evaluators: don't bother
1190 // doing stack squeezing (which can be expensive), just run the
1191 // thread.
1192 if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1193 debugTrace(DEBUG_sched,
1194 "--<< thread %ld (%s) stopped to switch evaluators",
1195 (long)t->id, what_next_strs[t->what_next]);
1196 return rtsTrue;
1197 }
1198
1199 // Reset the context switch flag. We don't do this just before
1200 // running the thread, because that would mean we would lose ticks
1201 // during GC, which can lead to unfair scheduling (a thread hogs
1202 // the CPU because the tick always arrives during GC). This way
1203 // penalises threads that do a lot of allocation, but that seems
1204 // better than the alternative.
1205 if (cap->context_switch != 0) {
1206 cap->context_switch = 0;
1207 appendToRunQueue(cap,t);
1208 } else {
1209 pushOnRunQueue(cap,t);
1210 }
1211
1212 IF_DEBUG(sanity,
1213 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1214 checkTSO(t));
1215
1216 return rtsFalse;
1217 }
1218
1219 /* -----------------------------------------------------------------------------
1220 * Handle a thread that returned to the scheduler with ThreadBlocked
1221 * -------------------------------------------------------------------------- */
1222
1223 static void
1224 scheduleHandleThreadBlocked( StgTSO *t
1225 #if !defined(DEBUG)
1226 STG_UNUSED
1227 #endif
1228 )
1229 {
1230
1231 // We don't need to do anything. The thread is blocked, and it
1232 // has tidied up its stack and placed itself on whatever queue
1233 // it needs to be on.
1234
1235 // ASSERT(t->why_blocked != NotBlocked);
1236 // Not true: for example,
1237 // - the thread may have woken itself up already, because
1238 // threadPaused() might have raised a blocked throwTo
1239 // exception, see maybePerformBlockedException().
1240
1241 #ifdef DEBUG
1242 traceThreadStatus(DEBUG_sched, t);
1243 #endif
1244 }
1245
1246 /* -----------------------------------------------------------------------------
1247 * Handle a thread that returned to the scheduler with ThreadFinished
1248 * -------------------------------------------------------------------------- */
1249
1250 static rtsBool
1251 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1252 {
1253 /* Need to check whether this was a main thread, and if so,
1254 * return with the return value.
1255 *
1256 * We also end up here if the thread kills itself with an
1257 * uncaught exception, see Exception.cmm.
1258 */
1259
1260 // blocked exceptions can now complete, even if the thread was in
1261 // blocked mode (see #2910).
1262 awakenBlockedExceptionQueue (cap, t);
1263
1264 //
1265 // Check whether the thread that just completed was a bound
1266 // thread, and if so return with the result.
1267 //
1268 // There is an assumption here that all thread completion goes
1269 // through this point; we need to make sure that if a thread
1270 // ends up in the ThreadKilled state, that it stays on the run
1271 // queue so it can be dealt with here.
1272 //
1273
1274 if (t->bound) {
1275
1276 if (t->bound != task->incall) {
1277 #if !defined(THREADED_RTS)
1278 // Must be a bound thread that is not the topmost one. Leave
1279 // it on the run queue until the stack has unwound to the
1280 // point where we can deal with this. Leaving it on the run
1281 // queue also ensures that the garbage collector knows about
1282 // this thread and its return value (it gets dropped from the
1283 // step->threads list so there's no other way to find it).
1284 appendToRunQueue(cap,t);
1285 return rtsFalse;
1286 #else
1287 // this cannot happen in the threaded RTS, because a
1288 // bound thread can only be run by the appropriate Task.
1289 barf("finished bound thread that isn't mine");
1290 #endif
1291 }
1292
1293 ASSERT(task->incall->tso == t);
1294
1295 if (t->what_next == ThreadComplete) {
1296 if (task->incall->ret) {
1297 // NOTE: return val is stack->sp[1] (see StgStartup.hc)
1298 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1299 }
1300 task->incall->rstat = Success;
1301 } else {
1302 if (task->incall->ret) {
1303 *(task->incall->ret) = NULL;
1304 }
1305 if (sched_state >= SCHED_INTERRUPTING) {
1306 if (heap_overflow) {
1307 task->incall->rstat = HeapExhausted;
1308 } else {
1309 task->incall->rstat = Interrupted;
1310 }
1311 } else {
1312 task->incall->rstat = Killed;
1313 }
1314 }
1315 #ifdef DEBUG
1316 removeThreadLabel((StgWord)task->incall->tso->id);
1317 #endif
1318
1319 // We no longer consider this thread and task to be bound to
1320 // each other. The TSO lives on until it is GC'd, but the
1321 // task is about to be released by the caller, and we don't
1322 // want anyone following the pointer from the TSO to the
1323 // defunct task (which might have already been
1324 // re-used). This was a real bug: the GC updated
1325 // tso->bound->tso which led to a deadlock.
1326 t->bound = NULL;
1327 task->incall->tso = NULL;
1328
1329 return rtsTrue; // tells schedule() to return
1330 }
1331
1332 return rtsFalse;
1333 }
1334
1335 /* -----------------------------------------------------------------------------
1336 * Perform a heap census
1337 * -------------------------------------------------------------------------- */
1338
1339 static rtsBool
1340 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1341 {
1342 // When we have +RTS -i0 and we're heap profiling, do a census at
1343 // every GC. This lets us get repeatable runs for debugging.
1344 if (performHeapProfile ||
1345 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1346 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1347 return rtsTrue;
1348 } else {
1349 return rtsFalse;
1350 }
1351 }
1352
1353 /* -----------------------------------------------------------------------------
1354 * Start a synchronisation of all capabilities
1355 * -------------------------------------------------------------------------- */
1356
1357 // Returns:
1358 // 0 if we successfully got a sync
1359 // non-0 if there was another sync request in progress,
1360 // and we yielded to it. The value returned is the
1361 // type of the other sync request.
1362 //
1363 #if defined(THREADED_RTS)
1364 static nat requestSync (Capability **pcap, Task *task, nat sync_type)
1365 {
1366 nat prev_pending_sync;
1367
1368 prev_pending_sync = cas(&pending_sync, 0, sync_type);
1369
1370 if (prev_pending_sync)
1371 {
1372 do {
1373 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1374 prev_pending_sync);
1375 ASSERT(*pcap);
1376 yieldCapability(pcap,task,rtsTrue);
1377 } while (pending_sync);
1378 return prev_pending_sync; // NOTE: task->cap might have changed now
1379 }
1380 else
1381 {
1382 return 0;
1383 }
1384 }
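// Rough usage sketch of the sync protocol (cf. scheduleDoGC() and
// forkProcess() below):
//
//     do { sync = requestSync(&cap, task, SYNC_OTHER); } while (sync);
//     acquireAllCapabilities(cap, task);
//     /* ... all Capabilities held: safe to mutate global state ... */
//     pending_sync = 0;
//     releaseAllCapabilities(n_capabilities, cap, task);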
1385
1386 //
1387 // Grab all the capabilities except the one we already hold. Used
1388 // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
1389 // before a fork (SYNC_OTHER).
1390 //
1391 // Only call this after requestSync(), otherwise a deadlock might
1392 // ensue if another thread is trying to synchronise.
1393 //
1394 static void acquireAllCapabilities(Capability *cap, Task *task)
1395 {
1396 Capability *tmpcap;
1397 nat i;
1398
1399 for (i=0; i < n_capabilities; i++) {
1400 debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)", i, n_capabilities);
1401 tmpcap = capabilities[i];
1402 if (tmpcap != cap) {
1403 // we better hope this task doesn't get migrated to
1404 // another Capability while we're waiting for this one.
1405 // It won't, because load balancing happens while we have
1406 // all the Capabilities, but even so it's a slightly
1407 // unsavoury invariant.
1408 task->cap = tmpcap;
1409 waitForCapability(&tmpcap, task);
1410 if (tmpcap->no != i) {
1411 barf("acquireAllCapabilities: got the wrong capability");
1412 }
1413 }
1414 }
1415 task->cap = cap;
1416 }
1417
1418 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
1419 {
1420 nat i;
1421
1422 for (i = 0; i < n; i++) {
1423 if (cap->no != i) {
1424 task->cap = capabilities[i];
1425 releaseCapability(capabilities[i]);
1426 }
1427 }
1428 task->cap = cap;
1429 }
1430 #endif
1431
1432 /* -----------------------------------------------------------------------------
1433 * Perform a garbage collection if necessary
1434 * -------------------------------------------------------------------------- */
1435
1436 static void
1437 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1438 rtsBool force_major)
1439 {
1440 Capability *cap = *pcap;
1441 rtsBool heap_census;
1442 nat collect_gen;
1443 rtsBool major_gc;
1444 #ifdef THREADED_RTS
1445 nat gc_type;
1446 nat i, sync;
1447 StgTSO *tso;
1448 #endif
1449
1450 if (sched_state == SCHED_SHUTTING_DOWN) {
1451 // The final GC has already been done, and the system is
1452 // shutting down. We'll probably deadlock if we try to GC
1453 // now.
1454 return;
1455 }
1456
1457 heap_census = scheduleNeedHeapProfile(rtsTrue);
1458
1459 // Figure out which generation we are collecting, so that we can
1460 // decide whether this is a parallel GC or not.
1461 collect_gen = calcNeeded(force_major || heap_census, NULL);
1462 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1463
1464 #ifdef THREADED_RTS
1465 if (sched_state < SCHED_INTERRUPTING
1466 && RtsFlags.ParFlags.parGcEnabled
1467 && collect_gen >= RtsFlags.ParFlags.parGcGen
1468 && ! oldest_gen->mark)
1469 {
1470 gc_type = SYNC_GC_PAR;
1471 } else {
1472 gc_type = SYNC_GC_SEQ;
1473 }
1474
1475 // In order to GC, there must be no threads running Haskell code.
1476 // Therefore, the GC thread needs to hold *all* the capabilities,
1477 // and release them after the GC has completed.
1478 //
1479 // This seems to be the simplest way: previous attempts involved
1480 // making all the threads with capabilities give up their
1481 // capabilities and sleep except for the *last* one, which
1482 // actually did the GC. But it's quite hard to arrange for all
1483 // the other tasks to sleep and stay asleep.
1484 //
1485
1486 /* Other capabilities are prevented from running yet more Haskell
1487 threads if pending_sync is set. Tested inside
1488 yieldCapability() and releaseCapability() in Capability.c */
1489
1490 do {
1491 sync = requestSync(pcap, task, gc_type);
1492 cap = *pcap;
1493 if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
1494 // someone else had a pending sync request for a GC, so
1495 // let's assume GC has been done and we don't need to GC
1496 // again.
1497 return;
1498 }
1499 if (sched_state == SCHED_SHUTTING_DOWN) {
1500 // The scheduler might now be shutting down. We tested
1501 // this above, but it might have become true since then as
1502 // we yielded the capability in requestSync().
1503 return;
1504 }
1505 } while (sync);
1506
1507 // don't declare this until after we have sync'd, because
1508 // n_capabilities may change.
1509 rtsBool idle_cap[n_capabilities];
1510 #ifdef DEBUG
1511 unsigned int old_n_capabilities = n_capabilities;
1512 #endif
1513
1514 interruptAllCapabilities();
1515
1516 // The final shutdown GC is always single-threaded, because it's
1517 // possible that some of the Capabilities have no worker threads.
1518
1519 if (gc_type == SYNC_GC_SEQ)
1520 {
1521 traceEventRequestSeqGc(cap);
1522 }
1523 else
1524 {
1525 traceEventRequestParGc(cap);
1526 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1527 }
1528
1529 if (gc_type == SYNC_GC_SEQ)
1530 {
1531 // single-threaded GC: grab all the capabilities
1532 acquireAllCapabilities(cap,task);
1533 }
1534 else
1535 {
1536 // If we are load-balancing collections in this
1537 // generation, then we require all GC threads to participate
1538 // in the collection. Otherwise, we only require active
1539 // threads to participate, and we set gc_threads[i]->idle for
1540 // any idle capabilities. The rationale here is that waking
1541 // up an idle Capability takes much longer than just doing any
1542 // GC work on its behalf.
1543
1544 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1545 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1546 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
1547 for (i=0; i < n_capabilities; i++) {
1548 if (capabilities[i]->disabled) {
1549 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1550 } else {
1551 idle_cap[i] = rtsFalse;
1552 }
1553 }
1554 } else {
1555 for (i=0; i < n_capabilities; i++) {
1556 if (capabilities[i]->disabled) {
1557 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1558 } else if (i == cap->no ||
1559 capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1560 idle_cap[i] = rtsFalse;
1561 } else {
1562 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1563 if (!idle_cap[i]) {
1564 n_failed_trygrab_idles++;
1565 } else {
1566 n_idle_caps++;
1567 }
1568 }
1569 }
1570 }
1571
1572 // We set the gc_thread[i]->idle flag if that
1573 // capability/thread is not participating in this collection.
1574 // We also keep a local record of which capabilities are idle
1575 // in idle_cap[], because scheduleDoGC() is re-entrant:
1576 // another thread might start a GC as soon as we've finished
1577 // this one, and thus the gc_thread[]->idle flags are invalid
1578 // as soon as we release any threads after GC. Getting this
1579 // wrong leads to a rare and hard to debug deadlock!
1580
1581 for (i=0; i < n_capabilities; i++) {
1582 gc_threads[i]->idle = idle_cap[i];
1583 capabilities[i]->idle++;
1584 }
1585
1586 // For all capabilities participating in this GC, wait until
1587 // they have stopped mutating and are standing by for GC.
1588 waitForGcThreads(cap);
1589
1590 #if defined(THREADED_RTS)
1591 // Stable point where we can do a global check on our spark counters
1592 ASSERT(checkSparkCountInvariant());
1593 #endif
1594 }
1595
1596 #endif
1597
1598 IF_DEBUG(scheduler, printAllThreads());
1599
1600 delete_threads_and_gc:
1601 /*
1602 * We now have all the capabilities; if we're in an interrupting
1603 * state, then we should take the opportunity to delete all the
1604 * threads in the system.
1605 * Checking for major_gc ensures that the last GC is major.
1606 */
1607 if (sched_state == SCHED_INTERRUPTING && major_gc) {
1608 deleteAllThreads(cap);
1609 #if defined(THREADED_RTS)
1610 // Discard all the sparks from every Capability. Why?
1611 // They'll probably be GC'd anyway since we've killed all the
1612 // threads. It just avoids the GC having to do any work to
1613 // figure out that any remaining sparks are garbage.
1614 for (i = 0; i < n_capabilities; i++) {
1615 capabilities[i]->spark_stats.gcd +=
1616 sparkPoolSize(capabilities[i]->sparks);
1617 // No race here since all Caps are stopped.
1618 discardSparksCap(capabilities[i]);
1619 }
1620 #endif
1621 sched_state = SCHED_SHUTTING_DOWN;
1622 }
1623
1624 /*
1625 * When there are disabled capabilities, we want to migrate any
1626 * threads away from them. Normally this happens in the
1627 * scheduler's loop, but only for unbound threads - it's really
1628 * hard for a bound thread to migrate itself. So we have another
1629 * go here.
1630 */
1631 #if defined(THREADED_RTS)
1632 for (i = enabled_capabilities; i < n_capabilities; i++) {
1633 Capability *tmp_cap, *dest_cap;
1634 tmp_cap = capabilities[i];
1635 ASSERT(tmp_cap->disabled);
1636 if (i != cap->no) {
1637 dest_cap = capabilities[i % enabled_capabilities];
1638 while (!emptyRunQueue(tmp_cap)) {
1639 tso = popRunQueue(tmp_cap);
1640 migrateThread(tmp_cap, tso, dest_cap);
1641 if (tso->bound) {
1642 traceTaskMigrate(tso->bound->task,
1643 tso->bound->task->cap,
1644 dest_cap);
1645 tso->bound->task->cap = dest_cap;
1646 }
1647 }
1648 }
1649 }
1650 #endif
1651
1652 #if defined(THREADED_RTS)
1653 // reset pending_sync *before* GC, so that when the GC threads
1654 // emerge they don't immediately re-enter the GC.
1655 pending_sync = 0;
1656 GarbageCollect(collect_gen, heap_census, gc_type, cap);
1657 #else
1658 GarbageCollect(collect_gen, heap_census, 0, cap);
1659 #endif
1660
1661 traceSparkCounters(cap);
1662
1663 switch (recent_activity) {
1664 case ACTIVITY_INACTIVE:
1665 if (force_major) {
1666 // We are doing a GC because the system has been idle for a
1667 // timeslice and we need to check for deadlock. Record the
1668 // fact that we've done a GC and turn off the timer signal;
1669 // it will get re-enabled if we run any threads after the GC.
1670 recent_activity = ACTIVITY_DONE_GC;
1671 #ifndef PROFILING
1672 stopTimer();
1673 #endif
1674 break;
1675 }
1676 // fall through...
1677
1678 case ACTIVITY_MAYBE_NO:
1679 // the GC might have taken long enough for the timer to set
1680 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1681 // but we aren't necessarily deadlocked:
1682 recent_activity = ACTIVITY_YES;
1683 break;
1684
1685 case ACTIVITY_DONE_GC:
1686 // If we are actually active, the scheduler will reset the
1687 // recent_activity flag and re-enable the timer.
1688 break;
1689 }
1690
1691 #if defined(THREADED_RTS)
1692 // Stable point where we can do a global check on our spark counters
1693 ASSERT(checkSparkCountInvariant());
1694 #endif
1695
1696 // The heap census itself is done during GarbageCollect().
1697 if (heap_census) {
1698 performHeapProfile = rtsFalse;
1699 }
1700
1701 #if defined(THREADED_RTS)
1702
1703 // If n_capabilities has changed during GC, we're in trouble.
1704 ASSERT(n_capabilities == old_n_capabilities);
1705
1706 if (gc_type == SYNC_GC_PAR)
1707 {
1708 releaseGCThreads(cap);
1709 for (i = 0; i < n_capabilities; i++) {
1710 if (i != cap->no) {
1711 if (idle_cap[i]) {
1712 ASSERT(capabilities[i]->running_task == task);
1713 task->cap = capabilities[i];
1714 releaseCapability(capabilities[i]);
1715 } else {
1716 ASSERT(capabilities[i]->running_task != task);
1717 }
1718 }
1719 }
1720 task->cap = cap;
1721 }
1722 #endif
1723
1724 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1725 // GC set the heap_overflow flag, so we should proceed with
1726 // an orderly shutdown now. Ultimately we want the main
1727 // thread to return to its caller with HeapExhausted, at which
1728 // point the caller should call hs_exit(). The first step is
1729 // to delete all the threads.
1730 //
1731 // Another way to do this would be to raise an exception in
1732 // the main thread, which we really should do because it gives
1733 // the program a chance to clean up. But how do we find the
1734 // main thread? It should presumably be the same one that
1735 // gets ^C exceptions, but that's all done on the Haskell side
1736 // (GHC.TopHandler).
1737 sched_state = SCHED_INTERRUPTING;
1738 goto delete_threads_and_gc;
1739 }
1740
1741 #ifdef SPARKBALANCE
1742 /* JB
1743 Once we are all together... this would be the place to balance all
1744 spark pools. No concurrent stealing or adding of new sparks can
1745 occur. Should be defined in Sparks.c. */
1746 balanceSparkPoolsCaps(n_capabilities, capabilities);
1747 #endif
1748
1749 #if defined(THREADED_RTS)
1750 if (gc_type == SYNC_GC_SEQ) {
1751 // release our stash of capabilities.
1752 releaseAllCapabilities(n_capabilities, cap, task);
1753 }
1754 #endif
1755
1756 return;
1757 }
1758
1759 /* ---------------------------------------------------------------------------
1760 * Singleton fork(). Do not copy any running threads.
1761 * ------------------------------------------------------------------------- */
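/* Illustrative note (a sketch, not part of the build): the unix
 * package's System.Posix.Process.forkProcess reaches this entry point
 * through a foreign call, passing a StablePtr to the IO action to run
 * in the child.  Roughly, from the caller's point of view:
 *
 *     pid_t pid = forkProcess(entry);  // 'entry' is a StablePtr to an IO ()
 *     // parent: pid is the child's process id and execution continues;
 *     // child: runs the action, then exits via shutdownHaskellAndExit().
 */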
1762
1763 pid_t
1764 forkProcess(HsStablePtr *entry
1765 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1766 STG_UNUSED
1767 #endif
1768 )
1769 {
1770 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1771 pid_t pid;
1772 StgTSO* t,*next;
1773 Capability *cap;
1774 nat g;
1775 Task *task = NULL;
1776 nat i;
1777 #ifdef THREADED_RTS
1778 nat sync;
1779 #endif
1780
1781 debugTrace(DEBUG_sched, "forking!");
1782
1783 task = newBoundTask();
1784
1785 cap = NULL;
1786 waitForCapability(&cap, task);
1787
1788 #ifdef THREADED_RTS
1789 do {
1790 sync = requestSync(&cap, task, SYNC_OTHER);
1791 } while (sync);
1792
1793 acquireAllCapabilities(cap,task);
1794
1795 pending_sync = 0;
1796 #endif
1797
1798 // no funny business: hold locks while we fork, otherwise if some
1799 // other thread is holding a lock when the fork happens, the data
1800 // structure protected by the lock will forever be in an
1801 // inconsistent state in the child. See also #1391.
1802 ACQUIRE_LOCK(&sched_mutex);
1803 ACQUIRE_LOCK(&sm_mutex);
1804 ACQUIRE_LOCK(&stable_mutex);
1805 ACQUIRE_LOCK(&task->lock);
1806
1807 for (i=0; i < n_capabilities; i++) {
1808 ACQUIRE_LOCK(&capabilities[i]->lock);
1809 }
1810
1811 #ifdef THREADED_RTS
1812 ACQUIRE_LOCK(&all_tasks_mutex);
1813 #endif
1814
1815 stopTimer(); // See #4074
1816
1817 #if defined(TRACING)
1818 flushEventLog(); // so that child won't inherit dirty file buffers
1819 #endif
1820
1821 pid = fork();
1822
1823 if (pid) { // parent
1824
1825 startTimer(); // #4074
1826
1827 RELEASE_LOCK(&sched_mutex);
1828 RELEASE_LOCK(&sm_mutex);
1829 RELEASE_LOCK(&stable_mutex);
1830 RELEASE_LOCK(&task->lock);
1831
1832 for (i=0; i < n_capabilities; i++) {
1833 releaseCapability_(capabilities[i],rtsFalse);
1834 RELEASE_LOCK(&capabilities[i]->lock);
1835 }
1836
1837 #ifdef THREADED_RTS
1838 RELEASE_LOCK(&all_tasks_mutex);
1839 #endif
1840
1841 boundTaskExiting(task);
1842
1843 // just return the pid
1844 return pid;
1845
1846 } else { // child
1847
1848 #if defined(THREADED_RTS)
1849 initMutex(&sched_mutex);
1850 initMutex(&sm_mutex);
1851 initMutex(&stable_mutex);
1852 initMutex(&task->lock);
1853
1854 for (i=0; i < n_capabilities; i++) {
1855 initMutex(&capabilities[i]->lock);
1856 }
1857
1858 initMutex(&all_tasks_mutex);
1859 #endif
1860
1861 #ifdef TRACING
1862 resetTracing();
1863 #endif
1864
1865 // Now, all OS threads except the thread that forked are
1866 // stopped. We need to stop all Haskell threads, including
1867 // those involved in foreign calls. Also we need to delete
1868 // all Tasks, because they correspond to OS threads that are
1869 // now gone.
1870
1871 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1872 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1873 next = t->global_link;
1874 // don't allow threads to catch the ThreadKilled
1875 // exception, but we do want to raiseAsync() because these
1876 // threads may be evaluating thunks that we need later.
1877 deleteThread_(t->cap,t);
1878
1879 // stop the GC from updating the InCall to point to
1880 // the TSO. This is only necessary because the
1881 // OSThread bound to the TSO has been killed, and
1882 // won't get a chance to exit in the usual way (see
1883 // also scheduleHandleThreadFinished).
1884 t->bound = NULL;
1885 }
1886 }
1887
1888 discardTasksExcept(task);
1889
1890 for (i=0; i < n_capabilities; i++) {
1891 cap = capabilities[i];
1892
1893 // Empty the run queue. It seems tempting to let all the
1894 // killed threads stay on the run queue as zombies to be
1895 // cleaned up later, but some of them may correspond to
1896 // bound threads for which the corresponding Task does not
1897 // exist.
1898 truncateRunQueue(cap);
1899
1900             // Any suspended C-calling Tasks are no more; their OS threads
1901 // don't exist now:
1902 cap->suspended_ccalls = NULL;
1903
1904 #if defined(THREADED_RTS)
1905 // Wipe our spare workers list, they no longer exist. New
1906 // workers will be created if necessary.
1907 cap->spare_workers = NULL;
1908 cap->n_spare_workers = 0;
1909 cap->returning_tasks_hd = NULL;
1910 cap->returning_tasks_tl = NULL;
1911 #endif
1912
1913 // Release all caps except 0, we'll use that for starting
1914 // the IO manager and running the client action below.
1915 if (cap->no != 0) {
1916 task->cap = cap;
1917 releaseCapability(cap);
1918 }
1919 }
1920 cap = capabilities[0];
1921 task->cap = cap;
1922
1923 // Empty the threads lists. Otherwise, the garbage
1924 // collector may attempt to resurrect some of these threads.
1925 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1926 generations[g].threads = END_TSO_QUEUE;
1927 }
1928
1929 // On Unix, all timers are reset in the child, so we need to start
1930 // the timer again.
1931 initTimer();
1932 startTimer();
1933
1934 // TODO: need to trace various other things in the child
1935 // like startup event, capabilities, process info etc
1936 traceTaskCreate(task, cap);
1937
1938 #if defined(THREADED_RTS)
1939 ioManagerStartCap(&cap);
1940 #endif
1941
1942 rts_evalStableIO(&cap, entry, NULL); // run the action
1943 rts_checkSchedStatus("forkProcess",cap);
1944
1945 rts_unlock(cap);
1946 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
1947 }
1948 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1949 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1950 #endif
1951 }
1952
1953 /* ---------------------------------------------------------------------------
1954 * Changing the number of Capabilities
1955 *
1956 * Changing the number of Capabilities is very tricky! We can only do
1957 * it with the system fully stopped, so we do a full sync with
1958 * requestSync(SYNC_OTHER) and grab all the capabilities.
1959 *
1960 * Then we resize the appropriate data structures, and update all
1961 * references to the old data structures which have now moved.
1962 * Finally we release the Capabilities we are holding, and start
1963 * worker Tasks on the new Capabilities we created.
1964 *
1965 * ------------------------------------------------------------------------- */
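/* Illustrative note: this is the entry point behind
 * Control.Concurrent.setNumCapabilities (a foreign import on the
 * Haskell side), so a call such as 'setNumCapabilities 4' in the
 * program arrives here with new_n_capabilities == 4.  (Sketch of the
 * mapping only.)
 */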
1966
1967 void
1968 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
1969 {
1970 #if !defined(THREADED_RTS)
1971 if (new_n_capabilities != 1) {
1972 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
1973 }
1974 return;
1975 #elif defined(NOSMP)
1976 if (new_n_capabilities != 1) {
1977 errorBelch("setNumCapabilities: not supported on this platform");
1978 }
1979 return;
1980 #else
1981 Task *task;
1982 Capability *cap;
1983 nat sync;
1984 nat n;
1985 Capability *old_capabilities = NULL;
1986 nat old_n_capabilities = n_capabilities;
1987
1988 if (new_n_capabilities == enabled_capabilities) return;
1989
1990 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
1991 enabled_capabilities, new_n_capabilities);
1992
1993 cap = rts_lock();
1994 task = cap->running_task;
1995
1996 do {
1997 sync = requestSync(&cap, task, SYNC_OTHER);
1998 } while (sync);
1999
2000 acquireAllCapabilities(cap,task);
2001
2002 pending_sync = 0;
2003
2004 if (new_n_capabilities < enabled_capabilities)
2005 {
2006 // Reducing the number of capabilities: we do not actually
2007 // remove the extra capabilities, we just mark them as
2008 // "disabled". This has the following effects:
2009 //
2010 // - threads on a disabled capability are migrated away by the
2011 // scheduler loop
2012 //
2013 // - disabled capabilities do not participate in GC
2014 // (see scheduleDoGC())
2015 //
2016 // - No spark threads are created on this capability
2017 // (see scheduleActivateSpark())
2018 //
2019 // - We do not attempt to migrate threads *to* a disabled
2020 // capability (see schedulePushWork()).
2021 //
2022 // but in other respects, a disabled capability remains
2023 // alive. Threads may be woken up on a disabled capability,
2024 // but they will be immediately migrated away.
2025 //
2026 // This approach is much easier than trying to actually remove
2027 // the capability; we don't have to worry about GC data
2028 // structures, the nursery, etc.
2029 //
2030 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2031 capabilities[n]->disabled = rtsTrue;
2032 traceCapDisable(capabilities[n]);
2033 }
2034 enabled_capabilities = new_n_capabilities;
2035 }
2036 else
2037 {
2038 // Increasing the number of enabled capabilities.
2039 //
2040 // enable any disabled capabilities, up to the required number
2041 for (n = enabled_capabilities;
2042 n < new_n_capabilities && n < n_capabilities; n++) {
2043 capabilities[n]->disabled = rtsFalse;
2044 traceCapEnable(capabilities[n]);
2045 }
2046 enabled_capabilities = n;
2047
2048 if (new_n_capabilities > n_capabilities) {
2049 #if defined(TRACING)
2050 // Allocate eventlog buffers for the new capabilities. Note this
2051 // must be done before calling moreCapabilities(), because that
2052 // will emit events about creating the new capabilities and adding
2053 // them to existing capsets.
2054 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2055 #endif
2056
2057 // Resize the capabilities array
2058 // NB. after this, capabilities points somewhere new. Any pointers
2059 // of type (Capability *) are now invalid.
2060 moreCapabilities(n_capabilities, new_n_capabilities);
2061
2062 // Resize and update storage manager data structures
2063 storageAddCapabilities(n_capabilities, new_n_capabilities);
2064 }
2065 }
2066
2067 // update n_capabilities before things start running
2068 if (new_n_capabilities > n_capabilities) {
2069 n_capabilities = enabled_capabilities = new_n_capabilities;
2070 }
2071
2072 // Start worker tasks on the new Capabilities
2073 startWorkerTasks(old_n_capabilities, new_n_capabilities);
2074
2075 // We're done: release the original Capabilities
2076 releaseAllCapabilities(old_n_capabilities, cap,task);
2077
2078 // We can't free the old array until now, because we access it
2079 // while updating pointers in updateCapabilityRefs().
2080 if (old_capabilities) {
2081 stgFree(old_capabilities);
2082 }
2083
2084 // Notify IO manager that the number of capabilities has changed.
2085 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2086
2087 rts_unlock(cap);
2088
2089 #endif // THREADED_RTS
2090 }
2091
2092
2093
2094 /* ---------------------------------------------------------------------------
2095 * Delete all the threads in the system
2096 * ------------------------------------------------------------------------- */
2097
2098 static void
2099 deleteAllThreads ( Capability *cap )
2100 {
2101 // NOTE: only safe to call if we own all capabilities.
2102
2103 StgTSO* t, *next;
2104 nat g;
2105
2106 debugTrace(DEBUG_sched,"deleting all threads");
2107 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2108 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2109 next = t->global_link;
2110 deleteThread(cap,t);
2111 }
2112 }
2113
2114 // The run queue now contains a bunch of ThreadKilled threads. We
2115 // must not throw these away: the main thread(s) will be in there
2116 // somewhere, and the main scheduler loop has to deal with it.
2117 // Also, the run queue is the only thing keeping these threads from
2118 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2119
2120 #if !defined(THREADED_RTS)
2121 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2122 ASSERT(sleeping_queue == END_TSO_QUEUE);
2123 #endif
2124 }
2125
2126 /* -----------------------------------------------------------------------------
2127 Managing the suspended_ccalls list.
2128 Locks required: sched_mutex
2129 -------------------------------------------------------------------------- */
2130
2131 STATIC_INLINE void
2132 suspendTask (Capability *cap, Task *task)
2133 {
2134 InCall *incall;
2135
2136 incall = task->incall;
2137 ASSERT(incall->next == NULL && incall->prev == NULL);
2138 incall->next = cap->suspended_ccalls;
2139 incall->prev = NULL;
2140 if (cap->suspended_ccalls) {
2141 cap->suspended_ccalls->prev = incall;
2142 }
2143 cap->suspended_ccalls = incall;
2144 }
2145
2146 STATIC_INLINE void
2147 recoverSuspendedTask (Capability *cap, Task *task)
2148 {
2149 InCall *incall;
2150
2151 incall = task->incall;
2152 if (incall->prev) {
2153 incall->prev->next = incall->next;
2154 } else {
2155 ASSERT(cap->suspended_ccalls == incall);
2156 cap->suspended_ccalls = incall->next;
2157 }
2158 if (incall->next) {
2159 incall->next->prev = incall->prev;
2160 }
2161 incall->next = incall->prev = NULL;
2162 }
2163
2164 /* ---------------------------------------------------------------------------
2165 * Suspending & resuming Haskell threads.
2166 *
2167 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2168 * its capability before calling the C function. This allows another
2169 * task to pick up the capability and carry on running Haskell
2170 * threads. It also means that if the C call blocks, it won't lock
2171 * the whole system.
2172 *
2173 * The Haskell thread making the C call is put to sleep for the
2174 * duration of the call, on the suspended_ccalling_threads queue. We
2175  * duration of the call, on the capability's suspended_ccalls list. We
2176 * on return from the C function.
2177 *
2178 * If this is an interruptible C call, this means that the FFI call may be
2179 * unceremoniously terminated and should be scheduled on an
2180 * unbound worker thread.
2181 * ------------------------------------------------------------------------- */
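/* Rough sketch of how a generated "safe" call stub uses this pair
 * (illustrative only; 'foreign_fn' stands in for the imported function,
 * and the real code is emitted by the code generator):
 *
 *     token = suspendThread(BaseReg, interruptible);
 *     result = foreign_fn(args);        // may block; no Capability held
 *     BaseReg = resumeThread(token);    // reacquire a Capability, resume TSO
 */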
2182
2183 void *
2184 suspendThread (StgRegTable *reg, rtsBool interruptible)
2185 {
2186 Capability *cap;
2187 int saved_errno;
2188 StgTSO *tso;
2189 Task *task;
2190 #if mingw32_HOST_OS
2191 StgWord32 saved_winerror;
2192 #endif
2193
2194 saved_errno = errno;
2195 #if mingw32_HOST_OS
2196 saved_winerror = GetLastError();
2197 #endif
2198
2199 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2200 */
2201 cap = regTableToCapability(reg);
2202
2203 task = cap->running_task;
2204 tso = cap->r.rCurrentTSO;
2205
2206 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2207
2208 // XXX this might not be necessary --SDM
2209 tso->what_next = ThreadRunGHC;
2210
2211 threadPaused(cap,tso);
2212
2213 if (interruptible) {
2214 tso->why_blocked = BlockedOnCCall_Interruptible;
2215 } else {
2216 tso->why_blocked = BlockedOnCCall;
2217 }
2218
2219 // Hand back capability
2220 task->incall->suspended_tso = tso;
2221 task->incall->suspended_cap = cap;
2222
2223 // Otherwise allocate() will write to invalid memory.
2224 cap->r.rCurrentTSO = NULL;
2225
2226 ACQUIRE_LOCK(&cap->lock);
2227
2228 suspendTask(cap,task);
2229 cap->in_haskell = rtsFalse;
2230 releaseCapability_(cap,rtsFalse);
2231
2232 RELEASE_LOCK(&cap->lock);
2233
2234 errno = saved_errno;
2235 #if mingw32_HOST_OS
2236 SetLastError(saved_winerror);
2237 #endif
2238 return task;
2239 }
2240
2241 StgRegTable *
2242 resumeThread (void *task_)
2243 {
2244 StgTSO *tso;
2245 InCall *incall;
2246 Capability *cap;
2247 Task *task = task_;
2248 int saved_errno;
2249 #if mingw32_HOST_OS
2250 StgWord32 saved_winerror;
2251 #endif
2252
2253 saved_errno = errno;
2254 #if mingw32_HOST_OS
2255 saved_winerror = GetLastError();
2256 #endif
2257
2258 incall = task->incall;
2259 cap = incall->suspended_cap;
2260 task->cap = cap;
2261
2262 // Wait for permission to re-enter the RTS with the result.
2263 waitForCapability(&cap,task);
2264 // we might be on a different capability now... but if so, our
2265 // entry on the suspended_ccalls list will also have been
2266 // migrated.
2267
2268 // Remove the thread from the suspended list
2269 recoverSuspendedTask(cap,task);
2270
2271 tso = incall->suspended_tso;
2272 incall->suspended_tso = NULL;
2273 incall->suspended_cap = NULL;
2274 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2275
2276 traceEventRunThread(cap, tso);
2277
2278 /* Reset blocking status */
2279 tso->why_blocked = NotBlocked;
2280
2281 if ((tso->flags & TSO_BLOCKEX) == 0) {
2282 // avoid locking the TSO if we don't have to
2283 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2284 maybePerformBlockedException(cap,tso);
2285 }
2286 }
2287
2288 cap->r.rCurrentTSO = tso;
2289 cap->in_haskell = rtsTrue;
2290 errno = saved_errno;
2291 #if mingw32_HOST_OS
2292 SetLastError(saved_winerror);
2293 #endif
2294
2295 /* We might have GC'd, mark the TSO dirty again */
2296 dirty_TSO(cap,tso);
2297 dirty_STACK(cap,tso->stackobj);
2298
2299 IF_DEBUG(sanity, checkTSO(tso));
2300
2301 return &cap->r;
2302 }
2303
2304 /* ---------------------------------------------------------------------------
2305 * scheduleThread()
2306 *
2307 * scheduleThread puts a thread on the end of the runnable queue.
2308 * This will usually be done immediately after a thread is created.
2309 * The caller of scheduleThread must create the thread using e.g.
2310 * createThread and push an appropriate closure
2311 * on this thread's stack before the scheduler is invoked.
2312 * ------------------------------------------------------------------------ */
2313
2314 void
2315 scheduleThread(Capability *cap, StgTSO *tso)
2316 {
2317 // The thread goes at the *end* of the run-queue, to avoid possible
2318 // starvation of any threads already on the queue.
2319 appendToRunQueue(cap,tso);
2320 }
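/* Illustrative sketch of the usual calling sequence (hypothetical
 * driver code; the real callers are e.g. the RtsAPI eval functions):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  io_closure);
 *     scheduleThread(cap, tso);   // queued; runs on the next schedule()
 */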
2321
2322 void
2323 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2324 {
2325 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2326 // move this thread from now on.
2327 #if defined(THREADED_RTS)
2328 cpu %= enabled_capabilities;
2329 if (cpu == cap->no) {
2330 appendToRunQueue(cap,tso);
2331 } else {
2332 migrateThread(cap, tso, capabilities[cpu]);
2333 }
2334 #else
2335 appendToRunQueue(cap,tso);
2336 #endif
2337 }
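/* Note: forkOn# (GHC.Conc.forkOn) ends up here.  The requested
 * capability is taken modulo enabled_capabilities, so with two enabled
 * capabilities a 'forkOn 5' is pinned (TSO_LOCKED) to capability 1.
 * (Illustrative example only.)
 */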
2338
2339 void
2340 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2341 {
2342 Task *task;
2343 DEBUG_ONLY( StgThreadID id );
2344 Capability *cap;
2345
2346 cap = *pcap;
2347
2348 // We already created/initialised the Task
2349 task = cap->running_task;
2350
2351 // This TSO is now a bound thread; make the Task and TSO
2352 // point to each other.
2353 tso->bound = task->incall;
2354 tso->cap = cap;
2355
2356 task->incall->tso = tso;
2357 task->incall->ret = ret;
2358 task->incall->rstat = NoStatus;
2359
2360 appendToRunQueue(cap,tso);
2361
2362 DEBUG_ONLY( id = tso->id );
2363 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2364
2365 cap = schedule(cap,task);
2366
2367 ASSERT(task->incall->rstat != NoStatus);
2368 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2369
2370 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2371 *pcap = cap;
2372 }
2373
2374 /* ----------------------------------------------------------------------------
2375 * Starting Tasks
2376 * ------------------------------------------------------------------------- */
2377
2378 #if defined(THREADED_RTS)
2379 void scheduleWorker (Capability *cap, Task *task)
2380 {
2381 // schedule() runs without a lock.
2382 cap = schedule(cap,task);
2383
2384 // On exit from schedule(), we have a Capability, but possibly not
2385 // the same one we started with.
2386
2387 // During shutdown, the requirement is that after all the
2388 // Capabilities are shut down, all workers that are shutting down
2389 // have finished workerTaskStop(). This is why we hold on to
2390 // cap->lock until we've finished workerTaskStop() below.
2391 //
2392 // There may be workers still involved in foreign calls; those
2393 // will just block in waitForCapability() because the
2394 // Capability has been shut down.
2395 //
2396 ACQUIRE_LOCK(&cap->lock);
2397 releaseCapability_(cap,rtsFalse);
2398 workerTaskStop(task);
2399 RELEASE_LOCK(&cap->lock);
2400 }
2401 #endif
2402
2403 /* ---------------------------------------------------------------------------
2404 * Start new worker tasks on Capabilities from--to
2405 * -------------------------------------------------------------------------- */
2406
2407 static void
2408 startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
2409 {
2410 #if defined(THREADED_RTS)
2411 nat i;
2412 Capability *cap;
2413
2414 for (i = from; i < to; i++) {
2415 cap = capabilities[i];
2416 ACQUIRE_LOCK(&cap->lock);
2417 startWorkerTask(cap);
2418 RELEASE_LOCK(&cap->lock);
2419 }
2420 #endif
2421 }
2422
2423 /* ---------------------------------------------------------------------------
2424 * initScheduler()
2425 *
2426 * Initialise the scheduler. This resets all the queues - if the
2427 * queues contained any threads, they'll be garbage collected at the
2428 * next pass.
2429 *
2430 * ------------------------------------------------------------------------ */
2431
2432 void
2433 initScheduler(void)
2434 {
2435 #if !defined(THREADED_RTS)
2436 blocked_queue_hd = END_TSO_QUEUE;
2437 blocked_queue_tl = END_TSO_QUEUE;
2438 sleeping_queue = END_TSO_QUEUE;
2439 #endif
2440
2441 sched_state = SCHED_RUNNING;
2442 recent_activity = ACTIVITY_YES;
2443
2444 #if defined(THREADED_RTS)
2445 /* Initialise the mutex and condition variables used by
2446 * the scheduler. */
2447 initMutex(&sched_mutex);
2448 #endif
2449
2450 ACQUIRE_LOCK(&sched_mutex);
2451
2452 /* A capability holds the state a native thread needs in
2453 * order to execute STG code. At least one capability is
2454 * floating around (only THREADED_RTS builds have more than one).
2455 */
2456 initCapabilities();
2457
2458 initTaskManager();
2459
2460 /*
2461 * Eagerly start one worker to run each Capability, except for
2462 * Capability 0. The idea is that we're probably going to start a
2463 * bound thread on Capability 0 pretty soon, so we don't want a
2464 * worker task hogging it.
2465 */
2466 startWorkerTasks(1, n_capabilities);
2467
2468 RELEASE_LOCK(&sched_mutex);
2469
2470 }
2471
2472 void
2473 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2474 /* see Capability.c, shutdownCapability() */
2475 {
2476 Task *task = NULL;
2477
2478 task = newBoundTask();
2479
2480 // If we haven't killed all the threads yet, do it now.
2481 if (sched_state < SCHED_SHUTTING_DOWN) {
2482 sched_state = SCHED_INTERRUPTING;
2483 Capability *cap = task->cap;
2484 waitForCapability(&cap,task);
2485 scheduleDoGC(&cap,task,rtsTrue);
2486 ASSERT(task->incall->tso == NULL);
2487 releaseCapability(cap);
2488 }
2489 sched_state = SCHED_SHUTTING_DOWN;
2490
2491 shutdownCapabilities(task, wait_foreign);
2492
2493 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2494 // n_failed_trygrab_idles, n_idle_caps);
2495
2496 boundTaskExiting(task);
2497 }
2498
2499 void
2500 freeScheduler( void )
2501 {
2502 nat still_running;
2503
2504 ACQUIRE_LOCK(&sched_mutex);
2505 still_running = freeTaskManager();
2506 // We can only free the Capabilities if there are no Tasks still
2507 // running. We might have a Task about to return from a foreign
2508 // call into waitForCapability(), for example (actually,
2509 // this should be the *only* thing that a still-running Task can
2510 // do at this point, and it will block waiting for the
2511 // Capability).
2512 if (still_running == 0) {
2513 freeCapabilities();
2514 }
2515 RELEASE_LOCK(&sched_mutex);
2516 #if defined(THREADED_RTS)
2517 closeMutex(&sched_mutex);
2518 #endif
2519 }
2520
2521 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2522 void *user USED_IF_NOT_THREADS)
2523 {
2524 #if !defined(THREADED_RTS)
2525 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2526 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2527 evac(user, (StgClosure **)(void *)&sleeping_queue);
2528 #endif
2529 }
2530
2531 /* -----------------------------------------------------------------------------
2532 performGC
2533
2534 This is the interface to the garbage collector from Haskell land.
2535 We provide this so that external C code can allocate and garbage
2536 collect when called from Haskell via _ccall_GC.
2537 -------------------------------------------------------------------------- */
2538
2539 static void
2540 performGC_(rtsBool force_major)
2541 {
2542 Task *task;
2543 Capability *cap = NULL;
2544
2545 // We must grab a new Task here, because the existing Task may be
2546 // associated with a particular Capability, and chained onto the
2547 // suspended_ccalls queue.
2548 task = newBoundTask();
2549
2550 // TODO: do we need to traceTask*() here?
2551
2552 waitForCapability(&cap,task);
2553 scheduleDoGC(&cap,task,force_major);
2554 releaseCapability(cap);
2555 boundTaskExiting(task);
2556 }
2557
2558 void
2559 performGC(void)
2560 {
2561 performGC_(rtsFalse);
2562 }
2563
2564 void
2565 performMajorGC(void)
2566 {
2567 performGC_(rtsTrue);
2568 }
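/* Illustrative note: System.Mem in base exposes these entry points via
 * foreign imports, so a Haskell 'System.Mem.performMajorGC' is a thin
 * wrapper around performMajorGC() above.  (Sketch of the mapping only.)
 */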
2569
2570 /* ---------------------------------------------------------------------------
2571 Interrupt execution.
2572 Might be called inside a signal handler so it mustn't do anything fancy.
2573 ------------------------------------------------------------------------ */
2574
2575 void
2576 interruptStgRts(void)
2577 {
2578 sched_state = SCHED_INTERRUPTING;
2579 interruptAllCapabilities();
2580 #if defined(THREADED_RTS)
2581 wakeUpRts();
2582 #endif
2583 }
2584
2585 /* -----------------------------------------------------------------------------
2586 Wake up the RTS
2587
2588 This function causes at least one OS thread to wake up and run the
2589 scheduler loop. It is invoked when the RTS might be deadlocked, or
2590    an external event has arrived that may need servicing (e.g. a
2591 keyboard interrupt).
2592
2593 In the single-threaded RTS we don't do anything here; we only have
2594 one thread anyway, and the event that caused us to want to wake up
2595 will have interrupted any blocking system call in progress anyway.
2596 -------------------------------------------------------------------------- */
2597
2598 #if defined(THREADED_RTS)
2599 void wakeUpRts(void)
2600 {
2601 // This forces the IO Manager thread to wakeup, which will
2602 // in turn ensure that some OS thread wakes up and runs the
2603 // scheduler loop, which will cause a GC and deadlock check.
2604 ioManagerWakeup();
2605 }
2606 #endif
2607
2608 /* -----------------------------------------------------------------------------
2609 Deleting threads
2610
2611 This is used for interruption (^C) and forking, and corresponds to
2612 raising an exception but without letting the thread catch the
2613 exception.
2614 -------------------------------------------------------------------------- */
2615
2616 static void
2617 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2618 {
2619 // NOTE: must only be called on a TSO that we have exclusive
2620 // access to, because we will call throwToSingleThreaded() below.
2621 // The TSO must be on the run queue of the Capability we own, or
2622 // we must own all Capabilities.
2623
2624 if (tso->why_blocked != BlockedOnCCall &&
2625 tso->why_blocked != BlockedOnCCall_Interruptible) {
2626 throwToSingleThreaded(tso->cap,tso,NULL);
2627 }
2628 }
2629
2630 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2631 static void
2632 deleteThread_(Capability *cap, StgTSO *tso)
2633 { // for forkProcess only:
2634 // like deleteThread(), but we delete threads in foreign calls, too.
2635
2636 if (tso->why_blocked == BlockedOnCCall ||
2637 tso->why_blocked == BlockedOnCCall_Interruptible) {
2638 tso->what_next = ThreadKilled;
2639 appendToRunQueue(tso->cap, tso);
2640 } else {
2641 deleteThread(cap,tso);
2642 }
2643 }
2644 #endif
2645
2646 /* -----------------------------------------------------------------------------
2647 raiseExceptionHelper
2648
2649    This function is called by the raise# primitive, just so that we can
2650 move some of the tricky bits of raising an exception from C-- into
2651    C. Who knows, it might be a useful reusable thing here too.
2652 -------------------------------------------------------------------------- */
2653
2654 StgWord
2655 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2656 {
2657 Capability *cap = regTableToCapability(reg);
2658 StgThunk *raise_closure = NULL;
2659 StgPtr p, next;
2660 StgRetInfoTable *info;
2661 //
2662 // This closure represents the expression 'raise# E' where E
2663     // is the exception to raise. It is used to overwrite all the
2664     // thunks that are currently under evaluation.
2665 //
2666
2667 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2668 // LDV profiling: stg_raise_info has THUNK as its closure
2669 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2670     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2671 // 1 does not cause any problem unless profiling is performed.
2672 // However, when LDV profiling goes on, we need to linearly scan
2673 // small object pool, where raise_closure is stored, so we should
2674 // use MIN_UPD_SIZE.
2675 //
2676 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2677 // sizeofW(StgClosure)+1);
2678 //
2679
2680 //
2681 // Walk up the stack, looking for the catch frame. On the way,
2682 // we update any closures pointed to from update frames with the
2683 // raise closure that we just built.
2684 //
2685 p = tso->stackobj->sp;
2686 while(1) {
2687 info = get_ret_itbl((StgClosure *)p);
2688 next = p + stack_frame_sizeW((StgClosure *)p);
2689 switch (info->i.type) {
2690
2691 case UPDATE_FRAME:
2692 // Only create raise_closure if we need to.
2693 if (raise_closure == NULL) {
2694 raise_closure =
2695 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2696 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2697 raise_closure->payload[0] = exception;
2698 }
2699 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2700 (StgClosure *)raise_closure);
2701 p = next;
2702 continue;
2703
2704 case ATOMICALLY_FRAME:
2705 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2706 tso->stackobj->sp = p;
2707 return ATOMICALLY_FRAME;
2708
2709 case CATCH_FRAME:
2710 tso->stackobj->sp = p;
2711 return CATCH_FRAME;
2712
2713 case CATCH_STM_FRAME:
2714 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2715 tso->stackobj->sp = p;
2716 return CATCH_STM_FRAME;
2717
2718 case UNDERFLOW_FRAME:
2719 tso->stackobj->sp = p;
2720 threadStackUnderflow(cap,tso);
2721 p = tso->stackobj->sp;
2722 continue;
2723
2724 case STOP_FRAME:
2725 tso->stackobj->sp = p;
2726 return STOP_FRAME;
2727
2728 case CATCH_RETRY_FRAME: {
2729 StgTRecHeader *trec = tso -> trec;
2730 StgTRecHeader *outer = trec -> enclosing_trec;
2731 debugTrace(DEBUG_stm,
2732 "found CATCH_RETRY_FRAME at %p during raise", p);
2733 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2734 stmAbortTransaction(cap, trec);
2735 stmFreeAbortedTRec(cap, trec);
2736 tso -> trec = outer;
2737 p = next;
2738 continue;
2739 }
2740
2741 default:
2742 p = next;
2743 continue;
2744 }
2745 }
2746 }
2747
2748
2749 /* -----------------------------------------------------------------------------
2750 findRetryFrameHelper
2751
2752 This function is called by the retry# primitive. It traverses the stack
2753 leaving tso->sp referring to the frame which should handle the retry.
2754
2755 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2756    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2757
2758    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction that they
2759 create) because retries are not considered to be exceptions, despite the
2760 similar implementation.
2761
2762 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2763 not be created within memory transactions.
2764 -------------------------------------------------------------------------- */
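/* For example (illustrative): a retry# inside the first branch of an
 * 'orElse' stops at the CATCH_RETRY_FRAME pushed by orElse#, so the
 * alternative branch can run; a top-level retry# under 'atomically'
 * walks all the way to the ATOMICALLY_FRAME, where the transaction
 * blocks until one of the TVars it read changes.
 */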
2765
2766 StgWord
2767 findRetryFrameHelper (Capability *cap, StgTSO *tso)
2768 {
2769 StgPtr p, next;
2770 StgRetInfoTable *info;
2771
2772 p = tso->stackobj->sp;
2773 while (1) {
2774 info = get_ret_itbl((StgClosure *)p);
2775 next = p + stack_frame_sizeW((StgClosure *)p);
2776 switch (info->i.type) {
2777
2778 case ATOMICALLY_FRAME:
2779 debugTrace(DEBUG_stm,
2780 "found ATOMICALLY_FRAME at %p during retry", p);
2781 tso->stackobj->sp = p;
2782 return ATOMICALLY_FRAME;
2783
2784 case CATCH_RETRY_FRAME:
2785 debugTrace(DEBUG_stm,
2786 "found CATCH_RETRY_FRAME at %p during retry", p);
2787 tso->stackobj->sp = p;
2788 return CATCH_RETRY_FRAME;
2789
2790 case CATCH_STM_FRAME: {
2791 StgTRecHeader *trec = tso -> trec;
2792 StgTRecHeader *outer = trec -> enclosing_trec;
2793 debugTrace(DEBUG_stm,
2794 "found CATCH_STM_FRAME at %p during retry", p);
2795 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2796 stmAbortTransaction(cap, trec);
2797 stmFreeAbortedTRec(cap, trec);
2798 tso -> trec = outer;
2799 p = next;
2800 continue;
2801 }
2802
2803 case UNDERFLOW_FRAME:
2804 tso->stackobj->sp = p;
2805 threadStackUnderflow(cap,tso);
2806 p = tso->stackobj->sp;
2807 continue;
2808
2809 default:
2810 ASSERT(info->i.type != CATCH_FRAME);
2811 ASSERT(info->i.type != STOP_FRAME);
2812 p = next;
2813 continue;
2814 }
2815 }
2816 }
2817
2818 /* -----------------------------------------------------------------------------
2819 resurrectThreads is called after garbage collection on the list of
2820 threads found to be garbage. Each of these threads will be woken
2821    up and sent an exception: BlockedIndefinitelyOnMVar if the thread was
2822    blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked on an STM
2823    transaction, or NonTermination if it was blocked on a Black Hole.
2824
2825 Locks: assumes we hold *all* the capabilities.
2826 -------------------------------------------------------------------------- */
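/* For example (illustrative): a thread stuck in 'takeMVar m' where 'm'
 * has become unreachable is swept up by the GC, arrives here, and is
 * resurrected just long enough to receive BlockedIndefinitelyOnMVar
 * (thrown via blockedIndefinitelyOnMVar_closure below).
 */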
2827
2828 void
2829 resurrectThreads (StgTSO *threads)
2830 {
2831 StgTSO *tso, *next;
2832 Capability *cap;
2833 generation *gen;
2834
2835 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2836 next = tso->global_link;
2837
2838 gen = Bdescr((P_)tso)->gen;
2839 tso->global_link = gen->threads;
2840 gen->threads = tso;
2841
2842 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2843
2844 // Wake up the thread on the Capability it was last on
2845 cap = tso->cap;
2846
2847 switch (tso->why_blocked) {
2848 case BlockedOnMVar:
2849 case BlockedOnMVarRead:
2850 /* Called by GC - sched_mutex lock is currently held. */
2851 throwToSingleThreaded(cap, tso,
2852 (StgClosure *)blockedIndefinitelyOnMVar_closure);
2853 break;
2854 case BlockedOnBlackHole:
2855 throwToSingleThreaded(cap, tso,
2856 (StgClosure *)nonTermination_closure);
2857 break;
2858 case BlockedOnSTM:
2859 throwToSingleThreaded(cap, tso,
2860 (StgClosure *)blockedIndefinitelyOnSTM_closure);
2861 break;
2862 case NotBlocked:
2863 /* This might happen if the thread was blocked on a black hole
2864 * belonging to a thread that we've just woken up (raiseAsync
2865 * can wake up threads, remember...).
2866 */
2867 continue;
2868 case BlockedOnMsgThrowTo:
2869 // This can happen if the target is masking, blocks on a
2870 // black hole, and then is found to be unreachable. In
2871 // this case, we want to let the target wake up and carry
2872 // on, and do nothing to this thread.
2873 continue;
2874 default:
2875 barf("resurrectThreads: thread blocked in a strange way: %d",
2876 tso->why_blocked);
2877 }
2878 }
2879 }